Fixed use of mutex and condition, which corrects a long-standing rare
authormichaelld <michaelld@221aa14e-8319-0410-a670-987f0aec2ac5>
Sat, 18 Aug 2007 02:35:43 +0000 (02:35 +0000)
committermichaelld <michaelld@221aa14e-8319-0410-a670-987f0aec2ac5>
Sat, 18 Aug 2007 02:35:43 +0000 (02:35 +0000)
bug over contention of shared variables.

git-svn-id: http://gnuradio.org/svn/gnuradio/trunk@6152 221aa14e-8319-0410-a670-987f0aec2ac5

usrp/host/lib/legacy/README_OSX
usrp/host/lib/legacy/circular_buffer.h
usrp/host/lib/legacy/circular_linked_list.h
usrp/host/lib/legacy/fusb_darwin.cc
usrp/host/lib/legacy/mld_threads.h

index 20230f121c0f1bd5c9822003dcb2af8401cacfba..37026f25a3daa5db76b99c5129dcea71ecefa95f 100644 (file)
@@ -51,7 +51,7 @@ usual).
 
 For -all- systems I've tested on thus far, both of these return
 exactly 41 overruns / underruns, and -most- systems start out with a
-stalled pipe.  This stall comes in a usb_control funciton call to
+stalled pipe.  This stall comes in a usb_control function call to
 LIBUSB; one would have to change the LIBUSB code to handle this issue.
 
 (3) from gr-build/gnuradio-examples/python/usrp :
index 4b5e7ba3518867197b7b60fa8335a10ed285528d..fa451d607b728e2c64a92ef067720b1551351d39 100644 (file)
 
 #define DO_DEBUG 0
 
