]> git.gag.com Git - debian/gnuradio/blobdiff - gnuradio-core/src/lib/runtime/gr_block_executor.cc
Imported Upstream version 3.2.2
[debian/gnuradio] / gnuradio-core / src / lib / runtime / gr_block_executor.cc
diff --git a/gnuradio-core/src/lib/runtime/gr_block_executor.cc b/gnuradio-core/src/lib/runtime/gr_block_executor.cc
new file mode 100644 (file)
index 0000000..fd3a916
--- /dev/null
@@ -0,0 +1,329 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2004,2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING.  If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gr_block_executor.h>
+#include <gr_block.h>
+#include <gr_block_detail.h>
+#include <gr_buffer.h>
+#include <boost/thread.hpp>
+#include <iostream>
+#include <limits>
+#include <assert.h>
+#include <stdio.h>
+
+// must be defined to either 0 or 1
+#define ENABLE_LOGGING 0
+
+#if (ENABLE_LOGGING)
+#define LOG(x) do { x; } while(0)
+#else
+#define LOG(x) do {;} while(0)
+#endif
+
+static int which_scheduler  = 0;
+
+inline static unsigned int
+round_up (unsigned int n, unsigned int multiple)
+{
+  return ((n + multiple - 1) / multiple) * multiple;
+}
+
+inline static unsigned int
+round_down (unsigned int n, unsigned int multiple)
+{
+  return (n / multiple) * multiple;
+}
+
+//
+// Return minimum available write space in all our downstream buffers
+// or -1 if we're output blocked and the output we're blocked
+// on is done.
+//
+static int
+min_available_space (gr_block_detail *d, int output_multiple)
+{
+  int  min_space = std::numeric_limits<int>::max();
+
+  for (int i = 0; i < d->noutputs (); i++){
+    gr_buffer::scoped_lock guard(*d->output(i)->mutex());
+#if 0
+    int n = round_down(d->output(i)->space_available(), output_multiple);
+#else
+    int n = round_down(std::min(d->output(i)->space_available(),
+                               d->output(i)->bufsize()/2),
+                      output_multiple);
+#endif
+    if (n == 0){                       // We're blocked on output.
+      if (d->output(i)->done()){       // Downstream is done, therefore we're done.
+       return -1;
+      }
+      return 0;
+    }
+    min_space = std::min (min_space, n);
+  }
+  return min_space;
+}
+
+
+
+gr_block_executor::gr_block_executor (gr_block_sptr block)
+  : d_block(block), d_log(0)
+{
+  if (ENABLE_LOGGING){
+    char name[100];
+    snprintf(name, sizeof(name), "sst-%03d.log", which_scheduler++);
+    d_log = new std::ofstream(name);
+    std::unitbuf(*d_log);              // make it unbuffered...
+    *d_log << "gr_block_executor: "
+          << d_block << std::endl;
+  }
+
+  d_block->start();                    // enable any drivers, etc.
+}
+
+gr_block_executor::~gr_block_executor ()
+{
+  if (ENABLE_LOGGING)
+    delete d_log;
+
+  d_block->stop();                     // stop any drivers, etc.
+}
+
+gr_block_executor::state
+gr_block_executor::run_one_iteration()
+{
+  int                  noutput_items;
+  int                  max_items_avail;
+
+  gr_block             *m = d_block.get();
+  gr_block_detail      *d = m->detail().get();
+
+  LOG(*d_log << std::endl << m);
+
+  if (d->done()){
+    assert(0);
+    return DONE;
+  }
+
+  if (d->source_p ()){
+    d_ninput_items_required.resize (0);
+    d_ninput_items.resize (0);
+    d_input_items.resize (0);
+    d_input_done.resize(0);
+    d_output_items.resize (d->noutputs ());
+
+    // determine the minimum available output space
+    noutput_items = min_available_space (d, m->output_multiple ());
+    LOG(*d_log << " source\n  noutput_items = " << noutput_items << std::endl);
+    if (noutput_items == -1)           // we're done
+      goto were_done;
+
+    if (noutput_items == 0){           // we're output blocked
+      LOG(*d_log << "  BLKD_OUT\n");
+      return BLKD_OUT;
+    }
+
+    goto setup_call_to_work;           // jump to common code
+  }
+
+  else if (d->sink_p ()){
+    d_ninput_items_required.resize (d->ninputs ());
+    d_ninput_items.resize (d->ninputs ());
+    d_input_items.resize (d->ninputs ());
+    d_input_done.resize(d->ninputs());
+    d_output_items.resize (0);
+    LOG(*d_log << " sink\n");
+
+    max_items_avail = 0;
+    for (int i = 0; i < d->ninputs (); i++){
+      {
+       /*
+        * Acquire the mutex and grab local copies of items_available and done.
+        */
+       gr_buffer::scoped_lock guard(*d->input(i)->mutex());
+       d_ninput_items[i] = d->input(i)->items_available();
+       d_input_done[i] = d->input(i)->done();
+      }
+
+      LOG(*d_log << "  d_ninput_items[" << i << "] = " << d_ninput_items[i] << std::endl);
+      LOG(*d_log << "  d_input_done[" << i << "] = " << d_input_done[i] << std::endl);
+      
+      if (d_ninput_items[i] < m->output_multiple() && d_input_done[i])
+       goto were_done;
+       
+      max_items_avail = std::max (max_items_avail, d_ninput_items[i]);
+    }
+
+    // take a swag at how much output we can sink
+    noutput_items = (int) (max_items_avail * m->relative_rate ());
+    noutput_items = round_down (noutput_items, m->output_multiple ());
+    LOG(*d_log << "  max_items_avail = " << max_items_avail << std::endl);
+    LOG(*d_log << "  noutput_items = " << noutput_items << std::endl);
+
+    if (noutput_items == 0){   // we're blocked on input
+      LOG(*d_log << "  BLKD_IN\n");
+      return BLKD_IN;
+    }
+
+    goto try_again;            // Jump to code shared with regular case.
+  }
+
+  else {
+    // do the regular thing
+    d_ninput_items_required.resize (d->ninputs ());
+    d_ninput_items.resize (d->ninputs ());
+    d_input_items.resize (d->ninputs ());
+    d_input_done.resize(d->ninputs());
+    d_output_items.resize (d->noutputs ());
+
+    max_items_avail = 0;
+    for (int i = 0; i < d->ninputs (); i++){
+      {
+       /*
+        * Acquire the mutex and grab local copies of items_available and done.
+        */
+       gr_buffer::scoped_lock guard(*d->input(i)->mutex());
+       d_ninput_items[i] = d->input(i)->items_available ();
+       d_input_done[i] = d->input(i)->done();
+      }
+      max_items_avail = std::max (max_items_avail, d_ninput_items[i]);
+    }
+
+    // determine the minimum available output space
+    noutput_items = min_available_space (d, m->output_multiple ());
+    if (ENABLE_LOGGING){
+      *d_log << " regular ";
+      if (m->relative_rate() >= 1.0)
+       *d_log << "1:" << m->relative_rate() << std::endl;
+      else
+       *d_log << 1.0/m->relative_rate() << ":1\n";
+      *d_log << "  max_items_avail = " << max_items_avail << std::endl;
+      *d_log << "  noutput_items = " << noutput_items << std::endl;
+    }
+    if (noutput_items == -1)           // we're done
+      goto were_done;
+
+    if (noutput_items == 0){           // we're output blocked
+      LOG(*d_log << "  BLKD_OUT\n");
+      return BLKD_OUT;
+    }
+
+  try_again:
+    if (m->fixed_rate()){
+      // try to work it forward starting with max_items_avail.
+      // We want to try to consume all the input we've got.
+      int reqd_noutput_items = m->fixed_rate_ninput_to_noutput(max_items_avail);
+      reqd_noutput_items = round_up(reqd_noutput_items, m->output_multiple());
+      if (reqd_noutput_items > 0 && reqd_noutput_items <= noutput_items)
+       noutput_items = reqd_noutput_items;
+    }
+
+    // ask the block how much input they need to produce noutput_items
+    m->forecast (noutput_items, d_ninput_items_required);
+
+    // See if we've got sufficient input available
+
+    int i;
+    for (i = 0; i < d->ninputs (); i++)
+      if (d_ninput_items_required[i] > d_ninput_items[i])      // not enough
+       break;
+
+    if (i < d->ninputs ()){                    // not enough input on input[i]
+      // if we can, try reducing the size of our output request
+      if (noutput_items > m->output_multiple ()){
+       noutput_items /= 2;
+       noutput_items = round_up (noutput_items, m->output_multiple ());
+       goto try_again;
+      }
+
+      // We're blocked on input
+      LOG(*d_log << "  BLKD_IN\n");
+      if (d_input_done[i])     // If the upstream block is done, we're done
+       goto were_done;
+
+      // Is it possible to ever fulfill this request?
+      if (d_ninput_items_required[i] > d->input(i)->max_possible_items_available ()){
+       // Nope, never going to happen...
+       std::cerr << "\nsched: <gr_block " << m->name()
+                 << " (" << m->unique_id() << ")>"
+                 << " is requesting more input data\n"
+                 << "  than we can provide.\n"
+                 << "  ninput_items_required = "
+                 << d_ninput_items_required[i] << "\n"
+                 << "  max_possible_items_available = "
+                 << d->input(i)->max_possible_items_available() << "\n"
+                 << "  If this is a filter, consider reducing the number of taps.\n";
+       goto were_done;
+      }
+
+      return BLKD_IN;
+    }
+
+    // We've got enough data on each input to produce noutput_items.
+    // Finish setting up the call to work.
+
+    for (int i = 0; i < d->ninputs (); i++)
+      d_input_items[i] = d->input(i)->read_pointer();
+
+  setup_call_to_work:
+
+    for (int i = 0; i < d->noutputs (); i++)
+      d_output_items[i] = d->output(i)->write_pointer();
+
+    // Do the actual work of the block
+    int n = m->general_work (noutput_items, d_ninput_items,
+                            d_input_items, d_output_items);
+    LOG(*d_log << "  general_work: noutput_items = " << noutput_items
+       << " result = " << n << std::endl);
+
+    if (n == -1)               // block is done
+      goto were_done;
+
+    d->produce_each (n);       // advance write pointers
+    if (n > 0)
+      return READY;
+
+    // We didn't produce any output even though we called general_work.
+    // We have (most likely) consumed some input.
+
+    // If this is a source, it's broken.
+    if (d->source_p()){
+      std::cerr << "gr_block_executor: source " << m
+               << " returned 0 from work.  We're marking it DONE.\n";
+      // FIXME maybe we ought to raise an exception...
+      goto were_done;
+    }
+
+    // Have the caller try again...
+    return READY_NO_OUTPUT;
+  }
+  assert (0);
+    
+ were_done:
+  LOG(*d_log << "  were_done\n");
+  d->set_done (true);
+  return DONE;
+}