Imported Upstream version 3.2.2
[debian/gnuradio] / grc / grc_gnuradio / blks2 / probe.py
1 #
2 # Copyright 2008 Free Software Foundation, Inc.
3 #
4 # This file is part of GNU Radio
5 #
6 # GNU Radio is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3, or (at your option)
9 # any later version.
10 #
11 # GNU Radio is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with GNU Radio; see the file COPYING.  If not, write to
18 # the Free Software Foundation, Inc., 51 Franklin Street,
19 # Boston, MA 02110-1301, USA.
20 #
21
22 from gnuradio import gr
23 import threading
24 import numpy
25 import time
26
27 #######################################################################################
28 ## Probe: Function
29 #######################################################################################
30 class probe_function(gr.hier_block2, threading.Thread):
31         """
32         The thread polls the function for values and writes to a message source.
33         """
34
35         def __init__(self, probe_callback, probe_rate):
36                 #init hier block
37                 gr.hier_block2.__init__(
38                         self, 'probe_function',
39                         gr.io_signature(0, 0, 0),
40                         gr.io_signature(1, 1, gr.sizeof_float),
41                 )
42                 self._probe_callback = probe_callback
43                 self.set_probe_rate(probe_rate)
44                 #create message source
45                 message_source = gr.message_source(gr.sizeof_float, 1)
46                 self._msgq = message_source.msgq()
47                 #connect
48                 self.connect(message_source, self)
49                 #setup thread
50                 threading.Thread.__init__(self)
51                 self.setDaemon(True)
52                 self.start()
53
54         def run(self):
55                 """
56                 Infinite polling loop.
57                 """
58                 while True:
59                         time.sleep(1.0/self._probe_rate)
60                         arr = numpy.array(self._probe_callback(), numpy.float32)
61                         msg = gr.message_from_string(arr.tostring(), 0, gr.sizeof_float, 1)
62                         self._msgq.insert_tail(msg)
63
64         def set_probe_rate(self, probe_rate):
65                 self._probe_rate = probe_rate
66
67 class _probe_base(gr.hier_block2):
68         def __init__(self, probe_block, probe_callback, probe_rate):
69                 #init hier block
70                 gr.hier_block2.__init__(
71                         self, 'probe',
72                         gr.io_signature(1, 1, probe_block.input_signature().sizeof_stream_items()[0]),
73                         gr.io_signature(1, 1, gr.sizeof_float),
74                 )
75                 probe_function_block = probe_function(probe_callback, probe_rate)
76                 #forward callbacks
77                 self.set_probe_rate = probe_function_block.set_probe_rate
78                 #connect
79                 self.connect(self, probe_block)
80                 self.connect(probe_function_block, self)
81
82 #######################################################################################
83 ## Probe: Average Magnitude Squared
84 #######################################################################################
85 class _probe_avg_mag_sqrd_base(_probe_base):
86         def __init__(self, threshold, alpha, probe_rate):
87                 #create block
88                 probe_block = self._probe_block_contructor[0](threshold, alpha)
89                 #forward callbacks
90                 self.set_alpha = probe_block.set_alpha
91                 self.set_threshold = probe_block.set_threshold
92                 #init
93                 _probe_base.__init__(self, probe_block, probe_block.level, probe_rate)
94
95 class probe_avg_mag_sqrd_c(_probe_avg_mag_sqrd_base): _probe_block_contructor = (gr.probe_avg_mag_sqrd_c,)
96 class probe_avg_mag_sqrd_f(_probe_avg_mag_sqrd_base): _probe_block_contructor = (gr.probe_avg_mag_sqrd_f,)
97
98 #######################################################################################
99 ## Probe: Density
100 #######################################################################################
101 class probe_density_b(_probe_base):
102         def __init__(self, alpha, probe_rate):
103                 #create block
104                 probe_block = gr.probe_density_b(alpha)
105                 #forward callbacks
106                 self.set_alpha = probe_block.set_alpha
107                 #init
108                 _probe_base.__init__(self, probe_block, probe_block.density, probe_rate)
109
110 #######################################################################################
111 ## Probe: MPSK SNR
112 #######################################################################################
113 class probe_mpsk_snr_c(_probe_base):
114         def __init__(self, type, alpha, probe_rate):
115                 """
116                 Type can be "snr", "signal_mean", or "noise_variance" 
117                 """
118                 #create block
119                 probe_block = gr.probe_mpsk_snr_c(alpha)
120                 #forward callbacks
121                 self.set_alpha = probe_block.set_alpha
122                 #init
123                 _probe_base.__init__(self, probe_block, getattr(probe_block, type), probe_rate)