2 # Copyright 2004,2006 Free Software Foundation, Inc.
4 # This file is part of GNU Radio
6 # GNU Radio is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2, or (at your option)
11 # GNU Radio is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with GNU Radio; see the file COPYING. If not, write to
18 # the Free Software Foundation, Inc., 51 Franklin Street,
19 # Boston, MA 02110-1301, USA.
22 from gnuradio.gr.basic_flow_graph import remove_duplicates, endpoint, edge, \
25 from gnuradio.gr.exceptions import *
26 from gnuradio_swig_python import buffer, buffer_add_reader, block_detail, \
27 single_threaded_scheduler
29 from gnuradio.gr.scheduler import scheduler
_WHITE = 0 # graph coloring tags
# NOTE(review): _GRAY and _BLACK are referenced by the DFS routines below
# but their definitions are not visible in this view — presumably defined
# alongside _WHITE; confirm against the full file.
# Module-wide switch: when True, partition_graph prints its result.
_flow_graph_debug = False
def set_flow_graph_debug(on):
    """Turn module-wide flow-graph debug output on or off.

    Sets the module-level _flow_graph_debug flag that partition_graph
    consults before printing diagnostics.
    """
    global _flow_graph_debug
    _flow_graph_debug = on
class buffer_sizes (object):
    """Compute the buffer sizes to use for each block's output.

    Policy: every buffer gets the same fixed size, enlarged only where a
    block's output_multiple or a downstream decimator requires more
    headroom.
    """
    def __init__ (self, flow_graph):
        # We could scan the graph here and determine individual sizes
        # based on relative_rate, number of readers, etc.
        # The simplest thing that could possibly work: all buffers same size
        self.flow_graph = flow_graph
        self.fixed_buffer_size = 32*1024

    def allocate (self, m, index):
        """Allocate and return a buffer for output `index` of block `m`."""
        item_size = m.output_signature().sizeof_stream_item (index)
        # Capacity in items, not bytes.  '//' is identical to '/' for ints
        # under Python 2 semantics, but stays an integer (not a float) if
        # true division is ever in effect — buffer() needs an int count.
        nitems = self.fixed_buffer_size // item_size
        if nitems < 2 * m.output_multiple ():
            nitems = 2 * m.output_multiple ()

        # if any downstream blocks is a decimator and/or has a large output_multiple,
        # ensure that we have a buffer at least 2 * their decimation_factor*output_multiple
        for mdown in self.flow_graph.downstream_verticies_port(m, index):
            decimation = int(1.0 / mdown.relative_rate())
            nitems = max(nitems, 2 * (decimation * mdown.output_multiple() + mdown.history()))

        return buffer (nitems, item_size)
