Merged OSX fixes for 10.5 (backwards compatible with 10.4 if not
authormichaelld <michaelld@221aa14e-8319-0410-a670-987f0aec2ac5>
Sun, 13 Jan 2008 20:41:11 +0000 (20:41 +0000)
committermichaelld <michaelld@221aa14e-8319-0410-a670-987f0aec2ac5>
Sun, 13 Jan 2008 20:41:11 +0000 (20:41 +0000)
earlier) for USRP legacy fast-usb code from r7358 branch into trunk:

Fixed DEBUG commands in all files.
Fixed flow control between originating and spawned threads.
Fixed WritePipeAsync buffer write size.
Added in debugging comments to fusb code, to better track async flow.

NOT YET updated for MacOS X 10.5-specific IOKit code, but everything
seems to work just fine as is.

git-svn-id: http://gnuradio.org/svn/gnuradio/trunk@7417 221aa14e-8319-0410-a670-987f0aec2ac5

usrp/host/lib/legacy/circular_buffer.h
usrp/host/lib/legacy/circular_linked_list.h
usrp/host/lib/legacy/fusb_darwin.cc
usrp/host/lib/legacy/mld_threads.h

index fa451d607b728e2c64a92ef067720b1551351d39..8898e4194830c6b4243f4b7e9880fd1f89be723a 100644 (file)
@@ -26,7 +26,9 @@
 #include "mld_threads.h"
 #include <stdexcept>
 
+#ifndef DO_DEBUG
 #define DO_DEBUG 0
+#endif
 
 #if DO_DEBUG
 #define DEBUG(X) do{X} while(0);