+#if DO_DEBUG
+#define DEBUG(X) do{X} while(0);
+#else
+#define DEBUG(X) do{} while(0);
+#endif
+
 template <class T> class circular_buffer
 {
 private:
@@ -73,12 +79,10 @@ public:
     d_internal = NULL;
     d_readBlock = d_writeBlock = NULL;
     reset ();
-#if DO_DEBUG
-    fprintf (stderr, "c_b(): buf len (items) = %ld, "
-            "doWriteBlock = %s, doFullRead = %s\n", d_bufLen_I,
-            (d_doWriteBlock ? "true" : "false"),
-            (d_doFullRead ? "true" : "false"));
-#endif
+    DEBUG (fprintf (stderr, "c_b(): buf len (items) = %ld, "
+                   "doWriteBlock = %s, doFullRead = %s\n", d_bufLen_I,
+                   (d_doWriteBlock ? "true" : "false"),
+                   (d_doFullRead ? "true" : "false")));
   };
 
   ~circular_buffer () {
@@ -110,9 +114,16 @@ public:
     d_readNdx_I = d_writeNdx_I = d_n_avail_read_I = 0;
     d_n_avail_write_I = d_bufLen_I;
     delete_mutex_cond ();
+    // create a mutex to handle contention of shared resources;
+    // any routine needed access to shared resources uses lock()
+    // before doing anything, then unlock() when finished.
     d_internal = new mld_mutex ();
-    d_readBlock = new mld_condition ();
-    d_writeBlock = new mld_condition ();
+    // link the internal mutex to the read and write conditions;
+    // when wait() is called, the internal mutex will automatically
+    // be unlock()'ed.  Upon return (from a signal() to the condition),
+    // the internal mutex will be lock()'ed.
+    d_readBlock = new mld_condition (d_internal);
+    d_writeBlock = new mld_condition (d_internal);
   };
 
 /*
@@ -137,11 +148,9 @@ public:
  */
 
   int enqueue (T* buf, UInt32 bufLen_I) {
-#if DO_DEBUG
-    fprintf (stderr, "enqueue: buf = %X, bufLen = %ld, #av_wr = %ld, "
-            "#av_rd = %ld.\n", (unsigned int)buf, bufLen_I,
-            d_n_avail_write_I, d_n_avail_read_I);
-#endif
+    DEBUG (fprintf (stderr, "enqueue: buf = %X, bufLen = %ld, #av_wr = %ld, "
+                   "#av_rd = %ld.\n", (unsigned int)buf, bufLen_I,
+                   d_n_avail_write_I, d_n_avail_read_I));
     if (bufLen_I > d_bufLen_I) {
       fprintf (stderr, "cannot add buffer longer (%ld"
               ") than instantiated length (%ld"
@@ -164,29 +173,21 @@ public:
     if (bufLen_I > d_n_avail_write_I) {
       if (d_doWriteBlock) {
        while (bufLen_I > d_n_avail_write_I) {
-#if DO_DEBUG
-         fprintf (stderr, "enqueue: #len > #a, waiting.\n");
-#endif
-         d_internal->unlock ();
+         DEBUG (fprintf (stderr, "enqueue: #len > #a, waiting.\n"));
+         // wait will automatically unlock() the internal mutex
          d_writeBlock->wait ();
-         d_internal->lock ();
+         // and lock() it here.
          if (d_doAbort) {
            d_internal->unlock ();
-#if DO_DEBUG
-           fprintf (stderr, "enqueue: #len > #a, aborting.\n");
-#endif
+           DEBUG (fprintf (stderr, "enqueue: #len > #a, aborting.\n"));
            return (2);
          }
-#if DO_DEBUG
-         fprintf (stderr, "enqueue: #len > #a, done waiting.\n");
-#endif
+         DEBUG (fprintf (stderr, "enqueue: #len > #a, done waiting.\n"));
        }
       } else {
        d_n_avail_read_I = d_bufLen_I - bufLen_I;
        d_n_avail_write_I = bufLen_I;
-#if DO_DEBUG
-       fprintf (stderr, "circular_buffer::enqueue: overflow\n");
-#endif
+       DEBUG (fprintf (stderr, "circular_buffer::enqueue: overflow\n"));
        retval = -1;
       }
     }
@@ -230,11 +231,9 @@ public:
  */
 
   int dequeue (T* buf, UInt32* bufLen_I) {
-#if DO_DEBUG
-    fprintf (stderr, "dequeue: buf = %X, *bufLen = %ld, #av_wr = %ld, "
-            "#av_rd = %ld.\n", (unsigned int)buf, *bufLen_I,
-            d_n_avail_write_I, d_n_avail_read_I);
-#endif
+    DEBUG (fprintf (stderr, "dequeue: buf = %X, *bufLen = %ld, #av_wr = %ld, "
+                   "#av_rd = %ld.\n", (unsigned int)buf, *bufLen_I,
+                   d_n_avail_write_I, d_n_avail_read_I));
     if (!bufLen_I)
       throw std::runtime_error ("circular_buffer::dequeue(): "
                                "input bufLen pointer is NULL.\n");
@@ -258,41 +257,29 @@ public:
     }
     if (d_doFullRead) {
       while (d_n_avail_read_I < l_bufLen_I) {
-#if DO_DEBUG
-       fprintf (stderr, "dequeue: #a < #len, waiting.\n");
-#endif
-       d_internal->unlock ();
+       DEBUG (fprintf (stderr, "dequeue: #a < #len, waiting.\n"));
+       // wait will automatically unlock() the internal mutex
        d_readBlock->wait ();
-       d_internal->lock ();
+       // and lock() it here.
        if (d_doAbort) {
          d_internal->unlock ();
-#if DO_DEBUG
-         fprintf (stderr, "dequeue: #a < #len, aborting.\n");
-#endif
+         DEBUG (fprintf (stderr, "dequeue: #a < #len, aborting.\n"));
          return (2);
        }
-#if DO_DEBUG
-       fprintf (stderr, "dequeue: #a < #len, done waiting.\n");
-#endif
+       DEBUG (fprintf (stderr, "dequeue: #a < #len, done waiting.\n"));
      }
     } else {
       while (d_n_avail_read_I == 0) {
-#if DO_DEBUG
-       fprintf (stderr, "dequeue: #a == 0, waiting.\n");
-#endif
-       d_internal->unlock ();
+       DEBUG (fprintf (stderr, "dequeue: #a == 0, waiting.\n"));
+       // wait will automatically unlock() the internal mutex
        d_readBlock->wait ();
-       d_internal->lock ();
+       // and lock() it here.
        if (d_doAbort) {
          d_internal->unlock ();
-#if DO_DEBUG
-         fprintf (stderr, "dequeue: #a == 0, aborting.\n");
-#endif
+         DEBUG (fprintf (stderr, "dequeue: #a == 0, aborting.\n"));
          return (2);
        }
-#if DO_DEBUG
-       fprintf (stderr, "dequeue: #a == 0, done waiting.\n");
-#endif
+       DEBUG (fprintf (stderr, "dequeue: #a == 0, done waiting.\n"));
       }
     }
     if (l_bufLen_I > d_n_avail_read_I)
index 38b1cdca2f60490b895de2b0fe70024f62487544..14d81ac915e2f47d648ba978a86f83c003c4c4b3 100644 (file)
 #include <stdexcept>
 
 #define __INLINE__ inline
+#define DO_DEBUG 0
+
+#if DO_DEBUG
+#define DEBUG(X) do{X} while(0);
+#else
+#define DEBUG(X) do{} while(0);
+#endif
 
 template <class T> class s_both;
 
@@ -140,8 +147,8 @@ public:
       }
     }
     d_available = d_current = l_prev;
-    d_internal = new mld_mutex ();
     d_ioBlock = new mld_condition ();
+    d_internal = d_ioBlock->mutex ();
   };
 
   ~circular_linked_list () {
@@ -151,8 +158,6 @@ public:
       delete l_node;
       l_node = iterate_next ();
     }
