*xfer_source_directtcp_connect = *Amanda::Xferc::xfer_source_directtcp_connect;
*xfer_filter_xor = *Amanda::Xferc::xfer_filter_xor;
*xfer_filter_process = *Amanda::Xferc::xfer_filter_process;
+*get_err_fd = *Amanda::Xferc::get_err_fd;
*xfer_dest_null = *Amanda::Xferc::xfer_dest_null;
*xfer_dest_buffer = *Amanda::Xferc::xfer_dest_buffer;
*xfer_dest_buffer_get = *Amanda::Xferc::xfer_dest_buffer_get;
if ($msg->{'type'} == $XMSG_DONE) {
Amanda::MainLoop::quit();
}
- });
+ }, 0, 0);
Amanda::MainLoop::run();
See L<http://wiki.zmanda.com/index.php/XFA> for background on the
=over
-=item start($cb)
+=item start($cb, $offset, $size)
-Start this transfer. Processing takes place asynchronously, and messages will
+Start this transfer. It transfer $size bytes starting from offset $offset.
+$offset must be 0. $size is only supported by Amanda::Xfer::Source::Recovery.
+A size of 0 transfer everything to EOF.
+Processing takes place asynchronously, and messages will
begin queueing up immediately. If C<$cb> is given, then it is installed as the
callback for messages from this transfer. The callback receives three
arguments: the event source, the message, and a reference to the controlling
This source reads data from a holding file (see L<Amanda::Holding>).
If the transfer only consists of a C<Amanda::Xfer::Source::Holding>
-and an C<Amanda::Xfer::Dest::Taper::Splitter> (with no filters), then the source
+and an C<Amanda::Xfer::Dest::Taper::Cacher> (with no filters), then the source
will call the destination's C<cache_inform> method so that it can use
holding chunks for a split-part cache.
Amanda::Xfer::Source::DirectTCPListen->new();
This source is for use when the transfer data will come in via DirectTCP, with
-the data's I<source> connecting to the data's I<destination>. Set up the
-transfer, and after starting it, call its C<get_addrs> method to get an
-arrayref of ip/port pairs, e.g., C<[ "192.168.4.5", 9924 ]>, all of which are
-listening for an incoming data connection. Once a connection arrives, this
-element will read data from it and send those data into the transfer.
+the data's I<source> connecting to the data's I<destination>. That is, the
+data source is the connection initiator. Set up the transfer, and after
+starting it, call this element's C<get_addrs> method to get an arrayref of ip/port pairs,
+e.g., C<[ "192.168.4.5", 9924 ]>, all of which are listening for an incoming
+data connection. Once a connection arrives, this element will read data from
+it and send those data into the transfer.
my $addrs = $src->get_addrs();
Amanda::Xfer::Source::DirectTCPConnect->new($addrs);
This source is for use when the transfer data will come in via DirectTCP, with
-the data's I<destination> connecting to the the data's I<source>. The element
-connects to C<$addrs> and reads the transfer data from the connection.
+the data's I<destination> connecting to the the data's I<source>. That is, the
+data destination is the connection initiator. The element connects to
+C<$addrs> and reads the transfer data from the connection.
=head2 Transfer Filters
=head3 Amanda::Xfer::Filter:Process
- Amanda::Xfer::Filter::Process->new([@args], $need_root);
+ $xfp = Amanda::Xfer::Filter::Process->new([@args], $need_root);
This filter will pipe data through the standard file descriptors of the
subprocess specified by C<@args>. If C<$need_root> is true, it will attempt to
-change to uid 0 before executing the process. Standard output from the process
-is redirected to the debug log. Note that the process is invoked directly, not
-via a shell, so shell metacharcters (e.g., C<< 2>&1 >>) will not function as
-expected.
+change to uid 0 before executing the process. Note that the process is
+invoked directly, not via a shell, so shell metacharcters (e.g., C<< 2>&1 >>)
+will not function as expected. This method create a pipe for the process
+stderr and the caller must read it or a hang may occur.
+
+ $xfp->get_stderr_fd()
+
+Return the file descriptor of the stderr pipe to read from.
=head3 Amanda::Xfer::Filter:Xor
=head3 Amanda::Xfer::Dest::Device (SERVER ONLY)
- Amanda::Xfer::Dest::Device->new($device, $max_memory);
+ Amanda::Xfer::Dest::Device->new($device, $cancel_at_eom);
+
+This source writes data to a device. The device should be ready for writing
+(C<< $device->start_file(..) >>). On completion of the transfer, the file will
+be finished. If an error occurs, or if C<$cancel_at_eom> is true and the
+device signals LEOM, the transfer will be cancelled.
-This source writes data to a device. The device should be ready for
-writing (C<< $device->start_file(..) >>). No more than C<$max_memory>
-will be used for buffers. Use zero for the default buffer size. On
-completion of the transfer, the file will be finished.
+Note that this element does not apply any sort of stream buffering.
=head3 Amanda::Xfer::Dest::Buffer
Amanda::Xfer::Dest::DirectTCPListen->new();
This destination is for use when the transfer data will come in via DirectTCP,
-with the data's I<destination> connecting to the data's I<source>. Set up the
-transfer, and after starting it, call this element's C<get_addrs> method to get
-an arrayref of ip/port pairs, e.g., C<[ "192.168.4.5", 9924 ]>, all of which
-are listening for an incoming data connection. Once a connection arrives, this
-element will write the transfer data to it.
+with the data's I<destination> connecting to the data's I<source>. That is,
+the data destination is the connection initiator. Set up the transfer, and
+after starting it, call this element's C<get_addrs> method to get an arrayref
+of ip/port pairs, e.g., C<[ "192.168.4.5", 9924 ]>, all of which are listening
+for an incoming data connection. Once a connection arrives, this element will
+write the transfer data to it.
my $addrs = $src->get_addrs();
Amanda::Xfer::Dest::DirectTCPConnect->new($addrs);
This destination is for use when the transfer data will come in via DirectTCP,
-with the data's I<source> connecting to the the data's I<destination>. The
-element connects to C<$addrs> and writes the transfer data to the connection.
+with the data's I<source> connecting to the the data's I<destination>. That
+is, the data source is the connection initiator. The element connects to
+C<$addrs> and writes the transfer data to the connection.
=head3 Amanda::Xfer::Dest::Fd
=head3 Amanda::Xfer::Dest::Taper (SERVER ONLY)
-This is the parent class to C<Amanda::Xfer::Dest::Taper::Splitter> and
+This is the parent class to C<Amanda::Xfer::Dest::Taper::Cacher> and
C<Amanda::Xfer::Dest::Taper::DirectTCP>. These subclasses allow a single
transfer to write to multiple files (parts) on a device, and even spread those
parts over multiple devices, without interrupting the transfer itself.
=head3 Amanda::Xfer::Dest::Taper::Splitter
Amanda::Xfer::Dest::Taper::Splitter->new($first_device, $max_memory,
+ $part_size, $expect_cache_inform);
+
+This class splits a data stream into parts on the storage media. It is for use
+when the device supports LEOM, when the dump is already available on disk
+(C<cache_inform>), or when no caching is desired. It does not cache parts, so
+it can only retry a partial part if the transfer source is calling
+C<cache_inform>. If the element is used with devices that do not support LEOM,
+then it will cancel the entire transfer if the device reaches EOM and
+C<cache_inform> is not in use. Set C<$expect_cache_inform> appropriately based
+on the incoming data.
+
+The C<$part_size> and C<$first_device> parameters are described above for
+C<Amanda::Xfer::Dest::Taper>.
+
+=head3 Amanda::Xfer::Dest::Taper::Cacher
+
+ Amanda::Xfer::Dest::Taper::Cacher->new($first_device, $max_memory,
$part_size, $use_mem_cache, $disk_cache_dirname);
-This class caches data from each part in one of a variety of ways, and supports
-"rewinding" to retry a failed part (e.g., one that does not fit on a device).
-It assumes that when a device reaches EOM while writing, the entire file is
-corrupt. It does not (yet) support logical EOM, which can render this
-assumption incorrect.
+This class is similar to the splitter, but caches data from each part in one of
+a variety of ways to support "rewinding" to retry a failed part (e.g., one that
+does not fit on a device). It assumes that when a device reaches EOM while
+writing, the entire on-volume file is corrupt - that is, that the device does
+not support logical EOM. The class does not support C<cache_inform>.
-The C<$first_device> is used to calculate some internal constants, notably the
-slab size, based on the device characteristics. Subsequent devices must have
-the same block size. The C<$part_size> and C<$first_device> parameters are
-described above.
+The C<$part_size> and C<$first_device> parameters are described above for
+C<Amanda::Xfer::Dest::Taper>.
If C<$use_mem_cache> is true, each part will be cached in memory (using
C<$part_size> bytes of memory; plan accordingly!). If C<$disk_cache_dirname>
is defined, then each part will be cached on-disk in a file in this directory.
It is an error to specify both in-memory and on-disk caching. If neither
option is specified, the element will operate successfully, but will not be
-able to retry a part unless C<cache_inform> has been used properly (see above).
+able to retry a part, and will cancel the transfer if a part fails.
=head3 Amanda::Xfer::Dest::Taper::DirectTCP
push @{$EXPORT_TAGS{"constants"}}, @{$EXPORT_TAGS{"xmsg_type"}};
sub xfer_start_with_callback {
- my ($xfer, $cb) = @_;
+ my ($xfer, $cb, $offset, $size) = @_;
if (defined $cb) {
my $releasing_cb = sub {
my ($src, $msg, $xfer) = @_;
};
$xfer->get_source()->set_callback($releasing_cb);
}
- xfer_start($xfer);
+ $offset = 0 if !defined $offset;
+ $size = 0 if !defined $size;
+ xfer_start($xfer, $offset, $size);
+}
+
+sub xfer_set_callback {
+ my ($xfer, $cb) = @_;
+ if (defined $cb) {
+ my $releasing_cb = sub {
+ my ($src, $msg, $xfer) = @_;
+ my $done = $msg->{'type'} == $XMSG_DONE;
+ $src->remove() if $done;
+ $cb->(@_);
+ $cb = undef if $done; # break potential reference loop
+ };
+ $xfer->get_source()->set_callback($releasing_cb);
+ } else {
+ $xfer->get_source()->set_callback(undef);
+ }
}
package Amanda::Xfer::Xfer;
*get_status = *Amanda::Xfer::xfer_get_status;
*get_source = *Amanda::Xfer::xfer_get_amglue_source;
*start = *Amanda::Xfer::xfer_start_with_callback;
+*set_callback = *Amanda::Xfer::xfer_set_callback;
*cancel = *Amanda::Xfer::xfer_cancel;
package Amanda::Xfer::Element;
Amanda::Xfer::xfer_filter_process(@_);
}
+*get_stderr_fd = *Amanda::Xfer::get_err_fd;
package Amanda::Xfer::Dest::Fd;
# try to load Amanda::XferServer, which is server-only. If it's not found, then
# its classes just remain undefined.
BEGIN {
- eval "use Amanda::XferServer;";
+ use Amanda::Util;
+ if (Amanda::Util::built_with_component("server")) {
+ eval "use Amanda::XferServer;";
+ }
}
1;