# NOTE(review): this view of flow_graph is incomplete — the class
# docstring's closing quotes and the 'def' lines for __init__, __del__,
# start, stop and wait are missing; the indented fragments below are the
# surviving bodies of those lifecycle methods.  Confirm against the full file.
class flow_graph (basic_flow_graph):
    """add physical connection info to simple_flow_graph
    # __slots__ is incompatible with weakrefs (for some reason!)
    # __slots__ = ['blocks', 'scheduler']
    # --- __init__ fragment: chain up to the basic flow graph ---
        basic_flow_graph.__init__ (self);
    # --- __del__ fragment ---
        # print "\nflow_graph.__del__"
        # this ensures that i/o devices such as the USRP get shutdown gracefully
    # --- start() fragment: refuse a second start, wire up the graph,
    # --- then hand it to a freshly-spawned scheduler ---
        '''start graph, forking thread(s), return immediately'''
            raise RuntimeError, "Scheduler already running"
        self._setup_connections ()
        # cast down to gr_module_sptr
        # t = [x.block () for x in self.topological_sort (self.blocks)]
        self.scheduler = scheduler (self)
        self.scheduler.start ()
    # --- stop() fragment ---
        '''tells scheduler to stop and waits for it to happen'''
        self.scheduler.stop ()
    # --- wait() fragment: block until the scheduler finishes, then clear
    # --- the slot so is_running() reports False ---
        '''waits for scheduler to stop'''
        self.scheduler.wait ()
        self.scheduler = None
107 def is_running (self):
108 return not not self.scheduler
        # NOTE(review): docstring of run(); the enclosing 'def run' line and
        # its body are not visible in this view — confirm against the full file.
        '''start graph, wait for completion'''
115 def _setup_connections (self):
116 """given the basic flow graph, setup all the physical connections"""
118 self.blocks = self.all_blocks ()
119 self._assign_details ()
120 self._assign_buffers ()
121 self._connect_inputs ()
123 def _assign_details (self):
124 for m in self.blocks:
125 edges = self.in_edges (m)
126 used_ports = remove_duplicates ([e.dst.port for e in edges])
127 ninputs = len (used_ports)
129 edges = self.out_edges (m)
130 used_ports = remove_duplicates ([e.src.port for e in edges])
131 noutputs = len (used_ports)
133 m.set_detail (block_detail (ninputs, noutputs))
    def _assign_buffers (self):
        """determine the buffer sizes to use, allocate them and attach to detail"""
        sizes = buffer_sizes (self)
        for m in self.blocks:
            # NOTE(review): a line is missing from this view here — 'd' is
            # presumably m.detail(); confirm against the full file.
            for index in range (d.noutputs ()):
                d.set_output (index, sizes.allocate (m, index))
    def _connect_inputs (self):
        """connect all block inputs to appropriate upstream buffers"""
        for m in self.blocks:
            # NOTE(review): a line defining 'd' (presumably m.detail()) is
            # missing from this view — confirm against the full file.
            # print "%r history = %d" % (m, m.history())
            for e in self.in_edges(m):
                # FYI, sources don't have any in_edges
                our_port = e.dst.port
                upstream_block = e.src.block
                upstream_port = e.src.port
                # Attach a reader to the upstream output buffer; the
                # history()-1 argument keeps that many already-consumed items
                # readable so the block can look back at past samples.
                upstream_buffer = upstream_block.detail().output(upstream_port)
                d.set_input(our_port, buffer_add_reader(upstream_buffer, m.history()-1))
    def topological_sort (self, all_v):
        # NOTE(review): this method is incomplete in this view — the
        # docstring delimiters, the DFS setup (output list init, the pass
        # coloring every vertex _WHITE, the 'for v in all_v:' header) and
        # the return statement are missing.  Confirm against the full file.
        Return a topologically sorted list of vertices.
        This is basically a depth-first search with checks
        for back edges (the non-DAG condition)
        # it's correct without this sort, but this
        # should give better ordering for cache utilization
        all_v = self._sort_sources_first (all_v)
            if v.ts_color == _WHITE:
                self._dfs_visit (v, output)
    def _dfs_visit (self, u, output):
        # Standard DFS coloring: _WHITE = undiscovered, _GRAY = on the
        # current DFS stack, _BLACK = finished.
        # NOTE(review): the lines marking u _GRAY on entry and _BLACK /
        # appended to output on exit are missing from this view.
        # print "dfs_visit (enter): ", u
        for v in self.downstream_verticies (u):
            if v.ts_color == _WHITE: # (u, v) is a tree edge
                self._dfs_visit (v, output)
            elif v.ts_color == _GRAY: # (u, v) is a back edge
                raise NotDAG, "The graph is not an acyclic graph (It's got a loop)"
            elif v.ts_color == _BLACK: # (u, v) is a cross or forward edge
                # NOTE(review): the _BLACK branch body (presumably 'pass')
                # and a final 'else:' line are missing from this view; the
                # raise below likely belongs to that else clause.
                raise CantHappen, "Invalid color on vertex"
        # print "dfs_visit (exit): ", u, output
    def _sort_sources_first (self, all_v):
        # the sort function is not guaranteed to be stable.
        # We add the unique_id in to the key so we're guaranteed
        # of reproducible results. This is important for the test
        # code. There is often more than one valid topological sort.
        # We want to force a reproducible choice.
        # Key: (False, id) for sources sorts before (True, id) for the rest.
        x = [(not self.source_p(v), v.unique_id(), v) for v in all_v]
        # NOTE(review): the x.sort() call is missing from this view.
        x = [v[2] for v in x]
        # print "sorted: ", x
        # NOTE(review): the 'return x' line is missing from this view.
    def partition_graph (self, all_v):
        '''Return a list of lists of nodes that are connected.
        The result is a list of disjoint graphs.
        The sublists are topologically sorted.
        # NOTE(review): this method is incomplete in this view — the
        # docstring close, the 'result' list init, the loop header over
        # working_v, the removal of reached vertices from working_v and the
        # final 'return result' are missing.  Confirm against the full file.
        working_v = all_v[:] # make copy
        # Grow one connected component from the first remaining vertex.
        rv = self._reachable_verticies (working_v[0], working_v)
        result.append (self.topological_sort (rv))
        if _flow_graph_debug:
            print "partition_graph:", result
    def _reachable_verticies (self, start, all_v):
        # NOTE(review): the docstring and the pass resetting every vertex's
        # ts_color to _WHITE are missing from this view — confirm against
        # the full file.
        self._reachable_dfs_visit (start)
        # Everything the DFS touched ends up colored _BLACK.
        return [v for v in all_v if v.ts_color == _BLACK]
    def _reachable_dfs_visit (self, u):
        # NOTE(review): the line coloring u _BLACK (marking it visited) is
        # missing from this view — confirm against the full file.
        # Recurse into every still-WHITE neighbor, in either edge direction.
        for v in self.adjacent_verticies (u):
            if v.ts_color == _WHITE:
                self._reachable_dfs_visit (v)