# Boston, MA 02110-1301, USA.
#
-import threading
-import numpy
-import math
+##################################################
+# conditional disconnections of wx flow graph
+##################################################
import wx
-EVT_DATA = wx.NewEventType()
-class DataEvent(wx.PyEvent):
- def __init__(self, data):
- wx.PyEvent.__init__(self, wx.NewId(), EVT_DATA)
- self.data = data
+def bind_to_visible_event(win, callback):
+ """
+ Bind a callback to a window when its visibility changes.
+ Specifically, callback when the window changes visibility
+ when a update ui event in the window occurs.
+ @param win the wx window
+ @param callback a 1 param function
+ """
+ #is the window visible in the hierarchy
+ def is_wx_window_visible(my_win):
+ while True:
+ parent = my_win.GetParent()
+ if not parent: return True #reached the top of the hierarchy
+ #if we are hidden, then finish, otherwise keep traversing up
+ if isinstance(parent, wx.Notebook) and parent.GetCurrentPage() != my_win: return False
+ my_win = parent
+ #call the callback, the arg is shown or not
+ def callback_factory(my_win, my_callback):
+ return lambda *args: my_callback(is_wx_window_visible(my_win))
+ handler = callback_factory(win, callback)
+ #bind the handler to all the parent notebooks
+ win.Bind(wx.EVT_UPDATE_UI, handler)
+
+from gnuradio import gr
+
+def conditional_connect_callback_factory(source, sink, hb, win, size):
+ nulls = list()
+ cache = [None]
+ def callback(visible, init=False):
+ if visible == cache[0]: return
+ cache[0] = visible
+ if not init: hb.lock()
+ #print 'visible', visible, source, sink
+ if visible:
+ if not init:
+ hb.disconnect(source, nulls[0])
+ hb.disconnect(nulls[1], nulls[2])
+ hb.disconnect(nulls[2], sink)
+ while nulls: nulls.pop()
+ hb.connect(source, sink)
+ else:
+ if not init: hb.disconnect(source, sink)
+ nulls.extend([gr.null_sink(size), gr.null_source(size), gr.head(size, 0)])
+ hb.connect(source, nulls[0])
+ hb.connect(nulls[1], nulls[2], sink)
+ if not init: hb.unlock()
+ return callback
+
+def conditional_connect(source, sink, hb, win, size):
+ handler = conditional_connect_callback_factory(source, sink, hb, win, size)
+ handler(False, init=True) #initially connect
+ bind_to_visible_event(win, handler)
+
+class wxgui_hb(object):
+ def wxgui_connect(self, *points):
+ """
+ Use wxgui connect when the first point is the self source of the hb.
+ The win property of this object should be set to the wx window.
+ When this method tries to connect self to the next point,
+ it will conditionally make this connection based on the visibility state.
+ """
+ try:
+ assert points[0] == self or points[0][0] == self
+ conditional_connect(
+ points[0], points[1],
+ win=self.win, hb=self,
+ size=self._hb.input_signature().sizeof_stream_item(0),
+ )
+ if len(points[1:]) > 1: self.connect(*points[1:])
+ except (AssertionError, IndexError): self.connect(*points)
-class prop_setter(object):
- def _register_set_prop(self, controller, control_key, *args):
- def set_method(value): controller[control_key] = value
- if args: set_method(args[0])
- setattr(self, 'set_%s'%control_key, set_method)
+#A macro to apply an index to a key
+index_key = lambda key, i: "%s_%d"%(key, i+1)
-##################################################
-# Input Watcher Thread
-##################################################
-class input_watcher(threading.Thread):
+def _register_access_method(destination, controller, key):
"""
- Input watcher thread runs forever.
- Read messages from the message queue.
- Forward messages to the message handler.
+ Helper function for register access methods.
+ This helper creates distinct set and get methods for each key
+ and adds them to the destination object.
"""
- def __init__ (self, msgq, handle_msg):
- threading.Thread.__init__(self)
- self.setDaemon(1)
- self.msgq = msgq
- self._handle_msg = handle_msg
- self.keep_running = True
- self.start()
+ def set(value): controller[key] = value
+ setattr(destination, 'set_'+key, set)
+ def get(): return controller[key]
+ setattr(destination, 'get_'+key, get)
- def run(self):
- while self.keep_running: self._handle_msg(self.msgq.delete_head().to_string())
+def register_access_methods(destination, controller):
+ """
+ Register setter and getter functions in the destination object for all keys in the controller.
+ @param destination the object to get new setter and getter methods
+ @param controller the pubsub controller
+ """
+ for key in controller.keys(): _register_access_method(destination, controller, key)
##################################################
-# WX Shared Classes
+# Input Watcher Thread
##################################################
-class LabelText(wx.StaticText):
- """
- Label text to give the wx plots a uniform look.
- Get the default label text and set the font bold.
- """
- def __init__(self, parent, label):
- wx.StaticText.__init__(self, parent, -1, label)
- font = self.GetFont()
- font.SetWeight(wx.FONTWEIGHT_BOLD)
- self.SetFont(font)
+from gnuradio import gru
-class IncrDecrButtons(wx.BoxSizer):
+class input_watcher(gru.msgq_runner):
"""
- A horizontal box sizer with a increment and a decrement button.
+ Input watcher thread runs forever.
+ Read messages from the message queue.
+ Forward messages to the message handler.
"""
- def __init__(self, parent, on_incr, on_decr):
- """
- @param parent the parent window
- @param on_incr the event handler for increment
- @param on_decr the event handler for decrement
- """
- wx.BoxSizer.__init__(self, wx.HORIZONTAL)
- self._incr_button = wx.Button(parent, -1, '+', style=wx.BU_EXACTFIT)
- self._incr_button.Bind(wx.EVT_BUTTON, on_incr)
- self.Add(self._incr_button, 0, wx.ALIGN_CENTER_VERTICAL)
- self._decr_button = wx.Button(parent, -1, ' - ', style=wx.BU_EXACTFIT)
- self._decr_button.Bind(wx.EVT_BUTTON, on_decr)
- self.Add(self._decr_button, 0, wx.ALIGN_CENTER_VERTICAL)
-
- def Disable(self, disable=True): self.Enable(not disable)
- def Enable(self, enable=True):
- if enable:
- self._incr_button.Enable()
- self._decr_button.Enable()
- else:
- self._incr_button.Disable()
- self._decr_button.Disable()
-
-class ToggleButtonController(wx.Button):
- def __init__(self, parent, controller, control_key, true_label, false_label):
+ def __init__ (self, msgq, controller, msg_key, arg1_key='', arg2_key=''):
self._controller = controller
- self._control_key = control_key
- wx.Button.__init__(self, parent, -1, '', style=wx.BU_EXACTFIT)
- self.Bind(wx.EVT_BUTTON, self._evt_button)
- controller.subscribe(control_key, lambda x: self.SetLabel(x and true_label or false_label))
+ self._msg_key = msg_key
+ self._arg1_key = arg1_key
+ self._arg2_key = arg2_key
+ gru.msgq_runner.__init__(self, msgq, self.handle_msg)
- def _evt_button(self, e):
- self._controller[self._control_key] = not self._controller[self._control_key]
+ def handle_msg(self, msg):
+ if self._arg1_key: self._controller[self._arg1_key] = msg.arg1()
+ if self._arg2_key: self._controller[self._arg2_key] = msg.arg2()
+ self._controller[self._msg_key] = msg.to_string()
-class CheckBoxController(wx.CheckBox):
- def __init__(self, parent, label, controller, control_key):
- self._controller = controller
- self._control_key = control_key
- wx.CheckBox.__init__(self, parent, style=wx.CHK_2STATE, label=label)
- self.Bind(wx.EVT_CHECKBOX, self._evt_checkbox)
- controller.subscribe(control_key, lambda x: self.SetValue(bool(x)))
-
- def _evt_checkbox(self, e):
- self._controller[self._control_key] = bool(e.IsChecked())
-
-class LogSliderController(wx.BoxSizer):
- """
- Log slider controller with display label and slider.
- Gives logarithmic scaling to slider operation.
- """
- def __init__(self, parent, label, min_exp, max_exp, slider_steps, controller, control_key, formatter=lambda x: ': %.6f'%x):
- wx.BoxSizer.__init__(self, wx.VERTICAL)
- self._label = wx.StaticText(parent, -1, label + formatter(1/3.0))
- self.Add(self._label, 0, wx.EXPAND)
- self._slider = wx.Slider(parent, -1, 0, 0, slider_steps, style=wx.SL_HORIZONTAL)
- self.Add(self._slider, 0, wx.EXPAND)
- def _on_slider_event(event):
- controller[control_key] = \
- 10**(float(max_exp-min_exp)*self._slider.GetValue()/slider_steps + min_exp)
- self._slider.Bind(wx.EVT_SLIDER, _on_slider_event)
- def _on_controller_set(value):
- self._label.SetLabel(label + formatter(value))
- slider_value = slider_steps*(math.log10(value)-min_exp)/(max_exp-min_exp)
- slider_value = min(max(0, slider_value), slider_steps)
- if abs(slider_value - self._slider.GetValue()) > 1:
- self._slider.SetValue(slider_value)
- controller.subscribe(control_key, _on_controller_set)
-
- def Disable(self, disable=True): self.Enable(not disable)
- def Enable(self, enable=True):
- if enable:
- self._slider.Enable()
- self._label.Enable()
- else:
- self._slider.Disable()
- self._label.Disable()
-
-class DropDownController(wx.BoxSizer):
- """
- Drop down controller with label and chooser.
- Srop down selection from a set of choices.
- """
- def __init__(self, parent, label, choices, controller, control_key, size=(-1, -1)):
- """
- @param parent the parent window
- @param label the label for the drop down
- @param choices a list of tuples -> (label, value)
- @param controller the prop val controller
- @param control_key the prop key for this control
- """
- wx.BoxSizer.__init__(self, wx.HORIZONTAL)
- self._label = wx.StaticText(parent, -1, ' %s '%label)
- self.Add(self._label, 1, wx.ALIGN_CENTER_VERTICAL)
- self._chooser = wx.Choice(parent, -1, choices=[c[0] for c in choices], size=size)
- def _on_chooser_event(event):
- controller[control_key] = choices[self._chooser.GetSelection()][1]
- self._chooser.Bind(wx.EVT_CHOICE, _on_chooser_event)
- self.Add(self._chooser, 0, wx.ALIGN_CENTER_VERTICAL)
- def _on_controller_set(value):
- #only set the chooser if the value is a possible choice
- for i, choice in enumerate(choices):
- if value == choice[1]: self._chooser.SetSelection(i)
- controller.subscribe(control_key, _on_controller_set)
-
- def Disable(self, disable=True): self.Enable(not disable)
- def Enable(self, enable=True):
- if enable:
- self._chooser.Enable()
- self._label.Enable()
- else:
- self._chooser.Disable()
- self._label.Disable()
##################################################
# Shared Functions
##################################################
+import numpy
+import math
+
def get_exp(num):
"""
Get the exponent of the number in base 10.
@return the closest number
"""
if num == 0: return 0
- if num > 0: sign = 1
- else: sign = -1
+ sign = num > 0 and 1 or -1
exp = get_exp(num)
nums = numpy.array((1, 2, 5, 10))*(10**exp)
return sign*nums[numpy.argmin(numpy.abs(nums - abs(num)))]
"""
scale_factor = 3
mean = numpy.average(samples)
- rms = scale_factor*((numpy.sum((samples-mean)**2)/len(samples))**.5)
- min = mean - rms
- max = mean + rms
- return min, max
-
-def get_si_components(num):
- """
- Get the SI units for the number.
- Extract the coeff and exponent of the number.
- The exponent will be a multiple of 3.
- @param num the floating point number
- @return the tuple coeff, exp, prefix
- """
- exp = get_exp(num)
- exp -= exp%3
- exp = min(max(exp, -24), 24) #bounds on SI table below
- prefix = {
- 24: 'Y', 21: 'Z',
- 18: 'E', 15: 'P',
- 12: 'T', 9: 'G',
- 6: 'M', 3: 'K',
- 0: '',
- -3: 'm', -6: 'u',
- -9: 'n', -12: 'p',
- -15: 'f', -18: 'a',
- -21: 'z', -24: 'y',
- }[exp]
- coeff = num/10**exp
- return coeff, exp, prefix
+ rms = numpy.max([scale_factor*((numpy.sum((samples-mean)**2)/len(samples))**.5), .1])
+ min_val = mean - rms
+ max_val = mean + rms
+ return min_val, max_val
-def label_format(num):
+def get_min_max_fft(fft_samps):
"""
- Format a floating point number into a presentable string.
- If the number has an small enough exponent, use regular decimal.
- Otherwise, format the number with floating point notation.
- Exponents are normalized to multiples of 3.
- In the case where the exponent was found to be -3,
- it is best to display this as a regular decimal, with a 0 to the left.
- @param num the number to format
- @return a label string
+ Get the minimum and maximum bounds for an array of fft samples.
+ @param samples the array of real values
+ @return a tuple of min, max
"""
- coeff, exp, prefix = get_si_components(num)
- if -3 <= exp < 3: return '%g'%num
- return '%se%d'%('%.3g'%coeff, exp)
-
-if __name__ == '__main__':
- import random
- for i in range(-25, 25):
- num = random.random()*10**i
- print num, ':', get_si_components(num)
+ #get the peak level (max of the samples)
+ peak_level = numpy.max(fft_samps)
+ #separate noise samples
+ noise_samps = numpy.sort(fft_samps)[:len(fft_samps)/2]
+ #get the noise floor
+ noise_floor = numpy.average(noise_samps)
+ #get the noise deviation
+ noise_dev = numpy.std(noise_samps)
+ #determine the maximum and minimum levels
+ max_level = peak_level
+ min_level = noise_floor - abs(2*noise_dev)
+ return min_level, max_level