Imported Upstream version 3.0.4
[debian/gnuradio] / gnuradio-core / src / python / gnuradio / gr / flow_graph.py
1 #
2 # Copyright 2004,2006 Free Software Foundation, Inc.
3
4 # This file is part of GNU Radio
5
6 # GNU Radio is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3, or (at your option)
9 # any later version.
10
11 # GNU Radio is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15
16 # You should have received a copy of the GNU General Public License
17 # along with GNU Radio; see the file COPYING.  If not, write to
18 # the Free Software Foundation, Inc., 51 Franklin Street,
19 # Boston, MA 02110-1301, USA.
20
21
22 from gnuradio.gr.basic_flow_graph import remove_duplicates, endpoint, edge, \
23      basic_flow_graph
24
25 from gnuradio.gr.exceptions import *
26 from gnuradio_swig_python import buffer, buffer_add_reader, block_detail, \
27      single_threaded_scheduler
28
29 from gnuradio.gr.scheduler import scheduler
30
31 _WHITE = 0                          # graph coloring tags
32 _GRAY  = 1
33 _BLACK = 2
34
35 _flow_graph_debug = False
36
37 def set_flow_graph_debug(on):
38     global _flow_graph_debug
39     _flow_graph_debug = on
40     
41
42 class buffer_sizes (object):
43     """compute buffer sizes to use"""
44     def __init__ (self, flow_graph):
45         # We could scan the graph here and determine individual sizes
46         # based on relative_rate, number of readers, etc.
47
48         # The simplest thing that could possibly work: all buffers same size
49         self.flow_graph = flow_graph
50         self.fixed_buffer_size = 32*1024
51
52     def allocate (self, m, index):
53         """allocate buffer for output index of block m"""
54         item_size = m.output_signature().sizeof_stream_item (index)
55         nitems = self.fixed_buffer_size / item_size
56         if nitems < 2 * m.output_multiple ():
57             nitems = 2 * m.output_multiple ()
58             
59         # if any downstream blocks is a decimator and/or has a large output_multiple, 
60         # ensure that we have a buffer at least 2 * their decimation_factor*output_multiple
61         for mdown in self.flow_graph.downstream_verticies_port(m, index):
62             decimation = int(1.0 / mdown.relative_rate())
63             nitems = max(nitems, 2 * (decimation * mdown.output_multiple() + mdown.history()))
64
65         return buffer (nitems, item_size)
66     
67
68 class flow_graph (basic_flow_graph):
69     """add physical connection info to simple_flow_graph
70     """
71     # __slots__ is incompatible with weakrefs (for some reason!)
72     # __slots__ = ['blocks', 'scheduler']
73
74     def __init__ (self):
75         basic_flow_graph.__init__ (self);
76         self.blocks = None
77         self.scheduler = None
78
79     def __del__(self):
80         # print "\nflow_graph.__del__"
81         # this ensures that i/o devices such as the USRP get shutdown gracefully
82         self.stop()
83         
84     def start (self):
85         '''start graph, forking thread(s), return immediately'''
86         if self.scheduler:
87             raise RuntimeError, "Scheduler already running"
88         self._setup_connections ()
89
90         # cast down to gr_module_sptr
91         # t = [x.block () for x in self.topological_sort (self.blocks)]
92         self.scheduler = scheduler (self)
93         self.scheduler.start ()
94
95     def stop (self):
96         '''tells scheduler to stop and waits for it to happen'''
97         if self.scheduler:
98             self.scheduler.stop ()
99             self.scheduler = None
100         
101     def wait (self):
102         '''waits for scheduler to stop'''
103         if self.scheduler:
104             self.scheduler.wait ()
105             self.scheduler = None
106         
107     def is_running (self):
108         return not not self.scheduler
109     
110     def run (self):
111         '''start graph, wait for completion'''
112         self.start ()
113         self.wait ()
114         
115     def _setup_connections (self):
116         """given the basic flow graph, setup all the physical connections"""
117         self.validate ()
118         self.blocks = self.all_blocks ()
119         self._assign_details ()
120         self._assign_buffers ()
121         self._connect_inputs ()
122
123     def _assign_details (self):
124         for m in self.blocks:
125             edges = self.in_edges (m)
126             used_ports = remove_duplicates ([e.dst.port for e in edges])
127             ninputs = len (used_ports)
128
129             edges = self.out_edges (m)
130             used_ports = remove_duplicates ([e.src.port for e in edges])
131             noutputs = len (used_ports)
132
133             m.set_detail (block_detail (ninputs, noutputs))
134
135     def _assign_buffers (self):
136         """determine the buffer sizes to use, allocate them and attach to detail"""
137         sizes = buffer_sizes (self)
138         for m in self.blocks:
139             d = m.detail ()
140             for index in range (d.noutputs ()):
141                 d.set_output (index, sizes.allocate (m, index))
142
143     def _connect_inputs (self):
144         """connect all block inputs to appropriate upstream buffers"""
145         for m in self.blocks:
146             d = m.detail ()
147             # print "%r history = %d" % (m, m.history())
148             for e in self.in_edges(m):
149                 # FYI, sources don't have any in_edges
150                 our_port = e.dst.port
151                 upstream_block = e.src.block
152                 upstream_port   = e.src.port
153                 upstream_buffer = upstream_block.detail().output(upstream_port)
154                 d.set_input(our_port, buffer_add_reader(upstream_buffer, m.history()-1))
155     
156
157     def topological_sort (self, all_v):
158         '''
159         Return a topologically sorted list of vertices.
160         This is basically a depth-first search with checks
161         for back edges (the non-DAG condition)
162
163         '''
164
165         # it's correct without this sort, but this
166         # should give better ordering for cache utilization
167         all_v = self._sort_sources_first (all_v)
168
169         output = []
170         for v in all_v:
171             v.ts_color = _WHITE
172         for v in all_v:
173             if v.ts_color == _WHITE:
174                 self._dfs_visit (v, output)
175         output.reverse ()
176         return output
177
178     def _dfs_visit (self, u, output):
179         # print "dfs_visit (enter): ", u
180         u.ts_color = _GRAY
181         for v in self.downstream_verticies (u):
182             if v.ts_color == _WHITE:    # (u, v) is a tree edge
183                 self._dfs_visit (v, output)
184             elif v.ts_color == _GRAY:   # (u, v) is a back edge
185                 raise NotDAG, "The graph is not an acyclic graph (It's got a loop)"
186             elif v.ts_color == _BLACK:  # (u, v) is a cross or forward edge
187                 pass
188             else:
189                 raise CantHappen, "Invalid color on vertex"
190         u.ts_color = _BLACK
191         output.append (u)
192         # print "dfs_visit (exit):  ", u, output
193
194     def _sort_sources_first (self, all_v):
195         # the sort function is not guaranteed to be stable.
196         # We add the unique_id in to the key so we're guaranteed
197         # of reproducible results.  This is important for the test
198         # code.  There is often more than one valid topological sort.
199         # We want to force a reproducible choice.
200         x = [(not self.source_p(v), v.unique_id(), v) for v in all_v]
201         x.sort ()
202         x = [v[2] for v in x]
203         # print "sorted: ", x
204         return x
205         
206     def partition_graph (self, all_v):
207         '''Return a list of lists of nodes that are connected.
208         The result is a list of disjoint graphs.
209         The sublists are topologically sorted.
210         '''
211         result = []
212         working_v = all_v[:]    # make copy
213         while working_v:
214             rv = self._reachable_verticies (working_v[0], working_v)
215             result.append (self.topological_sort (rv))
216             for v in rv:
217                 working_v.remove (v)
218         if _flow_graph_debug:
219             print "partition_graph:", result
220         return result
221         
222     def _reachable_verticies (self, start, all_v):
223         for v in all_v:
224             v.ts_color = _WHITE
225
226         self._reachable_dfs_visit (start)
227         return [v for v in all_v if v.ts_color == _BLACK]
228
229     def _reachable_dfs_visit (self, u):
230         u.ts_color = _BLACK
231         for v in self.adjacent_verticies (u):
232             if v.ts_color == _WHITE:
233                 self._reachable_dfs_visit (v)
234         return None