@@ -82,7 +84,7 @@ public:
     DEBUG (fprintf (stderr, "c_b(): buf len (items) = %ld, "
                    "doWriteBlock = %s, doFullRead = %s\n", d_bufLen_I,
                    (d_doWriteBlock ? "true" : "false"),
-                   (d_doFullRead ? "true" : "false")));
+                   (d_doFullRead ? "true" : "false")););
   };
 
   ~circular_buffer () {
@@ -150,7 +152,7 @@ public:
   int enqueue (T* buf, UInt32 bufLen_I) {
     DEBUG (fprintf (stderr, "enqueue: buf = %X, bufLen = %ld, #av_wr = %ld, "
                    "#av_rd = %ld.\n", (unsigned int)buf, bufLen_I,
-                   d_n_avail_write_I, d_n_avail_read_I));
+                   d_n_avail_write_I, d_n_avail_read_I););
     if (bufLen_I > d_bufLen_I) {
       fprintf (stderr, "cannot add buffer longer (%ld"
               ") than instantiated length (%ld"
@@ -173,21 +175,21 @@ public:
     if (bufLen_I > d_n_avail_write_I) {
       if (d_doWriteBlock) {
        while (bufLen_I > d_n_avail_write_I) {
-         DEBUG (fprintf (stderr, "enqueue: #len > #a, waiting.\n"));
+         DEBUG (fprintf (stderr, "enqueue: #len > #a, waiting.\n"););
          // wait will automatically unlock() the internal mutex
          d_writeBlock->wait ();
          // and lock() it here.
          if (d_doAbort) {
            d_internal->unlock ();
-           DEBUG (fprintf (stderr, "enqueue: #len > #a, aborting.\n"));
+           DEBUG (fprintf (stderr, "enqueue: #len > #a, aborting.\n"););
            return (2);
          }
-         DEBUG (fprintf (stderr, "enqueue: #len > #a, done waiting.\n"));
+         DEBUG (fprintf (stderr, "enqueue: #len > #a, done waiting.\n"););
        }
       } else {
        d_n_avail_read_I = d_bufLen_I - bufLen_I;
        d_n_avail_write_I = bufLen_I;
-       DEBUG (fprintf (stderr, "circular_buffer::enqueue: overflow\n"));
+       DEBUG (fprintf (stderr, "circular_buffer::enqueue: overflow\n"););
        retval = -1;
       }
     }
@@ -233,7 +235,7 @@ public:
   int dequeue (T* buf, UInt32* bufLen_I) {
     DEBUG (fprintf (stderr, "dequeue: buf = %X, *bufLen = %ld, #av_wr = %ld, "
                    "#av_rd = %ld.\n", (unsigned int)buf, *bufLen_I,
-                   d_n_avail_write_I, d_n_avail_read_I));
+                   d_n_avail_write_I, d_n_avail_read_I););
     if (!bufLen_I)
       throw std::runtime_error ("circular_buffer::dequeue(): "
                                "input bufLen pointer is NULL.\n");
@@ -257,29 +259,29 @@ public:
     }
     if (d_doFullRead) {
       while (d_n_avail_read_I < l_bufLen_I) {
-       DEBUG (fprintf (stderr, "dequeue: #a < #len, waiting.\n"));
+       DEBUG (fprintf (stderr, "dequeue: #a < #len, waiting.\n"););
        // wait will automatically unlock() the internal mutex
        d_readBlock->wait ();
        // and lock() it here.
        if (d_doAbort) {
          d_internal->unlock ();
-         DEBUG (fprintf (stderr, "dequeue: #a < #len, aborting.\n"));
+         DEBUG (fprintf (stderr, "dequeue: #a < #len, aborting.\n"););
          return (2);
        }
-       DEBUG (fprintf (stderr, "dequeue: #a < #len, done waiting.\n"));
+       DEBUG (fprintf (stderr, "dequeue: #a < #len, done waiting.\n"););
      }
     } else {
       while (d_n_avail_read_I == 0) {
-       DEBUG (fprintf (stderr, "dequeue: #a == 0, waiting.\n"));
+       DEBUG (fprintf (stderr, "dequeue: #a == 0, waiting.\n"););
        // wait will automatically unlock() the internal mutex
        d_readBlock->wait ();
        // and lock() it here.
        if (d_doAbort) {
          d_internal->unlock ();
-         DEBUG (fprintf (stderr, "dequeue: #a == 0, aborting.\n"));
+         DEBUG (fprintf (stderr, "dequeue: #a == 0, aborting.\n"););
          return (2);
        }
-       DEBUG (fprintf (stderr, "dequeue: #a == 0, done waiting.\n"));
+       DEBUG (fprintf (stderr, "dequeue: #a == 0, done waiting.\n"););
       }
     }
     if (l_bufLen_I > d_n_avail_read_I)
index 14d81ac915e2f47d648ba978a86f83c003c4c4b3..e495d609b2f6c265b19d48a4b24515cd8f036588 100644 (file)
 #include <stdexcept>
 
 #define __INLINE__ inline
+
+#ifndef DO_DEBUG
 #define DO_DEBUG 0
+#endif
 
 #if DO_DEBUG
 #define DEBUG(X) do{X} while(0);
@@ -168,17 +171,17 @@ public:
     d_internal->lock ();
 // find an available node
     s_node_ptr l_node = d_available; 
