Imported Upstream version 3.3.2
[debian/amanda] / perl / Amanda / Taper / Scribe.pm
1 # Copyright (c) 2009-2012 Zmanda, Inc.  All Rights Reserved.
2 #
3 # This library is free software; you can redistribute it and/or modify it
4 # under the terms of the GNU Lesser General Public License version 2.1 as
5 # published by the Free Software Foundation.
6 #
7 # This library is distributed in the hope that it will be useful, but
8 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
9 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
10 # License for more details.
11 #
12 # You should have received a copy of the GNU Lesser General Public License
13 # along with this library; if not, write to the Free Software Foundation,
14 # Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA.
15 #
16 # Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
17 # Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
18
19 =head1 NAME
20
21 Amanda::Taper::Scribe
22
23 =head1 SYNOPSIS
24
25   step start_scribe => sub {
26       my $scribe = Amanda::Taper::Scribe->new(
27             taperscan => $taperscan_algo,
28             feedback => $feedback_obj);
29     $scribe->start(
30         write_timestamp => $write_timestamp,
31         finished_cb => $steps->{'start_xfer'});
32   };
33
34   step start_xfer => sub {
35     my ($err) = @_;
36     my $xfer_dest = $scribe->get_xfer_dest(
37         allow_split => 1,
38         max_memory => 64 * 1024,
39         can_cache_inform => 0,
40         part_size => 150 * 1024**2,
41         part_cache_type => 'disk',
42         part_cache_dir => "$tmpdir/splitbuffer",
43         part_cache_max_size => 20 * 1024**2);
44     # .. set up the rest of the transfer ..
45     $xfer->start(sub {
46         my ($src, $msg, $xfer) = @_;
47         $scribe->handle_xmsg($src, $msg, $xfer);
48         # .. any other processing ..
49     };
50     # tell the scribe to start dumping via this transfer
51     $scribe->start_dump(
52         xfer => $xfer,
53         dump_header => $hdr,
54         dump_cb => $steps->{'dump_cb'});
55   };
56
57   step dump_cb => sub {
58       my %params = @_;
59       # .. handle dump results ..
60       print "DONE\n";
61       $finished_cb->();
62   };
63
64
65 =head1 OVERVIEW
66
67 This package provides a high-level abstraction of Amanda's procedure for
68 writing dumpfiles to tape.
69
70 Amanda writes a sequence of dumpfiles to a sequence of volumes.  The
71 volumes are supplied by a taperscan algorithm, which operates a changer
72 to find and load each volume.  As dumpfiles are written to volumes and
73 those volumes fill up, the taperscan algorithm supplies additional
74 volumes.
75
76 In order to reduce internal fragmentation within volumes, Amanda can "split"
77 dumpfiles into smaller pieces, so that the overall dumpfile can span multiple
78 volumes.  Each "part" is written to the volume in sequence.  If a device
79 encounters an error while writing a part, then that part is considered
80 "partial", and is rewritten from its beginning on the next volume.  Some
81 devices can reliably indicate that they are full (EOM), and for these devices
82 parts are simply truncated, and the Scribe starts the next part on the next
83 volume.
84
85 To facilitate rewriting parts on devices which cannot indicate EOM, Amanda must
86 retain all of the data in a part, even after that data is written to the
87 volume.  The Scribe provides several methods to support this: caching the part
88 in memory, caching the part in a special on-disk file, or relying on
89 pre-existing on-disk storage.  The latter method is used when reading from
90 holding disks.
91
92 The details of efficiently splitting dumpfiles and rewriting parts are handled
93 by the low-level C<Amanda::Xfer::Dest::Taper> subclasses.  The Scribe creates
94 an instance of the appropriate subclass and supplies it with volumes from an
95 C<Amanda::Taper::Scan> object.  It calls a number of
96 C<Amanda::Taper::Scribe::Feedback> methods to indicate the status of the dump
97 process and to request permission for each additional volume.
98
99 =head1 OPERATING A SCRIBE
100
101 The C<Amanda::Taper::Scribe> constructor takes two arguments:
102 C<taperscan> and C<feedback>.  The first specifies the taper scan
103 algorithm that the Scribe should use, and the second specifies the
104 C<Feedback> object that will receive notifications from the Scribe (see
105 below).
106
107   my $scribe = Amanda::Taper::Scribe->new(
108         taperscan => $my_taperscan,
109         feedback => $my_feedback);
110
111 Once the object is in place, call its C<start> method.
112
113 =head2 START THE SCRIBE
114
115 Start the scribe's operation by calling its C<start> method.  This will invoke
116 the taperscan algorithm and scan for a volume.  The method takes two parameters:
117
118   $scribe->start(
119         write_timestamp => $ts,
120         finished_cb => $start_finished_cb);
121
122 The timestamp will be written to each volume written by the Scribe.  The
123 C<finished_cb> will be called with a single argument - C<undef> or an error
124 message - when the Scribe is ready to start its first dump.  The Scribe is
125 "ready" when it has found a device to which it can write, although it does not
126 request permission to overwrite that volume, nor start overwriting it, until
127 the first dump begins (that is, until the first call to C<start_dump>).
128
129 =head2 SET UP A TRANSFER
130
131 Once the Scribe is started, begin transferring a dumpfile.  This is a
132 three-step process: first, get an C<Amanda::Xfer::Dest::Taper> object from the
133 Scribe, then start the transfer, and finally let the Scribe know that the
134 transfer has started.  Note that the Scribe supplies and manages the transfer
135 destination, but the transfer itself remains the responsibility of the caller.
136
137 =head3 Get device
138
139 Call C<get_device> to get the first device the xfer will be working with.
140
141   $device = $scribe->get_device();
142
143 This method must be called after C<start> has completed.
144
145 =head3 Check device compatibily for the data path
146
147 Call C<check_data_path>, supplying the data_path requested by the user.
148
149   if (my $err = $scribe->check_data_path($data_path)) {
150       # handle error message
151   }
152
153 This method must be called after C<start> has completed and before
154 C<get_xfer_dest> is called. It returns C<undef> on success or an error message
155 if the supplied C<data_path> is incompatible with the device.  This is mainly
156 used to detect when a DirectTCP dump is going to a non-DirectTCP device.
157
158 =head3 Get a Transfer Destination
159
160 Call C<get_xfer_dest> to get the transfer element, supplying information on how
161 the dump should be split:
162
163   $xdest = $scribe->get_xfer_dest(
164         allow_split => $allow_split,
165         max_memory => $max_memory,
166         # .. splitting parameters
167         );
168
169 This method must be called after C<start> has completed, and will always return
170 a transfer element immediately.  The underlying C<Amanda::Xfer::Dest::Taper>
171 handles device streaming properly.  It uses C<max_memory> bytes of memory for
172 this purpose.
173
174 The splitting parameters to C<get_xfer_dest> are:
175
176 =over 4
177
178 =item C<allow_split>
179
180 this dle is allowed or not to split
181
182 =item C<part_size>
183
184 the split part size to use, or 0 for no splitting
185
186 =item C<part_cache_type>
187
188 when caching, the kind of caching to perform ('disk', 'memory' or the default,
189 'none')
190
191 =item C<part_cache_dir>
192
193 the directory to use for disk caching
194
195 =item C<part_cache_max_size>
196
197 the maximum part size to use when caching
198
199 =item C<can_cache_inform>
200
201 true if the transfer source can call the destination's C<cache_inform> method
202 (e.g., C<Amanda::Xfer::Source::Holding>).
203
204 =back
205
206 The first four of these parameters correspond exactly to the eponymous tapetype
207 configuration parameters, and have the same default values (when omitted or
208 C<undef>).  The method will take this information, along with details of the
209 device it intends to use, and set up the transfer destination.
210
211 The utility function C<get_splitting_args_from_config> can determine the
212 appropriate C<get_xfer_dest> splitting parameters based on a
213 few Amanda configuration parameters.  If a parameter was not seen in the
214 configuration, it should be omitted or passed as C<undef>.  The function
215 returns a hash to pass to C<get_xfer_dest>, although that hash may have an
216 C<warning> key containing a message if there is a problem that the user
217 should know about.
218
219   use Amanda::Taper::Scribe qw( get_splitting_args_from_config );
220   my %splitting_args = get_splitting_args_from_config(
221     # Amanda dumptype configuration parameters,
222     dle_allow_split => ..,
223     dle_tape_splitsize => ..,
224     dle_split_diskbuffer => ..,
225     dle_fallback_splitsize => ..,
226     dle_allow_split => ..,
227     # Amanda tapetype configuration parameters,
228     part_size => .., ## in bytes, not kb!!
229     part_size_kb => ..., ## or use this, in kb
230     part_cache_type => ..,
231     part_cache_type_enum => ..., ## one of the enums from tapetype_getconf
232     part_cache_dir => ..,
233     part_cache_max_size => ..,
234   );
235   if ($splitting_args{'error'}) { .. }
236
237 An C<Amanda::Taper::Scribe> object can only run one transfer at a time, so
238 do not call C<get_xfer_dest> until the C<dump_cb> for the previous C<start_dump>
239 has been called.
240
241 =head3 Start the Transfer
242
243 Armed with the element returned by C<get_xfer_dest>, the caller should create a
244 source element and a transfer object and start the transfer.  In order to
245 manage the splitting process, the Scribe needs to be informed, via its
246 C<handle_xmsg> method, of all transfer messages .  This is usually accomplished
247 with something like:
248
249   $xfer->start(sub {
250       my ($src, $msg, $xfer) = @_;
251       $scribe->handle_xmsg($src, $msg, $xfer);
252   });
253
254 =head3 Inform the Scribe
255
256 Once the transfer has started, the Scribe is ready to begin writing parts to
257 the volume.  This is the first moment at which the Scribe needs a header, too.
258 All of this is supplied to the C<start_dump> method:
259
260   $scribe->start_dump(
261       xfer => $xfer,
262       dump_header => $hdr,
263       dump_cb => $dump_cb);
264
265 The c<dump_header> here is the header that will be applied to all parts of the
266 dumpfile.  The only field in the header that the Scribe controls is the part
267 number.  The C<dump_cb> callback passed to C<start_dump> is called when the
268 dump is completely finished - either successfully or with a fatal error.
269 Unlike most callbacks, this one takes keyword arguments, since it has so many
270 parameters.
271
272   $dump_cb->(
273         result => $result,
274         device_errors => $device_errors,
275         config_denial_message => $cdm,
276         size => $size,
277         duration => $duration,
278         total_duration => $total_duration,
279         nparts => $nparts);
280
281 All parameters will be present on every call, although the order is not
282 guaranteed.
283
284 The C<result> is one of C<"FAILED">, C<"PARTIAL">, or C<"DONE">.  Even when
285 C<dump_cb> reports a fatal error, C<result> may be C<"PARTIAL"> if some data
286 was written successfully.
287
288 The C<device_error> key points to a list of errors, each given as a string,
289 that describe what went wrong to cause the dump to fail.  The
290 C<config_denial_message> parrots the reason provided by C<$perm_cb> (see below)
291 for denying use of a new tape if the cause was 'config', and is C<undef>
292 otherwise.
293
294 The final parameters, C<size> (in bytes), C<duration>, C<total_duration> (in
295 seconds), and C<nparts> describe the total transfer, and are a sum of all of
296 the parts written to the device.  Note that C<nparts> does not include any
297 empty trailing parts.  Note that C<duration> does not include time spent
298 operating the changer, while C<total_duration> reflects the time from the
299 C<start_dump> call to the invocation of the C<dump_cb>.
300
301 =head3 Cancelling a Dump
302
303 After you have requested a transfer destination, the scribe is poised to begin the
304 transfer.  If you cannot actually perform the transfer for some reason, you'll need
305 to go through the motions all the same, but cancel the operation immediately.  That
306 can be done by calling C<cancel_dump>:
307
308   $scribe->cancel_dump(
309         xfer => $xfer,
310         dump_cb => $dump_cb);
311
312 =head2 QUIT
313
314 When all of the dumpfiles are transferred, call the C<quit> method to
315 release any resources and clean up.  This method takes a typical
316 C<finished_cb>.
317
318   $scribe->quit(finished_cb => sub {
319     print "ALL DONE!\n";
320   });
321
322 =head2 GET_BYTES_WRITTEN
323
324 The C<get_bytes_written> returns the number of bytes written to the device at
325 the time of the call, and is meant to be used for status reporting.  This value
326 is updated at least as each part is finished; for some modes of operation, it
327 is updated continuously.  Notably, DirectTCP transfers do not update
328 continuously.
329
330 =head2 START_SCAN
331
332 The C<start_scan> method initiate a scan of the changer to find a usable tape.
333
334 =head1 FEEDBACK
335
336 The C<Amanda::Taper::Scribe::Feedback> class is intended to be
337 subclassed by the user.  It provides a number of notification methods
338 that enable the historical logging and driver/taper interactions
339 required by Amanda.  The parent class does nothing of interest, but
340 allows subclasses to omit methods they do not need.
341
342 The C<request_volume_permission> method provides a means for the caller
343 to limit the number of volumes the Scribe consumes.  It is called as
344
345   $fb->request_volume_permission(perm_cb => $cb);
346
347 The C<perm_cb> is a callback which expects a hash as arguments. If C<allow>
348 is set, then the scribe is allowed to use a new volume, if C<scribe> is set,
349 then the xfer must be transfered to that scribe, otherwise a C<cause>
350 and a C<message> describing why a new volume should not be used. must be
351 set. e.g.
352
353   perm_cb->(allow => 1);
354   perm_cb->(scribe => $new_scribe);
355   perm_cb->(cause => 'config', message => $message);
356   perm_cb->(cause => 'error', message => $message);
357
358 A cause of 'config' indicates that the denial is due to the user's
359 configuration, and thus should not be presented as an error.  The default
360 implementation always calls C<< perm_cb->() >>.
361
362 All of the remaining methods are notifications, and do not take a
363 callback.
364
365   $fb->scribe_notif_new_tape(
366         error => $error,
367         volume_label => $volume_label);
368
369 The Scribe calls C<scribe_notif_new_tape> when a new volume is started.  If the
370 C<volume_label> is undefined, then the volume was not successfully
371 relabled, and its previous contents may still be available.  If C<error>
372 is defined, then no useful data was written to the volume.  Note that
373 C<error> and C<volume_label> may I<both> be defined if the previous
374 contents of the volume were erased, but no useful, new data was written
375 to the volume.
376
377 This method will be called exactly once for every call to
378 C<request_volume_permission> that calls back with C<< perm_cb->() >>.
379
380   $fb->scribe_notif_tape_done(
381         volume_label => $volume_label,
382         size => $size,
383         num_files => $num_files);
384
385 The C<scribe_notif_tape_done> method is called after a volume is completely
386 written and its reservation has been released.  Note that the scribe waits
387 until the last possible moment to release a reservation, so this may be called
388 later than expected, e.g., during a C<quit> invocation.
389
390   $fb->scribe_notif_part_done(
391         partnum => $partnum,
392         fileno => $fileno,
393         successful => $successful,
394         size => $size,
395         duration => $duration);
396
397 The Scribe calls C<scribe_notif_part_done> for each part written to the volume,
398 including partial parts.  If the part was not written successfully, then
399 C<successful> is false.  The C<size> is in bytes, and the C<duration> is
400 a floating-point number of seconds.  If a part fails before a new device
401 file is created, then C<fileno> may be zero.
402
403 Finally, the Scribe sends a few historically significant trace log messages
404 via C<scribe_notif_log_info>:
405
406   $fb->scribe_notif_log_info(
407         message => $message);
408
409 A typical Feedback subclass might begin like this:
410
411   package main::Feedback;
412   use base 'Amanda::Taper::Scribe::Feedback';
413
414   sub request_volume_permission {
415     my $self = shift;
416     my %params = @_;
417
418     $params{'perm_cb'}->(cause => "error", message => "NO VOLUMES FOR YOU!");
419   }
420
421 =cut
422
423 package Amanda::Taper::Scribe;
424
425 use strict;
426 use warnings;
427 use Carp;
428
429 use Amanda::Xfer qw( :constants );
430 use Amanda::Device qw( :constants );
431 use Amanda::Header;
432 use Amanda::Debug qw( :logging );
433 use Amanda::MainLoop;
434 use Amanda::Tapelist;
435 use Amanda::Config qw( :getconf config_dir_relative );
436 use base qw( Exporter );
437
438 our @EXPORT_OK = qw( get_splitting_args_from_config );
439
440 sub new {
441     my $class = shift;
442     my %params = @_;
443
444     my $decide_debug = $Amanda::Config::debug_taper || $params{'debug'};
445     for my $rq_param (qw(taperscan feedback)) {
446         croak "required parameter '$rq_param' mising"
447             unless exists $params{$rq_param};
448     }
449
450     my $self = {
451         taperscan => $params{'taperscan'},
452         feedback => $params{'feedback'},
453         debug => $decide_debug,
454         eject_volume => $params{'eject_volume'},
455         write_timestamp => undef,
456         started => 0,
457
458         # device handling, and our current device and reservation
459         devhandling => Amanda::Taper::Scribe::DevHandling->new(
460             taperscan => $params{'taperscan'},
461             feedback => $params{'feedback'},
462         ),
463         reservation => undef,
464         device => undef,
465         device_size => undef,
466         device_at_eom => undef, # device still exists, but is full
467         close_volume => undef,
468
469         # callback passed to start_dump
470         dump_cb => undef,
471
472         # information for the current dumpfile
473         dump_header => undef,
474         retry_part_on_peom => undef,
475         allow_split => undef,
476         xfer => undef,
477         xdt => undef,
478         xdt_ready => undef,
479         start_part_on_xdt_ready => 0,
480         size => 0,
481         duration => 0.0,
482         dump_start_time => undef,
483         last_part_successful => 0,
484         started_writing => 0,
485         device_errors => [],
486         config_denial_message => undef,
487     };
488
489     return bless ($self, $class);
490 }
491
492 sub start {
493     my $self = shift;
494     my %params = @_;
495
496     for my $rq_param (qw(write_timestamp finished_cb)) {
497         croak "required parameter '$rq_param' missing"
498             unless exists $params{$rq_param};
499     }
500
501     confess "scribe already started" if $self->{'started'};
502
503     $self->dbg("starting");
504     $self->{'write_timestamp'} = $params{'write_timestamp'};
505
506     # start up the DevHandling object, making sure we know
507     # when it's done with its startup process
508     $self->{'devhandling'}->start(finished_cb => sub {
509         $self->{'started'} = 1;
510         $params{'finished_cb'}->(@_);
511     });
512 }
513
514 sub quit {
515     my $self = shift;
516     my %params = @_;
517
518     # since there's little other option than to barrel on through the
519     # quitting procedure, quit() just accumulates its error messages
520     # and, if necessary, concantenates them for the finished_cb.
521     my @errors;
522
523     my $steps = define_steps
524         cb_ref => \$params{'finished_cb'};
525
526     step setup => sub {
527         $self->dbg("quitting");
528
529         if ($self->{'xfer'}) {
530             confess "Scribe cannot quit while a transfer is active";
531             # Supporting this would be complicated:
532             # - cancel the xfer and wait for it to complete
533             # - ensure that the taperscan not be started afterward
534             # and isn't required for normal Amanda operation.
535         }
536
537         $steps->{'release'}->();
538     };
539
540     step release => sub {
541         if ($self->{'reservation'}) {
542             $self->_release_reservation(finished_cb => $steps->{'released'});
543         } else {
544             $steps->{'stop_devhandling'}->();
545         }
546     };
547
548     step released => sub {
549         my ($err) = @_;
550         push @errors, "$err" if $err;
551
552         $self->{'reservation'} = undef;
553
554         $steps->{'stop_devhandling'}->();
555     };
556
557     step stop_devhandling => sub {
558         $self->{'devhandling'}->quit(finished_cb => $steps->{'stopped_devhandling'});
559     };
560
561     step stopped_devhandling => sub {
562         my ($err) = @_;
563         push @errors, "$err" if $err;
564
565         my $errmsg = join("; ", @errors) if @errors >= 1;
566         $params{'finished_cb'}->($errmsg);
567     };
568 }
569
570 sub get_device {
571     my $self = shift;
572
573     # Can return a device we already have, or "peek" at the
574     # DevHandling object's device.
575     # It might not have right permission on the device.
576
577     my $device;
578     if (defined $self->{'device'}) {
579         $device = $self->{'device'};
580     } else {
581         $device = $self->{'devhandling'}->peek_device();
582     }
583     return $device;
584 }
585
586 sub check_data_path {
587     my $self = shift;
588     my $data_path = shift;
589
590     my $device = $self->get_device();
591
592     if (!defined $device) {
593         confess "no device is available to check the datapath";
594     }
595
596     my $use_directtcp = $device->directtcp_supported();
597
598     my $xdt;
599     if (!$use_directtcp) {
600         if ($data_path eq 'DIRECTTCP') {
601             return "Can't dump DIRECTTCP data-path dle to a device ('" .
602                    $device->device_name .
603                    "') that doesn't support it";
604         }
605     }
606     return undef;
607 }
608
609 sub start_scan {
610     my $self = shift;
611
612     $self->{'devhandling'}->start_scan();
613 }
614
615 # Get a transfer destination; does not use a callback
616 sub get_xfer_dest {
617     my $self = shift;
618     my %params = @_;
619
620     for my $rq_param (qw(max_memory)) {
621         croak "required parameter '$rq_param' missing"
622             unless exists $params{$rq_param};
623     }
624
625     confess "not yet started"
626         unless $self->{'write_timestamp'} and $self->{'started'};
627     confess "xfer element already returned"
628         if ($self->{'xdt'});
629     confess "xfer already running"
630         if ($self->{'xfer'});
631
632     $self->{'xfer'} = undef;
633     $self->{'xdt'} = undef;
634     $self->{'size'} = 0;
635     $self->{'duration'} = 0.0;
636     $self->{'nparts'} = undef;
637     $self->{'dump_start_time'} = undef;
638     $self->{'last_part_successful'} = 1;
639     $self->{'started_writing'} = 0;
640     $self->{'device_errors'} = [];
641     $self->{'config_denial_message'} = undef;
642
643     # set the callback
644     $self->{'dump_cb'} = undef;
645     $self->{'retry_part_on_peom'} = 1;
646     $self->{'allow_split'} = 0;
647     $self->{'start_part_on_xdt_ready'} = 0;
648
649     # start getting parameters together to determine what kind of splitting
650     # and caching we're going to do
651     my $part_size = $params{'part_size'} || 0;
652     my ($use_mem_cache, $disk_cache_dirname) = (0, undef);
653     my $can_cache_inform = $params{'can_cache_inform'};
654     my $part_cache_type = $params{'part_cache_type'} || 'none';
655     my $allow_split = $params{'allow_split'};
656
657     my $xdt_first_dev = $self->get_device();
658     if (!defined $xdt_first_dev) {
659         confess "no device is available to create an xfer_dest";
660     }
661     my $leom_supported = $xdt_first_dev->property_get("leom");
662     my $use_directtcp = $xdt_first_dev->directtcp_supported();
663
664     # figure out the destination type we'll use, based on the circumstances
665     my ($dest_type, $dest_text);
666     if ($use_directtcp) {
667         $dest_type = 'directtcp';
668         $dest_text = "using DirectTCP";
669     } elsif ($can_cache_inform && $leom_supported) {
670         $dest_type = 'splitter';
671         $dest_text = "using LEOM (falling back to holding disk as cache)";
672     } elsif ($leom_supported) {
673         $dest_type = 'splitter';
674         $dest_text = "using LEOM detection (no caching)";
675     } elsif ($can_cache_inform) {
676         $dest_type = 'splitter';
677         $dest_text = "using cache_inform";
678     } elsif ($part_cache_type ne 'none') {
679         $dest_type = 'cacher';
680
681         # we'll be caching, so apply the maximum size
682         my $part_cache_max_size = $params{'part_cache_max_size'} || 0;
683         $part_size = $part_cache_max_size
684             if ($part_cache_max_size and $part_cache_max_size < $part_size);
685
686         # and figure out what kind of caching to apply
687         if ($part_cache_type eq 'memory') {
688             $use_mem_cache = 1;
689         } else {
690             # note that we assume this has already been checked; if it's wrong,
691             # the xfer element will just fail immediately
692             $disk_cache_dirname = $params{'part_cache_dir'};
693         }
694         $dest_text = "using cache type '$part_cache_type'";
695     } else {
696         $dest_type = 'splitter';
697         $dest_text = "using no cache (PEOM will be fatal)";
698
699         # no directtcp, no caching, no cache_inform, and no LEOM, so a PEOM will be fatal
700         $self->{'retry_part_on_peom'} = 0;
701     }
702
703     if ($allow_split &&
704         ($can_cache_inform ||
705          !defined($part_cache_type) ||
706          $part_cache_type eq 'disk' ||
707          $part_cache_type eq 'memory' ||
708          $leom_supported)) {
709         $self->{'allow_split'} = 1;
710     } else {
711         $self->{'allow_split'} = 0;
712     }
713
714     $self->{'retry_part_on_peom'} = 0 if !$self->{'allow_split'};
715
716     debug("Amanda::Taper::Scribe preparing to write, part size $part_size, "
717         . "$dest_text ($dest_type) "
718         . ($leom_supported? " (LEOM supported)" : " (no LEOM)"));
719
720     # set the device to verbose logging if we're in debug mode
721     if ($self->{'debug'}) {
722         $xdt_first_dev->property_set("verbose", 1);
723     }
724
725     my $xdt;
726     if ($dest_type eq 'directtcp') {
727         $xdt = Amanda::Xfer::Dest::Taper::DirectTCP->new(
728             $xdt_first_dev, $part_size);
729         $self->{'xdt_ready'} = 0; # xdt isn't ready until we get XMSG_READY
730     } elsif ($dest_type eq 'splitter') {
731         $xdt = Amanda::Xfer::Dest::Taper::Splitter->new(
732             $xdt_first_dev, $params{'max_memory'}, $part_size, $can_cache_inform);
733         $self->{'xdt_ready'} = 1; # xdt is ready immediately
734     } else {
735         $xdt = Amanda::Xfer::Dest::Taper::Cacher->new(
736             $xdt_first_dev, $params{'max_memory'}, $part_size,
737             $use_mem_cache, $disk_cache_dirname);
738         $self->{'xdt_ready'} = 1; # xdt is ready immediately
739     }
740     $self->{'start_part_on_xdt_ready'} = 0;
741     $self->{'xdt'} = $xdt;
742
743     return $xdt;
744 }
745
746 sub start_dump {
747     my $self = shift;
748     my %params = @_;
749
750     confess "no xfer dest set up; call get_xfer_dest first"
751         unless defined $self->{'xdt'};
752
753     # get the header ready for writing (totalparts was set by the caller)
754     $self->{'dump_header'} = $params{'dump_header'};
755     $self->{'dump_header'}->{'partnum'} = 1;
756
757     # set up the dump_cb for when this dump is done, and keep the xfer
758     $self->{'dump_cb'} = $params{'dump_cb'};
759     $self->{'xfer'} = $params{'xfer'};
760     $self->{'dump_start_time'} = time;
761
762     # and start the part
763     $self->_start_part();
764 }
765
766 sub cancel_dump {
767     my $self = shift;
768     my %params = @_;
769
770     confess "no xfer dest set up; call get_xfer_dest first"
771         unless defined $self->{'xdt'};
772
773     # set up the dump_cb for when this dump is done, and keep the xfer
774     $self->{'dump_cb'} = $params{'dump_cb'};
775     $self->{'xfer'} = $params{'xfer'};
776
777     # The cancel will can dump_cb.
778
779     $self->{'xfer'}->cancel();
780
781 }
782
783 sub close_volume {
784     my $self = shift;
785
786     $self->{'close_volume'} = 1;
787 }
788
789 sub get_bytes_written {
790     my ($self) = @_;
791
792     if (defined $self->{'xdt'}) {
793         return $self->{'size'} + $self->{'xdt'}->get_part_bytes_written();
794     } else {
795         return $self->{'size'};
796     }
797 }
798
799 sub _start_part {
800     my $self = shift;
801
802     $self->dbg("trying to start part");
803
804     # if the xdt isn't ready yet, wait until it is; note that the XDT is still
805     # using the device right now, so we can't even label it yet.
806     if (!$self->{'xdt_ready'}) {
807         $self->dbg("XDT not ready yet; waiting until it is");
808         $self->{'start_part_on_xdt_ready'} = 1;
809         return
810     }
811
812     if ($self->{'close_volume'}) {
813         $self->{'close_volume'} = undef;
814         return $self->_get_new_volume();
815     }
816
817     # we need an actual, permitted device at this point, so if we don't have
818     # one, then defer this start_part call until we do.  The device may still
819     # exist, but be at EOM, if the last dump failed at EOM and was not retried
820     # on a new volume.
821     if (!$self->{'device'} or $self->{'device_at_eom'}) {
822         # _get_new_volume calls _start_part when it has a new volume in hand
823         return $self->_get_new_volume();
824     }
825
826     # if the dump wasn't successful, and we're not splitting, then bail out.  It's
827     # up to higher-level components to re-try this dump on a new volume, if desired.
828     # Note that this should be caught in the XMSG_PART_DONE handler -- this is just
829     # here for backup.
830     if (!$self->{'last_part_successful'} and !$self->{'retry_part_on_peom'}) {
831         $self->_operation_failed(device_error => "No space left on device (uncaught)");
832         return;
833     }
834
835     # and start writing this part
836     $self->{'started_writing'} = 1;
837     $self->dbg("resuming transfer");
838     $self->{'xdt'}->start_part(!$self->{'last_part_successful'},
839                                $self->{'dump_header'});
840 }
841
842 sub handle_xmsg {
843     my $self = shift;
844     my ($src, $msg, $xfer) = @_;
845
846     if ($msg->{'type'} == $XMSG_DONE) {
847         $self->_xmsg_done($src, $msg, $xfer);
848         return;
849     }
850
851     # for anything else we only pay attention to messages from
852     # our own element
853     if ($msg->{'elt'} == $self->{'xdt'}) {
854         $self->dbg("got msg from xfer dest: $msg");
855         if ($msg->{'type'} == $XMSG_PART_DONE) {
856             $self->_xmsg_part_done($src, $msg, $xfer);
857         } elsif ($msg->{'type'} == $XMSG_READY) {
858             $self->_xmsg_ready($src, $msg, $xfer);
859         } elsif ($msg->{'type'} == $XMSG_ERROR) {
860             $self->_xmsg_error($src, $msg, $xfer);
861         }
862     }
863 }
864
865 sub _xmsg_part_done {
866     my $self = shift;
867     my ($src, $msg, $xfer) = @_;
868
869     # this handles successful zero-byte parts as a special case - they
870     # are an implementation detail of the splitting done by the transfer
871     # destination.
872
873     if ($msg->{'successful'} and $msg->{'size'} == 0) {
874         $self->dbg("not notifying for empty, successful part");
875     } else {
876         # double-check partnum
877         confess "Part numbers do not match!"
878             unless ($self->{'dump_header'}->{'partnum'} == $msg->{'partnum'});
879
880         # notify
881         $self->{'feedback'}->scribe_notif_part_done(
882             partnum => $msg->{'partnum'},
883             fileno => $msg->{'fileno'},
884             successful => $msg->{'successful'},
885             size => $msg->{'size'},
886             duration => $msg->{'duration'});
887
888         # increment nparts here, so empty parts are not counted
889         $self->{'nparts'} = $msg->{'partnum'};
890     }
891
892     $self->{'last_part_successful'} = $msg->{'successful'};
893
894     if ($msg->{'successful'}) {
895         $self->{'device_size'} += $msg->{'size'};
896         $self->{'size'} += $msg->{'size'};
897         $self->{'duration'} += $msg->{'duration'};
898     }
899
900     if (!$msg->{'eof'}) {
901         # update the header for the next dumpfile, if this was a non-empty part
902         if ($msg->{'successful'} and $msg->{'size'} != 0) {
903             $self->{'dump_header'}->{'partnum'}++;
904         }
905
906         if ($msg->{'eom'}) {
907             # if there's an error finishing the device, it's probably just carryover
908             # from the error the Xfer::Dest::Taper encountered while writing to the
909             # device, so we ignore it.
910             if (!$self->{'device'}->finish()) {
911                 my $devname = $self->{'device'}->device_name;
912                 my $errmsg = $self->{'device'}->error_or_status();
913                 $self->dbg("ignoring error while finishing device '$devname': $errmsg");
914             }
915
916             # if the part failed..
917             if (!$msg->{'successful'} || !$self->{'allow_split'}) {
918                 # if no caching was going on, then the dump has failed
919                 if (!$self->{'retry_part_on_peom'}) {
920                     # mark this device as at EOM, since we are not going to look
921                     # for another one yet
922                     $self->{'device_at_eom'} = 1;
923
924                     my $msg = "No space left on device";
925                     if ($self->{'device'}->status() != $DEVICE_STATUS_SUCCESS) {
926                         $msg = $self->{'device'}->error_or_status();
927                     }
928                     $self->_operation_failed(device_error => "$msg, splitting not enabled");
929                     return;
930                 }
931
932                 # log a message for amreport
933                 $self->{'feedback'}->scribe_notif_log_info(
934                     message => "Will request retry of failed split part.");
935             }
936
937             # get a new volume, then go on to the next part
938             $self->_get_new_volume();
939         } else {
940             # if the part was unsuccessful, but the xfer dest has reason to believe
941             # this is not due to EOM, then the dump is done
942             if (!$msg->{'successful'}) {
943                 if ($self->{'device'}->status() != $DEVICE_STATUS_SUCCESS) {
944                     $msg = $self->{'device'}->error_or_status();
945                     $self->_operation_failed(device_error => $msg);
946                 } else {
947                     $self->_operation_failed();
948                 }
949                 return;
950             }
951
952             # no EOM -- go on to the next part
953             $self->_start_part();
954         }
955     }
956 }
957
958 sub _xmsg_ready {
959     my $self = shift;
960     my ($src, $msg, $xfer) = @_;
961
962     $self->dbg("XDT is ready");
963     $self->{'xdt_ready'} = 1;
964     if ($self->{'start_part_on_xdt_ready'}) {
965         $self->{'start_part_on_xdt_ready'} = 0;
966         $self->_start_part();
967     }
968 }
969
970 sub _xmsg_error {
971     my $self = shift;
972     my ($src, $msg, $xfer) = @_;
973
974     # XMSG_ERROR from the XDT is always fatal
975     $self->_operation_failed(device_error => $msg->{'message'});
976 }
977
978 sub _xmsg_done {
979     my $self = shift;
980     my ($src, $msg, $xfer) = @_;
981
982     if ($msg->{'type'} == $XMSG_DONE) {
983         $self->dbg("transfer is complete");
984         $self->_dump_done();
985     }
986 }
987
988 sub _dump_done {
989     my $self = shift;
990
991     my $result;
992
993     # determine the correct final status - DONE if we're done, PARTIAL
994     # if we've started writing to the volume, otherwise FAILED
995     if (@{$self->{'device_errors'}} or $self->{'config_denial_message'} or
996         !$self->{'last_part_successful'}) {
997         $result = $self->{'started_writing'}? 'PARTIAL' : 'FAILED';
998     } else {
999         $result = 'DONE';
1000     }
1001
1002     my $dump_cb = $self->{'dump_cb'};
1003     my %dump_cb_args = (
1004         result => $result,
1005         device_errors => $self->{'device_errors'},
1006         config_denial_message => $self->{'config_denial_message'},
1007         size => $self->{'size'},
1008         duration => $self->{'duration'},
1009         total_duration => time - $self->{'dump_start_time'},
1010         nparts => $self->{'nparts'});
1011
1012     # reset everything and let the original caller know we're done
1013     $self->{'xfer'} = undef;
1014     $self->{'xdt'} = undef;
1015     $self->{'dump_header'} = undef;
1016     $self->{'dump_cb'} = undef;
1017     $self->{'size'} = 0;
1018     $self->{'duration'} = 0.0;
1019     $self->{'nparts'} = undef;
1020     $self->{'dump_start_time'} = undef;
1021     $self->{'device_errors'} = [];
1022     $self->{'config_denial_message'} = undef;
1023
1024     # and call the callback
1025     $dump_cb->(%dump_cb_args);
1026 }
1027
1028 # keyword parameters are utilities to the caller: either specify
1029 # device_error to add to the device_errors list or config_denial_message
1030 # to set the corresponding key in $self.
1031 sub _operation_failed {
1032     my $self = shift;
1033     my %params = @_;
1034
1035     my $error_message = $params{'device_error'}
1036                      || $params{'config_denial_message'}
1037                      || 'input error';
1038     $self->dbg("operation failed: $error_message");
1039
1040     # tuck the message away as desired
1041     push @{$self->{'device_errors'}}, $params{'device_error'}
1042         if defined $params{'device_error'};
1043     $self->{'config_denial_message'} = $params{'config_denial_message'} 
1044         if $params{'config_denial_message'};
1045
1046     # cancelling the xdt will eventually cause an XMSG_DONE, which will notice
1047     # the error and set the result correctly; but if there's no xfer, then we
1048     # can just call _dump_done directly.
1049     if (defined $self->{'xfer'}) {
1050         $self->dbg("cancelling the transfer: $error_message");
1051
1052         $self->{'xfer'}->cancel();
1053     } else {
1054         if (defined $self->{'dump_cb'}) {
1055             # _dump_done constructs the dump_cb from $self parameters
1056             $self->_dump_done();
1057         } else {
1058             confess "error with no callback to handle it: $error_message";
1059         }
1060     }
1061 }
1062
1063 # release the outstanding reservation, calling scribe_notif_tape_done
1064 # after the release
1065 sub _release_reservation {
1066     my $self = shift;
1067     my %params = @_;
1068     my @errors;
1069     my $do_eject = 0;
1070
1071     my ($label, $fm, $kb);
1072
1073     # if we've already written a volume, log it
1074     if ($self->{'device'} and defined $self->{'device'}->volume_label) {
1075         $do_eject = 1 if $self->{'eject_volume'};
1076         $label = $self->{'device'}->volume_label();
1077         $fm = $self->{'device'}->file();
1078         $kb = $self->{'device_size'} / 1024;
1079
1080         # log a message for amreport
1081         $self->{'feedback'}->scribe_notif_log_info(
1082             message => "tape $label kb $kb fm $fm [OK]");
1083     }
1084
1085     # finish the device if it isn't finished yet
1086     if ($self->{'device'}) {
1087         my $already_in_error = $self->{'device'}->status() != $DEVICE_STATUS_SUCCESS;
1088
1089         if (!$self->{'device'}->finish() && !$already_in_error) {
1090             push @errors, $self->{'device'}->error_or_status();
1091         }
1092     }
1093     $self->{'device'} = undef;
1094     $self->{'device_at_eom'} = 0;
1095
1096     $self->{'reservation'}->release(eject => $do_eject, finished_cb => sub {
1097         my ($err) = @_;
1098         push @errors, "$err" if $err;
1099
1100         $self->{'reservation'} = undef;
1101
1102         # notify the feedback that we've finished and released a tape
1103         if ($label) {
1104             return $self->{'feedback'}->scribe_notif_tape_done(
1105                 volume_label => $label,
1106                 size => $kb * 1024,
1107                 num_files => $fm,
1108                 finished_cb => sub {
1109                  $params{'finished_cb'}->(@errors? join("; ", @errors) : undef);
1110                 });
1111         }
1112
1113         $params{'finished_cb'}->(@errors? join("; ", @errors) : undef);
1114     });
1115 }
1116
1117 # invoke the devhandling to get a new device, with all of the requisite
1118 # notifications and checks and whatnot.  On *success*, call _start_dump; on
1119 # failure, call other appropriate methods.
1120 sub _get_new_volume {
1121     my $self = shift;
1122
1123     # release first, if necessary
1124     if ($self->{'reservation'}) {
1125         $self->_release_reservation(finished_cb => sub {
1126             my ($error) = @_;
1127
1128             if ($error) {
1129                 $self->_operation_failed(device_error => $error);
1130             } else {
1131                 $self->_get_new_volume();
1132             }
1133         });
1134
1135         return;
1136     }
1137
1138     $self->{'devhandling'}->get_volume(volume_cb => sub { $self->_volume_cb(@_); });
1139 }
1140
1141 sub _volume_cb  {
1142     my $self = shift;
1143     my ($scan_error, $config_denial_message, $error_denial_message,
1144         $reservation, $new_label, $access_mode, $is_new, $new_scribe) = @_;
1145
1146     # note that we prefer the config_denial_message over the scan error.  If
1147     # both occurred, then the results of the scan are immaterial -- we
1148     # shouldn't have been looking for a new volume anyway.
1149
1150     if ($config_denial_message) {
1151         $self->_operation_failed(config_denial_message => $config_denial_message);
1152         return;
1153     }
1154
1155     if ($error_denial_message) {
1156         $self->_operation_failed(device_error => $error_denial_message);
1157         return;
1158     }
1159
1160     if ($new_scribe) {
1161         # Transfer the xfer to the new scribe
1162         $self->dbg("take scribe from");
1163
1164         $new_scribe->{'dump_cb'} = $self->{'dump_cb'};
1165         $new_scribe->{'dump_header'} = $self->{'dump_header'};
1166         $new_scribe->{'retry_part_on_peom'} = $self->{'retry_part_on_peom'};
1167         $new_scribe->{'allow_split'} = $self->{'allow_split'};
1168         $new_scribe->{'split_method'} = $self->{'split_method'};
1169         $new_scribe->{'xfer'} = $self->{'xfer'};
1170         $new_scribe->{'xdt'} = $self->{'xdt'};
1171         $new_scribe->{'xdt_ready'} = $self->{'xdt_ready'};
1172         $new_scribe->{'start_part_on_xdt_ready'} = $self->{'start_part_on_xdt_ready'};
1173         $new_scribe->{'size'} = $self->{'size'};
1174         $new_scribe->{'duration'} = $self->{'duration'};
1175         $new_scribe->{'dump_start_time'} = $self->{'dump_start_time'};
1176         $new_scribe->{'last_part_successful'} = $self->{'last_part_successful'};
1177         $new_scribe->{'started_writing'} = $self->{'started_writing'};
1178         $new_scribe->{'feedback'} = $self->{'feedback'};
1179         $new_scribe->{'devhandling'}->{'feedback'} = $self->{'feedback'};
1180         $self->{'dump_header'} = undef;
1181         $self->{'dump_cb'} = undef;
1182         $self->{'xfer'} = undef;
1183         $self->{'xdt'} = undef;
1184         $self->{'xdt_ready'} = undef;
1185         $self->{'dump_start_time'} = undef;
1186         $self->{'started_writing'} = 0;
1187         $self->{'feedback'} = undef;
1188         if (defined $new_scribe->{'device'}) {
1189             $new_scribe->{'xdt'}->use_device($new_scribe->{'device'});
1190         }
1191         # start it
1192         $new_scribe->_start_part();
1193
1194         return;
1195     }
1196
1197     if ($scan_error) {
1198         # we had permission to use a tape, but didn't find a tape, so we need
1199         # to notify of such
1200         $self->{'feedback'}->scribe_notif_new_tape(
1201             error => $scan_error,
1202             volume_label => undef);
1203
1204         $self->_operation_failed(device_error => $scan_error);
1205         return;
1206     }
1207
1208     $self->dbg("got new volume; writing new label");
1209
1210     # from here on, if an error occurs, we must send scribe_notif_new_tape, and look
1211     # for a new volume
1212     $self->{'reservation'} = $reservation;
1213     $self->{'device_size'} = 0;
1214     my $device = $self->{'device'} = $reservation->{'device'};
1215
1216     # turn on verbose logging now, if we need it
1217     if ($self->{'debug'}) {
1218         $reservation->{'device'}->property_set("verbose", 1);
1219     }
1220
1221     # read the label once, to get a "before" snapshot (see below)
1222     my $old_label;
1223     my $old_timestamp;
1224     if (!$is_new) {
1225         if (($device->status & ~$DEVICE_STATUS_VOLUME_UNLABELED)
1226             && !($device->status & $DEVICE_STATUS_VOLUME_UNLABELED)) {
1227             $self->{'feedback'}->scribe_notif_new_tape(
1228                 error => "while reading label on new volume: " . $device->error_or_status(),
1229                 volume_label => undef);
1230
1231             return $self->_get_new_volume();
1232         }
1233         $old_label = $device->volume_label;
1234         $old_timestamp = $device->volume_time;
1235     }
1236
1237     # inform the xdt about this new device before starting it
1238     $self->{'xdt'}->use_device($device);
1239
1240     my $cbX = sub {};
1241     my $steps = define_steps
1242         cb_ref => \$cbX;
1243
1244     step device_start => sub {
1245         $self->_device_start($reservation, $access_mode, $new_label, $is_new,
1246                              $steps->{'device_started'});
1247     };
1248
1249     step device_started => sub {
1250         my $result = shift;
1251
1252         if ($result == 0) {
1253             # try reading the label to see whether we erased the tape
1254             my $erased = 0;
1255             CHECK_READ_LABEL: {
1256             # don't worry about erasing new tapes
1257                 if ($is_new) {
1258                     last CHECK_READ_LABEL;
1259                 }
1260
1261                 $device->finish();
1262                 $device->read_label();
1263
1264                 # does the device think something is broken now?
1265                 if (($device->status & ~$DEVICE_STATUS_VOLUME_UNLABELED)
1266                     and !($device->status & $DEVICE_STATUS_VOLUME_UNLABELED)) {
1267                     $erased = 1;
1268                     last CHECK_READ_LABEL;
1269                 }
1270
1271                 # has the label changed?
1272                 my $vol_label = $device->volume_label;
1273                 if ((!defined $old_label and defined $vol_label)
1274                     or (defined $old_label and !defined $vol_label)
1275                     or (defined $old_label and $old_label ne $vol_label)) {
1276                     $erased = 1;
1277                     last CHECK_READ_LABEL;
1278                 }
1279
1280                 # has the timestamp changed?
1281                 my $vol_timestamp = $device->volume_time;
1282                 if ((!defined $old_timestamp and defined $vol_timestamp)
1283                     or (defined $old_timestamp and !defined $vol_timestamp)
1284                     or (defined $old_timestamp and $old_timestamp ne $vol_timestamp)) {
1285                     $erased = 1;
1286                     last CHECK_READ_LABEL;
1287                 }
1288             }
1289
1290             $self->{'feedback'}->scribe_notif_new_tape(
1291                 error => "while labeling new volume: " . $device->error_or_status(),
1292                 volume_label => $erased? $new_label : undef);
1293
1294             $self->_get_new_volume();
1295             return $cbX->();
1296         } elsif ($result != 1) {
1297             $self->{'feedback'}->scribe_notif_new_tape(
1298                 error => $result,
1299                 volume_label => undef);
1300             $self->_get_new_volume();
1301             return $cbX->();
1302         }
1303
1304         $new_label = $device->volume_label;
1305
1306         # success!
1307         $self->{'feedback'}->scribe_notif_new_tape(
1308             error => undef,
1309             volume_label => $new_label);
1310
1311         $self->{'reservation'}->set_label(label => $new_label,
1312             finished_cb => $steps->{'set_labelled'});
1313     };
1314
1315     step set_labelled => sub {
1316         my ($err) = @_;
1317         if ($err) {
1318             $self->{'feedback'}->scribe_notif_log_info(
1319                 message => "Error from set_label: $err");
1320             # fall through to start_part anyway...
1321         }
1322         $self->_start_part();
1323         return $cbX->();
1324     }
1325 }
1326
1327 # return 0 for device->start error
1328 # return 1 for success
1329 # return a message for others error
1330 sub _device_start {
1331     my $self = shift;
1332     my ($reservation, $access_mode, $new_label, $is_new, $finished_cb) = @_;
1333
1334     my $device = $reservation->{'device'};
1335     my $tl = $self->{'taperscan'}->{'tapelist'};
1336     my $meta;
1337
1338     if (!defined $tl) { # For Mock::Taperscan in installcheck
1339         if (!$device->start($access_mode, $new_label, $self->{'write_timestamp'})) {
1340             return $finished_cb->(0);
1341         } else {
1342             return $finished_cb->(1);
1343         }
1344     }
1345
1346     my $steps = define_steps
1347         cb_ref => \$finished_cb;
1348
1349     step setup => sub {
1350         return $reservation->get_meta_label(
1351                                 finished_cb => $steps->{'got_meta_label'});
1352     };
1353
1354     step got_meta_label => sub {
1355         my ($err, $meta) = @_;
1356
1357         if ($is_new) {
1358             # generate the new label and write it to the tapelist file
1359             $tl->reload(1);
1360             if (!$meta) {
1361                 ($meta, $err) = $reservation->make_new_meta_label();
1362                 if (defined $err) {
1363                     $tl->unlock();
1364                     return $finished_cb->($err);
1365                 }
1366             }
1367             ($new_label, my $err) = $reservation->make_new_tape_label(
1368                                         meta => $meta);
1369             if (!defined $new_label) {
1370                 $tl->unlock();
1371                 return $finished_cb->($err);
1372             }
1373             $tl->add_tapelabel('0', $new_label, undef, 1, $meta,
1374                                $reservation->{'barcode'});
1375             $tl->write();
1376             $self->dbg("generate new label '$new_label'");
1377         } else {
1378             $tl->reload(0);
1379             my $tle = $tl->lookup_tapelabel($new_label);
1380             $meta = $tle->{'meta'} if !defined $meta && $tle->{'meta'};
1381             my $barcode = $tle->{'barcode'};
1382             if (defined $barcode and $barcode ne $reservation->{'barcode'}) {
1383                 return $finished_cb->("tapelist for label '$new_label' have barcode '$barcode' but changer report '" . $reservation->{'barcode'} . "'");
1384             }
1385         }
1386
1387         # write the label to the device
1388         if (!$device->start($access_mode, $new_label, $self->{'write_timestamp'})) {
1389             if ($is_new) {
1390                 # remove the generated label from the tapelist file
1391                 $tl->reload(1);
1392                 $tl->remove_tapelabel($new_label);
1393                 $tl->write();
1394             }
1395             return $finished_cb->(0);
1396         }
1397
1398         # rewrite the tapelist file
1399         $tl->reload(1);
1400         my $tle = $tl->lookup_tapelabel($new_label);
1401         $meta = $tle->{'meta'} if !$meta && $tle->{'meta'};
1402         $tl->remove_tapelabel($new_label);
1403         $tl->add_tapelabel($self->{'write_timestamp'}, $new_label,
1404                            $tle? $tle->{'comment'} : undef, 1, $meta,
1405                            $reservation->{'barcode'}, $device->block_size/1024);
1406         $tl->write();
1407
1408         $reservation->set_meta_label(meta => $meta,
1409                                      finished_cb => $steps->{'set_meta_label'});
1410     };
1411
1412     step set_meta_label => sub {
1413         return $finished_cb->(1);
1414     }
1415 }
1416
1417 sub dbg {
1418     my ($self, $msg) = @_;
1419     if ($self->{'debug'}) {
1420         debug("Amanda::Taper::Scribe: $msg");
1421     }
1422 }
1423
1424 sub get_splitting_args_from_config {
1425     my %params = @_;
1426
1427     my %splitting_args;
1428
1429     $splitting_args{'allow_split'} = 0;
1430     # if dle_splitting is false, then we don't split - easy.
1431     if (defined $params{'dle_allow_split'} and !$params{'dle_allow_split'}) {
1432         return %splitting_args;
1433     }
1434
1435     # utility for below
1436     my $have_space = sub {
1437         my ($dirname, $part_size) = @_;
1438
1439         use Carp;
1440         my $fsusage = Amanda::Util::get_fs_usage($dirname);
1441         confess "$dirname" if (!$fsusage);
1442
1443         my $avail = $fsusage->{'blocksize'} * $fsusage->{'bavail'};
1444         if ($avail < $part_size) {
1445             Amanda::Debug::debug("disk cache has $avail bytes available on $dirname, but " .
1446                                  "needs $part_size");
1447             return 0;
1448         } else {
1449             return 1;
1450         }
1451     };
1452
1453     # first, handle the alternate spellings for part_size and part_cache_type
1454     $params{'part_size'} = $params{'part_size_kb'} * 1024
1455         if (defined $params{'part_size_kb'});
1456
1457     if (defined $params{'part_cache_type_enum'}) {
1458         $params{'part_cache_type'} = 'none'
1459             if ($params{'part_cache_type_enum'} == $PART_CACHE_TYPE_NONE);
1460         $params{'part_cache_type'} = 'memory'
1461             if ($params{'part_cache_type_enum'} == $PART_CACHE_TYPE_MEMORY);
1462         $params{'part_cache_type'} = 'disk'
1463             if ($params{'part_cache_type_enum'} == $PART_CACHE_TYPE_DISK);
1464
1465         $params{'part_cache_type'} = 'unknown'
1466             unless defined $params{'part_cache_type'};
1467     }
1468
1469     # if any of the dle_* parameters are set, use those to set the part_*
1470     # parameters, which are emptied out first.
1471     if (defined $params{'dle_tape_splitsize'} or
1472         defined $params{'dle_split_diskbuffer'} or
1473         defined $params{'dle_fallback_splitsize'}) {
1474
1475         $params{'part_size'} = $params{'dle_tape_splitsize'} || 0;
1476         $params{'part_cache_type'} = 'none';
1477         $params{'part_cache_dir'} = undef;
1478         $params{'part_cache_max_size'} = undef;
1479
1480         # part cache type is memory unless we have a split_diskbuffer that fits the bill
1481         if ($params{'part_size'}) {
1482             $splitting_args{'allow_split'} = 1;
1483             $params{'part_cache_type'} = 'memory';
1484             if (defined $params{'dle_split_diskbuffer'}
1485                     and -d $params{'dle_split_diskbuffer'}) {
1486                 if ($have_space->($params{'dle_split_diskbuffer'}, $params{'part_size'})) {
1487                     # disk cache checks out, so use it
1488                     $params{'part_cache_type'} = 'disk';
1489                     $params{'part_cache_dir'} = $params{'dle_split_diskbuffer'};
1490                 } else {
1491                     my $msg = "falling back to memory buffer for splitting: " .
1492                                 "insufficient space in disk cache directory";
1493                     $splitting_args{'warning'} = $msg;
1494                 }
1495             }
1496         }
1497
1498         if ($params{'part_cache_type'} eq 'memory') {
1499             # fall back to 10M if fallback size is not given
1500             $params{'part_cache_max_size'} = $params{'dle_fallback_splitsize'} || 10*1024*1024;
1501         }
1502     } else {
1503         my $ps = $params{'part_size'};
1504         my $pcms = $params{'part_cache_max_size'};
1505         $ps = $pcms if (!defined $ps or (defined $pcms and $pcms < $ps));
1506         $splitting_args{'allow_split'} = 1 if ((defined $ps and $ps > 0) or
1507                                                $params{'leom_supported'});
1508
1509         # fail back from 'disk' to 'none' if the disk isn't set up correctly
1510         if (defined $params{'part_cache_type'} and
1511                     $params{'part_cache_type'} eq 'disk') {
1512             my $warning;
1513             if (!$params{'part_cache_dir'}) {
1514                 $warning = "no part-cache-dir specified; "
1515                             . "using part cache type 'none'";
1516             } elsif (!-d $params{'part_cache_dir'}) {
1517                 $warning = "part-cache-dir '$params{part_cache_dir} "
1518                             . "does not exist; using part cache type 'none'";
1519             } elsif (!$have_space->($params{'part_cache_dir'}, $ps)) {
1520                 $warning = "part-cache-dir '$params{part_cache_dir} "
1521                             . "has insufficient space; using part cache type 'none'";
1522             }
1523
1524             if (defined $warning) {
1525                 $splitting_args{'warning'} = $warning;
1526                 $params{'part_cache_type'} = 'none';
1527                 delete $params{'part_cache_dir'};
1528             }
1529         }
1530     }
1531
1532     $splitting_args{'part_size'} = $params{'part_size'}
1533         if defined($params{'part_size'});
1534     $splitting_args{'part_cache_type'} = $params{'part_cache_type'}
1535         if defined($params{'part_cache_type'});
1536     $splitting_args{'part_cache_dir'} = $params{'part_cache_dir'}
1537         if defined($params{'part_cache_dir'});
1538     $splitting_args{'part_cache_max_size'} = $params{'part_cache_max_size'}
1539         if defined($params{'part_cache_max_size'});
1540
1541     return %splitting_args;
1542 }
1543 ##
1544 ## Feedback
1545 ##
1546
1547 package Amanda::Taper::Scribe::Feedback;
1548
1549 sub request_volume_permission {
1550     my $self = shift;
1551     my %params = @_;
1552
1553     # sure, you can have as many volumes as you want!
1554     $params{'perm_cb'}->(allow => 1);
1555 }
1556
1557 sub scribe_notif_new_tape { }
1558 sub scribe_notif_part_done { }
1559 sub scribe_notif_log_info { }
1560 sub scribe_notif_tape_done {
1561     my $self = shift;
1562     my %params = @_;
1563
1564     $params{'finished_cb'}->();
1565 }
1566
1567 ##
1568 ## Device Handling
1569 ##
1570
1571 package Amanda::Taper::Scribe::DevHandling;
1572 use Amanda::MainLoop;
1573 use Carp;
1574 use Amanda::Debug qw( :logging );
1575
1576 # This class handles scanning for volumes, requesting permission for those
1577 # volumes (the driver likes to feel like it's in control), and providing those
1578 # volumes to the scribe on request.  These can all happen independently, but
1579 # the scribe cannot begin writing to a volume until all three have finished.
1580 # That is: the scan is finished, the driver has given its permission, and the
1581 # scribe has requested a volume.
1582 #
1583 # On start, the class starts scanning immediately, even though the scribe has
1584 # not requested a volume.  Subsequently, a new scan does not begin until the
1585 # scribe requests a volume.
1586 #
1587 # This class is "private" to Amanda::Taper::Scribe, so it is documented in
1588 # comments, rather than POD.
1589
1590 # Create a new DevHandling object.  Params are taperscan and feedback.
1591 sub new {
1592     my $class = shift;
1593     my %params = @_;
1594
1595     my $self = {
1596         taperscan => $params{'taperscan'},
1597         feedback => $params{'feedback'},
1598
1599         # is a scan currently running, or completed?
1600         scan_running => 0,
1601         scan_finished => 0,
1602         scan_error => undef,
1603
1604         # scan results
1605         reservation => undef,
1606         device => undef,
1607         volume_label => undef,
1608
1609         # requests for permissiont to use a new volume
1610         request_pending => 0,
1611         request_complete => 0,
1612         request_denied => 0,
1613         config_denial_message => undef,
1614         error_denial_message => undef,
1615
1616         volume_cb => undef, # callback for get_volume
1617         start_finished_cb => undef, # callback for start
1618     };
1619
1620     return bless ($self, $class);
1621 }
1622
1623 ## public methods
1624
1625 # Called at scribe startup, this starts the instance off with a scan.
1626 sub start {
1627     my $self = shift;
1628     my %params = @_;
1629
1630     $self->{'start_finished_cb'} = $params{'finished_cb'};
1631     $self->_start_scanning();
1632 }
1633
1634 sub quit {
1635     my $self = shift;
1636     my %params = @_;
1637
1638     for my $rq_param (qw(finished_cb)) {
1639         croak "required parameter '$rq_param' mising"
1640             unless exists $params{$rq_param};
1641     }
1642
1643     # since there's little other option than to barrel on through the
1644     # quitting procedure, quit() just accumulates its error messages
1645     # and, if necessary, concantenates them for the finished_cb.
1646     my @errors;
1647
1648     my $cleanup_cb = make_cb(cleanup_cb => sub {
1649         my ($error) = @_;
1650         push @errors, $error if $error;
1651
1652         $error = join("; ", @errors) if @errors >= 1;
1653
1654         $params{'finished_cb'}->($error);
1655     });
1656
1657     if ($self->{'reservation'}) {
1658         if ($self->{'device'}) {
1659             if (!$self->{'device'}->finish()) {
1660                 push @errors, $self->{'device'}->error_or_status();
1661             }
1662         }
1663
1664         $self->{'reservation'}->release(finished_cb => $cleanup_cb);
1665     } else {
1666         $cleanup_cb->(undef);
1667     }
1668 }
1669
1670 # Get an open, started device and label to start writing to.  The
1671 # volume_callback takes the following arguments:
1672 #   $scan_error -- error message, or undef if no error occurred
1673 #   $config_denial_reason -- config-related reason request was denied, or undef
1674 #   $error_denial_reason -- error-related reason request was denied, or undef
1675 #   $reservation -- Amanda::Changer reservation
1676 #   $device -- open, started device
1677 # It is the responsibility of the caller to close the device and release the
1678 # reservation when finished.  If $scan_error or $request_denied_info are
1679 # defined, then $reservation and $device will be undef.
1680 sub get_volume {
1681     my $self = shift;
1682     my (%params) = @_;
1683
1684     confess "already processing a volume request"
1685         if ($self->{'volume_cb'});
1686
1687     $self->{'volume_cb'} = $params{'volume_cb'};
1688
1689     # kick off the relevant processes, if they're not already running
1690     $self->_start_request();
1691
1692     $self->_maybe_callback();
1693 }
1694
1695 # take a peek at the device we have, for which permission has not yet been
1696 # granted.  This will be undefined before the taperscan completes AND after
1697 # the volume_cb has been called.
1698 sub peek_device {
1699     my $self = shift;
1700
1701     return $self->{'device'};
1702 }
1703
1704 sub start_scan {
1705     my $self = shift;
1706
1707     if (!$self->{'scan_running'} && !$self->{'reservation'}) {
1708         $self->_start_scanning();
1709     }
1710 }
1711
1712 ## private methods
1713
1714 sub _start_scanning {
1715     my $self = shift;
1716
1717     return if $self->{'scan_running'} or $self->{'scan_finished'};
1718
1719     $self->{'scan_running'} = 1;
1720
1721     my $_user_msg_fn = sub {
1722         my %params = @_;
1723         if (exists($params{'slot_result'})) {
1724             if ($params{'does_not_match_labelstr'}) {
1725                 $self->{'feedback'}->scribe_notif_log_info(
1726                     message => "Slot $params{'slot'} with label $params{'label'} do not match labelstr");
1727             } elsif ($params{'not_in_tapelist'}) {
1728                 $self->{'feedback'}->scribe_notif_log_info(
1729                     message => "Slot $params{'slot'} with label $params{'label'} is not in the tapelist");
1730             } elsif ($params{'active'}) {
1731                 $self->{'feedback'}->scribe_notif_log_info(
1732                     message => "Slot $params{'slot'} with label $params{'label'} is not reusable");
1733             } elsif ($params{'not_autolabel'}) {
1734                 if ($params{'label'}) {
1735                     $self->{'feedback'}->scribe_notif_log_info(
1736                         message => "Slot $params{'slot'} with label $params{'label'} is not labelable ");
1737                 } elsif ($params{'empty'}) {
1738                     $self->{'feedback'}->scribe_notif_log_info(
1739                         message => "Slot $params{'slot'} is empty, autolabel not set");
1740                 } elsif ($params{'non_amanda'}) {
1741                     $self->{'feedback'}->scribe_notif_log_info(
1742                         message => "Slot $params{'slot'} is a non-amanda volume, autolabel not set");
1743                 } elsif ($params{'volume_error'}) {
1744                     $self->{'feedback'}->scribe_notif_log_info(
1745                         message => "Slot $params{'slot'} is a volume in error: $params{'err'}, autolabel not set");
1746                 } elsif ($params{'not_success'}) {
1747                     $self->{'feedback'}->scribe_notif_log_info(
1748                         message => "Slot $params{'slot'} is a device in error: $params{'err'}, autolabel not set");
1749                 } elsif ($params{'err'}) {
1750                     $self->{'feedback'}->scribe_notif_log_info(
1751                         message => "$params{'err'}");
1752                 } else {
1753                     $self->{'feedback'}->scribe_notif_log_info(
1754                         message => "Slot $params{'slot'} without label is not labelable ");
1755                 }
1756             } elsif ($params{'empty'}) {
1757                 $self->{'feedback'}->scribe_notif_log_info(
1758                     message => "Slot $params{'slot'} is empty, autolabel disabled");
1759             } elsif ($params{'non_amanda'}) {
1760                 $self->{'feedback'}->scribe_notif_log_info(
1761                     message => "Slot $params{'slot'} is a non-amanda volume, autolabel disabled");
1762             } elsif ($params{'volume_error'}) {
1763                 $self->{'feedback'}->scribe_notif_log_info(
1764                     message => "Slot $params{'slot'} is a volume in error: $params{'err'}, autolabel disabled");
1765             } elsif ($params{'not_success'}) {
1766                 $self->{'feedback'}->scribe_notif_log_info(
1767                     message => "Slot $params{'slot'} is a device in error: $params{'err'}, autolabel disabled");
1768             } elsif ($params{'err'}) {
1769                 $self->{'feedback'}->scribe_notif_log_info(
1770                     message => "$params{'err'}");
1771             } elsif ($params{'not_labelable'}) {
1772                 $self->{'feedback'}->scribe_notif_log_info(
1773                     message => "Slot $params{'slot'} without label can't be labeled");
1774             } elsif (!defined $params{'label'}) {
1775                 $self->{'feedback'}->scribe_notif_log_info(
1776                     message => "Slot $params{'slot'} without label can be labeled");
1777             } elsif ($params{'relabeled'}) {
1778                 $self->{'feedback'}->scribe_notif_log_info(
1779                     message => "Slot $params{'slot'} with label $params{'label'} will be relabeled");
1780             } else {
1781                 $self->{'feedback'}->scribe_notif_log_info(
1782                     message => "Slot $params{'slot'} with label $params{'label'} is usable");
1783             }
1784         }
1785     };
1786
1787     $self->{'taperscan'}->scan(
1788       user_msg_fn => $_user_msg_fn,
1789       result_cb => sub {
1790         my ($error, $reservation, $volume_label, $access_mode, $is_new) = @_;
1791
1792         $self->{'scan_running'} = 0;
1793         $self->{'scan_finished'} = 1;
1794
1795         $self->{'scan_error'} = $error;
1796         $self->{'reservation'} = $reservation;
1797         $self->{'device'} = $reservation->{'device'} if $reservation;
1798         $self->{'volume_label'} = $volume_label;
1799         $self->{'access_mode'} = $access_mode;
1800         $self->{'is_new'} = $is_new;
1801
1802         $self->_maybe_callback();
1803     });
1804 }
1805
1806 sub _start_request {
1807     my $self = shift;
1808
1809     return if $self->{'request_pending'} or $self->{'request_complete'};
1810
1811     $self->{'request_pending'} = 1;
1812
1813     $self->{'feedback'}->request_volume_permission(
1814     perm_cb => sub {
1815         my %params = @_;
1816
1817         $self->{'request_pending'} = 0;
1818         $self->{'request_complete'} = 1;
1819         if (defined $params{'scribe'}) {
1820             $self->{'new_scribe'} = $params{'scribe'};
1821             $self->{'scan_finished'} = 1;
1822             $self->{'request_complete'} = 1;
1823         } elsif (defined $params{'cause'}) {
1824             $self->{'request_denied'} = 1;
1825             if ($params{'cause'} eq 'config') {
1826                 $self->{'config_denial_message'} = $params{'message'};
1827             } elsif ($params{'cause'} eq 'error') {
1828                 $self->{'error_denial_message'} = $params{'message'};
1829             } else {
1830                 confess "bad cause '" . $params{'cause'} . "'";
1831             }
1832         } elsif (!defined $params{'allow'}) {
1833             confess "no allow or cause defined";
1834         }
1835
1836         $self->_maybe_callback();
1837     });
1838 }
1839
1840 sub _maybe_callback {
1841     my $self = shift;
1842
1843     # if we have any kind of error, release the reservation and come back
1844     # later
1845     if (($self->{'scan_error'} or $self->{'request_denied'}) and $self->{'reservation'}) {
1846         $self->{'device'} = undef;
1847
1848         $self->{'reservation'}->release(finished_cb => sub {
1849             my ($error) = @_;
1850
1851             # so many errors, so little time..
1852             if ($error) {
1853                 if ($self->{'scan_error'}) {
1854                     warning("ignoring error releasing reservation ($error) after a scan error");
1855                 } else {
1856                     $self->{'scan_error'} = $error;
1857                 }
1858             }
1859
1860             $self->{'reservation'} = undef;
1861             $self->_maybe_callback();
1862         });
1863
1864         return;
1865     }
1866
1867     # if we are just starting up, call the finished_cb given to start()
1868     if (defined $self->{'start_finished_cb'} and $self->{'scan_finished'}) {
1869         my $cb = $self->{'start_finished_cb'};
1870         $self->{'start_finished_cb'} = undef;
1871
1872         $cb->($self->{'scan_error'});
1873     }
1874
1875     # if the volume_cb is good to get called, call it and reset to the ground state
1876     if ($self->{'volume_cb'} and (!$self->{'scan_running'} or $self->{'scan_finished'}) and $self->{'request_complete'}) {
1877         # get the cb and its arguments lined up before calling it..
1878         my $volume_cb = $self->{'volume_cb'};
1879         my @volume_cb_args = (
1880             $self->{'scan_error'},
1881             $self->{'config_denial_message'},
1882             $self->{'error_denial_message'},
1883             $self->{'reservation'},
1884             $self->{'volume_label'},
1885             $self->{'access_mode'},
1886             $self->{'is_new'},
1887             $self->{'new_scribe'},
1888         );
1889
1890         # reset everything and prepare for a new scan
1891         $self->{'scan_finished'} = 0;
1892
1893         $self->{'reservation'} = undef;
1894         $self->{'device'} = undef;
1895         $self->{'volume_label'} = undef;
1896
1897         $self->{'request_complete'} = 0;
1898         $self->{'request_denied'} = 0;
1899         $self->{'config_denial_message'} = undef;
1900         $self->{'error_denial_message'} = undef;
1901         $self->{'volume_cb'} = undef;
1902         $self->{'new_scribe'} = undef;
1903
1904         $volume_cb->(@volume_cb_args);
1905     }
1906 }
1907
1908 1;