lintian doesn't like orphan packages with uploaders...
[debian/amanda] / ndmp-src / ndmpconnobj.c
index 9f9a45274a615829b9aeb0fb5f92e54350ab3f0a..ef4de85ffeec14272c307217d8fd517999b13ab7 100644 (file)
@@ -1,9 +1,10 @@
 /*
- * Copyright (c) 2009, 2010 Zmanda, Inc.  All Rights Reserved.
+ * Copyright (c) 2009-2012 Zmanda, Inc.  All Rights Reserved.
  *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 as published
- * by the Free Software Foundation.
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
  *
  * This program is distributed in the hope that it will be useful, but
  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
@@ -19,6 +20,8 @@
  */
 
 #include "amanda.h"
+#include "event.h"
+#include "sockaddr-util.h"
 #include "ndmpconnobj.h"
 
 /*
@@ -480,8 +483,9 @@ ndmp_connection_mover_listen(
            *addrs = g_new0(DirectTCPAddr, naddrs+1);
            for (i = 0; i < naddrs; i++) {
                ndmp4_tcp_addr *na = &reply->connect_addr.ndmp4_addr_u.tcp_addr.tcp_addr_val[i];
-               (*addrs)[i].ipv4 = na->ip_addr;
-               (*addrs)[i].port = na->port;
+               (*addrs)[i].sin.sin_family = AF_INET;
+               (*addrs)[i].sin.sin_addr.s_addr = htonl(na->ip_addr);
+               SU_SET_PORT(addrs[i], na->port);
            }
        }
        NDMP_FREE();
@@ -501,13 +505,13 @@ ndmp_connection_mover_connect(
 
     /* count addrs */
     g_assert(addrs);
-    for (naddrs = 0; addrs[naddrs].ipv4; naddrs++) ;
+    for (naddrs = 0; SU_GET_FAMILY(&addrs[naddrs]) != 0; naddrs++) ;
 
     /* convert addrs to an ndmp4_tcp_addr */
     na = g_new0(ndmp4_tcp_addr, naddrs);
     for (i = 0; i < naddrs; i++) {
-       na[i].ip_addr = addrs[i].ipv4;
-       na[i].port = addrs[i].port;
+       na[i].ip_addr = ntohl(addrs[i].sin.sin_addr.s_addr);
+       na[i].port = SU_GET_PORT(&addrs[i]);
     }
 
 
@@ -734,6 +738,226 @@ ndmp_connection_wait_for_notify(
     }
 }
 
