3 # Copyright 2004 Free Software Foundation, Inc.
5 # This file is part of GNU Radio
7 # GNU Radio is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 2, or (at your option)
12 # GNU Radio is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with GNU Radio; see the file COPYING. If not, write to
19 # the Free Software Foundation, Inc., 51 Franklin Street,
20 # Boston, MA 02110-1301, USA.
23 from gnuradio import gr, gr_unittest
24 import qa_basic_flow_graph
# Body of the allocation-count helper. NOTE(review): its `def` line is not
# visible in this extract; presumably this is the all_counts() that the tests
# further down call -- confirm against the full file.
# Returns a 4-tuple, in this order: the values reported by
# gr.block_ncurrently_allocated, gr.block_detail_ncurrently_allocated,
# gr.buffer_ncurrently_allocated and gr.buffer_reader_ncurrently_allocated.
28 return (gr.block_ncurrently_allocated (),
29 gr.block_detail_ncurrently_allocated (),
30 gr.buffer_ncurrently_allocated (),
31 gr.buffer_reader_ncurrently_allocated ())
class wrap_add(gr.hier_block):
    """Hierarchical block that wraps a single gr.add_const_ii stage.

    The one inner block is handed to gr.hier_block.__init__ as both of its
    block arguments.
    """

    def __init__(self, fg, a):
        # fg: flow graph passed straight through to gr.hier_block.
        # a:  constant given to gr.add_const_ii.
        adder = gr.add_const_ii(a)
        gr.hier_block.__init__(self, fg, adder, adder)
class mult_add(gr.hier_block):
    """Hierarchical block: gr.multiply_const_ii(m) wired into gr.add_const_ii(a).

    The multiplier is passed to gr.hier_block.__init__ as the first block
    argument and the adder as the second.
    """

    def __init__(self, fg, m, a):
        # fg: flow graph used to connect the two inner blocks and passed on
        #     to gr.hier_block.
        # m:  constant given to gr.multiply_const_ii.
        # a:  constant given to gr.add_const_ii.
        scaler = gr.multiply_const_ii(m)
        offsetter = gr.add_const_ii(a)
        fg.connect(scaler, offsetter)
        gr.hier_block.__init__(self, fg, scaler, offsetter)
