Refactored gr_msg_accepter and gr_tpd_thread_body.
authorEric Blossom <eb@comsec.com>
Fri, 14 Aug 2009 04:50:06 +0000 (21:50 -0700)
committerEric Blossom <eb@comsec.com>
Fri, 14 Aug 2009 04:50:06 +0000 (21:50 -0700)
Redirected gr_msg_accepter::post into gr_block::_post based on dynamic cast.

12 files changed:
gnuradio-core/src/lib/runtime/gr_basic_block.cc
gnuradio-core/src/lib/runtime/gr_block_detail.cc
gnuradio-core/src/lib/runtime/gr_block_detail.h
gnuradio-core/src/lib/runtime/gr_msg_accepter.cc
gnuradio-core/src/lib/runtime/gr_msg_accepter.h
gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
gnuradio-core/src/lib/runtime/gr_tpb_detail.h
gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
gnuradio-examples/python/pfb/.gitignore [new file with mode: 0644]
gruel/src/include/gruel/Makefile.am
gruel/src/include/gruel/msg_accepter.h
gruel/src/include/gruel/send.h [new file with mode: 0644]

index 8efa8267a4e8e36ec8838d97965614b4587b62a3..2fa1066cb9b99f5045d48d5de7dbd4a059d0f582 100644 (file)
@@ -41,8 +41,7 @@ gr_basic_block_ncurrently_allocated()
 gr_basic_block::gr_basic_block(const std::string &name,
                                gr_io_signature_sptr input_signature,
                                gr_io_signature_sptr output_signature) 
-  : gr_msg_accepter(gruel::make_msg_queue(0)), // Non-blocking insert
-    d_name(name),
+  : d_name(name),
     d_input_signature(input_signature),
     d_output_signature(output_signature),
     d_unique_id(s_next_id++),
index ae1ea25628d63631df41dcd4924fa85cdfa1a8ed..d33dfed8461da2903be450d20b8de9fd4316de53 100644 (file)
@@ -106,3 +106,10 @@ gr_block_detail::produce_each (int how_many_items)
     for (int i = 0; i < noutputs (); i++)
       d_output[i]->update_write_pointer (how_many_items);
 }
