3 * Copyright 2004,2008 Free Software Foundation, Inc.
5 * This file is part of GNU Radio
7 * GNU Radio is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 3, or (at your option)
12 * GNU Radio is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with GNU Radio; see the file COPYING. If not, write to
19 * the Free Software Foundation, Inc., 51 Franklin Street,
20 * Boston, MA 02110-1301, USA.
27 #include <gr_block_executor.h>
29 #include <gr_block_detail.h>
30 #include <gr_buffer.h>
31 #include <boost/thread.hpp>
// Compile-time switch for scheduler tracing; must be defined to either 0 or 1.
#define ENABLE_LOGGING 0

// LOG(x) runs the statement x only when ENABLE_LOGGING is 1. Otherwise it
// expands to an empty statement, so x is never evaluated. The two definitions
// must be mutually exclusive: defining both unconditionally makes the second
// one silently win regardless of ENABLE_LOGGING.
#if (ENABLE_LOGGING)
#define LOG(x) do { x; } while(0)
#else
#define LOG(x) do {;} while(0)
#endif

// Running counter used to give each executor's trace file a distinct number.
static int which_scheduler = 0;
// Round n up to the nearest multiple of `multiple`.
//
// Uses the remainder instead of `(n + multiple - 1) / multiple * multiple`,
// because that sum wraps around when n is close to UINT_MAX (for example
// n = UINT_MAX with multiple = 3: n is already a multiple, but the sum wraps
// to a small value and the result collapses to 0).
//
// A multiple of 0 is treated as "no alignment constraint" and returns n
// unchanged rather than dividing by zero.
inline static unsigned int
round_up (unsigned int n, unsigned int multiple)
{
  if (multiple == 0)
    return n;

  const unsigned int remainder = n % multiple;
  if (remainder == 0)		// already aligned
    return n;

  return n + (multiple - remainder);
}
// Round n down to the nearest multiple of `multiple`.
//
// A multiple of 0 is treated as "no alignment constraint" and returns n
// unchanged rather than dividing by zero.
inline static unsigned int
round_down (unsigned int n, unsigned int multiple)
{
  if (multiple == 0)
    return n;

  return n - (n % multiple);	// drop the part that does not fill a whole multiple
}
// NOTE(review): several short lines of this function are not visible in this
// listing: the return type, the braces, the return statements, and whatever
// selects between the two alternative `int n = ...` computations below.
// The notes here describe only what the visible lines establish.
61 // Return minimum available write space in all our downstream buffers
62 // or -1 if we're output blocked and the output we're blocked
// d:               block detail whose output buffers are inspected.
// output_multiple: each per-buffer space figure is rounded down to a
//                  multiple of this value before it is compared.
66 min_available_space (gr_block_detail *d, int output_multiple)
// Start from the largest int so the first buffer's figure always replaces it.
68 int min_space = std::numeric_limits<int>::max();
70 for (int i = 0; i < d->noutputs (); i++){
// Hold this output buffer's mutex while its state is read.
71 gr_buffer::scoped_lock guard(*d->output(i)->mutex());
// Alternative 1: use all of the buffer's free space.
73 int n = round_down(d->output(i)->space_available(), output_multiple);
// Alternative 2: cap the figure at half the buffer size before rounding.
75 int n = round_down(std::min(d->output(i)->space_available(),
76 d->output(i)->bufsize()/2),
// A rounded figure of 0 means not even one output multiple fits in this buffer.
79 if (n == 0){ // We're blocked on output.
80 if (d->output(i)->done()){ // Downstream is done, therefore we're done.
// Keep the smallest rounded space figure seen across all outputs.
85 min_space = std::min (min_space, n);
// Construct an executor for `block`: remember the block, begin with no log
// stream, and call start() on the block.
//
// NOTE(review): the declaration of `name` and any condition guarding the
// log-file setup are not visible in this listing; presumably the setup only
// runs when ENABLE_LOGGING is set -- confirm against the full file.
92 gr_block_executor::gr_block_executor (gr_block_sptr block)
93 : d_block(block), d_log(0)
// Trace file is named sst-NNN.log, numbered from the file-level
// which_scheduler counter, which is bumped once per executor.
97 snprintf(name, sizeof(name), "sst-%03d.log", which_scheduler++);
// NOTE(review): allocated with raw new; the matching delete is not visible
// in this listing -- confirm the destructor releases it.
98 d_log = new std::ofstream(name);
99 std::unitbuf(*d_log); // make it unbuffered...
// First trace line identifies the block this executor drives.
100 *d_log << "gr_block_executor: "
101 << d_block << std::endl;
104 d_block->start(); // enable any drivers, etc.
// Tear down the executor by calling stop() on the block, mirroring the
// start() call made in the constructor.
//
// NOTE(review): other statements in this destructor (for example any cleanup
// of d_log) are not visible in this listing.
107 gr_block_executor::~gr_block_executor ()
112 d_block->stop(); // stop any drivers, etc.
// Run one scheduling pass for d_block and report the resulting state.
//
// Visible steps: size the per-call vectors for the block's role (the traces
// name these "source", "sink" and "regular"), work out how many output items
// to request, snapshot input availability under each input buffer's mutex,
// ask the block's forecast() how much input it needs, then call
// general_work() and advance the output write pointers by its result.
//
// NOTE(review): this listing is missing many short lines (braces, labels such
// as the targets of the gotos below, return statements, and the declarations
// of noutput_items / max_items_avail / i). Comments below describe only what
// the visible lines establish; anything about the missing lines is hedged.
115 gr_block_executor::state
116 gr_block_executor::run_one_iteration()
// Raw pointers to the block and its detail object, used throughout this pass.
121 gr_block *m = d_block.get();
122 gr_block_detail *d = m->detail().get();
124 LOG(*d_log << std::endl << m);
// "source" case (the test selecting this branch is not visible here):
// the input-side vectors are emptied and only the output pointer vector
// gets one slot per output.
132 d_ninput_items_required.resize (0);
133 d_ninput_items.resize (0);
134 d_input_items.resize (0);
135 d_input_done.resize(0);
136 d_output_items.resize (d->noutputs ());
138 // determine the minimum available output space
139 noutput_items = min_available_space (d, m->output_multiple ());
140 LOG(*d_log << " source\n noutput_items = " << noutput_items << std::endl);
// -1 is treated as "done" and 0 as "blocked on output"; the statements
// executed in each case are not visible in this listing.
141 if (noutput_items == -1) // we're done
144 if (noutput_items == 0){ // we're output blocked
145 LOG(*d_log << " BLKD_OUT\n");
149 goto setup_call_to_work; // jump to common code
// "sink" case: input-side vectors get one slot per input and the output
// pointer vector is emptied.
152 else if (d->sink_p ()){
153 d_ninput_items_required.resize (d->ninputs ());
154 d_ninput_items.resize (d->ninputs ());
155 d_input_items.resize (d->ninputs ());
156 d_input_done.resize(d->ninputs());
157 d_output_items.resize (0);
158 LOG(*d_log << " sink\n");
161 for (int i = 0; i < d->ninputs (); i++){
164 * Acquire the mutex and grab local copies of items_available and done.
166 gr_buffer::scoped_lock guard(*d->input(i)->mutex());
167 d_ninput_items[i] = d->input(i)->items_available();
168 d_input_done[i] = d->input(i)->done();
171 LOG(*d_log << " d_ninput_items[" << i << "] = " << d_ninput_items[i] << std::endl);
172 LOG(*d_log << " d_input_done[" << i << "] = " << d_input_done[i] << std::endl);
// Detects an input that is finished and holds fewer items than one output
// multiple (the action taken is not visible here).
174 if (d_ninput_items[i] < m->output_multiple() && d_input_done[i])
// Track the largest item count seen on any input.
177 max_items_avail = std::max (max_items_avail, d_ninput_items[i]);
180 // take a swag at how much output we can sink
// Estimate: scale the largest input count by the block's relative rate,
// truncate to int, then round down to a whole number of output multiples.
181 noutput_items = (int) (max_items_avail * m->relative_rate ());
182 noutput_items = round_down (noutput_items, m->output_multiple ());
183 LOG(*d_log << " max_items_avail = " << max_items_avail << std::endl);
184 LOG(*d_log << " noutput_items = " << noutput_items << std::endl);
186 if (noutput_items == 0){ // we're blocked on input
187 LOG(*d_log << " BLKD_IN\n");
191 goto try_again; // Jump to code shared with regular case.
195 // do the regular thing
// "regular" case: one slot per input on the input-side vectors and one slot
// per output on the output pointer vector.
196 d_ninput_items_required.resize (d->ninputs ());
197 d_ninput_items.resize (d->ninputs ());
198 d_input_items.resize (d->ninputs ());
199 d_input_done.resize(d->ninputs());
200 d_output_items.resize (d->noutputs ());
203 for (int i = 0; i < d->ninputs (); i++){
206 * Acquire the mutex and grab local copies of items_available and done.
208 gr_buffer::scoped_lock guard(*d->input(i)->mutex());
209 d_ninput_items[i] = d->input(i)->items_available ();
210 d_input_done[i] = d->input(i)->done();
212 max_items_avail = std::max (max_items_avail, d_ninput_items[i]);
215 // determine the minimum available output space
216 noutput_items = min_available_space (d, m->output_multiple ());
// Trace output for the regular case. NOTE(review): these lines write to
// *d_log directly rather than through LOG(); any guard around them is not
// visible in this listing.
218 *d_log << " regular ";
219 if (m->relative_rate() >= 1.0)
220 *d_log << "1:" << m->relative_rate() << std::endl;
222 *d_log << 1.0/m->relative_rate() << ":1\n";
223 *d_log << " max_items_avail = " << max_items_avail << std::endl;
224 *d_log << " noutput_items = " << noutput_items << std::endl;
226 if (noutput_items == -1) // we're done
229 if (noutput_items == 0){ // we're output blocked
230 LOG(*d_log << " BLKD_OUT\n");
// Fixed-rate blocks: convert the largest input count into an output count,
// round it up to the output multiple, and use it only if it is positive and
// does not exceed the space-based limit computed above.
235 if (m->fixed_rate()){
236 // try to work it forward starting with max_items_avail.
237 // We want to try to consume all the input we've got.
238 int reqd_noutput_items = m->fixed_rate_ninput_to_noutput(max_items_avail);
239 reqd_noutput_items = round_up(reqd_noutput_items, m->output_multiple());
240 if (reqd_noutput_items > 0 && reqd_noutput_items <= noutput_items)
241 noutput_items = reqd_noutput_items;
244 // ask the block how much input they need to produce noutput_items
245 m->forecast (noutput_items, d_ninput_items_required);
247 // See if we've got sufficient input available
// Scan for an input whose requirement exceeds what is available; the test
// below on i < ninputs() reads as "such an input was found".
250 for (i = 0; i < d->ninputs (); i++)
251 if (d_ninput_items_required[i] > d_ninput_items[i]) // not enough
254 if (i < d->ninputs ()){ // not enough input on input[i]
255 // if we can, try reducing the size of our output request
256 if (noutput_items > m->output_multiple ()){
// NOTE(review): the statement that actually shrinks noutput_items is not
// visible here; the line below only re-aligns it to the output multiple.
258 noutput_items = round_up (noutput_items, m->output_multiple ());
262 // We're blocked on input
263 LOG(*d_log << " BLKD_IN\n");
264 if (d_input_done[i]) // If the upstream block is done, we're done
267 // Is it possible to ever fulfill this request?
268 if (d_ninput_items_required[i] > d->input(i)->max_possible_items_available ()){
269 // Nope, never going to happen...
270 std::cerr << "\nsched: <gr_block " << m->name()
271 << " (" << m->unique_id() << ")>"
272 << " is requesting more input data\n"
273 << " than we can provide.\n"
274 << " ninput_items_required = "
275 << d_ninput_items_required[i] << "\n"
276 << " max_possible_items_available = "
277 << d->input(i)->max_possible_items_available() << "\n"
278 << " If this is a filter, consider reducing the number of taps.\n";
285 // We've got enough data on each input to produce noutput_items.
286 // Finish setting up the call to work.
// Collect the current read pointer of every input ...
288 for (int i = 0; i < d->ninputs (); i++)
289 d_input_items[i] = d->input(i)->read_pointer();
// ... and the current write pointer of every output.
293 for (int i = 0; i < d->noutputs (); i++)
294 d_output_items[i] = d->output(i)->write_pointer();
296 // Do the actual work of the block
297 int n = m->general_work (noutput_items, d_ninput_items,
298 d_input_items, d_output_items);
299 LOG(*d_log << " general_work: noutput_items = " << noutput_items
300 << " result = " << n << std::endl);
302 if (n == -1) // block is done
// The value returned by general_work is applied to every output.
305 d->produce_each (n); // advance write pointers
309 // We didn't produce any output even though we called general_work.
310 // We have (most likely) consumed some input.
312 // If this is a source, it's broken.
314 std::cerr << "gr_block_executor: source " << m
315 << " returned 0 from work. We're marking it DONE.\n";
316 // FIXME maybe we ought to raise an exception...
320 // Have the caller try again...
321 return READY_NO_OUTPUT;
326 LOG(*d_log << " were_done\n");