# Test case for gr.flow_graph. It subclasses
# qa_basic_flow_graph.test_basic_flow_graph, so the inherited tests also run
# against the gr.flow_graph instance stored in self.fg.
# NOTE(review): the leading integers on each line, and the gaps between them,
# suggest this text is a line-numbered extract with lines omitted. Several
# `def` headers are not visible, the test bodies use a local `fg` whose binding
# is not shown (presumably `fg = self.fg`), and the scheduler/hier tests read
# dst1.data() without a visible call that runs the graph. Confirm all of this
# against the full file before relying on the notes below.
47 class test_flow_graph (qa_basic_flow_graph.test_basic_flow_graph):
# Fixture set-up (enclosing `def` not visible; per the docstring this is the
# setUp override): every test gets a fresh gr.flow_graph in self.fg.
50 ''' override qa_basic_flow_graph setUp in order to use our class'''
51 self.fg = gr.flow_graph ()
# Delegates to the base class tearDown (enclosing `def` not visible;
# presumably the tearDown override -- confirm).
54 qa_basic_flow_graph.test_basic_flow_graph.tearDown (self)
57 # we inherit all their tests, so we can be sure we don't break
58 # any of the underlying code
# leak_check(fct): asserts that two values, `begin` and `end`, are equal.
# NOTE(review): the lines that assign begin/end, call fct and tear down are not
# visible; presumably all_counts() is sampled before fct() and again after an
# early self.tearDown() -- confirm.
61 def leak_check (self, fct):
64 # tear down early so we can check for leaks
67 self.assertEqual (begin, end)
# An empty flow graph must topologically sort to an empty list.
69 def test_100_tsort_null (self):
70 self.assertEqual ([], self.fg.topological_sort (self.fg.all_blocks ()))
# source -> sink, connected via explicit (block, port) endpoints: the sort
# must list the source before the sink.
72 def test_101_tsort_two (self):
74 src1 = gr.null_source (gr.sizeof_int)
75 dst1 = gr.null_sink (gr.sizeof_int)
76 fg.connect ((src1, 0), (dst1, 0))
78 self.assertEqual ([src1, dst1], fg.topological_sort (fg.all_blocks ()))
# Three-block chain, connected in data-flow order (src1->nop1, then nop1->dst1).
80 def test_102_tsort_three_a (self):
82 src1 = gr.null_source (gr.sizeof_int)
83 nop1 = gr.nop (gr.sizeof_int)
84 dst1 = gr.null_sink (gr.sizeof_int)
85 fg.connect (src1, nop1)
86 fg.connect (nop1, dst1)
88 self.assertEqual ([src1, nop1, dst1], fg.topological_sort (fg.all_blocks ()))
# Same chain as test_102, but the connect() calls are issued in the opposite
# order; the expected sort result is unchanged.
90 def test_103_tsort_three_b (self):
92 src1 = gr.null_source (gr.sizeof_int)
93 nop1 = gr.nop (gr.sizeof_int)
94 dst1 = gr.null_sink (gr.sizeof_int)
95 fg.connect (nop1, dst1)
96 fg.connect (src1, nop1)
98 self.assertEqual ([src1, nop1, dst1], fg.topological_sort (fg.all_blocks ()))
# A block connected to itself: topological_sort is expected to raise gr.NotDAG.
100 def test_104_trivial_dag_check (self):
102 nop1 = gr.nop (gr.sizeof_int)
103 fg.connect (nop1, nop1)
105 self.assertRaises (gr.NotDAG,
106 lambda : fg.topological_sort (fg.all_blocks ()))
# (`def` line not visible.) Two sources, each through its own nop, feeding
# inputs 0 and 1 of nop3, which feeds one sink. The expected order lists the
# src2 branch before the src1 branch.
110 src1 = gr.null_source (gr.sizeof_int)
111 src2 = gr.null_source (gr.sizeof_int)
112 nop1 = gr.nop (gr.sizeof_int)
113 nop2 = gr.nop (gr.sizeof_int)
114 nop3 = gr.nop (gr.sizeof_int)
115 dst1 = gr.null_sink (gr.sizeof_int)
117 fg.connect (src1, nop1)
118 fg.connect (src2, nop2)
119 fg.connect (nop1, (nop3, 0))
120 fg.connect (nop2, (nop3, 1))
121 fg.connect (nop3, dst1)
123 ts = fg.topological_sort (fg.all_blocks ())
124 self.assertEqual ([src2, nop2, src1, nop1, nop3, dst1], ts)
# (`def` line not visible.) Same topology as the preceding fragment with the
# connect() calls issued in reverse order; the expected sort is identical.
129 src1 = gr.null_source (gr.sizeof_int)
130 src2 = gr.null_source (gr.sizeof_int)
131 nop1 = gr.nop (gr.sizeof_int)
132 nop2 = gr.nop (gr.sizeof_int)
133 nop3 = gr.nop (gr.sizeof_int)
134 dst1 = gr.null_sink (gr.sizeof_int)
136 fg.connect (nop3, dst1)
137 fg.connect (nop2, (nop3, 1))
138 fg.connect (nop1, (nop3, 0))
139 fg.connect (src2, nop2)
140 fg.connect (src1, nop1)
142 ts = fg.topological_sort (fg.all_blocks ())
143 self.assertEqual ([src2, nop2, src1, nop1, nop3, dst1], ts)
# (`def` line not visible.) Two independent src->nop->dst chains; the expected
# sort lists the whole src2 chain before the src1 chain.
147 src1 = gr.null_source (gr.sizeof_int)
148 src2 = gr.null_source (gr.sizeof_int)
149 nop1 = gr.nop (gr.sizeof_int)
150 nop2 = gr.nop (gr.sizeof_int)
151 dst1 = gr.null_sink (gr.sizeof_int)
152 dst2 = gr.null_sink (gr.sizeof_int)
154 fg.connect (src1, nop1)
155 fg.connect (nop1, dst1)
156 fg.connect (src2, nop2)
157 fg.connect (nop2, dst2)
159 ts = fg.topological_sort (fg.all_blocks ())
160 self.assertEqual ([src2, nop2, dst2, src1, nop1, dst1], ts)
# (`def` line not visible.) Runs body_108 under leak_check.
163 self.leak_check (self.body_108)
# (`def` line not visible; presumably body_108.) The same two chains as the
# preceding fragment, connected in a shuffled order. Also checks that
# all_counts() reports (6,0,0,0) after the sort.
167 src1 = gr.null_source (gr.sizeof_int)
168 src2 = gr.null_source (gr.sizeof_int)
169 nop1 = gr.nop (gr.sizeof_int)
170 nop2 = gr.nop (gr.sizeof_int)
171 dst1 = gr.null_sink (gr.sizeof_int)
172 dst2 = gr.null_sink (gr.sizeof_int)
174 fg.connect (nop2, dst2)
175 fg.connect (src1, nop1)
176 fg.connect (src2, nop2)
177 fg.connect (nop1, dst1)
179 ts = fg.topological_sort (fg.all_blocks ())
180 self.assertEqual ([src2, nop2, dst2, src1, nop1, dst1], ts)
181 self.assertEqual ((6,0,0,0), all_counts ())
# Runs body_109 under leak_check.
183 def test_109__setup_connections (self):
184 self.leak_check (self.body_109)
# (`def` line not visible; presumably body_109.) After fg._setup_connections()
# on a source->sink graph, all_counts() is expected to be (2,2,1,1).
188 src1 = gr.null_source (gr.sizeof_int)
189 dst1 = gr.null_sink (gr.sizeof_int)
190 fg.connect (src1, dst1)
191 fg._setup_connections ()
192 self.assertEqual ((2,2,1,1), all_counts ())
# Runs body_110 under leak_check.
194 def test_110_scheduler (self):
195 self.leak_check (self.body_110)
# (`def` line not visible; presumably body_110.) vector source wired directly
# to a vector sink: the sink's data must equal the source data.
199 src_data = (0, 1, 2, 3)
200 src1 = gr.vector_source_i (src_data)
201 dst1 = gr.vector_sink_i ()
202 fg.connect ((src1, 0), (dst1, 0))
204 dst_data = dst1.data ()
205 self.assertEqual (src_data, dst_data)
# Runs body_111 under leak_check.
207 def test_111_scheduler (self):
208 self.leak_check (self.body_111)
# (`def` line not visible; presumably body_111.) An add_const_ii(2) stage sits
# between source and sink, connected pairwise; expected output is input + 2.
212 src_data = (0, 1, 2, 3)
213 expected_result = (2, 3, 4, 5)
214 src1 = gr.vector_source_i (src_data)
215 op = gr.add_const_ii (2)
216 dst1 = gr.vector_sink_i ()
217 fg.connect (src1, op)
218 fg.connect (op, dst1)
220 dst_data = dst1.data ()
221 self.assertEqual (expected_result, dst_data)
# Runs body_111v under leak_check.
223 def test_111v_scheduler (self):
224 self.leak_check (self.body_111v)
# Same data and expectation as the preceding fragment, but the three blocks are
# wired with a single three-argument connect() call.
226 def body_111v (self):
228 src_data = (0, 1, 2, 3)
229 expected_result = (2, 3, 4, 5)
230 src1 = gr.vector_source_i (src_data)
231 op = gr.add_const_ii (2)
232 dst1 = gr.vector_sink_i ()
233 fg.connect (src1, op, dst1)
235 dst_data = dst1.data ()
236 self.assertEqual (expected_result, dst_data)
# (`def` line not visible.) nop1 outputs 0 and 1 each feed a different block;
# after _setup_connections() nop1.detail().noutputs() is expected to be 2.
240 nop1 = gr.nop (gr.sizeof_int)
241 nop2 = gr.nop (gr.sizeof_int)
242 nop3 = gr.nop (gr.sizeof_int)
243 fg.connect ((nop1, 0), (nop2, 0))
244 fg.connect ((nop1, 1), (nop3, 0))
245 fg._setup_connections ()
246 self.assertEqual (2, nop1.detail().noutputs())
# (`def` line not visible.) nop1 output 0 fans out to two blocks; after
# _setup_connections() nop1.detail().noutputs() is expected to be 1.
250 nop1 = gr.nop (gr.sizeof_int)
251 nop2 = gr.nop (gr.sizeof_int)
252 nop3 = gr.nop (gr.sizeof_int)
253 fg.connect ((nop1, 0), (nop2, 0))
254 fg.connect ((nop1, 0), (nop3, 0))
255 fg._setup_connections ()
256 self.assertEqual (1, nop1.detail().noutputs())
# Runs body_200 under leak_check.
258 def test_200_partition (self):
259 self.leak_check (self.body_200)
# (`def` line not visible; presumably body_200.) partition_graph on a single
# src->nop->dst chain is expected to return one partition holding the three
# blocks in that order; all_counts() is expected to be (3,0,0,0).
263 src1 = gr.null_source (gr.sizeof_int)
264 nop1 = gr.nop (gr.sizeof_int)
265 dst1 = gr.null_sink (gr.sizeof_int)
266 fg.connect (nop1, dst1)
267 fg.connect (src1, nop1)
269 p = fg.partition_graph (fg.all_blocks ())
270 self.assertEqual ([[src1, nop1, dst1]], p)
271 self.assertEqual ((3,0,0,0), all_counts ())
# Runs body_201 under leak_check.
273 def test_201_partition (self):
274 self.leak_check (self.body_201)
# (`def` line not visible; presumably body_201.) Two independent chains are
# expected to yield two partitions, the src2 chain first; all_counts() is
# expected to be (6,0,0,0).
278 src1 = gr.null_source (gr.sizeof_int)
279 src2 = gr.null_source (gr.sizeof_int)
280 nop1 = gr.nop (gr.sizeof_int)
281 nop2 = gr.nop (gr.sizeof_int)
282 dst1 = gr.null_sink (gr.sizeof_int)
283 dst2 = gr.null_sink (gr.sizeof_int)
285 fg.connect (nop2, dst2)
286 fg.connect (src1, nop1)
287 fg.connect (src2, nop2)
288 fg.connect (nop1, dst1)
290 p = fg.partition_graph (fg.all_blocks ())
291 self.assertEqual ([[src2, nop2, dst2], [src1, nop1, dst1]], p)
292 self.assertEqual ((6,0,0,0), all_counts ())
# Runs body_300_hier under leak_check.
294 def test_300_hier (self):
295 self.leak_check (self.body_300_hier)
# wrap_add(fg, 10) is connected like an ordinary block between source and sink;
# expected output is input + 10.
297 def body_300_hier (self):
299 src_data = (0, 1, 2, 3)
300 expected_result = (10, 11, 12, 13)
301 src1 = gr.vector_source_i (src_data)
302 op = wrap_add (fg, 10)
303 dst1 = gr.vector_sink_i ()
304 fg.connect (src1, op, dst1)
306 dst_data = dst1.data ()
307 self.assertEqual (expected_result, dst_data)
# Runs body_301_hier under leak_check.
309 def test_301_hier (self):
310 self.leak_check (self.body_301_hier)
# mult_add(fg, 3, 5) between source and sink; expected output is 3*x + 5.
312 def body_301_hier (self):
314 src_data = (0, 1, 2, 3)
315 expected_result = (5, 8, 11, 14)
316 src1 = gr.vector_source_i (src_data)
317 op = mult_add (fg, 3, 5)
318 dst1 = gr.vector_sink_i ()
319 fg.connect (src1, op, dst1)
321 dst_data = dst1.data ()
322 self.assertEqual (expected_result, dst_data)
# Runs body_302_hier under leak_check.
324 def test_302_hier (self):
325 self.leak_check (self.body_302_hier)
# gr.compose over a single add_const_ii(10); expected output is input + 10.
327 def body_302_hier (self):
329 src_data = (0, 1, 2, 3)
330 expected_result = (10, 11, 12, 13)
331 src1 = gr.vector_source_i (src_data)
332 op = gr.compose (fg, gr.add_const_ii (10))
333 dst1 = gr.vector_sink_i ()
334 fg.connect (src1, op, dst1)
336 dst_data = dst1.data ()
337 self.assertEqual (expected_result, dst_data)
# Runs body_303_hier under leak_check.
339 def test_303_hier (self):
340 self.leak_check (self.body_303_hier)
# gr.compose of add_const_ii(10) followed by mult_add(fg, 3, 5); the expected
# values correspond to (x + 10) * 3 + 5.
342 def body_303_hier (self):
344 src_data = (0, 1, 2, 3)
345 expected_result = (35, 38, 41, 44)
346 src1 = gr.vector_source_i (src_data)
347 op = gr.compose (fg, gr.add_const_ii (10), mult_add (fg, 3, 5))
348 dst1 = gr.vector_sink_i ()
349 fg.connect (src1, op, dst1)
351 dst_data = dst1.data ()
352 self.assertEqual (expected_result, dst_data)
355 if __name__ == '__main__':