X-Git-Url: https://git.gag.com/?a=blobdiff_plain;f=device-src%2Fndmp-device.c;h=d8c816a20cbfb3ccd612232963322596799d7bb2;hb=HEAD;hp=0e0c9987ea7333e253407b2a74107e11322d319a;hpb=b221e8dc16f345f8c8d7df8df71f4d36daaabb4c;p=debian%2Famanda diff --git a/device-src/ndmp-device.c b/device-src/ndmp-device.c index 0e0c998..d8c816a 100644 --- a/device-src/ndmp-device.c +++ b/device-src/ndmp-device.c @@ -1,9 +1,10 @@ /* - * Copyright (c) 2009, 2010 Zmanda, Inc. All Rights Reserved. + * Copyright (c) 2009-2012 Zmanda, Inc. All Rights Reserved. * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 as published - * by the Free Software Foundation. + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY @@ -22,8 +23,10 @@ #include "util.h" #include "device.h" #include "directtcp.h" +#include "stream.h" #include "ndmlib.h" #include "ndmpconnobj.h" +#include "sockaddr-util.h" /* * Type checking and casting macros @@ -54,6 +57,10 @@ struct NdmpDevice_ { DirectTCPAddr *listen_addrs; gboolean for_writing; + /* support for IndirectTCP */ + int indirecttcp_sock; /* -1 if not in use */ + int indirect; + /* Current DirectTCPConnectionNDMP */ struct DirectTCPConnectionNDMP_ *directtcp_conn; @@ -66,6 +73,11 @@ struct NdmpDevice_ { gchar *ndmp_auth; gboolean verbose; gsize read_block_size; + + GMutex *abort_mutex; + GCond *abort_cond; + gboolean cancel; + int *cancelled; }; /* @@ -136,9 +148,11 @@ typedef enum { static DevicePropertyBase device_property_ndmp_username; static DevicePropertyBase device_property_ndmp_password; static DevicePropertyBase device_property_ndmp_auth; +static DevicePropertyBase device_property_indirect; #define PROPERTY_NDMP_USERNAME (device_property_ndmp_username.ID) #define PROPERTY_NDMP_PASSWORD (device_property_ndmp_password.ID) #define PROPERTY_NDMP_AUTH (device_property_ndmp_auth.ID) +#define PROPERTY_INDIRECT (device_property_indirect.ID) /* @@ -453,6 +467,8 @@ static void ndmp_device_finalize(GObject * obj_self) g_free(self->ndmp_password); if (self->ndmp_auth) g_free(self->ndmp_auth); + if (self->indirecttcp_sock != -1) + close(self->indirecttcp_sock); } static DeviceStatusFlags @@ -564,8 +580,6 @@ ndmp_device_start( dumpfile_t *header; char *header_buf; - self = NDMP_DEVICE(dself); - if (device_in_error(self)) return FALSE; if (!open_tape_agent(self)) { @@ -580,7 +594,9 @@ ndmp_device_start( } dself->access_mode = mode; + g_mutex_lock(dself->device_mutex); dself->in_file = FALSE; + g_mutex_unlock(dself->device_mutex); if (!single_ndmp_mtio(self, NDMP9_MTIO_REW)) { /* single_ndmp_mtio already set our error message */ @@ -714,6 +730,9 @@ ndmp_device_start_file( dself->is_eof = FALSE; dself->is_eom = FALSE; + g_mutex_lock(dself->device_mutex); + dself->bytes_written = 0; + g_mutex_unlock(dself->device_mutex); /* set the blocksize in the header properly */ header->blocksize = dself->block_size; @@ -751,7 +770,9 @@ ndmp_device_start_file( amfree(header_buf); /* arrange the file numbers correctly */ + g_mutex_lock(dself->device_mutex); dself->in_file = TRUE; + g_mutex_unlock(dself->device_mutex); if (!ndmp_get_state(self)) { /* error already set by ndmp_get_state */ return FALSE; @@ -809,6 +830,9 @@ ndmp_device_write_block( } dself->block++; + g_mutex_lock(dself->device_mutex); + dself->bytes_written += size; + g_mutex_unlock(dself->device_mutex); if (replacement_buffer) g_free(replacement_buffer); return TRUE; @@ -823,7 +847,9 @@ ndmp_device_finish_file( if (device_in_error(dself)) return FALSE; /* we're not in a file anymore */ + g_mutex_lock(dself->device_mutex); dself->in_file = FALSE; + g_mutex_unlock(dself->device_mutex); if (!single_ndmp_mtio(self, NDMP9_MTIO_EOF)) { /* error was set by single_ndmp_mtio */ @@ -911,9 +937,14 @@ incomplete_bsf: } /* fix up status */ + g_mutex_lock(dself->device_mutex); dself->in_file = TRUE; + g_mutex_unlock(dself->device_mutex); dself->file = file; dself->block = 0; + g_mutex_lock(dself->device_mutex); + dself->bytes_read = 0; + g_mutex_unlock(dself->device_mutex); /* now read the header */ read_block_size = ndmp_device_read_size(self); @@ -988,10 +1019,37 @@ ndmp_device_read_block (Device * dself, gpointer data, int *size_req) { } *size_req = (int)actual; /* cast is OK - requested size was < INT_MAX too */ + g_mutex_lock(dself->device_mutex); + dself->bytes_read += actual; + g_mutex_unlock(dself->device_mutex); return *size_req; } +static gboolean +indirecttcp_listen( + NdmpDevice *self, + DirectTCPAddr **addrs) +{ + in_port_t port; + + self->indirecttcp_sock = stream_server(AF_INET, &port, 0, STREAM_BUFSIZE, 0); + if (self->indirecttcp_sock < 0) { + device_set_error(DEVICE(self), + g_strdup_printf("Could not bind indirecttcp socket: %s", strerror(errno)), + DEVICE_STATUS_DEVICE_ERROR); + return FALSE; + } + + /* An IndirectTCP address is 255.255.255.255:$port */ + self->listen_addrs = *addrs = g_new0(DirectTCPAddr, 2); + addrs[0]->sin.sin_family = AF_INET; + addrs[0]->sin.sin_addr.s_addr = htonl(0xffffffff); + SU_SET_PORT(addrs[0], port); + + return TRUE; +} + static gboolean listen_impl( Device *dself, @@ -1010,6 +1068,8 @@ listen_impl( return FALSE; } + self->for_writing = for_writing; + /* first, set the window to an empty span so that the mover doesn't start * reading or writing data immediately. NDMJOB tends to reset the record * size periodically (in direct contradiction to the spec), so we reset it @@ -1020,9 +1080,34 @@ listen_impl( return FALSE; } - if (!ndmp_connection_mover_set_window(self->ndmp, 0, 0)) { - set_error_from_ndmp(self); - return FALSE; + if (for_writing) { + /* if we're forcing indirecttcp, just do it */ + if (self->indirect) { + return indirecttcp_listen(self, addrs); + } + if (!ndmp_connection_mover_set_window(self->ndmp, 0, 0)) { + /* NDMP4_ILLEGAL_ARGS_ERR means the NDMP server doesn't like a zero-byte + * mover window, so we'll ignore it */ + if (ndmp_connection_err_code(self->ndmp) != NDMP4_ILLEGAL_ARGS_ERR) { + set_error_from_ndmp(self); + return FALSE; + } + + g_debug("NDMP Device: cannot set zero-length mover window; " + "falling back to IndirectTCP"); + /* In this case, we need to set up IndirectTCP */ + return indirecttcp_listen(self, addrs); + } + } else { + /* For reading, set the window to the second mover record, so that the + * mover will pause immediately when it wants to read the first mover + * record. */ + if (!ndmp_connection_mover_set_window(self->ndmp, + dself->block_size, + dself->block_size)) { + set_error_from_ndmp(self); + return FALSE; + } } /* then tell it to start listening */ @@ -1034,57 +1119,99 @@ listen_impl( return FALSE; } self->listen_addrs = *addrs; - self->for_writing = for_writing; return TRUE; } -static gboolean +static gpointer +accept_wait_cond( + gpointer data) +{ + NdmpDevice *self = NDMP_DEVICE(data); + + ndmp9_mover_state state; + guint64 bytes_moved; + + gulong backoff = G_USEC_PER_SEC/20; /* 5 msec */ + g_mutex_lock(self->abort_mutex); + while (1) { + g_mutex_unlock(self->abort_mutex); + if (!ndmp_connection_mover_get_state(self->ndmp, + &state, &bytes_moved, NULL, NULL)) { + g_mutex_lock(self->abort_mutex); + set_error_from_ndmp(self); + state = 0; + break; + } + g_mutex_lock(self->abort_mutex); + + if (state != NDMP9_MOVER_STATE_LISTEN) + break; + + /* back off a little bit to give the other side time to breathe, + * but not more than one second */ + g_mutex_unlock(self->abort_mutex); + g_usleep(backoff); + g_mutex_lock(self->abort_mutex); + if (self->cancel) + break; + backoff *= 2; + if (backoff > G_USEC_PER_SEC) + backoff = G_USEC_PER_SEC; + } + + self->cancel = TRUE; + g_cond_broadcast(self->abort_cond); + g_mutex_unlock(self->abort_mutex); + return GINT_TO_POINTER(state); +} + +static int accept_impl( Device *dself, DirectTCPConnection **dtcpconn, - ProlongProc prolong, - gpointer prolong_data) + gboolean *cancelled, + GMutex *abort_mutex, + GCond *abort_cond) { NdmpDevice *self = NDMP_DEVICE(dself); ndmp9_mover_state state; - guint64 bytes_moved; ndmp9_mover_mode mode; - ndmp9_mover_pause_reason reason; + ndmp9_mover_halt_reason mover_halt_reason = NDMP9_MOVER_HALT_NA; + ndmp9_mover_pause_reason mover_pause_reason = NDMP9_MOVER_PAUSE_NA; guint64 seek_position; + GThread *wait_thread; + int result; + char *err; - if (device_in_error(self)) return FALSE; + if (device_in_error(self)) return 1; + + self->abort_mutex = abort_mutex; + self->abort_cond = abort_cond; + self->cancelled = cancelled; + self->cancel = FALSE; g_assert(self->listen_addrs); *dtcpconn = NULL; - /* TODO: support aborting this operation - maybe just always poll? */ - prolong = prolong; - prolong_data = prolong_data; - if (!self->for_writing) { /* when reading, we don't get any kind of notification that the * connection has been established, but we can't call NDMP_MOVER_READ * until the mover is active. So we have to poll, waiting for ACTIVE. * This is ugly. */ - gulong backoff = G_USEC_PER_SEC/20; /* 5 msec */ - while (1) { - if (!ndmp_connection_mover_get_state(self->ndmp, - &state, &bytes_moved, NULL, NULL)) { - set_error_from_ndmp(self); - return FALSE; - } - if (state != NDMP9_MOVER_STATE_LISTEN) - break; + wait_thread = g_thread_create(accept_wait_cond, (gpointer)self, TRUE, + NULL); + while (!*cancelled && !self->cancel) { + g_cond_wait(self->abort_cond, self->abort_mutex); + } + self->cancel = TRUE; + state = GPOINTER_TO_INT(g_thread_join(wait_thread)); - /* back off a little bit to give the other side time to breathe, - * but not more than one second */ - g_usleep(backoff); - backoff *= 2; - if (backoff > G_USEC_PER_SEC) - backoff = G_USEC_PER_SEC; + if (*cancelled) { + result = 2; + goto accept_failed; } /* double-check state */ @@ -1092,7 +1219,8 @@ accept_impl( device_set_error(DEVICE(self), g_strdup("mover did not enter the ACTIVE state as expected"), DEVICE_STATUS_DEVICE_ERROR); - return FALSE; + result = 1; + goto accept_failed; } /* now, we need to get this into the PAUSED state, since right now we @@ -1103,7 +1231,8 @@ accept_impl( * beginning of a part. */ if (!ndmp_connection_mover_read(self->ndmp, 0, G_MAXUINT64)) { set_error_from_ndmp(self); - return FALSE; + result = 1; + goto accept_failed; } /* now we should expect a notice that the mover has paused */ @@ -1112,29 +1241,59 @@ accept_impl( * in, so there's no need to do anything to trigger the pause. */ } + if (self->indirecttcp_sock == -1) { + /* NDMJOB sends NDMP9_MOVER_PAUSE_SEEK to indicate that it wants to + * write outside the window, while the standard specifies .._EOW, + * instead. When reading to a connection, we get the appropriate + * .._SEEK. It's easy enough to handle both. */ + result = ndmp_connection_wait_for_notify_with_cond(self->ndmp, + NULL, + &mover_halt_reason, + &mover_pause_reason, &seek_position, + cancelled, + abort_mutex, abort_cond); + + if (result == 1) { + set_error_from_ndmp(self); + goto accept_failed; + } else if (result == 2) { + goto accept_failed; + } + + err = NULL; + if (mover_pause_reason) { + switch (mover_pause_reason) { + case NDMP9_MOVER_PAUSE_SEEK: + case NDMP9_MOVER_PAUSE_EOW: + break; + default: + err = "got NOTIFY_MOVER_PAUSED, but not because of EOW or SEEK"; + break; + } + } else if (mover_halt_reason) { + err = "unexpected NOTIFY_MOVER_HALT"; + } + + if (err) { + device_set_error(DEVICE(self), + g_strdup_printf("waiting NDMP_MOVER_PAUSE_SEEK: %s", err), + DEVICE_STATUS_DEVICE_ERROR); + result = 1; + goto accept_failed; + + } + } + /* NDMJOB sends NDMP9_MOVER_PAUSE_SEEK to indicate that it wants to write * outside the window, while the standard specifies .._EOW, instead. When * reading to a connection, we get the appropriate .._SEEK. It's easy * enough to handle both. */ - if (!ndmp_connection_wait_for_notify(self->ndmp, - NULL, - NULL, - &reason, &seek_position)) { - set_error_from_ndmp(self); - return FALSE; - } - - if (reason != NDMP9_MOVER_PAUSE_SEEK && reason != NDMP9_MOVER_PAUSE_EOW) { - device_set_error(DEVICE(self), - g_strdup_printf("got NOTIFY_MOVER_PAUSED, but not because of EOW or SEEK"), - DEVICE_STATUS_DEVICE_ERROR); - return FALSE; + if (self->indirecttcp_sock == -1) { + g_free(self->listen_addrs); + self->listen_addrs = NULL; } - g_free(self->listen_addrs); - self->listen_addrs = NULL; - if (self->for_writing) mode = NDMP9_MOVER_MODE_READ; else @@ -1150,35 +1309,42 @@ accept_impl( /* reference it for the caller */ g_object_ref(*dtcpconn); - return TRUE; + return 0; + +accept_failed: + if (self->indirecttcp_sock == -1) { + g_free(self->listen_addrs); + self->listen_addrs = NULL; + } + return result; } -static gboolean + +static int connect_impl( Device *dself, gboolean for_writing, DirectTCPAddr *addrs, DirectTCPConnection **dtcpconn, - ProlongProc prolong, - gpointer prolong_data) + int *cancelled, + GMutex *abort_mutex, + GCond *abort_cond) { NdmpDevice *self = NDMP_DEVICE(dself); ndmp9_mover_mode mode; - ndmp9_mover_pause_reason reason; + ndmp9_mover_halt_reason mover_halt_reason = NDMP9_MOVER_HALT_NA; + ndmp9_mover_pause_reason mover_pause_reason = NDMP9_MOVER_PAUSE_NA; guint64 seek_position; + int result; g_assert(!self->listen_addrs); *dtcpconn = NULL; self->for_writing = for_writing; - /* TODO: support aborting this operation - maybe just always poll? */ - prolong = prolong; - prolong_data = prolong_data; - if (!open_tape_agent(self)) { /* error message was set by open_tape_agent */ - return FALSE; + return 1; } /* first, set the window to an empty span so that the mover doesn't start @@ -1188,12 +1354,12 @@ connect_impl( if (!ndmp_connection_mover_set_record_size(self->ndmp, DEVICE(self)->block_size)) { set_error_from_ndmp(self); - return FALSE; + return 1; } if (!ndmp_connection_mover_set_window(self->ndmp, 0, 0)) { set_error_from_ndmp(self); - return FALSE; + return 1; } if (self->for_writing) @@ -1203,7 +1369,7 @@ connect_impl( if (!ndmp_connection_mover_connect(self->ndmp, mode, addrs)) { set_error_from_ndmp(self); - return FALSE; + return 1; } if (!self->for_writing) { @@ -1213,7 +1379,7 @@ connect_impl( * SEEK pause. */ if (!ndmp_connection_mover_read(self->ndmp, 0, G_MAXUINT64)) { set_error_from_ndmp(self); - return FALSE; + return 1; } /* now we should expect a notice that the mover has paused */ @@ -1227,19 +1393,32 @@ connect_impl( * reading to a connection, we get the appropriate .._SEEK. It's easy * enough to handle both. */ - if (!ndmp_connection_wait_for_notify(self->ndmp, - NULL, + result = ndmp_connection_wait_for_notify_with_cond(self->ndmp, NULL, - &reason, &seek_position)) { + &mover_halt_reason, + &mover_pause_reason, &seek_position, + cancelled, + abort_mutex, abort_cond); + + if (result == 1) { set_error_from_ndmp(self); - return FALSE; + return 1; + } else if (result == 2) { + return 2; } - if (reason != NDMP9_MOVER_PAUSE_SEEK && reason != NDMP9_MOVER_PAUSE_EOW) { + if (mover_halt_reason != NDMP9_MOVER_HALT_NA) { + device_set_error(DEVICE(self), + g_strdup_printf("got NDMP9_MOVER_HALT"), + DEVICE_STATUS_DEVICE_ERROR); + return 1; + } + if (mover_pause_reason != NDMP9_MOVER_PAUSE_SEEK && + mover_pause_reason != NDMP9_MOVER_PAUSE_EOW) { device_set_error(DEVICE(self), g_strdup_printf("got NOTIFY_MOVER_PAUSED, but not because of EOW or SEEK"), DEVICE_STATUS_DEVICE_ERROR); - return FALSE; + return 1; } if (self->listen_addrs) { @@ -1257,26 +1436,107 @@ connect_impl( /* reference it for the caller */ g_object_ref(*dtcpconn); - return TRUE; + return 0; } static gboolean +indirecttcp_start_writing( + NdmpDevice *self) +{ + DirectTCPAddr *real_addrs, *iter; + int conn_sock; + + /* The current state is that the other end is trying to connect to + * indirecttcp_sock. The mover remains IDLE, although its window is set + * correctly for the part we are about to write. */ + + g_debug("indirecttcp_start_writing, ready to accept"); + conn_sock = accept(self->indirecttcp_sock, NULL, NULL); + if (conn_sock < 0) { + device_set_error(DEVICE(self), + g_strdup_printf("Could not accept indirecttcp socket: %s", strerror(errno)), + DEVICE_STATUS_DEVICE_ERROR); + return FALSE; + } + g_debug("indirecttcp_start_writing, accepted"); + + close(self->indirecttcp_sock); + self->indirecttcp_sock = -1; + + /* tell mover to start listening */ + g_assert(self->for_writing); + if (!ndmp_connection_mover_listen(self->ndmp, + NDMP4_MOVER_MODE_READ, + NDMP4_ADDR_TCP, + &real_addrs)) { + set_error_from_ndmp(self); + return FALSE; + } + + /* format the addresses and send them down the socket */ + for (iter = real_addrs; iter && SU_GET_FAMILY(iter) != 0; iter++) { + char inet[INET_ADDRSTRLEN]; + const char *addr; + char *addrspec; + + addr = inet_ntop(AF_INET, &iter->sin.sin_addr.s_addr, inet, INET_ADDRSTRLEN); + + addrspec = g_strdup_printf("%s:%d%s", addr, SU_GET_PORT(iter), + SU_GET_FAMILY(iter+1) !=0? " ":""); + g_debug("indirecttcp_start_writing, send %s", addrspec); + if (full_write(conn_sock, addrspec, strlen(addrspec)) < strlen(addrspec)) { + device_set_error(DEVICE(self), + g_strdup_printf("writing to indirecttcp socket: %s", strerror(errno)), + DEVICE_STATUS_DEVICE_ERROR); + return FALSE; + } + } + + /* close the socket for good. This ensures that the next call to + * write_from_connection_impl will not go through the mover setup process. + * */ + if (close(conn_sock) < 0) { + device_set_error(DEVICE(self), + g_strdup_printf("closing indirecttcp socket: %s", strerror(errno)), + DEVICE_STATUS_DEVICE_ERROR); + return FALSE; + } + conn_sock = -1; + + /* and free the listen_addrs, since we didn't free them in accept_impl */ + if (self->listen_addrs) { + g_free(self->listen_addrs); + self->listen_addrs = NULL; + } + + /* Now it's up to the remote end to connect to the mover and start sending + * data. We won't get any notification when this happens, although we could + * in principle poll for such a thing. */ + return TRUE; +} + +static int write_from_connection_impl( - Device *dself, - guint64 size, - guint64 *actual_size) + Device *dself, + guint64 size, + guint64 *actual_size, + int *cancelled, + GMutex *abort_mutex, + GCond *abort_cond) { NdmpDevice *self = NDMP_DEVICE(dself); DirectTCPConnectionNDMP *nconn = self->directtcp_conn; gboolean eom = FALSE, eof = FALSE, eow = FALSE; ndmp9_mover_state mover_state; - ndmp9_mover_halt_reason halt_reason; - ndmp9_mover_pause_reason pause_reason; + ndmp9_mover_halt_reason mover_halt_reason = NDMP9_MOVER_HALT_NA; + ndmp9_mover_pause_reason mover_pause_reason = NDMP9_MOVER_PAUSE_NA; guint64 bytes_moved_before, bytes_moved_after; gchar *err; + int result; if (device_in_error(self)) return FALSE; + g_debug("write_from_connection_impl"); if (actual_size) *actual_size = 0; @@ -1288,36 +1548,58 @@ write_from_connection_impl( if (!ndmp_connection_mover_get_state(self->ndmp, &mover_state, &bytes_moved_before, NULL, NULL)) { set_error_from_ndmp(self); - return FALSE; + return 1; } - /* the mover had best be PAUSED right now */ - g_assert(mover_state == NDMP9_MOVER_STATE_PAUSED); + if (self->indirecttcp_sock != -1) { + /* If we're doing IndirectTCP, then we've deferred the whole + * mover_set_window mover_listen process.. until now. + * So the mover should be IDLE. + */ + g_assert(mover_state == NDMP9_MOVER_STATE_IDLE); + } else { + /* the mover had best be PAUSED right now */ + g_assert(mover_state == NDMP9_MOVER_STATE_PAUSED); + } + /* we want to set the window regardless of whether this is directtcp or + * indirecttcp + */ if (!ndmp_connection_mover_set_window(self->ndmp, nconn->offset, size? size : G_MAXUINT64 - nconn->offset)) { set_error_from_ndmp(self); - return FALSE; + return 1; } - if (!ndmp_connection_mover_continue(self->ndmp)) { - set_error_from_ndmp(self); - return FALSE; + /* for DirectTCP, we just tell the mover to continue; IndirectTCP is more complicated. */ + if (self->indirecttcp_sock != -1) { + if (!indirecttcp_start_writing(self)) { + return 1; + } + } else { + if (!ndmp_connection_mover_continue(self->ndmp)) { + set_error_from_ndmp(self); + return 1; + } } /* now wait for the mover to pause itself again, or halt on EOF or an error */ - if (!ndmp_connection_wait_for_notify(self->ndmp, - NULL, - &halt_reason, - &pause_reason, NULL)) { + result = ndmp_connection_wait_for_notify_with_cond(self->ndmp, + NULL, + &mover_halt_reason, + &mover_pause_reason, NULL, + cancelled, abort_mutex, abort_cond); + if (result == 1) { set_error_from_ndmp(self); - return FALSE; + return 1; + } else if (result == 2) { + return 2; } err = NULL; - if (pause_reason) { - switch (pause_reason) { + if (mover_pause_reason) { + switch (mover_pause_reason) { case NDMP9_MOVER_PAUSE_EOM: eom = TRUE; break; @@ -1333,8 +1615,8 @@ write_from_connection_impl( err = "got NOTIFY_MOVER_PAUSED, but not because of EOW or SEEK"; break; } - } else if (halt_reason) { - switch (halt_reason) { + } else if (mover_halt_reason) { + switch (mover_halt_reason) { case NDMP9_MOVER_HALT_CONNECT_CLOSED: eof = TRUE; break; @@ -1353,7 +1635,7 @@ write_from_connection_impl( device_set_error(DEVICE(self), g_strdup_printf("waiting for accept: %s", err), DEVICE_STATUS_DEVICE_ERROR); - return FALSE; + return 1; } /* no error, so the mover stopped due to one of EOM (volume out of space), @@ -1363,7 +1645,7 @@ write_from_connection_impl( if (!ndmp_connection_mover_get_state(self->ndmp, &mover_state, &bytes_moved_after, NULL, NULL)) { set_error_from_ndmp(self); - return FALSE; + return 1; } size = bytes_moved_after - bytes_moved_before; nconn->offset += size; @@ -1381,31 +1663,39 @@ write_from_connection_impl( * we do need to figure out the actual size */ DEVICE(self)->is_eom = TRUE; } else { + g_assert_not_reached(); error("not reached"); } - return TRUE; + return 0; } -static gboolean +static int read_to_connection_impl( Device *dself, guint64 size, - guint64 *actual_size) + guint64 *actual_size, + int *cancelled, + GMutex *abort_mutex, + GCond *abort_cond) { NdmpDevice *self = NDMP_DEVICE(dself); DirectTCPConnectionNDMP *nconn = self->directtcp_conn; gboolean eom = FALSE, eof = FALSE, eow = FALSE; ndmp9_mover_state mover_state; - ndmp9_mover_halt_reason halt_reason; - ndmp9_mover_pause_reason pause_reason; + ndmp9_mover_halt_reason mover_halt_reason = NDMP9_MOVER_HALT_NA; + ndmp9_mover_pause_reason mover_pause_reason = NDMP9_MOVER_PAUSE_NA; guint64 bytes_moved_before, bytes_moved_after; gchar *err; + int result; if (actual_size) *actual_size = 0; - if (device_in_error(self)) return FALSE; + if (device_in_error(self)) return 1; + + /* read_to_connection does not support IndirectTCP */ + g_assert(self->indirecttcp_sock == -1); /* if this is false, then the caller did not use use_connection correctly */ g_assert(nconn != NULL); @@ -1415,7 +1705,7 @@ read_to_connection_impl( if (!ndmp_connection_mover_get_state(self->ndmp, &mover_state, &bytes_moved_before, NULL, NULL)) { set_error_from_ndmp(self); - return FALSE; + return 1; } /* the mover had best be PAUSED right now */ @@ -1425,26 +1715,30 @@ read_to_connection_impl( nconn->offset, size? size : G_MAXUINT64 - nconn->offset)) { set_error_from_ndmp(self); - return FALSE; + return 1; } if (!ndmp_connection_mover_continue(self->ndmp)) { set_error_from_ndmp(self); - return FALSE; + return 1; } /* now wait for the mover to pause itself again, or halt on EOF or an error */ - if (!ndmp_connection_wait_for_notify(self->ndmp, - NULL, - &halt_reason, - &pause_reason, NULL)) { + result = ndmp_connection_wait_for_notify_with_cond(self->ndmp, + NULL, + &mover_halt_reason, + &mover_pause_reason, NULL, + cancelled, abort_mutex, abort_cond); + if (result == 1) { set_error_from_ndmp(self); - return FALSE; + return 1; + } else if (result == 2) { + return 2; } err = NULL; - if (pause_reason) { - switch (pause_reason) { + if (mover_pause_reason) { + switch (mover_pause_reason) { case NDMP9_MOVER_PAUSE_EOF: eof = TRUE; break; @@ -1460,8 +1754,8 @@ read_to_connection_impl( err = "got NOTIFY_MOVER_PAUSED, but not because of EOW or SEEK"; break; } - } else if (halt_reason) { - switch (halt_reason) { + } else if (mover_halt_reason) { + switch (mover_halt_reason) { case NDMP9_MOVER_HALT_CONNECT_CLOSED: eof = TRUE; break; @@ -1480,7 +1774,7 @@ read_to_connection_impl( device_set_error(DEVICE(self), g_strdup_printf("waiting for accept: %s", err), DEVICE_STATUS_DEVICE_ERROR); - return FALSE; + return 1; } /* no error, so the mover stopped due to one of EOM (volume out of space), @@ -1490,7 +1784,7 @@ read_to_connection_impl( if (!ndmp_connection_mover_get_state(self->ndmp, &mover_state, &bytes_moved_after, NULL, NULL)) { set_error_from_ndmp(self); - return FALSE; + return 1; } size = bytes_moved_after - bytes_moved_before; nconn->offset += size; @@ -1508,10 +1802,11 @@ read_to_connection_impl( * we do need to figure out the actual size */ DEVICE(self)->is_eom = TRUE; } else { + g_assert_not_reached(); error("not reached"); } - return TRUE; + return 0; } static gboolean @@ -1628,8 +1923,12 @@ ndmp_device_set_read_block_size_fn(Device *p_self, DevicePropertyBase *base G_GN if (read_block_size != 0 && ((gsize)read_block_size < p_self->block_size || - (gsize)read_block_size > p_self->max_block_size)) + (gsize)read_block_size > p_self->max_block_size)) { + device_set_error(p_self, + g_strdup_printf("Error setting READ-BLOCk-SIZE property to '%zu', it must be between %zu and %zu", read_block_size, p_self->block_size, p_self->max_block_size), + DEVICE_STATUS_DEVICE_ERROR); return FALSE; + } self->read_block_size = read_block_size; @@ -1638,6 +1937,18 @@ ndmp_device_set_read_block_size_fn(Device *p_self, DevicePropertyBase *base G_GN val, surety, source); } +static gboolean +ndmp_device_set_indirect_fn(Device *dself, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source) +{ + NdmpDevice *self = NDMP_DEVICE(dself); + + self->indirect = g_value_get_boolean(val); + + return device_simple_property_set_fn(dself, base, val, surety, source); +} + static void ndmp_device_class_init(NdmpDeviceClass * c G_GNUC_UNUSED) { @@ -1662,8 +1973,8 @@ ndmp_device_class_init(NdmpDeviceClass * c G_GNUC_UNUSED) device_class->directtcp_supported = TRUE; device_class->listen = listen_impl; - device_class->accept = accept_impl; - device_class->connect = connect_impl; + device_class->accept= accept_impl; + device_class->connect= connect_impl; device_class->write_from_connection = write_from_connection_impl; device_class->read_to_connection = read_to_connection_impl; device_class->use_connection = use_connection_impl; @@ -1689,6 +2000,12 @@ ndmp_device_class_init(NdmpDeviceClass * c G_GNUC_UNUSED) PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK, device_simple_property_get_fn, ndmp_device_set_verbose_fn); + + device_class_register_property(device_class, PROPERTY_INDIRECT, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK, + device_simple_property_get_fn, + ndmp_device_set_indirect_fn); + device_class_register_property(device_class, PROPERTY_READ_BLOCK_SIZE, PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, device_simple_property_get_fn, @@ -1781,6 +2098,14 @@ ndmp_device_init(NdmpDevice *self) g_value_unset(&response); self->ndmp_auth = g_strdup("md5"); + g_value_init(&response, G_TYPE_BOOLEAN); + g_value_set_boolean(&response, FALSE); + device_set_simple_property(dself, PROPERTY_INDIRECT, + &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DEFAULT); + g_value_unset(&response); + self->indirect = FALSE; + + self->indirecttcp_sock = -1; } static GType @@ -1816,10 +2141,8 @@ ndmp_device_factory( char *device_node) { Device *rval; - NdmpDevice * ndmp_rval; g_assert(0 == strcmp(device_type, NDMP_DEVICE_NAME)); rval = DEVICE(g_object_new(TYPE_NDMP_DEVICE, NULL)); - ndmp_rval = (NdmpDevice *)rval; device_open_device(rval, device_name, device_type, device_node); return rval; @@ -1842,6 +2165,10 @@ ndmp_device_register(void) device_property_fill_and_register(&device_property_ndmp_auth, G_TYPE_STRING, "ndmp_auth", "Authentication method for the NDMP agent - md5 (default), text, none, or void"); + device_property_fill_and_register(&device_property_indirect, + G_TYPE_BOOLEAN, "indirect", + "Use Indirect TCP mode, even if the NDMP server supports " + "window length 0"); } /* @@ -1855,7 +2182,7 @@ directtcp_connection_ndmp_close(DirectTCPConnection *dself) char *rv = NULL; ndmp9_mover_state state; guint64 bytes_moved; - ndmp9_mover_halt_reason reason; + ndmp9_mover_halt_reason mover_halt_reason; gboolean expect_notif = FALSE; /* based on the current state, we may need to abort or stop the @@ -1893,7 +2220,7 @@ directtcp_connection_ndmp_close(DirectTCPConnection *dself) if (expect_notif) { if (!ndmp_connection_wait_for_notify(self->ndmp, NULL, - &reason, /* value is ignored.. */ + &mover_halt_reason, /* value is ignored.. */ NULL, NULL)) { goto error; }