#include <gr_top_block.h>
#include <gr_top_block_impl.h>
#include <gr_flat_flowgraph.h>
-#include <gr_scheduler_thread.h>
-#include <gr_local_sighandler.h>
+#include <gr_scheduler_sts.h>
+#include <gr_scheduler_tpb.h>
#include <stdexcept>
#include <iostream>
#include <string.h>
#include <unistd.h>
+#include <stdlib.h>
#define GR_TOP_BLOCK_IMPL_DEBUG 0
+
+typedef gr_scheduler_sptr (*scheduler_maker)(gr_flat_flowgraph_sptr ffg);
+
+static struct scheduler_table {
+ const char *name;
+ scheduler_maker f;
+} scheduler_table[] = {
+ { "TPB", gr_scheduler_tpb::make }, // first entry is default
+ { "STS", gr_scheduler_sts::make }
+};
+
+static gr_scheduler_sptr
+make_scheduler(gr_flat_flowgraph_sptr ffg)
+{
+ static scheduler_maker factory = 0;
+
+ if (factory == 0){
+ char *v = getenv("GR_SCHEDULER");
+ if (!v)
+ factory = scheduler_table[0].f; // use default
+ else {
+ for (size_t i = 0; i < sizeof(scheduler_table)/sizeof(scheduler_table[0]); i++){
+ if (strcmp(v, scheduler_table[i].name) == 0){
+ factory = scheduler_table[i].f;
+ break;
+ }
+ }
+ if (factory == 0){
+ std::cerr << "warning: Invalid GR_SCHEDULER environment variable value \""
+ << v << "\". Using \"" << scheduler_table[0].name << "\"\n";
+ factory = scheduler_table[0].f;
+ }
+ }
+ }
+ return factory(ffg);
+}
+
+
gr_top_block_impl::gr_top_block_impl(gr_top_block *owner)
- : d_owner(owner),
- d_running(false),
- d_ffg(),
- d_lock_count(0)
+ : d_owner(owner), d_ffg(),
+ d_state(IDLE), d_lock_count(0)
{
}
void
gr_top_block_impl::start()
{
- if (GR_TOP_BLOCK_IMPL_DEBUG)
- std::cout << "start: entered " << this << std::endl;
+ gr_lock_guard l(d_mutex);
- if (d_running)
+ if (d_state != IDLE)
throw std::runtime_error("top_block::start: top block already running or wait() not called after previous stop()");
if (d_lock_count > 0)
- throw std::runtime_error("top_block::start: can't call start with flow graph locked");
+ throw std::runtime_error("top_block::start: can't start with flow graph locked");
// Create new flat flow graph by flattening hierarchy
d_ffg = d_owner->flatten();
d_ffg->validate();
d_ffg->setup_connections();
- // Execute scheduler threads
- start_threads();
- d_running = true;
+ d_scheduler = make_scheduler(d_ffg);
+ d_state = RUNNING;
}
+void
+gr_top_block_impl::stop()
+{
+ if (d_scheduler)
+ d_scheduler->stop();
+}
+
+
+void
+gr_top_block_impl::wait()
+{
+ if (d_scheduler)
+ d_scheduler->wait();
+
+ d_state = IDLE;
+}
// N.B. lock() and unlock() cannot be called from a flow graph thread or
// deadlock will occur when reconfiguration happens
void
gr_top_block_impl::lock()
{
- omni_mutex_lock lock(d_reconf);
+ gr_lock_guard lock(d_mutex);
d_lock_count++;
- if (GR_TOP_BLOCK_IMPL_DEBUG)
- std::cout << "runtime: locked, count = " << d_lock_count << std::endl;
}
void
gr_top_block_impl::unlock()
{
- omni_mutex_lock lock(d_reconf);
+ gr_lock_guard lock(d_mutex);
+
if (d_lock_count <= 0){
d_lock_count = 0; // fix it, then complain
throw std::runtime_error("unpaired unlock() call");
}
d_lock_count--;
- if (GR_TOP_BLOCK_IMPL_DEBUG)
- std::cout << "unlock: unlocked, count = " << d_lock_count << std::endl;
+ if (d_lock_count > 0 || d_state == IDLE) // nothing to do
+ return;
- if (d_lock_count == 0) {
- if (GR_TOP_BLOCK_IMPL_DEBUG)
- std::cout << "unlock: restarting flowgraph" << std::endl;
- restart();
- }
+ restart();
}
+/*
+ * restart is called with d_mutex held
+ */
void
gr_top_block_impl::restart()
{
- if (GR_TOP_BLOCK_IMPL_DEBUG)
- std::cout << "restart: entered" << std::endl;
-
- if (!d_running)
- return; // nothing to do
-
- // Stop scheduler threads and wait for completion
- stop();
+ stop(); // Stop scheduler and wait for completion
wait();
- if (GR_TOP_BLOCK_IMPL_DEBUG)
- std::cout << "restart: threads stopped" << std::endl;
// Create new simple flow graph
gr_flat_flowgraph_sptr new_ffg = d_owner->flatten();
new_ffg->validate(); // check consistency, sanity, etc
-
- if (GR_TOP_BLOCK_IMPL_DEBUG) {
- std::cout << std::endl << "*** Existing flat flowgraph @" << d_ffg << ":" << std::endl;
- d_ffg->dump();
- }
new_ffg->merge_connections(d_ffg); // reuse buffers, etc
-
- if (GR_TOP_BLOCK_IMPL_DEBUG) {
- std::cout << std::endl << "*** New flat flowgraph after merge @" << new_ffg << ":" << std::endl;
- new_ffg->dump();
- }
-
d_ffg = new_ffg;
- start_threads();
- d_running = true;
+ // Create a new scheduler to execute it
+ d_scheduler = make_scheduler(d_ffg);
+ d_state = RUNNING;
}
void
if (d_ffg)
d_ffg->dump();
}
-
-gr_block_vector_t
-gr_top_block_impl::make_gr_block_vector(gr_basic_block_vector_t blocks)
-{
- gr_block_vector_t result;
- for (gr_basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) {
- result.push_back(make_gr_block_sptr(*p));
- }
-
- return result;
-}