Imported Upstream version 3.3.3
[debian/amanda] / server-src / amidxtaped.pl
1 #! @PERL@
2 # Copyright (c) 2010-2012 Zmanda, Inc.  All Rights Reserved.
3 #
4 # This program is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU General Public License
6 # as published by the Free Software Foundation; either version 2
7 # of the License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 # for more details.
13 #
14 # You should have received a copy of the GNU General Public License along
15 # with this program; if not, write to the Free Software Foundation, Inc.,
16 # 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17 #
18 # Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
19 # Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
20
21 use lib '@amperldir@';
22 use strict;
23 use warnings;
24
25 ##
26 # Interactivity class
27
28 package main::Interactivity;
29 use base 'Amanda::Interactivity';
30 use Amanda::Util qw( weaken_ref );
31 use Amanda::MainLoop;
32 use Amanda::Feature;
33 use Amanda::Debug qw( debug );
34 use Amanda::Config qw( :getconf );
35 use Amanda::Recovery::Scan qw( $DEFAULT_CHANGER );
36
37 sub new {
38     my $class = shift;
39     my %params = @_;
40
41     my $self = {
42         clientservice => $params{'clientservice'},
43     };
44
45     # (weak ref here to eliminate reference loop)
46     weaken_ref($self->{'clientservice'});
47
48     return bless ($self, $class);
49 }
50
51 sub abort() {
52     my $self = shift;
53
54     debug("ignoring spurious Amanda::Recovery::Scan abort call");
55 }
56
57 sub user_request {
58     my $self = shift;
59     my %params = @_;
60     my $buffer = "";
61
62     my $steps = define_steps
63         cb_ref => \$params{'request_cb'};
64
65     step send_message => sub {
66         if ($params{'err'}) {
67             $self->{'clientservice'}->sendmessage("$params{err}");
68         }
69
70         $steps->{'check_fe_feedme'}->();
71     };
72
73     step check_fe_feedme => sub {
74         # note that fe_amrecover_FEEDME implies fe_amrecover_splits
75         if (!$self->{'clientservice'}->{'their_features'}->has(
76                                     $Amanda::Feature::fe_amrecover_FEEDME)) {
77             return $params{'request_cb'}->("remote cannot prompt for volumes", undef);
78         }
79         $steps->{'send_feedme'}->();
80     };
81
82     step send_feedme => sub {
83         $self->{'clientservice'}->sendctlline("FEEDME $params{label}\r\n", $steps->{'read_response'});
84     };
85
86     step read_response => sub {
87         my ($err, $written) = @_;
88         return $params{'request_cb'}->($err, undef) if $err;
89
90         $self->{'clientservice'}->getline_async(
91                 $self->{'clientservice'}->{'ctl_stream'}, $steps->{'got_response'});
92     };
93
94     step got_response => sub {
95         my ($err, $line) = @_;
96         return $params{'request_cb'}->($err, undef) if $err;
97
98         if ($line eq "OK\r\n") {
99             return $params{'request_cb'}->(undef, undef); # carry on as you were
100         } elsif ($line =~ /^TAPE (.*)\r\n$/) {
101             my $tape = $1;
102             if ($tape eq getconf($CNF_AMRECOVER_CHANGER)) {
103                 $tape = $Amanda::Recovery::Scan::DEFAULT_CHANGER;
104             }
105             return $params{'request_cb'}->(undef, $tape); # use this device
106         } else {
107             return $params{'request_cb'}->("got invalid response from remote", undef);
108         }
109     };
110 };
111
112 ##
113 # ClientService class
114
115 package main::ClientService;
116 use base 'Amanda::ClientService';
117
118 use Sys::Hostname;
119
120 use Amanda::Debug qw( debug info warning );
121 use Amanda::MainLoop qw( :GIOCondition );
122 use Amanda::Util qw( :constants match_disk match_host );
123 use Amanda::Feature;
124 use Amanda::Config qw( :init :getconf );
125 use Amanda::Changer;
126 use Amanda::Recovery::Scan;
127 use Amanda::Xfer qw( :constants );
128 use Amanda::Cmdline;
129 use Amanda::Recovery::Clerk;
130 use Amanda::Recovery::Planner;
131 use Amanda::Recovery::Scan;
132 use Amanda::DB::Catalog;
133 use Amanda::Disklist;
134
135 # Note that this class performs its control IO synchronously.  This is adequate
136 # for this service, as it never receives unsolicited input from the remote
137 # system.
138
139 sub run {
140     my $self = shift;
141
142     $self->{'my_features'} = Amanda::Feature::Set->mine();
143     $self->{'their_features'} = Amanda::Feature::Set->old();
144     $self->{'all_filter'} = {};
145
146     $self->setup_streams();
147 }
148
149 sub setup_streams {
150     my $self = shift;
151
152     # get started checking security for inetd or processing the REQ/REP
153     # for amandad
154     if ($self->from_inetd()) {
155         if (!$self->check_inetd_security('main')) {
156             $main::exit_status = 1;
157             return $self->quit();
158         }
159         $self->{'ctl_stream'} = 'main';
160         $self->{'data_stream'} = undef; # no data stream yet
161     } else {
162         my $req = $self->get_req();
163
164         # make some sanity checks
165         my $errors = [];
166         if (defined $req->{'options'}{'auth'} and defined $self->amandad_auth()
167                 and $req->{'options'}{'auth'} ne $self->amandad_auth()) {
168             my $reqauth = $req->{'options'}{'auth'};
169             my $amauth = $self->amandad_auth();
170             push @$errors, "recover program requested auth '$reqauth', " .
171                            "but amandad is using auth '$amauth'";
172             $main::exit_status = 1;
173         }
174
175         # and pull out the features, if given
176         if (defined($req->{'features'})) {
177             $self->{'their_features'} = $req->{'features'};
178         }
179
180         $self->send_rep(['CTL' => 'rw', 'DATA' => 'w'], $errors);
181         return $self->quit() if (@$errors);
182
183         $self->{'ctl_stream'} = 'CTL';
184         $self->{'data_stream'} = 'DATA';
185     }
186
187     $self->read_command();
188 }
189
190 sub read_command {
191     my $self = shift;
192     my $ctl_stream = $self->{'ctl_stream'};
193     my $command = $self->{'command'} = {};
194
195     my @known_commands = qw(
196         HOST DISK DATESTAMP LABEL DEVICE FSF HEADER
197         FEATURES CONFIG );
198     while (1) {
199         $_ = $self->getline($ctl_stream);
200         $_ =~ s/\r?\n$//g;
201
202         last if /^END$/;
203         last if /^[0-9]+$/;
204
205         if (/^([A-Z]+)(=(.*))?$/) {
206             my ($cmd, $val) = ($1, $3);
207             if (!grep { $_ eq $cmd } @known_commands) {
208                 $self->sendmessage("invalid command '$cmd'");
209                 return $self->quit();
210             }
211             if (exists $command->{$cmd}) {
212                 warning("got duplicate command key '$cmd' from remote");
213             } else {
214                 $command->{$cmd} = $val || 1;
215             }
216         }
217
218         # features are handled specially.  This is pretty weird!
219         if (/^FEATURES=/) {
220             my $featreply;
221             my $featurestr = $self->{'my_features'}->as_string();
222             if ($self->from_amandad) {
223                 $featreply = "FEATURES=$featurestr\r\n";
224             } else {
225                 $featreply = $featurestr;
226             }
227
228             $self->senddata($ctl_stream, $featreply);
229         }
230     }
231
232     # process some info from the command
233     if ($command->{'FEATURES'}) {
234         $self->{'their_features'} = Amanda::Feature::Set->from_string($command->{'FEATURES'});
235     }
236
237     # load the configuration
238     if (!$command->{'CONFIG'}) {
239         die "no CONFIG line given";
240     }
241     config_init($CONFIG_INIT_EXPLICIT_NAME, $command->{'CONFIG'});
242     my ($cfgerr_level, @cfgerr_errors) = config_errors();
243     if ($cfgerr_level >= $CFGERR_ERRORS) {
244         die "configuration errors; aborting connection";
245     }
246     Amanda::Util::finish_setup($RUNNING_AS_DUMPUSER_PREFERRED);
247
248     # and the disklist
249     my $diskfile = Amanda::Config::config_dir_relative(getconf($CNF_DISKFILE));
250     $cfgerr_level = Amanda::Disklist::read_disklist('filename' => $diskfile);
251     if ($cfgerr_level >= $CFGERR_ERRORS) {
252         die "Errors processing disklist";
253     }
254
255     $self->setup_data_stream();
256 }
257
258 sub setup_data_stream {
259     my $self = shift;
260
261     # if we're using amandad, then this is ready to roll - it's only inetd mode
262     # that we need to fix
263     if ($self->from_inetd()) {
264         if ($self->{'their_features'}->has($Amanda::Feature::fe_recover_splits)) {
265             # remote side is expecting CONNECT
266             my $port = $self->connection_listen('DATA', 0);
267             $self->senddata($self->{'ctl_stream'}, "CONNECT $port\n");
268             $self->connection_accept('DATA', 30, sub { $self->got_connection(@_); });
269         } else {
270             $self->{'ctl_stream'} = undef; # don't use this for ctl anymore
271             $self->{'data_stream'} = 'main';
272             $self->make_plan();
273         }
274     } else {
275         $self->make_plan();
276     }
277 }
278
279 sub got_connection {
280     my $self = shift;
281     my ($err) = @_;
282
283     if ($err) {
284         $self->sendmessage("$err");
285         return $self->quit();
286     }
287
288     if (!$self->check_inetd_security('DATA')) {
289         $main::exit_status = 1;
290         return $self->quit();
291     }
292     $self->{'data_stream'} = 'DATA';
293
294     $self->make_plan();
295 }
296
297 sub make_plan {
298     my $self = shift;
299
300     # put together a dumpspec
301     my $spec;
302     if (exists $self->{'command'}{'HOST'}
303      || exists $self->{'command'}{'DISK'}
304      || exists $self->{'command'}{'DATESTAMP'}) {
305         my $disk = $self->{'command'}{'DISK'};
306         if (!$self->{'their_features'}->has($Amanda::Feature::fe_amrecover_correct_disk_quoting)) {
307             debug("ignoring specified DISK, as it may be badly quoted");
308             $disk = undef;
309         }
310         $spec = Amanda::Cmdline::dumpspec_t->new(
311             $self->{'command'}{'HOST'},
312             $disk,
313             $self->{'command'}{'DATESTAMP'},
314             undef,  # amidxtaped protocol does not provide a level (!?)
315             undef); # amidxtaped protocol does not provide a write timestamp
316     }
317
318     # figure out if this is a holding-disk recovery
319     my $is_holding = 0;
320     if (!exists $self->{'command'}{'LABEL'} and exists $self->{'command'}{'DEVICE'}) {
321         $is_holding = 1;
322     }
323
324     my $chg;
325     if ($is_holding) {
326         # for holding, give the clerk a null; it won't touch it
327         $chg = Amanda::Changer->new("chg-null:");
328     } else {
329         # if not doing a holding-disk recovery, then we will need a changer.
330         # If we're using the "default" changer, instantiate that.  There are
331         # several ways the user can specify the default changer:
332         my $use_default = 0;
333         if (!exists $self->{'command'}{'DEVICE'}) {
334             $use_default = 1;
335         } elsif ($self->{'command'}{'DEVICE'} eq getconf($CNF_AMRECOVER_CHANGER)) {
336             $use_default = 1;
337         }
338
339         my $tlf = Amanda::Config::config_dir_relative(getconf($CNF_TAPELIST));
340         my $tl = Amanda::Tapelist->new($tlf);
341         if ($use_default) {
342             $chg = Amanda::Changer->new(undef, tapelist => $tl);
343         } else {
344             $chg = Amanda::Changer->new($self->{'command'}{'DEVICE'}, tapelist => $tl);
345         }
346
347         # if we got a bogus changer, log it to the debug log, but allow the
348         # scan algorithm to find a good one later.
349         if ($chg->isa("Amanda::Changer::Error")) {
350             warning("$chg");
351             $chg = Amanda::Changer->new("chg-null:");
352         }
353     }
354     $self->{'chg'} = $chg;
355
356     my $interactivity = main::Interactivity->new(clientservice => $self);
357
358     my $scan = Amanda::Recovery::Scan->new(
359                         chg => $chg,
360                         interactivity => $interactivity);
361     $self->{'scan'} = $scan;
362
363     # XXX temporary
364     $scan->{'scan_conf'}->{'driveinuse'} = Amanda::Recovery::Scan::SCAN_ASK;
365     $scan->{'scan_conf'}->{'volinuse'} = Amanda::Recovery::Scan::SCAN_ASK;
366     $scan->{'scan_conf'}->{'notfound'} = Amanda::Recovery::Scan::SCAN_ASK;
367
368     $self->{'clerk'} = Amanda::Recovery::Clerk->new(
369         # note that we don't have any use for clerk_notif's, so we don't pass
370         # a feedback object
371         scan => $scan);
372
373     if ($is_holding) {
374         # if this is a holding recovery, then the plan is pretty easy.  The holding
375         # file is given to us in the aptly-named DEVICE command key, with a :0 suffix
376         my $holding_file_tapespec = $self->{'command'}{'DEVICE'};
377         my $holding_file = $self->tapespec_to_holding($holding_file_tapespec);
378
379         return Amanda::Recovery::Planner::make_plan(
380             holding_file => $holding_file,
381             $spec? (dumpspec => $spec) : (),
382             plan_cb => sub { $self->plan_cb(@_); });
383     } else {
384         my $filelist = Amanda::Util::unmarshal_tapespec($self->{'command'}{'LABEL'});
385
386         # if LABEL was just a label, then FSF should contain the filenum we want to
387         # start with.
388         if ($filelist->[1][0] == 0) {
389             if (exists $self->{'command'}{'FSF'}) {
390                 $filelist->[1][0] = 0+$self->{'command'}{'FSF'};
391                 # note that if this is a split dump, make_plan will helpfully find the
392                 # remaining parts and include them in the restore.  Pretty spiffy.
393             } else {
394                 # we have only a label and (hopefully) a dumpspec, so let's see if the
395                 # catalog can find a dump for us.
396                 $filelist = $self->try_to_find_dump(
397                         $self->{'command'}{'LABEL'},
398                         $spec);
399                 if (!$filelist) {
400                     return $self->quit();
401                 }
402             }
403         }
404
405         return Amanda::Recovery::Planner::make_plan(
406             filelist => $filelist,
407             chg => $chg,
408             $spec? (dumpspec => $spec) : (),
409             plan_cb => sub { $self->plan_cb(@_); });
410     }
411 }
412
413 sub plan_cb {
414     my $self = shift;
415     my ($err, $plan) = @_;
416
417     if ($err) {
418         $self->sendmessage("$err");
419         return $self->quit();
420     }
421
422     if (@{$plan->{'dumps'}} > 1) {
423         $self->sendmessage("multiple matching dumps; cannot recover");
424         return $self->quit();
425     }
426
427     # check that the request-limit for this DLE allows this recovery.  because
428     # of the bass-ackward way that amrecover specifies the dump to us, we can't
429     # check the results until *after* the plan was created.
430     my $dump = $plan->{'dumps'}->[0];
431     my $dle = Amanda::Disklist::get_disk($dump->{'hostname'}, $dump->{'diskname'});
432     my $recovery_limit;
433     if ($dle && dumptype_seen($dle->{'config'}, $DUMPTYPE_RECOVERY_LIMIT)) {
434         debug("using DLE recovery limit");
435         $recovery_limit = dumptype_getconf($dle->{'config'}, $DUMPTYPE_RECOVERY_LIMIT);
436     } elsif (getconf_seen($CNF_RECOVERY_LIMIT)) {
437         debug("using global recovery limit as default");
438         $recovery_limit = getconf($CNF_RECOVERY_LIMIT);
439     }
440     my $peer = $ENV{'AMANDA_AUTHENTICATED_PEER'};
441     if (defined $recovery_limit) { # undef -> no recovery limit
442         if (!$peer) {
443             warning("a recovery limit is specified for this DLE, but no authenticated ".
444                     "peer name is available; rejecting request.");
445             $self->sendmessage("No matching dumps found");
446             return $self->quit();
447         }
448         my $matched = 0;
449         for my $rl (@$recovery_limit) {
450             if ($rl eq $Amanda::Config::LIMIT_SAMEHOST) {
451                 # handle same-host with a case-insensitive string compare, not match_host
452                 if (lc($peer) eq lc($dump->{'hostname'})) {
453                     $matched = 1;
454                     last;
455                 }
456             } elsif ($rl eq $Amanda::Config::LIMIT_SERVER) {
457                 # handle server with a case-insensitive string compare, not match_host
458                 my $myhostname = hostname;
459                 debug("myhostname: $myhostname");
460                 if (lc($peer) eq lc($myhostname)) {
461                     $matched = 1;
462                     last;
463                 }
464             } else {
465                 # otherwise use match_host to allow match expressions
466                 if (match_host($rl, $peer)) {
467                     $matched = 1;
468                     last;
469                 }
470             }
471         }
472         if (!$matched) {
473             warning("authenticated peer '$peer' did not match recovery-limit ".
474                     "config; rejecting request");
475             $self->sendmessage("No matching dumps found");
476             return $self->quit();
477         }
478     }
479
480     if (!$self->{'their_features'}->has($Amanda::Feature::fe_recover_splits)) {
481         # if we have greater than one volume, we may need to prompt for a new
482         # volume in mid-recovery.  Sadly, we have no way to inform the client of
483         # this.  In hopes that this will "just work", we just issue a warning.
484         my @vols = $plan->get_volume_list();
485         warning("client does not support split dumps; restore may fail if " .
486                 "interaction is necessary");
487     }
488
489     # now set up the transfer
490     $self->{'dump'} = $plan->{'dumps'}[0];
491     $self->{'clerk'}->get_xfer_src(
492         dump => $self->{'dump'},
493         xfer_src_cb => sub { $self->xfer_src_cb(@_); });
494 }
495
496 sub xfer_src_cb {
497     my $self = shift;
498     my ($errors, $header, $xfer_src, $directtcp_supported) = @_;
499
500     if ($errors) {
501         for (@$errors) {
502             $self->sendmessage("$_");
503         }
504         return $self->quit();
505     }
506
507     $self->{'xfer_src'} = $xfer_src;
508     $self->{'xfer_src_supports_directtcp'} = $directtcp_supported;
509     $self->{'header'} = $header;
510
511     debug("recovering from " . $header->summary());
512
513     # set up any filters that need to be applied, decryption first
514     my @filters;
515     if ($header->{'encrypted'}) {
516         if ($header->{'srv_encrypt'}) {
517             push @filters,
518                 Amanda::Xfer::Filter::Process->new(
519                     [ $header->{'srv_encrypt'}, $header->{'srv_decrypt_opt'} ], 0);
520             $header->{'encrypted'} = 0;
521             $header->{'srv_encrypt'} = '';
522             $header->{'srv_decrypt_opt'} = '';
523             $header->{'clnt_encrypt'} = '';
524             $header->{'clnt_decrypt_opt'} = '';
525             $header->{'encrypt_suffix'} = 'N';
526         } elsif ($header->{'clnt_encrypt'}) {
527             if (!$self->{'their_features'}->has($Amanda::Feature::fe_amrecover_receive_unfiltered)) {
528                 push @filters,
529                     Amanda::Xfer::Filter::Process->new(
530                         [ $header->{'clnt_encrypt'},
531                           $header->{'clnt_decrypt_opt'} ], 0);
532                 $header->{'encrypted'} = 0;
533                 $header->{'srv_encrypt'} = '';
534                 $header->{'srv_decrypt_opt'} = '';
535                 $header->{'clnt_encrypt'} = '';
536                 $header->{'clnt_decrypt_opt'} = '';
537                 $header->{'encrypt_suffix'} = 'N';
538             } else {
539                 debug("Not decrypting client encrypted stream");
540             }
541         } else {
542             $self->sendmessage("could not decrypt encrypted dump: no program specified");
543             return $self->quit();
544         }
545
546     }
547
548     if ($header->{'compressed'}) {
549         # need to uncompress this file
550         debug("..with decompression applied");
551
552         if ($header->{'srvcompprog'}) {
553             # TODO: this assumes that srvcompprog takes "-d" to decrypt
554             push @filters,
555                 Amanda::Xfer::Filter::Process->new(
556                     [ $header->{'srvcompprog'}, "-d" ], 0);
557             # adjust the header
558             $header->{'compressed'} = 0;
559             $header->{'uncompress_cmd'} = '';
560             $header->{'srvcompprog'} = '';
561         } elsif ($header->{'clntcompprog'}) {
562             if (!$self->{'their_features'}->has($Amanda::Feature::fe_amrecover_receive_unfiltered)) {
563                 # TODO: this assumes that clntcompprog takes "-d" to decrypt
564                 push @filters,
565                     Amanda::Xfer::Filter::Process->new(
566                         [ $header->{'clntcompprog'}, "-d" ], 0);
567                 # adjust the header
568                 $header->{'compressed'} = 0;
569                 $header->{'uncompress_cmd'} = '';
570                 $header->{'clntcompprog'} = '';
571             }
572         } else {
573             my $dle = $header->get_dle();
574             if ($dle &&
575                 (!$self->{'their_features'}->has($Amanda::Feature::fe_amrecover_receive_unfiltered) ||
576                  $dle->{'compress'} == $Amanda::Config::COMP_SERVER_FAST ||
577                  $dle->{'compress'} == $Amanda::Config::COMP_SERVER_BEST)) {
578                 push @filters,
579                     Amanda::Xfer::Filter::Process->new(
580                         [ $Amanda::Constants::UNCOMPRESS_PATH,
581                           $Amanda::Constants::UNCOMPRESS_OPT ], 0);
582                 # adjust the header
583                 $header->{'compressed'} = 0;
584                 $header->{'uncompress_cmd'} = '';
585             }
586         }
587
588     }
589     $self->{'xfer_filters'} = [ @filters ];
590
591     # only send the header if requested
592     if ($self->{'command'}{'HEADER'}) {
593         $self->send_header();
594     } else {
595         $self->expect_datapath();
596     }
597 }
598
599 sub send_header {
600     my $self = shift;
601
602     my $header = $self->{'header'};
603
604     # filter out some things the remote might not be able to process
605     if (!$self->{'their_features'}->has($Amanda::Feature::fe_amrecover_dle_in_header)) {
606         $header->{'dle_str'} = undef;
607     } else {
608         $header->{'dle_str'} =
609             Amanda::Disklist::clean_dle_str_for_client($header->{'dle_str'},
610                    Amanda::Feature::am_features($self->{'their_features'}));
611     }
612     if (!$self->{'their_features'}->has($Amanda::Feature::fe_amrecover_origsize_in_header)) {
613         $header->{'orig_size'} = 0;
614     }
615
616     # even with fe_amrecover_splits, amrecover doesn't like F_SPLIT_DUMPFILE.
617     $header->{'type'} = $Amanda::Header::F_DUMPFILE;
618
619     my $hdr_str = $header->to_string(32768, 32768);
620     Amanda::Util::full_write($self->wfd($self->{'data_stream'}), $hdr_str, length($hdr_str))
621         or die "writing to $self->{data_stream}: $!";
622
623     $self->expect_datapath();
624 }
625
626 sub expect_datapath {
627     my $self = shift;
628
629     $self->{'datapath'} = 'none';
630
631     # short-circuit this if amrecover doesn't support datapaths
632     if (!$self->{'their_features'}->has($Amanda::Feature::fe_amidxtaped_datapath)) {
633         return $self->start_xfer();
634     }
635
636     my $line = $self->getline($self->{'ctl_stream'});
637     if ($line eq "ABORT\r\n") {
638         return Amanda::MainLoop::quit();
639     }
640     my ($dpspec) = ($line =~ /^AVAIL-DATAPATH (.*)\r\n$/);
641     die "bad AVAIL-DATAPATH line" unless $dpspec;
642     my @avail_dps = split / /, $dpspec;
643
644     if (grep /^DIRECT-TCP$/, @avail_dps) {
645         # remote can handle a directtcp transfer .. can we?
646         if ($self->{'xfer_src_supports_directtcp'}) {
647             $self->{'datapath'} = 'directtcp';
648         } else {
649             $self->{'datapath'} = 'amanda';
650         }
651     } else {
652         # remote can at least handle AMANDA
653         die "remote cannot handle AMANDA datapath??"
654             unless grep /^AMANDA$/, @avail_dps;
655         $self->{'datapath'} = 'amanda';
656     }
657
658     $self->start_xfer();
659 }
660
661 sub start_xfer {
662     my $self = shift;
663
664     # create the appropriate destination based on our datapath
665     my $xfer_dest;
666     if ($self->{'datapath'} eq 'directtcp') {
667         $xfer_dest = Amanda::Xfer::Dest::DirectTCPListen->new();
668     } else {
669         $xfer_dest = Amanda::Xfer::Dest::Fd->new(
670                 $self->wfd($self->{'data_stream'})),
671     }
672
673     if ($self->{'datapath'} eq 'amanda') {
674         $self->sendctlline("USE-DATAPATH AMANDA\r\n");
675         my $dpline = $self->getline($self->{'ctl_stream'});
676         if ($dpline ne "DATAPATH-OK\r\n") {
677             die "expected DATAPATH-OK";
678         }
679     }
680
681     # start reading all filter stderr
682     foreach my $filter (@{$self->{'xfer_filters'}}) {
683         my $fd = $filter->get_stderr_fd();
684         $fd.="";
685         $fd = int($fd);
686         my $src = Amanda::MainLoop::fd_source($fd,
687                                               $G_IO_IN|$G_IO_HUP|$G_IO_ERR);
688         my $buffer = "";
689         $self->{'all_filter'}{$src} = 1;
690         $src->set_callback( sub {
691             my $b;
692             my $n_read = POSIX::read($fd, $b, 1);
693             if (!defined $n_read) {
694                 return;
695             } elsif ($n_read == 0) {
696                 delete $self->{'all_filter'}->{$src};
697                 $src->remove();
698                 POSIX::close($fd);
699                 if (!%{$self->{'all_filter'}} and $self->{'fetch_done'}) {
700                     Amanda::MainLoop::quit();
701                 }
702             } else {
703                 $buffer .= $b;
704                 if ($b eq "\n") {
705                     my $line = $buffer;
706                     #print STDERR "filter stderr: $line";
707                     chomp $line;
708                     $self->sendmessage("filter stderr: $line");
709                     debug("filter stderr: $line");
710                     $buffer = "";
711                 }
712             }
713         });
714     }
715
716     # create and start the transfer
717     $self->{'xfer'} = Amanda::Xfer->new([
718         $self->{'xfer_src'},
719         @{$self->{'xfer_filters'}},
720         $xfer_dest,
721     ]);
722     my $size = 0;
723     $size = $self->{'dump'}->{'bytes'} if exists $self->{'dump'}->{'bytes'};
724     $self->{'xfer'}->start(sub { $self->handle_xmsg(@_); }, 0, $size);
725     debug("started xfer; datapath=$self->{datapath}");
726
727     # send the data-path response, if we have a datapath
728     if ($self->{'datapath'} eq 'directtcp') {
729         my $addrs = $xfer_dest->get_addrs();
730         $addrs = [ map { $_->[0] . ":" . $_->[1] } @$addrs ];
731         $addrs = join(" ", @$addrs);
732         $self->sendctlline("USE-DATAPATH DIRECT-TCP $addrs\r\n");
733         my $dpline = $self->getline($self->{'ctl_stream'});
734         if ($dpline ne "DATAPATH-OK\r\n") {
735             die "expected DATAPATH-OK";
736         }
737     }
738
739     # and let the clerk know
740     $self->{'clerk'}->start_recovery(
741         xfer => $self->{'xfer'},
742         recovery_cb => sub { $self->recovery_cb(@_); });
743 }
744
745 sub handle_xmsg {
746     my $self = shift;
747     my ($src, $msg, $xfer) = @_;
748
749     $self->{'clerk'}->handle_xmsg($src, $msg, $xfer);
750     if ($msg->{'elt'} != $self->{'xfer_src'}) {
751         if ($msg->{'type'} == $XMSG_ERROR) {
752             $self->sendmessage("$msg->{message}");
753         }
754     }
755 }
756
757 sub recovery_cb {
758     my $self = shift;
759     my %params = @_;
760
761     debug("recovery complete");
762     if (@{$params{'errors'}}) {
763         for (@{$params{'errors'}}) {
764             $self->sendmessage("$_");
765         }
766         return $self->quit();
767     }
768
769     # note that the amidxtaped protocol has no way to indicate successful
770     # completion of a transfer
771     if ($params{'result'} ne 'DONE') {
772         warning("NOTE: transfer failed, but amrecover does not know that");
773     }
774
775     $self->finish();
776 }
777
778 sub finish {
779     my $self = shift;
780
781     # close the data fd for writing to signal EOF
782     $self->close($self->{'data_stream'}, 'w');
783
784     $self->quit();
785 }
786
787 sub quit {
788     my $self = shift;
789
790     if ($self->{'clerk'}) {
791         $self->{'clerk'}->quit(finished_cb => sub {
792             my ($err) = @_;
793             $self->{'chg'}->quit() if defined $self->{'chg'};
794             if ($err) {
795                 # it's *way* too late to report this to amrecover now!
796                 warning("while quitting clerk: $err");
797             }
798             $self->quit1();
799         });
800     } else {
801         $self->{'scan'}->quit() if defined $self->{'scan'};
802         $self->{'chg'}->quit() if defined $self->{'chg'};
803         $self->quit1();
804     }
805
806 }
807
808 sub quit1 {
809     my $self = shift;
810
811     $self->{'fetch_done'} = 1;
812     if (!%{$self->{'all_filter'}}) {
813         Amanda::MainLoop::quit();
814     }
815 }
816
817 ## utilities
818
819 sub check_inetd_security {
820     my $self = shift;
821     my ($stream) = @_;
822
823     my $firstline = $self->getline($stream);
824     if ($firstline !~ /^SECURITY (.*)\n/) {
825         warning("did not get security line");
826         print "ERROR did not get security line\r\n";
827         return 0;
828     }
829
830     my $errmsg = $self->check_bsd_security($stream, $1);
831     if ($errmsg) {
832         print "ERROR $errmsg\r\n";
833         return 0;
834     }
835
836     return 1;
837 }
838
839 sub get_req {
840     my $self = shift;
841
842     my $req_str = '';
843     while (1) {
844         my $buf = Amanda::Util::full_read($self->rfd('main'), 1024);
845         last unless $buf;
846         $req_str .= $buf;
847     }
848     # we've read main to EOF, so close it
849     $self->close('main', 'r');
850
851     return $self->{'req'} = $self->parse_req($req_str);
852 }
853
854 sub send_rep {
855     my $self = shift;
856     my ($streams, $errors) = @_;
857     my $rep = '';
858
859     # first, if there were errors in the REQ, report them
860     if (@$errors) {
861         for my $err (@$errors) {
862             $rep .= "ERROR $err\n";
863         }
864     } else {
865         my $connline = $self->connect_streams(@$streams);
866         $rep .= "$connline\n";
867     }
868     # rep needs a empty-line terminator, I think
869     $rep .= "\n";
870
871     # write the whole rep packet, and close main to signal the end of the packet
872     $self->senddata('main', $rep);
873     $self->close('main', 'w');
874 }
875
876 # helper function to get a line, including the trailing '\n', from a stream.  This
877 # reads a character at a time to ensure that no extra characters are consumed.  This
878 # could certainly be more efficient! (TODO)
879 sub getline {
880     my $self = shift;
881     my ($stream) = @_;
882     my $fd = $self->rfd($stream);
883     my $line = '';
884
885     while (1) {
886         my $c;
887         POSIX::read($fd, $c, 1)
888             or last;
889         $line .= $c;
890         last if $c eq "\n";
891     }
892
893     my $chopped = $line;
894     $chopped =~ s/[\r\n]*$//g;
895     debug("CTL << $chopped");
896
897     return $line;
898 }
899
900 # like getline, but async; TODO:
901 #  - make all uses of getline async
902 #  - use buffering to read more than one character at a time
903 sub getline_async {
904     my $self = shift;
905     my ($stream, $async_read_cb) = @_;
906     my $fd = $self->rfd($stream);
907
908     my $data_in;
909     my $buf = '';
910
911     $data_in = sub {
912         my ($err, $data) = @_;
913
914         return $async_read_cb->($err, undef) if $err;
915
916         $buf .= $data;
917         if ($buf =~ /\r\n$/) {
918             my $chopped = $buf;
919             $chopped =~ s/[\r\n]*$//g;
920             debug("CTL << $chopped");
921
922             $async_read_cb->(undef, $buf);
923         } else {
924             Amanda::MainLoop::async_read(fd => $fd, size => 1, async_read_cb => $data_in);
925         }
926     };
927     Amanda::MainLoop::async_read(fd => $fd, size => 1, async_read_cb => $data_in);
928 }
929
930 # helper function to write a data to a stream.  This does not add newline characters.
931 # If the callback is given, this is async (TODO: all calls should be async)
932 sub senddata {
933     my $self = shift;
934     my ($stream, $data, $async_write_cb) = @_;
935     my $fd = $self->wfd($stream);
936
937     if (defined $async_write_cb) {
938         return Amanda::MainLoop::async_write(
939                 fd => $fd,
940                 data => $data,
941                 async_write_cb => $async_write_cb);
942     } else {
943         Amanda::Util::full_write($fd, $data, length($data))
944             or die "writing to $stream: $!";
945     }
946 }
947
948 # send a line on the control stream, or just log it if the ctl stream is gone;
949 # async callback is just like for senddata
950 sub sendctlline {
951     my $self = shift;
952     my ($msg, $async_write_cb) = @_;
953
954     my $chopped = $msg;
955     $chopped =~ s/[\r\n]*$//g;
956
957     if ($self->{'ctl_stream'}) {
958         debug("CTL >> $chopped");
959         return $self->senddata($self->{'ctl_stream'}, $msg, $async_write_cb);
960     } else {
961         debug("not sending CTL message as CTL is closed >> $chopped");
962         if (defined $async_write_cb) {
963             $async_write_cb->(undef, length($msg));
964         }
965     }
966 }
967
968 # send a MESSAGE on the CTL stream, but only if the remote has
969 # fe_amrecover_message
970 sub sendmessage {
971     my $self = shift;
972     my ($msg) = @_;
973
974     if ($self->{'their_features'}->has($Amanda::Feature::fe_amrecover_message)) {
975         $self->sendctlline("MESSAGE $msg\r\n");
976     } else {
977         warning("remote does not understand MESSAGE; not sent: MESSAGE $msg");
978     }
979 }
980
981 # covert a tapespec to a holding filename
982 sub tapespec_to_holding {
983     my $self = shift;
984     my ($tapespec) = @_;
985
986     my $filelist = Amanda::Util::unmarshal_tapespec($tapespec);
987
988     # $filelist should have the form [ $holding_file => [ 0 ] ]
989     die "invalid holding tapespec" unless @$filelist == 2;
990     die "invalid holding tapespec" unless @{$filelist->[1]} == 1;
991     die "invalid holding tapespec" unless $filelist->[1][0] == 0;
992
993     return $filelist->[0];
994 }
995
996 # amrecover didn't give us much to go on, but see if we can find a dump that
997 # will make it happy.
998 sub try_to_find_dump {
999     my $self = shift;
1000     my ($label, $spec) = @_;
1001
1002     # search the catalog; get_dumps cannot search by labels, so we have to use
1003     # get_parts instead
1004     my @parts = Amanda::DB::Catalog::get_parts(
1005         label => $label,
1006         dumpspecs => [ $spec ]);
1007
1008     if (!@parts) {
1009         $self->sendmessage("could not find any matching dumps on volume '$label'");
1010         return undef;
1011     }
1012
1013     # (note that if there is more than one dump in @parts, the planner will
1014     # catch it later)
1015
1016     # sort the parts by their order on each volume.  This sorts the volumes
1017     # lexically by label, but the planner will straighten it out.
1018     @parts = Amanda::DB::Catalog::sort_dumps([ "label", "filenum" ], @parts);
1019
1020     # loop over the parts for the dump and make a filelist.
1021     my $last_label = '';
1022     my $last_filenums = undef;
1023     my $filelist = [];
1024     for my $part (@parts) {
1025         next unless defined $part; # skip part number 0
1026         if ($part->{'label'} ne $last_label) {
1027             $last_label = $part->{'label'};
1028             $last_filenums = [];
1029             push @$filelist, $last_label, $last_filenums;
1030         }
1031         push @$last_filenums, $part->{'filenum'};
1032     }
1033
1034     return $filelist;
1035 }
1036
1037 ##
1038 # main driver
1039
1040 package main;
1041 use Amanda::Debug qw( debug );
1042 use Amanda::Util qw( :constants );
1043 use Amanda::Config qw( :init );
1044
1045 our $exit_status = 0;
1046
1047 sub main {
1048     Amanda::Util::setup_application("amidxtaped", "server", $CONTEXT_DAEMON);
1049     config_init(0, undef);
1050     Amanda::Debug::debug_dup_stderr_to_debug();
1051
1052     my $cs = main::ClientService->new();
1053     Amanda::MainLoop::call_later(sub { $cs->run(); });
1054     Amanda::MainLoop::run();
1055
1056     debug("exiting with $exit_status");
1057     Amanda::Util::finish_application();
1058 }
1059
1060 main();
1061 exit($exit_status);