Merge tag 'upstream/3.3.2'
[debian/amanda] / xfer-src / element-glue.c
index 271369c516b598dbc29421e3c3a97bca196ad9d6..bb31e938dd0f2206efb6ce99da56ba2992279fa6 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
- * Copyright (c) 2008, 2009, 2010 Zmanda, Inc.  All Rights Reserved.
+ * Copyright (c) 2008-2012 Zmanda, Inc.  All Rights Reserved.
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License version 2 as published
@@ -25,6 +25,7 @@
 #include "directtcp.h"
 #include "util.h"
 #include "sockaddr-util.h"
+#include "debug.h"
 
 /*
  * Instance definition
@@ -67,7 +68,7 @@ typedef struct XferElementGlue_ {
 
     /* a ring buffer of ptr/size pairs with semaphores */
     struct { gpointer buf; size_t size; } *ring;
-    semaphore_t *ring_used_sem, *ring_free_sem;
+    amsemaphore_t *ring_used_sem, *ring_free_sem;
     gint ring_head, ring_tail;
 
     GThread *thread;
@@ -111,30 +112,52 @@ do_directtcp_listen(
     DirectTCPAddr **addrsp)
 {
     int sock;
-    sockaddr_union addr;
+    sockaddr_union data_addr;
     DirectTCPAddr *addrs;
     socklen_t len;
+    struct addrinfo *res;
+    struct addrinfo *res_addr;
+    sockaddr_union *addr = NULL;
 
-    sock = *sockp = socket(AF_INET, SOCK_STREAM, 0);
+    if (resolve_hostname("localhost", 0, &res, NULL) != 0) {
+       xfer_cancel_with_error(elt, "resolve_hostname(): %s", strerror(errno));
+       return FALSE;
+    }
+    for (res_addr = res; res_addr != NULL; res_addr = res_addr->ai_next) {
+       if (res_addr->ai_family == AF_INET) {
+            addr = (sockaddr_union *)res_addr->ai_addr;
+            break;
+        }
+    }
+    if (!addr) {
+        addr = (sockaddr_union *)res->ai_addr;
+    }
+
+    sock = *sockp = socket(SU_GET_FAMILY(addr), SOCK_STREAM, 0);
     if (sock < 0) {
        xfer_cancel_with_error(elt, "socket(): %s", strerror(errno));
        return FALSE;
     }
 
+    len = SS_LEN(addr);
+    if (bind(sock, (struct sockaddr *)addr, len) != 0) {
+       xfer_cancel_with_error(elt, "bind(): %s", strerror(errno));
+       freeaddrinfo(res);
+       return FALSE;
+    }
+
     if (listen(sock, 1) < 0) {
        xfer_cancel_with_error(elt, "listen(): %s", strerror(errno));
        return FALSE;
     }
 
     /* TODO: which addresses should this display? all ifaces? localhost? */
-    len = sizeof(addr);
-    if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0)
+    len = sizeof(data_addr);
+    if (getsockname(sock, (struct sockaddr *)&data_addr, &len) < 0)
        error("getsockname(): %s", strerror(errno));
-    g_assert(SU_GET_FAMILY(&addr) == AF_INET);
 
     addrs = g_new0(DirectTCPAddr, 2);
-    addrs[0].ipv4 = ntohl(inet_addr("127.0.0.1")); /* TODO: be smarter! */
-    addrs[0].port = SU_GET_PORT(&addr);
+    copy_sockaddr(&addrs[0], &data_addr);
     *addrsp = addrs;
 
     return TRUE;
@@ -195,12 +218,10 @@ do_directtcp_connect(
     }
 
     /* set up the sockaddr -- IPv4 only */
-    SU_INIT(&addr, AF_INET);
-    SU_SET_PORT(&addr, addrs->port);
-    ((struct sockaddr_in *)&addr)->sin_addr.s_addr = htonl(addrs->ipv4);
+    copy_sockaddr(&addr, addrs);
 
-    g_debug("making data connection to %s", str_sockaddr(&addr));
-    sock = socket(AF_INET, SOCK_STREAM, 0);
+    g_debug("do_directtcp_connect making data connection to %s", str_sockaddr(&addr));
+    sock = socket(SU_GET_FAMILY(&addr), SOCK_STREAM, 0);
     if (sock < 0) {
        xfer_cancel_with_error(elt,
            "socket(): %s", strerror(errno));
@@ -323,7 +344,7 @@ pull_and_write(XferElementGlue *self)
     }
 
     if (elt->cancelled && elt->expect_eof)
