3 * Copyright 2004 Free Software Foundation, Inc.
5 * This file is part of GNU Radio
7 * GNU Radio is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 3, or (at your option)
12 * GNU Radio is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with GNU Radio; see the file COPYING. If not, write to
19 * the Free Software Foundation, Inc., 51 Franklin Street,
20 * Boston, MA 02110-1301, USA.
27 #include <gr_single_threaded_scheduler.h>
29 #include <gr_block_detail.h>
30 #include <gr_buffer.h>
31 #include <boost/thread.hpp>
// Compile-time switch for the scheduler's trace logging.
// Must be defined to either 0 or 1.
#define ENABLE_LOGGING 0

// LOG(x) executes the statement x only when ENABLE_LOGGING is 1.  When it
// is 0 the macro expands to an empty statement, so x is never evaluated.
// Both forms are wrapped in do { } while(0) so that LOG(...) followed by a
// semicolon behaves as a single statement.
#if ENABLE_LOGGING
#define LOG(x) do { x; } while(0)
#else
#define LOG(x) do {;} while(0)
#endif

// Counter used to give each scheduler instance a distinct log file name
// ("sst-<N>.log"); it is post-incremented each time a name is generated.
static int which_scheduler = 0;
48 gr_single_threaded_scheduler_sptr
49 gr_make_single_threaded_scheduler (const std::vector<gr_block_sptr> &blocks)
52 gr_single_threaded_scheduler_sptr (new gr_single_threaded_scheduler (blocks));
// Constructor.  The initializer list initializes d_blocks from `blocks`,
// sets d_enabled to true and d_log to 0.
// NOTE(review): this listing jumps from line 57 to line 61 and stops at
// line 63, so the braces, the declaration of `name`, any guard around the
// logging setup and the rest of the banner statement are not visible here.
55 gr_single_threaded_scheduler::gr_single_threaded_scheduler (
56 const std::vector<gr_block_sptr> &blocks)
57 : d_blocks (blocks), d_enabled (true), d_log(0)
// Build the log file name "sst-<N>.log"; which_scheduler is post-incremented
// so each name generated gets the next number.
61 snprintf(name, sizeof(name), "sst-%d.log", which_scheduler++);
// d_log receives a raw pointer obtained from new.
62 d_log = new std::ofstream(name);
// Start of the banner written to the log (the statement continues on lines
// that are not visible in this listing).
63 *d_log << "gr_single_threaded_scheduler: "
// Destructor.
// NOTE(review): the body is not visible in this listing.  The constructor
// assigns d_log from `new std::ofstream`, so confirm the stream is released
// here.
69 gr_single_threaded_scheduler::~gr_single_threaded_scheduler ()
// run().
// NOTE(review): the only visible line of the body is the disabled assignment
// below; presumably this is where main_loop() is invoked -- confirm against
// the full file.
76 gr_single_threaded_scheduler::run ()
78 // d_enabled = true; // KLUDGE
// stop().
// NOTE(review): only the start of a debug print to std::cout is visible in
// this listing.  main_loop() keeps iterating while d_enabled is true, so
// presumably this method clears d_enabled -- confirm against the full file.
83 gr_single_threaded_scheduler::stop ()
86 std::cout << "gr_singled_threaded_scheduler::stop() "
// Round n up to the nearest multiple of `multiple`.
//
// Returns n unchanged when it is already a multiple, and also when
// `multiple` is 0 (there is nothing to align to, and dividing by zero would
// be undefined).  The computation works from the remainder instead of
// adding (multiple - 1) first, so it does not wrap around for values of n
// close to the top of the unsigned range whose rounded result still fits.
inline static unsigned int
round_up (unsigned int n, unsigned int multiple)
{
  if (multiple == 0)
    return n;

  const unsigned int remainder = n % multiple;
  if (remainder == 0)
    return n;

  return n + (multiple - remainder);
}
// Round n down to the nearest multiple of `multiple`.
//
// Returns n unchanged when `multiple` is 0, since there is nothing to align
// to and dividing by zero would be undefined.
inline static unsigned int
round_down (unsigned int n, unsigned int multiple)
{
  if (multiple == 0)
    return n;

  return n - (n % multiple);
}
104 // Return minimum available write space in all our downstream buffers
105 // or -1 if we're output blocked and the output we're blocked
109 min_available_space (gr_block_detail *d, int output_multiple)
111 int min_space = std::numeric_limits<int>::max();
113 for (int i = 0; i < d->noutputs (); i++){
114 int n = round_down (d->output(i)->space_available (), output_multiple);
115 if (n == 0){ // We're blocked on output.
116 if (d->output(i)->done()){ // Downstream is done, therefore we're done.
121 min_space = std::min (min_space, n);
127 gr_single_threaded_scheduler::main_loop ()
129 static const int DEFAULT_CAPACITY = 16;
132 gr_vector_int ninput_items_required (DEFAULT_CAPACITY);
133 gr_vector_int ninput_items (DEFAULT_CAPACITY);
134 gr_vector_const_void_star input_items (DEFAULT_CAPACITY);
135 gr_vector_void_star output_items (DEFAULT_CAPACITY);
139 bool made_progress_last_pass;
140 bool making_progress;
142 for (unsigned i = 0; i < d_blocks.size (); i++)
143 d_blocks[i]->detail()->set_done (false); // reset any done flags
145 for (unsigned i = 0; i < d_blocks.size (); i++) // enable any drivers, etc.
146 d_blocks[i]->start();
150 made_progress_last_pass = true;
151 making_progress = false;
153 // Loop while there are still blocks alive
155 nalive = d_blocks.size ();
156 while (d_enabled && nalive > 0){
158 if (boost::this_thread::interruption_requested())
161 gr_block *m = d_blocks[bi].get ();
162 gr_block_detail *d = m->detail().get ();
164 LOG(*d_log << std::endl << m);
170 // Invoke sources as a last resort. As long as the previous pass
171 // made progress, don't call a source.
172 if (made_progress_last_pass){
173 LOG(*d_log << " Skipping source\n");
177 ninput_items_required.resize (0);
178 ninput_items.resize (0);
179 input_items.resize (0);
180 output_items.resize (d->noutputs ());
182 // determine the minimum available output space
183 noutput_items = min_available_space (d, m->output_multiple ());
184 LOG(*d_log << " source\n noutput_items = " << noutput_items << std::endl);
185 if (noutput_items == -1) // we're done
188 if (noutput_items == 0){ // we're output blocked
189 LOG(*d_log << " BLKD_OUT\n");
193 goto setup_call_to_work; // jump to common code
196 else if (d->sink_p ()){
197 ninput_items_required.resize (d->ninputs ());
198 ninput_items.resize (d->ninputs ());
199 input_items.resize (d->ninputs ());
200 output_items.resize (0);
201 LOG(*d_log << " sink\n");
204 for (int i = 0; i < d->ninputs (); i++){
205 ninput_items[i] = d->input(i)->items_available();
206 //if (ninput_items[i] == 0 && d->input(i)->done())
207 if (ninput_items[i] < m->output_multiple() && d->input(i)->done())
210 max_items_avail = std::max (max_items_avail, ninput_items[i]);
213 // take a swag at how much output we can sink
214 noutput_items = (int) (max_items_avail * m->relative_rate ());
215 noutput_items = round_down (noutput_items, m->output_multiple ());
216 LOG(*d_log << " max_items_avail = " << max_items_avail << std::endl);
217 LOG(*d_log << " noutput_items = " << noutput_items << std::endl);
219 if (noutput_items == 0){ // we're blocked on input
220 LOG(*d_log << " BLKD_IN\n");
224 goto try_again; // Jump to code shared with regular case.
228 // do the regular thing
229 ninput_items_required.resize (d->ninputs ());
230 ninput_items.resize (d->ninputs ());
231 input_items.resize (d->ninputs ());
232 output_items.resize (d->noutputs ());
235 for (int i = 0; i < d->ninputs (); i++){
236 ninput_items[i] = d->input(i)->items_available ();
237 max_items_avail = std::max (max_items_avail, ninput_items[i]);
240 // determine the minimum available output space
241 noutput_items = min_available_space (d, m->output_multiple ());
243 *d_log << " regular ";
244 if (m->relative_rate() >= 1.0)
245 *d_log << "1:" << m->relative_rate() << std::endl;
247 *d_log << 1.0/m->relative_rate() << ":1\n";
248 *d_log << " max_items_avail = " << max_items_avail << std::endl;
249 *d_log << " noutput_items = " << noutput_items << std::endl;
251 if (noutput_items == -1) // we're done
254 if (noutput_items == 0){ // we're output blocked
255 LOG(*d_log << " BLKD_OUT\n");
260 // Compute best estimate of noutput_items that we can really use.
262 std::min ((unsigned) noutput_items,
263 std::max ((unsigned) m->output_multiple(),
264 round_up ((unsigned) (max_items_avail * m->relative_rate()),
265 m->output_multiple ())));
267 LOG(*d_log << " revised noutput_items = " << noutput_items << std::endl);
271 if (m->fixed_rate()){
272 // try to work it forward starting with max_items_avail.
273 // We want to try to consume all the input we've got.
274 int reqd_noutput_items = m->fixed_rate_ninput_to_noutput(max_items_avail);
275 reqd_noutput_items = round_up(reqd_noutput_items, m->output_multiple());
276 if (reqd_noutput_items > 0 && reqd_noutput_items <= noutput_items)
277 noutput_items = reqd_noutput_items;
280 // ask the block how much input they need to produce noutput_items
281 m->forecast (noutput_items, ninput_items_required);
283 // See if we've got sufficient input available
286 for (i = 0; i < d->ninputs (); i++)
287 if (ninput_items_required[i] > ninput_items[i]) // not enough
290 if (i < d->ninputs ()){ // not enough input on input[i]
291 // if we can, try reducing the size of our output request
292 if (noutput_items > m->output_multiple ()){
294 noutput_items = round_up (noutput_items, m->output_multiple ());
298 // We're blocked on input
299 LOG(*d_log << " BLKD_IN\n");
300 if (d->input(i)->done()) // If the upstream block is done, we're done
303 // Is it possible to ever fulfill this request?
304 if (ninput_items_required[i] > d->input(i)->max_possible_items_available ()){
305 // Nope, never going to happen...
306 std::cerr << "\nsched: <gr_block " << m->name()
307 << " (" << m->unique_id() << ")>"
308 << " is requesting more input data\n"
309 << " than we can provide.\n"
310 << " ninput_items_required = "
311 << ninput_items_required[i] << "\n"
312 << " max_possible_items_available = "
313 << d->input(i)->max_possible_items_available() << "\n"
314 << " If this is a filter, consider reducing the number of taps.\n";
321 // We've got enough data on each input to produce noutput_items.
322 // Finish setting up the call to work.
324 for (int i = 0; i < d->ninputs (); i++)
325 input_items[i] = d->input(i)->read_pointer();
329 for (int i = 0; i < d->noutputs (); i++)
330 output_items[i] = d->output(i)->write_pointer();
332 // Do the actual work of the block
333 int n = m->general_work (noutput_items, ninput_items,
334 input_items, output_items);
335 LOG(*d_log << " general_work: noutput_items = " << noutput_items
336 << " result = " << n << std::endl);
338 if (n == -1) // block is done
341 d->produce_each (n); // advance write pointers
343 making_progress = true;
350 LOG(*d_log << " were_done\n");
355 if (++bi >= d_blocks.size ()){
357 made_progress_last_pass = making_progress;
358 making_progress = false;
362 for (unsigned i = 0; i < d_blocks.size (); i++) // disable any drivers, etc.