3 # Copyright 2006 Free Software Foundation, Inc.
5 # This file is part of GNU Radio
7 # GNU Radio is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3, or (at your option)
12 # GNU Radio is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with GNU Radio; see the file COPYING. If not, write to
19 # the Free Software Foundation, Inc., 51 Franklin Street,
20 # Boston, MA 02110-1301, USA.
23 from gnuradio import gr, gr_unittest
28 #print "pid =", os.getpid()
29 #raw_input("Attach gdb and press return...")
# counter: a gr.feval_dd subclass whose eval() advances an internal count by a
# fixed step each time it is called.
# NOTE(review): this excerpt has lost lines and indentation. No statement that
# initialises self.count, and no return statement for eval(), is visible here;
# confirm against the complete file before relying on either.
32 class counter(gr.feval_dd):
# step_size: amount added to self.count on every eval() call (default 1).
33 def __init__(self, step_size=1):
# Initialise the gr.feval_dd base explicitly before setting our own state.
34 gr.feval_dd.__init__(self)
35 self.step_size = step_size
# eval(input): the visible body never reads `input`; it only advances the count.
38 def eval(self, input):
39 #print "eval: self.count =", self.count
# Move the count forward by one step.
41 self.count = self.count + self.step_size
# counter3: a gr.feval_dd subclass that takes a callable `f` in addition to a
# step size, and advances an internal count by that step in eval().
# NOTE(review): this excerpt has lost lines and indentation. The statements that
# store `f`, initialise self.count, open the try/except around the eval() body
# and return a value are not visible here; confirm against the complete file.
45 class counter3(gr.feval_dd):
# f: callable supplied by the caller (the test below passes foobar3).
# step_size: amount added to self.count on every eval() call.
46 def __init__(self, f, step_size):
# Initialise the gr.feval_dd base explicitly before setting our own state.
47 gr.feval_dd.__init__(self)
49 self.step_size = step_size
# eval(input): the visible body never reads `input`.
52 def eval(self, input):
54 #print "eval: self.count =", self.count
# Move the count forward by one step.
56 self.count = self.count + self.step_size
# Report an exception bound to `e` on stdout; presumably this sits in an
# except clause whose header is missing from this excerpt.
59 print "Exception: ", e
63 #print "foobar3: new_t =", new_t
# counter4: a gr.feval_dd subclass that advances an internal count in eval() and
# then notifies an object by calling its foobar4() method with the new count.
# NOTE(review): this excerpt has lost lines and indentation. The statements that
# initialise self.count, open the try/except around the eval() body and return
# a value are not visible here; confirm against the complete file.
67 class counter4(gr.feval_dd):
# obj_instance: object whose foobar4(count) method eval() calls.
# step_size: amount added to self.count on every eval() call.
68 def __init__(self, obj_instance, step_size):
# Initialise the gr.feval_dd base explicitly before setting our own state.
69 gr.feval_dd.__init__(self)
70 self.obj_instance = obj_instance
71 self.step_size = step_size
# eval(input): the visible body never reads `input`.
74 def eval(self, input):
76 #print "eval: self.count =", self.count
# Move the count forward by one step, then hand the updated value to the
# registered object.
78 self.count = self.count + self.step_size
79 self.obj_instance.foobar4(self.count)
# Report an exception bound to `e` on stdout; presumably this sits in an
# except clause whose header is missing from this excerpt.
81 print "Exception: ", e
class parse_msg(object):
    """Decoded view of one message taken from the statistics message queue.

    Attributes set by the constructor:
      center_freq -- whatever msg.arg1() returns
      vlen        -- msg.arg2() converted to int
      data        -- tuple of vlen floats unpacked from msg.to_string()
    """

    def __init__(self, msg):
        """Pull the fields out of msg; asserts the payload is vlen floats long."""
        self.center_freq = msg.arg1()
        n_items = int(msg.arg2())
        self.vlen = n_items
        # The payload must contain exactly one float per item.
        assert msg.length() == n_items * gr.sizeof_float
        float_format = '%df' % (n_items,)
        raw_payload = msg.to_string()
        self.data = struct.unpack(float_format, raw_payload)
