#include "directtcp.h"
#include "util.h"
#include "sockaddr-util.h"
+#include "debug.h"
/*
* Instance definition
DirectTCPAddr **addrsp)
{
int sock;
- sockaddr_union addr;
+ sockaddr_union data_addr;
DirectTCPAddr *addrs;
socklen_t len;
+ struct addrinfo *res;
+ struct addrinfo *res_addr;
+ sockaddr_union *addr = NULL;
- sock = *sockp = socket(AF_INET, SOCK_STREAM, 0);
+ if (resolve_hostname("localhost", 0, &res, NULL) != 0) {
+ xfer_cancel_with_error(elt, "resolve_hostname(): %s", strerror(errno));
+ return FALSE;
+ }
+ for (res_addr = res; res_addr != NULL; res_addr = res_addr->ai_next) {
+ if (res_addr->ai_family == AF_INET) {
+ addr = (sockaddr_union *)res_addr->ai_addr;
+ break;
+ }
+ }
+ if (!addr) {
+ addr = (sockaddr_union *)res->ai_addr;
+ }
+
+ sock = *sockp = socket(SU_GET_FAMILY(addr), SOCK_STREAM, 0);
if (sock < 0) {
xfer_cancel_with_error(elt, "socket(): %s", strerror(errno));
return FALSE;
}
+ len = SS_LEN(addr);
+ if (bind(sock, (struct sockaddr *)addr, len) != 0) {
+ xfer_cancel_with_error(elt, "bind(): %s", strerror(errno));
+ freeaddrinfo(res);
+ return FALSE;
+ }
+
if (listen(sock, 1) < 0) {
xfer_cancel_with_error(elt, "listen(): %s", strerror(errno));
return FALSE;
}
/* TODO: which addresses should this display? all ifaces? localhost? */
- len = sizeof(addr);
- if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0)
+ len = sizeof(data_addr);
+ if (getsockname(sock, (struct sockaddr *)&data_addr, &len) < 0)
error("getsockname(): %s", strerror(errno));
- g_assert(SU_GET_FAMILY(&addr) == AF_INET);
addrs = g_new0(DirectTCPAddr, 2);
- addrs[0].ipv4 = ntohl(inet_addr("127.0.0.1")); /* TODO: be smarter! */
- addrs[0].port = SU_GET_PORT(&addr);
+ copy_sockaddr(&addrs[0], &data_addr);
*addrsp = addrs;
return TRUE;
}
/* set up the sockaddr -- IPv4 only */
- SU_INIT(&addr, AF_INET);
- SU_SET_PORT(&addr, addrs->port);
- ((struct sockaddr_in *)&addr)->sin_addr.s_addr = htonl(addrs->ipv4);
+ copy_sockaddr(&addr, addrs);
g_debug("making data connection to %s", str_sockaddr(&addr));
- sock = socket(AF_INET, SOCK_STREAM, 0);
+ sock = socket(SU_GET_FAMILY(&addr), SOCK_STREAM, 0);
if (sock < 0) {
xfer_cancel_with_error(elt,
"socket(): %s", strerror(errno));
}
if (elt->cancelled && elt->expect_eof)
- xfer_element_drain_by_pulling(elt->upstream);
+ xfer_element_drain_buffers(elt->upstream);
/* close the fd we've been writing, as an EOF signal to downstream, and
* set it to -1 to avoid accidental re-use */
}
if (elt->cancelled && elt->expect_eof)
- xfer_element_drain_by_reading(rfd);
+ xfer_element_drain_fd(rfd);
/* close the read fd. If it's not at EOF, then upstream will get EPIPE, which will hopefully
* kill it and complete the cancellation */
fd, strerror(saved_errno));
wait_until_xfer_cancelled(elt->xfer);
}
+ amfree(buf);
break;
} else if (len == 0) { /* we only count a zero-length read as EOF */
amfree(buf);
}
if (elt->cancelled && elt->expect_eof)
- xfer_element_drain_by_reading(fd);
+ xfer_element_drain_fd(fd);
/* send an EOF indication downstream */
xfer_element_push_buffer(elt->downstream, NULL, 0);
}
if (elt->cancelled && elt->expect_eof)
- xfer_element_drain_by_pulling(elt->upstream);
+ xfer_element_drain_buffers(elt->upstream);
if (!eof_sent)
xfer_element_push_buffer(elt->downstream, NULL, 0);
case PULL_FROM_FD: {
int fd = get_read_fd(self);
- char *buf = g_malloc(GLUE_BUFFER_SIZE);
+ char *buf;
ssize_t len;
/* if the fd is already closed, it's possible upstream bailed out
if (elt->cancelled || fd == -1) {
if (fd != -1) {
if (elt->expect_eof)
- xfer_element_drain_by_reading(fd);
+ xfer_element_drain_fd(fd);
close_read_fd(self);
}
return NULL;
}
+ buf = g_malloc(GLUE_BUFFER_SIZE);
+
/* read from upstream */
len = full_read(fd, buf, GLUE_BUFFER_SIZE);
if (len < GLUE_BUFFER_SIZE) {
/* and finish off the upstream */
if (elt->expect_eof) {
- xfer_element_drain_by_reading(fd);
+ xfer_element_drain_fd(fd);
}
close_read_fd(self);
} else if (len == 0) {
/* close our pipes and fd's if they're still open */
if (self->pipe[0] != -1) close(self->pipe[0]);
if (self->pipe[1] != -1) close(self->pipe[1]);
- if (self->input_listen_socket != -1) close(self->input_listen_socket);
- if (self->output_listen_socket != -1) close(self->output_listen_socket);
if (self->input_data_socket != -1) close(self->input_data_socket);
if (self->output_data_socket != -1) close(self->output_data_socket);
+ if (self->input_listen_socket != -1) close(self->input_listen_socket);
+ if (self->output_listen_socket != -1) close(self->output_listen_socket);
if (self->read_fd != -1) close(self->read_fd);
if (self->write_fd != -1) close(self->write_fd);
}
static xfer_element_mech_pair_t _pairs[] = {
- { XFER_MECH_READFD, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy */
- { XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
- { XFER_MECH_READFD, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */
- { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* splice or copy */
- { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy */
-
- { XFER_MECH_WRITEFD, XFER_MECH_READFD, 0, 0 }, /* pipe */
- { XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* pipe + read and call*/
- { XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER, 1, 0 }, /* pipe + read on demand */
- { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* pipe + splice or copy*/
- { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy + pipe */
-
- { XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD, 1, 0 }, /* write on demand + pipe */
- { XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD, 1, 0 }, /* write on demand */
- { XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER, 0, 0 }, /* async queue */
- { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, 1, 0 }, /* write on demand */
- { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, 1, 0 }, /* write on demand */
-
- { XFER_MECH_PULL_BUFFER, XFER_MECH_READFD, 1, 1 }, /* call and write + pipe */
- { XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD, 1, 1 }, /* call and write */
- { XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER, 0, 1 }, /* call and call */
- { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, 1, 1 }, /* call and write */
- { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, 1, 1 }, /* call and write */
-
- { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD, 2, 1 }, /* splice or copy + pipe */
- { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy */
- { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
- { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */
- { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy */
-
- { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD, 2, 1 }, /* splice or copy + pipe */
- { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy + pipe */
- { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
- { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */
- { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* splice or copy */
+ { XFER_MECH_READFD, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
+ { XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
+ { XFER_MECH_READFD, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
+ { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
+ { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
+
+ { XFER_MECH_WRITEFD, XFER_MECH_READFD, XFER_NROPS(0), XFER_NTHREADS(0) }, /* pipe */
+ { XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* pipe + read and call*/
+ { XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* pipe + read on demand */
+ { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* pipe + splice or copy*/
+ { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
+
+ { XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand + pipe */
+ { XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
+ { XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER, XFER_NROPS(0), XFER_NTHREADS(0) }, /* async queue */
+ { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
+ { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
+
+ { XFER_MECH_PULL_BUFFER, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write + pipe */
+ { XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
+ { XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER, XFER_NROPS(0), XFER_NTHREADS(1) }, /* call and call */
+ { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
+ { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
+
+ { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
+ { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
+ { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
+ { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
+ { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
+
+ { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
+ { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
+ { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
+ { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
+ { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
/* terminator */
- { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+ { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
};
xfer_element_mech_pair_t *xfer_element_glue_mech_pairs = _pairs;