Imported Upstream version 3.0
[debian/gnuradio] / gnuradio-core / src / python / gnuradio / gr / qa_rational_resampler.py
1 #!/usr/bin/env python
2 #
3 # Copyright 2005,2006 Free Software Foundation, Inc.
4
5 # This file is part of GNU Radio
6
7 # GNU Radio is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 2, or (at your option)
10 # any later version.
11
12 # GNU Radio is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16
17 # You should have received a copy of the GNU General Public License
18 # along with GNU Radio; see the file COPYING.  If not, write to
19 # the Free Software Foundation, Inc., 51 Franklin Street,
20 # Boston, MA 02110-1301, USA.
21
22
23 from gnuradio import gr, gr_unittest
24 from gnuradio import blks
25 import math
26 import random
27 import sys
28
29 #import os
30 #print os.getpid()
31 #raw_input('Attach with gdb, then press Enter: ')
32
33
34 def random_floats(n):
35     r = []
36     for x in xrange(n):
37         r.append(float(random.randint(-32768, 32768)))
38     return tuple(r)
39
40
41 def reference_dec_filter(src_data, decim, taps):
42     fg = gr.flow_graph()
43     src = gr.vector_source_f(src_data)
44     op = gr.fir_filter_fff(decim, taps)
45     dst = gr.vector_sink_f()
46     fg.connect(src, op, dst)
47     fg.run()
48     result_data = dst.data()
49     fg = None
50     return result_data
51
52 def reference_interp_filter(src_data, interp, taps):
53     fg = gr.flow_graph()
54     src = gr.vector_source_f(src_data)
55     op = gr.interp_fir_filter_fff(interp, taps)
56     dst = gr.vector_sink_f()
57     fg.connect(src, op, dst)
58     fg.run()
59     result_data = dst.data()
60     fg = None
61     return result_data
62
63 def reference_interp_dec_filter(src_data, interp, decim, taps):
64     fg = gr.flow_graph()
65     src = gr.vector_source_f(src_data)
66     up = gr.interp_fir_filter_fff(interp, (1,))
67     dn = gr.fir_filter_fff(decim, taps)
68     dst = gr.vector_sink_f()
69     fg.connect(src, up, dn, dst)
70     fg.run()
71     result_data = dst.data()
72     fg = None
73     return result_data
74     
75
76 class test_rational_resampler (gr_unittest.TestCase):
77
78     def setUp(self):
79         self.fg = gr.flow_graph()
80
81     def tearDown(self):
82         self.fg = None
83
84     #
85     # test the gr.rational_resampler_base primitives...
86     #
87     
88     def test_000_1_to_1(self):
89         taps = (-4, 5)
90         src_data = (234,  -4,  23,  -56,  45,    98,  -23,  -7)
91         xr = (-936, 1186, -112, 339, -460, -167, 582)
92         expected_result = tuple([float(x) for x in xr])
93
94         src = gr.vector_source_f(src_data)
95         op = gr.rational_resampler_base_fff(1, 1, taps)
96         dst = gr.vector_sink_f()
97         self.fg.connect(src, op)
98         self.fg.connect(op, dst)
99         self.fg.run()
100         result_data = dst.data()
101         self.assertEqual(expected_result, result_data)
102         
103     def test_001_interp(self):
104         taps = [1, 10, 100, 1000, 10000]
105         src_data = (0, 2, 3, 5, 7, 11, 13, 17)
106         interpolation = 3
107         xr = (0,0,0,0,2,20,200,2003,20030,300,3005,30050,500,5007,50070,700,7011,70110,1100,11013,110130,1300,13017,130170,1700.0,17000.0,170000.0)
108         expected_result = tuple([float(x) for x in xr])
109
110         src = gr.vector_source_f(src_data)
111         op = gr.rational_resampler_base_fff(interpolation, 1, taps)
112         dst = gr.vector_sink_f()
113         self.fg.connect(src, op)
114         self.fg.connect(op, dst)
115         self.fg.run()
116         result_data = dst.data()
117         self.assertEqual(expected_result, result_data)
118
119     def test_002_interp(self):          
120         taps = random_floats(31)
121         #src_data = random_floats(10000)  # FIXME the 10k case fails!
122         src_data = random_floats(1000)
123         interpolation = 3
124
125         expected_result = reference_interp_filter(src_data, interpolation, taps)
126
127         src = gr.vector_source_f(src_data)
128         op = gr.rational_resampler_base_fff(interpolation, 1, taps)
129         dst = gr.vector_sink_f()
130         self.fg.connect(src, op)
131         self.fg.connect(op, dst)
132         self.fg.run()
133         result_data = dst.data()
134
135         L1 = len(result_data)
136         L2 = len(expected_result)
137         L = min(L1, L2)
138         if False:
139             sys.stderr.write('delta = %2d: ntaps = %d interp = %d ilen = %d\n' %
140                              (L2 - L1, len(taps), interpolation, len(src_data)))
141             sys.stderr.write('  len(result_data) = %d  len(expected_result) = %d\n' %
142                              (len(result_data), len(expected_result)))
143         #self.assertEqual(expected_result[0:L], result_data[0:L])
144         # FIXME check first 3 answers
145         self.assertEqual(expected_result[3:L], result_data[3:L])
146
147     def test_003_interp(self):
148         taps = random_floats(31)
149         src_data = random_floats(10000)
150         decimation = 3
151
152         expected_result = reference_dec_filter(src_data, decimation, taps)
153
154         src = gr.vector_source_f(src_data)
155         op = gr.rational_resampler_base_fff(1, decimation, taps)
156         dst = gr.vector_sink_f()
157         self.fg.connect(src, op)
158         self.fg.connect(op, dst)
159         self.fg.run()
160         result_data = dst.data()
161
162         L1 = len(result_data)
163         L2 = len(expected_result)
164         L = min(L1, L2)
165         if False:
166             sys.stderr.write('delta = %2d: ntaps = %d decim = %d ilen = %d\n' %
167                              (L2 - L1, len(taps), decimation, len(src_data)))
168             sys.stderr.write('  len(result_data) = %d  len(expected_result) = %d\n' %
169                              (len(result_data), len(expected_result)))
170         self.assertEqual(expected_result[0:L], result_data[0:L])
171
172     # FIXME disabled.  Triggers hang on SuSE 10.0
173     def xtest_004_decim_random_vals(self):
174         MAX_TAPS = 9
175         MAX_DECIM = 7
176         OUTPUT_LEN = 9
177
178         random.seed(0)    # we want reproducibility
179
180         for ntaps in xrange(1, MAX_TAPS + 1):
181             for decim in xrange(1, MAX_DECIM+1):
182                 for ilen in xrange(ntaps + decim, ntaps + OUTPUT_LEN*decim):
183                     src_data = random_floats(ilen)
184                     taps = random_floats(ntaps)
185                     expected_result = reference_dec_filter(src_data, decim, taps)
186
187                     fg = gr.flow_graph()
188                     src = gr.vector_source_f(src_data)
189                     op = gr.rational_resampler_base_fff(1, decim, taps)
190                     dst = gr.vector_sink_f()
191                     fg.connect(src, op, dst)
192                     fg.run()
193                     fg = None
194                     result_data = dst.data()
195                     L1 = len(result_data)
196                     L2 = len(expected_result)
197                     L = min(L1, L2)
198                     if False:
199                         sys.stderr.write('delta = %2d: ntaps = %d decim = %d ilen = %d\n' % (L2 - L1, ntaps, decim, ilen))
200                         sys.stderr.write('  len(result_data) = %d  len(expected_result) = %d\n' %
201                                          (len(result_data), len(expected_result)))
202                     self.assertEqual(expected_result[0:L], result_data[0:L])
203
204
205     # FIXME disabled.  Triggers hang on SuSE 10.0
206     def xtest_005_interp_random_vals(self):
207         MAX_TAPS = 9
208         MAX_INTERP = 7
209         INPUT_LEN = 9
210
211         random.seed(0)    # we want reproducibility
212
213         for ntaps in xrange(1, MAX_TAPS + 1):
214             for interp in xrange(1, MAX_INTERP+1):
215                 for ilen in xrange(ntaps, ntaps + INPUT_LEN):
216                     src_data = random_floats(ilen)
217                     taps = random_floats(ntaps)
218                     expected_result = reference_interp_filter(src_data, interp, taps)
219
220                     fg = gr.flow_graph()
221                     src = gr.vector_source_f(src_data)
222                     op = gr.rational_resampler_base_fff(interp, 1, taps)
223                     dst = gr.vector_sink_f()
224                     fg.connect(src, op, dst)
225                     fg.run()
226                     fg = None
227                     result_data = dst.data()
228                     L1 = len(result_data)
229                     L2 = len(expected_result)
230                     L = min(L1, L2)
231                     #if True or abs(L1-L2) > 1:
232                     if False:
233                         sys.stderr.write('delta = %2d: ntaps = %d interp = %d ilen = %d\n' % (L2 - L1, ntaps, interp, ilen))
234                         #sys.stderr.write('  len(result_data) = %d  len(expected_result) = %d\n' %
235                         #                 (len(result_data), len(expected_result)))
236                     #self.assertEqual(expected_result[0:L], result_data[0:L])
237                     # FIXME check first ntaps+1 answers
238                     self.assertEqual(expected_result[ntaps+1:L], result_data[ntaps+1:L])
239
240
241     def test_006_interp_decim(self):
242         taps = (0,1,0,0)
243         src_data = range(10000)
244         interp = 3
245         decimation = 2
246
247         expected_result = reference_interp_dec_filter(src_data, interp, decimation, taps)
248
249         src = gr.vector_source_f(src_data)
250         op = gr.rational_resampler_base_fff(interp, decimation, taps)
251         dst = gr.vector_sink_f()
252         self.fg.connect(src, op)
253         self.fg.connect(op, dst)
254         self.fg.run()
255         result_data = dst.data()
256
257         L1 = len(result_data)
258         L2 = len(expected_result)
259         L = min(L1, L2)
260         if False:
261             sys.stderr.write('delta = %2d: ntaps = %d decim = %d ilen = %d\n' %
262                              (L2 - L1, len(taps), decimation, len(src_data)))
263             sys.stderr.write('  len(result_data) = %d  len(expected_result) = %d\n' %
264                              (len(result_data), len(expected_result)))
265         self.assertEqual(expected_result[1:L], result_data[1:L])
266
267     #
268     # test the blks.rational_resampler_??? primitives...
269     #
270
271     def test_101_interp(self):
272         taps = [1, 10, 100, 1000, 10000]
273         src_data = (0, 2, 3, 5, 7, 11, 13, 17)
274         interpolation = 3
275         xr = (0,0,0,0,2,20,200,2003,20030,300,3005,30050,500,5007,50070,700,7011,70110,1100,11013,110130,1300,13017,130170,1700.0,17000.0,170000.0)
276         expected_result = tuple([float(x) for x in xr])
277
278         src = gr.vector_source_f(src_data)
279         op = blks.rational_resampler_fff(self.fg, interpolation, 1, taps=taps)
280         dst = gr.vector_sink_f()
281         self.fg.connect(src, op)
282         self.fg.connect(op, dst)
283         self.fg.run()
284         result_data = dst.data()
285         self.assertEqual(expected_result, result_data)
286
287
288 if __name__ == '__main__':
289     gr_unittest.main()
290