Merged features/mp-sched -r8915:9335 into the trunk. The trunk now
[debian/gnuradio] / gnuradio-core / src / lib / runtime / gr_top_block_impl.cc
index 5914379384cfd51e5b61617cec3c11249134a613..50d480d009e80875b02744740e84917837a33e0e 100644 (file)
 #include <gr_top_block.h>
 #include <gr_top_block_impl.h>
 #include <gr_flat_flowgraph.h>
-#include <gr_scheduler_thread.h>
-#include <gr_local_sighandler.h>
+#include <gr_scheduler_sts.h>
+#include <gr_scheduler_tpb.h>
 
 #include <stdexcept>
 #include <iostream>
 #include <string.h>
 #include <unistd.h>
+#include <stdlib.h>
 
 #define GR_TOP_BLOCK_IMPL_DEBUG 0
 
+
+typedef gr_scheduler_sptr (*scheduler_maker)(gr_flat_flowgraph_sptr ffg);
+
+static struct scheduler_table {
+  const char          *name;
+  scheduler_maker      f;
+} scheduler_table[] = {
+  { "TPB",     gr_scheduler_tpb::make },       // first entry is default
+  { "STS",     gr_scheduler_sts::make }
+};
+
+static gr_scheduler_sptr
+make_scheduler(gr_flat_flowgraph_sptr ffg)
+{
+  static scheduler_maker  factory = 0;
+
+  if (factory == 0){
+    char *v = getenv("GR_SCHEDULER");
+    if (!v)
+      factory = scheduler_table[0].f;  // use default
+    else {
+      for (size_t i = 0; i < sizeof(scheduler_table)/sizeof(scheduler_table[0]); i++){
+       if (strcmp(v, scheduler_table[i].name) == 0){
+         factory = scheduler_table[i].f;
+         break;
+       }
+      }
+      if (factory == 0){
+       std::cerr << "warning: Invalid GR_SCHEDULER environment variable value \""
+                 << v << "\".  Using \"" << scheduler_table[0].name << "\"\n";
+       factory = scheduler_table[0].f;
+      }
+    }
+  }
+  return factory(ffg);
+}
+
+
 gr_top_block_impl::gr_top_block_impl(gr_top_block *owner) 
-  : d_owner(owner),
-    d_running(false),
-    d_ffg(),
-    d_lock_count(0)
+  : d_owner(owner), d_ffg(),
+    d_state(IDLE), d_lock_count(0)
 {
 }
 
@@ -53,14 +90,13 @@ gr_top_block_impl::~gr_top_block_impl()
 void
 gr_top_block_impl::start()
 {
-  if (GR_TOP_BLOCK_IMPL_DEBUG)
-    std::cout << "start: entered " << this << std::endl;
+  gr_lock_guard        l(d_mutex);
 
-  if (d_running)
+  if (d_state != IDLE)
     throw std::runtime_error("top_block::start: top block already running or wait() not called after previous stop()");
 
   if (d_lock_count > 0)
-    throw std::runtime_error("top_block::start: can't call start with flow graph locked");
+    throw std::runtime_error("top_block::start: can't start with flow graph locked");
 
   // Create new flat flow graph by flattening hierarchy
   d_ffg = d_owner->flatten();
@@ -69,77 +105,71 @@ gr_top_block_impl::start()
   d_ffg->validate();
   d_ffg->setup_connections();
 
-  // Execute scheduler threads
-  start_threads();
-  d_running = true;
+  d_scheduler = make_scheduler(d_ffg);
+  d_state = RUNNING;
 }
 
+void 
+gr_top_block_impl::stop()
+{
+  if (d_scheduler)
+    d_scheduler->stop();
+}
+
+
+void
+gr_top_block_impl::wait()
+{
+  if (d_scheduler)
+    d_scheduler->wait();
+
+  d_state = IDLE;
+}
 
 // N.B. lock() and unlock() cannot be called from a flow graph thread or
 // deadlock will occur when reconfiguration happens
 void
 gr_top_block_impl::lock()
 {
-  omni_mutex_lock lock(d_reconf);
+  gr_lock_guard lock(d_mutex);
   d_lock_count++;
-  if (GR_TOP_BLOCK_IMPL_DEBUG)
-    std::cout << "runtime: locked, count = " << d_lock_count <<  std::endl;
 }
 
 void
 gr_top_block_impl::unlock()
 {
-  omni_mutex_lock lock(d_reconf);
+  gr_lock_guard lock(d_mutex);
+
   if (d_lock_count <= 0){
     d_lock_count = 0;          // fix it, then complain
     throw std::runtime_error("unpaired unlock() call");
   }
 
   d_lock_count--;
-  if (GR_TOP_BLOCK_IMPL_DEBUG)
-    std::cout << "unlock: unlocked, count = " << d_lock_count << std::endl;
+  if (d_lock_count > 0 || d_state == IDLE) // nothing to do
+    return;
 
-  if (d_lock_count == 0) {
-    if (GR_TOP_BLOCK_IMPL_DEBUG)
-      std::cout << "unlock: restarting flowgraph" << std::endl;
-    restart();
-  }
+  restart();
 }
 
+/*
+ * restart is called with d_mutex held
+ */
 void
 gr_top_block_impl::restart()
 {
-  if (GR_TOP_BLOCK_IMPL_DEBUG)
-    std::cout << "restart: entered" << std::endl;
-
-  if (!d_running)
-    return;            // nothing to do
-
-  // Stop scheduler threads and wait for completion
-  stop();
+  stop();                   // Stop scheduler and wait for completion
   wait();
-  if (GR_TOP_BLOCK_IMPL_DEBUG)
-    std::cout << "restart: threads stopped" << std::endl;
 
   // Create new simple flow graph
   gr_flat_flowgraph_sptr new_ffg = d_owner->flatten();        
   new_ffg->validate();                // check consistency, sanity, etc
-
-  if (GR_TOP_BLOCK_IMPL_DEBUG) {
-      std::cout << std::endl << "*** Existing flat flowgraph @" << d_ffg << ":" << std::endl;
-      d_ffg->dump();
-  }
   new_ffg->merge_connections(d_ffg);   // reuse buffers, etc
-
-  if (GR_TOP_BLOCK_IMPL_DEBUG) {
-    std::cout << std::endl << "*** New flat flowgraph after merge @" << new_ffg << ":" << std::endl;
-    new_ffg->dump();
-  }
-  
   d_ffg = new_ffg;
 
-  start_threads();
-  d_running = true;
+  // Create a new scheduler to execute it
+  d_scheduler = make_scheduler(d_ffg);
+  d_state = RUNNING;
 }
 
 void
@@ -148,14 +178,3 @@ gr_top_block_impl::dump()
   if (d_ffg)
     d_ffg->dump();
 }
-
-gr_block_vector_t
-gr_top_block_impl::make_gr_block_vector(gr_basic_block_vector_t blocks)
-{
-  gr_block_vector_t result;
-  for (gr_basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) {
-    result.push_back(make_gr_block_sptr(*p));
-  }
-
-  return result;
-}