Imported Upstream version 3.0
[debian/gnuradio] / gnuradio-core / src / python / gnuradio / gr / qa_flow_graph.py
1 #!/usr/bin/env python
2 #
3 # Copyright 2004 Free Software Foundation, Inc.
4
5 # This file is part of GNU Radio
6
7 # GNU Radio is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 2, or (at your option)
10 # any later version.
11
12 # GNU Radio is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16
17 # You should have received a copy of the GNU General Public License
18 # along with GNU Radio; see the file COPYING.  If not, write to
19 # the Free Software Foundation, Inc., 51 Franklin Street,
20 # Boston, MA 02110-1301, USA.
21
22
23 from gnuradio import gr, gr_unittest
24 import qa_basic_flow_graph
25
26
27 def all_counts ():
28     return (gr.block_ncurrently_allocated (),
29             gr.block_detail_ncurrently_allocated (),
30             gr.buffer_ncurrently_allocated (),
31             gr.buffer_reader_ncurrently_allocated ())
32
33
34 class wrap_add(gr.hier_block):
35     def __init__(self, fg, a):
36         add = gr.add_const_ii (a)
37         gr.hier_block.__init__ (self, fg, add, add)
38
39 class mult_add(gr.hier_block):
40     def __init__(self, fg, m, a):
41         mult = gr.multiply_const_ii (m)
42         add = gr.add_const_ii (a)
43         fg.connect (mult, add)
44         gr.hier_block.__init__ (self, fg, mult, add)
45
46
47 class test_flow_graph (qa_basic_flow_graph.test_basic_flow_graph):
48
49     def setUp (self):
50         ''' override qa_basic_flow_graph setUp in order to use our class'''
51         self.fg = gr.flow_graph ()
52
53     def tearDown (self):
54         qa_basic_flow_graph.test_basic_flow_graph.tearDown (self)
55
56
57     # we inherit all their tests, so we can be sure we don't break
58     # any of the underlying code
59
60
61     def leak_check (self, fct):
62         begin = all_counts ()
63         fct ()
64         # tear down early so we can check for leaks
65         self.tearDown ()
66         end = all_counts ()
67         self.assertEqual (begin, end)
68
69     def test_100_tsort_null (self):
70         self.assertEqual ([], self.fg.topological_sort (self.fg.all_blocks ()))
71
72     def test_101_tsort_two (self):
73         fg = self.fg
74         src1 = gr.null_source (gr.sizeof_int)
75         dst1 = gr.null_sink (gr.sizeof_int)
76         fg.connect ((src1, 0), (dst1, 0))
77         fg.validate ()
78         self.assertEqual ([src1, dst1], fg.topological_sort (fg.all_blocks ()))
79         
80     def test_102_tsort_three_a (self):
81         fg = self.fg
82         src1 = gr.null_source (gr.sizeof_int)
83         nop1 = gr.nop (gr.sizeof_int)
84         dst1 = gr.null_sink (gr.sizeof_int)
85         fg.connect (src1, nop1)
86         fg.connect (nop1, dst1)
87         fg.validate ()
88         self.assertEqual ([src1, nop1, dst1], fg.topological_sort (fg.all_blocks ()))
89         
90     def test_103_tsort_three_b (self):
91         fg = self.fg
92         src1 = gr.null_source (gr.sizeof_int)
93         nop1 = gr.nop (gr.sizeof_int)
94         dst1 = gr.null_sink (gr.sizeof_int)
95         fg.connect (nop1, dst1)
96         fg.connect (src1, nop1)
97         fg.validate ()
98         self.assertEqual ([src1, nop1, dst1], fg.topological_sort (fg.all_blocks ()))
99         
100     def test_104_trivial_dag_check (self):
101         fg = self.fg
102         nop1 = gr.nop (gr.sizeof_int)
103         fg.connect (nop1, nop1)
104         fg.validate ()
105         self.assertRaises (gr.NotDAG,
106                            lambda : fg.topological_sort (fg.all_blocks ()))
107
108     def test_105 (self):
109         fg = self.fg
110         src1 = gr.null_source (gr.sizeof_int)
111         src2 = gr.null_source (gr.sizeof_int)
112         nop1 = gr.nop (gr.sizeof_int)
113         nop2 = gr.nop (gr.sizeof_int)
114         nop3 = gr.nop (gr.sizeof_int)
115         dst1 = gr.null_sink (gr.sizeof_int)
116         
117         fg.connect (src1, nop1)
118         fg.connect (src2, nop2)
119         fg.connect (nop1, (nop3, 0))
120         fg.connect (nop2, (nop3, 1))
121         fg.connect (nop3, dst1)
122         fg.validate ()
123         ts = fg.topological_sort (fg.all_blocks ())
124         self.assertEqual ([src2, nop2, src1, nop1, nop3, dst1], ts)
125                      
126
127     def test_106 (self):
128         fg = self.fg
129         src1 = gr.null_source (gr.sizeof_int)
130         src2 = gr.null_source (gr.sizeof_int)
131         nop1 = gr.nop (gr.sizeof_int)
132         nop2 = gr.nop (gr.sizeof_int)
133         nop3 = gr.nop (gr.sizeof_int)
134         dst1 = gr.null_sink (gr.sizeof_int)
135         
136         fg.connect (nop3, dst1)
137         fg.connect (nop2, (nop3, 1))
138         fg.connect (nop1, (nop3, 0))
139         fg.connect (src2, nop2)
140         fg.connect (src1, nop1)
141         fg.validate ()
142         ts = fg.topological_sort (fg.all_blocks ())
143         self.assertEqual ([src2, nop2, src1, nop1, nop3, dst1], ts)
144
145     def test_107 (self):
146         fg = self.fg
147         src1 = gr.null_source (gr.sizeof_int)
148         src2 = gr.null_source (gr.sizeof_int)
149         nop1 = gr.nop (gr.sizeof_int)
150         nop2 = gr.nop (gr.sizeof_int)
151         dst1 = gr.null_sink (gr.sizeof_int)
152         dst2 = gr.null_sink (gr.sizeof_int)
153         
154         fg.connect (src1, nop1)
155         fg.connect (nop1, dst1)
156         fg.connect (src2, nop2)
157         fg.connect (nop2, dst2)
158         fg.validate ()
159         ts = fg.topological_sort (fg.all_blocks ())
160         self.assertEqual ([src2, nop2, dst2, src1, nop1, dst1], ts)
161
162     def test_108 (self):
163         self.leak_check (self.body_108)
164         
165     def body_108 (self):
166         fg = self.fg
167         src1 = gr.null_source (gr.sizeof_int)
168         src2 = gr.null_source (gr.sizeof_int)
169         nop1 = gr.nop (gr.sizeof_int)
170         nop2 = gr.nop (gr.sizeof_int)
171         dst1 = gr.null_sink (gr.sizeof_int)
172         dst2 = gr.null_sink (gr.sizeof_int)
173         
174         fg.connect (nop2, dst2)
175         fg.connect (src1, nop1)
176         fg.connect (src2, nop2)
177         fg.connect (nop1, dst1)
178         fg.validate ()
179         ts = fg.topological_sort (fg.all_blocks ())
180         self.assertEqual ([src2, nop2, dst2, src1, nop1, dst1], ts)
181         self.assertEqual ((6,0,0,0), all_counts ())
182
183     def test_109__setup_connections (self):
184         self.leak_check (self.body_109)
185
186     def body_109 (self):
187         fg = self.fg
188         src1 = gr.null_source (gr.sizeof_int)
189         dst1 = gr.null_sink (gr.sizeof_int)
190         fg.connect (src1, dst1)
191         fg._setup_connections ()
192         self.assertEqual ((2,2,1,1), all_counts ())
193
194     def test_110_scheduler (self):
195         self.leak_check (self.body_110)
196
197     def body_110 (self):
198         fg = self.fg
199         src_data = (0, 1, 2, 3)
200         src1 = gr.vector_source_i (src_data)
201         dst1 = gr.vector_sink_i ()
202         fg.connect ((src1, 0), (dst1, 0))
203         fg.run ()
204         dst_data = dst1.data ()
205         self.assertEqual (src_data, dst_data)
206
207     def test_111_scheduler (self):
208         self.leak_check (self.body_111)
209
210     def body_111 (self):
211         fg = self.fg
212         src_data = (0, 1, 2, 3)
213         expected_result = (2, 3, 4, 5)
214         src1 = gr.vector_source_i (src_data)
215         op = gr.add_const_ii (2)
216         dst1 = gr.vector_sink_i ()
217         fg.connect (src1, op)
218         fg.connect (op, dst1)
219         fg.run ()
220         dst_data = dst1.data ()
221         self.assertEqual (expected_result, dst_data)
222         
223     def test_111v_scheduler (self):
224         self.leak_check (self.body_111v)
225
226     def body_111v (self):
227         fg = self.fg
228         src_data = (0, 1, 2, 3)
229         expected_result = (2, 3, 4, 5)
230         src1 = gr.vector_source_i (src_data)
231         op = gr.add_const_ii (2)
232         dst1 = gr.vector_sink_i ()
233         fg.connect (src1, op, dst1)
234         fg.run ()
235         dst_data = dst1.data ()
236         self.assertEqual (expected_result, dst_data)
237         
238     def test_112 (self):
239         fg = self.fg
240         nop1 = gr.nop (gr.sizeof_int)
241         nop2 = gr.nop (gr.sizeof_int)
242         nop3 = gr.nop (gr.sizeof_int)
243         fg.connect ((nop1, 0), (nop2, 0))
244         fg.connect ((nop1, 1), (nop3, 0))
245         fg._setup_connections ()
246         self.assertEqual (2, nop1.detail().noutputs())
247
248     def test_113 (self):
249         fg = self.fg
250         nop1 = gr.nop (gr.sizeof_int)
251         nop2 = gr.nop (gr.sizeof_int)
252         nop3 = gr.nop (gr.sizeof_int)
253         fg.connect ((nop1, 0), (nop2, 0))
254         fg.connect ((nop1, 0), (nop3, 0))
255         fg._setup_connections ()
256         self.assertEqual (1, nop1.detail().noutputs())
257
258     def test_200_partition (self):
259         self.leak_check (self.body_200)
260
261     def body_200 (self):
262         fg = self.fg
263         src1 = gr.null_source (gr.sizeof_int)
264         nop1 = gr.nop (gr.sizeof_int)
265         dst1 = gr.null_sink (gr.sizeof_int)
266         fg.connect (nop1, dst1)
267         fg.connect (src1, nop1)
268         fg.validate ()
269         p = fg.partition_graph (fg.all_blocks ())
270         self.assertEqual ([[src1, nop1, dst1]], p)
271         self.assertEqual ((3,0,0,0), all_counts ())
272         
273     def test_201_partition (self):
274         self.leak_check (self.body_201)
275
276     def body_201 (self):
277         fg = self.fg
278         src1 = gr.null_source (gr.sizeof_int)
279         src2 = gr.null_source (gr.sizeof_int)
280         nop1 = gr.nop (gr.sizeof_int)
281         nop2 = gr.nop (gr.sizeof_int)
282         dst1 = gr.null_sink (gr.sizeof_int)
283         dst2 = gr.null_sink (gr.sizeof_int)
284         
285         fg.connect (nop2, dst2)
286         fg.connect (src1, nop1)
287         fg.connect (src2, nop2)
288         fg.connect (nop1, dst1)
289         fg.validate ()
290         p = fg.partition_graph (fg.all_blocks ())
291         self.assertEqual ([[src2, nop2, dst2], [src1, nop1, dst1]], p)
292         self.assertEqual ((6,0,0,0), all_counts ())
293         
294     def test_300_hier (self):
295         self.leak_check (self.body_300_hier)
296
297     def body_300_hier (self):
298         fg = self.fg
299         src_data = (0, 1, 2, 3)
300         expected_result = (10, 11, 12, 13)
301         src1 = gr.vector_source_i (src_data)
302         op = wrap_add (fg, 10)
303         dst1 = gr.vector_sink_i ()
304         fg.connect (src1, op, dst1)
305         fg.run ()
306         dst_data = dst1.data ()
307         self.assertEqual (expected_result, dst_data)
308         
309     def test_301_hier (self):
310         self.leak_check (self.body_301_hier)
311
312     def body_301_hier (self):
313         fg = self.fg
314         src_data = (0, 1, 2, 3)
315         expected_result = (5, 8, 11, 14)
316         src1 = gr.vector_source_i (src_data)
317         op = mult_add (fg, 3, 5)
318         dst1 = gr.vector_sink_i ()
319         fg.connect (src1, op, dst1)
320         fg.run ()
321         dst_data = dst1.data ()
322         self.assertEqual (expected_result, dst_data)
323         
324     def test_302_hier (self):
325         self.leak_check (self.body_302_hier)
326
327     def body_302_hier (self):
328         fg = self.fg
329         src_data = (0, 1, 2, 3)
330         expected_result = (10, 11, 12, 13)
331         src1 = gr.vector_source_i (src_data)
332         op = gr.compose (fg, gr.add_const_ii (10))
333         dst1 = gr.vector_sink_i ()
334         fg.connect (src1, op, dst1)
335         fg.run ()
336         dst_data = dst1.data ()
337         self.assertEqual (expected_result, dst_data)
338
339     def test_303_hier (self):
340         self.leak_check (self.body_303_hier)
341
342     def body_303_hier (self):
343         fg = self.fg
344         src_data = (0, 1, 2, 3)
345         expected_result = (35, 38, 41, 44)
346         src1 = gr.vector_source_i (src_data)
347         op = gr.compose (fg, gr.add_const_ii (10), mult_add (fg, 3, 5))
348         dst1 = gr.vector_sink_i ()
349         fg.connect (src1, op, dst1)
350         fg.run ()
351         dst_data = dst1.data ()
352         self.assertEqual (expected_result, dst_data)
353         
354
355 if __name__ == '__main__':
356     gr_unittest.main ()