From: michaelld Date: Sat, 18 Aug 2007 02:56:04 +0000 (+0000) Subject: Fixed use of mutex and condition, which corrects a long-standing rare X-Git-Url: https://git.gag.com/?a=commitdiff_plain;h=d33a2b199ec6ddaf648ec6ae0081807a70d278f9;p=debian%2Fgnuradio Fixed use of mutex and condition, which corrects a long-standing rare bug over contention of shared variables. git-svn-id: http://gnuradio.org/svn/gnuradio/trunk@6153 221aa14e-8319-0410-a670-987f0aec2ac5 --- diff --git a/gr-audio-osx/src/audio_osx_sink.cc b/gr-audio-osx/src/audio_osx_sink.cc index fd3312b6..fef21bab 100644 --- a/gr-audio-osx/src/audio_osx_sink.cc +++ b/gr-audio-osx/src/audio_osx_sink.cc @@ -154,15 +154,11 @@ audio_osx_sink::audio_osx_sink (int sample_rate, // create the stuff to regulate I/O - d_internal = new mld_mutex (); - if (d_internal == NULL) - CheckErrorAndThrow (errno, "new mld_mutex (internal)", - "audio_osx_source::audio_osx_source"); - d_cond_data = new mld_condition (); if (d_cond_data == NULL) CheckErrorAndThrow (errno, "new mld_condition (data)", "audio_osx_source::audio_osx_source"); + d_internal = d_cond_data->mutex (); // initialize the AU for output @@ -235,7 +231,6 @@ audio_osx_sink::~audio_osx_sink () d_buffers = 0; // close and delete control stuff - delete d_internal; delete d_cond_data; } @@ -258,7 +253,7 @@ audio_osx_sink::work (int noutput_items, gr_vector_const_void_star &input_items, gr_vector_void_star &output_items) { - d_internal->wait (); + d_internal->lock (); /* take the input data, copy it, and push it to the bottom of the queue mono input are pushed onto queue[0]; @@ -289,13 +284,11 @@ audio_osx_sink::work (int noutput_items, if (d_do_block == true) { // block until there is data to return while (d_queueSampleCount > l_max_count) { -// release control so-as to allow data to be retrieved - d_internal->post (); +// release control so-as to allow data to be retrieved; // block until there is data to return d_cond_data->wait (); // the condition's signal() was called; acquire control // to keep thread safe - d_internal->wait (); } } } @@ -340,7 +333,7 @@ audio_osx_sink::work (int noutput_items, #endif // release control to allow for other processing parts to run - d_internal->post (); + d_internal->unlock (); return (noutput_items); } @@ -356,7 +349,7 @@ OSStatus audio_osx_sink::AUOutputCallback audio_osx_sink* This = (audio_osx_sink*) inRefCon; OSStatus err = noErr; - This->d_internal->wait (); + This->d_internal->lock (); #if _OSX_AU_DEBUG_ fprintf (stderr, "cb_in: SC = %4ld, in#F = %4ld\n", @@ -392,7 +385,7 @@ OSStatus audio_osx_sink::AUOutputCallback This->d_cond_data->signal (); // release control to allow for other processing parts to run - This->d_internal->post (); + This->d_internal->unlock (); return (err); } diff --git a/gr-audio-osx/src/audio_osx_source.cc b/gr-audio-osx/src/audio_osx_source.cc index 5b5ed4fd..e82e8ad2 100644 --- a/gr-audio-osx/src/audio_osx_source.cc +++ b/gr-audio-osx/src/audio_osx_source.cc @@ -423,15 +423,11 @@ audio_osx_source::audio_osx_source (int sample_rate, // create the stuff to regulate I/O - d_internal = new mld_mutex (); - if (d_internal == NULL) - CheckErrorAndThrow (errno, "new mld_mutex (internal)", - "audio_osx_source::audio_osx_source"); - d_cond_data = new mld_condition (); if (d_cond_data == NULL) CheckErrorAndThrow (errno, "new mld_condition (data)", "audio_osx_source::audio_osx_source"); + d_internal = d_cond_data->mutex (); // initialize the AU for input @@ -580,7 +576,6 @@ audio_osx_source::~audio_osx_source () d_buffers = 0; // close and delete the control stuff - delete d_internal; delete d_cond_data; } @@ -636,7 +631,7 @@ audio_osx_source::work gr_vector_void_star &output_items) { // acquire control to do processing here only - d_internal->wait (); + d_internal->lock (); #if _OSX_AU_DEBUG_ fprintf (stderr, "work1: SC = %4ld, #OI = %4d, #Chan = %ld\n", @@ -656,18 +651,16 @@ audio_osx_source::work // no data; do_block decides what to do if (d_do_block == true) { while (d_queueSampleCount == 0) { - // release control so-as to allow data to be retrieved - d_internal->post (); + // release control so-as to allow data to be retrieved; // block until there is data to return d_cond_data->wait (); // the condition's signal() was called; acquire control to // keep thread safe - d_internal->wait (); } } else { // no data & not blocking; return nothing // release control so-as to allow data to be retrieved - d_internal->post (); + d_internal->unlock (); return (0); } } @@ -706,7 +699,7 @@ audio_osx_source::work // release control to allow for other processing parts to run - d_internal->post (); + d_internal->unlock (); #if _OSX_AU_DEBUG_ fprintf (stderr, "work3: Returning.\n"); @@ -764,7 +757,7 @@ audio_osx_source::AUInputCallback (void* inRefCon, OSStatus err = noErr; audio_osx_source* This = static_cast(inRefCon); - This->d_internal->wait (); + This->d_internal->lock (); #if _OSX_AU_DEBUG_ fprintf (stderr, "cb0: in#F = %4ld, inBN = %ld, SC = %4ld\n", @@ -899,7 +892,7 @@ audio_osx_source::AUInputCallback (void* inRefCon, #endif // release control to allow for other processing parts to run - This->d_internal->post (); + This->d_internal->unlock (); #if _OSX_AU_DEBUG_ fprintf (stderr, "cb6: returning.\n"); diff --git a/gr-audio-osx/src/circular_buffer.h b/gr-audio-osx/src/circular_buffer.h index 4b5e7ba3..fa451d60 100644 --- a/gr-audio-osx/src/circular_buffer.h +++ b/gr-audio-osx/src/circular_buffer.h @@ -28,6 +28,12 @@ #define DO_DEBUG 0 +#if DO_DEBUG +#define DEBUG(X) do{X} while(0); +#else +#define DEBUG(X) do{} while(0); +#endif + template class circular_buffer { private: @@ -73,12 +79,10 @@ public: d_internal = NULL; d_readBlock = d_writeBlock = NULL; reset (); -#if DO_DEBUG - fprintf (stderr, "c_b(): buf len (items) = %ld, " - "doWriteBlock = %s, doFullRead = %s\n", d_bufLen_I, - (d_doWriteBlock ? "true" : "false"), - (d_doFullRead ? "true" : "false")); -#endif + DEBUG (fprintf (stderr, "c_b(): buf len (items) = %ld, " + "doWriteBlock = %s, doFullRead = %s\n", d_bufLen_I, + (d_doWriteBlock ? "true" : "false"), + (d_doFullRead ? "true" : "false"))); }; ~circular_buffer () { @@ -110,9 +114,16 @@ public: d_readNdx_I = d_writeNdx_I = d_n_avail_read_I = 0; d_n_avail_write_I = d_bufLen_I; delete_mutex_cond (); + // create a mutex to handle contention of shared resources; + // any routine needed access to shared resources uses lock() + // before doing anything, then unlock() when finished. d_internal = new mld_mutex (); - d_readBlock = new mld_condition (); - d_writeBlock = new mld_condition (); + // link the internal mutex to the read and write conditions; + // when wait() is called, the internal mutex will automatically + // be unlock()'ed. Upon return (from a signal() to the condition), + // the internal mutex will be lock()'ed. + d_readBlock = new mld_condition (d_internal); + d_writeBlock = new mld_condition (d_internal); }; /* @@ -137,11 +148,9 @@ public: */ int enqueue (T* buf, UInt32 bufLen_I) { -#if DO_DEBUG - fprintf (stderr, "enqueue: buf = %X, bufLen = %ld, #av_wr = %ld, " - "#av_rd = %ld.\n", (unsigned int)buf, bufLen_I, - d_n_avail_write_I, d_n_avail_read_I); -#endif + DEBUG (fprintf (stderr, "enqueue: buf = %X, bufLen = %ld, #av_wr = %ld, " + "#av_rd = %ld.\n", (unsigned int)buf, bufLen_I, + d_n_avail_write_I, d_n_avail_read_I)); if (bufLen_I > d_bufLen_I) { fprintf (stderr, "cannot add buffer longer (%ld" ") than instantiated length (%ld" @@ -164,29 +173,21 @@ public: if (bufLen_I > d_n_avail_write_I) { if (d_doWriteBlock) { while (bufLen_I > d_n_avail_write_I) { -#if DO_DEBUG - fprintf (stderr, "enqueue: #len > #a, waiting.\n"); -#endif - d_internal->unlock (); + DEBUG (fprintf (stderr, "enqueue: #len > #a, waiting.\n")); + // wait will automatically unlock() the internal mutex d_writeBlock->wait (); - d_internal->lock (); + // and lock() it here. if (d_doAbort) { d_internal->unlock (); -#if DO_DEBUG - fprintf (stderr, "enqueue: #len > #a, aborting.\n"); -#endif + DEBUG (fprintf (stderr, "enqueue: #len > #a, aborting.\n")); return (2); } -#if DO_DEBUG - fprintf (stderr, "enqueue: #len > #a, done waiting.\n"); -#endif + DEBUG (fprintf (stderr, "enqueue: #len > #a, done waiting.\n")); } } else { d_n_avail_read_I = d_bufLen_I - bufLen_I; d_n_avail_write_I = bufLen_I; -#if DO_DEBUG - fprintf (stderr, "circular_buffer::enqueue: overflow\n"); -#endif + DEBUG (fprintf (stderr, "circular_buffer::enqueue: overflow\n")); retval = -1; } } @@ -230,11 +231,9 @@ public: */ int dequeue (T* buf, UInt32* bufLen_I) { -#if DO_DEBUG - fprintf (stderr, "dequeue: buf = %X, *bufLen = %ld, #av_wr = %ld, " - "#av_rd = %ld.\n", (unsigned int)buf, *bufLen_I, - d_n_avail_write_I, d_n_avail_read_I); -#endif + DEBUG (fprintf (stderr, "dequeue: buf = %X, *bufLen = %ld, #av_wr = %ld, " + "#av_rd = %ld.\n", (unsigned int)buf, *bufLen_I, + d_n_avail_write_I, d_n_avail_read_I)); if (!bufLen_I) throw std::runtime_error ("circular_buffer::dequeue(): " "input bufLen pointer is NULL.\n"); @@ -258,41 +257,29 @@ public: } if (d_doFullRead) { while (d_n_avail_read_I < l_bufLen_I) { -#if DO_DEBUG - fprintf (stderr, "dequeue: #a < #len, waiting.\n"); -#endif - d_internal->unlock (); + DEBUG (fprintf (stderr, "dequeue: #a < #len, waiting.\n")); + // wait will automatically unlock() the internal mutex d_readBlock->wait (); - d_internal->lock (); + // and lock() it here. if (d_doAbort) { d_internal->unlock (); -#if DO_DEBUG - fprintf (stderr, "dequeue: #a < #len, aborting.\n"); -#endif + DEBUG (fprintf (stderr, "dequeue: #a < #len, aborting.\n")); return (2); } -#if DO_DEBUG - fprintf (stderr, "dequeue: #a < #len, done waiting.\n"); -#endif + DEBUG (fprintf (stderr, "dequeue: #a < #len, done waiting.\n")); } } else { while (d_n_avail_read_I == 0) { -#if DO_DEBUG - fprintf (stderr, "dequeue: #a == 0, waiting.\n"); -#endif - d_internal->unlock (); + DEBUG (fprintf (stderr, "dequeue: #a == 0, waiting.\n")); + // wait will automatically unlock() the internal mutex d_readBlock->wait (); - d_internal->lock (); + // and lock() it here. if (d_doAbort) { d_internal->unlock (); -#if DO_DEBUG - fprintf (stderr, "dequeue: #a == 0, aborting.\n"); -#endif + DEBUG (fprintf (stderr, "dequeue: #a == 0, aborting.\n")); return (2); } -#if DO_DEBUG - fprintf (stderr, "dequeue: #a == 0, done waiting.\n"); -#endif + DEBUG (fprintf (stderr, "dequeue: #a == 0, done waiting.\n")); } } if (l_bufLen_I > d_n_avail_read_I) diff --git a/gr-audio-osx/src/mld_threads.h b/gr-audio-osx/src/mld_threads.h index cd62edff..a59a9286 100644 --- a/gr-audio-osx/src/mld_threads.h +++ b/gr-audio-osx/src/mld_threads.h @@ -4,6 +4,8 @@ * * This file is part of GNU Radio. * + * Primary Author: Michael Dickens, NCIP Lab, University of Notre Dame + * * GNU Radio is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 3, or (at your option) @@ -25,6 +27,7 @@ /* classes which allow for either pthreads or omni_threads */ +#define __macos__ #ifdef _USE_OMNI_THREADS_ #include #else @@ -34,6 +37,13 @@ #include #define __INLINE__ inline +#define DO_DEBUG 0 + +#if DO_DEBUG +#define DEBUG(X) do{X} while(0); +#else +#define DEBUG(X) do{} while(0); +#endif class mld_condition_t; @@ -130,12 +140,17 @@ class mld_condition_t { private: l_condition_ptr d_condition; mld_mutex_ptr d_mutex; - bool d_waiting; + bool d_i_own_mutex; public: - __INLINE__ mld_condition_t () { - d_waiting = false; - d_mutex = new mld_mutex (); + __INLINE__ mld_condition_t (mld_mutex_ptr mutex = NULL) { + if (mutex) { + d_i_own_mutex = false; + d_mutex = mutex; + } else { + d_i_own_mutex = true; + d_mutex = new mld_mutex (); + } #ifdef _USE_OMNI_THREADS_ d_condition = new omni_condition (d_mutex->mutex ()); #else @@ -159,38 +174,40 @@ public: #endif delete d_condition; d_condition = NULL; - delete d_mutex; + if (d_i_own_mutex) + delete d_mutex; d_mutex = NULL; }; + __INLINE__ mld_mutex_ptr mutex () {return (d_mutex);}; + __INLINE__ void signal () { - if (d_waiting == true) { + DEBUG (fprintf (stderr, "a ")); + #ifdef _USE_OMNI_THREADS_ - d_condition->signal (); + d_condition->signal (); #else - int l_ret = pthread_cond_signal (d_condition); - if (l_ret != 0) { - fprintf (stderr, "mld_condition_t::signal(): " - "Error %d.\n", l_ret); - } -#endif - d_waiting = false; + int l_ret = pthread_cond_signal (d_condition); + if (l_ret != 0) { + fprintf (stderr, "mld_condition_t::signal(): " + "Error %d.\n", l_ret); } +#endif + DEBUG (fprintf (stderr, "b ")); }; __INLINE__ void wait () { - if (d_waiting == false) { - d_waiting = true; + DEBUG (fprintf (stderr, "c ")); #ifdef _USE_OMNI_THREADS_ - d_condition->wait (); + d_condition->wait (); #else - int l_ret = pthread_cond_wait (d_condition, d_mutex->mutex ()); - if (l_ret != 0) { - fprintf (stderr, "mld_condition_t::wait(): " - "Error %d.\n", l_ret); - } -#endif + int l_ret = pthread_cond_wait (d_condition, d_mutex->mutex ()); + if (l_ret != 0) { + fprintf (stderr, "mld_condition_t::wait(): " + "Error %d.\n", l_ret); } +#endif + DEBUG (printf (stderr, "d ")); }; };