1 # Copyright (c) 2009-2012 Zmanda Inc. All Rights Reserved.
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the GNU General Public License version 2 as published
5 # by the Free Software Foundation.
7 # This program is distributed in the hope that it will be useful, but
8 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
9 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 # You should have received a copy of the GNU General Public License along
13 # with this program; if not, write to the Free Software Foundation, Inc.,
14 # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
16 # Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
17 # Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
25 This package is a component of the Amanda taper, and is not intended for use by
26 other scripts or applications.
28 This package interface between L<Amanda::Taper::Controller> and L<Amanda::Taper::Scribe>.
30 The worker use an L<Amanda::Taper::Scribe> object to execute the request
31 received from the L<Amanda::Taper::Controller>.
35 use lib '@amperldir@';
39 package Amanda::Taper::Worker;
42 use POSIX qw( :errno_h );
44 use Amanda::Config qw( :getconf config_dir_relative );
45 use Amanda::Debug qw( :logging );
48 use Amanda::MainLoop qw( :GIOCondition );
50 use Amanda::Taper::Protocol;
51 use Amanda::Taper::Scan;
52 use Amanda::Taper::Scribe qw( get_splitting_args_from_config );
53 use Amanda::Logfile qw( :logtype_t log_add make_stats );
54 use Amanda::Xfer qw( :constants );
55 use Amanda::Util qw( quote_string );
59 use base qw( Amanda::Taper::Scribe::Feedback );
65 my $worker_name = shift;
66 my $controller = shift;
67 my $write_timestamp = shift;
71 worker_name => $worker_name,
72 controller => $controller,
74 timestamp => $write_timestamp,
76 # filled in when a write starts:
86 doing_port_write => undef,
89 # periodic status updates
91 status_filename => undef,
94 # filled in after the header is available
97 # filled in when a new tape is started:
101 my $scribe = Amanda::Taper::Scribe->new(
102 taperscan => $controller->{'taperscan'},
104 debug => $Amanda::Config::debug_taper,
105 eject_volume => getconf($CNF_EJECT_VOLUME));
107 $self->{'scribe'} = $scribe;
108 $self->{'scribe'}->start(write_timestamp => $write_timestamp,
109 finished_cb => sub { $self->_scribe_started_cb(@_); });
114 # called when the scribe is fully started up and ready to go
115 sub _scribe_started_cb {
120 $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::TAPE_ERROR,
121 worker_name => $self->{'worker_name'},
123 $self->{'state'} = "error";
125 # log the error (note that the message is intentionally not quoted)
126 log_add($L_ERROR, "no-tape error [$err]");
129 $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::TAPER_OK,
130 worker_name => $self->{'worker_name'});
131 $self->{'state'} = "idle";
138 my ($msgtype, %params) = @_;
139 $self->_assert_in_state("idle") or return;
141 $self->{'doing_port_write'} = 0;
143 $self->setup_and_start_dump($msgtype,
144 dump_cb => sub { $self->dump_cb(@_); },
150 my ($msgtype, %params) = @_;
154 $self->_assert_in_state("idle") or return;
156 $self->{'doing_port_write'} = 1;
158 $self->setup_and_start_dump($msgtype,
159 dump_cb => sub { $self->dump_cb(@_); },
165 my ($msgtype, %params) = @_;
167 $self->{'scribe'}->start_scan(undef);
172 my ($msgtype, %params) = @_;
174 $self->_assert_in_state("writing") or return;
176 $self->{'perm_cb'}->(allow => 1);
181 my ($msgtype, %params) = @_;
183 $self->_assert_in_state("writing") or return;
185 # log the error (note that the message is intentionally not quoted)
186 log_add($L_ERROR, "no-tape config [$params{reason}]");
188 $self->{'perm_cb'}->(cause => "config", message => $params{'reason'});
191 sub TAKE_SCRIBE_FROM {
193 my ($worker1, $msgtype, %params) = @_;
195 $self->_assert_in_state("writing") or return;
196 $worker1->_assert_in_state("idle") or return;
198 my $scribe = $self->{'scribe'};
199 my $scribe1 = $worker1->{'scribe'};
200 $self->{'scribe'} = $scribe1;
201 $worker1->{'scribe'} = $scribe;
202 # Change the callback to call the new scribe
203 $self->{'xfer'}->set_callback(sub {
204 my ($src, $msg, $xfer) = @_;
205 $scribe1->handle_xmsg($src, $msg, $xfer);
207 # if this is an error message that's not from the scribe's element, then
208 # we'll need to keep track of it ourselves
209 if ($msg->{'type'} == $XMSG_ERROR and $msg->{'elt'} != $self->{'xfer_dest'}) {
210 push @{$self->{'input_errors'}}, $msg->{'message'};
214 $self->{'label'} = $worker1->{'label'};
215 $self->{'perm_cb'}->(scribe => $scribe1);
216 delete $worker1->{'scribe'};
217 $worker1->{'state'} = 'error';
218 $scribe->quit(finished_cb => sub {});
223 my ($msgtype, %params) = @_;
225 if (!defined $self->{'dumper_status'}) {
226 $self->{'dumper_status'} = "DONE";
227 $self->{'orig_kb'} = $params{'orig_kb'};
228 if (defined $self->{'result'}) {
229 $self->result_cb(undef);
238 my ($msgtype, %params) = @_;
240 $self->{'dumper_status'} = "FAILED";
241 if (defined $self->{'header_xfer'}) {
242 $self->{'header_xfer'}->cancel();
243 } elsif (defined $self->{'result'}) {
244 $self->result_cb(undef);
245 } elsif (!defined $self->{'scribe'}->{'xdt'}) {
246 # ignore, the dump is already cancelled or not yet started.
247 } elsif (!defined $self->{'scribe'}->{'xfer'}) {
248 # ignore, the dump is already cancelled or not yet started.
249 } else { # Abort the dump
250 push @{$self->{'input_errors'}}, "dumper failed";
251 $self->{'scribe'}->cancel_dump(
252 xfer => $self->{'scribe'}->{'xfer'},
253 dump_cb => $self->{'dump_cb'});
259 my ($msgtype, %params) = @_;
261 $self->_assert_in_state("idle") or return;
263 $self->{'scribe'}->close_volume();
268 my %params = %{$self->{'dump_params'}};
272 if ($params{'result'} eq 'DONE') {
273 if (!$self->{'doing_port_write'} or $self->{'dumper_status'} eq "DONE") {
274 $msgtype = Amanda::Taper::Protocol::DONE;
277 $msgtype = Amanda::Taper::Protocol::DONE;
278 $logtype = $L_PARTIAL;
280 } elsif ($params{'result'} eq 'PARTIAL') {
281 $msgtype = Amanda::Taper::Protocol::PARTIAL;
282 $logtype = $L_PARTIAL;
283 } elsif ($params{'result'} eq 'FAILED') {
284 $msgtype = Amanda::Taper::Protocol::FAILED;
288 if ($self->{timer}) {
289 $self->{timer}->remove();
290 undef $self->{timer};
291 $self->{status_fh}->close();
292 undef $self->{status_fh};
293 unlink($self->{status_filename});
294 undef $self->{status_filename};
297 # note that we use total_duration here, which is the total time between
298 # start_dump and dump_cb, so the kps generated here is much less than the
299 # actual tape write speed. Think of this as the *taper* speed, rather than
301 my $stats = make_stats($params{'size'}, $params{'total_duration'}, $self->{'orig_kb'});
303 # consider this a config-derived failure only if there were no errors
304 my $failure_from = (@{$params{'device_errors'}})? 'error' : 'config';
306 my @all_messages = (@{$params{'device_errors'}}, @{$self->{'input_errors'}});
307 push @all_messages, $params{'config_denial_message'} if $params{'config_denial_message'};
308 my $msg = quote_string(join("; ", @all_messages));
310 # write a DONE/PARTIAL/FAIL log line
311 if ($logtype == $L_FAIL) {
312 log_add($L_FAIL, sprintf("%s %s %s %s %s %s",
313 quote_string($self->{'hostname'}.""), # " is required for SWIG..
314 quote_string($self->{'diskname'}.""),
315 $self->{'datestamp'},
320 log_add($logtype, sprintf("%s %s %s %s %s %s%s",
321 quote_string($self->{'hostname'}.""), # " is required for SWIG..
322 quote_string($self->{'diskname'}.""),
323 $self->{'datestamp'},
327 ($logtype == $L_PARTIAL and @all_messages)? " $msg" : ""));
330 # and send a message back to the driver
332 handle => $self->{'handle'},
335 # reflect errors in our own elements in INPUT-ERROR or INPUT-GOOD
336 if (@{$self->{'input_errors'}}) {
337 $msg_params{'input'} = 'INPUT-ERROR';
338 $msg_params{'inputerr'} = join("; ", @{$self->{'input_errors'}});
340 $msg_params{'input'} = 'INPUT-GOOD';
341 $msg_params{'inputerr'} = '';
344 # and errors from the scribe in TAPE-ERROR or TAPE-GOOD
345 if (@{$params{'device_errors'}}) {
346 $msg_params{'taper'} = 'TAPE-ERROR';
347 $msg_params{'tapererr'} = join("; ", @{$params{'device_errors'}});
348 } elsif ($params{'config_denial_message'}) {
349 $msg_params{'taper'} = 'TAPE-ERROR';
350 $msg_params{'tapererr'} = $params{'config_denial_message'};
352 $msg_params{'taper'} = 'TAPE-GOOD';
353 $msg_params{'tapererr'} = '';
356 if ($msgtype ne Amanda::Taper::Protocol::FAILED) {
357 $msg_params{'stats'} = $stats;
360 # reset things to 'idle' before sending the message
361 $self->{'xfer'} = undef;
362 $self->{'xfer_source'} = undef;
363 $self->{'xfer_dest'} = undef;
364 $self->{'handle'} = undef;
365 $self->{'header'} = undef;
366 $self->{'hostname'} = undef;
367 $self->{'diskname'} = undef;
368 $self->{'datestamp'} = undef;
369 $self->{'level'} = undef;
370 $self->{'header'} = undef;
371 $self->{'state'} = 'idle';
372 delete $self->{'result'};
373 delete $self->{'dumper_status'};
374 delete $self->{'dump_params'};
376 $self->{'controller'}->{'proto'}->send($msgtype, %msg_params);
383 sub request_volume_permission {
387 $self->{'perm_cb'} = $params{'perm_cb'};
388 # and send the request to the driver
389 $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::REQUEST_NEW_TAPE,
390 handle => $self->{'handle'});
393 sub scribe_notif_new_tape {
397 # TODO: if $params{error} is set, report it back to the driver
398 # (this will be a change to the protocol)
399 log_add($L_INFO, "$params{'error'}") if defined $params{'error'};
401 if ($params{'volume_label'}) {
402 $self->{'label'} = $params{'volume_label'};
404 # add to the trace log
405 log_add($L_START, sprintf("datestamp %s label %s tape %s",
406 $self->{'timestamp'},
407 quote_string($self->{'label'}),
411 print STDERR "taper: wrote label '$self->{label}'\n";
413 # and inform the driver
414 $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::NEW_TAPE,
415 handle => $self->{'handle'},
416 label => $params{'volume_label'});
418 $self->{'label'} = undef;
420 $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::NO_NEW_TAPE,
421 handle => $self->{'handle'});
425 sub scribe_notif_part_done {
429 $self->_assert_in_state("writing") or return;
431 my $stats = make_stats($params{'size'}, $params{'duration'}, $self->{'orig_kb'});
433 # log the part, using PART or PARTPARTIAL
434 my $logbase = sprintf("%s %s %s %s %s %s/%s %s %s",
435 quote_string($self->{'label'}),
437 quote_string($self->{'header'}->{'name'}.""), # " is required for SWIG..
438 quote_string($self->{'header'}->{'disk'}.""),
439 $self->{'datestamp'},
440 $params{'partnum'}, -1, # totalparts is always -1
443 if ($params{'successful'}) {
444 log_add($L_PART, $logbase);
446 log_add($L_PARTPARTIAL, "$logbase \"No space left on device\"");
449 # only send a PARTDONE if it was successful
450 if ($params{'successful'}) {
451 $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::PARTDONE,
452 handle => $self->{'handle'},
453 label => $self->{'label'},
454 fileno => $params{'fileno'},
456 kb => $params{'size'} / 1024);
460 sub scribe_notif_log_info {
464 debug("$params{'message'}");
465 log_add($L_INFO, "$params{'message'}");
471 sub _assert_in_state {
474 if ($self->{'state'} eq $state) {
477 $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::BAD_COMMAND,
478 message => "command not appropriate in state '$self->{state}' : '$state'");
483 sub create_status_file {
486 # create temporary file
487 ($self->{status_fh}, $self->{status_filename}) =
488 File::Temp::tempfile("taper_status_file_XXXXXX",
489 DIR => $Amanda::Paths::AMANDA_TMPDIR,
492 # tell amstatus about it by writing it to the dump log
493 my $qdisk = Amanda::Util::quote_string($self->{'diskname'});
494 my $qhost = Amanda::Util::quote_string($self->{'hostname'});
495 print STDERR "taper: status file $qhost $qdisk:" .
496 "$self->{status_filename}\n";
497 print {$self->{status_fh}} "0";
499 # create timer callback, firing every 5s (=5000msec)
500 $self->{timer} = Amanda::MainLoop::timeout_source(5000);
501 $self->{timer}->set_callback(sub {
502 my $size = $self->{scribe}->get_bytes_written();
503 seek $self->{status_fh}, 0, 0;
504 print {$self->{status_fh}} $size, ' ';
505 $self->{status_fh}->flush();
509 sub send_port_and_get_header {
511 my ($finished_cb) = @_;
516 my $steps = define_steps
517 cb_ref => \$finished_cb;
519 step send_port => sub {
520 # get the ip:port pairs for the data connection from the data xfer source,
521 # which should be an Amanda::Xfer::Source::DirectTCPListen
522 my $data_addrs = $self->{'xfer_source'}->get_addrs();
523 $data_addrs = join ";", map { $_->[0] . ':' . $_->[1] } @$data_addrs;
525 # and set up an xfer for the header, too, using DirectTCP as an easy
526 # way to implement a listen/accept/read process. Note that this does
527 # not enforce a maximum size, so this portion of Amanda at least can
528 # handle any size header
530 Amanda::Xfer::Source::DirectTCPListen->new(),
531 Amanda::Xfer::Dest::Buffer->new(0));
532 $self->{'header_xfer'} = Amanda::Xfer->new([$xsrc, $xdst]);
533 $self->{'header_xfer'}->start($steps->{'header_xfer_xmsg_cb'});
535 my $header_addrs = $xsrc->get_addrs();
536 my $header_port = $header_addrs->[0][1];
538 # and tell the driver which ports we're listening on
539 $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::PORT,
540 worker_name => $self->{'worker_name'},
541 handle => $self->{'handle'},
542 port => $header_port,
543 ipports => $data_addrs);
546 step header_xfer_xmsg_cb => sub {
547 my ($src, $xmsg, $xfer) = @_;
548 if ($xmsg->{'type'} == $XMSG_INFO) {
549 info($xmsg->{'message'});
550 } elsif ($xmsg->{'type'} == $XMSG_ERROR) {
551 $errmsg = $xmsg->{'message'};
552 } elsif ($xmsg->{'type'} == $XMSG_DONE) {
554 $finished_cb->($errmsg);
556 $steps->{'got_header'}->();
561 step got_header => sub {
562 my $hdr_buf = $xdst->get();
565 $self->{'header_xfer'} = $xsrc = $xdst = undef;
567 if (!defined $hdr_buf) {
568 return $finished_cb->("Got empty header");
571 # parse the header, finally!
572 $self->{'header'} = Amanda::Header->from_string($hdr_buf);
574 $finished_cb->(undef);
578 # do the work of starting a new xfer; this contains the code common to
579 # msg_PORT_WRITE and msg_FILE_WRITE.
580 sub setup_and_start_dump {
582 my ($msgtype, %params) = @_;
583 my %get_xfer_dest_args;
585 $self->{'dump_cb'} = $params{'dump_cb'};
587 # setting up the dump is a bit complex, due to the requirements of
588 # a directtcp port_write. This function:
589 # 1. creates and starts a transfer (make_xfer)
591 # 3. calls the scribe's start_dump method with the new header
593 my $steps = define_steps
594 cb_ref => \$params{'dump_cb'};
597 $self->{'handle'} = $params{'handle'};
598 $self->{'hostname'} = $params{'hostname'};
599 $self->{'diskname'} = $params{'diskname'};
600 $self->{'datestamp'} = $params{'datestamp'};
601 $self->{'level'} = $params{'level'};
602 $self->{'header'} = undef; # no header yet
603 $self->{'orig_kb'} = $params{'orig_kb'};
604 $self->{'input_errors'} = [];
606 if ($msgtype eq Amanda::Taper::Protocol::PORT_WRITE &&
607 (my $err = $self->{'scribe'}->check_data_path($params{'data_path'}))) {
608 return $params{'dump_cb'}->(
610 device_errors => [ ['error', "$err"] ],
613 total_duration => 0);
615 $steps->{'process_args'}->();
618 step process_args => sub {
619 # extract the splitting-related parameters, stripping out empty strings
620 my %splitting_args = map {
621 ($params{$_} ne '')? ($_, $params{$_}) : ()
623 dle_tape_splitsize dle_split_diskbuffer dle_fallback_splitsize dle_allow_split
624 part_size part_cache_type part_cache_dir part_cache_max_size
627 # convert numeric values to BigInts
628 for (qw(dle_tape_splitsize dle_fallback_splitsize part_size part_cache_max_size)) {
629 $splitting_args{$_} = Math::BigInt->new($splitting_args{$_})
630 if (exists $splitting_args{$_});
633 my $device = $self->{'scribe'}->get_device();
634 if (!defined $device) {
635 confess "no device is available to create an xfer_dest";
637 $splitting_args{'leom_supported'} = $device->property_get("leom");
638 # and convert those to get_xfer_dest args
639 %get_xfer_dest_args = get_splitting_args_from_config(
641 $get_xfer_dest_args{'max_memory'} = getconf($CNF_DEVICE_OUTPUT_BUFFER_SIZE);
642 if (!getconf_seen($CNF_DEVICE_OUTPUT_BUFFER_SIZE)) {
643 my $block_size4 = $device->block_size * 4;
644 if ($block_size4 > $get_xfer_dest_args{'max_memory'}) {
645 $get_xfer_dest_args{'max_memory'} = $block_size4;
649 $get_xfer_dest_args{'can_cache_inform'} = ($msgtype eq Amanda::Taper::Protocol::FILE_WRITE and $get_xfer_dest_args{'allow_split'});
651 # if we're unable to fulfill the user's splitting needs, we can still give
652 # the dump a shot - but we'll warn them about the problem
653 if ($get_xfer_dest_args{'warning'}) {
654 log_add($L_WARNING, sprintf("%s:%s: %s",
655 $params{'hostname'}, $params{'diskname'},
656 $get_xfer_dest_args{'warning'}));
657 delete $get_xfer_dest_args{'warning'};
660 $steps->{'make_xfer'}->();
663 step make_xfer => sub {
664 $self->_assert_in_state("idle") or return;
665 $self->{'state'} = 'making_xfer';
667 $self->{'xfer_dest'} = $self->{'scribe'}->get_xfer_dest(%get_xfer_dest_args);
670 if ($msgtype eq Amanda::Taper::Protocol::PORT_WRITE) {
671 $xfer_source = Amanda::Xfer::Source::DirectTCPListen->new();
673 $xfer_source = Amanda::Xfer::Source::Holding->new($params{'filename'});
675 $self->{'xfer_source'} = $xfer_source;
677 $self->{'xfer'} = Amanda::Xfer->new([$xfer_source, $self->{'xfer_dest'}]);
678 $self->{'xfer'}->start(sub {
679 my ($src, $msg, $xfer) = @_;
680 $self->{'scribe'}->handle_xmsg($src, $msg, $xfer);
682 # if this is an error message that's not from the scribe's element, then
683 # we'll need to keep track of it ourselves
684 if ($msg->{'type'} == $XMSG_ERROR and $msg->{'elt'} != $self->{'xfer_dest'}) {
685 push @{$self->{'input_errors'}}, $msg->{'message'};
689 # we've started the xfer now, but the destination won't actually write
690 # any data until we call start_dump. And we'll need a header for that.
692 $steps->{'get_header'}->();
695 step get_header => sub {
696 $self->_assert_in_state("making_xfer") or return;
697 $self->{'state'} = 'getting_header';
699 if ($msgtype eq Amanda::Taper::Protocol::FILE_WRITE) {
700 # getting the header is easy for FILE-WRITE..
701 my $hdr = $self->{'header'} = Amanda::Holding::get_header($params{'filename'});
703 # stip out header fields we don't need
704 $hdr->{'cont_filename'} = '';
706 if (!defined $hdr || $hdr->{'type'} != $Amanda::Header::F_DUMPFILE) {
707 confess("Could not read header from '$params{filename}'");
709 $steps->{'start_dump'}->(undef);
711 # ..but quite a bit harder for PORT-WRITE; this method will send the
712 # proper PORT command, then read the header from the dumper and parse
713 # it, placing the result in $self->{'header'}
714 $self->send_port_and_get_header($steps->{'start_dump'});
718 step start_dump => sub {
721 $self->_assert_in_state("getting_header") or return;
722 $self->{'state'} = 'writing';
724 # if $err is set, cancel the dump, treating it as a input error
726 push @{$self->{'input_errors'}}, $err;
727 return $self->{'scribe'}->cancel_dump(
728 xfer => $self->{'xfer'},
729 dump_cb => $params{'dump_cb'});
732 # sanity check the header..
733 my $hdr = $self->{'header'};
734 if ($hdr->{'dumplevel'} != $params{'level'}
735 or $hdr->{'name'} ne $params{'hostname'}
736 or $hdr->{'disk'} ne $params{'diskname'}
737 or $hdr->{'datestamp'} ne $params{'datestamp'}) {
738 confess("Header of dumpfile does not match command from driver");
741 # start producing status
742 $self->create_status_file();
744 # and fix it up before writing it
745 $hdr->{'totalparts'} = -1;
746 $hdr->{'type'} = $Amanda::Header::F_SPLIT_DUMPFILE;
748 $self->{'scribe'}->start_dump(
749 xfer => $self->{'xfer'},
751 dump_cb => $params{'dump_cb'});
759 $self->{'dump_params'} = \%params;
760 $self->{'result'} = $params{'result'};
762 # if we need to the dumper status (to differentiate a dropped network
763 # connection from a normal EOF) and have not done so yet, then send a
764 # DUMPER_STATUS message and re-call this method (dump_cb) with the result.
765 if ($params{'result'} eq "DONE"
766 and $self->{'doing_port_write'}
767 and !exists $self->{'dumper_status'}) {
768 my $controller = $self->{'controller'};
769 my $proto = $controller->{'proto'};
770 my $handle = $self->{'handle'};
771 $proto->send(Amanda::Taper::Protocol::DUMPER_STATUS,
772 handle => "$handle");