1 # Copyright (c) 2010-2012 Zmanda, Inc. All Rights Reserved.
3 # This library is free software; you can redistribute it and/or modify it
4 # under the terms of the GNU Lesser General Public License version 2.1 as
5 # published by the Free Software Foundation.
7 # This library is distributed in the hope that it will be useful, but
8 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
9 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
10 # License for more details.
12 # You should have received a copy of the GNU Lesser General Public License
13 # along with this library; if not, write to the Free Software Foundation,
14 # Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
16 # Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
17 # Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
19 package Amanda::Recovery::Clerk;
25 use Amanda::Xfer qw( :constants );
26 use Amanda::Device qw( :constants );
29 use Amanda::Debug qw( :logging );
34 Amanda::Recovery::Clerk - handle assembling dumpfiles from multiple parts
37 my $clerk = Amanda::Recovery::Clerk->new(
42 dump => $dump, # from Amanda::Recovery::Planner or Amanda::DB::Catalog
43 xfer_src_cb => $steps->{'xfer_src_cb'});
46 step xfer_src_cb => sub {
47 my ($errors, $header, $xfer_src, $directtcp_supported) = @_;
48 die join("\n", @$errors) if ($errors);
49 print "restoring from " . $header->summary() . "\n";
51 my $xfer = Amanda::Xfer->new([$xfer_src, $xfer_dest]);
52 $xfer->start(sub { $clerk->handle_xmsg(@_); });
53 $clerk->start_recovery(
55 recovery_cb => $steps->{'recovery_cb'});
58 step recovery_cb => sub {
60 die join("\n", @{$params{'errors'}}) if ($params{'errors'});
61 print "result: $params{result}\n";
66 $clerk->quit(finished_cb => sub {
72 This package is the counterpart to L<Amanda::Taper::Scribe>, and handles
73 re-assembling dumpfiles from multiple parts, possibly distributed over several
76 A Clerk manages a source element in a transfer. The destination is up to the
77 caller - it could be a file, or even an element obtained from a Scribe. A
78 particular Clerk instance only handles one transfer at a time, but maintains
79 its state between transfers, so multiple transfers from the same device will
80 proceed without rewinding or reloading the volume.
82 At a high level, the Clerk is operated as follows: the caller provides a dump,
83 which includes information on the volumes, and file numbers on those volumes,
84 from which to read the data. The dump object is from L<Amanda::DB::Catalog>,
85 usually by awy of L<Amanda::Recovery::Planner>. The Clerk responds with a
86 transfer source element, which the caller then uses to construct an start a
87 transfer. The clerk then uses a changer to find the required volumes, seeks to
88 the appropriate files, and reads the data into the transfer. Note that the
89 clerk can also recover holding-disk files.
91 Because the clerk operates transfers (see L<Amanda::Xfer>) and the Changer API
92 (L<Amanda::Changer>), its operations assume that L<Amanda::MainLoop> is in use.
94 =head1 OPERATING A CLERK
96 To use a Clerk, first create a new object, giving the changer object that the
97 Clerk should use to load devices:
99 my $clerk = Amanda::Recovery::Clerk->new(
102 If the optional parameter C<debug> is given with a true value, then the Clerk
103 will log additional debug information to the Amanda debug logs. This value is
104 also set to true if the C<DEBUG_RECOVERY> configuration parameter is set.
106 The optional C<feedback> parameter gives an object which will handle feedback
107 form the clerk. See FEEDBACK, below.
109 The C<scan> parameter must be an L<Amanda::Recover::Scan> instance, which
110 will be used to find the volumes required for the recovery.
112 =head2 TRANSFERRING A DUMPFILE
114 Next, get a dump object and supply it to the Clerk to get a transfer source
115 element. The Clerk will verify that the supplied dump matches the on-medium
116 header during the recovery operation, taking into account the C<single_part>
117 flag added by the Planner for single-part recoveries.
119 $clerk->get_xfer_src(
121 xfer_src_cb => $xfer_src_cb);
123 During this operation, the Clerk looks up the first part in the dump and
124 fetches its header. Callers often need this header to construct a transfer
125 appropriate to the data on the volume. The C<$xfer_src_cb> is called with a
126 transfer element and with the first header, or with a list of errors if
127 something goes wrong. The final argument is true if the device from which
128 the restore is done supports directtcp.
130 $xfer_src_cb->(undef, $header, $xfer_src, $dtcp_supp); # OK
131 $xfer_src_cb->([ $err, $err2 ], undef, undef, undef); # errors
133 Once C<$xfer_src_cb> has been called, build the transfer element into a
134 transfer, and start the transfer. Send all transfer messages to the clerk:
136 my $xfer->start(sub {
137 my ($src, $msg, $xfer) = @_;
138 $clerk->handle_xmsg($src, $msg, $xfer);
141 Once the transfer is started, inform the Clerk:
143 $clerk->recovery_started(
145 recovery_cb => $recovery_cb);
147 The C<$recovery_cb> will be called when the recovery is complete - either
148 successfully or unsuccessfully. It is called as:
151 result => "DONE", # or "FAILED"
152 errors => [], # or a list of error messages
155 Once the recovery callback has been invoked, it is safe to start a new transfer
158 Note that, because the Clerk only handles one transfer at a time, if dumpfiles
159 are interleaved on a volume then the recovery process will need to seek to and
160 read all parts from one dumpfile before reading any parts from the next
161 dumpfile. Amanda does not generate interleaved dumps, so in practice this
162 limitation is not significant.
166 When all necessary dumpfiles have been transferred, the Clerk must be cleanly
167 shut down. This is done with the C<quit> method:
170 finished_cb => $finished_cb);
172 The process should not exit until the C<finished_cb> has been invoked.
176 A feedback object implements a number of methods that are called at various
177 times in the recovery process, allowing the user to customize that behavior. A
178 user-defined feedback object should inherit from
179 C<Amanda::Recovery::Clerk::Feedback>, which implements no-op versions of all of
182 The C<clerk_notif_part> method is called just before each part is restored, and is
183 given the label, filenum, and header. Its return value, if any, is ignored.
184 Similarly, C<clerk_notif_holding> is called for a holding-disk recovery and is given
185 the holding filename and its header. Note that C<clerk_notif_holding> is called
186 before the C<xfer_src_cb>, since data will begin flowing from a holding disk
187 immediately when the transfer is started.
189 A typical Clerk feedback class might look like:
191 use base 'Amanda::Recovery::Clerk::Feedback';
193 sub clerk_notif_part {
195 my ($label, $filenum, $hdr) = @_;
196 print "restoring part ", $hdr->{'partnum'},
197 " from '$label' file $filenum\n";
207 my $debug = $Amanda::Config::debug_recovery;
208 $debug = $params{'debug'}
209 if defined $params{'debug'} and $params{'debug'} > $debug;
212 scan => $params{'scan'},
214 feedback => $params{'feedback'}
215 || Amanda::Recovery::Clerk::Feedback->new(),
217 current_label => undef,
218 current_dev => undef,
219 current_res => undef,
224 return bless ($self, $class);
231 for my $rq_param (qw(dump xfer_src_cb)) {
232 croak "required parameter '$rq_param' missing"
233 unless exists $params{$rq_param};
236 confess "Clerk is already busy" if $self->{'xfer_state'};
238 # set up a new xfer_state
239 my $xfer_state = $self->{'xfer_state'} = {
240 dump => $params{'dump'},
241 is_holding => exists $params{'dump'}->{'parts'}[1]{'holding_file'},
249 recovery_cb => undef,
250 xfer_src_cb => $params{'xfer_src_cb'},
257 $self->_maybe_start_part();
264 $self->dbg("starting recovery");
265 for my $rq_param (qw(xfer recovery_cb)) {
266 croak "required parameter '$rq_param' missing"
267 unless exists $params{$rq_param};
270 confess "no xfer is in progress" unless $self->{'xfer_state'};
271 confess "get_xfer_src has not finished"
272 if defined $self->{'xfer_state'}->{'xfer_src_cb'};
274 my $xfer_state = $self->{'xfer_state'};
275 $xfer_state->{'recovery_cb'} = $params{'recovery_cb'};
276 $xfer_state->{'xfer'} = $params{'xfer'};
278 $self->_maybe_start_part();
283 my ($src, $msg, $xfer) = @_;
285 if ($msg->{'elt'} == $self->{'xfer_state'}->{'xfer_src'}) {
286 if ($msg->{'type'} == $XMSG_PART_DONE) {
287 $self->_xmsg_part_done($src, $msg, $xfer);
288 } elsif ($msg->{'type'} == $XMSG_ERROR) {
289 $self->_xmsg_error($src, $msg, $xfer);
290 } elsif ($msg->{'type'} == $XMSG_READY) {
291 $self->_xmsg_ready($src, $msg, $xfer);
295 if ($msg->{'type'} == $XMSG_DONE) {
296 $self->_xmsg_done($src, $msg, $xfer);
303 my $finished_cb = $params{'finished_cb'};
305 confess "Cannot quit a Clerk while a transfer is in progress"
306 if $self->{'xfer_state'};
308 my $steps = define_steps
309 cb_ref => \$finished_cb,
310 finalize => sub { $self->{'scan'}->quit() if defined $self->{'scan'} };
312 step release => sub {
313 # if we have a reservation, we need to release it; otherwise, we can
314 # just call finished_cb
315 if ($self->{'current_res'}) {
316 $self->{'current_dev'}->finish();
317 $self->{'current_res'}->release(finished_cb => $finished_cb);
326 my ($src, $msg, $xfer) = @_;
327 my $xfer_state = $self->{'xfer_state'};
329 if (!$xfer_state->{'is_holding'}) {
330 $xfer_state->{'xfer_src_ready'} = 1;
331 $self->_maybe_start_part();
335 sub _xmsg_part_done {
337 my ($src, $msg, $xfer) = @_;
338 my $xfer_state = $self->{'xfer_state'};
340 my $next_label = $xfer_state->{'next_part'}->{'label'};
341 my $next_filenum = $xfer_state->{'next_part'}->{'filenum'};
343 confess "read incorrect filenum"
344 unless $next_filenum == $msg->{'fileno'};
345 $self->dbg("done reading file $next_filenum on '$next_label'");
347 # fix up the accounting, and then see if we can do something else
348 shift @{$xfer_state->{'remaining_plan'}};
349 $xfer_state->{'next_part_idx'}++;
350 $xfer_state->{'next_part'} = undef;
352 $self->_maybe_start_part();
357 my ($src, $msg, $xfer) = @_;
358 my $xfer_state = $self->{'xfer_state'};
360 push @{$xfer_state->{'errors'}}, $msg->{'message'};
365 my ($src, $msg, $xfer) = @_;
366 my $xfer_state = $self->{'xfer_state'};
368 # eliminate the transfer's state, since it's done
369 $self->{'xfer_state'} = undef;
371 # note that this does not release the reservation, in case the next
372 # transfer is from the same volume
373 my $result = (@{$xfer_state->{'errors'}})? "FAILED" : "DONE";
374 return $xfer_state->{'recovery_cb'}->(
376 errors => $xfer_state->{'errors'},
377 bytes_read => $xfer_state->{'xfer_src'}->get_bytes_read()
381 sub _maybe_start_part {
383 my ($finished_cb) = @_;
384 my $xfer_state = $self->{'xfer_state'};
386 # NOTE: this method is invoked *both* from get_xfer_src and start_recovery;
387 # in the former case it merely loads the file and returns the header.
389 # The finished_cb is called when the method is done thinking about starting
390 # a new part, which usually isn't a very interesting event. It can safely
392 $finished_cb ||= sub { };
394 my $steps = define_steps
395 cb_ref => \$finished_cb;
397 step check_ready => sub {
398 # if we're still working on a part, do nothing
399 return $finished_cb->()
400 if $xfer_state->{'writing_part'};
402 # if we have an xfer source already, and it's not ready, then don't start
403 # the part. This happens when start_recovery is called before XMSG_READY.
404 return $finished_cb->()
405 if $xfer_state->{'xfer_src'} and not $xfer_state->{'xfer_src_ready'};
407 # if we have an xfer source already, but the recovery hasn't started, then
408 # don't start the part. This happens when XMSG_READY comes before
410 return $finished_cb->()
411 if $xfer_state->{'xfer_src'} and not $xfer_state->{'recovery_cb'};
413 $steps->{'check_next'}->();
416 step check_next => sub {
417 # first, see if anything remains to be done
418 if (!exists $xfer_state->{'dump'}{'parts'}[$xfer_state->{'next_part_idx'}]) {
419 # this should not happen until the xfer is started..
420 confess "xfer should be running already"
421 unless $xfer_state->{'xfer'};
423 # tell the source to generate EOF
424 $xfer_state->{'xfer_src'}->start_part(undef);
426 return $finished_cb->();
429 $xfer_state->{'next_part'} =
430 $xfer_state->{'dump'}{'parts'}[$xfer_state->{'next_part_idx'}];
432 # short-circuit for a holding disk
433 if ($xfer_state->{'is_holding'}) {
434 return $steps->{'holding_recovery'}->();
437 my $next_label = $xfer_state->{'next_part'}->{'label'};
438 # load the next label, if necessary
439 if ($self->{'current_label'} and
440 $self->{'current_label'} eq $next_label) {
441 # jump to the seek_file call
442 return $steps->{'seek_and_check'}->();
445 # need to get a new tape
446 return $steps->{'release'}->();
449 step release => sub {
450 if (!$self->{'current_res'}) {
451 return $steps->{'released'}->();
454 $self->{'current_dev'}->finish();
455 $self->{'current_res'}->release(
456 finished_cb => $steps->{'released'});
459 step released => sub {
463 push @{$xfer_state->{'errors'}}, "$err";
464 return $steps->{'handle_error'}->();
467 $self->{'current_dev'} = undef;
468 $self->{'current_res'} = undef;
469 $self->{'current_label'} = undef;
471 # now load the next volume
473 my $next_label = $xfer_state->{'next_part'}->{'label'};
475 $self->dbg("loading volume '$next_label'");
476 $self->{'scan'}->find_volume(label => $next_label,
477 res_cb => $steps->{'loaded_label'});
480 step loaded_label => sub {
481 my ($err, $res) = @_;
483 my $next_label = $xfer_state->{'next_part'}->{'label'};
486 push @{$xfer_state->{'errors'}}, "$err";
487 return $steps->{'handle_error'}->();
490 $self->{'current_res'} = $res;
492 # tell the XSR to use this device, before we start it. If we don't actually
493 # end up using this device, it's no big deal.
494 my $dev = $res->{'device'};
495 if ($xfer_state->{'xfer_src'}
496 and $xfer_state->{'xfer_src'}->isa("Amanda::Xfer::Source::Recovery")) {
497 $xfer_state->{'xfer_src'}->use_device($dev);
500 # open the device and check the label, then go to seek_and_check
501 if (!$dev->start($Amanda::Device::ACCESS_READ, undef, undef)) {
502 $err = $dev->error_or_status();
504 if ($dev->volume_label ne $next_label) {
505 $err = "expected volume label '$next_label', but found volume " .
506 "label '" . $dev->volume_label . "'";
508 $self->{'current_dev'} = $dev;
509 $self->{'current_label'} = $dev->volume_label;
512 return $steps->{'seek_and_check'}->();
516 # the volume didn't work out, so release the reservation and fail
517 $res->release(finished_cb => sub {
518 my ($release_err) = @_;
520 if ($release_err) { # geez, someone is having a bad day!
521 push @{$xfer_state->{'errors'}}, "$release_err";
522 return $steps->{'handle_error'}->();
525 push @{$xfer_state->{'errors'}}, "$err";
526 return $steps->{'handle_error'}->();
530 step seek_and_check => sub {
531 my $next_label = $xfer_state->{'next_part'}->{'label'};
532 my $next_filenum = $xfer_state->{'next_part'}->{'filenum'};
533 my $dev = $self->{'current_dev'};
534 my $on_vol_hdr = $dev->seek_file($next_filenum);
537 push @{$xfer_state->{'errors'}}, $dev->error_or_status();
538 return $steps->{'handle_error'}->();
541 if (!$self->_header_expected($on_vol_hdr)) {
542 # _header_expected already pushed an error message or two
543 return $steps->{'handle_error'}->();
546 # now, either start the part, or invoke the xfer_src_cb.
547 if ($xfer_state->{'xfer_src_cb'}) {
548 my $cb = $xfer_state->{'xfer_src_cb'};
549 $xfer_state->{'xfer_src_cb'} = undef;
551 # make a new xfer_source
552 $xfer_state->{'xfer_src'} = Amanda::Xfer::Source::Recovery->new($dev),
553 $xfer_state->{'xfer_src_ready'} = 0;
555 # invoke the xfer_src_cb
556 $self->dbg("successfully located first part for recovery");
557 $cb->(undef, $on_vol_hdr, $xfer_state->{'xfer_src'},
558 $dev->directtcp_supported());
561 # notify caller of the part
562 $self->{'feedback'}->clerk_notif_part($next_label, $next_filenum, $on_vol_hdr);
565 $self->dbg("reading file $next_filenum on '$next_label'");
566 $xfer_state->{'xfer_src'}->start_part($dev);
569 # inform the caller that we're done
575 # handle a holding restore
576 step holding_recovery => sub {
577 my $next_filename = $xfer_state->{'next_part'}->{'holding_file'};
578 my $on_disk_hdr = Amanda::Holding::get_header($next_filename);
581 push @{$xfer_state->{'errors'}}, "error loading header from '$next_filename'";
582 return $steps->{'handle_error'}->();
585 # remove CONT_FILENAME from the header, since it's not needed anymore
586 $on_disk_hdr->{'cont_filename'} = '';
588 if (!$self->_header_expected($on_disk_hdr)) {
589 # _header_expected already pushed an error message or two
590 return $steps->{'handle_error'}->();
593 # now invoke the xfer_src_cb if it hasn't already been called.
594 if ($xfer_state->{'xfer_src_cb'}) {
595 my $cb = $xfer_state->{'xfer_src_cb'};
596 $xfer_state->{'xfer_src_cb'} = undef;
598 $xfer_state->{'xfer_src'} = Amanda::Xfer::Source::Holding->new(
599 $xfer_state->{'dump'}->{'parts'}[1]{'holding_file'}),
601 # Amanda::Xfer::Source::Holding was *born* ready.
602 $xfer_state->{'xfer_src_ready'} = 1;
604 # notify caller of the part, *before* xfer_src_cb is called!
605 $self->{'feedback'}->clerk_notif_holding($next_filename, $on_disk_hdr);
607 $self->dbg("successfully located holding file for recovery");
608 $cb->(undef, $on_disk_hdr, $xfer_state->{'xfer_src'}, 0);
611 # (nothing else to do until the xfer is done)
617 # this utility sub handles errors differently depending on which phase is active.
618 step handle_error => sub {
619 if ($xfer_state->{'xfer_src_cb'}) {
620 # xfer_src_cb hasn't been called yet, so invoke it now,
621 # after deleting the xfer state
622 $self->{'xfer_state'} = undef;
624 $xfer_state->{'xfer_src_cb'}->($xfer_state->{'errors'},
625 undef, undef, undef);
627 # cancelling the xfer will eventually invoke recovery_cb
629 $xfer_state->{'xfer'}->cancel();
637 my ($timestamp) = @_;
638 if (length($timestamp) == 8) {
639 return $timestamp."000000";
644 sub _header_expected {
646 my ($on_vol_hdr) = @_;
647 my $xfer_state = $self->{'xfer_state'};
648 my $next_part = $xfer_state->{'next_part'};
651 if ($on_vol_hdr->{'name'} ne $next_part->{'dump'}->{'hostname'}) {
652 push @errs, "got hostname '$on_vol_hdr->{name}'; " .
653 "expected '$next_part->{dump}->{hostname}'";
655 if ($on_vol_hdr->{'disk'} ne $next_part->{'dump'}->{'diskname'}) {
656 push @errs, "got disk '$on_vol_hdr->{disk}'; " .
657 "expected '$next_part->{dump}->{diskname}'";
659 # zeropad the datestamps before comparing them, to avoid any compliations
660 # from usetimestamps=0
661 if (_zeropad($on_vol_hdr->{'datestamp'})
662 ne _zeropad($next_part->{'dump'}->{'dump_timestamp'})) {
663 push @errs, "got datestamp '$on_vol_hdr->{datestamp}'; " .
664 "expected '$next_part->{dump}->{dump_timestamp}'";
666 if ($on_vol_hdr->{'dumplevel'} != $next_part->{'dump'}->{'level'}) {
667 push @errs, "got dumplevel '$on_vol_hdr->{dumplevel}'; " .
668 "expected '$next_part->{dump}->{level}'";
670 unless ($xfer_state->{'is_holding'}) {
671 if ($on_vol_hdr->{'partnum'} != $next_part->{'partnum'}) {
672 push @errs, "got partnum '$on_vol_hdr->{partnum}'; " .
673 "expected '$next_part->{partnum}'";
679 if ($xfer_state->{'is_holding'}) {
680 $errmsg = "header on '$next_part->{holding_file}' does not match expectations: ";
682 my $label = $next_part->{'label'};
683 my $filenum = $next_part->{'filenum'};
684 $errmsg = "header on '$label' file $filenum does not match expectations: ";
686 $errmsg .= join("; ", @errs);
687 push @{$xfer_state->{'errors'}}, $errmsg;
694 my ($self, $msg) = @_;
695 if ($self->{'debug'}) {
696 debug("Amanda::Recovery::Clerk: $msg");
700 package Amanda::Recovery::Clerk::Feedback;
703 return bless {}, shift;
706 sub clerk_notif_part { }
708 sub clerk_notif_holding { }