Imported Upstream version 3.2.2
[debian/gnuradio] / gnuradio-core / src / python / gnuradio / gr / qa_rational_resampler.py
1 #!/usr/bin/env python
2 #
3 # Copyright 2005,2006,2007 Free Software Foundation, Inc.
4
5 # This file is part of GNU Radio
6
7 # GNU Radio is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3, or (at your option)
10 # any later version.
11
12 # GNU Radio is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16
17 # You should have received a copy of the GNU General Public License
18 # along with GNU Radio; see the file COPYING.  If not, write to
19 # the Free Software Foundation, Inc., 51 Franklin Street,
20 # Boston, MA 02110-1301, USA.
21
22
23 from gnuradio import gr, gr_unittest
24 from gnuradio import blks2
25 import math
26 import random
27 import sys
28
29 #import os
30 #print os.getpid()
31 #raw_input('Attach with gdb, then press Enter: ')
32
33
34 def random_floats(n):
35     r = []
36     for x in xrange(n):
37         r.append(float(random.randint(-32768, 32768)))
38     return tuple(r)
39
40
41 def reference_dec_filter(src_data, decim, taps):
42     tb = gr.top_block()
43     src = gr.vector_source_f(src_data)
44     op = gr.fir_filter_fff(decim, taps)
45     dst = gr.vector_sink_f()
46     tb.connect(src, op, dst)
47     tb.run()
48     result_data = dst.data()
49     tb = None
50     return result_data
51
52 def reference_interp_filter(src_data, interp, taps):
53     tb = gr.top_block()
54     src = gr.vector_source_f(src_data)
55     op = gr.interp_fir_filter_fff(interp, taps)
56     dst = gr.vector_sink_f()
57     tb.connect(src, op, dst)
58     tb.run()
59     result_data = dst.data()
60     tb = None
61     return result_data
62
63 def reference_interp_dec_filter(src_data, interp, decim, taps):
64     tb = gr.top_block()
65     src = gr.vector_source_f(src_data)
66     up = gr.interp_fir_filter_fff(interp, (1,))
67     dn = gr.fir_filter_fff(decim, taps)
68     dst = gr.vector_sink_f()
69     tb.connect(src, up, dn, dst)
70     tb.run()
71     result_data = dst.data()
72     tb = None
73     return result_data
74     
75
76 class test_rational_resampler (gr_unittest.TestCase):
77
78     def setUp(self):
79         pass
80
81     def tearDown(self):
82         pass
83         
84     #
85     # test the gr.rational_resampler_base primitives...
86     #
87     
88     def test_000_1_to_1(self):
89         taps = (-4, 5)
90         src_data = (234,  -4,  23,  -56,  45,    98,  -23,  -7)
91         xr = (-936, 1186, -112, 339, -460, -167, 582)
92         expected_result = tuple([float(x) for x in xr])
93
94         tb = gr.top_block()
95         src = gr.vector_source_f(src_data)
96         op = gr.rational_resampler_base_fff(1, 1, taps)
97         dst = gr.vector_sink_f()
98         tb.connect(src, op)
99         tb.connect(op, dst)
100         tb.run()
101         result_data = dst.data()
102         self.assertEqual(expected_result, result_data)
103         
104     def test_001_interp(self):
105         taps = [1, 10, 100, 1000, 10000]
106         src_data = (0, 2, 3, 5, 7, 11, 13, 17)
107         interpolation = 3
108         xr = (0,0,0,0,2,20,200,2003,20030,300,3005,30050,500,5007,50070,700,7011,70110,1100,11013,110130,1300,13017,130170,1700.0,17000.0,170000.0)
109         expected_result = tuple([float(x) for x in xr])
110
111         tb = gr.top_block()
112         src = gr.vector_source_f(src_data)
113         op = gr.rational_resampler_base_fff(interpolation, 1, taps)
114         dst = gr.vector_sink_f()
115         tb.connect(src, op)
116         tb.connect(op, dst)
117         tb.run()
118         result_data = dst.data()
119         self.assertEqual(expected_result, result_data)
120
121     def test_002_interp(self):          
122         taps = random_floats(31)
123         #src_data = random_floats(10000)  # FIXME the 10k case fails!
124         src_data = random_floats(1000)
125         interpolation = 3
126
127         expected_result = reference_interp_filter(src_data, interpolation, taps)
128
129         tb = gr.top_block()
130         src = gr.vector_source_f(src_data)
131         op = gr.rational_resampler_base_fff(interpolation, 1, taps)
132         dst = gr.vector_sink_f()
133         tb.connect(src, op)
134         tb.connect(op, dst)
135         tb.run()
136         result_data = dst.data()
137
138         L1 = len(result_data)
139         L2 = len(expected_result)
140         L = min(L1, L2)
141         if False:
142             sys.stderr.write('delta = %2d: ntaps = %d interp = %d ilen = %d\n' %
143                              (L2 - L1, len(taps), interpolation, len(src_data)))
144             sys.stderr.write('  len(result_data) = %d  len(expected_result) = %d\n' %
145                              (len(result_data), len(expected_result)))
146         #self.assertEqual(expected_result[0:L], result_data[0:L])
147         # FIXME check first 3 answers
148         self.assertEqual(expected_result[3:L], result_data[3:L])
149
150     def test_003_interp(self):
151         taps = random_floats(31)
152         src_data = random_floats(10000)
153         decimation = 3
154
155         expected_result = reference_dec_filter(src_data, decimation, taps)
156
157         tb = gr.top_block()
158         src = gr.vector_source_f(src_data)
159         op = gr.rational_resampler_base_fff(1, decimation, taps)
160         dst = gr.vector_sink_f()
161         tb.connect(src, op)
162         tb.connect(op, dst)
163         tb.run()
164         result_data = dst.data()
165
166         L1 = len(result_data)
167         L2 = len(expected_result)
168         L = min(L1, L2)
169         if False:
170             sys.stderr.write('delta = %2d: ntaps = %d decim = %d ilen = %d\n' %
171                              (L2 - L1, len(taps), decimation, len(src_data)))
172             sys.stderr.write('  len(result_data) = %d  len(expected_result) = %d\n' %
173                              (len(result_data), len(expected_result)))
174         self.assertEqual(expected_result[0:L], result_data[0:L])
175
176     # FIXME disabled.  Triggers hang on SuSE 10.0
177     def xtest_004_decim_random_vals(self):
178         MAX_TAPS = 9
179         MAX_DECIM = 7
180         OUTPUT_LEN = 9
181
182         random.seed(0)    # we want reproducibility
183
184         for ntaps in xrange(1, MAX_TAPS + 1):
185             for decim in xrange(1, MAX_DECIM+1):
186                 for ilen in xrange(ntaps + decim, ntaps + OUTPUT_LEN*decim):
187                     src_data = random_floats(ilen)
188                     taps = random_floats(ntaps)
189                     expected_result = reference_dec_filter(src_data, decim, taps)
190
191                     tb = gr.top_block()
192                     src = gr.vector_source_f(src_data)
193                     op = gr.rational_resampler_base_fff(1, decim, taps)
194                     dst = gr.vector_sink_f()
195                     tb.connect(src, op, dst)
196                     tb.run()
197                     tb = None
198                     result_data = dst.data()
199                     L1 = len(result_data)
200                     L2 = len(expected_result)
201                     L = min(L1, L2)
202                     if False:
203                         sys.stderr.write('delta = %2d: ntaps = %d decim = %d ilen = %d\n' % (L2 - L1, ntaps, decim, ilen))
204                         sys.stderr.write('  len(result_data) = %d  len(expected_result) = %d\n' %
205                                          (len(result_data), len(expected_result)))
206                     self.assertEqual(expected_result[0:L], result_data[0:L])
207
208
209     # FIXME disabled.  Triggers hang on SuSE 10.0
210     def xtest_005_interp_random_vals(self):
211         MAX_TAPS = 9
212         MAX_INTERP = 7
213         INPUT_LEN = 9
214
215         random.seed(0)    # we want reproducibility
216
217         for ntaps in xrange(1, MAX_TAPS + 1):
218             for interp in xrange(1, MAX_INTERP+1):
219                 for ilen in xrange(ntaps, ntaps + INPUT_LEN):
220                     src_data = random_floats(ilen)
221                     taps = random_floats(ntaps)
222                     expected_result = reference_interp_filter(src_data, interp, taps)
223
224                     tb = gr.top_block()
225                     src = gr.vector_source_f(src_data)
226                     op = gr.rational_resampler_base_fff(interp, 1, taps)
227                     dst = gr.vector_sink_f()
228                     tb.connect(src, op, dst)
229                     tb.run()
230                     tb = None
231                     result_data = dst.data()
232                     L1 = len(result_data)
233                     L2 = len(expected_result)
234                     L = min(L1, L2)
235                     #if True or abs(L1-L2) > 1:
236                     if False:
237                         sys.stderr.write('delta = %2d: ntaps = %d interp = %d ilen = %d\n' % (L2 - L1, ntaps, interp, ilen))
238                         #sys.stderr.write('  len(result_data) = %d  len(expected_result) = %d\n' %
239                         #                 (len(result_data), len(expected_result)))
240                     #self.assertEqual(expected_result[0:L], result_data[0:L])
241                     # FIXME check first ntaps+1 answers
242                     self.assertEqual(expected_result[ntaps+1:L], result_data[ntaps+1:L])
243
244
245     def test_006_interp_decim(self):
246         taps = (0,1,0,0)
247         src_data = range(10000)
248         interp = 3
249         decimation = 2
250
251         expected_result = reference_interp_dec_filter(src_data, interp, decimation, taps)
252
253         tb = gr.top_block()
254         src = gr.vector_source_f(src_data)
255         op = gr.rational_resampler_base_fff(interp, decimation, taps)
256         dst = gr.vector_sink_f()
257         tb.connect(src, op)
258         tb.connect(op, dst)
259         tb.run()
260         result_data = dst.data()
261
262         L1 = len(result_data)
263         L2 = len(expected_result)
264         L = min(L1, L2)
265         if False:
266             sys.stderr.write('delta = %2d: ntaps = %d decim = %d ilen = %d\n' %
267                              (L2 - L1, len(taps), decimation, len(src_data)))
268             sys.stderr.write('  len(result_data) = %d  len(expected_result) = %d\n' %
269                              (len(result_data), len(expected_result)))
270         self.assertEqual(expected_result[1:L], result_data[1:L])
271
272     #
273     # test the blks2.rational_resampler_??? primitives...
274     #
275
276     def test_101_interp(self):
277         taps = [1, 10, 100, 1000, 10000]
278         src_data = (0, 2, 3, 5, 7, 11, 13, 17)
279         interpolation = 3
280         xr = (0,0,0,0,2,20,200,2003,20030,300,3005,30050,500,5007,50070,700,7011,70110,1100,11013,110130,1300,13017,130170,1700.0,17000.0,170000.0)
281         expected_result = tuple([float(x) for x in xr])
282
283         tb = gr.top_block()
284         src = gr.vector_source_f(src_data)
285         op = blks2.rational_resampler_fff(interpolation, 1, taps=taps)
286         dst = gr.vector_sink_f()
287         tb.connect(src, op)
288         tb.connect(op, dst)
289         tb.run()
290         result_data = dst.data()
291         self.assertEqual(expected_result, result_data)
292
293
294 if __name__ == '__main__':
295     pass
296     # FIXME: Disabled, see ticket:210
297     # gr_unittest.main()
298