Imported Upstream version 3.3.3
[debian/amanda] / perl / Amanda / Taper / Worker.pm
1 # Copyright (c) 2009-2012 Zmanda Inc.  All Rights Reserved.
2 #
3 # This program is free software; you can redistribute it and/or
4 # modify it under the terms of the GNU General Public License
5 # as published by the Free Software Foundation; either version 2
6 # of the License, or (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful, but
9 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
11 # for more details.
12 #
13 # You should have received a copy of the GNU General Public License along
14 # with this program; if not, write to the Free Software Foundation, Inc.,
15 # 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 #
17 # Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
18 # Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
19
20 =head1 NAME
21
22 Amanda::Taper::Worker
23
24 =head1 DESCRIPTION
25
26 This package is a component of the Amanda taper, and is not intended for use by
27 other scripts or applications.
28
29 This package interface between L<Amanda::Taper::Controller> and L<Amanda::Taper::Scribe>.
30
31 The worker use an L<Amanda::Taper::Scribe> object to execute the request
32 received from the L<Amanda::Taper::Controller>.
33
34 =cut
35
36 use lib '@amperldir@';
37 use strict;
38 use warnings;
39
40 package Amanda::Taper::Worker;
41
42 use Carp;
43 use POSIX qw( :errno_h );
44 use Amanda::Changer;
45 use Amanda::Config qw( :getconf config_dir_relative );
46 use Amanda::Debug qw( :logging );
47 use Amanda::Header;
48 use Amanda::Holding;
49 use Amanda::MainLoop qw( :GIOCondition );
50 use Amanda::MainLoop;
51 use Amanda::Taper::Protocol;
52 use Amanda::Taper::Scan;
53 use Amanda::Taper::Scribe qw( get_splitting_args_from_config );
54 use Amanda::Logfile qw( :logtype_t log_add make_stats );
55 use Amanda::Xfer qw( :constants );
56 use Amanda::Util qw( quote_string );
57 use Amanda::Tapelist;
58 use File::Temp;
59
60 use base qw( Amanda::Taper::Scribe::Feedback );
61
62 our $tape_num = 0;
63
64 sub new {
65     my $class           = shift;
66     my $worker_name     = shift;
67     my $controller      = shift;
68     my $write_timestamp = shift;
69
70     my $self = bless {
71         state       => "init",
72         worker_name => $worker_name,
73         controller  => $controller,
74         scribe      => undef,
75         timestamp   => $write_timestamp,
76
77         # filled in when a write starts:
78         xfer => undef,
79         xfer_source => undef,
80         xfer_dest => undef,
81         handle => undef,
82         hostname => undef,
83         diskname => undef,
84         datestamp => undef,
85         level => undef,
86         header => undef,
87         doing_port_write => undef,
88         input_errors => [],
89
90         # periodic status updates
91         timer => undef,
92         status_filename => undef,
93         status_fh => undef,
94
95         # filled in after the header is available
96         header => undef,
97
98         # filled in when a new tape is started:
99         label => undef
100     }, $class;
101
102     my $scribe = Amanda::Taper::Scribe->new(
103         taperscan => $controller->{'taperscan'},
104         feedback => $self,
105         debug => $Amanda::Config::debug_taper,
106         eject_volume => getconf($CNF_EJECT_VOLUME));
107
108     $self->{'scribe'} = $scribe;
109     $self->{'scribe'}->start(write_timestamp => $write_timestamp,
110         finished_cb => sub { $self->_scribe_started_cb(@_); });
111
112     return $self;
113 }
114
115 # called when the scribe is fully started up and ready to go
116 sub _scribe_started_cb {
117     my $self = shift;
118     my ($err) = @_;
119
120     if ($err) {
121         $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::TAPE_ERROR,
122                 worker_name  => $self->{'worker_name'},
123                 message => "$err");
124         $self->{'state'} = "error";
125
126         # log the error (note that the message is intentionally not quoted)
127         log_add($L_ERROR, "no-tape error [$err]");
128
129     } else {
130         $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::TAPER_OK,
131                 worker_name => $self->{'worker_name'});
132         $self->{'state'} = "idle";
133     }
134 }
135
136
137 sub FILE_WRITE {
138     my $self = shift;
139     my ($msgtype, %params) = @_;
140     $self->_assert_in_state("idle") or return;
141
142     $self->{'doing_port_write'} = 0;
143
144     $self->setup_and_start_dump($msgtype,
145         dump_cb => sub { $self->dump_cb(@_); },
146         %params);
147 }
148
149 sub PORT_WRITE {
150     my $self = shift;
151     my ($msgtype, %params) = @_;
152
153     my $read_cb;
154
155     $self->_assert_in_state("idle") or return;
156
157     $self->{'doing_port_write'} = 1;
158
159     $self->setup_and_start_dump($msgtype,
160         dump_cb => sub { $self->dump_cb(@_); },
161         %params);
162 }
163
164 sub START_SCAN {
165     my $self = shift;
166     my ($msgtype, %params) = @_;
167
168     $self->{'scribe'}->start_scan(undef);
169 }
170
171 sub NEW_TAPE {
172     my $self = shift;
173     my ($msgtype, %params) = @_;
174
175     $self->_assert_in_state("writing") or return;
176
177     $self->{'perm_cb'}->(allow => 1);
178 }
179
180 sub NO_NEW_TAPE {
181     my $self = shift;
182     my ($msgtype, %params) = @_;
183
184     $self->_assert_in_state("writing") or return;
185
186     # log the error (note that the message is intentionally not quoted)
187     log_add($L_ERROR, "no-tape config [$params{reason}]");
188
189     $self->{'perm_cb'}->(cause => "config", message => $params{'reason'});
190 }
191
192 sub TAKE_SCRIBE_FROM {
193     my $self = shift;
194     my ($worker1, $msgtype, %params) = @_;
195
196     $self->_assert_in_state("writing") or return;
197     $worker1->_assert_in_state("idle") or return;
198
199     my $scribe = $self->{'scribe'};
200     my $scribe1 = $worker1->{'scribe'};
201     $self->{'scribe'} = $scribe1;
202     $worker1->{'scribe'} = $scribe;
203     # Change the callback to call the new scribe
204     $self->{'xfer'}->set_callback(sub {
205         my ($src, $msg, $xfer) = @_;
206         $scribe1->handle_xmsg($src, $msg, $xfer);
207
208         # if this is an error message that's not from the scribe's element, then
209         # we'll need to keep track of it ourselves
210         if ($msg->{'type'} == $XMSG_ERROR and $msg->{'elt'} != $self->{'xfer_dest'}) {
211             push @{$self->{'input_errors'}}, $msg->{'message'};
212         }
213     });
214
215     $self->{'label'} = $worker1->{'label'};
216     $self->{'perm_cb'}->(scribe => $scribe1);
217     delete $worker1->{'scribe'};
218     $worker1->{'state'} = 'error';
219     $scribe->quit(finished_cb => sub {});
220  }
221
222 sub DONE {
223     my $self = shift;
224     my ($msgtype, %params) = @_;
225
226     if ($params{'handle'} ne $self->{'handle'}) {
227         # ignore message for previous handle
228         return;
229     }
230
231     if (defined $self->{'dumper_status'}) {
232         # ignore duplicate message
233         return
234     }
235
236     $self->{'dumper_status'} = "DONE";
237     $self->{'orig_kb'} = $params{'orig_kb'};
238     if (defined $self->{'result'}) {
239         $self->result_cb(undef);
240     }
241 }
242
243 sub FAILED {
244     my $self = shift;
245     my ($msgtype, %params) = @_;
246
247     if ($params{'handle'} ne $self->{'handle'}) {
248         # ignore message for previous handle
249         return;
250     }
251
252     if (defined $self->{'dumper_status'}) {
253         # ignore duplicate message
254         return
255     }
256
257     $self->{'dumper_status'} = "FAILED";
258     if (defined $self->{'header_xfer'}) {
259         $self->{'header_xfer'}->cancel();
260     } elsif (defined $self->{'result'}) {
261         $self->result_cb(undef);
262     } elsif (!defined $self->{'scribe'}->{'xdt'}) {
263         # ignore, the dump is already cancelled or not yet started.
264     } elsif (!defined $self->{'scribe'}->{'xfer'}) {
265         # ignore, the dump is already cancelled or not yet started.
266     } else { # Abort the dump
267         push @{$self->{'input_errors'}}, "dumper failed";
268         $self->{'scribe'}->cancel_dump(
269                 xfer => $self->{'scribe'}->{'xfer'},
270                 dump_cb => $self->{'dump_cb'});
271     }
272 }
273
274 sub CLOSE_VOLUME {
275     my $self = shift;
276     my ($msgtype, %params) = @_;
277
278     $self->_assert_in_state("idle") or return;
279
280     $self->{'scribe'}->close_volume();
281 }
282
283 sub result_cb {
284     my $self = shift;
285     my %params = %{$self->{'dump_params'}};
286     my $msgtype;
287     my $logtype;
288
289     if ($params{'result'} eq 'DONE') {
290         if ($self->{'dumper_status'} eq "DONE") {
291             $msgtype = Amanda::Taper::Protocol::DONE;
292             $logtype = $L_DONE;
293         } else {
294             $msgtype = Amanda::Taper::Protocol::DONE;
295             $logtype = $L_PARTIAL;
296         }
297     } elsif ($params{'result'} eq 'PARTIAL') {
298         $msgtype = Amanda::Taper::Protocol::PARTIAL;
299         $logtype = $L_PARTIAL;
300     } elsif ($params{'result'} eq 'FAILED') {
301         $msgtype = Amanda::Taper::Protocol::FAILED;
302         $logtype = $L_FAIL;
303     }
304
305     if ($self->{timer}) {
306         $self->{timer}->remove();
307         undef $self->{timer};
308         $self->{status_fh}->close();
309         undef $self->{status_fh};
310         unlink($self->{status_filename});
311         undef $self->{status_filename};
312     }
313
314     # note that we use total_duration here, which is the total time between
315     # start_dump and dump_cb, so the kps generated here is much less than the
316     # actual tape write speed.  Think of this as the *taper* speed, rather than
317     # the *tape* speed.
318     my $stats = make_stats($params{'size'}, $params{'total_duration'}, $self->{'orig_kb'});
319
320     # consider this a config-derived failure only if there were no errors
321     my $failure_from = (@{$params{'device_errors'}})?  'error' : 'config';
322
323     my @all_messages = (@{$params{'device_errors'}}, @{$self->{'input_errors'}});
324     push @all_messages, $params{'config_denial_message'} if $params{'config_denial_message'};
325     my $msg = quote_string(join("; ", @all_messages));
326
327     # write a DONE/PARTIAL/FAIL log line
328     if ($logtype == $L_FAIL) {
329         log_add($L_FAIL, sprintf("%s %s %s %s %s %s",
330             quote_string($self->{'hostname'}.""), # " is required for SWIG..
331             quote_string($self->{'diskname'}.""),
332             $self->{'datestamp'},
333             $self->{'level'},
334             $failure_from,
335             $msg));
336     } else {
337         log_add($logtype, sprintf("%s %s %s %s %s %s%s",
338             quote_string($self->{'hostname'}.""), # " is required for SWIG..
339             quote_string($self->{'diskname'}.""),
340             $self->{'datestamp'},
341             $params{'nparts'},
342             $self->{'level'},
343             $stats,
344             ($logtype == $L_PARTIAL and @all_messages)? " $msg" : ""));
345     }
346
347     # and send a message back to the driver
348     my %msg_params = (
349         handle => $self->{'handle'},
350     );
351
352     # reflect errors in our own elements in INPUT-ERROR or INPUT-GOOD
353     if (@{$self->{'input_errors'}}) {
354         $msg_params{'input'} = 'INPUT-ERROR';
355         $msg_params{'inputerr'} = join("; ", @{$self->{'input_errors'}});
356     } else {
357         $msg_params{'input'} = 'INPUT-GOOD';
358         $msg_params{'inputerr'} = '';
359     }
360
361     # and errors from the scribe in TAPE-ERROR or TAPE-GOOD
362     if (@{$params{'device_errors'}}) {
363         $msg_params{'taper'} = 'TAPE-ERROR';
364         $msg_params{'tapererr'} = join("; ", @{$params{'device_errors'}});
365     } elsif ($params{'config_denial_message'}) {
366         $msg_params{'taper'} = 'TAPE-CONFIG';
367         $msg_params{'tapererr'} = $params{'config_denial_message'};
368     } else {
369         $msg_params{'taper'} = 'TAPE-GOOD';
370         $msg_params{'tapererr'} = '';
371     }
372
373     if ($msgtype ne Amanda::Taper::Protocol::FAILED) {
374         $msg_params{'stats'} = $stats;
375     }
376
377     # reset things to 'idle' before sending the message
378     $self->{'xfer'} = undef;
379     $self->{'xfer_source'} = undef;
380     $self->{'xfer_dest'} = undef;
381     $self->{'handle'} = undef;
382     $self->{'header'} = undef;
383     $self->{'hostname'} = undef;
384     $self->{'diskname'} = undef;
385     $self->{'datestamp'} = undef;
386     $self->{'level'} = undef;
387     $self->{'header'} = undef;
388     $self->{'state'} = 'idle';
389     delete $self->{'result'};
390     delete $self->{'dumper_status'};
391     delete $self->{'dump_params'};
392
393     $self->{'controller'}->{'proto'}->send($msgtype, %msg_params);
394 }
395
396
397 ##
398 # Scribe feedback
399
400 sub request_volume_permission {
401     my $self = shift;
402     my %params = @_;
403
404     $self->{'perm_cb'} = $params{'perm_cb'};
405     # and send the request to the driver
406     $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::REQUEST_NEW_TAPE,
407         handle => $self->{'handle'});
408 }
409
410 sub scribe_notif_new_tape {
411     my $self = shift;
412     my %params = @_;
413
414     # TODO: if $params{error} is set, report it back to the driver
415     # (this will be a change to the protocol)
416     log_add($L_INFO, "$params{'error'}") if defined $params{'error'};
417
418     if ($params{'volume_label'}) {
419         $self->{'label'} = $params{'volume_label'};
420
421         # add to the trace log
422         log_add($L_START, sprintf("datestamp %s label %s tape %s",
423                 $self->{'timestamp'},
424                 quote_string($self->{'label'}),
425                 ++$tape_num));
426
427         # and the amdump log
428         print STDERR "taper: wrote label '$self->{label}'\n";
429
430         # and inform the driver
431         $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::NEW_TAPE,
432             handle => $self->{'handle'},
433             label => $params{'volume_label'});
434     } else {
435         $self->{'label'} = undef;
436
437         $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::NO_NEW_TAPE,
438             handle => $self->{'handle'});
439     }
440 }
441
442 sub scribe_notif_part_done {
443     my $self = shift;
444     my %params = @_;
445
446     $self->_assert_in_state("writing") or return;
447
448     my $stats = make_stats($params{'size'}, $params{'duration'}, $self->{'orig_kb'});
449
450     # log the part, using PART or PARTPARTIAL
451     my $logbase = sprintf("%s %s %s %s %s %s/%s %s %s",
452         quote_string($self->{'label'}),
453         $params{'fileno'},
454         quote_string($self->{'header'}->{'name'}.""), # " is required for SWIG..
455         quote_string($self->{'header'}->{'disk'}.""),
456         $self->{'datestamp'},
457         $params{'partnum'}, -1, # totalparts is always -1
458         $self->{'level'},
459         $stats);
460     if ($params{'successful'}) {
461         log_add($L_PART, $logbase);
462     } else {
463         log_add($L_PARTPARTIAL, "$logbase \"No space left on device\"");
464     }
465
466     # only send a PARTDONE if it was successful
467     if ($params{'successful'}) {
468         $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::PARTDONE,
469             handle => $self->{'handle'},
470             label => $self->{'label'},
471             fileno => $params{'fileno'},
472             stats => $stats,
473             kb => $params{'size'} / 1024);
474     }
475 }
476
477 sub scribe_notif_log_info {
478     my $self = shift;
479     my %params = @_;
480
481     debug("$params{'message'}");
482     log_add($L_INFO, "$params{'message'}");
483 }
484
485 ##
486 # Utilities
487
488 sub _assert_in_state {
489     my $self = shift;
490     my ($state) = @_;
491     if ($self->{'state'} eq $state) {
492         return 1;
493     } else {
494         $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::BAD_COMMAND,
495             message => "command not appropriate in state '$self->{state}' : '$state'");
496         return 0;
497     }
498 }
499
500 sub create_status_file {
501     my $self = shift;
502
503     # create temporary file
504     ($self->{status_fh}, $self->{status_filename}) =
505         File::Temp::tempfile("taper_status_file_XXXXXX",
506                                 DIR => $Amanda::Paths::AMANDA_TMPDIR,
507                                 UNLINK => 1);
508
509     # tell amstatus about it by writing it to the dump log
510     my $qdisk = Amanda::Util::quote_string($self->{'diskname'});
511     my $qhost = Amanda::Util::quote_string($self->{'hostname'});
512     print STDERR "taper: status file $qhost $qdisk:" .
513                     "$self->{status_filename}\n";
514     print {$self->{status_fh}} "0";
515
516     # create timer callback, firing every 5s (=5000msec)
517     $self->{timer} = Amanda::MainLoop::timeout_source(5000);
518     $self->{timer}->set_callback(sub {
519         my $size = $self->{scribe}->get_bytes_written();
520         seek $self->{status_fh}, 0, 0;
521         print {$self->{status_fh}} $size, '     ';
522         $self->{status_fh}->flush();
523     });
524 }
525
526 sub send_port_and_get_header {
527     my $self = shift;
528     my ($finished_cb) = @_;
529
530     my ($xsrc, $xdst);
531     my $errmsg;
532
533     my $steps = define_steps
534         cb_ref => \$finished_cb;
535
536     step send_port => sub {
537         # get the ip:port pairs for the data connection from the data xfer source,
538         # which should be an Amanda::Xfer::Source::DirectTCPListen
539         my $data_addrs = $self->{'xfer_source'}->get_addrs();
540         $data_addrs = join ";", map { $_->[0] . ':' . $_->[1] } @$data_addrs;
541
542         # and set up an xfer for the header, too, using DirectTCP as an easy
543         # way to implement a listen/accept/read process.  Note that this does
544         # not enforce a maximum size, so this portion of Amanda at least can
545         # handle any size header
546         ($xsrc, $xdst) = (
547             Amanda::Xfer::Source::DirectTCPListen->new(),
548             Amanda::Xfer::Dest::Buffer->new(0));
549         $self->{'header_xfer'} = Amanda::Xfer->new([$xsrc, $xdst]);
550         $self->{'header_xfer'}->start($steps->{'header_xfer_xmsg_cb'});
551
552         my $header_addrs = $xsrc->get_addrs();
553         my $header_port = $header_addrs->[0][1];
554
555         # and tell the driver which ports we're listening on
556         $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::PORT,
557             worker_name => $self->{'worker_name'},
558             handle => $self->{'handle'},
559             port => $header_port,
560             ipports => $data_addrs);
561     };
562
563     step header_xfer_xmsg_cb => sub {
564         my ($src, $xmsg, $xfer) = @_;
565         if ($xmsg->{'type'} == $XMSG_INFO) {
566             info($xmsg->{'message'});
567         } elsif ($xmsg->{'type'} == $XMSG_ERROR) {
568             $errmsg = $xmsg->{'message'};
569         } elsif ($xmsg->{'type'} == $XMSG_DONE) {
570             if ($errmsg) {
571                 $finished_cb->($errmsg);
572             } else {
573                 $steps->{'got_header'}->();
574             }
575         }
576     };
577
578     step got_header => sub {
579         my $hdr_buf = $xdst->get();
580
581         # close stuff up
582         $self->{'header_xfer'} = $xsrc = $xdst = undef;
583
584         if (!defined $hdr_buf) {
585             return $finished_cb->("Got empty header");
586         }
587
588         # parse the header, finally!
589         $self->{'header'} = Amanda::Header->from_string($hdr_buf);
590
591         $finished_cb->(undef);
592     };
593 }
594
595 # do the work of starting a new xfer; this contains the code common to
596 # msg_PORT_WRITE and msg_FILE_WRITE.
597 sub setup_and_start_dump {
598     my $self = shift;
599     my ($msgtype, %params) = @_;
600     my %get_xfer_dest_args;
601
602     $self->{'dump_cb'} = $params{'dump_cb'};
603
604     # setting up the dump is a bit complex, due to the requirements of
605     # a directtcp port_write.  This function:
606     # 1. creates and starts a transfer (make_xfer)
607     # 2. gets the header
608     # 3. calls the scribe's start_dump method with the new header
609
610     my $steps = define_steps
611         cb_ref => \$params{'dump_cb'};
612
613     step setup => sub {
614         $self->{'handle'} = $params{'handle'};
615         $self->{'hostname'} = $params{'hostname'};
616         $self->{'diskname'} = $params{'diskname'};
617         $self->{'datestamp'} = $params{'datestamp'};
618         $self->{'level'} = $params{'level'};
619         $self->{'header'} = undef; # no header yet
620         $self->{'orig_kb'} = $params{'orig_kb'};
621         $self->{'input_errors'} = [];
622
623         if ($msgtype eq Amanda::Taper::Protocol::PORT_WRITE &&
624             (my $err = $self->{'scribe'}->check_data_path($params{'data_path'}))) {
625             return $params{'dump_cb'}->(
626                 result => "FAILED",
627                 device_errors => [ ['error', "$err"] ],
628                 size => 0,
629                 duration => 0.0,
630                 total_duration => 0);
631         }
632         $steps->{'process_args'}->();
633     };
634
635     step process_args => sub {
636         # extract the splitting-related parameters, stripping out empty strings
637         my %splitting_args = map {
638             (defined $params{$_} && $params{$_} ne '')? ($_, $params{$_}) : ()
639         } qw(
640             dle_tape_splitsize dle_split_diskbuffer dle_fallback_splitsize dle_allow_split
641             part_size part_cache_type part_cache_dir part_cache_max_size data_path
642         );
643
644         # convert numeric values to BigInts
645         for (qw(dle_tape_splitsize dle_fallback_splitsize part_size part_cache_max_size)) {
646             $splitting_args{$_} = Math::BigInt->new($splitting_args{$_})
647                 if (exists $splitting_args{$_});
648         }
649
650         my $device = $self->{'scribe'}->get_device();
651         if (!defined $device) {
652             confess "no device is available to create an xfer_dest";
653         }
654         $splitting_args{'leom_supported'} = $device->property_get("leom");
655         # and convert those to get_xfer_dest args
656         %get_xfer_dest_args = get_splitting_args_from_config(
657                 %splitting_args);
658         $get_xfer_dest_args{'max_memory'} = getconf($CNF_DEVICE_OUTPUT_BUFFER_SIZE);
659         if (!getconf_seen($CNF_DEVICE_OUTPUT_BUFFER_SIZE)) {
660             my $block_size4 = $device->block_size * 4;
661             if ($block_size4 > $get_xfer_dest_args{'max_memory'}) {
662                 $get_xfer_dest_args{'max_memory'} = $block_size4;
663             }
664         }
665         $device = undef;
666         $get_xfer_dest_args{'can_cache_inform'} = ($msgtype eq Amanda::Taper::Protocol::FILE_WRITE and $get_xfer_dest_args{'allow_split'});
667
668         # if we're unable to fulfill the user's splitting needs, we can still give
669         # the dump a shot - but we'll warn them about the problem
670         if ($get_xfer_dest_args{'warning'}) {
671             log_add($L_WARNING, sprintf("%s:%s: %s",
672                     $params{'hostname'}, $params{'diskname'},
673                     $get_xfer_dest_args{'warning'}));
674             delete $get_xfer_dest_args{'warning'};
675         }
676
677         $steps->{'make_xfer'}->();
678     };
679
680     step make_xfer => sub {
681         $self->_assert_in_state("idle") or return;
682         $self->{'state'} = 'making_xfer';
683
684         $self->{'xfer_dest'} = $self->{'scribe'}->get_xfer_dest(%get_xfer_dest_args);
685
686         my $xfer_source;
687         if ($msgtype eq Amanda::Taper::Protocol::PORT_WRITE) {
688             $xfer_source = Amanda::Xfer::Source::DirectTCPListen->new();
689         } else {
690             $xfer_source = Amanda::Xfer::Source::Holding->new($params{'filename'});
691         }
692         $self->{'xfer_source'} = $xfer_source;
693
694         $self->{'xfer'} = Amanda::Xfer->new([$xfer_source, $self->{'xfer_dest'}]);
695         $self->{'xfer'}->start(sub {
696             my ($src, $msg, $xfer) = @_;
697             $self->{'scribe'}->handle_xmsg($src, $msg, $xfer);
698
699             # if this is an error message that's not from the scribe's element, then
700             # we'll need to keep track of it ourselves
701             if ($msg->{'type'} == $XMSG_ERROR and $msg->{'elt'} != $self->{'xfer_dest'}) {
702                 push @{$self->{'input_errors'}}, $msg->{'message'};
703             }
704         });
705
706         # we've started the xfer now, but the destination won't actually write
707         # any data until we call start_dump.  And we'll need a header for that.
708
709         $steps->{'get_header'}->();
710     };
711
712     step get_header => sub {
713         $self->_assert_in_state("making_xfer") or return;
714         $self->{'state'} = 'getting_header';
715
716         if ($msgtype eq Amanda::Taper::Protocol::FILE_WRITE) {
717             # getting the header is easy for FILE-WRITE..
718             my $hdr = $self->{'header'} = Amanda::Holding::get_header($params{'filename'});
719
720             if (!defined $hdr || $hdr->{'type'} != $Amanda::Header::F_DUMPFILE) {
721                 confess("Could not read header from '$params{filename}'");
722             }
723
724             # stip out header fields we don't need
725             $hdr->{'cont_filename'} = '';
726
727             if ($self->{'header'}->{'is_partial'}) {
728                 $self->{'dumper_status'} = "FAILED";
729             } else {
730                 $self->{'dumper_status'} = "DONE";
731             }
732
733             $steps->{'start_dump'}->(undef);
734         } else {
735             # ..but quite a bit harder for PORT-WRITE; this method will send the
736             # proper PORT command, then read the header from the dumper and parse
737             # it, placing the result in $self->{'header'}
738             $self->send_port_and_get_header($steps->{'start_dump'});
739         }
740     };
741
742     step start_dump => sub {
743         my ($err) = @_;
744
745         $self->_assert_in_state("getting_header") or return;
746         $self->{'state'} = 'writing';
747
748         # abort if we already got a device_errors
749         if (@{$self->{'scribe'}->{'device_errors'}}) {
750             $self->{'scribe'}->abort_setup(dump_cb => $params{'dump_cb'});
751             return;
752         }
753         # if $err is set, cancel the dump, treating it as a input error
754         if ($err) {
755             push @{$self->{'input_errors'}}, $err;
756             return $self->{'scribe'}->cancel_dump(
757                 xfer => $self->{'xfer'},
758                 dump_cb => $params{'dump_cb'});
759         }
760
761         # sanity check the header..
762         my $hdr = $self->{'header'};
763         if ($hdr->{'dumplevel'} != $params{'level'}
764             or $hdr->{'name'} ne $params{'hostname'}
765             or $hdr->{'disk'} ne $params{'diskname'}
766             or $hdr->{'datestamp'} ne $params{'datestamp'}) {
767             confess("Header of dumpfile does not match command from driver");
768         }
769
770         # start producing status
771         $self->create_status_file();
772
773         # and fix it up before writing it
774         $hdr->{'totalparts'} = -1;
775         $hdr->{'type'} = $Amanda::Header::F_SPLIT_DUMPFILE;
776
777         $self->{'scribe'}->start_dump(
778             xfer => $self->{'xfer'},
779             dump_header => $hdr,
780             dump_cb => $params{'dump_cb'});
781     };
782 }
783
784 sub dump_cb {
785     my $self = shift;
786     my %params = @_;
787
788     $self->{'dump_params'} = \%params;
789     $self->{'result'} = $params{'result'};
790
791     # if we need to the dumper status (to differentiate a dropped network
792     # connection from a normal EOF) and have not done so yet, then send a
793     # DUMPER_STATUS message and re-call this method (dump_cb) with the result.
794     if ($params{'result'} eq "DONE"
795             and $self->{'doing_port_write'}
796             and !exists $self->{'dumper_status'}) {
797         my $controller = $self->{'controller'};
798         my $proto = $controller->{'proto'};
799         my $handle = $self->{'handle'};
800         $proto->send(Amanda::Taper::Protocol::DUMPER_STATUS,
801                 handle => "$handle");
802     } else {
803         $self->result_cb();
804     }
805 }
806
807 1;