Imported Upstream version 3.3.3
[debian/amanda] / perl / Amanda / Taper / Scribe.pm
1 # Copyright (c) 2009-2012 Zmanda, Inc.  All Rights Reserved.
2 #
3 # This library is free software; you can redistribute it and/or
4 # modify it under the terms of the GNU Lesser General Public
5 #* License as published by the Free Software Foundation; either
6 # version 2.1 of the License, or (at your option) any later version.
7 #
8 # This library is distributed in the hope that it will be useful, but
9 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
11 # License for more details.
12 #
13 # You should have received a copy of the GNU Lesser General Public License
14 # along with this library; if not, write to the Free Software Foundation,
15 # Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA.
16 #
17 # Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
18 # Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
19
20 =head1 NAME
21
22 Amanda::Taper::Scribe
23
24 =head1 SYNOPSIS
25
26   step start_scribe => sub {
27       my $scribe = Amanda::Taper::Scribe->new(
28             taperscan => $taperscan_algo,
29             feedback => $feedback_obj);
30     $scribe->start(
31         write_timestamp => $write_timestamp,
32         finished_cb => $steps->{'start_xfer'});
33   };
34
35   step start_xfer => sub {
36     my ($err) = @_;
37     my $xfer_dest = $scribe->get_xfer_dest(
38         allow_split => 1,
39         max_memory => 64 * 1024,
40         can_cache_inform => 0,
41         part_size => 150 * 1024**2,
42         part_cache_type => 'disk',
43         part_cache_dir => "$tmpdir/splitbuffer",
44         part_cache_max_size => 20 * 1024**2);
45     # .. set up the rest of the transfer ..
46     $xfer->start(sub {
47         my ($src, $msg, $xfer) = @_;
48         $scribe->handle_xmsg($src, $msg, $xfer);
49         # .. any other processing ..
50     };
51     # tell the scribe to start dumping via this transfer
52     $scribe->start_dump(
53         xfer => $xfer,
54         dump_header => $hdr,
55         dump_cb => $steps->{'dump_cb'});
56   };
57
58   step dump_cb => sub {
59       my %params = @_;
60       # .. handle dump results ..
61       print "DONE\n";
62       $finished_cb->();
63   };
64
65
66 =head1 OVERVIEW
67
68 This package provides a high-level abstraction of Amanda's procedure for
69 writing dumpfiles to tape.
70
71 Amanda writes a sequence of dumpfiles to a sequence of volumes.  The
72 volumes are supplied by a taperscan algorithm, which operates a changer
73 to find and load each volume.  As dumpfiles are written to volumes and
74 those volumes fill up, the taperscan algorithm supplies additional
75 volumes.
76
77 In order to reduce internal fragmentation within volumes, Amanda can "split"
78 dumpfiles into smaller pieces, so that the overall dumpfile can span multiple
79 volumes.  Each "part" is written to the volume in sequence.  If a device
80 encounters an error while writing a part, then that part is considered
81 "partial", and is rewritten from its beginning on the next volume.  Some
82 devices can reliably indicate that they are full (EOM), and for these devices
83 parts are simply truncated, and the Scribe starts the next part on the next
84 volume.
85
86 To facilitate rewriting parts on devices which cannot indicate EOM, Amanda must
87 retain all of the data in a part, even after that data is written to the
88 volume.  The Scribe provides several methods to support this: caching the part
89 in memory, caching the part in a special on-disk file, or relying on
90 pre-existing on-disk storage.  The latter method is used when reading from
91 holding disks.
92
93 The details of efficiently splitting dumpfiles and rewriting parts are handled
94 by the low-level C<Amanda::Xfer::Dest::Taper> subclasses.  The Scribe creates
95 an instance of the appropriate subclass and supplies it with volumes from an
96 C<Amanda::Taper::Scan> object.  It calls a number of
97 C<Amanda::Taper::Scribe::Feedback> methods to indicate the status of the dump
98 process and to request permission for each additional volume.
99
100 =head1 OPERATING A SCRIBE
101
102 The C<Amanda::Taper::Scribe> constructor takes two arguments:
103 C<taperscan> and C<feedback>.  The first specifies the taper scan
104 algorithm that the Scribe should use, and the second specifies the
105 C<Feedback> object that will receive notifications from the Scribe (see
106 below).
107
108   my $scribe = Amanda::Taper::Scribe->new(
109         taperscan => $my_taperscan,
110         feedback => $my_feedback);
111
112 Once the object is in place, call its C<start> method.
113
114 =head2 START THE SCRIBE
115
116 Start the scribe's operation by calling its C<start> method.  This will invoke
117 the taperscan algorithm and scan for a volume.  The method takes two parameters:
118
119   $scribe->start(
120         write_timestamp => $ts,
121         finished_cb => $start_finished_cb);
122
123 The timestamp will be written to each volume written by the Scribe.  The
124 C<finished_cb> will be called with a single argument - C<undef> or an error
125 message - when the Scribe is ready to start its first dump.  The Scribe is
126 "ready" when it has found a device to which it can write, although it does not
127 request permission to overwrite that volume, nor start overwriting it, until
128 the first dump begins (that is, until the first call to C<start_dump>).
129
130 =head2 SET UP A TRANSFER
131
132 Once the Scribe is started, begin transferring a dumpfile.  This is a
133 three-step process: first, get an C<Amanda::Xfer::Dest::Taper> object from the
134 Scribe, then start the transfer, and finally let the Scribe know that the
135 transfer has started.  Note that the Scribe supplies and manages the transfer
136 destination, but the transfer itself remains the responsibility of the caller.
137
138 =head3 Get device
139
140 Call C<get_device> to get the first device the xfer will be working with.
141
142   $device = $scribe->get_device();
143
144 This method must be called after C<start> has completed.
145
146 =head3 Check device compatibily for the data path
147
148 Call C<check_data_path>, supplying the data_path requested by the user.
149
150   if (my $err = $scribe->check_data_path($data_path)) {
151       # handle error message
152   }
153
154 This method must be called after C<start> has completed and before
155 C<get_xfer_dest> is called. It returns C<undef> on success or an error message
156 if the supplied C<data_path> is incompatible with the device.  This is mainly
157 used to detect when a DirectTCP dump is going to a non-DirectTCP device.
158
159 =head3 Get a Transfer Destination
160
161 Call C<get_xfer_dest> to get the transfer element, supplying information on how
162 the dump should be split:
163
164   $xdest = $scribe->get_xfer_dest(
165         allow_split => $allow_split,
166         max_memory => $max_memory,
167         # .. splitting parameters
168         );
169
170 This method must be called after C<start> has completed, and will always return
171 a transfer element immediately.  The underlying C<Amanda::Xfer::Dest::Taper>
172 handles device streaming properly.  It uses C<max_memory> bytes of memory for
173 this purpose.
174
175 The splitting parameters to C<get_xfer_dest> are:
176
177 =over 4
178
179 =item C<allow_split>
180
181 this dle is allowed or not to split
182
183 =item C<part_size>
184
185 the split part size to use, or 0 for no splitting
186
187 =item C<part_cache_type>
188
189 when caching, the kind of caching to perform ('disk', 'memory' or the default,
190 'none')
191
192 =item C<part_cache_dir>
193
194 the directory to use for disk caching
195
196 =item C<part_cache_max_size>
197
198 the maximum part size to use when caching
199
200 =item C<can_cache_inform>
201
202 true if the transfer source can call the destination's C<cache_inform> method
203 (e.g., C<Amanda::Xfer::Source::Holding>).
204
205 =back
206
207 The first four of these parameters correspond exactly to the eponymous tapetype
208 configuration parameters, and have the same default values (when omitted or
209 C<undef>).  The method will take this information, along with details of the
210 device it intends to use, and set up the transfer destination.
211
212 The utility function C<get_splitting_args_from_config> can determine the
213 appropriate C<get_xfer_dest> splitting parameters based on a
214 few Amanda configuration parameters.  If a parameter was not seen in the
215 configuration, it should be omitted or passed as C<undef>.  The function
216 returns a hash to pass to C<get_xfer_dest>, although that hash may have an
217 C<warning> key containing a message if there is a problem that the user
218 should know about.
219
220   use Amanda::Taper::Scribe qw( get_splitting_args_from_config );
221   my %splitting_args = get_splitting_args_from_config(
222     # Amanda dumptype configuration parameters,
223     dle_allow_split => ..,
224     dle_tape_splitsize => ..,
225     dle_split_diskbuffer => ..,
226     dle_fallback_splitsize => ..,
227     dle_allow_split => ..,
228     # Amanda tapetype configuration parameters,
229     part_size => .., ## in bytes, not kb!!
230     part_size_kb => ..., ## or use this, in kb
231     part_cache_type => ..,
232     part_cache_type_enum => ..., ## one of the enums from tapetype_getconf
233     part_cache_dir => ..,
234     part_cache_max_size => ..,
235   );
236   if ($splitting_args{'error'}) { .. }
237
238 An C<Amanda::Taper::Scribe> object can only run one transfer at a time, so
239 do not call C<get_xfer_dest> until the C<dump_cb> for the previous C<start_dump>
240 has been called.
241
242 =head3 Start the Transfer
243
244 Armed with the element returned by C<get_xfer_dest>, the caller should create a
245 source element and a transfer object and start the transfer.  In order to
246 manage the splitting process, the Scribe needs to be informed, via its
247 C<handle_xmsg> method, of all transfer messages .  This is usually accomplished
248 with something like:
249
250   $xfer->start(sub {
251       my ($src, $msg, $xfer) = @_;
252       $scribe->handle_xmsg($src, $msg, $xfer);
253   });
254
255 =head3 Inform the Scribe
256
257 Once the transfer has started, the Scribe is ready to begin writing parts to
258 the volume.  This is the first moment at which the Scribe needs a header, too.
259 All of this is supplied to the C<start_dump> method:
260
261   $scribe->start_dump(
262       xfer => $xfer,
263       dump_header => $hdr,
264       dump_cb => $dump_cb);
265
266 The c<dump_header> here is the header that will be applied to all parts of the
267 dumpfile.  The only field in the header that the Scribe controls is the part
268 number.  The C<dump_cb> callback passed to C<start_dump> is called when the
269 dump is completely finished - either successfully or with a fatal error.
270 Unlike most callbacks, this one takes keyword arguments, since it has so many
271 parameters.
272
273   $dump_cb->(
274         result => $result,
275         device_errors => $device_errors,
276         config_denial_message => $cdm,
277         size => $size,
278         duration => $duration,
279         total_duration => $total_duration,
280         nparts => $nparts);
281
282 All parameters will be present on every call, although the order is not
283 guaranteed.
284
285 The C<result> is one of C<"FAILED">, C<"PARTIAL">, or C<"DONE">.  Even when
286 C<dump_cb> reports a fatal error, C<result> may be C<"PARTIAL"> if some data
287 was written successfully.
288
289 The C<device_error> key points to a list of errors, each given as a string,
290 that describe what went wrong to cause the dump to fail.  The
291 C<config_denial_message> parrots the reason provided by C<$perm_cb> (see below)
292 for denying use of a new tape if the cause was 'config', and is C<undef>
293 otherwise.
294
295 The final parameters, C<size> (in bytes), C<duration>, C<total_duration> (in
296 seconds), and C<nparts> describe the total transfer, and are a sum of all of
297 the parts written to the device.  Note that C<nparts> does not include any
298 empty trailing parts.  Note that C<duration> does not include time spent
299 operating the changer, while C<total_duration> reflects the time from the
300 C<start_dump> call to the invocation of the C<dump_cb>.
301
302 =head3 Cancelling a Dump
303
304 After you have requested a transfer destination, the scribe is poised to begin the
305 transfer.  If you cannot actually perform the transfer for some reason, you'll need
306 to go through the motions all the same, but cancel the operation immediately.  That
307 can be done by calling C<cancel_dump>:
308
309   $scribe->cancel_dump(
310         xfer => $xfer,
311         dump_cb => $dump_cb);
312
313 =head2 QUIT
314
315 When all of the dumpfiles are transferred, call the C<quit> method to
316 release any resources and clean up.  This method takes a typical
317 C<finished_cb>.
318
319   $scribe->quit(finished_cb => sub {
320     print "ALL DONE!\n";
321   });
322
323 =head2 GET_BYTES_WRITTEN
324
325 The C<get_bytes_written> returns the number of bytes written to the device at
326 the time of the call, and is meant to be used for status reporting.  This value
327 is updated at least as each part is finished; for some modes of operation, it
328 is updated continuously.  Notably, DirectTCP transfers do not update
329 continuously.
330
331 =head2 START_SCAN
332
333 The C<start_scan> method initiate a scan of the changer to find a usable tape.
334
335 =head1 FEEDBACK
336
337 The C<Amanda::Taper::Scribe::Feedback> class is intended to be
338 subclassed by the user.  It provides a number of notification methods
339 that enable the historical logging and driver/taper interactions
340 required by Amanda.  The parent class does nothing of interest, but
341 allows subclasses to omit methods they do not need.
342
343 The C<request_volume_permission> method provides a means for the caller
344 to limit the number of volumes the Scribe consumes.  It is called as
345
346   $fb->request_volume_permission(perm_cb => $cb);
347
348 The C<perm_cb> is a callback which expects a hash as arguments. If C<allow>
349 is set, then the scribe is allowed to use a new volume, if C<scribe> is set,
350 then the xfer must be transfered to that scribe, otherwise a C<cause>
351 and a C<message> describing why a new volume should not be used. must be
352 set. e.g.
353
354   perm_cb->(allow => 1);
355   perm_cb->(scribe => $new_scribe);
356   perm_cb->(cause => 'config', message => $message);
357   perm_cb->(cause => 'error', message => $message);
358
359 A cause of 'config' indicates that the denial is due to the user's
360 configuration, and thus should not be presented as an error.  The default
361 implementation always calls C<< perm_cb->() >>.
362
363 All of the remaining methods are notifications, and do not take a
364 callback.
365
366   $fb->scribe_notif_new_tape(
367         error => $error,
368         volume_label => $volume_label);
369
370 The Scribe calls C<scribe_notif_new_tape> when a new volume is started.  If the
371 C<volume_label> is undefined, then the volume was not successfully
372 relabled, and its previous contents may still be available.  If C<error>
373 is defined, then no useful data was written to the volume.  Note that
374 C<error> and C<volume_label> may I<both> be defined if the previous
375 contents of the volume were erased, but no useful, new data was written
376 to the volume.
377
378 This method will be called exactly once for every call to
379 C<request_volume_permission> that calls back with C<< perm_cb->() >>.
380
381   $fb->scribe_notif_tape_done(
382         volume_label => $volume_label,
383         size => $size,
384         num_files => $num_files);
385
386 The C<scribe_notif_tape_done> method is called after a volume is completely
387 written and its reservation has been released.  Note that the scribe waits
388 until the last possible moment to release a reservation, so this may be called
389 later than expected, e.g., during a C<quit> invocation.
390
391   $fb->scribe_notif_part_done(
392         partnum => $partnum,
393         fileno => $fileno,
394         successful => $successful,
395         size => $size,
396         duration => $duration);
397
398 The Scribe calls C<scribe_notif_part_done> for each part written to the volume,
399 including partial parts.  If the part was not written successfully, then
400 C<successful> is false.  The C<size> is in bytes, and the C<duration> is
401 a floating-point number of seconds.  If a part fails before a new device
402 file is created, then C<fileno> may be zero.
403
404 Finally, the Scribe sends a few historically significant trace log messages
405 via C<scribe_notif_log_info>:
406
407   $fb->scribe_notif_log_info(
408         message => $message);
409
410 A typical Feedback subclass might begin like this:
411
412   package main::Feedback;
413   use base 'Amanda::Taper::Scribe::Feedback';
414
415   sub request_volume_permission {
416     my $self = shift;
417     my %params = @_;
418
419     $params{'perm_cb'}->(cause => "error", message => "NO VOLUMES FOR YOU!");
420   }
421
422 =cut
423
424 package Amanda::Taper::Scribe;
425
426 use strict;
427 use warnings;
428 use Carp;
429
430 use Amanda::Xfer qw( :constants );
431 use Amanda::Device qw( :constants );
432 use Amanda::Header;
433 use Amanda::Debug qw( :logging );
434 use Amanda::MainLoop;
435 use Amanda::Tapelist;
436 use Amanda::Config qw( :getconf config_dir_relative );
437 use base qw( Exporter );
438
439 our @EXPORT_OK = qw( get_splitting_args_from_config );
440
441 sub new {
442     my $class = shift;
443     my %params = @_;
444
445     my $decide_debug = $Amanda::Config::debug_taper || $params{'debug'};
446     for my $rq_param (qw(taperscan feedback)) {
447         croak "required parameter '$rq_param' mising"
448             unless exists $params{$rq_param};
449     }
450
451     my $self = {
452         taperscan => $params{'taperscan'},
453         feedback => $params{'feedback'},
454         debug => $decide_debug,
455         eject_volume => $params{'eject_volume'},
456         write_timestamp => undef,
457         started => 0,
458
459         # device handling, and our current device and reservation
460         devhandling => Amanda::Taper::Scribe::DevHandling->new(
461             taperscan => $params{'taperscan'},
462             feedback => $params{'feedback'},
463         ),
464         reservation => undef,
465         device => undef,
466         device_size => undef,
467         device_at_eom => undef, # device still exists, but is full
468         close_volume => undef,
469
470         # callback passed to start_dump
471         dump_cb => undef,
472
473         # information for the current dumpfile
474         dump_header => undef,
475         retry_part_on_peom => undef,
476         allow_split => undef,
477         xfer => undef,
478         xdt => undef,
479         xdt_ready => undef,
480         start_part_on_xdt_ready => 0,
481         size => 0,
482         duration => 0.0,
483         dump_start_time => undef,
484         last_part_successful => 0,
485         started_writing => 0,
486         device_errors => [],
487         config_denial_message => undef,
488     };
489
490     return bless ($self, $class);
491 }
492
493 sub start {
494     my $self = shift;
495     my %params = @_;
496
497     for my $rq_param (qw(write_timestamp finished_cb)) {
498         croak "required parameter '$rq_param' missing"
499             unless exists $params{$rq_param};
500     }
501
502     confess "scribe already started" if $self->{'started'};
503
504     $self->dbg("starting");
505     $self->{'write_timestamp'} = $params{'write_timestamp'};
506
507     # start up the DevHandling object, making sure we know
508     # when it's done with its startup process
509     $self->{'devhandling'}->start(finished_cb => sub {
510         $self->{'started'} = 1;
511         $params{'finished_cb'}->(@_);
512     });
513 }
514
515 sub quit {
516     my $self = shift;
517     my %params = @_;
518
519     # since there's little other option than to barrel on through the
520     # quitting procedure, quit() just accumulates its error messages
521     # and, if necessary, concantenates them for the finished_cb.
522     my @errors;
523
524     my $steps = define_steps
525         cb_ref => \$params{'finished_cb'};
526
527     step setup => sub {
528         $self->dbg("quitting");
529
530         if ($self->{'xfer'}) {
531             confess "Scribe cannot quit while a transfer is active";
532             # Supporting this would be complicated:
533             # - cancel the xfer and wait for it to complete
534             # - ensure that the taperscan not be started afterward
535             # and isn't required for normal Amanda operation.
536         }
537
538         $steps->{'release'}->();
539     };
540
541     step release => sub {
542         if ($self->{'reservation'}) {
543             $self->_release_reservation(finished_cb => $steps->{'released'});
544         } else {
545             $steps->{'stop_devhandling'}->();
546         }
547     };
548
549     step released => sub {
550         my ($err) = @_;
551         push @errors, "$err" if $err;
552
553         $self->{'reservation'} = undef;
554
555         $steps->{'stop_devhandling'}->();
556     };
557
558     step stop_devhandling => sub {
559         $self->{'devhandling'}->quit(finished_cb => $steps->{'stopped_devhandling'});
560     };
561
562     step stopped_devhandling => sub {
563         my ($err) = @_;
564         push @errors, "$err" if $err;
565
566         my $errmsg = join("; ", @errors) if @errors >= 1;
567         $params{'finished_cb'}->($errmsg);
568     };
569 }
570
571 sub get_device {
572     my $self = shift;
573
574     # Can return a device we already have, or "peek" at the
575     # DevHandling object's device.
576     # It might not have right permission on the device.
577
578     my $device;
579     if (defined $self->{'device'}) {
580         $device = $self->{'device'};
581     } else {
582         $device = $self->{'devhandling'}->peek_device();
583     }
584     return $device;
585 }
586
587 sub check_data_path {
588     my $self = shift;
589     my $data_path = shift;
590
591     my $device = $self->get_device();
592
593     if (!defined $device) {
594         confess "no device is available to check the datapath";
595     }
596
597     my $use_directtcp = $device->directtcp_supported();
598
599     my $xdt;
600     if (!$use_directtcp) {
601         if ($data_path eq 'DIRECTTCP') {
602             return "Can't dump DIRECTTCP data-path dle to a device ('" .
603                    $device->device_name .
604                    "') that doesn't support it";
605         }
606     }
607     return undef;
608 }
609
610 sub start_scan {
611     my $self = shift;
612
613     $self->{'devhandling'}->start_scan();
614 }
615
616 # Get a transfer destination; does not use a callback
617 sub get_xfer_dest {
618     my $self = shift;
619     my %params = @_;
620
621     for my $rq_param (qw(max_memory)) {
622         croak "required parameter '$rq_param' missing"
623             unless exists $params{$rq_param};
624     }
625
626     confess "not yet started"
627         unless $self->{'write_timestamp'} and $self->{'started'};
628     confess "xfer element already returned"
629         if ($self->{'xdt'});
630     confess "xfer already running"
631         if ($self->{'xfer'});
632
633     $self->{'xfer'} = undef;
634     $self->{'xdt'} = undef;
635     $self->{'size'} = 0;
636     $self->{'duration'} = 0.0;
637     $self->{'nparts'} = undef;
638     $self->{'dump_start_time'} = undef;
639     $self->{'last_part_successful'} = 1;
640     $self->{'started_writing'} = 0;
641     $self->{'device_errors'} = [];
642     $self->{'config_denial_message'} = undef;
643
644     # set the callback
645     $self->{'dump_cb'} = undef;
646     $self->{'retry_part_on_peom'} = 1;
647     $self->{'allow_split'} = 0;
648     $self->{'start_part_on_xdt_ready'} = 0;
649
650     # start getting parameters together to determine what kind of splitting
651     # and caching we're going to do
652     my $part_size = $params{'part_size'} || 0;
653     my ($use_mem_cache, $disk_cache_dirname) = (0, undef);
654     my $can_cache_inform = $params{'can_cache_inform'};
655     my $part_cache_type = $params{'part_cache_type'} || 'none';
656     my $allow_split = $params{'allow_split'};
657
658     my $xdt_first_dev = $self->get_device();
659     if (!defined $xdt_first_dev) {
660         confess "no device is available to create an xfer_dest";
661     }
662     my $leom_supported = $xdt_first_dev->property_get("leom");
663     my $use_directtcp = $xdt_first_dev->directtcp_supported();
664
665     # figure out the destination type we'll use, based on the circumstances
666     my ($dest_type, $dest_text);
667     if ($use_directtcp) {
668         $dest_type = 'directtcp';
669         $dest_text = "using DirectTCP";
670     } elsif ($can_cache_inform && $leom_supported) {
671         $dest_type = 'splitter';
672         $dest_text = "using LEOM (falling back to holding disk as cache)";
673     } elsif ($leom_supported) {
674         $dest_type = 'splitter';
675         $dest_text = "using LEOM detection (no caching)";
676     } elsif ($can_cache_inform) {
677         $dest_type = 'splitter';
678         $dest_text = "using cache_inform";
679     } elsif ($part_cache_type ne 'none') {
680         $dest_type = 'cacher';
681
682         # we'll be caching, so apply the maximum size
683         my $part_cache_max_size = $params{'part_cache_max_size'} || 0;
684         $part_size = $part_cache_max_size
685             if ($part_cache_max_size and $part_cache_max_size < $part_size);
686
687         # and figure out what kind of caching to apply
688         if ($part_cache_type eq 'memory') {
689             $use_mem_cache = 1;
690         } else {
691             # note that we assume this has already been checked; if it's wrong,
692             # the xfer element will just fail immediately
693             $disk_cache_dirname = $params{'part_cache_dir'};
694         }
695         $dest_text = "using cache type '$part_cache_type'";
696     } else {
697         $dest_type = 'splitter';
698         $dest_text = "using no cache (PEOM will be fatal)";
699
700         # no directtcp, no caching, no cache_inform, and no LEOM, so a PEOM will be fatal
701         $self->{'retry_part_on_peom'} = 0;
702     }
703
704     if ($allow_split &&
705         ($can_cache_inform ||
706          !defined($part_cache_type) ||
707          $part_cache_type eq 'disk' ||
708          $part_cache_type eq 'memory' ||
709          $leom_supported)) {
710         $self->{'allow_split'} = 1;
711     } else {
712         $self->{'allow_split'} = 0;
713     }
714
715     $self->{'retry_part_on_peom'} = 0 if !$self->{'allow_split'};
716
717     debug("Amanda::Taper::Scribe preparing to write, part size $part_size, "
718         . "$dest_text ($dest_type) "
719         . ($leom_supported? " (LEOM supported)" : " (no LEOM)"));
720
721     # set the device to verbose logging if we're in debug mode
722     if ($self->{'debug'}) {
723         $xdt_first_dev->property_set("verbose", 1);
724     }
725
726     my $xdt;
727     if ($dest_type eq 'directtcp') {
728         $xdt = Amanda::Xfer::Dest::Taper::DirectTCP->new(
729             $xdt_first_dev, $part_size);
730         $self->{'xdt_ready'} = 0; # xdt isn't ready until we get XMSG_READY
731     } elsif ($dest_type eq 'splitter') {
732         $xdt = Amanda::Xfer::Dest::Taper::Splitter->new(
733             $xdt_first_dev, $params{'max_memory'}, $part_size, $can_cache_inform);
734         $self->{'xdt_ready'} = 1; # xdt is ready immediately
735     } else {
736         $xdt = Amanda::Xfer::Dest::Taper::Cacher->new(
737             $xdt_first_dev, $params{'max_memory'}, $part_size,
738             $use_mem_cache, $disk_cache_dirname);
739         $self->{'xdt_ready'} = 1; # xdt is ready immediately
740     }
741     $self->{'start_part_on_xdt_ready'} = 0;
742     $self->{'xdt'} = $xdt;
743
744     return $xdt;
745 }
746
747 sub start_dump {
748     my $self = shift;
749     my %params = @_;
750
751     confess "no xfer dest set up; call get_xfer_dest first"
752         unless defined $self->{'xdt'};
753
754     # get the header ready for writing (totalparts was set by the caller)
755     $self->{'dump_header'} = $params{'dump_header'};
756     $self->{'dump_header'}->{'partnum'} = 1;
757
758     # set up the dump_cb for when this dump is done, and keep the xfer
759     $self->{'dump_cb'} = $params{'dump_cb'};
760     $self->{'xfer'} = $params{'xfer'};
761     $self->{'dump_start_time'} = time;
762
763     # and start the part
764     $self->_start_part();
765 }
766
767 sub cancel_dump {
768     my $self = shift;
769     my %params = @_;
770
771     confess "no xfer dest set up; call get_xfer_dest first"
772         unless defined $self->{'xdt'};
773
774     # set up the dump_cb for when this dump is done, and keep the xfer
775     $self->{'dump_cb'} = $params{'dump_cb'};
776     $self->{'xfer'} = $params{'xfer'};
777
778     # The cancel will can dump_cb.
779
780     $self->{'xfer'}->cancel();
781
782 }
783
784 sub close_volume {
785     my $self = shift;
786
787     $self->{'close_volume'} = 1;
788 }
789
790 sub get_bytes_written {
791     my ($self) = @_;
792
793     if (defined $self->{'xdt'}) {
794         return $self->{'size'} + $self->{'xdt'}->get_part_bytes_written();
795     } else {
796         return $self->{'size'};
797     }
798 }
799
800 sub _start_part {
801     my $self = shift;
802
803     $self->dbg("trying to start part");
804
805     # if the xdt isn't ready yet, wait until it is; note that the XDT is still
806     # using the device right now, so we can't even label it yet.
807     if (!$self->{'xdt_ready'}) {
808         $self->dbg("XDT not ready yet; waiting until it is");
809         $self->{'start_part_on_xdt_ready'} = 1;
810         return
811     }
812
813     if ($self->{'close_volume'}) {
814         $self->{'close_volume'} = undef;
815         return $self->_get_new_volume();
816     }
817
818     # we need an actual, permitted device at this point, so if we don't have
819     # one, then defer this start_part call until we do.  The device may still
820     # exist, but be at EOM, if the last dump failed at EOM and was not retried
821     # on a new volume.
822     if (!$self->{'device'} or $self->{'device_at_eom'}) {
823         # _get_new_volume calls _start_part when it has a new volume in hand
824         return $self->_get_new_volume();
825     }
826
827     # if the dump wasn't successful, and we're not splitting, then bail out.  It's
828     # up to higher-level components to re-try this dump on a new volume, if desired.
829     # Note that this should be caught in the XMSG_PART_DONE handler -- this is just
830     # here for backup.
831     if (!$self->{'last_part_successful'} and !$self->{'retry_part_on_peom'}) {
832         $self->_operation_failed(device_error => "No space left on device (uncaught)");
833         return;
834     }
835
836     # and start writing this part
837     $self->{'started_writing'} = 1;
838     $self->dbg("resuming transfer");
839     $self->{'xdt'}->start_part(!$self->{'last_part_successful'},
840                                $self->{'dump_header'});
841 }
842
843 sub handle_xmsg {
844     my $self = shift;
845     my ($src, $msg, $xfer) = @_;
846
847     if ($msg->{'type'} == $XMSG_DONE) {
848         $self->_xmsg_done($src, $msg, $xfer);
849         return;
850     }
851
852     # for anything else we only pay attention to messages from
853     # our own element
854     if ($msg->{'elt'} == $self->{'xdt'}) {
855         $self->dbg("got msg from xfer dest: $msg");
856         if ($msg->{'type'} == $XMSG_PART_DONE) {
857             $self->_xmsg_part_done($src, $msg, $xfer);
858         } elsif ($msg->{'type'} == $XMSG_READY) {
859             $self->_xmsg_ready($src, $msg, $xfer);
860         } elsif ($msg->{'type'} == $XMSG_ERROR) {
861             $self->_xmsg_error($src, $msg, $xfer);
862         }
863     }
864 }
865
866 sub _xmsg_part_done {
867     my $self = shift;
868     my ($src, $msg, $xfer) = @_;
869
870     # this handles successful zero-byte parts as a special case - they
871     # are an implementation detail of the splitting done by the transfer
872     # destination.
873
874     if ($msg->{'successful'} and $msg->{'size'} == 0) {
875         $self->dbg("not notifying for empty, successful part");
876     } else {
877         # double-check partnum
878         confess "Part numbers do not match! $self->{'dump_header'}->{'partnum'} $msg->{'partnum'}"
879             unless ($self->{'dump_header'}->{'partnum'} == $msg->{'partnum'});
880
881         # notify
882         $self->{'feedback'}->scribe_notif_part_done(
883             partnum => $msg->{'partnum'},
884             fileno => $msg->{'fileno'},
885             successful => $msg->{'successful'},
886             size => $msg->{'size'},
887             duration => $msg->{'duration'});
888
889         # increment nparts here, so empty parts are not counted
890         $self->{'nparts'} = $msg->{'partnum'};
891     }
892
893     $self->{'last_part_successful'} = $msg->{'successful'};
894
895     if ($msg->{'successful'}) {
896         $self->{'device_size'} += $msg->{'size'};
897         $self->{'size'} += $msg->{'size'};
898         $self->{'duration'} += $msg->{'duration'};
899     }
900
901     if (!$msg->{'eof'}) {
902         # update the header for the next dumpfile, if this was a non-empty part
903         if ($msg->{'successful'} and $msg->{'size'} != 0) {
904             $self->{'dump_header'}->{'partnum'}++;
905         }
906
907         if ($msg->{'eom'}) {
908             # if there's an error finishing the device, it's probably just carryover
909             # from the error the Xfer::Dest::Taper encountered while writing to the
910             # device, so we ignore it.
911             if (!$self->{'device'}->finish()) {
912                 my $devname = $self->{'device'}->device_name;
913                 my $errmsg = $self->{'device'}->error_or_status();
914                 $self->dbg("ignoring error while finishing device '$devname': $errmsg");
915             }
916
917             # if the part failed..
918             if (!$msg->{'successful'} || !$self->{'allow_split'}) {
919                 # if no caching was going on, then the dump has failed
920                 if (!$self->{'retry_part_on_peom'}) {
921                     # mark this device as at EOM, since we are not going to look
922                     # for another one yet
923                     $self->{'device_at_eom'} = 1;
924
925                     my $msg = "No space left on device";
926                     if ($self->{'device'}->status() != $DEVICE_STATUS_SUCCESS) {
927                         $msg = $self->{'device'}->error_or_status();
928                     }
929                     $self->_operation_failed(device_error => "$msg, splitting not enabled");
930                     return;
931                 }
932
933                 # log a message for amreport
934                 $self->{'feedback'}->scribe_notif_log_info(
935                     message => "Will request retry of failed split part.");
936             }
937
938             # get a new volume, then go on to the next part
939             $self->_get_new_volume();
940         } else {
941             # if the part was unsuccessful, but the xfer dest has reason to believe
942             # this is not due to EOM, then the dump is done
943             if (!$msg->{'successful'}) {
944                 if ($self->{'device'}->status() != $DEVICE_STATUS_SUCCESS) {
945                     $msg = $self->{'device'}->error_or_status();
946                     $self->_operation_failed(device_error => $msg);
947                 } else {
948                     $self->_operation_failed();
949                 }
950                 return;
951             }
952
953             # no EOM -- go on to the next part
954             $self->_start_part();
955         }
956     }
957 }
958
959 sub _xmsg_ready {
960     my $self = shift;
961     my ($src, $msg, $xfer) = @_;
962
963     $self->dbg("XDT is ready");
964     $self->{'xdt_ready'} = 1;
965     if ($self->{'start_part_on_xdt_ready'}) {
966         $self->{'start_part_on_xdt_ready'} = 0;
967         $self->_start_part();
968     }
969 }
970
971 sub _xmsg_error {
972     my $self = shift;
973     my ($src, $msg, $xfer) = @_;
974
975     # XMSG_ERROR from the XDT is always fatal
976     $self->_operation_failed(device_error => $msg->{'message'});
977 }
978
979 sub _xmsg_done {
980     my $self = shift;
981     my ($src, $msg, $xfer) = @_;
982
983     if ($msg->{'type'} == $XMSG_DONE) {
984         $self->dbg("transfer is complete");
985         $self->_dump_done();
986     }
987 }
988
989 sub abort_setup {
990     my $self = shift;
991     my %params = @_;
992
993     $self->{'dump_cb'} = $params{'dump_cb'};
994     $self->_dump_done();
995 }
996
997 sub _dump_done {
998     my $self = shift;
999
1000     my $result;
1001
1002     # determine the correct final status - DONE if we're done, PARTIAL
1003     # if we've started writing to the volume, otherwise FAILED
1004     if (!$self->{'started_writing'}) {
1005         $result = 'FAILED';
1006     } elsif (@{$self->{'device_errors'}} or $self->{'config_denial_message'} or
1007              !$self->{'last_part_successful'}) {
1008         $result = 'PARTIAL';
1009     } else {
1010         $result = 'DONE';
1011     }
1012
1013     my $dump_cb = $self->{'dump_cb'};
1014
1015     return if !defined $dump_cb;
1016
1017     my %dump_cb_args = (
1018         result => $result,
1019         device_errors => $self->{'device_errors'},
1020         config_denial_message => $self->{'config_denial_message'},
1021         size => $self->{'size'},
1022         duration => $self->{'duration'},
1023         total_duration => time - $self->{'dump_start_time'},
1024         nparts => $self->{'nparts'});
1025
1026     # reset everything and let the original caller know we're done
1027     $self->{'xfer'} = undef;
1028     $self->{'xdt'} = undef;
1029     $self->{'dump_header'} = undef;
1030     $self->{'dump_cb'} = undef;
1031     $self->{'size'} = 0;
1032     $self->{'duration'} = 0.0;
1033     $self->{'nparts'} = undef;
1034     $self->{'dump_start_time'} = undef;
1035     $self->{'device_errors'} = [];
1036     $self->{'config_denial_message'} = undef;
1037
1038     # and call the callback
1039     $dump_cb->(%dump_cb_args);
1040 }
1041
1042 # keyword parameters are utilities to the caller: either specify
1043 # device_error to add to the device_errors list or config_denial_message
1044 # to set the corresponding key in $self.
1045 sub _operation_failed {
1046     my $self = shift;
1047     my %params = @_;
1048
1049     my $error_message = $params{'device_error'}
1050                      || $params{'config_denial_message'}
1051                      || 'input error';
1052     $self->dbg("operation failed: $error_message");
1053
1054     # tuck the message away as desired
1055     push @{$self->{'device_errors'}}, $params{'device_error'}
1056         if defined $params{'device_error'};
1057     $self->{'config_denial_message'} = $params{'config_denial_message'} 
1058         if $params{'config_denial_message'};
1059
1060     # cancelling the xdt will eventually cause an XMSG_DONE, which will notice
1061     # the error and set the result correctly; but if there's no xfer, then we
1062     # can just call _dump_done directly.
1063     if (defined $self->{'xfer'}) {
1064         $self->dbg("cancelling the transfer: $error_message");
1065
1066         $self->{'xfer'}->cancel();
1067     } else {
1068         if (defined $self->{'dump_cb'}) {
1069             # _dump_done constructs the dump_cb from $self parameters
1070             $self->_dump_done();
1071         }
1072     }
1073 }
1074
1075 # release the outstanding reservation, calling scribe_notif_tape_done
1076 # after the release
1077 sub _release_reservation {
1078     my $self = shift;
1079     my %params = @_;
1080     my @errors;
1081     my $do_eject = 0;
1082
1083     my ($label, $fm, $kb);
1084
1085     # if we've already written a volume, log it
1086     if ($self->{'device'} and defined $self->{'device'}->volume_label) {
1087         $do_eject = 1 if $self->{'eject_volume'};
1088         $label = $self->{'device'}->volume_label();
1089         $fm = $self->{'device'}->file();
1090         $kb = $self->{'device_size'} / 1024;
1091
1092         # log a message for amreport
1093         $self->{'feedback'}->scribe_notif_log_info(
1094             message => "tape $label kb $kb fm $fm [OK]");
1095     }
1096
1097     # finish the device if it isn't finished yet
1098     if ($self->{'device'}) {
1099         my $already_in_error = $self->{'device'}->status() != $DEVICE_STATUS_SUCCESS;
1100
1101         if (!$self->{'device'}->finish() && !$already_in_error) {
1102             push @errors, $self->{'device'}->error_or_status();
1103         }
1104     }
1105     $self->{'device'} = undef;
1106     $self->{'device_at_eom'} = 0;
1107
1108     $self->{'reservation'}->release(eject => $do_eject, finished_cb => sub {
1109         my ($err) = @_;
1110         push @errors, "$err" if $err;
1111
1112         $self->{'reservation'} = undef;
1113
1114         # notify the feedback that we've finished and released a tape
1115         if ($label) {
1116             return $self->{'feedback'}->scribe_notif_tape_done(
1117                 volume_label => $label,
1118                 size => $kb * 1024,
1119                 num_files => $fm,
1120                 finished_cb => sub {
1121                  $params{'finished_cb'}->(@errors? join("; ", @errors) : undef);
1122                 });
1123         }
1124
1125         $params{'finished_cb'}->(@errors? join("; ", @errors) : undef);
1126     });
1127 }
1128
1129 # invoke the devhandling to get a new device, with all of the requisite
1130 # notifications and checks and whatnot.  On *success*, call _start_dump; on
1131 # failure, call other appropriate methods.
1132 sub _get_new_volume {
1133     my $self = shift;
1134
1135     # release first, if necessary
1136     if ($self->{'reservation'}) {
1137         $self->_release_reservation(finished_cb => sub {
1138             my ($error) = @_;
1139
1140             if ($error) {
1141                 $self->_operation_failed(device_error => $error);
1142             } else {
1143                 $self->_get_new_volume();
1144             }
1145         });
1146
1147         return;
1148     }
1149
1150     $self->{'devhandling'}->get_volume(volume_cb => sub { $self->_volume_cb(@_); });
1151 }
1152
1153 sub _volume_cb  {
1154     my $self = shift;
1155     my ($scan_error, $config_denial_message, $error_denial_message,
1156         $reservation, $new_label, $access_mode, $is_new, $new_scribe) = @_;
1157
1158     # note that we prefer the config_denial_message over the scan error.  If
1159     # both occurred, then the results of the scan are immaterial -- we
1160     # shouldn't have been looking for a new volume anyway.
1161
1162     if ($config_denial_message) {
1163         $self->_operation_failed(config_denial_message => $config_denial_message);
1164         return;
1165     }
1166
1167     if ($error_denial_message) {
1168         $self->_operation_failed(device_error => $error_denial_message);
1169         return;
1170     }
1171
1172     if ($new_scribe) {
1173         # Transfer the xfer to the new scribe
1174         $self->dbg("take scribe from");
1175
1176         $new_scribe->{'dump_cb'} = $self->{'dump_cb'};
1177         $new_scribe->{'dump_header'} = $self->{'dump_header'};
1178         $new_scribe->{'retry_part_on_peom'} = $self->{'retry_part_on_peom'};
1179         $new_scribe->{'allow_split'} = $self->{'allow_split'};
1180         $new_scribe->{'split_method'} = $self->{'split_method'};
1181         $new_scribe->{'xfer'} = $self->{'xfer'};
1182         $new_scribe->{'xdt'} = $self->{'xdt'};
1183         $new_scribe->{'xdt_ready'} = $self->{'xdt_ready'};
1184         $new_scribe->{'start_part_on_xdt_ready'} = $self->{'start_part_on_xdt_ready'};
1185         $new_scribe->{'size'} = $self->{'size'};
1186         $new_scribe->{'duration'} = $self->{'duration'};
1187         $new_scribe->{'dump_start_time'} = $self->{'dump_start_time'};
1188         $new_scribe->{'last_part_successful'} = $self->{'last_part_successful'};
1189         $new_scribe->{'started_writing'} = $self->{'started_writing'};
1190         $new_scribe->{'feedback'} = $self->{'feedback'};
1191         $new_scribe->{'devhandling'}->{'feedback'} = $self->{'feedback'};
1192         $self->{'dump_header'} = undef;
1193         $self->{'dump_cb'} = undef;
1194         $self->{'xfer'} = undef;
1195         $self->{'xdt'} = undef;
1196         $self->{'xdt_ready'} = undef;
1197         $self->{'dump_start_time'} = undef;
1198         $self->{'started_writing'} = 0;
1199         $self->{'feedback'} = undef;
1200         if (defined $new_scribe->{'device'}) {
1201             $new_scribe->{'xdt'}->use_device($new_scribe->{'device'});
1202         }
1203         # start it
1204         $new_scribe->_start_part();
1205
1206         return;
1207     }
1208
1209     if ($scan_error) {
1210         # we had permission to use a tape, but didn't find a tape, so we need
1211         # to notify of such
1212         $self->{'feedback'}->scribe_notif_new_tape(
1213             error => $scan_error,
1214             volume_label => undef);
1215
1216         $self->_operation_failed(device_error => $scan_error);
1217         return;
1218     }
1219
1220     $self->dbg("got new volume; writing new label");
1221
1222     # from here on, if an error occurs, we must send scribe_notif_new_tape, and look
1223     # for a new volume
1224     $self->{'reservation'} = $reservation;
1225     $self->{'device_size'} = 0;
1226     my $device = $self->{'device'} = $reservation->{'device'};
1227
1228     # turn on verbose logging now, if we need it
1229     if ($self->{'debug'}) {
1230         $reservation->{'device'}->property_set("verbose", 1);
1231     }
1232
1233     # read the label once, to get a "before" snapshot (see below)
1234     my $old_label;
1235     my $old_timestamp;
1236     if (!$is_new) {
1237         if (($device->status & ~$DEVICE_STATUS_VOLUME_UNLABELED)
1238             && !($device->status & $DEVICE_STATUS_VOLUME_UNLABELED)) {
1239             $self->{'feedback'}->scribe_notif_new_tape(
1240                 error => "while reading label on new volume: " . $device->error_or_status(),
1241                 volume_label => undef);
1242
1243             return $self->_get_new_volume();
1244         }
1245         $old_label = $device->volume_label;
1246         $old_timestamp = $device->volume_time;
1247     }
1248
1249     # inform the xdt about this new device before starting it
1250     $self->{'xdt'}->use_device($device);
1251
1252     my $cbX = sub {};
1253     my $steps = define_steps
1254         cb_ref => \$cbX;
1255
1256     step device_start => sub {
1257         $self->_device_start($reservation, $access_mode, $new_label, $is_new,
1258                              $steps->{'device_started'});
1259     };
1260
1261     step device_started => sub {
1262         my $result = shift;
1263
1264         if ($result =~ /\D/) {
1265             $self->{'feedback'}->scribe_notif_new_tape(
1266                 error => $result,
1267                 volume_label => undef);
1268             $self->_get_new_volume();
1269             return $cbX->();
1270         } elsif ($result == 0) {
1271             # try reading the label to see whether we erased the tape
1272             my $erased = 0;
1273             CHECK_READ_LABEL: {
1274             # don't worry about erasing new tapes
1275                 if ($is_new) {
1276                     last CHECK_READ_LABEL;
1277                 }
1278
1279                 $device->finish();
1280                 $device->read_label();
1281
1282                 # does the device think something is broken now?
1283                 if (($device->status & ~$DEVICE_STATUS_VOLUME_UNLABELED)
1284                     and !($device->status & $DEVICE_STATUS_VOLUME_UNLABELED)) {
1285                     $erased = 1;
1286                     last CHECK_READ_LABEL;
1287                 }
1288
1289                 # has the label changed?
1290                 my $vol_label = $device->volume_label;
1291                 if ((!defined $old_label and defined $vol_label)
1292                     or (defined $old_label and !defined $vol_label)
1293                     or (defined $old_label and $old_label ne $vol_label)) {
1294                     $erased = 1;
1295                     last CHECK_READ_LABEL;
1296                 }
1297
1298                 # has the timestamp changed?
1299                 my $vol_timestamp = $device->volume_time;
1300                 if ((!defined $old_timestamp and defined $vol_timestamp)
1301                     or (defined $old_timestamp and !defined $vol_timestamp)
1302                     or (defined $old_timestamp and $old_timestamp ne $vol_timestamp)) {
1303                     $erased = 1;
1304                     last CHECK_READ_LABEL;
1305                 }
1306             }
1307
1308             $self->{'feedback'}->scribe_notif_new_tape(
1309                 error => "while labeling new volume: " . $device->error_or_status(),
1310                 volume_label => $erased? $new_label : undef);
1311
1312             $self->_get_new_volume();
1313             return $cbX->();
1314         }
1315
1316         $new_label = $device->volume_label;
1317
1318         # success!
1319         $self->{'feedback'}->scribe_notif_new_tape(
1320             error => undef,
1321             volume_label => $new_label);
1322
1323         $self->{'reservation'}->set_label(label => $new_label,
1324             finished_cb => $steps->{'set_labelled'});
1325     };
1326
1327     step set_labelled => sub {
1328         my ($err) = @_;
1329         if ($err) {
1330             $self->{'feedback'}->scribe_notif_log_info(
1331                 message => "Error from set_label: $err");
1332             # fall through to start_part anyway...
1333         }
1334         $self->_start_part();
1335         return $cbX->();
1336     }
1337 }
1338
1339 # return 0 for device->start error
1340 # return 1 for success
1341 # return a message for others error
1342 sub _device_start {
1343     my $self = shift;
1344     my ($reservation, $access_mode, $new_label, $is_new, $finished_cb) = @_;
1345
1346     my $device = $reservation->{'device'};
1347     my $tl = $self->{'taperscan'}->{'tapelist'};
1348     my $meta;
1349
1350     if (!defined $tl) { # For Mock::Taperscan in installcheck
1351         if (!$device->start($access_mode, $new_label, $self->{'write_timestamp'})) {
1352             return $finished_cb->(0);
1353         } else {
1354             return $finished_cb->(1);
1355         }
1356     }
1357
1358     my $steps = define_steps
1359         cb_ref => \$finished_cb;
1360
1361     step setup => sub {
1362         return $reservation->get_meta_label(
1363                                 finished_cb => $steps->{'got_meta_label'});
1364     };
1365
1366     step got_meta_label => sub {
1367         my ($err, $meta) = @_;
1368
1369         if ($is_new) {
1370             # generate the new label and write it to the tapelist file
1371             $tl->reload(1);
1372             if (!$meta) {
1373                 ($meta, $err) = $reservation->make_new_meta_label();
1374                 if (defined $err) {
1375                     $tl->unlock();
1376                     return $finished_cb->($err);
1377                 }
1378             }
1379             ($new_label, my $err) = $reservation->make_new_tape_label(
1380                                         meta => $meta);
1381             if (!defined $new_label) {
1382                 $tl->unlock();
1383                 return $finished_cb->($err);
1384             }
1385             $tl->add_tapelabel('0', $new_label, undef, 1, $meta,
1386                                $reservation->{'barcode'});
1387             $tl->write();
1388             $self->dbg("generate new label '$new_label'");
1389         } else {
1390             $tl->reload(0);
1391             my $tle = $tl->lookup_tapelabel($new_label);
1392             $meta = $tle->{'meta'} if !defined $meta && $tle->{'meta'};
1393             my $barcode = $tle->{'barcode'};
1394             if (defined $barcode and $barcode ne $reservation->{'barcode'}) {
1395                 return $finished_cb->("tapelist for label '$new_label' have barcode '$barcode' but changer report '" . $reservation->{'barcode'} . "'");
1396             }
1397         }
1398
1399         # write the label to the device
1400         if (!$device->start($access_mode, $new_label, $self->{'write_timestamp'})) {
1401             if ($is_new) {
1402                 # remove the generated label from the tapelist file
1403                 $tl->reload(1);
1404                 $tl->remove_tapelabel($new_label);
1405                 $tl->write();
1406             }
1407             return $finished_cb->(0);
1408         }
1409
1410         # rewrite the tapelist file
1411         $tl->reload(1);
1412         my $tle = $tl->lookup_tapelabel($new_label);
1413         $meta = $tle->{'meta'} if !$meta && $tle->{'meta'};
1414         $tl->remove_tapelabel($new_label);
1415         $tl->add_tapelabel($self->{'write_timestamp'}, $new_label,
1416                            $tle? $tle->{'comment'} : undef, 1, $meta,
1417                            $reservation->{'barcode'}, $device->block_size/1024);
1418         $tl->write();
1419
1420         $reservation->set_meta_label(meta => $meta,
1421                                      finished_cb => $steps->{'set_meta_label'});
1422     };
1423
1424     step set_meta_label => sub {
1425         return $finished_cb->(1);
1426     }
1427 }
1428
1429 sub dbg {
1430     my ($self, $msg) = @_;
1431     if ($self->{'debug'}) {
1432         debug("Amanda::Taper::Scribe: $msg");
1433     }
1434 }
1435
1436 sub get_splitting_args_from_config {
1437     my %params = @_;
1438
1439     my %splitting_args;
1440
1441     $splitting_args{'allow_split'} = 0;
1442     # if dle_splitting is false, then we don't split - easy.
1443     if (defined $params{'dle_allow_split'} and !$params{'dle_allow_split'}) {
1444         return %splitting_args;
1445     }
1446
1447     # utility for below
1448     my $have_space = sub {
1449         my ($dirname, $part_size) = @_;
1450
1451         use Carp;
1452         my $fsusage = Amanda::Util::get_fs_usage($dirname);
1453         confess "$dirname" if (!$fsusage);
1454
1455         my $avail = $fsusage->{'blocksize'} * $fsusage->{'bavail'};
1456         if ($avail < $part_size) {
1457             Amanda::Debug::debug("disk cache has $avail bytes available on $dirname, but " .
1458                                  "needs $part_size");
1459             return 0;
1460         } else {
1461             return 1;
1462         }
1463     };
1464
1465     # first, handle the alternate spellings for part_size and part_cache_type
1466     $params{'part_size'} = $params{'part_size_kb'} * 1024
1467         if (defined $params{'part_size_kb'});
1468
1469     if (defined $params{'part_cache_type_enum'}) {
1470         $params{'part_cache_type'} = 'none'
1471             if ($params{'part_cache_type_enum'} == $PART_CACHE_TYPE_NONE);
1472         $params{'part_cache_type'} = 'memory'
1473             if ($params{'part_cache_type_enum'} == $PART_CACHE_TYPE_MEMORY);
1474         $params{'part_cache_type'} = 'disk'
1475             if ($params{'part_cache_type_enum'} == $PART_CACHE_TYPE_DISK);
1476
1477         $params{'part_cache_type'} = 'unknown'
1478             unless defined $params{'part_cache_type'};
1479     }
1480
1481     if (defined $splitting_args{'data_path'} and
1482         $splitting_args{'data_path'} eq "DIRECTTCP") {
1483         my $ps = $params{'dle_tape_splitsize'};
1484         if (defined $ps and $ps > 0) {
1485             $params{'part_cache_max_size'} = undef
1486         } else {
1487             $ps = $params{'part_size'};
1488             my $pcms = $params{'part_cache_max_size'};
1489             $ps = $pcms if (!defined $ps or (defined $pcms and $pcms < $ps));
1490         }
1491         $splitting_args{'allow_split'} = 1 if ((defined $ps and $ps > 0) or
1492                                                $params{'leom_supported'});
1493         $params{'part_size'} = $ps;
1494         $params{'part_cache_type'} = 'none';
1495         $params{'part_cache_dir'} = undef;
1496     } elsif (defined $params{'dle_tape_splitsize'} or
1497              defined $params{'dle_split_diskbuffer'} or
1498              defined $params{'dle_fallback_splitsize'}) {
1499         # if any of the dle_* parameters are set, use those to set the part_*
1500         # parameters, which are emptied out first.
1501
1502         $params{'part_size'} = $params{'dle_tape_splitsize'} || 0;
1503         $params{'part_cache_type'} = 'none';
1504         $params{'part_cache_dir'} = undef;
1505         $params{'part_cache_max_size'} = undef;
1506
1507         # part cache type is memory unless we have a split_diskbuffer that fits the bill
1508         if ($params{'part_size'}) {
1509             $splitting_args{'allow_split'} = 1;
1510             $params{'part_cache_type'} = 'memory';
1511             if (defined $params{'dle_split_diskbuffer'}
1512                     and -d $params{'dle_split_diskbuffer'}) {
1513                 if ($have_space->($params{'dle_split_diskbuffer'}, $params{'part_size'})) {
1514                     # disk cache checks out, so use it
1515                     $params{'part_cache_type'} = 'disk';
1516                     $params{'part_cache_dir'} = $params{'dle_split_diskbuffer'};
1517                 } else {
1518                     my $msg = "falling back to memory buffer for splitting: " .
1519                                 "insufficient space in disk cache directory";
1520                     $splitting_args{'warning'} = $msg;
1521                 }
1522             }
1523         }
1524
1525         if ($params{'part_cache_type'} eq 'memory') {
1526             # fall back to 10M if fallback size is not given
1527             $params{'part_cache_max_size'} = $params{'dle_fallback_splitsize'} || 10*1024*1024;
1528         }
1529     } else {
1530         my $ps = $params{'part_size'};
1531         my $pcms = $params{'part_cache_max_size'};
1532         $ps = $pcms if (!defined $ps or (defined $pcms and $pcms < $ps));
1533         $splitting_args{'allow_split'} = 1 if ((defined $ps and $ps > 0) or
1534                                                $params{'leom_supported'});
1535
1536         # fail back from 'disk' to 'none' if the disk isn't set up correctly
1537         if (defined $params{'part_cache_type'} and
1538                     $params{'part_cache_type'} eq 'disk') {
1539             my $warning;
1540             if (!$params{'part_cache_dir'}) {
1541                 $warning = "no part-cache-dir specified; "
1542                             . "using part cache type 'none'";
1543             } elsif (!-d $params{'part_cache_dir'}) {
1544                 $warning = "part-cache-dir '$params{part_cache_dir} "
1545                             . "does not exist; using part cache type 'none'";
1546             } elsif (!$have_space->($params{'part_cache_dir'}, $ps)) {
1547                 $warning = "part-cache-dir '$params{part_cache_dir} "
1548                             . "has insufficient space; using part cache type 'none'";
1549             }
1550
1551             if (defined $warning) {
1552                 $splitting_args{'warning'} = $warning;
1553                 $params{'part_cache_type'} = 'none';
1554                 delete $params{'part_cache_dir'};
1555             }
1556         }
1557     }
1558
1559     $splitting_args{'part_size'} = $params{'part_size'}
1560         if defined($params{'part_size'});
1561     $splitting_args{'part_cache_type'} = $params{'part_cache_type'}
1562         if defined($params{'part_cache_type'});
1563     $splitting_args{'part_cache_dir'} = $params{'part_cache_dir'}
1564         if defined($params{'part_cache_dir'});
1565     $splitting_args{'part_cache_max_size'} = $params{'part_cache_max_size'}
1566         if defined($params{'part_cache_max_size'});
1567
1568     return %splitting_args;
1569 }
1570 ##
1571 ## Feedback
1572 ##
1573
1574 package Amanda::Taper::Scribe::Feedback;
1575
1576 sub request_volume_permission {
1577     my $self = shift;
1578     my %params = @_;
1579
1580     # sure, you can have as many volumes as you want!
1581     $params{'perm_cb'}->(allow => 1);
1582 }
1583
1584 sub scribe_notif_new_tape { }
1585 sub scribe_notif_part_done { }
1586 sub scribe_notif_log_info { }
1587 sub scribe_notif_tape_done {
1588     my $self = shift;
1589     my %params = @_;
1590
1591     $params{'finished_cb'}->();
1592 }
1593
1594 ##
1595 ## Device Handling
1596 ##
1597
1598 package Amanda::Taper::Scribe::DevHandling;
1599 use Amanda::MainLoop;
1600 use Carp;
1601 use Amanda::Debug qw( :logging );
1602
1603 # This class handles scanning for volumes, requesting permission for those
1604 # volumes (the driver likes to feel like it's in control), and providing those
1605 # volumes to the scribe on request.  These can all happen independently, but
1606 # the scribe cannot begin writing to a volume until all three have finished.
1607 # That is: the scan is finished, the driver has given its permission, and the
1608 # scribe has requested a volume.
1609 #
1610 # On start, the class starts scanning immediately, even though the scribe has
1611 # not requested a volume.  Subsequently, a new scan does not begin until the
1612 # scribe requests a volume.
1613 #
1614 # This class is "private" to Amanda::Taper::Scribe, so it is documented in
1615 # comments, rather than POD.
1616
1617 # Create a new DevHandling object.  Params are taperscan and feedback.
1618 sub new {
1619     my $class = shift;
1620     my %params = @_;
1621
1622     my $self = {
1623         taperscan => $params{'taperscan'},
1624         feedback => $params{'feedback'},
1625
1626         # is a scan currently running, or completed?
1627         scan_running => 0,
1628         scan_finished => 0,
1629         scan_error => undef,
1630
1631         # scan results
1632         reservation => undef,
1633         device => undef,
1634         volume_label => undef,
1635
1636         # requests for permissiont to use a new volume
1637         request_pending => 0,
1638         request_complete => 0,
1639         request_denied => 0,
1640         config_denial_message => undef,
1641         error_denial_message => undef,
1642
1643         volume_cb => undef, # callback for get_volume
1644         start_finished_cb => undef, # callback for start
1645     };
1646
1647     return bless ($self, $class);
1648 }
1649
1650 ## public methods
1651
1652 # Called at scribe startup, this starts the instance off with a scan.
1653 sub start {
1654     my $self = shift;
1655     my %params = @_;
1656
1657     $self->{'start_finished_cb'} = $params{'finished_cb'};
1658     $self->_start_scanning();
1659 }
1660
1661 sub quit {
1662     my $self = shift;
1663     my %params = @_;
1664
1665     for my $rq_param (qw(finished_cb)) {
1666         croak "required parameter '$rq_param' mising"
1667             unless exists $params{$rq_param};
1668     }
1669
1670     # since there's little other option than to barrel on through the
1671     # quitting procedure, quit() just accumulates its error messages
1672     # and, if necessary, concantenates them for the finished_cb.
1673     my @errors;
1674
1675     my $cleanup_cb = make_cb(cleanup_cb => sub {
1676         my ($error) = @_;
1677         push @errors, $error if $error;
1678
1679         $error = join("; ", @errors) if @errors >= 1;
1680
1681         $params{'finished_cb'}->($error);
1682     });
1683
1684     if ($self->{'reservation'}) {
1685         if ($self->{'device'}) {
1686             if (!$self->{'device'}->finish()) {
1687                 push @errors, $self->{'device'}->error_or_status();
1688             }
1689         }
1690
1691         $self->{'reservation'}->release(finished_cb => $cleanup_cb);
1692     } else {
1693         $cleanup_cb->(undef);
1694     }
1695 }
1696
1697 # Get an open, started device and label to start writing to.  The
1698 # volume_callback takes the following arguments:
1699 #   $scan_error -- error message, or undef if no error occurred
1700 #   $config_denial_reason -- config-related reason request was denied, or undef
1701 #   $error_denial_reason -- error-related reason request was denied, or undef
1702 #   $reservation -- Amanda::Changer reservation
1703 #   $device -- open, started device
1704 # It is the responsibility of the caller to close the device and release the
1705 # reservation when finished.  If $scan_error or $request_denied_info are
1706 # defined, then $reservation and $device will be undef.
1707 sub get_volume {
1708     my $self = shift;
1709     my (%params) = @_;
1710
1711     confess "already processing a volume request"
1712         if ($self->{'volume_cb'});
1713
1714     $self->{'volume_cb'} = $params{'volume_cb'};
1715
1716     # kick off the relevant processes, if they're not already running
1717     $self->_start_request();
1718
1719     $self->_maybe_callback();
1720 }
1721
1722 # take a peek at the device we have, for which permission has not yet been
1723 # granted.  This will be undefined before the taperscan completes AND after
1724 # the volume_cb has been called.
1725 sub peek_device {
1726     my $self = shift;
1727
1728     return $self->{'device'};
1729 }
1730
1731 sub start_scan {
1732     my $self = shift;
1733
1734     if (!$self->{'scan_running'} && !$self->{'reservation'}) {
1735         $self->_start_scanning();
1736     }
1737 }
1738
1739 ## private methods
1740
1741 sub _start_scanning {
1742     my $self = shift;
1743
1744     return if $self->{'scan_running'} or $self->{'scan_finished'};
1745
1746     $self->{'scan_running'} = 1;
1747
1748     my $_user_msg_fn = sub {
1749         my %params = @_;
1750         if (exists($params{'slot_result'})) {
1751             if ($params{'does_not_match_labelstr'}) {
1752                 $self->{'feedback'}->scribe_notif_log_info(
1753                     message => "Slot $params{'slot'} with label $params{'label'} do not match labelstr");
1754             } elsif ($params{'not_in_tapelist'}) {
1755                 $self->{'feedback'}->scribe_notif_log_info(
1756                     message => "Slot $params{'slot'} with label $params{'label'} is not in the tapelist");
1757             } elsif ($params{'active'}) {
1758                 $self->{'feedback'}->scribe_notif_log_info(
1759                     message => "Slot $params{'slot'} with label $params{'label'} is not reusable");
1760             } elsif ($params{'not_autolabel'}) {
1761                 if ($params{'label'}) {
1762                     $self->{'feedback'}->scribe_notif_log_info(
1763                         message => "Slot $params{'slot'} with label $params{'label'} is not labelable ");
1764                 } elsif ($params{'empty'}) {
1765                     $self->{'feedback'}->scribe_notif_log_info(
1766                         message => "Slot $params{'slot'} is empty, autolabel not set");
1767                 } elsif ($params{'non_amanda'}) {
1768                     $self->{'feedback'}->scribe_notif_log_info(
1769                         message => "Slot $params{'slot'} is a non-amanda volume, autolabel not set");
1770                 } elsif ($params{'volume_error'}) {
1771                     $self->{'feedback'}->scribe_notif_log_info(
1772                         message => "Slot $params{'slot'} is a volume in error: $params{'err'}, autolabel not set");
1773                 } elsif ($params{'not_success'}) {
1774                     $self->{'feedback'}->scribe_notif_log_info(
1775                         message => "Slot $params{'slot'} is a device in error: $params{'err'}, autolabel not set");
1776                 } elsif ($params{'err'}) {
1777                     $self->{'feedback'}->scribe_notif_log_info(
1778                         message => "$params{'err'}");
1779                 } else {
1780                     $self->{'feedback'}->scribe_notif_log_info(
1781                         message => "Slot $params{'slot'} without label is not labelable ");
1782                 }
1783             } elsif ($params{'empty'}) {
1784                 $self->{'feedback'}->scribe_notif_log_info(
1785                     message => "Slot $params{'slot'} is empty, autolabel disabled");
1786             } elsif ($params{'non_amanda'}) {
1787                 $self->{'feedback'}->scribe_notif_log_info(
1788                     message => "Slot $params{'slot'} is a non-amanda volume, autolabel disabled");
1789             } elsif ($params{'volume_error'}) {
1790                 $self->{'feedback'}->scribe_notif_log_info(
1791                     message => "Slot $params{'slot'} is a volume in error: $params{'err'}, autolabel disabled");
1792             } elsif ($params{'not_success'}) {
1793                 $self->{'feedback'}->scribe_notif_log_info(
1794                     message => "Slot $params{'slot'} is a device in error: $params{'err'}, autolabel disabled");
1795             } elsif ($params{'err'}) {
1796                 $self->{'feedback'}->scribe_notif_log_info(
1797                     message => "$params{'err'}");
1798             } elsif ($params{'not_labelable'}) {
1799                 $self->{'feedback'}->scribe_notif_log_info(
1800                     message => "Slot $params{'slot'} without label can't be labeled");
1801             } elsif (!defined $params{'label'}) {
1802                 $self->{'feedback'}->scribe_notif_log_info(
1803                     message => "Slot $params{'slot'} without label can be labeled");
1804             } elsif ($params{'relabeled'}) {
1805                 $self->{'feedback'}->scribe_notif_log_info(
1806                     message => "Slot $params{'slot'} with label $params{'label'} will be relabeled");
1807             } else {
1808                 $self->{'feedback'}->scribe_notif_log_info(
1809                     message => "Slot $params{'slot'} with label $params{'label'} is usable");
1810             }
1811         }
1812     };
1813
1814     $self->{'taperscan'}->scan(
1815       user_msg_fn => $_user_msg_fn,
1816       result_cb => sub {
1817         my ($error, $reservation, $volume_label, $access_mode, $is_new) = @_;
1818
1819         $self->{'scan_running'} = 0;
1820         $self->{'scan_finished'} = 1;
1821
1822         $self->{'scan_error'} = $error;
1823         $self->{'reservation'} = $reservation;
1824         $self->{'device'} = $reservation->{'device'} if $reservation;
1825         $self->{'volume_label'} = $volume_label;
1826         $self->{'access_mode'} = $access_mode;
1827         $self->{'is_new'} = $is_new;
1828
1829         $self->_maybe_callback();
1830     });
1831 }
1832
1833 sub _start_request {
1834     my $self = shift;
1835
1836     return if $self->{'request_pending'} or $self->{'request_complete'};
1837
1838     $self->{'request_pending'} = 1;
1839
1840     $self->{'feedback'}->request_volume_permission(
1841     perm_cb => sub {
1842         my %params = @_;
1843
1844         $self->{'request_pending'} = 0;
1845         $self->{'request_complete'} = 1;
1846         if (defined $params{'scribe'}) {
1847             $self->{'new_scribe'} = $params{'scribe'};
1848             $self->{'scan_finished'} = 1;
1849             $self->{'request_complete'} = 1;
1850         } elsif (defined $params{'cause'}) {
1851             $self->{'request_denied'} = 1;
1852             if ($params{'cause'} eq 'config') {
1853                 $self->{'config_denial_message'} = $params{'message'};
1854             } elsif ($params{'cause'} eq 'error') {
1855                 $self->{'error_denial_message'} = $params{'message'};
1856             } else {
1857                 confess "bad cause '" . $params{'cause'} . "'";
1858             }
1859         } elsif (!defined $params{'allow'}) {
1860             confess "no allow or cause defined";
1861         }
1862
1863         $self->_maybe_callback();
1864     });
1865 }
1866
1867 sub _maybe_callback {
1868     my $self = shift;
1869
1870     # if we have any kind of error, release the reservation and come back
1871     # later
1872     if (($self->{'scan_error'} or $self->{'request_denied'}) and $self->{'reservation'}) {
1873         $self->{'device'} = undef;
1874
1875         $self->{'reservation'}->release(finished_cb => sub {
1876             my ($error) = @_;
1877
1878             # so many errors, so little time..
1879             if ($error) {
1880                 if ($self->{'scan_error'}) {
1881                     warning("ignoring error releasing reservation ($error) after a scan error");
1882                 } else {
1883                     $self->{'scan_error'} = $error;
1884                 }
1885             }
1886
1887             $self->{'reservation'} = undef;
1888             $self->_maybe_callback();
1889         });
1890
1891         return;
1892     }
1893
1894     # if we are just starting up, call the finished_cb given to start()
1895     if (defined $self->{'start_finished_cb'} and $self->{'scan_finished'}) {
1896         my $cb = $self->{'start_finished_cb'};
1897         $self->{'start_finished_cb'} = undef;
1898
1899         $cb->($self->{'scan_error'});
1900     }
1901
1902     # if the volume_cb is good to get called, call it and reset to the ground state
1903     if ($self->{'volume_cb'} and (!$self->{'scan_running'} or $self->{'scan_finished'}) and $self->{'request_complete'}) {
1904         # get the cb and its arguments lined up before calling it..
1905         my $volume_cb = $self->{'volume_cb'};
1906         my @volume_cb_args = (
1907             $self->{'scan_error'},
1908             $self->{'config_denial_message'},
1909             $self->{'error_denial_message'},
1910             $self->{'reservation'},
1911             $self->{'volume_label'},
1912             $self->{'access_mode'},
1913             $self->{'is_new'},
1914             $self->{'new_scribe'},
1915         );
1916
1917         # reset everything and prepare for a new scan
1918         $self->{'scan_finished'} = 0;
1919
1920         $self->{'reservation'} = undef;
1921         $self->{'device'} = undef;
1922         $self->{'volume_label'} = undef;
1923
1924         $self->{'request_complete'} = 0;
1925         $self->{'request_denied'} = 0;
1926         $self->{'config_denial_message'} = undef;
1927         $self->{'error_denial_message'} = undef;
1928         $self->{'volume_cb'} = undef;
1929         $self->{'new_scribe'} = undef;
1930
1931         $volume_cb->(@volume_cb_args);
1932     }
1933 }
1934
1935 1;