* Author: Dustin J. Mitchell <dustin@zmanda.com>
*/
+#include "amanda.h"
#include "amxfer.h"
#include "glib-util.h"
#include "testutils.h"
-#include "amanda.h"
#include "event.h"
#include "simpleprng.h"
#include "sockaddr-util.h"
g_critical("Error from pipe(): %s", strerror(errno));
self->write_fd = p[1];
- XFER_ELEMENT(self)->output_fd = p[0];
+ g_assert(xfer_element_swap_output_fd(elt, p[0]) == -1);
return TRUE;
}
{
XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
static xfer_element_mech_pair_t mech_pairs[] = {
- { XFER_MECH_NONE, XFER_MECH_READFD, 1, 1},
- { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+ { XFER_MECH_NONE, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(1) },
+ { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
};
xec->setup = source_readfd_setup_impl;
gpointer data)
{
XferSourceWritefd *self = XFER_SOURCE_WRITEFD(data);
+ XferElement *elt = XFER_ELEMENT(data);
char buf[TEST_XFER_SIZE];
- int fd = XFER_ELEMENT(self)->downstream->input_fd;
+ int fd = xfer_element_swap_input_fd(elt->downstream, -1);
+
+ /* this shouldn't happen, although non-test elements handle it gracefully */
+ g_assert(fd != -1);
simpleprng_fill_buffer(&self->prng, buf, sizeof(buf));
}
close(fd);
- XFER_ELEMENT(self)->downstream->input_fd = -1;
xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
{
XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
static xfer_element_mech_pair_t mech_pairs[] = {
- { XFER_MECH_NONE, XFER_MECH_WRITEFD, 1, 1},
- { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+ { XFER_MECH_NONE, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(1) },
+ { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
};
xec->start = source_writefd_start_impl;
{
XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
static xfer_element_mech_pair_t mech_pairs[] = {
- { XFER_MECH_NONE, XFER_MECH_PUSH_BUFFER, 1, 1},
- { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+ { XFER_MECH_NONE, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) },
+ { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
};
xec->start = source_push_start_impl;
{
XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
static xfer_element_mech_pair_t mech_pairs[] = {
- { XFER_MECH_NONE, XFER_MECH_PULL_BUFFER, 1, 0},
- { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+ { XFER_MECH_NONE, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) },
+ { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
};
xec->pull_buffer = source_pull_pull_buffer_impl;
XferSourceListen *self = XFER_SOURCE_LISTEN(data);
XferElement *elt = XFER_ELEMENT(self);
DirectTCPAddr *addrs;
- sockaddr_union addr;
int sock;
char *buf;
int i;
/* set up the sockaddr -- IPv4 only */
- SU_INIT(&addr, AF_INET);
addrs = elt->downstream->input_listen_addrs;
g_assert(addrs != NULL);
- SU_SET_PORT(&addr, addrs->port);
- ((struct sockaddr_in *)&addr)->sin_addr.s_addr = htonl(addrs->ipv4);
- tu_dbg("making data connection to %s\n", str_sockaddr(&addr));
- sock = socket(AF_INET, SOCK_STREAM, 0);
+ tu_dbg("making data connection to %s\n", str_sockaddr(addrs));
+ sock = socket(SU_GET_FAMILY(addrs), SOCK_STREAM, 0);
if (sock < 0) {
error("socket(): %s", strerror(errno));
}
- if (connect(sock, (struct sockaddr *)&addr, SS_LEN(&addr)) < 0) {
+ if (connect(sock, (struct sockaddr *)addrs, SS_LEN(addrs)) < 0) {
error("connect(): %s", strerror(errno));
}
- tu_dbg("connected to %s\n", str_sockaddr(&addr));
+ tu_dbg("connected to %s\n", str_sockaddr(addrs));
buf = g_malloc(TEST_BLOCK_SIZE);
for (i = 0; i < TEST_BLOCK_COUNT; i++) {
{
XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
static xfer_element_mech_pair_t mech_pairs[] = {
- { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_LISTEN, 1, 0},
- { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+ { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(0) },
+ { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
};
xec->start = source_listen_start_impl;
g_assert(SU_GET_FAMILY(&addr) == AF_INET);
addrs = g_new0(DirectTCPAddr, 2);
- addrs[0].ipv4 = ntohl(inet_addr("127.0.0.1"));
- addrs[0].port = SU_GET_PORT(&addr);
+ copy_sockaddr(&addrs[0], &addr);
elt->output_listen_addrs = addrs;
return TRUE;
{
XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
static xfer_element_mech_pair_t mech_pairs[] = {
- { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_CONNECT, 1, 0},
- { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+ { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(0) },
+ { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
};
xec->setup = source_connect_setup_impl;
gpointer data)
{
XferDestReadfd *self = XFER_DEST_READFD(data);
+ XferElement *elt = XFER_ELEMENT(data);
char buf[TEST_XFER_SIZE];
size_t remaining;
- int fd = XFER_ELEMENT(self)->upstream->output_fd;
+ int fd = xfer_element_swap_output_fd(elt->upstream, -1);
+
+ /* this shouldn't happen, although non-test elements handle it gracefully */
+ g_assert(fd != -1);
remaining = sizeof(buf);
while (remaining) {
g_critical("data entering XferDestReadfd does not match");
close(fd);
- XFER_ELEMENT(self)->upstream->output_fd = -1;
xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
{
XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
static xfer_element_mech_pair_t mech_pairs[] = {
- { XFER_MECH_READFD, XFER_MECH_NONE, 1, 1},
- { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+ { XFER_MECH_READFD, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1) },
+ { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
};
xec->start = dest_readfd_start_impl;
g_critical("data entering XferDestWritefd does not match");
close(fd);
- XFER_ELEMENT(self)->upstream->output_fd = -1;
xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
g_critical("Error from pipe(): %s", strerror(errno));
self->read_fd = p[0];
- XFER_ELEMENT(self)->input_fd = p[1];
+ g_assert(xfer_element_swap_input_fd(elt, p[1]) == -1);
return TRUE;
}
{
XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
static xfer_element_mech_pair_t mech_pairs[] = {
- { XFER_MECH_WRITEFD, XFER_MECH_NONE, 1, 1},
- { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+ { XFER_MECH_WRITEFD, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1) },
+ { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
};
xec->setup = dest_writefd_setup_impl;
{
XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
static xfer_element_mech_pair_t mech_pairs[] = {
- { XFER_MECH_PUSH_BUFFER, XFER_MECH_NONE, 1, 0},
- { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+ { XFER_MECH_PUSH_BUFFER, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(0) },
+ { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
};
xec->push_buffer = dest_push_push_buffer_impl;
{
XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
static xfer_element_mech_pair_t mech_pairs[] = {
- { XFER_MECH_PULL_BUFFER, XFER_MECH_NONE, 1, 1},
- { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+ { XFER_MECH_PULL_BUFFER, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1) },
+ { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
};
xec->start = dest_pull_start_impl;
g_assert(SU_GET_FAMILY(&addr) == AF_INET);
addrs = g_new0(DirectTCPAddr, 2);
- addrs[0].ipv4 = ntohl(inet_addr("127.0.0.1"));
- addrs[0].port = SU_GET_PORT(&addr);
+ copy_sockaddr(&addrs[0], &addr);
elt->input_listen_addrs = addrs;
return TRUE;
{
XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
static xfer_element_mech_pair_t mech_pairs[] = {
- { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_NONE, 1, 1},
- { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+ { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1) },
+ { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
};
xec->setup = dest_listen_setup_impl;
SU_INIT(&addr, AF_INET);
addrs = elt->upstream->output_listen_addrs;
g_assert(addrs != NULL);
- SU_SET_PORT(&addr, addrs->port);
- ((struct sockaddr_in *)&addr)->sin_addr.s_addr = htonl(addrs->ipv4);
+ copy_sockaddr(&addr, addrs);
tu_dbg("making data connection to %s\n", str_sockaddr(&addr));
- sock = socket(AF_INET, SOCK_STREAM, 0);
+ sock = socket(SU_GET_FAMILY(&addr), SOCK_STREAM, 0);
if (sock < 0) {
error("socket(): %s", strerror(errno));
}
{
XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
static xfer_element_mech_pair_t mech_pairs[] = {
- { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_NONE, 1, 1},
- { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+ { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1) },
+ { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
};
xec->start = dest_connect_start_impl;
elements[i] = NULL;
}
- xfer_start(xfer);
+ xfer_start(xfer, 0, 0);
g_main_loop_run(default_main_loop());
g_assert(xfer->status == XFER_DONE);
elements[i] = NULL;
}
- xfer_start(xfer);
+ xfer_start(xfer, 0, 0);
g_main_loop_run(default_main_loop());
g_assert(xfer->status == XFER_DONE);
elements[i] = NULL;
}
- xfer_start(xfer);
+ xfer_start(xfer, 0, 0);
g_main_loop_run(default_main_loop());
g_assert(xfer->status == XFER_DONE);