Imported Upstream version 3.2.2
[debian/gnuradio] / gnuradio-core / src / lib / runtime / gr_single_threaded_scheduler.cc
1 /* -*- c++ -*- */
2 /*
3  * Copyright 2004 Free Software Foundation, Inc.
4  * 
5  * This file is part of GNU Radio
6  * 
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3, or (at your option)
10  * any later version.
11  * 
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  * 
17  * You should have received a copy of the GNU General Public License
18  * along with GNU Radio; see the file COPYING.  If not, write to
19  * the Free Software Foundation, Inc., 51 Franklin Street,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26
27 #include <gr_single_threaded_scheduler.h>
28 #include <gr_block.h>
29 #include <gr_block_detail.h>
30 #include <gr_buffer.h>
31 #include <boost/thread.hpp>
32 #include <iostream>
33 #include <limits>
34 #include <assert.h>
35 #include <stdio.h>
36
37 // must be defined to either 0 or 1
38 #define ENABLE_LOGGING 0
39
40 #if (ENABLE_LOGGING)
41 #define LOG(x) do { x; } while(0)
42 #else
43 #define LOG(x) do {;} while(0)
44 #endif
45
46 static int which_scheduler  = 0;
47
48 gr_single_threaded_scheduler_sptr
49 gr_make_single_threaded_scheduler (const std::vector<gr_block_sptr> &blocks)
50 {
51   return
52     gr_single_threaded_scheduler_sptr (new gr_single_threaded_scheduler (blocks));
53 }
54
55 gr_single_threaded_scheduler::gr_single_threaded_scheduler (
56     const std::vector<gr_block_sptr> &blocks)
57   : d_blocks (blocks), d_enabled (true), d_log(0)
58 {
59   if (ENABLE_LOGGING){
60     char name[100];
61     snprintf(name, sizeof(name), "sst-%d.log", which_scheduler++);
62     d_log = new std::ofstream(name);
63     *d_log << "gr_single_threaded_scheduler: "
64            << d_blocks.size ()
65            << " blocks\n";
66   }
67 }
68
69 gr_single_threaded_scheduler::~gr_single_threaded_scheduler ()
70 {
71   if (ENABLE_LOGGING)
72     delete d_log;
73 }
74
75 void
76 gr_single_threaded_scheduler::run ()
77 {
78   // d_enabled = true;          // KLUDGE
79   main_loop ();
80 }
81
82 void
83 gr_single_threaded_scheduler::stop ()
84
85   if (0)
86     std::cout << "gr_singled_threaded_scheduler::stop() "
87               << this << std::endl;
88   d_enabled = false;
89 }
90
91 inline static unsigned int
92 round_up (unsigned int n, unsigned int multiple)
93 {
94   return ((n + multiple - 1) / multiple) * multiple;
95 }
96
97 inline static unsigned int
98 round_down (unsigned int n, unsigned int multiple)
99 {
100   return (n / multiple) * multiple;
101 }
102
103 //
104 // Return minimum available write space in all our downstream buffers
105 // or -1 if we're output blocked and the output we're blocked
106 // on is done.
107 //
108 static int
109 min_available_space (gr_block_detail *d, int output_multiple)
110 {
111   int   min_space = std::numeric_limits<int>::max();
112
113   for (int i = 0; i < d->noutputs (); i++){
114     int n = round_down (d->output(i)->space_available (), output_multiple);
115     if (n == 0){                        // We're blocked on output.
116       if (d->output(i)->done()){        // Downstream is done, therefore we're done.
117         return -1;
118       }
119       return 0;
120     }
121     min_space = std::min (min_space, n);
122   }
123   return min_space;
124 }
125
126 void
127 gr_single_threaded_scheduler::main_loop ()
128 {
129   static const int DEFAULT_CAPACITY = 16;
130
131   int                           noutput_items;
132   gr_vector_int                 ninput_items_required (DEFAULT_CAPACITY);
133   gr_vector_int                 ninput_items (DEFAULT_CAPACITY);
134   gr_vector_const_void_star     input_items (DEFAULT_CAPACITY);
135   gr_vector_void_star           output_items (DEFAULT_CAPACITY);
136   unsigned int                  bi;
137   unsigned int                  nalive;
138   int                           max_items_avail;
139   bool                          made_progress_last_pass;
140   bool                          making_progress;
141
142   for (unsigned i = 0; i < d_blocks.size (); i++)
143     d_blocks[i]->detail()->set_done (false);            // reset any done flags
144
145   for (unsigned i = 0; i < d_blocks.size (); i++)       // enable any drivers, etc.
146     d_blocks[i]->start();
147
148
149   bi = 0;
150   made_progress_last_pass = true;
151   making_progress = false;
152
153   // Loop while there are still blocks alive
154
155   nalive = d_blocks.size ();
156   while (d_enabled && nalive > 0){
157
158     if (boost::this_thread::interruption_requested())
159       break;
160
161     gr_block            *m = d_blocks[bi].get ();
162     gr_block_detail     *d = m->detail().get ();
163
164     LOG(*d_log << std::endl << m);
165
166     if (d->done ())
167       goto next_block;
168
169     if (d->source_p ()){
170       // Invoke sources as a last resort.  As long as the previous pass
171       // made progress, don't call a source.
172       if (made_progress_last_pass){
173         LOG(*d_log << "  Skipping source\n");
174         goto next_block;
175       }
176
177       ninput_items_required.resize (0);
178       ninput_items.resize (0);
179       input_items.resize (0);
180       output_items.resize (d->noutputs ());
181
182       // determine the minimum available output space
183       noutput_items = min_available_space (d, m->output_multiple ());
184       LOG(*d_log << " source\n  noutput_items = " << noutput_items << std::endl);
185       if (noutput_items == -1)          // we're done
186         goto were_done;
187
188       if (noutput_items == 0){          // we're output blocked
189         LOG(*d_log << "  BLKD_OUT\n");
190         goto next_block;
191       }
192
193       goto setup_call_to_work;          // jump to common code
194     }
195
196     else if (d->sink_p ()){
197       ninput_items_required.resize (d->ninputs ());
198       ninput_items.resize (d->ninputs ());
199       input_items.resize (d->ninputs ());
200       output_items.resize (0);
201       LOG(*d_log << " sink\n");
202
203       max_items_avail = 0;
204       for (int i = 0; i < d->ninputs (); i++){
205         ninput_items[i] = d->input(i)->items_available();
206         //if (ninput_items[i] == 0 && d->input(i)->done())
207         if (ninput_items[i] < m->output_multiple() && d->input(i)->done())
208           goto were_done;
209         
210         max_items_avail = std::max (max_items_avail, ninput_items[i]);
211       }
212
213       // take a swag at how much output we can sink
214       noutput_items = (int) (max_items_avail * m->relative_rate ());
215       noutput_items = round_down (noutput_items, m->output_multiple ());
216       LOG(*d_log << "  max_items_avail = " << max_items_avail << std::endl);
217       LOG(*d_log << "  noutput_items = " << noutput_items << std::endl);
218
219       if (noutput_items == 0){  // we're blocked on input
220         LOG(*d_log << "  BLKD_IN\n");
221         goto next_block;
222       }
223
224       goto try_again;           // Jump to code shared with regular case.
225     }
226
227     else {
228       // do the regular thing
229       ninput_items_required.resize (d->ninputs ());
230       ninput_items.resize (d->ninputs ());
231       input_items.resize (d->ninputs ());
232       output_items.resize (d->noutputs ());
233
234       max_items_avail = 0;
235       for (int i = 0; i < d->ninputs (); i++){
236         ninput_items[i] = d->input(i)->items_available ();
237         max_items_avail = std::max (max_items_avail, ninput_items[i]);
238       }
239
240       // determine the minimum available output space
241       noutput_items = min_available_space (d, m->output_multiple ());
242       if (ENABLE_LOGGING){
243         *d_log << " regular ";
244         if (m->relative_rate() >= 1.0)
245           *d_log << "1:" << m->relative_rate() << std::endl;
246         else
247           *d_log << 1.0/m->relative_rate() << ":1\n";
248         *d_log << "  max_items_avail = " << max_items_avail << std::endl;
249         *d_log << "  noutput_items = " << noutput_items << std::endl;
250       }
251       if (noutput_items == -1)          // we're done
252         goto were_done;
253
254       if (noutput_items == 0){          // we're output blocked
255         LOG(*d_log << "  BLKD_OUT\n");
256         goto next_block;
257       }
258
259 #if 0
260       // Compute best estimate of noutput_items that we can really use.
261       noutput_items =
262         std::min ((unsigned) noutput_items,
263                   std::max ((unsigned) m->output_multiple(),
264                             round_up ((unsigned) (max_items_avail * m->relative_rate()),
265                                       m->output_multiple ())));
266
267       LOG(*d_log << "  revised noutput_items = " << noutput_items << std::endl);
268 #endif
269
270     try_again:
271       if (m->fixed_rate()){
272         // try to work it forward starting with max_items_avail.
273         // We want to try to consume all the input we've got.
274         int reqd_noutput_items = m->fixed_rate_ninput_to_noutput(max_items_avail);
275         reqd_noutput_items = round_up(reqd_noutput_items, m->output_multiple());
276         if (reqd_noutput_items > 0 && reqd_noutput_items <= noutput_items)
277           noutput_items = reqd_noutput_items;
278       }
279
280       // ask the block how much input they need to produce noutput_items
281       m->forecast (noutput_items, ninput_items_required);
282
283       // See if we've got sufficient input available
284
285       int i;
286       for (i = 0; i < d->ninputs (); i++)
287         if (ninput_items_required[i] > ninput_items[i]) // not enough
288           break;
289
290       if (i < d->ninputs ()){                   // not enough input on input[i]
291         // if we can, try reducing the size of our output request
292         if (noutput_items > m->output_multiple ()){
293           noutput_items /= 2;
294           noutput_items = round_up (noutput_items, m->output_multiple ());
295           goto try_again;
296         }
297
298         // We're blocked on input
299         LOG(*d_log << "  BLKD_IN\n");
300         if (d->input(i)->done())    // If the upstream block is done, we're done
301           goto were_done;
302
303         // Is it possible to ever fulfill this request?
304         if (ninput_items_required[i] > d->input(i)->max_possible_items_available ()){
305           // Nope, never going to happen...
306           std::cerr << "\nsched: <gr_block " << m->name()
307                     << " (" << m->unique_id() << ")>"
308                     << " is requesting more input data\n"
309                     << "  than we can provide.\n"
310                     << "  ninput_items_required = "
311                     << ninput_items_required[i] << "\n"
312                     << "  max_possible_items_available = "
313                     << d->input(i)->max_possible_items_available() << "\n"
314                     << "  If this is a filter, consider reducing the number of taps.\n";
315           goto were_done;
316         }
317
318         goto next_block;
319       }
320
321       // We've got enough data on each input to produce noutput_items.
322       // Finish setting up the call to work.
323
324       for (int i = 0; i < d->ninputs (); i++)
325         input_items[i] = d->input(i)->read_pointer();
326
327     setup_call_to_work:
328
329       for (int i = 0; i < d->noutputs (); i++)
330         output_items[i] = d->output(i)->write_pointer();
331
332       // Do the actual work of the block
333       int n = m->general_work (noutput_items, ninput_items,
334                                input_items, output_items);
335       LOG(*d_log << "  general_work: noutput_items = " << noutput_items
336           << " result = " << n << std::endl);
337
338       if (n == -1)              // block is done
339         goto were_done;
340
341       d->produce_each (n);      // advance write pointers
342       if (n > 0)
343         making_progress = true;
344
345       goto next_block;
346     }
347     assert (0);
348     
349   were_done:
350     LOG(*d_log << "  were_done\n");
351     d->set_done (true);
352     nalive--;
353
354   next_block:
355     if (++bi >= d_blocks.size ()){
356       bi = 0;
357       made_progress_last_pass = making_progress;
358       making_progress = false;
359     }
360   }
361
362   for (unsigned i = 0; i < d_blocks.size (); i++)       // disable any drivers, etc.
363     d_blocks[i]->stop();
364 }