1 # Copyright 2008 Free Software Foundation, Inc.
3 # This file is part of GNU Radio
5 # GNU Radio is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3, or (at your option)
10 # GNU Radio is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with GNU Radio; see the file COPYING. If not, write to
17 # the Free Software Foundation, Inc., 51 Franklin Street,
18 # Boston, MA 02110-1301, USA.
#default number of samples in the error-rate window (default for error_rate's win_size)
default_win_size = 1000
import numpy

from gnuradio import gr
import gnuradio.gr.gr_threading as _threading
27 #generate 1s counts array
28 _1s_counts = [sum([1&(i>>j) for j in range(8)]) for i in range(2**8)]
class input_watcher(_threading.Thread):
    """
    Read samples from the message queue and hand them to the callback.

    The thread starts itself on construction, so callers do not need to keep
    a reference to it. Samples are delivered in pairs: the callback always
    receives an even number of items, and any odd trailing item is held back
    until the next message arrives.
    """

    def __init__(self, msgq, callback):
        """
        Watcher constructor.
        @param msgq the message queue to read from; must provide delete_head()
        @param callback called with a numpy int8 array for each batch of samples
        """
        self._msgq = msgq
        self._callback = callback
        _threading.Thread.__init__(self)
        #daemon thread: the read loop never returns on its own, so it must
        #not keep the interpreter alive at exit
        self.setDaemon(True)
        #set to False to make the loop exit after the message in flight
        self.keep_running = True
        self.start()

    def run(self):
        """
        Pull messages off the queue until keep_running is cleared.

        Each message carries the item size in arg1 and the raw item bytes in
        to_string(). NOTE(review): delete_head() presumably blocks until a
        message is available, so clearing keep_running only takes effect
        once the next message has been handled -- confirm against msg_queue.
        """
        #bytes literal so the concatenation below works on Python 2 and 3
        leftover = b''
        while self.keep_running:
            msg = self._msgq.delete_head()
            pair_size = 2 * int(msg.arg1())
            data = leftover + msg.to_string()
            #split on the total buffered length, not just this message's
            #item count; otherwise a run of odd-sized messages would let
            #the leftover grow forever without ever being delivered
            split = len(data) - len(data) % pair_size
            leftover = data[split:]
            #frombuffer replaces the deprecated binary mode of fromstring;
            #copy() keeps the array writable and independent of the buffer
            samples = numpy.frombuffer(data[:split], numpy.int8).copy()
            self._callback(samples)
class error_rate(gr.hier_block2):
    """
    Sample the incoming data streams (byte) and calculate the bit or symbol error rate.
    Write the running rate to the output data stream (float).

    The two byte inputs are interleaved and pushed through a message sink.
    An input_watcher thread hands the interleaved samples to one of the
    handlers below, which posts the computed rates to a message source that
    feeds the block's float output.
    """

    def __init__(self, type='BER', win_size=default_win_size, bits_per_symbol=2):
        """
        Error rate constructor.
        @param type a string 'BER' or 'SER'
        @param win_size the number of samples to calculate over
        @param bits_per_symbol the number of information bits per symbol (BER only)
        """
        #init
        gr.hier_block2.__init__(
            self, 'error_rate',
            gr.io_signature(2, 2, gr.sizeof_char),
            gr.io_signature(1, 1, gr.sizeof_float),
        )
        assert type in ('BER', 'SER')
        self._max_samples = win_size
        self._bits_per_symbol = bits_per_symbol
        #setup message queue
        msg_source = gr.message_source(gr.sizeof_float, 1)
        self._msgq_source = msg_source.msgq()
        msgq_sink = gr.msg_queue(2)
        msg_sink = gr.message_sink(gr.sizeof_char, msgq_sink, False) #False -> blocking
        inter = gr.interleave(gr.sizeof_char)
        #sliding window state: running error total, next slot to overwrite,
        #number of slots filled so far, and the per-sample error counts
        self._num_errs = 0
        self._err_index = 0
        self._num_samps = 0
        self._err_array = numpy.zeros(self._max_samples, numpy.int8)
        #start thread (the watcher starts itself on construction)
        if type == 'BER':
            self._watcher = input_watcher(msgq_sink, self._handler_ber)
        elif type == 'SER':
            self._watcher = input_watcher(msgq_sink, self._handler_ser)
        #connect
        self.connect(msg_source, self)
        self.connect((self, 0), (inter, 0))
        self.connect((self, 1), (inter, 1))
        self.connect(inter, msg_sink)

    def _record_error(self, err):
        """
        Store one error count in the circular window and update the totals.
        @param err the error count for the newest sample pair (a Python int)
        """
        index = self._err_index
        #int() keeps the running total a Python int; adding numpy int8
        #scalars directly can yield an int8 total that overflows past 127
        self._num_errs += err - int(self._err_array[index])
        self._err_array[index] = err
        #increment index
        self._err_index = (index + 1) % self._max_samples
        self._num_samps = min(self._num_samps + 1, self._max_samples)

    def _post_rates(self, rates):
        """
        Send an array of float32 rates to the output message queue.
        @param rates numpy float32 array with one rate per sample pair
        """
        #tobytes() supersedes the deprecated tostring(); fall back on old numpy
        if hasattr(rates, 'tobytes'):
            payload = rates.tobytes()
        else:
            payload = rates.tostring()
        msg = gr.message_from_string(payload, 0, gr.sizeof_float, len(rates))
        self._msgq_source.insert_tail(msg)

    def _handler_ber(self, samples):
        """
        Compute the running bit error rate for a batch of interleaved samples.
        @param samples numpy int8 array; items 2*i and 2*i+1 form one pair
        """
        #floor division keeps the pair count an integer on Python 3
        num = len(samples) // 2
        arr = numpy.zeros(num, numpy.float32)
        for i in range(num):
            #record error: count the differing bits; the mask maps signed
            #int8 XOR results onto table indices 0..255
            diff = (int(samples[i*2]) ^ int(samples[i*2 + 1])) & 0xFF
            self._record_error(_1s_counts[diff])
            #write sample
            arr[i] = float(self._num_errs)/float(self._num_samps*self._bits_per_symbol)
        #write message
        self._post_rates(arr)

    def _handler_ser(self, samples):
        """
        Compute the running symbol error rate for a batch of interleaved samples.
        @param samples numpy int8 array; items 2*i and 2*i+1 form one pair
        """
        #floor division keeps the pair count an integer on Python 3
        num = len(samples) // 2
        arr = numpy.zeros(num, numpy.float32)
        for i in range(num):
            #record error: a pair counts as one error when its items differ
            ref = samples[i*2]
            res = samples[i*2 + 1]
            if ref == res:
                self._record_error(0)
            else:
                self._record_error(1)
            #write sample
            arr[i] = float(self._num_errs)/float(self._num_samps)
        #write message
        self._post_rates(arr)