Merge tag 'upstream/3.3.3'
[debian/amanda] / device-src / ndmp-device.c
index 865f7407b47196eb625c57ad7819fb9c6897c0b1..d8c816a20cbfb3ccd612232963322596799d7bb2 100644 (file)
@@ -1,9 +1,10 @@
 /*
  * Copyright (c) 2009-2012 Zmanda, Inc.  All Rights Reserved.
  *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 as published
- * by the Free Software Foundation.
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
  *
  * This program is distributed in the hope that it will be useful, but
  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
@@ -72,6 +73,11 @@ struct NdmpDevice_ {
     gchar       *ndmp_auth;
     gboolean    verbose;
     gsize       read_block_size;
+
+    GMutex     *abort_mutex;
+    GCond      *abort_cond;
+    gboolean    cancel;
+    int         *cancelled;
 };
 
 /*
@@ -574,8 +580,6 @@ ndmp_device_start(
     dumpfile_t *header;
     char       *header_buf;
 
-    self = NDMP_DEVICE(dself);
-
     if (device_in_error(self)) return FALSE;
 
     if (!open_tape_agent(self)) {
@@ -1119,151 +1123,74 @@ listen_impl(
     return TRUE;
 }
 
-static gboolean
-accept_impl(
-    Device *dself,
-    DirectTCPConnection **dtcpconn,
-    ProlongProc prolong,
-    gpointer prolong_data)
+static gpointer
+accept_wait_cond(
+    gpointer data)
 {
-    NdmpDevice *self = NDMP_DEVICE(dself);
+    NdmpDevice *self = NDMP_DEVICE(data);
+
     ndmp9_mover_state state;
     guint64 bytes_moved;
-    ndmp9_mover_mode mode;
-    ndmp9_mover_pause_reason reason;
-    guint64 seek_position;
-
-    if (device_in_error(self)) return FALSE;
-
-    g_assert(self->listen_addrs);
-
-    *dtcpconn = NULL;
-
-    /* TODO: support aborting this operation - maybe just always poll? */
-    prolong = prolong;
-    prolong_data = prolong_data;
 
