2 * Copyright (c) 2009, 2010 Zmanda, Inc. All Rights Reserved.
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 as published
6 * by the Free Software Foundation.
8 * This program is distributed in the hope that it will be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
18 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
25 Amanda::Xfer - the transfer architecture
30 use Amanda::Xfer qw( :constants );
33 my $infd = POSIX::open("input", POSIX::O_RDONLY, 0);
34 my $outfd = POSIX::open("output", POSIX::O_CREAT|POSIX::O_WRONLY, 0640);
35 my $xfer = Amanda::Xfer->new([
36 Amanda::Xfer::Source::Fd->new($infd),
37 Amanda::Xfer::Dest::Fd->new($outfd)
40 my ($src, $xmsg, $xfer) = @_;
41 print "Message from $xfer: $xmsg\n"; # use stringify operations
42 if ($msg->{'type'} == $XMSG_DONE) {
43 Amanda::MainLoop::quit();
46 Amanda::MainLoop::run();
48 See L<http://wiki.zmanda.com/index.php/XFA> for background on the
49 transfer architecture.
51 =head1 Amanda::Xfer Objects
53 A new transfer is created with C<< Amanda::Xfer->new() >>, which takes
54 an arrayref giving the transfer elements which should compose the
57 The resulting object has the following methods:
61 =item start($cb, $offset, $size)
63 Start this transfer. It transfer $size bytes starting from offset $offset.
64 $offset must be 0. $size is only supported by Amanda::Xfer::Source::Recovery.
65 A size of 0 transfer everything to EOF.
66 Processing takes place asynchronously, and messages will
67 begin queueing up immediately. If C<$cb> is given, then it is installed as the
68 callback for messages from this transfer. The callback receives three
69 arguments: the event source, the message, and a reference to the controlling
70 transfer. See the description of C<Amanda::Xfer::Msg>, below, for details.
72 There is no need to remove the source on completion of the transfer - that is
77 Stop transferring data. The transfer will send an C<XMSG_CANCEL>,
78 "drain" any buffered data as best it can, and then complete normally
83 Get the transfer's status. The result will be one of C<$XFER_INIT>,
84 C<$XFER_START>, C<$XFER_RUNNING>, or C<$XFER_DONE>. These symbols are
85 available for import with the tag C<:constants>.
89 Return a string representation of this transfer, suitable for use in
90 debugging messages. This method is automatically invoked when a
91 transfer is interpolated into a string:
93 print "Starting $xfer\n";
97 Get the L<Amanda::MainLoop> event source through which messages will
98 be delivered for this transfer. Use its C<set_callback> method to
99 connect a perl sub for processing events.
101 Use of this method is deprecated; instead, pass a callback to the C<start>
102 method. If you set a callback via C<get_source>, then you I<must> C<remove>
103 the source when the transfer is complete!
107 =head1 Amanda::Xfer::Element objects
109 The individual transfer elements that compose a transfer are instances
110 of subclasses of Amanda::Xfer::Element. All such objects have a
111 C<repr()> method, similar to that for transfers, and support a similar
112 kind of string interpolation.
114 Note that the names of these classes contain the words "Source",
115 "Filter", and "Dest". This is merely suggestive of their intended
116 purpose -- there are no such abstract classes.
118 =head2 Transfer Sources
120 =head3 Amanda::Xfer::Source::Device (SERVER ONLY)
122 Amanda::Xfer::Source::Device->new($device);
124 This source reads data from a device. The device should already be
125 queued up for reading (C<< $device->seek_file(..) >>). The element
126 will read until the end of the device file.
128 =head3 Amanda::Xfer::Source::Fd
130 Amanda::Xfer::Source::Fd->new(fileno($fh));
132 This source reads data from a file descriptor. It reads until EOF,
133 but does not close the descriptor. Be careful not to let Perl close
136 =head3 Amanda::Xfer::Source::Holding (SERVER-ONLY)
138 Amanda::Xfer::Source::Holding->new($filename);
140 This source reads data from a holding file (see L<Amanda::Holding>).
141 If the transfer only consists of a C<Amanda::Xfer::Source::Holding>
142 and an C<Amanda::Xfer::Dest::Taper::Cacher> (with no filters), then the source
143 will call the destination's C<cache_inform> method so that it can use
144 holding chunks for a split-part cache.
146 =head3 Amanda::Xfer::Source::Random
148 Amanda::Xfer::Source::Random->new($length, $seed);
150 This source provides I<length> bytes of random data (or an unlimited
151 amount of data if I<length> is zero). C<$seed> is the seed used to
152 generate the random numbers; this seed can be used in a destination to
153 check for correct output.
155 If you need to string multiple transfers together into a coherent sequence of
156 random numbers, for example when testing the re-assembly of spanned dumps, call
158 my $seed = $src->get_seed();
160 to get the finishing seed for the source, then pass this to the source
161 constructor for the next transfer. When concatenated, the bytestreams from the
162 transfers will verify correctly using the original random seed.
164 =head3 Amanda::Xfer::Source::Pattern
166 Amanda::Xfer::Source::Pattern->new($length, $pattern);
168 This source provides I<length> bytes containing copies of
169 I<pattern>. If I<length> is zero, the source provides an unlimited
172 =head3 Amanda::Xfer::Source::Recovery (SERVER ONLY)
174 Amanda::Xfer::Source::Recovery->new($first_device);
176 This source reads a datastream composed of on-device files. Its constructor
177 takes a pointer to the first device that will be read from; this is used
178 internally to determine whether DirectTCP is supported.
180 The element sense C<$XMSG_READY> when it is ready for the first C<start_part>
181 invocation. Don't do anything with the device between the start of the
182 transfer and when the element sends an C<$XMSG_READY>.
184 The element contains no logic to decide I<which> files to assemble into the
185 datastream; instead, it relies on the caller to supply pre-positioned devices:
187 $src->start_part($device);
189 Once C<start_part> is called, the source will read until C<$device> produces an
190 EOF. As each part is completed, the element sends an C<$XMSG_PART_DONE>
191 L<Amanda::Xfer::Msg>, with the following keys:
193 size bytes read from the device
194 duration time spent reading
195 fileno the on-media file number from which the part was read
197 Call C<start_part> with C<$device = undef> to indicate that there are no more
200 To switch to a new device in mid-transfer, use C<use_device>:
202 $dest->use_device($device);
204 This method must be called with a device that is not yet started, and thus must
205 be called before the C<start_part> method is called with a new device.
207 =head3 Amanda::Xfer::Source::DirectTCPListen
209 Amanda::Xfer::Source::DirectTCPListen->new();
211 This source is for use when the transfer data will come in via DirectTCP, with
212 the data's I<source> connecting to the data's I<destination>. That is, the
213 data source is the connection initiator. Set up the transfer, and after
214 starting it, call this element's C<get_addrs> method to get an arrayref of ip/port pairs,
215 e.g., C<[ "192.168.4.5", 9924 ]>, all of which are listening for an incoming
216 data connection. Once a connection arrives, this element will read data from
217 it and send those data into the transfer.
219 my $addrs = $src->get_addrs();
221 =head3 Amanda::Xfer::Source::DirectTCPConnect
223 Amanda::Xfer::Source::DirectTCPConnect->new($addrs);
225 This source is for use when the transfer data will come in via DirectTCP, with
226 the data's I<destination> connecting to the the data's I<source>. That is, the
227 data destination is the connection initiator. The element connects to
228 C<$addrs> and reads the transfer data from the connection.
230 =head2 Transfer Filters
232 =head3 Amanda::Xfer::Filter:Process
234 $xfp = Amanda::Xfer::Filter::Process->new([@args], $need_root);
236 This filter will pipe data through the standard file descriptors of the
237 subprocess specified by C<@args>. If C<$need_root> is true, it will attempt to
238 change to uid 0 before executing the process. Note that the process is
239 invoked directly, not via a shell, so shell metacharcters (e.g., C<< 2>&1 >>)
240 will not function as expected. This method create a pipe for the process
241 stderr and the caller must read it or a hang may occur.
243 $xfp->get_stderr_fd()
245 Return the file descriptor of the stderr pipe to read from.
247 =head3 Amanda::Xfer::Filter:Xor
249 Amanda::Xfer::Filter::Xor->new($key);
251 This filter applies a bytewise XOR operation to the data flowing
254 =head2 Transfer Destinations
256 =head3 Amanda::Xfer::Dest::Device (SERVER ONLY)
258 Amanda::Xfer::Dest::Device->new($device, $cancel_at_eom);
260 This source writes data to a device. The device should be ready for writing
261 (C<< $device->start_file(..) >>). On completion of the transfer, the file will
262 be finished. If an error occurs, or if C<$cancel_at_eom> is true and the
263 device signals LEOM, the transfer will be cancelled.
265 Note that this element does not apply any sort of stream buffering.
267 =head3 Amanda::Xfer::Dest::Buffer
269 Amanda::Xfer::Dest::Buffer->new($max_size);
271 This destination records data into an in-memory buffer which can grow up to
272 C<$max_size> bytes. The buffer is available with the C<get> method, which
273 returns a copy of the buffer as a perl scalar:
275 my $buf = $xdb->get();
277 =head3 Amanda::Xfer::Dest::DirectTCPListen
279 Amanda::Xfer::Dest::DirectTCPListen->new();
281 This destination is for use when the transfer data will come in via DirectTCP,
282 with the data's I<destination> connecting to the data's I<source>. That is,
283 the data destination is the connection initiator. Set up the transfer, and
284 after starting it, call this element's C<get_addrs> method to get an arrayref
285 of ip/port pairs, e.g., C<[ "192.168.4.5", 9924 ]>, all of which are listening
286 for an incoming data connection. Once a connection arrives, this element will
287 write the transfer data to it.
289 my $addrs = $src->get_addrs();
291 =head3 Amanda::Xfer::Dest::DirectTCPConnect
293 Amanda::Xfer::Dest::DirectTCPConnect->new($addrs);
295 This destination is for use when the transfer data will come in via DirectTCP,
296 with the data's I<source> connecting to the the data's I<destination>. That
297 is, the data source is the connection initiator. The element connects to
298 C<$addrs> and writes the transfer data to the connection.
300 =head3 Amanda::Xfer::Dest::Fd
302 Amanda::Xfer::Dest::Fd->new(fileno($fh));
304 This destination writes data to a file descriptor. The file is not
305 closed after the transfer is completed. Be careful not to let Perl
306 close the file for you!
308 =head3 Amanda::Xfer::Dest::Null
310 Amanda::Xfer::Dest::Null->new($seed);
312 This destination discards the data it receives. If C<$seed> is
313 nonzero, then the element will validate that it receives the data that
314 C<Amanda::Xfer::Source::Random> produced with the same seed. No
315 validation is performed if C<$seed> is zero.
317 =head3 Amanda::Xfer::Dest::Taper (SERVER ONLY)
319 This is the parent class to C<Amanda::Xfer::Dest::Taper::Cacher> and
320 C<Amanda::Xfer::Dest::Taper::DirectTCP>. These subclasses allow a single
321 transfer to write to multiple files (parts) on a device, and even spread those
322 parts over multiple devices, without interrupting the transfer itself.
324 The subclass constructors all take a C<$first_device>, which should be
325 configured but not yet started; and a C<$part_size> giving the maximum size of
326 each part. Note that this value may be rounded up internally as necessary.
328 When a transfer using a taper destination element is first started, no data is
329 transfered until the element's C<start_part> method is called:
331 $dest->start_part($retry_part);
333 where C<$device> is the device to which the part should be written. The device
334 should have a file open and ready to write (that is,
335 C<< $device->start_file(..) >> has already been called). If C<$retry_part> is
336 true, then the previous, unsuccessful part will be retried.
338 As each part is completed, the element sends an C<$XMSG_PART_DONE>
339 C<Amanda::Xfer::Msg>, with the following keys:
341 successful true if the part was written successfully
342 eof recipient should not call start_part again
343 eom this volume is at EOM; a new volume is required
344 size bytes written to volume
345 duration time spent writing, not counting changer ops, etc.
346 partnum the zero-based number of this part in the overall dumpfile
347 fileno the on-media file number used for this part, or 0 if no file
350 If C<eom> is true, then the caller should find a new volume before
351 continuing. If C<eof> is not true, then C<start_part> should be called
352 again, with C<$retry_part = !successful>. Note that it is possible
353 for some destinations to write a portion of a part successfully,
354 but still stop at EOM. That is, C<eom> does not necessarily imply
357 To switch to a new device in mid-transfer, use C<use_device>:
359 $dest->use_device($device);
361 This method must be called with a device that is not yet started.
363 If neither the memory nor disk caches are in use, but the dumpfile is
364 available on disk, then the C<cache_inform> method allows the element
365 to use that on-disk data to support retries. This is intended to
366 support transfers from Amanda's holding disk (see
367 C<Amanda::Xfer::Source::Holding>), but may be useful for other
370 $dest->cache_inform($filename, $offset, $length);
372 This function indicates that C<$filename> contains C<$length> bytes of
373 data, beginning at offset C<$offset> from the beginning of the file.
374 These bytes are assumed to follow immediately after any bytes
375 previously specified to C<cache_inform>. That is, no gaps or overlaps
376 are allowed in the data stream described to C<cache_inform>.
377 Furthermore, the location of each byte must be specified to this
378 method I<before> it is sent through the transfer.
380 $dest->get_part_bytes_written();
382 This function returns the number of bytes written for the current part
385 =head3 Amanda::Xfer::Dest::Taper::Splitter
387 Amanda::Xfer::Dest::Taper::Splitter->new($first_device, $max_memory,
388 $part_size, $expect_cache_inform);
390 This class splits a data stream into parts on the storage media. It is for use
391 when the device supports LEOM, when the dump is already available on disk
392 (C<cache_inform>), or when no caching is desired. It does not cache parts, so
393 it can only retry a partial part if the transfer source is calling
394 C<cache_inform>. If the element is used with devices that do not support LEOM,
395 then it will cancel the entire transfer if the device reaches EOM and
396 C<cache_inform> is not in use. Set C<$expect_cache_inform> appropriately based
397 on the incoming data.
399 The C<$part_size> and C<$first_device> parameters are described above for
400 C<Amanda::Xfer::Dest::Taper>.
402 =head3 Amanda::Xfer::Dest::Taper::Cacher
404 Amanda::Xfer::Dest::Taper::Cacher->new($first_device, $max_memory,
405 $part_size, $use_mem_cache, $disk_cache_dirname);
407 This class is similar to the splitter, but caches data from each part in one of
408 a variety of ways to support "rewinding" to retry a failed part (e.g., one that
409 does not fit on a device). It assumes that when a device reaches EOM while
410 writing, the entire on-volume file is corrupt - that is, that the device does
411 not support logical EOM. The class does not support C<cache_inform>.
413 The C<$part_size> and C<$first_device> parameters are described above for
414 C<Amanda::Xfer::Dest::Taper>.
416 If C<$use_mem_cache> is true, each part will be cached in memory (using
417 C<$part_size> bytes of memory; plan accordingly!). If C<$disk_cache_dirname>
418 is defined, then each part will be cached on-disk in a file in this directory.
419 It is an error to specify both in-memory and on-disk caching. If neither
420 option is specified, the element will operate successfully, but will not be
421 able to retry a part, and will cancel the transfer if a part fails.
423 =head3 Amanda::Xfer::Dest::Taper::DirectTCP
425 Amanda::Xfer::Dest::Taper::DirectTCP->new($first_device, $part_size);
427 This class uses the Device API DirectTCP methods to write data to a device via
428 DirectTCP. Since all DirectTCP devices support logical EOM, this class does
429 not cache any data, and will never re-start an unsuccessful part.
431 As state above, C<$first_device> must not be started when C<new> is called.
432 Furthermore, no use of that device is allowed until the element sens an
433 C<$XMSG_READY> to indicate that it is finished with the device. The
434 C<start_part> method must not be called until this method is received either.
436 =head1 Amanda::Xfer::Msg objects
438 Messages are simple hashrefs, with a few convenience methods. Like
439 transfers, they have a C<repr()> method that formats the message
440 nicely, and is available through string interpolation:
442 print "Received message $msg\n";
444 The canonical description of the message types and keys is in
445 C<xfer-src/xmsg.h>, and is not duplicated here. Every message has the
446 following basic keys.
452 The message type -- one of the C<xmsg_type> constants available from
453 the import tag C<:constants>.
457 The transfer element that sent the message.
461 The version of the message. This is used to support extensibility of
466 Additional keys are described in the documentation for the elements
467 that use them. All keys are listed in C<xfer-src/xmsg.h>.