3 # Copyright 2006,2007 Free Software Foundation, Inc.
5 # This file is part of GNU Radio
7 # GNU Radio is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3, or (at your option)
12 # GNU Radio is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with GNU Radio; see the file COPYING. If not, write to
19 # the Free Software Foundation, Inc., 51 Franklin Street,
20 # Boston, MA 02110-1301, USA.
23 from gnuradio import gr, gr_unittest
28 #print "pid =", os.getpid()
29 #raw_input("Attach gdb and press return...")
32 Note: The QA tests below have been disabled by renaming them from test_*
33 to xtest_*. See ticket:199 on http://gnuradio.org/trac/ticket/199
36 class counter(gr.feval_dd):
37 def __init__(self, step_size=1):
38 gr.feval_dd.__init__(self)
39 self.step_size = step_size
42 def eval(self, input):
43 #print "eval: self.count =", self.count
45 self.count = self.count + self.step_size
49 class counter3(gr.feval_dd):
50 def __init__(self, f, step_size):
51 gr.feval_dd.__init__(self)
53 self.step_size = step_size
56 def eval(self, input):
58 #print "eval: self.count =", self.count
60 self.count = self.count + self.step_size
63 print "Exception: ", e
67 #print "foobar3: new_t =", new_t
71 class counter4(gr.feval_dd):
72 def __init__(self, obj_instance, step_size):
73 gr.feval_dd.__init__(self)
74 self.obj_instance = obj_instance
75 self.step_size = step_size
78 def eval(self, input):
80 #print "eval: self.count =", self.count
82 self.count = self.count + self.step_size
83 self.obj_instance.foobar4(self.count)
85 print "Exception: ", e
89 class parse_msg(object):
90 def __init__(self, msg):
91 self.center_freq = msg.arg1()
92 self.vlen = int(msg.arg2())
93 assert(msg.length() == self.vlen * gr.sizeof_float)
94 self.data = struct.unpack('%df' % (self.vlen,), msg.to_string())
96 # FIXME: see ticket:199
97 class xtest_bin_statistics(gr_unittest.TestCase):
100 self.tb = gr.top_block ()
110 msgq = gr.msg_queue()
112 src_data = tuple([float(x) for x in
119 expected_results = tuple([float(x) for x in
126 src = gr.vector_source_f(src_data, False)
127 s2v = gr.stream_to_vector(gr.sizeof_float, vlen)
128 stats = gr.bin_statistics_f(vlen, msgq, tune, tune_delay, dwell_delay)
129 self.tb.connect(src, s2v, stats)
131 self.assertEqual(4, msgq.count())
133 m = parse_msg(msgq.delete_head())
134 #print "m =", m.center_freq, m.data
135 self.assertEqual(expected_results[vlen*i:vlen*i + vlen], m.data)
142 msgq = gr.msg_queue()
144 src_data = tuple([float(x) for x in
151 expected_results = tuple([float(x) for x in
154 src = gr.vector_source_f(src_data, False)
155 s2v = gr.stream_to_vector(gr.sizeof_float, vlen)
156 stats = gr.bin_statistics_f(vlen, msgq, tune, tune_delay, dwell_delay)
157 self.tb.connect(src, s2v, stats)
159 self.assertEqual(1, msgq.count())
161 m = parse_msg(msgq.delete_head())
162 #print "m =", m.center_freq, m.data
163 self.assertEqual(expected_results[vlen*i:vlen*i + vlen], m.data)
169 tune = counter3(foobar3, 1)
172 msgq = gr.msg_queue()
174 src_data = tuple([float(x) for x in
181 expected_results = tuple([float(x) for x in
184 src = gr.vector_source_f(src_data, False)
185 s2v = gr.stream_to_vector(gr.sizeof_float, vlen)
186 stats = gr.bin_statistics_f(vlen, msgq, tune, tune_delay, dwell_delay)
187 self.tb.connect(src, s2v, stats)
189 self.assertEqual(1, msgq.count())
191 m = parse_msg(msgq.delete_head())
192 #print "m =", m.center_freq, m.data
193 self.assertEqual(expected_results[vlen*i:vlen*i + vlen], m.data)
196 def foobar4(self, new_t):
197 #print "foobar4: new_t =", new_t
202 tune = counter4(self, 1)
205 msgq = gr.msg_queue()
207 src_data = tuple([float(x) for x in
214 expected_results = tuple([float(x) for x in
217 src = gr.vector_source_f(src_data, False)
218 s2v = gr.stream_to_vector(gr.sizeof_float, vlen)
219 stats = gr.bin_statistics_f(vlen, msgq, tune, tune_delay, dwell_delay)
220 self.tb.connect(src, s2v, stats)
222 self.assertEqual(1, msgq.count())
224 m = parse_msg(msgq.delete_head())
225 #print "m =", m.center_freq, m.data
226 self.assertEqual(expected_results[vlen*i:vlen*i + vlen], m.data)
229 if __name__ == '__main__':