Imported Upstream version 3.0
[debian/gnuradio] / gnuradio-core / src / lib / runtime / gr_single_threaded_scheduler.cc
1 /* -*- c++ -*- */
2 /*
3  * Copyright 2004 Free Software Foundation, Inc.
4  * 
5  * This file is part of GNU Radio
6  * 
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 2, or (at your option)
10  * any later version.
11  * 
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  * 
17  * You should have received a copy of the GNU General Public License
18  * along with GNU Radio; see the file COPYING.  If not, write to
19  * the Free Software Foundation, Inc., 51 Franklin Street,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26
27 #include <gr_single_threaded_scheduler.h>
28 #include <gr_block.h>
29 #include <gr_block_detail.h>
30 #include <gr_buffer.h>
31 #include <iostream>
32 #include <limits>
33 #include <assert.h>
34 #include <stdio.h>
35
36 // must be defined to either 0 or 1
37 #define ENABLE_LOGGING 0
38
39 #if (ENABLE_LOGGING)
40 #define LOG(x) do { x; } while(0)
41 #else
42 #define LOG(x) do {;} while(0)
43 #endif
44
45 static int which_scheduler  = 0;
46
47
48 std::ostream&
49 operator << (std::ostream& os, const gr_block *m)
50 {
51   os << "<gr_block " << m->name() << " (" << m->unique_id() << ")>";
52   return os;
53 }
54
55 gr_single_threaded_scheduler_sptr
56 gr_make_single_threaded_scheduler (const std::vector<gr_block_sptr> &blocks)
57 {
58   return
59     gr_single_threaded_scheduler_sptr (new gr_single_threaded_scheduler (blocks));
60 }
61
62 gr_single_threaded_scheduler::gr_single_threaded_scheduler (
63     const std::vector<gr_block_sptr> &blocks)
64   : d_blocks (blocks), d_enabled (true), d_log(0)
65 {
66   if (ENABLE_LOGGING){
67     char name[100];
68     snprintf(name, sizeof(name), "sst-%d.log", which_scheduler++);
69     d_log = new std::ofstream(name);
70     *d_log << "gr_single_threaded_scheduler: "
71            << d_blocks.size ()
72            << " blocks\n";
73   }
74 }
75
76 gr_single_threaded_scheduler::~gr_single_threaded_scheduler ()
77 {
78   if (ENABLE_LOGGING)
79     delete d_log;
80 }
81
82 void
83 gr_single_threaded_scheduler::run ()
84 {
85   d_enabled = true;
86   main_loop ();
87 }
88
89
90 inline static unsigned int
91 round_up (unsigned int n, unsigned int multiple)
92 {
93   return ((n + multiple - 1) / multiple) * multiple;
94 }
95
96 inline static unsigned int
97 round_down (unsigned int n, unsigned int multiple)
98 {
99   return (n / multiple) * multiple;
100 }
101
102 //
103 // Return minimum available write space in all our downstream buffers
104 // or -1 if we're output blocked and the output we're blocked
105 // on is done.
106 //
107 static int
108 min_available_space (gr_block_detail *d, int output_multiple)
109 {
110   int   min_space = std::numeric_limits<int>::max();
111
112   for (int i = 0; i < d->noutputs (); i++){
113     int n = round_down (d->output(i)->space_available (), output_multiple);
114     if (n == 0){                        // We're blocked on output.
115       if (d->output(i)->done()){        // Downstream is done, therefore we're done.
116         return -1;
117       }
118       return 0;
119     }
120     min_space = std::min (min_space, n);
121   }
122   return min_space;
123 }
124
125 void
126 gr_single_threaded_scheduler::main_loop ()
127 {
128   static const int DEFAULT_CAPACITY = 16;
129
130   int                           noutput_items;
131   gr_vector_int                 ninput_items_required (DEFAULT_CAPACITY);
132   gr_vector_int                 ninput_items (DEFAULT_CAPACITY);
133   gr_vector_const_void_star     input_items (DEFAULT_CAPACITY);
134   gr_vector_void_star           output_items (DEFAULT_CAPACITY);
135   unsigned int                  bi;
136   unsigned int                  nalive;
137   int                           max_items_avail;
138   bool                          made_progress_last_pass;
139   bool                          making_progress;
140
141   for (unsigned i = 0; i < d_blocks.size (); i++)
142     d_blocks[i]->detail()->set_done (false);            // reset any done flags
143
144   for (unsigned i = 0; i < d_blocks.size (); i++)       // enable any drivers, etc.
145     d_blocks[i]->start();
146
147
148   bi = 0;
149   made_progress_last_pass = true;
150   making_progress = false;
151
152   // Loop while there are still blocks alive
153
154   nalive = d_blocks.size ();
155   while (d_enabled && nalive > 0){
156
157     gr_block            *m = d_blocks[bi].get ();
158     gr_block_detail     *d = m->detail().get ();
159
160     LOG(*d_log << std::endl << m);
161
162     if (d->done ())
163       goto next_block;
164
165     if (d->source_p ()){
166       // Invoke sources as a last resort.  As long as the previous pass
167       // made progress, don't call a source.
168       if (made_progress_last_pass){
169         LOG(*d_log << "  Skipping source\n");
170         goto next_block;
171       }
172
173       ninput_items_required.resize (0);
174       ninput_items.resize (0);
175       input_items.resize (0);
176       output_items.resize (d->noutputs ());
177
178       // determine the minimum available output space
179       noutput_items = min_available_space (d, m->output_multiple ());
180       LOG(*d_log << " source\n  noutput_items = " << noutput_items << std::endl);
181       if (noutput_items == -1)          // we're done
182         goto were_done;
183
184       if (noutput_items == 0){          // we're output blocked
185         LOG(*d_log << "  BLKD_OUT\n");
186         goto next_block;
187       }
188
189       goto setup_call_to_work;          // jump to common code
190     }
191
192     else if (d->sink_p ()){
193       ninput_items_required.resize (d->ninputs ());
194       ninput_items.resize (d->ninputs ());
195       input_items.resize (d->ninputs ());
196       output_items.resize (0);
197       LOG(*d_log << " sink\n");
198
199       max_items_avail = 0;
200       for (int i = 0; i < d->ninputs (); i++){
201         ninput_items[i] = d->input(i)->items_available();
202         //if (ninput_items[i] == 0 && d->input(i)->done())
203         if (ninput_items[i] < m->output_multiple() && d->input(i)->done())
204           goto were_done;
205         
206         max_items_avail = std::max (max_items_avail, ninput_items[i]);
207       }
208
209       // take a swag at how much output we can sink
210       noutput_items = (int) (max_items_avail * m->relative_rate ());
211       noutput_items = round_down (noutput_items, m->output_multiple ());
212       LOG(*d_log << "  max_items_avail = " << max_items_avail << std::endl);
213       LOG(*d_log << "  noutput_items = " << noutput_items << std::endl);
214
215       if (noutput_items == 0){  // we're blocked on input
216         LOG(*d_log << "  BLKD_IN\n");
217         goto next_block;
218       }
219
220       goto try_again;           // Jump to code shared with regular case.
221     }
222
223     else {
224       // do the regular thing
225       ninput_items_required.resize (d->ninputs ());
226       ninput_items.resize (d->ninputs ());
227       input_items.resize (d->ninputs ());
228       output_items.resize (d->noutputs ());
229
230       max_items_avail = 0;
231       for (int i = 0; i < d->ninputs (); i++){
232         ninput_items[i] = d->input(i)->items_available ();
233         max_items_avail = std::max (max_items_avail, ninput_items[i]);
234       }
235
236       // determine the minimum available output space
237       noutput_items = min_available_space (d, m->output_multiple ());
238       if (ENABLE_LOGGING){
239         *d_log << " regular ";
240         if (m->relative_rate() >= 1.0)
241           *d_log << "1:" << m->relative_rate() << std::endl;
242         else
243           *d_log << 1.0/m->relative_rate() << ":1\n";
244         *d_log << "  max_items_avail = " << max_items_avail << std::endl;
245         *d_log << "  noutput_items = " << noutput_items << std::endl;
246       }
247       if (noutput_items == -1)          // we're done
248         goto were_done;
249
250       if (noutput_items == 0){          // we're output blocked
251         LOG(*d_log << "  BLKD_OUT\n");
252         goto next_block;
253       }
254
255 #if 0
256       // Compute best estimate of noutput_items that we can really use.
257       noutput_items =
258         std::min ((unsigned) noutput_items,
259                   std::max ((unsigned) m->output_multiple(),
260                             round_up ((unsigned) (max_items_avail * m->relative_rate()),
261                                       m->output_multiple ())));
262
263       LOG(*d_log << "  revised noutput_items = " << noutput_items << std::endl);
264 #endif
265
266     try_again:
267       if (m->fixed_rate()){
268         // try to work it forward starting with max_items_avail.
269         // We want to try to consume all the input we've got.
270         int reqd_noutput_items = m->fixed_rate_ninput_to_noutput(max_items_avail);
271         reqd_noutput_items = round_up(reqd_noutput_items, m->output_multiple());
272         if (reqd_noutput_items > 0 && reqd_noutput_items <= noutput_items)
273           noutput_items = reqd_noutput_items;
274       }
275
276       // ask the block how much input they need to produce noutput_items
277       m->forecast (noutput_items, ninput_items_required);
278
279       // See if we've got sufficient input available
280
281       int i;
282       for (i = 0; i < d->ninputs (); i++)
283         if (ninput_items_required[i] > ninput_items[i]) // not enough
284           break;
285
286       if (i < d->ninputs ()){                   // not enough input on input[i]
287         // if we can, try reducing the size of our output request
288         if (noutput_items > m->output_multiple ()){
289           noutput_items /= 2;
290           noutput_items = round_up (noutput_items, m->output_multiple ());
291           goto try_again;
292         }
293
294         // We're blocked on input
295         LOG(*d_log << "  BLKD_IN\n");
296         if (d->input(i)->done())    // If the upstream block is done, we're done
297           goto were_done;
298
299         // Is it possible to ever fulfill this request?
300         if (ninput_items_required[i] > d->input(i)->max_possible_items_available ()){
301           // Nope, never going to happen...
302           std::cerr << "\nsched: <gr_block " << m->name()
303                     << " (" << m->unique_id() << ")>"
304                     << " is requesting more input data\n"
305                     << "  than we can provide.\n"
306                     << "  ninput_items_required = "
307                     << ninput_items_required[i] << "\n"
308                     << "  max_possible_items_available = "
309                     << d->input(i)->max_possible_items_available() << "\n"
310                     << "  If this is a filter, consider reducing the number of taps.\n";
311           goto were_done;
312         }
313
314         goto next_block;
315       }
316
317       // We've got enough data on each input to produce noutput_items.
318       // Finish setting up the call to work.
319
320       for (int i = 0; i < d->ninputs (); i++)
321         input_items[i] = d->input(i)->read_pointer();
322
323     setup_call_to_work:
324
325       for (int i = 0; i < d->noutputs (); i++)
326         output_items[i] = d->output(i)->write_pointer();
327
328       // Do the actual work of the block
329       int n = m->general_work (noutput_items, ninput_items,
330                                input_items, output_items);
331       LOG(*d_log << "  general_work: noutput_items = " << noutput_items
332           << " result = " << n << std::endl);
333
334       if (n == -1)              // block is done
335         goto were_done;
336
337       d->produce_each (n);      // advance write pointers
338       if (n > 0)
339         making_progress = true;
340
341       goto next_block;
342     }
343     assert (0);
344     
345   were_done:
346     LOG(*d_log << "  were_done\n");
347     d->set_done (true);
348     nalive--;
349
350   next_block:
351     if (++bi >= d_blocks.size ()){
352       bi = 0;
353       made_progress_last_pass = making_progress;
354       making_progress = false;
355     }
356   }
357
358   for (unsigned i = 0; i < d_blocks.size (); i++)       // disable any drivers, etc.
359     d_blocks[i]->stop();
360 }