+
+
+void
+gr_block_detail::_post(pmt::pmt_t msg)
+{
+  d_tpb.insert_tail(msg);
+}
index 2856c402c7a7925a5d8f762adc9d2eb7f5861633..9d63586024fd57030557951a2286476b4c1a2687 100644 (file)
@@ -79,6 +79,12 @@ class gr_block_detail {
   void produce_each (int how_many_items);
 
 
+  /*!
+   * Accept msg, place in queue, arrange for thread to be awakened if it's not already.
+   */
+  void _post(pmt::pmt_t msg);
+
+
   gr_tpb_detail                             d_tpb;     // used by thread-per-block scheduler
 
   // ----------------------------------------------------------------------------
index 50b41df88b1a23f668af1e99383f6a2b2832e1f3..89876ae2973549ad3c8ea1d4713150200e7d4466 100644 (file)
 #include <gr_msg_accepter.h>
 #include <gr_block.h>
 #include <gr_block_detail.h>
+#include <gr_hier_block2.h>
+#include <stdexcept>
 
 using namespace pmt;
 
-gr_msg_accepter::gr_msg_accepter(gruel::msg_queue_sptr msgq)
-  : gruel::msg_accepter_msgq(msgq)
+gr_msg_accepter::gr_msg_accepter()
 {
 }
 
@@ -42,15 +43,17 @@ gr_msg_accepter::~gr_msg_accepter()
 void
 gr_msg_accepter::post(pmt_t msg)
 {
-  // Let parent class do whatever it would have
-  gruel::msg_accepter_msgq::post(msg);
-
   // Notify derived class, handled case by case
   gr_block *p = dynamic_cast<gr_block *>(this);
   if (p) { 
-    p->detail()->d_tpb.notify_msg();
+    p->detail()->_post(msg);
+    return;
+  }
+  gr_hier_block2 *p2 = dynamic_cast<gr_hier_block2 *>(this);
+  if (p2){
+    // FIXME do the right thing
     return;
   }
 
-  // Test for other derived classes and handle
+  throw std::runtime_error("unknown derived class");
 }
index 2073e7ff16ee166ef1257cf7d94934be850a824e..79a631f3a6db4de4e9f558c225ed7196687231ad 100644 (file)
 #ifndef INCLUDED_GR_MSG_ACCEPTER_H
 #define INCLUDED_GR_MSG_ACCEPTER_H
 
-#include <gruel/msg_accepter_msgq.h>
+#include <gruel/msg_accepter.h>
+#include <gruel/pmt.h>
 
 /*!
  * \brief Accepts messages and inserts them into a message queue, then notifies
  * subclass gr_basic_block there is a message pending.
  */
-class gr_msg_accepter : public gruel::msg_accepter_msgq
+class gr_msg_accepter : public gruel::msg_accepter
 {
 public:
-  gr_msg_accepter(gruel::msg_queue_sptr msgq);
+  gr_msg_accepter();
   ~gr_msg_accepter();
-  
+
   void post(pmt::pmt_t msg);
+
 };
 
 #endif /* INCLUDED_GR_MSG_ACCEPTER_H */
index 02e8deed88b90a8fda7326977ab6279c918995fe..c6311ccaa3efefd9ef62342231b0ad73aadab789 100644 (file)
@@ -1,6 +1,6 @@
 /* -*- c++ -*- */
 /*
- * Copyright 2008 Free Software Foundation, Inc.
+ * Copyright 2008,2009 Free Software Foundation, Inc.
  * 
  * This file is part of GNU Radio
  * 
@@ -27,6 +27,8 @@
 #include <gr_block_detail.h>
 #include <gr_buffer.h>
 
+using namespace pmt;
+
 /*
  * We assume that no worker threads are ever running when the
  * graph structure is being manipulated, thus it's safe for us to poke
@@ -65,3 +67,44 @@ gr_tpb_detail::notify_neighbors(gr_block_detail *d)
   notify_downstream(d);
   notify_upstream(d);
 }
+
+void
+gr_tpb_detail::insert_tail(pmt::pmt_t msg)
+{
+  gruel::scoped_lock guard(mutex);
+
+  msg_queue.push_back(msg);
+
+  // wake up thread if BLKD_IN or BLKD_OUT
+  input_cond.notify_one();
+  output_cond.notify_one();
+}
+
+pmt_t 
+gr_tpb_detail::delete_head_nowait()
+{
+  gruel::scoped_lock guard(mutex);
+
+  if (empty_p())
+    return pmt_t();
+
+  pmt_t m(msg_queue.front());
+  msg_queue.pop_front();
+
+  return m;
+}
+
+/*
+ * Caller must already be holding the mutex
+ */
+pmt_t 
+gr_tpb_detail::delete_head_nowait_already_holding_mutex()
+{
+  if (empty_p())
+    return pmt_t();
+
+  pmt_t m(msg_queue.front());
+  msg_queue.pop_front();
+
+  return m;
+}
index a1df55806eb9b88e5db04034d570404e66d3ccc7..acfa264c7c412d02a39ce3f6d1c7e32dd48f92dc 100644 (file)
@@ -22,6 +22,8 @@
 #define INCLUDED_GR_TPB_DETAIL_H
 
 #include <gruel/thread.h>
+#include <deque>
+#include <gruel/pmt.h>
 
 class gr_block_detail;
 
@@ -36,6 +38,10 @@ struct gr_tpb_detail {
   bool                         output_changed;
   gruel::condition_variable    output_cond;
 
+private:
+  std::deque<pmt::pmt_t>       msg_queue;
+
+public:
   gr_tpb_detail()
     : input_changed(false), output_changed(false) { }
 
@@ -55,16 +61,23 @@ struct gr_tpb_detail {
     input_changed = false;
     output_changed = false;
   }
+  
+  //! is the queue empty?
+  bool empty_p() const { return msg_queue.empty(); }
 
-  //! Called to notify us that a message is pending in the queue
-  void notify_msg()
-  {
-    gruel::scoped_lock guard(mutex);
+  //| Acquires and release the mutex   
+  void insert_tail(pmt::pmt_t msg);
 
-    // Just wake up thread if BLKD_IN or BLKD_OUT
-    input_cond.notify_one();
-    output_cond.notify_one();
-  }
+  /*!
+   * \returns returns pmt at head of queue or pmt_t() if empty.
+   */
+  pmt::pmt_t delete_head_nowait();
+
+  /*!
+   * \returns returns pmt at head of queue or pmt_t() if empty.
+   * Caller must already be holding the mutex
+   */
+  pmt::pmt_t delete_head_nowait_already_holding_mutex();
 
 private:
 
@@ -83,6 +96,7 @@ private:
     output_changed = true;
     output_cond.notify_one();
   }
+
 };
 
 #endif /* INCLUDED_GR_TPB_DETAIL_H */
index c601b588c2d66dfb87ef47ea672918e99ff3351e..03eef17d937f63927f4723bc623b0474af17357c 100644 (file)
@@ -35,12 +35,15 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block)
 
   gr_block_detail *d = block->detail().get();
   gr_block_executor::state s;
+  pmt_t msg;
+
 
   while (1){
     boost::this_thread::interruption_point();
  
-    while (!block->msg_queue()->empty_p())
-      block->handle_msg(block->msg_queue()->delete_head_nowait());
+    // handle any queued up messages
+    while ((msg = d->d_tpb.delete_head_nowait()))
+      block->handle_msg(msg);
 
     d->d_tpb.clear_changed();
     s = d_exec.run_one_iteration();
@@ -59,37 +62,41 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block)
       return;
 
     case gr_block_executor::BLKD_IN:           // Wait for input.
