2 * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3 * Copyright (c) 2008, 2009, 2010 Zmanda, Inc. All Rights Reserved.
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 as published
7 * by the Free Software Foundation.
9 * This program is distributed in the hope that it will be useful, but
10 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
19 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
22 /* Base classes for transfer elements.
25 #ifndef XFER_ELEMENT_H
26 #define XFER_ELEMENT_H
29 #include <glib-object.h>
32 #include "directtcp.h"
35 /* sources have no input mechanisms and destinations have no output
39 /* downstream element will read() from elt->upstream->output_fd; EOF
40 * is indicated by the usual OS mechanism resulting in a zero-length
41 * read, in response to which the downstream element must close
45 /* upstream element will write() to elt->downstream->input_fd. EOF
46 * is indicated by closing the file descriptor. */
49 /* downstream element will call elt->upstream->pull_buffer() to
50 * pull a buffer. EOF is indicated by returning a NULL buffer */
51 XFER_MECH_PULL_BUFFER,
53 /* upstream element will call elt->downstream->push_buffer(buf) to push
54 * a buffer. EOF is indicated by passing a NULL buffer. */
55 XFER_MECH_PUSH_BUFFER,
57 /* DirectTCP: downstream sends an array of IP:PORT addresses to which a TCP
58 * connection should be made, then upstream connects to one of the addresses
59 * and sends the data over that connection */
60 XFER_MECH_DIRECTTCP_LISTEN,
62 /* DirectTCP: downstream gets IP:PORT addresses from upstream to which a
63 * TCP connection should be made, then connects to one of the addresses and
64 * receives the data over that connection */
65 XFER_MECH_DIRECTTCP_CONNECT,
67 /* (sentinel value) */
71 /* Description of a pair (input, output) of xfer mechanisms that an
72 * element can support, along with the associated costs. An array of these
73 * pairs is stored in the class-level variable 'mech_pairs', describing
74 * all of the mechanisms that an element supports.
78 xfer_mech output_mech;
79 guint8 ops_per_byte; /* number of byte copies or other operations */
80 guint8 nthreads; /* number of additional threads created */
81 } xfer_element_mech_pair_t;
83 /***********************
86 * The virtual base class for all transfer elements
/* GObject type boilerplate for XferElement.  xfer_element_get_type()
 * returns the GType, and XFER_ELEMENT_TYPE is shorthand for that call.
 * The remaining macros wrap GLib's G_TYPE_CHECK_* and
 * G_TYPE_INSTANCE_GET_CLASS helpers with that type:
 *   XFER_ELEMENT / XFER_ELEMENT_CONST  cast an instance pointer
 *   XFER_ELEMENT_CLASS                 cast a class pointer
 *   IS_XFER_ELEMENT                    test an instance's type
 *   XFER_ELEMENT_GET_CLASS             fetch an instance's class struct
 * These macros call xfer_element_get_type() directly rather than going
 * through XFER_ELEMENT_TYPE. */
