lintian doesn't like orphan packages with uploaders...
[debian/amanda] / server-src / amfetchdump.pl
index 11a6bc1b924e44c2db4dec11119569c5aa63d696..df12800e7f605a8376dd06656c0ea5a29854f0b7 100644 (file)
@@ -1,9 +1,10 @@
 #! @PERL@
-# Copyright (c) 2009, 2010 Zmanda, Inc.  All Rights Reserved.
+# Copyright (c) 2009-2012 Zmanda, Inc.  All Rights Reserved.
 #
-# This program is free software; you can redistribute it and/or modify it
-# under the terms of the GNU General Public License version 2 as published
-# by the Free Software Foundation.
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
 #
 # This program is distributed in the hope that it will be useful, but
 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
@@ -22,6 +23,9 @@ use strict;
 use warnings;
 
 use Getopt::Long;
+use File::Basename;
+use XML::Simple;
+use IPC::Open3;
 
 use Amanda::Device qw( :constants );
 use Amanda::Debug qw( :logging );
@@ -37,13 +41,14 @@ use Amanda::Xfer qw( :constants );
 use Amanda::Recovery::Planner;
 use Amanda::Recovery::Clerk;
 use Amanda::Recovery::Scan;
+use Amanda::Extract;
 
-# Interactive package
-package Amanda::Interactive::amfetchdump;
+# Interactivity package
+package Amanda::Interactivity::amfetchdump;
 use POSIX qw( :errno_h );
 use Amanda::MainLoop qw( :GIOCondition );
 use vars qw( @ISA );
