2 * Copyright (c) 2009, 2010 Zmanda, Inc. All Rights Reserved.
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 as published
6 * by the Free Software Foundation.
8 * This program is distributed in the hope that it will be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
18 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
25 Amanda::Xfer - the transfer architecture
  use Amanda::MainLoop;
  use Amanda::Xfer qw( :constants );
  use POSIX;

  my $infd = POSIX::open("input", POSIX::O_RDONLY, 0);
  my $outfd = POSIX::open("output", POSIX::O_CREAT|POSIX::O_WRONLY, 0640);
  my $xfer = Amanda::Xfer->new([
    Amanda::Xfer::Source::Fd->new($infd),
    Amanda::Xfer::Dest::Fd->new($outfd)
  ]);
  $xfer->start(sub {
    my ($src, $xmsg, $xfer) = @_;
    print "Message from $xfer: $xmsg\n"; # use stringify operations
    if ($xmsg->{'type'} == $XMSG_DONE) {
      Amanda::MainLoop::quit();
    }
  });
  Amanda::MainLoop::run();
48 See L<http://wiki.zmanda.com/index.php/XFA> for background on the
49 transfer architecture.
51 =head1 Amanda::Xfer Objects
A new transfer is created with C<< Amanda::Xfer->new() >>, which takes
an arrayref giving the transfer elements which should compose the
transfer.
57 The resulting object has the following methods:
63 Start this transfer. Processing takes place asynchronously, and messages will
64 begin queueing up immediately. If C<$cb> is given, then it is installed as the
65 callback for messages from this transfer. The callback receives three
66 arguments: the event source, the message, and a reference to the controlling
67 transfer. See the description of C<Amanda::Xfer::Msg>, below, for details.
69 There is no need to remove the source on completion of the transfer - that is
74 Stop transferring data. The transfer will send an C<XMSG_CANCEL>,
75 "drain" any buffered data as best it can, and then complete normally
80 Get the transfer's status. The result will be one of C<$XFER_INIT>,
81 C<$XFER_START>, C<$XFER_RUNNING>, or C<$XFER_DONE>. These symbols are
82 available for import with the tag C<:constants>.
86 Return a string representation of this transfer, suitable for use in
87 debugging messages. This method is automatically invoked when a
88 transfer is interpolated into a string:
90 print "Starting $xfer\n";
94 Get the L<Amanda::MainLoop> event source through which messages will
95 be delivered for this transfer. Use its C<set_callback> method to
96 connect a perl sub for processing events.
98 Use of this method is deprecated; instead, pass a callback to the C<start>
99 method. If you set a callback via C<get_source>, then you I<must> C<remove>
100 the source when the transfer is complete!
104 =head1 Amanda::Xfer::Element objects
106 The individual transfer elements that compose a transfer are instances
107 of subclasses of Amanda::Xfer::Element. All such objects have a
108 C<repr()> method, similar to that for transfers, and support a similar
109 kind of string interpolation.
111 Note that the names of these classes contain the words "Source",
112 "Filter", and "Dest". This is merely suggestive of their intended
113 purpose -- there are no such abstract classes.
115 =head2 Transfer Sources
117 =head3 Amanda::Xfer::Source::Device (SERVER ONLY)
119 Amanda::Xfer::Source::Device->new($device);
121 This source reads data from a device. The device should already be
122 queued up for reading (C<< $device->seek_file(..) >>). The element
123 will read until the end of the device file.
125 =head3 Amanda::Xfer::Source::Fd
127 Amanda::Xfer::Source::Fd->new(fileno($fh));
This source reads data from a file descriptor. It reads until EOF,
but does not close the descriptor. Be careful not to let Perl close
it for you!
133 =head3 Amanda::Xfer::Source::Holding (SERVER-ONLY)
135 Amanda::Xfer::Source::Holding->new($filename);
137 This source reads data from a holding file (see L<Amanda::Holding>).
138 If the transfer only consists of a C<Amanda::Xfer::Source::Holding>
139 and an C<Amanda::Xfer::Dest::Taper::Cacher> (with no filters), then the source
140 will call the destination's C<cache_inform> method so that it can use
141 holding chunks for a split-part cache.
143 =head3 Amanda::Xfer::Source::Random
145 Amanda::Xfer::Source::Random->new($length, $seed);
147 This source provides I<length> bytes of random data (or an unlimited
148 amount of data if I<length> is zero). C<$seed> is the seed used to
149 generate the random numbers; this seed can be used in a destination to
150 check for correct output.
152 If you need to string multiple transfers together into a coherent sequence of
153 random numbers, for example when testing the re-assembly of spanned dumps, call
155 my $seed = $src->get_seed();
157 to get the finishing seed for the source, then pass this to the source
158 constructor for the next transfer. When concatenated, the bytestreams from the
159 transfers will verify correctly using the original random seed.
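The seed-chaining idea can be illustrated without Amanda at all. The sketch below uses a toy linear congruential generator (explicitly not the generator that C<Amanda::Xfer::Source::Random> uses) and a hypothetical helper, C<toy_random_bytes>, that returns both the generated bytes and its finishing seed. Feeding that finishing seed into the next call plays the role of C<get_seed()>.

```perl
use strict;
use warnings;

# Toy generator for illustration only; NOT Amanda's random source.
# Returns the generated bytes and the finishing seed (generator state).
sub toy_random_bytes {
    my ($length, $seed) = @_;
    my $state = $seed;
    my $bytes = '';
    for (1 .. $length) {
        $state = ($state * 214013 + 2531011) % 2147483648;
        $bytes .= chr(($state >> 16) & 0xff);
    }
    return ($bytes, $state);
}

# One uninterrupted 64-byte stream from the original seed.
my ($whole) = toy_random_bytes(64, 42);

# The same stream produced by two "transfers": the second one is seeded
# with the finishing seed of the first.
my ($first, $finishing_seed) = toy_random_bytes(40, 42);
my ($second) = toy_random_bytes(24, $finishing_seed);

print((($first . $second) eq $whole) ? "streams match\n" : "streams differ\n");
```

Because the second segment resumes from the first segment's finishing state, the concatenation is indistinguishable from a single stream, which is why a verifier holding only the original seed can still check it.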
161 =head3 Amanda::Xfer::Source::Pattern
163 Amanda::Xfer::Source::Pattern->new($length, $pattern);
This source provides I<length> bytes containing copies of
I<pattern>. If I<length> is zero, the source provides an unlimited
amount of data.
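When checking the output of a pattern source in a test, it can help to compute the expected bytes independently. The helper below, C<expected_pattern_bytes>, is a hypothetical name, and it assumes that the final copy of the pattern is truncated when I<length> is not a multiple of the pattern length; that assumption should be confirmed against the element itself. It handles only a nonzero, finite I<length>.

```perl
use strict;
use warnings;

# Build the byte string a finite pattern source would be expected to emit,
# assuming the last copy of the pattern is cut off at $length bytes.
sub expected_pattern_bytes {
    my ($length, $pattern) = @_;
    die "pattern must not be empty\n" unless length $pattern;
    my $copies = int(($length + length($pattern) - 1) / length($pattern));
    return substr($pattern x $copies, 0, $length);
}

print expected_pattern_bytes(8, "abc"), "\n";   # prints "abcabcab"
```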
169 =head3 Amanda::Xfer::Source::Recovery (SERVER ONLY)
171 Amanda::Xfer::Source::Recovery->new($first_device);
173 This source reads a datastream composed of on-device files. Its constructor
174 takes a pointer to the first device that will be read from; this is used
175 internally to determine whether DirectTCP is supported.
The element sends C<$XMSG_READY> when it is ready for the first C<start_part>
178 invocation. Don't do anything with the device between the start of the
179 transfer and when the element sends an C<$XMSG_READY>.
181 The element contains no logic to decide I<which> files to assemble into the
182 datastream; instead, it relies on the caller to supply pre-positioned devices:
184 $src->start_part($device);
186 Once C<start_part> is called, the source will read until C<$device> produces an
187 EOF. As each part is completed, the element sends an C<$XMSG_PART_DONE>
188 L<Amanda::Xfer::Msg>, with the following keys:
190 size bytes read from the device
191 duration time spent reading
192 fileno the on-media file number from which the part was read
Call C<start_part> with C<$device = undef> to indicate that there are no more
parts.
197 To switch to a new device in mid-transfer, use C<use_device>:
  $src->use_device($device);
201 This method must be called with a device that is not yet started, and thus must
202 be called before the C<start_part> method is called with a new device.
204 =head3 Amanda::Xfer::Source::DirectTCPListen
206 Amanda::Xfer::Source::DirectTCPListen->new();
208 This source is for use when the transfer data will come in via DirectTCP, with
209 the data's I<source> connecting to the data's I<destination>. That is, the
210 data source is the connection initiator. Set up the transfer, and after
211 starting it, call this element's C<get_addrs> method to get an arrayref of ip/port pairs,
212 e.g., C<[ "192.168.4.5", 9924 ]>, all of which are listening for an incoming
213 data connection. Once a connection arrives, this element will read data from
214 it and send those data into the transfer.
216 my $addrs = $src->get_addrs();
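The addresses usually need to be passed on to the peer that will connect, or at least logged. The sketch below assumes that each entry in the arrayref is itself a two-element arrayref of IP and port; C<format_addrs> is a hypothetical helper, and the sample data stands in for the result of C<get_addrs>.

```perl
use strict;
use warnings;

# Render an arrayref of [ip, port] pairs as "ip:port, ip:port, ...".
sub format_addrs {
    my ($addrs) = @_;
    return join(", ", map { $_->[0] . ":" . $_->[1] } @$addrs);
}

# Sample data in the assumed shape of a get_addrs() result.
my $sample_addrs = [ [ "192.168.4.5", 9924 ], [ "127.0.0.1", 9925 ] ];
print "listening on ", format_addrs($sample_addrs), "\n";
```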
218 =head3 Amanda::Xfer::Source::DirectTCPConnect
220 Amanda::Xfer::Source::DirectTCPConnect->new($addrs);
222 This source is for use when the transfer data will come in via DirectTCP, with
the data's I<destination> connecting to the data's I<source>. That is, the
224 data destination is the connection initiator. The element connects to
225 C<$addrs> and reads the transfer data from the connection.
227 =head2 Transfer Filters
=head3 Amanda::Xfer::Filter::Process
231 Amanda::Xfer::Filter::Process->new([@args], $need_root, $log_stderr);
233 This filter will pipe data through the standard file descriptors of the
234 subprocess specified by C<@args>. If C<$need_root> is true, it will attempt to
235 change to uid 0 before executing the process. Standard output from the process
236 is redirected to the debug log. Note that the process is invoked directly, not
via a shell, so shell metacharacters (e.g., C<< 2>&1 >>) will not function as
238 expected. If C<$log_stderr> is set, then the filter's standard error is sent
239 to the debug log; otherwise, it is sent to the parent process's stderr.
=head3 Amanda::Xfer::Filter::Xor
243 Amanda::Xfer::Filter::Xor->new($key);
This filter applies a bytewise XOR operation to the data flowing
through it.
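A bytewise XOR is its own inverse, which is what makes this filter convenient for testing: passing data through two such filters with the same key should give back the original bytes. The pure-Perl sketch below illustrates the operation only. It assumes a single-byte key, and C<xor_bytes> is a hypothetical helper, not part of Amanda.

```perl
use strict;
use warnings;

# XOR every byte of $data with the single byte $key (0..255).
sub xor_bytes {
    my ($data, $key) = @_;
    return $data ^ (chr($key) x length($data));
}

my $plain     = "Amanda transfer data";
my $scrambled = xor_bytes($plain, 0x5a);
my $restored  = xor_bytes($scrambled, 0x5a);

print "round trip ", ($restored eq $plain ? "ok" : "FAILED"), "\n";
```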
248 =head2 Transfer Destinations
250 =head3 Amanda::Xfer::Dest::Device (SERVER ONLY)
252 Amanda::Xfer::Dest::Device->new($device, $cancel_at_eom);
This destination writes data to a device. The device should be ready for writing
255 (C<< $device->start_file(..) >>). On completion of the transfer, the file will
256 be finished. If an error occurs, or if C<$cancel_at_eom> is true and the
257 device signals LEOM, the transfer will be cancelled.
259 Note that this element does not apply any sort of stream buffering.
261 =head3 Amanda::Xfer::Dest::Buffer
263 Amanda::Xfer::Dest::Buffer->new($max_size);
265 This destination records data into an in-memory buffer which can grow up to
266 C<$max_size> bytes. The buffer is available with the C<get> method, which
267 returns a copy of the buffer as a perl scalar:
269 my $buf = $xdb->get();
271 =head3 Amanda::Xfer::Dest::DirectTCPListen
273 Amanda::Xfer::Dest::DirectTCPListen->new();
275 This destination is for use when the transfer data will come in via DirectTCP,
276 with the data's I<destination> connecting to the data's I<source>. That is,
277 the data destination is the connection initiator. Set up the transfer, and
278 after starting it, call this element's C<get_addrs> method to get an arrayref
279 of ip/port pairs, e.g., C<[ "192.168.4.5", 9924 ]>, all of which are listening
280 for an incoming data connection. Once a connection arrives, this element will
281 write the transfer data to it.
  my $addrs = $dest->get_addrs();
285 =head3 Amanda::Xfer::Dest::DirectTCPConnect
287 Amanda::Xfer::Dest::DirectTCPConnect->new($addrs);
289 This destination is for use when the transfer data will come in via DirectTCP,
with the data's I<source> connecting to the data's I<destination>. That
291 is, the data source is the connection initiator. The element connects to
292 C<$addrs> and writes the transfer data to the connection.
294 =head3 Amanda::Xfer::Dest::Fd
296 Amanda::Xfer::Dest::Fd->new(fileno($fh));
298 This destination writes data to a file descriptor. The file is not
299 closed after the transfer is completed. Be careful not to let Perl
300 close the file for you!
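The warning about Perl closing the file matters because C<fileno($fh)> is just a number. Once the last reference to C<$fh> goes away, Perl closes the underlying descriptor, and an element holding that number would be left with a dead descriptor. The sketch below shows the effect using only core modules; C<fd_is_open> is a hypothetical helper that probes a descriptor with C<POSIX::dup>.

```perl
use strict;
use warnings;
use POSIX ();
use File::Temp qw(tempfile);

# True if $fd refers to an open descriptor (dup() fails on a closed one).
sub fd_is_open {
    my ($fd) = @_;
    my $dup = POSIX::dup($fd);
    return 0 unless defined $dup;
    POSIX::close($dup);
    return 1;
}

my ($tmp_fh, $path) = tempfile(UNLINK => 1);

# Wrong: the handle goes out of scope, so Perl closes the descriptor.
my $scoped_fd;
{
    open(my $fh, '>>', $path) or die "open: $!";
    $scoped_fd = fileno($fh);
}
my $open_after_scope = fd_is_open($scoped_fd);

# Right: keep the handle alive for as long as the descriptor is in use.
open(my $kept_fh, '>>', $path) or die "open: $!";
my $open_while_held = fd_is_open(fileno($kept_fh));

print "after scope exit:  ", ($open_after_scope ? "open" : "closed"), "\n";
print "while handle held: ", ($open_while_held  ? "open" : "closed"), "\n";
```

In practice this means keeping the filehandle in a variable that outlives the transfer, or opening the descriptor with C<POSIX::open> so that no Perl filehandle owns it.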
302 =head3 Amanda::Xfer::Dest::Null
304 Amanda::Xfer::Dest::Null->new($seed);
306 This destination discards the data it receives. If C<$seed> is
307 nonzero, then the element will validate that it receives the data that
308 C<Amanda::Xfer::Source::Random> produced with the same seed. No
309 validation is performed if C<$seed> is zero.
311 =head3 Amanda::Xfer::Dest::Taper (SERVER ONLY)
313 This is the parent class to C<Amanda::Xfer::Dest::Taper::Cacher> and
314 C<Amanda::Xfer::Dest::Taper::DirectTCP>. These subclasses allow a single
315 transfer to write to multiple files (parts) on a device, and even spread those
316 parts over multiple devices, without interrupting the transfer itself.
318 The subclass constructors all take a C<$first_device>, which should be
319 configured but not yet started; and a C<$part_size> giving the maximum size of
320 each part. Note that this value may be rounded up internally as necessary.
322 When a transfer using a taper destination element is first started, no data is
transferred until the element's C<start_part> method is called:
325 $dest->start_part($retry_part);
The part is written to the element's current device, that is, C<$first_device>
or the device most recently supplied via C<use_device>. The device
should have a file open and ready to write (that is,
C<< $device->start_file(..) >> has already been called). If C<$retry_part> is
true, then the previous, unsuccessful part will be retried.
332 As each part is completed, the element sends an C<$XMSG_PART_DONE>
333 C<Amanda::Xfer::Msg>, with the following keys:
335 successful true if the part was written successfully
336 eof recipient should not call start_part again
337 eom this volume is at EOM; a new volume is required
338 size bytes written to volume
339 duration time spent writing, not counting changer ops, etc.
340 partnum the zero-based number of this part in the overall dumpfile
341 fileno the on-media file number used for this part, or 0 if no file
344 If C<eom> is true, then the caller should find a new volume before
345 continuing. If C<eof> is not true, then C<start_part> should be called
346 again, with C<$retry_part = !successful>. Note that it is possible
347 for some destinations to write a portion of a part successfully,
but still stop at EOM. That is, C<eom> does not necessarily imply
C<!successful>.
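The rules for reacting to an C<$XMSG_PART_DONE> message can be condensed into a small decision function. The sketch below works on a plain hashref with the C<successful>, C<eof>, and C<eom> keys listed above; C<next_step_after_part> and the keys of its result are hypothetical names chosen for this illustration, not part of the API.

```perl
use strict;
use warnings;

# Decide what the caller should do after a part completes.
# Returns a hashref:
#   need_new_volume - find a new volume before continuing
#   done            - do not call start_part again
#   retry_part      - value to pass to the next start_part call
sub next_step_after_part {
    my ($msg) = @_;
    return {
        need_new_volume => $msg->{'eom'} ? 1 : 0,
        done            => $msg->{'eof'} ? 1 : 0,
        retry_part      => (!$msg->{'eof'} && !$msg->{'successful'}) ? 1 : 0,
    };
}

# A part that hit EOM and failed: new volume needed, then retry the part.
my $step = next_step_after_part({ successful => 0, eof => 0, eom => 1 });
printf "new volume: %d, done: %d, retry: %d\n",
    $step->{need_new_volume}, $step->{done}, $step->{retry_part};
```

Note that C<need_new_volume> and C<retry_part> are computed independently, reflecting the point that C<eom> and C<successful> can both be true for the same part.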
351 To switch to a new device in mid-transfer, use C<use_device>:
353 $dest->use_device($device);
355 This method must be called with a device that is not yet started.
357 If neither the memory nor disk caches are in use, but the dumpfile is
358 available on disk, then the C<cache_inform> method allows the element
359 to use that on-disk data to support retries. This is intended to
360 support transfers from Amanda's holding disk (see
361 C<Amanda::Xfer::Source::Holding>), but may be useful for other
364 $dest->cache_inform($filename, $offset, $length);
366 This function indicates that C<$filename> contains C<$length> bytes of
367 data, beginning at offset C<$offset> from the beginning of the file.
368 These bytes are assumed to follow immediately after any bytes
369 previously specified to C<cache_inform>. That is, no gaps or overlaps
370 are allowed in the data stream described to C<cache_inform>.
371 Furthermore, the location of each byte must be specified to this
372 method I<before> it is sent through the transfer.
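Since each C<cache_inform> call describes the bytes that follow immediately after the previous call's bytes, the sequence of calls defines a mapping from positions in the data stream to positions in files. The sketch below models that mapping in plain Perl, as a way of reasoning about retries. The segment list, the file names, and C<locate_stream_offset> are all hypothetical.

```perl
use strict;
use warnings;

# Each segment is [ $filename, $offset, $length ], in the order it would be
# passed to cache_inform. Returns ($filename, $file_offset) for the byte at
# $stream_offset, or an empty list if that byte has not been described yet.
sub locate_stream_offset {
    my ($segments, $stream_offset) = @_;
    my $segment_start = 0;
    for my $segment (@$segments) {
        my ($filename, $offset, $length) = @$segment;
        if ($stream_offset < $segment_start + $length) {
            return ($filename, $offset + $stream_offset - $segment_start);
        }
        $segment_start += $length;   # segments are contiguous in the stream
    }
    return;
}

my @segments = ( [ "chunk.0", 32768, 1000 ], [ "chunk.1", 32768, 500 ] );
my ($file, $pos) = locate_stream_offset(\@segments, 1200);
print "stream byte 1200 is in $file at file offset $pos\n";
```

A lookup that returns nothing corresponds to a byte whose location was never announced, which is the situation the "before it is sent" rule is meant to prevent.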
374 $dest->get_part_bytes_written();
376 This function returns the number of bytes written for the current part
379 =head3 Amanda::Xfer::Dest::Taper::Splitter
381 Amanda::Xfer::Dest::Taper::Splitter->new($first_device, $max_memory,
382 $part_size, $expect_cache_inform);
384 This class splits a data stream into parts on the storage media. It is for use
385 when the device supports LEOM, when the dump is already available on disk
386 (C<cache_inform>), or when no caching is desired. It does not cache parts, so
387 it can only retry a partial part if the transfer source is calling
388 C<cache_inform>. If the element is used with devices that do not support LEOM,
389 then it will cancel the entire transfer if the device reaches EOM and
390 C<cache_inform> is not in use. Set C<$expect_cache_inform> appropriately based
391 on the incoming data.
393 The C<$part_size> and C<$first_device> parameters are described above for
394 C<Amanda::Xfer::Dest::Taper>.
396 =head3 Amanda::Xfer::Dest::Taper::Cacher
398 Amanda::Xfer::Dest::Taper::Cacher->new($first_device, $max_memory,
399 $part_size, $use_mem_cache, $disk_cache_dirname);
401 This class is similar to the splitter, but caches data from each part in one of
402 a variety of ways to support "rewinding" to retry a failed part (e.g., one that
403 does not fit on a device). It assumes that when a device reaches EOM while
404 writing, the entire on-volume file is corrupt - that is, that the device does
405 not support logical EOM. The class does not support C<cache_inform>.
407 The C<$part_size> and C<$first_device> parameters are described above for
408 C<Amanda::Xfer::Dest::Taper>.
410 If C<$use_mem_cache> is true, each part will be cached in memory (using
411 C<$part_size> bytes of memory; plan accordingly!). If C<$disk_cache_dirname>
412 is defined, then each part will be cached on-disk in a file in this directory.
413 It is an error to specify both in-memory and on-disk caching. If neither
414 option is specified, the element will operate successfully, but will not be
415 able to retry a part, and will cancel the transfer if a part fails.
417 =head3 Amanda::Xfer::Dest::Taper::DirectTCP
419 Amanda::Xfer::Dest::Taper::DirectTCP->new($first_device, $part_size);
421 This class uses the Device API DirectTCP methods to write data to a device via
422 DirectTCP. Since all DirectTCP devices support logical EOM, this class does
423 not cache any data, and will never re-start an unsuccessful part.
As stated above, C<$first_device> must not be started when C<new> is called.
Furthermore, no use of that device is allowed until the element sends an
C<$XMSG_READY> to indicate that it is finished with the device. The
C<start_part> method must not be called until this message is received either.
430 =head1 Amanda::Xfer::Msg objects
432 Messages are simple hashrefs, with a few convenience methods. Like
433 transfers, they have a C<repr()> method that formats the message
434 nicely, and is available through string interpolation:
436 print "Received message $msg\n";
438 The canonical description of the message types and keys is in
439 C<xfer-src/xmsg.h>, and is not duplicated here. Every message has the
440 following basic keys.
446 The message type -- one of the C<xmsg_type> constants available from
447 the import tag C<:constants>.
451 The transfer element that sent the message.
455 The version of the message. This is used to support extensibility of
460 Additional keys are described in the documentation for the elements
461 that use them. All keys are listed in C<xfer-src/xmsg.h>.