Imported Upstream version 3.3.2
[debian/amanda] / server-src / amidxtaped.pl
1 #! @PERL@
2 # Copyright (c) 2010-2012 Zmanda, Inc.  All Rights Reserved.
3 #
4 # This program is free software; you can redistribute it and/or modify it
5 # under the terms of the GNU General Public License version 2 as published
6 # by the Free Software Foundation.
7 #
8 # This program is distributed in the hope that it will be useful, but
9 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
11 # for more details.
12 #
13 # You should have received a copy of the GNU General Public License along
14 # with this program; if not, write to the Free Software Foundation, Inc.,
15 # 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 #
17 # Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
18 # Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
19
20 use lib '@amperldir@';
21 use strict;
22 use warnings;
23
24 ##
25 # Interactivity class
26
27 package main::Interactivity;
28 use base 'Amanda::Interactivity';
29 use Amanda::Util qw( weaken_ref );
30 use Amanda::MainLoop;
31 use Amanda::Feature;
32 use Amanda::Debug qw( debug );
33 use Amanda::Config qw( :getconf );
34 use Amanda::Recovery::Scan qw( $DEFAULT_CHANGER );
35
36 sub new {
37     my $class = shift;
38     my %params = @_;
39
40     my $self = {
41         clientservice => $params{'clientservice'},
42     };
43
44     # (weak ref here to eliminate reference loop)
45     weaken_ref($self->{'clientservice'});
46
47     return bless ($self, $class);
48 }
49
50 sub abort() {
51     my $self = shift;
52
53     debug("ignoring spurious Amanda::Recovery::Scan abort call");
54 }
55
56 sub user_request {
57     my $self = shift;
58     my %params = @_;
59     my $buffer = "";
60
61     my $steps = define_steps
62         cb_ref => \$params{'request_cb'};
63
64     step send_message => sub {
65         if ($params{'err'}) {
66             $self->{'clientservice'}->sendmessage("$params{err}");
67         }
68
69         $steps->{'check_fe_feedme'}->();
70     };
71
72     step check_fe_feedme => sub {
73         # note that fe_amrecover_FEEDME implies fe_amrecover_splits
74         if (!$self->{'clientservice'}->{'their_features'}->has(
75                                     $Amanda::Feature::fe_amrecover_FEEDME)) {
76             return $params{'request_cb'}->("remote cannot prompt for volumes", undef);
77         }
78         $steps->{'send_feedme'}->();
79     };
80
81     step send_feedme => sub {
82         $self->{'clientservice'}->sendctlline("FEEDME $params{label}\r\n", $steps->{'read_response'});
83     };
84
85     step read_response => sub {
86         my ($err, $written) = @_;
87         return $params{'request_cb'}->($err, undef) if $err;
88
89         $self->{'clientservice'}->getline_async(
90                 $self->{'clientservice'}->{'ctl_stream'}, $steps->{'got_response'});
91     };
92
93     step got_response => sub {
94         my ($err, $line) = @_;
95         return $params{'request_cb'}->($err, undef) if $err;
96
97         if ($line eq "OK\r\n") {
98             return $params{'request_cb'}->(undef, undef); # carry on as you were
99         } elsif ($line =~ /^TAPE (.*)\r\n$/) {
100             my $tape = $1;
101             if ($tape eq getconf($CNF_AMRECOVER_CHANGER)) {
102                 $tape = $Amanda::Recovery::Scan::DEFAULT_CHANGER;
103             }
104             return $params{'request_cb'}->(undef, $tape); # use this device
105         } else {
106             return $params{'request_cb'}->("got invalid response from remote", undef);
107         }
108     };
109 };
110
111 ##
112 # ClientService class
113
114 package main::ClientService;
115 use base 'Amanda::ClientService';
116
117 use Sys::Hostname;
118
119 use Amanda::Debug qw( debug info warning );
120 use Amanda::MainLoop qw( :GIOCondition );
121 use Amanda::Util qw( :constants match_disk match_host );
122 use Amanda::Feature;
123 use Amanda::Config qw( :init :getconf );
124 use Amanda::Changer;
125 use Amanda::Recovery::Scan;
126 use Amanda::Xfer qw( :constants );
127 use Amanda::Cmdline;
128 use Amanda::Recovery::Clerk;
129 use Amanda::Recovery::Planner;
130 use Amanda::Recovery::Scan;
131 use Amanda::DB::Catalog;
132 use Amanda::Disklist;
133
134 # Note that this class performs its control IO synchronously.  This is adequate
135 # for this service, as it never receives unsolicited input from the remote
136 # system.
137
138 sub run {
139     my $self = shift;
140
141     $self->{'my_features'} = Amanda::Feature::Set->mine();
142     $self->{'their_features'} = Amanda::Feature::Set->old();
143     $self->{'all_filter'} = {};
144
145     $self->setup_streams();
146 }
147
148 sub setup_streams {
149     my $self = shift;
150
151     # get started checking security for inetd or processing the REQ/REP
152     # for amandad
153     if ($self->from_inetd()) {
154         if (!$self->check_inetd_security('main')) {
155             $main::exit_status = 1;
156             return $self->quit();
157         }
158         $self->{'ctl_stream'} = 'main';
159         $self->{'data_stream'} = undef; # no data stream yet
160     } else {
161         my $req = $self->get_req();
162
163         # make some sanity checks
164         my $errors = [];
165         if (defined $req->{'options'}{'auth'} and defined $self->amandad_auth()
166                 and $req->{'options'}{'auth'} ne $self->amandad_auth()) {
167             my $reqauth = $req->{'options'}{'auth'};
168             my $amauth = $self->amandad_auth();
169             push @$errors, "recover program requested auth '$reqauth', " .
170                            "but amandad is using auth '$amauth'";
171             $main::exit_status = 1;
172         }
173
174         # and pull out the features, if given
175         if (defined($req->{'features'})) {
176             $self->{'their_features'} = $req->{'features'};
177         }
178
179         $self->send_rep(['CTL' => 'rw', 'DATA' => 'w'], $errors);
180         return $self->quit() if (@$errors);
181
182         $self->{'ctl_stream'} = 'CTL';
183         $self->{'data_stream'} = 'DATA';
184     }
185
186     $self->read_command();
187 }
188
189 sub read_command {
190     my $self = shift;
191     my $ctl_stream = $self->{'ctl_stream'};
192     my $command = $self->{'command'} = {};
193
194     my @known_commands = qw(
195         HOST DISK DATESTAMP LABEL DEVICE FSF HEADER
196         FEATURES CONFIG );
197     while (1) {
198         $_ = $self->getline($ctl_stream);
199         $_ =~ s/\r?\n$//g;
200
201         last if /^END$/;
202         last if /^[0-9]+$/;
203
204         if (/^([A-Z]+)(=(.*))?$/) {
205             my ($cmd, $val) = ($1, $3);
206             if (!grep { $_ eq $cmd } @known_commands) {
207                 $self->sendmessage("invalid command '$cmd'");
208                 return $self->quit();
209             }
210             if (exists $command->{$cmd}) {
211                 warning("got duplicate command key '$cmd' from remote");
212             } else {
213                 $command->{$cmd} = $val || 1;
214             }
215         }
216
217         # features are handled specially.  This is pretty weird!
218         if (/^FEATURES=/) {
219             my $featreply;
220             my $featurestr = $self->{'my_features'}->as_string();
221             if ($self->from_amandad) {
222                 $featreply = "FEATURES=$featurestr\r\n";
223             } else {
224                 $featreply = $featurestr;
225             }
226
227             $self->senddata($ctl_stream, $featreply);
228         }
229     }
230
231     # process some info from the command
232     if ($command->{'FEATURES'}) {
233         $self->{'their_features'} = Amanda::Feature::Set->from_string($command->{'FEATURES'});
234     }
235
236     # load the configuration
237     if (!$command->{'CONFIG'}) {
238         die "no CONFIG line given";
239     }
240     config_init($CONFIG_INIT_EXPLICIT_NAME, $command->{'CONFIG'});
241     my ($cfgerr_level, @cfgerr_errors) = config_errors();
242     if ($cfgerr_level >= $CFGERR_ERRORS) {
243         die "configuration errors; aborting connection";
244     }
245     Amanda::Util::finish_setup($RUNNING_AS_DUMPUSER_PREFERRED);
246
247     # and the disklist
248     my $diskfile = Amanda::Config::config_dir_relative(getconf($CNF_DISKFILE));
249     $cfgerr_level = Amanda::Disklist::read_disklist('filename' => $diskfile);
250     if ($cfgerr_level >= $CFGERR_ERRORS) {
251         die "Errors processing disklist";
252     }
253
254     $self->setup_data_stream();
255 }
256
257 sub setup_data_stream {
258     my $self = shift;
259
260     # if we're using amandad, then this is ready to roll - it's only inetd mode
261     # that we need to fix
262     if ($self->from_inetd()) {
263         if ($self->{'their_features'}->has($Amanda::Feature::fe_recover_splits)) {
264             # remote side is expecting CONNECT
265             my $port = $self->connection_listen('DATA', 0);
266             $self->senddata($self->{'ctl_stream'}, "CONNECT $port\n");
267             $self->connection_accept('DATA', 30, sub { $self->got_connection(@_); });
268         } else {
269             $self->{'ctl_stream'} = undef; # don't use this for ctl anymore
270             $self->{'data_stream'} = 'main';
271             $self->make_plan();
272         }
273     } else {
274         $self->make_plan();
275     }
276 }
277
278 sub got_connection {
279     my $self = shift;
280     my ($err) = @_;
281
282     if ($err) {
283         $self->sendmessage("$err");
284         return $self->quit();
285     }
286
287     if (!$self->check_inetd_security('DATA')) {
288         $main::exit_status = 1;
289         return $self->quit();
290     }
291     $self->{'data_stream'} = 'DATA';
292
293     $self->make_plan();
294 }
295
296 sub make_plan {
297     my $self = shift;
298
299     # put together a dumpspec
300     my $spec;
301     if (exists $self->{'command'}{'HOST'}
302      || exists $self->{'command'}{'DISK'}
303      || exists $self->{'command'}{'DATESTAMP'}) {
304         my $disk = $self->{'command'}{'DISK'};
305         if (!$self->{'their_features'}->has($Amanda::Feature::fe_amrecover_correct_disk_quoting)) {
306             debug("ignoring specified DISK, as it may be badly quoted");
307             $disk = undef;
308         }
309         $spec = Amanda::Cmdline::dumpspec_t->new(
310             $self->{'command'}{'HOST'},
311             $disk,
312             $self->{'command'}{'DATESTAMP'},
313             undef,  # amidxtaped protocol does not provide a level (!?)
314             undef); # amidxtaped protocol does not provide a write timestamp
315     }
316
317     # figure out if this is a holding-disk recovery
318     my $is_holding = 0;
319     if (!exists $self->{'command'}{'LABEL'} and exists $self->{'command'}{'DEVICE'}) {
320         $is_holding = 1;
321     }
322
323     my $chg;
324     if ($is_holding) {
325         # for holding, give the clerk a null; it won't touch it
326         $chg = Amanda::Changer->new("chg-null:");
327     } else {
328         # if not doing a holding-disk recovery, then we will need a changer.
329         # If we're using the "default" changer, instantiate that.  There are
330         # several ways the user can specify the default changer:
331         my $use_default = 0;
332         if (!exists $self->{'command'}{'DEVICE'}) {
333             $use_default = 1;
334         } elsif ($self->{'command'}{'DEVICE'} eq getconf($CNF_AMRECOVER_CHANGER)) {
335             $use_default = 1;
336         }
337
338         my $tlf = Amanda::Config::config_dir_relative(getconf($CNF_TAPELIST));
339         my $tl = Amanda::Tapelist->new($tlf);
340         if ($use_default) {
341             $chg = Amanda::Changer->new(undef, tapelist => $tl);
342         } else {
343             $chg = Amanda::Changer->new($self->{'command'}{'DEVICE'}, tapelist => $tl);
344         }
345
346         # if we got a bogus changer, log it to the debug log, but allow the
347         # scan algorithm to find a good one later.
348         if ($chg->isa("Amanda::Changer::Error")) {
349             warning("$chg");
350             $chg = Amanda::Changer->new("chg-null:");
351         }
352     }
353     $self->{'chg'} = $chg;
354
355     my $interactivity = main::Interactivity->new(clientservice => $self);
356
357     my $scan = Amanda::Recovery::Scan->new(
358                         chg => $chg,
359                         interactivity => $interactivity);
360     $self->{'scan'} = $scan;
361
362     # XXX temporary
363     $scan->{'scan_conf'}->{'driveinuse'} = Amanda::Recovery::Scan::SCAN_ASK;
364     $scan->{'scan_conf'}->{'volinuse'} = Amanda::Recovery::Scan::SCAN_ASK;
365     $scan->{'scan_conf'}->{'notfound'} = Amanda::Recovery::Scan::SCAN_ASK;
366
367     $self->{'clerk'} = Amanda::Recovery::Clerk->new(
368         # note that we don't have any use for clerk_notif's, so we don't pass
369         # a feedback object
370         scan => $scan);
371
372     if ($is_holding) {
373         # if this is a holding recovery, then the plan is pretty easy.  The holding
374         # file is given to us in the aptly-named DEVICE command key, with a :0 suffix
375         my $holding_file_tapespec = $self->{'command'}{'DEVICE'};
376         my $holding_file = $self->tapespec_to_holding($holding_file_tapespec);
377
378         return Amanda::Recovery::Planner::make_plan(
379             holding_file => $holding_file,
380             $spec? (dumpspec => $spec) : (),
381             plan_cb => sub { $self->plan_cb(@_); });
382     } else {
383         my $filelist = Amanda::Util::unmarshal_tapespec($self->{'command'}{'LABEL'});
384
385         # if LABEL was just a label, then FSF should contain the filenum we want to
386         # start with.
387         if ($filelist->[1][0] == 0) {
388             if (exists $self->{'command'}{'FSF'}) {
389                 $filelist->[1][0] = 0+$self->{'command'}{'FSF'};
390                 # note that if this is a split dump, make_plan will helpfully find the
391                 # remaining parts and include them in the restore.  Pretty spiffy.
392             } else {
393                 # we have only a label and (hopefully) a dumpspec, so let's see if the
394                 # catalog can find a dump for us.
395                 $filelist = $self->try_to_find_dump(
396                         $self->{'command'}{'LABEL'},
397                         $spec);
398                 if (!$filelist) {
399                     return $self->quit();
400                 }
401             }
402         }
403
404         return Amanda::Recovery::Planner::make_plan(
405             filelist => $filelist,
406             chg => $chg,
407             $spec? (dumpspec => $spec) : (),
408             plan_cb => sub { $self->plan_cb(@_); });
409     }
410 }
411
412 sub plan_cb {
413     my $self = shift;
414     my ($err, $plan) = @_;
415
416     if ($err) {
417         $self->sendmessage("$err");
418         return $self->quit();
419     }
420
421     if (@{$plan->{'dumps'}} > 1) {
422         $self->sendmessage("multiple matching dumps; cannot recover");
423         return $self->quit();
424     }
425
426     # check that the request-limit for this DLE allows this recovery.  because
427     # of the bass-ackward way that amrecover specifies the dump to us, we can't
428     # check the results until *after* the plan was created.
429     my $dump = $plan->{'dumps'}->[0];
430     my $dle = Amanda::Disklist::get_disk($dump->{'hostname'}, $dump->{'diskname'});
431     my $recovery_limit;
432     if ($dle && dumptype_seen($dle->{'config'}, $DUMPTYPE_RECOVERY_LIMIT)) {
433         debug("using DLE recovery limit");
434         $recovery_limit = dumptype_getconf($dle->{'config'}, $DUMPTYPE_RECOVERY_LIMIT);
435     } elsif (getconf_seen($CNF_RECOVERY_LIMIT)) {
436         debug("using global recovery limit as default");
437         $recovery_limit = getconf($CNF_RECOVERY_LIMIT);
438     }
439     my $peer = $ENV{'AMANDA_AUTHENTICATED_PEER'};
440     if (defined $recovery_limit) { # undef -> no recovery limit
441         if (!$peer) {
442             warning("a recovery limit is specified for this DLE, but no authenticated ".
443                     "peer name is available; rejecting request.");
444             $self->sendmessage("No matching dumps found");
445             return $self->quit();
446         }
447         my $matched = 0;
448         for my $rl (@$recovery_limit) {
449             if ($rl eq $Amanda::Config::LIMIT_SAMEHOST) {
450                 # handle same-host with a case-insensitive string compare, not match_host
451                 if (lc($peer) eq lc($dump->{'hostname'})) {
452                     $matched = 1;
453                     last;
454                 }
455             } elsif ($rl eq $Amanda::Config::LIMIT_SERVER) {
456                 # handle server with a case-insensitive string compare, not match_host
457                 my $myhostname = hostname;
458                 debug("myhostname: $myhostname");
459                 if (lc($peer) eq lc($myhostname)) {
460                     $matched = 1;
461                     last;
462                 }
463             } else {
464                 # otherwise use match_host to allow match expressions
465                 if (match_host($rl, $peer)) {
466                     $matched = 1;
467                     last;
468                 }
469             }
470         }
471         if (!$matched) {
472             warning("authenticated peer '$peer' did not match recovery-limit ".
473                     "config; rejecting request");
474             $self->sendmessage("No matching dumps found");
475             return $self->quit();
476         }
477     }
478
479     if (!$self->{'their_features'}->has($Amanda::Feature::fe_recover_splits)) {
480         # if we have greater than one volume, we may need to prompt for a new
481         # volume in mid-recovery.  Sadly, we have no way to inform the client of
482         # this.  In hopes that this will "just work", we just issue a warning.
483         my @vols = $plan->get_volume_list();
484         warning("client does not support split dumps; restore may fail if " .
485                 "interaction is necessary");
486     }
487
488     # now set up the transfer
489     $self->{'dump'} = $plan->{'dumps'}[0];
490     $self->{'clerk'}->get_xfer_src(
491         dump => $self->{'dump'},
492         xfer_src_cb => sub { $self->xfer_src_cb(@_); });
493 }
494
495 sub xfer_src_cb {
496     my $self = shift;
497     my ($errors, $header, $xfer_src, $directtcp_supported) = @_;
498
499     if ($errors) {
500         for (@$errors) {
501             $self->sendmessage("$_");
502         }
503         return $self->quit();
504     }
505
506     $self->{'xfer_src'} = $xfer_src;
507     $self->{'xfer_src_supports_directtcp'} = $directtcp_supported;
508     $self->{'header'} = $header;
509
510     debug("recovering from " . $header->summary());
511
512     # set up any filters that need to be applied, decryption first
513     my @filters;
514     if ($header->{'encrypted'}) {
515         if ($header->{'srv_encrypt'}) {
516             push @filters,
517                 Amanda::Xfer::Filter::Process->new(
518                     [ $header->{'srv_encrypt'}, $header->{'srv_decrypt_opt'} ], 0);
519             $header->{'encrypted'} = 0;
520             $header->{'srv_encrypt'} = '';
521             $header->{'srv_decrypt_opt'} = '';
522             $header->{'clnt_encrypt'} = '';
523             $header->{'clnt_decrypt_opt'} = '';
524             $header->{'encrypt_suffix'} = 'N';
525         } elsif ($header->{'clnt_encrypt'}) {
526             if (!$self->{'their_features'}->has($Amanda::Feature::fe_amrecover_receive_unfiltered)) {
527                 push @filters,
528                     Amanda::Xfer::Filter::Process->new(
529                         [ $header->{'clnt_encrypt'},
530                           $header->{'clnt_decrypt_opt'} ], 0);
531                 $header->{'encrypted'} = 0;
532                 $header->{'srv_encrypt'} = '';
533                 $header->{'srv_decrypt_opt'} = '';
534                 $header->{'clnt_encrypt'} = '';
535                 $header->{'clnt_decrypt_opt'} = '';
536                 $header->{'encrypt_suffix'} = 'N';
537             } else {
538                 debug("Not decrypting client encrypted stream");
539             }
540         } else {
541             $self->sendmessage("could not decrypt encrypted dump: no program specified");
542             return $self->quit();
543         }
544
545     }
546
547     if ($header->{'compressed'}) {
548         # need to uncompress this file
549         debug("..with decompression applied");
550
551         if ($header->{'srvcompprog'}) {
552             # TODO: this assumes that srvcompprog takes "-d" to decrypt
553             push @filters,
554                 Amanda::Xfer::Filter::Process->new(
555                     [ $header->{'srvcompprog'}, "-d" ], 0);
556             # adjust the header
557             $header->{'compressed'} = 0;
558             $header->{'uncompress_cmd'} = '';
559             $header->{'srvcompprog'} = '';
560         } elsif ($header->{'clntcompprog'}) {
561             if (!$self->{'their_features'}->has($Amanda::Feature::fe_amrecover_receive_unfiltered)) {
562                 # TODO: this assumes that clntcompprog takes "-d" to decrypt
563                 push @filters,
564                     Amanda::Xfer::Filter::Process->new(
565                         [ $header->{'clntcompprog'}, "-d" ], 0);
566                 # adjust the header
567                 $header->{'compressed'} = 0;
568                 $header->{'uncompress_cmd'} = '';
569                 $header->{'clntcompprog'} = '';
570             }
571         } else {
572             my $dle = $header->get_dle();
573             if ($dle &&
574                 (!$self->{'their_features'}->has($Amanda::Feature::fe_amrecover_receive_unfiltered) ||
575                  $dle->{'compress'} == $Amanda::Config::COMP_SERVER_FAST ||
576                  $dle->{'compress'} == $Amanda::Config::COMP_SERVER_BEST)) {
577                 push @filters,
578                     Amanda::Xfer::Filter::Process->new(
579                         [ $Amanda::Constants::UNCOMPRESS_PATH,
580                           $Amanda::Constants::UNCOMPRESS_OPT ], 0);
581                 # adjust the header
582                 $header->{'compressed'} = 0;
583                 $header->{'uncompress_cmd'} = '';
584             }
585         }
586
587     }
588     $self->{'xfer_filters'} = [ @filters ];
589
590     # only send the header if requested
591     if ($self->{'command'}{'HEADER'}) {
592         $self->send_header();
593     } else {
594         $self->expect_datapath();
595     }
596 }
597
598 sub send_header {
599     my $self = shift;
600
601     my $header = $self->{'header'};
602
603     # filter out some things the remote might not be able to process
604     if (!$self->{'their_features'}->has($Amanda::Feature::fe_amrecover_dle_in_header)) {
605         $header->{'dle_str'} = undef;
606     } else {
607         $header->{'dle_str'} =
608             Amanda::Disklist::clean_dle_str_for_client($header->{'dle_str'},
609                    Amanda::Feature::am_features($self->{'their_features'}));
610     }
611     if (!$self->{'their_features'}->has($Amanda::Feature::fe_amrecover_origsize_in_header)) {
612         $header->{'orig_size'} = 0;
613     }
614
615     # even with fe_amrecover_splits, amrecover doesn't like F_SPLIT_DUMPFILE.
616     $header->{'type'} = $Amanda::Header::F_DUMPFILE;
617
618     my $hdr_str = $header->to_string(32768, 32768);
619     Amanda::Util::full_write($self->wfd($self->{'data_stream'}), $hdr_str, length($hdr_str))
620         or die "writing to $self->{data_stream}: $!";
621
622     $self->expect_datapath();
623 }
624
625 sub expect_datapath {
626     my $self = shift;
627
628     $self->{'datapath'} = 'none';
629
630     # short-circuit this if amrecover doesn't support datapaths
631     if (!$self->{'their_features'}->has($Amanda::Feature::fe_amidxtaped_datapath)) {
632         return $self->start_xfer();
633     }
634
635     my $line = $self->getline($self->{'ctl_stream'});
636     if ($line eq "ABORT\r\n") {
637         return Amanda::MainLoop::quit();
638     }
639     my ($dpspec) = ($line =~ /^AVAIL-DATAPATH (.*)\r\n$/);
640     die "bad AVAIL-DATAPATH line" unless $dpspec;
641     my @avail_dps = split / /, $dpspec;
642
643     if (grep /^DIRECT-TCP$/, @avail_dps) {
644         # remote can handle a directtcp transfer .. can we?
645         if ($self->{'xfer_src_supports_directtcp'}) {
646             $self->{'datapath'} = 'directtcp';
647         } else {
648             $self->{'datapath'} = 'amanda';
649         }
650     } else {
651         # remote can at least handle AMANDA
652         die "remote cannot handle AMANDA datapath??"
653             unless grep /^AMANDA$/, @avail_dps;
654         $self->{'datapath'} = 'amanda';
655     }
656
657     $self->start_xfer();
658 }
659
660 sub start_xfer {
661     my $self = shift;
662
663     # create the appropriate destination based on our datapath
664     my $xfer_dest;
665     if ($self->{'datapath'} eq 'directtcp') {
666         $xfer_dest = Amanda::Xfer::Dest::DirectTCPListen->new();
667     } else {
668         $xfer_dest = Amanda::Xfer::Dest::Fd->new(
669                 $self->wfd($self->{'data_stream'})),
670     }
671
672     if ($self->{'datapath'} eq 'amanda') {
673         $self->sendctlline("USE-DATAPATH AMANDA\r\n");
674         my $dpline = $self->getline($self->{'ctl_stream'});
675         if ($dpline ne "DATAPATH-OK\r\n") {
676             die "expected DATAPATH-OK";
677         }
678     }
679
680     # start reading all filter stderr
681     foreach my $filter (@{$self->{'xfer_filters'}}) {
682         my $fd = $filter->get_stderr_fd();
683         $fd.="";
684         $fd = int($fd);
685         my $src = Amanda::MainLoop::fd_source($fd,
686                                               $G_IO_IN|$G_IO_HUP|$G_IO_ERR);
687         my $buffer = "";
688         $self->{'all_filter'}{$src} = 1;
689         $src->set_callback( sub {
690             my $b;
691             my $n_read = POSIX::read($fd, $b, 1);
692             if (!defined $n_read) {
693                 return;
694             } elsif ($n_read == 0) {
695                 delete $self->{'all_filter'}->{$src};
696                 $src->remove();
697                 POSIX::close($fd);
698                 if (!%{$self->{'all_filter'}} and $self->{'fetch_done'}) {
699                     Amanda::MainLoop::quit();
700                 }
701             } else {
702                 $buffer .= $b;
703                 if ($b eq "\n") {
704                     my $line = $buffer;
705                     #print STDERR "filter stderr: $line";
706                     chomp $line;
707                     $self->sendmessage("filter stderr: $line");
708                     debug("filter stderr: $line");
709                     $buffer = "";
710                 }
711             }
712         });
713     }
714
715     # create and start the transfer
716     $self->{'xfer'} = Amanda::Xfer->new([
717         $self->{'xfer_src'},
718         @{$self->{'xfer_filters'}},
719         $xfer_dest,
720     ]);
721     my $size = 0;
722     $size = $self->{'dump'}->{'bytes'} if exists $self->{'dump'}->{'bytes'};
723     $self->{'xfer'}->start(sub { $self->handle_xmsg(@_); }, 0, $size);
724     debug("started xfer; datapath=$self->{datapath}");
725
726     # send the data-path response, if we have a datapath
727     if ($self->{'datapath'} eq 'directtcp') {
728         my $addrs = $xfer_dest->get_addrs();
729         $addrs = [ map { $_->[0] . ":" . $_->[1] } @$addrs ];
730         $addrs = join(" ", @$addrs);
731         $self->sendctlline("USE-DATAPATH DIRECT-TCP $addrs\r\n");
732         my $dpline = $self->getline($self->{'ctl_stream'});
733         if ($dpline ne "DATAPATH-OK\r\n") {
734             die "expected DATAPATH-OK";
735         }
736     }
737
738     # and let the clerk know
739     $self->{'clerk'}->start_recovery(
740         xfer => $self->{'xfer'},
741         recovery_cb => sub { $self->recovery_cb(@_); });
742 }
743
744 sub handle_xmsg {
745     my $self = shift;
746     my ($src, $msg, $xfer) = @_;
747
748     $self->{'clerk'}->handle_xmsg($src, $msg, $xfer);
749     if ($msg->{'elt'} != $self->{'xfer_src'}) {
750         if ($msg->{'type'} == $XMSG_ERROR) {
751             $self->sendmessage("$msg->{message}");
752         }
753     }
754 }
755
756 sub recovery_cb {
757     my $self = shift;
758     my %params = @_;
759
760     debug("recovery complete");
761     if (@{$params{'errors'}}) {
762         for (@{$params{'errors'}}) {
763             $self->sendmessage("$_");
764         }
765         return $self->quit();
766     }
767
768     # note that the amidxtaped protocol has no way to indicate successful
769     # completion of a transfer
770     if ($params{'result'} ne 'DONE') {
771         warning("NOTE: transfer failed, but amrecover does not know that");
772     }
773
774     $self->finish();
775 }
776
777 sub finish {
778     my $self = shift;
779
780     # close the data fd for writing to signal EOF
781     $self->close($self->{'data_stream'}, 'w');
782
783     $self->quit();
784 }
785
786 sub quit {
787     my $self = shift;
788
789     if ($self->{'clerk'}) {
790         $self->{'clerk'}->quit(finished_cb => sub {
791             my ($err) = @_;
792             $self->{'chg'}->quit() if defined $self->{'chg'};
793             if ($err) {
794                 # it's *way* too late to report this to amrecover now!
795                 warning("while quitting clerk: $err");
796             }
797             $self->quit1();
798         });
799     } else {
800         $self->{'scan'}->quit() if defined $self->{'scan'};
801         $self->{'chg'}->quit() if defined $self->{'chg'};
802         $self->quit1();
803     }
804
805 }
806
807 sub quit1 {
808     my $self = shift;
809
810     $self->{'fetch_done'} = 1;
811     if (!%{$self->{'all_filter'}}) {
812         Amanda::MainLoop::quit();
813     }
814 }
815
816 ## utilities
817
818 sub check_inetd_security {
819     my $self = shift;
820     my ($stream) = @_;
821
822     my $firstline = $self->getline($stream);
823     if ($firstline !~ /^SECURITY (.*)\n/) {
824         warning("did not get security line");
825         print "ERROR did not get security line\r\n";
826         return 0;
827     }
828
829     my $errmsg = $self->check_bsd_security($stream, $1);
830     if ($errmsg) {
831         print "ERROR $errmsg\r\n";
832         return 0;
833     }
834
835     return 1;
836 }
837
838 sub get_req {
839     my $self = shift;
840
841     my $req_str = '';
842     while (1) {
843         my $buf = Amanda::Util::full_read($self->rfd('main'), 1024);
844         last unless $buf;
845         $req_str .= $buf;
846     }
847     # we've read main to EOF, so close it
848     $self->close('main', 'r');
849
850     return $self->{'req'} = $self->parse_req($req_str);
851 }
852
853 sub send_rep {
854     my $self = shift;
855     my ($streams, $errors) = @_;
856     my $rep = '';
857
858     # first, if there were errors in the REQ, report them
859     if (@$errors) {
860         for my $err (@$errors) {
861             $rep .= "ERROR $err\n";
862         }
863     } else {
864         my $connline = $self->connect_streams(@$streams);
865         $rep .= "$connline\n";
866     }
867     # rep needs a empty-line terminator, I think
868     $rep .= "\n";
869
870     # write the whole rep packet, and close main to signal the end of the packet
871     $self->senddata('main', $rep);
872     $self->close('main', 'w');
873 }
874
875 # helper function to get a line, including the trailing '\n', from a stream.  This
876 # reads a character at a time to ensure that no extra characters are consumed.  This
877 # could certainly be more efficient! (TODO)
878 sub getline {
879     my $self = shift;
880     my ($stream) = @_;
881     my $fd = $self->rfd($stream);
882     my $line = '';
883
884     while (1) {
885         my $c;
886         POSIX::read($fd, $c, 1)
887             or last;
888         $line .= $c;
889         last if $c eq "\n";
890     }
891
892     my $chopped = $line;
893     $chopped =~ s/[\r\n]*$//g;
894     debug("CTL << $chopped");
895
896     return $line;
897 }
898
899 # like getline, but async; TODO:
900 #  - make all uses of getline async
901 #  - use buffering to read more than one character at a time
902 sub getline_async {
903     my $self = shift;
904     my ($stream, $async_read_cb) = @_;
905     my $fd = $self->rfd($stream);
906
907     my $data_in;
908     my $buf = '';
909
910     $data_in = sub {
911         my ($err, $data) = @_;
912
913         return $async_read_cb->($err, undef) if $err;
914
915         $buf .= $data;
916         if ($buf =~ /\r\n$/) {
917             my $chopped = $buf;
918             $chopped =~ s/[\r\n]*$//g;
919             debug("CTL << $chopped");
920
921             $async_read_cb->(undef, $buf);
922         } else {
923             Amanda::MainLoop::async_read(fd => $fd, size => 1, async_read_cb => $data_in);
924         }
925     };
926     Amanda::MainLoop::async_read(fd => $fd, size => 1, async_read_cb => $data_in);
927 }
928
929 # helper function to write a data to a stream.  This does not add newline characters.
930 # If the callback is given, this is async (TODO: all calls should be async)
931 sub senddata {
932     my $self = shift;
933     my ($stream, $data, $async_write_cb) = @_;
934     my $fd = $self->wfd($stream);
935
936     if (defined $async_write_cb) {
937         return Amanda::MainLoop::async_write(
938                 fd => $fd,
939                 data => $data,
940                 async_write_cb => $async_write_cb);
941     } else {
942         Amanda::Util::full_write($fd, $data, length($data))
943             or die "writing to $stream: $!";
944     }
945 }
946
947 # send a line on the control stream, or just log it if the ctl stream is gone;
948 # async callback is just like for senddata
949 sub sendctlline {
950     my $self = shift;
951     my ($msg, $async_write_cb) = @_;
952
953     my $chopped = $msg;
954     $chopped =~ s/[\r\n]*$//g;
955
956     if ($self->{'ctl_stream'}) {
957         debug("CTL >> $chopped");
958         return $self->senddata($self->{'ctl_stream'}, $msg, $async_write_cb);
959     } else {
960         debug("not sending CTL message as CTL is closed >> $chopped");
961         if (defined $async_write_cb) {
962             $async_write_cb->(undef, length($msg));
963         }
964     }
965 }
966
967 # send a MESSAGE on the CTL stream, but only if the remote has
968 # fe_amrecover_message
969 sub sendmessage {
970     my $self = shift;
971     my ($msg) = @_;
972
973     if ($self->{'their_features'}->has($Amanda::Feature::fe_amrecover_message)) {
974         $self->sendctlline("MESSAGE $msg\r\n");
975     } else {
976         warning("remote does not understand MESSAGE; not sent: MESSAGE $msg");
977     }
978 }
979
980 # covert a tapespec to a holding filename
981 sub tapespec_to_holding {
982     my $self = shift;
983     my ($tapespec) = @_;
984
985     my $filelist = Amanda::Util::unmarshal_tapespec($tapespec);
986
987     # $filelist should have the form [ $holding_file => [ 0 ] ]
988     die "invalid holding tapespec" unless @$filelist == 2;
989     die "invalid holding tapespec" unless @{$filelist->[1]} == 1;
990     die "invalid holding tapespec" unless $filelist->[1][0] == 0;
991
992     return $filelist->[0];
993 }
994
995 # amrecover didn't give us much to go on, but see if we can find a dump that
996 # will make it happy.
997 sub try_to_find_dump {
998     my $self = shift;
999     my ($label, $spec) = @_;
1000
1001     # search the catalog; get_dumps cannot search by labels, so we have to use
1002     # get_parts instead
1003     my @parts = Amanda::DB::Catalog::get_parts(
1004         label => $label,
1005         dumpspecs => [ $spec ]);
1006
1007     if (!@parts) {
1008         $self->sendmessage("could not find any matching dumps on volume '$label'");
1009         return undef;
1010     }
1011
1012     # (note that if there is more than one dump in @parts, the planner will
1013     # catch it later)
1014
1015     # sort the parts by their order on each volume.  This sorts the volumes
1016     # lexically by label, but the planner will straighten it out.
1017     @parts = Amanda::DB::Catalog::sort_dumps([ "label", "filenum" ], @parts);
1018
1019     # loop over the parts for the dump and make a filelist.
1020     my $last_label = '';
1021     my $last_filenums = undef;
1022     my $filelist = [];
1023     for my $part (@parts) {
1024         next unless defined $part; # skip part number 0
1025         if ($part->{'label'} ne $last_label) {
1026             $last_label = $part->{'label'};
1027             $last_filenums = [];
1028             push @$filelist, $last_label, $last_filenums;
1029         }
1030         push @$last_filenums, $part->{'filenum'};
1031     }
1032
1033     return $filelist;
1034 }
1035
1036 ##
1037 # main driver
1038
1039 package main;
1040 use Amanda::Debug qw( debug );
1041 use Amanda::Util qw( :constants );
1042 use Amanda::Config qw( :init );
1043
1044 our $exit_status = 0;
1045
1046 sub main {
1047     Amanda::Util::setup_application("amidxtaped", "server", $CONTEXT_DAEMON);
1048     config_init(0, undef);
1049     Amanda::Debug::debug_dup_stderr_to_debug();
1050
1051     my $cs = main::ClientService->new();
1052     Amanda::MainLoop::call_later(sub { $cs->run(); });
1053     Amanda::MainLoop::run();
1054
1055     debug("exiting with $exit_status");
1056     Amanda::Util::finish_application();
1057 }
1058
1059 main();
1060 exit($exit_status);