d135e3a8ad89ab787293c738fa3a40360a630ec9
[debian/amanda] / perl / Amanda / Taper / Scribe.pm
1 # Copyright (c) 2009, 2010 Zmanda, Inc.  All Rights Reserved.
2 #
3 # This library is free software; you can redistribute it and/or modify it
4 # under the terms of the GNU Lesser General Public License version 2.1 as
5 # published by the Free Software Foundation.
6 #
7 # This library is distributed in the hope that it will be useful, but
8 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
9 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
10 # License for more details.
11 #
12 # You should have received a copy of the GNU Lesser General Public License
13 # along with this library; if not, write to the Free Software Foundation,
14 # Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA.
15 #
16 # Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
17 # Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
18
19 =head1 NAME
20
21 Amanda::Taper::Scribe
22
23 =head1 SYNOPSIS
24
25   step start_scribe => sub {
26       my $scribe = Amanda::Taper::Scribe->new(
27             taperscan => $taperscan_algo,
28             feedback => $feedback_obj);
29     $scribe->start(
30         write_timestamp => $write_timestamp,
31         finished_cb => $steps->{'start_xfer'});
32   };
33
34   step start_xfer => sub {
35     my ($err) = @_;
36     my $xfer_dest = $scribe->get_xfer_dest(
37         allow_split => 1,
38         max_memory => 64 * 1024,
39         can_cache_inform => 0,
40         part_size => 150 * 1024**2,
41         part_cache_type => 'disk',
42         part_cache_dir => "$tmpdir/splitbuffer",
43         part_cache_max_size => 20 * 1024**2);
44     # .. set up the rest of the transfer ..
45     $xfer->start(sub {
46         my ($src, $msg, $xfer) = @_;
47         $scribe->handle_xmsg($src, $msg, $xfer);
48         # .. any other processing ..
49     };
50     # tell the scribe to start dumping via this transfer
51     $scribe->start_dump(
52         xfer => $xfer,
53         dump_header => $hdr,
54         dump_cb => $steps->{'dump_cb'});
55   };
56
57   step dump_cb => sub {
58       my %params = @_;
59       # .. handle dump results ..
60       print "DONE\n";
61       $finished_cb->();
62   };
63
64
65 =head1 OVERVIEW
66
67 This package provides a high-level abstraction of Amanda's procedure for
68 writing dumpfiles to tape.
69
70 Amanda writes a sequence of dumpfiles to a sequence of volumes.  The
71 volumes are supplied by a taperscan algorithm, which operates a changer
72 to find and load each volume.  As dumpfiles are written to volumes and
73 those volumes fill up, the taperscan algorithm supplies additional
74 volumes.
75
76 In order to reduce internal fragmentation within volumes, Amanda can "split"
77 dumpfiles into smaller pieces, so that the overall dumpfile can span multiple
78 volumes.  Each "part" is written to the volume in sequence.  If a device
79 encounters an error while writing a part, then that part is considered
80 "partial", and is rewritten from its beginning on the next volume.  Some
81 devices can reliably indicate that they are full (EOM), and for these devices
82 parts are simply truncated, and the Scribe starts the next part on the next
83 volume.
84
85 To facilitate rewriting parts on devices which cannot indicate EOM, Amanda must
86 retain all of the data in a part, even after that data is written to the
87 volume.  The Scribe provides several methods to support this: caching the part
88 in memory, caching the part in a special on-disk file, or relying on
89 pre-existing on-disk storage.  The latter method is used when reading from
90 holding disks.
91
92 The details of efficiently splitting dumpfiles and rewriting parts are handled
93 by the low-level C<Amanda::Xfer::Dest::Taper> subclasses.  The Scribe creates
94 an instance of the appropriate subclass and supplies it with volumes from an
95 C<Amanda::Taper::Scan> object.  It calls a number of
96 C<Amanda::Taper::Scribe::Feedback> methods to indicate the status of the dump
97 process and to request permission for each additional volume.
98
99 =head1 OPERATING A SCRIBE
100
101 The C<Amanda::Taper::Scribe> constructor takes two arguments:
102 C<taperscan> and C<feedback>.  The first specifies the taper scan
103 algorithm that the Scribe should use, and the second specifies the
104 C<Feedback> object that will receive notifications from the Scribe (see
105 below).
106
107   my $scribe = Amanda::Taper::Scribe->new(
108         taperscan => $my_taperscan,
109         feedback => $my_feedback);
110
111 Once the object is in place, call its C<start> method.
112
113 =head2 START THE SCRIBE
114
115 Start the scribe's operation by calling its C<start> method.  This will invoke
116 the taperscan algorithm and scan for a volume.  The method takes two parameters:
117
118   $scribe->start(
119         write_timestamp => $ts,
120         finished_cb => $start_finished_cb);
121
122 The timestamp will be written to each volume written by the Scribe.  The
123 C<finished_cb> will be called with a single argument - C<undef> or an error
124 message - when the Scribe is ready to start its first dump.  The Scribe is
125 "ready" when it has found a device to which it can write, although it does not
126 request permission to overwrite that volume, nor start overwriting it, until
127 the first dump begins (that is, until the first call to C<start_dump>).
128
129 =head2 SET UP A TRANSFER
130
131 Once the Scribe is started, begin transferring a dumpfile.  This is a
132 three-step process: first, get an C<Amanda::Xfer::Dest::Taper> object from the
133 Scribe, then start the transfer, and finally let the Scribe know that the
134 transfer has started.  Note that the Scribe supplies and manages the transfer
135 destination, but the transfer itself remains the responsibility of the caller.
136
137 =head3 Get device
138
139 Call C<get_device> to get the first device the xfer will be working with.
140
141   $device = $scribe->get_device();
142
143 This method must be called after C<start> has completed.
144
145 =head3 Check device compatibily for the data path
146
147 Call C<check_data_path>, supplying the data_path requested by the user.
148
149   if (my $err = $scribe->check_data_path($data_path)) {
150       # handle error message
151   }
152
153 This method must be called after C<start> has completed and before
154 C<get_xfer_dest> is called. It returns C<undef> on success or an error message
155 if the supplied C<data_path> is incompatible with the device.  This is mainly
156 used to detect when a DirectTCP dump is going to a non-DirectTCP device.
157
158 =head3 Get a Transfer Destination
159
160 Call C<get_xfer_dest> to get the transfer element, supplying information on how
161 the dump should be split:
162
163   $xdest = $scribe->get_xfer_dest(
164         allow_split => $allow_split,
165         max_memory => $max_memory,
166         # .. splitting parameters
167         );
168
169 This method must be called after C<start> has completed, and will always return
170 a transfer element immediately.  The underlying C<Amanda::Xfer::Dest::Taper>
171 handles device streaming properly.  It uses C<max_memory> bytes of memory for
172 this purpose.
173
174 The splitting parameters to C<get_xfer_dest> are:
175
176 =over 4
177
178 =item C<allow_split>
179
180 this dle is allowed or not to split
181
182 =item C<part_size>
183
184 the split part size to use, or 0 for no splitting
185
186 =item C<part_cache_type>
187
188 when caching, the kind of caching to perform ('disk', 'memory' or the default,
189 'none')
190
191 =item C<part_cache_dir>
192
193 the directory to use for disk caching
194
195 =item C<part_cache_max_size>
196
197 the maximum part size to use when caching
198
199 =item C<can_cache_inform>
200
201 true if the transfer source can call the destination's C<cache_inform> method
202 (e.g., C<Amanda::Xfer::Source::Holding>).
203
204 =back
205
206 The first four of these parameters correspond exactly to the eponymous tapetype
207 configuration parameters, and have the same default values (when omitted or
208 C<undef>).  The method will take this information, along with details of the
209 device it intends to use, and set up the transfer destination.
210
211 The utility function C<get_splitting_args_from_config> can determine the
212 appropriate C<get_xfer_dest> splitting parameters based on a
213 few Amanda configuration parameters.  If a parameter was not seen in the
214 configuration, it should be omitted or passed as C<undef>.  The function
215 returns a hash to pass to C<get_xfer_dest>, although that hash may have an
216 C<warning> key containing a message if there is a problem that the user
217 should know about.
218
219   use Amanda::Taper::Scribe qw( get_splitting_args_from_config );
220   my %splitting_args = get_splitting_args_from_config(
221     # Amanda dumptype configuration parameters,
222     dle_allow_split => ..,
223     dle_tape_splitsize => ..,
224     dle_split_diskbuffer => ..,
225     dle_fallback_splitsize => ..,
226     dle_allow_split => ..,
227     # Amanda tapetype configuration parameters,
228     part_size => .., ## in bytes, not kb!!
229     part_size_kb => ..., ## or use this, in kb
230     part_cache_type => ..,
231     part_cache_type_enum => ..., ## one of the enums from tapetype_getconf
232     part_cache_dir => ..,
233     part_cache_max_size => ..,
234   );
235   if ($splitting_args{'error'}) { .. }
236
237 An C<Amanda::Taper::Scribe> object can only run one transfer at a time, so
238 do not call C<get_xfer_dest> until the C<dump_cb> for the previous C<start_dump>
239 has been called.
240
241 =head3 Start the Transfer
242
243 Armed with the element returned by C<get_xfer_dest>, the caller should create a
244 source element and a transfer object and start the transfer.  In order to
245 manage the splitting process, the Scribe needs to be informed, via its
246 C<handle_xmsg> method, of all transfer messages .  This is usually accomplished
247 with something like:
248
249   $xfer->start(sub {
250       my ($src, $msg, $xfer) = @_;
251       $scribe->handle_xmsg($src, $msg, $xfer);
252   });
253
254 =head3 Inform the Scribe
255
256 Once the transfer has started, the Scribe is ready to begin writing parts to
257 the volume.  This is the first moment at which the Scribe needs a header, too.
258 All of this is supplied to the C<start_dump> method:
259
260   $scribe->start_dump(
261       xfer => $xfer,
262       dump_header => $hdr,
263       dump_cb => $dump_cb);
264
265 The c<dump_header> here is the header that will be applied to all parts of the
266 dumpfile.  The only field in the header that the Scribe controls is the part
267 number.  The C<dump_cb> callback passed to C<start_dump> is called when the
268 dump is completely finished - either successfully or with a fatal error.
269 Unlike most callbacks, this one takes keyword arguments, since it has so many
270 parameters.
271
272   $dump_cb->(
273         result => $result,
274         device_errors => $device_errors,
275         config_denial_message => $cdm,
276         size => $size,
277         duration => $duration,
278         total_duration => $total_duration,
279         nparts => $nparts);
280
281 All parameters will be present on every call, although the order is not
282 guaranteed.
283
284 The C<result> is one of C<"FAILED">, C<"PARTIAL">, or C<"DONE">.  Even when
285 C<dump_cb> reports a fatal error, C<result> may be C<"PARTIAL"> if some data
286 was written successfully.
287
288 The C<device_error> key points to a list of errors, each given as a string,
289 that describe what went wrong to cause the dump to fail.  The
290 C<config_denial_message> parrots the reason provided by C<$perm_cb> (see below)
291 for denying use of a new tape if the cause was 'config', and is C<undef>
292 otherwise.
293
294 The final parameters, C<size> (in bytes), C<duration>, C<total_duration> (in
295 seconds), and C<nparts> describe the total transfer, and are a sum of all of
296 the parts written to the device.  Note that C<nparts> does not include any
297 empty trailing parts.  Note that C<duration> does not include time spent
298 operating the changer, while C<total_duration> reflects the time from the
299 C<start_dump> call to the invocation of the C<dump_cb>.
300
301 =head3 Cancelling a Dump
302
303 After you have requested a transfer destination, the scribe is poised to begin the
304 transfer.  If you cannot actually perform the transfer for some reason, you'll need
305 to go through the motions all the same, but cancel the operation immediately.  That
306 can be done by calling C<cancel_dump>:
307
308   $scribe->cancel_dump(
309         xfer => $xfer,
310         dump_cb => $dump_cb);
311
312 =head2 QUIT
313
314 When all of the dumpfiles are transferred, call the C<quit> method to
315 release any resources and clean up.  This method takes a typical
316 C<finished_cb>.
317
318   $scribe->quit(finished_cb => sub {
319     print "ALL DONE!\n";
320   });
321
322 =head2 GET_BYTES_WRITTEN
323
324 The C<get_bytes_written> returns the number of bytes written to the device at
325 the time of the call, and is meant to be used for status reporting.  This value
326 is updated at least as each part is finished; for some modes of operation, it
327 is updated continuously.  Notably, DirectTCP transfers do not update
328 continuously.
329
330 =head2 START_SCAN
331
332 The C<start_scan> method initiate a scan of the changer to find a usable tape.
333
334 =head1 FEEDBACK
335
336 The C<Amanda::Taper::Scribe::Feedback> class is intended to be
337 subclassed by the user.  It provides a number of notification methods
338 that enable the historical logging and driver/taper interactions
339 required by Amanda.  The parent class does nothing of interest, but
340 allows subclasses to omit methods they do not need.
341
342 The C<request_volume_permission> method provides a means for the caller
343 to limit the number of volumes the Scribe consumes.  It is called as
344
345   $fb->request_volume_permission(perm_cb => $cb);
346
347 The C<perm_cb> is a callback which expects a hash as arguments. If C<allow>
348 is set, then the scribe is allowed to use a new volume, if C<scribe> is set,
349 then the xfer must be transfered to that scribe, otherwise a C<cause>
350 and a C<message> describing why a new volume should not be used. must be
351 set. e.g.
352
353   perm_cb->(allow => 1);
354   perm_cb->(scribe => $new_scribe);
355   perm_cb->(cause => 'config', message => $message);
356   perm_cb->(cause => 'error', message => $message);
357
358 A cause of 'config' indicates that the denial is due to the user's
359 configuration, and thus should not be presented as an error.  The default
360 implementation always calls C<< perm_cb->() >>.
361
362 All of the remaining methods are notifications, and do not take a
363 callback.
364
365   $fb->scribe_notif_new_tape(
366         error => $error,
367         volume_label => $volume_label);
368
369 The Scribe calls C<scribe_notif_new_tape> when a new volume is started.  If the
370 C<volume_label> is undefined, then the volume was not successfully
371 relabled, and its previous contents may still be available.  If C<error>
372 is defined, then no useful data was written to the volume.  Note that
373 C<error> and C<volume_label> may I<both> be defined if the previous
374 contents of the volume were erased, but no useful, new data was written
375 to the volume.
376
377 This method will be called exactly once for every call to
378 C<request_volume_permission> that calls back with C<< perm_cb->() >>.
379
380   $fb->scribe_notif_tape_done(
381         volume_label => $volume_label,
382         size => $size,
383         num_files => $num_files);
384
385 The C<scribe_notif_tape_done> method is called after a volume is completely
386 written and its reservation has been released.  Note that the scribe waits
387 until the last possible moment to release a reservation, so this may be called
388 later than expected, e.g., during a C<quit> invocation.
389
390   $fb->scribe_notif_part_done(
391         partnum => $partnum,
392         fileno => $fileno,
393         successful => $successful,
394         size => $size,
395         duration => $duration);
396
397 The Scribe calls C<scribe_notif_part_done> for each part written to the volume,
398 including partial parts.  If the part was not written successfully, then
399 C<successful> is false.  The C<size> is in bytes, and the C<duration> is
400 a floating-point number of seconds.  If a part fails before a new device
401 file is created, then C<fileno> may be zero.
402
403 Finally, the Scribe sends a few historically significant trace log messages
404 via C<scribe_notif_log_info>:
405
406   $fb->scribe_notif_log_info(
407         message => $message);
408
409 A typical Feedback subclass might begin like this:
410
411   package main::Feedback;
412   use base 'Amanda::Taper::Scribe::Feedback';
413
414   sub request_volume_permission {
415     my $self = shift;
416     my %params = @_;
417
418     $params{'perm_cb'}->(cause => "error", message => "NO VOLUMES FOR YOU!");
419   }
420
421 =cut
422
423 package Amanda::Taper::Scribe;
424
425 use strict;
426 use warnings;
427 use Carp;
428
429 use Amanda::Xfer qw( :constants );
430 use Amanda::Device qw( :constants );
431 use Amanda::Header;
432 use Amanda::Debug qw( :logging );
433 use Amanda::MainLoop;
434 use Amanda::Tapelist;
435 use Amanda::Config qw( :getconf config_dir_relative );
436 use base qw( Exporter );
437
438 our @EXPORT_OK = qw( get_splitting_args_from_config );
439
440 sub new {
441     my $class = shift;
442     my %params = @_;
443
444     my $decide_debug = $Amanda::Config::debug_taper || $params{'debug'};
445     for my $rq_param (qw(taperscan feedback)) {
446         croak "required parameter '$rq_param' mising"
447             unless exists $params{$rq_param};
448     }
449
450     my $self = {
451         taperscan => $params{'taperscan'},
452         feedback => $params{'feedback'},
453         debug => $decide_debug,
454         eject_volume => $params{'eject_volume'},
455         write_timestamp => undef,
456         started => 0,
457
458         # device handling, and our current device and reservation
459         devhandling => Amanda::Taper::Scribe::DevHandling->new(
460             taperscan => $params{'taperscan'},
461             feedback => $params{'feedback'},
462         ),
463         reservation => undef,
464         device => undef,
465         device_size => undef,
466         device_at_eom => undef, # device still exists, but is full
467         close_volume => undef,
468
469         # callback passed to start_dump
470         dump_cb => undef,
471
472         # information for the current dumpfile
473         dump_header => undef,
474         retry_part_on_peom => undef,
475         allow_split => undef,
476         xfer => undef,
477         xdt => undef,
478         xdt_ready => undef,
479         start_part_on_xdt_ready => 0,
480         size => 0,
481         duration => 0.0,
482         dump_start_time => undef,
483         last_part_successful => 0,
484         started_writing => 0,
485         device_errors => [],
486         config_denial_message => undef,
487     };
488
489     return bless ($self, $class);
490 }
491
492 sub start {
493     my $self = shift;
494     my %params = @_;
495
496     for my $rq_param (qw(write_timestamp finished_cb)) {
497         croak "required parameter '$rq_param' missing"
498             unless exists $params{$rq_param};
499     }
500
501     die "scribe already started" if $self->{'started'};
502
503     $self->dbg("starting");
504     $self->{'write_timestamp'} = $params{'write_timestamp'};
505
506     # start up the DevHandling object, making sure we know
507     # when it's done with its startup process
508     $self->{'devhandling'}->start(finished_cb => sub {
509         $self->{'started'} = 1;
510         $params{'finished_cb'}->(@_);
511     });
512 }
513
514 sub quit {
515     my $self = shift;
516     my %params = @_;
517
518     # since there's little other option than to barrel on through the
519     # quitting procedure, quit() just accumulates its error messages
520     # and, if necessary, concantenates them for the finished_cb.
521     my @errors;
522
523     my $steps = define_steps
524         cb_ref => \$params{'finished_cb'};
525
526     step setup => sub {
527         $self->dbg("quitting");
528
529         if ($self->{'xfer'}) {
530             die "Scribe cannot quit while a transfer is active";
531             # Supporting this would be complicated:
532             # - cancel the xfer and wait for it to complete
533             # - ensure that the taperscan not be started afterward
534             # and isn't required for normal Amanda operation.
535         }
536
537         $steps->{'release'}->();
538     };
539
540     step release => sub {
541         if ($self->{'reservation'}) {
542             $self->_release_reservation(finished_cb => $steps->{'released'});
543         } else {
544             $steps->{'stop_devhandling'}->();
545         }
546     };
547
548     step released => sub {
549         my ($err) = @_;
550         push @errors, "$err" if $err;
551
552         $self->{'reservation'} = undef;
553
554         $steps->{'stop_devhandling'}->();
555     };
556
557     step stop_devhandling => sub {
558         $self->{'devhandling'}->quit(finished_cb => $steps->{'stopped_devhandling'});
559     };
560
561     step stopped_devhandling => sub {
562         my ($err) = @_;
563         push @errors, "$err" if $err;
564
565         my $errmsg = join("; ", @errors) if @errors >= 1;
566         $params{'finished_cb'}->($errmsg);
567     };
568 }
569
570 sub get_device {
571     my $self = shift;
572
573     # Can return a device we already have, or "peek" at the
574     # DevHandling object's device.
575     # It might not have right permission on the device.
576
577     my $device;
578     if (defined $self->{'device'}) {
579         $device = $self->{'device'};
580     } else {
581         $device = $self->{'devhandling'}->peek_device();
582     }
583     return $device;
584 }
585
586 sub check_data_path {
587     my $self = shift;
588     my $data_path = shift;
589
590     my $device = $self->get_device();
591
592     if (!defined $device) {
593         die "no device is available to check the datapath";
594     }
595
596     my $use_directtcp = $device->directtcp_supported();
597
598     my $xdt;
599     if (!$use_directtcp) {
600         if ($data_path eq 'DIRECTTCP') {
601             return "Can't dump DIRECTTCP data-path dle to a device ('" .
602                    $device->device_name .
603                    "') that doesn't support it";
604         }
605     }
606     return undef;
607 }
608
609 sub start_scan {
610     my $self = shift;
611
612     $self->{'devhandling'}->start_scan();
613 }
614
615 # Get a transfer destination; does not use a callback
616 sub get_xfer_dest {
617     my $self = shift;
618     my %params = @_;
619
620     for my $rq_param (qw(max_memory)) {
621         croak "required parameter '$rq_param' missing"
622             unless exists $params{$rq_param};
623     }
624
625     die "not yet started"
626         unless $self->{'write_timestamp'} and $self->{'started'};
627     die "xfer element already returned"
628         if ($self->{'xdt'});
629     die "xfer already running"
630         if ($self->{'xfer'});
631
632     $self->{'xfer'} = undef;
633     $self->{'xdt'} = undef;
634     $self->{'size'} = 0;
635     $self->{'duration'} = 0.0;
636     $self->{'nparts'} = undef;
637     $self->{'dump_start_time'} = undef;
638     $self->{'last_part_successful'} = 1;
639     $self->{'started_writing'} = 0;
640     $self->{'device_errors'} = [];
641     $self->{'config_denial_message'} = undef;
642
643     # set the callback
644     $self->{'dump_cb'} = undef;
645     $self->{'retry_part_on_peom'} = 1;
646     $self->{'allow_split'} = 0;
647     $self->{'start_part_on_xdt_ready'} = 0;
648
649     # start getting parameters together to determine what kind of splitting
650     # and caching we're going to do
651     my $part_size = $params{'part_size'} || 0;
652     my ($use_mem_cache, $disk_cache_dirname) = (0, undef);
653     my $can_cache_inform = $params{'can_cache_inform'};
654     my $part_cache_type = $params{'part_cache_type'} || 'none';
655     my $allow_split = $params{'allow_split'};
656
657     my $xdt_first_dev = $self->get_device();
658     if (!defined $xdt_first_dev) {
659         die "no device is available to create an xfer_dest";
660     }
661     my $leom_supported = $xdt_first_dev->property_get("leom");
662     my $use_directtcp = $xdt_first_dev->directtcp_supported();
663
664     # figure out the destination type we'll use, based on the circumstances
665     my ($dest_type, $dest_text);
666     if ($use_directtcp) {
667         $dest_type = 'directtcp';
668         $dest_text = "using DirectTCP";
669     } elsif ($can_cache_inform && $leom_supported) {
670         $dest_type = 'splitter';
671         $dest_text = "using LEOM (falling back to holding disk as cache)";
672     } elsif ($leom_supported) {
673         $dest_type = 'splitter';
674         $dest_text = "using LEOM detection (no caching)";
675     } elsif ($can_cache_inform) {
676         $dest_type = 'splitter';
677         $dest_text = "using cache_inform";
678     } elsif ($part_cache_type ne 'none') {
679         $dest_type = 'cacher';
680
681         # we'll be caching, so apply the maximum size
682         my $part_cache_max_size = $params{'part_cache_max_size'} || 0;
683         $part_size = $part_cache_max_size
684             if ($part_cache_max_size and $part_cache_max_size < $part_size);
685
686         # and figure out what kind of caching to apply
687         if ($part_cache_type eq 'memory') {
688             $use_mem_cache = 1;
689         } else {
690             # note that we assume this has already been checked; if it's wrong,
691             # the xfer element will just fail immediately
692             $disk_cache_dirname = $params{'part_cache_dir'};
693         }
694         $dest_text = "using cache type '$part_cache_type'";
695     } else {
696         $dest_type = 'splitter';
697         $dest_text = "using no cache (PEOM will be fatal)";
698
699         # no directtcp, no caching, no cache_inform, and no LEOM, so a PEOM will be fatal
700         $self->{'retry_part_on_peom'} = 0;
701     }
702
703     if ($allow_split &&
704         ($can_cache_inform ||
705          !defined($part_cache_type) ||
706          $part_cache_type eq 'disk' ||
707          $part_cache_type eq 'memory' ||
708          $leom_supported)) {
709         $self->{'allow_split'} = 1;
710     } else {
711         $self->{'allow_split'} = 0;
712     }
713
714     $self->{'retry_part_on_peom'} = 0 if !$self->{'allow_split'};
715
716     debug("Amanda::Taper::Scribe preparing to write, part size $part_size, "
717         . "$dest_text ($dest_type) "
718         . ($leom_supported? " (LEOM supported)" : " (no LEOM)"));
719
720     # set the device to verbose logging if we're in debug mode
721     if ($self->{'debug'}) {
722         $xdt_first_dev->property_set("verbose", 1);
723     }
724
725     my $xdt;
726     if ($dest_type eq 'directtcp') {
727         $xdt = Amanda::Xfer::Dest::Taper::DirectTCP->new(
728             $xdt_first_dev, $part_size);
729         $self->{'xdt_ready'} = 0; # xdt isn't ready until we get XMSG_READY
730     } elsif ($dest_type eq 'splitter') {
731         $xdt = Amanda::Xfer::Dest::Taper::Splitter->new(
732             $xdt_first_dev, $params{'max_memory'}, $part_size, $can_cache_inform);
733         $self->{'xdt_ready'} = 1; # xdt is ready immediately
734     } else {
735         $xdt = Amanda::Xfer::Dest::Taper::Cacher->new(
736             $xdt_first_dev, $params{'max_memory'}, $part_size,
737             $use_mem_cache, $disk_cache_dirname);
738         $self->{'xdt_ready'} = 1; # xdt is ready immediately
739     }
740     $self->{'start_part_on_xdt_ready'} = 0;
741     $self->{'xdt'} = $xdt;
742
743     return $xdt;
744 }
745
746 sub start_dump {
747     my $self = shift;
748     my %params = @_;
749
750     die "no xfer dest set up; call get_xfer_dest first"
751         unless defined $self->{'xdt'};
752
753     # get the header ready for writing (totalparts was set by the caller)
754     $self->{'dump_header'} = $params{'dump_header'};
755     $self->{'dump_header'}->{'partnum'} = 1;
756
757     # set up the dump_cb for when this dump is done, and keep the xfer
758     $self->{'dump_cb'} = $params{'dump_cb'};
759     $self->{'xfer'} = $params{'xfer'};
760     $self->{'dump_start_time'} = time;
761
762     # and start the part
763     $self->_start_part();
764 }
765
766 sub cancel_dump {
767     my $self = shift;
768     my %params = @_;
769
770     die "no xfer dest set up; call get_xfer_dest first"
771         unless defined $self->{'xdt'};
772
773     # set up the dump_cb for when this dump is done, and keep the xfer
774     $self->{'dump_cb'} = $params{'dump_cb'};
775     $self->{'xfer'} = $params{'xfer'};
776
777     # XXX The cancel should call dump_cb, but right now the xfer stays hung in
778     # accept.  So we leave the xfer to its hang, and dump_cb is called and xdt
779     # and xfer are set to undef.  This should be fixed in 3.2.
780
781     $self->{'xfer'}->cancel();
782
783     $self->{'dump_cb'}->(
784         result => "FAILED",
785         device_errors => [],
786         config_denial_message => undef,
787         size => 0,
788         duration => 0.0,
789         total_duration => 0,
790         nparts => 0);
791     $self->{'xdt'} = undef;
792     $self->{'xfer'} = undef;
793 }
794
795 sub close_volume {
796     my $self = shift;
797
798     $self->{'close_volume'} = 1;
799 }
800
801 sub get_bytes_written {
802     my ($self) = @_;
803
804     if (defined $self->{'xdt'}) {
805         return $self->{'size'} + $self->{'xdt'}->get_part_bytes_written();
806     } else {
807         return $self->{'size'};
808     }
809 }
810
811 sub _start_part {
812     my $self = shift;
813
814     $self->dbg("trying to start part");
815
816     # if the xdt isn't ready yet, wait until it is; note that the XDT is still
817     # using the device right now, so we can't even label it yet.
818     if (!$self->{'xdt_ready'}) {
819         $self->dbg("XDT not ready yet; waiting until it is");
820         $self->{'start_part_on_xdt_ready'} = 1;
821         return
822     }
823
824     if ($self->{'close_volume'}) {
825         $self->{'close_volume'} = undef;
826         return $self->_get_new_volume();
827     }
828
829     # we need an actual, permitted device at this point, so if we don't have
830     # one, then defer this start_part call until we do.  The device may still
831     # exist, but be at EOM, if the last dump failed at EOM and was not retried
832     # on a new volume.
833     if (!$self->{'device'} or $self->{'device_at_eom'}) {
834         # _get_new_volume calls _start_part when it has a new volume in hand
835         return $self->_get_new_volume();
836     }
837
838     # if the dump wasn't successful, and we're not splitting, then bail out.  It's
839     # up to higher-level components to re-try this dump on a new volume, if desired.
840     # Note that this should be caught in the XMSG_PART_DONE handler -- this is just
841     # here for backup.
842     if (!$self->{'last_part_successful'} and !$self->{'retry_part_on_peom'}) {
843         $self->_operation_failed(device_error => "No space left on device (uncaught)");
844         return;
845     }
846
847     # and start writing this part
848     $self->{'started_writing'} = 1;
849     $self->dbg("resuming transfer");
850     $self->{'xdt'}->start_part(!$self->{'last_part_successful'},
851                                $self->{'dump_header'});
852 }
853
854 sub handle_xmsg {
855     my $self = shift;
856     my ($src, $msg, $xfer) = @_;
857
858     if ($msg->{'type'} == $XMSG_DONE) {
859         $self->_xmsg_done($src, $msg, $xfer);
860         return;
861     }
862
863     # for anything else we only pay attention to messages from
864     # our own element
865     if ($msg->{'elt'} == $self->{'xdt'}) {
866         $self->dbg("got msg from xfer dest: $msg");
867         if ($msg->{'type'} == $XMSG_PART_DONE) {
868             $self->_xmsg_part_done($src, $msg, $xfer);
869         } elsif ($msg->{'type'} == $XMSG_READY) {
870             $self->_xmsg_ready($src, $msg, $xfer);
871         } elsif ($msg->{'type'} == $XMSG_ERROR) {
872             $self->_xmsg_error($src, $msg, $xfer);
873         }
874     }
875 }
876
877 sub _xmsg_part_done {
878     my $self = shift;
879     my ($src, $msg, $xfer) = @_;
880
881     # this handles successful zero-byte parts as a special case - they
882     # are an implementation detail of the splitting done by the transfer
883     # destination.
884
885     if ($msg->{'successful'} and $msg->{'size'} == 0) {
886         $self->dbg("not notifying for empty, successful part");
887     } else {
888         # double-check partnum
889         die "Part numbers do not match!"
890             unless ($self->{'dump_header'}->{'partnum'} == $msg->{'partnum'});
891
892         # notify
893         $self->{'feedback'}->scribe_notif_part_done(
894             partnum => $msg->{'partnum'},
895             fileno => $msg->{'fileno'},
896             successful => $msg->{'successful'},
897             size => $msg->{'size'},
898             duration => $msg->{'duration'});
899
900         # increment nparts here, so empty parts are not counted
901         $self->{'nparts'} = $msg->{'partnum'};
902     }
903
904     $self->{'last_part_successful'} = $msg->{'successful'};
905
906     if ($msg->{'successful'}) {
907         $self->{'device_size'} += $msg->{'size'};
908         $self->{'size'} += $msg->{'size'};
909         $self->{'duration'} += $msg->{'duration'};
910     }
911
912     if (!$msg->{'eof'}) {
913         # update the header for the next dumpfile, if this was a non-empty part
914         if ($msg->{'successful'} and $msg->{'size'} != 0) {
915             $self->{'dump_header'}->{'partnum'}++;
916         }
917
918         if ($msg->{'eom'}) {
919             # if there's an error finishing the device, it's probably just carryover
920             # from the error the Xfer::Dest::Taper encountered while writing to the
921             # device, so we ignore it.
922             if (!$self->{'device'}->finish()) {
923                 my $devname = $self->{'device'}->device_name;
924                 my $errmsg = $self->{'device'}->error_or_status();
925                 $self->dbg("ignoring error while finishing device '$devname': $errmsg");
926             }
927
928             # if the part failed..
929             if (!$msg->{'successful'} || !$self->{'allow_split'}) {
930                 # if no caching was going on, then the dump has failed
931                 if (!$self->{'retry_part_on_peom'}) {
932                     # mark this device as at EOM, since we are not going to look
933                     # for another one yet
934                     $self->{'device_at_eom'} = 1;
935
936                     my $msg = "No space left on device";
937                     if ($self->{'device'}->status() != $DEVICE_STATUS_SUCCESS) {
938                         $msg = $self->{'device'}->error_or_status();
939                     }
940                     $self->_operation_failed(device_error => "$msg, splitting not enabled");
941                     return;
942                 }
943
944                 # log a message for amreport
945                 $self->{'feedback'}->scribe_notif_log_info(
946                     message => "Will request retry of failed split part.");
947             }
948
949             # get a new volume, then go on to the next part
950             $self->_get_new_volume();
951         } else {
952             # if the part was unsuccessful, but the xfer dest has reason to believe
953             # this is not due to EOM, then the dump is done
954             if (!$msg->{'successful'}) {
955                 if ($self->{'device'}->status() != $DEVICE_STATUS_SUCCESS) {
956                     $msg = $self->{'device'}->error_or_status();
957                     $self->_operation_failed(device_error => $msg);
958                 } else {
959                     $self->_operation_failed();
960                 }
961                 return;
962             }
963
964             # no EOM -- go on to the next part
965             $self->_start_part();
966         }
967     }
968 }
969
970 sub _xmsg_ready {
971     my $self = shift;
972     my ($src, $msg, $xfer) = @_;
973
974     $self->dbg("XDT is ready");
975     $self->{'xdt_ready'} = 1;
976     if ($self->{'start_part_on_xdt_ready'}) {
977         $self->{'start_part_on_xdt_ready'} = 0;
978         $self->_start_part();
979     }
980 }
981
982 sub _xmsg_error {
983     my $self = shift;
984     my ($src, $msg, $xfer) = @_;
985
986     # XMSG_ERROR from the XDT is always fatal
987     $self->_operation_failed(device_error => $msg->{'message'});
988 }
989
990 sub _xmsg_done {
991     my $self = shift;
992     my ($src, $msg, $xfer) = @_;
993
994     if ($msg->{'type'} == $XMSG_DONE) {
995         $self->dbg("transfer is complete");
996         $self->_dump_done();
997     }
998 }
999
1000 sub _dump_done {
1001     my $self = shift;
1002
1003     my $result;
1004
1005     # determine the correct final status - DONE if we're done, PARTIAL
1006     # if we've started writing to the volume, otherwise FAILED
1007     if (@{$self->{'device_errors'}} or $self->{'config_denial_message'} or
1008         !$self->{'last_part_successful'}) {
1009         $result = $self->{'started_writing'}? 'PARTIAL' : 'FAILED';
1010     } else {
1011         $result = 'DONE';
1012     }
1013
1014     my $dump_cb = $self->{'dump_cb'};
1015     my %dump_cb_args = (
1016         result => $result,
1017         device_errors => $self->{'device_errors'},
1018         config_denial_message => $self->{'config_denial_message'},
1019         size => $self->{'size'},
1020         duration => $self->{'duration'},
1021         total_duration => time - $self->{'dump_start_time'},
1022         nparts => $self->{'nparts'});
1023
1024     # reset everything and let the original caller know we're done
1025     $self->{'xfer'} = undef;
1026     $self->{'xdt'} = undef;
1027     $self->{'dump_header'} = undef;
1028     $self->{'dump_cb'} = undef;
1029     $self->{'size'} = 0;
1030     $self->{'duration'} = 0.0;
1031     $self->{'nparts'} = undef;
1032     $self->{'dump_start_time'} = undef;
1033     $self->{'device_errors'} = [];
1034     $self->{'config_denial_message'} = undef;
1035
1036     # and call the callback
1037     $dump_cb->(%dump_cb_args);
1038 }
1039
1040 # keyword parameters are utilities to the caller: either specify
1041 # device_error to add to the device_errors list or config_denial_message
1042 # to set the corresponding key in $self.
1043 sub _operation_failed {
1044     my $self = shift;
1045     my %params = @_;
1046
1047     my $error_message = $params{'device_error'}
1048                      || $params{'config_denial_message'}
1049                      || 'input error';
1050     $self->dbg("operation failed: $error_message");
1051
1052     # tuck the message away as desired
1053     push @{$self->{'device_errors'}}, $params{'device_error'}
1054         if defined $params{'device_error'};
1055     $self->{'config_denial_message'} = $params{'config_denial_message'} 
1056         if $params{'config_denial_message'};
1057
1058     # cancelling the xdt will eventually cause an XMSG_DONE, which will notice
1059     # the error and set the result correctly; but if there's no xfer, then we
1060     # can just call _dump_done directly.
1061     if (defined $self->{'xfer'}) {
1062         $self->dbg("cancelling the transfer: $error_message");
1063
1064         $self->{'xfer'}->cancel();
1065     } else {
1066         if (defined $self->{'dump_cb'}) {
1067             # _dump_done constructs the dump_cb from $self parameters
1068             $self->_dump_done();
1069         } else {
1070             die "error with no callback to handle it: $error_message";
1071         }
1072     }
1073 }
1074
1075 # release the outstanding reservation, calling scribe_notif_tape_done
1076 # after the release
1077 sub _release_reservation {
1078     my $self = shift;
1079     my %params = @_;
1080     my @errors;
1081     my $do_eject = 0;
1082
1083     my ($label, $fm, $kb);
1084
1085     # if we've already written a volume, log it
1086     if ($self->{'device'} and defined $self->{'device'}->volume_label) {
1087         $do_eject = 1 if $self->{'eject_volume'};
1088         $label = $self->{'device'}->volume_label();
1089         $fm = $self->{'device'}->file();
1090         $kb = $self->{'device_size'} / 1024;
1091
1092         # log a message for amreport
1093         $self->{'feedback'}->scribe_notif_log_info(
1094             message => "tape $label kb $kb fm $fm [OK]");
1095     }
1096
1097     # finish the device if it isn't finished yet
1098     if ($self->{'device'}) {
1099         my $already_in_error = $self->{'device'}->status() != $DEVICE_STATUS_SUCCESS;
1100
1101         if (!$self->{'device'}->finish() && !$already_in_error) {
1102             push @errors, $self->{'device'}->error_or_status();
1103         }
1104     }
1105     $self->{'device'} = undef;
1106     $self->{'device_at_eom'} = 0;
1107
1108     $self->{'reservation'}->release(eject => $do_eject, finished_cb => sub {
1109         my ($err) = @_;
1110         push @errors, "$err" if $err;
1111
1112         $self->{'reservation'} = undef;
1113
1114         # notify the feedback that we've finished and released a tape
1115         if ($label) {
1116             return $self->{'feedback'}->scribe_notif_tape_done(
1117                 volume_label => $label,
1118                 size => $kb * 1024,
1119                 num_files => $fm,
1120                 finished_cb => sub {
1121                  $params{'finished_cb'}->(@errors? join("; ", @errors) : undef);
1122                 });
1123         }
1124
1125         $params{'finished_cb'}->(@errors? join("; ", @errors) : undef);
1126     });
1127 }
1128
1129 # invoke the devhandling to get a new device, with all of the requisite
1130 # notifications and checks and whatnot.  On *success*, call _start_dump; on
1131 # failure, call other appropriate methods.
1132 sub _get_new_volume {
1133     my $self = shift;
1134
1135     # release first, if necessary
1136     if ($self->{'reservation'}) {
1137         $self->_release_reservation(finished_cb => sub {
1138             my ($error) = @_;
1139
1140             if ($error) {
1141                 $self->_operation_failed(device_error => $error);
1142             } else {
1143                 $self->_get_new_volume();
1144             }
1145         });
1146
1147         return;
1148     }
1149
1150     $self->{'devhandling'}->get_volume(volume_cb => sub { $self->_volume_cb(@_); });
1151 }
1152
1153 sub _volume_cb  {
1154     my $self = shift;
1155     my ($scan_error, $config_denial_message, $error_denial_message,
1156         $reservation, $new_label, $access_mode, $is_new, $new_scribe) = @_;
1157
1158     # note that we prefer the config_denial_message over the scan error.  If
1159     # both occurred, then the results of the scan are immaterial -- we
1160     # shouldn't have been looking for a new volume anyway.
1161
1162     if ($config_denial_message) {
1163         $self->_operation_failed(config_denial_message => $config_denial_message);
1164         return;
1165     }
1166
1167     if ($error_denial_message) {
1168         $self->_operation_failed(device_error => $error_denial_message);
1169         return;
1170     }
1171
1172     if ($new_scribe) {
1173         # Transfer the xfer to the new scribe
1174         $self->dbg("take scribe from");
1175
1176         $new_scribe->{'dump_cb'} = $self->{'dump_cb'};
1177         $new_scribe->{'dump_header'} = $self->{'dump_header'};
1178         $new_scribe->{'retry_part_on_peom'} = $self->{'retry_part_on_peom'};
1179         $new_scribe->{'allow_split'} = $self->{'allow_split'};
1180         $new_scribe->{'split_method'} = $self->{'split_method'};
1181         $new_scribe->{'xfer'} = $self->{'xfer'};
1182         $new_scribe->{'xdt'} = $self->{'xdt'};
1183         $new_scribe->{'xdt_ready'} = $self->{'xdt_ready'};
1184         $new_scribe->{'start_part_on_xdt_ready'} = $self->{'start_part_on_xdt_ready'};
1185         $new_scribe->{'size'} = $self->{'size'};
1186         $new_scribe->{'duration'} = $self->{'duration'};
1187         $new_scribe->{'dump_start_time'} = $self->{'dump_start_time'};
1188         $new_scribe->{'last_part_successful'} = $self->{'last_part_successful'};
1189         $new_scribe->{'started_writing'} = $self->{'started_writing'};
1190         $new_scribe->{'feedback'} = $self->{'feedback'};
1191         $new_scribe->{'devhandling'}->{'feedback'} = $self->{'feedback'};
1192         $self->{'dump_header'} = undef;
1193         $self->{'dump_cb'} = undef;
1194         $self->{'xfer'} = undef;
1195         $self->{'xdt'} = undef;
1196         $self->{'xdt_ready'} = undef;
1197         $self->{'dump_start_time'} = undef;
1198         $self->{'started_writing'} = 0;
1199         $self->{'feedback'} = undef;
1200         if (defined $new_scribe->{'device'}) {
1201             $new_scribe->{'xdt'}->use_device($new_scribe->{'device'});
1202         }
1203         # start it
1204         $new_scribe->_start_part();
1205
1206         return;
1207     }
1208
1209     if ($scan_error) {
1210         # we had permission to use a tape, but didn't find a tape, so we need
1211         # to notify of such
1212         $self->{'feedback'}->scribe_notif_new_tape(
1213             error => $scan_error,
1214             volume_label => undef);
1215
1216         $self->_operation_failed(device_error => $scan_error);
1217         return;
1218     }
1219
1220     $self->dbg("got new volume; writing new label");
1221
1222     # from here on, if an error occurs, we must send scribe_notif_new_tape, and look
1223     # for a new volume
1224     $self->{'reservation'} = $reservation;
1225     $self->{'device_size'} = 0;
1226     my $device = $self->{'device'} = $reservation->{'device'};
1227
1228     # turn on verbose logging now, if we need it
1229     if ($self->{'debug'}) {
1230         $reservation->{'device'}->property_set("verbose", 1);
1231     }
1232
1233     # read the label once, to get a "before" snapshot (see below)
1234     my $old_label;
1235     my $old_timestamp;
1236     if (!$is_new) {
1237         if (($device->status & ~$DEVICE_STATUS_VOLUME_UNLABELED)
1238             && !($device->status & $DEVICE_STATUS_VOLUME_UNLABELED)) {
1239             $self->{'feedback'}->scribe_notif_new_tape(
1240                 error => "while reading label on new volume: " . $device->error_or_status(),
1241                 volume_label => undef);
1242
1243             return $self->_get_new_volume();
1244         }
1245         $old_label = $device->volume_label;
1246         $old_timestamp = $device->volume_time;
1247     }
1248
1249     # inform the xdt about this new device before starting it
1250     $self->{'xdt'}->use_device($device);
1251
1252     my $cbX = sub {};
1253     my $steps = define_steps
1254         cb_ref => \$cbX;
1255
1256     step device_start => sub {
1257         $self->_device_start($reservation, $access_mode, $new_label, $is_new,
1258                              $steps->{'device_started'});
1259     };
1260
1261     step device_started => sub {
1262         my $result = shift;
1263
1264         if ($result == 0) {
1265             # try reading the label to see whether we erased the tape
1266             my $erased = 0;
1267             CHECK_READ_LABEL: {
1268             # don't worry about erasing new tapes
1269                 if ($is_new) {
1270                     last CHECK_READ_LABEL;
1271                 }
1272
1273                 $device->finish();
1274                 $device->read_label();
1275
1276                 # does the device think something is broken now?
1277                 if (($device->status & ~$DEVICE_STATUS_VOLUME_UNLABELED)
1278                     and !($device->status & $DEVICE_STATUS_VOLUME_UNLABELED)) {
1279                     $erased = 1;
1280                     last CHECK_READ_LABEL;
1281                 }
1282
1283                 # has the label changed?
1284                 my $vol_label = $device->volume_label;
1285                 if ((!defined $old_label and defined $vol_label)
1286                     or (defined $old_label and !defined $vol_label)
1287                     or (defined $old_label and $old_label ne $vol_label)) {
1288                     $erased = 1;
1289                     last CHECK_READ_LABEL;
1290                 }
1291
1292                 # has the timestamp changed?
1293                 my $vol_timestamp = $device->volume_time;
1294                 if ((!defined $old_timestamp and defined $vol_timestamp)
1295                     or (defined $old_timestamp and !defined $vol_timestamp)
1296                     or (defined $old_timestamp and $old_timestamp ne $vol_timestamp)) {
1297                     $erased = 1;
1298                     last CHECK_READ_LABEL;
1299                 }
1300             }
1301
1302             $self->{'feedback'}->scribe_notif_new_tape(
1303                 error => "while labeling new volume: " . $device->error_or_status(),
1304                 volume_label => $erased? $new_label : undef);
1305
1306             $self->_get_new_volume();
1307             return $cbX->();
1308         } elsif ($result != 1) {
1309             $self->{'feedback'}->scribe_notif_new_tape(
1310                 error => $result,
1311                 volume_label => undef);
1312             $self->_get_new_volume();
1313             return $cbX->();
1314         }
1315
1316         $new_label = $device->volume_label;
1317
1318         # success!
1319         $self->{'feedback'}->scribe_notif_new_tape(
1320             error => undef,
1321             volume_label => $new_label);
1322
1323         $self->{'reservation'}->set_label(label => $new_label,
1324             finished_cb => $steps->{'set_labelled'});
1325     };
1326
1327     step set_labelled => sub {
1328         my ($err) = @_;
1329         if ($err) {
1330             $self->{'feedback'}->scribe_notif_log_info(
1331                 message => "Error from set_label: $err");
1332             # fall through to start_part anyway...
1333         }
1334         $self->_start_part();
1335         return $cbX->();
1336     }
1337 }
1338
1339 # return 0 for device->start error
1340 # return 1 for success
1341 # return a message for others error
1342 sub _device_start {
1343     my $self = shift;
1344     my ($reservation, $access_mode, $new_label, $is_new, $finished_cb) = @_;
1345
1346     my $device = $reservation->{'device'};
1347     my $tl = $self->{'taperscan'}->{'tapelist'};
1348     my $meta;
1349
1350     if (!defined $tl) { # For Mock::Taperscan in installcheck
1351         if (!$device->start($access_mode, $new_label, $self->{'write_timestamp'})) {
1352             return $finished_cb->(0);
1353         } else {
1354             return $finished_cb->(1);
1355         }
1356     }
1357
1358     my $steps = define_steps
1359         cb_ref => \$finished_cb;
1360
1361     step setup => sub {
1362         return $reservation->get_meta_label(
1363                                 finished_cb => $steps->{'got_meta_label'});
1364     };
1365
1366     step got_meta_label => sub {
1367         my ($err, $meta) = @_;
1368
1369         if ($is_new) {
1370             # generate the new label and write it to the tapelist file
1371             $tl->reload(1);
1372             if (!$meta) {
1373                 ($meta, $err) = $reservation->make_new_meta_label();
1374                 if (defined $err) {
1375                     $tl->unlock();
1376                     return $finished_cb->($err);
1377                 }
1378             }
1379             ($new_label, my $err) = $reservation->make_new_tape_label(
1380                                         meta => $meta);
1381             if (!defined $new_label) {
1382                 $tl->unlock();
1383                 return $finished_cb->($err);
1384             }
1385             $tl->add_tapelabel('0', $new_label, undef, 1, $meta,
1386                                $reservation->{'barcode'});
1387             $tl->write();
1388             $self->dbg("generate new label '$new_label'");
1389         } elsif (!defined $meta) {
1390             $tl->reload(0);
1391             my $tle = $tl->lookup_tapelabel($new_label);
1392             my $meta = $tle->{'meta'};
1393         }
1394
1395         # write the label to the device
1396         if (!$device->start($access_mode, $new_label, $self->{'write_timestamp'})) {
1397             if ($is_new) {
1398                 # remove the generated label from the tapelist file
1399                 $tl->reload(1);
1400                 $tl->remove_tapelabel($new_label);
1401                 $tl->write();
1402             }
1403             return $finished_cb->(0);
1404         }
1405
1406         # rewrite the tapelist file
1407         $tl->reload(1);
1408         my $tle = $tl->lookup_tapelabel($new_label);
1409         $meta = $tle->{'meta'} if !$meta && $tle->{'meta'};
1410         $tl->remove_tapelabel($new_label);
1411         $tl->add_tapelabel($self->{'write_timestamp'}, $new_label,
1412                            $tle? $tle->{'comment'} : undef, 1, $meta,
1413                            $reservation->{'barcode'}, $device->block_size/1024);
1414         $tl->write();
1415
1416         $reservation->set_meta_label(meta => $meta,
1417                                      finished_cb => $steps->{'set_meta_label'});
1418     };
1419
1420     step set_meta_label => sub {
1421         return $finished_cb->(1);
1422     }
1423 }
1424
1425 sub dbg {
1426     my ($self, $msg) = @_;
1427     if ($self->{'debug'}) {
1428         debug("Amanda::Taper::Scribe: $msg");
1429     }
1430 }
1431
1432 sub get_splitting_args_from_config {
1433     my %params = @_;
1434
1435     my %splitting_args;
1436
1437     $splitting_args{'allow_split'} = 0;
1438     # if dle_splitting is false, then we don't split - easy.
1439     if (defined $params{'dle_allow_split'} and !$params{'dle_allow_split'}) {
1440         return %splitting_args;
1441     }
1442
1443     # utility for below
1444     my $have_space = sub {
1445         my ($dirname, $part_size) = @_;
1446
1447         use Carp;
1448         my $fsusage = Amanda::Util::get_fs_usage($dirname);
1449         confess "$dirname" if (!$fsusage);
1450
1451         my $avail = $fsusage->{'blocksize'} * $fsusage->{'bavail'};
1452         if ($avail < $part_size) {
1453             Amanda::Debug::debug("disk cache has $avail bytes available on $dirname, but " .
1454                                  "needs $part_size");
1455             return 0;
1456         } else {
1457             return 1;
1458         }
1459     };
1460
1461     # first, handle the alternate spellings for part_size and part_cache_type
1462     $params{'part_size'} = $params{'part_size_kb'} * 1024
1463         if (defined $params{'part_size_kb'});
1464
1465     if (defined $params{'part_cache_type_enum'}) {
1466         $params{'part_cache_type'} = 'none'
1467             if ($params{'part_cache_type_enum'} == $PART_CACHE_TYPE_NONE);
1468         $params{'part_cache_type'} = 'memory'
1469             if ($params{'part_cache_type_enum'} == $PART_CACHE_TYPE_MEMORY);
1470         $params{'part_cache_type'} = 'disk'
1471             if ($params{'part_cache_type_enum'} == $PART_CACHE_TYPE_DISK);
1472
1473         $params{'part_cache_type'} = 'unknown'
1474             unless defined $params{'part_cache_type'};
1475     }
1476
1477     # if any of the dle_* parameters are set, use those to set the part_*
1478     # parameters, which are emptied out first.
1479     if (defined $params{'dle_tape_splitsize'} or
1480         defined $params{'dle_split_diskbuffer'} or
1481         defined $params{'dle_fallback_splitsize'}) {
1482
1483         $params{'part_size'} = $params{'dle_tape_splitsize'} || 0;
1484         $params{'part_cache_type'} = 'none';
1485         $params{'part_cache_dir'} = undef;
1486         $params{'part_cache_max_size'} = undef;
1487
1488         # part cache type is memory unless we have a split_diskbuffer that fits the bill
1489         if ($params{'part_size'}) {
1490             $splitting_args{'allow_split'} = 1;
1491             $params{'part_cache_type'} = 'memory';
1492             if (defined $params{'dle_split_diskbuffer'}
1493                     and -d $params{'dle_split_diskbuffer'}) {
1494                 if ($have_space->($params{'dle_split_diskbuffer'}, $params{'part_size'})) {
1495                     # disk cache checks out, so use it
1496                     $params{'part_cache_type'} = 'disk';
1497                     $params{'part_cache_dir'} = $params{'dle_split_diskbuffer'};
1498                 } else {
1499                     my $msg = "falling back to memory buffer for splitting: " .
1500                                 "insufficient space in disk cache directory";
1501                     $splitting_args{'warning'} = $msg;
1502                 }
1503             }
1504         }
1505
1506         if ($params{'part_cache_type'} eq 'memory') {
1507             # fall back to 10M if fallback size is not given
1508             $params{'part_cache_max_size'} = $params{'dle_fallback_splitsize'} || 10*1024*1024;
1509         }
1510     } else {
1511         my $ps = $params{'part_size'};
1512         my $pcms = $params{'part_cache_max_size'};
1513         $ps = $pcms if (!defined $ps or (defined $pcms and $pcms < $ps));
1514         $splitting_args{'allow_split'} = 1 if ((defined $ps and $ps > 0) or
1515                                                $params{'leom_supported'});
1516
1517         # fail back from 'disk' to 'none' if the disk isn't set up correctly
1518         if (defined $params{'part_cache_type'} and
1519                     $params{'part_cache_type'} eq 'disk') {
1520             my $warning;
1521             if (!$params{'part_cache_dir'}) {
1522                 $warning = "no part-cache-dir specified; "
1523                             . "using part cache type 'none'";
1524             } elsif (!-d $params{'part_cache_dir'}) {
1525                 $warning = "part-cache-dir '$params{part_cache_dir} "
1526                             . "does not exist; using part cache type 'none'";
1527             } elsif (!$have_space->($params{'part_cache_dir'}, $ps)) {
1528                 $warning = "part-cache-dir '$params{part_cache_dir} "
1529                             . "has insufficient space; using part cache type 'none'";
1530             }
1531
1532             if (defined $warning) {
1533                 $splitting_args{'warning'} = $warning;
1534                 $params{'part_cache_type'} = 'none';
1535                 delete $params{'part_cache_dir'};
1536             }
1537         }
1538     }
1539
1540     $splitting_args{'part_size'} = $params{'part_size'}
1541         if defined($params{'part_size'});
1542     $splitting_args{'part_cache_type'} = $params{'part_cache_type'}
1543         if defined($params{'part_cache_type'});
1544     $splitting_args{'part_cache_dir'} = $params{'part_cache_dir'}
1545         if defined($params{'part_cache_dir'});
1546     $splitting_args{'part_cache_max_size'} = $params{'part_cache_max_size'}
1547         if defined($params{'part_cache_max_size'});
1548
1549     return %splitting_args;
1550 }
1551 ##
1552 ## Feedback
1553 ##
1554
1555 package Amanda::Taper::Scribe::Feedback;
1556
1557 sub request_volume_permission {
1558     my $self = shift;
1559     my %params = @_;
1560
1561     # sure, you can have as many volumes as you want!
1562     $params{'perm_cb'}->(allow => 1);
1563 }
1564
1565 sub scribe_notif_new_tape { }
1566 sub scribe_notif_part_done { }
1567 sub scribe_notif_log_info { }
1568 sub scribe_notif_tape_done {
1569     my $self = shift;
1570     my %params = @_;
1571
1572     $params{'finished_cb'}->();
1573 }
1574
1575 ##
1576 ## Device Handling
1577 ##
1578
1579 package Amanda::Taper::Scribe::DevHandling;
1580 use Amanda::MainLoop;
1581 use Carp;
1582 use Amanda::Debug qw( :logging );
1583
1584 # This class handles scanning for volumes, requesting permission for those
1585 # volumes (the driver likes to feel like it's in control), and providing those
1586 # volumes to the scribe on request.  These can all happen independently, but
1587 # the scribe cannot begin writing to a volume until all three have finished.
1588 # That is: the scan is finished, the driver has given its permission, and the
1589 # scribe has requested a volume.
1590 #
1591 # On start, the class starts scanning immediately, even though the scribe has
1592 # not requested a volume.  Subsequently, a new scan does not begin until the
1593 # scribe requests a volume.
1594 #
1595 # This class is "private" to Amanda::Taper::Scribe, so it is documented in
1596 # comments, rather than POD.
1597
1598 # Create a new DevHandling object.  Params are taperscan and feedback.
1599 sub new {
1600     my $class = shift;
1601     my %params = @_;
1602
1603     my $self = {
1604         taperscan => $params{'taperscan'},
1605         feedback => $params{'feedback'},
1606
1607         # is a scan currently running, or completed?
1608         scan_running => 0,
1609         scan_finished => 0,
1610         scan_error => undef,
1611
1612         # scan results
1613         reservation => undef,
1614         device => undef,
1615         volume_label => undef,
1616
1617         # requests for permissiont to use a new volume
1618         request_pending => 0,
1619         request_complete => 0,
1620         request_denied => 0,
1621         config_denial_message => undef,
1622         error_denial_message => undef,
1623
1624         volume_cb => undef, # callback for get_volume
1625         start_finished_cb => undef, # callback for start
1626     };
1627
1628     return bless ($self, $class);
1629 }
1630
1631 ## public methods
1632
1633 # Called at scribe startup, this starts the instance off with a scan.
1634 sub start {
1635     my $self = shift;
1636     my %params = @_;
1637
1638     $self->{'start_finished_cb'} = $params{'finished_cb'};
1639     $self->_start_scanning();
1640 }
1641
1642 sub quit {
1643     my $self = shift;
1644     my %params = @_;
1645
1646     for my $rq_param (qw(finished_cb)) {
1647         croak "required parameter '$rq_param' mising"
1648             unless exists $params{$rq_param};
1649     }
1650
1651     # since there's little other option than to barrel on through the
1652     # quitting procedure, quit() just accumulates its error messages
1653     # and, if necessary, concantenates them for the finished_cb.
1654     my @errors;
1655
1656     my $cleanup_cb = make_cb(cleanup_cb => sub {
1657         my ($error) = @_;
1658         push @errors, $error if $error;
1659
1660         $error = join("; ", @errors) if @errors >= 1;
1661
1662         $params{'finished_cb'}->($error);
1663     });
1664
1665     if ($self->{'reservation'}) {
1666         if ($self->{'device'}) {
1667             if (!$self->{'device'}->finish()) {
1668                 push @errors, $self->{'device'}->error_or_status();
1669             }
1670         }
1671
1672         $self->{'reservation'}->release(finished_cb => $cleanup_cb);
1673     } else {
1674         $cleanup_cb->(undef);
1675     }
1676 }
1677
1678 # Get an open, started device and label to start writing to.  The
1679 # volume_callback takes the following arguments:
1680 #   $scan_error -- error message, or undef if no error occurred
1681 #   $config_denial_reason -- config-related reason request was denied, or undef
1682 #   $error_denial_reason -- error-related reason request was denied, or undef
1683 #   $reservation -- Amanda::Changer reservation
1684 #   $device -- open, started device
1685 # It is the responsibility of the caller to close the device and release the
1686 # reservation when finished.  If $scan_error or $request_denied_info are
1687 # defined, then $reservation and $device will be undef.
1688 sub get_volume {
1689     my $self = shift;
1690     my (%params) = @_;
1691
1692     die "already processing a volume request"
1693         if ($self->{'volume_cb'});
1694
1695     $self->{'volume_cb'} = $params{'volume_cb'};
1696
1697     # kick off the relevant processes, if they're not already running
1698     $self->_start_request();
1699
1700     $self->_maybe_callback();
1701 }
1702
1703 # take a peek at the device we have, for which permission has not yet been
1704 # granted.  This will be undefined before the taperscan completes AND after
1705 # the volume_cb has been called.
1706 sub peek_device {
1707     my $self = shift;
1708
1709     return $self->{'device'};
1710 }
1711
1712 sub start_scan {
1713     my $self = shift;
1714
1715     if (!$self->{'scan_running'} && !$self->{'reservation'}) {
1716         $self->_start_scanning();
1717     }
1718 }
1719
1720 ## private methods
1721
1722 sub _start_scanning {
1723     my $self = shift;
1724
1725     return if $self->{'scan_running'} or $self->{'scan_finished'};
1726
1727     $self->{'scan_running'} = 1;
1728
1729     my $_user_msg_fn = sub {
1730         my %params = @_;
1731         if (exists($params{'slot_result'})) {
1732             if ($params{'does_not_match_labelstr'}) {
1733                 $self->{'feedback'}->scribe_notif_log_info(
1734                     message => "Slot $params{'slot'} with label $params{'label'} do not match labelstr");
1735             } elsif ($params{'not_in_tapelist'}) {
1736                 $self->{'feedback'}->scribe_notif_log_info(
1737                     message => "Slot $params{'slot'} with label $params{'label'} is not in the tapelist");
1738             } elsif ($params{'active'}) {
1739                 $self->{'feedback'}->scribe_notif_log_info(
1740                     message => "Slot $params{'slot'} with label $params{'label'} is not reusable");
1741             } elsif ($params{'not_autolabel'}) {
1742                 if ($params{'label'}) {
1743                     $self->{'feedback'}->scribe_notif_log_info(
1744                         message => "Slot $params{'slot'} with label $params{'label'} is not labelable ");
1745                 } else {
1746                     $self->{'feedback'}->scribe_notif_log_info(
1747                         message => "Slot $params{'slot'} without label is not labelable ");
1748                 }
1749             } elsif ($params{'empty'}) {
1750                 $self->{'feedback'}->scribe_notif_log_info(
1751                     message => "Slot $params{'slot'} is empty, autolabel disabled");
1752             } elsif ($params{'non_amanda'}) {
1753                 $self->{'feedback'}->scribe_notif_log_info(
1754                     message => "Slot $params{'slot'} is a non-amanda volume, autolabel disabled");
1755             } elsif ($params{'volume_error'}) {
1756                 $self->{'feedback'}->scribe_notif_log_info(
1757                     message => "Slot $params{'slot'} is a volume in error: $params{'err'}, autolabel disabled");
1758             } elsif ($params{'not_success'}) {
1759                 $self->{'feedback'}->scribe_notif_log_info(
1760                     message => "Slot $params{'slot'} is a device in error: $params{'err'}, autolabel disabled");
1761             } elsif ($params{'err'}) {
1762                 $self->{'feedback'}->scribe_notif_log_info(
1763                     message => "$params{'err'}");
1764             } elsif ($params{'not_labelable'}) {
1765                 $self->{'feedback'}->scribe_notif_log_info(
1766                     message => "Slot $params{'slot'} without label can't be labeled");
1767             } elsif (!defined $params{'label'}) {
1768                 $self->{'feedback'}->scribe_notif_log_info(
1769                     message => "Slot $params{'slot'} without label can be labeled");
1770             } else {
1771                 $self->{'feedback'}->scribe_notif_log_info(
1772                     message => "Slot $params{'slot'} with label $params{'label'} is usable");
1773             }
1774         }
1775     };
1776
1777     $self->{'taperscan'}->scan(
1778       user_msg_fn => $_user_msg_fn,
1779       result_cb => sub {
1780         my ($error, $reservation, $volume_label, $access_mode, $is_new) = @_;
1781
1782         $self->{'scan_running'} = 0;
1783         $self->{'scan_finished'} = 1;
1784
1785         $self->{'scan_error'} = $error;
1786         $self->{'reservation'} = $reservation;
1787         $self->{'device'} = $reservation->{'device'} if $reservation;
1788         $self->{'volume_label'} = $volume_label;
1789         $self->{'access_mode'} = $access_mode;
1790         $self->{'is_new'} = $is_new;
1791
1792         $self->_maybe_callback();
1793     });
1794 }
1795
1796 sub _start_request {
1797     my $self = shift;
1798
1799     return if $self->{'request_pending'} or $self->{'request_complete'};
1800
1801     $self->{'request_pending'} = 1;
1802
1803     $self->{'feedback'}->request_volume_permission(
1804     perm_cb => sub {
1805         my %params = @_;
1806
1807         $self->{'request_pending'} = 0;
1808         $self->{'request_complete'} = 1;
1809         if (defined $params{'scribe'}) {
1810             $self->{'new_scribe'} = $params{'scribe'};
1811             $self->{'scan_finished'} = 1;
1812             $self->{'request_complete'} = 1;
1813         } elsif (defined $params{'cause'}) {
1814             $self->{'request_denied'} = 1;
1815             if ($params{'cause'} eq 'config') {
1816                 $self->{'config_denial_message'} = $params{'message'};
1817             } elsif ($params{'cause'} eq 'error') {
1818                 $self->{'error_denial_message'} = $params{'message'};
1819             } else {
1820                 die "bad cause '" . $params{'cause'} . "'";
1821             }
1822         } elsif (!defined $params{'allow'}) {
1823             die "no allow or cause defined";
1824         }
1825
1826         $self->_maybe_callback();
1827     });
1828 }
1829
1830 sub _maybe_callback {
1831     my $self = shift;
1832
1833     # if we have any kind of error, release the reservation and come back
1834     # later
1835     if (($self->{'scan_error'} or $self->{'request_denied'}) and $self->{'reservation'}) {
1836         $self->{'device'} = undef;
1837
1838         $self->{'reservation'}->release(finished_cb => sub {
1839             my ($error) = @_;
1840
1841             # so many errors, so little time..
1842             if ($error) {
1843                 if ($self->{'scan_error'}) {
1844                     warning("ignoring error releasing reservation ($error) after a scan error");
1845                 } else {
1846                     $self->{'scan_error'} = $error;
1847                 }
1848             }
1849
1850             $self->{'reservation'} = undef;
1851             $self->_maybe_callback();
1852         });
1853
1854         return;
1855     }
1856
1857     # if we are just starting up, call the finished_cb given to start()
1858     if (defined $self->{'start_finished_cb'} and $self->{'scan_finished'}) {
1859         my $cb = $self->{'start_finished_cb'};
1860         $self->{'start_finished_cb'} = undef;
1861
1862         $cb->($self->{'scan_error'});
1863     }
1864
1865     # if the volume_cb is good to get called, call it and reset to the ground state
1866     if ($self->{'volume_cb'} and (!$self->{'scan_running'} or $self->{'scan_finished'}) and $self->{'request_complete'}) {
1867         # get the cb and its arguments lined up before calling it..
1868         my $volume_cb = $self->{'volume_cb'};
1869         my @volume_cb_args = (
1870             $self->{'scan_error'},
1871             $self->{'config_denial_message'},
1872             $self->{'error_denial_message'},
1873             $self->{'reservation'},
1874             $self->{'volume_label'},
1875             $self->{'access_mode'},
1876             $self->{'is_new'},
1877             $self->{'new_scribe'},
1878         );
1879
1880         # reset everything and prepare for a new scan
1881         $self->{'scan_finished'} = 0;
1882
1883         $self->{'reservation'} = undef;
1884         $self->{'device'} = undef;
1885         $self->{'volume_label'} = undef;
1886
1887         $self->{'request_complete'} = 0;
1888         $self->{'request_denied'} = 0;
1889         $self->{'config_denial_message'} = undef;
1890         $self->{'error_denial_message'} = undef;
1891         $self->{'volume_cb'} = undef;
1892         $self->{'new_scribe'} = undef;
1893
1894         $volume_cb->(@volume_cb_args);
1895     }
1896 }
1897
1898 1;