1 # Copyright (c) 2010-2012 Zmanda, Inc. All Rights Reserved.
3 # This library is free software; you can redistribute it and/or
4 # modify it under the terms of the GNU Lesser General Public
5 # License as published by the Free Software Foundation; either
6 # version 2.1 of the License, or (at your option) any later version.
8 # This library is distributed in the hope that it will be useful, but
9 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
11 # License for more details.
13 # You should have received a copy of the GNU Lesser General Public License
14 # along with this library; if not, write to the Free Software Foundation,
15 # Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
17 # Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
18 # Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
20 package Amanda::Recovery::Clerk;
26 use Amanda::Xfer qw( :constants );
27 use Amanda::Device qw( :constants );
30 use Amanda::Debug qw( :logging );
35 Amanda::Recovery::Clerk - handle assembling dumpfiles from multiple parts
38 my $clerk = Amanda::Recovery::Clerk->new(
43 dump => $dump, # from Amanda::Recovery::Planner or Amanda::DB::Catalog
44 xfer_src_cb => $steps->{'xfer_src_cb'});
47 step xfer_src_cb => sub {
48 my ($errors, $header, $xfer_src, $directtcp_supported) = @_;
49 die join("\n", @$errors) if ($errors);
50 print "restoring from " . $header->summary() . "\n";
52 my $xfer = Amanda::Xfer->new([$xfer_src, $xfer_dest]);
53 $xfer->start(sub { $clerk->handle_xmsg(@_); });
54 $clerk->start_recovery(
56 recovery_cb => $steps->{'recovery_cb'});
59 step recovery_cb => sub {
61 die join("\n", @{$params{'errors'}}) if ($params{'errors'});
62 print "result: $params{result}\n";
67 $clerk->quit(finished_cb => sub {
73 This package is the counterpart to L<Amanda::Taper::Scribe>, and handles
74 re-assembling dumpfiles from multiple parts, possibly distributed over several
77 A Clerk manages a source element in a transfer. The destination is up to the
78 caller - it could be a file, or even an element obtained from a Scribe. A
79 particular Clerk instance only handles one transfer at a time, but maintains
80 its state between transfers, so multiple transfers from the same device will
81 proceed without rewinding or reloading the volume.
83 At a high level, the Clerk is operated as follows: the caller provides a dump,
84 which includes information on the volumes, and file numbers on those volumes,
85 from which to read the data. The dump object is from L<Amanda::DB::Catalog>,
86 usually by way of L<Amanda::Recovery::Planner>. The Clerk responds with a
87 transfer source element, which the caller then uses to construct and start a
88 transfer. The clerk then uses a changer to find the required volumes, seeks to
89 the appropriate files, and reads the data into the transfer. Note that the
90 clerk can also recover holding-disk files.
92 Because the clerk operates transfers (see L<Amanda::Xfer>) and the Changer API
93 (L<Amanda::Changer>), its operations assume that L<Amanda::MainLoop> is in use.
95 =head1 OPERATING A CLERK
97 To use a Clerk, first create a new object, giving the changer object that the
98 Clerk should use to load devices:
100 my $clerk = Amanda::Recovery::Clerk->new(
103 If the optional parameter C<debug> is given with a true value, then the Clerk
104 will log additional debug information to the Amanda debug logs. This value is
105 also set to true if the C<DEBUG_RECOVERY> configuration parameter is set.
107 The optional C<feedback> parameter gives an object which will handle feedback
108 from the clerk. See FEEDBACK, below.
110 The C<scan> parameter must be an L<Amanda::Recovery::Scan> instance, which
111 will be used to find the volumes required for the recovery.
113 =head2 TRANSFERRING A DUMPFILE
115 Next, get a dump object and supply it to the Clerk to get a transfer source
116 element. The Clerk will verify that the supplied dump matches the on-medium
117 header during the recovery operation, taking into account the C<single_part>
118 flag added by the Planner for single-part recoveries.
120 $clerk->get_xfer_src(
122 xfer_src_cb => $xfer_src_cb);
124 During this operation, the Clerk looks up the first part in the dump and
125 fetches its header. Callers often need this header to construct a transfer
126 appropriate to the data on the volume. The C<$xfer_src_cb> is called with a
127 transfer element and with the first header, or with a list of errors if
128 something goes wrong. The final argument is true if the device from which
129 the restore is done supports directtcp.
131 $xfer_src_cb->(undef, $header, $xfer_src, $dtcp_supp); # OK
132 $xfer_src_cb->([ $err, $err2 ], undef, undef, undef); # errors
134 Once C<$xfer_src_cb> has been called, build the transfer element into a
135 transfer, and start the transfer. Send all transfer messages to the clerk:
137 $xfer->start(sub {
138 my ($src, $msg, $xfer) = @_;
139 $clerk->handle_xmsg($src, $msg, $xfer);
142 Once the transfer is started, inform the Clerk:
144 $clerk->start_recovery(
146 recovery_cb => $recovery_cb);
148 The C<$recovery_cb> will be called when the recovery is complete - either
149 successfully or unsuccessfully. It is called as:
152 result => "DONE", # or "FAILED"
153 errors => [], # or a list of error messages
156 Once the recovery callback has been invoked, it is safe to start a new transfer
159 Note that, because the Clerk only handles one transfer at a time, if dumpfiles
160 are interleaved on a volume then the recovery process will need to seek to and
161 read all parts from one dumpfile before reading any parts from the next
162 dumpfile. Amanda does not generate interleaved dumps, so in practice this
163 limitation is not significant.
167 When all necessary dumpfiles have been transferred, the Clerk must be cleanly
168 shut down. This is done with the C<quit> method:
171 finished_cb => $finished_cb);
173 The process should not exit until the C<finished_cb> has been invoked.
177 A feedback object implements a number of methods that are called at various
178 times in the recovery process, allowing the user to customize that behavior. A
179 user-defined feedback object should inherit from
180 C<Amanda::Recovery::Clerk::Feedback>, which implements no-op versions of all of
183 The C<clerk_notif_part> method is called just before each part is restored, and is
184 given the label, filenum, and header. Its return value, if any, is ignored.
185 Similarly, C<clerk_notif_holding> is called for a holding-disk recovery and is given
186 the holding filename and its header. Note that C<clerk_notif_holding> is called
187 before the C<xfer_src_cb>, since data will begin flowing from a holding disk
188 immediately when the transfer is started.
190 A typical Clerk feedback class might look like:
192 use base 'Amanda::Recovery::Clerk::Feedback';
194 sub clerk_notif_part {
196 my ($label, $filenum, $hdr) = @_;
197 print "restoring part ", $hdr->{'partnum'},
198 " from '$label' file $filenum\n";
208 my $debug = $Amanda::Config::debug_recovery;
209 $debug = $params{'debug'}
210 if defined $params{'debug'} and $params{'debug'} > $debug;
213 scan => $params{'scan'},
215 feedback => $params{'feedback'}
216 || Amanda::Recovery::Clerk::Feedback->new(),
218 current_label => undef,
219 current_dev => undef,
220 current_res => undef,
225 return bless ($self, $class);
232 for my $rq_param (qw(dump xfer_src_cb)) {
233 croak "required parameter '$rq_param' missing"
234 unless exists $params{$rq_param};
237 confess "Clerk is already busy" if $self->{'xfer_state'};
239 # set up a new xfer_state
240 my $xfer_state = $self->{'xfer_state'} = {
241 dump => $params{'dump'},
242 is_holding => exists $params{'dump'}->{'parts'}[1]{'holding_file'},
250 recovery_cb => undef,
251 xfer_src_cb => $params{'xfer_src_cb'},
258 $self->_maybe_start_part();
265 $self->dbg("starting recovery");
266 for my $rq_param (qw(xfer recovery_cb)) {
267 croak "required parameter '$rq_param' missing"
268 unless exists $params{$rq_param};
271 confess "no xfer is in progress" unless $self->{'xfer_state'};
272 confess "get_xfer_src has not finished"
273 if defined $self->{'xfer_state'}->{'xfer_src_cb'};
275 my $xfer_state = $self->{'xfer_state'};
276 $xfer_state->{'recovery_cb'} = $params{'recovery_cb'};
277 $xfer_state->{'xfer'} = $params{'xfer'};
279 $self->_maybe_start_part();
284 my ($src, $msg, $xfer) = @_;
286 if ($msg->{'elt'} == $self->{'xfer_state'}->{'xfer_src'}) {
287 if ($msg->{'type'} == $XMSG_PART_DONE) {
288 $self->_xmsg_part_done($src, $msg, $xfer);
289 } elsif ($msg->{'type'} == $XMSG_ERROR) {
290 $self->_xmsg_error($src, $msg, $xfer);
291 } elsif ($msg->{'type'} == $XMSG_READY) {
292 $self->_xmsg_ready($src, $msg, $xfer);
296 if ($msg->{'type'} == $XMSG_DONE) {
297 $self->_xmsg_done($src, $msg, $xfer);
304 my $finished_cb = $params{'finished_cb'};
306 confess "Cannot quit a Clerk while a transfer is in progress"
307 if $self->{'xfer_state'} and $self->{'xfer_state'}->{'xfer'};
309 my $steps = define_steps
310 cb_ref => \$finished_cb,
311 finalize => sub { $self->{'scan'}->quit() if defined $self->{'scan'} };
313 step release => sub {
314 # if we have a reservation, we need to release it; otherwise, we can
315 # just call finished_cb
316 if ($self->{'current_res'}) {
317 $self->{'current_dev'}->finish();
318 $self->{'current_res'}->release(finished_cb => $finished_cb);
327 my ($src, $msg, $xfer) = @_;
328 my $xfer_state = $self->{'xfer_state'};
330 if (!$xfer_state->{'is_holding'}) {
331 $xfer_state->{'xfer_src_ready'} = 1;
332 $self->_maybe_start_part();
336 sub _xmsg_part_done {
338 my ($src, $msg, $xfer) = @_;
339 my $xfer_state = $self->{'xfer_state'};
341 my $next_label = $xfer_state->{'next_part'}->{'label'};
342 my $next_filenum = $xfer_state->{'next_part'}->{'filenum'};
344 confess "read incorrect filenum"
345 unless $next_filenum == $msg->{'fileno'};
346 $self->dbg("done reading file $next_filenum on '$next_label'");
348 # fix up the accounting, and then see if we can do something else
349 shift @{$xfer_state->{'remaining_plan'}};
350 $xfer_state->{'next_part_idx'}++;
351 $xfer_state->{'next_part'} = undef;
353 $self->_maybe_start_part();
358 my ($src, $msg, $xfer) = @_;
359 my $xfer_state = $self->{'xfer_state'};
361 push @{$xfer_state->{'errors'}}, $msg->{'message'};
366 my ($src, $msg, $xfer) = @_;
367 my $xfer_state = $self->{'xfer_state'};
369 # eliminate the transfer's state, since it's done
370 $self->{'xfer_state'} = undef;
372 # note that this does not release the reservation, in case the next
373 # transfer is from the same volume
374 my $result = (@{$xfer_state->{'errors'}})? "FAILED" : "DONE";
375 return $xfer_state->{'recovery_cb'}->(
377 errors => $xfer_state->{'errors'},
378 bytes_read => $xfer_state->{'xfer_src'}->get_bytes_read()
382 sub _maybe_start_part {
384 my ($finished_cb) = @_;
385 my $xfer_state = $self->{'xfer_state'};
387 # NOTE: this method is invoked *both* from get_xfer_src and start_recovery;
388 # in the former case it merely loads the file and returns the header.
390 # The finished_cb is called when the method is done thinking about starting
391 # a new part, which usually isn't a very interesting event. It can safely
393 $finished_cb ||= sub { };
395 my $steps = define_steps
396 cb_ref => \$finished_cb;
398 step check_ready => sub {
399 # if we're still working on a part, do nothing
400 return $finished_cb->()
401 if $xfer_state->{'writing_part'};
403 # if we have an xfer source already, and it's not ready, then don't start
404 # the part. This happens when start_recovery is called before XMSG_READY.
405 return $finished_cb->()
406 if $xfer_state->{'xfer_src'} and not $xfer_state->{'xfer_src_ready'};
408 # if we have an xfer source already, but the recovery hasn't started, then
409 # don't start the part. This happens when XMSG_READY comes before
411 return $finished_cb->()
412 if $xfer_state->{'xfer_src'} and not $xfer_state->{'recovery_cb'};
414 $steps->{'check_next'}->();
417 step check_next => sub {
418 # first, see if anything remains to be done
419 if (!exists $xfer_state->{'dump'}{'parts'}[$xfer_state->{'next_part_idx'}]) {
420 # this should not happen until the xfer is started..
421 confess "xfer should be running already"
422 unless $xfer_state->{'xfer'};
424 # tell the source to generate EOF
425 $xfer_state->{'xfer_src'}->start_part(undef);
427 return $finished_cb->();
430 $xfer_state->{'next_part'} =
431 $xfer_state->{'dump'}{'parts'}[$xfer_state->{'next_part_idx'}];
433 # short-circuit for a holding disk
434 if ($xfer_state->{'is_holding'}) {
435 return $steps->{'holding_recovery'}->();
438 my $next_label = $xfer_state->{'next_part'}->{'label'};
439 # load the next label, if necessary
440 if ($self->{'current_label'} and
441 $self->{'current_label'} eq $next_label) {
442 # jump to the seek_file call
443 return $steps->{'seek_and_check'}->();
446 # need to get a new tape
447 return $steps->{'release'}->();
450 step release => sub {
451 if (!$self->{'current_res'}) {
452 return $steps->{'released'}->();
455 $self->{'current_dev'}->finish();
456 $self->{'current_res'}->release(
457 finished_cb => $steps->{'released'});
460 step released => sub {
464 push @{$xfer_state->{'errors'}}, "$err";
465 return $steps->{'handle_error'}->();
468 $self->{'current_dev'} = undef;
469 $self->{'current_res'} = undef;
470 $self->{'current_label'} = undef;
472 # now load the next volume
474 my $next_label = $xfer_state->{'next_part'}->{'label'};
476 $self->dbg("loading volume '$next_label'");
477 $self->{'scan'}->find_volume(label => $next_label,
478 res_cb => $steps->{'loaded_label'});
481 step loaded_label => sub {
482 my ($err, $res) = @_;
484 my $next_label = $xfer_state->{'next_part'}->{'label'};
487 push @{$xfer_state->{'errors'}}, "$err";
488 return $steps->{'handle_error'}->();
491 $self->{'current_res'} = $res;
493 # tell the XSR to use this device, before we start it. If we don't actually
494 # end up using this device, it's no big deal.
495 my $dev = $res->{'device'};
496 if ($xfer_state->{'xfer_src'}
497 and $xfer_state->{'xfer_src'}->isa("Amanda::Xfer::Source::Recovery")) {
498 $xfer_state->{'xfer_src'}->use_device($dev);
501 # open the device and check the label, then go to seek_and_check
502 if (!$dev->start($Amanda::Device::ACCESS_READ, undef, undef)) {
503 $err = $dev->error_or_status();
505 if ($dev->volume_label ne $next_label) {
506 $err = "expected volume label '$next_label', but found volume " .
507 "label '" . $dev->volume_label . "'";
509 $self->{'current_dev'} = $dev;
510 $self->{'current_label'} = $dev->volume_label;
513 return $steps->{'seek_and_check'}->();
517 # the volume didn't work out, so release the reservation and fail
518 $res->release(finished_cb => sub {
519 my ($release_err) = @_;
521 if ($release_err) { # geez, someone is having a bad day!
522 push @{$xfer_state->{'errors'}}, "$release_err";
523 return $steps->{'handle_error'}->();
526 push @{$xfer_state->{'errors'}}, "$err";
527 return $steps->{'handle_error'}->();
531 step seek_and_check => sub {
532 my $next_label = $xfer_state->{'next_part'}->{'label'};
533 my $next_filenum = $xfer_state->{'next_part'}->{'filenum'};
534 my $dev = $self->{'current_dev'};
535 my $on_vol_hdr = $dev->seek_file($next_filenum);
538 push @{$xfer_state->{'errors'}}, $dev->error_or_status();
539 return $steps->{'handle_error'}->();
542 if (!$self->_header_expected($on_vol_hdr)) {
543 # _header_expected already pushed an error message or two
544 return $steps->{'handle_error'}->();
547 # now, either start the part, or invoke the xfer_src_cb.
548 if ($xfer_state->{'xfer_src_cb'}) {
549 my $cb = $xfer_state->{'xfer_src_cb'};
550 $xfer_state->{'xfer_src_cb'} = undef;
552 # make a new xfer_source
553 $xfer_state->{'xfer_src'} = Amanda::Xfer::Source::Recovery->new($dev),
554 $xfer_state->{'xfer_src_ready'} = 0;
556 # invoke the xfer_src_cb
557 $self->dbg("successfully located first part for recovery");
558 $cb->(undef, $on_vol_hdr, $xfer_state->{'xfer_src'},
559 $dev->directtcp_supported());
562 # notify caller of the part
563 $self->{'feedback'}->clerk_notif_part($next_label, $next_filenum, $on_vol_hdr);
566 $self->dbg("reading file $next_filenum on '$next_label'");
567 $xfer_state->{'xfer_src'}->start_part($dev);
570 # inform the caller that we're done
576 # handle a holding restore
577 step holding_recovery => sub {
578 my $next_filename = $xfer_state->{'next_part'}->{'holding_file'};
579 my $on_disk_hdr = Amanda::Holding::get_header($next_filename);
582 push @{$xfer_state->{'errors'}}, "error loading header from '$next_filename'";
583 return $steps->{'handle_error'}->();
586 # remove CONT_FILENAME from the header, since it's not needed anymore
587 $on_disk_hdr->{'cont_filename'} = '';
589 if (!$self->_header_expected($on_disk_hdr)) {
590 # _header_expected already pushed an error message or two
591 return $steps->{'handle_error'}->();
594 # now invoke the xfer_src_cb if it hasn't already been called.
595 if ($xfer_state->{'xfer_src_cb'}) {
596 my $cb = $xfer_state->{'xfer_src_cb'};
597 $xfer_state->{'xfer_src_cb'} = undef;
599 $xfer_state->{'xfer_src'} = Amanda::Xfer::Source::Holding->new(
600 $xfer_state->{'dump'}->{'parts'}[1]{'holding_file'}),
602 # Amanda::Xfer::Source::Holding was *born* ready.
603 $xfer_state->{'xfer_src_ready'} = 1;
605 # notify caller of the part, *before* xfer_src_cb is called!
606 $self->{'feedback'}->clerk_notif_holding($next_filename, $on_disk_hdr);
608 $self->dbg("successfully located holding file for recovery");
609 $cb->(undef, $on_disk_hdr, $xfer_state->{'xfer_src'}, 0);
612 # (nothing else to do until the xfer is done)
618 # this utility sub handles errors differently depending on which phase is active.
619 step handle_error => sub {
620 if ($xfer_state->{'xfer_src_cb'}) {
621 # xfer_src_cb hasn't been called yet, so invoke it now,
622 # after deleting the xfer state
623 $self->{'xfer_state'} = undef;
625 $xfer_state->{'xfer_src_cb'}->($xfer_state->{'errors'},
626 undef, undef, undef);
628 # cancelling the xfer will eventually invoke recovery_cb
630 $xfer_state->{'xfer'}->cancel();
638 my ($timestamp) = @_;
639 if (length($timestamp) == 8) {
640 return $timestamp."000000";
645 sub _header_expected {
647 my ($on_vol_hdr) = @_;
648 my $xfer_state = $self->{'xfer_state'};
649 my $next_part = $xfer_state->{'next_part'};
652 if ($on_vol_hdr->{'name'} ne $next_part->{'dump'}->{'hostname'}) {
653 push @errs, "got hostname '$on_vol_hdr->{name}'; " .
654 "expected '$next_part->{dump}->{hostname}'";
656 if ($on_vol_hdr->{'disk'} ne $next_part->{'dump'}->{'diskname'}) {
657 push @errs, "got disk '$on_vol_hdr->{disk}'; " .
658 "expected '$next_part->{dump}->{diskname}'";
660 # zeropad the datestamps before comparing them, to avoid any complications
661 # from usetimestamps=0
662 if (_zeropad($on_vol_hdr->{'datestamp'})
663 ne _zeropad($next_part->{'dump'}->{'dump_timestamp'})) {
664 push @errs, "got datestamp '$on_vol_hdr->{datestamp}'; " .
665 "expected '$next_part->{dump}->{dump_timestamp}'";
667 if ($on_vol_hdr->{'dumplevel'} != $next_part->{'dump'}->{'level'}) {
668 push @errs, "got dumplevel '$on_vol_hdr->{dumplevel}'; " .
669 "expected '$next_part->{dump}->{level}'";
671 unless ($xfer_state->{'is_holding'}) {
672 if ($on_vol_hdr->{'partnum'} != $next_part->{'partnum'}) {
673 push @errs, "got partnum '$on_vol_hdr->{partnum}'; " .
674 "expected '$next_part->{partnum}'";
680 if ($xfer_state->{'is_holding'}) {
681 $errmsg = "header on '$next_part->{holding_file}' does not match expectations: ";
683 my $label = $next_part->{'label'};
684 my $filenum = $next_part->{'filenum'};
685 $errmsg = "header on '$label' file $filenum does not match expectations: ";
687 $errmsg .= join("; ", @errs);
688 push @{$xfer_state->{'errors'}}, $errmsg;
695 my ($self, $msg) = @_;
696 if ($self->{'debug'}) {
697 debug("Amanda::Recovery::Clerk: $msg");
701 package Amanda::Recovery::Clerk::Feedback;
704 return bless {}, shift;
# Default feedback hook for parts read from a volume: intentionally a no-op.
# The Clerk invokes it as a method on the feedback object, passing the volume
# label, the file number and the on-volume header, just before it starts
# reading that part.  The return value is ignored.  Subclasses override this
# to report progress.
707 sub clerk_notif_part { }
# Default feedback hook for holding-disk recoveries: intentionally a no-op.
# The Clerk invokes it as a method on the feedback object, passing the holding
# file name and its on-disk header, *before* the xfer_src_cb is called.
# Subclasses override this to report progress.
709 sub clerk_notif_holding { }