Imported Upstream version 3.3.0
[debian/amanda] / perl / Amanda / Taper / Scribe.pm
1 # Copyright (c) 2009, 2010 Zmanda, Inc.  All Rights Reserved.
2 #
3 # This library is free software; you can redistribute it and/or modify it
4 # under the terms of the GNU Lesser General Public License version 2.1 as
5 # published by the Free Software Foundation.
6 #
7 # This library is distributed in the hope that it will be useful, but
8 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
9 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
10 # License for more details.
11 #
12 # You should have received a copy of the GNU Lesser General Public License
13 # along with this library; if not, write to the Free Software Foundation,
14 # Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA.
15 #
16 # Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
17 # Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
18
19 =head1 NAME
20
21 Amanda::Taper::Scribe
22
23 =head1 SYNOPSIS
24
25   step start_scribe => sub {
26       my $scribe = Amanda::Taper::Scribe->new(
27             taperscan => $taperscan_algo,
28             feedback => $feedback_obj);
29     $scribe->start(
30         write_timestamp => $write_timestamp,
31         finished_cb => $steps->{'start_xfer'});
32   };
33
34   step start_xfer => sub {
35     my ($err) = @_;
36     my $xfer_dest = $scribe->get_xfer_dest(
37         allow_split => 1,
38         max_memory => 64 * 1024,
39         can_cache_inform => 0,
40         part_size => 150 * 1024**2,
41         part_cache_type => 'disk',
42         part_cache_dir => "$tmpdir/splitbuffer",
43         part_cache_max_size => 20 * 1024**2);
44     # .. set up the rest of the transfer ..
45     $xfer->start(sub {
46         my ($src, $msg, $xfer) = @_;
47         $scribe->handle_xmsg($src, $msg, $xfer);
48         # .. any other processing ..
49     };
50     # tell the scribe to start dumping via this transfer
51     $scribe->start_dump(
52         xfer => $xfer,
53         dump_header => $hdr,
54         dump_cb => $steps->{'dump_cb'});
55   };
56
57   step dump_cb => sub {
58       my %params = @_;
59       # .. handle dump results ..
60       print "DONE\n";
61       $finished_cb->();
62   };
63
64
65 =head1 OVERVIEW
66
67 This package provides a high-level abstraction of Amanda's procedure for
68 writing dumpfiles to tape.
69
70 Amanda writes a sequence of dumpfiles to a sequence of volumes.  The
71 volumes are supplied by a taperscan algorithm, which operates a changer
72 to find and load each volume.  As dumpfiles are written to volumes and
73 those volumes fill up, the taperscan algorithm supplies additional
74 volumes.
75
76 In order to reduce internal fragmentation within volumes, Amanda can "split"
77 dumpfiles into smaller pieces, so that the overall dumpfile can span multiple
78 volumes.  Each "part" is written to the volume in sequence.  If a device
79 encounters an error while writing a part, then that part is considered
80 "partial", and is rewritten from its beginning on the next volume.  Some
81 devices can reliably indicate that they are full (EOM), and for these devices
82 parts are simply truncated, and the Scribe starts the next part on the next
83 volume.
84
85 To facilitate rewriting parts on devices which cannot indicate EOM, Amanda must
86 retain all of the data in a part, even after that data is written to the
87 volume.  The Scribe provides several methods to support this: caching the part
88 in memory, caching the part in a special on-disk file, or relying on
89 pre-existing on-disk storage.  The latter method is used when reading from
90 holding disks.
91
92 The details of efficiently splitting dumpfiles and rewriting parts are handled
93 by the low-level C<Amanda::Xfer::Dest::Taper> subclasses.  The Scribe creates
94 an instance of the appropriate subclass and supplies it with volumes from an
95 C<Amanda::Taper::Scan> object.  It calls a number of
96 C<Amanda::Taper::Scribe::Feedback> methods to indicate the status of the dump
97 process and to request permission for each additional volume.
98
99 =head1 OPERATING A SCRIBE
100
101 The C<Amanda::Taper::Scribe> constructor takes two arguments:
102 C<taperscan> and C<feedback>.  The first specifies the taper scan
103 algorithm that the Scribe should use, and the second specifies the
104 C<Feedback> object that will receive notifications from the Scribe (see
105 below).
106
107   my $scribe = Amanda::Taper::Scribe->new(
108         taperscan => $my_taperscan,
109         feedback => $my_feedback);
110
111 Once the object is in place, call its C<start> method.
112
113 =head2 START THE SCRIBE
114
115 Start the scribe's operation by calling its C<start> method.  This will invoke
116 the taperscan algorithm and scan for a volume.  The method takes two parameters:
117
118   $scribe->start(
119         write_timestamp => $ts,
120         finished_cb => $start_finished_cb);
121
122 The timestamp will be written to each volume written by the Scribe.  The
123 C<finished_cb> will be called with a single argument - C<undef> or an error
124 message - when the Scribe is ready to start its first dump.  The Scribe is
125 "ready" when it has found a device to which it can write, although it does not
126 request permission to overwrite that volume, nor start overwriting it, until
127 the first dump begins (that is, until the first call to C<start_dump>).
128
129 =head2 SET UP A TRANSFER
130
131 Once the Scribe is started, begin transferring a dumpfile.  This is a
132 three-step process: first, get an C<Amanda::Xfer::Dest::Taper> object from the
133 Scribe, then start the transfer, and finally let the Scribe know that the
134 transfer has started.  Note that the Scribe supplies and manages the transfer
135 destination, but the transfer itself remains the responsibility of the caller.
136
137 =head3 Get device
138
139 Call C<get_device> to get the first device the xfer will be working with.
140
141   $device = $scribe->get_device();
142
143 This method must be called after C<start> has completed.
144
145 =head3 Check device compatibily for the data path
146
147 Call C<check_data_path>, supplying the data_path requested by the user.
148
149   if (my $err = $scribe->check_data_path($data_path)) {
150       # handle error message
151   }
152
153 This method must be called after C<start> has completed and before
154 C<get_xfer_dest> is called. It returns C<undef> on success or an error message
155 if the supplied C<data_path> is incompatible with the device.  This is mainly
156 used to detect when a DirectTCP dump is going to a non-DirectTCP device.
157
158 =head3 Get a Transfer Destination
159
160 Call C<get_xfer_dest> to get the transfer element, supplying information on how
161 the dump should be split:
162
163   $xdest = $scribe->get_xfer_dest(
164         allow_split => $allow_split,
165         max_memory => $max_memory,
166         # .. splitting parameters
167         );
168
169 This method must be called after C<start> has completed, and will always return
170 a transfer element immediately.  The underlying C<Amanda::Xfer::Dest::Taper>
171 handles device streaming properly.  It uses C<max_memory> bytes of memory for
172 this purpose.
173
174 The splitting parameters to C<get_xfer_dest> are:
175
176 =over 4
177
178 =item C<allow_split>
179
180 this dle is allowed or not to split
181
182 =item C<part_size>
183
184 the split part size to use, or 0 for no splitting
185
186 =item C<part_cache_type>
187
188 when caching, the kind of caching to perform ('disk', 'memory' or the default,
189 'none')
190
191 =item C<part_cache_dir>
192
193 the directory to use for disk caching
194
195 =item C<part_cache_max_size>
196
197 the maximum part size to use when caching
198
199 =item C<can_cache_inform>
200
201 true if the transfer source can call the destination's C<cache_inform> method
202 (e.g., C<Amanda::Xfer::Source::Holding>).
203
204 =back
205
206 The first four of these parameters correspond exactly to the eponymous tapetype
207 configuration parameters, and have the same default values (when omitted or
208 C<undef>).  The method will take this information, along with details of the
209 device it intends to use, and set up the transfer destination.
210
211 The utility function C<get_splitting_args_from_config> can determine the
212 appropriate C<get_xfer_dest> splitting parameters based on a
213 few Amanda configuration parameters.  If a parameter was not seen in the
214 configuration, it should be omitted or passed as C<undef>.  The function
215 returns a hash to pass to C<get_xfer_dest>, although that hash may have an
216 C<warning> key containing a message if there is a problem that the user
217 should know about.
218
219   use Amanda::Taper::Scribe qw( get_splitting_args_from_config );
220   my %splitting_args = get_splitting_args_from_config(
221     # Amanda dumptype configuration parameters,
222     dle_allow_split => ..,
223     dle_tape_splitsize => ..,
224     dle_split_diskbuffer => ..,
225     dle_fallback_splitsize => ..,
226     dle_allow_split => ..,
227     # Amanda tapetype configuration parameters,
228     part_size => .., ## in bytes, not kb!!
229     part_size_kb => ..., ## or use this, in kb
230     part_cache_type => ..,
231     part_cache_type_enum => ..., ## one of the enums from tapetype_getconf
232     part_cache_dir => ..,
233     part_cache_max_size => ..,
234   );
235   if ($splitting_args{'error'}) { .. }
236
237 An C<Amanda::Taper::Scribe> object can only run one transfer at a time, so
238 do not call C<get_xfer_dest> until the C<dump_cb> for the previous C<start_dump>
239 has been called.
240
241 =head3 Start the Transfer
242
243 Armed with the element returned by C<get_xfer_dest>, the caller should create a
244 source element and a transfer object and start the transfer.  In order to
245 manage the splitting process, the Scribe needs to be informed, via its
246 C<handle_xmsg> method, of all transfer messages .  This is usually accomplished
247 with something like:
248
249   $xfer->start(sub {
250       my ($src, $msg, $xfer) = @_;
251       $scribe->handle_xmsg($src, $msg, $xfer);
252   });
253
254 =head3 Inform the Scribe
255
256 Once the transfer has started, the Scribe is ready to begin writing parts to
257 the volume.  This is the first moment at which the Scribe needs a header, too.
258 All of this is supplied to the C<start_dump> method:
259
260   $scribe->start_dump(
261       xfer => $xfer,
262       dump_header => $hdr,
263       dump_cb => $dump_cb);
264
265 The c<dump_header> here is the header that will be applied to all parts of the
266 dumpfile.  The only field in the header that the Scribe controls is the part
267 number.  The C<dump_cb> callback passed to C<start_dump> is called when the
268 dump is completely finished - either successfully or with a fatal error.
269 Unlike most callbacks, this one takes keyword arguments, since it has so many
270 parameters.
271
272   $dump_cb->(
273         result => $result,
274         device_errors => $device_errors,
275         config_denial_message => $cdm,
276         size => $size,
277         duration => $duration,
278         total_duration => $total_duration,
279         nparts => $nparts);
280
281 All parameters will be present on every call, although the order is not
282 guaranteed.
283
284 The C<result> is one of C<"FAILED">, C<"PARTIAL">, or C<"DONE">.  Even when
285 C<dump_cb> reports a fatal error, C<result> may be C<"PARTIAL"> if some data
286 was written successfully.
287
288 The C<device_error> key points to a list of errors, each given as a string,
289 that describe what went wrong to cause the dump to fail.  The
290 C<config_denial_message> parrots the reason provided by C<$perm_cb> (see below)
291 for denying use of a new tape if the cause was 'config', and is C<undef>
292 otherwise.
293
294 The final parameters, C<size> (in bytes), C<duration>, C<total_duration> (in
295 seconds), and C<nparts> describe the total transfer, and are a sum of all of
296 the parts written to the device.  Note that C<nparts> does not include any
297 empty trailing parts.  Note that C<duration> does not include time spent
298 operating the changer, while C<total_duration> reflects the time from the
299 C<start_dump> call to the invocation of the C<dump_cb>.
300
301 =head3 Cancelling a Dump
302
303 After you have requested a transfer destination, the scribe is poised to begin the
304 transfer.  If you cannot actually perform the transfer for some reason, you'll need
305 to go through the motions all the same, but cancel the operation immediately.  That
306 can be done by calling C<cancel_dump>:
307
308   $scribe->cancel_dump(
309         xfer => $xfer,
310         dump_cb => $dump_cb);
311
312 =head2 QUIT
313
314 When all of the dumpfiles are transferred, call the C<quit> method to
315 release any resources and clean up.  This method takes a typical
316 C<finished_cb>.
317
318   $scribe->quit(finished_cb => sub {
319     print "ALL DONE!\n";
320   });
321
322 =head2 GET_BYTES_WRITTEN
323
324 The C<get_bytes_written> returns the number of bytes written to the device at
325 the time of the call, and is meant to be used for status reporting.  This value
326 is updated at least as each part is finished; for some modes of operation, it
327 is updated continuously.  Notably, DirectTCP transfers do not update
328 continuously.
329
330 =head2 START_SCAN
331
332 The C<start_scan> method initiate a scan of the changer to find a usable tape.
333
334 =head1 FEEDBACK
335
336 The C<Amanda::Taper::Scribe::Feedback> class is intended to be
337 subclassed by the user.  It provides a number of notification methods
338 that enable the historical logging and driver/taper interactions
339 required by Amanda.  The parent class does nothing of interest, but
340 allows subclasses to omit methods they do not need.
341
342 The C<request_volume_permission> method provides a means for the caller
343 to limit the number of volumes the Scribe consumes.  It is called as
344
345   $fb->request_volume_permission(perm_cb => $cb);
346
347 The C<perm_cb> is a callback which expects a hash as arguments. If C<allow>
348 is set, then the scribe is allowed to use a new volume, if C<scribe> is set,
349 then the xfer must be transfered to that scribe, otherwise a C<cause>
350 and a C<message> describing why a new volume should not be used. must be
351 set. e.g.
352
353   perm_cb->(allow => 1);
354   perm_cb->(scribe => $new_scribe);
355   perm_cb->(cause => 'config', message => $message);
356   perm_cb->(cause => 'error', message => $message);
357
358 A cause of 'config' indicates that the denial is due to the user's
359 configuration, and thus should not be presented as an error.  The default
360 implementation always calls C<< perm_cb->() >>.
361
362 All of the remaining methods are notifications, and do not take a
363 callback.
364
365   $fb->scribe_notif_new_tape(
366         error => $error,
367         volume_label => $volume_label);
368
369 The Scribe calls C<scribe_notif_new_tape> when a new volume is started.  If the
370 C<volume_label> is undefined, then the volume was not successfully
371 relabled, and its previous contents may still be available.  If C<error>
372 is defined, then no useful data was written to the volume.  Note that
373 C<error> and C<volume_label> may I<both> be defined if the previous
374 contents of the volume were erased, but no useful, new data was written
375 to the volume.
376
377 This method will be called exactly once for every call to
378 C<request_volume_permission> that calls back with C<< perm_cb->() >>.
379
380   $fb->scribe_notif_tape_done(
381         volume_label => $volume_label,
382         size => $size,
383         num_files => $num_files);
384
385 The C<scribe_notif_tape_done> method is called after a volume is completely
386 written and its reservation has been released.  Note that the scribe waits
387 until the last possible moment to release a reservation, so this may be called
388 later than expected, e.g., during a C<quit> invocation.
389
390   $fb->scribe_notif_part_done(
391         partnum => $partnum,
392         fileno => $fileno,
393         successful => $successful,
394         size => $size,
395         duration => $duration);
396
397 The Scribe calls C<scribe_notif_part_done> for each part written to the volume,
398 including partial parts.  If the part was not written successfully, then
399 C<successful> is false.  The C<size> is in bytes, and the C<duration> is
400 a floating-point number of seconds.  If a part fails before a new device
401 file is created, then C<fileno> may be zero.
402
403 Finally, the Scribe sends a few historically significant trace log messages
404 via C<scribe_notif_log_info>:
405
406   $fb->scribe_notif_log_info(
407         message => $message);
408
409 A typical Feedback subclass might begin like this:
410
411   package main::Feedback;
412   use base 'Amanda::Taper::Scribe::Feedback';
413
414   sub request_volume_permission {
415     my $self = shift;
416     my %params = @_;
417
418     $params{'perm_cb'}->(cause => "error", message => "NO VOLUMES FOR YOU!");
419   }
420
421 =cut
422
423 package Amanda::Taper::Scribe;
424
425 use strict;
426 use warnings;
427 use Carp;
428
429 use Amanda::Xfer qw( :constants );
430 use Amanda::Device qw( :constants );
431 use Amanda::Header;
432 use Amanda::Debug qw( :logging );
433 use Amanda::MainLoop;
434 use Amanda::Tapelist;
435 use Amanda::Config qw( :getconf config_dir_relative );
436 use base qw( Exporter );
437
438 our @EXPORT_OK = qw( get_splitting_args_from_config );
439
440 sub new {
441     my $class = shift;
442     my %params = @_;
443
444     my $decide_debug = $Amanda::Config::debug_taper || $params{'debug'};
445     for my $rq_param qw(taperscan feedback) {
446         croak "required parameter '$rq_param' mising"
447             unless exists $params{$rq_param};
448     }
449
450     my $self = {
451         taperscan => $params{'taperscan'},
452         feedback => $params{'feedback'},
453         debug => $decide_debug,
454         write_timestamp => undef,
455         started => 0,
456
457         # device handling, and our current device and reservation
458         devhandling => Amanda::Taper::Scribe::DevHandling->new(
459             taperscan => $params{'taperscan'},
460             feedback => $params{'feedback'},
461         ),
462         reservation => undef,
463         device => undef,
464         device_size => undef,
465         device_at_eom => undef, # device still exists, but is full
466
467         # callback passed to start_dump
468         dump_cb => undef,
469
470         # information for the current dumpfile
471         dump_header => undef,
472         retry_part_on_peom => undef,
473         allow_split => undef,
474         xfer => undef,
475         xdt => undef,
476         xdt_ready => undef,
477         start_part_on_xdt_ready => 0,
478         size => 0,
479         duration => 0.0,
480         dump_start_time => undef,
481         last_part_successful => 0,
482         started_writing => 0,
483         device_errors => [],
484         config_denial_message => undef,
485     };
486
487     return bless ($self, $class);
488 }
489
490 sub start {
491     my $self = shift;
492     my %params = @_;
493
494     for my $rq_param qw(write_timestamp finished_cb) {
495         croak "required parameter '$rq_param' missing"
496             unless exists $params{$rq_param};
497     }
498
499     die "scribe already started" if $self->{'started'};
500
501     $self->dbg("starting");
502     $self->{'write_timestamp'} = $params{'write_timestamp'};
503
504     # start up the DevHandling object, making sure we know
505     # when it's done with its startup process
506     $self->{'devhandling'}->start(finished_cb => sub {
507         $self->{'started'} = 1;
508         $params{'finished_cb'}->(@_);
509     });
510 }
511
512 sub quit {
513     my $self = shift;
514     my %params = @_;
515
516     # since there's little other option than to barrel on through the
517     # quitting procedure, quit() just accumulates its error messages
518     # and, if necessary, concantenates them for the finished_cb.
519     my @errors;
520
521     my $steps = define_steps
522         cb_ref => \$params{'finished_cb'};
523
524     step setup => sub {
525         $self->dbg("quitting");
526
527         if ($self->{'xfer'}) {
528             die "Scribe cannot quit while a transfer is active";
529             # Supporting this would be complicated:
530             # - cancel the xfer and wait for it to complete
531             # - ensure that the taperscan not be started afterward
532             # and isn't required for normal Amanda operation.
533         }
534
535         $steps->{'release'}->();
536     };
537
538     step release => sub {
539         if ($self->{'reservation'}) {
540             $self->_release_reservation(finished_cb => $steps->{'released'});
541         } else {
542             $steps->{'stop_devhandling'}->();
543         }
544     };
545
546     step released => sub {
547         my ($err) = @_;
548         push @errors, "$err" if $err;
549
550         $self->{'reservation'} = undef;
551
552         $steps->{'stop_devhandling'}->();
553     };
554
555     step stop_devhandling => sub {
556         $self->{'devhandling'}->quit(finished_cb => $steps->{'stopped_devhandling'});
557     };
558
559     step stopped_devhandling => sub {
560         my ($err) = @_;
561         push @errors, "$err" if $err;
562
563         my $errmsg = join("; ", @errors) if @errors >= 1;
564         $params{'finished_cb'}->($errmsg);
565     };
566 }
567
568 sub get_device {
569     my $self = shift;
570
571     # Can return a device we already have, or "peek" at the
572     # DevHandling object's device.
573     # It might not have right permission on the device.
574
575     my $device;
576     if (defined $self->{'device'}) {
577         $device = $self->{'device'};
578     } else {
579         $device = $self->{'devhandling'}->peek_device();
580     }
581     return $device;
582 }
583
584 sub check_data_path {
585     my $self = shift;
586     my $data_path = shift;
587
588     my $device = $self->get_device();
589
590     if (!defined $device) {
591         die "no device is available to check the datapath";
592     }
593
594     my $use_directtcp = $device->directtcp_supported();
595
596     my $xdt;
597     if (!$use_directtcp) {
598         if ($data_path eq 'DIRECTTCP') {
599             return "Can't dump DIRECTTCP data-path dle to a device ('" .
600                    $device->device_name .
601                    "') that doesn't support it";
602         }
603     }
604     return undef;
605 }
606
607 sub start_scan {
608     my $self = shift;
609
610     $self->{'devhandling'}->start_scan();
611 }
612
613 # Get a transfer destination; does not use a callback
614 sub get_xfer_dest {
615     my $self = shift;
616     my %params = @_;
617
618     for my $rq_param qw(max_memory) {
619         croak "required parameter '$rq_param' missing"
620             unless exists $params{$rq_param};
621     }
622
623     die "not yet started"
624         unless $self->{'write_timestamp'} and $self->{'started'};
625     die "xfer element already returned"
626         if ($self->{'xdt'});
627     die "xfer already running"
628         if ($self->{'xfer'});
629
630     $self->{'xfer'} = undef;
631     $self->{'xdt'} = undef;
632     $self->{'size'} = 0;
633     $self->{'duration'} = 0.0;
634     $self->{'nparts'} = undef;
635     $self->{'dump_start_time'} = undef;
636     $self->{'last_part_successful'} = 1;
637     $self->{'started_writing'} = 0;
638     $self->{'device_errors'} = [];
639     $self->{'config_denial_message'} = undef;
640
641     # set the callback
642     $self->{'dump_cb'} = undef;
643     $self->{'retry_part_on_peom'} = 1;
644     $self->{'allow_split'} = 0;
645     $self->{'start_part_on_xdt_ready'} = 0;
646
647     # start getting parameters together to determine what kind of splitting
648     # and caching we're going to do
649     my $part_size = $params{'part_size'} || 0;
650     my ($use_mem_cache, $disk_cache_dirname) = (0, undef);
651     my $can_cache_inform = $params{'can_cache_inform'};
652     my $part_cache_type = $params{'part_cache_type'} || 'none';
653     my $allow_split = $params{'allow_split'};
654
655     my $xdt_first_dev = $self->get_device();
656     if (!defined $xdt_first_dev) {
657         die "no device is available to create an xfer_dest";
658     }
659     my $leom_supported = $xdt_first_dev->property_get("leom");
660     my $use_directtcp = $xdt_first_dev->directtcp_supported();
661
662     # figure out the destination type we'll use, based on the circumstances
663     my ($dest_type, $dest_text);
664     if ($use_directtcp) {
665         $dest_type = 'directtcp';
666         $dest_text = "using DirectTCP";
667     } elsif ($can_cache_inform && $leom_supported) {
668         $dest_type = 'splitter';
669         $dest_text = "using LEOM (falling back to holding disk as cache)";
670     } elsif ($leom_supported) {
671         $dest_type = 'splitter';
672         $dest_text = "using LEOM detection (no caching)";
673     } elsif ($can_cache_inform) {
674         $dest_type = 'splitter';
675         $dest_text = "using cache_inform";
676     } elsif ($part_cache_type ne 'none') {
677         $dest_type = 'cacher';
678
679         # we'll be caching, so apply the maximum size
680         my $part_cache_max_size = $params{'part_cache_max_size'} || 0;
681         $part_size = $part_cache_max_size
682             if ($part_cache_max_size and $part_cache_max_size < $part_size);
683
684         # and figure out what kind of caching to apply
685         if ($part_cache_type eq 'memory') {
686             $use_mem_cache = 1;
687         } else {
688             # note that we assume this has already been checked; if it's wrong,
689             # the xfer element will just fail immediately
690             $disk_cache_dirname = $params{'part_cache_dir'};
691         }
692         $dest_text = "using cache type '$part_cache_type'";
693     } else {
694         $dest_type = 'splitter';
695         $dest_text = "using no cache (PEOM will be fatal)";
696
697         # no directtcp, no caching, no cache_inform, and no LEOM, so a PEOM will be fatal
698         $self->{'retry_part_on_peom'} = 0;
699     }
700
701     if ($allow_split &&
702         ($can_cache_inform ||
703          !defined($part_cache_type) ||
704          $part_cache_type eq 'disk' ||
705          $part_cache_type eq 'memory' ||
706          $leom_supported)) {
707         $self->{'allow_split'} = 1;
708     } else {
709         $self->{'allow_split'} = 0;
710     }
711
712     $self->{'retry_part_on_peom'} = 0 if !$self->{'allow_split'};
713
714     debug("Amanda::Taper::Scribe preparing to write, part size $part_size, "
715         . "$dest_text ($dest_type) "
716         . ($leom_supported? " (LEOM supported)" : " (no LEOM)"));
717
718     # set the device to verbose logging if we're in debug mode
719     if ($self->{'debug'}) {
720         $xdt_first_dev->property_set("verbose", 1);
721     }
722
723     my $xdt;
724     if ($dest_type eq 'directtcp') {
725         $xdt = Amanda::Xfer::Dest::Taper::DirectTCP->new(
726             $xdt_first_dev, $part_size);
727         $self->{'xdt_ready'} = 0; # xdt isn't ready until we get XMSG_READY
728     } elsif ($dest_type eq 'splitter') {
729         $xdt = Amanda::Xfer::Dest::Taper::Splitter->new(
730             $xdt_first_dev, $params{'max_memory'}, $part_size, $can_cache_inform);
731         $self->{'xdt_ready'} = 1; # xdt is ready immediately
732     } else {
733         $xdt = Amanda::Xfer::Dest::Taper::Cacher->new(
734             $xdt_first_dev, $params{'max_memory'}, $part_size,
735             $use_mem_cache, $disk_cache_dirname);
736         $self->{'xdt_ready'} = 1; # xdt is ready immediately
737     }
738     $self->{'start_part_on_xdt_ready'} = 0;
739     $self->{'xdt'} = $xdt;
740
741     return $xdt;
742 }
743
744 sub start_dump {
745     my $self = shift;
746     my %params = @_;
747
748     die "no xfer dest set up; call get_xfer_dest first"
749         unless defined $self->{'xdt'};
750
751     # get the header ready for writing (totalparts was set by the caller)
752     $self->{'dump_header'} = $params{'dump_header'};
753     $self->{'dump_header'}->{'partnum'} = 1;
754
755     # set up the dump_cb for when this dump is done, and keep the xfer
756     $self->{'dump_cb'} = $params{'dump_cb'};
757     $self->{'xfer'} = $params{'xfer'};
758     $self->{'dump_start_time'} = time;
759
760     # and start the part
761     $self->_start_part();
762 }
763
764 sub cancel_dump {
765     my $self = shift;
766     my %params = @_;
767
768     die "no xfer dest set up; call get_xfer_dest first"
769         unless defined $self->{'xdt'};
770
771     # set up the dump_cb for when this dump is done, and keep the xfer
772     $self->{'dump_cb'} = $params{'dump_cb'};
773     $self->{'xfer'} = $params{'xfer'};
774
775     # XXX The cancel should call dump_cb, but right now the xfer stays hung in
776     # accept.  So we leave the xfer to its hang, and dump_cb is called and xdt
777     # and xfer are set to undef.  This should be fixed in 3.2.
778
779     $self->{'xfer'}->cancel();
780
781     $self->{'dump_cb'}->(
782         result => "FAILED",
783         device_errors => [],
784         config_denial_message => undef,
785         size => 0,
786         duration => 0.0,
787         total_duration => 0,
788         nparts => 0);
789     $self->{'xdt'} = undef;
790     $self->{'xfer'} = undef;
791 }
792
793 sub get_bytes_written {
794     my ($self) = @_;
795
796     if (defined $self->{'xdt'}) {
797         return $self->{'size'} + $self->{'xdt'}->get_part_bytes_written();
798     } else {
799         return $self->{'size'};
800     }
801 }
802
803 sub _start_part {
804     my $self = shift;
805
806     $self->dbg("trying to start part");
807
808     # if the xdt isn't ready yet, wait until it is; note that the XDT is still
809     # using the device right now, so we can't even label it yet.
810     if (!$self->{'xdt_ready'}) {
811         $self->dbg("XDT not ready yet; waiting until it is");
812         $self->{'start_part_on_xdt_ready'} = 1;
813         return
814     }
815
816     # we need an actual, permitted device at this point, so if we don't have
817     # one, then defer this start_part call until we do.  The device may still
818     # exist, but be at EOM, if the last dump failed at EOM and was not retried
819     # on a new volume.
820     if (!$self->{'device'} or $self->{'device_at_eom'}) {
821         # _get_new_volume calls _start_part when it has a new volume in hand
822         return $self->_get_new_volume();
823     }
824
825     # if the dump wasn't successful, and we're not splitting, then bail out.  It's
826     # up to higher-level components to re-try this dump on a new volume, if desired.
827     # Note that this should be caught in the XMSG_PART_DONE handler -- this is just
828     # here for backup.
829     if (!$self->{'last_part_successful'} and !$self->{'retry_part_on_peom'}) {
830         $self->_operation_failed(device_error => "No space left on device (uncaught)");
831         return;
832     }
833
834     # and start writing this part
835     $self->{'started_writing'} = 1;
836     $self->dbg("resuming transfer");
837     $self->{'xdt'}->start_part(!$self->{'last_part_successful'},
838                                $self->{'dump_header'});
839 }
840
841 sub handle_xmsg {
842     my $self = shift;
843     my ($src, $msg, $xfer) = @_;
844
845     if ($msg->{'type'} == $XMSG_DONE) {
846         $self->_xmsg_done($src, $msg, $xfer);
847         return;
848     }
849
850     # for anything else we only pay attention to messages from
851     # our own element
852     if ($msg->{'elt'} == $self->{'xdt'}) {
853         $self->dbg("got msg from xfer dest: $msg");
854         if ($msg->{'type'} == $XMSG_PART_DONE) {
855             $self->_xmsg_part_done($src, $msg, $xfer);
856         } elsif ($msg->{'type'} == $XMSG_READY) {
857             $self->_xmsg_ready($src, $msg, $xfer);
858         } elsif ($msg->{'type'} == $XMSG_ERROR) {
859             $self->_xmsg_error($src, $msg, $xfer);
860         }
861     }
862 }
863
864 sub _xmsg_part_done {
865     my $self = shift;
866     my ($src, $msg, $xfer) = @_;
867
868     # this handles successful zero-byte parts as a special case - they
869     # are an implementation detail of the splitting done by the transfer
870     # destination.
871
872     if ($msg->{'successful'} and $msg->{'size'} == 0) {
873         $self->dbg("not notifying for empty, successful part");
874     } else {
875         # double-check partnum
876         die "Part numbers do not match!"
877             unless ($self->{'dump_header'}->{'partnum'} == $msg->{'partnum'});
878
879         # notify
880         $self->{'feedback'}->scribe_notif_part_done(
881             partnum => $msg->{'partnum'},
882             fileno => $msg->{'fileno'},
883             successful => $msg->{'successful'},
884             size => $msg->{'size'},
885             duration => $msg->{'duration'});
886
887         # increment nparts here, so empty parts are not counted
888         $self->{'nparts'} = $msg->{'partnum'};
889     }
890
891     $self->{'last_part_successful'} = $msg->{'successful'};
892
893     if ($msg->{'successful'}) {
894         $self->{'device_size'} += $msg->{'size'};
895         $self->{'size'} += $msg->{'size'};
896         $self->{'duration'} += $msg->{'duration'};
897     }
898
899     if (!$msg->{'eof'}) {
900         # update the header for the next dumpfile, if this was a non-empty part
901         if ($msg->{'successful'} and $msg->{'size'} != 0) {
902             $self->{'dump_header'}->{'partnum'}++;
903         }
904
905         if ($msg->{'eom'}) {
906             # if there's an error finishing the device, it's probably just carryover
907             # from the error the Xfer::Dest::Taper encountered while writing to the
908             # device, so we ignore it.
909             if (!$self->{'device'}->finish()) {
910                 my $devname = $self->{'device'}->device_name;
911                 my $errmsg = $self->{'device'}->error_or_status();
912                 $self->dbg("ignoring error while finishing device '$devname': $errmsg");
913             }
914
915             # if the part failed..
916             if (!$msg->{'successful'} || !$self->{'allow_split'}) {
917                 # if no caching was going on, then the dump has failed
918                 if (!$self->{'retry_part_on_peom'}) {
919                     # mark this device as at EOM, since we are not going to look
920                     # for another one yet
921                     $self->{'device_at_eom'} = 1;
922
923                     my $msg = "No space left on device";
924                     if ($self->{'device'}->status() != $DEVICE_STATUS_SUCCESS) {
925                         $msg = $self->{'device'}->error_or_status();
926                     }
927                     $self->_operation_failed(device_error => "$msg, splitting not enabled");
928                     return;
929                 }
930
931                 # log a message for amreport
932                 $self->{'feedback'}->scribe_notif_log_info(
933                     message => "Will request retry of failed split part.");
934             }
935
936             # get a new volume, then go on to the next part
937             $self->_get_new_volume();
938         } else {
939             # if the part was unsuccessful, but the xfer dest has reason to believe
940             # this is not due to EOM, then the dump is done
941             if (!$msg->{'successful'}) {
942                 my $msg = "unknown error while dumping";
943                 if ($self->{'device'}->status() != $DEVICE_STATUS_SUCCESS) {
944                     $msg = $self->{'device'}->error_or_status();
945                 }
946                 $self->_operation_failed(device_error => $msg);
947                 return;
948             }
949
950             # no EOM -- go on to the next part
951             $self->_start_part();
952         }
953     }
954 }
955
956 sub _xmsg_ready {
957     my $self = shift;
958     my ($src, $msg, $xfer) = @_;
959
960     $self->dbg("XDT is ready");
961     $self->{'xdt_ready'} = 1;
962     if ($self->{'start_part_on_xdt_ready'}) {
963         $self->{'start_part_on_xdt_ready'} = 0;
964         $self->_start_part();
965     }
966 }
967
968 sub _xmsg_error {
969     my $self = shift;
970     my ($src, $msg, $xfer) = @_;
971
972     # XMSG_ERROR from the XDT is always fatal
973     $self->_operation_failed(device_error => $msg->{'message'});
974 }
975
976 sub _xmsg_done {
977     my $self = shift;
978     my ($src, $msg, $xfer) = @_;
979
980     if ($msg->{'type'} == $XMSG_DONE) {
981         $self->dbg("transfer is complete");
982         $self->_dump_done();
983     }
984 }
985
986 sub _dump_done {
987     my $self = shift;
988
989     my $result;
990
991     # determine the correct final status - DONE if we're done, PARTIAL
992     # if we've started writing to the volume, otherwise FAILED
993     if (@{$self->{'device_errors'}} or $self->{'config_denial_message'}) {
994         $result = $self->{'started_writing'}? 'PARTIAL' : 'FAILED';
995     } else {
996         $result = 'DONE';
997     }
998
999     my $dump_cb = $self->{'dump_cb'};
1000     my %dump_cb_args = (
1001         result => $result,
1002         device_errors => $self->{'device_errors'},
1003         config_denial_message => $self->{'config_denial_message'},
1004         size => $self->{'size'},
1005         duration => $self->{'duration'},
1006         total_duration => time - $self->{'dump_start_time'},
1007         nparts => $self->{'nparts'});
1008
1009     # reset everything and let the original caller know we're done
1010     $self->{'xfer'} = undef;
1011     $self->{'xdt'} = undef;
1012     $self->{'dump_header'} = undef;
1013     $self->{'dump_cb'} = undef;
1014     $self->{'size'} = 0;
1015     $self->{'duration'} = 0.0;
1016     $self->{'nparts'} = undef;
1017     $self->{'dump_start_time'} = undef;
1018     $self->{'device_errors'} = [];
1019     $self->{'config_denial_message'} = undef;
1020
1021     # and call the callback
1022     $dump_cb->(%dump_cb_args);
1023 }
1024
1025 # keyword parameters are utilities to the caller: either specify
1026 # device_error to add to the device_errors list or config_denial_message
1027 # to set the corresponding key in $self.
1028 sub _operation_failed {
1029     my $self = shift;
1030     my %params = @_;
1031
1032     my $error_message = $params{'device_error'}
1033                      || $params{'config_denial_message'}
1034                      || 'no reason';
1035     $self->dbg("operation failed: $error_message");
1036
1037     # tuck the message away as desired
1038     push @{$self->{'device_errors'}}, $params{'device_error'}
1039         if defined $params{'device_error'};
1040     $self->{'config_denial_message'} = $params{'config_denial_message'} 
1041         if $params{'config_denial_message'};
1042
1043     # cancelling the xdt will eventually cause an XMSG_DONE, which will notice
1044     # the error and set the result correctly; but if there's no xfer, then we
1045     # can just call _dump_done directly.
1046     if (defined $self->{'xfer'}) {
1047         $self->dbg("cancelling the transfer: $error_message");
1048
1049         $self->{'xfer'}->cancel();
1050     } else {
1051         if (defined $self->{'dump_cb'}) {
1052             # _dump_done constructs the dump_cb from $self parameters
1053             $self->_dump_done();
1054         } else {
1055             die "error with no callback to handle it: $error_message";
1056         }
1057     }
1058 }
1059
1060 # release the outstanding reservation, calling scribe_notif_tape_done
1061 # after the release
1062 sub _release_reservation {
1063     my $self = shift;
1064     my %params = @_;
1065     my @errors;
1066
1067     my ($label, $fm, $kb);
1068
1069     # if we've already written a volume, log it
1070     if ($self->{'device'} and defined $self->{'device'}->volume_label) {
1071         $label = $self->{'device'}->volume_label();
1072         $fm = $self->{'device'}->file();
1073         $kb = $self->{'device_size'} / 1024;
1074
1075         # log a message for amreport
1076         $self->{'feedback'}->scribe_notif_log_info(
1077             message => "tape $label kb $kb fm $fm [OK]");
1078     }
1079
1080     # finish the device if it isn't finished yet
1081     if ($self->{'device'}) {
1082         my $already_in_error = $self->{'device'}->status() != $DEVICE_STATUS_SUCCESS;
1083
1084         if (!$self->{'device'}->finish() && !$already_in_error) {
1085             push @errors, $self->{'device'}->error_or_status();
1086         }
1087     }
1088     $self->{'device'} = undef;
1089     $self->{'device_at_eom'} = 0;
1090
1091     $self->{'reservation'}->release(finished_cb => sub {
1092         my ($err) = @_;
1093         push @errors, "$err" if $err;
1094
1095         $self->{'reservation'} = undef;
1096
1097         # notify the feedback that we've finished and released a tape
1098         if ($label) {
1099             return $self->{'feedback'}->scribe_notif_tape_done(
1100                 volume_label => $label,
1101                 size => $kb * 1024,
1102                 num_files => $fm,
1103                 finished_cb => sub {
1104                  $params{'finished_cb'}->(@errors? join("; ", @errors) : undef);
1105                 });
1106         }
1107
1108         $params{'finished_cb'}->(@errors? join("; ", @errors) : undef);
1109     });
1110 }
1111
1112 # invoke the devhandling to get a new device, with all of the requisite
1113 # notifications and checks and whatnot.  On *success*, call _start_dump; on
1114 # failure, call other appropriate methods.
1115 sub _get_new_volume {
1116     my $self = shift;
1117
1118     # release first, if necessary
1119     if ($self->{'reservation'}) {
1120         $self->_release_reservation(finished_cb => sub {
1121             my ($error) = @_;
1122
1123             if ($error) {
1124                 $self->_operation_failed(device_error => $error);
1125             } else {
1126                 $self->_get_new_volume();
1127             }
1128         });
1129
1130         return;
1131     }
1132
1133     $self->{'devhandling'}->get_volume(volume_cb => sub { $self->_volume_cb(@_); });
1134 }
1135
1136 sub _volume_cb  {
1137     my $self = shift;
1138     my ($scan_error, $config_denial_message, $error_denial_message,
1139         $reservation, $new_label, $access_mode, $is_new, $new_scribe) = @_;
1140
1141     # note that we prefer the config_denial_message over the scan error.  If
1142     # both occurred, then the results of the scan are immaterial -- we
1143     # shouldn't have been looking for a new volume anyway.
1144
1145     if ($config_denial_message) {
1146         $self->_operation_failed(config_denial_message => $config_denial_message);
1147         return;
1148     }
1149
1150     if ($error_denial_message) {
1151         $self->_operation_failed(device_error => $error_denial_message);
1152         return;
1153     }
1154
1155     if ($new_scribe) {
1156         # Transfer the xfer to the new scribe
1157         $self->dbg("take scribe from");
1158
1159         $new_scribe->{'dump_cb'} = $self->{'dump_cb'};
1160         $new_scribe->{'dump_header'} = $self->{'dump_header'};
1161         $new_scribe->{'retry_part_on_peom'} = $self->{'retry_part_on_peom'};
1162         $new_scribe->{'allow_split'} = $self->{'allow_split'};
1163         $new_scribe->{'split_method'} = $self->{'split_method'};
1164         $new_scribe->{'xfer'} = $self->{'xfer'};
1165         $new_scribe->{'xdt'} = $self->{'xdt'};
1166         $new_scribe->{'xdt_ready'} = $self->{'xdt_ready'};
1167         $new_scribe->{'start_part_on_xdt_ready'} = $self->{'start_part_on_xdt_ready'};
1168         $new_scribe->{'size'} = $self->{'size'};
1169         $new_scribe->{'duration'} = $self->{'duration'};
1170         $new_scribe->{'dump_start_time'} = $self->{'dump_start_time'};
1171         $new_scribe->{'last_part_successful'} = $self->{'last_part_successful'};
1172         $new_scribe->{'started_writing'} = $self->{'started_writing'};
1173         $new_scribe->{'feedback'} = $self->{'feedback'};
1174         $new_scribe->{'devhandling'}->{'feedback'} = $self->{'feedback'};
1175         $self->{'dump_header'} = undef;
1176         $self->{'dump_cb'} = undef;
1177         $self->{'xfer'} = undef;
1178         $self->{'xdt'} = undef;
1179         $self->{'xdt_ready'} = undef;
1180         $self->{'dump_start_time'} = undef;
1181         $self->{'started_writing'} = 0;
1182         $self->{'feedback'} = undef;
1183         if (defined $new_scribe->{'device'}) {
1184             $new_scribe->{'xdt'}->use_device($new_scribe->{'device'});
1185         }
1186         # start it
1187         $new_scribe->_start_part();
1188
1189         return;
1190     }
1191
1192     if ($scan_error) {
1193         # we had permission to use a tape, but didn't find a tape, so we need
1194         # to notify of such
1195         $self->{'feedback'}->scribe_notif_new_tape(
1196             error => $scan_error,
1197             volume_label => undef);
1198
1199         $self->_operation_failed(device_error => $scan_error);
1200         return;
1201     }
1202
1203     $self->dbg("got new volume; writing new label");
1204
1205     # from here on, if an error occurs, we must send scribe_notif_new_tape, and look
1206     # for a new volume
1207     $self->{'reservation'} = $reservation;
1208     $self->{'device_size'} = 0;
1209     my $device = $self->{'device'} = $reservation->{'device'};
1210
1211     # turn on verbose logging now, if we need it
1212     if ($self->{'debug'}) {
1213         $reservation->{'device'}->property_set("verbose", 1);
1214     }
1215
1216     # read the label once, to get a "before" snapshot (see below)
1217     my $old_label;
1218     my $old_timestamp;
1219     if (!$is_new) {
1220         if (($device->status & ~$DEVICE_STATUS_VOLUME_UNLABELED)
1221             && !($device->status & $DEVICE_STATUS_VOLUME_UNLABELED)) {
1222             $self->{'feedback'}->scribe_notif_new_tape(
1223                 error => "while reading label on new volume: " . $device->error_or_status(),
1224                 volume_label => undef);
1225
1226             return $self->_get_new_volume();
1227         }
1228         $old_label = $device->volume_label;
1229         $old_timestamp = $device->volume_time;
1230     }
1231
1232     # inform the xdt about this new device before starting it
1233     $self->{'xdt'}->use_device($device);
1234
1235     my $cbX = sub {};
1236     my $steps = define_steps
1237         cb_ref => \$cbX;
1238
1239     step device_start => sub {
1240         $self->_device_start($reservation, $access_mode, $new_label, $is_new,
1241                              $steps->{'device_started'});
1242     };
1243
1244     step device_started => sub {
1245         my $result = shift;
1246
1247         if ($result == 0) {
1248             # try reading the label to see whether we erased the tape
1249             my $erased = 0;
1250             CHECK_READ_LABEL: {
1251             # don't worry about erasing new tapes
1252                 if ($is_new) {
1253                     last CHECK_READ_LABEL;
1254                 }
1255
1256                 $device->finish();
1257                 $device->read_label();
1258
1259                 # does the device think something is broken now?
1260                 if (($device->status & ~$DEVICE_STATUS_VOLUME_UNLABELED)
1261                     and !($device->status & $DEVICE_STATUS_VOLUME_UNLABELED)) {
1262                     $erased = 1;
1263                     last CHECK_READ_LABEL;
1264                 }
1265
1266                 # has the label changed?
1267                 my $vol_label = $device->volume_label;
1268                 if ((!defined $old_label and defined $vol_label)
1269                     or (defined $old_label and !defined $vol_label)
1270                     or (defined $old_label and $old_label ne $vol_label)) {
1271                     $erased = 1;
1272                     last CHECK_READ_LABEL;
1273                 }
1274
1275                 # has the timestamp changed?
1276                 my $vol_timestamp = $device->volume_time;
1277                 if ((!defined $old_timestamp and defined $vol_timestamp)
1278                     or (defined $old_timestamp and !defined $vol_timestamp)
1279                     or (defined $old_timestamp and $old_timestamp ne $vol_timestamp)) {
1280                     $erased = 1;
1281                     last CHECK_READ_LABEL;
1282                 }
1283             }
1284
1285             $self->{'feedback'}->scribe_notif_new_tape(
1286                 error => "while labeling new volume: " . $device->error_or_status(),
1287                 volume_label => $erased? $new_label : undef);
1288
1289             $self->_get_new_volume();
1290             return $cbX->();
1291         } elsif ($result != 1) {
1292             $self->{'feedback'}->scribe_notif_new_tape(
1293                 error => $result,
1294                 volume_label => undef);
1295             $self->_get_new_volume();
1296             return $cbX->();
1297         }
1298
1299         $new_label = $device->volume_label;
1300
1301         # success!
1302         $self->{'feedback'}->scribe_notif_new_tape(
1303             error => undef,
1304             volume_label => $new_label);
1305
1306         $self->{'reservation'}->set_label(label => $new_label,
1307             finished_cb => $steps->{'set_labelled'});
1308     };
1309
1310     step set_labelled => sub {
1311         my ($err) = @_;
1312         if ($err) {
1313             $self->{'feedback'}->scribe_notif_log_info(
1314                 message => "Error from set_label: $err");
1315             # fall through to start_part anyway...
1316         }
1317         $self->_start_part();
1318         return $cbX->();
1319     }
1320 }
1321
1322 # return 0 for device->start error
1323 # return 1 for success
1324 # return a message for others error
1325 sub _device_start {
1326     my $self = shift;
1327     my ($reservation, $access_mode, $new_label, $is_new, $finished_cb) = @_;
1328
1329     my $device = $reservation->{'device'};
1330     my $tl = $self->{'taperscan'}->{'tapelist'};
1331     my $meta;
1332
1333     if (!defined $tl) { # For Mock::Taperscan in installcheck
1334         if (!$device->start($access_mode, $new_label, $self->{'write_timestamp'})) {
1335             return $finished_cb->(0);
1336         } else {
1337             return $finished_cb->(1);
1338         }
1339     }
1340
1341     my $steps = define_steps
1342         cb_ref => \$finished_cb;
1343
1344     step setup => sub {
1345         return $reservation->get_meta_label(
1346                                 finished_cb => $steps->{'got_meta_label'});
1347     };
1348
1349     step got_meta_label => sub {
1350         my ($err, $meta) = @_;
1351
1352         if ($is_new) {
1353             # generate the new label and write it to the tapelist file
1354             $tl->reload(1);
1355             ($new_label, my $err) = $reservation->make_new_tape_label();
1356             if (!defined $new_label) {
1357                 $tl->unlock();
1358                 return $finished_cb->($err);
1359             }
1360             if (!$meta) {
1361                 ($meta, $err) = $reservation->make_new_meta_label();
1362                 if (defined $err) {
1363                     $tl->unlock();
1364                     return $finished_cb->($err);
1365                 }
1366             }
1367             $tl->add_tapelabel('0', $new_label, undef, $meta, $reservation->{'barcode'});
1368             $tl->write();
1369             $self->dbg("generate new label '$new_label'");
1370         } elsif (!defined $meta) {
1371             $tl->reload(0);
1372             my $tle = $tl->lookup_tapelabel($new_label);
1373             my $meta = $tle->{'meta'};
1374         }
1375
1376         # write the label to the device
1377         if (!$device->start($access_mode, $new_label, $self->{'write_timestamp'})) {
1378             if ($is_new) {
1379                 # remove the generated label from the tapelist file
1380                 $tl->reload(1);
1381                 $tl->remove_tapelabel($new_label);
1382                 $tl->write();
1383             }
1384             return $finished_cb->(0);
1385         }
1386
1387         # rewrite the tapelist file
1388         $tl->reload(1);
1389         my $tle = $tl->lookup_tapelabel($new_label);
1390         $meta = $tle->{'meta'} if !$meta && $tle->{'meta'};
1391         $tl->remove_tapelabel($new_label);
1392         $tl->add_tapelabel($self->{'write_timestamp'}, $new_label,
1393                            $tle? $tle->{'comment'} : undef, 1, $meta);
1394         $tl->write();
1395
1396         $reservation->set_meta_label(meta => $meta,
1397                                      finished_cb => $steps->{'set_meta_label'});
1398     };
1399
1400     step set_meta_label => sub {
1401         return $finished_cb->(1);
1402     }
1403 }
1404
1405 sub dbg {
1406     my ($self, $msg) = @_;
1407     if ($self->{'debug'}) {
1408         debug("Amanda::Taper::Scribe: $msg");
1409     }
1410 }
1411
1412 sub get_splitting_args_from_config {
1413     my %params = @_;
1414
1415     my %splitting_args;
1416
1417     $splitting_args{'allow_split'} = 0;
1418     # if dle_splitting is false, then we don't split - easy.
1419     if (defined $params{'dle_allow_split'} and !$params{'dle_allow_split'}) {
1420         return %splitting_args;
1421     }
1422
1423     # utility for below
1424     my $have_space = sub {
1425         my ($dirname, $part_size) = @_;
1426
1427         use Carp;
1428         my $fsusage = Amanda::Util::get_fs_usage($dirname);
1429         confess "$dirname" if (!$fsusage);
1430
1431         my $avail = $fsusage->{'blocksize'} * $fsusage->{'bavail'};
1432         if ($avail < $part_size) {
1433             Amanda::Debug::debug("disk cache has $avail bytes available on $dirname, but " .
1434                                  "needs $part_size");
1435             return 0;
1436         } else {
1437             return 1;
1438         }
1439     };
1440
1441     # first, handle the alternate spellings for part_size and part_cache_type
1442     $params{'part_size'} = $params{'part_size_kb'} * 1024
1443         if (defined $params{'part_size_kb'});
1444
1445     if (defined $params{'part_cache_type_enum'}) {
1446         $params{'part_cache_type'} = 'none'
1447             if ($params{'part_cache_type_enum'} == $PART_CACHE_TYPE_NONE);
1448         $params{'part_cache_type'} = 'memory'
1449             if ($params{'part_cache_type_enum'} == $PART_CACHE_TYPE_MEMORY);
1450         $params{'part_cache_type'} = 'disk'
1451             if ($params{'part_cache_type_enum'} == $PART_CACHE_TYPE_DISK);
1452
1453         $params{'part_cache_type'} = 'unknown'
1454             unless defined $params{'part_cache_type'};
1455     }
1456
1457     # if any of the dle_* parameters are set, use those to set the part_*
1458     # parameters, which are emptied out first.
1459     if (defined $params{'dle_tape_splitsize'} or
1460         defined $params{'dle_split_diskbuffer'} or
1461         defined $params{'dle_fallback_splitsize'}) {
1462
1463         $params{'part_size'} = $params{'dle_tape_splitsize'} || 0;
1464         $params{'part_cache_type'} = 'none';
1465         $params{'part_cache_dir'} = undef;
1466         $params{'part_cache_max_size'} = undef;
1467
1468         # part cache type is memory unless we have a split_diskbuffer that fits the bill
1469         if ($params{'part_size'}) {
1470             $splitting_args{'allow_split'} = 1;
1471             $params{'part_cache_type'} = 'memory';
1472             if (defined $params{'dle_split_diskbuffer'}
1473                     and -d $params{'dle_split_diskbuffer'}) {
1474                 if ($have_space->($params{'dle_split_diskbuffer'}, $params{'part_size'})) {
1475                     # disk cache checks out, so use it
1476                     $params{'part_cache_type'} = 'disk';
1477                     $params{'part_cache_dir'} = $params{'dle_split_diskbuffer'};
1478                 } else {
1479                     my $msg = "falling back to memory buffer for splitting: " .
1480                                 "insufficient space in disk cache directory";
1481                     $splitting_args{'warning'} = $msg;
1482                 }
1483             }
1484         }
1485
1486         if ($params{'part_cache_type'} eq 'memory') {
1487             # fall back to 10M if fallback size is not given
1488             $params{'part_cache_max_size'} = $params{'dle_fallback_splitsize'} || 10*1024*1024;
1489         }
1490     } else {
1491         my $ps = $params{'part_size'};
1492         my $pcms = $params{'part_cache_max_size'};
1493         $ps = $pcms if (!defined $ps or (defined $pcms and $pcms < $ps));
1494         $splitting_args{'allow_split'} = 1 if ((defined $ps and $ps > 0) or
1495                                                $params{'leom_supported'});
1496
1497         # fail back from 'disk' to 'none' if the disk isn't set up correctly
1498         if (defined $params{'part_cache_type'} and
1499                     $params{'part_cache_type'} eq 'disk') {
1500             my $warning;
1501             if (!$params{'part_cache_dir'}) {
1502                 $warning = "no part-cache-dir specified; "
1503                             . "using part cache type 'none'";
1504             } elsif (!-d $params{'part_cache_dir'}) {
1505                 $warning = "part-cache-dir '$params{part_cache_dir} "
1506                             . "does not exist; using part cache type 'none'";
1507             } elsif (!$have_space->($params{'part_cache_dir'}, $ps)) {
1508                 $warning = "part-cache-dir '$params{part_cache_dir} "
1509                             . "has insufficient space; using part cache type 'none'";
1510             }
1511
1512             if (defined $warning) {
1513                 $splitting_args{'warning'} = $warning;
1514                 $params{'part_cache_type'} = 'none';
1515                 delete $params{'part_cache_dir'};
1516             }
1517         }
1518     }
1519
1520     $splitting_args{'part_size'} = $params{'part_size'}
1521         if defined($params{'part_size'});
1522     $splitting_args{'part_cache_type'} = $params{'part_cache_type'}
1523         if defined($params{'part_cache_type'});
1524     $splitting_args{'part_cache_dir'} = $params{'part_cache_dir'}
1525         if defined($params{'part_cache_dir'});
1526     $splitting_args{'part_cache_max_size'} = $params{'part_cache_max_size'}
1527         if defined($params{'part_cache_max_size'});
1528
1529     return %splitting_args;
1530 }
1531 ##
1532 ## Feedback
1533 ##
1534
1535 package Amanda::Taper::Scribe::Feedback;
1536
1537 sub request_volume_permission {
1538     my $self = shift;
1539     my %params = @_;
1540
1541     # sure, you can have as many volumes as you want!
1542     $params{'perm_cb'}->(allow => 1);
1543 }
1544
1545 sub scribe_notif_new_tape { }
1546 sub scribe_notif_part_done { }
1547 sub scribe_notif_log_info { }
1548 sub scribe_notif_tape_done {
1549     my $self = shift;
1550     my %params = @_;
1551
1552     $params{'finished_cb'}->();
1553 }
1554
1555 ##
1556 ## Device Handling
1557 ##
1558
1559 package Amanda::Taper::Scribe::DevHandling;
1560 use Amanda::MainLoop;
1561 use Carp;
1562
1563 # This class handles scanning for volumes, requesting permission for those
1564 # volumes (the driver likes to feel like it's in control), and providing those
1565 # volumes to the scribe on request.  These can all happen independently, but
1566 # the scribe cannot begin writing to a volume until all three have finished.
1567 # That is: the scan is finished, the driver has given its permission, and the
1568 # scribe has requested a volume.
1569 #
1570 # On start, the class starts scanning immediately, even though the scribe has
1571 # not requested a volume.  Subsequently, a new scan does not begin until the
1572 # scribe requests a volume.
1573 #
1574 # This class is "private" to Amanda::Taper::Scribe, so it is documented in
1575 # comments, rather than POD.
1576
1577 # Create a new DevHandling object.  Params are taperscan and feedback.
1578 sub new {
1579     my $class = shift;
1580     my %params = @_;
1581
1582     my $self = {
1583         taperscan => $params{'taperscan'},
1584         feedback => $params{'feedback'},
1585
1586         # is a scan currently running, or completed?
1587         scan_running => 0,
1588         scan_finished => 0,
1589         scan_error => undef,
1590
1591         # scan results
1592         reservation => undef,
1593         device => undef,
1594         volume_label => undef,
1595
1596         # requests for permissiont to use a new volume
1597         request_pending => 0,
1598         request_complete => 0,
1599         request_denied => 0,
1600         config_denial_message => undef,
1601         error_denial_message => undef,
1602
1603         volume_cb => undef, # callback for get_volume
1604         start_finished_cb => undef, # callback for start
1605     };
1606
1607     return bless ($self, $class);
1608 }
1609
1610 ## public methods
1611
1612 # Called at scribe startup, this starts the instance off with a scan.
1613 sub start {
1614     my $self = shift;
1615     my %params = @_;
1616
1617     $self->{'start_finished_cb'} = $params{'finished_cb'};
1618     $self->_start_scanning();
1619 }
1620
1621 sub quit {
1622     my $self = shift;
1623     my %params = @_;
1624
1625     for my $rq_param qw(finished_cb) {
1626         croak "required parameter '$rq_param' mising"
1627             unless exists $params{$rq_param};
1628     }
1629
1630     # since there's little other option than to barrel on through the
1631     # quitting procedure, quit() just accumulates its error messages
1632     # and, if necessary, concantenates them for the finished_cb.
1633     my @errors;
1634
1635     my $cleanup_cb = make_cb(cleanup_cb => sub {
1636         my ($error) = @_;
1637         push @errors, $error if $error;
1638
1639         $error = join("; ", @errors) if @errors >= 1;
1640
1641         $params{'finished_cb'}->($error);
1642     });
1643
1644     if ($self->{'reservation'}) {
1645         if ($self->{'device'}) {
1646             if (!$self->{'device'}->finish()) {
1647                 push @errors, $self->{'device'}->error_or_status();
1648             }
1649         }
1650
1651         $self->{'reservation'}->release(finished_cb => $cleanup_cb);
1652     } else {
1653         $cleanup_cb->(undef);
1654     }
1655 }
1656
1657 # Get an open, started device and label to start writing to.  The
1658 # volume_callback takes the following arguments:
1659 #   $scan_error -- error message, or undef if no error occurred
1660 #   $config_denial_reason -- config-related reason request was denied, or undef
1661 #   $error_denial_reason -- error-related reason request was denied, or undef
1662 #   $reservation -- Amanda::Changer reservation
1663 #   $device -- open, started device
1664 # It is the responsibility of the caller to close the device and release the
1665 # reservation when finished.  If $scan_error or $request_denied_info are
1666 # defined, then $reservation and $device will be undef.
1667 sub get_volume {
1668     my $self = shift;
1669     my (%params) = @_;
1670
1671     die "already processing a volume request"
1672         if ($self->{'volume_cb'});
1673
1674     $self->{'volume_cb'} = $params{'volume_cb'};
1675
1676     # kick off the relevant processes, if they're not already running
1677     $self->_start_request();
1678
1679     $self->_maybe_callback();
1680 }
1681
1682 # take a peek at the device we have, for which permission has not yet been
1683 # granted.  This will be undefined before the taperscan completes AND after
1684 # the volume_cb has been called.
1685 sub peek_device {
1686     my $self = shift;
1687
1688     return $self->{'device'};
1689 }
1690
1691 sub start_scan {
1692     my $self = shift;
1693
1694     if (!$self->{'scan_running'} && !$self->{'reservation'}) {
1695         $self->_start_scanning();
1696     }
1697 }
1698
1699 ## private methods
1700
1701 sub _start_scanning {
1702     my $self = shift;
1703
1704     return if $self->{'scan_running'} or $self->{'scan_finished'};
1705
1706     $self->{'scan_running'} = 1;
1707
1708     my $_user_msg_fn = sub {
1709         my %params = @_;
1710         if (exists($params{'slot_result'})) {
1711             if ($params{'does_not_match_labelstr'}) {
1712                 $self->{'feedback'}->scribe_notif_log_info(
1713                     message => "Slot $params{'slot'} with label $params{'label'} do not match labelstr");
1714             } elsif ($params{'not_in_tapelist'}) {
1715                 $self->{'feedback'}->scribe_notif_log_info(
1716                     message => "Slot $params{'slot'} with label $params{'label'} is not in the tapelist");
1717             } elsif ($params{'active'}) {
1718                 $self->{'feedback'}->scribe_notif_log_info(
1719                     message => "Slot $params{'slot'} with label $params{'label'} is not reusable");
1720             } elsif ($params{'not_autolabel'}) {
1721                 if ($params{'label'}) {
1722                     $self->{'feedback'}->scribe_notif_log_info(
1723                         message => "Slot $params{'slot'} with label $params{'label'} is not labelable ");
1724                 } else {
1725                     $self->{'feedback'}->scribe_notif_log_info(
1726                         message => "Slot $params{'slot'} without label is not labelable ");
1727                 }
1728             } elsif ($params{'empty'}) {
1729                 $self->{'feedback'}->scribe_notif_log_info(
1730                     message => "Slot $params{'slot'} is empty");
1731             } elsif ($params{'non_amanda'}) {
1732                 $self->{'feedback'}->scribe_notif_log_info(
1733                     message => "Slot $params{'slot'} is a non-amanda volume");
1734             } elsif ($params{'volume_error'}) {
1735                 $self->{'feedback'}->scribe_notif_log_info(
1736                     message => "Slot $params{'slot'} is a volume in error: $params{'err'}");
1737             } elsif ($params{'not_success'}) {
1738                 $self->{'feedback'}->scribe_notif_log_info(
1739                     message => "Slot $params{'slot'} is a device in error: $params{'err'}");
1740             } elsif ($params{'err'}) {
1741                 $self->{'feedback'}->scribe_notif_log_info(
1742                     message => "$params{'err'}");
1743             } elsif ($params{'not_labelable'}) {
1744                 $self->{'feedback'}->scribe_notif_log_info(
1745                     message => "Slot $params{'slot'} without label can't be labeled");
1746             } elsif (!defined $params{'label'}) {
1747                 $self->{'feedback'}->scribe_notif_log_info(
1748                     message => "Slot $params{'slot'} without label can be labeled");
1749             } else {
1750                 $self->{'feedback'}->scribe_notif_log_info(
1751                     message => "Slot $params{'slot'} with label $params{'label'} is usable");
1752             }
1753         }
1754     };
1755
1756     $self->{'taperscan'}->scan(
1757       user_msg_fn => $_user_msg_fn,
1758       result_cb => sub {
1759         my ($error, $reservation, $volume_label, $access_mode, $is_new) = @_;
1760
1761         $self->{'scan_running'} = 0;
1762         $self->{'scan_finished'} = 1;
1763
1764         if ($error) {
1765             $self->{'scan_error'} = $error;
1766         } else {
1767             $self->{'reservation'} = $reservation;
1768             $self->{'device'} = $reservation->{'device'};
1769             $self->{'volume_label'} = $volume_label;
1770             $self->{'access_mode'} = $access_mode;
1771             $self->{'is_new'} = $is_new;
1772         }
1773
1774         $self->_maybe_callback();
1775     });
1776 }
1777
1778 sub _start_request {
1779     my $self = shift;
1780
1781     return if $self->{'request_pending'} or $self->{'request_complete'};
1782
1783     $self->{'request_pending'} = 1;
1784
1785     $self->{'feedback'}->request_volume_permission(
1786     perm_cb => sub {
1787         my %params = @_;
1788
1789         $self->{'request_pending'} = 0;
1790         $self->{'request_complete'} = 1;
1791         if (defined $params{'scribe'}) {
1792             $self->{'new_scribe'} = $params{'scribe'};
1793             $self->{'scan_finished'} = 1;
1794             $self->{'request_complete'} = 1;
1795         } elsif (defined $params{'cause'}) {
1796             $self->{'request_denied'} = 1;
1797             if ($params{'cause'} eq 'config') {
1798                 $self->{'config_denial_message'} = $params{'message'};
1799             } elsif ($params{'cause'} eq 'error') {
1800                 $self->{'error_denial_message'} = $params{'message'};
1801             } else {
1802                 die "bad cause '" . $params{'cause'} . "'";
1803             }
1804         } elsif (!defined $params{'allow'}) {
1805             die "no allow or cause defined";
1806         }
1807
1808         $self->_maybe_callback();
1809     });
1810 }
1811
1812 sub _maybe_callback {
1813     my $self = shift;
1814
1815     # if we have any kind of error, release the reservation and come back
1816     # later
1817     if (($self->{'scan_error'} or $self->{'request_denied'}) and $self->{'reservation'}) {
1818         $self->{'device'} = undef;
1819
1820         $self->{'reservation'}->release(finished_cb => sub {
1821             my ($error) = @_;
1822
1823             # so many errors, so little time..
1824             if ($error) {
1825                 if ($self->{'scan_error'}) {
1826                     warning("ignoring error releasing reservation ($error) after a scan error");
1827                 } else {
1828                     $self->{'scan_error'} = $error;
1829                 }
1830             }
1831
1832             $self->{'reservation'} = undef;
1833             $self->_maybe_callback();
1834         });
1835
1836         return;
1837     }
1838
1839     # if we are just starting up, call the finished_cb given to start()
1840     if (defined $self->{'start_finished_cb'} and $self->{'scan_finished'}) {
1841         my $cb = $self->{'start_finished_cb'};
1842         $self->{'start_finished_cb'} = undef;
1843
1844         $cb->($self->{'scan_error'});
1845     }
1846
1847     # if the volume_cb is good to get called, call it and reset to the ground state
1848     if ($self->{'volume_cb'} and (!$self->{'scan_running'} or $self->{'scan_finished'}) and $self->{'request_complete'}) {
1849         # get the cb and its arguments lined up before calling it..
1850         my $volume_cb = $self->{'volume_cb'};
1851         my @volume_cb_args = (
1852             $self->{'scan_error'},
1853             $self->{'config_denial_message'},
1854             $self->{'error_denial_message'},
1855             $self->{'reservation'},
1856             $self->{'volume_label'},
1857             $self->{'access_mode'},
1858             $self->{'is_new'},
1859             $self->{'new_scribe'},
1860         );
1861
1862         # reset everything and prepare for a new scan
1863         $self->{'scan_finished'} = 0;
1864
1865         $self->{'reservation'} = undef;
1866         $self->{'device'} = undef;
1867         $self->{'volume_label'} = undef;
1868
1869         $self->{'request_complete'} = 0;
1870         $self->{'request_denied'} = 0;
1871         $self->{'config_denial_message'} = undef;
1872         $self->{'error_denial_message'} = undef;
1873         $self->{'volume_cb'} = undef;
1874         $self->{'new_scribe'} = undef;
1875
1876         $volume_cb->(@volume_cb_args);
1877     }
1878 }
1879
1880 1;