--- /dev/null
+/* -*- c++ -*- */
+/*
+ * Copyright 2004,2008 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gr_block_executor.h>
+#include <gr_block.h>
+#include <gr_block_detail.h>
+#include <gr_buffer.h>
+#include <boost/thread.hpp>
+#include <iostream>
+#include <limits>
+#include <assert.h>
+#include <stdio.h>
+
+// must be defined to either 0 or 1
+#define ENABLE_LOGGING 0
+
+#if (ENABLE_LOGGING)
+#define LOG(x) do { x; } while(0)
+#else
+#define LOG(x) do {;} while(0)
+#endif
+
+static int which_scheduler = 0;
+
+inline static unsigned int
+round_up (unsigned int n, unsigned int multiple)
+{
+ return ((n + multiple - 1) / multiple) * multiple;
+}
+
+inline static unsigned int
+round_down (unsigned int n, unsigned int multiple)
+{
+ return (n / multiple) * multiple;
+}
+
+//
+// Return minimum available write space in all our downstream buffers
+// or -1 if we're output blocked and the output we're blocked
+// on is done.
+//
+static int
+min_available_space (gr_block_detail *d, int output_multiple)
+{
+ int min_space = std::numeric_limits<int>::max();
+
+ for (int i = 0; i < d->noutputs (); i++){
+ gr_buffer::scoped_lock guard(*d->output(i)->mutex());
+#if 0
+ int n = round_down(d->output(i)->space_available(), output_multiple);
+#else
+ int n = round_down(std::min(d->output(i)->space_available(),
+ d->output(i)->bufsize()/2),
+ output_multiple);
+#endif
+ if (n == 0){ // We're blocked on output.
+ if (d->output(i)->done()){ // Downstream is done, therefore we're done.
+ return -1;
+ }
+ return 0;
+ }
+ min_space = std::min (min_space, n);
+ }
+ return min_space;
+}
+
+
+
+gr_block_executor::gr_block_executor (gr_block_sptr block)
+ : d_block(block), d_log(0)
+{
+ if (ENABLE_LOGGING){
+ char name[100];
+ snprintf(name, sizeof(name), "sst-%03d.log", which_scheduler++);
+ d_log = new std::ofstream(name);
+ std::unitbuf(*d_log); // make it unbuffered...
+ *d_log << "gr_block_executor: "
+ << d_block << std::endl;
+ }
+
+ d_block->start(); // enable any drivers, etc.
+}
+
+gr_block_executor::~gr_block_executor ()
+{
+ if (ENABLE_LOGGING)
+ delete d_log;
+
+ d_block->stop(); // stop any drivers, etc.
+}
+
+gr_block_executor::state
+gr_block_executor::run_one_iteration()
+{
+ int noutput_items;
+ int max_items_avail;
+
+ gr_block *m = d_block.get();
+ gr_block_detail *d = m->detail().get();
+
+ LOG(*d_log << std::endl << m);
+
+ if (d->done()){
+ assert(0);
+ return DONE;
+ }
+
+ if (d->source_p ()){
+ d_ninput_items_required.resize (0);
+ d_ninput_items.resize (0);
+ d_input_items.resize (0);
+ d_input_done.resize(0);
+ d_output_items.resize (d->noutputs ());
+
+ // determine the minimum available output space
+ noutput_items = min_available_space (d, m->output_multiple ());
+ LOG(*d_log << " source\n noutput_items = " << noutput_items << std::endl);
+ if (noutput_items == -1) // we're done
+ goto were_done;
+
+ if (noutput_items == 0){ // we're output blocked
+ LOG(*d_log << " BLKD_OUT\n");
+ return BLKD_OUT;
+ }
+
+ goto setup_call_to_work; // jump to common code
+ }
+
+ else if (d->sink_p ()){
+ d_ninput_items_required.resize (d->ninputs ());
+ d_ninput_items.resize (d->ninputs ());
+ d_input_items.resize (d->ninputs ());
+ d_input_done.resize(d->ninputs());
+ d_output_items.resize (0);
+ LOG(*d_log << " sink\n");
+
+ max_items_avail = 0;
+ for (int i = 0; i < d->ninputs (); i++){
+ {
+ /*
+ * Acquire the mutex and grab local copies of items_available and done.
+ */
+ gr_buffer::scoped_lock guard(*d->input(i)->mutex());
+ d_ninput_items[i] = d->input(i)->items_available();
+ d_input_done[i] = d->input(i)->done();
+ }
+
+ LOG(*d_log << " d_ninput_items[" << i << "] = " << d_ninput_items[i] << std::endl);
+ LOG(*d_log << " d_input_done[" << i << "] = " << d_input_done[i] << std::endl);
+
+ if (d_ninput_items[i] < m->output_multiple() && d_input_done[i])
+ goto were_done;
+
+ max_items_avail = std::max (max_items_avail, d_ninput_items[i]);
+ }
+
+ // take a swag at how much output we can sink
+ noutput_items = (int) (max_items_avail * m->relative_rate ());
+ noutput_items = round_down (noutput_items, m->output_multiple ());
+ LOG(*d_log << " max_items_avail = " << max_items_avail << std::endl);
+ LOG(*d_log << " noutput_items = " << noutput_items << std::endl);
+
+ if (noutput_items == 0){ // we're blocked on input
+ LOG(*d_log << " BLKD_IN\n");
+ return BLKD_IN;
+ }
+
+ goto try_again; // Jump to code shared with regular case.
+ }
+
+ else {
+ // do the regular thing
+ d_ninput_items_required.resize (d->ninputs ());
+ d_ninput_items.resize (d->ninputs ());
+ d_input_items.resize (d->ninputs ());
+ d_input_done.resize(d->ninputs());
+ d_output_items.resize (d->noutputs ());
+
+ max_items_avail = 0;
+ for (int i = 0; i < d->ninputs (); i++){
+ {
+ /*
+ * Acquire the mutex and grab local copies of items_available and done.
+ */
+ gr_buffer::scoped_lock guard(*d->input(i)->mutex());
+ d_ninput_items[i] = d->input(i)->items_available ();
+ d_input_done[i] = d->input(i)->done();
+ }
+ max_items_avail = std::max (max_items_avail, d_ninput_items[i]);
+ }
+
+ // determine the minimum available output space
+ noutput_items = min_available_space (d, m->output_multiple ());
+ if (ENABLE_LOGGING){
+ *d_log << " regular ";
+ if (m->relative_rate() >= 1.0)
+ *d_log << "1:" << m->relative_rate() << std::endl;
+ else
+ *d_log << 1.0/m->relative_rate() << ":1\n";
+ *d_log << " max_items_avail = " << max_items_avail << std::endl;
+ *d_log << " noutput_items = " << noutput_items << std::endl;
+ }
+ if (noutput_items == -1) // we're done
+ goto were_done;
+
+ if (noutput_items == 0){ // we're output blocked
+ LOG(*d_log << " BLKD_OUT\n");
+ return BLKD_OUT;
+ }
+
+ try_again:
+ if (m->fixed_rate()){
+ // try to work it forward starting with max_items_avail.
+ // We want to try to consume all the input we've got.
+ int reqd_noutput_items = m->fixed_rate_ninput_to_noutput(max_items_avail);
+ reqd_noutput_items = round_up(reqd_noutput_items, m->output_multiple());
+ if (reqd_noutput_items > 0 && reqd_noutput_items <= noutput_items)
+ noutput_items = reqd_noutput_items;
+ }
+
+ // ask the block how much input they need to produce noutput_items
+ m->forecast (noutput_items, d_ninput_items_required);
+
+ // See if we've got sufficient input available
+
+ int i;
+ for (i = 0; i < d->ninputs (); i++)
+ if (d_ninput_items_required[i] > d_ninput_items[i]) // not enough
+ break;
+
+ if (i < d->ninputs ()){ // not enough input on input[i]
+ // if we can, try reducing the size of our output request
+ if (noutput_items > m->output_multiple ()){
+ noutput_items /= 2;
+ noutput_items = round_up (noutput_items, m->output_multiple ());
+ goto try_again;
+ }
+
+ // We're blocked on input
+ LOG(*d_log << " BLKD_IN\n");
+ if (d_input_done[i]) // If the upstream block is done, we're done
+ goto were_done;
+
+ // Is it possible to ever fulfill this request?
+ if (d_ninput_items_required[i] > d->input(i)->max_possible_items_available ()){
+ // Nope, never going to happen...
+ std::cerr << "\nsched: <gr_block " << m->name()
+ << " (" << m->unique_id() << ")>"
+ << " is requesting more input data\n"
+ << " than we can provide.\n"
+ << " ninput_items_required = "
+ << d_ninput_items_required[i] << "\n"
+ << " max_possible_items_available = "
+ << d->input(i)->max_possible_items_available() << "\n"
+ << " If this is a filter, consider reducing the number of taps.\n";
+ goto were_done;
+ }
+
+ return BLKD_IN;
+ }
+
+ // We've got enough data on each input to produce noutput_items.
+ // Finish setting up the call to work.
+
+ for (int i = 0; i < d->ninputs (); i++)
+ d_input_items[i] = d->input(i)->read_pointer();
+
+ setup_call_to_work:
+
+ for (int i = 0; i < d->noutputs (); i++)
+ d_output_items[i] = d->output(i)->write_pointer();
+
+ // Do the actual work of the block
+ int n = m->general_work (noutput_items, d_ninput_items,
+ d_input_items, d_output_items);
+ LOG(*d_log << " general_work: noutput_items = " << noutput_items
+ << " result = " << n << std::endl);
+
+ if (n == -1) // block is done
+ goto were_done;
+
+ d->produce_each (n); // advance write pointers
+ if (n > 0)
+ return READY;
+
+ // We didn't produce any output even though we called general_work.
+ // We have (most likely) consumed some input.
+
+ // If this is a source, it's broken.
+ if (d->source_p()){
+ std::cerr << "gr_block_executor: source " << m
+ << " returned 0 from work. We're marking it DONE.\n";
+ // FIXME maybe we ought to raise an exception...
+ goto were_done;
+ }
+
+ // Have the caller try again...
+ return READY_NO_OUTPUT;
+ }
+ assert (0);
+
+ were_done:
+ LOG(*d_log << " were_done\n");
+ d->set_done (true);
+ return DONE;
+}