-    delete d_internal;
-    d_internal = NULL;
     delete d_ioBlock;
     d_ioBlock = NULL;
     d_available = d_inUse = d_iterate = d_current = NULL;
@@ -163,13 +168,17 @@ public:
     d_internal->lock ();
 // find an available node
     s_node_ptr l_node = d_available; 
+    DEBUG (fprintf (stderr, "w "));
     while (! l_node) {
-      d_internal->unlock ();
+      DEBUG (fprintf (stderr, "x\n"));
+      // the ioBlock condition will automatically unlock() d_internal
       d_ioBlock->wait ();
-      d_internal->lock ();
+      // and lock() is here
+      DEBUG (fprintf (stderr, "y\n"));
       l_node = d_available;
     }
-//  fprintf (stderr, "::f_n_a_n: #u = %ld, node = %p\n", num_used(), l_node);
+    DEBUG (fprintf (stderr, "::f_n_a_n: #u = %ld, node = %p\n",
+                   num_used(), l_node));
 // remove this one from the current available list
     if (num_available () == 1) {
 // last one, just set available to NULL
@@ -191,7 +200,8 @@ public:
   void make_node_available (s_node_ptr l_node) {
     if (!l_node) return;
     d_internal->lock ();
-//  fprintf (stderr, "::m_n_a: #u = %ld, node = %p\n", num_used(), l_node);
+    DEBUG (fprintf (stderr, "::m_n_a: #u = %ld, node = %p\n",
+                   num_used(), l_node));
 // remove this node from the inUse list
     if (num_used () == 1) {
 // last one, just set inUse to NULL
@@ -205,8 +215,12 @@ public:
     else
       l_node->insert_before (d_available);
     d_n_used--;
+
+    DEBUG (fprintf (stderr, "s%ld ", d_n_used));
 // signal the condition when new data arrives
     d_ioBlock->signal ();
+    DEBUG (fprintf (stderr, "t "));
+
 // unlock the mutex for thread safety
     d_internal->unlock ();
   };
index 61fe2257960092da86fffa0bd2654f17bda64d26..05db29e805d673e4b7c7eb9ccc4700a323cbe0b0 100644 (file)
@@ -366,6 +366,10 @@ fusb_ephandle_darwin::read (void* buffer, int nbytes)
 {
   UInt32 l_nbytes = (UInt32) nbytes;
   d_buffer->dequeue ((char*) buffer, &l_nbytes);
+
+  if (usb_debug > 4)
+    fprintf (stderr, "fusb_ephandle_darwin::read: request for %d bytes, %ld bytes retrieved.\n", nbytes, l_nbytes);
+
   return ((int) l_nbytes);
 }
 
index 6e60e8837eb5a65ad894765d2244b34d11a43b79..a59a928634229c9d71f185aa4bcebdeca85ad85e 100644 (file)
 #include <stdexcept>
 
 #define __INLINE__ inline
+#define DO_DEBUG 0
+
+#if DO_DEBUG
+#define DEBUG(X) do{X} while(0);
+#else
+#define DEBUG(X) do{} while(0);
+#endif
 
 class mld_condition_t;
 
@@ -133,12 +140,17 @@ class mld_condition_t {
 private:
   l_condition_ptr d_condition;
   mld_mutex_ptr d_mutex;
-  bool d_waiting;
+  bool d_i_own_mutex;
 
 public:
-  __INLINE__ mld_condition_t () {
-    d_waiting = false;
-    d_mutex = new mld_mutex ();
+  __INLINE__ mld_condition_t (mld_mutex_ptr mutex = NULL) {
+    if (mutex) {
+      d_i_own_mutex = false;
+      d_mutex = mutex;
+    } else {
+      d_i_own_mutex = true;
+      d_mutex = new mld_mutex ();
+    }
 #ifdef _USE_OMNI_THREADS_
     d_condition = new omni_condition (d_mutex->mutex ());
 #else
@@ -162,38 +174,40 @@ public:
 #endif
     delete d_condition;
     d_condition = NULL;
-    delete d_mutex;
+    if (d_i_own_mutex)
+      delete d_mutex;
     d_mutex = NULL;
   };
 
+  __INLINE__ mld_mutex_ptr mutex () {return (d_mutex);};
+
   __INLINE__ void signal () {
-    if (d_waiting == true) {
+    DEBUG (fprintf (stderr, "a "));
+
 #ifdef _USE_OMNI_THREADS_
-      d_condition->signal ();
+    d_condition->signal ();
 #else
-      int l_ret = pthread_cond_signal (d_condition);
-      if (l_ret != 0) {
-       fprintf (stderr, "mld_condition_t::signal(): "
-                "Error %d.\n", l_ret);
-      }
-#endif
-      d_waiting = false;
+    int l_ret = pthread_cond_signal (d_condition);
+    if (l_ret != 0) {
+      fprintf (stderr, "mld_condition_t::signal(): "
+              "Error %d.\n", l_ret);
     }
+#endif
+    DEBUG (fprintf (stderr, "b "));
   };
 
   __INLINE__ void wait () {
-    if (d_waiting == false) {
-      d_waiting = true;
+    DEBUG (fprintf (stderr, "c "));
 #ifdef _USE_OMNI_THREADS_
-      d_condition->wait ();
+    d_condition->wait ();
 #else
-      int l_ret = pthread_cond_wait (d_condition, d_mutex->mutex ());
-      if (l_ret != 0) {
-       fprintf (stderr, "mld_condition_t::wait(): "
-                "Error %d.\n", l_ret);
-      }
-#endif
+    int l_ret = pthread_cond_wait (d_condition, d_mutex->mutex ());
+    if (l_ret != 0) {
+      fprintf (stderr, "mld_condition_t::wait(): "
+              "Error %d.\n", l_ret);
     }
+#endif
+    DEBUG (printf (stderr, "d "));
   };
 };