// Let parent class do whatever it would have
gruel::msg_accepter_msgq::post(msg);
- // Notify this block's scheduler a message is pending
+ // Notify derived class, handled case by case
gr_block *p = dynamic_cast<gr_block *>(this);
- if (p)
+ if (p) {
p->detail()->d_tpb.notify_msg();
- else {
- // got here somehow with a non-gr_block
- throw std::runtime_error("gr_msg_accepter::post() - invalid derived class");
+ return;
}
+
+ // Test for other derived classes and handle
}
gruel::condition_variable input_cond;
bool output_changed;
gruel::condition_variable output_cond;
- bool msg_pending;
gr_tpb_detail()
- : input_changed(false), output_changed(false), msg_pending(false) { }
+ : input_changed(false), output_changed(false) { }
//! Called by us to tell all our upstream blocks that their output may have changed.
void notify_upstream(gr_block_detail *d);
void notify_msg()
{
gruel::scoped_lock guard(mutex);
- msg_pending = true;
+
+ // Just wake up thread if BLKD_IN or BLKD_OUT
input_cond.notify_one();
output_cond.notify_one();
}
#include <gr_tpb_thread_body.h>
#include <iostream>
#include <boost/thread.hpp>
+#include <gruel/pmt.h>
+
+using namespace pmt;
gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block)
: d_exec(block)
{
// std::cerr << "gr_tpb_thread_body: " << block << std::endl;
- gr_block_detail *d = block->detail().get();
+ gr_block_detail *d = block->detail().get();
gr_block_executor::state s;
while (1){
boost::this_thread::interruption_point();
+
+ while (!block->msg_queue()->empty_p())
+ block->handle_msg(block->msg_queue()->delete_head_nowait());
d->d_tpb.clear_changed();
s = d_exec.run_one_iteration();
return;
case gr_block_executor::BLKD_IN: // Wait for input.
+ while (!d->d_tpb.input_changed)
{
+ boost::this_thread::interruption_point();
gruel::scoped_lock guard(d->d_tpb.mutex);
- while(!d->d_tpb.input_changed)
- d->d_tpb.input_cond.wait(guard);
+
+ // Block then wake on input_changed or msg arrived
+ while(!d->d_tpb.input_changed && !block->msg_queue()->empty_p())
+ d->d_tpb.input_cond.wait(guard);
+
+ // Run msgq while unlocked
+ guard.unlock();
+ while (!block->msg_queue()->empty_p())
+ block->handle_msg(block->msg_queue()->delete_head_nowait());
}
break;
+
case gr_block_executor::BLKD_OUT: // Wait for output buffer space.
+ while (!d->d_tpb.output_changed)
{
+ boost::this_thread::interruption_point();
gruel::scoped_lock guard(d->d_tpb.mutex);
- while(!d->d_tpb.output_changed)
- d->d_tpb.output_cond.wait(guard);
+
+ // Block then wake on output_changed or msg arrived
+ while(!d->d_tpb.output_changed && !block->msg_queue()->empty_p())
+ d->d_tpb.output_cond.wait(guard);
+
+ // Run msgq while unlocked
+ guard.unlock();
+ while (!block->msg_queue()->empty_p())
+ block->handle_msg(block->msg_queue()->delete_head_nowait());
}
break;