use Amanda::Recovery::Clerk;
use Amanda::Recovery::Scan;
-# Interactive package
-package Amanda::Interactive::amfetchdump;
+# Interactivity package
+package Amanda::Interactivity::amfetchdump;
use POSIX qw( :errno_h );
use Amanda::MainLoop qw( :GIOCondition );
use vars qw( @ISA );
-@ISA = qw( Amanda::Interactive );
+@ISA = qw( Amanda::Interactivity );
sub new {
my $class = shift;
if (!defined $n_read) {
return if ($! == EINTR);
$self->abort();
- return $params{'finished_cb'}->(
+ return $params{'request_cb'}->(
Amanda::Changer::Error->new('fatal',
message => "Fail to read from stdin"));
} elsif ($n_read == 0) {
$self->abort();
- return $params{'finished_cb'}->(
+ return $params{'request_cb'}->(
Amanda::Changer::Error->new('fatal',
message => "Aborted by user"));
} else {
chomp $line;
$buffer = "";
$self->abort();
- return $params{'finished_cb'}->(undef, $line);
+ return $params{'request_cb'}->(undef, $line);
}
}
};
package main;
+use Amanda::MainLoop qw( :GIOCondition );
sub main {
my ($finished_cb) = @_;
my $current_dump;
my $plan;
my @xfer_errs;
+ my %all_filter;
+ my $fetch_done;
my $steps = define_steps
cb_ref => \$finished_cb;
return failure("Cannot chdir to $destdir: $!", $finished_cb);
}
- my $interactive = Amanda::Interactive::amfetchdump->new();
+ my $interactivity = Amanda::Interactivity::amfetchdump->new();
# if we have an explicit device, then the clerk doesn't get a changer --
# we operate the changer via Amanda::Recovery::Scan
if (defined $opt_device) {
return failure($chg, $finished_cb) if $chg->isa("Amanda::Changer::Error");
my $scan = Amanda::Recovery::Scan->new(
chg => $chg,
- interactive => $interactive);
+ interactivity => $interactivity);
return failure($scan, $finished_cb) if $scan->isa("Amanda::Changer::Error");
$clerk = Amanda::Recovery::Clerk->new(
feedback => main::Feedback->new($chg, $opt_device),
scan => $scan);
} else {
my $scan = Amanda::Recovery::Scan->new(
- interactive => $interactive);
+ interactivity => $interactivity);
return failure($scan, $finished_cb) if $scan->isa("Amanda::Changer::Error");
$clerk = Amanda::Recovery::Clerk->new(
if ($hdr->{'srv_encrypt'}) {
push @filters,
Amanda::Xfer::Filter::Process->new(
- [ $hdr->{'srv_encrypt'}, $hdr->{'srv_decrypt_opt'} ], 0, 0);
+ [ $hdr->{'srv_encrypt'}, $hdr->{'srv_decrypt_opt'} ], 0);
} elsif ($hdr->{'clnt_encrypt'}) {
push @filters,
Amanda::Xfer::Filter::Process->new(
- [ $hdr->{'clnt_encrypt'}, $hdr->{'clnt_decrypt_opt'} ], 0, 0);
+ [ $hdr->{'clnt_encrypt'}, $hdr->{'clnt_decrypt_opt'} ], 0);
} else {
return failure("could not decrypt encrypted dump: no program specified",
$finished_cb);
# TODO: this assumes that srvcompprog takes "-d" to decrypt
push @filters,
Amanda::Xfer::Filter::Process->new(
- [ $hdr->{'srvcompprog'}, "-d" ], 0, 0);
+ [ $hdr->{'srvcompprog'}, "-d" ], 0);
} elsif ($hdr->{'clntcompprog'}) {
# TODO: this assumes that clntcompprog takes "-d" to decrypt
push @filters,
Amanda::Xfer::Filter::Process->new(
- [ $hdr->{'clntcompprog'}, "-d" ], 0, 0);
+ [ $hdr->{'clntcompprog'}, "-d" ], 0);
} else {
push @filters,
Amanda::Xfer::Filter::Process->new(
[ $Amanda::Constants::UNCOMPRESS_PATH,
- $Amanda::Constants::UNCOMPRESS_OPT ], 0, 0);
+ $Amanda::Constants::UNCOMPRESS_OPT ], 0);
}
# adjust the header
push @filters,
Amanda::Xfer::Filter::Process->new(
[ $Amanda::Constants::COMPRESS_PATH,
- $compress_opt ], 0, 0);
+ $compress_opt ], 0);
# adjust the header
$hdr->{'compressed'} = 1;
syswrite $hdr_fh, $hdr->to_string(32768, 32768), 32768;
}
+ # start reading all filter stderr
+ foreach my $filter (@filters) {
+ my $fd = $filter->get_stderr_fd();
+ $fd.="";
+ $fd = int($fd);
+ my $src = Amanda::MainLoop::fd_source($fd,
+ $G_IO_IN|$G_IO_HUP|$G_IO_ERR);
+ my $buffer = "";
+ $all_filter{$src} = 1;
+ $src->set_callback( sub {
+ my $b;
+ my $n_read = POSIX::read($fd, $b, 1);
+ if (!defined $n_read) {
+ return;
+ } elsif ($n_read == 0) {
+ delete $all_filter{$src};
+ $src->remove();
+ POSIX::close($fd);
+ if (!%all_filter and $fetch_done) {
+ $finished_cb->();
+ }
+ } else {
+ $buffer .= $b;
+ if ($b eq "\n") {
+ my $line = $buffer;
+ print STDERR "filter stderr: $line";
+ chomp $line;
+ debug("filter stderr: $line");
+ $buffer = "";
+ }
+ }
+ });
+ }
+
my $xfer = Amanda::Xfer->new([ $xfer_src, @filters, $xfer_dest ]);
- $xfer->start($steps->{'handle_xmsg'});
+ $xfer->start($steps->{'handle_xmsg'}, 0, $current_dump->{'bytes'});
$clerk->start_recovery(
xfer => $xfer,
recovery_cb => $steps->{'recovery_cb'});
return failure($err, $finished_cb) if $err;
- $finished_cb->();
+#do all filter are done reading stderr
+ $fetch_done = 1;
+ if (!%all_filter) {
+ $finished_cb->();
+ }
};
}