Imported Upstream version 3.3.0
[debian/amanda] / server-src / amfetchdump.pl
index 3cf38a002a97ea1e5dfc184941dd29f4bae73877..1e8c21d23fc9cc956d88fd01e57c89ce25499ff7 100644 (file)
@@ -38,12 +38,12 @@ use Amanda::Recovery::Planner;
 use Amanda::Recovery::Clerk;
 use Amanda::Recovery::Scan;
 
-# Interactive package
-package Amanda::Interactive::amfetchdump;
+# Interactivity package
+package Amanda::Interactivity::amfetchdump;
 use POSIX qw( :errno_h );
 use Amanda::MainLoop qw( :GIOCondition );
 use vars qw( @ISA );
-@ISA = qw( Amanda::Interactive );
+@ISA = qw( Amanda::Interactivity );
 
 sub new {
     my $class = shift;
@@ -78,12 +78,12 @@ sub user_request {
        if (!defined $n_read) {
            return if ($! == EINTR);
            $self->abort();
-           return $params{'finished_cb'}->(
+           return $params{'request_cb'}->(
                Amanda::Changer::Error->new('fatal',
                        message => "Fail to read from stdin"));
        } elsif ($n_read == 0) {
            $self->abort();
-           return $params{'finished_cb'}->(
+           return $params{'request_cb'}->(
                Amanda::Changer::Error->new('fatal',
                        message => "Aborted by user"));
        } else {
@@ -93,7 +93,7 @@ sub user_request {
                chomp $line;
                $buffer = "";
                $self->abort();
-               return $params{'finished_cb'}->(undef, $line);
+               return $params{'request_cb'}->(undef, $line);
            }
        }
     };
@@ -229,11 +229,14 @@ sub clerk_notif_holding {
 
 package main;
 
+use Amanda::MainLoop qw( :GIOCondition );
 sub main {
     my ($finished_cb) = @_;
     my $current_dump;
     my $plan;
     my @xfer_errs;
+    my %all_filter;
+    my $fetch_done;
 
     my $steps = define_steps
        cb_ref => \$finished_cb;
@@ -248,7 +251,7 @@ sub main {
            return failure("Cannot chdir to $destdir: $!", $finished_cb);
        }
 
-       my $interactive = Amanda::Interactive::amfetchdump->new();
+       my $interactivity = Amanda::Interactivity::amfetchdump->new();
        # if we have an explicit device, then the clerk doesn't get a changer --
        # we operate the changer via Amanda::Recovery::Scan
        if (defined $opt_device) {
@@ -256,14 +259,14 @@ sub main {
            return failure($chg, $finished_cb) if $chg->isa("Amanda::Changer::Error");
            my $scan = Amanda::Recovery::Scan->new(
                                chg => $chg,
-                               interactive => $interactive);
+                               interactivity => $interactivity);
            return failure($scan, $finished_cb) if $scan->isa("Amanda::Changer::Error");
            $clerk = Amanda::Recovery::Clerk->new(
                feedback => main::Feedback->new($chg, $opt_device),
                scan     => $scan);
        } else {
            my $scan = Amanda::Recovery::Scan->new(
-                               interactive => $interactive);
+                               interactivity => $interactivity);
            return failure($scan, $finished_cb) if $scan->isa("Amanda::Changer::Error");
 
            $clerk = Amanda::Recovery::Clerk->new(
@@ -363,11 +366,11 @@ sub main {
            if ($hdr->{'srv_encrypt'}) {
                push @filters,
                    Amanda::Xfer::Filter::Process->new(
-                       [ $hdr->{'srv_encrypt'}, $hdr->{'srv_decrypt_opt'} ], 0, 0);
+                       [ $hdr->{'srv_encrypt'}, $hdr->{'srv_decrypt_opt'} ], 0);
            } elsif ($hdr->{'clnt_encrypt'}) {
                push @filters,
                    Amanda::Xfer::Filter::Process->new(
-                       [ $hdr->{'clnt_encrypt'}, $hdr->{'clnt_decrypt_opt'} ], 0, 0);
+                       [ $hdr->{'clnt_encrypt'}, $hdr->{'clnt_decrypt_opt'} ], 0);
            } else {
                return failure("could not decrypt encrypted dump: no program specified",
                            $finished_cb);
@@ -388,17 +391,17 @@ sub main {
                # TODO: this assumes that srvcompprog takes "-d" to decrypt
                push @filters,
                    Amanda::Xfer::Filter::Process->new(
-                       [ $hdr->{'srvcompprog'}, "-d" ], 0, 0);
+                       [ $hdr->{'srvcompprog'}, "-d" ], 0);
            } elsif ($hdr->{'clntcompprog'}) {
                # TODO: this assumes that clntcompprog takes "-d" to decrypt
                push @filters,
                    Amanda::Xfer::Filter::Process->new(
-                       [ $hdr->{'clntcompprog'}, "-d" ], 0, 0);
+                       [ $hdr->{'clntcompprog'}, "-d" ], 0);
            } else {
                push @filters,
                    Amanda::Xfer::Filter::Process->new(
                        [ $Amanda::Constants::UNCOMPRESS_PATH,
-                         $Amanda::Constants::UNCOMPRESS_OPT ], 0, 0);
+                         $Amanda::Constants::UNCOMPRESS_OPT ], 0);
            }
 
            # adjust the header
@@ -413,7 +416,7 @@ sub main {
            push @filters,
                Amanda::Xfer::Filter::Process->new(
                    [ $Amanda::Constants::COMPRESS_PATH,
-                     $compress_opt ], 0, 0);
+                     $compress_opt ], 0);
 
            # adjust the header
            $hdr->{'compressed'} = 1;
@@ -436,8 +439,42 @@ sub main {
            syswrite $hdr_fh, $hdr->to_string(32768, 32768), 32768;
        }
 
+       # start reading all filter stderr
+       foreach my $filter (@filters) {
+           my $fd = $filter->get_stderr_fd();
+           $fd.="";
+           $fd = int($fd);
+           my $src = Amanda::MainLoop::fd_source($fd,
+                                                $G_IO_IN|$G_IO_HUP|$G_IO_ERR);
+           my $buffer = "";
+           $all_filter{$src} = 1;
+           $src->set_callback( sub {
+               my $b;
+               my $n_read = POSIX::read($fd, $b, 1);
+               if (!defined $n_read) {
+                   return;
+               } elsif ($n_read == 0) {
+                   delete $all_filter{$src};
+                   $src->remove();
+                   POSIX::close($fd);
+                   if (!%all_filter and $fetch_done) {
+                       $finished_cb->();
+                   }
+               } else {
+                   $buffer .= $b;
+                   if ($b eq "\n") {
+                       my $line = $buffer;
+                       print STDERR "filter stderr: $line";
+                       chomp $line;
+                       debug("filter stderr: $line");
+                       $buffer = "";
+                   }
+               }
+           });
+       }
+
        my $xfer = Amanda::Xfer->new([ $xfer_src, @filters, $xfer_dest ]);
-       $xfer->start($steps->{'handle_xmsg'});
+       $xfer->start($steps->{'handle_xmsg'}, 0, $current_dump->{'bytes'});
        $clerk->start_recovery(
            xfer => $xfer,
            recovery_cb => $steps->{'recovery_cb'});
@@ -480,7 +517,11 @@ sub main {
 
        return failure($err, $finished_cb) if $err;
 
-       $finished_cb->();
+#do all filter are done reading stderr
+       $fetch_done = 1;
+        if (!%all_filter) {
+           $finished_cb->();
+       }
     };
 }