Imported Upstream version 3.3.0
[debian/amanda] / xfer-src / element-glue.c
index ce71f113b5a581f89c9f58a2db415ca5923b5888..ecc543d66d924ed1c407ff58af3a6562305158de 100644 (file)
@@ -25,6 +25,7 @@
 #include "directtcp.h"
 #include "util.h"
 #include "sockaddr-util.h"
+#include "debug.h"
 
 /*
  * Instance definition
@@ -111,30 +112,52 @@ do_directtcp_listen(
     DirectTCPAddr **addrsp)
 {
     int sock;
-    sockaddr_union addr;
+    sockaddr_union data_addr;
     DirectTCPAddr *addrs;
     socklen_t len;
+    struct addrinfo *res;
+    struct addrinfo *res_addr;
+    sockaddr_union *addr = NULL;
+
+    if (resolve_hostname("localhost", 0, &res, NULL) != 0) {
+       xfer_cancel_with_error(elt, "resolve_hostname(): %s", strerror(errno));
+       return FALSE;
+    }
+    for (res_addr = res; res_addr != NULL; res_addr = res_addr->ai_next) {
+       if (res_addr->ai_family == AF_INET) {
+            addr = (sockaddr_union *)res_addr->ai_addr;
+            break;
+        }
+    }
+    if (!addr) {
+        addr = (sockaddr_union *)res->ai_addr;
+    }
 
-    sock = *sockp = socket(AF_INET, SOCK_STREAM, 0);
+    sock = *sockp = socket(SU_GET_FAMILY(addr), SOCK_STREAM, 0);
     if (sock < 0) {
        xfer_cancel_with_error(elt, "socket(): %s", strerror(errno));
        return FALSE;
     }
 
+    len = SS_LEN(addr);
+    if (bind(sock, (struct sockaddr *)addr, len) != 0) {
+       xfer_cancel_with_error(elt, "bind(): %s", strerror(errno));
+       freeaddrinfo(res);
+       return FALSE;
+    }
+
     if (listen(sock, 1) < 0) {
        xfer_cancel_with_error(elt, "listen(): %s", strerror(errno));
        return FALSE;
     }
 
     /* TODO: which addresses should this display? all ifaces? localhost? */
-    len = sizeof(addr);
-    if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0)
+    len = sizeof(data_addr);
+    if (getsockname(sock, (struct sockaddr *)&data_addr, &len) < 0)
        error("getsockname(): %s", strerror(errno));
-    g_assert(SU_GET_FAMILY(&addr) == AF_INET);
 
     addrs = g_new0(DirectTCPAddr, 2);
-    addrs[0].ipv4 = ntohl(inet_addr("127.0.0.1")); /* TODO: be smarter! */
-    addrs[0].port = SU_GET_PORT(&addr);
+    copy_sockaddr(&addrs[0], &data_addr);
     *addrsp = addrs;
 
     return TRUE;
@@ -195,12 +218,10 @@ do_directtcp_connect(
     }
 
     /* set up the sockaddr -- IPv4 only */
-    SU_INIT(&addr, AF_INET);
-    SU_SET_PORT(&addr, addrs->port);
-    ((struct sockaddr_in *)&addr)->sin_addr.s_addr = htonl(addrs->ipv4);
+    copy_sockaddr(&addr, addrs);
 
     g_debug("making data connection to %s", str_sockaddr(&addr));
-    sock = socket(AF_INET, SOCK_STREAM, 0);
+    sock = socket(SU_GET_FAMILY(&addr), SOCK_STREAM, 0);
     if (sock < 0) {
        xfer_cancel_with_error(elt,
            "socket(): %s", strerror(errno));
@@ -323,7 +344,7 @@ pull_and_write(XferElementGlue *self)
     }
 
     if (elt->cancelled && elt->expect_eof)
-       xfer_element_drain_by_pulling(elt->upstream);
+       xfer_element_drain_buffers(elt->upstream);
 
     /* close the fd we've been writing, as an EOF signal to downstream, and
      * set it to -1 to avoid accidental re-use */
@@ -370,7 +391,7 @@ read_and_write(XferElementGlue *self)
     }
 
     if (elt->cancelled && elt->expect_eof)
-       xfer_element_drain_by_reading(rfd);
+       xfer_element_drain_fd(rfd);
 
     /* close the read fd.  If it's not at EOF, then upstream will get EPIPE, which will hopefully
      * kill it and complete the cancellation */
@@ -405,6 +426,7 @@ read_and_push(
                            fd, strerror(saved_errno));
                    wait_until_xfer_cancelled(elt->xfer);
                }
