c3178485cf767c273e3655b2d30a7359b623d50f
[debian/amanda] / server-src / taper.pl
1 #! @PERL@
2 # Copyright (c) 2009, 2010 Zmanda Inc.  All Rights Reserved.
3 #
4 # This program is free software; you can redistribute it and/or modify it
5 # under the terms of the GNU General Public License version 2 as published
6 # by the Free Software Foundation.
7 #
8 # This program is distributed in the hope that it will be useful, but
9 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
11 # for more details.
12 #
13 # You should have received a copy of the GNU General Public License along
14 # with this program; if not, write to the Free Software Foundation, Inc.,
15 # 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 #
17 # Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
18 # Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
19
20 use lib '@amperldir@';
21 use strict;
22 use warnings;
23
24 package main::Protocol;
25
26 use Amanda::IPC::LineProtocol;
27 use base "Amanda::IPC::LineProtocol";
28
29 use constant START_TAPER => message("START-TAPER",
30     format => [ qw( timestamp ) ],
31 );
32
33 use constant PORT_WRITE => message("PORT-WRITE",
34     format => [ qw( handle hostname diskname level datestamp splitsize
35                     split_diskbuffer fallback_splitsize ) ],
36 );
37
38 use constant FILE_WRITE => message("FILE-WRITE",
39     format => [ qw( handle filename hostname diskname level datestamp splitsize orig_kb) ],
40 );
41
42 use constant NEW_TAPE => message("NEW-TAPE",
43     format => {
44         in => [ qw( ) ],
45         out => [ qw( handle label ) ],
46     },
47 );
48
49 use constant NO_NEW_TAPE => message("NO-NEW-TAPE",
50     format => {
51         in => [ qw( reason ) ],
52         out => [ qw( handle ) ],
53     }
54 );
55
56 use constant FAILED => message("FAILED",
57     format => {
58         in => [ qw( handle ) ],
59         out => [ qw( handle input taper inputerr tapererr ) ],
60     },
61 );
62
63 use constant DONE => message("DONE",
64     format => {
65         in => [ qw( handle orig_kb ) ],
66         out => [ qw( handle input taper stats inputerr tapererr ) ],
67     },
68 );
69
70 use constant QUIT => message("QUIT",
71     on_eof => 1,
72 );
73
74 use constant TAPER_OK => message("TAPER-OK",
75 );
76
77 use constant TAPE_ERROR => message("TAPE-ERROR",
78     format => [ qw( handle message ) ],
79 );
80
81 use constant PARTIAL => message("PARTIAL",
82     format => [ qw( handle input taper stats inputerr tapererr ) ],
83 );
84
85 use constant PARTDONE => message("PARTDONE",
86     format => [ qw( handle label fileno kb stats ) ],
87 );
88
89 use constant REQUEST_NEW_TAPE => message("REQUEST-NEW-TAPE",
90     format => [ qw( handle ) ],
91 );
92
93 use constant PORT => message("PORT",
94     format => [ qw( port ipports ) ],
95 );
96
97 use constant BAD_COMMAND => message("BAD-COMMAND",
98     format => [ qw( message ) ],
99 );
100
101 use constant DUMPER_STATUS => message("DUMPER-STATUS",
102     format => [ qw( handle ) ],
103 );
104
105 \f
106 package main::Controller;
107
108 use POSIX qw( :errno_h );
109 use Amanda::Changer;
110 use Amanda::Config qw( :getconf config_dir_relative );
111 use Amanda::Header;
112 use Amanda::Holding;
113 use Amanda::MainLoop qw( :GIOCondition );
114 use Amanda::MainLoop;
115 use Amanda::Taper::Scan;
116 use Amanda::Taper::Scribe;
117 use Amanda::Logfile qw( :logtype_t log_add );
118 use Amanda::Xfer qw( :constants );
119 use Amanda::Util qw( quote_string );
120 use Amanda::Tapelist;
121 use File::Temp;
122
123 use base qw( Amanda::Taper::Scribe::Feedback );
124
125 sub new {
126     my $class = shift;
127
128     my $self = bless {
129         state => "init",
130
131         # filled in at start
132         proto => undef,
133         scribe => undef,
134         tape_num => 0,
135
136         # filled in when a write starts:
137         xfer => undef,
138         xfer_source => undef,
139         handle => undef,
140         hostname => undef,
141         diskname => undef,
142         datestamp => undef,
143         level => undef,
144         header => undef,
145         last_partnum => -1,
146         doing_port_write => undef,
147         input_errors => [],
148
149         # periodic status updates
150         timer => undef,
151         status_filename => undef,
152         status_fh => undef,
153
154         # filled in after the header is available
155         header => undef,
156
157         # filled in when a new tape is started:
158         label => undef,
159     }, $class;
160     return $self;
161 }
162
163 # The feedback object mediates between messages from the driver and the ongoing
164 # action with the taper.  This is made a little bit complicated because the
165 # driver conversation is fairly contextual, with some responses answering
166 # "questions" asked earlier.  This is modeled with the following taper
167 # "states":
168 #
169 # init:
170 #   waiting for START-TAPER command
171 # starting:
172 #   warming up devices; TAPER-OK not sent yet
173 # idle:
174 #   not currently dumping anything
175 # making_xfer:
176 #   setting up a transfer for a new dump
177 # getting_header:
178 #   getting the header before beginning a new dump
179 # writing:
180 #   in the middle of writing a file (self->{'handle'} set)
181 # error:
182 #   a fatal error has occurred, so this object won't do anything
183
184 sub start {
185     my $self = shift;
186
187     $self->_assert_in_state("init") or return;
188
189     my $message_cb = make_cb(message_cb => sub {
190         my ($msgtype, %params) = @_;
191         my $msg;
192         if (defined $msgtype) {
193             $msg = "unhandled command '$msgtype'";
194         } else {
195             $msg = $params{'error'};
196         }
197         log_add($L_ERROR, $msg);
198         print STDERR "$msg\n";
199         $self->{'proto'}->send(main::Protocol::BAD_COMMAND,
200             message => $msg);
201     });
202     $self->{'proto'} = main::Protocol->new(
203         rx_fh => *STDIN,
204         tx_fh => *STDOUT,
205         message_cb => $message_cb,
206         message_obj => $self,
207         debug => $Amanda::Config::debug_taper?'driver/taper':'',
208     );
209
210     my $changer = Amanda::Changer->new();
211     if ($changer->isa("Amanda::Changer::Error")) {
212         # send a TAPE_ERROR right away
213         $self->{'proto'}->send(main::Protocol::TAPE_ERROR,
214                 handle => '99-9999', # fake handle
215                 message => "$changer");
216
217         # log the error (note that the message is intentionally not quoted)
218         log_add($L_ERROR, "no-tape [$changer]");
219
220         # wait for it to be transmitted, then exit
221         $self->{'proto'}->stop(finished_cb => sub {
222             Amanda::MainLoop::quit();
223         });
224
225         # don't finish start()ing
226         return;
227     }
228
229     my $taperscan = Amanda::Taper::Scan->new(changer => $changer);
230     $self->{'scribe'} = Amanda::Taper::Scribe->new(
231         taperscan => $taperscan,
232         feedback => $self,
233         debug => $Amanda::Config::debug_taper);
234 }
235
236 # called when the scribe is fully started up and ready to go
237 sub _scribe_started_cb {
238     my $self = shift;
239     my ($err) = @_;
240
241     if ($err) {
242         $self->{'proto'}->send(main::Protocol::TAPE_ERROR,
243                 handle => '99-9999', # fake handle
244                 message => "$err");
245         $self->{'state'} = "error";
246
247         # log the error (note that the message is intentionally not quoted)
248         log_add($L_ERROR, "no-tape [$err]");
249
250     } else {
251         $self->{'proto'}->send(main::Protocol::TAPER_OK);
252         $self->{'state'} = "idle";
253     }
254 }
255
256 sub quit {
257     my $self = shift;
258     my %params = @_;
259     my @errors = ();
260
261     my $steps = define_steps
262         cb_ref => \$params{'finished_cb'};
263
264     step quit_scribe => sub {
265         $self->{'scribe'}->quit(finished_cb => sub {
266             my ($err) = @_;
267             push @errors, $err if ($err);
268
269             $steps->{'stop_proto'}->();
270         });
271     };
272
273     step stop_proto => sub {
274         $self->{'proto'}->stop(finished_cb => sub {
275             my ($err) = @_;
276             push @errors, $err if ($err);
277
278             $steps->{'done'}->();
279         });
280     };
281
282     step done => sub {
283         if (@errors) {
284             $params{'finished_cb'}->(join("; ", @errors));
285         } else {
286             $params{'finished_cb'}->();
287         }
288     };
289 }
290
291 ##
292 # Scribe feedback
293
294 sub request_volume_permission {
295     my $self = shift;
296     my %params = @_;
297
298     # set up callbacks from when we hear back from the driver
299     my $new_tape_cb = make_cb(new_tape_cb => sub {
300         my ($msgtype, %msg_params) = @_;
301         $params{'perm_cb'}->(undef);
302     });
303     $self->{'proto'}->set_message_cb(main::Protocol::NEW_TAPE,
304         $new_tape_cb);
305
306     my $no_new_tape_cb = make_cb(no_new_tape_cb => sub {
307         my ($msgtype, %msg_params) = @_;
308
309         # log the error (note that the message is intentionally not quoted)
310         log_add($L_ERROR, "no-tape [CONFIG:$msg_params{reason}]");
311
312         $params{'perm_cb'}->("CONFIG:$msg_params{'reason'}");
313     });
314     $self->{'proto'}->set_message_cb(main::Protocol::NO_NEW_TAPE,
315         $no_new_tape_cb);
316
317     # and send the request to the driver
318     $self->{'proto'}->send(main::Protocol::REQUEST_NEW_TAPE,
319         handle => $self->{'handle'});
320 }
321
322 sub notif_new_tape {
323     my $self = shift;
324     my %params = @_;
325
326     # TODO: if $params{error} is set, report it back to the driver
327     # (this will be a change to the protocol)
328     if ($params{'volume_label'}) {
329         $self->{'label'} = $params{'volume_label'};
330
331         # register in the tapelist
332         my $tl_file = config_dir_relative(getconf($CNF_TAPELIST));
333         my $tl = Amanda::Tapelist::read_tapelist($tl_file);
334         my $tle = $tl->lookup_tapelabel($params{'volume_label'});
335         $tl->remove_tapelabel($params{'volume_label'});
336         $tl->add_tapelabel($self->{'timestamp'}, $params{'volume_label'},
337                 $tle? $tle->{'comment'} : undef);
338         $tl->write($tl_file);
339
340         # add to the trace log
341         log_add($L_START, sprintf("datestamp %s label %s tape %s",
342                 $self->{'timestamp'},
343                 quote_string($self->{'label'}),
344                 ++$self->{'tape_num'}));
345
346         # and the amdump log
347         print STDERR "taper: wrote label `$self->{label}'\n";
348
349         # and inform the driver
350         $self->{'proto'}->send(main::Protocol::NEW_TAPE,
351             handle => $self->{'handle'},
352             label => $params{'volume_label'});
353     } else {
354         $self->{'label'} = undef;
355
356         $self->{'proto'}->send(main::Protocol::NO_NEW_TAPE,
357             handle => $self->{'handle'});
358     }
359 }
360
361 sub notif_part_done {
362     my $self = shift;
363     my %params = @_;
364
365     $self->_assert_in_state("writing") or return;
366
367     $self->{'last_partnum'} = $params{'partnum'};
368
369     my $stats = $self->make_stats($params{'size'}, $params{'duration'}, $self->{'orig_kb'});
370
371     # log the part, using PART or PARTPARTIAL
372     my $logbase = sprintf("%s %s %s %s %s %s/%s %s %s",
373         quote_string($self->{'label'}),
374         $params{'fileno'},
375         quote_string($self->{'header'}->{'name'}.""), # " is required for SWIG..
376         quote_string($self->{'header'}->{'disk'}.""),
377         $self->{'datestamp'},
378         $params{'partnum'}, -1, # totalparts is always -1
379         $self->{'level'},
380         $stats);
381     if ($params{'successful'}) {
382         log_add($L_PART, $logbase);
383     } else {
384         log_add($L_PARTPARTIAL, "$logbase \"No space left on device\"");
385     }
386
387     # only send a PARTDONE if it was successful
388     if ($params{'successful'}) {
389         $self->{'proto'}->send(main::Protocol::PARTDONE,
390             handle => $self->{'handle'},
391             label => $self->{'label'},
392             fileno => $params{'fileno'},
393             stats => $stats,
394             kb => $params{'size'} / 1024);
395     }
396 }
397
398 sub notif_log_info {
399     my $self = shift;
400     my %params = @_;
401
402     log_add($L_INFO, $params{'message'});
403 }
404
405 ##
406 # Driver commands
407
408 sub msg_START_TAPER {
409     my $self = shift;
410     my ($msgtype, %params) = @_;
411
412     $self->_assert_in_state("init") or return;
413
414     $self->{'state'} = "starting";
415     $self->{'scribe'}->start(dump_timestamp => $params{'timestamp'},
416         finished_cb => sub { $self->_scribe_started_cb(@_); });
417     $self->{'timestamp'} = $params{'timestamp'};
418 }
419
420 # defer both PORT_ and FILE_WRITE to a common method
421 sub msg_FILE_WRITE {
422     my $self = shift;
423     my ($msgtype, %params) = @_;
424
425     $self->_assert_in_state("idle") or return;
426
427     $self->{'doing_port_write'} = 0;
428
429     $self->setup_and_start_dump($msgtype,
430         dump_cb => sub { $self->dump_cb(@_); },
431         %params);
432 }
433
434 sub msg_PORT_WRITE {
435     my $self = shift;
436     my ($msgtype, %params) = @_;
437     my $read_cb;
438
439     $self->_assert_in_state("idle") or return;
440
441     $self->{'doing_port_write'} = 1;
442
443     $self->setup_and_start_dump($msgtype,
444         dump_cb => sub { $self->dump_cb(@_); },
445         %params);
446 }
447
448 sub msg_QUIT {
449     my $self = shift;
450     my ($msgtype, %params) = @_;
451     my $read_cb;
452
453     # because the driver hangs up on us immediately after sending QUIT,
454     # and EOF also means QUIT, we tend to get this command repeatedly.
455     # So check to make sure this is only called once
456     return if $self->{'quitting'};
457     $self->{'quitting'} = 1;
458
459     my $finished_cb = make_cb(finished_cb => sub {
460         Amanda::MainLoop::quit();
461     });
462     $self->quit(finished_cb => $finished_cb);
463 };
464
465 ##
466 # Utilities
467
468 sub _assert_in_state {
469     my $self = shift;
470     my ($state) = @_;
471     if ($self->{'state'} eq $state) {
472         return 1;
473     } else {
474         $self->{'proto'}->send(main::Protocol::BAD_COMMAND,
475             message => "command not appropriate in state '$self->{state}'");
476         return 0;
477     }
478 }
479
480 # Make up the [sec .. kb .. kps ..] section of the result messages
481 sub make_stats {
482     my $self = shift;
483     my ($size, $duration, $orig_kb) = @_;
484
485     $duration = 0.1 if $duration == 0;  # prevent division by zero
486     my $kb = $size/1024;
487     my $kps = "$kb.0"/$duration; # Perlish cast from BigInt to float
488
489     if (defined $orig_kb) {
490         return sprintf("[sec %f kb %d kps %f orig-kb %d]", $duration, $kb, $kps, $orig_kb);
491     } else {
492         return sprintf("[sec %f kb %d kps %f]", $duration, $kb, $kps);
493     }
494 }
495
496 sub create_status_file {
497     my $self = shift;
498
499     # create temporary file
500     ($self->{status_fh}, $self->{status_filename}) =
501         File::Temp::tempfile("taper_status_file_XXXXXX",
502                                 DIR => $Amanda::Paths::AMANDA_TMPDIR,
503                                 UNLINK => 1);
504
505     # tell amstatus about it by writing it to the dump log
506     my $qdisk = Amanda::Util::quote_string($self->{'diskname'});
507     my $qhost = Amanda::Util::quote_string($self->{'hostname'});
508     print STDERR "taper: status file $qhost $qdisk:" .
509                     "$self->{status_filename}\n";
510     print {$self->{status_fh}} "0";
511
512     # create timer callback, firing every 5s (=5000msec)
513     $self->{timer} = Amanda::MainLoop::timeout_source(5000);
514     $self->{timer}->set_callback(sub {
515         my $size = $self->{scribe}->get_bytes_written();
516         seek $self->{status_fh}, 0, 0;
517         print {$self->{status_fh}} $size;
518         $self->{status_fh}->flush();
519     });
520 }
521
522 # utility function for setup_and_start_dump, returning keyword args
523 # for $scribe->get_xfer_dest
524 sub get_splitting_config {
525     my $self = shift;
526     my ($msgtype, %params) = @_;
527     my %get_xfer_dest_args;
528
529     my $max_memory;
530     if (getconf_seen($CNF_DEVICE_OUTPUT_BUFFER_SIZE)) {
531         $max_memory = getconf($CNF_DEVICE_OUTPUT_BUFFER_SIZE);
532     } elsif (getconf_seen($CNF_TAPEBUFS)) {
533         $max_memory = getconf($CNF_TAPEBUFS) * 32768;
534     } else {
535         # use the default value
536         $max_memory = getconf($CNF_DEVICE_OUTPUT_BUFFER_SIZE);
537     }
538     $get_xfer_dest_args{'max_memory'} = $max_memory;
539
540     # here, things look a little bit different depending on whether we're
541     # reading from holding (FILE_WRITE) or from a network socket (PORT_WRITE)
542     if ($msgtype eq main::Protocol::FILE_WRITE) {
543         if ($params{'splitsize'} ne 0) {
544             $get_xfer_dest_args{'split_method'} = 'cache_inform';
545             $get_xfer_dest_args{'part_size'} = $params{'splitsize'}+0;
546         } else {
547             $get_xfer_dest_args{'split_method'} = 'none';
548         }
549     } else {
550         # if we have a disk buffer, use it
551         if ($params{'split_diskbuffer'} ne "NULL") {
552             if ($params{'splitsize'} ne 0) {
553                 $get_xfer_dest_args{'split_method'} = 'disk';
554                 $get_xfer_dest_args{'disk_cache_dirname'} = $params{'split_diskbuffer'};
555                 $get_xfer_dest_args{'part_size'} = $params{'splitsize'}+0;
556             } else {
557                 $get_xfer_dest_args{'split_method'} = 'none';
558             }
559         } else {
560             # otherwise, if splitsize is nonzero, use memory
561             if ($params{'splitsize'} ne 0) {
562                 my $size = $params{'fallback_splitsize'}+0;
563                 $size = $params{'splitsize'}+0 unless ($size);
564                 $get_xfer_dest_args{'split_method'} = 'memory';
565                 $get_xfer_dest_args{'part_size'} = $size;
566             } else {
567                 $get_xfer_dest_args{'split_method'} = 'none';
568             }
569         }
570     }
571
572     # implement the fallback to memory buffering if the disk buffer does
573     # not exist or doesnt have enough space
574     my $need_fallback = 0;
575     if ($get_xfer_dest_args{'split_method'} eq 'disk') {
576         if (! -d $get_xfer_dest_args{'disk_cache_dirname'}) {
577             $need_fallback = "'$get_xfer_dest_args{disk_cache_dirname}' not found or not a directory";
578         } else {
579             my $fsusage = Amanda::Util::get_fs_usage($get_xfer_dest_args{'disk_cache_dirname'});
580             my $avail = $fsusage->{'blocks'} * $fsusage->{'bavail'};
581             my $dir = $get_xfer_dest_args{'disk_cache_dirname'};
582             Amanda::Debug::debug("disk cache has $avail bytes available on $dir, but need $get_xfer_dest_args{part_size}");
583             if ($fsusage->{'blocks'} * $fsusage->{'bavail'} < $get_xfer_dest_args{'part_size'}) {
584                 $need_fallback = "insufficient space in disk cache directory";
585             }
586         }
587     }
588
589     if ($need_fallback) {
590         Amanda::Debug::warning("falling back to memory buffer for splitting: $need_fallback");
591         my $size = $params{'fallback_splitsize'}+0;
592         $get_xfer_dest_args{'split_method'} = 'memory';
593         $get_xfer_dest_args{'part_size'} = $size if $size != 0;
594         delete $get_xfer_dest_args{'disk_cache_dirname'};
595     }
596
597     return %get_xfer_dest_args;
598 }
599
600 sub send_port_and_get_header {
601     my $self = shift;
602     my ($finished_cb) = @_;
603
604     my $header_xfer;
605     my ($xsrc, $xdst);
606     my $errmsg;
607
608     my $steps = define_steps
609         cb_ref => \$finished_cb;
610
611     step send_port => sub {
612         # get the ip:port pairs for the data connection from the data xfer source,
613         # which should be an Amanda::Xfer::Source::DirectTCPListen
614         my $data_addrs = $self->{'xfer_source'}->get_addrs();
615         $data_addrs = join ";", map { $_->[0] . ':' . $_->[1] } @$data_addrs;
616
617         # and set up an xfer for the header, too, using DirectTCP as an easy
618         # way to implement a listen/accept/read process.  Note that this does
619         # not enforce a maximum size, so this portion of Amanda at least can
620         # handle any size header
621         ($xsrc, $xdst) = (
622             Amanda::Xfer::Source::DirectTCPListen->new(),
623             Amanda::Xfer::Dest::Buffer->new(0));
624         $header_xfer = Amanda::Xfer->new([$xsrc, $xdst]);
625         $header_xfer->start($steps->{'header_xfer_xmsg_cb'});
626
627         my $header_addrs = $xsrc->get_addrs();
628         $header_addrs = [ grep { $_->[0] eq '127.0.0.1' } @$header_addrs ];
629         die "Source::DirectTCPListen did not return a localhost address"
630             unless @$header_addrs;
631         my $header_port = $header_addrs->[0][1];
632
633         # and tell the driver which ports we're listening on
634         $self->{'proto'}->send(main::Protocol::PORT,
635             port => $header_port,
636             ipports => $data_addrs);
637     };
638
639     step header_xfer_xmsg_cb => sub {
640         my ($src, $xmsg, $xfer) = @_;
641         if ($xmsg->{'type'} == $XMSG_INFO) {
642             info($xmsg->{'message'});
643         } elsif ($xmsg->{'type'} == $XMSG_ERROR) {
644             $errmsg = $xmsg->{'messsage'};
645         } elsif ($xmsg->{'type'} == $XMSG_DONE) {
646             if ($errmsg) {
647                 $finished_cb->($errmsg);
648             } else {
649                 $steps->{'got_header'}->();
650             }
651         }
652     };
653
654     step got_header => sub {
655         my $hdr_buf = $xdst->get();
656
657         # close stuff up
658         $header_xfer = $xsrc = $xdst = undef;
659
660         if (!defined $hdr_buf) {
661             return $finished_cb->("Got empty header");
662         }
663
664         # parse the header, finally!
665         $self->{'header'} = Amanda::Header->from_string($hdr_buf);
666
667         $finished_cb->(undef);
668     };
669 }
670
671 # do the work of starting a new xfer; this contains the code common to
672 # msg_PORT_WRITE and msg_FILE_WRITE.
673 sub setup_and_start_dump {
674     my $self = shift;
675     my ($msgtype, %params) = @_;
676
677     # setting up the dump is a bit complex, due to the requirements of
678     # a directtcp port_write.  This function:
679     # 1. creates and starts a transfer (make_xfer)
680     # 2. gets the header
681     # 3. calls the scribe's start_dump method with the new header
682
683     my $steps = define_steps
684         cb_ref => \$params{'dump_cb'};
685
686     step setup => sub {
687         $self->{'handle'} = $params{'handle'};
688         $self->{'hostname'} = $params{'hostname'};
689         $self->{'diskname'} = $params{'diskname'};
690         $self->{'datestamp'} = $params{'datestamp'};
691         $self->{'level'} = $params{'level'};
692         $self->{'header'} = undef; # no header yet
693         $self->{'last_partnum'} = -1;
694         $self->{'orig_kb'} = $params{'orig_kb'};
695         $self->{'input_errors'} = [];
696
697         $steps->{'make_xfer'}->();
698     };
699
700     step make_xfer => sub {
701         $self->_assert_in_state("idle") or return;
702         $self->{'state'} = 'making_xfer';
703
704         my %get_xfer_dest_args = $self->get_splitting_config($msgtype, %params);
705         my $xfer_dest = $self->{'scribe'}->get_xfer_dest(%get_xfer_dest_args);
706
707         my $xfer_source;
708         if ($msgtype eq main::Protocol::PORT_WRITE) {
709             $xfer_source = Amanda::Xfer::Source::DirectTCPListen->new();
710         } else {
711             $xfer_source = Amanda::Xfer::Source::Holding->new($params{'filename'});
712         }
713         $self->{'xfer_source'} = $xfer_source;
714
715         $self->{'xfer'} = Amanda::Xfer->new([$xfer_source, $xfer_dest]);
716         $self->{'xfer'}->start(sub {
717             my ($src, $msg, $xfer) = @_;
718             $self->{'scribe'}->handle_xmsg($src, $msg, $xfer);
719
720             # if this is an error message that's not from the scribe's element, then
721             # we'll need to keep track of it ourselves
722             if ($msg->{'type'} == $XMSG_ERROR and $msg->{'elt'} != $xfer_dest) {
723                 push @{$self->{'input_errors'}}, $msg->{'message'};
724             }
725         });
726
727         # we've started the xfer now, but the destination won't actually write
728         # any data until we call start_dump.  And we'll need a header for that.
729
730         $steps->{'get_header'}->();
731     };
732
733     step get_header => sub {
734         $self->_assert_in_state("making_xfer") or return;
735         $self->{'state'} = 'getting_header';
736
737         if ($msgtype eq main::Protocol::FILE_WRITE) {
738             # getting the header is easy for FILE-WRITE..
739             my $hdr = $self->{'header'} = Amanda::Holding::get_header($params{'filename'});
740             if (!defined $hdr || $hdr->{'type'} != $Amanda::Header::F_DUMPFILE) {
741                 die("Could not read header from '$params{filename}'");
742             }
743             $steps->{'start_dump'}->(undef);
744         } else {
745             # ..but quite a bit harder for PORT-WRITE; this method will send the
746             # proper PORT command, then read the header from the dumper and parse
747             # it, placing the result in $self->{'header'}
748             $self->send_port_and_get_header($steps->{'start_dump'});
749         }
750     };
751
752     step start_dump => sub {
753         my ($err) = @_;
754
755         $self->_assert_in_state("getting_header") or return;
756         $self->{'state'} = 'writing';
757
758         # if $err is set, cancel the dump, treating it as a input error
759         if ($err) {
760             push @{$self->{'input_errors'}}, $err;
761             return $self->{'scribe'}->cancel_dump(
762                 xfer => $self->{'xfer'},
763                 dump_cb => $params{'dump_cb'});
764         }
765
766         # sanity check the header..
767         my $hdr = $self->{'header'};
768         if ($hdr->{'dumplevel'} != $params{'level'}
769             or $hdr->{'name'} ne $params{'hostname'}
770             or $hdr->{'disk'} ne $params{'diskname'}
771             or $hdr->{'datestamp'} ne $params{'datestamp'}) {
772             die("Header of dumpfile does not match command from driver");
773         }
774
775         # start producing status
776         $self->create_status_file();
777
778         # and fix it up before writing it
779         $hdr->{'totalparts'} = -1;
780         $hdr->{'type'} = $Amanda::Header::F_SPLIT_DUMPFILE;
781
782         $self->{'scribe'}->start_dump(
783             xfer => $self->{'xfer'},
784             dump_header => $hdr,
785             dump_cb => $params{'dump_cb'});
786     };
787 }
788
789 sub dump_cb {
790     my $self = shift;
791     my %params = @_;
792
793     $self->{'orig_kb'} = $params{'orig_kb'} if defined ($params{'orig_kb'});
794
795     # if we need to the dumper status (to differentiate a dropped network
796     # connection from a normal EOF) and have not done so yet, then send a
797     # DUMPER_STATUS message and re-call this method (dump_cb) with the result.
798     if ($params{'result'} eq "DONE"
799             and $self->{'doing_port_write'}
800             and !exists $params{'dumper_status'}) {
801         $self->{'proto'}->set_message_cb(main::Protocol::DONE,
802             make_cb(sub { my ($DONE_msgtype, %DONE_params) = @_;
803                           $self->{'orig_kb'} = $DONE_params{'orig_kb'};
804                           $self->dump_cb(%params, dumper_status => "DONE"); }));
805         $self->{'proto'}->set_message_cb(main::Protocol::FAILED,
806             make_cb(sub { $self->dump_cb(%params, dumper_status => "FAILED"); }));
807         $self->{'proto'}->send(main::Protocol::DUMPER_STATUS,
808                 handle => $self->{'handle'});
809         return;
810     }
811
812     my ($msgtype, $logtype);
813     if ($params{'result'} eq 'DONE') {
814         if (!$self->{'doing_port_write'} or $params{'dumper_status'} eq "DONE") {
815             $msgtype = main::Protocol::DONE;
816             $logtype = $L_DONE;
817         } else {
818             $msgtype = main::Protocol::DONE;
819             $logtype = $L_PARTIAL;
820         }
821     } elsif ($params{'result'} eq 'PARTIAL') {
822         $msgtype = main::Protocol::PARTIAL;
823         $logtype = $L_PARTIAL;
824     } elsif ($params{'result'} eq 'FAILED') {
825         $msgtype = main::Protocol::FAILED;
826         $logtype = $L_FAIL;
827     }
828
829     if ($self->{timer}) {
830         $self->{timer}->remove();
831         undef $self->{timer};
832         $self->{status_fh}->close();
833         undef $self->{status_fh};
834         unlink($self->{status_filename});
835         undef $self->{status_filename};
836     }
837
838     # note that we use total_duration here, which is the total time between
839     # start_dump and dump_cb, so the kps generated here is much less than the
840     # actual tape write speed.  Think of this as the *taper* speed, rather than
841     # the *tape* speed.
842     my $stats = $self->make_stats($params{'size'}, $params{'total_duration'}, $self->{'orig_kb'});
843
844     # write a DONE/PARTIAL/FAIL log line
845     my $have_msg = @{$params{'device_errors'}};
846     my $msg = join("; ", @{$params{'device_errors'}}, @{$self->{'input_errors'}});
847     $msg = quote_string($msg);
848
849     if ($logtype == $L_FAIL) {
850         log_add($L_FAIL, sprintf("%s %s %s %s %s",
851             quote_string($self->{'hostname'}.""), # " is required for SWIG..
852             quote_string($self->{'diskname'}.""),
853             $self->{'datestamp'},
854             $self->{'level'},
855             $msg));
856     } else {
857         log_add($logtype, sprintf("%s %s %s %s %s %s%s",
858             quote_string($self->{'hostname'}.""), # " is required for SWIG..
859             quote_string($self->{'diskname'}.""),
860             $self->{'datestamp'},
861             $self->{'last_partnum'},
862             $self->{'level'},
863             $stats,
864             ($logtype == $L_PARTIAL and $have_msg)? " $msg" : ""));
865     }
866
867     # and send a message back to the driver
868     my %msg_params = (
869         handle => $self->{'handle'},
870     );
871
872     # reflect errors in our own elements in INPUT-ERROR or INPUT-GOOD
873     if (@{$self->{'input_errors'}}) {
874         $msg_params{'input'} = 'INPUT-ERROR';
875         $msg_params{'inputerr'} = join("; ", @{$self->{'input_errors'}});
876     } else {
877         $msg_params{'input'} = 'INPUT-GOOD';
878         $msg_params{'inputerr'} = '';
879     }
880
881     # and errors from the scribe in TAPE-ERROR or TAPE-GOOD
882     if (@{$params{'device_errors'}}) {
883         $msg_params{'taper'} = 'TAPE-ERROR';
884         $msg_params{'tapererr'} = join("; ", @{$params{'device_errors'}});
885     } else {
886         $msg_params{'taper'} = 'TAPE-GOOD';
887         $msg_params{'tapererr'} = '';
888     }
889
890     if ($msgtype ne main::Protocol::FAILED) {
891         $msg_params{'stats'} = $stats;
892     }
893
894     # reset things to 'idle' before sending the message
895     $self->{'xfer'} = undef;
896     $self->{'xfer_source'} = undef;
897     $self->{'handle'} = undef;
898     $self->{'header'} = undef;
899     $self->{'hostname'} = undef;
900     $self->{'diskname'} = undef;
901     $self->{'datestamp'} = undef;
902     $self->{'level'} = undef;
903     $self->{'header'} = undef;
904     $self->{'state'} = 'idle';
905
906     $self->{'proto'}->send($msgtype, %msg_params);
907 }
908
909 \f
910 package main;
911
912 use Amanda::Util qw( :constants );
913 use Amanda::Config qw( :init );
914 use Amanda::Logfile qw( :logtype_t log_add $amanda_log_trace_log );
915 use Amanda::Debug;
916 use Getopt::Long;
917
918 Amanda::Util::setup_application("taper", "server", $CONTEXT_DAEMON);
919
920 my $config_overrides = new_config_overrides($#ARGV+1);
921 Getopt::Long::Configure(qw{bundling});
922 GetOptions(
923     'o=s' => sub { add_config_override_opt($config_overrides, $_[1]); },
924 ) or usage();
925
926 if (@ARGV != 1) {
927     die "USAGE: taper <config> <config-overwrites>";
928 }
929
930 set_config_overrides($config_overrides);
931 config_init($CONFIG_INIT_EXPLICIT_NAME, $ARGV[0]);
932 my ($cfgerr_level, @cfgerr_errors) = config_errors();
933 if ($cfgerr_level >= $CFGERR_WARNINGS) {
934     config_print_errors();
935     if ($cfgerr_level >= $CFGERR_ERRORS) {
936         die "Errors processing config file";
937     }
938 }
939
940 # our STDERR is connected to the amdump log file, so be sure to do unbuffered
941 # writes to that file
942 my $old_fh = select(STDERR);
943 $| = 1;
944 select($old_fh);
945
946 log_add($L_INFO, "taper pid $$");
947 Amanda::Debug::add_amanda_log_handler($amanda_log_trace_log);
948
949 Amanda::Util::finish_setup($RUNNING_AS_DUMPUSER);
950
951 # transfer control to the main::Controller class implemented above
952 my $controller = main::Controller->new();
953 $controller->start();
954 Amanda::MainLoop::run();
955
956 log_add($L_INFO, "pid-done $$");
957 Amanda::Util::finish_application();