51755a65d0f4357fa491e762868864bccbdb738c
[debian/gnuradio] / gnuradio-core / src / lib / runtime / gr_top_block_impl.cc
1 /* -*- c++ -*- */
2 /*
3  * Copyright 2007 Free Software Foundation, Inc.
4  *
5  * This file is part of GNU Radio
6  *
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3, or (at your option)
10  * any later version.
11  *
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with GNU Radio; see the file COPYING.  If not, write to
19  * the Free Software Foundation, Inc., 51 Franklin Street,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26
27 #include <gr_top_block.h>
28 #include <gr_top_block_impl.h>
29 #include <gr_flat_flowgraph.h>
30 #include <gr_scheduler_thread.h>
31 #include <gr_local_sighandler.h>
32
33 #include <stdexcept>
34 #include <iostream>
35 #include <string.h>
36 #include <unistd.h>
37
38 #define GR_TOP_BLOCK_IMPL_DEBUG 0
39
40 static gr_top_block_impl *s_impl = 0;
41
42 /*!
43  * Make a vector of gr_block from a vector of gr_basic_block
44  *
45  * Pass-by-value to avoid problem with possible asynchronous modification
46  */
47 static gr_block_vector_t
48 make_gr_block_vector(gr_basic_block_vector_t blocks)
49 {
50   gr_block_vector_t result;
51   for (gr_basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) {
52     result.push_back(make_gr_block_sptr(*p));
53   }
54
55   return result;
56 }
57
58 // FIXME: This prevents using more than one gr_top_block instance
59
60 static void 
61 runtime_sigint_handler(int signum)
62 {
63   if (GR_TOP_BLOCK_IMPL_DEBUG){
64     char *msg = "SIGINT received, calling stop()\n";
65     ::write(1, msg, strlen(msg));       // write is OK to call from signal handler
66   }
67
68   if (s_impl)
69     s_impl->stop();
70 }
71
72 // ----------------------------------------------------------------
73
74 gr_top_block_impl::gr_top_block_impl(gr_top_block *owner) 
75   : d_running(false),
76     d_ffg(),
77     d_owner(owner),
78     d_lock_count(0)
79 {
80   if (s_impl)
81     throw std::logic_error("gr_top_block_impl: multiple simultaneous gr_top_blocks not allowed");
82
83   s_impl = this;
84 }
85
86 gr_top_block_impl::~gr_top_block_impl()
87 {
88   s_impl = 0; // don't call delete we don't own these
89   d_owner = 0;
90 }
91
92 void
93 gr_top_block_impl::start()
94 {
95   if (GR_TOP_BLOCK_IMPL_DEBUG)
96     std::cout << "start: entered " << this << std::endl;
97
98   if (d_running)
99     throw std::runtime_error("top block already running or wait() not called after previous stop()");
100
101   // Create new flat flow graph by flattening hierarchy
102   d_ffg = d_owner->flatten();
103
104   // Validate new simple flow graph and wire it up
105   d_ffg->validate();
106   d_ffg->setup_connections();
107
108   // Execute scheduler threads
109   start_threads();
110 }
111
112 void
113 gr_top_block_impl::start_threads()
114 {
115   if (GR_TOP_BLOCK_IMPL_DEBUG)
116     std::cout << "start_threads: entered" << std::endl;
117
118   d_graphs = d_ffg->partition();
119   for (std::vector<gr_basic_block_vector_t>::iterator p = d_graphs.begin();
120        p != d_graphs.end(); p++) {
121     gr_scheduler_thread *thread = new gr_scheduler_thread(make_gr_block_vector(*p));
122     d_threads.push_back(thread);
123     if (GR_TOP_BLOCK_IMPL_DEBUG)
124       std::cout << "start_threads: starting " << thread << std::endl;
125     thread->start();
126   }
127
128   d_running = true;
129 }
130
131 /*
132  * N.B. as currently implemented, it is possible that this may be
133  * invoked by the SIGINT handler which is fragile as hell...
134  */
135 void
136 gr_top_block_impl::stop()
137 {
138   if (GR_TOP_BLOCK_IMPL_DEBUG){
139     char *msg = "stop: entered\n";
140     ::write(1, msg, strlen(msg));
141   }
142
143   for (gr_scheduler_thread_viter_t p = d_threads.begin(); p != d_threads.end(); p++) {
144     if (*p)
145       (*p)->stop();
146   }
147 }
148
149 void
150 gr_top_block_impl::wait()
151 {
152   if (GR_TOP_BLOCK_IMPL_DEBUG)
153     std::cout << "wait: entered" << std::endl;
154
155   void *dummy_status; // don't ever dereference this
156   gr_local_sighandler sigint(SIGINT, runtime_sigint_handler);
157
158   for (gr_scheduler_thread_viter_t p = d_threads.begin(); p != d_threads.end(); p++) {
159     if (GR_TOP_BLOCK_IMPL_DEBUG)
160       std::cout << "wait: joining thread " << (*p) << std::endl;
161     (*p)->join(&dummy_status); // pthreads will self-delete, so pointer is now dead
162     (*p) = 0; // FIXME: switch to stl::list and actually remove from container
163     if (GR_TOP_BLOCK_IMPL_DEBUG)
164       std::cout << "wait: join returned" << std::endl;
165   }
166
167   d_threads.clear();
168   d_running = false;
169 }
170
171 // N.B. lock() and unlock() cannot be called from a flow graph thread or
172 // deadlock will occur when reconfiguration happens
173 void
174 gr_top_block_impl::lock()
175 {
176   omni_mutex_lock lock(d_reconf);
177   d_lock_count++;
178   if (GR_TOP_BLOCK_IMPL_DEBUG)
179     std::cout << "runtime: locked, count = " << d_lock_count <<  std::endl;
180 }
181
182 void
183 gr_top_block_impl::unlock()
184 {
185   omni_mutex_lock lock(d_reconf);
186   if (d_lock_count <= 0)
187     throw std::runtime_error("unpaired unlock() call");
188
189   d_lock_count--;
190   if (GR_TOP_BLOCK_IMPL_DEBUG)
191     std::cout << "unlock: unlocked, count = " << d_lock_count << std::endl;
192
193   if (d_lock_count == 0) {
194     if (GR_TOP_BLOCK_IMPL_DEBUG)
195       std::cout << "unlock: restarting flowgraph" << std::endl;
196     restart();
197   }
198 }
199
200 void
201 gr_top_block_impl::restart()
202 {
203   if (GR_TOP_BLOCK_IMPL_DEBUG)
204     std::cout << "restart: entered" << std::endl;
205
206   if (!d_running)
207     throw std::runtime_error("top block is not running");
208
209   // Stop scheduler threads and wait for completion
210   stop();
211   wait();
212   if (GR_TOP_BLOCK_IMPL_DEBUG)
213     std::cout << "restart: threads stopped" << std::endl;
214
215   // Create new simple flow graph
216   gr_flat_flowgraph_sptr new_ffg = d_owner->flatten();        
217   new_ffg->validate();                 // check consistency, sanity, etc
218
219   if (GR_TOP_BLOCK_IMPL_DEBUG) {
220       std::cout << std::endl << "*** Existing flat flowgraph @" << d_ffg << ":" << std::endl;
221       d_ffg->dump();
222   }
223   new_ffg->merge_connections(d_ffg);   // reuse buffers, etc
224
225   if (GR_TOP_BLOCK_IMPL_DEBUG) {
226     std::cout << std::endl << "*** New flat flowgraph after merge @" << new_ffg << ":" << std::endl;
227     new_ffg->dump();
228   }
229   
230   d_ffg = new_ffg;
231
232   start_threads();
233 }