+static gboolean
+connect_with_cond_impl(
+ Device *dself,
+ gboolean for_writing,
+ DirectTCPAddr *addrs,
+ DirectTCPConnection **dtcpconn,
+ GMutex *abort_mutex,
+ GCond *abort_cond)
+{
+ NdmpDevice *self = NDMP_DEVICE(dself);
+ ndmp9_mover_mode mode;
+ ndmp9_mover_pause_reason reason;
+ guint64 seek_position;
+ int result;
+
+ g_assert(!self->listen_addrs);
+
+ *dtcpconn = NULL;
+ self->for_writing = for_writing;
+
+ if (!open_tape_agent(self)) {
+ /* error message was set by open_tape_agent */
+ return 1;
+ }
+
+ /* first, set the window to an empty span so that the mover doesn't start
+ * reading or writing data immediately. NDMJOB tends to reset the record
+ * size periodically (in direct contradiction to the spec), so we reset it
+ * here as well. */
+ if (!ndmp_connection_mover_set_record_size(self->ndmp,
+ DEVICE(self)->block_size)) {
+ set_error_from_ndmp(self);
+ return 1;
+ }
+
+ if (!ndmp_connection_mover_set_window(self->ndmp, 0, 0)) {
+ set_error_from_ndmp(self);
+ return 1;
+ }
+
+ if (self->for_writing)
+ mode = NDMP9_MOVER_MODE_READ;
+ else
+ mode = NDMP9_MOVER_MODE_WRITE;
+
+ if (!ndmp_connection_mover_connect(self->ndmp, mode, addrs)) {
+ set_error_from_ndmp(self);
+ return 1;
+ }
+
+ if (!self->for_writing) {
+ /* The agent is in the ACTIVE state, and will remain so until we tell
+ * it to do something else. The thing we want to is for it to start
+ * reading data from the tape, which will immediately trigger an EOW or
+ * SEEK pause. */
+ if (!ndmp_connection_mover_read(self->ndmp, 0, G_MAXUINT64)) {
+ set_error_from_ndmp(self);
+ return 1;
+ }
+
+ /* now we should expect a notice that the mover has paused */
+ } else {
+ /* when writing, the mover will pause as soon as the first byte comes
+ * in, so there's no need to do anything to trigger the pause. */
+ }
+
+ /* NDMJOB sends NDMP9_MOVER_PAUSE_SEEK to indicate that it wants to write
+ * outside the window, while the standard specifies .._EOW, instead. When
+ * reading to a connection, we get the appropriate .._SEEK. It's easy
+ * enough to handle both. */
+
+ result = ndmp_connection_wait_for_notify_with_cond(self->ndmp,
+ NULL,
+ NULL,
+ &reason, &seek_position,
+ abort_mutex, abort_cond);
+
+ if (result == 1) {
+ set_error_from_ndmp(self);
+ return 1;
+ } else if (result == 2) {
+ return 2;
+ }
+
+ if (reason != NDMP9_MOVER_PAUSE_SEEK && reason != NDMP9_MOVER_PAUSE_EOW) {
+ device_set_error(DEVICE(self),
+ g_strdup_printf("got NOTIFY_MOVER_PAUSED, but not because of EOW or SEEK"),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return 1;
+ }
+
+ if (self->listen_addrs) {
+ g_free(self->listen_addrs);
+ self->listen_addrs = NULL;
+ }
+
+ /* set up the new directtcp connection */
+ if (self->directtcp_conn)
+ g_object_unref(self->directtcp_conn);
+ self->directtcp_conn =
+ directtcp_connection_ndmp_new(self->ndmp, mode);
+ *dtcpconn = DIRECTTCP_CONNECTION(self->directtcp_conn);
+
+ /* reference it for the caller */
+ g_object_ref(*dtcpconn);
+
+ return 0;
+}
+
+static gboolean
+indirecttcp_start_writing(
+ NdmpDevice *self)
+{
+ DirectTCPAddr *real_addrs, *iter;
+ int conn_sock;
+
+ /* The current state is that the other end is trying to connect to
+ * indirecttcp_sock. The mover remains IDLE, although its window is set
+ * correctly for the part we are about to write. */
+
+ g_debug("indirecttcp_start_writing, ready to accept");
+ conn_sock = accept(self->indirecttcp_sock, NULL, NULL);
+ if (conn_sock < 0) {
+ device_set_error(DEVICE(self),
+ g_strdup_printf("Could not accept indirecttcp socket: %s", strerror(errno)),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
+ g_debug("indirecttcp_start_writing, accepted");
+
+ close(self->indirecttcp_sock);
+ self->indirecttcp_sock = -1;
+
+ /* tell mover to start listening */
+ g_assert(self->for_writing);
+ if (!ndmp_connection_mover_listen(self->ndmp,
+ NDMP4_MOVER_MODE_READ,
+ NDMP4_ADDR_TCP,
+ &real_addrs)) {
+ set_error_from_ndmp(self);
+ return FALSE;
+ }
+
+ /* format the addresses and send them down the socket */
+ for (iter = real_addrs; iter && SU_GET_FAMILY(iter) != 0; iter++) {
+ char inet[INET_ADDRSTRLEN];
+ const char *addr;
+ char *addrspec;
+
+ addr = inet_ntop(AF_INET, &iter->sin.sin_addr.s_addr, inet, 40);
+
+ addrspec = g_strdup_printf("%s:%d%s", addr, SU_GET_PORT(iter),
+ SU_GET_FAMILY(iter+1) !=0? " ":"");
+ g_debug("indirecttcp_start_writing, send %s", addrspec);
+ if (full_write(conn_sock, addrspec, strlen(addrspec)) < strlen(addrspec)) {
+ device_set_error(DEVICE(self),
+ g_strdup_printf("writing to indirecttcp socket: %s", strerror(errno)),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
+ }
+
+ /* close the socket for good. This ensures that the next call to
+ * write_from_connection_impl will not go through the mover setup process.
+ * */
+ if (close(conn_sock) < 0) {
+ device_set_error(DEVICE(self),
+ g_strdup_printf("closing indirecttcp socket: %s", strerror(errno)),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
+ conn_sock = -1;
+
+ /* and free the listen_addrs, since we didn't free them in accept_impl */
+ if (self->listen_addrs) {
+ g_free(self->listen_addrs);
+ self->listen_addrs = NULL;
+ }
+
+ /* Now it's up to the remote end to connect to the mover and start sending
+ * data. We won't get any notification when this happens, although we could
+ * in principle poll for such a thing. */
+ return TRUE;
+}
+