* Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
*/
+#include "amanda.h"
#include "amxfer.h"
#include "element-glue.h"
-#include "amanda.h"
#include "directtcp.h"
+#include "util.h"
#include "sockaddr-util.h"
/*
/* instructions to push_buffer_impl */
enum {
PUSH_TO_RING_BUFFER,
- PUSH_TO_FD, /* write to *write_fdp */
+ PUSH_TO_FD, /* write to write_fd */
PUSH_INVALID,
PUSH_ACCEPT_FIRST = (1 << 16),
/* instructions to pull_buffer_impl */
enum {
PULL_FROM_RING_BUFFER,
- PULL_FROM_FD, /* read from *read_fdp */
+ PULL_FROM_FD, /* read from read_fd */
PULL_INVALID,
PULL_ACCEPT_FIRST = (1 << 16),
int pipe[2];
int input_listen_socket, output_listen_socket;
int input_data_socket, output_data_socket;
+ int read_fd, write_fd;
/* a ring buffer of ptr/size pairs with semaphores */
struct { gpointer buf; size_t size; } *ring;
return TRUE;
}
+static gboolean
+prolong_accept(
+ gpointer data)
+{
+ return !XFER_ELEMENT(data)->cancelled;
+}
+
static int
do_directtcp_accept(
XferElementGlue *self,
int sock;
g_assert(*socketp != -1);
- if ((sock = accept(*socketp, NULL, NULL)) == -1) {
+ if ((sock = interruptible_accept(*socketp, NULL, NULL,
+ prolong_accept, self)) == -1) {
+ /* if the accept was interrupted due to a cancellation, then do not
+ * add a further error message */
+ if (errno == 0 && XFER_ELEMENT(self)->cancelled)
+ return -1;
+
xfer_cancel_with_error(XFER_ELEMENT(self),
_("Error accepting incoming connection: %s"), strerror(errno));
wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
#define mech_pair(IN,OUT) ((IN)*XFER_MECH_MAX+(OUT))
-/* if self->read_fdp or self->write_fdp are pointing to this integer, then
- * they should be redirected to point to the upstream's output_fd or
- * downstream's input_fd, respectively, at start() */
+/*
+ * fd handling
+ */
+
+/* if self->read_fdp or self->write_fdp are pointing to this integer, then they
+ * should be redirected to point to the upstream's output_fd or downstream's
+ * input_fd, respectively, at the first call to get_read_fd or get_write_fd,
+ * respectively. */
static int neighboring_element_fd = -1;
+#define get_read_fd(self) (((self)->read_fd == -1)? _get_read_fd((self)) : (self)->read_fd)
+static int
+_get_read_fd(XferElementGlue *self)
+{
+ if (!self->read_fdp)
+ return -1; /* shouldn't happen.. */
+
+ if (self->read_fdp == &neighboring_element_fd) {
+ XferElement *elt = XFER_ELEMENT(self);
+ self->read_fd = xfer_element_swap_output_fd(elt->upstream, -1);
+ } else {
+ self->read_fd = *self->read_fdp;
+ *self->read_fdp = -1;
+ }
+ self->read_fdp = NULL;
+ return self->read_fd;
+}
+
+#define get_write_fd(self) (((self)->write_fd == -1)? _get_write_fd((self)) : (self)->write_fd)
+static int
+_get_write_fd(XferElementGlue *self)
+{
+ if (!self->write_fdp)
+ return -1; /* shouldn't happen.. */
+
+ if (self->write_fdp == &neighboring_element_fd) {
+ XferElement *elt = XFER_ELEMENT(self);
+ self->write_fd = xfer_element_swap_input_fd(elt->downstream, -1);
+ } else {
+ self->write_fd = *self->write_fdp;
+ *self->write_fdp = -1;
+ }
+ self->write_fdp = NULL;
+ return self->write_fd;
+}
+
+static int
+close_read_fd(XferElementGlue *self)
+{
+ int fd = get_read_fd(self);
+ self->read_fd = -1;
+ return close(fd);
+}
+
+static int
+close_write_fd(XferElementGlue *self)
+{
+ int fd = get_write_fd(self);
+ self->write_fd = -1;
+ return close(fd);
+}
+
/*
* Worker thread utility functions
*/
pull_and_write(XferElementGlue *self)
{
XferElement *elt = XFER_ELEMENT(self);
- int fd = *self->write_fdp;
+ int fd = get_write_fd(self);
+ self->write_fdp = NULL;
while (!elt->cancelled) {
size_t len;
/* close the fd we've been writing, as an EOF signal to downstream, and
* set it to -1 to avoid accidental re-use */
- close(fd);
- *self->write_fdp = -1;
+ close_write_fd(self);
}
static void
read_and_write(XferElementGlue *self)
{
XferElement *elt = XFER_ELEMENT(self);
- int rfd = *self->read_fdp;
- int wfd = *self->write_fdp;
-
/* dynamically allocate a buffer, in case this thread has
* a limited amount of stack allocated */
char *buf = g_malloc(GLUE_BUFFER_SIZE);
+ int rfd = get_read_fd(self);
+ int wfd = get_write_fd(self);
while (!elt->cancelled) {
size_t len;
}
if (elt->cancelled && elt->expect_eof)
- xfer_element_drain_by_pulling(elt->upstream);
+ xfer_element_drain_by_reading(rfd);
- /* close the read fd, if it's at EOF, and set it to -1 to avoid accidental
- * re-use */
- if (!elt->cancelled || elt->expect_eof) {
- close(rfd);
- *self->read_fdp = -1;
- }
+ /* close the read fd. If it's not at EOF, then upstream will get EPIPE, which will hopefully
+ * kill it and complete the cancellation */
+ close_read_fd(self);
- /* close the fd we've been writing, as an EOF signal to downstream, and
- * set it to -1 to avoid accidental re-use */
- close(wfd);
- *self->write_fdp = -1;
+ /* close the fd we've been writing, as an EOF signal to downstream */
+ close_write_fd(self);
amfree(buf);
}
XferElementGlue *self)
{
XferElement *elt = XFER_ELEMENT(self);
- int fd = *self->read_fdp;
+ int fd = get_read_fd(self);
while (!elt->cancelled) {
char *buf = g_malloc(GLUE_BUFFER_SIZE);
/* send an EOF indication downstream */
xfer_element_push_buffer(elt->downstream, NULL, 0);
- /* close the read fd, since it's at EOF, and set it to -1 to avoid accidental
- * re-use */
- close(fd);
- *self->read_fdp = -1;
+ /* close the read fd, since it's at EOF */
+ close_read_fd(self);
}
static void
case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_READFD):
make_pipe(self);
- elt->input_fd = self->pipe[1];
+ g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
self->pipe[1] = -1; /* upstream will close this for us */
- elt->output_fd = self->pipe[0];
+ g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
self->pipe[0] = -1; /* downstream will close this for us */
break;
case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER):
/* thread will read from pipe and call downstream's push_buffer */
make_pipe(self);
- elt->input_fd = self->pipe[1];
+ g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
self->pipe[1] = -1; /* upstream will close this for us */
self->read_fdp = &self->pipe[0];
self->need_thread = TRUE;
case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER):
make_pipe(self);
- elt->input_fd = self->pipe[1];
+ g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
self->pipe[1] = -1; /* upstream will close this for us */
self->on_pull = PULL_FROM_FD;
self->read_fdp = &self->pipe[0];
case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN):
/* thread will connect for output, then read from pipe and write to socket */
make_pipe(self);
- elt->input_fd = self->pipe[1];
+ g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
self->pipe[1] = -1; /* upstream will close this for us */
self->read_fdp = &self->pipe[0];
self->need_thread = TRUE;
case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT):
/* thread will accept output conn, then read from pipe and write to socket */
make_pipe(self);
- elt->input_fd = self->pipe[1];
+ g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
self->pipe[1] = -1; /* upstream will close this for us */
self->read_fdp = &self->pipe[0];
self->need_thread = TRUE;
case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD):
make_pipe(self);
- elt->output_fd = self->pipe[0];
+ g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
self->pipe[0] = -1; /* downstream will close this for us */
self->on_push = PUSH_TO_FD;
self->write_fdp = &self->pipe[1];
case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_READFD):
/* thread will pull from upstream and write to pipe */
make_pipe(self);
- elt->output_fd = self->pipe[0];
+ g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
self->pipe[0] = -1; /* downstream will close this for us */
self->write_fdp = &self->pipe[1];
self->need_thread = TRUE;
case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD):
/* thread will accept for input, then read from socket and write to pipe */
make_pipe(self);
- elt->output_fd = self->pipe[0];
+ g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
self->pipe[0] = -1; /* downstream will close this for us */
self->write_fdp = &self->pipe[1];
self->need_thread = TRUE;
case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD):
/* thread will connect for input, then read from socket and write to pipe */
make_pipe(self);
- elt->output_fd = self->pipe[0];
+ g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
self->pipe[0] = -1; /* downstream will close this for us */
self->write_fdp = &self->pipe[1];
self->need_thread = TRUE;
{
XferElementGlue *self = (XferElementGlue *)elt;
- /* upstream and downstream are now set, so we can point our fdp's to them */
- if (self->write_fdp == &neighboring_element_fd)
- self->write_fdp = &elt->downstream->input_fd;
- if (self->read_fdp == &neighboring_element_fd)
- self->read_fdp = &elt->upstream->output_fd;
-
if (self->need_thread)
- self->thread = g_thread_create(worker_thread, (gpointer)self, FALSE, NULL);
+ self->thread = g_thread_create(worker_thread, (gpointer)self, TRUE, NULL);
/* we're active if we have a thread that will eventually die */
return self->need_thread;
}
case PULL_FROM_FD: {
- int fd = *self->read_fdp;
+ int fd = get_read_fd(self);
char *buf = g_malloc(GLUE_BUFFER_SIZE);
ssize_t len;
- if (elt->cancelled) {
- if (elt->expect_eof)
- xfer_element_drain_by_reading(fd);
+ /* if the fd is already closed, it's possible upstream bailed out
+ * so quickly that we didn't even get a look at the fd */
+ if (elt->cancelled || fd == -1) {
+ if (fd != -1) {
+ if (elt->expect_eof)
+ xfer_element_drain_by_reading(fd);
- close(fd);
- *self->read_fdp = -1;
+ close_read_fd(self);
+ }
*size = 0;
return NULL;
if (elt->expect_eof) {
xfer_element_drain_by_reading(fd);
}
- close(fd);
- *self->read_fdp = -1;
+ close_read_fd(self);
} else if (len == 0) {
/* EOF */
g_free(buf);
*size = 0;
/* signal EOF to downstream */
- close(fd);
- *self->read_fdp = -1;
+ close_read_fd(self);
}
}
return;
case PUSH_TO_FD: {
- int fd = *self->write_fdp;
+ int fd = get_write_fd(self);
+
+ /* if the fd is already closed, it's possible upstream bailed out
+ * so quickly that we didn't even get a look at the fd. In this
+ * case we can assume the xfer has been cancelled and just discard
+ * the data. */
+ if (fd == -1)
+ return;
if (elt->cancelled) {
if (!elt->expect_eof || !buf) {
- close(fd);
- *self->write_fdp = -1;
+ close_write_fd(self);
/* hack to ensure we won't close the fd again, if we get another push */
elt->expect_eof = TRUE;
}
amfree(buf);
} else {
- close(fd);
- *self->write_fdp = -1;
+ close_write_fd(self);
}
return;
self->output_listen_socket = -1;
self->input_data_socket = -1;
self->output_data_socket = -1;
+ self->read_fd = -1;
+ self->write_fd = -1;
}
static void
{
XferElementGlue *self = XFER_ELEMENT_GLUE(obj_self);
- /* close our pipes if they're still open (they shouldn't be!) */
+ /* first make sure the worker thread has finished up */
+ if (self->thread)
+ g_thread_join(self->thread);
+
+ /* close our pipes and fd's if they're still open */
if (self->pipe[0] != -1) close(self->pipe[0]);
if (self->pipe[1] != -1) close(self->pipe[1]);
- if (self->input_listen_socket != -1) close(self->input_listen_socket);
- if (self->output_listen_socket != -1) close(self->output_listen_socket);
if (self->input_data_socket != -1) close(self->input_data_socket);
if (self->output_data_socket != -1) close(self->output_data_socket);
+ if (self->input_listen_socket != -1) close(self->input_listen_socket);
+ if (self->output_listen_socket != -1) close(self->output_listen_socket);
+ if (self->read_fd != -1) close(self->read_fd);
+ if (self->write_fd != -1) close(self->write_fd);
if (self->ring) {
/* empty the ring buffer, ignoring syncronization issues */