-@ISA = qw( Amanda::Interactive );
+@ISA = qw( Amanda::Interactivity );
 
 sub new {
     my $class = shift;
@@ -78,12 +83,12 @@ sub user_request {
        if (!defined $n_read) {
            return if ($! == EINTR);
            $self->abort();
-           return $params{'finished_cb'}->(
+           return $params{'request_cb'}->(
                Amanda::Changer::Error->new('fatal',
                        message => "Fail to read from stdin"));
        } elsif ($n_read == 0) {
            $self->abort();
-           return $params{'finished_cb'}->(
+           return $params{'request_cb'}->(
                Amanda::Changer::Error->new('fatal',
                        message => "Aborted by user"));
        } else {
@@ -93,7 +98,7 @@ sub user_request {
                chomp $line;
                $buffer = "";
                $self->abort();
-               return $params{'finished_cb'}->(undef, $line);
+               return $params{'request_cb'}->(undef, $line);
            }
        }
     };
@@ -112,10 +117,14 @@ package main;
 sub usage {
     my ($msg) = @_;
     print STDERR <<EOF;
-Usage: amfetchdump config [-c|-C|-l] [-p|-n] [-a]
-    [-O directory] [-d device] [-o configoption]*
-    [--header-file file] [--header-fd fd]
-    hostname [diskname [datestamp [hostname [diskname [datestamp ... ]]]]]"));
+Usage: amfetchdump [-c|-C|-l] [-p|-n] [-a] [-O directory] [-d device]
+    [-h|--header-file file|--header-fd fd]
+    [-decrypt|--no-decrypt|--server-decrypt|--client-decrypt]
+    [--decompress|--no-decompress|--server-decompress|--client-decompress]
+    [--extract --directory directory [--data-path (amanda|directtcp)]
+    [--application-property='NAME=VALUE']*]
+    [-o configoption]* [--exact-match] config
+    hostname [diskname [datestamp [hostname [diskname [datestamp ... ]]]]]
 EOF
     print STDERR "ERROR: $msg\n" if $msg;
     exit(1);
@@ -130,7 +139,20 @@ my $config_overrides = new_config_overrides($#ARGV+1);
 
 my ($opt_config, $opt_no_reassembly, $opt_compress, $opt_compress_best, $opt_pipe,
     $opt_assume, $opt_leave, $opt_blocksize, $opt_device, $opt_chdir, $opt_header,
-    $opt_header_file, $opt_header_fd, @opt_dumpspecs);
+    $opt_header_file, $opt_header_fd, @opt_dumpspecs,
+    $opt_decrypt, $opt_server_decrypt, $opt_client_decrypt,
+    $opt_decompress, $opt_server_decompress, $opt_client_decompress,
+    $opt_extract, $opt_directory, $opt_data_path, %application_property,
+    $opt_exact_match);
+
+my $NEVER = 0;
+my $ALWAYS = 1;
+my $ONLY_SERVER = 2;
+my $ONLY_CLIENT = 3;
+my $decrypt;
+my $decompress;
+
+debug("Arguments: " . join(' ', @ARGV));
 Getopt::Long::Configure(qw(bundling));
 GetOptions(
     'version' => \&Amanda::Util::version_opt,
@@ -144,6 +166,17 @@ GetOptions(
     'h' => \$opt_header,
     'header-file=s' => \$opt_header_file,
     'header-fd=i' => \$opt_header_fd,
+    'decrypt!' => \$opt_decrypt,
+    'server-decrypt' => \$opt_server_decrypt,
+    'client-decrypt' => \$opt_client_decrypt,
+    'decompress!' => \$opt_decompress,
+    'server-decompress' => \$opt_server_decompress,
+    'client-decompress' => \$opt_client_decompress,
+    'extract' => \$opt_extract,
+    'directory=s' => \$opt_directory,
+    'data-path=s' => \$opt_data_path,
+    'application-property=s' => \%application_property,
+    'exact-match' => \$opt_exact_match,
     'b=s' => \$opt_blocksize,
     'd=s' => \$opt_device,
     'O=s' => \$opt_chdir,
@@ -152,11 +185,10 @@ GetOptions(
 usage() unless (@ARGV);
 $opt_config = shift @ARGV;
 
-$opt_compress = 1 if $opt_compress_best;
-
-usage("must specify at least a hostname") unless @ARGV;
-@opt_dumpspecs = Amanda::Cmdline::parse_dumpspecs([@ARGV],
-    $Amanda::Cmdline::CMDLINE_PARSE_DATESTAMP);
+if (defined $opt_compress and defined $opt_compress_best) {
+    print STDERR "Can't use -c and -C\n";
+    usage();
+}
 
 usage("The -b option is no longer supported; set readblocksize in the tapetype section\n" .
       "of amanda.conf instead.")
@@ -166,9 +198,123 @@ usage("-l is not compatible with -c or -C")
 usage("-p is not compatible with -n")
     if ($opt_leave and $opt_no_reassembly);
 usage("-h, --header-file, and --header-fd are mutually incompatible")
-    if (($opt_header and $opt_header_file or $opt_header_fd)
+    if (($opt_header and ($opt_header_file or $opt_header_fd))
            or ($opt_header_file and $opt_header_fd));
 
+     $opt_data_path = lc($opt_data_path) if defined ($opt_data_path);
+usage("--data_path must be 'amanda' or 'directtcp'")
+    if (defined $opt_data_path and $opt_data_path ne 'directtcp' and $opt_data_path ne 'amanda');
+
+if (defined $opt_leave) {
+    if (defined $opt_decrypt and $opt_decrypt) {
+       print STDERR "-l is incompatible with --decrypt\n";
+       usage();
+    }
+    if (defined $opt_server_decrypt) {
+       print STDERR "-l is incompatible with --server-decrypt\n";
+       usage();
+    }
+    if (defined $opt_client_decrypt) {
+       print STDERR "-l is incompatible with --client-decrypt\n";
+       usage();
+    }
+    if (defined $opt_decompress and $opt_decompress) {
+       print STDERR "-l is incompatible with --decompress\n";
+       usage();
+    }
+    if (defined $opt_server_decompress) {
+       print STDERR "-l is incompatible with --server-decompress\n";
+       usage();
+    }
+    if (defined $opt_client_decompress) {
+       print STDERR "-l is incompatible with --client-decompress\n";
+       usage();
+    }
+}
+
+if (( defined $opt_directory and !defined $opt_extract) or
+    (!defined $opt_directory and  defined $opt_extract)) {
+    print STDERR "Both --directorty and --extract must be set\n";
+    usage();
+}
+if (defined $opt_directory and defined $opt_extract) {
+    $opt_decrypt = 1;
+    if (defined $opt_server_decrypt or defined $opt_client_decrypt) {
+       print STDERR "--server_decrypt or --client-decrypt is incompatible with --extract\n";
+       usage();
+    }
+    $opt_decompress = 1;
+    if (defined $opt_server_decompress || defined $opt_client_decompress) {
+       print STDERR "--server-decompress r --client-decompress is incompatible with --extract\n";
+       usage();
+    }
+    if (defined($opt_leave) +
+       defined($opt_compress) +
+       defined($opt_compress_best)) {
+       print STDERR "Can't use -l -c or -C with --extract\n";
+       usage();
+    }
+    if (defined $opt_pipe) {
+       print STDERR "--pipe is incompatible with --extract\n";
+       usage();
+    }
+    if (defined $opt_header) {
+       print STDERR "--header is incompatible with --extract\n";
+       usage();
+    }
+}
+
+if (defined($opt_decrypt) +
+    defined($opt_server_decrypt) +
+    defined($opt_client_decrypt) > 1) {
+    print STDERR "Can't use only on of --decrypt, --no-decrypt, --server-decrypt or --client-decrypt\n";
+    usage();
+}
+if (defined($opt_decompress) +
+    defined($opt_server_decompress) +
+    defined($opt_client_decompress) > 1) {
+    print STDERR "Can't use only on of --decompress, --no-decompress, --server-decompress or --client-decompress\n";
+    usage();
+}
+
+if (defined($opt_compress) and
+    defined($opt_decompress) +
+    defined($opt_server_decompress) +
+    defined($opt_client_decompress) > 0) {
+    print STDERR "Can't specify -c with one of --decompress, --no-decompress, --server-decompress or --client-decompress\n";
+    usage();
+}
+if (defined($opt_compress_best) and
+    defined($opt_decompress) +
+    defined($opt_server_decompress) +
+    defined($opt_client_decompress) > 0) {
+    print STDERR "Can't specify -C with one of --decompress, --no-decompress, --server-decompress or --client-decompress\n";
+    usage();
+}
+
+$decompress = $ALWAYS;
+$decrypt = $ALWAYS;
+$decrypt = $NEVER  if defined $opt_leave;
+$decrypt = $NEVER  if defined $opt_decrypt and !$opt_decrypt;
+$decrypt = $ALWAYS if defined $opt_decrypt and $opt_decrypt;
+$decrypt = $ONLY_SERVER if defined $opt_server_decrypt;
+$decrypt = $ONLY_CLIENT if defined $opt_client_decrypt;
+
+$opt_compress = 1 if $opt_compress_best;
+
+$decompress = $NEVER  if defined $opt_compress;
+$decompress = $NEVER  if defined $opt_leave;
+$decompress = $NEVER  if defined $opt_decompress and !$opt_decompress;
+$decompress = $ALWAYS if defined $opt_decompress and $opt_decompress;
+$decompress = $ONLY_SERVER if defined $opt_server_decompress;
+$decompress = $ONLY_CLIENT if defined $opt_client_decompress;
+
+usage("must specify at least a hostname") unless @ARGV;
+my $cmd_flags = $Amanda::Cmdline::CMDLINE_PARSE_DATESTAMP |
+               $Amanda::Cmdline::CMDLINE_PARSE_LEVEL;
+$cmd_flags |= $Amanda::Cmdline::CMDLINE_EXACT_MATCH if $opt_exact_match;
+@opt_dumpspecs = Amanda::Cmdline::parse_dumpspecs([@ARGV], $cmd_flags);
+
 set_config_overrides($config_overrides);
 config_init($CONFIG_INIT_EXPLICIT_NAME, $opt_config);
 my ($cfgerr_level, @cfgerr_errors) = config_errors();
@@ -183,9 +329,11 @@ Amanda::Util::finish_setup($RUNNING_AS_DUMPUSER);
 
 my $exit_status = 0;
 my $clerk;
+use Data::Dumper;
 sub failure {
     my ($msg, $finished_cb) = @_;
     print STDERR "ERROR: $msg\n";
+    debug("FAILURE: $msg");
     $exit_status = 1;
     if ($clerk) {
        $clerk->quit(finished_cb => sub {
@@ -204,36 +352,48 @@ use Amanda::MainLoop;
 
 sub new {
     my $class = shift;
-    my ($chg, $dev_name) = @_;
+    my ($chg, $dev_name, $is_tty) = @_;
 
     return bless {
        chg => $chg,
        dev_name => $dev_name,
+       is_tty => $is_tty,
     }, $class;
 }
 
-sub notif_part {
+sub clerk_notif_part {
     my $self = shift;
     my ($label, $filenum, $header) = @_;
 
+    print STDERR "\n" if $self->{'is_tty'};
     print STDERR "amfetchdump: $filenum: restoring ", $header->summary(), "\n";
 }
 
-sub notif_holding {
+sub clerk_notif_holding {
     my $self = shift;
     my ($filename, $header) = @_;
 
     # this used to give the fd from which the holding file was being read.. why??
+    print STDERR "\n" if $self->{'is_tty'};
     print STDERR "Reading '$filename'\n", $header->summary(), "\n";
 }
 
 package main;
 
+use Amanda::MainLoop qw( :GIOCondition );
 sub main {
     my ($finished_cb) = @_;
     my $current_dump;
     my $plan;
     my @xfer_errs;
+    my %all_filter;
+    my $recovery_done;
+    my %recovery_params;
+    my $timer;
+    my $is_tty;
+    my $delay;
+    my $directtcp = 0;
+    my @directtcp_command;
 
     my $steps = define_steps
        cb_ref => \$finished_cb;
@@ -248,7 +408,14 @@ sub main {
            return failure("Cannot chdir to $destdir: $!", $finished_cb);
        }
 
-       my $interactive = Amanda::Interactive::amfetchdump->new();
+       $is_tty = -t STDERR;
+       if($is_tty) {
+           $delay = 1000; # 1 second
+       } else {
+           $delay = 5000; # 5 seconds
+       }
+
+       my $interactivity = Amanda::Interactivity::amfetchdump->new();
        # if we have an explicit device, then the clerk doesn't get a changer --
        # we operate the changer via Amanda::Recovery::Scan
        if (defined $opt_device) {
@@ -256,19 +423,19 @@ sub main {
            return failure($chg, $finished_cb) if $chg->isa("Amanda::Changer::Error");
            my $scan = Amanda::Recovery::Scan->new(
                                chg => $chg,
-                               interactive => $interactive);
+                               interactivity => $interactivity);
            return failure($scan, $finished_cb) if $scan->isa("Amanda::Changer::Error");
            $clerk = Amanda::Recovery::Clerk->new(
-               feedback => main::Feedback->new($chg, $opt_device),
+               feedback => main::Feedback->new($chg, $opt_device, $is_tty),
                scan     => $scan);
        } else {
            my $scan = Amanda::Recovery::Scan->new(
-                               interactive => $interactive);
+                               interactivity => $interactivity);
            return failure($scan, $finished_cb) if $scan->isa("Amanda::Changer::Error");
 
            $clerk = Amanda::Recovery::Clerk->new(
                changer => $chg,
-               feedback => main::Feedback->new($chg, undef),
+               feedback => main::Feedback->new($chg, undef, $is_tty),
                scan     => $scan);
        }
 
@@ -288,6 +455,12 @@ sub main {
            return failure("No matching dumps found", $finished_cb);
        }
 
+       # if we are doing a -p operation, only keep the first dump
+       if ($opt_pipe) {
+           print STDERR "WARNING: Fetch first dump only because of -p argument\n" if @{$plan->{'dumps'}} > 1;
+           @{$plan->{'dumps'}} = ($plan->{'dumps'}[0]);
+       }
+
        my @needed_labels = $plan->get_volume_list();
        my @needed_holding = $plan->get_holding_file_list();
        if (@needed_labels) {
@@ -312,10 +485,14 @@ sub main {
 
     step start_dump => sub {
        $current_dump = shift @{$plan->{'dumps'}};
+
        if (!$current_dump) {
            return $steps->{'finished'}->();
        }
 
+       $recovery_done = 0;
+       %recovery_params = ();
+
        $clerk->get_xfer_src(
            dump => $current_dump,
            xfer_src_cb => $steps->{'xfer_src_cb'});
@@ -325,10 +502,105 @@ sub main {
        my ($errs, $hdr, $xfer_src, $directtcp_supported) = @_;
        return failure(join("; ", @$errs), $finished_cb) if $errs;
 
+       my $dle_str = $hdr->{'dle_str'};
+       my $p1 = XML::Simple->new();
+       my $dle = $p1->XMLin($dle_str);
+
        # and set up the destination..
        my $dest_fh;
-       if ($opt_pipe) {
+       my $xfer_dest;
+       my @filters;
+
+       if (defined $opt_data_path and $opt_data_path eq 'directtcp' and !$directtcp_supported) {
+           return failure("The device can't do directtcp", $finished_cb);
+       }
+       $directtcp_supported = 0 if defined $opt_data_path and $opt_data_path eq 'amanda';
+       if ($opt_extract) {
+           my $program = uc(basename($hdr->{program}));
+           my @argv;
+           if ($program ne "APPLICATION") {
+               $directtcp_supported = 0;
+               my %validation_programs = (
+                       "STAR" => [ $Amanda::Constants::STAR, qw(-x -f -) ],
+                       "DUMP" => [ $Amanda::Constants::RESTORE, qw(xbf 2 -) ],
+                       "VDUMP" => [ $Amanda::Constants::VRESTORE, qw(xf -) ],
+                       "VXDUMP" => [ $Amanda::Constants::VXRESTORE, qw(xbf 2 -) ],
+                       "XFSDUMP" => [ $Amanda::Constants::XFSRESTORE, qw(-v silent) ],
+                       "TAR" => [ $Amanda::Constants::GNUTAR, qw(--ignore-zeros -xf -) ],
+                       "GTAR" => [ $Amanda::Constants::GNUTAR, qw(--ignore-zeros -xf -) ],
+                       "GNUTAR" => [ $Amanda::Constants::GNUTAR, qw(--ignore-zeros -xf -) ],
+                       "SMBCLIENT" => [ $Amanda::Constants::GNUTAR, qw(-xf -) ],
+                       "PKZIP" => undef,
+               );
+               if (!exists $validation_programs{$program}) {
+                   return failure("Unknown program '$program' in header; no validation to perform",
+                                  $finished_cb);
+               }
+               @argv = $validation_programs{$program};
+           } else {
+               if (!defined $hdr->{application}) {
+                   return failure("Application not set", $finished_cb);
+               }
+               my $program_path = $Amanda::Paths::APPLICATION_DIR . "/" .
+                                  $hdr->{application};
+               if (!-x $program_path) {
+                   return failure("Application '" . $hdr->{application} .
+                                  "($program_path)' not available on the server",
+                                  $finished_cb);
+               }
+               my %bsu_argv;
+               $bsu_argv{'application'} = $hdr->{application};
+               $bsu_argv{'config'} = $opt_config;
+               $bsu_argv{'host'} = $hdr->{'name'};
+               $bsu_argv{'disk'} = $hdr->{'disk'};
+               $bsu_argv{'device'} = $dle->{'diskdevice'} if defined $dle->{'diskdevice'};
+               my ($bsu, $err) = Amanda::Extract::BSU(%bsu_argv);
+               if (defined $opt_data_path and $opt_data_path eq 'directtcp' and
+                   !$bsu->{'data-path-directtcp'}) {
+                   return failure("The application can't do directtcp", $finished_cb);
+               }
+               if ($directtcp_supported and !$bsu->{'data-path-directtcp'}) {
+                   # application do not support directtcp
+                   $directtcp_supported = 0;
+               }
+
+               push @argv, $program_path, "restore";
+               push @argv, "--config", $opt_config;
+               push @argv, "--host", $hdr->{'name'};
+               push @argv, "--disk", $hdr->{'disk'};
+               push @argv, "--device", $dle->{'diskdevice'} if defined ($dle->{'diskdevice'});
+               push @argv, "--level", $hdr->{'dumplevel'};
+               push @argv, "--directory", $opt_directory;
+
+               # add application_property
+               while (my($name, $value) = each(%application_property)) {
+                   push @argv, "--".$name, $value if $value;
+               }
+
+               #merge property from header;
+               while (my($name, $value) = each (%{$dle->{'backup-program'}->{'property'}})) {
+                   if (!exists $application_property{$name}) {
+                       push @argv, "--".$name, $value->{'value'};
+                   }
+               }
+
+           }
+           $directtcp = $directtcp_supported;
+           if ($directtcp_supported) {
+               $xfer_dest = Amanda::Xfer::Dest::DirectTCPListen->new();
+               @directtcp_command = @argv;
+           } else {
+               # set up the extraction command as a filter element, since
+               # we need its stderr.
+               debug("Running: ". join(' ',@argv));
+               push @filters, Amanda::Xfer::Filter::Process->new(\@argv, 0);
+
+               $dest_fh = \*STDOUT;
+               $xfer_dest = Amanda::Xfer::Dest::Fd->new($dest_fh);
+           }
+       } elsif ($opt_pipe) {
            $dest_fh = \*STDOUT;
+           $xfer_dest = Amanda::Xfer::Dest::Fd->new($dest_fh);
        } else {
            my $filename = sprintf("%s.%s.%s.%d",
                    $hdr->{'name'},
@@ -348,13 +620,23 @@ sub main {
            if (!open($dest_fh, ">", $filename)) {
                return failure("Could not open '$filename' for writing: $!", $finished_cb);
            }
+           $xfer_dest = Amanda::Xfer::Dest::Fd->new($dest_fh);
        }
 
-       my $xfer_dest = Amanda::Xfer::Dest::Fd->new($dest_fh);
+       $timer = Amanda::MainLoop::timeout_source($delay);
+       $timer->set_callback(sub {
+           my $size = $xfer_src->get_bytes_read();
+           if ($is_tty) {
+               print STDERR "\r" . int($size/1024) . " kb ";
+           } else {
+               print STDERR "READ SIZE: " . int($size/1024) . " kb\n";
+           }
+       });
 
        # set up any filters that need to be applied; decryption first
-       my @filters;
-       if ($hdr->{'encrypted'} and not $opt_leave) {
+       if ($hdr->{'encrypted'} and
+           (($hdr->{'srv_encrypt'} and ($decrypt == $ALWAYS || $decrypt == $ONLY_SERVER)) ||
+            ($hdr->{'clnt_encrypt'} and ($decrypt == $ALWAYS || $decrypt == $ONLY_CLIENT)))) {
            if ($hdr->{'srv_encrypt'}) {
                push @filters,
                    Amanda::Xfer::Filter::Process->new(
@@ -376,16 +658,23 @@ sub main {
            $hdr->{'encrypt_suffix'} = 'N';
        }
 
-       if ($hdr->{'compressed'} and not $opt_compress and not $opt_leave) {
+       if ($hdr->{'compressed'} and not $opt_compress and
+           (($hdr->{'srvcompprog'} and ($decompress == $ALWAYS || $decompress == $ONLY_SERVER)) ||
+            ($hdr->{'clntcompprog'} and ($decompress == $ALWAYS || $decompress == $ONLY_CLIENT)) ||
+            ($dle->{'compress'} and $dle->{'compress'} eq "SERVER-FAST" and ($decompress == $ALWAYS || $decompress == $ONLY_SERVER)) ||
+            ($dle->{'compress'} and $dle->{'compress'} eq "SERVER-BEST" and ($decompress == $ALWAYS || $decompress == $ONLY_SERVER)) ||
+            ($dle->{'compress'} and $dle->{'compress'} eq "FAST" and ($decompress == $ALWAYS || $decompress == $ONLY_CLIENT)) ||
+            ($dle->{'compress'} and $dle->{'compress'} eq "BEST" and ($decompress == $ALWAYS || $decompress == $ONLY_CLIENT)))) {
            # need to uncompress this file
-
-           if ($hdr->{'srvcompprog'}) {
-               # TODO: this assumes that srvcompprog takes "-d" to decrypt
+           if ($hdr->{'encrypted'}) {
+               print "Not decompressing because the backup image is not decrypted\n";
+           } elsif ($hdr->{'srvcompprog'}) {
+               # TODO: this assumes that srvcompprog takes "-d" to decompress
                push @filters,
                    Amanda::Xfer::Filter::Process->new(
                        [ $hdr->{'srvcompprog'}, "-d" ], 0);
            } elsif ($hdr->{'clntcompprog'}) {
-               # TODO: this assumes that clntcompprog takes "-d" to decrypt
+               # TODO: this assumes that clntcompprog takes "-d" to decompress
                push @filters,
                    Amanda::Xfer::Filter::Process->new(
                        [ $hdr->{'clntcompprog'}, "-d" ], 0);
@@ -431,11 +720,99 @@ sub main {
            syswrite $hdr_fh, $hdr->to_string(32768, 32768), 32768;
        }
 
-       my $xfer = Amanda::Xfer->new([ $xfer_src, @filters, $xfer_dest ]);
-       $xfer->start($steps->{'handle_xmsg'});
+       # start reading all filter stderr
+       foreach my $filter (@filters) {
+           my $fd = $filter->get_stderr_fd();
+           $fd.="";
+           $fd = int($fd);
+           my $src = Amanda::MainLoop::fd_source($fd,
+                                                $G_IO_IN|$G_IO_HUP|$G_IO_ERR);
+           my $buffer = "";
+           $all_filter{$src} = 1;
+           $src->set_callback( sub {
+               my $b;
+               my $n_read = POSIX::read($fd, $b, 1);
+               if (!defined $n_read) {
+                   return;
+               } elsif ($n_read == 0) {
+                   delete $all_filter{$src};
+                   $src->remove();
+                   POSIX::close($fd);
+                   if (!%all_filter and $recovery_done) {
+                       $steps->{'filter_done'}->();
+                   }
+               } else {
+                   $buffer .= $b;
+                   if ($b eq "\n") {
+                       my $line = $buffer;
+                       chomp $line;
+                       if (length($line) > 1) {
+                           print STDERR "filter stderr: $line\n";
+                           debug("filter stderr: $line");
+                       }
+                       $buffer = "";
+                   }
+               }
+           });
+       }
+
+       my $xfer;
+       if (@filters) {
+           $xfer = Amanda::Xfer->new([ $xfer_src, @filters, $xfer_dest ]);
+       } else {
+           $xfer = Amanda::Xfer->new([ $xfer_src, $xfer_dest ]);
+       }
+       $xfer->start($steps->{'handle_xmsg'}, 0, $current_dump->{'bytes'});
        $clerk->start_recovery(
            xfer => $xfer,
            recovery_cb => $steps->{'recovery_cb'});
+       if ($directtcp) {
+           my $addr = $xfer_dest->get_addrs();
+           push @directtcp_command, "--data-path", "DIRECTTCP";
+           push @directtcp_command, "--direct-tcp", "$addr->[0]->[0]:$addr->[0]->[1]";
+           debug("Running: ". join(' ', @directtcp_command));
+
+           my ($wtr, $rdr);
+           my $err = Symbol::gensym;
+           my $amndmp_pid = open3($wtr, $rdr, $err, @directtcp_command);
+           $amndmp_pid = $amndmp_pid;
+           my $file_to_close = 2;
+           my $amndmp_stdout_src = Amanda::MainLoop::fd_source($rdr,
+                                               $G_IO_IN|$G_IO_HUP|$G_IO_ERR);
+           my $amndmp_stderr_src = Amanda::MainLoop::fd_source($err,
+                                               $G_IO_IN|$G_IO_HUP|$G_IO_ERR);
+
+           $amndmp_stdout_src->set_callback( sub {
+               my $line = <$rdr>;
+               if (!defined $line) {
+                   $file_to_close--;
+                   $amndmp_stdout_src->remove();
+                   if ($file_to_close == 0) {
+                       #abort the xfer
+                       $xfer->cancel() if $xfer->get_status != $XFER_DONE;
+                   }
+                   return;
+               }
+               chomp $line;
+               debug("amndmp stdout: $line");
+               print "$line\n";
+           });
+           $amndmp_stderr_src->set_callback( sub {
+               my $line = <$err>;
+               if (!defined $line) {
+                    $file_to_close--;
+                    $amndmp_stderr_src->remove();
+                    if ($file_to_close == 0) {
+                       #abort the xfer
+                       $xfer->cancel() if $xfer->get_status != $XFER_DONE;
+                    }
+                    return;
+               }
+               chomp $line;
+               debug("amndmp stderr: $line");
+               print STDERR "$line\n";
+           });
+       }
     };
 
     step handle_xmsg => sub {
@@ -450,14 +827,24 @@ sub main {
     };
 
     step recovery_cb => sub {
-       my %params = @_;
+       %recovery_params = @_;
+       $recovery_done = 1;
+
+       $steps->{'filter_done'}->() if !%all_filter;
+    };
 
-       @xfer_errs = (@xfer_errs, @{$params{'errors'}})
-           if $params{'errors'};
+    step filter_done => sub {
+       if ($is_tty) {
+           print STDERR "\r" . int($recovery_params{'bytes_read'}/1024) . " kb ";
+       } else {
+           print STDERR "READ SIZE: " . int($recovery_params{'bytes_read'}/1024) . " kb\n";
+       }
+       @xfer_errs = (@xfer_errs, @{$recovery_params{'errors'}})
+           if $recovery_params{'errors'};
        return failure(join("; ", @xfer_errs), $finished_cb)
            if @xfer_errs;
        return failure("recovery failed", $finished_cb)
-           if $params{'result'} ne 'DONE';
+           if $recovery_params{'result'} ne 'DONE';
 
        $steps->{'start_dump'}->();
     };
@@ -473,6 +860,11 @@ sub main {
     step quit => sub {
        my ($err) = @_;
 
+       if (defined $timer) {
+           $timer->remove();
+           $timer = undef;
+       }
+       print STDERR "\n" if $is_tty;
        return failure($err, $finished_cb) if $err;
 
        $finished_cb->();