-       xfer_element_drain_by_pulling(elt->upstream);
+       xfer_element_drain_buffers(elt->upstream);
 
     /* close the fd we've been writing, as an EOF signal to downstream, and
      * set it to -1 to avoid accidental re-use */
@@ -370,7 +391,7 @@ read_and_write(XferElementGlue *self)
     }
 
     if (elt->cancelled && elt->expect_eof)
-       xfer_element_drain_by_reading(rfd);
+       xfer_element_drain_fd(rfd);
 
     /* close the read fd.  If it's not at EOF, then upstream will get EPIPE, which will hopefully
      * kill it and complete the cancellation */
@@ -405,6 +426,7 @@ read_and_push(
                            fd, strerror(saved_errno));
                    wait_until_xfer_cancelled(elt->xfer);
                }
+                amfree(buf);
                break;
            } else if (len == 0) { /* we only count a zero-length read as EOF */
                amfree(buf);
@@ -416,7 +438,7 @@ read_and_push(
     }
 
     if (elt->cancelled && elt->expect_eof)
-       xfer_element_drain_by_reading(fd);
+       xfer_element_drain_fd(fd);
 
     /* send an EOF indication downstream */
     xfer_element_push_buffer(elt->downstream, NULL, 0);
@@ -448,7 +470,7 @@ pull_and_push(XferElementGlue *self)
     }
 
     if (elt->cancelled && elt->expect_eof)
-       xfer_element_drain_by_pulling(elt->upstream);
+       xfer_element_drain_buffers(elt->upstream);
 
     if (!eof_sent)
        xfer_element_push_buffer(elt->downstream, NULL, 0);