# test_bin_statistics: test case that feeds float vectors through
# gr.bin_statistics_f and checks the messages it posts to a gr.msg_queue.
# NOTE(review): this excerpt has lost lines and indentation. Most method
# headers, the loop headers that define `i`, the literal source/expected data,
# several local assignments (vlen, tune_delay, dwell_delay, and `tune` for the
# first two scenarios) and any call that runs the flow graph are not visible.
# The comments below describe only the statements that remain.
93 class test_bin_statistics(gr_unittest.TestCase):
# Flow graph that every scenario below connects its blocks into.
96 self.fg = gr.flow_graph ()
# --- Scenario 1: four messages are expected on the queue ---
# Queue handed to bin_statistics_f and inspected by the assertions.
106 msgq = gr.msg_queue()
# Input samples and expected output, both converted to tuples of floats
# (the integer literals they are built from are missing from this excerpt).
108 src_data = tuple([float(x) for x in
115 expected_results = tuple([float(x) for x in
# Build the three blocks and connect them into the flow graph.
122 src = gr.vector_source_f(src_data, False)
123 s2v = gr.stream_to_vector(gr.sizeof_float, vlen)
124 stats = gr.bin_statistics_f(vlen, msgq, tune, tune_delay, dwell_delay)
125 self.fg.connect(src, s2v, stats)
127 self.assertEqual(4, msgq.count())
# Pop one message, decode it with parse_msg, and compare its data with the
# i-th vlen-sized slice of expected_results (`i` presumably comes from a loop
# header that is not visible here).
129 m = parse_msg(msgq.delete_head())
130 #print "m =", m.center_freq, m.data
131 self.assertEqual(expected_results[vlen*i:vlen*i + vlen], m.data)
# --- Scenario 2: a single message is expected on the queue ---
138 msgq = gr.msg_queue()
140 src_data = tuple([float(x) for x in
147 expected_results = tuple([float(x) for x in
150 src = gr.vector_source_f(src_data, False)
151 s2v = gr.stream_to_vector(gr.sizeof_float, vlen)
152 stats = gr.bin_statistics_f(vlen, msgq, tune, tune_delay, dwell_delay)
153 self.fg.connect(src, s2v, stats)
155 self.assertEqual(1, msgq.count())
157 m = parse_msg(msgq.delete_head())
158 #print "m =", m.center_freq, m.data
159 self.assertEqual(expected_results[vlen*i:vlen*i + vlen], m.data)
# --- Scenario 3: tuning callback is a counter3 built with foobar3 and step 1 ---
165 tune = counter3(foobar3, 1)
168 msgq = gr.msg_queue()
170 src_data = tuple([float(x) for x in
177 expected_results = tuple([float(x) for x in
180 src = gr.vector_source_f(src_data, False)
181 s2v = gr.stream_to_vector(gr.sizeof_float, vlen)
182 stats = gr.bin_statistics_f(vlen, msgq, tune, tune_delay, dwell_delay)
183 self.fg.connect(src, s2v, stats)
185 self.assertEqual(1, msgq.count())
187 m = parse_msg(msgq.delete_head())
188 #print "m =", m.center_freq, m.data
189 self.assertEqual(expected_results[vlen*i:vlen*i + vlen], m.data)
# Method that counter4.eval() calls with its updated count; this test case
# passes itself to counter4 below. The method body is not visible here.
192 def foobar4(self, new_t):
193 #print "foobar4: new_t =", new_t
# --- Scenario 4: tuning callback is a counter4 bound to this test instance ---
198 tune = counter4(self, 1)
201 msgq = gr.msg_queue()
203 src_data = tuple([float(x) for x in
210 expected_results = tuple([float(x) for x in
213 src = gr.vector_source_f(src_data, False)
214 s2v = gr.stream_to_vector(gr.sizeof_float, vlen)
215 stats = gr.bin_statistics_f(vlen, msgq, tune, tune_delay, dwell_delay)
216 self.fg.connect(src, s2v, stats)
218 self.assertEqual(1, msgq.count())
220 m = parse_msg(msgq.delete_head())
221 #print "m =", m.center_freq, m.data
222 self.assertEqual(expected_results[vlen*i:vlen*i + vlen], m.data)
225 if __name__ == '__main__':