X-Git-Url: https://git.gag.com/?a=blobdiff_plain;f=ndmp-src%2Fndmpconnobj.c;h=ef4de85ffeec14272c307217d8fd517999b13ab7;hb=HEAD;hp=d5a5bf2bb3b7677bfb4558e4be7644727578c187;hpb=fd48f3e498442f0cbff5f3606c7c403d0566150e;p=debian%2Famanda diff --git a/ndmp-src/ndmpconnobj.c b/ndmp-src/ndmpconnobj.c index d5a5bf2..ef4de85 100644 --- a/ndmp-src/ndmpconnobj.c +++ b/ndmp-src/ndmpconnobj.c @@ -1,9 +1,10 @@ /* - * Copyright (c) 2009, 2010 Zmanda, Inc. All Rights Reserved. + * Copyright (c) 2009-2012 Zmanda, Inc. All Rights Reserved. * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 as published - * by the Free Software Foundation. + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY @@ -19,6 +20,8 @@ */ #include "amanda.h" +#include "event.h" +#include "sockaddr-util.h" #include "ndmpconnobj.h" /* @@ -480,8 +483,9 @@ ndmp_connection_mover_listen( *addrs = g_new0(DirectTCPAddr, naddrs+1); for (i = 0; i < naddrs; i++) { ndmp4_tcp_addr *na = &reply->connect_addr.ndmp4_addr_u.tcp_addr.tcp_addr_val[i]; - (*addrs)[i].ipv4 = na->ip_addr; - (*addrs)[i].port = na->port; + (*addrs)[i].sin.sin_family = AF_INET; + (*addrs)[i].sin.sin_addr.s_addr = htonl(na->ip_addr); + SU_SET_PORT(addrs[i], na->port); } } NDMP_FREE(); @@ -489,6 +493,39 @@ ndmp_connection_mover_listen( return TRUE; } +ndmp_connection_mover_connect( + NDMPConnection *self, + ndmp9_mover_mode mode, + DirectTCPAddr *addrs) +{ + unsigned int naddrs, i; + ndmp4_tcp_addr *na; + + g_assert(!self->startup_err); + + /* count addrs */ + g_assert(addrs); + for (naddrs = 0; SU_GET_FAMILY(&addrs[naddrs]) != 0; naddrs++) ; + + /* convert addrs to an ndmp4_tcp_addr */ + na = g_new0(ndmp4_tcp_addr, naddrs); + for (i = 0; i < naddrs; i++) { + na[i].ip_addr = ntohl(addrs[i].sin.sin_addr.s_addr); + na[i].port = SU_GET_PORT(&addrs[i]); + } + + + NDMP_TRANS(self, ndmp4_mover_connect) + request->mode = mode; + request->addr.addr_type = NDMP4_ADDR_TCP; + request->addr.ndmp4_addr_u.tcp_addr.tcp_addr_len = naddrs; + request->addr.ndmp4_addr_u.tcp_addr.tcp_addr_val = na; + NDMP_CALL(self); + NDMP_FREE(); + NDMP_END + return TRUE; +} + gboolean ndmp_connection_mover_abort( NDMPConnection *self) @@ -643,6 +680,9 @@ ndmp_connection_wait_for_notify( while (1) { gboolean found = FALSE; + int fd; + SELECT_ARG_TYPE readset; + int nfound; /* if any desired notifications have been received, then we're * done */ @@ -671,7 +711,18 @@ ndmp_connection_wait_for_notify( return TRUE; /* otherwise, wait for an incoming packet and handle it, then try - * again */ + * again. There's some select trickery here to avoid hogging the + * ndmlib_mutex - basically, we want to block as long as possible + * outside of the ndmlib_mutex critical section. This will also be + * useful to allow the wait to be aborted. */ + fd = self->conn->chan.fd; + FD_ZERO(&readset); + FD_SET(fd, &readset); + nfound = select(fd+1, &readset, NULL, NULL, NULL); + + /* fall on through, blind to any errors - presumably the same error + * condition will be caught by ndmconn_recv_nmb. */ + g_static_mutex_lock(&ndmlib_mutex); NDMOS_MACRO_ZEROFILL(&nmb); nmb.protocol_version = NDMP4VER; @@ -687,6 +738,226 @@ ndmp_connection_wait_for_notify( } } +typedef struct notify_data_s { + NDMPConnection *self; + ndmp9_data_halt_reason *data_halt_reason; + ndmp9_mover_halt_reason *mover_halt_reason; + ndmp9_mover_pause_reason *mover_pause_reason; + guint64 *mover_pause_seek_position; + GMutex *abort_mutex; + GCond *abort_cond; + int status; + int in_use; + event_handle_t *read_event; +} notify_data_t; + +static void handle_notify(void *cookie); + +static GStaticMutex notify_mutex = G_STATIC_MUTEX_INIT; +static notify_data_t **notify_data = NULL; +static int nb_notify_data = 0; +int +ndmp_connection_wait_for_notify_with_cond( + NDMPConnection *self, + ndmp9_data_halt_reason *data_halt_reason, + ndmp9_mover_halt_reason *mover_halt_reason, + ndmp9_mover_pause_reason *mover_pause_reason, + guint64 *mover_pause_seek_position, + int *cancelled, + GMutex *abort_mutex, + GCond *abort_cond) +{ + struct ndmp_msg_buf nmb; + notify_data_t *ndata; + gboolean found = FALSE; + int status; + int i; + + g_static_mutex_lock(¬ify_mutex); + if (notify_data == NULL) { + glib_init(); + nb_notify_data = 10; + notify_data = g_new0(notify_data_t *, nb_notify_data); + for (i=0;iin_use > 0) { + i++; + } + if (i == nb_notify_data) { + int new_nb_notify_data = nb_notify_data * 2; + int j; + notify_data = g_realloc(notify_data, + sizeof(notify_data_t *) * new_nb_notify_data); + for (j=nb_notify_data; jself = self; + ndata->data_halt_reason= data_halt_reason; + ndata->mover_halt_reason= mover_halt_reason; + ndata->mover_pause_reason= mover_pause_reason; + ndata->mover_pause_seek_position = mover_pause_seek_position; + ndata->abort_mutex = abort_mutex; + ndata->abort_cond = abort_cond; + ndata->status = 2; + ndata->in_use = 1; + g_static_mutex_unlock(¬ify_mutex); + + g_assert(!self->startup_err); + + /* initialize output parameters */ + if (data_halt_reason) + *data_halt_reason = NDMP4_DATA_HALT_NA; + if (mover_halt_reason) + *mover_halt_reason = NDMP4_MOVER_HALT_NA; + if (mover_pause_reason) + *mover_pause_reason = NDMP4_MOVER_PAUSE_NA; + if (mover_pause_seek_position) + *mover_pause_seek_position = 0; + + /* if any desired notifications have been received, then we're + * done */ + if (data_halt_reason && self->data_halt_reason) { + found = TRUE; + *data_halt_reason = self->data_halt_reason; + self->data_halt_reason = NDMP4_DATA_HALT_NA; + } + + if (mover_halt_reason && self->mover_halt_reason) { + found = TRUE; + *mover_halt_reason = self->mover_halt_reason; + self->mover_halt_reason = NDMP4_MOVER_HALT_NA; + } + + if (mover_pause_reason && self->mover_pause_reason) { + found = TRUE; + *mover_pause_reason = self->mover_pause_reason; + if (mover_pause_seek_position) + *mover_pause_seek_position = self->mover_pause_seek_position; + self->mover_pause_reason = NDMP4_MOVER_PAUSE_NA; + self->mover_pause_seek_position = 0; + } + + if (found) + return TRUE; + + /* otherwise, wait for an incoming packet and handle it, then try + * again. There's some select trickery here to avoid hogging the + * ndmlib_mutex - basically, we want to block as long as possible + * outside of the ndmlib_mutex critical section. This will also be + * useful to allow the wait to be aborted. */ + + /* handle_notify can be executed before the register exit */ + ndata->read_event = event_create(self->conn->chan.fd, + EV_READFD, handle_notify, ndata); + event_activate(ndata->read_event); + + while (!*cancelled && ndata->status == 2) { + g_cond_wait(abort_cond, abort_mutex); + } + g_static_mutex_lock(¬ify_mutex); + + if (ndata->read_event) { + event_release(ndata->read_event); + ndata->read_event = NULL; + } + if (ndata->status == 2) { + ndmp_connection_mover_abort(self); + ndmp_connection_mover_stop(self); + } + status = ndata->status; + ndata->in_use++; + if (ndata->in_use == 3) + ndata->in_use = 0; + g_static_mutex_unlock(¬ify_mutex); + return status; + +} + +static void +handle_notify(void *cookie) +{ + notify_data_t *ndata = cookie; + struct ndmp_msg_buf nmb; + gboolean found = FALSE; + GCond *abort_cond = ndata->abort_cond; + GMutex *abort_mutex = ndata->abort_mutex; + + g_mutex_lock(abort_mutex); + + g_static_mutex_lock(&ndmlib_mutex); + NDMOS_MACRO_ZEROFILL(&nmb); + nmb.protocol_version = NDMP4VER; + ndata->self->last_rc = ndmconn_recv_nmb(ndata->self->conn, &nmb); + g_static_mutex_unlock(&ndmlib_mutex); + + if (ndata->self->last_rc) { + /* (nothing to free) */ + ndata->status = 1; + goto notify_done; + } + + ndmconn_handle_notify(ndata->self, &nmb); + + + /* if any desired notifications have been received, then we're + * done */ + if (ndata->data_halt_reason && ndata->self->data_halt_reason) { + found = TRUE; + *ndata->data_halt_reason = ndata->self->data_halt_reason; + ndata->self->data_halt_reason = NDMP4_DATA_HALT_NA; + } + + if (ndata->mover_halt_reason && ndata->self->mover_halt_reason) { + found = TRUE; + *ndata->mover_halt_reason = ndata->self->mover_halt_reason; + ndata->self->mover_halt_reason = NDMP4_MOVER_HALT_NA; + } + + if (ndata->mover_pause_reason && ndata->self->mover_pause_reason) { + found = TRUE; + *ndata->mover_pause_reason = ndata->self->mover_pause_reason; + if (ndata->mover_pause_seek_position) + *ndata->mover_pause_seek_position = ndata->self->mover_pause_seek_position; + ndata->self->mover_pause_reason = NDMP4_MOVER_PAUSE_NA; + ndata->self->mover_pause_seek_position = 0; + } + + if (!found) { + g_static_mutex_lock(¬ify_mutex); + if (ndata->in_use == 2) { + goto notify_done_locked; + } + g_static_mutex_unlock(¬ify_mutex); + + g_mutex_unlock(abort_mutex); + return; + } + + ndata->status = 0; +notify_done: + g_static_mutex_lock(¬ify_mutex); +notify_done_locked: + if (ndata->read_event) { + event_release(ndata->read_event); + ndata->read_event = NULL; + } + ndata->in_use++; + if (ndata->in_use == 3) + ndata->in_use = 0; + g_static_mutex_unlock(¬ify_mutex); + + g_cond_broadcast(abort_cond); + g_mutex_unlock(abort_mutex); +} /* * Class Mechanics */