1 # Copyright (c) 2009-2012 Zmanda Inc. All Rights Reserved.
3 # This program is free software; you can redistribute it and/or
4 # modify it under the terms of the GNU General Public License
5 # as published by the Free Software Foundation; either version 2
6 # of the License, or (at your option) any later version.
8 # This program is distributed in the hope that it will be useful, but
9 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 # You should have received a copy of the GNU General Public License along
14 # with this program; if not, write to the Free Software Foundation, Inc.,
15 # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 # Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
18 # Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
26 This package is a component of the Amanda taper, and is not intended for use by
27 other scripts or applications.
29 This package is the interface between L<Amanda::Taper::Controller> and L<Amanda::Taper::Scribe>.
31 The worker uses an L<Amanda::Taper::Scribe> object to execute the request
32 received from the L<Amanda::Taper::Controller>.
36 use lib '@amperldir@';
40 package Amanda::Taper::Worker;
43 use POSIX qw( :errno_h );
45 use Amanda::Config qw( :getconf config_dir_relative );
46 use Amanda::Debug qw( :logging );
49 use Amanda::MainLoop qw( :GIOCondition );
51 use Amanda::Taper::Protocol;
52 use Amanda::Taper::Scan;
53 use Amanda::Taper::Scribe qw( get_splitting_args_from_config );
54 use Amanda::Logfile qw( :logtype_t log_add make_stats );
55 use Amanda::Xfer qw( :constants );
56 use Amanda::Util qw( quote_string );
60 use base qw( Amanda::Taper::Scribe::Feedback );
# NOTE(review): the enclosing `sub` line and several statements are elided from
# this listing; judging by the visible lines this is the worker's constructor.
# The three positional arguments are, in order, the worker name, the
# controller object, and the write timestamp.
66 my $worker_name = shift;
67 my $controller = shift;
68 my $write_timestamp = shift;
# initial object fields (only some of them are visible in this listing)
72 worker_name => $worker_name,
73 controller => $controller,
75 timestamp => $write_timestamp,
77 # filled in when a write starts:
87 doing_port_write => undef,
90 # periodic status updates
92 status_filename => undef,
95 # filled in after the header is available
98 # filled in when a new tape is started:
# build the scribe from the controller's taperscan, the debug_taper setting
# and the eject-volume configuration value
102 my $scribe = Amanda::Taper::Scribe->new(
103 taperscan => $controller->{'taperscan'},
105 debug => $Amanda::Config::debug_taper,
106 eject_volume => getconf($CNF_EJECT_VOLUME));
# keep the scribe on the worker and start it; once startup finishes the
# callback's arguments are forwarded to _scribe_started_cb
108 $self->{'scribe'} = $scribe;
109 $self->{'scribe'}->start(write_timestamp => $write_timestamp,
110 finished_cb => sub { $self->_scribe_started_cb(@_); });
115 # called when the scribe is fully started up and ready to go
# NOTE(review): the lines that unpack the arguments and test $err are elided
# from this listing. The visible lines show two outcomes: one sends
# TAPE_ERROR through the controller's protocol object, sets the state to
# "error" and logs $err; the other sends TAPER_OK and sets the state to "idle".
116 sub _scribe_started_cb {
121 $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::TAPE_ERROR,
122 worker_name => $self->{'worker_name'},
124 $self->{'state'} = "error";
126 # log the error (note that the message is intentionally not quoted)
127 log_add($L_ERROR, "no-tape error [$err]");
# startup succeeded: report readiness and become idle
130 $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::TAPER_OK,
131 worker_name => $self->{'worker_name'});
132 $self->{'state'} = "idle";
# NOTE(review): the `sub` line is elided from this listing; since this handler
# clears doing_port_write it is presumably the FILE-WRITE command handler.
# It is only valid in state "idle" and delegates to setup_and_start_dump,
# routing the dump result to $self->dump_cb.
139 my ($msgtype, %params) = @_;
140 $self->_assert_in_state("idle") or return;
142 $self->{'doing_port_write'} = 0;
144 $self->setup_and_start_dump($msgtype,
145 dump_cb => sub { $self->dump_cb(@_); },
# NOTE(review): the `sub` line is elided from this listing; since this handler
# sets doing_port_write it is presumably the PORT-WRITE command handler.
# It is only valid in state "idle" and delegates to setup_and_start_dump,
# routing the dump result to $self->dump_cb.
151 my ($msgtype, %params) = @_;
155 $self->_assert_in_state("idle") or return;
157 $self->{'doing_port_write'} = 1;
159 $self->setup_and_start_dump($msgtype,
160 dump_cb => sub { $self->dump_cb(@_); },
# NOTE(review): the `sub` line is elided from this listing (presumably the
# START-SCAN command handler). Asks the scribe to start scanning, passing
# undef as the only argument.
166 my ($msgtype, %params) = @_;
168 $self->{'scribe'}->start_scan(undef);
# NOTE(review): the `sub` line is elided from this listing (presumably the
# NEW-TAPE command handler). Only valid in state "writing"; answers the
# permission callback saved by request_volume_permission with allow => 1.
173 my ($msgtype, %params) = @_;
175 $self->_assert_in_state("writing") or return;
177 $self->{'perm_cb'}->(allow => 1);
# NOTE(review): the `sub` line is elided from this listing (presumably the
# NO-NEW-TAPE command handler). Only valid in state "writing"; logs the
# reason and answers the saved permission callback with a "config" cause.
182 my ($msgtype, %params) = @_;
184 $self->_assert_in_state("writing") or return;
186 # log the error (note that the message is intentionally not quoted)
187 log_add($L_ERROR, "no-tape config [$params{reason}]");
# pass the denial on to whoever asked for volume permission
189 $self->{'perm_cb'}->(cause => "config", message => $params{'reason'});
# Take over the scribe of another worker ($worker1) while this worker is
# writing. Requires this worker to be in state "writing" and $worker1 to be
# in state "idle". Afterwards this worker uses $worker1's scribe, $worker1
# has no scribe and is in state 'error', and this worker's previous scribe
# has been asked to quit.
# NOTE(review): some lines of this sub (e.g. the one declaring $self) are
# elided from this listing.
192 sub TAKE_SCRIBE_FROM {
194 my ($worker1, $msgtype, %params) = @_;
196 $self->_assert_in_state("writing") or return;
197 $worker1->_assert_in_state("idle") or return;
# swap the two scribes
199 my $scribe = $self->{'scribe'};
200 my $scribe1 = $worker1->{'scribe'};
201 $self->{'scribe'} = $scribe1;
202 $worker1->{'scribe'} = $scribe;
203 # Change the callback to call the new scribe
204 $self->{'xfer'}->set_callback(sub {
205 my ($src, $msg, $xfer) = @_;
206 $scribe1->handle_xmsg($src, $msg, $xfer);
208 # if this is an error message that's not from the scribe's element, then
209 # we'll need to keep track of it ourselves
210 if ($msg->{'type'} == $XMSG_ERROR and $msg->{'elt'} != $self->{'xfer_dest'}) {
211 push @{$self->{'input_errors'}}, $msg->{'message'};
# adopt the other worker's current label and answer the pending permission
# callback with the new scribe
215 $self->{'label'} = $worker1->{'label'};
216 $self->{'perm_cb'}->(scribe => $scribe1);
# the scribe stored on $worker1 by the swap above is removed again here, so
# $worker1 ends up without a scribe
217 delete $worker1->{'scribe'};
218 $worker1->{'state'} = 'error';
# shut down the scribe this worker used before the swap; completion is ignored
219 $scribe->quit(finished_cb => sub {});
# NOTE(review): the `sub` line and the early `return` statements are elided
# from this listing (presumably the handler for the driver's DONE reply).
# Records a dumper status of "DONE" together with orig_kb, and calls
# result_cb if a dump result has already been stored in $self->{'result'}.
224 my ($msgtype, %params) = @_;
226 if ($params{'handle'} ne $self->{'handle'}) {
227 # ignore message for previous handle
231 if (defined $self->{'dumper_status'}) {
232 # ignore duplicate message
236 $self->{'dumper_status'} = "DONE";
237 $self->{'orig_kb'} = $params{'orig_kb'};
238 if (defined $self->{'result'}) {
239 $self->result_cb(undef);
# NOTE(review): the `sub` line and the early `return` statements are elided
# from this listing (presumably the handler for the driver's FAILED reply).
# Records a dumper status of "FAILED" and then picks one reaction, checked
# in this order: cancel a header transfer that is still running; report the
# result if one is already stored; do nothing if the scribe has no 'xdt' or
# no 'xfer'; otherwise record an input error and cancel the dump.
245 my ($msgtype, %params) = @_;
247 if ($params{'handle'} ne $self->{'handle'}) {
248 # ignore message for previous handle
252 if (defined $self->{'dumper_status'}) {
253 # ignore duplicate message
257 $self->{'dumper_status'} = "FAILED";
258 if (defined $self->{'header_xfer'}) {
259 $self->{'header_xfer'}->cancel();
260 } elsif (defined $self->{'result'}) {
261 $self->result_cb(undef);
262 } elsif (!defined $self->{'scribe'}->{'xdt'}) {
263 # ignore, the dump is already cancelled or not yet started.
264 } elsif (!defined $self->{'scribe'}->{'xfer'}) {
265 # ignore, the dump is already cancelled or not yet started.
266 } else { # Abort the dump
267 push @{$self->{'input_errors'}}, "dumper failed";
268 $self->{'scribe'}->cancel_dump(
269 xfer => $self->{'scribe'}->{'xfer'},
270 dump_cb => $self->{'dump_cb'});
# NOTE(review): the `sub` line is elided from this listing (presumably the
# CLOSE-VOLUME command handler). Only valid in state "idle"; asks the scribe
# to close its volume.
276 my ($msgtype, %params) = @_;
278 $self->_assert_in_state("idle") or return;
280 $self->{'scribe'}->close_volume();
# NOTE(review): the `sub` line and a number of statements are elided from this
# listing; other visible code calls $self->result_cb(undef), and this
# fragment is presumably that method. Working from the dump parameters saved
# in $self->{'dump_params'}, the visible lines:
#   - choose the reply message type and log type from the dump result and
#     the dumper status,
#   - stop the status timer and remove the status file,
#   - write a line to the trace log,
#   - build the reply parameters (input and taper status, optional stats),
#   - reset the per-dump fields, return to state 'idle' and send the reply.
285 my %params = %{$self->{'dump_params'}};
# a DONE result whose dumper status is not "DONE" is still reported with a
# DONE message but logged as partial
289 if ($params{'result'} eq 'DONE') {
290 if ($self->{'dumper_status'} eq "DONE") {
291 $msgtype = Amanda::Taper::Protocol::DONE;
294 $msgtype = Amanda::Taper::Protocol::DONE;
295 $logtype = $L_PARTIAL;
297 } elsif ($params{'result'} eq 'PARTIAL') {
298 $msgtype = Amanda::Taper::Protocol::PARTIAL;
299 $logtype = $L_PARTIAL;
300 } elsif ($params{'result'} eq 'FAILED') {
301 $msgtype = Amanda::Taper::Protocol::FAILED;
# tear down the periodic status reporting set up by create_status_file
305 if ($self->{timer}) {
306 $self->{timer}->remove();
307 undef $self->{timer};
308 $self->{status_fh}->close();
309 undef $self->{status_fh};
310 unlink($self->{status_filename});
311 undef $self->{status_filename};
314 # note that we use total_duration here, which is the total time between
315 # start_dump and dump_cb, so the kps generated here is much less than the
316 # actual tape write speed. Think of this as the *taper* speed, rather than
318 my $stats = make_stats($params{'size'}, $params{'total_duration'}, $self->{'orig_kb'});
320 # consider this a config-derived failure only if there were no errors
321 my $failure_from = (@{$params{'device_errors'}})? 'error' : 'config';
# collect device errors, input errors and any config denial into one quoted
# message for the log line
323 my @all_messages = (@{$params{'device_errors'}}, @{$self->{'input_errors'}});
324 push @all_messages, $params{'config_denial_message'} if $params{'config_denial_message'};
325 my $msg = quote_string(join("; ", @all_messages));
327 # write a DONE/PARTIAL/FAIL log line
328 if ($logtype == $L_FAIL) {
329 log_add($L_FAIL, sprintf("%s %s %s %s %s %s",
330 quote_string($self->{'hostname'}.""), # " is required for SWIG..
331 quote_string($self->{'diskname'}.""),
332 $self->{'datestamp'},
337 log_add($logtype, sprintf("%s %s %s %s %s %s%s",
338 quote_string($self->{'hostname'}.""), # " is required for SWIG..
339 quote_string($self->{'diskname'}.""),
340 $self->{'datestamp'},
344 ($logtype == $L_PARTIAL and @all_messages)? " $msg" : ""));
347 # and send a message back to the driver
349 handle => $self->{'handle'},
352 # reflect errors in our own elements in INPUT-ERROR or INPUT-GOOD
353 if (@{$self->{'input_errors'}}) {
354 $msg_params{'input'} = 'INPUT-ERROR';
355 $msg_params{'inputerr'} = join("; ", @{$self->{'input_errors'}});
357 $msg_params{'input'} = 'INPUT-GOOD';
358 $msg_params{'inputerr'} = '';
361 # and errors from the scribe in TAPE-ERROR or TAPE-GOOD
362 if (@{$params{'device_errors'}}) {
363 $msg_params{'taper'} = 'TAPE-ERROR';
364 $msg_params{'tapererr'} = join("; ", @{$params{'device_errors'}});
365 } elsif ($params{'config_denial_message'}) {
366 $msg_params{'taper'} = 'TAPE-CONFIG';
367 $msg_params{'tapererr'} = $params{'config_denial_message'};
369 $msg_params{'taper'} = 'TAPE-GOOD';
370 $msg_params{'tapererr'} = '';
# statistics are attached to every reply except FAILED
373 if ($msgtype ne Amanda::Taper::Protocol::FAILED) {
374 $msg_params{'stats'} = $stats;
377 # reset things to 'idle' before sending the message
378 $self->{'xfer'} = undef;
379 $self->{'xfer_source'} = undef;
380 $self->{'xfer_dest'} = undef;
381 $self->{'handle'} = undef;
382 $self->{'header'} = undef;
383 $self->{'hostname'} = undef;
384 $self->{'diskname'} = undef;
385 $self->{'datestamp'} = undef;
386 $self->{'level'} = undef;
# NOTE(review): 'header' was already cleared a few lines above; this second
# assignment is redundant but harmless
387 $self->{'header'} = undef;
388 $self->{'state'} = 'idle';
389 delete $self->{'result'};
390 delete $self->{'dumper_status'};
391 delete $self->{'dump_params'};
# %msg_params already holds the handle, which was captured before the reset
393 $self->{'controller'}->{'proto'}->send($msgtype, %msg_params);
# Remember the permission callback passed as perm_cb and send a
# REQUEST_NEW_TAPE message for the current handle. The saved callback is
# invoked later by other code in this file with the answer.
# NOTE(review): the lines that unpack $self and %params are elided from this
# listing.
400 sub request_volume_permission {
404 $self->{'perm_cb'} = $params{'perm_cb'};
405 # and send the request to the driver
406 $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::REQUEST_NEW_TAPE,
407 handle => $self->{'handle'});
# Notification about the outcome of starting a new tape. Reads 'error' and
# 'volume_label' from the named parameters. With a volume label: remember it,
# write a START line to the trace log, announce it on STDERR and send
# NEW_TAPE. Without one: clear the stored label and send NO_NEW_TAPE.
# NOTE(review): the lines that unpack $self and %params, and the `else` line,
# are elided from this listing.
410 sub scribe_notif_new_tape {
414 # TODO: if $params{error} is set, report it back to the driver
415 # (this will be a change to the protocol)
416 log_add($L_INFO, "$params{'error'}") if defined $params{'error'};
418 if ($params{'volume_label'}) {
419 $self->{'label'} = $params{'volume_label'};
421 # add to the trace log
422 log_add($L_START, sprintf("datestamp %s label %s tape %s",
423 $self->{'timestamp'},
424 quote_string($self->{'label'}),
428 print STDERR "taper: wrote label '$self->{label}'\n";
430 # and inform the driver
431 $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::NEW_TAPE,
432 handle => $self->{'handle'},
433 label => $params{'volume_label'});
# no label was supplied: forget the previous label and report NO_NEW_TAPE
435 $self->{'label'} = undef;
437 $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::NO_NEW_TAPE,
438 handle => $self->{'handle'});
# Notification that one part has been written. Only valid in state
# "writing". Logs the part as PART when 'successful' is true and as
# PARTPARTIAL otherwise, and sends PARTDONE only for successful parts.
# Named parameters read by the visible lines: size, duration, partnum,
# successful, fileno.
# NOTE(review): the lines that unpack $self and %params and a few argument
# lines are elided from this listing.
442 sub scribe_notif_part_done {
446 $self->_assert_in_state("writing") or return;
448 my $stats = make_stats($params{'size'}, $params{'duration'}, $self->{'orig_kb'});
450 # log the part, using PART or PARTPARTIAL
451 my $logbase = sprintf("%s %s %s %s %s %s/%s %s %s",
452 quote_string($self->{'label'}),
454 quote_string($self->{'header'}->{'name'}.""), # " is required for SWIG..
455 quote_string($self->{'header'}->{'disk'}.""),
456 $self->{'datestamp'},
457 $params{'partnum'}, -1, # totalparts is always -1
460 if ($params{'successful'}) {
461 log_add($L_PART, $logbase);
463 log_add($L_PARTPARTIAL, "$logbase \"No space left on device\"");
466 # only send a PARTDONE if it was successful
467 if ($params{'successful'}) {
468 $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::PARTDONE,
469 handle => $self->{'handle'},
470 label => $self->{'label'},
471 fileno => $params{'fileno'},
# 'size' is divided by 1024 to produce the 'kb' field
473 kb => $params{'size'} / 1024);
# Write the 'message' parameter both to the debug log and, as an INFO line,
# to the trace log.
# NOTE(review): the lines that unpack $self and %params are elided from this
# listing.
477 sub scribe_notif_log_info {
481 debug("$params{'message'}");
482 log_add($L_INFO, "$params{'message'}");
# Check that the worker's current state equals the expected $state. On a
# mismatch a BAD_COMMAND message naming both the current and the expected
# state is sent. Callers in this file use the result as
# `_assert_in_state(...) or return`.
# NOTE(review): the argument-unpacking lines and the `return` statements are
# elided from this listing, so the exact return values are not visible here.
488 sub _assert_in_state {
491 if ($self->{'state'} eq $state) {
494 $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::BAD_COMMAND,
495 message => "command not appropriate in state '$self->{state}' : '$state'");
# Create the status file for the dump being written and keep it updated.
# A temporary file is created under $Amanda::Paths::AMANDA_TMPDIR, its name
# is announced on STDERR along with the quoted host and disk, and "0" is
# written to it. A timer then fires every 5 seconds and rewrites the file
# from the start with the scribe's bytes-written count.
# The handle, the filename and the timer are stored on $self (status_fh,
# status_filename, timer).
# NOTE(review): some lines of this sub (e.g. the remaining tempfile options)
# are elided from this listing.
500 sub create_status_file {
503 # create temporary file
504 ($self->{status_fh}, $self->{status_filename}) =
505 File::Temp::tempfile("taper_status_file_XXXXXX",
506 DIR => $Amanda::Paths::AMANDA_TMPDIR,
509 # tell amstatus about it by writing it to the dump log
510 my $qdisk = Amanda::Util::quote_string($self->{'diskname'});
511 my $qhost = Amanda::Util::quote_string($self->{'hostname'});
512 print STDERR "taper: status file $qhost $qdisk:" .
513 "$self->{status_filename}\n";
514 print {$self->{status_fh}} "0";
516 # create timer callback, firing every 5s (=5000msec)
517 $self->{timer} = Amanda::MainLoop::timeout_source(5000);
518 $self->{timer}->set_callback(sub {
519 my $size = $self->{scribe}->get_bytes_written();
# overwrite from the beginning of the file and flush so the new value is
# written out immediately
520 seek $self->{status_fh}, 0, 0;
521 print {$self->{status_fh}} $size, ' ';
522 $self->{status_fh}->flush();
# Tell the driver which ports to connect to and read the dump header.
# A second transfer (DirectTCPListen source into a Buffer destination) is
# started to receive the header; a PORT message carrying the header port and
# the data addresses is sent; when the header transfer finishes, the buffer
# is parsed into $self->{'header'}.
# $finished_cb is called with an error message on failure, or with undef
# once the header has been parsed.
# NOTE(review): several lines (variable declarations, some conditionals) are
# elided from this listing.
526 sub send_port_and_get_header {
528 my ($finished_cb) = @_;
533 my $steps = define_steps
534 cb_ref => \$finished_cb;
536 step send_port => sub {
537 # get the ip:port pairs for the data connection from the data xfer source,
538 # which should be an Amanda::Xfer::Source::DirectTCPListen
539 my $data_addrs = $self->{'xfer_source'}->get_addrs();
# flatten the address pairs into a single "ip:port;ip:port" string
540 $data_addrs = join ";", map { $_->[0] . ':' . $_->[1] } @$data_addrs;
542 # and set up an xfer for the header, too, using DirectTCP as an easy
543 # way to implement a listen/accept/read process. Note that this does
544 # not enforce a maximum size, so this portion of Amanda at least can
545 # handle any size header
547 Amanda::Xfer::Source::DirectTCPListen->new(),
548 Amanda::Xfer::Dest::Buffer->new(0));
549 $self->{'header_xfer'} = Amanda::Xfer->new([$xsrc, $xdst]);
550 $self->{'header_xfer'}->start($steps->{'header_xfer_xmsg_cb'});
# only the port of the first header address is reported to the driver
552 my $header_addrs = $xsrc->get_addrs();
553 my $header_port = $header_addrs->[0][1];
555 # and tell the driver which ports we're listening on
556 $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::PORT,
557 worker_name => $self->{'worker_name'},
558 handle => $self->{'handle'},
559 port => $header_port,
560 ipports => $data_addrs);
# message callback for the header transfer: INFO messages are logged, an
# ERROR message is remembered in $errmsg, and DONE either reports that
# error through $finished_cb or moves on to got_header
563 step header_xfer_xmsg_cb => sub {
564 my ($src, $xmsg, $xfer) = @_;
565 if ($xmsg->{'type'} == $XMSG_INFO) {
566 info($xmsg->{'message'});
567 } elsif ($xmsg->{'type'} == $XMSG_ERROR) {
568 $errmsg = $xmsg->{'message'};
569 } elsif ($xmsg->{'type'} == $XMSG_DONE) {
571 $finished_cb->($errmsg);
573 $steps->{'got_header'}->();
578 step got_header => sub {
579 my $hdr_buf = $xdst->get();
# drop the references to the header transfer and its elements
582 $self->{'header_xfer'} = $xsrc = $xdst = undef;
584 if (!defined $hdr_buf) {
585 return $finished_cb->("Got empty header");
588 # parse the header, finally!
589 $self->{'header'} = Amanda::Header->from_string($hdr_buf);
591 $finished_cb->(undef);
595 # do the work of starting a new xfer; this contains the code common to
596 # msg_PORT_WRITE and msg_FILE_WRITE.
# Parameters: the message type, followed by named parameters. The visible
# lines read dump_cb, handle, hostname, diskname, datestamp, level, orig_kb,
# data_path, filename and the splitting-related settings.
# The worker moves through the states idle -> making_xfer -> getting_header
# -> writing; the result is eventually delivered to dump_cb.
# NOTE(review): a number of lines (declarations, step headers, closing
# braces, some conditionals) are elided from this listing.
597 sub setup_and_start_dump {
599 my ($msgtype, %params) = @_;
600 my %get_xfer_dest_args;
602 $self->{'dump_cb'} = $params{'dump_cb'};
604 # setting up the dump is a bit complex, due to the requirements of
605 # a directtcp port_write. This function:
606 # 1. creates and starts a transfer (make_xfer)
608 # 3. calls the scribe's start_dump method with the new header
610 my $steps = define_steps
611 cb_ref => \$params{'dump_cb'};
# remember the identity of the dump being written and start with an empty
# list of input errors
614 $self->{'handle'} = $params{'handle'};
615 $self->{'hostname'} = $params{'hostname'};
616 $self->{'diskname'} = $params{'diskname'};
617 $self->{'datestamp'} = $params{'datestamp'};
618 $self->{'level'} = $params{'level'};
619 $self->{'header'} = undef; # no header yet
620 $self->{'orig_kb'} = $params{'orig_kb'};
621 $self->{'input_errors'} = [];
# for PORT_WRITE, a data path the scribe rejects ends the dump immediately:
# dump_cb is called with the error as a device error
623 if ($msgtype eq Amanda::Taper::Protocol::PORT_WRITE &&
624 (my $err = $self->{'scribe'}->check_data_path($params{'data_path'}))) {
625 return $params{'dump_cb'}->(
627 device_errors => [ ['error', "$err"] ],
630 total_duration => 0);
632 $steps->{'process_args'}->();
635 step process_args => sub {
636 # extract the splitting-related parameters, stripping out empty strings
637 my %splitting_args = map {
638 (defined $params{$_} && $params{$_} ne '')? ($_, $params{$_}) : ()
640 dle_tape_splitsize dle_split_diskbuffer dle_fallback_splitsize dle_allow_split
641 part_size part_cache_type part_cache_dir part_cache_max_size data_path
644 # convert numeric values to BigInts
645 for (qw(dle_tape_splitsize dle_fallback_splitsize part_size part_cache_max_size)) {
646 $splitting_args{$_} = Math::BigInt->new($splitting_args{$_})
647 if (exists $splitting_args{$_});
# a device is required from here on; its "leom" property is passed along
# with the splitting arguments
650 my $device = $self->{'scribe'}->get_device();
651 if (!defined $device) {
652 confess "no device is available to create an xfer_dest";
654 $splitting_args{'leom_supported'} = $device->property_get("leom");
655 # and convert those to get_xfer_dest args
656 %get_xfer_dest_args = get_splitting_args_from_config(
# max_memory comes from the device-output-buffer-size setting; when that
# setting was not given explicitly it is raised to at least four device
# blocks
658 $get_xfer_dest_args{'max_memory'} = getconf($CNF_DEVICE_OUTPUT_BUFFER_SIZE);
659 if (!getconf_seen($CNF_DEVICE_OUTPUT_BUFFER_SIZE)) {
660 my $block_size4 = $device->block_size * 4;
661 if ($block_size4 > $get_xfer_dest_args{'max_memory'}) {
662 $get_xfer_dest_args{'max_memory'} = $block_size4;
# can_cache_inform is set only for FILE_WRITE with splitting allowed
666 $get_xfer_dest_args{'can_cache_inform'} = ($msgtype eq Amanda::Taper::Protocol::FILE_WRITE and $get_xfer_dest_args{'allow_split'});
668 # if we're unable to fulfill the user's splitting needs, we can still give
669 # the dump a shot - but we'll warn them about the problem
670 if ($get_xfer_dest_args{'warning'}) {
671 log_add($L_WARNING, sprintf("%s:%s: %s",
672 $params{'hostname'}, $params{'diskname'},
673 $get_xfer_dest_args{'warning'}));
674 delete $get_xfer_dest_args{'warning'};
677 $steps->{'make_xfer'}->();
680 step make_xfer => sub {
681 $self->_assert_in_state("idle") or return;
682 $self->{'state'} = 'making_xfer';
684 $self->{'xfer_dest'} = $self->{'scribe'}->get_xfer_dest(%get_xfer_dest_args);
# the source depends on the command: a DirectTCP listener for PORT_WRITE,
# otherwise the holding file named by 'filename'
687 if ($msgtype eq Amanda::Taper::Protocol::PORT_WRITE) {
688 $xfer_source = Amanda::Xfer::Source::DirectTCPListen->new();
690 $xfer_source = Amanda::Xfer::Source::Holding->new($params{'filename'});
692 $self->{'xfer_source'} = $xfer_source;
694 $self->{'xfer'} = Amanda::Xfer->new([$xfer_source, $self->{'xfer_dest'}]);
# every transfer message is handed to the scribe first
695 $self->{'xfer'}->start(sub {
696 my ($src, $msg, $xfer) = @_;
697 $self->{'scribe'}->handle_xmsg($src, $msg, $xfer);
699 # if this is an error message that's not from the scribe's element, then
700 # we'll need to keep track of it ourselves
701 if ($msg->{'type'} == $XMSG_ERROR and $msg->{'elt'} != $self->{'xfer_dest'}) {
702 push @{$self->{'input_errors'}}, $msg->{'message'};
706 # we've started the xfer now, but the destination won't actually write
707 # any data until we call start_dump. And we'll need a header for that.
709 $steps->{'get_header'}->();
712 step get_header => sub {
713 $self->_assert_in_state("making_xfer") or return;
714 $self->{'state'} = 'getting_header';
716 if ($msgtype eq Amanda::Taper::Protocol::FILE_WRITE) {
717 # getting the header is easy for FILE-WRITE..
718 my $hdr = $self->{'header'} = Amanda::Holding::get_header($params{'filename'});
720 if (!defined $hdr || $hdr->{'type'} != $Amanda::Header::F_DUMPFILE) {
721 confess("Could not read header from '$params{filename}'");
724 # strip out header fields we don't need
725 $hdr->{'cont_filename'} = '';
# for a holding file there is no dumper to ask, so the dumper status is
# derived from the header's is_partial flag
727 if ($self->{'header'}->{'is_partial'}) {
728 $self->{'dumper_status'} = "FAILED";
730 $self->{'dumper_status'} = "DONE";
733 $steps->{'start_dump'}->(undef);
735 # ..but quite a bit harder for PORT-WRITE; this method will send the
736 # proper PORT command, then read the header from the dumper and parse
737 # it, placing the result in $self->{'header'}
738 $self->send_port_and_get_header($steps->{'start_dump'});
742 step start_dump => sub {
745 $self->_assert_in_state("getting_header") or return;
746 $self->{'state'} = 'writing';
748 # abort if we already have device errors
749 if (@{$self->{'scribe'}->{'device_errors'}}) {
750 $self->{'scribe'}->abort_setup(dump_cb => $params{'dump_cb'});
753 # if $err is set, cancel the dump, treating it as an input error
755 push @{$self->{'input_errors'}}, $err;
756 return $self->{'scribe'}->cancel_dump(
757 xfer => $self->{'xfer'},
758 dump_cb => $params{'dump_cb'});
761 # sanity check the header..
762 my $hdr = $self->{'header'};
763 if ($hdr->{'dumplevel'} != $params{'level'}
764 or $hdr->{'name'} ne $params{'hostname'}
765 or $hdr->{'disk'} ne $params{'diskname'}
766 or $hdr->{'datestamp'} ne $params{'datestamp'}) {
767 confess("Header of dumpfile does not match command from driver");
770 # start producing status
771 $self->create_status_file();
773 # and fix it up before writing it
774 $hdr->{'totalparts'} = -1;
775 $hdr->{'type'} = $Amanda::Header::F_SPLIT_DUMPFILE;
# hand the transfer to the scribe; dump_cb is called with the result
777 $self->{'scribe'}->start_dump(
778 xfer => $self->{'xfer'},
780 dump_cb => $params{'dump_cb'});
788 $self->{'dump_params'} = \%params;
789 $self->{'result'} = $params{'result'};
791 # if we need the dumper status (to differentiate a dropped network
792 # connection from a normal EOF) and have not done so yet, then send a
793 # DUMPER_STATUS message and re-call this method (dump_cb) with the result.
794 if ($params{'result'} eq "DONE"
795 and $self->{'doing_port_write'}
796 and !exists $self->{'dumper_status'}) {
797 my $controller = $self->{'controller'};
798 my $proto = $controller->{'proto'};
799 my $handle = $self->{'handle'};
800 $proto->send(Amanda::Taper::Protocol::DUMPER_STATUS,
801 handle => "$handle");