Merge tag 'upstream/3.3.3'
[debian/amanda] / perl / Amanda / Taper / Worker.pm
index 9e23fd9ac03c76449e4e2b8ee066d7730b9fcfdc..74ca33287ad53155572dc9d22b495b0d827c81a6 100644 (file)
@@ -1,9 +1,9 @@
-#! @PERL@
-# Copyright (c) 2009, 2010 Zmanda Inc.  All Rights Reserved.
+# Copyright (c) 2009-2012 Zmanda Inc.  All Rights Reserved.
 #
-# This program is free software; you can redistribute it and/or modify it
-# under the terms of the GNU General Public License version 2 as published
-# by the Free Software Foundation.
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
 #
 # This program is distributed in the hope that it will be useful, but
 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
@@ -39,9 +39,11 @@ use warnings;
 
 package Amanda::Taper::Worker;
 
+use Carp;
 use POSIX qw( :errno_h );
 use Amanda::Changer;
 use Amanda::Config qw( :getconf config_dir_relative );
+use Amanda::Debug qw( :logging );
 use Amanda::Header;
 use Amanda::Holding;
 use Amanda::MainLoop qw( :GIOCondition );
@@ -100,7 +102,8 @@ sub new {
     my $scribe = Amanda::Taper::Scribe->new(
        taperscan => $controller->{'taperscan'},
        feedback => $self,
-       debug => $Amanda::Config::debug_taper);
+       debug => $Amanda::Config::debug_taper,
+       eject_volume => getconf($CNF_EJECT_VOLUME));
 
     $self->{'scribe'} = $scribe;
     $self->{'scribe'}->start(write_timestamp => $write_timestamp,
@@ -220,7 +223,16 @@ sub DONE {
     my $self = shift;
     my ($msgtype, %params) = @_;
 
-    $self->_assert_in_state("writing") or return;
+    if ($params{'handle'} ne $self->{'handle'}) {
+       # ignore message for previous handle
+       return;
+    }
+
+    if (defined $self->{'dumper_status'}) {
+       # ignore duplicate message
+       return
+    }
+
     $self->{'dumper_status'} = "DONE";
     $self->{'orig_kb'} = $params{'orig_kb'};
     if (defined $self->{'result'}) {
@@ -232,14 +244,42 @@ sub FAILED {
     my $self = shift;
     my ($msgtype, %params) = @_;
 
-    $self->_assert_in_state("writing") or return;
+    if ($params{'handle'} ne $self->{'handle'}) {
+       # ignore message for previous handle
+       return;
+    }
+
+    if (defined $self->{'dumper_status'}) {
+       # ignore duplicate message
+       return
+    }
 
     $self->{'dumper_status'} = "FAILED";
-    if (defined $self->{'result'}) {
+    if (defined $self->{'header_xfer'}) {
+       $self->{'header_xfer'}->cancel();
+    } elsif (defined $self->{'result'}) {
        $self->result_cb(undef);
+    } elsif (!defined $self->{'scribe'}->{'xdt'}) {
+       # ignore, the dump is already cancelled or not yet started.
+    } elsif (!defined $self->{'scribe'}->{'xfer'}) {
+       # ignore, the dump is already cancelled or not yet started.
+    } else { # Abort the dump
+       push @{$self->{'input_errors'}}, "dumper failed";
+       $self->{'scribe'}->cancel_dump(
+               xfer => $self->{'scribe'}->{'xfer'},
+               dump_cb => $self->{'dump_cb'});
     }
 }
 
+sub CLOSE_VOLUME {
+    my $self = shift;
+    my ($msgtype, %params) = @_;
+
+    $self->_assert_in_state("idle") or return;
+
+    $self->{'scribe'}->close_volume();
+}
+
 sub result_cb {
     my $self = shift;
     my %params = %{$self->{'dump_params'}};
@@ -247,7 +287,7 @@ sub result_cb {
     my $logtype;
 
     if ($params{'result'} eq 'DONE') {
-       if (!$self->{'doing_port_write'} or $self->{'dumper_status'} eq "DONE") {
+       if ($self->{'dumper_status'} eq "DONE") {
            $msgtype = Amanda::Taper::Protocol::DONE;
            $logtype = $L_DONE;
        } else {
@@ -323,7 +363,7 @@ sub result_cb {
        $msg_params{'taper'} = 'TAPE-ERROR';
        $msg_params{'tapererr'} = join("; ", @{$params{'device_errors'}});
     } elsif ($params{'config_denial_message'}) {
-       $msg_params{'taper'} = 'TAPE-ERROR';
+       $msg_params{'taper'} = 'TAPE-CONFIG';
        $msg_params{'tapererr'} = $params{'config_denial_message'};
     } else {
        $msg_params{'taper'} = 'TAPE-GOOD';
@@ -373,6 +413,8 @@ sub scribe_notif_new_tape {
 
     # TODO: if $params{error} is set, report it back to the driver
     # (this will be a change to the protocol)
+    log_add($L_INFO, "$params{'error'}") if defined $params{'error'};
+
     if ($params{'volume_label'}) {
        $self->{'label'} = $params{'volume_label'};
 
@@ -436,7 +478,8 @@ sub scribe_notif_log_info {
     my $self = shift;
     my %params = @_;
 
-    log_add($L_INFO, $params{'message'});
+    debug("$params{'message'}");
+    log_add($L_INFO, "$params{'message'}");
 }
 
 ##
@@ -475,7 +518,7 @@ sub create_status_file {
     $self->{timer}->set_callback(sub {
        my $size = $self->{scribe}->get_bytes_written();
        seek $self->{status_fh}, 0, 0;
-       print {$self->{status_fh}} $size;
+       print {$self->{status_fh}} $size, '     ';
        $self->{status_fh}->flush();
     });
 }
@@ -484,7 +527,6 @@ sub send_port_and_get_header {
     my $self = shift;
     my ($finished_cb) = @_;
 
-    my $header_xfer;
     my ($xsrc, $xdst);
     my $errmsg;
 
@@ -504,13 +546,10 @@ sub send_port_and_get_header {
        ($xsrc, $xdst) = (
            Amanda::Xfer::Source::DirectTCPListen->new(),
            Amanda::Xfer::Dest::Buffer->new(0));
-       $header_xfer = Amanda::Xfer->new([$xsrc, $xdst]);
-       $header_xfer->start($steps->{'header_xfer_xmsg_cb'});
+       $self->{'header_xfer'} = Amanda::Xfer->new([$xsrc, $xdst]);
+       $self->{'header_xfer'}->start($steps->{'header_xfer_xmsg_cb'});
 
        my $header_addrs = $xsrc->get_addrs();
-       $header_addrs = [ grep { $_->[0] eq '127.0.0.1' } @$header_addrs ];
-       die "Source::DirectTCPListen did not return a localhost address"
-           unless @$header_addrs;
        my $header_port = $header_addrs->[0][1];
 
        # and tell the driver which ports we're listening on
@@ -526,7 +565,7 @@ sub send_port_and_get_header {
        if ($xmsg->{'type'} == $XMSG_INFO) {
            info($xmsg->{'message'});
        } elsif ($xmsg->{'type'} == $XMSG_ERROR) {
-           $errmsg = $xmsg->{'messsage'};
+           $errmsg = $xmsg->{'message'};
        } elsif ($xmsg->{'type'} == $XMSG_DONE) {
            if ($errmsg) {
                $finished_cb->($errmsg);
@@ -540,7 +579,7 @@ sub send_port_and_get_header {
        my $hdr_buf = $xdst->get();
 
        # close stuff up
-       $header_xfer = $xsrc = $xdst = undef;
+       $self->{'header_xfer'} = $xsrc = $xdst = undef;
 
        if (!defined $hdr_buf) {
            return $finished_cb->("Got empty header");
@@ -560,6 +599,8 @@ sub setup_and_start_dump {
     my ($msgtype, %params) = @_;
     my %get_xfer_dest_args;
 
+    $self->{'dump_cb'} = $params{'dump_cb'};
+
     # setting up the dump is a bit complex, due to the requirements of
     # a directtcp port_write.  This function:
     # 1. creates and starts a transfer (make_xfer)
@@ -594,10 +635,10 @@ sub setup_and_start_dump {
     step process_args => sub {
        # extract the splitting-related parameters, stripping out empty strings
        my %splitting_args = map {
-           ($params{$_} ne '')? ($_, $params{$_}) : ()
+           (defined $params{$_} && $params{$_} ne '')? ($_, $params{$_}) : ()
        } qw(
            dle_tape_splitsize dle_split_diskbuffer dle_fallback_splitsize dle_allow_split
-           part_size part_cache_type part_cache_dir part_cache_max_size
+           part_size part_cache_type part_cache_dir part_cache_max_size data_path
        );
 
        # convert numeric values to BigInts
@@ -606,11 +647,23 @@ sub setup_and_start_dump {
                if (exists $splitting_args{$_});
        }
 
+       my $device = $self->{'scribe'}->get_device();
+       if (!defined $device) {
+           confess "no device is available to create an xfer_dest";
+       }
+       $splitting_args{'leom_supported'} = $device->property_get("leom");
        # and convert those to get_xfer_dest args
         %get_xfer_dest_args = get_splitting_args_from_config(
                %splitting_args);
        $get_xfer_dest_args{'max_memory'} = getconf($CNF_DEVICE_OUTPUT_BUFFER_SIZE);
-       $get_xfer_dest_args{'can_cache_inform'} = ($msgtype eq Amanda::Taper::Protocol::FILE_WRITE);
+       if (!getconf_seen($CNF_DEVICE_OUTPUT_BUFFER_SIZE)) {
+           my $block_size4 = $device->block_size * 4;
+           if ($block_size4 > $get_xfer_dest_args{'max_memory'}) {
+               $get_xfer_dest_args{'max_memory'} = $block_size4;
+           }
+       }
+       $device = undef;
+       $get_xfer_dest_args{'can_cache_inform'} = ($msgtype eq Amanda::Taper::Protocol::FILE_WRITE and $get_xfer_dest_args{'allow_split'});
 
        # if we're unable to fulfill the user's splitting needs, we can still give
        # the dump a shot - but we'll warn them about the problem
@@ -664,12 +717,19 @@ sub setup_and_start_dump {
            # getting the header is easy for FILE-WRITE..
            my $hdr = $self->{'header'} = Amanda::Holding::get_header($params{'filename'});
 
+           if (!defined $hdr || $hdr->{'type'} != $Amanda::Header::F_DUMPFILE) {
+               confess("Could not read header from '$params{filename}'");
+           }
+
            # stip out header fields we don't need
            $hdr->{'cont_filename'} = '';
 
-           if (!defined $hdr || $hdr->{'type'} != $Amanda::Header::F_DUMPFILE) {
-               die("Could not read header from '$params{filename}'");
+           if ($self->{'header'}->{'is_partial'}) {
+               $self->{'dumper_status'} = "FAILED";
+           } else {
+               $self->{'dumper_status'} = "DONE";
            }
+
            $steps->{'start_dump'}->(undef);
        } else {
            # ..but quite a bit harder for PORT-WRITE; this method will send the
@@ -685,6 +745,11 @@ sub setup_and_start_dump {
         $self->_assert_in_state("getting_header") or return;
         $self->{'state'} = 'writing';
 
+       # abort if we already got a device_errors
+       if (@{$self->{'scribe'}->{'device_errors'}}) {
+           $self->{'scribe'}->abort_setup(dump_cb => $params{'dump_cb'});
+           return;
+       }
         # if $err is set, cancel the dump, treating it as a input error
         if ($err) {
            push @{$self->{'input_errors'}}, $err;
@@ -699,7 +764,7 @@ sub setup_and_start_dump {
             or $hdr->{'name'} ne $params{'hostname'}
             or $hdr->{'disk'} ne $params{'diskname'}
            or $hdr->{'datestamp'} ne $params{'datestamp'}) {
-            die("Header of dumpfile does not match command from driver");
+            confess("Header of dumpfile does not match command from driver");
         }
 
        # start producing status