3 * Copyright 2007,2008,2009 Free Software Foundation, Inc.
5 * This file is part of GNU Radio
7 * GNU Radio is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 3, or (at your option)
12 * GNU Radio is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License along
18 * with this program; if not, write to the Free Software Foundation, Inc.,
19 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
25 #include <mb_runtime_thread_per_block.h>
26 #include <mblock/mblock.h>
27 #include <mb_mblock_impl.h>
28 #include <mblock/class_registry.h>
29 #include <mblock/exception.h>
30 #include <mb_worker.h>
31 #include <gnuradio/omnithread.h>
33 #include <mb_msg_accepter_msgq.h>
// Interned PMT symbols shared by the functions in this file.
//  - s_sys_port is the port id stamped on messages built by send_sys_msg().
//  - s_shutdown and s_halt are the signals broadcast to every worker via
//    send_all_sys_msg() during the shutdown sequence.
//  - s_request_shutdown, s_worker_state_changed, s_request_timeout,
//    s_cancel_timeout and s_timeout are the signals run_loop() dispatches on.
//  - s_send_halt and s_exit_now are not signals: they travel as the data of
//    the runtime's own %timeout messages and select the next shutdown step.
36 static gruel::pmt_t s_halt = gruel::pmt_intern("%halt");
37 static gruel::pmt_t s_sys_port = gruel::pmt_intern("%sys-port");
38 static gruel::pmt_t s_shutdown = gruel::pmt_intern("%shutdown");
39 static gruel::pmt_t s_request_shutdown = gruel::pmt_intern("%request-shutdown");
40 static gruel::pmt_t s_worker_state_changed = gruel::pmt_intern("%worker-state-changed");
41 static gruel::pmt_t s_timeout = gruel::pmt_intern("%timeout");
42 static gruel::pmt_t s_request_timeout = gruel::pmt_intern("%request-timeout");
43 static gruel::pmt_t s_cancel_timeout = gruel::pmt_intern("%cancel-timeout");
44 static gruel::pmt_t s_send_halt = gruel::pmt_intern("send-halt");
45 static gruel::pmt_t s_exit_now = gruel::pmt_intern("exit-now");
// Build a system message for the given message queue.
//
// msgq      queue the message is intended for
// signal    message signal
// data      message payload (defaults to gruel::PMT_F)
// metadata  message metadata (defaults to gruel::PMT_F)
// priority  message priority (defaults to MB_PRI_BEST)
//
// The message is created with mb_make_message() and its port id is set to
// the %sys-port symbol.
// NOTE(review): the statement that hands the message to msgq is not visible
// in this excerpt -- presumably it is inserted into msgq; confirm against
// the full file.
48 send_sys_msg(mb_msg_queue &msgq, gruel::pmt_t signal,
49 gruel::pmt_t data = gruel::PMT_F, gruel::pmt_t metadata = gruel::PMT_F,
50 mb_pri_t priority = MB_PRI_BEST)
52 mb_message_sptr msg = mb_make_message(signal, data, metadata, priority);
53 msg->set_port_id(s_sys_port);
// Construct the runtime with no shutdown in progress and a default shutdown
// result of gruel::PMT_T.
58 mb_runtime_thread_per_block::mb_runtime_thread_per_block()
59 : d_shutdown_in_progress(false),
60 d_shutdown_result(gruel::PMT_T)
// d_accepter wraps a pointer to the runtime's own message queue (d_msgq);
// the other methods in this file post their requests to the runtime
// through this accepter.
62 d_accepter = mb_msg_accepter_sptr(new mb_msg_accepter_msgq(&d_msgq));
// Destructor.  The visible code does not stop or join any workers; it only
// writes a diagnostic to std::cerr when d_workers is still non-empty.
65 mb_runtime_thread_per_block::~mb_runtime_thread_per_block()
67 // FIXME iterate over workers and ensure that they are dead.
// Report how many workers are still registered at destruction time.
69 if (!d_workers.empty())
70 std::cerr << "\nmb_runtime_thread_per_block: dtor (# workers = "
71 << d_workers.size() << ")\n";
// Ask the runtime to shut down.
//
// result  value carried as the data of the %request-shutdown message;
//         run_loop() stores it in d_shutdown_result when it handles the
//         first such request.
//
// This only posts a message through accepter() at MB_PRI_BEST; the
// shutdown itself is carried out in run_loop().
75 mb_runtime_thread_per_block::request_shutdown(gruel::pmt_t result)
77 (*accepter())(s_request_shutdown, result, gruel::PMT_F, MB_PRI_BEST);
// Create the top-level component and run the runtime.
//
// instance_name  instance name passed to create_component()
// class_name     class name passed to create_component()
// user_arg       user argument passed to create_component()
// result         optional out-parameter; if non-null it is set to
//                gruel::PMT_F on entry and to d_shutdown_result at the end
//
// NOTE(review): the statements between the create_component() call and the
// final assignment to *result are not visible in this excerpt -- presumably
// this is where run_loop() is invoked; confirm against the full file.
81 mb_runtime_thread_per_block::run(const std::string &instance_name,
82 const std::string &class_name,
83 gruel::pmt_t user_arg, gruel::pmt_t *result)
85 if (result) // set it to something now, in case we throw
86 *result = gruel::PMT_F;
88 // reset the shutdown state
89 d_shutdown_in_progress = false;
90 d_shutdown_result = gruel::PMT_T;
// No workers may be registered when run() is entered.
92 assert(d_workers.empty());
// NOTE(review): the loop body is not visible here -- presumably it pops
// entries until the timer queue is empty.
94 while (!d_timer_queue.empty()) // ensure timer queue is empty
98 * Create the top-level component, and recursively all of its
101 d_top = create_component(instance_name, class_name, user_arg);
// Hand the recorded shutdown result back to the caller.
112 *result = d_shutdown_result;
// Runtime message loop: fetch the next message from d_msgq (honouring the
// earliest pending timeout, if any) and dispatch on its signal.
//
// Signals handled in the visible code:
//   %worker-state-changed  inspect d_workers under d_workers_mutex
//   %request-shutdown      start the shutdown sequence (first request only)
//   %request-timeout       add a timeout to d_timer_queue
//   %cancel-timeout        cancel a timeout in d_timer_queue
//   %timeout + send-halt   second shutdown step: send %halt to all workers
//   %timeout + exit-now    final shutdown step
// Anything else is reported on std::cerr as unhandled.
//
// NOTE(review): the enclosing loop, braces and several branch actions
// (e.g. what happens when d_workers is empty, and in the exit-now branch)
// are not visible in this excerpt; confirm against the full file.
119 mb_runtime_thread_per_block::run_loop()
124 if (d_timer_queue.empty()) // Any timeouts pending?
125 msg = d_msgq.get_highest_pri_msg(); // Nope. Block forever.
128 mb_timeout_sptr to = d_timer_queue.top(); // Yep. Get earliest timeout.
130 // wait for a msg or the timeout...
131 msg = d_msgq.get_highest_pri_msg_timedwait(to->d_when);
133 if (!msg){ // We timed out.
134 d_timer_queue.pop(); // Remove timeout from timer queue.
136 // send the %timeout msg
// The timeout's handle is passed as the metadata of the %timeout message.
137 (*to->d_accepter)(s_timeout, to->d_user_data, to->handle(), MB_PRI_BEST);
// Periodic timeouts are re-armed by advancing d_when by d_delta.
139 if (to->d_is_periodic){
140 to->d_when = to->d_when + to->d_delta; // update time of next firing
141 d_timer_queue.push(to); // push it back into the queue
// A message was received: dispatch on its signal.
147 gruel::pmt_t signal = msg->signal();
149 if (gruel::pmt_eq(signal, s_worker_state_changed)){ // %worker-state-changed
150 omni_mutex_lock l1(d_workers_mutex);
// NOTE(review): the statement between taking the lock and this test is not
// visible -- presumably reap_dead_workers() is called there, which expects
// d_workers_mutex to be held.
152 if (d_workers.empty()) // no work left to do...
155 else if (gruel::pmt_eq(signal, s_request_shutdown)){ // %request-shutdown
// Only the first shutdown request is acted on; its data becomes the result.
156 if (!d_shutdown_in_progress){
157 d_shutdown_in_progress = true;
158 d_shutdown_result = msg->data();
160 // schedule a timeout for ourselves...
// The timeout is delivered back to the runtime (d_accepter) with send-halt
// as its data.  NOTE(review): mb_time::time(0.100) presumably means 100 ms
// from now -- confirm.
161 schedule_one_shot_timeout(mb_time::time(0.100), s_send_halt, d_accepter);
162 send_all_sys_msg(s_shutdown);
165 else if (gruel::pmt_eq(signal, s_request_timeout)){ // %request-timeout
// The message data is a pmt "any" holding an mb_timeout_sptr (see the
// schedule_*_timeout methods); unwrap it and queue it.
167 boost::any_cast<mb_timeout_sptr>(gruel::pmt_any_ref(msg->data()));
168 d_timer_queue.push(to);
170 else if (gruel::pmt_eq(signal, s_cancel_timeout)){ // %cancel-timeout
// The message data is the handle passed to cancel_timeout().
171 d_timer_queue.cancel(msg->data());
173 else if (gruel::pmt_eq(signal, s_timeout)
174 && gruel::pmt_eq(msg->data(), s_send_halt)){ // %timeout, send-halt
176 // schedule another timeout for ourselves...
177 schedule_one_shot_timeout(mb_time::time(0.100), s_exit_now, d_accepter);
178 send_all_sys_msg(s_halt);
180 else if (gruel::pmt_eq(signal, s_timeout)
181 && gruel::pmt_eq(msg->data(), s_exit_now)){ // %timeout, exit-now
183 // We only get here if we've sent all workers %shutdown followed
184 // by %halt, and one or more of them is still alive. They must
185 // be blocked in the kernel. FIXME We could add one more step:
186 // pthread_kill(...) but for now, we'll just ignore them...
// Fallback for any message not matched above.
190 std::cerr << "mb_runtime_thread_per_block: unhandled msg: " << msg << std::endl;
// Walk d_workers and, for each worker whose state is TS_DEAD, join it and
// remove it from d_workers.
//
// Per the comment below, the caller is expected to already hold
// d_workers_mutex; this function does not take it.
// NOTE(review): the iterator advance for workers that are still alive is
// not visible in this excerpt; confirm against the full file.
196 mb_runtime_thread_per_block::reap_dead_workers()
198 // Already holding mutex
199 // omni_mutex_lock l1(d_workers_mutex);
201 for (worker_iter_t wi = d_workers.begin(); wi != d_workers.end(); ){
204 // We can't join while holding the worker mutex, since that would
205 // attempt to destroy the mutex we're holding (omnithread's join
206 // deletes the omni_thread object after the pthread_join
207 // completes) Instead, we lock just long enough to figure out if
208 // the worker is dead.
// Sample the worker's state under its own d_mutex.
210 omni_mutex_lock l2((*wi)->d_mutex);
211 is_dead = (*wi)->d_state == mb_worker::TS_DEAD;
// Diagnostic output naming the dead worker.
// NOTE(review): any guard around this output is not visible here.
216 std::cerr << "\nruntime: "
217 << "(" << (*wi)->id() << ") "
218 << (*wi)->d_mblock->instance_name() << " is TS_DEAD\n";
// erase() returns the iterator to the next element, so the loop header
// carries no increment.
220 (*wi)->join(&ignore);
221 wi = d_workers.erase(wi);
// Create a component of class class_name, running in its own worker thread.
//
// instance_name  name for the new instance; also used in error messages
// class_name     looked up in mb_class_registry to obtain the maker
// user_arg       passed to the mb_worker constructor
//
// Throws mbe_no_such_class if class_name has no registered maker, and
// mbe_mblock_failed if the worker reaches TS_DEAD during start-up for any
// reason other than mb_worker::RIP_EXIT.
// NOTE(review): the declaration of w, the cleanup performed before the
// mbe_mblock_failed throw, and the return statement are not visible in
// this excerpt; confirm against the full file.
229 // Create the thread, then create the component in the thread.
230 // Return a pointer to the created mblock.
232 // Can be invoked from any thread
235 mb_runtime_thread_per_block::create_component(const std::string &instance_name,
236 const std::string &class_name,
237 gruel::pmt_t user_arg)
239 mb_mblock_maker_t maker;
240 if (!mb_class_registry::lookup_maker(class_name, &maker))
241 throw mbe_no_such_class(0, class_name + " (in " + instance_name + ")");
243 // FIXME here's where we'd lookup NUMA placement requests & mblock
244 // priorities and communicate them to the worker we're creating...
246 // Create the worker thread
248 new mb_worker(this, maker, instance_name, user_arg);
250 w->start_undetached(); // start it
252 // Wait for it to reach TS_RUNNING or TS_DEAD
255 mb_worker::cause_of_death_t why_dead;
// Hold the worker's d_mutex while waiting on its state condition, then
// snapshot the state and cause of death under that lock.
257 omni_mutex_lock l(w->d_mutex);
258 while (!(w->d_state == mb_worker::TS_RUNNING
259 || w->d_state == mb_worker::TS_DEAD))
260 w->d_state_cond.wait();
262 is_dead = w->d_state == mb_worker::TS_DEAD;
263 why_dead = w->d_why_dead;
266 // If the worker failed to init (constructor or initial_transition
267 // raised an exception), reap the worker now and raise an exception.
// A worker that is already dead with RIP_EXIT is not treated as a failure.
269 if (is_dead && why_dead != mb_worker::RIP_EXIT){
274 // FIXME with some work we ought to be able to propagate the
275 // exception from the worker.
276 throw mbe_mblock_failed(0, instance_name);
281 // Add w to the vector of workers, and return the mblock.
// d_workers is only modified while holding d_workers_mutex.
283 omni_mutex_lock l(d_workers_mutex);
284 d_workers.push_back(w);
// Diagnostic output naming the newly created worker.
// NOTE(review): any guard around this output is not visible here.
288 std::cerr << "\nruntime: created "
289 << "(" << w->id() << ") "
290 << w->d_mblock->instance_name() << "\n";
// Send the same system message to every worker currently in d_workers.
//
// signal, metadata  forwarded unchanged to send_sys_msg()
//
// The body also forwards `data` and `priority`.
// NOTE(review): those two parameters are not visible in this excerpt's
// signature -- confirm their declarations against the full file.
296 mb_runtime_thread_per_block::send_all_sys_msg(gruel::pmt_t signal,
298 gruel::pmt_t metadata,
// Hold d_workers_mutex for the whole iteration over d_workers.
301 omni_mutex_lock l1(d_workers_mutex);
303 for (worker_iter_t wi = d_workers.begin(); wi != d_workers.end(); ++wi){
// Target the message queue of each worker's mblock implementation.
304 send_sys_msg((*wi)->d_mblock->impl()->msgq(),
305 signal, data, metadata, priority);
// Request a one-shot timeout.
//
// abs_time   time passed to the mb_timeout constructor
// user_data  delivered as the data of the resulting %timeout message
// accepter   accepter the %timeout message is delivered to
//
// The timeout object is wrapped with gruel::pmt_make_any() and posted to
// the runtime as a %request-timeout message; run_loop() unwraps it and
// pushes it onto d_timer_queue.
// NOTE(review): the return statement is not visible in this excerpt --
// presumably a handle usable with cancel_timeout(); confirm.
310 // Can be invoked from any thread.
311 // Sends a message to the runtime.
314 mb_runtime_thread_per_block::schedule_one_shot_timeout
315 (const mb_time &abs_time,
316 gruel::pmt_t user_data,
317 mb_msg_accepter_sptr accepter)
319 mb_timeout_sptr to(new mb_timeout(abs_time, user_data, accepter));
320 (*d_accepter)(s_request_timeout, gruel::pmt_make_any(to), gruel::PMT_F, MB_PRI_BEST);
// Request a periodic timeout.
//
// first_abs_time  time of the first firing, passed to the mb_timeout ctor
// delta_time      interval passed to the mb_timeout ctor; run_loop() adds
//                 the timeout's d_delta to d_when after each firing of a
//                 periodic timeout
// user_data       delivered as the data of each %timeout message
// accepter        accepter the %timeout messages are delivered to
//
// The timeout object is wrapped with gruel::pmt_make_any() and posted to
// the runtime as a %request-timeout message.
// NOTE(review): the return statement is not visible in this excerpt --
// presumably a handle usable with cancel_timeout(); confirm.
325 // Can be invoked from any thread.
326 // Sends a message to the runtime.
329 mb_runtime_thread_per_block::schedule_periodic_timeout
330 (const mb_time &first_abs_time,
331 const mb_time &delta_time,
332 gruel::pmt_t user_data,
333 mb_msg_accepter_sptr accepter)
335 mb_timeout_sptr to(new mb_timeout(first_abs_time, delta_time,
336 user_data, accepter));
337 (*d_accepter)(s_request_timeout, gruel::pmt_make_any(to), gruel::PMT_F, MB_PRI_BEST);
// Request cancellation of a previously scheduled timeout.
//
// handle  sent as the data of a %cancel-timeout message; run_loop() passes
//         it to d_timer_queue.cancel()
//
// The cancellation is carried out when run_loop() handles the message, not
// inside this call.
342 // Can be invoked from any thread.
343 // Sends a message to the runtime.
346 mb_runtime_thread_per_block::cancel_timeout(gruel::pmt_t handle)
348 (*d_accepter)(s_cancel_timeout, handle, gruel::PMT_F, MB_PRI_BEST);