-    DEBUG (fprintf (stderr, "w "));
+    DEBUG (fprintf (stderr, "w "););
     while (! l_node) {
-      DEBUG (fprintf (stderr, "x\n"));
+      DEBUG (fprintf (stderr, "x\n"););
       // the ioBlock condition will automatically unlock() d_internal
       d_ioBlock->wait ();
       // and lock() is here
-      DEBUG (fprintf (stderr, "y\n"));
+      DEBUG (fprintf (stderr, "y\n"););
       l_node = d_available;
     }
     DEBUG (fprintf (stderr, "::f_n_a_n: #u = %ld, node = %p\n",
-                   num_used(), l_node));
+                   num_used(), l_node););
 // remove this one from the current available list
     if (num_available () == 1) {
 // last one, just set available to NULL
@@ -201,7 +204,7 @@ public:
     if (!l_node) return;
     d_internal->lock ();
     DEBUG (fprintf (stderr, "::m_n_a: #u = %ld, node = %p\n",
-                   num_used(), l_node));
+                   num_used(), l_node););
 // remove this node from the inUse list
     if (num_used () == 1) {
 // last one, just set inUse to NULL
@@ -216,10 +219,10 @@ public:
       l_node->insert_before (d_available);
     d_n_used--;
 
-    DEBUG (fprintf (stderr, "s%ld ", d_n_used));
+    DEBUG (fprintf (stderr, "s%ld ", d_n_used););
 // signal the condition when new data arrives
     d_ioBlock->signal ();
-    DEBUG (fprintf (stderr, "t "));
+    DEBUG (fprintf (stderr, "t "););
 
 // unlock the mutex for thread safety
     d_internal->unlock ();
index 05db29e805d673e4b7c7eb9ccc4700a323cbe0b0..737387b877bc9cac8798ff8a659b856bba3e6123 100644 (file)
@@ -27,6 +27,7 @@
 // tell mld_threads to NOT use omni_threads,
 // but rather Darwin's pthreads
 #define _USE_OMNI_THREADS_
+#define DO_DEBUG 0
 
 #include <usb.h>
 #include "fusb.h"
@@ -190,13 +191,18 @@ fusb_ephandle_darwin::start ()
             d_endpoint, d_pipeRef, d_interface, d_interfaceRef, direction,
             number, interval, maxPacketSize);
 
-// set global start boolean
+  // set global start boolean
   d_started = true;
 
-// create the run thread, which allows OSX to process I/O separately
+  // lock the runBlock mutex, before creating the run thread.
+  // this guarantees that we can control execution between these 2 threads
+  d_runBlock->mutex ()->lock ();
+
+  // create the run thread, which allows OSX to process I/O separately
   d_runThread = new mld_thread (run_thread, this);
 
-// wait until the threads are -really- going
+  // wait until the run thread (and possibky read thread) are -really-
+  // going; this will unlock the mutex before waiting for a signal ()
   d_runBlock->wait ();
 
   if (usb_debug)
@@ -210,11 +216,16 @@ void
 fusb_ephandle_darwin::run_thread (void* arg)
 {
   fusb_ephandle_darwin* This = static_cast<fusb_ephandle_darwin*>(arg);
+
+  // lock the run thread running mutex; if ::stop() is called, it will
+  // first abort() the pipe then wait for the run thread to finish,
+  // via a lock() on this mutex
   mld_mutex_ptr l_runThreadRunning = This->d_runThreadRunning;
   l_runThreadRunning->lock ();
 
   mld_mutex_ptr l_readRunning = This->d_readRunning;
   mld_condition_ptr l_readBlock = This->d_readBlock;
+  mld_mutex_ptr l_readBlock_mutex = l_readBlock->mutex ();
 
   bool l_input_p = This->d_input_p;
 
@@ -237,24 +248,41 @@ fusb_ephandle_darwin::run_thread (void* arg)
   mld_thread_ptr l_rwThread = NULL;
 
   if (l_input_p) {
+    // lock the readBlock mutex, before creating the read thread.
+    // this guarantees that we can control execution between these 2 threads
+    l_readBlock_mutex->lock ();
+    // create the read thread, which just issues all of the starting
+    // async read commands, then returns
     l_rwThread = new mld_thread (read_thread, arg);
-// wait until the the rwThread is -really- going
+    // wait until the the read thread is -really- going; this will
+    // unlock the read block mutex before waiting for a signal ()
     l_readBlock->wait ();
   }
 
-// now signal the run condition to release and finish ::start()
+  // now signal the run condition to release and finish ::start().
+
+  // lock the runBlock mutex first; this will force waiting until the
+  // ->wait() command is issued in ::start()
+  mld_mutex_ptr l_run_block_mutex = This->d_runBlock->mutex ();
+  l_run_block_mutex->lock ();
+
+  // now that the lock is in place, signal the parent thread that
+  // things are running
   This->d_runBlock->signal ();
 
-// run the loop
+  // release the run_block mutex, just in case
+  l_run_block_mutex->unlock ();
+
+  // run the loop
   CFRunLoopRun ();
 
   if (l_input_p) {
-// wait for read_thread () to finish
+    // wait for read_thread () to finish, if needed
     l_readRunning->lock ();
     l_readRunning->unlock ();
   }
 
-// remove run loop stuff
+  // remove run loop stuff
   CFRunLoopRemoveSource (CFRunLoopGetCurrent (),
                         l_cfSource, kCFRunLoopDefaultMode);
 
@@ -262,6 +290,7 @@ fusb_ephandle_darwin::run_thread (void* arg)
     fprintf (stderr, "fusb_ephandle_darwin::run_thread: finished for %s.\n",
             l_input_p ? "read" : "write");
 
+  // release the run thread running mutex
   l_runThreadRunning->unlock ();
 }
 
