Imported Upstream version 3.2.2
[debian/gnuradio] / gnuradio-core / src / lib / runtime / gr_block_executor.cc
1 /* -*- c++ -*- */
2 /*
3  * Copyright 2004,2008 Free Software Foundation, Inc.
4  * 
5  * This file is part of GNU Radio
6  * 
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3, or (at your option)
10  * any later version.
11  * 
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  * 
17  * You should have received a copy of the GNU General Public License
18  * along with GNU Radio; see the file COPYING.  If not, write to
19  * the Free Software Foundation, Inc., 51 Franklin Street,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26
27 #include <gr_block_executor.h>
28 #include <gr_block.h>
29 #include <gr_block_detail.h>
30 #include <gr_buffer.h>
31 #include <boost/thread.hpp>
32 #include <iostream>
33 #include <limits>
34 #include <assert.h>
35 #include <stdio.h>
36
37 // must be defined to either 0 or 1
38 #define ENABLE_LOGGING 0
39
40 #if (ENABLE_LOGGING)
41 #define LOG(x) do { x; } while(0)
42 #else
43 #define LOG(x) do {;} while(0)
44 #endif
45
46 static int which_scheduler  = 0;
47
48 inline static unsigned int
49 round_up (unsigned int n, unsigned int multiple)
50 {
51   return ((n + multiple - 1) / multiple) * multiple;
52 }
53
54 inline static unsigned int
55 round_down (unsigned int n, unsigned int multiple)
56 {
57   return (n / multiple) * multiple;
58 }
59
60 //
61 // Return minimum available write space in all our downstream buffers
62 // or -1 if we're output blocked and the output we're blocked
63 // on is done.
64 //
65 static int
66 min_available_space (gr_block_detail *d, int output_multiple)
67 {
68   int   min_space = std::numeric_limits<int>::max();
69
70   for (int i = 0; i < d->noutputs (); i++){
71     gr_buffer::scoped_lock guard(*d->output(i)->mutex());
72 #if 0
73     int n = round_down(d->output(i)->space_available(), output_multiple);
74 #else
75     int n = round_down(std::min(d->output(i)->space_available(),
76                                 d->output(i)->bufsize()/2),
77                        output_multiple);
78 #endif
79     if (n == 0){                        // We're blocked on output.
80       if (d->output(i)->done()){        // Downstream is done, therefore we're done.
81         return -1;
82       }
83       return 0;
84     }
85     min_space = std::min (min_space, n);
86   }
87   return min_space;
88 }
89
90
91
92 gr_block_executor::gr_block_executor (gr_block_sptr block)
93   : d_block(block), d_log(0)
94 {
95   if (ENABLE_LOGGING){
96     char name[100];
97     snprintf(name, sizeof(name), "sst-%03d.log", which_scheduler++);
98     d_log = new std::ofstream(name);
99     std::unitbuf(*d_log);               // make it unbuffered...
100     *d_log << "gr_block_executor: "
101            << d_block << std::endl;
102   }
103
104   d_block->start();                     // enable any drivers, etc.
105 }
106
107 gr_block_executor::~gr_block_executor ()
108 {
109   if (ENABLE_LOGGING)
110     delete d_log;
111
112   d_block->stop();                      // stop any drivers, etc.
113 }
114
115 gr_block_executor::state
116 gr_block_executor::run_one_iteration()
117 {
118   int                   noutput_items;
119   int                   max_items_avail;
120
121   gr_block              *m = d_block.get();
122   gr_block_detail       *d = m->detail().get();
123
124   LOG(*d_log << std::endl << m);
125
126   if (d->done()){
127     assert(0);
128     return DONE;
129   }
130
131   if (d->source_p ()){
132     d_ninput_items_required.resize (0);
133     d_ninput_items.resize (0);
134     d_input_items.resize (0);
135     d_input_done.resize(0);
136     d_output_items.resize (d->noutputs ());
137
138     // determine the minimum available output space
139     noutput_items = min_available_space (d, m->output_multiple ());
140     LOG(*d_log << " source\n  noutput_items = " << noutput_items << std::endl);
141     if (noutput_items == -1)            // we're done
142       goto were_done;
143
144     if (noutput_items == 0){            // we're output blocked
145       LOG(*d_log << "  BLKD_OUT\n");
146       return BLKD_OUT;
147     }
148
149     goto setup_call_to_work;            // jump to common code
150   }
151
152   else if (d->sink_p ()){
153     d_ninput_items_required.resize (d->ninputs ());
154     d_ninput_items.resize (d->ninputs ());
155     d_input_items.resize (d->ninputs ());
156     d_input_done.resize(d->ninputs());
157     d_output_items.resize (0);
158     LOG(*d_log << " sink\n");
159
160     max_items_avail = 0;
161     for (int i = 0; i < d->ninputs (); i++){
162       {
163         /*
164          * Acquire the mutex and grab local copies of items_available and done.
165          */
166         gr_buffer::scoped_lock guard(*d->input(i)->mutex());
167         d_ninput_items[i] = d->input(i)->items_available();
168         d_input_done[i] = d->input(i)->done();
169       }
170
171       LOG(*d_log << "  d_ninput_items[" << i << "] = " << d_ninput_items[i] << std::endl);
172       LOG(*d_log << "  d_input_done[" << i << "] = " << d_input_done[i] << std::endl);
173       
174       if (d_ninput_items[i] < m->output_multiple() && d_input_done[i])
175         goto were_done;
176         
177       max_items_avail = std::max (max_items_avail, d_ninput_items[i]);
178     }
179
180     // take a swag at how much output we can sink
181     noutput_items = (int) (max_items_avail * m->relative_rate ());
182     noutput_items = round_down (noutput_items, m->output_multiple ());
183     LOG(*d_log << "  max_items_avail = " << max_items_avail << std::endl);
184     LOG(*d_log << "  noutput_items = " << noutput_items << std::endl);
185
186     if (noutput_items == 0){    // we're blocked on input
187       LOG(*d_log << "  BLKD_IN\n");
188       return BLKD_IN;
189     }
190
191     goto try_again;             // Jump to code shared with regular case.
192   }
193
194   else {
195     // do the regular thing
196     d_ninput_items_required.resize (d->ninputs ());
197     d_ninput_items.resize (d->ninputs ());
198     d_input_items.resize (d->ninputs ());
199     d_input_done.resize(d->ninputs());
200     d_output_items.resize (d->noutputs ());
201
202     max_items_avail = 0;
203     for (int i = 0; i < d->ninputs (); i++){
204       {
205         /*
206          * Acquire the mutex and grab local copies of items_available and done.
207          */
208         gr_buffer::scoped_lock guard(*d->input(i)->mutex());
209         d_ninput_items[i] = d->input(i)->items_available ();
210         d_input_done[i] = d->input(i)->done();
211       }
212       max_items_avail = std::max (max_items_avail, d_ninput_items[i]);
213     }
214
215     // determine the minimum available output space
216     noutput_items = min_available_space (d, m->output_multiple ());
217     if (ENABLE_LOGGING){
218       *d_log << " regular ";
219       if (m->relative_rate() >= 1.0)
220         *d_log << "1:" << m->relative_rate() << std::endl;
221       else
222         *d_log << 1.0/m->relative_rate() << ":1\n";
223       *d_log << "  max_items_avail = " << max_items_avail << std::endl;
224       *d_log << "  noutput_items = " << noutput_items << std::endl;
225     }
226     if (noutput_items == -1)            // we're done
227       goto were_done;
228
229     if (noutput_items == 0){            // we're output blocked
230       LOG(*d_log << "  BLKD_OUT\n");
231       return BLKD_OUT;
232     }
233
234   try_again:
235     if (m->fixed_rate()){
236       // try to work it forward starting with max_items_avail.
237       // We want to try to consume all the input we've got.
238       int reqd_noutput_items = m->fixed_rate_ninput_to_noutput(max_items_avail);
239       reqd_noutput_items = round_up(reqd_noutput_items, m->output_multiple());
240       if (reqd_noutput_items > 0 && reqd_noutput_items <= noutput_items)
241         noutput_items = reqd_noutput_items;
242     }
243
244     // ask the block how much input they need to produce noutput_items
245     m->forecast (noutput_items, d_ninput_items_required);
246
247     // See if we've got sufficient input available
248
249     int i;
250     for (i = 0; i < d->ninputs (); i++)
251       if (d_ninput_items_required[i] > d_ninput_items[i])       // not enough
252         break;
253
254     if (i < d->ninputs ()){                     // not enough input on input[i]
255       // if we can, try reducing the size of our output request
256       if (noutput_items > m->output_multiple ()){
257         noutput_items /= 2;
258         noutput_items = round_up (noutput_items, m->output_multiple ());
259         goto try_again;
260       }
261
262       // We're blocked on input
263       LOG(*d_log << "  BLKD_IN\n");
264       if (d_input_done[i])      // If the upstream block is done, we're done
265         goto were_done;
266
267       // Is it possible to ever fulfill this request?
268       if (d_ninput_items_required[i] > d->input(i)->max_possible_items_available ()){
269         // Nope, never going to happen...
270         std::cerr << "\nsched: <gr_block " << m->name()
271                   << " (" << m->unique_id() << ")>"
272                   << " is requesting more input data\n"
273                   << "  than we can provide.\n"
274                   << "  ninput_items_required = "
275                   << d_ninput_items_required[i] << "\n"
276                   << "  max_possible_items_available = "
277                   << d->input(i)->max_possible_items_available() << "\n"
278                   << "  If this is a filter, consider reducing the number of taps.\n";
279         goto were_done;
280       }
281
282       return BLKD_IN;
283     }
284
285     // We've got enough data on each input to produce noutput_items.
286     // Finish setting up the call to work.
287
288     for (int i = 0; i < d->ninputs (); i++)
289       d_input_items[i] = d->input(i)->read_pointer();
290
291   setup_call_to_work:
292
293     for (int i = 0; i < d->noutputs (); i++)
294       d_output_items[i] = d->output(i)->write_pointer();
295
296     // Do the actual work of the block
297     int n = m->general_work (noutput_items, d_ninput_items,
298                              d_input_items, d_output_items);
299     LOG(*d_log << "  general_work: noutput_items = " << noutput_items
300         << " result = " << n << std::endl);
301
302     if (n == -1)                // block is done
303       goto were_done;
304
305     d->produce_each (n);        // advance write pointers
306     if (n > 0)
307       return READY;
308
309     // We didn't produce any output even though we called general_work.
310     // We have (most likely) consumed some input.
311
312     // If this is a source, it's broken.
313     if (d->source_p()){
314       std::cerr << "gr_block_executor: source " << m
315                 << " returned 0 from work.  We're marking it DONE.\n";
316       // FIXME maybe we ought to raise an exception...
317       goto were_done;
318     }
319
320     // Have the caller try again...
321     return READY_NO_OUTPUT;
322   }
323   assert (0);
324     
325  were_done:
326   LOG(*d_log << "  were_done\n");
327   d->set_done (true);
328   return DONE;
329 }