+typedef struct notify_data_s {
+    NDMPConnection *self;
+    ndmp9_data_halt_reason *data_halt_reason;
+    ndmp9_mover_halt_reason *mover_halt_reason;
+    ndmp9_mover_pause_reason *mover_pause_reason;
+    guint64 *mover_pause_seek_position;
+    GMutex *abort_mutex;
+    GCond *abort_cond;
+    int status;
+    int in_use;
+    event_handle_t *read_event;
+} notify_data_t;
+
+static void handle_notify(void *cookie);
+
+static GStaticMutex notify_mutex = G_STATIC_MUTEX_INIT;
+static notify_data_t **notify_data = NULL;
+static int nb_notify_data = 0;
+int
+ndmp_connection_wait_for_notify_with_cond(
+       NDMPConnection *self,
+       ndmp9_data_halt_reason *data_halt_reason,
+       ndmp9_mover_halt_reason *mover_halt_reason,
+       ndmp9_mover_pause_reason *mover_pause_reason,
+       guint64 *mover_pause_seek_position,
+       int *cancelled,
+       GMutex *abort_mutex,
+       GCond *abort_cond)
+{
+    struct ndmp_msg_buf nmb;
+    notify_data_t *ndata;
+    gboolean found = FALSE;
+    int status;
+    int i;
+
+    g_static_mutex_lock(&notify_mutex);
+    if (notify_data == NULL) {
+       glib_init();
+        nb_notify_data = 10;
+       notify_data = g_new0(notify_data_t *, nb_notify_data);
+       for (i=0;i<nb_notify_data;i++) {
+           notify_data[i] = g_new0(notify_data_t, 1);
+       }
+    }
+    /* find a not used notify_data */
+    ndata = *notify_data;
+    i = 0;
+    while (i< nb_notify_data && notify_data[i]->in_use > 0) {
+       i++;
+    }
+    if (i == nb_notify_data) {
+        int new_nb_notify_data = nb_notify_data * 2;
+       int j;
+       notify_data = g_realloc(notify_data,
+                               sizeof(notify_data_t *) * new_nb_notify_data);
+       for (j=nb_notify_data; j<new_nb_notify_data; j++) {
+           notify_data[j] = g_new0(notify_data_t, 1);
+       }
+       nb_notify_data = new_nb_notify_data;
+       ndata = notify_data[i];
+    }
+    ndata = notify_data[i];
+    ndata->self = self;
+    ndata->data_halt_reason= data_halt_reason;
+    ndata->mover_halt_reason= mover_halt_reason;
+    ndata->mover_pause_reason= mover_pause_reason;
+    ndata->mover_pause_seek_position = mover_pause_seek_position;
+    ndata->abort_mutex = abort_mutex;
+    ndata->abort_cond = abort_cond;
+    ndata->status = 2;
+    ndata->in_use = 1;
+    g_static_mutex_unlock(&notify_mutex);
+
+    g_assert(!self->startup_err);
+
+    /* initialize output parameters */
+    if (data_halt_reason)
+       *data_halt_reason = NDMP4_DATA_HALT_NA;
+    if (mover_halt_reason)
+       *mover_halt_reason = NDMP4_MOVER_HALT_NA;
+    if (mover_pause_reason)
+       *mover_pause_reason = NDMP4_MOVER_PAUSE_NA;
+    if (mover_pause_seek_position)
+       *mover_pause_seek_position = 0;
+
+    /* if any desired notifications have been received, then we're
+     * done */
+    if (data_halt_reason && self->data_halt_reason) {
+       found = TRUE;
+       *data_halt_reason = self->data_halt_reason;
+       self->data_halt_reason = NDMP4_DATA_HALT_NA;
+    }
+
+    if (mover_halt_reason && self->mover_halt_reason) {
+       found = TRUE;
+       *mover_halt_reason = self->mover_halt_reason;
+       self->mover_halt_reason = NDMP4_MOVER_HALT_NA;
+    }
+
+    if (mover_pause_reason && self->mover_pause_reason) {
+       found = TRUE;
+       *mover_pause_reason = self->mover_pause_reason;
+       if (mover_pause_seek_position)
+           *mover_pause_seek_position = self->mover_pause_seek_position;
+       self->mover_pause_reason = NDMP4_MOVER_PAUSE_NA;
+       self->mover_pause_seek_position = 0;
+    }
+
+    if (found)
+       return TRUE;
+
+       /* otherwise, wait for an incoming packet and handle it, then try
+        * again.  There's some select trickery here to avoid hogging the
+        * ndmlib_mutex - basically, we want to block as long as possible
+        * outside of the ndmlib_mutex critical section.  This will also be
+        * useful to allow the wait to be aborted. */
+
+        /* handle_notify can be executed before the register exit */
+    ndata->read_event = event_create(self->conn->chan.fd,
+                                    EV_READFD, handle_notify, ndata);
+    event_activate(ndata->read_event);
+
+    while (!*cancelled && ndata->status == 2) {
+       g_cond_wait(abort_cond, abort_mutex);
+    }
+    g_static_mutex_lock(&notify_mutex);
+
+    if (ndata->read_event) {
+       event_release(ndata->read_event);
+       ndata->read_event = NULL;
+    }
+    if (ndata->status == 2) {
+       ndmp_connection_mover_abort(self);
+       ndmp_connection_mover_stop(self);
+    }
+    status = ndata->status;
+    ndata->in_use++;
+    if (ndata->in_use == 3)
+       ndata->in_use = 0;
+    g_static_mutex_unlock(&notify_mutex);
+    return status;
+
+}
+
+static void
+handle_notify(void *cookie)
+{
+    notify_data_t *ndata = cookie;
+    struct ndmp_msg_buf nmb;
+    gboolean found = FALSE;
+    GCond  *abort_cond = ndata->abort_cond;
+    GMutex *abort_mutex = ndata->abort_mutex;
+
+    g_mutex_lock(abort_mutex);
+
+    g_static_mutex_lock(&ndmlib_mutex);
+    NDMOS_MACRO_ZEROFILL(&nmb);
+    nmb.protocol_version = NDMP4VER;
+    ndata->self->last_rc = ndmconn_recv_nmb(ndata->self->conn, &nmb);
+    g_static_mutex_unlock(&ndmlib_mutex);
+
+    if (ndata->self->last_rc) {
+       /* (nothing to free) */
+       ndata->status = 1;
+       goto notify_done;
+    }
+
+    ndmconn_handle_notify(ndata->self, &nmb);
+
+
+    /* if any desired notifications have been received, then we're
+     * done */
+    if (ndata->data_halt_reason && ndata->self->data_halt_reason) {
+       found = TRUE;
+       *ndata->data_halt_reason = ndata->self->data_halt_reason;
+       ndata->self->data_halt_reason = NDMP4_DATA_HALT_NA;
+    }
+
+    if (ndata->mover_halt_reason && ndata->self->mover_halt_reason) {
+       found = TRUE;
+       *ndata->mover_halt_reason = ndata->self->mover_halt_reason;
+       ndata->self->mover_halt_reason = NDMP4_MOVER_HALT_NA;
+    }
+
+    if (ndata->mover_pause_reason && ndata->self->mover_pause_reason) {
+       found = TRUE;
+       *ndata->mover_pause_reason = ndata->self->mover_pause_reason;
+       if (ndata->mover_pause_seek_position)
+           *ndata->mover_pause_seek_position = ndata->self->mover_pause_seek_position;
+       ndata->self->mover_pause_reason = NDMP4_MOVER_PAUSE_NA;
+       ndata->self->mover_pause_seek_position = 0;
+    }
+
+    if (!found) {
+        g_static_mutex_lock(&notify_mutex);
+        if (ndata->in_use == 2) {
+            goto notify_done_locked;
+       }
+        g_static_mutex_unlock(&notify_mutex);
+
+       g_mutex_unlock(abort_mutex);
+       return;
+    }
+
+    ndata->status = 0;
+notify_done:
+    g_static_mutex_lock(&notify_mutex);
+notify_done_locked:
+    if (ndata->read_event) {
+        event_release(ndata->read_event);
+        ndata->read_event = NULL;
+    }
+    ndata->in_use++;
+    if (ndata->in_use == 3)
+        ndata->in_use = 0;
+    g_static_mutex_unlock(&notify_mutex);
+
+    g_cond_broadcast(abort_cond);
+    g_mutex_unlock(abort_mutex);
+}
 /*
  * Class Mechanics
  */