#include "directtcp.h"
#include "util.h"
#include "sockaddr-util.h"
+#include "debug.h"
/*
* Instance definition
/* a ring buffer of ptr/size pairs with semaphores */
struct { gpointer buf; size_t size; } *ring;
- semaphore_t *ring_used_sem, *ring_free_sem;
+ amsemaphore_t *ring_used_sem, *ring_free_sem;
gint ring_head, ring_tail;
GThread *thread;
DirectTCPAddr **addrsp)
{
int sock;
- sockaddr_union addr;
+ sockaddr_union data_addr;
DirectTCPAddr *addrs;
socklen_t len;
+ struct addrinfo *res;
+ struct addrinfo *res_addr;
+ sockaddr_union *addr = NULL;
+
+ if (resolve_hostname("localhost", 0, &res, NULL) != 0) {
+ xfer_cancel_with_error(elt, "resolve_hostname(): %s", strerror(errno));
+ return FALSE;
+ }
+ for (res_addr = res; res_addr != NULL; res_addr = res_addr->ai_next) {
+ if (res_addr->ai_family == AF_INET) {
+ addr = (sockaddr_union *)res_addr->ai_addr;
+ break;
+ }
+ }
+ if (!addr) {
+ addr = (sockaddr_union *)res->ai_addr;
+ }
- sock = *sockp = socket(AF_INET, SOCK_STREAM, 0);
+ sock = *sockp = socket(SU_GET_FAMILY(addr), SOCK_STREAM, 0);
if (sock < 0) {
xfer_cancel_with_error(elt, "socket(): %s", strerror(errno));
return FALSE;
}
+ len = SS_LEN(addr);
+ if (bind(sock, (struct sockaddr *)addr, len) != 0) {
+ xfer_cancel_with_error(elt, "bind(): %s", strerror(errno));
+ freeaddrinfo(res);
+ return FALSE;
+ }
+
if (listen(sock, 1) < 0) {
xfer_cancel_with_error(elt, "listen(): %s", strerror(errno));
return FALSE;
}
/* TODO: which addresses should this display? all ifaces? localhost? */
- len = sizeof(addr);
- if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0)
+ len = sizeof(data_addr);
+ if (getsockname(sock, (struct sockaddr *)&data_addr, &len) < 0)
error("getsockname(): %s", strerror(errno));
- g_assert(SU_GET_FAMILY(&addr) == AF_INET);
addrs = g_new0(DirectTCPAddr, 2);
- addrs[0].ipv4 = ntohl(inet_addr("127.0.0.1")); /* TODO: be smarter! */
- addrs[0].port = SU_GET_PORT(&addr);
+ copy_sockaddr(&addrs[0], &data_addr);
*addrsp = addrs;
return TRUE;
}
/* set up the sockaddr -- IPv4 only */
- SU_INIT(&addr, AF_INET);
- SU_SET_PORT(&addr, addrs->port);
- ((struct sockaddr_in *)&addr)->sin_addr.s_addr = htonl(addrs->ipv4);
+ copy_sockaddr(&addr, addrs);
g_debug("making data connection to %s", str_sockaddr(&addr));
- sock = socket(AF_INET, SOCK_STREAM, 0);
+ sock = socket(SU_GET_FAMILY(&addr), SOCK_STREAM, 0);
if (sock < 0) {
xfer_cancel_with_error(elt,
"socket(): %s", strerror(errno));
}
if (elt->cancelled && elt->expect_eof)
- xfer_element_drain_by_pulling(elt->upstream);
+ xfer_element_drain_buffers(elt->upstream);
/* close the fd we've been writing, as an EOF signal to downstream, and
* set it to -1 to avoid accidental re-use */
}
if (elt->cancelled && elt->expect_eof)
- xfer_element_drain_by_reading(rfd);
+ xfer_element_drain_fd(rfd);
/* close the read fd. If it's not at EOF, then upstream will get EPIPE, which will hopefully
* kill it and complete the cancellation */
fd, strerror(saved_errno));
wait_until_xfer_cancelled(elt->xfer);
}
+ amfree(buf);
break;
} else if (len == 0) { /* we only count a zero-length read as EOF */
amfree(buf);
}
if (elt->cancelled && elt->expect_eof)
- xfer_element_drain_by_reading(fd);
+ xfer_element_drain_fd(fd);
/* send an EOF indication downstream */
xfer_element_push_buffer(elt->downstream, NULL, 0);
}
if (elt->cancelled && elt->expect_eof)
- xfer_element_drain_by_pulling(elt->upstream);
+ xfer_element_drain_buffers(elt->upstream);
if (!eof_sent)
xfer_element_push_buffer(elt->downstream, NULL, 0);
/* set up ring if desired */
if (need_ring) {
self->ring = g_malloc(sizeof(*self->ring) * GLUE_RING_BUFFER_SIZE);
- self->ring_used_sem = semaphore_new_with_value(0);
- self->ring_free_sem = semaphore_new_with_value(GLUE_RING_BUFFER_SIZE);
+ self->ring_used_sem = amsemaphore_new_with_value(0);
+ self->ring_free_sem = amsemaphore_new_with_value(GLUE_RING_BUFFER_SIZE);
}
if (need_listen_input) {
}
/* make sure there's at least one element available */
- semaphore_down(self->ring_used_sem);
+ amsemaphore_down(self->ring_used_sem);
/* get it */
buf = self->ring[self->ring_tail].buf;
self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE;
/* and mark this element as free to be overwritten */
- semaphore_up(self->ring_free_sem);
+ amsemaphore_up(self->ring_free_sem);
return buf;
}
case PULL_FROM_FD: {
int fd = get_read_fd(self);
- char *buf = g_malloc(GLUE_BUFFER_SIZE);
+ char *buf;
ssize_t len;
/* if the fd is already closed, it's possible upstream bailed out
if (elt->cancelled || fd == -1) {
if (fd != -1) {
if (elt->expect_eof)
- xfer_element_drain_by_reading(fd);
+ xfer_element_drain_fd(fd);
close_read_fd(self);
}
return NULL;
}
+ buf = g_malloc(GLUE_BUFFER_SIZE);
+
/* read from upstream */
len = full_read(fd, buf, GLUE_BUFFER_SIZE);
if (len < GLUE_BUFFER_SIZE) {
/* and finish off the upstream */
if (elt->expect_eof) {
- xfer_element_drain_by_reading(fd);
+ xfer_element_drain_fd(fd);
}
close_read_fd(self);
} else if (len == 0) {
}
/* make sure there's at least one element free */
- semaphore_down(self->ring_free_sem);
+ amsemaphore_down(self->ring_free_sem);
/* set it */
self->ring[self->ring_head].buf = buf;
self->ring_head = (self->ring_head + 1) % GLUE_RING_BUFFER_SIZE;
/* and mark this element as available for reading */
- semaphore_up(self->ring_used_sem);
+ amsemaphore_up(self->ring_used_sem);
return;
}
amfree(self->ring);
- semaphore_free(self->ring_used_sem);
- semaphore_free(self->ring_free_sem);
+ amsemaphore_free(self->ring_used_sem);
+ amsemaphore_free(self->ring_free_sem);
}
/* chain up */
}
static xfer_element_mech_pair_t _pairs[] = {
- { XFER_MECH_READFD, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy */
- { XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
- { XFER_MECH_READFD, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */
- { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* splice or copy */
- { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy */
-
- { XFER_MECH_WRITEFD, XFER_MECH_READFD, 0, 0 }, /* pipe */
- { XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* pipe + read and call*/
- { XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER, 1, 0 }, /* pipe + read on demand */
- { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* pipe + splice or copy*/
- { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy + pipe */
-
- { XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD, 1, 0 }, /* write on demand + pipe */
- { XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD, 1, 0 }, /* write on demand */
- { XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER, 0, 0 }, /* async queue */
- { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, 1, 0 }, /* write on demand */
- { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, 1, 0 }, /* write on demand */
-
- { XFER_MECH_PULL_BUFFER, XFER_MECH_READFD, 1, 1 }, /* call and write + pipe */
- { XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD, 1, 1 }, /* call and write */
- { XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER, 0, 1 }, /* call and call */
- { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, 1, 1 }, /* call and write */
- { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, 1, 1 }, /* call and write */
-
- { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD, 2, 1 }, /* splice or copy + pipe */
- { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy */
- { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
- { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */
- { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy */
-
- { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD, 2, 1 }, /* splice or copy + pipe */
- { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy + pipe */
- { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
- { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */
- { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* splice or copy */
+ { XFER_MECH_READFD, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
+ { XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
+ { XFER_MECH_READFD, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
+ { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
+ { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
+
+ { XFER_MECH_WRITEFD, XFER_MECH_READFD, XFER_NROPS(0), XFER_NTHREADS(0) }, /* pipe */
+ { XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* pipe + read and call*/
+ { XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* pipe + read on demand */
+ { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* pipe + splice or copy*/
+ { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
+
+ { XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand + pipe */
+ { XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
+ { XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER, XFER_NROPS(0), XFER_NTHREADS(0) }, /* async queue */
+ { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
+ { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
+
+ { XFER_MECH_PULL_BUFFER, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write + pipe */
+ { XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
+ { XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER, XFER_NROPS(0), XFER_NTHREADS(1) }, /* call and call */
+ { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
+ { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
+
+ { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
+ { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
+ { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
+ { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
+ { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
+
+ { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
+ { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
+ { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
+ { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
+ { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
/* terminator */
- { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+ { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
};
xfer_element_mech_pair_t *xfer_element_glue_mech_pairs = _pairs;