Merged r5230:5237 from jcorgan/disc2. Trunk passes distcheck.
[debian/gnuradio] / gnuradio-core / src / lib / runtime / gr_runtime_impl.cc
1 /* -*- c++ -*- */
2 /*
3  * Copyright 2006,2007 Free Software Foundation, Inc.
4  *
5  * This file is part of GNU Radio
6  *
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 2, or (at your option)
10  * any later version.
11  *
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with GNU Radio; see the file COPYING.  If not, write to
19  * the Free Software Foundation, Inc., 51 Franklin Street,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26
27 #include <gr_runtime.h>
28 #include <gr_runtime_impl.h>
29 #include <gr_simple_flowgraph.h>
30 #include <gr_hier_block2.h>
31 #include <gr_hier_block2_detail.h>
32 #include <gr_local_sighandler.h>
33
34 #ifdef HAVE_SIGNAL_H
35 #include <signal.h>
36 #endif
37
38 #include <stdexcept>
39 #include <iostream>
40
41 #define GR_RUNTIME_IMPL_DEBUG 1
42
43 static gr_runtime_impl *s_runtime = 0;
44
45 // FIXME: This prevents using more than one gr_runtime instance
46 void 
47 runtime_sigint_handler(int signum)
48 {
49   if (GR_RUNTIME_IMPL_DEBUG)
50     std::cout << "SIGINT received, calling stop() on all threads" << std::endl;
51
52   if (s_runtime)
53     s_runtime->stop();
54 }
55
56 gr_runtime_impl::gr_runtime_impl(gr_hier_block2_sptr top_block) 
57   : d_running(false),
58     d_top_block(top_block),
59     d_sfg(gr_make_simple_flowgraph())
60 {
61   s_runtime = this;
62 }
63
64 gr_runtime_impl::~gr_runtime_impl()
65 {
66   s_runtime = 0; // we don't own this
67 }
68
69 void
70 gr_runtime_impl::start()
71 {
72   if (GR_RUNTIME_IMPL_DEBUG)
73     std::cout << "start: entered" << std::endl;
74
75   if (d_running)
76     throw std::runtime_error("already running");
77
78   // Create new simple flow graph by flattening hierarchical block
79   d_sfg->d_detail->reset();
80   d_top_block->d_detail->flatten(d_sfg);
81
82   // Validate new simple flow graph and wire it up
83   d_sfg->d_detail->validate();
84   d_sfg->d_detail->setup_connections();
85
86   // Execute scheduler threads
87   start_threads();
88 }
89
90 void
91 gr_runtime_impl::start_threads()
92 {
93   if (GR_RUNTIME_IMPL_DEBUG)
94     std::cout << "start_threads: entered" << std::endl;
95
96   d_graphs = d_sfg->d_detail->partition();
97   for (std::vector<gr_block_vector_t>::iterator p = d_graphs.begin();
98        p != d_graphs.end(); p++) {
99     gr_scheduler_thread *thread = new gr_scheduler_thread(*p);
100     d_threads.push_back(thread);
101     if (GR_RUNTIME_IMPL_DEBUG)
102       std::cout << "start_threads: starting " << thread << std::endl;
103     thread->start();
104   }
105
106   d_running = true;
107 }
108
109 void
110 gr_runtime_impl::stop()
111 {
112   if (GR_RUNTIME_IMPL_DEBUG)
113     std::cout << "stop: entered" << std::endl;
114
115   for (gr_scheduler_thread_viter_t p = d_threads.begin(); p != d_threads.end(); p++) {
116     if (GR_RUNTIME_IMPL_DEBUG)
117       std::cout << "stop: stopping thread " << (*p) << std::endl;
118     (*p)->stop();
119   }
120
121   d_running = false;
122 }
123
124 void
125 gr_runtime_impl::wait()
126 {
127   if (GR_RUNTIME_IMPL_DEBUG)
128     std::cout << "wait: entered" << std::endl;
129
130   void *dummy_status; // don't ever dereference this
131   gr_local_sighandler sigint(SIGINT, runtime_sigint_handler);
132
133   for (gr_scheduler_thread_viter_t p = d_threads.begin(); p != d_threads.end(); p++) {
134     if (GR_RUNTIME_IMPL_DEBUG)
135       std::cout << "wait: joining thread " << (*p) << std::endl;
136     (*p)->join(&dummy_status); // pthreads will self-delete, so pointer is now dead
137     (*p) = 0; // FIXME: switch to stl::list and actually remove from container
138     if (GR_RUNTIME_IMPL_DEBUG)
139       std::cout << "wait: join returned" << std::endl;
140   }
141
142   d_threads.clear();
143 }
144
145 void
146 gr_runtime_impl::restart()
147 {
148   if (GR_RUNTIME_IMPL_DEBUG)
149     std::cout << "restart: entered" << std::endl;
150
151   if (!d_running)
152     throw std::runtime_error("not running");
153
154   // Stop scheduler threads and wait for completion
155   stop();
156   wait();
157   if (GR_RUNTIME_IMPL_DEBUG)
158     std::cout << "restart: threads stopped" << std::endl;
159
160   // Create new simple flow graph 
161   gr_simple_flowgraph_sptr new_sfg = gr_make_simple_flowgraph();
162   d_top_block->d_detail->flatten(new_sfg);
163   new_sfg->validate();
164   new_sfg->d_detail->merge_connections(d_sfg);
165
166   if (GR_RUNTIME_IMPL_DEBUG)
167     std::cout << "restart: replacing old flow graph with new" << std::endl;
168   d_sfg = new_sfg;
169
170   start_threads();
171 }
172
173 gr_scheduler_thread::gr_scheduler_thread(gr_block_vector_t graph) :
174   omni_thread(NULL, PRIORITY_NORMAL),
175   d_sts(gr_make_single_threaded_scheduler(graph))
176 {
177 }
178
179 gr_scheduler_thread::~gr_scheduler_thread()
180 {
181 }
182
183 void gr_scheduler_thread::start()
184 {
185   start_undetached();
186 }
187
188 void *
189 gr_scheduler_thread::run_undetached(void *arg)
190 {
191   // First code to run in new thread context
192
193   // Mask off SIGINT in this thread to gaurantee mainline thread gets signal
194 #ifdef HAVE_SIGPROCMASK
195   sigset_t old_set;
196   sigset_t new_set;
197   sigemptyset(&new_set);
198   sigaddset(&new_set, SIGINT);
199   sigprocmask(SIG_BLOCK, &new_set, &old_set);
200 #endif
201   // Run the single-threaded scheduler
202   d_sts->run();
203   return 0;
204 }
205
206 void
207 gr_scheduler_thread::stop()
208 {
209   d_sts->stop();
210 }
211