Thread-per-block loop now checks and dequeues messages to handle_msg.
authorJohnathan Corgan <jcorgan@corganenterprises.com>
Tue, 4 Aug 2009 20:17:18 +0000 (13:17 -0700)
committerEric Blossom <eb@comsec.com>
Fri, 14 Aug 2009 04:40:08 +0000 (21:40 -0700)
gnuradio-core/src/lib/runtime/gr_basic_block.h
gnuradio-core/src/lib/runtime/gr_msg_accepter.cc
gnuradio-core/src/lib/runtime/gr_msg_accepter.h
gnuradio-core/src/lib/runtime/gr_tpb_detail.h
gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc

index 5d5b8cbc723448399cdf8a5d6589e3bbc5db4277..b8797fdc678038d49075b0faf94cce8176ca5267 100644 (file)
@@ -40,7 +40,7 @@
  * signal processing functions.
  */
 
-class gr_basic_block : gr_msg_accepter, public boost::enable_shared_from_this<gr_basic_block>
+class gr_basic_block : public gr_msg_accepter, public boost::enable_shared_from_this<gr_basic_block>
 {
 protected:
     friend class gr_flowgraph;
index ebe11870af924c5d35a742576e7905c332be0d60..50b41df88b1a23f668af1e99383f6a2b2832e1f3 100644 (file)
@@ -45,12 +45,12 @@ gr_msg_accepter::post(pmt_t msg)
   // Let parent class do whatever it would have
   gruel::msg_accepter_msgq::post(msg);
 
-  // Notify this block's scheduler a message is pending
+  // Notify derived class, handled case by case
   gr_block *p = dynamic_cast<gr_block *>(this);
-  if (p)  
+  if (p) { 
     p->detail()->d_tpb.notify_msg();
-  else {
-    // got here somehow with a non-gr_block
-    throw std::runtime_error("gr_msg_accepter::post() - invalid derived class");
+    return;
   }
+
+  // Test for other derived classes and handle
 }
index 8ce8d1d9e39b1b4a52d85646d32202113a82bde2..2073e7ff16ee166ef1257cf7d94934be850a824e 100644 (file)
@@ -28,7 +28,7 @@
  * \brief Accepts messages and inserts them into a message queue, then notifies
  * subclass gr_basic_block there is a message pending.
  */
-class gr_msg_accepter : gruel::msg_accepter_msgq
+class gr_msg_accepter : public gruel::msg_accepter_msgq
 {
 public:
   gr_msg_accepter(gruel::msg_queue_sptr msgq);
index 29101d7307b36a6c14877c40f82b3e15616f4864..a1df55806eb9b88e5db04034d570404e66d3ccc7 100644 (file)
@@ -35,10 +35,9 @@ struct gr_tpb_detail {
   gruel::condition_variable    input_cond;
   bool                         output_changed;
   gruel::condition_variable    output_cond;
-  bool                          msg_pending;
 
   gr_tpb_detail()
-    : input_changed(false), output_changed(false), msg_pending(false) { }
+    : input_changed(false), output_changed(false) { }
 
   //! Called by us to tell all our upstream blocks that their output may have changed.
   void notify_upstream(gr_block_detail *d);
@@ -61,7 +60,8 @@ struct gr_tpb_detail {
   void notify_msg()
   {
     gruel::scoped_lock guard(mutex);
-    msg_pending = true;
+
+    // Just wake up thread if BLKD_IN or BLKD_OUT
     input_cond.notify_one();
     output_cond.notify_one();
   }
index 458b16d64cd436ee38f052f68aa8fa1071523d79..c601b588c2d66dfb87ef47ea672918e99ff3351e 100644 (file)
 #include <gr_tpb_thread_body.h>
 #include <iostream>
 #include <boost/thread.hpp>
+#include <gruel/pmt.h>
+
+using namespace pmt;
 
 gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block)
   : d_exec(block)
 {
   // std::cerr << "gr_tpb_thread_body: " << block << std::endl;
 
-  gr_block_detail      *d = block->detail().get();
+  gr_block_detail *d = block->detail().get();
   gr_block_executor::state s;
 
   while (1){
     boost::this_thread::interruption_point();
+    while (!block->msg_queue()->empty_p())
+      block->handle_msg(block->msg_queue()->delete_head_nowait());
 
     d->d_tpb.clear_changed();
     s = d_exec.run_one_iteration();
@@ -53,18 +59,37 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block)
       return;
 
     case gr_block_executor::BLKD_IN:           // Wait for input.
+      while (!d->d_tpb.input_changed) 
       {
+       boost::this_thread::interruption_point();
        gruel::scoped_lock guard(d->d_tpb.mutex);
-       while(!d->d_tpb.input_changed)
-         d->d_tpb.input_cond.wait(guard);
+       
+       // Block then wake on input_changed or msg arrived
+       while(!d->d_tpb.input_changed && !block->msg_queue()->empty_p())
+         d->d_tpb.input_cond.wait(guard); 
+
+       // Run msgq while unlocked
+       guard.unlock();
+       while (!block->msg_queue()->empty_p())
+         block->handle_msg(block->msg_queue()->delete_head_nowait());
       }
       break;
+
       
     case gr_block_executor::BLKD_OUT:          // Wait for output buffer space.
+      while (!d->d_tpb.output_changed) 
       {
+       boost::this_thread::interruption_point();
        gruel::scoped_lock guard(d->d_tpb.mutex);
-       while(!d->d_tpb.output_changed)
-         d->d_tpb.output_cond.wait(guard);
+
+       // Block then wake on output_changed or msg arrived
+       while(!d->d_tpb.output_changed && !block->msg_queue()->empty_p())
+         d->d_tpb.output_cond.wait(guard); 
+
+       // Run msgq while unlocked
+       guard.unlock();
+       while (!block->msg_queue()->empty_p())
+         block->handle_msg(block->msg_queue()->delete_head_nowait());
       }
       break;