Imported Upstream version 3.1.0
[debian/amanda] / perl / Amanda / Taper / Scribe.pm
1 # Copyright (c) 2009, 2010 Zmanda, Inc.  All Rights Reserved.
2 #
3 # This library is free software; you can redistribute it and/or modify it
4 # under the terms of the GNU Lesser General Public License version 2.1 as
5 # published by the Free Software Foundation.
6 #
7 # This library is distributed in the hope that it will be useful, but
8 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
9 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
10 # License for more details.
11 #
12 # You should have received a copy of the GNU Lesser General Public License
13 # along with this library; if not, write to the Free Software Foundation,
14 # Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA.
15 #
16 # Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
17 # Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
18
19 package Amanda::Taper::Scribe;
20
21 use strict;
22 use warnings;
23 use Carp;
24
25 use Amanda::Xfer qw( :constants );
26 use Amanda::Device qw( :constants );
27 use Amanda::Header;
28 use Amanda::Debug qw( :logging );
29 use Amanda::MainLoop;
30
31 =head1 NAME
32
33 Amanda::Taper::Scribe
34
35 =head1 SYNOPSIS
36
37   my $scribe = Amanda::Taper::Scribe->new(
38         taperscan => $taperscan_algo,
39         feedback => $feedback_obj);
40
41   $subs{'start_scribe'} = make_cb(start_scribe => sub {
42     $scribe->start($datestamp, finished_cb => $subs{'start_xfer'});
43   });
44
45   $subs{'start_xfer'} = make_cb(start_xfer => sub {
46     my ($err) = @_;
47
48     my $xfer_dest = $scribe->get_xfer_dest(
49         max_memory => 64 * 1024,
50         split_method => 'disk',
51         part_size => 150 * 1024**2,
52         disk_cache_dirname => "$tmpdir/splitbuffer");
53
54     # .. set up the rest of the transfer ..
55
56     $xfer->start(sub {
57         my ($src, $msg, $xfer) = @_;
58         $scribe->handle_xmsg($src, $msg, $xfer);
59         # .. any other processing ..
60     });
61
62     # tell the scribe to start dumping via this transfer
63     $scribe->start_dump(
64         xfer => $xfer,
65         dump_header => $hdr,
66         dump_cb => $subs{'dump_cb'});
67   });
68
69   $subs{'dump_cb'} = make_cb(dump_cb => sub {
70       my %params = @_;
71       # .. handle dump results ..
72
73       print "DONE\n";
74       Amanda::MainLoop::quit();
75   });
76
77
78   $subs{'start_scribe'}->();
79   Amanda::MainLoop::run();
80
81 =head1 OVERVIEW
82
83 This package provides a high-level abstraction of Amanda's procedure for
84 writing dumpfiles to tape.
85
86 Amanda writes a sequence of dumpfiles to a sequence of volumes.  The
87 volumes are supplied by a taperscan algorithm, which operates a changer
88 to find and load each volume.  As dumpfiles are written to volumes and
89 those volumes fill up, the taperscan algorithm supplies additional
90 volumes.
91
92 In order to reduce internal fragmentation within volumes, Amanda can "split"
93 dumpfiles into smaller pieces, so that the overall dumpfile can span multiple
94 volumes.  Each "part" is written to the volume in sequence.  If a device
95 encounters an error while writing a part, then that part is considered
96 "partial", and is rewritten from its beginning on the next volume.  Some
97 devices can reliably indicate that they are full (EOM), and for these devices
98 parts are simply truncated, and the Scribe starts the next part on the next
99 volume.
100
101 To facilitate rewriting parts on devices which cannot indicate EOM, Amanda must
102 retain all of the data in a part, even after that data is written to the
103 volume.  The Scribe provides several methods to support this: caching the part
104 in memory, caching the part in a special on-disk file, or relying on
105 pre-existing on-disk storage.  The latter method is used when reading from
106 holding disks.
107
108 The details of efficiently splitting dumpfiles and rewriting parts are handled
109 by the low-level C<Amanda::Xfer::Dest::Taper> subclasses.  The Scribe creates
110 an instance of the appropriate subclass and supplies it with volumes from an
111 C<Amanda::Taper::Scan> object.  It calls a number of
112 C<Amanda::Taper::Scribe::Feedback> methods to indicate the status of the dump
113 process and to request permission for each additional volume.
114
115 =head1 OPERATING A SCRIBE
116
117 The C<Amanda::Taper::Scribe> constructor takes two arguments:
118 C<taperscan> and C<feedback>.  The first specifies the taper scan
119 algorithm that the Scribe should use, and the second specifies the
120 C<Feedback> object that will receive notifications from the Scribe (see
121 below).
122
123   my $scribe = Amanda::Taper::Scribe->new(
124         taperscan => $my_taperscan,
125         feedback => $my_feedback);
126
127 Once the object is in place, call its C<start> method.
128
129 =head2 START THE SCRIBE
130
131 Start the scribe's operation by calling its C<start> method.  This will invoke
132 the taperscan algorithm and scan for a volume.  The method takes two parameters:
133
134   $scribe->start(
135         dump_timestamp => $ts,
136         finished_cb => $start_finished_cb);
137
138 The timestamp will be written to each volume written by the Scribe.  The
139 C<finished_cb> will be called with a single argument - C<undef> or an error
140 message - when the Scribe is ready to start its first dump.  The Scribe is
141 "ready" when it has found a device to which it can write, although it does not
142 request permission to overwrite that volume, nor start overwriting it, until
143 the first dump begins (that is, until the first call to C<start_dump>).
144
145 =head2 SET UP A TRANSFER
146
147 Once the Scribe is started, begin transferring a dumpfile.  This is a
148 three-step process: first, get an C<Amanda::Xfer::Dest::Taper> object from the
149 Scribe, then start the transfer, and finally let the Scribe know that the
150 transfer has started.  Note that the Scribe supplies and manages the transfer
151 destination, but the transfer itself remains the responsibility of the caller.
152
153 =head3 Get a Transfer Destination
154
155 Call C<get_xfer_dest> to get the transfer element, supplying information on how
156 the dump should be split:
157
158   $xdest = $scribe->get_xfer_dest(
159         max_memory => $max_memory,
160         # .. split parameters
161         );
162
163 This method must be called after C<start> has completed, and will always return
164 a transfer element immediately.
165
166 The underlying C<Amanda::Xfer::Dest::Taper> handles device streaming
167 properly.  It uses C<max_memory> bytes of memory for this purpose.
168
169 The arguments to C<get_xfer_dest> differ for the various split methods.
170 For no splitting:
171
172   $scribe->get_xfer_dest(
173         # ...
174         split_method => 'none');
175
176 For buffering the split parts in memory:
177
178   $scribe->get_xfer_dest(
179         # ...
180         split_method => 'memory',
181         part_size => $part_size);
182
183 For buffering the split parts on disk:
184
185   $scribe->get_xfer_dest(
186         # ...
187         split_method => 'disk',
188         part_size => $part_size,
189         disk_cache_dirname => $disk_cache_dirname);
190
191 Finally, if the transfer source is capable of calling
192 C<Amanda::Xfer::Dest::Taper>'s C<cache_inform> method:
193
194   $scribe->get_xfer_dest(
195         # ...
196         split_method => 'cache_inform',
197         part_size => $part_size);
198
199 An C<Amanda::Taper::Scribe> object can only run one transfer at a time, so
200 do not call C<get_xfer_dest> until the C<dump_cb> for the previous C<start_dump>
201 has been called.
202
203 =head3 Start the Transfer
204
205 Armed with the element returned by C<get_xfer_dest>, the caller should create a
206 source element and a transfer object and start the transfer.  In order to
207 manage the splitting process, the Scribe needs to be informed, via its
208 C<handle_xmsg> method, of all transfer messages .  This is usually accomplished
209 with something like:
210
211   $xfer->start(sub {
212       my ($src, $msg, $xfer) = @_;
213       $scribe->handle_xmsg($src, $msg, $xfer);
214   });
215
216 =head3 Inform the Scribe
217
218 Once the transfer has started, the Scribe is ready to begin writing parts to
219 the volume.  This is the first moment at which the Scribe needs a header, too.
220 All of this is supplied to the C<start_dump> method:
221
222   $scribe->start_dump(
223       xfer => $xfer,
224       dump_header => $hdr,
225       dump_cb => $dump_cb);
226
227 The c<dump_header> here is the header that will be applied to all parts of the
228 dumpfile.  The only field in the header that the Scribe controls is the part
229 number.  The C<dump_cb> callback passed to C<start_dump> is called when the
230 dump is completely finished - either successfully or with a fatal error.
231 Unlike most callbacks, this one takes keyword arguments, since it has so many
232 parameters.
233
234   $dump_cb->(
235         result => $result,
236         device_errors => $device_errors,
237         size => $size,
238         duration => $duration,
239         total_duration => $total_duration);
240
241 All parameters will be present on every call.
242
243 The C<result> is one of C<"FAILED">, C<"PARTIAL">, or C<"DONE">.  Even when
244 C<dump_cb> reports a fatal error, C<result> may be C<"PARTIAL"> if some data
245 was written successfully.
246
247 The final parameters, C<size> (in bytes), C<duration>, and C<total_duration>
248 (in seconds) describe the total transfer, and are a sum of all of the parts
249 written to the device.  Note that C<duration> does not include time spent
250 operating the changer, while C<total_duration> reflects the time from the
251 C<start_dump> call to the invocation of the C<dump_cb>.
252
253 TODO: cancel_dump
254
255 =head2 QUIT
256
257 When all of the dumpfiles are transferred, call the C<quit> method to
258 release any resources and clean up.  This method takes a typical
259 C<finished_cb>.
260
261   $scribe->quit(finished_cb => sub {
262     print "ALL DONE!\n";
263   });
264
265 =head2 GET_BYTES_WRITTEN
266
267 The C<get_bytes_written> returns the number of bytes written to the device at
268 the time of the call, and is meant to be used for status reporting.  This value
269 is updated at least as each part is finished; for some modes of operation, it
270 is updated continuously.  Notably, DirectTCP transfers do not update
271 continuously.
272
273 =head1 FEEDBACK
274
275 The C<Amanda::Taper::Scribe::Feedback> class is intended to be
276 subclassed by the user.  It provides a number of notification methods
277 that enable the historical logging and driver/taper interactions
278 required by Amanda.  The parent class does nothing of interest, but
279 allows subclasses to omit methods they do not need.
280
281 The C<request_volume_permission> method provides a means for the caller
282 to limit the number of volumes the Scribe consumes.  It is called as
283
284   $fb->request_volume_permission(perm_cb => $cb);
285
286 where the C<perm_cb> is a callback which expects a single argument:
287 C<undef> if permission is granted, or reason (as a string) if permission
288 is denied.  The default implementation always calls C<< perm_cb->(undef) >>.
289
290 All of the remaining methods are notifications, and do not take a
291 callback.
292
293   $fb->notif_new_tape(
294         error => $error,
295         volume_label => $volume_label);
296
297 The Scribe calls C<notif_new_tape> when a new volume is started.  If the
298 C<volume_label> is undefined, then the volume was not successfully
299 relabled, and its previous contents may still be available.  If C<error>
300 is defined, then no useful data was written to the volume.  Note that
301 C<error> and C<volume_label> may I<both> be defined if the previous
302 contents of the volume were erased, but no useful, new data was written
303 to the volume.
304
305 This method will be called exactly once for every call to
306 C<request_volume_permission> that calls C<< perm_cb->(undef) >>.
307
308   $fb->notif_part_done(
309         partnum => $partnum,
310         fileno => $fileno,
311         successful => $successful,
312         size => $size,
313         duration => $duration);
314
315 The Scribe calls C<notif_part_done> for each part written to the volume,
316 including partial parts.  If the part was not written successfully, then
317 C<successful> is false.  The C<size> is in bytes, and the C<duration> is
318 a floating-point number of seconds.  If a part fails before a new device
319 file is created, then C<fileno> may be zero.
320
321 Finally, the Scribe sends a few historically significant trace log messages
322 via C<notif_log_info>:
323
324   $fb->notif_log_info(
325         message => $message);
326
327 A typical Feedback subclass might begin like this:
328
329   package main::Feedback;
330   use base 'Amanda::Taper::Scribe::Feedback';
331
332   sub request_volume_permission {
333     my $self = shift;
334     my %params = @_;
335
336     $params{'perm_cb'}->("NO VOLUMES FOR YOU!");
337   }
338
339 =cut
340
341 sub new {
342     my $class = shift;
343     my %params = @_;
344
345     for my $rq_param qw(taperscan feedback) {
346         croak "required parameter '$rq_param' mising"
347             unless exists $params{$rq_param};
348     }
349
350     my $self = {
351         feedback => $params{'feedback'},
352         debug => $params{'debug'},
353         dump_timestamp => undef,
354         started => 0,
355
356         # device handling, and our current device and reservation
357         devhandling => Amanda::Taper::Scribe::DevHandling->new(
358             taperscan => $params{'taperscan'},
359             feedback => $params{'feedback'},
360         ),
361         reservation => undef,
362         device => undef,
363         device_size => undef,
364         device_at_eom => undef, # device still exists, but is full
365
366         # callback passed to start_dump
367         dump_cb => undef,
368
369         # information for the current dumpfile
370         dump_header => undef,
371         split_method => undef,
372         xfer => undef,
373         xdt => undef,
374         xdt_ready => undef,
375         start_part_on_xdt_ready => 0,
376         size => 0,
377         duration => 0.0,
378         dump_start_time => undef,
379         last_part_successful => 0,
380         started_writing => 0,
381         device_errors => [],
382     };
383
384     return bless ($self, $class);
385 }
386
387 sub start {
388     my $self = shift;
389     my %params = @_;
390
391     for my $rq_param qw(dump_timestamp finished_cb) {
392         croak "required parameter '$rq_param' missing"
393             unless exists $params{$rq_param};
394     }
395
396     die "scribe already started" if $self->{'started'};
397
398     $self->dbg("starting");
399     $self->{'dump_timestamp'} = $params{'dump_timestamp'};
400
401     # start up the DevHandling object, making sure we know
402     # when it's done with its startup process
403     $self->{'devhandling'}->start(finished_cb => sub {
404         $self->{'started'} = 1;
405         $params{'finished_cb'}->(@_);
406     });
407 }
408
409 sub quit {
410     my $self = shift;
411     my %params = @_;
412
413     for my $rq_param qw(finished_cb) {
414         croak "required parameter '$rq_param' mising"
415             unless exists $params{$rq_param};
416     }
417
418     $self->_log_volume_done();
419
420     # since there's little other option than to barrel on through the
421     # quitting procedure, quit() just accumulates its error messages
422     # and, if necessary, concantenates them for the finished_cb.
423     my @errors;
424
425     if ($self->{'xfer'}) {
426         die "Scribe cannot quit while a transfer is active";
427         # Supporting this would be complicated:
428         # - cancel the xfer and wait for it to complete
429         # - ensure that the taperscan not be started afterward
430         # and isn't required for normal Amanda operation.
431     }
432
433     $self->dbg("quitting");
434
435     my $cleanup_cb = make_cb(cleanup_cb => sub {
436         my ($error) = @_;
437         push @errors, $error if $error;
438
439         if (@errors == 1) {
440             $error = $errors[0];
441         } elsif (@errors > 1) {
442             $error = join("; ", @errors);
443         }
444
445         $params{'finished_cb'}->($error);
446     });
447
448     if ($self->{'reservation'}) {
449         if ($self->{'device'}) {
450             if (!$self->{'device'}->finish()) {
451                 push @errors, $self->{'device'}->error_or_status();
452             }
453         }
454
455         $self->{'reservation'}->release(finished_cb => $cleanup_cb);
456     } else {
457         $cleanup_cb->(undef);
458     }
459 }
460
461 # Get a transfer destination; does not use a callback
462 sub get_xfer_dest {
463     my $self = shift;
464     my %params = @_;
465
466     for my $rq_param qw(max_memory split_method) {
467         croak "required parameter '$rq_param' missing"
468             unless exists $params{$rq_param};
469     }
470
471     die "Scribe is not started yet" unless $self->{'started'};
472
473     $self->dbg("get_xfer_dest(split_method=$params{split_method})");
474
475     if ($params{'split_method'} ne 'none') {
476         croak("required parameter 'part_size' missing")
477             unless exists $params{'part_size'};
478     }
479
480     $self->{'split_method'} = $params{'split_method'};
481     my ($part_size, $use_mem_cache, $disk_cache_dirname) = (0, 0, undef);
482     if ($params{'split_method'} eq 'none') {
483         $part_size = 0;
484     } elsif ($params{'split_method'} eq 'memory') {
485         $part_size = $params{'part_size'};
486         $use_mem_cache = 1;
487     } elsif ($params{'split_method'} eq 'disk') {
488         $part_size = $params{'part_size'};
489         croak("required parameter 'disk_cache_dirname' missing")
490             unless exists $params{'disk_cache_dirname'};
491         $disk_cache_dirname = $params{'disk_cache_dirname'};
492     } elsif ($params{'split_method'} eq 'cache_inform') {
493         $part_size = $params{'part_size'};
494         $use_mem_cache = 0;
495     } else {
496         croak("invalid split_method $params{split_method}");
497     }
498
499     debug("Amanda::Taper::Scribe setting up a transfer with split method $params{split_method}");
500
501     die "not yet started"
502         unless ($self->{'dump_timestamp'});
503     die "xfer element already returned"
504         if ($self->{'xdt'});
505     die "xfer already running"
506         if ($self->{'xfer'});
507
508     $self->{'xfer'} = undef;
509     $self->{'xdt'} = undef;
510     $self->{'size'} = 0;
511     $self->{'duration'} = 0.0;
512     $self->{'dump_start_time'} = undef;
513     $self->{'last_part_successful'} = 1;
514     $self->{'started_writing'} = 0;
515     $self->{'device_errors'} = [];
516
517     # set the callback
518     $self->{'dump_cb'} = undef;
519
520     # to build an xfer destination, we need a device, although we don't necessarily
521     # need permission to write to it yet.  So we can either use a device we already
522     # have, or we "peek" at the DevHandling object's device.
523     my $xdt_first_dev;
524     if (defined $self->{'device'}) {
525         $xdt_first_dev = $self->{'device'};
526     } else {
527         $xdt_first_dev = $self->{'devhandling'}->peek_device();
528     }
529
530     if (!defined $xdt_first_dev) {
531         die "no device is available to create an xfer_dest";
532     }
533
534     # set the device to verbose logging if we're in debug mode
535     if ($self->{'debug'}) {
536         $xdt_first_dev->property_set("verbose", 1);
537     }
538
539     my $use_directtcp = $xdt_first_dev->directtcp_supported();
540
541     my $xdt;
542     if ($use_directtcp) {
543         # note: using the current configuration scheme, the user must specify either
544         # a disk cache or a fallback_splitsize in order to split a directtcp dump; the
545         # fix is to use a better set of config params for splitting
546         $xdt = Amanda::Xfer::Dest::Taper::DirectTCP->new(
547             $xdt_first_dev, $part_size);
548         $self->{'xdt_ready'} = 0; # xdt isn't ready until we get XMSG_READY
549     } else {
550         $xdt = Amanda::Xfer::Dest::Taper::Splitter->new(
551             $xdt_first_dev, $params{'max_memory'}, $part_size,
552             $use_mem_cache, $disk_cache_dirname);
553         $self->{'xdt_ready'} = 1; # xdt is ready immediately
554     }
555     $self->{'start_part_on_xdt_ready'} = 0;
556     $self->{'xdt'} = $xdt;
557
558     return $xdt;
559 }
560
561 sub start_dump {
562     my $self = shift;
563     my %params = @_;
564
565     die "no xfer dest set up; call get_xfer_dest first"
566         unless defined $self->{'xdt'};
567
568     # get the header ready for writing (totalparts was set by the caller)
569     $self->{'dump_header'} = $params{'dump_header'};
570     $self->{'dump_header'}->{'partnum'} = 1;
571
572     # set up the dump_cb for when this dump is done, and keep the xfer
573     $self->{'dump_cb'} = $params{'dump_cb'};
574     $self->{'xfer'} = $params{'xfer'};
575     $self->{'dump_start_time'} = time;
576
577     # and start the part
578     $self->_start_part();
579 }
580
581 sub cancel_dump {
582     my $self = shift;
583     my %params = @_;
584
585     die "no xfer dest set up; call get_xfer_dest first"
586         unless defined $self->{'xdt'};
587
588     # set up the dump_cb for when this dump is done, and keep the xfer
589     $self->{'dump_cb'} = $params{'dump_cb'};
590     $self->{'xfer'} = $params{'xfer'};
591
592     # The cancel should call dump_cb, but the xfer stay hanged in accept.
593     # That's why dump_cb is called and xdt and xfer are set to undef.
594     $self->{'xfer'}->cancel();
595
596     $self->{'dump_cb'}->(
597         result => "FAILED",
598         device_errors => [],
599         size => 0,
600         duration => 0.0,
601         total_duration => 0);
602     $self->{'xdt'} = undef;
603     $self->{'xfer'} = undef;
604 }
605
606 sub get_bytes_written {
607     my ($self) = @_;
608
609     if (defined $self->{'xdt'}) {
610         return $self->{'size'} + $self->{'xdt'}->get_part_bytes_written();
611     } else {
612         return $self->{'size'};
613     }
614 }
615
616 sub _start_part {
617     my $self = shift;
618
619     $self->dbg("trying to start part");
620
621     # if the xdt isn't ready yet, wait until it is; note that the XDT is still
622     # using the device right now, so we can't even label it yet.
623     if (!$self->{'xdt_ready'}) {
624         $self->dbg("XDT not ready yet; waiting until it is");
625         $self->{'start_part_on_xdt_ready'} = 1;
626         return
627     }
628
629     # we need an actual, permitted device at this point, so if we don't have
630     # one, then defer this start_part call until we do.  The device may still
631     # exist, but be at EOM, if the last dump failed at EOM and was not retried
632     # on a new volume.
633     if (!$self->{'device'} or $self->{'device_at_eom'}) {
634         # _get_new_volume calls _start_part when it has a new volume in hand
635         return $self->_get_new_volume();
636     }
637
638     # if the dump wasn't successful, and we're not splitting, then bail out.  It's
639     # up to higher-level components to re-try this dump on a new volume, if desired.
640     # Note that this should be caught in the XMSG_PART_DONE handler -- this is just
641     # here for backup.
642     if (!$self->{'last_part_successful'} and $self->{'split_method'} eq 'none') {
643         $self->_operation_failed("No space left on device (uncaught)");
644         return;
645     }
646
647     # and start writing this part
648     $self->{'started_writing'} = 1;
649     $self->dbg("resuming transfer");
650     $self->{'xdt'}->start_part(!$self->{'last_part_successful'},
651                                $self->{'dump_header'});
652 }
653
654 sub handle_xmsg {
655     my $self = shift;
656     my ($src, $msg, $xfer) = @_;
657
658     if ($msg->{'type'} == $XMSG_DONE) {
659         $self->_xmsg_done($src, $msg, $xfer);
660         return;
661     }
662
663     # for anything else we only pay attention to messages from
664     # our own element
665     if ($msg->{'elt'} == $self->{'xdt'}) {
666         $self->dbg("got msg from xfer dest: $msg");
667         if ($msg->{'type'} == $XMSG_PART_DONE) {
668             $self->_xmsg_part_done($src, $msg, $xfer);
669         } elsif ($msg->{'type'} == $XMSG_READY) {
670             $self->_xmsg_ready($src, $msg, $xfer);
671         } elsif ($msg->{'type'} == $XMSG_ERROR) {
672             $self->_xmsg_error($src, $msg, $xfer);
673         }
674     }
675 }
676
677 sub _xmsg_part_done {
678     my $self = shift;
679     my ($src, $msg, $xfer) = @_;
680
681         # this handles successful zero-byte parts as a special case - they
682         # are an implementation detail of the splitting done by the transfer
683         # destination.
684
685     if ($msg->{'successful'} and $msg->{'size'} == 0) {
686         $self->dbg("not notifying for empty, successful part");
687     } else {
688         # double-check partnum
689         die "Part numbers do not match!"
690             unless ($self->{'dump_header'}->{'partnum'} == $msg->{'partnum'});
691
692         # notify
693         $self->{'feedback'}->notif_part_done(
694             partnum => $msg->{'partnum'},
695             fileno => $msg->{'fileno'},
696             successful => $msg->{'successful'},
697             size => $msg->{'size'},
698             duration => $msg->{'duration'});
699     }
700
701     $self->{'last_part_successful'} = $msg->{'successful'};
702
703     if ($msg->{'successful'}) {
704         $self->{'device_size'} += $msg->{'size'};
705         $self->{'size'} += $msg->{'size'};
706         $self->{'duration'} += $msg->{'duration'};
707     }
708
709     if (!$msg->{'eof'}) {
710         # update the header for the next dumpfile, if this was a non-empty part
711         if ($msg->{'successful'} and $msg->{'size'} != 0) {
712             $self->{'dump_header'}->{'partnum'}++;
713         }
714
715         if ($msg->{'eom'}) {
716             # if there's an error finishing the device, it's probably just carryover
717             # from the error the Xfer::Dest::Taper encountered while writing to the
718             # device, so we ignore it.
719             if (!$self->{'device'}->finish()) {
720                 my $devname = $self->{'device'}->device_name;
721                 my $errmsg = $self->{'device'}->error_or_status();
722                 $self->dbg("ignoring error while finishing device '$devname': $errmsg");
723             }
724
725             # if the part failed..
726             if (!$msg->{'successful'}) {
727                 # if no caching was going on, then the dump has failed
728                 if ($self->{'split_method'} eq 'none') {
729                     # mark this device as at EOM, since we are not going to look
730                     # for another one yet
731                     $self->{'device_at_eom'} = 1;
732
733                     my $msg = "No space left on device";
734                     if ($self->{'device'}->status() != $DEVICE_STATUS_SUCCESS) {
735                         $msg = $self->{'device'}->error_or_status();
736                     }
737                     $self->_operation_failed($msg);
738                     return;
739                 }
740
741                 # log a message for amreport
742                 $self->{'feedback'}->notif_log_info(
743                     message => "Will request retry of failed split part.");
744             }
745
746             # get a new volume, then go on to the next part
747             $self->_get_new_volume();
748         } else {
749             # if the part was unsuccessful, but the xfer dest has reason to believe
750             # this is not due to EOM, then the dump is done
751             if (!$msg->{'successful'}) {
752                 my $msg = "unknown error while dumping";
753                 if ($self->{'device'}->status() != $DEVICE_STATUS_SUCCESS) {
754                     $msg = $self->{'device'}->error_or_status();
755                 }
756                 $self->_operation_failed($msg);
757                 return;
758             }
759
760             # no EOM -- go on to the next part
761             $self->_start_part();
762         }
763     }
764 }
765
766 sub _xmsg_ready {
767     my $self = shift;
768     my ($src, $msg, $xfer) = @_;
769
770     $self->dbg("XDT is ready");
771     $self->{'xdt_ready'} = 1;
772     if ($self->{'start_part_on_xdt_ready'}) {
773         $self->{'start_part_on_xdt_ready'} = 0;
774         $self->_start_part();
775     }
776 }
777
778 sub _xmsg_error {
779     my $self = shift;
780     my ($src, $msg, $xfer) = @_;
781
782     # XMSG_ERROR from the XDT is always fatal
783     $self->_operation_failed($msg->{'message'});
784 }
785
786 sub _xmsg_done {
787     my $self = shift;
788     my ($src, $msg, $xfer) = @_;
789
790     if ($msg->{'type'} == $XMSG_DONE) {
791         $self->dbg("transfer is complete");
792         $self->_dump_done();
793     }
794 }
795
796 sub _dump_done {
797     my $self = shift;
798
799     my $result;
800
801     # determine the correct final status - DONE if we're done, PARTIAL
802     # if we've started writing to the volume, otherwise FAILED
803     if (@{$self->{'device_errors'}}) {
804         $result = $self->{'started_writing'}? 'PARTIAL' : 'FAILED';
805     } else {
806         $result = 'DONE';
807     }
808
809     my $dump_cb = $self->{'dump_cb'};
810     my %dump_cb_args = (
811         result => $result,
812         device_errors => $self->{'device_errors'},
813         size => $self->{'size'},
814         duration => $self->{'duration'},
815         total_duration => time - $self->{'dump_start_time'});
816
817     # reset everything and let the original caller know we're done
818     $self->{'xfer'} = undef;
819     $self->{'xdt'} = undef;
820     $self->{'dump_header'} = undef;
821     $self->{'dump_cb'} = undef;
822     $self->{'size'} = 0;
823     $self->{'duration'} = 0.0;
824     $self->{'dump_start_time'} = undef;
825     $self->{'device_errors'} = [];
826
827     # and call the callback
828     $dump_cb->(%dump_cb_args);
829 }
830
831 sub _operation_failed {
832     my $self = shift;
833     my ($error) = @_;
834
835     $self->dbg("operation failed: $error");
836
837     push @{$self->{'device_errors'}}, $error;
838
839     # cancelling the xdt will eventually cause an XMSG_DONE, which will notice
840     # the error and set the result correctly; but if there's no xfer, then we
841     # can just call _dump_done directly.
842     if (defined $self->{'xfer'}) {
843         $self->dbg("cancelling the transfer: $error");
844
845         $self->{'xfer'}->cancel();
846     } else {
847         if (defined $self->{'dump_cb'}) {
848             # _dump_done uses device_errors, set above
849             $self->_dump_done();
850         } else {
851             die "error with no callback to handle it: $error";
852         }
853     }
854 }
855
856 sub _log_volume_done {
857     my $self = shift;
858
859     # if we've already written a volume, log it
860     if ($self->{'device'} and defined $self->{'device'}->volume_label) {
861         my $label = $self->{'device'}->volume_label();
862         my $fm = $self->{'device'}->file();
863         my $kb = $self->{'device_size'} / 1024;
864
865         # log a message for amreport
866         $self->{'feedback'}->notif_log_info(
867             message => "tape $label kb $kb fm $fm [OK]");
868     }
869 }
870
871 # invoke the devhandling to get a new device, with all of the requisite
872 # notifications and checks and whatnot.  On *success*, call _start_dump; on
873 # failure, call other appropriate methods.
874 sub _get_new_volume {
875     my $self = shift;
876
877     $self->_log_volume_done();
878     $self->{'device'} = undef;
879     $self->{'device_at_eom'} = 0;
880
881     # release first, if necessary
882     if ($self->{'reservation'}) {
883         my $res = $self->{'reservation'};
884
885         $self->{'reservation'} = undef;
886         $self->{'device'} = undef;
887
888         $res->release(finished_cb => sub {
889             my ($error) = @_;
890
891             if ($error) {
892                 $self->_operation_failed($error);
893             } else {
894                 $self->_get_new_volume();
895             }
896         });
897
898         return;
899     }
900
901     $self->{'devhandling'}->get_volume(volume_cb => sub { $self->_volume_cb(@_); });
902 }
903
904 sub _volume_cb  {
905     my $self = shift;
906     my ($scan_error, $request_denied_reason, $reservation,
907         $new_label, $access_mode, $is_new) = @_;
908
909     # note that we prefer the request_denied_reason over the scan error.  If
910     # both occurred, then the results of the scan are immaterial -- we
911     # shouldn't have been looking for a new volume anyway.
912
913     if ($request_denied_reason) {
914         $self->_operation_failed($request_denied_reason);
915         return;
916     }
917
918     if ($scan_error) {
919         # we had permission to use a tape, but didn't find a tape, so we need
920         # to notify of such
921         $self->{'feedback'}->notif_new_tape(
922             error => $scan_error,
923             volume_label => undef);
924
925         $self->_operation_failed($scan_error);
926         return;
927     }
928
929     $self->dbg("got new volume; writing new label");
930
931     # from here on, if an error occurs, we must send notif_new_tape, and look
932     # for a new volume
933     $self->{'reservation'} = $reservation;
934     $self->{'device_size'} = 0;
935     my $device = $self->{'device'} = $reservation->{'device'};
936
937     # turn on verbose logging now, if we need it
938     if ($self->{'debug'}) {
939         $reservation->{'device'}->property_set("verbose", 1);
940     }
941
942     # read the label once, to get a "before" snapshot (see below)
943     my $old_label;
944     my $old_timestamp;
945     if (!$is_new) {
946         if (($device->status & ~$DEVICE_STATUS_VOLUME_UNLABELED)
947             && !($device->status & $DEVICE_STATUS_VOLUME_UNLABELED)) {
948             $self->{'feedback'}->notif_new_tape(
949                 error => "while reading label on new volume: " . $device->error_or_status(),
950                 volume_label => undef);
951
952             return $self->_get_new_volume();
953         }
954         $old_label = $device->volume_label;
955         $old_timestamp = $device->volume_time;
956     }
957
958     # inform the xdt about this new device before starting it
959     $self->{'xdt'}->use_device($device);
960
961     if (!$device->start($access_mode, $new_label, $self->{'dump_timestamp'})) {
962         # try reading the label to see whether we erased the tape
963         my $erased = 0;
964         CHECK_READ_LABEL: {
965             # don't worry about erasing new tapes
966             if ($is_new) {
967                 last CHECK_READ_LABEL;
968             }
969
970             $device->read_label();
971
972             # does the device think something is broken now?
973             if (($device->status & ~$DEVICE_STATUS_VOLUME_UNLABELED)
974                 and !($device->status & $DEVICE_STATUS_VOLUME_UNLABELED)) {
975                 $erased = 1;
976                 last CHECK_READ_LABEL;
977             }
978
979             # has the label changed?
980             my $vol_label = $device->volume_label;
981             if ((!defined $old_label and defined $vol_label)
982                 or (defined $old_label and !defined $vol_label)
983                 or (defined $old_label and $old_label ne $vol_label)) {
984                 $erased = 1;
985                 last CHECK_READ_LABEL;
986             }
987
988             # has the timestamp changed?
989             my $vol_timestamp = $device->volume_time;
990             if ((!defined $old_timestamp and defined $vol_timestamp)
991                 or (defined $old_timestamp and !defined $vol_timestamp)
992                 or (defined $old_timestamp and $old_timestamp ne $vol_timestamp)) {
993                 $erased = 1;
994                 last CHECK_READ_LABEL;
995             }
996         }
997
998         $self->{'feedback'}->notif_new_tape(
999             error => "while labeling new volume: " . $device->error_or_status(),
1000             volume_label => $erased? $new_label : undef);
1001
1002         return $self->_get_new_volume();
1003     }
1004
1005     # success!
1006     $self->{'feedback'}->notif_new_tape(
1007         error => undef,
1008         volume_label => $new_label);
1009
1010     # notify the changer that we've labeled the tape, and start the part.
1011     my $label_set_cb = make_cb(label_set_cb => sub {
1012         my ($err) = @_;
1013         if ($err) {
1014             $self->{'feedback'}->notif_log_info(
1015                 message => "Error from set_label: $err");
1016             # fall through to start_part anyway...
1017         }
1018         return $self->_start_part();
1019     });
1020     $self->{'reservation'}->set_label(label => $new_label,
1021         finished_cb => $label_set_cb);
1022 }
1023
1024 sub dbg {
1025     my ($self, $msg) = @_;
1026     if ($self->{'debug'}) {
1027         debug("Amanda::Taper::Scribe: $msg");
1028     }
1029 }
1030
1031 ##
1032 ## Feedback
1033 ##
1034
1035 package Amanda::Taper::Scribe::Feedback;
1036
1037 # request permission to use a volume.
1038 #
1039 # $params{'perm_cb'} - callback taking one argument: an error message or 'undef'
1040 sub request_volume_permission {
1041     my $self = shift;
1042     my %params = @_;
1043
1044     # sure, you can have as many volumes as you want!
1045     $params{'perm_cb'}->(undef);
1046 }
1047
1048 sub notif_new_tape { }
1049 sub notif_part_done { }
1050 sub notif_log_info { }
1051
1052 ##
1053 ## Device Handling
1054 ##
1055
1056 package Amanda::Taper::Scribe::DevHandling;
1057
1058 # This class handles scanning for volumes, requesting permission for those
1059 # volumes (the driver likes to feel like it's in control), and providing those
1060 # volumes to the scribe on request.  These can all happen independently, but
1061 # the scribe cannot begin writing to a volume until all three have finished.
1062 # That is: the scan is finished, the driver has given its permission, and the
1063 # scribe has requested a volume.
1064 #
1065 # On start, the class starts scanning immediately, even though the scribe has
1066 # not requested a volume.  Subsequently, a new scan does not begin until the
1067 # scribe requests a volume.
1068 #
1069 # This class is "private" to Amanda::Taper::Scribe, so it is documented in
1070 # comments, rather than POD.
1071
1072 # Create a new DevHandling object.  Params are taperscan and feedback.
1073 sub new {
1074     my $class = shift;
1075     my %params = @_;
1076
1077     my $self = {
1078         taperscan => $params{'taperscan'},
1079         feedback => $params{'feedback'},
1080
1081         # is a scan currently running, or completed?
1082         scan_running => 0,
1083         scan_finished => 0,
1084         scan_error => undef,
1085
1086         # scan results
1087         reservation => undef,
1088         device => undef,
1089         volume_label => undef,
1090
1091         # requests for permissiont to use a new volume
1092         request_pending => 0,
1093         request_complete => 0,
1094         request_denied_reason => undef,
1095
1096         volume_cb => undef, # callback for get_volume
1097         start_finished_cb => undef, # callback for start
1098     };
1099
1100     return bless ($self, $class);
1101 }
1102
1103 ## public methods
1104
1105 # Called at scribe startup, this starts the instance off with a scan.
1106 sub start {
1107     my $self = shift;
1108     my %params = @_;
1109
1110     $self->{'start_finished_cb'} = $params{'finished_cb'};
1111     $self->_start_scanning();
1112 }
1113
1114 # Get an open, started device and label to start writing to.  The
1115 # volume_callback takes the following arguments:
1116 #   $scan_error -- error message, or undef if no error occurred
1117 #   $request_denied_reason -- reason volume request was denied, or undef
1118 #   $reservation -- Amanda::Changer reservation
1119 #   $device -- open, started device
1120 # It is the responsibility of the caller to close the device and release the
1121 # reservation when finished.  If $scan_error or $request_denied_reason are
1122 # defined, then $reservation and $device will be undef.
1123 sub get_volume {
1124     my $self = shift;
1125     my (%params) = @_;
1126
1127     die "already processing a volume request"
1128         if ($self->{'volume_cb'});
1129
1130     $self->{'volume_cb'} = $params{'volume_cb'};
1131
1132     # kick off the relevant processes, if they're not already running
1133     $self->_start_scanning();
1134     $self->_start_request();
1135
1136     $self->_maybe_callback();
1137 }
1138
1139 # take a peek at the device we have, for which permission has not yet been
1140 # granted.  This will be undefined before the taperscan completes AND after
1141 # the volume_cb has been called.
1142 sub peek_device {
1143     my $self = shift;
1144
1145     return $self->{'device'};
1146 }
1147
1148 ## private methods
1149
1150 sub _start_scanning {
1151     my $self = shift;
1152
1153     return if $self->{'scan_running'} or $self->{'scan_finished'};
1154
1155     $self->{'scan_running'} = 1;
1156
1157     $self->{'taperscan'}->scan(result_cb => sub {
1158         my ($error, $reservation, $volume_label, $access_mode, $is_new) = @_;
1159
1160         $self->{'scan_running'} = 0;
1161         $self->{'scan_finished'} = 1;
1162
1163         if ($error) {
1164             $self->{'scan_error'} = $error;
1165         } else {
1166             $self->{'reservation'} = $reservation;
1167             $self->{'device'} = $reservation->{'device'};
1168             $self->{'volume_label'} = $volume_label;
1169             $self->{'access_mode'} = $access_mode;
1170             $self->{'is_new'} = $access_mode;
1171         }
1172
1173         if (!$error and $is_new) {
1174             $self->{'feedback'}->notif_log_info(
1175                 message => "Will write new label `$volume_label' to new tape");
1176         }
1177
1178         $self->_maybe_callback();
1179     });
1180 }
1181
1182 sub _start_request {
1183     my $self = shift;
1184
1185     return if $self->{'request_pending'} or $self->{'request_complete'};
1186
1187     $self->{'request_pending'} = 1;
1188
1189     $self->{'feedback'}->request_volume_permission(perm_cb => sub {
1190         my ($refusal_reason) = @_;
1191
1192         $self->{'request_pending'} = 0;
1193         $self->{'request_complete'} = 1;
1194         $self->{'request_denied_reason'} = $refusal_reason;
1195
1196         $self->_maybe_callback();
1197     });
1198 }
1199
1200 sub _maybe_callback {
1201     my $self = shift;
1202
1203     # if we have any kind of error, release the reservation and come back
1204     # later
1205     if (($self->{'scan_error'} or $self->{'request_denied_reason'}) and $self->{'reservation'}) {
1206         $self->{'device'} = undef;
1207
1208         $self->{'reservation'}->release(finished_cb => sub {
1209             my ($error) = @_;
1210
1211             # so many errors, so little time..
1212             if ($error) {
1213                 if ($self->{'scan_error'}) {
1214                     warning("ignoring error releasing reservation ($error) after a scan error");
1215                 } else {
1216                     $self->{'scan_error'} = $error;
1217                 }
1218             }
1219
1220             $self->{'reservation'} = undef;
1221             $self->_maybe_callback();
1222         });
1223
1224         return;
1225     }
1226
1227     # if we are just starting up, call the finished_cb given to start()
1228     if (defined $self->{'start_finished_cb'} and $self->{'scan_finished'}) {
1229         my $cb = $self->{'start_finished_cb'};
1230         $self->{'start_finished_cb'} = undef;
1231
1232         $cb->($self->{'scan_error'});
1233     }
1234
1235     # if the volume_cb is good to get called, call it and reset to the ground state
1236     if ($self->{'volume_cb'} and $self->{'scan_finished'} and $self->{'request_complete'}) {
1237         # get the cb and its arguments lined up before calling it..
1238         my $volume_cb = $self->{'volume_cb'};
1239         my @volume_cb_args = (
1240             $self->{'scan_error'},
1241             $self->{'request_denied_reason'},
1242             $self->{'reservation'},
1243             $self->{'volume_label'},
1244             $self->{'access_mode'},
1245             $self->{'is_new'},
1246         );
1247
1248         # reset everything and prepare for a new scan
1249         $self->{'scan_finished'} = 0;
1250
1251         $self->{'reservation'} = undef;
1252         $self->{'device'} = undef;
1253         $self->{'volume_label'} = undef;
1254
1255         $self->{'request_complete'} = 0;
1256         $self->{'volume_cb'} = undef;
1257
1258         $volume_cb->(@volume_cb_args);
1259     }
1260 }
1261
1262 1;