/*
- * Copyright (c) 2009, 2010 Zmanda, Inc. All Rights Reserved.
+ * Copyright (c) 2009-2012 Zmanda, Inc. All Rights Reserved.
*
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 as published
- * by the Free Software Foundation.
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
#include "stream.h"
#include "ndmlib.h"
#include "ndmpconnobj.h"
+#include "sockaddr-util.h"
/*
* Type checking and casting macros
/* support for IndirectTCP */
int indirecttcp_sock; /* -1 if not in use */
- int force_indirecttcp;
+ int indirect;
/* Current DirectTCPConnectionNDMP */
struct DirectTCPConnectionNDMP_ *directtcp_conn;
gchar *ndmp_password;
gchar *ndmp_auth;
gboolean verbose;
+ gsize read_block_size;
+
+ GMutex *abort_mutex;
+ GCond *abort_cond;
+ gboolean cancel;
+ int *cancelled;
};
/*
static DevicePropertyBase device_property_ndmp_username;
static DevicePropertyBase device_property_ndmp_password;
static DevicePropertyBase device_property_ndmp_auth;
-static DevicePropertyBase device_property__force_indirecttcp;
+static DevicePropertyBase device_property_indirect;
#define PROPERTY_NDMP_USERNAME (device_property_ndmp_username.ID)
#define PROPERTY_NDMP_PASSWORD (device_property_ndmp_password.ID)
#define PROPERTY_NDMP_AUTH (device_property_ndmp_auth.ID)
-#define PROPERTY__FORCE_INDIRECTTCP (device_property__force_indirecttcp.ID)
+#define PROPERTY_INDIRECT (device_property_indirect.ID)
+
/*
* prototypes
void ndmp_device_register(void);
static void set_error_from_ndmp(NdmpDevice *self);
+#define ndmp_device_read_size(self) \
+ (((NdmpDevice *)(self))->read_block_size? \
+ ((NdmpDevice *)(self))->read_block_size : ((Device *)(self))->block_size)
+
/*
* Utility functions
*/
Device *dself)
{
NdmpDevice *self = NDMP_DEVICE(dself);
- dumpfile_t *header;
+ dumpfile_t *header = NULL;
gpointer buf = NULL;
guint64 buf_size = 0;
+ gsize read_block_size = 0;
amfree(dself->volume_label);
amfree(dself->volume_time);
if (device_in_error(self)) return dself->status;
- header = dself->volume_header = g_new(dumpfile_t, 1);
- fh_init(header);
-
if (!open_tape_agent(self)) {
/* error status was set by open_tape_agent */
return dself->status;
/* read the tape header from the NDMP server */
dself->status = 0;
- buf = g_malloc(dself->block_size);
+ read_block_size = ndmp_device_read_size(self);
+ buf = g_malloc(read_block_size);
if (!ndmp_connection_tape_read(self->ndmp,
buf,
- dself->block_size,
+ read_block_size,
&buf_size)) {
/* handle known errors */
device_set_error(dself,
g_strdup(_("no tape label found")),
DEVICE_STATUS_VOLUME_UNLABELED);
+ header = dself->volume_header = g_new(dumpfile_t, 1);
+ fh_init(header);
goto read_err;
default:
}
}
+ header = dself->volume_header = g_new(dumpfile_t, 1);
+ fh_init(header);
parse_file_header(buf, header, buf_size);
read_err:
dumpfile_t *header;
char *header_buf;
- self = NDMP_DEVICE(dself);
-
if (device_in_error(self)) return FALSE;
if (!open_tape_agent(self)) {
}
dself->access_mode = mode;
+ g_mutex_lock(dself->device_mutex);
dself->in_file = FALSE;
+ g_mutex_unlock(dself->device_mutex);
if (!single_ndmp_mtio(self, NDMP9_MTIO_REW)) {
/* single_ndmp_mtio already set our error message */
ndmp_device_finish(
Device *dself)
{
+ gboolean rval;
+
NdmpDevice *self = NDMP_DEVICE(dself);
- if (device_in_error(dself)) return FALSE;
+ rval = !device_in_error(dself);
/* we're not in a file anymore */
dself->access_mode = ACCESS_NULL;
if (!close_tape_agent(self)) {
/* error is set by close_tape_agent */
- return FALSE;
+ rval = FALSE;
}
if (self->ndmp)
close_connection(self);
- return TRUE;
+ return rval;
}
static gboolean
dself->is_eof = FALSE;
dself->is_eom = FALSE;
+ g_mutex_lock(dself->device_mutex);
+ dself->bytes_written = 0;
+ g_mutex_unlock(dself->device_mutex);
/* set the blocksize in the header properly */
header->blocksize = dself->block_size;
amfree(header_buf);
/* arrange the file numbers correctly */
+ g_mutex_lock(dself->device_mutex);
dself->in_file = TRUE;
+ g_mutex_unlock(dself->device_mutex);
if (!ndmp_get_state(self)) {
/* error already set by ndmp_get_state */
return FALSE;
}
dself->block++;
+ g_mutex_lock(dself->device_mutex);
+ dself->bytes_written += size;
+ g_mutex_unlock(dself->device_mutex);
if (replacement_buffer) g_free(replacement_buffer);
return TRUE;
if (device_in_error(dself)) return FALSE;
/* we're not in a file anymore */
+ g_mutex_lock(dself->device_mutex);
dself->in_file = FALSE;
+ g_mutex_unlock(dself->device_mutex);
if (!single_ndmp_mtio(self, NDMP9_MTIO_EOF)) {
/* error was set by single_ndmp_mtio */
gpointer buf;
guint64 buf_size;
dumpfile_t *header;
+ gsize read_block_size = 0;
if (device_in_error(dself)) return FALSE;
}
/* fix up status */
+ g_mutex_lock(dself->device_mutex);
dself->in_file = TRUE;
+ g_mutex_unlock(dself->device_mutex);
dself->file = file;
dself->block = 0;
+ g_mutex_lock(dself->device_mutex);
+ dself->bytes_read = 0;
+ g_mutex_unlock(dself->device_mutex);
/* now read the header */
- buf = g_malloc(dself->block_size);
+ read_block_size = ndmp_device_read_size(self);
+ buf = g_malloc(read_block_size);
if (!ndmp_connection_tape_read(self->ndmp,
- buf, dself->block_size, &buf_size)) {
+ buf, read_block_size, &buf_size)) {
switch (ndmp_connection_err_code(self->ndmp)) {
case NDMP9_EOF_ERR:
case NDMP9_EOM_ERR:
ndmp_device_read_block (Device * dself, gpointer data, int *size_req) {
NdmpDevice *self = NDMP_DEVICE(dself);
guint64 requested, actual;
+ gsize read_block_size = ndmp_device_read_size(self);
/* We checked the NDMP device's blocksize when the device was opened, which should
* catch any misalignent of server block size and Amanda block size */
- g_assert(dself->block_size < INT_MAX); /* check data type mismatch */
- if (!data || *size_req < (int)(dself->block_size)) {
- *size_req = (int)(dself->block_size);
+ g_assert(read_block_size < INT_MAX); /* check data type mismatch */
+ if (!data || *size_req < (int)(read_block_size)) {
+ *size_req = (int)(read_block_size);
return 0;
}
}
*size_req = (int)actual; /* cast is OK - requested size was < INT_MAX too */
+ g_mutex_lock(dself->device_mutex);
+ dself->bytes_read += actual;
+ g_mutex_unlock(dself->device_mutex);
return *size_req;
}
/* An IndirectTCP address is 255.255.255.255:$port */
self->listen_addrs = *addrs = g_new0(DirectTCPAddr, 2);
- (*addrs)->ipv4 = 0xffffffff;
- (*addrs)->port = port;
+ addrs[0]->sin.sin_family = AF_INET;
+ addrs[0]->sin.sin_addr.s_addr = htonl(0xffffffff);
+ SU_SET_PORT(addrs[0], port);
return TRUE;
}
if (for_writing) {
/* if we're forcing indirecttcp, just do it */
- if (self->force_indirecttcp) {
+ if (self->indirect) {
return indirecttcp_listen(self, addrs);
}
if (!ndmp_connection_mover_set_window(self->ndmp, 0, 0)) {
- /* NDMP9_ILLEGAL_ARGS_ERR means the NDMP server doesn't like a zero-byte
+ /* NDMP4_ILLEGAL_ARGS_ERR means the NDMP server doesn't like a zero-byte
* mover window, so we'll ignore it */
- if (ndmp_connection_err_code(self->ndmp) != NDMP9_ILLEGAL_ARGS_ERR) {
+ if (ndmp_connection_err_code(self->ndmp) != NDMP4_ILLEGAL_ARGS_ERR) {
set_error_from_ndmp(self);
return FALSE;
}
* mover will pause immediately when it wants to read the first mover
* record. */
if (!ndmp_connection_mover_set_window(self->ndmp,
- DEVICE(self)->block_size,
- DEVICE(self)->block_size)) {
+ dself->block_size,
+ dself->block_size)) {
set_error_from_ndmp(self);
return FALSE;
}
/* then tell it to start listening */
if (!ndmp_connection_mover_listen(self->ndmp,
- for_writing? NDMP4_MOVER_MODE_READ : NDMP4_MOVER_MODE_WRITE,
- NDMP4_ADDR_TCP,
+ for_writing? NDMP9_MOVER_MODE_READ : NDMP9_MOVER_MODE_WRITE,
+ NDMP9_ADDR_TCP,
addrs)) {
set_error_from_ndmp(self);
return FALSE;
return TRUE;
}
-static gboolean
+static gpointer
+accept_wait_cond(
+ gpointer data)
+{
+ NdmpDevice *self = NDMP_DEVICE(data);
+
+ ndmp9_mover_state state;
+ guint64 bytes_moved;
+
+ gulong backoff = G_USEC_PER_SEC/20; /* 5 msec */
+ g_mutex_lock(self->abort_mutex);
+ while (1) {
+ g_mutex_unlock(self->abort_mutex);
+ if (!ndmp_connection_mover_get_state(self->ndmp,
+ &state, &bytes_moved, NULL, NULL)) {
+ g_mutex_lock(self->abort_mutex);
+ set_error_from_ndmp(self);
+ state = 0;
+ break;
+ }
+ g_mutex_lock(self->abort_mutex);
+
+ if (state != NDMP9_MOVER_STATE_LISTEN)
+ break;
+
+ /* back off a little bit to give the other side time to breathe,
+ * but not more than one second */
+ g_mutex_unlock(self->abort_mutex);
+ g_usleep(backoff);
+ g_mutex_lock(self->abort_mutex);
+ if (self->cancel)
+ break;
+ backoff *= 2;
+ if (backoff > G_USEC_PER_SEC)
+ backoff = G_USEC_PER_SEC;
+ }
+
+ self->cancel = TRUE;
+ g_cond_broadcast(self->abort_cond);
+ g_mutex_unlock(self->abort_mutex);
+ return GINT_TO_POINTER(state);
+}
+
+static int
accept_impl(
Device *dself,
DirectTCPConnection **dtcpconn,
- ProlongProc prolong,
- gpointer prolong_data)
+ gboolean *cancelled,
+ GMutex *abort_mutex,
+ GCond *abort_cond)
{
NdmpDevice *self = NDMP_DEVICE(dself);
ndmp9_mover_state state;
- guint64 bytes_moved;
ndmp9_mover_mode mode;
- ndmp9_mover_pause_reason reason;
+ ndmp9_mover_halt_reason mover_halt_reason = NDMP9_MOVER_HALT_NA;
+ ndmp9_mover_pause_reason mover_pause_reason = NDMP9_MOVER_PAUSE_NA;
guint64 seek_position;
+ GThread *wait_thread;
+ int result;
+ char *err;
- if (device_in_error(self)) return FALSE;
+ if (device_in_error(self)) return 1;
+
+ self->abort_mutex = abort_mutex;
+ self->abort_cond = abort_cond;
+ self->cancelled = cancelled;
+ self->cancel = FALSE;
g_assert(self->listen_addrs);
*dtcpconn = NULL;
- /* TODO: support aborting this operation - maybe just always poll? */
- prolong = prolong;
- prolong_data = prolong_data;
-
if (!self->for_writing) {
/* when reading, we don't get any kind of notification that the
* connection has been established, but we can't call NDMP_MOVER_READ
* until the mover is active. So we have to poll, waiting for ACTIVE.
* This is ugly. */
- gulong backoff = G_USEC_PER_SEC/20; /* 5 msec */
- while (1) {
- if (!ndmp_connection_mover_get_state(self->ndmp,
- &state, &bytes_moved, NULL, NULL)) {
- set_error_from_ndmp(self);
- return FALSE;
- }
- if (state != NDMP4_MOVER_STATE_LISTEN)
- break;
+ wait_thread = g_thread_create(accept_wait_cond, (gpointer)self, TRUE,
+ NULL);
+ while (!*cancelled && !self->cancel) {
+ g_cond_wait(self->abort_cond, self->abort_mutex);
+ }
+ self->cancel = TRUE;
+ state = GPOINTER_TO_INT(g_thread_join(wait_thread));
- /* back off a little bit to give the other side time to breathe,
- * but not more than one second */
- g_usleep(backoff);
- backoff *= 2;
- if (backoff > G_USEC_PER_SEC)
- backoff = G_USEC_PER_SEC;
+ if (*cancelled) {
+ result = 2;
+ goto accept_failed;
}
/* double-check state */
- if (state != NDMP4_MOVER_STATE_ACTIVE) {
+ if (state != NDMP9_MOVER_STATE_ACTIVE) {
device_set_error(DEVICE(self),
g_strdup("mover did not enter the ACTIVE state as expected"),
DEVICE_STATUS_DEVICE_ERROR);
- return FALSE;
+ result = 1;
+ goto accept_failed;
}
/* now, we need to get this into the PAUSED state, since right now we
* beginning of a part. */
if (!ndmp_connection_mover_read(self->ndmp, 0, G_MAXUINT64)) {
set_error_from_ndmp(self);
- return FALSE;
+ result = 1;
+ goto accept_failed;
}
/* now we should expect a notice that the mover has paused */
} else {
/* when writing, the mover will pause as soon as the first byte comes
- * in, so there's no need to do anything to trigger the pause.
- *
- * Well, sometimes it won't - specifically, when it does not allow a
- * zero-byte mover window, which means we've set up IndirectTCP. But in
- * that case, there's nothing interesting to do here.*/
+ * in, so there's no need to do anything to trigger the pause. */
}
if (self->indirecttcp_sock == -1) {
- /* NDMJOB sends NDMP9_MOVER_PAUSE_SEEK to indicate that it wants to write
- * outside the window, while the standard specifies .._EOW, instead. When
- * reading to a connection, we get the appropriate .._SEEK. It's easy
- * enough to handle both. */
-
- if (!ndmp_connection_wait_for_notify(self->ndmp,
- NULL,
- NULL,
- &reason, &seek_position)) {
+ /* NDMJOB sends NDMP9_MOVER_PAUSE_SEEK to indicate that it wants to
+ * write outside the window, while the standard specifies .._EOW,
+ * instead. When reading to a connection, we get the appropriate
+ * .._SEEK. It's easy enough to handle both. */
+ result = ndmp_connection_wait_for_notify_with_cond(self->ndmp,
+ NULL,
+ &mover_halt_reason,
+ &mover_pause_reason, &seek_position,
+ cancelled,
+ abort_mutex, abort_cond);
+
+ if (result == 1) {
set_error_from_ndmp(self);
- return FALSE;
+ goto accept_failed;
+ } else if (result == 2) {
+ goto accept_failed;
+ }
+
+ err = NULL;
+ if (mover_pause_reason) {
+ switch (mover_pause_reason) {
+ case NDMP9_MOVER_PAUSE_SEEK:
+ case NDMP9_MOVER_PAUSE_EOW:
+ break;
+ default:
+ err = "got NOTIFY_MOVER_PAUSED, but not because of EOW or SEEK";
+ break;
+ }
+ } else if (mover_halt_reason) {
+ err = "unexpected NOTIFY_MOVER_HALT";
}
- if (reason != NDMP9_MOVER_PAUSE_SEEK && reason != NDMP9_MOVER_PAUSE_EOW) {
+ if (err) {
device_set_error(DEVICE(self),
- g_strdup_printf("got NOTIFY_MOVER_PAUSED, but not because of EOW or SEEK"),
+ g_strdup_printf("waiting NDMP_MOVER_PAUSE_SEEK: %s", err),
DEVICE_STATUS_DEVICE_ERROR);
- return FALSE;
+ result = 1;
+ goto accept_failed;
+
}
}
- /* at this point, if we're doing directtcp, the mover is paused and ready
- * to go, and the listen addrs are no longer required; if we're doing
- * indirecttcp, then the other end may not even know of our listen_addrs
- * yet, so we can't free them. */
+ /* NDMJOB sends NDMP9_MOVER_PAUSE_SEEK to indicate that it wants to write
+ * outside the window, while the standard specifies .._EOW, instead. When
+ * reading to a connection, we get the appropriate .._SEEK. It's easy
+ * enough to handle both. */
if (self->indirecttcp_sock == -1) {
g_free(self->listen_addrs);
}
if (self->for_writing)
- mode = NDMP4_MOVER_MODE_WRITE;
+ mode = NDMP9_MOVER_MODE_READ;
else
- mode = NDMP4_MOVER_MODE_READ;
+ mode = NDMP9_MOVER_MODE_WRITE;
/* set up the new directtcp connection */
if (self->directtcp_conn)
/* reference it for the caller */
g_object_ref(*dtcpconn);
- return TRUE;
+ return 0;
+
+accept_failed:
+ if (self->indirecttcp_sock == -1) {
+ g_free(self->listen_addrs);
+ self->listen_addrs = NULL;
+ }
+ return result;
+}
+
+
+static int
+connect_impl(
+ Device *dself,
+ gboolean for_writing,
+ DirectTCPAddr *addrs,
+ DirectTCPConnection **dtcpconn,
+ int *cancelled,
+ GMutex *abort_mutex,
+ GCond *abort_cond)
+{
+ NdmpDevice *self = NDMP_DEVICE(dself);
+ ndmp9_mover_mode mode;
+ ndmp9_mover_halt_reason mover_halt_reason = NDMP9_MOVER_HALT_NA;
+ ndmp9_mover_pause_reason mover_pause_reason = NDMP9_MOVER_PAUSE_NA;
+ guint64 seek_position;
+ int result;
+
+ g_assert(!self->listen_addrs);
+
+ *dtcpconn = NULL;
+ self->for_writing = for_writing;
+
+ if (!open_tape_agent(self)) {
+ /* error message was set by open_tape_agent */
+ return 1;
+ }
+
+ /* first, set the window to an empty span so that the mover doesn't start
+ * reading or writing data immediately. NDMJOB tends to reset the record
+ * size periodically (in direct contradiction to the spec), so we reset it
+ * here as well. */
+ if (!ndmp_connection_mover_set_record_size(self->ndmp,
+ DEVICE(self)->block_size)) {
+ set_error_from_ndmp(self);
+ return 1;
+ }
+
+ if (!ndmp_connection_mover_set_window(self->ndmp, 0, 0)) {
+ set_error_from_ndmp(self);
+ return 1;
+ }
+
+ if (self->for_writing)
+ mode = NDMP9_MOVER_MODE_READ;
+ else
+ mode = NDMP9_MOVER_MODE_WRITE;
+
+ if (!ndmp_connection_mover_connect(self->ndmp, mode, addrs)) {
+ set_error_from_ndmp(self);
+ return 1;
+ }
+
+ if (!self->for_writing) {
+ /* The agent is in the ACTIVE state, and will remain so until we tell
+ * it to do something else. The thing we want to is for it to start
+ * reading data from the tape, which will immediately trigger an EOW or
+ * SEEK pause. */
+ if (!ndmp_connection_mover_read(self->ndmp, 0, G_MAXUINT64)) {
+ set_error_from_ndmp(self);
+ return 1;
+ }
+
+ /* now we should expect a notice that the mover has paused */
+ } else {
+ /* when writing, the mover will pause as soon as the first byte comes
+ * in, so there's no need to do anything to trigger the pause. */
+ }
+
+ /* NDMJOB sends NDMP9_MOVER_PAUSE_SEEK to indicate that it wants to write
+ * outside the window, while the standard specifies .._EOW, instead. When
+ * reading to a connection, we get the appropriate .._SEEK. It's easy
+ * enough to handle both. */
+
+ result = ndmp_connection_wait_for_notify_with_cond(self->ndmp,
+ NULL,
+ &mover_halt_reason,
+ &mover_pause_reason, &seek_position,
+ cancelled,
+ abort_mutex, abort_cond);
+
+ if (result == 1) {
+ set_error_from_ndmp(self);
+ return 1;
+ } else if (result == 2) {
+ return 2;
+ }
+
+ if (mover_halt_reason != NDMP9_MOVER_HALT_NA) {
+ device_set_error(DEVICE(self),
+ g_strdup_printf("got NDMP9_MOVER_HALT"),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return 1;
+ }
+ if (mover_pause_reason != NDMP9_MOVER_PAUSE_SEEK &&
+ mover_pause_reason != NDMP9_MOVER_PAUSE_EOW) {
+ device_set_error(DEVICE(self),
+ g_strdup_printf("got NOTIFY_MOVER_PAUSED, but not because of EOW or SEEK"),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return 1;
+ }
+
+ if (self->listen_addrs) {
+ g_free(self->listen_addrs);
+ self->listen_addrs = NULL;
+ }
+
+ /* set up the new directtcp connection */
+ if (self->directtcp_conn)
+ g_object_unref(self->directtcp_conn);
+ self->directtcp_conn =
+ directtcp_connection_ndmp_new(self->ndmp, mode);
+ *dtcpconn = DIRECTTCP_CONNECTION(self->directtcp_conn);
+
+ /* reference it for the caller */
+ g_object_ref(*dtcpconn);
+
+ return 0;
}
static gboolean
* indirecttcp_sock. The mover remains IDLE, although its window is set
* correctly for the part we are about to write. */
+ g_debug("indirecttcp_start_writing, ready to accept");
conn_sock = accept(self->indirecttcp_sock, NULL, NULL);
if (conn_sock < 0) {
device_set_error(DEVICE(self),
DEVICE_STATUS_DEVICE_ERROR);
return FALSE;
}
+ g_debug("indirecttcp_start_writing, accepted");
close(self->indirecttcp_sock);
self->indirecttcp_sock = -1;
}
/* format the addresses and send them down the socket */
- for (iter = real_addrs; iter && iter->ipv4; iter++) {
- struct in_addr in;
- char *addr, *addrspec;
+ for (iter = real_addrs; iter && SU_GET_FAMILY(iter) != 0; iter++) {
+ char inet[INET_ADDRSTRLEN];
+ const char *addr;
+ char *addrspec;
- in.s_addr = htonl(iter->ipv4);
- addr = inet_ntoa(in);
+ addr = inet_ntop(AF_INET, &iter->sin.sin_addr.s_addr, inet, INET_ADDRSTRLEN);
- addrspec = g_strdup_printf("%s:%d%s", addr, iter->port,
- (iter+1)->ipv4? " ":"");
+ addrspec = g_strdup_printf("%s:%d%s", addr, SU_GET_PORT(iter),
+ SU_GET_FAMILY(iter+1) !=0? " ":"");
+ g_debug("indirecttcp_start_writing, send %s", addrspec);
if (full_write(conn_sock, addrspec, strlen(addrspec)) < strlen(addrspec)) {
device_set_error(DEVICE(self),
g_strdup_printf("writing to indirecttcp socket: %s", strerror(errno)),
return TRUE;
}
-static gboolean
+static int
write_from_connection_impl(
- Device *dself,
- guint64 size,
- guint64 *actual_size)
+ Device *dself,
+ guint64 size,
+ guint64 *actual_size,
+ int *cancelled,
+ GMutex *abort_mutex,
+ GCond *abort_cond)
{
NdmpDevice *self = NDMP_DEVICE(dself);
DirectTCPConnectionNDMP *nconn = self->directtcp_conn;
gboolean eom = FALSE, eof = FALSE, eow = FALSE;
ndmp9_mover_state mover_state;
- ndmp9_mover_halt_reason halt_reason;
- ndmp9_mover_pause_reason pause_reason;
+ ndmp9_mover_halt_reason mover_halt_reason = NDMP9_MOVER_HALT_NA;
+ ndmp9_mover_pause_reason mover_pause_reason = NDMP9_MOVER_PAUSE_NA;
guint64 bytes_moved_before, bytes_moved_after;
gchar *err;
+ int result;
if (device_in_error(self)) return FALSE;
+ g_debug("write_from_connection_impl");
if (actual_size)
*actual_size = 0;
/* if this is false, then the caller did not use use_connection correctly */
g_assert(self->directtcp_conn != NULL);
g_assert(self->ndmp == nconn->ndmp);
+ g_assert(nconn->mode == NDMP9_MOVER_MODE_READ);
if (!ndmp_connection_mover_get_state(self->ndmp,
&mover_state, &bytes_moved_before, NULL, NULL)) {
set_error_from_ndmp(self);
- return FALSE;
+ return 1;
}
if (self->indirecttcp_sock != -1) {
- /* If we're doing IndirectTCP, then we've deferred the whole mover_set_window
- * / mover_listen process.. until now. So the mover should be IDLE. */
- g_assert(mover_state == NDMP4_MOVER_STATE_IDLE);
+ /* If we're doing IndirectTCP, then we've deferred the whole
+ * mover_set_window mover_listen process.. until now.
+ * So the mover should be IDLE.
+ */
+ g_assert(mover_state == NDMP9_MOVER_STATE_IDLE);
} else {
/* the mover had best be PAUSED right now */
- g_assert(mover_state == NDMP4_MOVER_STATE_PAUSED);
+ g_assert(mover_state == NDMP9_MOVER_STATE_PAUSED);
}
/* we want to set the window regardless of whether this is directtcp or
- * indirecttcp */
+ * indirecttcp
+ */
if (!ndmp_connection_mover_set_window(self->ndmp,
nconn->offset,
size? size : G_MAXUINT64 - nconn->offset)) {
set_error_from_ndmp(self);
- return FALSE;
+ return 1;
}
/* for DirectTCP, we just tell the mover to continue; IndirectTCP is more complicated. */
if (self->indirecttcp_sock != -1) {
if (!indirecttcp_start_writing(self)) {
- return FALSE;
+ return 1;
}
} else {
if (!ndmp_connection_mover_continue(self->ndmp)) {
set_error_from_ndmp(self);
- return FALSE;
+ return 1;
}
}
/* now wait for the mover to pause itself again, or halt on EOF or an error */
- if (!ndmp_connection_wait_for_notify(self->ndmp,
- NULL,
- &halt_reason,
- &pause_reason, NULL)) {
+ result = ndmp_connection_wait_for_notify_with_cond(self->ndmp,
+ NULL,
+ &mover_halt_reason,
+ &mover_pause_reason, NULL,
+ cancelled, abort_mutex, abort_cond);
+ if (result == 1) {
set_error_from_ndmp(self);
- return FALSE;
+ return 1;
+ } else if (result == 2) {
+ return 2;
}
err = NULL;
- if (pause_reason) {
- switch (pause_reason) {
+ if (mover_pause_reason) {
+ switch (mover_pause_reason) {
case NDMP9_MOVER_PAUSE_EOM:
eom = TRUE;
break;
err = "got NOTIFY_MOVER_PAUSED, but not because of EOW or SEEK";
break;
}
- } else if (halt_reason) {
- switch (halt_reason) {
+ } else if (mover_halt_reason) {
+ switch (mover_halt_reason) {
case NDMP9_MOVER_HALT_CONNECT_CLOSED:
eof = TRUE;
break;
device_set_error(DEVICE(self),
g_strdup_printf("waiting for accept: %s", err),
DEVICE_STATUS_DEVICE_ERROR);
- return FALSE;
+ return 1;
}
/* no error, so the mover stopped due to one of EOM (volume out of space),
if (!ndmp_connection_mover_get_state(self->ndmp,
&mover_state, &bytes_moved_after, NULL, NULL)) {
set_error_from_ndmp(self);
- return FALSE;
+ return 1;
}
size = bytes_moved_after - bytes_moved_before;
nconn->offset += size;
* we do need to figure out the actual size */
DEVICE(self)->is_eom = TRUE;
} else {
+ g_assert_not_reached();
error("not reached");
}
- return TRUE;
+ return 0;
}
-static gboolean
+static int
read_to_connection_impl(
Device *dself,
guint64 size,
- guint64 *actual_size)
+ guint64 *actual_size,
+ int *cancelled,
+ GMutex *abort_mutex,
+ GCond *abort_cond)
{
NdmpDevice *self = NDMP_DEVICE(dself);
DirectTCPConnectionNDMP *nconn = self->directtcp_conn;
gboolean eom = FALSE, eof = FALSE, eow = FALSE;
ndmp9_mover_state mover_state;
- ndmp9_mover_halt_reason halt_reason;
- ndmp9_mover_pause_reason pause_reason;
+ ndmp9_mover_halt_reason mover_halt_reason = NDMP9_MOVER_HALT_NA;
+ ndmp9_mover_pause_reason mover_pause_reason = NDMP9_MOVER_PAUSE_NA;
guint64 bytes_moved_before, bytes_moved_after;
gchar *err;
+ int result;
if (actual_size)
*actual_size = 0;
- if (device_in_error(self)) return FALSE;
+ if (device_in_error(self)) return 1;
/* read_to_connection does not support IndirectTCP */
g_assert(self->indirecttcp_sock == -1);
/* if this is false, then the caller did not use use_connection correctly */
g_assert(nconn != NULL);
g_assert(self->ndmp == nconn->ndmp);
+ g_assert(nconn->mode == NDMP9_MOVER_MODE_WRITE);
if (!ndmp_connection_mover_get_state(self->ndmp,
&mover_state, &bytes_moved_before, NULL, NULL)) {
set_error_from_ndmp(self);
- return FALSE;
+ return 1;
}
/* the mover had best be PAUSED right now */
- g_assert(mover_state == NDMP4_MOVER_STATE_PAUSED);
+ g_assert(mover_state == NDMP9_MOVER_STATE_PAUSED);
if (!ndmp_connection_mover_set_window(self->ndmp,
nconn->offset,
size? size : G_MAXUINT64 - nconn->offset)) {
set_error_from_ndmp(self);
- return FALSE;
+ return 1;
}
if (!ndmp_connection_mover_continue(self->ndmp)) {
set_error_from_ndmp(self);
- return FALSE;
+ return 1;
}
/* now wait for the mover to pause itself again, or halt on EOF or an error */
- if (!ndmp_connection_wait_for_notify(self->ndmp,
- NULL,
- &halt_reason,
- &pause_reason, NULL)) {
+ result = ndmp_connection_wait_for_notify_with_cond(self->ndmp,
+ NULL,
+ &mover_halt_reason,
+ &mover_pause_reason, NULL,
+ cancelled, abort_mutex, abort_cond);
+ if (result == 1) {
set_error_from_ndmp(self);
- return FALSE;
+ return 1;
+ } else if (result == 2) {
+ return 2;
}
err = NULL;
- if (pause_reason) {
- switch (pause_reason) {
+ if (mover_pause_reason) {
+ switch (mover_pause_reason) {
case NDMP9_MOVER_PAUSE_EOF:
eof = TRUE;
break;
err = "got NOTIFY_MOVER_PAUSED, but not because of EOW or SEEK";
break;
}
- } else if (halt_reason) {
- switch (halt_reason) {
+ } else if (mover_halt_reason) {
+ switch (mover_halt_reason) {
case NDMP9_MOVER_HALT_CONNECT_CLOSED:
eof = TRUE;
break;
device_set_error(DEVICE(self),
g_strdup_printf("waiting for accept: %s", err),
DEVICE_STATUS_DEVICE_ERROR);
- return FALSE;
+ return 1;
}
/* no error, so the mover stopped due to one of EOM (volume out of space),
if (!ndmp_connection_mover_get_state(self->ndmp,
&mover_state, &bytes_moved_after, NULL, NULL)) {
set_error_from_ndmp(self);
- return FALSE;
+ return 1;
}
size = bytes_moved_after - bytes_moved_before;
nconn->offset += size;
* we do need to figure out the actual size */
DEVICE(self)->is_eom = TRUE;
} else {
+ g_assert_not_reached();
error("not reached");
}
- return TRUE;
+ return 0;
}
static gboolean
}
static gboolean
-ndmp_device_set__force_indirecttcp_fn(Device *dself,
+ndmp_device_set_read_block_size_fn(Device *p_self, DevicePropertyBase *base G_GNUC_UNUSED,
+ GValue *val, PropertySurety surety, PropertySource source)
+{
+ NdmpDevice *self = NDMP_DEVICE(p_self);
+ gsize read_block_size = g_value_get_uint(val);
+
+ if (read_block_size != 0 &&
+ ((gsize)read_block_size < p_self->block_size ||
+ (gsize)read_block_size > p_self->max_block_size)) {
+ device_set_error(p_self,
+ g_strdup_printf("Error setting READ-BLOCk-SIZE property to '%zu', it must be between %zu and %zu", read_block_size, p_self->block_size, p_self->max_block_size),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
+
+ self->read_block_size = read_block_size;
+
+ /* use the READ_BLOCK_SIZE, even if we're invoked to get the old READ_BUFFER_SIZE */
+ return device_simple_property_set_fn(p_self, base,
+ val, surety, source);
+}
+
+static gboolean
+ndmp_device_set_indirect_fn(Device *dself,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source)
{
NdmpDevice *self = NDMP_DEVICE(dself);
- self->force_indirecttcp = g_value_get_boolean(val);
+ self->indirect = g_value_get_boolean(val);
return device_simple_property_set_fn(dself, base, val, surety, source);
}
device_class->directtcp_supported = TRUE;
device_class->listen = listen_impl;
- device_class->accept = accept_impl;
+ device_class->accept= accept_impl;
+ device_class->connect= connect_impl;
device_class->write_from_connection = write_from_connection_impl;
device_class->read_to_connection = read_to_connection_impl;
device_class->use_connection = use_connection_impl;
device_simple_property_get_fn,
ndmp_device_set_verbose_fn);
- device_class_register_property(device_class, PROPERTY__FORCE_INDIRECTTCP,
+ device_class_register_property(device_class, PROPERTY_INDIRECT,
PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK,
device_simple_property_get_fn,
- ndmp_device_set__force_indirecttcp_fn);
+ ndmp_device_set_indirect_fn);
+
+ device_class_register_property(device_class, PROPERTY_READ_BLOCK_SIZE,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ ndmp_device_set_read_block_size_fn);
}
static void
&response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
g_value_unset(&response);
+ g_value_init(&response, G_TYPE_BOOLEAN);
+ g_value_set_boolean(&response, TRUE);
+ device_set_simple_property(dself, PROPERTY_LEOM,
+ &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
+ g_value_unset(&response);
+
g_value_init(&response, MEDIA_ACCESS_MODE_TYPE);
g_value_set_enum(&response, MEDIA_ACCESS_MODE_READ_WRITE);
device_set_simple_property(dself, PROPERTY_MEDIUM_ACCESS_TYPE,
&response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
g_value_unset(&response);
+ self->read_block_size = 0;
+ g_value_init(&response, G_TYPE_UINT);
+ g_value_set_uint(&response, self->read_block_size);
+ device_set_simple_property(dself, PROPERTY_READ_BLOCK_SIZE,
+ &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DEFAULT);
+ g_value_unset(&response);
+
g_value_init(&response, G_TYPE_STRING);
g_value_set_string(&response, "ndmp");
device_set_simple_property(dself, PROPERTY_NDMP_USERNAME,
g_value_init(&response, G_TYPE_BOOLEAN);
g_value_set_boolean(&response, FALSE);
- device_set_simple_property(dself, PROPERTY__FORCE_INDIRECTTCP,
+ device_set_simple_property(dself, PROPERTY_INDIRECT,
&response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DEFAULT);
g_value_unset(&response);
- self->force_indirecttcp = FALSE;
+ self->indirect = FALSE;
self->indirecttcp_sock = -1;
}
char *device_node)
{
Device *rval;
- NdmpDevice * ndmp_rval;
g_assert(0 == strcmp(device_type, NDMP_DEVICE_NAME));
rval = DEVICE(g_object_new(TYPE_NDMP_DEVICE, NULL));
- ndmp_rval = (NdmpDevice *)rval;
device_open_device(rval, device_name, device_type, device_node);
return rval;
device_property_fill_and_register(&device_property_ndmp_auth,
G_TYPE_STRING, "ndmp_auth",
"Authentication method for the NDMP agent - md5 (default), text, none, or void");
- device_property_fill_and_register(&device_property__force_indirecttcp,
- G_TYPE_BOOLEAN, "_force_indirecttcp",
- "For testing only - force IndirectTCP mode, even if the NDMP server supports "
+ device_property_fill_and_register(&device_property_indirect,
+ G_TYPE_BOOLEAN, "indirect",
+ "Use Indirect TCP mode, even if the NDMP server supports "
"window length 0");
}
char *rv = NULL;
ndmp9_mover_state state;
guint64 bytes_moved;
- ndmp9_mover_halt_reason reason;
+ ndmp9_mover_halt_reason mover_halt_reason;
gboolean expect_notif = FALSE;
/* based on the current state, we may need to abort or stop the
if (expect_notif) {
if (!ndmp_connection_wait_for_notify(self->ndmp,
NULL,
- &reason, /* value is ignored.. */
+ &mover_halt_reason, /* value is ignored.. */
NULL, NULL)) {
goto error;
}