Imported Upstream version 3.3.0
[debian/amanda] / perl / Amanda / Taper / Worker.pm
1 # Copyright (c) 2009, 2010 Zmanda Inc.  All Rights Reserved.
2 #
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the GNU General Public License version 2 as published
5 # by the Free Software Foundation.
6 #
7 # This program is distributed in the hope that it will be useful, but
8 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
9 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
10 # for more details.
11 #
12 # You should have received a copy of the GNU General Public License along
13 # with this program; if not, write to the Free Software Foundation, Inc.,
14 # 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
15 #
16 # Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
17 # Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
18
19 =head1 NAME
20
21 Amanda::Taper::Worker
22
23 =head1 DESCRIPTION
24
25 This package is a component of the Amanda taper, and is not intended for use by
26 other scripts or applications.
27
28 This package interface between L<Amanda::Taper::Controller> and L<Amanda::Taper::Scribe>.
29
30 The worker use an L<Amanda::Taper::Scribe> object to execute the request
31 received from the L<Amanda::Taper::Controller>.
32
33 =cut
34
35 use lib '@amperldir@';
36 use strict;
37 use warnings;
38
39 package Amanda::Taper::Worker;
40
41 use POSIX qw( :errno_h );
42 use Amanda::Changer;
43 use Amanda::Config qw( :getconf config_dir_relative );
44 use Amanda::Debug qw( :logging );
45 use Amanda::Header;
46 use Amanda::Holding;
47 use Amanda::MainLoop qw( :GIOCondition );
48 use Amanda::MainLoop;
49 use Amanda::Taper::Protocol;
50 use Amanda::Taper::Scan;
51 use Amanda::Taper::Scribe qw( get_splitting_args_from_config );
52 use Amanda::Logfile qw( :logtype_t log_add make_stats );
53 use Amanda::Xfer qw( :constants );
54 use Amanda::Util qw( quote_string );
55 use Amanda::Tapelist;
56 use File::Temp;
57
58 use base qw( Amanda::Taper::Scribe::Feedback );
59
60 our $tape_num = 0;
61
62 sub new {
63     my $class           = shift;
64     my $worker_name     = shift;
65     my $controller      = shift;
66     my $write_timestamp = shift;
67
68     my $self = bless {
69         state       => "init",
70         worker_name => $worker_name,
71         controller  => $controller,
72         scribe      => undef,
73         timestamp   => $write_timestamp,
74
75         # filled in when a write starts:
76         xfer => undef,
77         xfer_source => undef,
78         xfer_dest => undef,
79         handle => undef,
80         hostname => undef,
81         diskname => undef,
82         datestamp => undef,
83         level => undef,
84         header => undef,
85         doing_port_write => undef,
86         input_errors => [],
87
88         # periodic status updates
89         timer => undef,
90         status_filename => undef,
91         status_fh => undef,
92
93         # filled in after the header is available
94         header => undef,
95
96         # filled in when a new tape is started:
97         label => undef
98     }, $class;
99
100     my $scribe = Amanda::Taper::Scribe->new(
101         taperscan => $controller->{'taperscan'},
102         feedback => $self,
103         debug => $Amanda::Config::debug_taper);
104
105     $self->{'scribe'} = $scribe;
106     $self->{'scribe'}->start(write_timestamp => $write_timestamp,
107         finished_cb => sub { $self->_scribe_started_cb(@_); });
108
109     return $self;
110 }
111
112 # called when the scribe is fully started up and ready to go
113 sub _scribe_started_cb {
114     my $self = shift;
115     my ($err) = @_;
116
117     if ($err) {
118         $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::TAPE_ERROR,
119                 worker_name  => $self->{'worker_name'},
120                 message => "$err");
121         $self->{'state'} = "error";
122
123         # log the error (note that the message is intentionally not quoted)
124         log_add($L_ERROR, "no-tape error [$err]");
125
126     } else {
127         $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::TAPER_OK,
128                 worker_name => $self->{'worker_name'});
129         $self->{'state'} = "idle";
130     }
131 }
132
133
134 sub FILE_WRITE {
135     my $self = shift;
136     my ($msgtype, %params) = @_;
137     $self->_assert_in_state("idle") or return;
138
139     $self->{'doing_port_write'} = 0;
140
141     $self->setup_and_start_dump($msgtype,
142         dump_cb => sub { $self->dump_cb(@_); },
143         %params);
144 }
145
146 sub PORT_WRITE {
147     my $self = shift;
148     my ($msgtype, %params) = @_;
149
150     my $read_cb;
151
152     $self->_assert_in_state("idle") or return;
153
154     $self->{'doing_port_write'} = 1;
155
156     $self->setup_and_start_dump($msgtype,
157         dump_cb => sub { $self->dump_cb(@_); },
158         %params);
159 }
160
161 sub START_SCAN {
162     my $self = shift;
163     my ($msgtype, %params) = @_;
164
165     $self->{'scribe'}->start_scan(undef);
166 }
167
168 sub NEW_TAPE {
169     my $self = shift;
170     my ($msgtype, %params) = @_;
171
172     $self->_assert_in_state("writing") or return;
173
174     $self->{'perm_cb'}->(allow => 1);
175 }
176
177 sub NO_NEW_TAPE {
178     my $self = shift;
179     my ($msgtype, %params) = @_;
180
181     $self->_assert_in_state("writing") or return;
182
183     # log the error (note that the message is intentionally not quoted)
184     log_add($L_ERROR, "no-tape config [$params{reason}]");
185
186     $self->{'perm_cb'}->(cause => "config", message => $params{'reason'});
187 }
188
189 sub TAKE_SCRIBE_FROM {
190     my $self = shift;
191     my ($worker1, $msgtype, %params) = @_;
192
193     $self->_assert_in_state("writing") or return;
194     $worker1->_assert_in_state("idle") or return;
195
196     my $scribe = $self->{'scribe'};
197     my $scribe1 = $worker1->{'scribe'};
198     $self->{'scribe'} = $scribe1;
199     $worker1->{'scribe'} = $scribe;
200     # Change the callback to call the new scribe
201     $self->{'xfer'}->set_callback(sub {
202         my ($src, $msg, $xfer) = @_;
203         $scribe1->handle_xmsg($src, $msg, $xfer);
204
205         # if this is an error message that's not from the scribe's element, then
206         # we'll need to keep track of it ourselves
207         if ($msg->{'type'} == $XMSG_ERROR and $msg->{'elt'} != $self->{'xfer_dest'}) {
208             push @{$self->{'input_errors'}}, $msg->{'message'};
209         }
210     });
211
212     $self->{'label'} = $worker1->{'label'};
213     $self->{'perm_cb'}->(scribe => $scribe1);
214     delete $worker1->{'scribe'};
215     $worker1->{'state'} = 'error';
216     $scribe->quit(finished_cb => sub {});
217  }
218
219 sub DONE {
220     my $self = shift;
221     my ($msgtype, %params) = @_;
222
223     $self->_assert_in_state("writing") or return;
224     $self->{'dumper_status'} = "DONE";
225     $self->{'orig_kb'} = $params{'orig_kb'};
226     if (defined $self->{'result'}) {
227         $self->result_cb(undef);
228     }
229 }
230
231 sub FAILED {
232     my $self = shift;
233     my ($msgtype, %params) = @_;
234
235     $self->_assert_in_state("writing") or return;
236
237     $self->{'dumper_status'} = "FAILED";
238     if (defined $self->{'result'}) {
239         $self->result_cb(undef);
240     }
241 }
242
243 sub result_cb {
244     my $self = shift;
245     my %params = %{$self->{'dump_params'}};
246     my $msgtype;
247     my $logtype;
248
249     if ($params{'result'} eq 'DONE') {
250         if (!$self->{'doing_port_write'} or $self->{'dumper_status'} eq "DONE") {
251             $msgtype = Amanda::Taper::Protocol::DONE;
252             $logtype = $L_DONE;
253         } else {
254             $msgtype = Amanda::Taper::Protocol::DONE;
255             $logtype = $L_PARTIAL;
256         }
257     } elsif ($params{'result'} eq 'PARTIAL') {
258         $msgtype = Amanda::Taper::Protocol::PARTIAL;
259         $logtype = $L_PARTIAL;
260     } elsif ($params{'result'} eq 'FAILED') {
261         $msgtype = Amanda::Taper::Protocol::FAILED;
262         $logtype = $L_FAIL;
263     }
264
265     if ($self->{timer}) {
266         $self->{timer}->remove();
267         undef $self->{timer};
268         $self->{status_fh}->close();
269         undef $self->{status_fh};
270         unlink($self->{status_filename});
271         undef $self->{status_filename};
272     }
273
274     # note that we use total_duration here, which is the total time between
275     # start_dump and dump_cb, so the kps generated here is much less than the
276     # actual tape write speed.  Think of this as the *taper* speed, rather than
277     # the *tape* speed.
278     my $stats = make_stats($params{'size'}, $params{'total_duration'}, $self->{'orig_kb'});
279
280     # consider this a config-derived failure only if there were no errors
281     my $failure_from = (@{$params{'device_errors'}})?  'error' : 'config';
282
283     my @all_messages = (@{$params{'device_errors'}}, @{$self->{'input_errors'}});
284     push @all_messages, $params{'config_denial_message'} if $params{'config_denial_message'};
285     my $msg = quote_string(join("; ", @all_messages));
286
287     # write a DONE/PARTIAL/FAIL log line
288     if ($logtype == $L_FAIL) {
289         log_add($L_FAIL, sprintf("%s %s %s %s %s %s",
290             quote_string($self->{'hostname'}.""), # " is required for SWIG..
291             quote_string($self->{'diskname'}.""),
292             $self->{'datestamp'},
293             $self->{'level'},
294             $failure_from,
295             $msg));
296     } else {
297         log_add($logtype, sprintf("%s %s %s %s %s %s%s",
298             quote_string($self->{'hostname'}.""), # " is required for SWIG..
299             quote_string($self->{'diskname'}.""),
300             $self->{'datestamp'},
301             $params{'nparts'},
302             $self->{'level'},
303             $stats,
304             ($logtype == $L_PARTIAL and @all_messages)? " $msg" : ""));
305     }
306
307     # and send a message back to the driver
308     my %msg_params = (
309         handle => $self->{'handle'},
310     );
311
312     # reflect errors in our own elements in INPUT-ERROR or INPUT-GOOD
313     if (@{$self->{'input_errors'}}) {
314         $msg_params{'input'} = 'INPUT-ERROR';
315         $msg_params{'inputerr'} = join("; ", @{$self->{'input_errors'}});
316     } else {
317         $msg_params{'input'} = 'INPUT-GOOD';
318         $msg_params{'inputerr'} = '';
319     }
320
321     # and errors from the scribe in TAPE-ERROR or TAPE-GOOD
322     if (@{$params{'device_errors'}}) {
323         $msg_params{'taper'} = 'TAPE-ERROR';
324         $msg_params{'tapererr'} = join("; ", @{$params{'device_errors'}});
325     } elsif ($params{'config_denial_message'}) {
326         $msg_params{'taper'} = 'TAPE-ERROR';
327         $msg_params{'tapererr'} = $params{'config_denial_message'};
328     } else {
329         $msg_params{'taper'} = 'TAPE-GOOD';
330         $msg_params{'tapererr'} = '';
331     }
332
333     if ($msgtype ne Amanda::Taper::Protocol::FAILED) {
334         $msg_params{'stats'} = $stats;
335     }
336
337     # reset things to 'idle' before sending the message
338     $self->{'xfer'} = undef;
339     $self->{'xfer_source'} = undef;
340     $self->{'xfer_dest'} = undef;
341     $self->{'handle'} = undef;
342     $self->{'header'} = undef;
343     $self->{'hostname'} = undef;
344     $self->{'diskname'} = undef;
345     $self->{'datestamp'} = undef;
346     $self->{'level'} = undef;
347     $self->{'header'} = undef;
348     $self->{'state'} = 'idle';
349     delete $self->{'result'};
350     delete $self->{'dumper_status'};
351     delete $self->{'dump_params'};
352
353     $self->{'controller'}->{'proto'}->send($msgtype, %msg_params);
354 }
355
356
357 ##
358 # Scribe feedback
359
360 sub request_volume_permission {
361     my $self = shift;
362     my %params = @_;
363
364     $self->{'perm_cb'} = $params{'perm_cb'};
365     # and send the request to the driver
366     $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::REQUEST_NEW_TAPE,
367         handle => $self->{'handle'});
368 }
369
370 sub scribe_notif_new_tape {
371     my $self = shift;
372     my %params = @_;
373
374     # TODO: if $params{error} is set, report it back to the driver
375     # (this will be a change to the protocol)
376     log_add($L_INFO, $params{'error'}) if defined $params{'error'};
377
378     if ($params{'volume_label'}) {
379         $self->{'label'} = $params{'volume_label'};
380
381         # add to the trace log
382         log_add($L_START, sprintf("datestamp %s label %s tape %s",
383                 $self->{'timestamp'},
384                 quote_string($self->{'label'}),
385                 ++$tape_num));
386
387         # and the amdump log
388         print STDERR "taper: wrote label '$self->{label}'\n";
389
390         # and inform the driver
391         $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::NEW_TAPE,
392             handle => $self->{'handle'},
393             label => $params{'volume_label'});
394     } else {
395         $self->{'label'} = undef;
396
397         $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::NO_NEW_TAPE,
398             handle => $self->{'handle'});
399     }
400 }
401
402 sub scribe_notif_part_done {
403     my $self = shift;
404     my %params = @_;
405
406     $self->_assert_in_state("writing") or return;
407
408     my $stats = make_stats($params{'size'}, $params{'duration'}, $self->{'orig_kb'});
409
410     # log the part, using PART or PARTPARTIAL
411     my $logbase = sprintf("%s %s %s %s %s %s/%s %s %s",
412         quote_string($self->{'label'}),
413         $params{'fileno'},
414         quote_string($self->{'header'}->{'name'}.""), # " is required for SWIG..
415         quote_string($self->{'header'}->{'disk'}.""),
416         $self->{'datestamp'},
417         $params{'partnum'}, -1, # totalparts is always -1
418         $self->{'level'},
419         $stats);
420     if ($params{'successful'}) {
421         log_add($L_PART, $logbase);
422     } else {
423         log_add($L_PARTPARTIAL, "$logbase \"No space left on device\"");
424     }
425
426     # only send a PARTDONE if it was successful
427     if ($params{'successful'}) {
428         $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::PARTDONE,
429             handle => $self->{'handle'},
430             label => $self->{'label'},
431             fileno => $params{'fileno'},
432             stats => $stats,
433             kb => $params{'size'} / 1024);
434     }
435 }
436
437 sub scribe_notif_log_info {
438     my $self = shift;
439     my %params = @_;
440
441     log_add($L_INFO, $params{'message'});
442 }
443
444 ##
445 # Utilities
446
447 sub _assert_in_state {
448     my $self = shift;
449     my ($state) = @_;
450     if ($self->{'state'} eq $state) {
451         return 1;
452     } else {
453         $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::BAD_COMMAND,
454             message => "command not appropriate in state '$self->{state}' : '$state'");
455         return 0;
456     }
457 }
458
459 sub create_status_file {
460     my $self = shift;
461
462     # create temporary file
463     ($self->{status_fh}, $self->{status_filename}) =
464         File::Temp::tempfile("taper_status_file_XXXXXX",
465                                 DIR => $Amanda::Paths::AMANDA_TMPDIR,
466                                 UNLINK => 1);
467
468     # tell amstatus about it by writing it to the dump log
469     my $qdisk = Amanda::Util::quote_string($self->{'diskname'});
470     my $qhost = Amanda::Util::quote_string($self->{'hostname'});
471     print STDERR "taper: status file $qhost $qdisk:" .
472                     "$self->{status_filename}\n";
473     print {$self->{status_fh}} "0";
474
475     # create timer callback, firing every 5s (=5000msec)
476     $self->{timer} = Amanda::MainLoop::timeout_source(5000);
477     $self->{timer}->set_callback(sub {
478         my $size = $self->{scribe}->get_bytes_written();
479         seek $self->{status_fh}, 0, 0;
480         print {$self->{status_fh}} $size;
481         $self->{status_fh}->flush();
482     });
483 }
484
485 sub send_port_and_get_header {
486     my $self = shift;
487     my ($finished_cb) = @_;
488
489     my $header_xfer;
490     my ($xsrc, $xdst);
491     my $errmsg;
492
493     my $steps = define_steps
494         cb_ref => \$finished_cb;
495
496     step send_port => sub {
497         # get the ip:port pairs for the data connection from the data xfer source,
498         # which should be an Amanda::Xfer::Source::DirectTCPListen
499         my $data_addrs = $self->{'xfer_source'}->get_addrs();
500         $data_addrs = join ";", map { $_->[0] . ':' . $_->[1] } @$data_addrs;
501
502         # and set up an xfer for the header, too, using DirectTCP as an easy
503         # way to implement a listen/accept/read process.  Note that this does
504         # not enforce a maximum size, so this portion of Amanda at least can
505         # handle any size header
506         ($xsrc, $xdst) = (
507             Amanda::Xfer::Source::DirectTCPListen->new(),
508             Amanda::Xfer::Dest::Buffer->new(0));
509         $header_xfer = Amanda::Xfer->new([$xsrc, $xdst]);
510         $header_xfer->start($steps->{'header_xfer_xmsg_cb'});
511
512         my $header_addrs = $xsrc->get_addrs();
513         my $header_port = $header_addrs->[0][1];
514
515         # and tell the driver which ports we're listening on
516         $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::PORT,
517             worker_name => $self->{'worker_name'},
518             handle => $self->{'handle'},
519             port => $header_port,
520             ipports => $data_addrs);
521     };
522
523     step header_xfer_xmsg_cb => sub {
524         my ($src, $xmsg, $xfer) = @_;
525         if ($xmsg->{'type'} == $XMSG_INFO) {
526             info($xmsg->{'message'});
527         } elsif ($xmsg->{'type'} == $XMSG_ERROR) {
528             $errmsg = $xmsg->{'messsage'};
529         } elsif ($xmsg->{'type'} == $XMSG_DONE) {
530             if ($errmsg) {
531                 $finished_cb->($errmsg);
532             } else {
533                 $steps->{'got_header'}->();
534             }
535         }
536     };
537
538     step got_header => sub {
539         my $hdr_buf = $xdst->get();
540
541         # close stuff up
542         $header_xfer = $xsrc = $xdst = undef;
543
544         if (!defined $hdr_buf) {
545             return $finished_cb->("Got empty header");
546         }
547
548         # parse the header, finally!
549         $self->{'header'} = Amanda::Header->from_string($hdr_buf);
550
551         $finished_cb->(undef);
552     };
553 }
554
555 # do the work of starting a new xfer; this contains the code common to
556 # msg_PORT_WRITE and msg_FILE_WRITE.
557 sub setup_and_start_dump {
558     my $self = shift;
559     my ($msgtype, %params) = @_;
560     my %get_xfer_dest_args;
561
562     # setting up the dump is a bit complex, due to the requirements of
563     # a directtcp port_write.  This function:
564     # 1. creates and starts a transfer (make_xfer)
565     # 2. gets the header
566     # 3. calls the scribe's start_dump method with the new header
567
568     my $steps = define_steps
569         cb_ref => \$params{'dump_cb'};
570
571     step setup => sub {
572         $self->{'handle'} = $params{'handle'};
573         $self->{'hostname'} = $params{'hostname'};
574         $self->{'diskname'} = $params{'diskname'};
575         $self->{'datestamp'} = $params{'datestamp'};
576         $self->{'level'} = $params{'level'};
577         $self->{'header'} = undef; # no header yet
578         $self->{'orig_kb'} = $params{'orig_kb'};
579         $self->{'input_errors'} = [];
580
581         if ($msgtype eq Amanda::Taper::Protocol::PORT_WRITE &&
582             (my $err = $self->{'scribe'}->check_data_path($params{'data_path'}))) {
583             return $params{'dump_cb'}->(
584                 result => "FAILED",
585                 device_errors => [ ['error', "$err"] ],
586                 size => 0,
587                 duration => 0.0,
588                 total_duration => 0);
589         }
590         $steps->{'process_args'}->();
591     };
592
593     step process_args => sub {
594         # extract the splitting-related parameters, stripping out empty strings
595         my %splitting_args = map {
596             ($params{$_} ne '')? ($_, $params{$_}) : ()
597         } qw(
598             dle_tape_splitsize dle_split_diskbuffer dle_fallback_splitsize dle_allow_split
599             part_size part_cache_type part_cache_dir part_cache_max_size
600         );
601
602         # convert numeric values to BigInts
603         for (qw(dle_tape_splitsize dle_fallback_splitsize part_size part_cache_max_size)) {
604             $splitting_args{$_} = Math::BigInt->new($splitting_args{$_})
605                 if (exists $splitting_args{$_});
606         }
607
608         my $device = $self->{'scribe'}->get_device();
609         if (!defined $device) {
610             die "no device is available to create an xfer_dest";
611         }
612         $splitting_args{'leom_supported'} = $device->property_get("leom");
613         # and convert those to get_xfer_dest args
614         %get_xfer_dest_args = get_splitting_args_from_config(
615                 %splitting_args);
616         $get_xfer_dest_args{'max_memory'} = getconf($CNF_DEVICE_OUTPUT_BUFFER_SIZE);
617         if (!getconf_seen($CNF_DEVICE_OUTPUT_BUFFER_SIZE)) {
618             my $block_size4 = $device->block_size * 4;
619             if ($block_size4 > $get_xfer_dest_args{'max_memory'}) {
620                 $get_xfer_dest_args{'max_memory'} = $block_size4;
621             }
622         }
623         $device = undef;
624         $get_xfer_dest_args{'can_cache_inform'} = ($msgtype eq Amanda::Taper::Protocol::FILE_WRITE and $get_xfer_dest_args{'allow_split'});
625
626         # if we're unable to fulfill the user's splitting needs, we can still give
627         # the dump a shot - but we'll warn them about the problem
628         if ($get_xfer_dest_args{'warning'}) {
629             log_add($L_WARNING, sprintf("%s:%s: %s",
630                     $params{'hostname'}, $params{'diskname'},
631                     $get_xfer_dest_args{'warning'}));
632             delete $get_xfer_dest_args{'warning'};
633         }
634
635         $steps->{'make_xfer'}->();
636     };
637
638     step make_xfer => sub {
639         $self->_assert_in_state("idle") or return;
640         $self->{'state'} = 'making_xfer';
641
642         $self->{'xfer_dest'} = $self->{'scribe'}->get_xfer_dest(%get_xfer_dest_args);
643
644         my $xfer_source;
645         if ($msgtype eq Amanda::Taper::Protocol::PORT_WRITE) {
646             $xfer_source = Amanda::Xfer::Source::DirectTCPListen->new();
647         } else {
648             $xfer_source = Amanda::Xfer::Source::Holding->new($params{'filename'});
649         }
650         $self->{'xfer_source'} = $xfer_source;
651
652         $self->{'xfer'} = Amanda::Xfer->new([$xfer_source, $self->{'xfer_dest'}]);
653         $self->{'xfer'}->start(sub {
654             my ($src, $msg, $xfer) = @_;
655             $self->{'scribe'}->handle_xmsg($src, $msg, $xfer);
656
657             # if this is an error message that's not from the scribe's element, then
658             # we'll need to keep track of it ourselves
659             if ($msg->{'type'} == $XMSG_ERROR and $msg->{'elt'} != $self->{'xfer_dest'}) {
660                 push @{$self->{'input_errors'}}, $msg->{'message'};
661             }
662         });
663
664         # we've started the xfer now, but the destination won't actually write
665         # any data until we call start_dump.  And we'll need a header for that.
666
667         $steps->{'get_header'}->();
668     };
669
670     step get_header => sub {
671         $self->_assert_in_state("making_xfer") or return;
672         $self->{'state'} = 'getting_header';
673
674         if ($msgtype eq Amanda::Taper::Protocol::FILE_WRITE) {
675             # getting the header is easy for FILE-WRITE..
676             my $hdr = $self->{'header'} = Amanda::Holding::get_header($params{'filename'});
677
678             # stip out header fields we don't need
679             $hdr->{'cont_filename'} = '';
680
681             if (!defined $hdr || $hdr->{'type'} != $Amanda::Header::F_DUMPFILE) {
682                 die("Could not read header from '$params{filename}'");
683             }
684             $steps->{'start_dump'}->(undef);
685         } else {
686             # ..but quite a bit harder for PORT-WRITE; this method will send the
687             # proper PORT command, then read the header from the dumper and parse
688             # it, placing the result in $self->{'header'}
689             $self->send_port_and_get_header($steps->{'start_dump'});
690         }
691     };
692
693     step start_dump => sub {
694         my ($err) = @_;
695
696         $self->_assert_in_state("getting_header") or return;
697         $self->{'state'} = 'writing';
698
699         # if $err is set, cancel the dump, treating it as a input error
700         if ($err) {
701             push @{$self->{'input_errors'}}, $err;
702             return $self->{'scribe'}->cancel_dump(
703                 xfer => $self->{'xfer'},
704                 dump_cb => $params{'dump_cb'});
705         }
706
707         # sanity check the header..
708         my $hdr = $self->{'header'};
709         if ($hdr->{'dumplevel'} != $params{'level'}
710             or $hdr->{'name'} ne $params{'hostname'}
711             or $hdr->{'disk'} ne $params{'diskname'}
712             or $hdr->{'datestamp'} ne $params{'datestamp'}) {
713             die("Header of dumpfile does not match command from driver");
714         }
715
716         # start producing status
717         $self->create_status_file();
718
719         # and fix it up before writing it
720         $hdr->{'totalparts'} = -1;
721         $hdr->{'type'} = $Amanda::Header::F_SPLIT_DUMPFILE;
722
723         $self->{'scribe'}->start_dump(
724             xfer => $self->{'xfer'},
725             dump_header => $hdr,
726             dump_cb => $params{'dump_cb'});
727     };
728 }
729
730 sub dump_cb {
731     my $self = shift;
732     my %params = @_;
733
734     $self->{'dump_params'} = \%params;
735     $self->{'result'} = $params{'result'};
736
737     # if we need to the dumper status (to differentiate a dropped network
738     # connection from a normal EOF) and have not done so yet, then send a
739     # DUMPER_STATUS message and re-call this method (dump_cb) with the result.
740     if ($params{'result'} eq "DONE"
741             and $self->{'doing_port_write'}
742             and !exists $self->{'dumper_status'}) {
743         my $controller = $self->{'controller'};
744         my $proto = $controller->{'proto'};
745         my $handle = $self->{'handle'};
746         $proto->send(Amanda::Taper::Protocol::DUMPER_STATUS,
747                 handle => "$handle");
748     } else {
749         $self->result_cb();
750     }
751 }
752
753 1;