Merge r6160:6168 from jcorgan/fg into trunk.
[debian/gnuradio] / gnuradio-core / src / lib / runtime / gr_runtime_impl.cc
1 /* -*- c++ -*- */
2 /*
3  * Copyright 2006,2007 Free Software Foundation, Inc.
4  *
5  * This file is part of GNU Radio
6  *
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3, or (at your option)
10  * any later version.
11  *
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with GNU Radio; see the file COPYING.  If not, write to
19  * the Free Software Foundation, Inc., 51 Franklin Street,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26
27 #include <gr_runtime.h>
28 #include <gr_runtime_impl.h>
29 #include <gr_flat_flowgraph.h>
30 #include <gr_hier_block2.h>
31 #include <gr_hier_block2_detail.h>
32 #include <gr_local_sighandler.h>
33
34 #ifdef HAVE_SIGNAL_H
35 #include <signal.h>
36 #endif
37
38 #include <stdexcept>
39 #include <iostream>
40
41 #define GR_RUNTIME_IMPL_DEBUG 0
42
43 static gr_runtime_impl *s_runtime = 0;
44
45 // Make a vector of gr_block from a vector of gr_basic_block
46 static
47 gr_block_vector_t
48 make_gr_block_vector(gr_basic_block_vector_t &blocks)
49 {
50   gr_block_vector_t result;
51   for (gr_basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) {
52     result.push_back(make_gr_block_sptr(*p));
53   }
54
55   return result;
56 }
57
58 // FIXME: This prevents using more than one gr_runtime instance
59 void 
60 runtime_sigint_handler(int signum)
61 {
62   if (GR_RUNTIME_IMPL_DEBUG)
63     std::cout << "SIGINT received, calling stop() on all threads" << std::endl;
64
65   if (s_runtime)
66     s_runtime->stop();
67 }
68
69 gr_runtime_impl::gr_runtime_impl(gr_hier_block2_sptr top_block, gr_runtime *owner) 
70   : d_running(false),
71     d_top_block(top_block),
72     d_ffg(gr_make_flat_flowgraph()),
73     d_owner(owner)
74 {
75   s_runtime = this;
76   top_block->set_runtime(d_owner);
77 }
78
79 gr_runtime_impl::~gr_runtime_impl()
80 {
81   s_runtime = 0; // don't call delete we don't own these
82   d_owner = 0;
83 }
84
85 void
86 gr_runtime_impl::start()
87 {
88   if (GR_RUNTIME_IMPL_DEBUG)
89     std::cout << "start: entered" << std::endl;
90
91   if (d_running)
92     throw std::runtime_error("already running");
93
94   // Create new simple flow graph by flattening hierarchical block
95   d_ffg->clear();
96   d_top_block->d_detail->flatten(d_ffg);
97
98   // Validate new simple flow graph and wire it up
99   d_ffg->validate();
100   d_ffg->setup_connections();
101
102   // Execute scheduler threads
103   start_threads();
104 }
105
106 void
107 gr_runtime_impl::start_threads()
108 {
109   if (GR_RUNTIME_IMPL_DEBUG)
110     std::cout << "start_threads: entered" << std::endl;
111
112   d_graphs = d_ffg->partition();
113   for (std::vector<gr_basic_block_vector_t>::iterator p = d_graphs.begin();
114        p != d_graphs.end(); p++) {
115     gr_scheduler_thread *thread = new gr_scheduler_thread(make_gr_block_vector(*p));
116     d_threads.push_back(thread);
117     if (GR_RUNTIME_IMPL_DEBUG)
118       std::cout << "start_threads: starting " << thread << std::endl;
119     thread->start();
120   }
121
122   d_running = true;
123 }
124
125 void
126 gr_runtime_impl::stop()
127 {
128   if (GR_RUNTIME_IMPL_DEBUG)
129     std::cout << "stop: entered" << std::endl;
130
131   for (gr_scheduler_thread_viter_t p = d_threads.begin(); p != d_threads.end(); p++) {
132     if (GR_RUNTIME_IMPL_DEBUG)
133       std::cout << "stop: stopping thread " << (*p) << std::endl;
134     (*p)->stop();
135   }
136
137   d_running = false;
138 }
139
140 void
141 gr_runtime_impl::wait()
142 {
143   if (GR_RUNTIME_IMPL_DEBUG)
144     std::cout << "wait: entered" << std::endl;
145
146   void *dummy_status; // don't ever dereference this
147   gr_local_sighandler sigint(SIGINT, runtime_sigint_handler);
148
149   for (gr_scheduler_thread_viter_t p = d_threads.begin(); p != d_threads.end(); p++) {
150     if (GR_RUNTIME_IMPL_DEBUG)
151       std::cout << "wait: joining thread " << (*p) << std::endl;
152     (*p)->join(&dummy_status); // pthreads will self-delete, so pointer is now dead
153     (*p) = 0; // FIXME: switch to stl::list and actually remove from container
154     if (GR_RUNTIME_IMPL_DEBUG)
155       std::cout << "wait: join returned" << std::endl;
156   }
157
158   d_threads.clear();
159 }
160
161
162 // N.B. lock() and unlock() cannot be called from a flow graph thread or
163 // deadlock will occur when reconfiguration happens
164 void
165 gr_runtime_impl::lock()
166 {
167   omni_mutex_lock lock(d_reconf);
168   d_lock_count++;
169   if (GR_RUNTIME_IMPL_DEBUG)
170     std::cout << "runtime: locked, count = " << d_lock_count <<  std::endl;
171 }
172
173 void
174 gr_runtime_impl::unlock()
175 {
176   omni_mutex_lock lock(d_reconf);
177   if (d_lock_count == 0)
178     throw std::runtime_error("unpaired unlock() call");
179
180   d_lock_count--;
181   if (GR_RUNTIME_IMPL_DEBUG)
182     std::cout << "runtime: unlocked, count = " << d_lock_count << std::endl;
183
184   if (d_lock_count == 0)
185     restart();
186 }
187
188 void
189 gr_runtime_impl::restart()
190 {
191   if (GR_RUNTIME_IMPL_DEBUG)
192     std::cout << "restart: entered" << std::endl;
193
194   if (!d_running)
195     throw std::runtime_error("not running");
196
197   // Stop scheduler threads and wait for completion
198   stop();
199   wait();
200   if (GR_RUNTIME_IMPL_DEBUG)
201     std::cout << "restart: threads stopped" << std::endl;
202
203   // Create new simple flow graph 
204   gr_flat_flowgraph_sptr new_ffg = gr_make_flat_flowgraph();
205   d_top_block->d_detail->flatten(new_ffg);
206   new_ffg->validate();
207   new_ffg->merge_connections(d_ffg);
208
209   if (GR_RUNTIME_IMPL_DEBUG)
210     std::cout << "restart: replacing old flow graph with new" << std::endl;
211   d_ffg = new_ffg;
212
213   start_threads();
214 }
215
216 gr_scheduler_thread::gr_scheduler_thread(gr_block_vector_t graph) :
217   omni_thread(NULL, PRIORITY_NORMAL),
218   d_sts(gr_make_single_threaded_scheduler(graph))
219 {
220 }
221
222 gr_scheduler_thread::~gr_scheduler_thread()
223 {
224 }
225
226 void gr_scheduler_thread::start()
227 {
228   start_undetached();
229 }
230
231 void *
232 gr_scheduler_thread::run_undetached(void *arg)
233 {
234   // First code to run in new thread context
235
236   // Mask off SIGINT in this thread to gaurantee mainline thread gets signal
237 #ifdef HAVE_SIGPROCMASK
238   sigset_t old_set;
239   sigset_t new_set;
240   sigemptyset(&new_set);
241   sigaddset(&new_set, SIGINT);
242   sigprocmask(SIG_BLOCK, &new_set, &old_set);
243 #endif
244   // Run the single-threaded scheduler
245   d_sts->run();
246   return 0;
247 }
248
249 void
250 gr_scheduler_thread::stop()
251 {
252   d_sts->stop();
253 }