@@ -829,8 +851,8 @@ setup_impl(
     /* set up ring if desired */
     if (need_ring) {
        self->ring = g_malloc(sizeof(*self->ring) * GLUE_RING_BUFFER_SIZE);
-       self->ring_used_sem = semaphore_new_with_value(0);
-       self->ring_free_sem = semaphore_new_with_value(GLUE_RING_BUFFER_SIZE);
+       self->ring_used_sem = amsemaphore_new_with_value(0);
+       self->ring_free_sem = amsemaphore_new_with_value(GLUE_RING_BUFFER_SIZE);
     }
 
     if (need_listen_input) {
@@ -922,7 +944,7 @@ pull_buffer_impl(
            }
 
            /* make sure there's at least one element available */
-           semaphore_down(self->ring_used_sem);
+           amsemaphore_down(self->ring_used_sem);
 
            /* get it */
            buf = self->ring[self->ring_tail].buf;
@@ -930,14 +952,14 @@ pull_buffer_impl(
            self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE;
 
            /* and mark this element as free to be overwritten */
-           semaphore_up(self->ring_free_sem);
+           amsemaphore_up(self->ring_free_sem);
 
            return buf;
        }
 
        case PULL_FROM_FD: {
            int fd = get_read_fd(self);
-           char *buf = g_malloc(GLUE_BUFFER_SIZE);
+           char *buf;
            ssize_t len;
 
            /* if the fd is already closed, it's possible upstream bailed out
@@ -945,7 +967,7 @@ pull_buffer_impl(
            if (elt->cancelled || fd == -1) {
                if (fd != -1) {
                    if (elt->expect_eof)
-                       xfer_element_drain_by_reading(fd);
+                       xfer_element_drain_fd(fd);
 
                    close_read_fd(self);
                }
@@ -954,6 +976,8 @@ pull_buffer_impl(
                return NULL;
            }
 
+           buf = g_malloc(GLUE_BUFFER_SIZE);
+
            /* read from upstream */
            len = full_read(fd, buf, GLUE_BUFFER_SIZE);
            if (len < GLUE_BUFFER_SIZE) {
@@ -970,7 +994,7 @@ pull_buffer_impl(
 
                    /* and finish off the upstream */
                    if (elt->expect_eof) {
-                       xfer_element_drain_by_reading(fd);
+                       xfer_element_drain_fd(fd);
                    }
                    close_read_fd(self);
                } else if (len == 0) {
@@ -1053,7 +1077,7 @@ push_buffer_impl(
            }
 
            /* make sure there's at least one element free */
-           semaphore_down(self->ring_free_sem);
+           amsemaphore_down(self->ring_free_sem);
 
            /* set it */
            self->ring[self->ring_head].buf = buf;
@@ -1061,7 +1085,7 @@ push_buffer_impl(
            self->ring_head = (self->ring_head + 1) % GLUE_RING_BUFFER_SIZE;
 
            /* and mark this element as available for reading */
-           semaphore_up(self->ring_used_sem);
+           amsemaphore_up(self->ring_used_sem);
 
            return;
 
@@ -1141,10 +1165,10 @@ finalize_impl(
     /* close our pipes and fd's if they're still open */
     if (self->pipe[0] != -1) close(self->pipe[0]);
     if (self->pipe[1] != -1) close(self->pipe[1]);
-    if (self->input_listen_socket != -1) close(self->input_listen_socket);
-    if (self->output_listen_socket != -1) close(self->output_listen_socket);
     if (self->input_data_socket != -1) close(self->input_data_socket);
     if (self->output_data_socket != -1) close(self->output_data_socket);
+    if (self->input_listen_socket != -1) close(self->input_listen_socket);
+    if (self->output_listen_socket != -1) close(self->output_listen_socket);
     if (self->read_fd != -1) close(self->read_fd);
     if (self->write_fd != -1) close(self->write_fd);
 
@@ -1157,8 +1181,8 @@ finalize_impl(
        }
 
        amfree(self->ring);
-       semaphore_free(self->ring_used_sem);
-       semaphore_free(self->ring_free_sem);
+       amsemaphore_free(self->ring_used_sem);
+       amsemaphore_free(self->ring_free_sem);
     }
 
     /* chain up */
@@ -1166,44 +1190,44 @@ finalize_impl(
 }
 
 static xfer_element_mech_pair_t _pairs[] = {
-    { XFER_MECH_READFD, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy */
-    { XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
-    { XFER_MECH_READFD, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */
-    { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* splice or copy */
-    { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy */
-
-    { XFER_MECH_WRITEFD, XFER_MECH_READFD, 0, 0 }, /* pipe */
-    { XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* pipe + read and call*/
-    { XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER, 1, 0 }, /* pipe + read on demand */
-    { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* pipe + splice or copy*/
-    { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy + pipe */
-
-    { XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD, 1, 0 }, /* write on demand + pipe */
-    { XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD, 1, 0 }, /* write on demand */
-    { XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER, 0, 0 }, /* async queue */
-    { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, 1, 0 }, /* write on demand */
-    { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, 1, 0 }, /* write on demand */
-
-    { XFER_MECH_PULL_BUFFER, XFER_MECH_READFD, 1, 1 }, /* call and write + pipe */
-    { XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD, 1, 1 }, /* call and write */
-    { XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER, 0, 1 }, /* call and call */
-    { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, 1, 1 }, /* call and write */
-    { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, 1, 1 }, /* call and write */
-
-    { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD, 2, 1 }, /* splice or copy + pipe */
-    { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy */
-    { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
-    { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */
-    { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy */
-
-    { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD, 2, 1 }, /* splice or copy + pipe */
-    { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy + pipe */
-    { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
-    { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */
-    { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* splice or copy  */
+    { XFER_MECH_READFD, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
+    { XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
+    { XFER_MECH_READFD, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
+    { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
+    { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
+
+    { XFER_MECH_WRITEFD, XFER_MECH_READFD, XFER_NROPS(0), XFER_NTHREADS(0) }, /* pipe */
+    { XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* pipe + read and call*/
+    { XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* pipe + read on demand */
+    { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* pipe + splice or copy*/
+    { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
+
+    { XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand + pipe */
+    { XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
+    { XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER, XFER_NROPS(0), XFER_NTHREADS(0) }, /* async queue */
+    { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
+    { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
+
+    { XFER_MECH_PULL_BUFFER, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write + pipe */
+    { XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
+    { XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER, XFER_NROPS(0), XFER_NTHREADS(1) }, /* call and call */
+    { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
+    { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
+
+    { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
+    { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
+    { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
+    { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
+    { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
+
+    { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
+    { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
+    { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
+    { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
+    { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy  */
 
     /* terminator */
-    { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+    { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
 };
 xfer_element_mech_pair_t *xfer_element_glue_mech_pairs = _pairs;