Merge tag 'upstream/3.3.2'
[debian/amanda] / xfer-src / xfer-test.c
index 30e69700e8afa30ce0e100f8c87ecc87eee34504..9c43b4a1c87cd65787152c3198f478e6365f46f2 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2008, 2009, 2010 Zmanda, Inc.  All Rights Reserved.
+ * Copyright (c) 2008-2012 Zmanda, Inc.  All Rights Reserved.
  * 
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License version 2 as published
  * Author: Dustin J. Mitchell <dustin@zmanda.com>
  */
 
+#include "amanda.h"
 #include "amxfer.h"
 #include "glib-util.h"
 #include "testutils.h"
-#include "amanda.h"
 #include "event.h"
 #include "simpleprng.h"
 #include "sockaddr-util.h"
@@ -103,7 +103,7 @@ source_readfd_setup_impl(
        g_critical("Error from pipe(): %s", strerror(errno));
 
     self->write_fd = p[1];
-    XFER_ELEMENT(self)->output_fd = p[0];
+    g_assert(xfer_element_swap_output_fd(elt, p[0]) == -1);
 
     return TRUE;
 }
@@ -124,8 +124,8 @@ source_readfd_class_init(
 {
     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
     static xfer_element_mech_pair_t mech_pairs[] = {
-       { XFER_MECH_NONE, XFER_MECH_READFD, 1, 1},
-       { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+       { XFER_MECH_NONE, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(1) },
+       { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
     };
 
     xec->setup = source_readfd_setup_impl;
@@ -184,8 +184,12 @@ source_writefd_thread(
     gpointer data)
 {
     XferSourceWritefd *self = XFER_SOURCE_WRITEFD(data);
+    XferElement *elt = XFER_ELEMENT(data);
     char buf[TEST_XFER_SIZE];
-    int fd = XFER_ELEMENT(self)->downstream->input_fd;
+    int fd = xfer_element_swap_input_fd(elt->downstream, -1);
+
+    /* this shouldn't happen, although non-test elements handle it gracefully */
+    g_assert(fd != -1);
 
     simpleprng_fill_buffer(&self->prng, buf, sizeof(buf));
 
@@ -194,7 +198,6 @@ source_writefd_thread(
     }
 
     close(fd);
-    XFER_ELEMENT(self)->downstream->input_fd = -1;
 
     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
 
@@ -220,8 +223,8 @@ source_writefd_class_init(
 {
     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
     static xfer_element_mech_pair_t mech_pairs[] = {
-       { XFER_MECH_NONE, XFER_MECH_WRITEFD, 1, 1},
-       { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+       { XFER_MECH_NONE, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(1) },
+       { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
     };
 
     xec->start = source_writefd_start_impl;
@@ -322,8 +325,8 @@ source_push_class_init(
 {
     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
     static xfer_element_mech_pair_t mech_pairs[] = {
-       { XFER_MECH_NONE, XFER_MECH_PUSH_BUFFER, 1, 1},
-       { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+       { XFER_MECH_NONE, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) },
+       { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
     };
 
     xec->start = source_push_start_impl;
@@ -417,8 +420,8 @@ source_pull_class_init(
 {
     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
     static xfer_element_mech_pair_t mech_pairs[] = {
-       { XFER_MECH_NONE, XFER_MECH_PULL_BUFFER, 1, 0},
-       { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+       { XFER_MECH_NONE, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) },
+       { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
     };
 
     xec->pull_buffer = source_pull_pull_buffer_impl;
@@ -479,28 +482,24 @@ source_listen_thread(
     XferSourceListen *self = XFER_SOURCE_LISTEN(data);
     XferElement *elt = XFER_ELEMENT(self);
     DirectTCPAddr *addrs;
-    sockaddr_union addr;
     int sock;
     char *buf;
     int i;
 
     /* set up the sockaddr -- IPv4 only */
-    SU_INIT(&addr, AF_INET);
     addrs = elt->downstream->input_listen_addrs;
     g_assert(addrs != NULL);
-    SU_SET_PORT(&addr, addrs->port);
-    ((struct sockaddr_in *)&addr)->sin_addr.s_addr = htonl(addrs->ipv4);
 
-    tu_dbg("making data connection to %s\n", str_sockaddr(&addr));
-    sock = socket(AF_INET, SOCK_STREAM, 0);
+    tu_dbg("making data connection to %s\n", str_sockaddr(addrs));
+    sock = socket(SU_GET_FAMILY(addrs), SOCK_STREAM, 0);
     if (sock < 0) {
        error("socket(): %s", strerror(errno));
     }
-    if (connect(sock, (struct sockaddr *)&addr, SS_LEN(&addr)) < 0) {
+    if (connect(sock, (struct sockaddr *)addrs, SS_LEN(addrs)) < 0) {
        error("connect(): %s", strerror(errno));
     }
 
-    tu_dbg("connected to %s\n", str_sockaddr(&addr));
+    tu_dbg("connected to %s\n", str_sockaddr(addrs));
 
     buf = g_malloc(TEST_BLOCK_SIZE);
     for (i = 0; i < TEST_BLOCK_COUNT; i++) {
@@ -544,8 +543,8 @@ source_listen_class_init(
 {
     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
     static xfer_element_mech_pair_t mech_pairs[] = {
-       { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_LISTEN, 1, 0},
-       { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+       { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(0) },
+       { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
     };
 
     xec->start = source_listen_start_impl;
@@ -671,8 +670,7 @@ source_connect_setup_impl(
     g_assert(SU_GET_FAMILY(&addr) == AF_INET);
 
     addrs = g_new0(DirectTCPAddr, 2);
-    addrs[0].ipv4 = ntohl(inet_addr("127.0.0.1"));
-    addrs[0].port = SU_GET_PORT(&addr);
+    copy_sockaddr(&addrs[0], &addr);
     elt->output_listen_addrs = addrs;
 
     return TRUE;
@@ -697,8 +695,8 @@ source_connect_class_init(
 {
     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
     static xfer_element_mech_pair_t mech_pairs[] = {
-       { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_CONNECT, 1, 0},
-       { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+       { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(0) },
+       { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
     };
 
     xec->setup = source_connect_setup_impl;
@@ -757,9 +755,13 @@ dest_readfd_thread(
     gpointer data)
 {
     XferDestReadfd *self = XFER_DEST_READFD(data);
+    XferElement *elt = XFER_ELEMENT(data);
     char buf[TEST_XFER_SIZE];
     size_t remaining;
-    int fd = XFER_ELEMENT(self)->upstream->output_fd;
+    int fd = xfer_element_swap_output_fd(elt->upstream, -1);
+
+    /* this shouldn't happen, although non-test elements handle it gracefully */
+    g_assert(fd != -1);
 
     remaining = sizeof(buf);
     while (remaining) {
@@ -778,7 +780,6 @@ dest_readfd_thread(
        g_critical("data entering XferDestReadfd does not match");
 
     close(fd);
-    XFER_ELEMENT(self)->upstream->output_fd = -1;
 
     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
 
@@ -804,8 +805,8 @@ dest_readfd_class_init(
 {
     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
     static xfer_element_mech_pair_t mech_pairs[] = {
-       { XFER_MECH_READFD, XFER_MECH_NONE, 1, 1},
-       { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+       { XFER_MECH_READFD, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1) },
+       { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
     };
 
     xec->start = dest_readfd_start_impl;
@@ -885,7 +886,6 @@ dest_writefd_thread(
        g_critical("data entering XferDestWritefd does not match");
 
     close(fd);
-    XFER_ELEMENT(self)->upstream->output_fd = -1;
 
     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
 
@@ -905,7 +905,7 @@ dest_writefd_setup_impl(
        g_critical("Error from pipe(): %s", strerror(errno));
 
     self->read_fd = p[0];
-    XFER_ELEMENT(self)->input_fd = p[1];
+    g_assert(xfer_element_swap_input_fd(elt, p[1]) == -1);
 
     return TRUE;
 }
@@ -926,8 +926,8 @@ dest_writefd_class_init(
 {
     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
     static xfer_element_mech_pair_t mech_pairs[] = {
-       { XFER_MECH_WRITEFD, XFER_MECH_NONE, 1, 1},
-       { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+       { XFER_MECH_WRITEFD, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1) },
+       { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
     };
 
     xec->setup = dest_writefd_setup_impl;
@@ -1024,8 +1024,8 @@ dest_push_class_init(
 {
     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
     static xfer_element_mech_pair_t mech_pairs[] = {
-       { XFER_MECH_PUSH_BUFFER, XFER_MECH_NONE, 1, 0},
-       { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+       { XFER_MECH_PUSH_BUFFER, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(0) },
+       { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
     };
 
     xec->push_buffer = dest_push_push_buffer_impl;
@@ -1124,8 +1124,8 @@ dest_pull_class_init(
 {
     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
     static xfer_element_mech_pair_t mech_pairs[] = {
-       { XFER_MECH_PULL_BUFFER, XFER_MECH_NONE, 1, 1},
-       { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+       { XFER_MECH_PULL_BUFFER, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1) },
+       { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
     };
 
     xec->start = dest_pull_start_impl;
@@ -1243,8 +1243,7 @@ dest_listen_setup_impl(
     g_assert(SU_GET_FAMILY(&addr) == AF_INET);
 
     addrs = g_new0(DirectTCPAddr, 2);
-    addrs[0].ipv4 = ntohl(inet_addr("127.0.0.1"));
-    addrs[0].port = SU_GET_PORT(&addr);
+    copy_sockaddr(&addrs[0], &addr);
     elt->input_listen_addrs = addrs;
 
     return TRUE;
@@ -1269,8 +1268,8 @@ dest_listen_class_init(
 {
     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
     static xfer_element_mech_pair_t mech_pairs[] = {
-       { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_NONE, 1, 1},
-       { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+       { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1) },
+       { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
     };
 
     xec->setup = dest_listen_setup_impl;
@@ -1342,11 +1341,10 @@ dest_connect_thread(
     SU_INIT(&addr, AF_INET);
     addrs = elt->upstream->output_listen_addrs;
     g_assert(addrs != NULL);
-    SU_SET_PORT(&addr, addrs->port);
-    ((struct sockaddr_in *)&addr)->sin_addr.s_addr = htonl(addrs->ipv4);
+    copy_sockaddr(&addr, addrs);
 
     tu_dbg("making data connection to %s\n", str_sockaddr(&addr));
-    sock = socket(AF_INET, SOCK_STREAM, 0);
+    sock = socket(SU_GET_FAMILY(&addr), SOCK_STREAM, 0);
     if (sock < 0) {
        error("socket(): %s", strerror(errno));
     }
@@ -1392,8 +1390,8 @@ dest_connect_class_init(
 {
     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
     static xfer_element_mech_pair_t mech_pairs[] = {
-       { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_NONE, 1, 1},
-       { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+       { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1) },
+       { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
     };
 
     xec->start = dest_connect_start_impl;
@@ -1481,7 +1479,7 @@ test_xfer_simple(void)
        elements[i] = NULL;
     }
 
-    xfer_start(xfer);
+    xfer_start(xfer, 0, 0);
 
     g_main_loop_run(default_main_loop());
     g_assert(xfer->status == XFER_DONE);
@@ -1536,7 +1534,7 @@ test_xfer_files(gboolean add_filters)
        elements[i] = NULL;
     }
 
-    xfer_start(xfer);
+    xfer_start(xfer, 0, 0);
 
     g_main_loop_run(default_main_loop());
     g_assert(xfer->status == XFER_DONE);
@@ -1585,7 +1583,7 @@ test_glue_combo(
        elements[i] = NULL;
     }
 
-    xfer_start(xfer);
+    xfer_start(xfer, 0, 0);
 
     g_main_loop_run(default_main_loop());
     g_assert(xfer->status == XFER_DONE);