Imported Upstream version 3.0
[debian/gnuradio] / gnuradio-core / src / python / gnuradio / gr / qa_pipe_fittings.py
1 #!/usr/bin/env python
2 #
3 # Copyright 2005 Free Software Foundation, Inc.
4
5 # This file is part of GNU Radio
6
7 # GNU Radio is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 2, or (at your option)
10 # any later version.
11
12 # GNU Radio is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16
17 # You should have received a copy of the GNU General Public License
18 # along with GNU Radio; see the file COPYING.  If not, write to
19 # the Free Software Foundation, Inc., 51 Franklin Street,
20 # Boston, MA 02110-1301, USA.
21
22
23 from gnuradio import gr, gr_unittest
24
25 if 0:
26     import os
27     print "pid =", os.getpid()
28     raw_input("Attach, then press Enter to continue")
29
30
31 def calc_expected_result(src_data, n):
32     assert (len(src_data) % n) == 0
33     result = [list() for x in range(n)]
34     #print "len(result) =", len(result)
35     for i in xrange(len(src_data)):
36         (result[i % n]).append(src_data[i])
37     return [tuple(x) for x in result]
38
39
40 class test_pipe_fittings(gr_unittest.TestCase):
41
42     def setUp(self):
43         self.fg = gr.flow_graph ()
44
45     def tearDown(self):
46         self.fg = None
47
48     def test_001(self):
49         """
50         Test stream_to_streams.
51         """
52         n = 8
53         src_len = n * 8
54         src_data = range(src_len)
55
56         expected_results = calc_expected_result(src_data, n)
57         #print "expected results: ", expected_results
58         src = gr.vector_source_i(src_data)
59         op = gr.stream_to_streams(gr.sizeof_int, n)
60         self.fg.connect(src, op)
61         
62         dsts = []
63         for i in range(n):
64             dst = gr.vector_sink_i()
65             self.fg.connect((op, i), (dst, 0))
66             dsts.append(dst)
67
68         self.fg.run()
69
70         for d in range(n):
71             self.assertEqual(expected_results[d], dsts[d].data())
72
73     def test_002(self):
74         """
75         Test streams_to_stream (using stream_to_streams).
76         """
77         n = 8
78         src_len = n * 8
79         src_data = tuple(range(src_len))
80         expected_results = src_data
81
82         src = gr.vector_source_i(src_data)
83         op1 = gr.stream_to_streams(gr.sizeof_int, n)
84         op2 = gr.streams_to_stream(gr.sizeof_int, n)
85         dst = gr.vector_sink_i()
86         
87         self.fg.connect(src, op1)
88         for i in range(n):
89             self.fg.connect((op1, i), (op2, i))
90         self.fg.connect(op2, dst)
91         
92         self.fg.run()
93         self.assertEqual(expected_results, dst.data())
94         
95     def test_003(self):
96         """
97         Test streams_to_vector (using stream_to_streams & vector_to_stream).
98         """
99         n = 8
100         src_len = n * 8
101         src_data = tuple(range(src_len))
102         expected_results = src_data
103
104         src = gr.vector_source_i(src_data)
105         op1 = gr.stream_to_streams(gr.sizeof_int, n)
106         op2 = gr.streams_to_vector(gr.sizeof_int, n)
107         op3 = gr.vector_to_stream(gr.sizeof_int, n)
108         dst = gr.vector_sink_i()
109         
110         self.fg.connect(src, op1)
111         for i in range(n):
112             self.fg.connect((op1, i), (op2, i))
113         self.fg.connect(op2, op3, dst)
114         
115         self.fg.run()
116         self.assertEqual(expected_results, dst.data())
117         
118     def test_004(self):
119         """
120         Test vector_to_streams.
121         """
122         n = 8
123         src_len = n * 8
124         src_data = tuple(range(src_len))
125         expected_results = src_data
126
127         src = gr.vector_source_i(src_data)
128         op1 = gr.stream_to_vector(gr.sizeof_int, n)
129         op2 = gr.vector_to_streams(gr.sizeof_int, n)
130         op3 = gr.streams_to_stream(gr.sizeof_int, n)
131         dst = gr.vector_sink_i()
132         
133         self.fg.connect(src, op1, op2)
134         for i in range(n):
135             self.fg.connect((op2, i), (op3, i))
136         self.fg.connect(op3, dst)
137         
138         self.fg.run()
139         self.assertEqual(expected_results, dst.data())
140
141 if __name__ == '__main__':
142     gr_unittest.main ()
143