From 1181c2fe069075f9ceb3b66ed937115ff39aafa9 Mon Sep 17 00:00:00 2001 From: Eric Blossom Date: Thu, 13 Aug 2009 21:50:06 -0700 Subject: [PATCH] Refactored gr_msg_accepter and gr_tpd_thread_body. Redirected gr_msg_accepter::post into gr_block::_post based on dynamic cast. --- .../src/lib/runtime/gr_basic_block.cc | 3 +- .../src/lib/runtime/gr_block_detail.cc | 7 +++ .../src/lib/runtime/gr_block_detail.h | 6 +++ .../src/lib/runtime/gr_msg_accepter.cc | 17 ++++--- .../src/lib/runtime/gr_msg_accepter.h | 10 ++-- .../src/lib/runtime/gr_tpb_detail.cc | 45 +++++++++++++++- gnuradio-core/src/lib/runtime/gr_tpb_detail.h | 30 ++++++++--- .../src/lib/runtime/gr_tpb_thread_body.cc | 51 +++++++++++-------- gnuradio-examples/python/pfb/.gitignore | 2 + gruel/src/include/gruel/Makefile.am | 1 + gruel/src/include/gruel/msg_accepter.h | 13 +++-- gruel/src/include/gruel/send.h | 49 ++++++++++++++++++ 12 files changed, 187 insertions(+), 47 deletions(-) create mode 100644 gnuradio-examples/python/pfb/.gitignore create mode 100644 gruel/src/include/gruel/send.h diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.cc b/gnuradio-core/src/lib/runtime/gr_basic_block.cc index 8efa8267..2fa1066c 100644 --- a/gnuradio-core/src/lib/runtime/gr_basic_block.cc +++ b/gnuradio-core/src/lib/runtime/gr_basic_block.cc @@ -41,8 +41,7 @@ gr_basic_block_ncurrently_allocated() gr_basic_block::gr_basic_block(const std::string &name, gr_io_signature_sptr input_signature, gr_io_signature_sptr output_signature) - : gr_msg_accepter(gruel::make_msg_queue(0)), // Non-blocking insert - d_name(name), + : d_name(name), d_input_signature(input_signature), d_output_signature(output_signature), d_unique_id(s_next_id++), diff --git a/gnuradio-core/src/lib/runtime/gr_block_detail.cc b/gnuradio-core/src/lib/runtime/gr_block_detail.cc index ae1ea256..d33dfed8 100644 --- a/gnuradio-core/src/lib/runtime/gr_block_detail.cc +++ b/gnuradio-core/src/lib/runtime/gr_block_detail.cc @@ -106,3 +106,10 @@ gr_block_detail::produce_each (int how_many_items) for (int i = 0; i < noutputs (); i++) d_output[i]->update_write_pointer (how_many_items); } + + +void +gr_block_detail::_post(pmt::pmt_t msg) +{ + d_tpb.insert_tail(msg); +} diff --git a/gnuradio-core/src/lib/runtime/gr_block_detail.h b/gnuradio-core/src/lib/runtime/gr_block_detail.h index 2856c402..9d635860 100644 --- a/gnuradio-core/src/lib/runtime/gr_block_detail.h +++ b/gnuradio-core/src/lib/runtime/gr_block_detail.h @@ -79,6 +79,12 @@ class gr_block_detail { void produce_each (int how_many_items); + /*! + * Accept msg, place in queue, arrange for thread to be awakened if it's not already. + */ + void _post(pmt::pmt_t msg); + + gr_tpb_detail d_tpb; // used by thread-per-block scheduler // ---------------------------------------------------------------------------- diff --git a/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc b/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc index 50b41df8..89876ae2 100644 --- a/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc +++ b/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc @@ -26,11 +26,12 @@ #include #include #include +#include +#include using namespace pmt; -gr_msg_accepter::gr_msg_accepter(gruel::msg_queue_sptr msgq) - : gruel::msg_accepter_msgq(msgq) +gr_msg_accepter::gr_msg_accepter() { } @@ -42,15 +43,17 @@ gr_msg_accepter::~gr_msg_accepter() void gr_msg_accepter::post(pmt_t msg) { - // Let parent class do whatever it would have - gruel::msg_accepter_msgq::post(msg); - // Notify derived class, handled case by case gr_block *p = dynamic_cast(this); if (p) { - p->detail()->d_tpb.notify_msg(); + p->detail()->_post(msg); + return; + } + gr_hier_block2 *p2 = dynamic_cast(this); + if (p2){ + // FIXME do the right thing return; } - // Test for other derived classes and handle + throw std::runtime_error("unknown derived class"); } diff --git a/gnuradio-core/src/lib/runtime/gr_msg_accepter.h b/gnuradio-core/src/lib/runtime/gr_msg_accepter.h index 2073e7ff..79a631f3 100644 --- a/gnuradio-core/src/lib/runtime/gr_msg_accepter.h +++ b/gnuradio-core/src/lib/runtime/gr_msg_accepter.h @@ -22,19 +22,21 @@ #ifndef INCLUDED_GR_MSG_ACCEPTER_H #define INCLUDED_GR_MSG_ACCEPTER_H -#include +#include +#include /*! * \brief Accepts messages and inserts them into a message queue, then notifies * subclass gr_basic_block there is a message pending. */ -class gr_msg_accepter : public gruel::msg_accepter_msgq +class gr_msg_accepter : public gruel::msg_accepter { public: - gr_msg_accepter(gruel::msg_queue_sptr msgq); + gr_msg_accepter(); ~gr_msg_accepter(); - + void post(pmt::pmt_t msg); + }; #endif /* INCLUDED_GR_MSG_ACCEPTER_H */ diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc b/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc index 02e8deed..c6311cca 100644 --- a/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc +++ b/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc @@ -1,6 +1,6 @@ /* -*- c++ -*- */ /* - * Copyright 2008 Free Software Foundation, Inc. + * Copyright 2008,2009 Free Software Foundation, Inc. * * This file is part of GNU Radio * @@ -27,6 +27,8 @@ #include #include +using namespace pmt; + /* * We assume that no worker threads are ever running when the * graph structure is being manipulated, thus it's safe for us to poke @@ -65,3 +67,44 @@ gr_tpb_detail::notify_neighbors(gr_block_detail *d) notify_downstream(d); notify_upstream(d); } + +void +gr_tpb_detail::insert_tail(pmt::pmt_t msg) +{ + gruel::scoped_lock guard(mutex); + + msg_queue.push_back(msg); + + // wake up thread if BLKD_IN or BLKD_OUT + input_cond.notify_one(); + output_cond.notify_one(); +} + +pmt_t +gr_tpb_detail::delete_head_nowait() +{ + gruel::scoped_lock guard(mutex); + + if (empty_p()) + return pmt_t(); + + pmt_t m(msg_queue.front()); + msg_queue.pop_front(); + + return m; +} + +/* + * Caller must already be holding the mutex + */ +pmt_t +gr_tpb_detail::delete_head_nowait_already_holding_mutex() +{ + if (empty_p()) + return pmt_t(); + + pmt_t m(msg_queue.front()); + msg_queue.pop_front(); + + return m; +} diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_detail.h b/gnuradio-core/src/lib/runtime/gr_tpb_detail.h index a1df5580..acfa264c 100644 --- a/gnuradio-core/src/lib/runtime/gr_tpb_detail.h +++ b/gnuradio-core/src/lib/runtime/gr_tpb_detail.h @@ -22,6 +22,8 @@ #define INCLUDED_GR_TPB_DETAIL_H #include +#include +#include class gr_block_detail; @@ -36,6 +38,10 @@ struct gr_tpb_detail { bool output_changed; gruel::condition_variable output_cond; +private: + std::deque msg_queue; + +public: gr_tpb_detail() : input_changed(false), output_changed(false) { } @@ -55,16 +61,23 @@ struct gr_tpb_detail { input_changed = false; output_changed = false; } + + //! is the queue empty? + bool empty_p() const { return msg_queue.empty(); } - //! Called to notify us that a message is pending in the queue - void notify_msg() - { - gruel::scoped_lock guard(mutex); + //| Acquires and release the mutex + void insert_tail(pmt::pmt_t msg); - // Just wake up thread if BLKD_IN or BLKD_OUT - input_cond.notify_one(); - output_cond.notify_one(); - } + /*! + * \returns returns pmt at head of queue or pmt_t() if empty. + */ + pmt::pmt_t delete_head_nowait(); + + /*! + * \returns returns pmt at head of queue or pmt_t() if empty. + * Caller must already be holding the mutex + */ + pmt::pmt_t delete_head_nowait_already_holding_mutex(); private: @@ -83,6 +96,7 @@ private: output_changed = true; output_cond.notify_one(); } + }; #endif /* INCLUDED_GR_TPB_DETAIL_H */ diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc index c601b588..03eef17d 100644 --- a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc +++ b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc @@ -35,12 +35,15 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block) gr_block_detail *d = block->detail().get(); gr_block_executor::state s; + pmt_t msg; + while (1){ boost::this_thread::interruption_point(); - while (!block->msg_queue()->empty_p()) - block->handle_msg(block->msg_queue()->delete_head_nowait()); + // handle any queued up messages + while ((msg = d->d_tpb.delete_head_nowait())) + block->handle_msg(msg); d->d_tpb.clear_changed(); s = d_exec.run_one_iteration(); @@ -59,37 +62,41 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block) return; case gr_block_executor::BLKD_IN: // Wait for input. - while (!d->d_tpb.input_changed) { - boost::this_thread::interruption_point(); gruel::scoped_lock guard(d->d_tpb.mutex); - - // Block then wake on input_changed or msg arrived - while(!d->d_tpb.input_changed && !block->msg_queue()->empty_p()) - d->d_tpb.input_cond.wait(guard); + while (!d->d_tpb.input_changed){ + + // wait for input or message + while(!d->d_tpb.input_changed && d->d_tpb.empty_p()) + d->d_tpb.input_cond.wait(guard); - // Run msgq while unlocked - guard.unlock(); - while (!block->msg_queue()->empty_p()) - block->handle_msg(block->msg_queue()->delete_head_nowait()); + // handle all pending messages + while ((msg = d->d_tpb.delete_head_nowait_already_holding_mutex())){ + guard.unlock(); // release lock while processing msg + block->handle_msg(msg); + guard.lock(); + } + } } break; case gr_block_executor::BLKD_OUT: // Wait for output buffer space. - while (!d->d_tpb.output_changed) { - boost::this_thread::interruption_point(); gruel::scoped_lock guard(d->d_tpb.mutex); + while (!d->d_tpb.output_changed){ + + // wait for output room or message + while(!d->d_tpb.output_changed && d->d_tpb.empty_p()) + d->d_tpb.output_cond.wait(guard); - // Block then wake on output_changed or msg arrived - while(!d->d_tpb.output_changed && !block->msg_queue()->empty_p()) - d->d_tpb.output_cond.wait(guard); - - // Run msgq while unlocked - guard.unlock(); - while (!block->msg_queue()->empty_p()) - block->handle_msg(block->msg_queue()->delete_head_nowait()); + // handle all pending messages + while ((msg = d->d_tpb.delete_head_nowait_already_holding_mutex())){ + guard.unlock(); // release lock while processing msg + block->handle_msg(msg); + guard.lock(); + } + } } break; diff --git a/gnuradio-examples/python/pfb/.gitignore b/gnuradio-examples/python/pfb/.gitignore new file mode 100644 index 00000000..282522db --- /dev/null +++ b/gnuradio-examples/python/pfb/.gitignore @@ -0,0 +1,2 @@ +Makefile +Makefile.in diff --git a/gruel/src/include/gruel/Makefile.am b/gruel/src/include/gruel/Makefile.am index c38c7fa3..9f50cb61 100644 --- a/gruel/src/include/gruel/Makefile.am +++ b/gruel/src/include/gruel/Makefile.am @@ -35,6 +35,7 @@ gruelinclude_HEADERS = \ pmt_pool.h \ pmt_serial_tags.h \ realtime.h \ + send.h \ sys_pri.h \ thread_body_wrapper.h \ thread_group.h \ diff --git a/gruel/src/include/gruel/msg_accepter.h b/gruel/src/include/gruel/msg_accepter.h index bc287afa..3afd6dde 100644 --- a/gruel/src/include/gruel/msg_accepter.h +++ b/gruel/src/include/gruel/msg_accepter.h @@ -18,8 +18,8 @@ * with this program; if not, write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#ifndef INCLUDED_MSG_ACCEPTER_H -#define INCLUDED_MSG_ACCEPTER_H +#ifndef INCLUDED_GRUEL_MSG_ACCEPTER_H +#define INCLUDED_GRUEL_MSG_ACCEPTER_H #include @@ -34,9 +34,16 @@ namespace gruel { msg_accepter() {}; virtual ~msg_accepter(); + /*! + * \brief send \p msg to \p msg_accepter + * + * Sending a message is an asynchronous operation. The \p post + * call will not wait for the message either to arrive at the + * destination or to be received. + */ virtual void post(pmt::pmt_t msg) = 0; }; } /* namespace gruel */ -#endif /* INCLUDED_MSG_ACCEPTER_H */ +#endif /* INCLUDED_GRUEL_MSG_ACCEPTER_H */ diff --git a/gruel/src/include/gruel/send.h b/gruel/src/include/gruel/send.h new file mode 100644 index 00000000..292017d4 --- /dev/null +++ b/gruel/src/include/gruel/send.h @@ -0,0 +1,49 @@ +/* -*- c++ -*- */ +/* + * Copyright 2009 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#ifndef INCLUDED_GRUEL_SEND_H +#define INCLUDED_GRUEL_SEND_H + +#include + +namespace gruel { + + + /*! + * \brief send \p msg to \p msg_accepter + * + * Sending a message is an asynchronous operation. The \p send + * call will not wait for the message either to arrive at the + * destination or to be received. + * + * \returns msg + */ + static inline pmt::pmt_t + send(msg_accepter &acc, pmt::pmt_t msg) + { + return acc.post(msg); + } + + + +} /* namespace gruel */ + + +#endif /* INCLUDED_SEND_H */ -- 2.30.2