/* -*- c++ -*- */
/*
- * Copyright 2008 Free Software Foundation, Inc.
+ * Copyright 2008,2009 Free Software Foundation, Inc.
*
* This file is part of GNU Radio
*
#include <gr_tpb_thread_body.h>
#include <iostream>
#include <boost/thread.hpp>
+#include <gruel/pmt.h>
+
+using namespace pmt;
gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block)
: d_exec(block)
{
// std::cerr << "gr_tpb_thread_body: " << block << std::endl;
- gr_block_detail *d = block->detail().get();
+ gr_block_detail *d = block->detail().get();
gr_block_executor::state s;
+ pmt_t msg;
+
while (1){
boost::this_thread::interruption_point();
+
+ // handle any queued up messages
+ while ((msg = d->d_tpb.delete_head_nowait()))
+ block->handle_msg(msg);
d->d_tpb.clear_changed();
s = d_exec.run_one_iteration();
case gr_block_executor::BLKD_IN: // Wait for input.
{
- gr_tpb_detail::scoped_lock guard(d->d_tpb.mutex);
- while(!d->d_tpb.input_changed)
- d->d_tpb.input_cond.wait(guard);
+ gruel::scoped_lock guard(d->d_tpb.mutex);
+ while (!d->d_tpb.input_changed){
+
+ // wait for input or message
+ while(!d->d_tpb.input_changed && d->d_tpb.empty_p())
+ d->d_tpb.input_cond.wait(guard);
+
+ // handle all pending messages
+ while ((msg = d->d_tpb.delete_head_nowait_already_holding_mutex())){
+ guard.unlock(); // release lock while processing msg
+ block->handle_msg(msg);
+ guard.lock();
+ }
+ }
}
break;
+
case gr_block_executor::BLKD_OUT: // Wait for output buffer space.
{
- gr_tpb_detail::scoped_lock guard(d->d_tpb.mutex);
- while(!d->d_tpb.output_changed)
- d->d_tpb.output_cond.wait(guard);
+ gruel::scoped_lock guard(d->d_tpb.mutex);
+ while (!d->d_tpb.output_changed){
+
+ // wait for output room or message
+ while(!d->d_tpb.output_changed && d->d_tpb.empty_p())
+ d->d_tpb.output_cond.wait(guard);
+
+ // handle all pending messages
+ while ((msg = d->d_tpb.delete_head_nowait_already_holding_mutex())){
+ guard.unlock(); // release lock while processing msg
+ block->handle_msg(msg);
+ guard.lock();
+ }
+ }
}
break;