/*
* Amanda, The Advanced Maryland Automatic Network Disk Archiver
- * Copyright (c) 2008 Zmanda Inc.
+ * Copyright (c) 2008, 2009, 2010 Zmanda, Inc. All Rights Reserved.
*
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 as published
+ * by the Free Software Foundation.
*
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * for more details.
*
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
+ * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
*/
-/* Base classes and interfaces for transfer elements. There are two interfaces
- * defined here: IXferProducer and IXferConsumer. The former is for elements
- * which produce data, and the latter is for those which consume it. There is
- * a top-level XferElement base class, which all implementations subclass.
- *
- * Unless you're well-acquainted with GType and GObject, this file will be a
- * difficult read. It is really only of use to those implementing new subclasses.
+/* Base classes for transfer elements.
*/
#ifndef XFER_ELEMENT_H
#include <glib.h>
#include <glib-object.h>
#include "xfer.h"
-#include "device.h"
-#include "queueing.h"
+#include "amanda.h"
+#include "directtcp.h"
typedef enum {
/* sources have no input mechanisms and destinations have no output
* mechansisms. */
XFER_MECH_NONE,
- /* downstream element will read() from elt->upstream->output_fd; EOF
+ /* downstream element will read() from elt->upstream's output_fd; EOF
* is indicated by the usual OS mechanism resulting in a zero-length
* read, in response to which the downstream element must close
* the fd. */
XFER_MECH_READFD,
- /* upstream element will write() to elt->downstream->input_fd. EOF
+ /* upstream element will write() to elt->downstream's input_fd. EOF
* is indicated by closing the file descriptor. */
XFER_MECH_WRITEFD,
/* upstream element will call elt->downstream->push_buffer(buf) to push
* a buffer. EOF is indicated by passing a NULL buffer. */
XFER_MECH_PUSH_BUFFER,
+
+ /* DirectTCP: downstream sends an array of IP:PORT addresses to which a TCP
+ * connection should be made, then upstream connects to one of the addreses
+ * and sends the data over that connection */
+ XFER_MECH_DIRECTTCP_LISTEN,
+
+ /* DirectTCP: downstream gets IP:PORT addresses from upstream to which a
+ * TCP connection should be made, then connects to one of the addreses and
+ * receives the data over that connection */
+ XFER_MECH_DIRECTTCP_CONNECT,
+
+ /* (sentinel value) */
+ XFER_MECH_MAX,
} xfer_mech;
-/* Description of a pair (input, output) of xfer mechanisms that an
+/*
+ * Description of a pair (input, output) of xfer mechanisms that an
* element can support, along with the associated costs. An array of these
* pairs is stored in the class-level variable 'mech_pairs', describing
* all of the mechanisms that an element supports.
+ *
+ * Use the XFER_NROPS() and XFER_NTHREADS() macros below in declarations in
+ * order to make declarations more understandable.
*/
+
+#define XFER_NROPS(x) (x)
+#define XFER_NTHREADS(x) (x)
+
typedef struct {
xfer_mech input_mech;
xfer_mech output_mech;
gboolean expect_eof;
gboolean can_generate_eof;
- /* file descriptors for XFER_MECH_READFD and XFER_MECH_WRITEFD. These should be set
- * during setup(), and can be accessed by neighboring elements during start(). It is
- * up to subclasses to handle closing these file descriptors, if required. */
- gint input_fd;
- gint output_fd;
+ /* file descriptors for XFER_MECH_READFD and XFER_MECH_WRITEFD. These
+ * should be set during setup(), and can be accessed by neighboring
+ * elements during start(). These values are shared among multiple
+ * elements, and thus must be accessed with xfer_element_swap_input_fd and
+ * xfer_element_swap_output_fd. Any file descriptors remaining here at
+ * finalize time will be closed. */
+ gint _input_fd;
+ gint _output_fd;
+
+ /* array of IP:PORT pairs that can be used to connect to this element,
+ * terminated by a 0.0.0.0:0. The first is set by elements with an input
+ * mech of XFER_MECH_DIRECTTCP_LISTEN and accessed by their upstream
+ * neighbor; the second is set by elements with an output mech of
+ * XFER_MECH_DIRECTTCP_CONNECT and accessed by their downstream neighbor.
+ * */
+
+ DirectTCPAddr *input_listen_addrs;
+ DirectTCPAddr *output_listen_addrs;
/* cache for repr() */
char *repr;
+
+ /* maximum size to transfer */
+ gint64 size;
} XferElement;
/*
*/
char *(*repr)(XferElement *elt);
- /* Set up this element. This function is called for all elements in a transfer
- * before start() is called for any elements. For mechanisms where this element
- * supplies a file descriptor, it should set its input_fd and/or output_fd
- * appropriately; neighboring elements will use that value in start().
+ /* Set up this element. This function is called for all elements in a
+ * transfer before start() is called for any elements. For mechanisms
+ * where this element supplies a file descriptor, it should set its
+ * input_fd and/or output_fd appropriately; neighboring elements will use
+ * that value in start(). Elements which supply IP:PORT pairs should set
+ * their input_addrs, for neighboring elements to use in start().
*
* elt->input_mech and elt->output_mech are already set when this function
* is called, but upstream and downstream are not.
*
+ * If the setup operation fails, the method should send an XMSG_ERROR and
+ * call XMSG_CANCEL, and return False. In this situation, the start method
+ * will not be called. The Xfer will appear to the user to start and
+ * immediately fail.
+ *
+ * Note that this method may never be called if other elements' setup methods
+ * fail first.
+ *
+ * @param elt: the XferElement
+ * @return: false on failure, true on success
+ */
+ gboolean (*setup)(XferElement *elt);
+
+ /* set the size of data to transfer, to skip NUL padding bytes
* @param elt: the XferElement
+ * @param size: the size of data to transfer
+ * @return: TRUE
*/
- void (*setup)(XferElement *elt);
+ gboolean (*set_size)(XferElement *elt, gint64 size);
- /* Start transferring data. The element downstream of this one will already be
- * started, while the upstream element will not, so data will not begin flowing
- * immediately.
+ /* Start transferring data. The element downstream of this one will
+ * already be started, while the upstream element will not, so data will
+ * not begin flowing immediately. It is safe to access attributes of
+ * neighboring elements during this call.
+ *
+ * This method will *not* be called if all elements do not set up
+ * correctly.
*
* @param elt: the XferElement
* @return: TRUE if this element will send XMSG_DONE
* If expect_eof is TRUE, then this element should expect an EOF from its
* upstream element, and should drain any remaining data until that EOF
* arrives and generate an EOF to the downstream element. The utility
- * functions xfer_element_drain_by_reading and xfer_element_drain_by_pulling may be useful for this
- * purpose. This draining is important in order to avoid hung threads or
- * unexpected SIGPIPEs.
+ * functions xfer_element_drain_fd and xfer_element_drain_buffers may be
+ * useful for this purpose. This draining is important in order to avoid
+ * hung threads or unexpected SIGPIPEs.
*
* If expect_eof is FALSE, then the upstream elements are unable to
* generate an early EOF, so this element should *not* attempt to drain any
* If this element can generate an EOF, it should return TRUE, otherwise
* FALSE.
*
+ * This method may be called before start or setup if an error is
+ * encountered during setup.
+ *
* The default implementation sets self->expect_eof and self->cancelled
* appropriately and returns self->can_generate_eof.
*
* This method is always called from the main thread. It must not block.
*
* @param elt: the XferElement
+ * @param expect_eof: element should expect an EOF
+ * @returns: true if this element can return EOF
*/
- gboolean (*cancel)(XferElement *elt, gboolean generate_eof);
+ gboolean (*cancel)(XferElement *elt, gboolean expect_eof);
/* Get a buffer full of data from this element. This function is called by
* the downstream element under XFER_MECH_PULL_CALL. It can block indefinitely,
*
* @param elt: the XferElement
* @param size (output): size of resulting buffer
+ * @returns: buffer pointer
*/
gpointer (*pull_buffer)(XferElement *elt, size_t *size);
*/
void (*push_buffer)(XferElement *elt, gpointer buf, size_t size);
+ /* Returns the mech_pairs that this element supports. The default
+ * implementation just returns the class attribute 'mech_pairs', but
+ * subclasses can dynamically select the available mechanisms by overriding
+ * this method. Note that the method is called before the setup() method.
+ *
+ * @param elt: the XferElement
+ * @returns: array of mech pairs, terminated by <NONE,NONE>
+ */
+ xfer_element_mech_pair_t *(*get_mech_pairs)(XferElement *elt);
+
/* class variables */
/* This is used by the perl bindings -- it is a class variable giving the
const char *perl_class;
/* Statically allocated array of input/output mechanisms supported by this
- * class (terminated by <XFER_MECH_NONE,XFER_MECH_NONE>) */
+ * class (terminated by <XFER_MECH_NONE,XFER_MECH_NONE>). The default
+ * get_mech_pairs method returns this. */
xfer_element_mech_pair_t *mech_pairs;
} XferElementClass;
void xfer_element_unref(XferElement *elt);
gboolean xfer_element_link_to(XferElement *elt, XferElement *successor);
char *xfer_element_repr(XferElement *elt);
-void xfer_element_setup(XferElement *elt);
+gboolean xfer_element_setup(XferElement *elt);
+gboolean xfer_element_set_size(XferElement *elt, gint64 size);
gboolean xfer_element_start(XferElement *elt);
void xfer_element_push_buffer(XferElement *elt, gpointer buf, size_t size);
gpointer xfer_element_pull_buffer(XferElement *elt, size_t *size);
gboolean xfer_element_cancel(XferElement *elt, gboolean expect_eof);
+xfer_element_mech_pair_t *xfer_element_get_mech_pairs(XferElement *elt);
/****
* Subclass utilities
*
* @param upstream: the element to drain
*/
-void xfer_element_drain_by_pulling(XferElement *upstream);
+void xfer_element_drain_buffers(XferElement *upstream);
/* Drain UPSTREAM by reading until EOF. This does not close
* the file descriptor.
*
* @param fd: the file descriptor to drain
*/
-void xfer_element_drain_by_reading(int fd);
-
-/* Wait for the xfer's state to become CANCELLED or DONE; this is useful to
- * wait until a cancelletion is in progress before returning an EOF or
- * otherwise handling a failure. If you call this in the main thread, you'll
- * be waiting for a while.
- *
- * @param xfer: the transfer object
- * @returns: the new status (XFER_CANCELLED or XFER_DONE)
- */
-xfer_status wait_until_xfer_cancelled(Xfer *xfer);
+void xfer_element_drain_fd(int fd);
-/* Send an XMSG_ERROR constructed with the given format and arguments, then
- * cancel the transfer, then wait until the transfer is completely cancelled.
- * This is the most common error-handling process for transfer elements. All
- * that remains to be done on return is to branch to the appropriate point in
- * the cancellation-handling portion of the transfer.
+/* Atomically swap a value into elt->_input_fd and _output_fd, respectively.
+ * Always use these methods to access the field.
*
- * @param elt: the transfer element producing the error
- * @param fmt: the format for the error message
- * @param ...: arguments corresponding to the format
+ * @param elt: xfer element
+ * @param newfd: new value for the fd field
+ * @returns: old value of the fd field
*/
-void xfer_element_handle_error(XferElement *elt, const char *fmt, ...)
- G_GNUC_PRINTF(2,3);
+#define xfer_element_swap_input_fd(elt, newfd) \
+ xfer_atomic_swap_fd((elt)->xfer, &(elt)->_input_fd, newfd)
+#define xfer_element_swap_output_fd(elt, newfd) \
+ xfer_atomic_swap_fd((elt)->xfer, &(elt)->_output_fd, newfd)
/***********************
* XferElement subclasses
* can also provide a good prototype for new elements.
*/
-/* A transfer source that reads from a Device. The device must be positioned
- * at the start of a file before the transfer is started. The transfer will
- * continue until the end of the file.
- *
- * Implemented in source-device.c
- *
- * @param device: Device object to read from
- * @return: new element
- */
-XferElement *xfer_source_device(
- Device *device);
-
/* A transfer source that produces LENGTH bytes of random data, for testing
* purposes.
*
*/
XferElement *xfer_source_random(guint64 length, guint32 prng_seed);
+/* Get the ending random seed for the xfer_source_random. Call this after a
+ * transfer has finished, and construct a new xfer_source_random with the seed.
+ * The new source will continue the same random sequence at the next byte. This
+ * is useful for constructing spanned dumps in testing.
+ *
+ * @param src: XferSourceRandom object
+ * @returns: seed
+ */
+guint32 xfer_source_random_get_seed(XferElement *src);
+
/* A transfer source that produces LENGTH bytes containing repeated
* copies of the provided pattern, for testing purposes.
*
XferElement * xfer_source_fd(
int fd);
-/* A transfer filter that just applies a bytewise XOR transformation to the data
- * that passes through it.
+/* A transfer source that exposes its listening DirectTCPAddrs (via
+ * elt->input_listen_addrs) for external use
*
- * Implemented in filter-xor.c
+ * Implemented in source-directtcp-listen.c
*
- * @param xor_key: key for xor operations
* @return: new element
*/
-XferElement *xfer_filter_xor(
- unsigned char xor_key);
+XferElement * xfer_source_directtcp_listen(void);
-/* A transfer destination that writes bytes to a Device. The device should have a
- * file started, ready for a device_write_block call. On completion of the transfer,
- * the file will be finished.
+/* A transfer source that connects to a DirectTCP address and pulls data
+ * from it into the transfer.
*
- * Implemented in dest-device.c
+ * Implemented in source-directtcp-listen.c
*
- * @param device: the Device to write to, with a file started
- * @param max_memory: total amount of memory to use for buffers, or zero
- * for a reasonable default.
+ * @param addrs: DirectTCP addresses to connect to
* @return: new element
*/
-XferElement *
-xfer_dest_device(
- Device *device,
- size_t max_memory);
+XferElement * xfer_source_directtcp_connect(DirectTCPAddr *addrs);
+/* A transfer filter that executes an external application, feeding it data on
+ * stdin and taking the results on stdout.
+ *
+ * The memory for ARGV becomes the property of the transfer element and will be
+ * g_strfreev'd when the xfer is destroyed.
+ *
+ * Implemented in filter-process.c
+ *
+ * @param argv: NULL-terminated command-line arguments
+ * @param need_root: become root before exec'ing the subprocess
+ * @return: new element
+ */
+XferElement *xfer_filter_process(gchar **argv,
+ gboolean need_root);
+
+/* A transfer filter that just applies a bytewise XOR transformation to the data
+ * that passes through it.
+ *
+ * Implemented in filter-xor.c
+ *
+ * @param xor_key: key for xor operations
+ * @return: new element
+ */
+XferElement *xfer_filter_xor(
+ unsigned char xor_key);
/* A transfer destination that consumes all bytes it is given, optionally
* validating that they match those produced by source_random
XferElement *xfer_dest_fd(
int fd);
+/* A transfer destination that writes bytes to an in-memory buffer.
+ *
+ * Implemented in dest-buffer.c
+ *
+ * @param max_size: maximum size for the buffer, or zero for no limit
+ * @return: new element
+ */
+XferElement *xfer_dest_buffer(
+ gsize max_size);
+
+/* Get the buffer and size from an XferDestBuffer. The resulting buffer
+ * will remain allocated until the XDB itself is freed.
+ *
+ * Implemented in dest-buffer.c
+ *
+ * @param elt: the element
+ * @param buf (output): buffer pointer
+ * @param size (output): buffer size
+ */
+void xfer_dest_buffer_get(
+ XferElement *elt,
+ gpointer *buf,
+ gsize *size);
+
+/* A transfer dest that connects to a DirectTCPAddr and sends data to
+ * it
+ *
+ * Implemented in dest-directtcp-connect.c
+ *
+ * @param addrs: DirectTCP addresses to connect to
+ * @return: new element
+ */
+XferElement * xfer_dest_directtcp_connect(DirectTCPAddr *addrs);
+
+/* A transfer dest that listens for a DirecTCP connection and sends data to it
+ * when connected. Listening addresses are exposed at
+ * elt->output_listen_addrs.
+ *
+ * Implemented in dest-directtcp-listen.c
+ *
+ * @return: new element
+ */
+XferElement * xfer_dest_directtcp_listen(void);
+
#endif