+}
+
+static gpointer
+worker_thread(
+ gpointer data)
+{
+ XferElement *elt = XFER_ELEMENT(data);
+ XferElementGlue *self = XFER_ELEMENT_GLUE(data);
+
+ switch (mech_pair(elt->input_mech, elt->output_mech)) {
+ case mech_pair(XFER_MECH_READFD, XFER_MECH_WRITEFD):
+ read_and_write(self);
+ break;
+
+ case mech_pair(XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER):
+ case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER):
+ read_and_push(self);
+ break;
+
+ case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_READFD):
+ case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD):
+ pull_and_write(self);
+ break;
+
+ case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER):
+ pull_and_push(self);
+ break;
+
+ case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN):
+ case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN):
+ if ((self->output_data_socket = do_directtcp_connect(self,
+ elt->downstream->input_listen_addrs)) == -1)
+ break;
+ self->write_fdp = &self->output_data_socket;
+ read_and_write(self);
+ break;
+
+ case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
+ if ((self->output_data_socket = do_directtcp_connect(self,
+ elt->downstream->input_listen_addrs)) == -1)
+ break;
+ self->write_fdp = &self->output_data_socket;
+ pull_and_write(self);
+ break;
+
+ case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD):
+ case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD):
+ if ((self->input_data_socket = do_directtcp_accept(self, &self->input_listen_socket)) == -1)
+ break;
+ self->read_fdp = &self->input_data_socket;
+ read_and_write(self);
+ break;
+
+ case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER):
+ if ((self->input_data_socket = do_directtcp_accept(self,
+ &self->input_listen_socket)) == -1)
+ break;
+ self->read_fdp = &self->input_data_socket;
+ read_and_push(self);
+ break;
+
+ case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER):
+ case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER):
+ case mech_pair(XFER_MECH_READFD, XFER_MECH_PULL_BUFFER):
+ case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_READFD):
+ case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER):
+ case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD):
+ case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD):
+ case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER):
+ case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
+ case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
+ default:
+ g_assert_not_reached();
+ break;
+
+ case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT):
+ case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT):
+ if ((self->output_data_socket = do_directtcp_accept(self,
+ &self->output_listen_socket)) == -1)
+ break;
+ self->write_fdp = &self->output_data_socket;
+ read_and_write(self);
+ break;
+
+ case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD):
+ case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD):
+ if ((self->input_data_socket = do_directtcp_connect(self,
+ elt->upstream->output_listen_addrs)) == -1)
+ break;
+ self->read_fdp = &self->input_data_socket;
+ read_and_write(self);
+ break;
+
+ case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER):
+ if ((self->input_data_socket = do_directtcp_connect(self,
+ elt->upstream->output_listen_addrs)) == -1)
+ break;
+ self->read_fdp = &self->input_data_socket;
+ read_and_push(self);
+ break;
+
+ case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
+ if ((self->output_data_socket = do_directtcp_accept(self,
+ &self->output_listen_socket)) == -1)
+ break;
+ self->write_fdp = &self->output_data_socket;
+ pull_and_write(self);
+ break;
+
+ case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT):
+ /* TODO: use async accept's here to avoid order dependency */
+ if ((self->output_data_socket = do_directtcp_accept(self,
+ &self->output_listen_socket)) == -1)
+ break;
+ self->write_fdp = &self->output_data_socket;
+ if ((self->input_data_socket = do_directtcp_accept(self,
+ &self->input_listen_socket)) == -1)
+ break;
+ self->read_fdp = &self->input_data_socket;
+ read_and_write(self);
+ break;
+
+ case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN):
+ /* TODO: use async connects and select() to avoid order dependency here */
+ if ((self->input_data_socket = do_directtcp_connect(self,
+ elt->upstream->output_listen_addrs)) == -1)
+ break;
+ self->read_fdp = &self->input_data_socket;
+ if ((self->output_data_socket = do_directtcp_connect(self,
+ elt->downstream->input_listen_addrs)) == -1)
+ break;
+ self->write_fdp = &self->output_data_socket;
+ read_and_write(self);
+ break;
+ }