X-Git-Url: https://git.gag.com/?a=blobdiff_plain;f=xfer-src%2Fxfer-element.h;h=5d45ac03f0c33ab1689a6ca6ffce4eea0cbfb694;hb=66086d646c4cc92cf582b6765b315060117c6ec4;hp=43a9323f8edb1a6c7fb4d8d431e6c024eba57f1b;hpb=79cdc4b6ea8848b21ba4a0e7d2fd3bc401e0bebe;p=debian%2Famanda diff --git a/xfer-src/xfer-element.h b/xfer-src/xfer-element.h index 43a9323..5d45ac0 100644 --- a/xfer-src/xfer-element.h +++ b/xfer-src/xfer-element.h @@ -1,29 +1,25 @@ /* * Amanda, The Advanced Maryland Automatic Network Disk Archiver - * Copyright (c) 2008 Zmanda Inc. + * Copyright (c) 2008, 2009, 2010 Zmanda, Inc. All Rights Reserved. * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 2.1 of the License, or (at your option) any later version. + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 as published + * by the Free Software Foundation. * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300 + * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com */ -/* Base classes and interfaces for transfer elements. There are two interfaces - * defined here: IXferProducer and IXferConsumer. The former is for elements - * which produce data, and the latter is for those which consume it. There is - * a top-level XferElement base class, which all implementations subclass. - * - * Unless you're well-acquainted with GType and GObject, this file will be a - * difficult read. It is really only of use to those implementing new subclasses. +/* Base classes for transfer elements. */ #ifndef XFER_ELEMENT_H @@ -32,21 +28,21 @@ #include #include #include "xfer.h" -#include "device.h" -#include "queueing.h" +#include "amanda.h" +#include "directtcp.h" typedef enum { /* sources have no input mechanisms and destinations have no output * mechansisms. */ XFER_MECH_NONE, - /* downstream element will read() from elt->upstream->output_fd; EOF + /* downstream element will read() from elt->upstream's output_fd; EOF * is indicated by the usual OS mechanism resulting in a zero-length * read, in response to which the downstream element must close * the fd. */ XFER_MECH_READFD, - /* upstream element will write() to elt->downstream->input_fd. EOF + /* upstream element will write() to elt->downstream's input_fd. EOF * is indicated by closing the file descriptor. */ XFER_MECH_WRITEFD, @@ -57,13 +53,34 @@ typedef enum { /* upstream element will call elt->downstream->push_buffer(buf) to push * a buffer. EOF is indicated by passing a NULL buffer. */ XFER_MECH_PUSH_BUFFER, + + /* DirectTCP: downstream sends an array of IP:PORT addresses to which a TCP + * connection should be made, then upstream connects to one of the addreses + * and sends the data over that connection */ + XFER_MECH_DIRECTTCP_LISTEN, + + /* DirectTCP: downstream gets IP:PORT addresses from upstream to which a + * TCP connection should be made, then connects to one of the addreses and + * receives the data over that connection */ + XFER_MECH_DIRECTTCP_CONNECT, + + /* (sentinel value) */ + XFER_MECH_MAX, } xfer_mech; -/* Description of a pair (input, output) of xfer mechanisms that an +/* + * Description of a pair (input, output) of xfer mechanisms that an * element can support, along with the associated costs. An array of these * pairs is stored in the class-level variable 'mech_pairs', describing * all of the mechanisms that an element supports. + * + * Use the XFER_NROPS() and XFER_NTHREADS() macros below in declarations in + * order to make declarations more understandable. */ + +#define XFER_NROPS(x) (x) +#define XFER_NTHREADS(x) (x) + typedef struct { xfer_mech input_mech; xfer_mech output_mech; @@ -110,14 +127,30 @@ typedef struct XferElement { gboolean expect_eof; gboolean can_generate_eof; - /* file descriptors for XFER_MECH_READFD and XFER_MECH_WRITEFD. These should be set - * during setup(), and can be accessed by neighboring elements during start(). It is - * up to subclasses to handle closing these file descriptors, if required. */ - gint input_fd; - gint output_fd; + /* file descriptors for XFER_MECH_READFD and XFER_MECH_WRITEFD. These + * should be set during setup(), and can be accessed by neighboring + * elements during start(). These values are shared among multiple + * elements, and thus must be accessed with xfer_element_swap_input_fd and + * xfer_element_swap_output_fd. Any file descriptors remaining here at + * finalize time will be closed. */ + gint _input_fd; + gint _output_fd; + + /* array of IP:PORT pairs that can be used to connect to this element, + * terminated by a 0.0.0.0:0. The first is set by elements with an input + * mech of XFER_MECH_DIRECTTCP_LISTEN and accessed by their upstream + * neighbor; the second is set by elements with an output mech of + * XFER_MECH_DIRECTTCP_CONNECT and accessed by their downstream neighbor. + * */ + + DirectTCPAddr *input_listen_addrs; + DirectTCPAddr *output_listen_addrs; /* cache for repr() */ char *repr; + + /* maximum size to transfer */ + gint64 size; } XferElement; /* @@ -138,21 +171,43 @@ typedef struct { */ char *(*repr)(XferElement *elt); - /* Set up this element. This function is called for all elements in a transfer - * before start() is called for any elements. For mechanisms where this element - * supplies a file descriptor, it should set its input_fd and/or output_fd - * appropriately; neighboring elements will use that value in start(). + /* Set up this element. This function is called for all elements in a + * transfer before start() is called for any elements. For mechanisms + * where this element supplies a file descriptor, it should set its + * input_fd and/or output_fd appropriately; neighboring elements will use + * that value in start(). Elements which supply IP:PORT pairs should set + * their input_addrs, for neighboring elements to use in start(). * * elt->input_mech and elt->output_mech are already set when this function * is called, but upstream and downstream are not. * + * If the setup operation fails, the method should send an XMSG_ERROR and + * call XMSG_CANCEL, and return False. In this situation, the start method + * will not be called. The Xfer will appear to the user to start and + * immediately fail. + * + * Note that this method may never be called if other elements' setup methods + * fail first. + * + * @param elt: the XferElement + * @return: false on failure, true on success + */ + gboolean (*setup)(XferElement *elt); + + /* set the size of data to transfer, to skip NUL padding bytes * @param elt: the XferElement + * @param size: the size of data to transfer + * @return: TRUE */ - void (*setup)(XferElement *elt); + gboolean (*set_size)(XferElement *elt, gint64 size); - /* Start transferring data. The element downstream of this one will already be - * started, while the upstream element will not, so data will not begin flowing - * immediately. + /* Start transferring data. The element downstream of this one will + * already be started, while the upstream element will not, so data will + * not begin flowing immediately. It is safe to access attributes of + * neighboring elements during this call. + * + * This method will *not* be called if all elements do not set up + * correctly. * * @param elt: the XferElement * @return: TRUE if this element will send XMSG_DONE @@ -169,9 +224,9 @@ typedef struct { * If expect_eof is TRUE, then this element should expect an EOF from its * upstream element, and should drain any remaining data until that EOF * arrives and generate an EOF to the downstream element. The utility - * functions xfer_element_drain_by_reading and xfer_element_drain_by_pulling may be useful for this - * purpose. This draining is important in order to avoid hung threads or - * unexpected SIGPIPEs. + * functions xfer_element_drain_fd and xfer_element_drain_buffers may be + * useful for this purpose. This draining is important in order to avoid + * hung threads or unexpected SIGPIPEs. * * If expect_eof is FALSE, then the upstream elements are unable to * generate an early EOF, so this element should *not* attempt to drain any @@ -181,14 +236,19 @@ typedef struct { * If this element can generate an EOF, it should return TRUE, otherwise * FALSE. * + * This method may be called before start or setup if an error is + * encountered during setup. + * * The default implementation sets self->expect_eof and self->cancelled * appropriately and returns self->can_generate_eof. * * This method is always called from the main thread. It must not block. * * @param elt: the XferElement + * @param expect_eof: element should expect an EOF + * @returns: true if this element can return EOF */ - gboolean (*cancel)(XferElement *elt, gboolean generate_eof); + gboolean (*cancel)(XferElement *elt, gboolean expect_eof); /* Get a buffer full of data from this element. This function is called by * the downstream element under XFER_MECH_PULL_CALL. It can block indefinitely, @@ -197,6 +257,7 @@ typedef struct { * * @param elt: the XferElement * @param size (output): size of resulting buffer + * @returns: buffer pointer */ gpointer (*pull_buffer)(XferElement *elt, size_t *size); @@ -212,6 +273,16 @@ typedef struct { */ void (*push_buffer)(XferElement *elt, gpointer buf, size_t size); + /* Returns the mech_pairs that this element supports. The default + * implementation just returns the class attribute 'mech_pairs', but + * subclasses can dynamically select the available mechanisms by overriding + * this method. Note that the method is called before the setup() method. + * + * @param elt: the XferElement + * @returns: array of mech pairs, terminated by + */ + xfer_element_mech_pair_t *(*get_mech_pairs)(XferElement *elt); + /* class variables */ /* This is used by the perl bindings -- it is a class variable giving the @@ -221,7 +292,8 @@ typedef struct { const char *perl_class; /* Statically allocated array of input/output mechanisms supported by this - * class (terminated by ) */ + * class (terminated by ). The default + * get_mech_pairs method returns this. */ xfer_element_mech_pair_t *mech_pairs; } XferElementClass; @@ -232,11 +304,13 @@ typedef struct { void xfer_element_unref(XferElement *elt); gboolean xfer_element_link_to(XferElement *elt, XferElement *successor); char *xfer_element_repr(XferElement *elt); -void xfer_element_setup(XferElement *elt); +gboolean xfer_element_setup(XferElement *elt); +gboolean xfer_element_set_size(XferElement *elt, gint64 size); gboolean xfer_element_start(XferElement *elt); void xfer_element_push_buffer(XferElement *elt, gpointer buf, size_t size); gpointer xfer_element_pull_buffer(XferElement *elt, size_t *size); gboolean xfer_element_cancel(XferElement *elt, gboolean expect_eof); +xfer_element_mech_pair_t *xfer_element_get_mech_pairs(XferElement *elt); /**** * Subclass utilities @@ -248,37 +322,26 @@ gboolean xfer_element_cancel(XferElement *elt, gboolean expect_eof); * * @param upstream: the element to drain */ -void xfer_element_drain_by_pulling(XferElement *upstream); +void xfer_element_drain_buffers(XferElement *upstream); /* Drain UPSTREAM by reading until EOF. This does not close * the file descriptor. * * @param fd: the file descriptor to drain */ -void xfer_element_drain_by_reading(int fd); - -/* Wait for the xfer's state to become CANCELLED or DONE; this is useful to - * wait until a cancelletion is in progress before returning an EOF or - * otherwise handling a failure. If you call this in the main thread, you'll - * be waiting for a while. - * - * @param xfer: the transfer object - * @returns: the new status (XFER_CANCELLED or XFER_DONE) - */ -xfer_status wait_until_xfer_cancelled(Xfer *xfer); +void xfer_element_drain_fd(int fd); -/* Send an XMSG_ERROR constructed with the given format and arguments, then - * cancel the transfer, then wait until the transfer is completely cancelled. - * This is the most common error-handling process for transfer elements. All - * that remains to be done on return is to branch to the appropriate point in - * the cancellation-handling portion of the transfer. +/* Atomically swap a value into elt->_input_fd and _output_fd, respectively. + * Always use these methods to access the field. * - * @param elt: the transfer element producing the error - * @param fmt: the format for the error message - * @param ...: arguments corresponding to the format + * @param elt: xfer element + * @param newfd: new value for the fd field + * @returns: old value of the fd field */ -void xfer_element_handle_error(XferElement *elt, const char *fmt, ...) - G_GNUC_PRINTF(2,3); +#define xfer_element_swap_input_fd(elt, newfd) \ + xfer_atomic_swap_fd((elt)->xfer, &(elt)->_input_fd, newfd) +#define xfer_element_swap_output_fd(elt, newfd) \ + xfer_atomic_swap_fd((elt)->xfer, &(elt)->_output_fd, newfd) /*********************** * XferElement subclasses @@ -289,18 +352,6 @@ void xfer_element_handle_error(XferElement *elt, const char *fmt, ...) * can also provide a good prototype for new elements. */ -/* A transfer source that reads from a Device. The device must be positioned - * at the start of a file before the transfer is started. The transfer will - * continue until the end of the file. - * - * Implemented in source-device.c - * - * @param device: Device object to read from - * @return: new element - */ -XferElement *xfer_source_device( - Device *device); - /* A transfer source that produces LENGTH bytes of random data, for testing * purposes. * @@ -312,6 +363,16 @@ XferElement *xfer_source_device( */ XferElement *xfer_source_random(guint64 length, guint32 prng_seed); +/* Get the ending random seed for the xfer_source_random. Call this after a + * transfer has finished, and construct a new xfer_source_random with the seed. + * The new source will continue the same random sequence at the next byte. This + * is useful for constructing spanned dumps in testing. + * + * @param src: XferSourceRandom object + * @returns: seed + */ +guint32 xfer_source_random_get_seed(XferElement *src); + /* A transfer source that produces LENGTH bytes containing repeated * copies of the provided pattern, for testing purposes. * @@ -336,33 +397,50 @@ XferElement *xfer_source_pattern(guint64 length, void * pattern, XferElement * xfer_source_fd( int fd); -/* A transfer filter that just applies a bytewise XOR transformation to the data - * that passes through it. +/* A transfer source that exposes its listening DirectTCPAddrs (via + * elt->input_listen_addrs) for external use * - * Implemented in filter-xor.c + * Implemented in source-directtcp-listen.c * - * @param xor_key: key for xor operations * @return: new element */ -XferElement *xfer_filter_xor( - unsigned char xor_key); +XferElement * xfer_source_directtcp_listen(void); -/* A transfer destination that writes bytes to a Device. The device should have a - * file started, ready for a device_write_block call. On completion of the transfer, - * the file will be finished. +/* A transfer source that connects to a DirectTCP address and pulls data + * from it into the transfer. * - * Implemented in dest-device.c + * Implemented in source-directtcp-listen.c * - * @param device: the Device to write to, with a file started - * @param max_memory: total amount of memory to use for buffers, or zero - * for a reasonable default. + * @param addrs: DirectTCP addresses to connect to * @return: new element */ -XferElement * -xfer_dest_device( - Device *device, - size_t max_memory); +XferElement * xfer_source_directtcp_connect(DirectTCPAddr *addrs); +/* A transfer filter that executes an external application, feeding it data on + * stdin and taking the results on stdout. + * + * The memory for ARGV becomes the property of the transfer element and will be + * g_strfreev'd when the xfer is destroyed. + * + * Implemented in filter-process.c + * + * @param argv: NULL-terminated command-line arguments + * @param need_root: become root before exec'ing the subprocess + * @return: new element + */ +XferElement *xfer_filter_process(gchar **argv, + gboolean need_root); + +/* A transfer filter that just applies a bytewise XOR transformation to the data + * that passes through it. + * + * Implemented in filter-xor.c + * + * @param xor_key: key for xor operations + * @return: new element + */ +XferElement *xfer_filter_xor( + unsigned char xor_key); /* A transfer destination that consumes all bytes it is given, optionally * validating that they match those produced by source_random @@ -388,4 +466,48 @@ XferElement *xfer_dest_null( XferElement *xfer_dest_fd( int fd); +/* A transfer destination that writes bytes to an in-memory buffer. + * + * Implemented in dest-buffer.c + * + * @param max_size: maximum size for the buffer, or zero for no limit + * @return: new element + */ +XferElement *xfer_dest_buffer( + gsize max_size); + +/* Get the buffer and size from an XferDestBuffer. The resulting buffer + * will remain allocated until the XDB itself is freed. + * + * Implemented in dest-buffer.c + * + * @param elt: the element + * @param buf (output): buffer pointer + * @param size (output): buffer size + */ +void xfer_dest_buffer_get( + XferElement *elt, + gpointer *buf, + gsize *size); + +/* A transfer dest that connects to a DirectTCPAddr and sends data to + * it + * + * Implemented in dest-directtcp-connect.c + * + * @param addrs: DirectTCP addresses to connect to + * @return: new element + */ +XferElement * xfer_dest_directtcp_connect(DirectTCPAddr *addrs); + +/* A transfer dest that listens for a DirecTCP connection and sends data to it + * when connected. Listening addresses are exposed at + * elt->output_listen_addrs. + * + * Implemented in dest-directtcp-listen.c + * + * @return: new element + */ +XferElement * xfer_dest_directtcp_listen(void); + #endif