Fixing up GUI windows to better scale with gtgui sinks.
[debian/gnuradio] / mblock / src / lib / mb_runtime_thread_per_block.cc
1 /* -*- c++ -*- */
2 /*
3  * Copyright 2007,2008,2009 Free Software Foundation, Inc.
4  * 
5  * This file is part of GNU Radio
6  * 
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3, or (at your option)
10  * any later version.
11  * 
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  * 
17  * You should have received a copy of the GNU General Public License along
18  * with this program; if not, write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20  */
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25 #include <mb_runtime_thread_per_block.h>
26 #include <mblock/mblock.h>
27 #include <mb_mblock_impl.h>
28 #include <mblock/class_registry.h>
29 #include <mblock/exception.h>
30 #include <mb_worker.h>
31 #include <gnuradio/omnithread.h>
32 #include <iostream>
33 #include <mb_msg_accepter_msgq.h>
34
35 using namespace pmt;
36
37 static pmt_t s_halt = pmt_intern("%halt");
38 static pmt_t s_sys_port = pmt_intern("%sys-port");
39 static pmt_t s_shutdown = pmt_intern("%shutdown");
40 static pmt_t s_request_shutdown = pmt_intern("%request-shutdown");
41 static pmt_t s_worker_state_changed = pmt_intern("%worker-state-changed");
42 static pmt_t s_timeout = pmt_intern("%timeout");
43 static pmt_t s_request_timeout = pmt_intern("%request-timeout");
44 static pmt_t s_cancel_timeout = pmt_intern("%cancel-timeout");
45 static pmt_t s_send_halt = pmt_intern("send-halt");
46 static pmt_t s_exit_now = pmt_intern("exit-now");
47
48 static void
49 send_sys_msg(mb_msg_queue &msgq, pmt_t signal,
50              pmt_t data = PMT_F, pmt_t metadata = PMT_F,
51              mb_pri_t priority = MB_PRI_BEST)
52 {
53   mb_message_sptr msg = mb_make_message(signal, data, metadata, priority);
54   msg->set_port_id(s_sys_port);
55   msgq.insert(msg);
56 }
57
58
59 mb_runtime_thread_per_block::mb_runtime_thread_per_block()
60   : d_shutdown_in_progress(false),
61     d_shutdown_result(PMT_T)
62 {
63   d_accepter = mb_msg_accepter_sptr(new mb_msg_accepter_msgq(&d_msgq));
64 }
65
66 mb_runtime_thread_per_block::~mb_runtime_thread_per_block()
67 {
68   // FIXME iterate over workers and ensure that they are dead.
69
70   if (!d_workers.empty())
71     std::cerr << "\nmb_runtime_thread_per_block: dtor (# workers = "
72               << d_workers.size() << ")\n";
73 }
74
75 void
76 mb_runtime_thread_per_block::request_shutdown(pmt_t result)
77 {
78   (*accepter())(s_request_shutdown, result, PMT_F, MB_PRI_BEST);
79 }
80
81 bool
82 mb_runtime_thread_per_block::run(const std::string &instance_name,
83                                  const std::string &class_name,
84                                  pmt_t user_arg, pmt_t *result)
85 {
86   if (result)           // set it to something now, in case we throw
87     *result = PMT_F;
88   
89   // reset the shutdown state
90   d_shutdown_in_progress = false;
91   d_shutdown_result = PMT_T;
92
93   assert(d_workers.empty());
94
95   while (!d_timer_queue.empty())        // ensure timer queue is empty
96     d_timer_queue.pop();
97
98   /*
99    * Create the top-level component, and recursively all of its
100    * subcomponents.
101    */
102   d_top = create_component(instance_name, class_name, user_arg);
103
104   try {
105     run_loop();
106   }
107   catch (...){
108     d_top.reset();
109     throw;
110   }
111
112   if (result)
113     *result = d_shutdown_result;
114   
115   d_top.reset();
116   return true;
117 }
118
119 void
120 mb_runtime_thread_per_block::run_loop()
121 {
122   while (1){
123     mb_message_sptr msg;
124
125     if (d_timer_queue.empty())                    // Any timeouts pending?
126       msg = d_msgq.get_highest_pri_msg();         // Nope.  Block forever.
127
128     else {
129       mb_timeout_sptr to = d_timer_queue.top();   // Yep.  Get earliest timeout.
130
131       // wait for a msg or the timeout...
132       msg = d_msgq.get_highest_pri_msg_timedwait(to->d_when);
133
134       if (!msg){                // We timed out.
135         d_timer_queue.pop();    // Remove timeout from timer queue.
136
137         // send the %timeout msg
138         (*to->d_accepter)(s_timeout, to->d_user_data, to->handle(), MB_PRI_BEST);
139
140         if (to->d_is_periodic){
141           to->d_when = to->d_when + to->d_delta;        // update time of next firing
142           d_timer_queue.push(to);                       // push it back into the queue
143         }
144         continue;
145       }
146     }
147
148     pmt_t signal = msg->signal();
149
150     if (pmt_eq(signal, s_worker_state_changed)){        // %worker-state-changed
151       omni_mutex_lock l1(d_workers_mutex);
152       reap_dead_workers();
153       if (d_workers.empty())    // no work left to do...
154         return;
155     }
156     else if (pmt_eq(signal, s_request_shutdown)){       // %request-shutdown
157       if (!d_shutdown_in_progress){
158         d_shutdown_in_progress = true;
159         d_shutdown_result = msg->data();
160
161         // schedule a timeout for ourselves...
162         schedule_one_shot_timeout(mb_time::time(0.100), s_send_halt, d_accepter);
163         send_all_sys_msg(s_shutdown);
164       }
165     }
166     else if (pmt_eq(signal, s_request_timeout)){        // %request-timeout
167       mb_timeout_sptr to =
168         boost::any_cast<mb_timeout_sptr>(pmt_any_ref(msg->data()));
169       d_timer_queue.push(to);
170     }
171     else if (pmt_eq(signal, s_cancel_timeout)){         // %cancel-timeout
172       d_timer_queue.cancel(msg->data());
173     }
174     else if (pmt_eq(signal, s_timeout)
175              && pmt_eq(msg->data(), s_send_halt)){      // %timeout, send-halt
176
177       // schedule another timeout for ourselves...
178       schedule_one_shot_timeout(mb_time::time(0.100), s_exit_now, d_accepter);
179       send_all_sys_msg(s_halt);
180     }
181     else if (pmt_eq(signal, s_timeout)
182              && pmt_eq(msg->data(), s_exit_now)){       // %timeout, exit-now
183
184       // We only get here if we've sent all workers %shutdown followed
185       // by %halt, and one or more of them is still alive.  They must
186       // be blocked in the kernel.  FIXME We could add one more step:
187       // pthread_kill(...) but for now, we'll just ignore them...
188       return;
189     }
190     else {
191       std::cerr << "mb_runtime_thread_per_block: unhandled msg: " << msg << std::endl;
192     }
193   }
194 }
195
196 void
197 mb_runtime_thread_per_block::reap_dead_workers()
198 {
199   // Already holding mutex
200   // omni_mutex_lock l1(d_workers_mutex);
201
202   for (worker_iter_t wi = d_workers.begin(); wi != d_workers.end(); ){
203     bool is_dead;
204
205     // We can't join while holding the worker mutex, since that would
206     // attempt to destroy the mutex we're holding (omnithread's join
207     // deletes the omni_thread object after the pthread_join
208     // completes) Instead, we lock just long enough to figure out if
209     // the worker is dead.
210     {
211       omni_mutex_lock l2((*wi)->d_mutex);
212       is_dead = (*wi)->d_state == mb_worker::TS_DEAD;
213     }
214
215     if (is_dead){
216       if (0)
217         std::cerr << "\nruntime: "
218                   << "(" << (*wi)->id() << ") "
219                   << (*wi)->d_mblock->instance_name() << " is TS_DEAD\n";
220       void *ignore;
221       (*wi)->join(&ignore);
222       wi = d_workers.erase(wi);
223       continue;
224     }
225     ++wi;
226   }
227 }
228
229 //
230 // Create the thread, then create the component in the thread.
231 // Return a pointer to the created mblock.
232 //
233 // Can be invoked from any thread
234 //
235 mb_mblock_sptr
236 mb_runtime_thread_per_block::create_component(const std::string &instance_name,
237                                               const std::string &class_name,
238                                               pmt_t user_arg)
239 {
240   mb_mblock_maker_t maker;
241   if (!mb_class_registry::lookup_maker(class_name, &maker))
242     throw mbe_no_such_class(0, class_name + " (in " + instance_name + ")");
243
244   // FIXME here's where we'd lookup NUMA placement requests & mblock
245   // priorities and communicate them to the worker we're creating...
246
247   // Create the worker thread
248   mb_worker *w =
249     new mb_worker(this, maker, instance_name, user_arg);
250
251   w->start_undetached();  // start it
252
253   // Wait for it to reach TS_RUNNING or TS_DEAD
254
255   bool                          is_dead;
256   mb_worker::cause_of_death_t   why_dead;
257   {
258     omni_mutex_lock l(w->d_mutex);
259     while (!(w->d_state == mb_worker::TS_RUNNING
260              || w->d_state == mb_worker::TS_DEAD))
261       w->d_state_cond.wait();
262
263     is_dead = w->d_state == mb_worker::TS_DEAD;
264     why_dead = w->d_why_dead;
265   }
266
267   // If the worker failed to init (constructor or initial_transition
268   // raised an exception), reap the worker now and raise an exception.
269
270   if (is_dead && why_dead != mb_worker::RIP_EXIT){
271
272     void *ignore;
273     w->join(&ignore);
274
275     // FIXME with some work we ought to be able to propagate the
276     // exception from the worker.
277     throw mbe_mblock_failed(0, instance_name);
278   }
279
280   assert(w->d_mblock);
281
282   // Add w to the vector of workers, and return the mblock.
283   {
284     omni_mutex_lock l(d_workers_mutex);
285     d_workers.push_back(w);
286   }
287
288   if (0)
289     std::cerr << "\nruntime: created "
290               << "(" << w->id() << ") "
291               << w->d_mblock->instance_name() << "\n";
292
293   return w->d_mblock;
294 }
295
296 void
297 mb_runtime_thread_per_block::send_all_sys_msg(pmt_t signal,
298                                               pmt_t data,
299                                               pmt_t metadata,
300                                               mb_pri_t priority)
301 {
302   omni_mutex_lock l1(d_workers_mutex);
303
304   for (worker_iter_t wi = d_workers.begin(); wi != d_workers.end(); ++wi){
305     send_sys_msg((*wi)->d_mblock->impl()->msgq(),
306                  signal, data, metadata, priority);
307   }
308 }
309
310 //
311 // Can be invoked from any thread.
312 // Sends a message to the runtime.
313 //
314 pmt_t
315 mb_runtime_thread_per_block::schedule_one_shot_timeout
316   (const mb_time &abs_time,
317    pmt_t user_data,
318    mb_msg_accepter_sptr accepter)
319 {
320   mb_timeout_sptr to(new mb_timeout(abs_time, user_data, accepter));
321   (*d_accepter)(s_request_timeout, pmt_make_any(to), PMT_F, MB_PRI_BEST);
322   return to->handle();
323 }
324
325 //
326 // Can be invoked from any thread.
327 // Sends a message to the runtime.
328 //
329 pmt_t
330 mb_runtime_thread_per_block::schedule_periodic_timeout
331   (const mb_time &first_abs_time,
332    const mb_time &delta_time,
333    pmt_t user_data,
334    mb_msg_accepter_sptr accepter)
335 {
336   mb_timeout_sptr to(new mb_timeout(first_abs_time, delta_time,
337                                     user_data, accepter));
338   (*d_accepter)(s_request_timeout, pmt_make_any(to), PMT_F, MB_PRI_BEST);
339   return to->handle();
340 }
341
342 //
343 // Can be invoked from any thread.
344 // Sends a message to the runtime.
345 //
346 void
347 mb_runtime_thread_per_block::cancel_timeout(pmt_t handle)
348 {
349   (*d_accepter)(s_cancel_timeout, handle, PMT_F, MB_PRI_BEST);
350 }