89 GType xfer_element_get_type(void);
90 #define XFER_ELEMENT_TYPE (xfer_element_get_type())
91 #define XFER_ELEMENT(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_element_get_type(), XferElement)
92 #define XFER_ELEMENT_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_element_get_type(), XferElement const)
93 #define XFER_ELEMENT_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_element_get_type(), XferElementClass)
94 #define IS_XFER_ELEMENT(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_element_get_type ())
95 #define XFER_ELEMENT_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_element_get_type(), XferElementClass)
98 * Main object structure
101 typedef struct XferElement {
104 /* The transfer to which this element is attached */
105 Xfer *xfer; /* set by xfer_new */
107 /* assigned input and output mechanisms */
108 xfer_mech input_mech;
109 xfer_mech output_mech;
111 /* neighboring xfer elements */
112 struct XferElement *upstream;
113 struct XferElement *downstream;
115 /* Information regarding cancellation. Cancelled and expect_eof are set by
116 * the default cancel() method. Can_generate_eof should be set during
117 * initialization, and is returned by the default cancel implementation */
120 gboolean can_generate_eof;
122 /* file descriptors for XFER_MECH_READFD and XFER_MECH_WRITEFD. These should be set
123 * during setup(), and can be accessed by neighboring elements during start(). It is
124 * up to subclasses to handle closing these file descriptors, if required. */
128 /* array of IP:PORT pairs that can be used to connect to this element,
129 * terminated by a 0.0.0.0:0. The first is set by elements with an input
130 * mech of XFER_MECH_DIRECTTCP_LISTEN and accessed by their upstream
131 * neighbor; the second is set by elements with an output mech of
132 * XFER_MECH_DIRECTTCP_CONNECT and accessed by their downstream neighbor.
135 DirectTCPAddr *input_listen_addrs;
136 DirectTCPAddr *output_listen_addrs;
138 /* cache for repr() */
147 GObjectClass __parent__;
149 /* Get a string representation of this element. The returned string will be freed
150 * when the element is finalized, and is static until that time. This method is
151 * implemented by XferElement, but can be overridden by classes that can provide
152 * additional useful information about themselves. Overriding methods can use
153 * the 'repr' instance variable as a cache -- it will be freed on finalize().
155 * @param elt: the XferElement
156 * @return: statically allocated string
158 char *(*repr)(XferElement *elt);
160 /* Set up this element. This function is called for all elements in a
161 * transfer before start() is called for any elements. For mechanisms
162 * where this element supplies a file descriptor, it should set its
163 * input_fd and/or output_fd appropriately; neighboring elements will use
164 * that value in start(). Elements which supply IP:PORT pairs should set
165 * their input_listen_addrs or output_listen_addrs for neighbors to use in start().
167 * elt->input_mech and elt->output_mech are already set when this function
168 * is called, but upstream and downstream are not.
170 * If the setup operation fails, the method should send an XMSG_ERROR and
171 * call XMSG_CANCEL, and return False. In this situation, the start method
172 * will not be called. The Xfer will appear to the user to start and
175 * Note that this method may never be called if other elements' setup methods
178 * @param elt: the XferElement
179 * @return: false on failure, true on success
181 gboolean (*setup)(XferElement *elt);
183 /* Start transferring data. The element downstream of this one will
184 * already be started, while the upstream element will not, so data will
185 * not begin flowing immediately. It is safe to access attributes of
186 * neighboring elements during this call.
188 * This method will *not* be called if all elements do not set up
191 * @param elt: the XferElement
192 * @return: TRUE if this element will send XMSG_DONE
194 gboolean (*start)(XferElement *elt);
196 /* Stop transferring data. The upstream element's cancel method will
197 * already have been called, but due to buffering and synchronization
198 * issues, data may yet arrive. The element may discard any such data, but
199 * must not fail. This method is only called for abnormal terminations;
200 * elements should normally stop processing on receiving an EOF indication
203 * If expect_eof is TRUE, then this element should expect an EOF from its
204 * upstream element, and should drain any remaining data until that EOF
205 * arrives and generate an EOF to the downstream element. The utility
206 * functions xfer_element_drain_by_reading and
207 * xfer_element_drain_by_pulling may be useful for this purpose. This
208 * draining is important in order to avoid hung threads or unexpected
211 * If expect_eof is FALSE, then the upstream elements are unable to
212 * generate an early EOF, so this element should *not* attempt to drain any
213 * remaining data. As an example, an FdSource is not active and thus
214 * cannot generate an EOF on request.
216 * If this element can generate an EOF, it should return TRUE, otherwise
219 * This method may be called before start or setup if an error is
220 * encountered during setup.
222 * The default implementation sets self->expect_eof and self->cancelled
223 * appropriately and returns self->can_generate_eof.
225 * This method is always called from the main thread. It must not block.
227 * @param elt: the XferElement
228 * @param expect_eof: element should expect an EOF
229 * @returns: true if this element can return EOF
231 gboolean (*cancel)(XferElement *elt, gboolean expect_eof);
233 /* Get a buffer full of data from this element. This function is called by
234 * the downstream element under XFER_MECH_PULL_BUFFER. It can block indefinitely,
235 * and must only return NULL on EOF. Responsibility to free the buffer transfers
238 * @param elt: the XferElement
239 * @param size (output): size of resulting buffer
240 * @returns: buffer pointer
242 gpointer (*pull_buffer)(XferElement *elt, size_t *size);
244 /* A buffer full of data is being sent to this element for processing; this
245 * function is called by the upstream element under XFER_MECH_PUSH_BUFFER.
246 * It can block indefinitely if the data cannot be processed immediately.
247 * An EOF condition is signaled by a call with a NULL buffer. Responsibility to
248 * free the buffer transfers to the callee.
250 * @param elt: the XferElement
252 * @param size: size of buffer
254 void (*push_buffer)(XferElement *elt, gpointer buf, size_t size);
256 /* Returns the mech_pairs that this element supports. The default
257 * implementation just returns the class attribute 'mech_pairs', but
258 * subclasses can dynamically select the available mechanisms by overriding
259 * this method. Note that the method is called before the setup() method.
261 * @param elt: the XferElement
262 * @returns: array of mech pairs, terminated by <NONE,NONE>
264 xfer_element_mech_pair_t *(*get_mech_pairs)(XferElement *elt);
266 /* class variables */
268 /* This is used by the perl bindings -- it is a class variable giving the
269 * appropriate perl class to wrap this XferElement. It should be set by
270 * each class's class_init.
272 const char *perl_class;
274 /* Statically allocated array of input/output mechanisms supported by this
275 * class (terminated by <XFER_MECH_NONE,XFER_MECH_NONE>). The default
276 * get_mech_pairs method returns this. */
277 xfer_element_mech_pair_t *mech_pairs;
/* Public function API for XferElement.  Apart from xfer_element_unref and
 * xfer_element_link_to, every function below matches, in name and
 * signature, one of the method pointers declared earlier in this header
 * (repr, setup, start, push_buffer, pull_buffer, cancel, get_mech_pairs);
 * the comments on those method pointers describe the contract.
 * NOTE(review): presumably each of these dispatches to the corresponding
 * class method -- the implementations are not visible here, so confirm
 * against the .c file. */
284 void xfer_element_unref(XferElement *elt);
285 gboolean xfer_element_link_to(XferElement *elt, XferElement *successor);
286 char *xfer_element_repr(XferElement *elt);
287 gboolean xfer_element_setup(XferElement *elt);
288 gboolean xfer_element_start(XferElement *elt);
289 void xfer_element_push_buffer(XferElement *elt, gpointer buf, size_t size);
290 gpointer xfer_element_pull_buffer(XferElement *elt, size_t *size);
291 gboolean xfer_element_cancel(XferElement *elt, gboolean expect_eof);
292 xfer_element_mech_pair_t *xfer_element_get_mech_pairs(XferElement *elt);
297 * These are utilities for subclasses
300 /* Drain UPSTREAM by pulling buffers until EOF
302 * @param upstream: the element to drain
304 void xfer_element_drain_by_pulling(XferElement *upstream);
306 /* Drain FD by reading until EOF. This does not close
307 * the file descriptor.
309 * @param fd: the file descriptor to drain
311 void xfer_element_drain_by_reading(int fd);
313 /***********************
314 * XferElement subclasses
316 * These simple subclasses do not introduce any additional public members or
317 * methods, so they do not have their own header file. The functions here
318 * provide their only public interface. The implementation of these elements
319 * can also provide a good prototype for new elements.
322 /* A transfer source that produces LENGTH bytes of random data, for testing
325 * Implemented in source-random.c
327 * @param length: bytes to produce, or zero for no limit
328 * @param prng_seed: initial value for random number generator
329 * @return: new element
331 XferElement *xfer_source_random(guint64 length, guint32 prng_seed);
333 /* Get the ending random seed for the xfer_source_random. Call this after a
334 * transfer has finished, and construct a new xfer_source_random with the seed.
335 * The new source will continue the same random sequence at the next byte. This
336 * is useful for constructing spanned dumps in testing.
338 * @param src: XferSourceRandom object
341 guint32 xfer_source_random_get_seed(XferElement *src);
343 /* A transfer source that produces LENGTH bytes containing repeated
344 * copies of the provided pattern, for testing purposes.
346 * Implemented in source-pattern.c
348 * @param length: bytes to produce, or zero for no limit
349 * @param pattern: Pointer to memory containing the desired byte pattern.
350 * @param pattern_length: Size of pattern to repeat.
351 * @return: new element
353 XferElement *xfer_source_pattern(guint64 length, void * pattern,
354 size_t pattern_length);
356 /* A transfer source that provides bytes read from a file descriptor.
357 * Reading continues until EOF, but the file descriptor is not closed.
359 * Implemented in source-fd.c
361 * @param fd: the file descriptor from which to read
362 * @return: new element
364 XferElement * xfer_source_fd(
367 /* A transfer source that exposes its listening DirectTCPAddrs (via
368 * elt->input_listen_addrs) for external use
370 * Implemented in source-directtcp-listen.c
372 * @return: new element
374 XferElement * xfer_source_directtcp_listen(void);
376 /* A transfer source that connects to a DirectTCP address and pulls data
377 * from it into the transfer.
379 * Implemented in source-directtcp-connect.c
381 * @param addrs: DirectTCP addresses to connect to
382 * @return: new element
384 XferElement * xfer_source_directtcp_connect(DirectTCPAddr *addrs);
386 /* A transfer filter that executes an external application, feeding it data on
387 * stdin and taking the results on stdout.
389 * The memory for ARGV becomes the property of the transfer element and will be
390 * g_free'd when the xfer is destroyed.
392 * Implemented in filter-process.c
394 * @param argv: NULL-terminated command-line arguments
395 * @param need_root: become root before exec'ing the subprocess
396 * @return: new element
398 XferElement *xfer_filter_process(gchar **argv, gboolean need_root);
400 /* A transfer filter that just applies a bytewise XOR transformation to the data
401 * that passes through it.
403 * Implemented in filter-xor.c
405 * @param xor_key: key for xor operations
406 * @return: new element
408 XferElement *xfer_filter_xor(
409 unsigned char xor_key);
411 /* A transfer destination that consumes all bytes it is given, optionally
412 * validating that they match those produced by source_random
414 * Implemented in dest-null.c
416 * @param prng_seed: if nonzero, validate that the datastream matches
417 * that produced by a random source with this random seed. If zero,
418 * no validation is performed.
419 * @return: new element
421 XferElement *xfer_dest_null(
424 /* A transfer destination that writes bytes to a file descriptor. The file
425 * descriptor is not closed when the transfer is complete.
427 * Implemented in dest-fd.c
429 * @param fd: file descriptor to which to write
430 * @return: new element
432 XferElement *xfer_dest_fd(
435 /* A transfer destination that writes bytes to an in-memory buffer.
437 * Implemented in dest-buffer.c
439 * @param max_size: maximum size for the buffer, or zero for no limit
440 * @return: new element
442 XferElement *xfer_dest_buffer(
445 /* Get the buffer and size from an XferDestBuffer. The resulting buffer
446 * will remain allocated until the XDB itself is freed.
448 * Implemented in dest-buffer.c
450 * @param elt: the element
451 * @param buf (output): buffer pointer
452 * @param size (output): buffer size
454 void xfer_dest_buffer_get(
459 /* A transfer dest that connects to a DirectTCPAddr and sends data to
462 * Implemented in dest-directtcp-connect.c
464 * @param addrs: DirectTCP addresses to connect to
465 * @return: new element
467 XferElement * xfer_dest_directtcp_connect(DirectTCPAddr *addrs);
469 /* A transfer dest that listens for a DirectTCP connection and sends data to it
470 * when connected. Listening addresses are exposed at
471 * elt->output_listen_addrs.
473 * Implemented in dest-directtcp-listen.c
475 * @return: new element
477 XferElement * xfer_dest_directtcp_listen(void);