@@ -273,13 +302,27 @@ fusb_ephandle_darwin::read_thread (void* arg)
 
   fusb_ephandle_darwin* This = static_cast<fusb_ephandle_darwin*>(arg);
 
+  // before doing anything else, lock the read running mutex.  this
+  // mutex does flow control between this thread and the run_thread
   mld_mutex_ptr l_readRunning = This->d_readRunning;
   l_readRunning->lock ();
 
-// signal the read condition from run_thread() to continue
+  // signal the read condition from run_thread() to continue
+
+  // lock the readBlock mutex first; this will force waiting until the
+  // ->wait() command is issued in ::run_thread()
   mld_condition_ptr l_readBlock = This->d_readBlock;
+  mld_mutex_ptr l_read_block_mutex = l_readBlock->mutex ();
+  l_read_block_mutex->lock ();
+
+  // now that the lock is in place, signal the parent thread that
+  // things are running here
   l_readBlock->signal ();
 
+  // release the run_block mutex, just in case
+  l_read_block_mutex->unlock ();
+
+  // queue up all of the available read requests
   s_queue_ptr l_queue = This->d_queue;
   l_queue->iterate_start ();
   s_node_ptr l_node = l_queue->iterate_next ();
@@ -291,14 +334,21 @@ fusb_ephandle_darwin::read_thread (void* arg)
   if (usb_debug)
     fprintf (stderr, "fusb_ephandle_darwin::read_thread: finished.\n");
 
+  // release the read running mutex, to let the parent thread knows
+  // that this thread is finished
   l_readRunning->unlock ();
 }
 
 void
 fusb_ephandle_darwin::read_issue (s_both_ptr l_both)
 {
-  if ((! l_both) || (! d_started))
+  if ((! l_both) || (! d_started)) {
+    if (usb_debug > 4)
+      fprintf (stderr, "fusb_ephandle_darwin::read_issue: Doing nothing; "
+              "l_both is %X; started is %s\n", (unsigned int) l_both,
+              d_started ? "TRUE" : "FALSE");
     return;
+  }
 
 // set the node and buffer from the input "both"
   s_node_ptr l_node = l_both->node ();
@@ -328,6 +378,9 @@ fusb_ephandle_darwin::read_issue (s_both_ptr l_both)
                          "(ReadPipeAsync%s): %s",
                          d_transferType == kUSBInterrupt ? "" : "TO",
                          darwin_error_str (result));
+  else if (usb_debug > 4)
+    fprintf (stderr, "fusb_ephandle_darwin::read_issue: "
+            "Queued %X (%ld Bytes)\n", (unsigned int) l_both, bufLen);
 }
 
 void
