Imported Upstream version 3.2.2
[debian/gnuradio] / gnuradio-core / src / python / gnuradio / gr / qa_bin_statistics.py
1 #!/usr/bin/env python
2 #
3 # Copyright 2006,2007 Free Software Foundation, Inc.
4
5 # This file is part of GNU Radio
6
7 # GNU Radio is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3, or (at your option)
10 # any later version.
11
12 # GNU Radio is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16
17 # You should have received a copy of the GNU General Public License
18 # along with GNU Radio; see the file COPYING.  If not, write to
19 # the Free Software Foundation, Inc., 51 Franklin Street,
20 # Boston, MA 02110-1301, USA.
21
22
23 from gnuradio import gr, gr_unittest
24 import random
25 import struct
26
27 #import os
28 #print "pid =", os.getpid()
29 #raw_input("Attach gdb and press return...")
30
31 """
32 Note: The QA tests below have been disabled by renaming them from test_*
33 to xtest_*.  See ticket:199 on http://gnuradio.org/trac/ticket/199
34 """
35
36 class counter(gr.feval_dd):
37     def __init__(self, step_size=1):
38         gr.feval_dd.__init__(self)
39         self.step_size = step_size
40         self.count = 0
41
42     def eval(self, input):
43         #print "eval: self.count =", self.count
44         t = self.count
45         self.count = self.count + self.step_size
46         return t
47         
48
49 class counter3(gr.feval_dd):
50     def __init__(self, f, step_size):
51         gr.feval_dd.__init__(self)
52         self.f = f
53         self.step_size = step_size
54         self.count = 0
55
56     def eval(self, input):
57         try:
58             #print "eval: self.count =", self.count
59             t = self.count
60             self.count = self.count + self.step_size
61             self.f(self.count)
62         except Exception, e:
63             print "Exception: ", e
64         return t
65         
66 def foobar3(new_t):
67     #print "foobar3: new_t =", new_t
68     pass
69
70
71 class counter4(gr.feval_dd):
72     def __init__(self, obj_instance, step_size):
73         gr.feval_dd.__init__(self)
74         self.obj_instance = obj_instance
75         self.step_size = step_size
76         self.count = 0
77
78     def eval(self, input):
79         try:
80             #print "eval: self.count =", self.count
81             t = self.count
82             self.count = self.count + self.step_size
83             self.obj_instance.foobar4(self.count)
84         except Exception, e:
85             print "Exception: ", e
86         return t
87         
88
89 class parse_msg(object):
90     def __init__(self, msg):
91         self.center_freq = msg.arg1()
92         self.vlen = int(msg.arg2())
93         assert(msg.length() == self.vlen * gr.sizeof_float)
94         self.data = struct.unpack('%df' % (self.vlen,), msg.to_string())
95
96 # FIXME: see ticket:199
97 class xtest_bin_statistics(gr_unittest.TestCase):
98
99     def setUp(self):
100         self.tb = gr.top_block ()
101
102     def tearDown(self):
103         self.tb = None
104
105     def xtest_001(self):
106         vlen = 4
107         tune = counter(1)
108         tune_delay = 0
109         dwell_delay = 1
110         msgq = gr.msg_queue()
111
112         src_data = tuple([float(x) for x in
113                           ( 1,  2,  3,  4,
114                             5,  6,  7,  8,
115                             9, 10, 11, 12,
116                             13, 14, 15, 16
117                             )])
118
119         expected_results = tuple([float(x) for x in
120                                   ( 1,  2,  3,  4,
121                                     5,  6,  7,  8,
122                                     9, 10, 11, 12,
123                                     13, 14, 15, 16
124                                     )])
125                             
126         src = gr.vector_source_f(src_data, False)
127         s2v = gr.stream_to_vector(gr.sizeof_float, vlen)
128         stats = gr.bin_statistics_f(vlen, msgq, tune, tune_delay, dwell_delay)
129         self.tb.connect(src, s2v, stats)
130         self.tb.run()
131         self.assertEqual(4, msgq.count())
132         for i in range(4):
133             m = parse_msg(msgq.delete_head())
134             #print "m =", m.center_freq, m.data
135             self.assertEqual(expected_results[vlen*i:vlen*i + vlen], m.data)
136
137     def xtest_002(self):
138         vlen = 4
139         tune = counter(1)
140         tune_delay = 1
141         dwell_delay = 2
142         msgq = gr.msg_queue()
143
144         src_data = tuple([float(x) for x in
145                           ( 1,  2,  3,  4,
146                             9,  6, 11,  8,
147                             5, 10,  7, 12,
148                             13, 14, 15, 16
149                             )])
150
151         expected_results = tuple([float(x) for x in
152                                   ( 9, 10, 11, 12)])
153                             
154         src = gr.vector_source_f(src_data, False)
155         s2v = gr.stream_to_vector(gr.sizeof_float, vlen)
156         stats = gr.bin_statistics_f(vlen, msgq, tune, tune_delay, dwell_delay)
157         self.tb.connect(src, s2v, stats)
158         self.tb.run()
159         self.assertEqual(1, msgq.count())
160         for i in range(1):
161             m = parse_msg(msgq.delete_head())
162             #print "m =", m.center_freq, m.data
163             self.assertEqual(expected_results[vlen*i:vlen*i + vlen], m.data)
164
165
166
167     def xtest_003(self):
168         vlen = 4
169         tune = counter3(foobar3, 1)
170         tune_delay = 1
171         dwell_delay = 2
172         msgq = gr.msg_queue()
173
174         src_data = tuple([float(x) for x in
175                           ( 1,  2,  3,  4,
176                             9,  6, 11,  8,
177                             5, 10,  7, 12,
178                             13, 14, 15, 16
179                             )])
180
181         expected_results = tuple([float(x) for x in
182                                   ( 9, 10, 11, 12)])
183                             
184         src = gr.vector_source_f(src_data, False)
185         s2v = gr.stream_to_vector(gr.sizeof_float, vlen)
186         stats = gr.bin_statistics_f(vlen, msgq, tune, tune_delay, dwell_delay)
187         self.tb.connect(src, s2v, stats)
188         self.tb.run()
189         self.assertEqual(1, msgq.count())
190         for i in range(1):
191             m = parse_msg(msgq.delete_head())
192             #print "m =", m.center_freq, m.data
193             self.assertEqual(expected_results[vlen*i:vlen*i + vlen], m.data)
194
195
196     def foobar4(self, new_t):
197         #print "foobar4: new_t =", new_t
198         pass
199         
200     def xtest_004(self):
201         vlen = 4
202         tune = counter4(self, 1)
203         tune_delay = 1
204         dwell_delay = 2
205         msgq = gr.msg_queue()
206
207         src_data = tuple([float(x) for x in
208                           ( 1,  2,  3,  4,
209                             9,  6, 11,  8,
210                             5, 10,  7, 12,
211                             13, 14, 15, 16
212                             )])
213
214         expected_results = tuple([float(x) for x in
215                                   ( 9, 10, 11, 12)])
216                             
217         src = gr.vector_source_f(src_data, False)
218         s2v = gr.stream_to_vector(gr.sizeof_float, vlen)
219         stats = gr.bin_statistics_f(vlen, msgq, tune, tune_delay, dwell_delay)
220         self.tb.connect(src, s2v, stats)
221         self.tb.run()
222         self.assertEqual(1, msgq.count())
223         for i in range(1):
224             m = parse_msg(msgq.delete_head())
225             #print "m =", m.center_freq, m.data
226             self.assertEqual(expected_results[vlen*i:vlen*i + vlen], m.data)
227
228
229 if __name__ == '__main__':
230    gr_unittest.main ()