+                amfree(buf);
                break;
            } else if (len == 0) { /* we only count a zero-length read as EOF */
                amfree(buf);
@@ -416,7 +438,7 @@ read_and_push(
     }
 
     if (elt->cancelled && elt->expect_eof)
-       xfer_element_drain_by_reading(fd);
+       xfer_element_drain_fd(fd);
 
     /* send an EOF indication downstream */
     xfer_element_push_buffer(elt->downstream, NULL, 0);
@@ -448,7 +470,7 @@ pull_and_push(XferElementGlue *self)
     }
 
     if (elt->cancelled && elt->expect_eof)
-       xfer_element_drain_by_pulling(elt->upstream);
+       xfer_element_drain_buffers(elt->upstream);
 
     if (!eof_sent)
        xfer_element_push_buffer(elt->downstream, NULL, 0);
@@ -937,7 +959,7 @@ pull_buffer_impl(
 
        case PULL_FROM_FD: {
            int fd = get_read_fd(self);
-           char *buf = g_malloc(GLUE_BUFFER_SIZE);
+           char *buf;
            ssize_t len;
 
            /* if the fd is already closed, it's possible upstream bailed out
@@ -945,7 +967,7 @@ pull_buffer_impl(
            if (elt->cancelled || fd == -1) {
                if (fd != -1) {
                    if (elt->expect_eof)
-                       xfer_element_drain_by_reading(fd);
+                       xfer_element_drain_fd(fd);
 
                    close_read_fd(self);
                }
@@ -954,6 +976,8 @@ pull_buffer_impl(
                return NULL;
            }
 
+           buf = g_malloc(GLUE_BUFFER_SIZE);
+
            /* read from upstream */
            len = full_read(fd, buf, GLUE_BUFFER_SIZE);
            if (len < GLUE_BUFFER_SIZE) {
@@ -970,7 +994,7 @@ pull_buffer_impl(
 
                    /* and finish off the upstream */
                    if (elt->expect_eof) {
-                       xfer_element_drain_by_reading(fd);
+                       xfer_element_drain_fd(fd);
                    }
                    close_read_fd(self);
                } else if (len == 0) {
@@ -1166,44 +1190,44 @@ finalize_impl(
 }
 
 static xfer_element_mech_pair_t _pairs[] = {
-    { XFER_MECH_READFD, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy */
-    { XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
-    { XFER_MECH_READFD, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */
-    { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* splice or copy */
-    { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy */
-
-    { XFER_MECH_WRITEFD, XFER_MECH_READFD, 0, 0 }, /* pipe */
-    { XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* pipe + read and call*/
-    { XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER, 1, 0 }, /* pipe + read on demand */
-    { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* pipe + splice or copy*/
-    { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy + pipe */
-
-    { XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD, 1, 0 }, /* write on demand + pipe */
-    { XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD, 1, 0 }, /* write on demand */
-    { XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER, 0, 0 }, /* async queue */
-    { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, 1, 0 }, /* write on demand */
-    { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, 1, 0 }, /* write on demand */
-
-    { XFER_MECH_PULL_BUFFER, XFER_MECH_READFD, 1, 1 }, /* call and write + pipe */
-    { XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD, 1, 1 }, /* call and write */
-    { XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER, 0, 1 }, /* call and call */
-    { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, 1, 1 }, /* call and write */
-    { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, 1, 1 }, /* call and write */
-
-    { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD, 2, 1 }, /* splice or copy + pipe */
-    { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy */
-    { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
-    { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */
-    { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy */
-
-    { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD, 2, 1 }, /* splice or copy + pipe */
-    { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy + pipe */
-    { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
-    { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */
-    { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* splice or copy  */
+    { XFER_MECH_READFD, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
+    { XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
+    { XFER_MECH_READFD, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
+    { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
+    { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
+
+    { XFER_MECH_WRITEFD, XFER_MECH_READFD, XFER_NROPS(0), XFER_NTHREADS(0) }, /* pipe */
+    { XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* pipe + read and call*/
+    { XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* pipe + read on demand */
+    { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* pipe + splice or copy*/
+    { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
+
+    { XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand + pipe */
+    { XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
+    { XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER, XFER_NROPS(0), XFER_NTHREADS(0) }, /* async queue */
+    { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
+    { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
+
+    { XFER_MECH_PULL_BUFFER, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write + pipe */
+    { XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
+    { XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER, XFER_NROPS(0), XFER_NTHREADS(1) }, /* call and call */
+    { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
+    { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
+
+    { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
+    { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
+    { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
+    { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
+    { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
+
+    { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
+    { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
+    { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
+    { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
+    { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy  */
 
     /* terminator */
-    { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+    { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
 };
 xfer_element_mech_pair_t *xfer_element_glue_mech_pairs = _pairs;