@@ -347,6 +400,10 @@ fusb_ephandle_darwin::read_completed (void* refCon,
     fprintf (stderr, "fusb_ephandle_darwin::read_completed: "
             "Expected %ld bytes; read %ld.\n",
             l_i_size, l_size);
+  else if (usb_debug > 4)
+    fprintf (stderr, "fusb_ephandle_darwin::read_completed: "
+            "Read %X (%ld bytes)\n",
+            (unsigned int) l_both, l_size);
 
 // add this read to the transfer buffer
   if (l_buffer->enqueue (l_buf->buffer (), l_size) == -1) {
@@ -378,7 +435,12 @@ fusb_ephandle_darwin::write (const void* buffer, int nbytes)
 {
   UInt32 l_nbytes = (UInt32) nbytes;
 
-  if (! d_started) return (0);
+  if (! d_started) {
+    if (usb_debug)
+      fprintf (stderr, "fusb_ephandle_darwin::write: Not yet started.\n");
+
+    return (0);
+  }
 
   while (l_nbytes != 0) {
 // find out how much data to copy; limited to "d_bufLenBytes" per node
@@ -400,11 +462,11 @@ fusb_ephandle_darwin::write (const void* buffer, int nbytes)
     if (d_transferType == kUSBInterrupt)
 /* This is an interrupt pipe ... can't specify a timeout. */
       result = d_interface->WritePipeAsync
-       (d_interfaceRef, d_pipeRef, v_buffer, l_nbytes,
+       (d_interfaceRef, d_pipeRef, v_buffer, t_nbytes,
         (IOAsyncCallback1) write_completed, (void*) l_both);
     else
       result = d_interface->WritePipeAsyncTO
-       (d_interfaceRef, d_pipeRef, v_buffer, l_nbytes, 0, USB_TIMEOUT,
+       (d_interfaceRef, d_pipeRef, v_buffer, t_nbytes, 0, USB_TIMEOUT,
         (IOAsyncCallback1) write_completed, (void*) l_both);
 
     if (result != kIOReturnSuccess)
@@ -413,6 +475,10 @@ fusb_ephandle_darwin::write (const void* buffer, int nbytes)
                     "(WritePipeAsync%s): %s",
                     d_transferType == kUSBInterrupt ? "" : "TO",
                     darwin_error_str (result));
+    else if (usb_debug > 4) {
+      fprintf (stderr, "fusb_ephandle_darwin::write_thread: "
+              "Queued %X (%ld Bytes)\n", (unsigned int) l_both, t_nbytes);
+    }
     l_nbytes -= t_nbytes;
   }
 
@@ -436,6 +502,9 @@ fusb_ephandle_darwin::write_completed (void* refCon,
     fprintf (stderr, "fusb_ephandle_darwin::write_completed: "
             "Expected %ld bytes written; wrote %ld.\n",
             l_i_size, l_size);
+  else if (usb_debug > 4)
+    fprintf (stderr, "fusb_ephandle_darwin::write_completed: "
+            "Wrote %X (%ld Bytes)\n", (unsigned int) l_both, l_size);
 
 // set buffer's # data to 0
   l_buf->n_used (0);
index a59a928634229c9d71f185aa4bcebdeca85ad85e..b2ec657510043b2fa1b7404ac8b5921a2b0644e5 100644 (file)
 #include <stdexcept>
 
 #define __INLINE__ inline
+
+#ifndef DO_DEBUG
 #define DO_DEBUG 0
+#endif
 
 #if DO_DEBUG
 #define DEBUG(X) do{X} while(0);
@@ -182,7 +185,7 @@ public:
   __INLINE__ mld_mutex_ptr mutex () {return (d_mutex);};
 
   __INLINE__ void signal () {
-    DEBUG (fprintf (stderr, "a "));
+    DEBUG (fprintf (stderr, "a "););
 
 #ifdef _USE_OMNI_THREADS_
     d_condition->signal ();
@@ -193,11 +196,11 @@ public:
               "Error %d.\n", l_ret);
     }
 #endif
-    DEBUG (fprintf (stderr, "b "));
+    DEBUG (fprintf (stderr, "b "););
   };
 
   __INLINE__ void wait () {
-    DEBUG (fprintf (stderr, "c "));
+    DEBUG (fprintf (stderr, "c "););
 #ifdef _USE_OMNI_THREADS_
     d_condition->wait ();
 #else
@@ -207,7 +210,7 @@ public:
               "Error %d.\n", l_ret);
     }
 #endif
-    DEBUG (printf (stderr, "d "));
+    DEBUG (fprintf (stderr, "d "););
   };
 };