/*
- * Copyright (c) 2009, 2010 Zmanda, Inc. All Rights Reserved.
+ * Copyright (c) 2009-2012 Zmanda, Inc. All Rights Reserved.
*
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 as published
- * by the Free Software Foundation.
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
#include "util.h"
#include "device.h"
#include "directtcp.h"
+#include "stream.h"
#include "ndmlib.h"
#include "ndmpconnobj.h"
+#include "sockaddr-util.h"
/*
* Type checking and casting macros
DirectTCPAddr *listen_addrs;
gboolean for_writing;
+ /* support for IndirectTCP */
+ int indirecttcp_sock; /* -1 if not in use */
+ int indirect;
+
/* Current DirectTCPConnectionNDMP */
struct DirectTCPConnectionNDMP_ *directtcp_conn;
gchar *ndmp_auth;
gboolean verbose;
gsize read_block_size;
+
+ GMutex *abort_mutex;
+ GCond *abort_cond;
+ gboolean cancel;
+ int *cancelled;
};
/*
static DevicePropertyBase device_property_ndmp_username;
static DevicePropertyBase device_property_ndmp_password;
static DevicePropertyBase device_property_ndmp_auth;
+static DevicePropertyBase device_property_indirect;
#define PROPERTY_NDMP_USERNAME (device_property_ndmp_username.ID)
#define PROPERTY_NDMP_PASSWORD (device_property_ndmp_password.ID)
#define PROPERTY_NDMP_AUTH (device_property_ndmp_auth.ID)
+#define PROPERTY_INDIRECT (device_property_indirect.ID)
/*
g_free(self->ndmp_password);
if (self->ndmp_auth)
g_free(self->ndmp_auth);
+ if (self->indirecttcp_sock != -1)
+ close(self->indirecttcp_sock);
}
static DeviceStatusFlags
dumpfile_t *header;
char *header_buf;
- self = NDMP_DEVICE(dself);
-
if (device_in_error(self)) return FALSE;
if (!open_tape_agent(self)) {
}
dself->access_mode = mode;
+ g_mutex_lock(dself->device_mutex);
dself->in_file = FALSE;
+ g_mutex_unlock(dself->device_mutex);
if (!single_ndmp_mtio(self, NDMP9_MTIO_REW)) {
/* single_ndmp_mtio already set our error message */
dself->is_eof = FALSE;
dself->is_eom = FALSE;
+ g_mutex_lock(dself->device_mutex);
+ dself->bytes_written = 0;
+ g_mutex_unlock(dself->device_mutex);
/* set the blocksize in the header properly */
header->blocksize = dself->block_size;
amfree(header_buf);
/* arrange the file numbers correctly */
+ g_mutex_lock(dself->device_mutex);
dself->in_file = TRUE;
+ g_mutex_unlock(dself->device_mutex);
if (!ndmp_get_state(self)) {
/* error already set by ndmp_get_state */
return FALSE;
}
dself->block++;
+ g_mutex_lock(dself->device_mutex);
+ dself->bytes_written += size;
+ g_mutex_unlock(dself->device_mutex);
if (replacement_buffer) g_free(replacement_buffer);
return TRUE;
if (device_in_error(dself)) return FALSE;
/* we're not in a file anymore */
+ g_mutex_lock(dself->device_mutex);
dself->in_file = FALSE;
+ g_mutex_unlock(dself->device_mutex);
if (!single_ndmp_mtio(self, NDMP9_MTIO_EOF)) {
/* error was set by single_ndmp_mtio */
}
/* fix up status */
+ g_mutex_lock(dself->device_mutex);
dself->in_file = TRUE;
+ g_mutex_unlock(dself->device_mutex);
dself->file = file;
dself->block = 0;
+ g_mutex_lock(dself->device_mutex);
+ dself->bytes_read = 0;
+ g_mutex_unlock(dself->device_mutex);
/* now read the header */
read_block_size = ndmp_device_read_size(self);
}
*size_req = (int)actual; /* cast is OK - requested size was < INT_MAX too */
+ g_mutex_lock(dself->device_mutex);
+ dself->bytes_read += actual;
+ g_mutex_unlock(dself->device_mutex);
return *size_req;
}
+static gboolean
+indirecttcp_listen(
+ NdmpDevice *self,
+ DirectTCPAddr **addrs)
+{
+ in_port_t port;
+
+ self->indirecttcp_sock = stream_server(AF_INET, &port, 0, STREAM_BUFSIZE, 0);
+ if (self->indirecttcp_sock < 0) {
+ device_set_error(DEVICE(self),
+ g_strdup_printf("Could not bind indirecttcp socket: %s", strerror(errno)),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
+
+ /* An IndirectTCP address is 255.255.255.255:$port */
+ self->listen_addrs = *addrs = g_new0(DirectTCPAddr, 2);
+ addrs[0]->sin.sin_family = AF_INET;
+ addrs[0]->sin.sin_addr.s_addr = htonl(0xffffffff);
+ SU_SET_PORT(addrs[0], port);
+
+ return TRUE;
+}
+
static gboolean
listen_impl(
Device *dself,
return FALSE;
}
+ self->for_writing = for_writing;
+
/* first, set the window to an empty span so that the mover doesn't start
* reading or writing data immediately. NDMJOB tends to reset the record
* size periodically (in direct contradiction to the spec), so we reset it
return FALSE;
}
- if (!ndmp_connection_mover_set_window(self->ndmp, 0, 0)) {
- set_error_from_ndmp(self);
- return FALSE;
+ if (for_writing) {
+ /* if we're forcing indirecttcp, just do it */
+ if (self->indirect) {
+ return indirecttcp_listen(self, addrs);
+ }
+ if (!ndmp_connection_mover_set_window(self->ndmp, 0, 0)) {
+ /* NDMP4_ILLEGAL_ARGS_ERR means the NDMP server doesn't like a zero-byte
+ * mover window, so we'll ignore it */
+ if (ndmp_connection_err_code(self->ndmp) != NDMP4_ILLEGAL_ARGS_ERR) {
+ set_error_from_ndmp(self);
+ return FALSE;
+ }
+
+ g_debug("NDMP Device: cannot set zero-length mover window; "
+ "falling back to IndirectTCP");
+ /* In this case, we need to set up IndirectTCP */
+ return indirecttcp_listen(self, addrs);
+ }
+ } else {
+ /* For reading, set the window to the second mover record, so that the
+ * mover will pause immediately when it wants to read the first mover
+ * record. */
+ if (!ndmp_connection_mover_set_window(self->ndmp,
+ dself->block_size,
+ dself->block_size)) {
+ set_error_from_ndmp(self);
+ return FALSE;
+ }
}
/* then tell it to start listening */
return FALSE;
}
self->listen_addrs = *addrs;
- self->for_writing = for_writing;
return TRUE;
}
-static gboolean
+static gpointer
+accept_wait_cond(
+ gpointer data)
+{
+ NdmpDevice *self = NDMP_DEVICE(data);
+
+ ndmp9_mover_state state;
+ guint64 bytes_moved;
+
+ gulong backoff = G_USEC_PER_SEC/20; /* 5 msec */
+ g_mutex_lock(self->abort_mutex);
+ while (1) {
+ g_mutex_unlock(self->abort_mutex);
+ if (!ndmp_connection_mover_get_state(self->ndmp,
+ &state, &bytes_moved, NULL, NULL)) {
+ g_mutex_lock(self->abort_mutex);
+ set_error_from_ndmp(self);
+ state = 0;
+ break;
+ }
+ g_mutex_lock(self->abort_mutex);
+
+ if (state != NDMP9_MOVER_STATE_LISTEN)
+ break;
+
+ /* back off a little bit to give the other side time to breathe,
+ * but not more than one second */
+ g_mutex_unlock(self->abort_mutex);
+ g_usleep(backoff);
+ g_mutex_lock(self->abort_mutex);
+ if (self->cancel)
+ break;
+ backoff *= 2;
+ if (backoff > G_USEC_PER_SEC)
+ backoff = G_USEC_PER_SEC;
+ }
+
+ self->cancel = TRUE;
+ g_cond_broadcast(self->abort_cond);
+ g_mutex_unlock(self->abort_mutex);
+ return GINT_TO_POINTER(state);
+}
+
+static int
accept_impl(
Device *dself,
DirectTCPConnection **dtcpconn,
- ProlongProc prolong,
- gpointer prolong_data)
+ gboolean *cancelled,
+ GMutex *abort_mutex,
+ GCond *abort_cond)
{
NdmpDevice *self = NDMP_DEVICE(dself);
ndmp9_mover_state state;
- guint64 bytes_moved;
ndmp9_mover_mode mode;
- ndmp9_mover_pause_reason reason;
+ ndmp9_mover_halt_reason mover_halt_reason = NDMP9_MOVER_HALT_NA;
+ ndmp9_mover_pause_reason mover_pause_reason = NDMP9_MOVER_PAUSE_NA;
guint64 seek_position;
+ GThread *wait_thread;
+ int result;
+ char *err;
- if (device_in_error(self)) return FALSE;
+ if (device_in_error(self)) return 1;
+
+ self->abort_mutex = abort_mutex;
+ self->abort_cond = abort_cond;
+ self->cancelled = cancelled;
+ self->cancel = FALSE;
g_assert(self->listen_addrs);
*dtcpconn = NULL;
- /* TODO: support aborting this operation - maybe just always poll? */
- prolong = prolong;
- prolong_data = prolong_data;
-
if (!self->for_writing) {
/* when reading, we don't get any kind of notification that the
* connection has been established, but we can't call NDMP_MOVER_READ
* until the mover is active. So we have to poll, waiting for ACTIVE.
* This is ugly. */
- gulong backoff = G_USEC_PER_SEC/20; /* 5 msec */
- while (1) {
- if (!ndmp_connection_mover_get_state(self->ndmp,
- &state, &bytes_moved, NULL, NULL)) {
- set_error_from_ndmp(self);
- return FALSE;
- }
- if (state != NDMP9_MOVER_STATE_LISTEN)
- break;
+ wait_thread = g_thread_create(accept_wait_cond, (gpointer)self, TRUE,
+ NULL);
+ while (!*cancelled && !self->cancel) {
+ g_cond_wait(self->abort_cond, self->abort_mutex);
+ }
+ self->cancel = TRUE;
+ state = GPOINTER_TO_INT(g_thread_join(wait_thread));
- /* back off a little bit to give the other side time to breathe,
- * but not more than one second */
- g_usleep(backoff);
- backoff *= 2;
- if (backoff > G_USEC_PER_SEC)
- backoff = G_USEC_PER_SEC;
+ if (*cancelled) {
+ result = 2;
+ goto accept_failed;
}
/* double-check state */
device_set_error(DEVICE(self),
g_strdup("mover did not enter the ACTIVE state as expected"),
DEVICE_STATUS_DEVICE_ERROR);
- return FALSE;
+ result = 1;
+ goto accept_failed;
}
/* now, we need to get this into the PAUSED state, since right now we
* beginning of a part. */
if (!ndmp_connection_mover_read(self->ndmp, 0, G_MAXUINT64)) {
set_error_from_ndmp(self);
- return FALSE;
+ result = 1;
+ goto accept_failed;
}
/* now we should expect a notice that the mover has paused */
* in, so there's no need to do anything to trigger the pause. */
}
+ if (self->indirecttcp_sock == -1) {
+ /* NDMJOB sends NDMP9_MOVER_PAUSE_SEEK to indicate that it wants to
+ * write outside the window, while the standard specifies .._EOW,
+ * instead. When reading to a connection, we get the appropriate
+ * .._SEEK. It's easy enough to handle both. */
+ result = ndmp_connection_wait_for_notify_with_cond(self->ndmp,
+ NULL,
+ &mover_halt_reason,
+ &mover_pause_reason, &seek_position,
+ cancelled,
+ abort_mutex, abort_cond);
+
+ if (result == 1) {
+ set_error_from_ndmp(self);
+ goto accept_failed;
+ } else if (result == 2) {
+ goto accept_failed;
+ }
+
+ err = NULL;
+ if (mover_pause_reason) {
+ switch (mover_pause_reason) {
+ case NDMP9_MOVER_PAUSE_SEEK:
+ case NDMP9_MOVER_PAUSE_EOW:
+ break;
+ default:
+ err = "got NOTIFY_MOVER_PAUSED, but not because of EOW or SEEK";
+ break;
+ }
+ } else if (mover_halt_reason) {
+ err = "unexpected NOTIFY_MOVER_HALT";
+ }
+
+ if (err) {
+ device_set_error(DEVICE(self),
+ g_strdup_printf("waiting NDMP_MOVER_PAUSE_SEEK: %s", err),
+ DEVICE_STATUS_DEVICE_ERROR);
+ result = 1;
+ goto accept_failed;
+
+ }
+ }
+
/* NDMJOB sends NDMP9_MOVER_PAUSE_SEEK to indicate that it wants to write
* outside the window, while the standard specifies .._EOW, instead. When
* reading to a connection, we get the appropriate .._SEEK. It's easy
* enough to handle both. */
- if (!ndmp_connection_wait_for_notify(self->ndmp,
- NULL,
- NULL,
- &reason, &seek_position)) {
- set_error_from_ndmp(self);
- return FALSE;
- }
-
- if (reason != NDMP9_MOVER_PAUSE_SEEK && reason != NDMP9_MOVER_PAUSE_EOW) {
- device_set_error(DEVICE(self),
- g_strdup_printf("got NOTIFY_MOVER_PAUSED, but not because of EOW or SEEK"),
- DEVICE_STATUS_DEVICE_ERROR);
- return FALSE;
+ if (self->indirecttcp_sock == -1) {
+ g_free(self->listen_addrs);
+ self->listen_addrs = NULL;
}
- g_free(self->listen_addrs);
- self->listen_addrs = NULL;
-
if (self->for_writing)
mode = NDMP9_MOVER_MODE_READ;
else
/* reference it for the caller */
g_object_ref(*dtcpconn);
- return TRUE;
+ return 0;
+
+accept_failed:
+ if (self->indirecttcp_sock == -1) {
+ g_free(self->listen_addrs);
+ self->listen_addrs = NULL;
+ }
+ return result;
}
-static gboolean
+
+static int
connect_impl(
Device *dself,
gboolean for_writing,
DirectTCPAddr *addrs,
DirectTCPConnection **dtcpconn,
- ProlongProc prolong,
- gpointer prolong_data)
+ int *cancelled,
+ GMutex *abort_mutex,
+ GCond *abort_cond)
{
NdmpDevice *self = NDMP_DEVICE(dself);
ndmp9_mover_mode mode;
- ndmp9_mover_pause_reason reason;
+ ndmp9_mover_halt_reason mover_halt_reason = NDMP9_MOVER_HALT_NA;
+ ndmp9_mover_pause_reason mover_pause_reason = NDMP9_MOVER_PAUSE_NA;
guint64 seek_position;
+ int result;
g_assert(!self->listen_addrs);
*dtcpconn = NULL;
self->for_writing = for_writing;
- /* TODO: support aborting this operation - maybe just always poll? */
- prolong = prolong;
- prolong_data = prolong_data;
-
if (!open_tape_agent(self)) {
/* error message was set by open_tape_agent */
- return FALSE;
+ return 1;
}
/* first, set the window to an empty span so that the mover doesn't start
if (!ndmp_connection_mover_set_record_size(self->ndmp,
DEVICE(self)->block_size)) {
set_error_from_ndmp(self);
- return FALSE;
+ return 1;
}
if (!ndmp_connection_mover_set_window(self->ndmp, 0, 0)) {
set_error_from_ndmp(self);
- return FALSE;
+ return 1;
}
if (self->for_writing)
if (!ndmp_connection_mover_connect(self->ndmp, mode, addrs)) {
set_error_from_ndmp(self);
- return FALSE;
+ return 1;
}
if (!self->for_writing) {
* SEEK pause. */
if (!ndmp_connection_mover_read(self->ndmp, 0, G_MAXUINT64)) {
set_error_from_ndmp(self);
- return FALSE;
+ return 1;
}
/* now we should expect a notice that the mover has paused */
* reading to a connection, we get the appropriate .._SEEK. It's easy
* enough to handle both. */
- if (!ndmp_connection_wait_for_notify(self->ndmp,
- NULL,
+ result = ndmp_connection_wait_for_notify_with_cond(self->ndmp,
NULL,
- &reason, &seek_position)) {
+ &mover_halt_reason,
+ &mover_pause_reason, &seek_position,
+ cancelled,
+ abort_mutex, abort_cond);
+
+ if (result == 1) {
set_error_from_ndmp(self);
- return FALSE;
+ return 1;
+ } else if (result == 2) {
+ return 2;
}
- if (reason != NDMP9_MOVER_PAUSE_SEEK && reason != NDMP9_MOVER_PAUSE_EOW) {
+ if (mover_halt_reason != NDMP9_MOVER_HALT_NA) {
+ device_set_error(DEVICE(self),
+ g_strdup_printf("got NDMP9_MOVER_HALT"),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return 1;
+ }
+ if (mover_pause_reason != NDMP9_MOVER_PAUSE_SEEK &&
+ mover_pause_reason != NDMP9_MOVER_PAUSE_EOW) {
device_set_error(DEVICE(self),
g_strdup_printf("got NOTIFY_MOVER_PAUSED, but not because of EOW or SEEK"),
DEVICE_STATUS_DEVICE_ERROR);
- return FALSE;
+ return 1;
}
if (self->listen_addrs) {
/* reference it for the caller */
g_object_ref(*dtcpconn);
- return TRUE;
+ return 0;
}
static gboolean
+indirecttcp_start_writing(
+ NdmpDevice *self)
+{
+ DirectTCPAddr *real_addrs, *iter;
+ int conn_sock;
+
+ /* The current state is that the other end is trying to connect to
+ * indirecttcp_sock. The mover remains IDLE, although its window is set
+ * correctly for the part we are about to write. */
+
+ g_debug("indirecttcp_start_writing, ready to accept");
+ conn_sock = accept(self->indirecttcp_sock, NULL, NULL);
+ if (conn_sock < 0) {
+ device_set_error(DEVICE(self),
+ g_strdup_printf("Could not accept indirecttcp socket: %s", strerror(errno)),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
+ g_debug("indirecttcp_start_writing, accepted");
+
+ close(self->indirecttcp_sock);
+ self->indirecttcp_sock = -1;
+
+ /* tell mover to start listening */
+ g_assert(self->for_writing);
+ if (!ndmp_connection_mover_listen(self->ndmp,
+ NDMP4_MOVER_MODE_READ,
+ NDMP4_ADDR_TCP,
+ &real_addrs)) {
+ set_error_from_ndmp(self);
+ return FALSE;
+ }
+
+ /* format the addresses and send them down the socket */
+ for (iter = real_addrs; iter && SU_GET_FAMILY(iter) != 0; iter++) {
+ char inet[INET_ADDRSTRLEN];
+ const char *addr;
+ char *addrspec;
+
+ addr = inet_ntop(AF_INET, &iter->sin.sin_addr.s_addr, inet, INET_ADDRSTRLEN);
+
+ addrspec = g_strdup_printf("%s:%d%s", addr, SU_GET_PORT(iter),
+ SU_GET_FAMILY(iter+1) !=0? " ":"");
+ g_debug("indirecttcp_start_writing, send %s", addrspec);
+ if (full_write(conn_sock, addrspec, strlen(addrspec)) < strlen(addrspec)) {
+ device_set_error(DEVICE(self),
+ g_strdup_printf("writing to indirecttcp socket: %s", strerror(errno)),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
+ }
+
+ /* close the socket for good. This ensures that the next call to
+ * write_from_connection_impl will not go through the mover setup process.
+ * */
+ if (close(conn_sock) < 0) {
+ device_set_error(DEVICE(self),
+ g_strdup_printf("closing indirecttcp socket: %s", strerror(errno)),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
+ conn_sock = -1;
+
+ /* and free the listen_addrs, since we didn't free them in accept_impl */
+ if (self->listen_addrs) {
+ g_free(self->listen_addrs);
+ self->listen_addrs = NULL;
+ }
+
+ /* Now it's up to the remote end to connect to the mover and start sending
+ * data. We won't get any notification when this happens, although we could
+ * in principle poll for such a thing. */
+ return TRUE;
+}
+
+static int
write_from_connection_impl(
- Device *dself,
- guint64 size,
- guint64 *actual_size)
+ Device *dself,
+ guint64 size,
+ guint64 *actual_size,
+ int *cancelled,
+ GMutex *abort_mutex,
+ GCond *abort_cond)
{
NdmpDevice *self = NDMP_DEVICE(dself);
DirectTCPConnectionNDMP *nconn = self->directtcp_conn;
gboolean eom = FALSE, eof = FALSE, eow = FALSE;
ndmp9_mover_state mover_state;
- ndmp9_mover_halt_reason halt_reason;
- ndmp9_mover_pause_reason pause_reason;
+ ndmp9_mover_halt_reason mover_halt_reason = NDMP9_MOVER_HALT_NA;
+ ndmp9_mover_pause_reason mover_pause_reason = NDMP9_MOVER_PAUSE_NA;
guint64 bytes_moved_before, bytes_moved_after;
gchar *err;
+ int result;
if (device_in_error(self)) return FALSE;
+ g_debug("write_from_connection_impl");
if (actual_size)
*actual_size = 0;
if (!ndmp_connection_mover_get_state(self->ndmp,
&mover_state, &bytes_moved_before, NULL, NULL)) {
set_error_from_ndmp(self);
- return FALSE;
+ return 1;
}
- /* the mover had best be PAUSED right now */
- g_assert(mover_state == NDMP9_MOVER_STATE_PAUSED);
+ if (self->indirecttcp_sock != -1) {
+ /* If we're doing IndirectTCP, then we've deferred the whole
+ * mover_set_window mover_listen process.. until now.
+ * So the mover should be IDLE.
+ */
+ g_assert(mover_state == NDMP9_MOVER_STATE_IDLE);
+ } else {
+ /* the mover had best be PAUSED right now */
+ g_assert(mover_state == NDMP9_MOVER_STATE_PAUSED);
+ }
+ /* we want to set the window regardless of whether this is directtcp or
+ * indirecttcp
+ */
if (!ndmp_connection_mover_set_window(self->ndmp,
nconn->offset,
size? size : G_MAXUINT64 - nconn->offset)) {
set_error_from_ndmp(self);
- return FALSE;
+ return 1;
}
- if (!ndmp_connection_mover_continue(self->ndmp)) {
- set_error_from_ndmp(self);
- return FALSE;
+ /* for DirectTCP, we just tell the mover to continue; IndirectTCP is more complicated. */
+ if (self->indirecttcp_sock != -1) {
+ if (!indirecttcp_start_writing(self)) {
+ return 1;
+ }
+ } else {
+ if (!ndmp_connection_mover_continue(self->ndmp)) {
+ set_error_from_ndmp(self);
+ return 1;
+ }
}
/* now wait for the mover to pause itself again, or halt on EOF or an error */
- if (!ndmp_connection_wait_for_notify(self->ndmp,
- NULL,
- &halt_reason,
- &pause_reason, NULL)) {
+ result = ndmp_connection_wait_for_notify_with_cond(self->ndmp,
+ NULL,
+ &mover_halt_reason,
+ &mover_pause_reason, NULL,
+ cancelled, abort_mutex, abort_cond);
+ if (result == 1) {
set_error_from_ndmp(self);
- return FALSE;
+ return 1;
+ } else if (result == 2) {
+ return 2;
}
err = NULL;
- if (pause_reason) {
- switch (pause_reason) {
+ if (mover_pause_reason) {
+ switch (mover_pause_reason) {
case NDMP9_MOVER_PAUSE_EOM:
eom = TRUE;
break;
err = "got NOTIFY_MOVER_PAUSED, but not because of EOW or SEEK";
break;
}
- } else if (halt_reason) {
- switch (halt_reason) {
+ } else if (mover_halt_reason) {
+ switch (mover_halt_reason) {
case NDMP9_MOVER_HALT_CONNECT_CLOSED:
eof = TRUE;
break;
device_set_error(DEVICE(self),
g_strdup_printf("waiting for accept: %s", err),
DEVICE_STATUS_DEVICE_ERROR);
- return FALSE;
+ return 1;
}
/* no error, so the mover stopped due to one of EOM (volume out of space),
if (!ndmp_connection_mover_get_state(self->ndmp,
&mover_state, &bytes_moved_after, NULL, NULL)) {
set_error_from_ndmp(self);
- return FALSE;
+ return 1;
}
size = bytes_moved_after - bytes_moved_before;
nconn->offset += size;
* we do need to figure out the actual size */
DEVICE(self)->is_eom = TRUE;
} else {
+ g_assert_not_reached();
error("not reached");
}
- return TRUE;
+ return 0;
}
-static gboolean
+static int
read_to_connection_impl(
Device *dself,
guint64 size,
- guint64 *actual_size)
+ guint64 *actual_size,
+ int *cancelled,
+ GMutex *abort_mutex,
+ GCond *abort_cond)
{
NdmpDevice *self = NDMP_DEVICE(dself);
DirectTCPConnectionNDMP *nconn = self->directtcp_conn;
gboolean eom = FALSE, eof = FALSE, eow = FALSE;
ndmp9_mover_state mover_state;
- ndmp9_mover_halt_reason halt_reason;
- ndmp9_mover_pause_reason pause_reason;
+ ndmp9_mover_halt_reason mover_halt_reason = NDMP9_MOVER_HALT_NA;
+ ndmp9_mover_pause_reason mover_pause_reason = NDMP9_MOVER_PAUSE_NA;
guint64 bytes_moved_before, bytes_moved_after;
gchar *err;
+ int result;
if (actual_size)
*actual_size = 0;
- if (device_in_error(self)) return FALSE;
+ if (device_in_error(self)) return 1;
+
+ /* read_to_connection does not support IndirectTCP */
+ g_assert(self->indirecttcp_sock == -1);
/* if this is false, then the caller did not use use_connection correctly */
g_assert(nconn != NULL);
if (!ndmp_connection_mover_get_state(self->ndmp,
&mover_state, &bytes_moved_before, NULL, NULL)) {
set_error_from_ndmp(self);
- return FALSE;
+ return 1;
}
/* the mover had best be PAUSED right now */
nconn->offset,
size? size : G_MAXUINT64 - nconn->offset)) {
set_error_from_ndmp(self);
- return FALSE;
+ return 1;
}
if (!ndmp_connection_mover_continue(self->ndmp)) {
set_error_from_ndmp(self);
- return FALSE;
+ return 1;
}
/* now wait for the mover to pause itself again, or halt on EOF or an error */
- if (!ndmp_connection_wait_for_notify(self->ndmp,
- NULL,
- &halt_reason,
- &pause_reason, NULL)) {
+ result = ndmp_connection_wait_for_notify_with_cond(self->ndmp,
+ NULL,
+ &mover_halt_reason,
+ &mover_pause_reason, NULL,
+ cancelled, abort_mutex, abort_cond);
+ if (result == 1) {
set_error_from_ndmp(self);
- return FALSE;
+ return 1;
+ } else if (result == 2) {
+ return 2;
}
err = NULL;
- if (pause_reason) {
- switch (pause_reason) {
+ if (mover_pause_reason) {
+ switch (mover_pause_reason) {
case NDMP9_MOVER_PAUSE_EOF:
eof = TRUE;
break;
err = "got NOTIFY_MOVER_PAUSED, but not because of EOW or SEEK";
break;
}
- } else if (halt_reason) {
- switch (halt_reason) {
+ } else if (mover_halt_reason) {
+ switch (mover_halt_reason) {
case NDMP9_MOVER_HALT_CONNECT_CLOSED:
eof = TRUE;
break;
device_set_error(DEVICE(self),
g_strdup_printf("waiting for accept: %s", err),
DEVICE_STATUS_DEVICE_ERROR);
- return FALSE;
+ return 1;
}
/* no error, so the mover stopped due to one of EOM (volume out of space),
if (!ndmp_connection_mover_get_state(self->ndmp,
&mover_state, &bytes_moved_after, NULL, NULL)) {
set_error_from_ndmp(self);
- return FALSE;
+ return 1;
}
size = bytes_moved_after - bytes_moved_before;
nconn->offset += size;
* we do need to figure out the actual size */
DEVICE(self)->is_eom = TRUE;
} else {
+ g_assert_not_reached();
error("not reached");
}
- return TRUE;
+ return 0;
}
static gboolean
if (read_block_size != 0 &&
((gsize)read_block_size < p_self->block_size ||
- (gsize)read_block_size > p_self->max_block_size))
+ (gsize)read_block_size > p_self->max_block_size)) {
+ device_set_error(p_self,
+ g_strdup_printf("Error setting READ-BLOCk-SIZE property to '%zu', it must be between %zu and %zu", read_block_size, p_self->block_size, p_self->max_block_size),
+ DEVICE_STATUS_DEVICE_ERROR);
return FALSE;
+ }
self->read_block_size = read_block_size;
val, surety, source);
}
+static gboolean
+ndmp_device_set_indirect_fn(Device *dself,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source)
+{
+ NdmpDevice *self = NDMP_DEVICE(dself);
+
+ self->indirect = g_value_get_boolean(val);
+
+ return device_simple_property_set_fn(dself, base, val, surety, source);
+}
+
static void
ndmp_device_class_init(NdmpDeviceClass * c G_GNUC_UNUSED)
{
device_class->directtcp_supported = TRUE;
device_class->listen = listen_impl;
- device_class->accept = accept_impl;
- device_class->connect = connect_impl;
+ device_class->accept= accept_impl;
+ device_class->connect= connect_impl;
device_class->write_from_connection = write_from_connection_impl;
device_class->read_to_connection = read_to_connection_impl;
device_class->use_connection = use_connection_impl;
PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK,
device_simple_property_get_fn,
ndmp_device_set_verbose_fn);
+
+ device_class_register_property(device_class, PROPERTY_INDIRECT,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK,
+ device_simple_property_get_fn,
+ ndmp_device_set_indirect_fn);
+
device_class_register_property(device_class, PROPERTY_READ_BLOCK_SIZE,
PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
device_simple_property_get_fn,
g_value_unset(&response);
self->ndmp_auth = g_strdup("md5");
+ g_value_init(&response, G_TYPE_BOOLEAN);
+ g_value_set_boolean(&response, FALSE);
+ device_set_simple_property(dself, PROPERTY_INDIRECT,
+ &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DEFAULT);
+ g_value_unset(&response);
+ self->indirect = FALSE;
+
+ self->indirecttcp_sock = -1;
}
static GType
char *device_node)
{
Device *rval;
- NdmpDevice * ndmp_rval;
g_assert(0 == strcmp(device_type, NDMP_DEVICE_NAME));
rval = DEVICE(g_object_new(TYPE_NDMP_DEVICE, NULL));
- ndmp_rval = (NdmpDevice *)rval;
device_open_device(rval, device_name, device_type, device_node);
return rval;
device_property_fill_and_register(&device_property_ndmp_auth,
G_TYPE_STRING, "ndmp_auth",
"Authentication method for the NDMP agent - md5 (default), text, none, or void");
+ device_property_fill_and_register(&device_property_indirect,
+ G_TYPE_BOOLEAN, "indirect",
+ "Use Indirect TCP mode, even if the NDMP server supports "
+ "window length 0");
}
/*
char *rv = NULL;
ndmp9_mover_state state;
guint64 bytes_moved;
- ndmp9_mover_halt_reason reason;
+ ndmp9_mover_halt_reason mover_halt_reason;
gboolean expect_notif = FALSE;
/* based on the current state, we may need to abort or stop the
if (expect_notif) {
if (!ndmp_connection_wait_for_notify(self->ndmp,
NULL,
- &reason, /* value is ignored.. */
+ &mover_halt_reason, /* value is ignored.. */
NULL, NULL)) {
goto error;
}