3 * Copyright 2004 Free Software Foundation, Inc.
5 * This file is part of GNU Radio
7 * GNU Radio is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2, or (at your option)
12 * GNU Radio is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with GNU Radio; see the file COPYING. If not, write to
19 * the Free Software Foundation, Inc., 51 Franklin Street,
20 * Boston, MA 02110-1301, USA.
27 #include <gr_single_threaded_scheduler.h>
29 #include <gr_block_detail.h>
30 #include <gr_buffer.h>
// Compile-time switch for the scheduler's trace logging.
// Must be defined to either 0 or 1.
#define ENABLE_LOGGING 0

// LOG(x) executes statement x when ENABLE_LOGGING is non-zero and expands
// to an empty statement otherwise, so x is not evaluated at all when
// logging is off.  The do/while(0) wrapper makes LOG(...) behave as a
// single statement.
#if (ENABLE_LOGGING)
#define LOG(x) do { x; } while(0)
#else
#define LOG(x) do {;} while(0)
#endif

// Counter used to give each scheduler instance's log file a distinct name.
static int which_scheduler = 0;
// Write a short identifying tag for block `m` to `os`: the block's name()
// followed by its unique_id() in parentheses, wrapped in "<gr_block ...>".
// Used by the LOG() trace statements in main_loop.
49 operator << (std::ostream& os, const gr_block *m)
51 os << "<gr_block " << m->name() << " (" << m->unique_id() << ")>";
55 gr_single_threaded_scheduler_sptr
56 gr_make_single_threaded_scheduler (const std::vector<gr_block_sptr> &blocks)
59 gr_single_threaded_scheduler_sptr (new gr_single_threaded_scheduler (blocks));
// Construct a scheduler for `blocks`.  d_blocks is initialized from
// `blocks`, d_enabled starts out true and d_log starts out null.
62 gr_single_threaded_scheduler::gr_single_threaded_scheduler (
63 const std::vector<gr_block_sptr> &blocks)
64 : d_blocks (blocks), d_enabled (true), d_log(0)
// Logging setup: build the file name "sst-<N>.log" from the file-level
// which_scheduler counter (post-incremented so each instance gets its own
// number), open it as d_log and begin a banner line.
// NOTE(review): the guard around this section is not visible in this
// excerpt -- presumably it only runs when ENABLE_LOGGING is set; confirm.
68 snprintf(name, sizeof(name), "sst-%d.log", which_scheduler++);
69 d_log = new std::ofstream(name);
70 *d_log << "gr_single_threaded_scheduler: "
76 gr_single_threaded_scheduler::~gr_single_threaded_scheduler ()
83 gr_single_threaded_scheduler::run ()
// Round n up to the nearest multiple of `multiple`.  A value that already
// is a multiple is returned unchanged.  A `multiple` of 0 is treated as
// "no alignment requirement" and n is returned as is, rather than dividing
// by zero.
inline static unsigned int
round_up (unsigned int n, unsigned int multiple)
{
  if (multiple == 0)
    return n;

  // Work from the remainder instead of (n + multiple - 1): that sum wraps
  // around for n close to UINT_MAX and would turn an exact multiple into 0.
  unsigned int remainder = n % multiple;
  return remainder == 0 ? n : n + (multiple - remainder);
}
// Round n down to the nearest multiple of `multiple`.  A value that already
// is a multiple is returned unchanged.  A `multiple` of 0 is treated as
// "no alignment requirement" and n is returned as is, rather than dividing
// by zero.
inline static unsigned int
round_down (unsigned int n, unsigned int multiple)
{
  if (multiple == 0)
    return n;

  return n - (n % multiple);
}
103 // Return minimum available write space in all our downstream buffers
104 // or -1 if we're output blocked and the output we're blocked
108 min_available_space (gr_block_detail *d, int output_multiple)
110 int min_space = std::numeric_limits<int>::max();
112 for (int i = 0; i < d->noutputs (); i++){
113 int n = round_down (d->output(i)->space_available (), output_multiple);
114 if (n == 0){ // We're blocked on output.
115 if (d->output(i)->done()){ // Downstream is done, therefore we're done.
120 min_space = std::min (min_space, n);
// Scheduler main loop.  Clears every block's done flag, calls start() on
// every block, then visits one block of d_blocks per loop iteration for as
// long as d_enabled holds and nalive > 0.  For the visited block it works
// out how many output items to request from the state of the block's input
// and output buffers and, when the block is not blocked, calls
// general_work() and advances the output write pointers.
// NOTE(review): several lines of this function (labels, goto targets and
// some branch bodies) are not visible in this excerpt; the comments added
// here describe only what is shown.
126 gr_single_threaded_scheduler::main_loop ()
128 static const int DEFAULT_CAPACITY = 16;
// Scratch vectors passed to forecast() / general_work(); they are resized
// for each block below.
131 gr_vector_int ninput_items_required (DEFAULT_CAPACITY);
132 gr_vector_int ninput_items (DEFAULT_CAPACITY);
133 gr_vector_const_void_star input_items (DEFAULT_CAPACITY);
134 gr_vector_void_star output_items (DEFAULT_CAPACITY);
138 bool made_progress_last_pass;
139 bool making_progress;
141 for (unsigned i = 0; i < d_blocks.size (); i++)
142 d_blocks[i]->detail()->set_done (false); // reset any done flags
144 for (unsigned i = 0; i < d_blocks.size (); i++) // enable any drivers, etc.
145 d_blocks[i]->start();
// Start out as if the previous pass made progress, so that on the first
// pass the "Skipping source" branch below is taken for sources.
149 made_progress_last_pass = true;
150 making_progress = false;
152 // Loop while there are still blocks alive
154 nalive = d_blocks.size ();
155 while (d_enabled && nalive > 0){
// m is the block visited on this iteration (index bi), d its detail object.
157 gr_block *m = d_blocks[bi].get ();
158 gr_block_detail *d = m->detail().get ();
160 LOG(*d_log << std::endl << m);
166 // Invoke sources as a last resort. As long as the previous pass
167 // made progress, don't call a source.
168 if (made_progress_last_pass){
169 LOG(*d_log << " Skipping source\n");
// Source case (per the log text below): the input-side vectors are emptied
// and output_items is sized to the block's number of outputs.
173 ninput_items_required.resize (0);
174 ninput_items.resize (0);
175 input_items.resize (0);
176 output_items.resize (d->noutputs ());
178 // determine the minimum available output space
179 noutput_items = min_available_space (d, m->output_multiple ());
180 LOG(*d_log << " source\n noutput_items = " << noutput_items << std::endl);
181 if (noutput_items == -1) // we're done
184 if (noutput_items == 0){ // we're output blocked
185 LOG(*d_log << " BLKD_OUT\n");
189 goto setup_call_to_work; // jump to common code
// Sink case: the input-side vectors are sized to the number of inputs and
// output_items is emptied.
192 else if (d->sink_p ()){
193 ninput_items_required.resize (d->ninputs ());
194 ninput_items.resize (d->ninputs ());
195 input_items.resize (d->ninputs ());
196 output_items.resize (0);
197 LOG(*d_log << " sink\n");
// Record how many items each input holds and track the largest count.
200 for (int i = 0; i < d->ninputs (); i++){
201 ninput_items[i] = d->input(i)->items_available();
202 //if (ninput_items[i] == 0 && d->input(i)->done())
// Condition: input i is done and holds fewer than output_multiple() items
// (the commented-out line above tested for exactly zero items instead).
203 if (ninput_items[i] < m->output_multiple() && d->input(i)->done())
206 max_items_avail = std::max (max_items_avail, ninput_items[i]);
209 // take a swag at how much output we can sink
210 noutput_items = (int) (max_items_avail * m->relative_rate ());
211 noutput_items = round_down (noutput_items, m->output_multiple ());
212 LOG(*d_log << " max_items_avail = " << max_items_avail << std::endl);
213 LOG(*d_log << " noutput_items = " << noutput_items << std::endl);
215 if (noutput_items == 0){ // we're blocked on input
216 LOG(*d_log << " BLKD_IN\n");
220 goto try_again; // Jump to code shared with regular case.
224 // do the regular thing
225 ninput_items_required.resize (d->ninputs ());
226 ninput_items.resize (d->ninputs ());
227 input_items.resize (d->ninputs ());
228 output_items.resize (d->noutputs ());
// Record how many items each input holds and track the largest count.
231 for (int i = 0; i < d->ninputs (); i++){
232 ninput_items[i] = d->input(i)->items_available ();
233 max_items_avail = std::max (max_items_avail, ninput_items[i]);
236 // determine the minimum available output space
237 noutput_items = min_available_space (d, m->output_multiple ());
// Trace output: the block's rate is written as "1:r" when relative_rate()
// is >= 1.0 and as "(1/r):1" otherwise.
239 *d_log << " regular ";
240 if (m->relative_rate() >= 1.0)
241 *d_log << "1:" << m->relative_rate() << std::endl;
243 *d_log << 1.0/m->relative_rate() << ":1\n";
244 *d_log << " max_items_avail = " << max_items_avail << std::endl;
245 *d_log << " noutput_items = " << noutput_items << std::endl;
247 if (noutput_items == -1) // we're done
250 if (noutput_items == 0){ // we're output blocked
251 LOG(*d_log << " BLKD_OUT\n");
256 // Compute best estimate of noutput_items that we can really use.
// The expression below is: at most the output space found above, and
// otherwise the input-derived estimate (max_items_avail * relative_rate(),
// rounded up to output_multiple()) but never less than one output_multiple().
258 std::min ((unsigned) noutput_items,
259 std::max ((unsigned) m->output_multiple(),
260 round_up ((unsigned) (max_items_avail * m->relative_rate()),
261 m->output_multiple ())));
263 LOG(*d_log << " revised noutput_items = " << noutput_items << std::endl);
267 if (m->fixed_rate()){
268 // try to work it forward starting with max_items_avail.
269 // We want to try to consume all the input we've got.
270 int reqd_noutput_items = m->fixed_rate_ninput_to_noutput(max_items_avail);
271 reqd_noutput_items = round_up(reqd_noutput_items, m->output_multiple());
// Adopt the fixed-rate figure only when it is positive and does not exceed
// the noutput_items already computed.
272 if (reqd_noutput_items > 0 && reqd_noutput_items <= noutput_items)
273 noutput_items = reqd_noutput_items;
276 // ask the block how much input they need to produce noutput_items
277 m->forecast (noutput_items, ninput_items_required);
279 // See if we've got sufficient input available
282 for (i = 0; i < d->ninputs (); i++)
283 if (ninput_items_required[i] > ninput_items[i]) // not enough
286 if (i < d->ninputs ()){ // not enough input on input[i]
287 // if we can, try reducing the size of our output request
288 if (noutput_items > m->output_multiple ()){
// NOTE(review): the statement that shrinks noutput_items is not visible in
// this excerpt; the line below re-aligns the value up to output_multiple().
290 noutput_items = round_up (noutput_items, m->output_multiple ());
294 // We're blocked on input
295 LOG(*d_log << " BLKD_IN\n");
296 if (d->input(i)->done()) // If the upstream block is done, we're done
299 // Is it possible to ever fulfill this request?
300 if (ninput_items_required[i] > d->input(i)->max_possible_items_available ()){
301 // Nope, never going to happen...
302 std::cerr << "\nsched: <gr_block " << m->name()
303 << " (" << m->unique_id() << ")>"
304 << " is requesting more input data\n"
305 << " than we can provide.\n"
306 << " ninput_items_required = "
307 << ninput_items_required[i] << "\n"
308 << " max_possible_items_available = "
309 << d->input(i)->max_possible_items_available() << "\n"
310 << " If this is a filter, consider reducing the number of taps.\n";
317 // We've got enough data on each input to produce noutput_items.
318 // Finish setting up the call to work.
// Collect each input's read pointer and each output's write pointer for
// the call to general_work().
320 for (int i = 0; i < d->ninputs (); i++)
321 input_items[i] = d->input(i)->read_pointer();
325 for (int i = 0; i < d->noutputs (); i++)
326 output_items[i] = d->output(i)->write_pointer();
328 // Do the actual work of the block
329 int n = m->general_work (noutput_items, ninput_items,
330 input_items, output_items);
331 LOG(*d_log << " general_work: noutput_items = " << noutput_items
332 << " result = " << n << std::endl);
334 if (n == -1) // block is done
337 d->produce_each (n); // advance write pointers
339 making_progress = true;
346 LOG(*d_log << " were_done\n");
// Advance to the next block.  When bi runs off the end of d_blocks a full
// pass is over: this pass's making_progress becomes made_progress_last_pass
// and making_progress is cleared for the next pass.
351 if (++bi >= d_blocks.size ()){
353 made_progress_last_pass = making_progress;
354 making_progress = false;
358 for (unsigned i = 0; i < d_blocks.size (); i++) // disable any drivers, etc.