/*
- * Copyright (c) 2009, 2010 Zmanda, Inc. All Rights Reserved.
+ * Copyright (c) 2009-2012 Zmanda, Inc. All Rights Reserved.
*
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 as published
- * by the Free Software Foundation.
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
*/
#include "amanda.h"
+#include "event.h"
+#include "sockaddr-util.h"
#include "ndmpconnobj.h"
/*
*addrs = g_new0(DirectTCPAddr, naddrs+1);
for (i = 0; i < naddrs; i++) {
ndmp4_tcp_addr *na = &reply->connect_addr.ndmp4_addr_u.tcp_addr.tcp_addr_val[i];
- (*addrs)[i].ipv4 = na->ip_addr;
- (*addrs)[i].port = na->port;
+ (*addrs)[i].sin.sin_family = AF_INET;
+ (*addrs)[i].sin.sin_addr.s_addr = htonl(na->ip_addr);
+ SU_SET_PORT(addrs[i], na->port);
}
}
NDMP_FREE();
return TRUE;
}
+ndmp_connection_mover_connect(
+ NDMPConnection *self,
+ ndmp9_mover_mode mode,
+ DirectTCPAddr *addrs)
+{
+ unsigned int naddrs, i;
+ ndmp4_tcp_addr *na;
+
+ g_assert(!self->startup_err);
+
+ /* count addrs */
+ g_assert(addrs);
+ for (naddrs = 0; SU_GET_FAMILY(&addrs[naddrs]) != 0; naddrs++) ;
+
+ /* convert addrs to an ndmp4_tcp_addr */
+ na = g_new0(ndmp4_tcp_addr, naddrs);
+ for (i = 0; i < naddrs; i++) {
+ na[i].ip_addr = ntohl(addrs[i].sin.sin_addr.s_addr);
+ na[i].port = SU_GET_PORT(&addrs[i]);
+ }
+
+
+ NDMP_TRANS(self, ndmp4_mover_connect)
+ request->mode = mode;
+ request->addr.addr_type = NDMP4_ADDR_TCP;
+ request->addr.ndmp4_addr_u.tcp_addr.tcp_addr_len = naddrs;
+ request->addr.ndmp4_addr_u.tcp_addr.tcp_addr_val = na;
+ NDMP_CALL(self);
+ NDMP_FREE();
+ NDMP_END
+ return TRUE;
+}
+
gboolean
ndmp_connection_mover_abort(
NDMPConnection *self)
while (1) {
gboolean found = FALSE;
+ int fd;
+ SELECT_ARG_TYPE readset;
+ int nfound;
/* if any desired notifications have been received, then we're
* done */
return TRUE;
/* otherwise, wait for an incoming packet and handle it, then try
- * again */
+ * again. There's some select trickery here to avoid hogging the
+ * ndmlib_mutex - basically, we want to block as long as possible
+ * outside of the ndmlib_mutex critical section. This will also be
+ * useful to allow the wait to be aborted. */
+ fd = self->conn->chan.fd;
+ FD_ZERO(&readset);
+ FD_SET(fd, &readset);
+ nfound = select(fd+1, &readset, NULL, NULL, NULL);
+
+ /* fall on through, blind to any errors - presumably the same error
+ * condition will be caught by ndmconn_recv_nmb. */
+
g_static_mutex_lock(&ndmlib_mutex);
NDMOS_MACRO_ZEROFILL(&nmb);
nmb.protocol_version = NDMP4VER;
}
}
+typedef struct notify_data_s {
+ NDMPConnection *self;
+ ndmp9_data_halt_reason *data_halt_reason;
+ ndmp9_mover_halt_reason *mover_halt_reason;
+ ndmp9_mover_pause_reason *mover_pause_reason;
+ guint64 *mover_pause_seek_position;
+ GMutex *abort_mutex;
+ GCond *abort_cond;
+ int status;
+ int in_use;
+ event_handle_t *read_event;
+} notify_data_t;
+
+static void handle_notify(void *cookie);
+
+static GStaticMutex notify_mutex = G_STATIC_MUTEX_INIT;
+static notify_data_t **notify_data = NULL;
+static int nb_notify_data = 0;
+int
+ndmp_connection_wait_for_notify_with_cond(
+ NDMPConnection *self,
+ ndmp9_data_halt_reason *data_halt_reason,
+ ndmp9_mover_halt_reason *mover_halt_reason,
+ ndmp9_mover_pause_reason *mover_pause_reason,
+ guint64 *mover_pause_seek_position,
+ int *cancelled,
+ GMutex *abort_mutex,
+ GCond *abort_cond)
+{
+ struct ndmp_msg_buf nmb;
+ notify_data_t *ndata;
+ gboolean found = FALSE;
+ int status;
+ int i;
+
+ g_static_mutex_lock(¬ify_mutex);
+ if (notify_data == NULL) {
+ glib_init();
+ nb_notify_data = 10;
+ notify_data = g_new0(notify_data_t *, nb_notify_data);
+ for (i=0;i<nb_notify_data;i++) {
+ notify_data[i] = g_new0(notify_data_t, 1);
+ }
+ }
+ /* find a not used notify_data */
+ ndata = *notify_data;
+ i = 0;
+ while (i< nb_notify_data && notify_data[i]->in_use > 0) {
+ i++;
+ }
+ if (i == nb_notify_data) {
+ int new_nb_notify_data = nb_notify_data * 2;
+ int j;
+ notify_data = g_realloc(notify_data,
+ sizeof(notify_data_t *) * new_nb_notify_data);
+ for (j=nb_notify_data; j<new_nb_notify_data; j++) {
+ notify_data[j] = g_new0(notify_data_t, 1);
+ }
+ nb_notify_data = new_nb_notify_data;
+ ndata = notify_data[i];
+ }
+ ndata = notify_data[i];
+ ndata->self = self;
+ ndata->data_halt_reason= data_halt_reason;
+ ndata->mover_halt_reason= mover_halt_reason;
+ ndata->mover_pause_reason= mover_pause_reason;
+ ndata->mover_pause_seek_position = mover_pause_seek_position;
+ ndata->abort_mutex = abort_mutex;
+ ndata->abort_cond = abort_cond;
+ ndata->status = 2;
+ ndata->in_use = 1;
+ g_static_mutex_unlock(¬ify_mutex);
+
+ g_assert(!self->startup_err);
+
+ /* initialize output parameters */
+ if (data_halt_reason)
+ *data_halt_reason = NDMP4_DATA_HALT_NA;
+ if (mover_halt_reason)
+ *mover_halt_reason = NDMP4_MOVER_HALT_NA;
+ if (mover_pause_reason)
+ *mover_pause_reason = NDMP4_MOVER_PAUSE_NA;
+ if (mover_pause_seek_position)
+ *mover_pause_seek_position = 0;
+
+ /* if any desired notifications have been received, then we're
+ * done */
+ if (data_halt_reason && self->data_halt_reason) {
+ found = TRUE;
+ *data_halt_reason = self->data_halt_reason;
+ self->data_halt_reason = NDMP4_DATA_HALT_NA;
+ }
+
+ if (mover_halt_reason && self->mover_halt_reason) {
+ found = TRUE;
+ *mover_halt_reason = self->mover_halt_reason;
+ self->mover_halt_reason = NDMP4_MOVER_HALT_NA;
+ }
+
+ if (mover_pause_reason && self->mover_pause_reason) {
+ found = TRUE;
+ *mover_pause_reason = self->mover_pause_reason;
+ if (mover_pause_seek_position)
+ *mover_pause_seek_position = self->mover_pause_seek_position;
+ self->mover_pause_reason = NDMP4_MOVER_PAUSE_NA;
+ self->mover_pause_seek_position = 0;
+ }
+
+ if (found)
+ return TRUE;
+
+ /* otherwise, wait for an incoming packet and handle it, then try
+ * again. There's some select trickery here to avoid hogging the
+ * ndmlib_mutex - basically, we want to block as long as possible
+ * outside of the ndmlib_mutex critical section. This will also be
+ * useful to allow the wait to be aborted. */
+
+ /* handle_notify can be executed before the register exit */
+ ndata->read_event = event_create(self->conn->chan.fd,
+ EV_READFD, handle_notify, ndata);
+ event_activate(ndata->read_event);
+
+ while (!*cancelled && ndata->status == 2) {
+ g_cond_wait(abort_cond, abort_mutex);
+ }
+ g_static_mutex_lock(¬ify_mutex);
+
+ if (ndata->read_event) {
+ event_release(ndata->read_event);
+ ndata->read_event = NULL;
+ }
+ if (ndata->status == 2) {
+ ndmp_connection_mover_abort(self);
+ ndmp_connection_mover_stop(self);
+ }
+ status = ndata->status;
+ ndata->in_use++;
+ if (ndata->in_use == 3)
+ ndata->in_use = 0;
+ g_static_mutex_unlock(¬ify_mutex);
+ return status;
+
+}
+
+static void
+handle_notify(void *cookie)
+{
+ notify_data_t *ndata = cookie;
+ struct ndmp_msg_buf nmb;
+ gboolean found = FALSE;
+ GCond *abort_cond = ndata->abort_cond;
+ GMutex *abort_mutex = ndata->abort_mutex;
+
+ g_mutex_lock(abort_mutex);
+
+ g_static_mutex_lock(&ndmlib_mutex);
+ NDMOS_MACRO_ZEROFILL(&nmb);
+ nmb.protocol_version = NDMP4VER;
+ ndata->self->last_rc = ndmconn_recv_nmb(ndata->self->conn, &nmb);
+ g_static_mutex_unlock(&ndmlib_mutex);
+
+ if (ndata->self->last_rc) {
+ /* (nothing to free) */
+ ndata->status = 1;
+ goto notify_done;
+ }
+
+ ndmconn_handle_notify(ndata->self, &nmb);
+
+
+ /* if any desired notifications have been received, then we're
+ * done */
+ if (ndata->data_halt_reason && ndata->self->data_halt_reason) {
+ found = TRUE;
+ *ndata->data_halt_reason = ndata->self->data_halt_reason;
+ ndata->self->data_halt_reason = NDMP4_DATA_HALT_NA;
+ }
+
+ if (ndata->mover_halt_reason && ndata->self->mover_halt_reason) {
+ found = TRUE;
+ *ndata->mover_halt_reason = ndata->self->mover_halt_reason;
+ ndata->self->mover_halt_reason = NDMP4_MOVER_HALT_NA;
+ }
+
+ if (ndata->mover_pause_reason && ndata->self->mover_pause_reason) {
+ found = TRUE;
+ *ndata->mover_pause_reason = ndata->self->mover_pause_reason;
+ if (ndata->mover_pause_seek_position)
+ *ndata->mover_pause_seek_position = ndata->self->mover_pause_seek_position;
+ ndata->self->mover_pause_reason = NDMP4_MOVER_PAUSE_NA;
+ ndata->self->mover_pause_seek_position = 0;
+ }
+
+ if (!found) {
+ g_static_mutex_lock(¬ify_mutex);
+ if (ndata->in_use == 2) {
+ goto notify_done_locked;
+ }
+ g_static_mutex_unlock(¬ify_mutex);
+
+ g_mutex_unlock(abort_mutex);
+ return;
+ }
+
+ ndata->status = 0;
+notify_done:
+ g_static_mutex_lock(¬ify_mutex);
+notify_done_locked:
+ if (ndata->read_event) {
+ event_release(ndata->read_event);
+ ndata->read_event = NULL;
+ }
+ ndata->in_use++;
+ if (ndata->in_use == 3)
+ ndata->in_use = 0;
+ g_static_mutex_unlock(¬ify_mutex);
+
+ g_cond_broadcast(abort_cond);
+ g_mutex_unlock(abort_mutex);
+}
/*
* Class Mechanics
*/