Updated license from GPL version 2 or later to GPL version 3 or later.
[debian/gnuradio] / gnuradio-core / src / python / gnuradio / gr / qa_bin_statistics.py
1 #!/usr/bin/env python
2 #
3 # Copyright 2006 Free Software Foundation, Inc.
4
5 # This file is part of GNU Radio
6
7 # GNU Radio is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3, or (at your option)
10 # any later version.
11
12 # GNU Radio is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16
17 # You should have received a copy of the GNU General Public License
18 # along with GNU Radio; see the file COPYING.  If not, write to
19 # the Free Software Foundation, Inc., 51 Franklin Street,
20 # Boston, MA 02110-1301, USA.
21
22
23 from gnuradio import gr, gr_unittest
24 import random
25 import struct
26
27 #import os
28 #print "pid =", os.getpid()
29 #raw_input("Attach gdb and press return...")
30
31
32 class counter(gr.feval_dd):
33     def __init__(self, step_size=1):
34         gr.feval_dd.__init__(self)
35         self.step_size = step_size
36         self.count = 0
37
38     def eval(self, input):
39         #print "eval: self.count =", self.count
40         t = self.count
41         self.count = self.count + self.step_size
42         return t
43         
44
45 class counter3(gr.feval_dd):
46     def __init__(self, f, step_size):
47         gr.feval_dd.__init__(self)
48         self.f = f
49         self.step_size = step_size
50         self.count = 0
51
52     def eval(self, input):
53         try:
54             #print "eval: self.count =", self.count
55             t = self.count
56             self.count = self.count + self.step_size
57             self.f(self.count)
58         except Exception, e:
59             print "Exception: ", e
60         return t
61         
62 def foobar3(new_t):
63     #print "foobar3: new_t =", new_t
64     pass
65
66
67 class counter4(gr.feval_dd):
68     def __init__(self, obj_instance, step_size):
69         gr.feval_dd.__init__(self)
70         self.obj_instance = obj_instance
71         self.step_size = step_size
72         self.count = 0
73
74     def eval(self, input):
75         try:
76             #print "eval: self.count =", self.count
77             t = self.count
78             self.count = self.count + self.step_size
79             self.obj_instance.foobar4(self.count)
80         except Exception, e:
81             print "Exception: ", e
82         return t
83         
84
85 class parse_msg(object):
86     def __init__(self, msg):
87         self.center_freq = msg.arg1()
88         self.vlen = int(msg.arg2())
89         assert(msg.length() == self.vlen * gr.sizeof_float)
90         self.data = struct.unpack('%df' % (self.vlen,), msg.to_string())
91
92
93 class test_bin_statistics(gr_unittest.TestCase):
94
95     def setUp(self):
96         self.fg = gr.flow_graph ()
97
98     def tearDown(self):
99         self.fg = None
100
101     def test_001(self):
102         vlen = 4
103         tune = counter(1)
104         tune_delay = 0
105         dwell_delay = 1
106         msgq = gr.msg_queue()
107
108         src_data = tuple([float(x) for x in
109                           ( 1,  2,  3,  4,
110                             5,  6,  7,  8,
111                             9, 10, 11, 12,
112                             13, 14, 15, 16
113                             )])
114
115         expected_results = tuple([float(x) for x in
116                                   ( 1,  2,  3,  4,
117                                     5,  6,  7,  8,
118                                     9, 10, 11, 12,
119                                     13, 14, 15, 16
120                                     )])
121                             
122         src = gr.vector_source_f(src_data, False)
123         s2v = gr.stream_to_vector(gr.sizeof_float, vlen)
124         stats = gr.bin_statistics_f(vlen, msgq, tune, tune_delay, dwell_delay)
125         self.fg.connect(src, s2v, stats)
126         self.fg.run()
127         self.assertEqual(4, msgq.count())
128         for i in range(4):
129             m = parse_msg(msgq.delete_head())
130             #print "m =", m.center_freq, m.data
131             self.assertEqual(expected_results[vlen*i:vlen*i + vlen], m.data)
132
133     def test_002(self):
134         vlen = 4
135         tune = counter(1)
136         tune_delay = 1
137         dwell_delay = 2
138         msgq = gr.msg_queue()
139
140         src_data = tuple([float(x) for x in
141                           ( 1,  2,  3,  4,
142                             9,  6, 11,  8,
143                             5, 10,  7, 12,
144                             13, 14, 15, 16
145                             )])
146
147         expected_results = tuple([float(x) for x in
148                                   ( 9, 10, 11, 12)])
149                             
150         src = gr.vector_source_f(src_data, False)
151         s2v = gr.stream_to_vector(gr.sizeof_float, vlen)
152         stats = gr.bin_statistics_f(vlen, msgq, tune, tune_delay, dwell_delay)
153         self.fg.connect(src, s2v, stats)
154         self.fg.run()
155         self.assertEqual(1, msgq.count())
156         for i in range(1):
157             m = parse_msg(msgq.delete_head())
158             #print "m =", m.center_freq, m.data
159             self.assertEqual(expected_results[vlen*i:vlen*i + vlen], m.data)
160
161
162
163     def test_003(self):
164         vlen = 4
165         tune = counter3(foobar3, 1)
166         tune_delay = 1
167         dwell_delay = 2
168         msgq = gr.msg_queue()
169
170         src_data = tuple([float(x) for x in
171                           ( 1,  2,  3,  4,
172                             9,  6, 11,  8,
173                             5, 10,  7, 12,
174                             13, 14, 15, 16
175                             )])
176
177         expected_results = tuple([float(x) for x in
178                                   ( 9, 10, 11, 12)])
179                             
180         src = gr.vector_source_f(src_data, False)
181         s2v = gr.stream_to_vector(gr.sizeof_float, vlen)
182         stats = gr.bin_statistics_f(vlen, msgq, tune, tune_delay, dwell_delay)
183         self.fg.connect(src, s2v, stats)
184         self.fg.run()
185         self.assertEqual(1, msgq.count())
186         for i in range(1):
187             m = parse_msg(msgq.delete_head())
188             #print "m =", m.center_freq, m.data
189             self.assertEqual(expected_results[vlen*i:vlen*i + vlen], m.data)
190
191
192     def foobar4(self, new_t):
193         #print "foobar4: new_t =", new_t
194         pass
195         
196     def test_004(self):
197         vlen = 4
198         tune = counter4(self, 1)
199         tune_delay = 1
200         dwell_delay = 2
201         msgq = gr.msg_queue()
202
203         src_data = tuple([float(x) for x in
204                           ( 1,  2,  3,  4,
205                             9,  6, 11,  8,
206                             5, 10,  7, 12,
207                             13, 14, 15, 16
208                             )])
209
210         expected_results = tuple([float(x) for x in
211                                   ( 9, 10, 11, 12)])
212                             
213         src = gr.vector_source_f(src_data, False)
214         s2v = gr.stream_to_vector(gr.sizeof_float, vlen)
215         stats = gr.bin_statistics_f(vlen, msgq, tune, tune_delay, dwell_delay)
216         self.fg.connect(src, s2v, stats)
217         self.fg.run()
218         self.assertEqual(1, msgq.count())
219         for i in range(1):
220             m = parse_msg(msgq.delete_head())
221             #print "m =", m.center_freq, m.data
222             self.assertEqual(expected_results[vlen*i:vlen*i + vlen], m.data)
223
224
225 if __name__ == '__main__':
226    gr_unittest.main ()