-      while (!d->d_tpb.input_changed) 
       {
-       boost::this_thread::interruption_point();
        gruel::scoped_lock guard(d->d_tpb.mutex);
-       
-       // Block then wake on input_changed or msg arrived
-       while(!d->d_tpb.input_changed && !block->msg_queue()->empty_p())
-         d->d_tpb.input_cond.wait(guard); 
+       while (!d->d_tpb.input_changed){
+         
+         // wait for input or message
+         while(!d->d_tpb.input_changed && d->d_tpb.empty_p())
+           d->d_tpb.input_cond.wait(guard);
 
-       // Run msgq while unlocked
-       guard.unlock();
-       while (!block->msg_queue()->empty_p())
-         block->handle_msg(block->msg_queue()->delete_head_nowait());
+         // handle all pending messages
+         while ((msg = d->d_tpb.delete_head_nowait_already_holding_mutex())){
+           guard.unlock();                     // release lock while processing msg
+           block->handle_msg(msg);
+           guard.lock();
+         }
+       }
       }
       break;
 
       
     case gr_block_executor::BLKD_OUT:          // Wait for output buffer space.
-      while (!d->d_tpb.output_changed) 
       {
-       boost::this_thread::interruption_point();
        gruel::scoped_lock guard(d->d_tpb.mutex);
+       while (!d->d_tpb.output_changed){
+         
+         // wait for output room or message
+         while(!d->d_tpb.output_changed && d->d_tpb.empty_p())
+           d->d_tpb.output_cond.wait(guard);
 
-       // Block then wake on output_changed or msg arrived
-       while(!d->d_tpb.output_changed && !block->msg_queue()->empty_p())
-         d->d_tpb.output_cond.wait(guard); 
-
-       // Run msgq while unlocked
-       guard.unlock();
-       while (!block->msg_queue()->empty_p())
-         block->handle_msg(block->msg_queue()->delete_head_nowait());
+         // handle all pending messages
+         while ((msg = d->d_tpb.delete_head_nowait_already_holding_mutex())){
+           guard.unlock();                     // release lock while processing msg
+           block->handle_msg(msg);
+           guard.lock();
+         }
+       }
       }
       break;
 
diff --git a/gnuradio-examples/python/pfb/.gitignore b/gnuradio-examples/python/pfb/.gitignore
new file mode 100644 (file)
index 0000000..282522d
--- /dev/null
@@ -0,0 +1,2 @@
+Makefile
+Makefile.in
index c38c7fa3833d842a75e423446867ea6155492a90..9f50cb619f3e391cfa406d8a1611c4a2ae3d5790 100644 (file)
@@ -35,6 +35,7 @@ gruelinclude_HEADERS = \
        pmt_pool.h \
        pmt_serial_tags.h \
        realtime.h \
+       send.h \
        sys_pri.h \
        thread_body_wrapper.h \
        thread_group.h \
index bc287afae7c825fd84b0c5f7d2b4be7ccaa3c624..3afd6dde08fb4616354c6af017fc8968e8246277 100644 (file)
@@ -18,8 +18,8 @@
  * with this program; if not, write to the Free Software Foundation, Inc.,
  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
-#ifndef INCLUDED_MSG_ACCEPTER_H
-#define INCLUDED_MSG_ACCEPTER_H
+#ifndef INCLUDED_GRUEL_MSG_ACCEPTER_H
+#define INCLUDED_GRUEL_MSG_ACCEPTER_H
 
 #include <gruel/pmt.h>
 
@@ -34,9 +34,16 @@ namespace gruel {
     msg_accepter() {};
     virtual ~msg_accepter();
 
+    /*!
+     * \brief send \p msg to \p msg_accepter
+     *
+     * Sending a message is an asynchronous operation.  The \p post
+     * call will not wait for the message either to arrive at the
+     * destination or to be received.
+     */
     virtual void post(pmt::pmt_t msg) = 0;
   };
 
 } /* namespace gruel */
 
-#endif /* INCLUDED_MSG_ACCEPTER_H */
+#endif /* INCLUDED_GRUEL_MSG_ACCEPTER_H */
diff --git a/gruel/src/include/gruel/send.h b/gruel/src/include/gruel/send.h
new file mode 100644 (file)
index 0000000..292017d
--- /dev/null
@@ -0,0 +1,49 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2009 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#ifndef INCLUDED_GRUEL_SEND_H
+#define INCLUDED_GRUEL_SEND_H
+
+#include <gruel/msg_accepter.h>
+
+namespace gruel {
+
+
+  /*!
+   * \brief send \p msg to \p msg_accepter
+   *
+   * Sending a message is an asynchronous operation.  The \p send
+   * call will not wait for the message either to arrive at the
+   * destination or to be received.
+   *
+   * \returns msg
+   */
+  static inline pmt::pmt_t
+  send(msg_accepter &acc, pmt::pmt_t msg)
+  {
+    return acc.post(msg);
+  }
+
+
+
+} /* namespace gruel */
+
+
+#endif /* INCLUDED_SEND_H */