#include <glib.h>
#include <glib-object.h>
#include "xfer.h"
-#include "queueing.h"
+#include "amanda.h"
#include "directtcp.h"
typedef enum {
* mechansisms. */
XFER_MECH_NONE,
- /* downstream element will read() from elt->upstream->output_fd; EOF
+ /* downstream element will read() from elt->upstream's output_fd; EOF
* is indicated by the usual OS mechanism resulting in a zero-length
* read, in response to which the downstream element must close
* the fd. */
XFER_MECH_READFD,
- /* upstream element will write() to elt->downstream->input_fd. EOF
+ /* upstream element will write() to elt->downstream's input_fd. EOF
* is indicated by closing the file descriptor. */
XFER_MECH_WRITEFD,
XFER_MECH_MAX,
} xfer_mech;
-/* Description of a pair (input, output) of xfer mechanisms that an
+/*
+ * Description of a pair (input, output) of xfer mechanisms that an
* element can support, along with the associated costs. An array of these
* pairs is stored in the class-level variable 'mech_pairs', describing
* all of the mechanisms that an element supports.
+ *
+ * Use the XFER_NROPS() and XFER_NTHREADS() macros below in declarations in
+ * order to make declarations more understandable.
*/
+
+#define XFER_NROPS(x) (x)
+#define XFER_NTHREADS(x) (x)
+
typedef struct {
xfer_mech input_mech;
xfer_mech output_mech;
gboolean expect_eof;
gboolean can_generate_eof;
- /* file descriptors for XFER_MECH_READFD and XFER_MECH_WRITEFD. These should be set
- * during setup(), and can be accessed by neighboring elements during start(). It is
- * up to subclasses to handle closing these file descriptors, if required. */
- gint input_fd;
- gint output_fd;
+ /* file descriptors for XFER_MECH_READFD and XFER_MECH_WRITEFD. These
+ * should be set during setup(), and can be accessed by neighboring
+ * elements during start(). These values are shared among multiple
+ * elements, and thus must be accessed with xfer_element_swap_input_fd and
+ * xfer_element_swap_output_fd. Any file descriptors remaining here at
+ * finalize time will be closed. */
+ gint _input_fd;
+ gint _output_fd;
/* array of IP:PORT pairs that can be used to connect to this element,
* terminated by a 0.0.0.0:0. The first is set by elements with an input
/* cache for repr() */
char *repr;
+
+ /* maximum size to transfer */
+ gint64 size;
} XferElement;
/*
*/
gboolean (*setup)(XferElement *elt);
+ /* set the size of data to transfer, to skip NUL padding bytes
+ * @param elt: the XferElement
+ * @param size: the size of data to transfer
+ * @return: TRUE
+ */
+ gboolean (*set_size)(XferElement *elt, gint64 size);
+
/* Start transferring data. The element downstream of this one will
* already be started, while the upstream element will not, so data will
* not begin flowing immediately. It is safe to access attributes of
* If expect_eof is TRUE, then this element should expect an EOF from its
* upstream element, and should drain any remaining data until that EOF
* arrives and generate an EOF to the downstream element. The utility
- * functions xfer_element_drain_by_reading and
- * xfer_element_drain_by_pulling may be useful for this purpose. This
- * draining is important in order to avoid hung threads or unexpected
- * SIGPIPEs.
+ * functions xfer_element_drain_fd and xfer_element_drain_buffers may be
+ * useful for this purpose. This draining is important in order to avoid
+ * hung threads or unexpected SIGPIPEs.
*
* If expect_eof is FALSE, then the upstream elements are unable to
* generate an early EOF, so this element should *not* attempt to drain any
gboolean xfer_element_link_to(XferElement *elt, XferElement *successor);
char *xfer_element_repr(XferElement *elt);
gboolean xfer_element_setup(XferElement *elt);
+gboolean xfer_element_set_size(XferElement *elt, gint64 size);
gboolean xfer_element_start(XferElement *elt);
void xfer_element_push_buffer(XferElement *elt, gpointer buf, size_t size);
gpointer xfer_element_pull_buffer(XferElement *elt, size_t *size);
*
* @param upstream: the element to drain
*/
-void xfer_element_drain_by_pulling(XferElement *upstream);
+void xfer_element_drain_buffers(XferElement *upstream);
/* Drain UPSTREAM by reading until EOF. This does not close
* the file descriptor.
*
* @param fd: the file descriptor to drain
*/
-void xfer_element_drain_by_reading(int fd);
+void xfer_element_drain_fd(int fd);
+
+/* Atomically swap a value into elt->_input_fd and _output_fd, respectively.
+ * Always use these methods to access the field.
+ *
+ * @param elt: xfer element
+ * @param newfd: new value for the fd field
+ * @returns: old value of the fd field
+ */
+#define xfer_element_swap_input_fd(elt, newfd) \
+ xfer_atomic_swap_fd((elt)->xfer, &(elt)->_input_fd, newfd)
+#define xfer_element_swap_output_fd(elt, newfd) \
+ xfer_atomic_swap_fd((elt)->xfer, &(elt)->_output_fd, newfd)
/***********************
* XferElement subclasses
* stdin and taking the results on stdout.
*
* The memory for ARGV becomes the property of the transfer element and will be
- * g_free'd when the xfer is destroyed.
+ * g_strfreev'd when the xfer is destroyed.
*
* Implemented in filter-process.c
*
* @param need_root: become root before exec'ing the subprocess
* @return: new element
*/
-XferElement *xfer_filter_process(gchar **argv, gboolean need_root);
+XferElement *xfer_filter_process(gchar **argv,
+ gboolean need_root);
/* A transfer filter that just applies a bytewise XOR transformation to the data
* that passes through it.