X-Git-Url: https://git.gag.com/?a=blobdiff_plain;f=ndmp-src%2Fndmpconnobj.c;h=41d14ed0e884515ab6c850dfb96c82c65e0a49c9;hb=949b8910a5e23c4285d0b1aedacfc82a14dc97a5;hp=1fe040a922b6548e75f077f33bcc502f82118de6;hpb=c6f0a88c567f8536c498f554285aed1f8150da18;p=debian%2Famanda diff --git a/ndmp-src/ndmpconnobj.c b/ndmp-src/ndmpconnobj.c index 1fe040a..41d14ed 100644 --- a/ndmp-src/ndmpconnobj.c +++ b/ndmp-src/ndmpconnobj.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2009, 2010 Zmanda, Inc. All Rights Reserved. + * Copyright (c) 2009-2012 Zmanda, Inc. All Rights Reserved. * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 as published @@ -19,6 +19,7 @@ */ #include "amanda.h" +#include "event.h" #include "sockaddr-util.h" #include "ndmpconnobj.h" @@ -736,6 +737,165 @@ ndmp_connection_wait_for_notify( } } +typedef struct notify_data { + NDMPConnection *self; + ndmp9_data_halt_reason *data_halt_reason; + ndmp9_mover_halt_reason *mover_halt_reason; + ndmp9_mover_pause_reason *mover_pause_reason; + guint64 *mover_pause_seek_position; + GMutex *abort_mutex; + GCond *abort_cond; + int status; + event_handle_t *read_event; +} notify_data; + +static void handle_notify(void *cookie); + +int +ndmp_connection_wait_for_notify_with_cond( + NDMPConnection *self, + ndmp9_data_halt_reason *data_halt_reason, + ndmp9_mover_halt_reason *mover_halt_reason, + ndmp9_mover_pause_reason *mover_pause_reason, + guint64 *mover_pause_seek_position, + GMutex *abort_mutex, + GCond *abort_cond) +{ + struct ndmp_msg_buf nmb; + notify_data ndata; + gboolean found = FALSE; + + ndata.self = self; + ndata.data_halt_reason= data_halt_reason; + ndata.mover_halt_reason= mover_halt_reason; + ndata.mover_pause_reason= mover_pause_reason; + ndata.mover_pause_seek_position = mover_pause_seek_position; + ndata.abort_mutex = abort_mutex; + ndata.abort_cond = abort_cond; + ndata.status = 2; + + g_assert(!self->startup_err); + + /* initialize output parameters */ + if (data_halt_reason) + *data_halt_reason = NDMP4_DATA_HALT_NA; + if (mover_halt_reason) + *mover_halt_reason = NDMP4_MOVER_HALT_NA; + if (mover_pause_reason) + *mover_pause_reason = NDMP4_MOVER_PAUSE_NA; + if (mover_pause_seek_position) + *mover_pause_seek_position = 0; + + /* if any desired notifications have been received, then we're + * done */ + if (data_halt_reason && self->data_halt_reason) { + found = TRUE; + *data_halt_reason = self->data_halt_reason; + self->data_halt_reason = NDMP4_DATA_HALT_NA; + } + + if (mover_halt_reason && self->mover_halt_reason) { + found = TRUE; + *mover_halt_reason = self->mover_halt_reason; + self->mover_halt_reason = NDMP4_MOVER_HALT_NA; + } + + if (mover_pause_reason && self->mover_pause_reason) { + found = TRUE; + *mover_pause_reason = self->mover_pause_reason; + if (mover_pause_seek_position) + *mover_pause_seek_position = self->mover_pause_seek_position; + self->mover_pause_reason = NDMP4_MOVER_PAUSE_NA; + self->mover_pause_seek_position = 0; + } + + if (found) + return TRUE; + + /* otherwise, wait for an incoming packet and handle it, then try + * again. There's some select trickery here to avoid hogging the + * ndmlib_mutex - basically, we want to block as long as possible + * outside of the ndmlib_mutex critical section. This will also be + * useful to allow the wait to be aborted. */ + + ndata.read_event = event_register(self->conn->chan.fd, + EV_READFD, handle_notify, &ndata); + + g_cond_wait(abort_cond, abort_mutex); + + if (ndata.read_event) { + event_release(ndata.read_event); + } + if (ndata.status == 2) { + ndmp_connection_mover_abort(self); + ndmp_connection_mover_stop(self); + } + return ndata.status; + +} + +static void +handle_notify(void *cookie) +{ + notify_data *ndata = cookie; + struct ndmp_msg_buf nmb; + gboolean found = FALSE; + + g_mutex_lock(ndata->abort_mutex); + + event_release(ndata->read_event); + ndata->read_event = NULL; + + g_static_mutex_lock(&ndmlib_mutex); + NDMOS_MACRO_ZEROFILL(&nmb); + nmb.protocol_version = NDMP4VER; + ndata->self->last_rc = ndmconn_recv_nmb(ndata->self->conn, &nmb); + g_static_mutex_unlock(&ndmlib_mutex); + + if (ndata->self->last_rc) { + /* (nothing to free) */ + ndata->status = 1; + goto notify_done; + } + + ndmconn_handle_notify(ndata->self, &nmb); + + + /* if any desired notifications have been received, then we're + * done */ + if (ndata->data_halt_reason && ndata->self->data_halt_reason) { + found = TRUE; + *ndata->data_halt_reason = ndata->self->data_halt_reason; + ndata->self->data_halt_reason = NDMP4_DATA_HALT_NA; + } + + if (ndata->mover_halt_reason && ndata->self->mover_halt_reason) { + found = TRUE; + *ndata->mover_halt_reason = ndata->self->mover_halt_reason; + ndata->self->mover_halt_reason = NDMP4_MOVER_HALT_NA; + } + + if (ndata->mover_pause_reason && ndata->self->mover_pause_reason) { + found = TRUE; + *ndata->mover_pause_reason = ndata->self->mover_pause_reason; + if (ndata->mover_pause_seek_position) + *ndata->mover_pause_seek_position = ndata->self->mover_pause_seek_position; + ndata->self->mover_pause_reason = NDMP4_MOVER_PAUSE_NA; + ndata->self->mover_pause_seek_position = 0; + } + + if (!found) { + ndata->read_event = event_register(ndata->self->conn->chan.fd, + EV_READFD, handle_notify, ndata); + g_mutex_unlock(ndata->abort_mutex); + return; + } + + ndata->status = 0; +notify_done: + g_cond_broadcast(ndata->abort_cond); + g_mutex_unlock(ndata->abort_mutex); +} /* * Class Mechanics */