Imported Upstream version 3.2.0
[debian/amanda] / perl / Amanda / Taper / Scribe.pm
1 # Copyright (c) 2009, 2010 Zmanda, Inc.  All Rights Reserved.
2 #
3 # This library is free software; you can redistribute it and/or modify it
4 # under the terms of the GNU Lesser General Public License version 2.1 as
5 # published by the Free Software Foundation.
6 #
7 # This library is distributed in the hope that it will be useful, but
8 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
9 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
10 # License for more details.
11 #
12 # You should have received a copy of the GNU Lesser General Public License
13 # along with this library; if not, write to the Free Software Foundation,
14 # Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA.
15 #
16 # Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
17 # Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
18
19 =head1 NAME
20
21 Amanda::Taper::Scribe
22
23 =head1 SYNOPSIS
24
25   step start_scribe => sub {
26       my $scribe = Amanda::Taper::Scribe->new(
27             taperscan => $taperscan_algo,
28             feedback => $feedback_obj);
29     $scribe->start(
30         write_timestamp => $write_timestamp,
31         finished_cb => $steps->{'start_xfer'});
32   };
33
34   step start_xfer => sub {
35     my ($err) = @_;
36     my $xfer_dest = $scribe->get_xfer_dest(
37         max_memory => 64 * 1024,
38         can_cache_inform => 0,
39         part_size => 150 * 1024**2,
40         part_cache_type => 'disk',
41         part_cache_dir => "$tmpdir/splitbuffer",
42         part_cache_max_size => 20 * 1024**2);
43     # .. set up the rest of the transfer ..
44     $xfer->start(sub {
45         my ($src, $msg, $xfer) = @_;
46         $scribe->handle_xmsg($src, $msg, $xfer);
47         # .. any other processing ..
48     };
49     # tell the scribe to start dumping via this transfer
50     $scribe->start_dump(
51         xfer => $xfer,
52         dump_header => $hdr,
53         dump_cb => $steps->{'dump_cb'});
54   };
55
56   step dump_cb => sub {
57       my %params = @_;
58       # .. handle dump results ..
59       print "DONE\n";
60       $finished_cb->();
61   };
62
63
64 =head1 OVERVIEW
65
66 This package provides a high-level abstraction of Amanda's procedure for
67 writing dumpfiles to tape.
68
69 Amanda writes a sequence of dumpfiles to a sequence of volumes.  The
70 volumes are supplied by a taperscan algorithm, which operates a changer
71 to find and load each volume.  As dumpfiles are written to volumes and
72 those volumes fill up, the taperscan algorithm supplies additional
73 volumes.
74
75 In order to reduce internal fragmentation within volumes, Amanda can "split"
76 dumpfiles into smaller pieces, so that the overall dumpfile can span multiple
77 volumes.  Each "part" is written to the volume in sequence.  If a device
78 encounters an error while writing a part, then that part is considered
79 "partial", and is rewritten from its beginning on the next volume.  Some
80 devices can reliably indicate that they are full (EOM), and for these devices
81 parts are simply truncated, and the Scribe starts the next part on the next
82 volume.
83
84 To facilitate rewriting parts on devices which cannot indicate EOM, Amanda must
85 retain all of the data in a part, even after that data is written to the
86 volume.  The Scribe provides several methods to support this: caching the part
87 in memory, caching the part in a special on-disk file, or relying on
88 pre-existing on-disk storage.  The latter method is used when reading from
89 holding disks.
90
91 The details of efficiently splitting dumpfiles and rewriting parts are handled
92 by the low-level C<Amanda::Xfer::Dest::Taper> subclasses.  The Scribe creates
93 an instance of the appropriate subclass and supplies it with volumes from an
94 C<Amanda::Taper::Scan> object.  It calls a number of
95 C<Amanda::Taper::Scribe::Feedback> methods to indicate the status of the dump
96 process and to request permission for each additional volume.
97
98 =head1 OPERATING A SCRIBE
99
100 The C<Amanda::Taper::Scribe> constructor takes two arguments:
101 C<taperscan> and C<feedback>.  The first specifies the taper scan
102 algorithm that the Scribe should use, and the second specifies the
103 C<Feedback> object that will receive notifications from the Scribe (see
104 below).
105
106   my $scribe = Amanda::Taper::Scribe->new(
107         taperscan => $my_taperscan,
108         feedback => $my_feedback);
109
110 Once the object is in place, call its C<start> method.
111
112 =head2 START THE SCRIBE
113
114 Start the scribe's operation by calling its C<start> method.  This will invoke
115 the taperscan algorithm and scan for a volume.  The method takes two parameters:
116
117   $scribe->start(
118         write_timestamp => $ts,
119         finished_cb => $start_finished_cb);
120
121 The timestamp will be written to each volume written by the Scribe.  The
122 C<finished_cb> will be called with a single argument - C<undef> or an error
123 message - when the Scribe is ready to start its first dump.  The Scribe is
124 "ready" when it has found a device to which it can write, although it does not
125 request permission to overwrite that volume, nor start overwriting it, until
126 the first dump begins (that is, until the first call to C<start_dump>).
127
128 =head2 SET UP A TRANSFER
129
130 Once the Scribe is started, begin transferring a dumpfile.  This is a
131 three-step process: first, get an C<Amanda::Xfer::Dest::Taper> object from the
132 Scribe, then start the transfer, and finally let the Scribe know that the
133 transfer has started.  Note that the Scribe supplies and manages the transfer
134 destination, but the transfer itself remains the responsibility of the caller.
135
136 =head3 Get device
137
138 Call C<get_device> to get the first device the xfer will be working with.
139
140   $device = $scribe->get_device();
141
142 This method must be called after C<start> has completed.
143
144 =head3 Check device compatibily for the data path
145
146 Call C<check_data_path>, supplying the data_path requested by the user.
147
148   if (my $err = $scribe->check_data_path($data_path)) {
149       # handle error message
150   }
151
152 This method must be called after C<start> has completed and before
153 C<get_xfer_dest> is called. It returns C<undef> on success or an error message
154 if the supplied C<data_path> is incompatible with the device.  This is mainly
155 used to detect when a DirectTCP dump is going to a non-DirectTCP device.
156
157 =head3 Get a Transfer Destination
158
159 Call C<get_xfer_dest> to get the transfer element, supplying information on how
160 the dump should be split:
161
162   $xdest = $scribe->get_xfer_dest(
163         max_memory => $max_memory,
164         # .. splitting parameters
165         );
166
167 This method must be called after C<start> has completed, and will always return
168 a transfer element immediately.  The underlying C<Amanda::Xfer::Dest::Taper>
169 handles device streaming properly.  It uses C<max_memory> bytes of memory for
170 this purpose.
171
172 The splitting parameters to C<get_xfer_dest> are:
173
174 =over 4
175
176 =item C<part_size>
177
178 the split part size to use, or 0 for no splitting
179
180 =item C<part_cache_type>
181
182 when caching, the kind of caching to perform ('disk', 'memory' or the default,
183 'none')
184
185 =item C<part_cache_dir>
186
187 the directory to use for disk caching
188
189 =item C<part_cache_max_size>
190
191 the maximum part size to use when caching
192
193 =item C<can_cache_inform>
194
195 true if the transfer source can call the destination's C<cache_inform> method
196 (e.g., C<Amanda::Xfer::Source::Holding>).
197
198 =back
199
200 The first four of these parameters correspond exactly to the eponymous tapetype
201 configuration parameters, and have the same default values (when omitted or
202 C<undef>).  The method will take this information, along with details of the
203 device it intends to use, and set up the transfer destination.
204
205 The utility function C<get_splitting_args_from_config> can determine the
206 appropriate C<get_xfer_dest> splitting parameters based on a
207 few Amanda configuration parameters.  If a parameter was not seen in the
208 configuration, it should be omitted or passed as C<undef>.  The function
209 returns a hash to pass to C<get_xfer_dest>, although that hash may have an
210 C<warning> key containing a message if there is a problem that the user
211 should know about.
212
213   use Amanda::Taper::Scribe qw( get_splitting_args_from_config );
214   my %splitting_args = get_splitting_args_from_config(
215     # Amanda dumptype configuration parameters,
216     dle_tape_splitsize => ..,
217     dle_split_diskbuffer => ..,
218     dle_fallback_splitsize => ..,
219     dle_allow_split => ..,
220     # Amanda tapetype configuration parameters,
221     part_size => .., ## in bytes, not kb!!
222     part_size_kb => ..., ## or use this, in kb
223     part_cache_type => ..,
224     part_cache_type_enum => ..., ## one of the enums from tapetype_getconf
225     part_cache_dir => ..,
226     part_cache_max_size => ..,
227   );
228   if ($splitting_args{'error'}) { .. }
229
230 An C<Amanda::Taper::Scribe> object can only run one transfer at a time, so
231 do not call C<get_xfer_dest> until the C<dump_cb> for the previous C<start_dump>
232 has been called.
233
234 =head3 Start the Transfer
235
236 Armed with the element returned by C<get_xfer_dest>, the caller should create a
237 source element and a transfer object and start the transfer.  In order to
238 manage the splitting process, the Scribe needs to be informed, via its
239 C<handle_xmsg> method, of all transfer messages .  This is usually accomplished
240 with something like:
241
242   $xfer->start(sub {
243       my ($src, $msg, $xfer) = @_;
244       $scribe->handle_xmsg($src, $msg, $xfer);
245   });
246
247 =head3 Inform the Scribe
248
249 Once the transfer has started, the Scribe is ready to begin writing parts to
250 the volume.  This is the first moment at which the Scribe needs a header, too.
251 All of this is supplied to the C<start_dump> method:
252
253   $scribe->start_dump(
254       xfer => $xfer,
255       dump_header => $hdr,
256       dump_cb => $dump_cb);
257
258 The c<dump_header> here is the header that will be applied to all parts of the
259 dumpfile.  The only field in the header that the Scribe controls is the part
260 number.  The C<dump_cb> callback passed to C<start_dump> is called when the
261 dump is completely finished - either successfully or with a fatal error.
262 Unlike most callbacks, this one takes keyword arguments, since it has so many
263 parameters.
264
265   $dump_cb->(
266         result => $result,
267         device_errors => $device_errors,
268         config_denial_message => $cdm,
269         size => $size,
270         duration => $duration,
271         total_duration => $total_duration,
272         nparts => $nparts);
273
274 All parameters will be present on every call, although the order is not
275 guaranteed.
276
277 The C<result> is one of C<"FAILED">, C<"PARTIAL">, or C<"DONE">.  Even when
278 C<dump_cb> reports a fatal error, C<result> may be C<"PARTIAL"> if some data
279 was written successfully.
280
281 The C<device_error> key points to a list of errors, each given as a string,
282 that describe what went wrong to cause the dump to fail.  The
283 C<config_denial_message> parrots the reason provided by C<$perm_cb> (see below)
284 for denying use of a new tape if the cause was 'config', and is C<undef>
285 otherwise.
286
287 The final parameters, C<size> (in bytes), C<duration>, C<total_duration> (in
288 seconds), and C<nparts> describe the total transfer, and are a sum of all of
289 the parts written to the device.  Note that C<nparts> does not include any
290 empty trailing parts.  Note that C<duration> does not include time spent
291 operating the changer, while C<total_duration> reflects the time from the
292 C<start_dump> call to the invocation of the C<dump_cb>.
293
294 =head3 Cancelling a Dump
295
296 After you have requested a transfer destination, the scribe is poised to begin the
297 transfer.  If you cannot actually perform the transfer for some reason, you'll need
298 to go through the motions all the same, but cancel the operation immediately.  That
299 can be done by calling C<cancel_dump>:
300
301   $scribe->cancel_dump(
302         xfer => $xfer,
303         dump_cb => $dump_cb);
304
305 =head2 QUIT
306
307 When all of the dumpfiles are transferred, call the C<quit> method to
308 release any resources and clean up.  This method takes a typical
309 C<finished_cb>.
310
311   $scribe->quit(finished_cb => sub {
312     print "ALL DONE!\n";
313   });
314
315 =head2 GET_BYTES_WRITTEN
316
317 The C<get_bytes_written> returns the number of bytes written to the device at
318 the time of the call, and is meant to be used for status reporting.  This value
319 is updated at least as each part is finished; for some modes of operation, it
320 is updated continuously.  Notably, DirectTCP transfers do not update
321 continuously.
322
323 =head2 START_SCAN
324
325 The C<start_scan> method initiate a scan of the changer to find a usable tape.
326
327 =head1 FEEDBACK
328
329 The C<Amanda::Taper::Scribe::Feedback> class is intended to be
330 subclassed by the user.  It provides a number of notification methods
331 that enable the historical logging and driver/taper interactions
332 required by Amanda.  The parent class does nothing of interest, but
333 allows subclasses to omit methods they do not need.
334
335 The C<request_volume_permission> method provides a means for the caller
336 to limit the number of volumes the Scribe consumes.  It is called as
337
338   $fb->request_volume_permission(perm_cb => $cb);
339
340 The C<perm_cb> is a callback which expects a hash as arguments. If C<allow>
341 is set, then the scribe is allowed to use a new volume, if C<scribe> is set,
342 then the xfer must be transfered to that scribe, otherwise a C<cause>
343 and a C<message> describing why a new volume should not be used. must be
344 set. e.g.
345
346   perm_cb->(allow => 1);
347   perm_cb->(scribe => $new_scribe);
348   perm_cb->(cause => 'config', message => $message);
349   perm_cb->(cause => 'error', message => $message);
350
351 A cause of 'config' indicates that the denial is due to the user's
352 configuration, and thus should not be presented as an error.  The default
353 implementation always calls C<< perm_cb->() >>.
354
355 All of the remaining methods are notifications, and do not take a
356 callback.
357
358   $fb->scribe_notif_new_tape(
359         error => $error,
360         volume_label => $volume_label);
361
362 The Scribe calls C<scribe_notif_new_tape> when a new volume is started.  If the
363 C<volume_label> is undefined, then the volume was not successfully
364 relabled, and its previous contents may still be available.  If C<error>
365 is defined, then no useful data was written to the volume.  Note that
366 C<error> and C<volume_label> may I<both> be defined if the previous
367 contents of the volume were erased, but no useful, new data was written
368 to the volume.
369
370 This method will be called exactly once for every call to
371 C<request_volume_permission> that calls back with C<< perm_cb->() >>.
372
373   $fb->scribe_notif_tape_done(
374         volume_label => $volume_label,
375         size => $size,
376         num_files => $num_files);
377
378 The C<scribe_notif_tape_done> method is called after a volume is completely
379 written and its reservation has been released.  Note that the scribe waits
380 until the last possible moment to release a reservation, so this may be called
381 later than expected, e.g., during a C<quit> invocation.
382
383   $fb->scribe_notif_part_done(
384         partnum => $partnum,
385         fileno => $fileno,
386         successful => $successful,
387         size => $size,
388         duration => $duration);
389
390 The Scribe calls C<scribe_notif_part_done> for each part written to the volume,
391 including partial parts.  If the part was not written successfully, then
392 C<successful> is false.  The C<size> is in bytes, and the C<duration> is
393 a floating-point number of seconds.  If a part fails before a new device
394 file is created, then C<fileno> may be zero.
395
396 Finally, the Scribe sends a few historically significant trace log messages
397 via C<scribe_notif_log_info>:
398
399   $fb->scribe_notif_log_info(
400         message => $message);
401
402 A typical Feedback subclass might begin like this:
403
404   package main::Feedback;
405   use base 'Amanda::Taper::Scribe::Feedback';
406
407   sub request_volume_permission {
408     my $self = shift;
409     my %params = @_;
410
411     $params{'perm_cb'}->(cause => "error", message => "NO VOLUMES FOR YOU!");
412   }
413
414 =cut
415
416 package Amanda::Taper::Scribe;
417
418 use strict;
419 use warnings;
420 use Carp;
421
422 use Amanda::Xfer qw( :constants );
423 use Amanda::Device qw( :constants );
424 use Amanda::Header;
425 use Amanda::Debug qw( :logging );
426 use Amanda::MainLoop;
427 use Amanda::Tapelist;
428 use Amanda::Config qw( :getconf config_dir_relative );
429 use base qw( Exporter );
430
431 our @EXPORT_OK = qw( get_splitting_args_from_config );
432
433 sub new {
434     my $class = shift;
435     my %params = @_;
436
437     my $decide_debug = $Amanda::Config::debug_taper || $params{'debug'};
438     for my $rq_param qw(taperscan feedback) {
439         croak "required parameter '$rq_param' mising"
440             unless exists $params{$rq_param};
441     }
442
443     my $self = {
444         taperscan => $params{'taperscan'},
445         feedback => $params{'feedback'},
446         debug => $decide_debug,
447         write_timestamp => undef,
448         started => 0,
449
450         # device handling, and our current device and reservation
451         devhandling => Amanda::Taper::Scribe::DevHandling->new(
452             taperscan => $params{'taperscan'},
453             feedback => $params{'feedback'},
454         ),
455         reservation => undef,
456         device => undef,
457         device_size => undef,
458         device_at_eom => undef, # device still exists, but is full
459
460         # callback passed to start_dump
461         dump_cb => undef,
462
463         # information for the current dumpfile
464         dump_header => undef,
465         retry_part_on_peom => undef,
466         xfer => undef,
467         xdt => undef,
468         xdt_ready => undef,
469         start_part_on_xdt_ready => 0,
470         size => 0,
471         duration => 0.0,
472         dump_start_time => undef,
473         last_part_successful => 0,
474         started_writing => 0,
475         device_errors => [],
476         config_denial_message => undef,
477     };
478
479     return bless ($self, $class);
480 }
481
482 sub start {
483     my $self = shift;
484     my %params = @_;
485
486     for my $rq_param qw(write_timestamp finished_cb) {
487         croak "required parameter '$rq_param' missing"
488             unless exists $params{$rq_param};
489     }
490
491     die "scribe already started" if $self->{'started'};
492
493     $self->dbg("starting");
494     $self->{'write_timestamp'} = $params{'write_timestamp'};
495
496     # start up the DevHandling object, making sure we know
497     # when it's done with its startup process
498     $self->{'devhandling'}->start(finished_cb => sub {
499         $self->{'started'} = 1;
500         $params{'finished_cb'}->(@_);
501     });
502 }
503
504 sub quit {
505     my $self = shift;
506     my %params = @_;
507
508     # since there's little other option than to barrel on through the
509     # quitting procedure, quit() just accumulates its error messages
510     # and, if necessary, concantenates them for the finished_cb.
511     my @errors;
512
513     my $steps = define_steps
514         cb_ref => \$params{'finished_cb'};
515
516     step setup => sub {
517         $self->dbg("quitting");
518
519         if ($self->{'xfer'}) {
520             die "Scribe cannot quit while a transfer is active";
521             # Supporting this would be complicated:
522             # - cancel the xfer and wait for it to complete
523             # - ensure that the taperscan not be started afterward
524             # and isn't required for normal Amanda operation.
525         }
526
527         $steps->{'release'}->();
528     };
529
530     step release => sub {
531         if ($self->{'reservation'}) {
532             $self->_release_reservation(finished_cb => $steps->{'released'});
533         } else {
534             $steps->{'stop_devhandling'}->();
535         }
536     };
537
538     step released => sub {
539         my ($err) = @_;
540         push @errors, "$err" if $err;
541
542         $self->{'reservation'} = undef;
543
544         $steps->{'stop_devhandling'}->();
545     };
546
547     step stop_devhandling => sub {
548         $self->{'devhandling'}->quit(finished_cb => $steps->{'stopped_devhandling'});
549     };
550
551     step stopped_devhandling => sub {
552         my ($err) = @_;
553         push @errors, "$err" if $err;
554
555         my $errmsg = join("; ", @errors) if @errors >= 1;
556         $params{'finished_cb'}->($errmsg);
557     };
558 }
559
560 sub get_device {
561     my $self = shift;
562
563     # Can return a device we already have, or "peek" at the
564     # DevHandling object's device.
565     # It might not have right permission on the device.
566
567     my $device;
568     if (defined $self->{'device'}) {
569         $device = $self->{'device'};
570     } else {
571         $device = $self->{'devhandling'}->peek_device();
572     }
573     return $device;
574 }
575
576 sub check_data_path {
577     my $self = shift;
578     my $data_path = shift;
579
580     my $device = $self->get_device();
581
582     if (!defined $device) {
583         die "no device is available to check the datapath";
584     }
585
586     my $use_directtcp = $device->directtcp_supported();
587
588     my $xdt;
589     if (!$use_directtcp) {
590         if ($data_path eq 'DIRECTTCP') {
591             return "Can't dump DIRECTTCP data-path dle to a device ('" .
592                    $device->device_name .
593                    "') that doesn't support it";
594         }
595     }
596     return undef;
597 }
598
599 sub start_scan {
600     my $self = shift;
601
602     $self->{'devhandling'}->start_scan();
603 }
604
605 # Get a transfer destination; does not use a callback
606 sub get_xfer_dest {
607     my $self = shift;
608     my %params = @_;
609
610     for my $rq_param qw(max_memory) {
611         croak "required parameter '$rq_param' missing"
612             unless exists $params{$rq_param};
613     }
614
615     die "not yet started"
616         unless $self->{'write_timestamp'} and $self->{'started'};
617     die "xfer element already returned"
618         if ($self->{'xdt'});
619     die "xfer already running"
620         if ($self->{'xfer'});
621
622     $self->{'xfer'} = undef;
623     $self->{'xdt'} = undef;
624     $self->{'size'} = 0;
625     $self->{'duration'} = 0.0;
626     $self->{'nparts'} = undef;
627     $self->{'dump_start_time'} = undef;
628     $self->{'last_part_successful'} = 1;
629     $self->{'started_writing'} = 0;
630     $self->{'device_errors'} = [];
631     $self->{'config_denial_message'} = undef;
632
633     # set the callback
634     $self->{'dump_cb'} = undef;
635     $self->{'retry_part_on_peom'} = 1;
636     $self->{'start_part_on_xdt_ready'} = 0;
637
638     # start getting parameters together to determine what kind of splitting
639     # and caching we're going to do
640     my $part_size = $params{'part_size'} || 0;
641     my ($use_mem_cache, $disk_cache_dirname) = (0, undef);
642     my $can_cache_inform = $params{'can_cache_inform'};
643     my $part_cache_type = $params{'part_cache_type'} || 'none';
644
645     my $xdt_first_dev = $self->get_device();
646     if (!defined $xdt_first_dev) {
647         die "no device is available to create an xfer_dest";
648     }
649     my $leom_supported = $xdt_first_dev->property_get("leom");
650     my $use_directtcp = $xdt_first_dev->directtcp_supported();
651
652     # figure out the destination type we'll use, based on the circumstances
653     my ($dest_type, $dest_text);
654     if ($use_directtcp) {
655         $dest_type = 'directtcp';
656         $dest_text = "using DirectTCP";
657     } elsif ($can_cache_inform && $leom_supported) {
658         $dest_type = 'splitter';
659         $dest_text = "using LEOM (falling back to holding disk as cache)";
660     } elsif ($leom_supported) {
661         $dest_type = 'splitter';
662         $dest_text = "using LEOM detection (no caching)";
663     } elsif ($can_cache_inform) {
664         $dest_type = 'splitter';
665         $dest_text = "using cache_inform";
666     } elsif ($part_cache_type ne 'none') {
667         $dest_type = 'cacher';
668
669         # we'll be caching, so apply the maximum size
670         my $part_cache_max_size = $params{'part_cache_max_size'} || 0;
671         $part_size = $part_cache_max_size
672             if ($part_cache_max_size and $part_cache_max_size < $part_size);
673
674         # and figure out what kind of caching to apply
675         if ($part_cache_type eq 'memory') {
676             $use_mem_cache = 1;
677         } else {
678             # note that we assume this has already been checked; if it's wrong,
679             # the xfer element will just fail immediately
680             $disk_cache_dirname = $params{'part_cache_dir'};
681         }
682         $dest_text = "using cache type '$part_cache_type'";
683     } else {
684         $dest_type = 'splitter';
685         $dest_text = "using no cache (PEOM will be fatal)";
686
687         # no directtcp, no caching, no cache_inform, and no LEOM, so a PEOM will be fatal
688         $self->{'retry_part_on_peom'} = 0;
689     }
690
691     debug("Amanda::Taper::Scribe preparing to write, part size $part_size, "
692         . "$dest_text ($dest_type) "
693         . ($leom_supported? " (LEOM supported)" : " (no LEOM)"));
694
695     # set the device to verbose logging if we're in debug mode
696     if ($self->{'debug'}) {
697         $xdt_first_dev->property_set("verbose", 1);
698     }
699
700     my $xdt;
701     if ($dest_type eq 'directtcp') {
702         $xdt = Amanda::Xfer::Dest::Taper::DirectTCP->new(
703             $xdt_first_dev, $part_size);
704         $self->{'xdt_ready'} = 0; # xdt isn't ready until we get XMSG_READY
705     } elsif ($dest_type eq 'splitter') {
706         $xdt = Amanda::Xfer::Dest::Taper::Splitter->new(
707             $xdt_first_dev, $params{'max_memory'}, $part_size, $can_cache_inform);
708         $self->{'xdt_ready'} = 1; # xdt is ready immediately
709     } else {
710         $xdt = Amanda::Xfer::Dest::Taper::Cacher->new(
711             $xdt_first_dev, $params{'max_memory'}, $part_size,
712             $use_mem_cache, $disk_cache_dirname);
713         $self->{'xdt_ready'} = 1; # xdt is ready immediately
714     }
715     $self->{'start_part_on_xdt_ready'} = 0;
716     $self->{'xdt'} = $xdt;
717
718     return $xdt;
719 }
720
721 sub start_dump {
722     my $self = shift;
723     my %params = @_;
724
725     die "no xfer dest set up; call get_xfer_dest first"
726         unless defined $self->{'xdt'};
727
728     # get the header ready for writing (totalparts was set by the caller)
729     $self->{'dump_header'} = $params{'dump_header'};
730     $self->{'dump_header'}->{'partnum'} = 1;
731
732     # set up the dump_cb for when this dump is done, and keep the xfer
733     $self->{'dump_cb'} = $params{'dump_cb'};
734     $self->{'xfer'} = $params{'xfer'};
735     $self->{'dump_start_time'} = time;
736
737     # and start the part
738     $self->_start_part();
739 }
740
741 sub cancel_dump {
742     my $self = shift;
743     my %params = @_;
744
745     die "no xfer dest set up; call get_xfer_dest first"
746         unless defined $self->{'xdt'};
747
748     # set up the dump_cb for when this dump is done, and keep the xfer
749     $self->{'dump_cb'} = $params{'dump_cb'};
750     $self->{'xfer'} = $params{'xfer'};
751
752     # XXX The cancel should call dump_cb, but right now the xfer stays hung in
753     # accept.  So we leave the xfer to its hang, and dump_cb is called and xdt
754     # and xfer are set to undef.  This should be fixed in 3.2.
755
756     $self->{'xfer'}->cancel();
757
758     $self->{'dump_cb'}->(
759         result => "FAILED",
760         device_errors => [],
761         config_denial_message => undef,
762         size => 0,
763         duration => 0.0,
764         total_duration => 0,
765         nparts => 0);
766     $self->{'xdt'} = undef;
767     $self->{'xfer'} = undef;
768 }
769
770 sub get_bytes_written {
771     my ($self) = @_;
772
773     if (defined $self->{'xdt'}) {
774         return $self->{'size'} + $self->{'xdt'}->get_part_bytes_written();
775     } else {
776         return $self->{'size'};
777     }
778 }
779
780 sub _start_part {
781     my $self = shift;
782
783     $self->dbg("trying to start part");
784
785     # if the xdt isn't ready yet, wait until it is; note that the XDT is still
786     # using the device right now, so we can't even label it yet.
787     if (!$self->{'xdt_ready'}) {
788         $self->dbg("XDT not ready yet; waiting until it is");
789         $self->{'start_part_on_xdt_ready'} = 1;
790         return
791     }
792
793     # we need an actual, permitted device at this point, so if we don't have
794     # one, then defer this start_part call until we do.  The device may still
795     # exist, but be at EOM, if the last dump failed at EOM and was not retried
796     # on a new volume.
797     if (!$self->{'device'} or $self->{'device_at_eom'}) {
798         # _get_new_volume calls _start_part when it has a new volume in hand
799         return $self->_get_new_volume();
800     }
801
802     # if the dump wasn't successful, and we're not splitting, then bail out.  It's
803     # up to higher-level components to re-try this dump on a new volume, if desired.
804     # Note that this should be caught in the XMSG_PART_DONE handler -- this is just
805     # here for backup.
806     if (!$self->{'last_part_successful'} and !$self->{'retry_part_on_peom'}) {
807         $self->_operation_failed(device_error => "No space left on device (uncaught)");
808         return;
809     }
810
811     # and start writing this part
812     $self->{'started_writing'} = 1;
813     $self->dbg("resuming transfer");
814     $self->{'xdt'}->start_part(!$self->{'last_part_successful'},
815                                $self->{'dump_header'});
816 }
817
818 sub handle_xmsg {
819     my $self = shift;
820     my ($src, $msg, $xfer) = @_;
821
822     if ($msg->{'type'} == $XMSG_DONE) {
823         $self->_xmsg_done($src, $msg, $xfer);
824         return;
825     }
826
827     # for anything else we only pay attention to messages from
828     # our own element
829     if ($msg->{'elt'} == $self->{'xdt'}) {
830         $self->dbg("got msg from xfer dest: $msg");
831         if ($msg->{'type'} == $XMSG_PART_DONE) {
832             $self->_xmsg_part_done($src, $msg, $xfer);
833         } elsif ($msg->{'type'} == $XMSG_READY) {
834             $self->_xmsg_ready($src, $msg, $xfer);
835         } elsif ($msg->{'type'} == $XMSG_ERROR) {
836             $self->_xmsg_error($src, $msg, $xfer);
837         }
838     }
839 }
840
841 sub _xmsg_part_done {
842     my $self = shift;
843     my ($src, $msg, $xfer) = @_;
844
845     # this handles successful zero-byte parts as a special case - they
846     # are an implementation detail of the splitting done by the transfer
847     # destination.
848
849     if ($msg->{'successful'} and $msg->{'size'} == 0) {
850         $self->dbg("not notifying for empty, successful part");
851     } else {
852         # double-check partnum
853         die "Part numbers do not match!"
854             unless ($self->{'dump_header'}->{'partnum'} == $msg->{'partnum'});
855
856         # notify
857         $self->{'feedback'}->scribe_notif_part_done(
858             partnum => $msg->{'partnum'},
859             fileno => $msg->{'fileno'},
860             successful => $msg->{'successful'},
861             size => $msg->{'size'},
862             duration => $msg->{'duration'});
863
864         # increment nparts here, so empty parts are not counted
865         $self->{'nparts'} = $msg->{'partnum'};
866     }
867
868     $self->{'last_part_successful'} = $msg->{'successful'};
869
870     if ($msg->{'successful'}) {
871         $self->{'device_size'} += $msg->{'size'};
872         $self->{'size'} += $msg->{'size'};
873         $self->{'duration'} += $msg->{'duration'};
874     }
875
876     if (!$msg->{'eof'}) {
877         # update the header for the next dumpfile, if this was a non-empty part
878         if ($msg->{'successful'} and $msg->{'size'} != 0) {
879             $self->{'dump_header'}->{'partnum'}++;
880         }
881
882         if ($msg->{'eom'}) {
883             # if there's an error finishing the device, it's probably just carryover
884             # from the error the Xfer::Dest::Taper encountered while writing to the
885             # device, so we ignore it.
886             if (!$self->{'device'}->finish()) {
887                 my $devname = $self->{'device'}->device_name;
888                 my $errmsg = $self->{'device'}->error_or_status();
889                 $self->dbg("ignoring error while finishing device '$devname': $errmsg");
890             }
891
892             # if the part failed..
893             if (!$msg->{'successful'}) {
894                 # if no caching was going on, then the dump has failed
895                 if (!$self->{'retry_part_on_peom'}) {
896                     # mark this device as at EOM, since we are not going to look
897                     # for another one yet
898                     $self->{'device_at_eom'} = 1;
899
900                     my $msg = "No space left on device";
901                     if ($self->{'device'}->status() != $DEVICE_STATUS_SUCCESS) {
902                         $msg = $self->{'device'}->error_or_status();
903                     }
904                     $self->_operation_failed(device_error => $msg);
905                     return;
906                 }
907
908                 # log a message for amreport
909                 $self->{'feedback'}->scribe_notif_log_info(
910                     message => "Will request retry of failed split part.");
911             }
912
913             # get a new volume, then go on to the next part
914             $self->_get_new_volume();
915         } else {
916             # if the part was unsuccessful, but the xfer dest has reason to believe
917             # this is not due to EOM, then the dump is done
918             if (!$msg->{'successful'}) {
919                 my $msg = "unknown error while dumping";
920                 if ($self->{'device'}->status() != $DEVICE_STATUS_SUCCESS) {
921                     $msg = $self->{'device'}->error_or_status();
922                 }
923                 $self->_operation_failed(device_error => $msg);
924                 return;
925             }
926
927             # no EOM -- go on to the next part
928             $self->_start_part();
929         }
930     }
931 }
932
933 sub _xmsg_ready {
934     my $self = shift;
935     my ($src, $msg, $xfer) = @_;
936
937     $self->dbg("XDT is ready");
938     $self->{'xdt_ready'} = 1;
939     if ($self->{'start_part_on_xdt_ready'}) {
940         $self->{'start_part_on_xdt_ready'} = 0;
941         $self->_start_part();
942     }
943 }
944
945 sub _xmsg_error {
946     my $self = shift;
947     my ($src, $msg, $xfer) = @_;
948
949     # XMSG_ERROR from the XDT is always fatal
950     $self->_operation_failed(device_error => $msg->{'message'});
951 }
952
953 sub _xmsg_done {
954     my $self = shift;
955     my ($src, $msg, $xfer) = @_;
956
957     if ($msg->{'type'} == $XMSG_DONE) {
958         $self->dbg("transfer is complete");
959         $self->_dump_done();
960     }
961 }
962
963 sub _dump_done {
964     my $self = shift;
965
966     my $result;
967
968     # determine the correct final status - DONE if we're done, PARTIAL
969     # if we've started writing to the volume, otherwise FAILED
970     if (@{$self->{'device_errors'}} or $self->{'config_denial_message'}) {
971         $result = $self->{'started_writing'}? 'PARTIAL' : 'FAILED';
972     } else {
973         $result = 'DONE';
974     }
975
976     my $dump_cb = $self->{'dump_cb'};
977     my %dump_cb_args = (
978         result => $result,
979         device_errors => $self->{'device_errors'},
980         config_denial_message => $self->{'config_denial_message'},
981         size => $self->{'size'},
982         duration => $self->{'duration'},
983         total_duration => time - $self->{'dump_start_time'},
984         nparts => $self->{'nparts'});
985
986     # reset everything and let the original caller know we're done
987     $self->{'xfer'} = undef;
988     $self->{'xdt'} = undef;
989     $self->{'dump_header'} = undef;
990     $self->{'dump_cb'} = undef;
991     $self->{'size'} = 0;
992     $self->{'duration'} = 0.0;
993     $self->{'nparts'} = undef;
994     $self->{'dump_start_time'} = undef;
995     $self->{'device_errors'} = [];
996     $self->{'config_denial_message'} = undef;
997
998     # and call the callback
999     $dump_cb->(%dump_cb_args);
1000 }
1001
1002 # keyword parameters are utilities to the caller: either specify
1003 # device_error to add to the device_errors list or config_denial_message
1004 # to set the corresponding key in $self.
1005 sub _operation_failed {
1006     my $self = shift;
1007     my %params = @_;
1008
1009     my $error_message = $params{'device_error'}
1010                      || $params{'config_denial_message'}
1011                      || 'no reason';
1012     $self->dbg("operation failed: $error_message");
1013
1014     # tuck the message away as desired
1015     push @{$self->{'device_errors'}}, $params{'device_error'}
1016         if defined $params{'device_error'};
1017     $self->{'config_denial_message'} = $params{'config_denial_message'} 
1018         if $params{'config_denial_message'};
1019
1020     # cancelling the xdt will eventually cause an XMSG_DONE, which will notice
1021     # the error and set the result correctly; but if there's no xfer, then we
1022     # can just call _dump_done directly.
1023     if (defined $self->{'xfer'}) {
1024         $self->dbg("cancelling the transfer: $error_message");
1025
1026         $self->{'xfer'}->cancel();
1027     } else {
1028         if (defined $self->{'dump_cb'}) {
1029             # _dump_done constructs the dump_cb from $self parameters
1030             $self->_dump_done();
1031         } else {
1032             die "error with no callback to handle it: $error_message";
1033         }
1034     }
1035 }
1036
1037 # release the outstanding reservation, calling scribe_notif_tape_done
1038 # after the release
1039 sub _release_reservation {
1040     my $self = shift;
1041     my %params = @_;
1042     my @errors;
1043
1044     my ($label, $fm, $kb);
1045
1046     # if we've already written a volume, log it
1047     if ($self->{'device'} and defined $self->{'device'}->volume_label) {
1048         $label = $self->{'device'}->volume_label();
1049         $fm = $self->{'device'}->file();
1050         $kb = $self->{'device_size'} / 1024;
1051
1052         # log a message for amreport
1053         $self->{'feedback'}->scribe_notif_log_info(
1054             message => "tape $label kb $kb fm $fm [OK]");
1055     }
1056
1057     # finish the device if it isn't finished yet
1058     if ($self->{'device'}) {
1059         my $already_in_error = $self->{'device'}->status() != $DEVICE_STATUS_SUCCESS;
1060
1061         if (!$self->{'device'}->finish() && !$already_in_error) {
1062             push @errors, $self->{'device'}->error_or_status();
1063         }
1064     }
1065     $self->{'device'} = undef;
1066     $self->{'device_at_eom'} = 0;
1067
1068     $self->{'reservation'}->release(finished_cb => sub {
1069         my ($err) = @_;
1070         push @errors, "$err" if $err;
1071
1072         $self->{'reservation'} = undef;
1073
1074         # notify the feedback that we've finished and released a tape
1075         if ($label) {
1076             $self->{'feedback'}->scribe_notif_tape_done(
1077                 volume_label => $label,
1078                 size => $kb * 1024,
1079                 num_files => $fm);
1080         }
1081
1082         $params{'finished_cb'}->(@errors? join("; ", @errors) : undef);
1083     });
1084 }
1085
1086 # invoke the devhandling to get a new device, with all of the requisite
1087 # notifications and checks and whatnot.  On *success*, call _start_dump; on
1088 # failure, call other appropriate methods.
1089 sub _get_new_volume {
1090     my $self = shift;
1091
1092     # release first, if necessary
1093     if ($self->{'reservation'}) {
1094         $self->_release_reservation(finished_cb => sub {
1095             my ($error) = @_;
1096
1097             if ($error) {
1098                 $self->_operation_failed(device_error => $error);
1099             } else {
1100                 $self->_get_new_volume();
1101             }
1102         });
1103
1104         return;
1105     }
1106
1107     $self->{'devhandling'}->get_volume(volume_cb => sub { $self->_volume_cb(@_); });
1108 }
1109
1110 sub _volume_cb  {
1111     my $self = shift;
1112     my ($scan_error, $config_denial_message, $error_denial_message,
1113         $reservation, $new_label, $access_mode, $is_new, $new_scribe) = @_;
1114
1115     # note that we prefer the config_denial_message over the scan error.  If
1116     # both occurred, then the results of the scan are immaterial -- we
1117     # shouldn't have been looking for a new volume anyway.
1118
1119     if ($config_denial_message) {
1120         $self->_operation_failed(config_denial_message => $config_denial_message);
1121         return;
1122     }
1123
1124     if ($error_denial_message) {
1125         $self->_operation_failed(device_error => $error_denial_message);
1126         return;
1127     }
1128
1129     if ($new_scribe) {
1130         # Transfer the xfer to the new scribe
1131         $self->dbg("take scribe from");
1132
1133         $new_scribe->{'dump_cb'} = $self->{'dump_cb'};
1134         $new_scribe->{'dump_header'} = $self->{'dump_header'};
1135         $new_scribe->{'retry_part_on_peom'} = $self->{'retry_part_on_peom'};
1136         $new_scribe->{'split_method'} = $self->{'split_method'};
1137         $new_scribe->{'xfer'} = $self->{'xfer'};
1138         $new_scribe->{'xdt'} = $self->{'xdt'};
1139         $new_scribe->{'xdt_ready'} = $self->{'xdt_ready'};
1140         $new_scribe->{'start_part_on_xdt_ready'} = $self->{'start_part_on_xdt_ready'};
1141         $new_scribe->{'size'} = $self->{'size'};
1142         $new_scribe->{'duration'} = $self->{'duration'};
1143         $new_scribe->{'dump_start_time'} = $self->{'dump_start_time'};
1144         $new_scribe->{'last_part_successful'} = $self->{'last_part_successful'};
1145         $new_scribe->{'started_writing'} = $self->{'started_writing'};
1146         $new_scribe->{'feedback'} = $self->{'feedback'};
1147         $new_scribe->{'devhandling'}->{'feedback'} = $self->{'feedback'};
1148         $self->{'dump_header'} = undef;
1149         $self->{'dump_cb'} = undef;
1150         $self->{'xfer'} = undef;
1151         $self->{'xdt'} = undef;
1152         $self->{'xdt_ready'} = undef;
1153         $self->{'dump_start_time'} = undef;
1154         $self->{'started_writing'} = 0;
1155         $self->{'feedback'} = undef;
1156         if (defined $new_scribe->{'device'}) {
1157             $new_scribe->{'xdt'}->use_device($new_scribe->{'device'});
1158         }
1159         # start it
1160         $new_scribe->_start_part();
1161
1162         return;
1163     }
1164
1165     if ($scan_error) {
1166         # we had permission to use a tape, but didn't find a tape, so we need
1167         # to notify of such
1168         $self->{'feedback'}->scribe_notif_new_tape(
1169             error => $scan_error,
1170             volume_label => undef);
1171
1172         $self->_operation_failed(device_error => $scan_error);
1173         return;
1174     }
1175
1176     $self->dbg("got new volume; writing new label");
1177
1178     # from here on, if an error occurs, we must send scribe_notif_new_tape, and look
1179     # for a new volume
1180     $self->{'reservation'} = $reservation;
1181     $self->{'device_size'} = 0;
1182     my $device = $self->{'device'} = $reservation->{'device'};
1183
1184     # turn on verbose logging now, if we need it
1185     if ($self->{'debug'}) {
1186         $reservation->{'device'}->property_set("verbose", 1);
1187     }
1188
1189     # read the label once, to get a "before" snapshot (see below)
1190     my $old_label;
1191     my $old_timestamp;
1192     if (!$is_new) {
1193         if (($device->status & ~$DEVICE_STATUS_VOLUME_UNLABELED)
1194             && !($device->status & $DEVICE_STATUS_VOLUME_UNLABELED)) {
1195             $self->{'feedback'}->scribe_notif_new_tape(
1196                 error => "while reading label on new volume: " . $device->error_or_status(),
1197                 volume_label => undef);
1198
1199             return $self->_get_new_volume();
1200         }
1201         $old_label = $device->volume_label;
1202         $old_timestamp = $device->volume_time;
1203     }
1204
1205     # inform the xdt about this new device before starting it
1206     $self->{'xdt'}->use_device($device);
1207
1208     my $result = $self->_device_start($device, $access_mode, $new_label, $is_new);
1209     if ($result == 0) {
1210         # try reading the label to see whether we erased the tape
1211         my $erased = 0;
1212         CHECK_READ_LABEL: {
1213             # don't worry about erasing new tapes
1214             if ($is_new) {
1215                 last CHECK_READ_LABEL;
1216             }
1217
1218             $device->read_label();
1219
1220             # does the device think something is broken now?
1221             if (($device->status & ~$DEVICE_STATUS_VOLUME_UNLABELED)
1222                 and !($device->status & $DEVICE_STATUS_VOLUME_UNLABELED)) {
1223                 $erased = 1;
1224                 last CHECK_READ_LABEL;
1225             }
1226
1227             # has the label changed?
1228             my $vol_label = $device->volume_label;
1229             if ((!defined $old_label and defined $vol_label)
1230                 or (defined $old_label and !defined $vol_label)
1231                 or (defined $old_label and $old_label ne $vol_label)) {
1232                 $erased = 1;
1233                 last CHECK_READ_LABEL;
1234             }
1235
1236             # has the timestamp changed?
1237             my $vol_timestamp = $device->volume_time;
1238             if ((!defined $old_timestamp and defined $vol_timestamp)
1239                 or (defined $old_timestamp and !defined $vol_timestamp)
1240                 or (defined $old_timestamp and $old_timestamp ne $vol_timestamp)) {
1241                 $erased = 1;
1242                 last CHECK_READ_LABEL;
1243             }
1244         }
1245
1246         $self->{'feedback'}->scribe_notif_new_tape(
1247             error => "while labeling new volume: " . $device->error_or_status(),
1248             volume_label => $erased? $new_label : undef);
1249
1250         return $self->_get_new_volume();
1251     } elsif ($result != 1) {
1252         $self->{'feedback'}->scribe_notif_new_tape(
1253             error => $result,
1254             volume_label => undef);
1255         return $self->_get_new_volume();
1256     }
1257
1258     $new_label = $device->volume_label;
1259
1260     # success!
1261     $self->{'feedback'}->scribe_notif_new_tape(
1262         error => undef,
1263         volume_label => $new_label);
1264
1265     # notify the changer that we've labeled the tape, and start the part.
1266     my $label_set_cb = make_cb(label_set_cb => sub {
1267         my ($err) = @_;
1268         if ($err) {
1269             $self->{'feedback'}->scribe_notif_log_info(
1270                 message => "Error from set_label: $err");
1271             # fall through to start_part anyway...
1272         }
1273         return $self->_start_part();
1274     });
1275     $self->{'reservation'}->set_label(label => $new_label,
1276         finished_cb => $label_set_cb);
1277 }
1278
1279 # return 0 for device->start error
1280 # return 1 for success
1281 # return a message for others error
1282 sub _device_start {
1283     my $self = shift;
1284     my ($device, $access_mode, $new_label, $is_new) = @_;
1285
1286     my $tl = $self->{'taperscan'}->{'tapelist'};
1287
1288     if (!defined $tl) { # For Mock::Taperscan in installcheck
1289         if (!$device->start($access_mode, $new_label, $self->{'write_timestamp'})) {
1290             return 0;
1291         } else {
1292             return 1;
1293         }
1294     }
1295
1296     if ($is_new) {
1297         # generate the new label and write it to the tapelist file
1298         $tl->reload(1);
1299         ($new_label, my $err) = $self->{'taperscan'}->make_new_tape_label();
1300         if (!defined $new_label) {
1301             $tl->unlock();
1302             return $err;
1303         } else {
1304             $tl->add_tapelabel('0', $new_label, undef, 0);
1305             $tl->write();
1306         }
1307         $self->dbg("generate new label '$new_label'");
1308     }
1309
1310     # write the label to the device
1311     if (!$device->start($access_mode, $new_label, $self->{'write_timestamp'})) {
1312         if ($is_new) {
1313             # remove the generated label from the tapelist file
1314             $tl->reload(1);
1315             $tl->remove_tapelabel($new_label);
1316             $tl->write();
1317         }
1318         return 0;
1319     }
1320
1321     # rewrite the tapelist file
1322     $tl->reload(1);
1323     my $tle = $tl->lookup_tapelabel($new_label);
1324     $tl->remove_tapelabel($new_label);
1325     $tl->add_tapelabel($self->{'write_timestamp'}, $new_label,
1326                        $tle? $tle->{'comment'} : undef, 1);
1327     $tl->write();
1328
1329     return 1;
1330 }
1331
1332 sub dbg {
1333     my ($self, $msg) = @_;
1334     if ($self->{'debug'}) {
1335         debug("Amanda::Taper::Scribe: $msg");
1336     }
1337 }
1338
1339 sub get_splitting_args_from_config {
1340     my %params = @_;
1341
1342     use Data::Dumper;
1343     my %splitting_args;
1344
1345     # if dle_splitting is false, then we don't split - easy.
1346     if (defined $params{'dle_allow_split'} and !$params{'dle_allow_split'}) {
1347         return ();
1348     }
1349
1350     # utility for below
1351     my $have_space = sub {
1352         my ($dirname, $part_size) = @_;
1353
1354         use Carp;
1355         my $fsusage = Amanda::Util::get_fs_usage($dirname);
1356         confess "$dirname" if (!$fsusage);
1357
1358         my $avail = $fsusage->{'blocks'} * $fsusage->{'bavail'};
1359         if ($avail < $part_size) {
1360             Amanda::Debug::debug("disk cache has $avail bytes available on $dirname, but " .
1361                                  "needs $part_size");
1362             return 0;
1363         } else {
1364             return 1;
1365         }
1366     };
1367
1368     # first, handle the alternate spellings for part_size and part_cache_type
1369     $params{'part_size'} = $params{'part_size_kb'} * 1024
1370         if (defined $params{'part_size_kb'});
1371
1372     if (defined $params{'part_cache_type_enum'}) {
1373         $params{'part_cache_type'} = 'none'
1374             if ($params{'part_cache_type_enum'} == $PART_CACHE_TYPE_NONE);
1375         $params{'part_cache_type'} = 'memory'
1376             if ($params{'part_cache_type_enum'} == $PART_CACHE_TYPE_MEMORY);
1377         $params{'part_cache_type'} = 'disk'
1378             if ($params{'part_cache_type_enum'} == $PART_CACHE_TYPE_DISK);
1379
1380         $params{'part_cache_type'} = 'unknown'
1381             unless defined $params{'part_cache_type'};
1382     }
1383
1384     # if any of the dle_* parameters are set, use those to set the part_*
1385     # parameters, which are emptied out first.
1386     if (defined $params{'dle_tape_splitsize'} or
1387         defined $params{'dle_split_diskbuffer'} or
1388         defined $params{'dle_fallback_splitsize'}) {
1389
1390         $params{'part_size'} = $params{'dle_tape_splitsize'} || 0;
1391         $params{'part_cache_type'} = 'none';
1392         $params{'part_cache_dir'} = undef;
1393         $params{'part_cache_max_size'} = undef;
1394
1395         # part cache type is memory unless we have a split_diskbuffer that fits the bill
1396         if ($params{'part_size'}) {
1397             $params{'part_cache_type'} = 'memory';
1398             if (defined $params{'dle_split_diskbuffer'}
1399                     and -d $params{'dle_split_diskbuffer'}) {
1400                 if ($have_space->($params{'dle_split_diskbuffer'}, $params{'part_size'})) {
1401                     # disk cache checks out, so use it
1402                     $params{'part_cache_type'} = 'disk';
1403                     $params{'part_cache_dir'} = $params{'dle_split_diskbuffer'};
1404                 } else {
1405                     my $msg = "falling back to memory buffer for splitting: " .
1406                                 "insufficient space in disk cache directory";
1407                     $splitting_args{'warning'} = $msg;
1408                 }
1409             }
1410         }
1411
1412         if ($params{'part_cache_type'} eq 'memory') {
1413             # fall back to 10M if fallback size is not given
1414             $params{'part_cache_max_size'} = $params{'dle_fallback_splitsize'} || 10*1024*1024;
1415         }
1416     } else {
1417         my $ps = $params{'part_size'};
1418         my $pcms = $params{'part_cache_max_size'};
1419         $ps = $pcms if (!defined $ps or (defined $pcms and $pcms < $ps));
1420
1421         # fail back from 'disk' to 'none' if the disk isn't set up correctly
1422         if (defined $params{'part_cache_type'} and
1423                     $params{'part_cache_type'} eq 'disk') {
1424             my $warning;
1425             if (!$params{'part_cache_dir'}) {
1426                 $warning = "no part-cache-dir specified; "
1427                             . "using part cache type 'none'";
1428             } elsif (!-d $params{'part_cache_dir'}) {
1429                 $warning = "part-cache-dir '$params{part_cache_dir} "
1430                             . "does not exist; using part cache type 'none'";
1431             } elsif (!$have_space->($params{'part_cache_dir'}, $ps)) {
1432                 $warning = "part-cache-dir '$params{part_cache_dir} "
1433                             . "has insufficient space; using part cache type 'none'";
1434             }
1435
1436             if (defined $warning) {
1437                 $splitting_args{'warning'} = $warning;
1438                 $params{'part_cache_type'} = 'none';
1439                 delete $params{'part_cache_dir'};
1440             }
1441         }
1442     }
1443
1444     $splitting_args{'part_size'} = $params{'part_size'}
1445         if defined($params{'part_size'});
1446     $splitting_args{'part_cache_type'} = $params{'part_cache_type'}
1447         if defined($params{'part_cache_type'});
1448     $splitting_args{'part_cache_dir'} = $params{'part_cache_dir'}
1449         if defined($params{'part_cache_dir'});
1450     $splitting_args{'part_cache_max_size'} = $params{'part_cache_max_size'}
1451         if defined($params{'part_cache_max_size'});
1452
1453     return %splitting_args;
1454 }
1455 ##
1456 ## Feedback
1457 ##
1458
1459 package Amanda::Taper::Scribe::Feedback;
1460
1461 sub request_volume_permission {
1462     my $self = shift;
1463     my %params = @_;
1464
1465     # sure, you can have as many volumes as you want!
1466     $params{'perm_cb'}->(allow => 1);
1467 }
1468
1469 sub scribe_notif_new_tape { }
1470 sub scribe_notif_tape_done { }
1471 sub scribe_notif_part_done { }
1472 sub scribe_notif_log_info { }
1473
1474 ##
1475 ## Device Handling
1476 ##
1477
1478 package Amanda::Taper::Scribe::DevHandling;
1479 use Amanda::MainLoop;
1480 use Carp;
1481
1482 # This class handles scanning for volumes, requesting permission for those
1483 # volumes (the driver likes to feel like it's in control), and providing those
1484 # volumes to the scribe on request.  These can all happen independently, but
1485 # the scribe cannot begin writing to a volume until all three have finished.
1486 # That is: the scan is finished, the driver has given its permission, and the
1487 # scribe has requested a volume.
1488 #
1489 # On start, the class starts scanning immediately, even though the scribe has
1490 # not requested a volume.  Subsequently, a new scan does not begin until the
1491 # scribe requests a volume.
1492 #
1493 # This class is "private" to Amanda::Taper::Scribe, so it is documented in
1494 # comments, rather than POD.
1495
1496 # Create a new DevHandling object.  Params are taperscan and feedback.
1497 sub new {
1498     my $class = shift;
1499     my %params = @_;
1500
1501     my $self = {
1502         taperscan => $params{'taperscan'},
1503         feedback => $params{'feedback'},
1504
1505         # is a scan currently running, or completed?
1506         scan_running => 0,
1507         scan_finished => 0,
1508         scan_error => undef,
1509
1510         # scan results
1511         reservation => undef,
1512         device => undef,
1513         volume_label => undef,
1514
1515         # requests for permissiont to use a new volume
1516         request_pending => 0,
1517         request_complete => 0,
1518         request_denied => 0,
1519         config_denial_message => undef,
1520         error_denial_message => undef,
1521
1522         volume_cb => undef, # callback for get_volume
1523         start_finished_cb => undef, # callback for start
1524     };
1525
1526     return bless ($self, $class);
1527 }
1528
1529 ## public methods
1530
1531 # Called at scribe startup, this starts the instance off with a scan.
1532 sub start {
1533     my $self = shift;
1534     my %params = @_;
1535
1536     $self->{'start_finished_cb'} = $params{'finished_cb'};
1537     $self->_start_scanning();
1538 }
1539
1540 sub quit {
1541     my $self = shift;
1542     my %params = @_;
1543
1544     for my $rq_param qw(finished_cb) {
1545         croak "required parameter '$rq_param' mising"
1546             unless exists $params{$rq_param};
1547     }
1548
1549     # since there's little other option than to barrel on through the
1550     # quitting procedure, quit() just accumulates its error messages
1551     # and, if necessary, concantenates them for the finished_cb.
1552     my @errors;
1553
1554     my $cleanup_cb = make_cb(cleanup_cb => sub {
1555         my ($error) = @_;
1556         push @errors, $error if $error;
1557
1558         $error = join("; ", @errors) if @errors >= 1;
1559
1560         $params{'finished_cb'}->($error);
1561     });
1562
1563     if ($self->{'reservation'}) {
1564         if ($self->{'device'}) {
1565             if (!$self->{'device'}->finish()) {
1566                 push @errors, $self->{'device'}->error_or_status();
1567             }
1568         }
1569
1570         $self->{'reservation'}->release(finished_cb => $cleanup_cb);
1571     } else {
1572         $cleanup_cb->(undef);
1573     }
1574 }
1575
1576 # Get an open, started device and label to start writing to.  The
1577 # volume_callback takes the following arguments:
1578 #   $scan_error -- error message, or undef if no error occurred
1579 #   $config_denial_reason -- config-related reason request was denied, or undef
1580 #   $error_denial_reason -- error-related reason request was denied, or undef
1581 #   $reservation -- Amanda::Changer reservation
1582 #   $device -- open, started device
1583 # It is the responsibility of the caller to close the device and release the
1584 # reservation when finished.  If $scan_error or $request_denied_info are
1585 # defined, then $reservation and $device will be undef.
1586 sub get_volume {
1587     my $self = shift;
1588     my (%params) = @_;
1589
1590     die "already processing a volume request"
1591         if ($self->{'volume_cb'});
1592
1593     $self->{'volume_cb'} = $params{'volume_cb'};
1594
1595     # kick off the relevant processes, if they're not already running
1596     $self->_start_request();
1597
1598     $self->_maybe_callback();
1599 }
1600
1601 # take a peek at the device we have, for which permission has not yet been
1602 # granted.  This will be undefined before the taperscan completes AND after
1603 # the volume_cb has been called.
1604 sub peek_device {
1605     my $self = shift;
1606
1607     return $self->{'device'};
1608 }
1609
1610 sub start_scan {
1611     my $self = shift;
1612
1613     if (!$self->{'scan_running'} && !$self->{'reservation'}) {
1614         $self->_start_scanning();
1615     }
1616 }
1617
1618 ## private methods
1619
1620 sub _start_scanning {
1621     my $self = shift;
1622
1623     return if $self->{'scan_running'} or $self->{'scan_finished'};
1624
1625     $self->{'scan_running'} = 1;
1626
1627     $self->{'taperscan'}->scan(result_cb => sub {
1628         my ($error, $reservation, $volume_label, $access_mode, $is_new) = @_;
1629
1630         $self->{'scan_running'} = 0;
1631         $self->{'scan_finished'} = 1;
1632
1633         if ($error) {
1634             $self->{'scan_error'} = $error;
1635         } else {
1636             $self->{'reservation'} = $reservation;
1637             $self->{'device'} = $reservation->{'device'};
1638             $self->{'volume_label'} = $volume_label;
1639             $self->{'access_mode'} = $access_mode;
1640             $self->{'is_new'} = $is_new;
1641         }
1642
1643         $self->_maybe_callback();
1644     });
1645 }
1646
1647 sub _start_request {
1648     my $self = shift;
1649
1650     return if $self->{'request_pending'} or $self->{'request_complete'};
1651
1652     $self->{'request_pending'} = 1;
1653
1654     $self->{'feedback'}->request_volume_permission(
1655     perm_cb => sub {
1656         my %params = @_;
1657
1658         $self->{'request_pending'} = 0;
1659         $self->{'request_complete'} = 1;
1660         if (defined $params{'scribe'}) {
1661             $self->{'new_scribe'} = $params{'scribe'};
1662             $self->{'scan_finished'} = 1;
1663             $self->{'request_complete'} = 1;
1664         } elsif (defined $params{'cause'}) {
1665             $self->{'request_denied'} = 1;
1666             if ($params{'cause'} eq 'config') {
1667                 $self->{'config_denial_message'} = $params{'message'};
1668             } elsif ($params{'cause'} eq 'error') {
1669                 $self->{'error_denial_message'} = $params{'message'};
1670             } else {
1671                 die "bad cause '" . $params{'cause'} . "'";
1672             }
1673         } elsif (!defined $params{'allow'}) {
1674             die "no allow or cause defined";
1675         }
1676
1677         $self->_maybe_callback();
1678     });
1679 }
1680
1681 sub _maybe_callback {
1682     my $self = shift;
1683
1684     # if we have any kind of error, release the reservation and come back
1685     # later
1686     if (($self->{'scan_error'} or $self->{'request_denied'}) and $self->{'reservation'}) {
1687         $self->{'device'} = undef;
1688
1689         $self->{'reservation'}->release(finished_cb => sub {
1690             my ($error) = @_;
1691
1692             # so many errors, so little time..
1693             if ($error) {
1694                 if ($self->{'scan_error'}) {
1695                     warning("ignoring error releasing reservation ($error) after a scan error");
1696                 } else {
1697                     $self->{'scan_error'} = $error;
1698                 }
1699             }
1700
1701             $self->{'reservation'} = undef;
1702             $self->_maybe_callback();
1703         });
1704
1705         return;
1706     }
1707
1708     # if we are just starting up, call the finished_cb given to start()
1709     if (defined $self->{'start_finished_cb'} and $self->{'scan_finished'}) {
1710         my $cb = $self->{'start_finished_cb'};
1711         $self->{'start_finished_cb'} = undef;
1712
1713         $cb->($self->{'scan_error'});
1714     }
1715
1716     # if the volume_cb is good to get called, call it and reset to the ground state
1717     if ($self->{'volume_cb'} and (!$self->{'scan_running'} or $self->{'scan_finished'}) and $self->{'request_complete'}) {
1718         # get the cb and its arguments lined up before calling it..
1719         my $volume_cb = $self->{'volume_cb'};
1720         my @volume_cb_args = (
1721             $self->{'scan_error'},
1722             $self->{'config_denial_message'},
1723             $self->{'error_denial_message'},
1724             $self->{'reservation'},
1725             $self->{'volume_label'},
1726             $self->{'access_mode'},
1727             $self->{'is_new'},
1728             $self->{'new_scribe'},
1729         );
1730
1731         # reset everything and prepare for a new scan
1732         $self->{'scan_finished'} = 0;
1733
1734         $self->{'reservation'} = undef;
1735         $self->{'device'} = undef;
1736         $self->{'volume_label'} = undef;
1737
1738         $self->{'request_complete'} = 0;
1739         $self->{'request_denied'} = 0;
1740         $self->{'config_denial_message'} = undef;
1741         $self->{'error_denial_message'} = undef;
1742         $self->{'volume_cb'} = undef;
1743         $self->{'new_scribe'} = undef;
1744
1745         $volume_cb->(@volume_cb_args);
1746     }
1747 }
1748
1749 1;