Merge branch 'upstream' into dfsg-orig
[debian/gnuradio] / gnuradio-core / src / lib / runtime / gr_tpb_thread_body.cc
index e3abf6d8446bc62b6257ea3169a27ac25227112c..03eef17d937f63927f4723bc623b0474af17357c 100644 (file)
@@ -1,6 +1,6 @@
 /* -*- c++ -*- */
 /*
- * Copyright 2008 Free Software Foundation, Inc.
+ * Copyright 2008,2009 Free Software Foundation, Inc.
  * 
  * This file is part of GNU Radio
  * 
 #include <gr_tpb_thread_body.h>
 #include <iostream>
 #include <boost/thread.hpp>
+#include <gruel/pmt.h>
+
+using namespace pmt;
 
 gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block)
   : d_exec(block)
 {
   // std::cerr << "gr_tpb_thread_body: " << block << std::endl;
 
-  gr_block_detail      *d = block->detail().get();
+  gr_block_detail *d = block->detail().get();
   gr_block_executor::state s;
+  pmt_t msg;
+
 
   while (1){
     boost::this_thread::interruption_point();
+    // handle any queued up messages
+    while ((msg = d->d_tpb.delete_head_nowait()))
+      block->handle_msg(msg);
 
     d->d_tpb.clear_changed();
     s = d_exec.run_one_iteration();
@@ -54,17 +63,40 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block)
 
     case gr_block_executor::BLKD_IN:           // Wait for input.
       {
-       gr_tpb_detail::scoped_lock guard(d->d_tpb.mutex);
-       while(!d->d_tpb.input_changed)
-         d->d_tpb.input_cond.wait(guard);
+       gruel::scoped_lock guard(d->d_tpb.mutex);
+       while (!d->d_tpb.input_changed){
+         
+         // wait for input or message
+         while(!d->d_tpb.input_changed && d->d_tpb.empty_p())
+           d->d_tpb.input_cond.wait(guard);
+
+         // handle all pending messages
+         while ((msg = d->d_tpb.delete_head_nowait_already_holding_mutex())){
+           guard.unlock();                     // release lock while processing msg
+           block->handle_msg(msg);
+           guard.lock();
+         }
+       }
       }
       break;
+
       
     case gr_block_executor::BLKD_OUT:          // Wait for output buffer space.
       {
-       gr_tpb_detail::scoped_lock guard(d->d_tpb.mutex);
-       while(!d->d_tpb.output_changed)
-         d->d_tpb.output_cond.wait(guard);
+       gruel::scoped_lock guard(d->d_tpb.mutex);
+       while (!d->d_tpb.output_changed){
+         
+         // wait for output room or message
+         while(!d->d_tpb.output_changed && d->d_tpb.empty_p())
+           d->d_tpb.output_cond.wait(guard);
+
+         // handle all pending messages
+         while ((msg = d->d_tpb.delete_head_nowait_already_holding_mutex())){
+           guard.unlock();                     // release lock while processing msg
+           block->handle_msg(msg);
+           guard.lock();
+         }
+       }
       }
       break;