-    if (!self->for_writing) {
-       /* when reading, we don't get any kind of notification that the
-        * connection has been established, but we can't call NDMP_MOVER_READ
-        * until the mover is active.  So we have to poll, waiting for ACTIVE.
-        * This is ugly. */
-       gulong backoff = G_USEC_PER_SEC/20; /* 5 msec */
-       while (1) {
-           if (!ndmp_connection_mover_get_state(self->ndmp,
+    gulong backoff = G_USEC_PER_SEC/20; /* 5 msec */
+    g_mutex_lock(self->abort_mutex);
+    while (1) {
+       g_mutex_unlock(self->abort_mutex);
+       if (!ndmp_connection_mover_get_state(self->ndmp,
                    &state, &bytes_moved, NULL, NULL)) {
-               set_error_from_ndmp(self);
-               return FALSE;
-           }
-
-           if (state != NDMP9_MOVER_STATE_LISTEN)
-               break;
-
-           /* back off a little bit to give the other side time to breathe,
-            * but not more than one second */
-           g_usleep(backoff);
-           backoff *= 2;
-           if (backoff > G_USEC_PER_SEC)
-               backoff = G_USEC_PER_SEC;
-       }
-
-       /* double-check state */
-       if (state != NDMP9_MOVER_STATE_ACTIVE) {
-           device_set_error(DEVICE(self),
-               g_strdup("mover did not enter the ACTIVE state as expected"),
-               DEVICE_STATUS_DEVICE_ERROR);
-           return FALSE;
-       }
-
-       /* now, we need to get this into the PAUSED state, since right now we
-        * aren't allowed to perform any tape movement commands.  So we issue a
-        * MOVER_READ request for the whole darn image stream after setting the
-        * usual empty window. Note that this means the whole dump will be read
-        * in one MOVER_READ operation, even if it does not begin at the
-        * beginning of a part. */
-       if (!ndmp_connection_mover_read(self->ndmp, 0, G_MAXUINT64)) {
+           g_mutex_lock(self->abort_mutex);
            set_error_from_ndmp(self);
-           return FALSE;
-       }
-
-       /* now we should expect a notice that the mover has paused */
-    } else {
-       /* when writing, the mover will pause as soon as the first byte comes
-         * in, so there's no need to do anything to trigger the pause.
-         *
-         * Well, sometimes it won't - specifically, when it does not allow a
-         * zero-byte mover window, which means we've set up IndirectTCP.  But in
-         * that case, there's nothing interesting to do here.*/
-    }
-
-    if (self->indirecttcp_sock == -1) {
-       /* NDMJOB sends NDMP9_MOVER_PAUSE_SEEK to indicate that it wants to write
-        * outside the window, while the standard specifies .._EOW, instead.  When
-        * reading to a connection, we get the appropriate .._SEEK.  It's easy
-        * enough to handle both. */
-
-       if (!ndmp_connection_wait_for_notify(self->ndmp,
-               NULL,
-               NULL,
-               &reason, &seek_position)) {
-           set_error_from_ndmp(self);
-           return FALSE;
-       }
-
-       if (reason != NDMP9_MOVER_PAUSE_SEEK && reason != NDMP9_MOVER_PAUSE_EOW) {
-           device_set_error(DEVICE(self),
-               g_strdup_printf("got NOTIFY_MOVER_PAUSED, but not because of EOW or SEEK"),
-               DEVICE_STATUS_DEVICE_ERROR);
-           return FALSE;
+           state = 0;
+           break;
        }
-    }
+       g_mutex_lock(self->abort_mutex);
 
-    /* at this point, if we're doing directtcp, the mover is paused and ready
-     * to go, and the listen addrs are no longer required; if we're doing
-     * indirecttcp, then the other end may not even know of our listen_addrs
-     * yet, so we can't free them.
-     */
+       if (state != NDMP9_MOVER_STATE_LISTEN)
+           break;
 
-    if (self->indirecttcp_sock == -1) {
-       g_free(self->listen_addrs);
-       self->listen_addrs = NULL;
+       /* back off a little bit to give the other side time to breathe,
+        * but not more than one second */
+       g_mutex_unlock(self->abort_mutex);
+       g_usleep(backoff);
+       g_mutex_lock(self->abort_mutex);
+       if (self->cancel)
+           break;
+       backoff *= 2;
+       if (backoff > G_USEC_PER_SEC)
+           backoff = G_USEC_PER_SEC;
     }
 
-    if (self->for_writing)
-       mode = NDMP9_MOVER_MODE_READ;
-    else
-       mode = NDMP9_MOVER_MODE_WRITE;
-
-    /* set up the new directtcp connection */
-    if (self->directtcp_conn)
-       g_object_unref(self->directtcp_conn);
-    self->directtcp_conn =
-       directtcp_connection_ndmp_new(self->ndmp, mode);
-    *dtcpconn = DIRECTTCP_CONNECTION(self->directtcp_conn);
-
-    /* reference it for the caller */
-    g_object_ref(*dtcpconn);
-
-    return TRUE;
+    self->cancel = TRUE;
+    g_cond_broadcast(self->abort_cond);
+    g_mutex_unlock(self->abort_mutex);
+    return GINT_TO_POINTER(state);
 }
 
 static int
-accept_with_cond_impl(
+accept_impl(
     Device *dself,
     DirectTCPConnection **dtcpconn,
+    gboolean *cancelled,
     GMutex *abort_mutex,
     GCond *abort_cond)
 {
     NdmpDevice *self = NDMP_DEVICE(dself);
     ndmp9_mover_state state;
-    guint64 bytes_moved;
     ndmp9_mover_mode mode;
-    ndmp9_mover_pause_reason reason;
+    ndmp9_mover_halt_reason mover_halt_reason  = NDMP9_MOVER_HALT_NA;
+    ndmp9_mover_pause_reason mover_pause_reason = NDMP9_MOVER_PAUSE_NA;
     guint64 seek_position;
-    int result;
+    GThread *wait_thread;
+    int      result;
+    char    *err;
 
     if (device_in_error(self)) return 1;
 
+    self->abort_mutex = abort_mutex;
+    self->abort_cond = abort_cond;
+    self->cancelled = cancelled;
+    self->cancel = FALSE;
+
     g_assert(self->listen_addrs);
 
     *dtcpconn = NULL;
@@ -1273,23 +1200,18 @@ accept_with_cond_impl(
         * connection has been established, but we can't call NDMP_MOVER_READ
         * until the mover is active.  So we have to poll, waiting for ACTIVE.
         * This is ugly. */
-       gulong backoff = G_USEC_PER_SEC/20; /* 5 msec */
-       while (1) {
-           if (!ndmp_connection_mover_get_state(self->ndmp,
-                   &state, &bytes_moved, NULL, NULL)) {
-               set_error_from_ndmp(self);
-               return 1;
-           }
 
-           if (state != NDMP9_MOVER_STATE_LISTEN)
-               break;
+       wait_thread = g_thread_create(accept_wait_cond, (gpointer)self, TRUE,
+                                     NULL);
+       while (!*cancelled && !self->cancel) {
+           g_cond_wait(self->abort_cond, self->abort_mutex);
+       }
+       self->cancel = TRUE;
+       state = GPOINTER_TO_INT(g_thread_join(wait_thread));
 
-           /* back off a little bit to give the other side time to breathe,
-            * but not more than one second */
-           g_usleep(backoff);
-           backoff *= 2;
-           if (backoff > G_USEC_PER_SEC)
-               backoff = G_USEC_PER_SEC;
+       if (*cancelled) {
+           result = 2;
+           goto accept_failed;
        }
 
        /* double-check state */
@@ -1297,7 +1219,8 @@ accept_with_cond_impl(
            device_set_error(DEVICE(self),
                g_strdup("mover did not enter the ACTIVE state as expected"),
                DEVICE_STATUS_DEVICE_ERROR);
-           return 1;
+           result = 1;
+           goto accept_failed;
        }
 
        /* now, we need to get this into the PAUSED state, since right now we
@@ -1308,7 +1231,8 @@ accept_with_cond_impl(
         * beginning of a part. */
        if (!ndmp_connection_mover_read(self->ndmp, 0, G_MAXUINT64)) {
            set_error_from_ndmp(self);
-           return 1;
+           result = 1;
+           goto accept_failed;
        }
 
        /* now we should expect a notice that the mover has paused */
@@ -1324,24 +1248,39 @@ accept_with_cond_impl(
         * .._SEEK.  It's easy enough to handle both. */
        result = ndmp_connection_wait_for_notify_with_cond(self->ndmp,
                        NULL,
-                       NULL,
-                       &reason, &seek_position,
+                       &mover_halt_reason,
+                       &mover_pause_reason, &seek_position,
+                       cancelled,
                        abort_mutex, abort_cond);
 
        if (result == 1) {
            set_error_from_ndmp(self);
-           return 1;
+           goto accept_failed;
        } else if (result == 2) {
-           return 2;
+           goto accept_failed;
+       }
+
+       err = NULL;
+       if (mover_pause_reason) {
+           switch (mover_pause_reason) {
+               case NDMP9_MOVER_PAUSE_SEEK:
+               case NDMP9_MOVER_PAUSE_EOW:
+                       break;
+               default:
+                       err = "got NOTIFY_MOVER_PAUSED, but not because of EOW or SEEK";
+                       break;
+           }
+       } else if (mover_halt_reason) {
+           err = "unexpected NOTIFY_MOVER_HALT";
        }
 
-       if (reason != NDMP9_MOVER_PAUSE_SEEK &&
-           reason != NDMP9_MOVER_PAUSE_EOW) {
+       if (err) {
            device_set_error(DEVICE(self),
-           g_strdup_printf(
-               "got NOTIFY_MOVER_PAUSED, but not because of EOW or SEEK"),
+               g_strdup_printf("waiting NDMP_MOVER_PAUSE_SEEK: %s", err),
                DEVICE_STATUS_DEVICE_ERROR);
-           return FALSE;
+           result = 1;
+           goto accept_failed;
+
        }
     }
 
@@ -1371,127 +1310,30 @@ accept_with_cond_impl(
     g_object_ref(*dtcpconn);
 
     return 0;
-}
-
-static gboolean
-connect_impl(
-    Device *dself,
-    gboolean for_writing,
-    DirectTCPAddr *addrs,
-    DirectTCPConnection **dtcpconn,
-    ProlongProc prolong,
-    gpointer prolong_data)
-{
-    NdmpDevice *self = NDMP_DEVICE(dself);
-    ndmp9_mover_mode mode;
-    ndmp9_mover_pause_reason reason;
-    guint64 seek_position;
-
-    g_assert(!self->listen_addrs);
-
-    *dtcpconn = NULL;
-    self->for_writing = for_writing;
-
-    /* TODO: support aborting this operation - maybe just always poll? */
-    prolong = prolong;
-    prolong_data = prolong_data;
-
-    if (!open_tape_agent(self)) {
-       /* error message was set by open_tape_agent */
-       return FALSE;
-    }
-
-    /* first, set the window to an empty span so that the mover doesn't start
-     * reading or writing data immediately.  NDMJOB tends to reset the record
-     * size periodically (in direct contradiction to the spec), so we reset it
-     * here as well. */
-    if (!ndmp_connection_mover_set_record_size(self->ndmp,
-               DEVICE(self)->block_size)) {
-       set_error_from_ndmp(self);
-       return FALSE;
-    }
-
-    if (!ndmp_connection_mover_set_window(self->ndmp, 0, 0)) {
-       set_error_from_ndmp(self);
-       return FALSE;
-    }
-
-    if (self->for_writing)
-       mode = NDMP9_MOVER_MODE_READ;
-    else
-       mode = NDMP9_MOVER_MODE_WRITE;
 
-    if (!ndmp_connection_mover_connect(self->ndmp, mode, addrs)) {
-       set_error_from_ndmp(self);
-       return FALSE;
-    }
-
-    if (!self->for_writing) {
-       /* The agent is in the ACTIVE state, and will remain so until we tell
-        * it to do something else.  The thing we want to is for it to start
-        * reading data from the tape, which will immediately trigger an EOW or
-        * SEEK pause. */
-       if (!ndmp_connection_mover_read(self->ndmp, 0, G_MAXUINT64)) {
-           set_error_from_ndmp(self);
-           return FALSE;
-       }
-
-       /* now we should expect a notice that the mover has paused */
-    } else {
-       /* when writing, the mover will pause as soon as the first byte comes
-        * in, so there's no need to do anything to trigger the pause. */
-    }
-
-    /* NDMJOB sends NDMP9_MOVER_PAUSE_SEEK to indicate that it wants to write
-     * outside the window, while the standard specifies .._EOW, instead.  When
-     * reading to a connection, we get the appropriate .._SEEK.  It's easy
-     * enough to handle both. */
-
-    if (!ndmp_connection_wait_for_notify(self->ndmp,
-           NULL,
-           NULL,
-           &reason, &seek_position)) {
-       set_error_from_ndmp(self);
-       return FALSE;
-    }
-
-    if (reason != NDMP9_MOVER_PAUSE_SEEK && reason != NDMP9_MOVER_PAUSE_EOW) {
-       device_set_error(DEVICE(self),
-           g_strdup_printf("got NOTIFY_MOVER_PAUSED, but not because of EOW or SEEK"),
-           DEVICE_STATUS_DEVICE_ERROR);
-       return FALSE;
-    }
-
-    if (self->listen_addrs) {
-       g_free(self->listen_addrs);
-       self->listen_addrs = NULL;
+accept_failed:
+    if (self->indirecttcp_sock == -1) {
+        g_free(self->listen_addrs);
+        self->listen_addrs = NULL;
     }
-
-    /* set up the new directtcp connection */
-    if (self->directtcp_conn)
-       g_object_unref(self->directtcp_conn);
-    self->directtcp_conn =
-       directtcp_connection_ndmp_new(self->ndmp, mode);
-    *dtcpconn = DIRECTTCP_CONNECTION(self->directtcp_conn);
-
-    /* reference it for the caller */
-    g_object_ref(*dtcpconn);
-
-    return TRUE;
+    return result;
 }
 
-static gboolean
-connect_with_cond_impl(
+
+static int
+connect_impl(
     Device *dself,
     gboolean for_writing,
     DirectTCPAddr *addrs,
     DirectTCPConnection **dtcpconn,
+    int    *cancelled,
     GMutex *abort_mutex,
-    GCond *abort_cond)
+    GCond  *abort_cond)
 {
     NdmpDevice *self = NDMP_DEVICE(dself);
     ndmp9_mover_mode mode;
-    ndmp9_mover_pause_reason reason;
+    ndmp9_mover_halt_reason mover_halt_reason = NDMP9_MOVER_HALT_NA;
+    ndmp9_mover_pause_reason mover_pause_reason = NDMP9_MOVER_PAUSE_NA;
     guint64 seek_position;
     int result;
 
@@ -1553,8 +1395,9 @@ connect_with_cond_impl(
 
     result = ndmp_connection_wait_for_notify_with_cond(self->ndmp,
            NULL,
-           NULL,
-           &reason, &seek_position,
+           &mover_halt_reason,
+           &mover_pause_reason, &seek_position,
+           cancelled,
            abort_mutex, abort_cond);
 
     if (result == 1) {
@@ -1564,7 +1407,14 @@ connect_with_cond_impl(
        return 2;
     }
 
-    if (reason != NDMP9_MOVER_PAUSE_SEEK && reason != NDMP9_MOVER_PAUSE_EOW) {
+    if (mover_halt_reason != NDMP9_MOVER_HALT_NA) {
+       device_set_error(DEVICE(self),
+           g_strdup_printf("got NDMP9_MOVER_HALT"),
+           DEVICE_STATUS_DEVICE_ERROR);
+       return 1;
+    }
+    if (mover_pause_reason != NDMP9_MOVER_PAUSE_SEEK &&
+       mover_pause_reason != NDMP9_MOVER_PAUSE_EOW) {
        device_set_error(DEVICE(self),
            g_strdup_printf("got NOTIFY_MOVER_PAUSED, but not because of EOW or SEEK"),
            DEVICE_STATUS_DEVICE_ERROR);
@@ -1629,11 +1479,11 @@ indirecttcp_start_writing(
        const char *addr;
        char *addrspec;
 
-       addr = inet_ntop(AF_INET, &iter->sin.sin_addr.s_addr, inet, 40);
+       addr = inet_ntop(AF_INET, &iter->sin.sin_addr.s_addr, inet, INET_ADDRSTRLEN);
 
        addrspec = g_strdup_printf("%s:%d%s", addr, SU_GET_PORT(iter),
                SU_GET_FAMILY(iter+1) !=0? " ":"");
-    g_debug("indirecttcp_start_writing, send %s", addrspec);
+       g_debug("indirecttcp_start_writing, send %s", addrspec);
        if (full_write(conn_sock, addrspec, strlen(addrspec)) < strlen(addrspec)) {
            device_set_error(DEVICE(self),
                g_strdup_printf("writing to indirecttcp socket: %s", strerror(errno)),
@@ -1665,20 +1515,24 @@ indirecttcp_start_writing(
     return TRUE;
 }
 
-static gboolean
+static int
 write_from_connection_impl(
-    Device *dself,
-    guint64 size,
-    guint64 *actual_size)
+    Device  *dself,
+    guint64  size,
+    guint64 *actual_size,
+    int     *cancelled,
+    GMutex  *abort_mutex,
+    GCond   *abort_cond)
 {
     NdmpDevice *self = NDMP_DEVICE(dself);
     DirectTCPConnectionNDMP *nconn = self->directtcp_conn;
     gboolean eom = FALSE, eof = FALSE, eow = FALSE;
     ndmp9_mover_state mover_state;
-    ndmp9_mover_halt_reason halt_reason;
-    ndmp9_mover_pause_reason pause_reason;
+    ndmp9_mover_halt_reason mover_halt_reason = NDMP9_MOVER_HALT_NA;
+    ndmp9_mover_pause_reason mover_pause_reason = NDMP9_MOVER_PAUSE_NA;
     guint64 bytes_moved_before, bytes_moved_after;
     gchar *err;
+    int result;
 
     if (device_in_error(self)) return FALSE;
 
@@ -1694,7 +1548,7 @@ write_from_connection_impl(
     if (!ndmp_connection_mover_get_state(self->ndmp,
                &mover_state, &bytes_moved_before, NULL, NULL)) {
        set_error_from_ndmp(self);
-       return FALSE;
+       return 1;
     }
 
     if (self->indirecttcp_sock != -1) {
@@ -1715,33 +1569,37 @@ write_from_connection_impl(
                nconn->offset,
                size? size : G_MAXUINT64 - nconn->offset)) {
        set_error_from_ndmp(self);
-       return FALSE;
+       return 1;
     }
 
     /* for DirectTCP, we just tell the mover to continue; IndirectTCP is more complicated. */
     if (self->indirecttcp_sock != -1) {
        if (!indirecttcp_start_writing(self)) {
-           return FALSE;
+           return 1;
        }
     } else {
        if (!ndmp_connection_mover_continue(self->ndmp)) {
            set_error_from_ndmp(self);
-           return FALSE;
+           return 1;
        }
     }
 
     /* now wait for the mover to pause itself again, or halt on EOF or an error */
-    if (!ndmp_connection_wait_for_notify(self->ndmp,
-           NULL,
-           &halt_reason,
-           &pause_reason, NULL)) {
+     result = ndmp_connection_wait_for_notify_with_cond(self->ndmp,
+                   NULL,
+                   &mover_halt_reason,
+                   &mover_pause_reason, NULL,
+                   cancelled, abort_mutex, abort_cond);
+    if (result == 1) {
        set_error_from_ndmp(self);
-       return FALSE;
+       return 1;
+    } else if (result == 2) {
+       return 2;
     }
 
     err = NULL;
-    if (pause_reason) {
-       switch (pause_reason) {
+    if (mover_pause_reason) {
+       switch (mover_pause_reason) {
            case NDMP9_MOVER_PAUSE_EOM:
                eom = TRUE;
                break;
@@ -1757,8 +1615,8 @@ write_from_connection_impl(
                err = "got NOTIFY_MOVER_PAUSED, but not because of EOW or SEEK";
                break;
        }
-    } else if (halt_reason) {
-       switch (halt_reason) {
+    } else if (mover_halt_reason) {
+       switch (mover_halt_reason) {
            case NDMP9_MOVER_HALT_CONNECT_CLOSED:
                eof = TRUE;
                break;
@@ -1777,7 +1635,7 @@ write_from_connection_impl(
        device_set_error(DEVICE(self),
            g_strdup_printf("waiting for accept: %s", err),
            DEVICE_STATUS_DEVICE_ERROR);
-       return FALSE;
+       return 1;
     }
 
     /* no error, so the mover stopped due to one of EOM (volume out of space),
@@ -1787,7 +1645,7 @@ write_from_connection_impl(
     if (!ndmp_connection_mover_get_state(self->ndmp,
                &mover_state, &bytes_moved_after, NULL, NULL)) {
        set_error_from_ndmp(self);
-       return FALSE;
+       return 1;
     }
     size = bytes_moved_after - bytes_moved_before;
     nconn->offset += size;
@@ -1805,31 +1663,36 @@ write_from_connection_impl(
          * we do need to figure out the actual size */
         DEVICE(self)->is_eom = TRUE;
     } else {
+       g_assert_not_reached();
         error("not reached");
     }
 
-    return TRUE;
+    return 0;
 }
 
-static gboolean
+static int
 read_to_connection_impl(
     Device *dself,
     guint64 size,
-    guint64 *actual_size)
+    guint64 *actual_size,
+    int     *cancelled,
+    GMutex  *abort_mutex,
+    GCond   *abort_cond)
 {
     NdmpDevice *self = NDMP_DEVICE(dself);
     DirectTCPConnectionNDMP *nconn = self->directtcp_conn;
     gboolean eom = FALSE, eof = FALSE, eow = FALSE;
     ndmp9_mover_state mover_state;
-    ndmp9_mover_halt_reason halt_reason;
-    ndmp9_mover_pause_reason pause_reason;
+    ndmp9_mover_halt_reason mover_halt_reason = NDMP9_MOVER_HALT_NA;
+    ndmp9_mover_pause_reason mover_pause_reason = NDMP9_MOVER_PAUSE_NA;
     guint64 bytes_moved_before, bytes_moved_after;
     gchar *err;
+    int result;
 
     if (actual_size)
        *actual_size = 0;
 
-    if (device_in_error(self)) return FALSE;
+    if (device_in_error(self)) return 1;
 
     /* read_to_connection does not support IndirectTCP */
     g_assert(self->indirecttcp_sock == -1);
@@ -1842,7 +1705,7 @@ read_to_connection_impl(
     if (!ndmp_connection_mover_get_state(self->ndmp,
                &mover_state, &bytes_moved_before, NULL, NULL)) {
        set_error_from_ndmp(self);
-       return FALSE;
+       return 1;
     }
 
     /* the mover had best be PAUSED right now */
@@ -1852,26 +1715,30 @@ read_to_connection_impl(
                nconn->offset,
                size? size : G_MAXUINT64 - nconn->offset)) {
        set_error_from_ndmp(self);
-       return FALSE;
+       return 1;
     }
 
     if (!ndmp_connection_mover_continue(self->ndmp)) {
        set_error_from_ndmp(self);
-       return FALSE;
+       return 1;
     }
 
     /* now wait for the mover to pause itself again, or halt on EOF or an error */
-    if (!ndmp_connection_wait_for_notify(self->ndmp,
-           NULL,
-           &halt_reason,
-           &pause_reason, NULL)) {
+    result = ndmp_connection_wait_for_notify_with_cond(self->ndmp,
+                   NULL,
+                   &mover_halt_reason,
+                   &mover_pause_reason, NULL,
+                   cancelled, abort_mutex, abort_cond);
+    if (result == 1) {
        set_error_from_ndmp(self);
-       return FALSE;
+       return 1;
+    } else if (result == 2) {
+       return 2;
     }
 
     err = NULL;
-    if (pause_reason) {
-       switch (pause_reason) {
+    if (mover_pause_reason) {
+       switch (mover_pause_reason) {
            case NDMP9_MOVER_PAUSE_EOF:
                eof = TRUE;
                break;
@@ -1887,8 +1754,8 @@ read_to_connection_impl(
                err = "got NOTIFY_MOVER_PAUSED, but not because of EOW or SEEK";
                break;
        }
-    } else if (halt_reason) {
-       switch (halt_reason) {
+    } else if (mover_halt_reason) {
+       switch (mover_halt_reason) {
            case NDMP9_MOVER_HALT_CONNECT_CLOSED:
                eof = TRUE;
                break;
@@ -1907,7 +1774,7 @@ read_to_connection_impl(
        device_set_error(DEVICE(self),
            g_strdup_printf("waiting for accept: %s", err),
            DEVICE_STATUS_DEVICE_ERROR);
-       return FALSE;
+       return 1;
     }
 
     /* no error, so the mover stopped due to one of EOM (volume out of space),
@@ -1917,7 +1784,7 @@ read_to_connection_impl(
     if (!ndmp_connection_mover_get_state(self->ndmp,
                &mover_state, &bytes_moved_after, NULL, NULL)) {
        set_error_from_ndmp(self);
-       return FALSE;
+       return 1;
     }
     size = bytes_moved_after - bytes_moved_before;
     nconn->offset += size;
@@ -1935,10 +1802,11 @@ read_to_connection_impl(
          * we do need to figure out the actual size */
         DEVICE(self)->is_eom = TRUE;
     } else {
+       g_assert_not_reached();
         error("not reached");
     }
 
-    return TRUE;
+    return 0;
 }
 
 static gboolean
@@ -2105,10 +1973,8 @@ ndmp_device_class_init(NdmpDeviceClass * c G_GNUC_UNUSED)
 
     device_class->directtcp_supported = TRUE;
     device_class->listen = listen_impl;
-    device_class->accept = accept_impl;
-    device_class->accept_with_cond = accept_with_cond_impl;
-    device_class->connect = connect_impl;
-    device_class->connect_with_cond = connect_with_cond_impl;
+    device_class->accept= accept_impl;
+    device_class->connect= connect_impl;
     device_class->write_from_connection = write_from_connection_impl;
     device_class->read_to_connection = read_to_connection_impl;
     device_class->use_connection = use_connection_impl;
@@ -2316,7 +2182,7 @@ directtcp_connection_ndmp_close(DirectTCPConnection *dself)
     char *rv = NULL;
     ndmp9_mover_state state;
     guint64 bytes_moved;
-    ndmp9_mover_halt_reason reason;
+    ndmp9_mover_halt_reason mover_halt_reason;
     gboolean expect_notif = FALSE;
 
     /* based on the current state, we may need to abort or stop the
@@ -2354,7 +2220,7 @@ directtcp_connection_ndmp_close(DirectTCPConnection *dself)
     if (expect_notif) {
        if (!ndmp_connection_wait_for_notify(self->ndmp,
                NULL,
-               &reason, /* value is ignored.. */
+               &mover_halt_reason, /* value is ignored.. */
                NULL, NULL)) {
            goto error;
        }