X-Git-Url: https://git.gag.com/?a=blobdiff_plain;f=gr-wxgui%2Fsrc%2Fpython%2Fcommon.py;h=dca41c9a3ec1d0f11fa8e48f4b784c29f498e1be;hb=af5e21e35cda1d9d3e4bd659364dafe181274064;hp=f31e1a7085aac9983889668c2c8a72496815f464;hpb=73f3c91a5ad2acbb9f32bd440f2d4ea9ab54ac08;p=debian%2Fgnuradio diff --git a/gr-wxgui/src/python/common.py b/gr-wxgui/src/python/common.py index f31e1a70..dca41c9a 100644 --- a/gr-wxgui/src/python/common.py +++ b/gr-wxgui/src/python/common.py @@ -19,180 +19,135 @@ # Boston, MA 02110-1301, USA. # -import threading -import numpy -import math -import wx - -class prop_setter(object): - def _register_set_prop(self, controller, control_key, *args): - def set_method(value): controller[control_key] = value - if args: set_method(args[0]) - setattr(self, 'set_%s'%control_key, set_method) - ################################################## -# Custom Data Event +# conditional disconnections of wx flow graph ################################################## -EVT_DATA = wx.PyEventBinder(wx.NewEventType()) -class DataEvent(wx.PyEvent): - def __init__(self, data): - wx.PyEvent.__init__(self, wx.NewId(), EVT_DATA.typeId) - self.data = data +import wx -################################################## -# Input Watcher Thread -################################################## -class input_watcher(threading.Thread): - """ - Input watcher thread runs forever. - Read messages from the message queue. - Forward messages to the message handler. - """ - def __init__ (self, msgq, handle_msg): - threading.Thread.__init__(self) - self.setDaemon(1) - self.msgq = msgq - self._handle_msg = handle_msg - self.keep_running = True - self.start() +def bind_to_visible_event(win, callback): + """ + Bind a callback to a window when its visibility changes. + Specifically, callback when the window changes visibility + when a notebook tab event in one of the parents occurs. + @param win the wx window + @param callback a 1 param function + """ + #is the window visible in the hierarchy + def is_wx_window_visible(my_win): + while True: + parent = my_win.GetParent() + if not parent: return True #reached the top of the hierarchy + #if we are hidden, then finish, otherwise keep traversing up + if isinstance(parent, wx.Notebook) and parent.GetCurrentPage() != my_win: return False + my_win = parent + #call the callback, the arg is shown or not + def callback_factory(my_win, my_callback): + return lambda *args: my_callback(is_wx_window_visible(my_win)) + handler = callback_factory(win, callback) + #bind the handler to all the parent notebooks + while win: + if isinstance(win, wx.Notebook): + win.Bind(wx.EVT_NOTEBOOK_PAGE_CHANGED, handler) + if not win.GetParent(): + win.Bind(wx.EVT_ACTIVATE, handler) + win = win.GetParent() + +from gnuradio import gr + +def conditional_connect(source, sink, hb, win, size): + nulls = list() + cache = [None] + def callback(visible, init=False): + if visible == cache[0]: return + cache[0] = visible + if not init: hb.lock() + print 'visible', visible, source, sink + if visible: + if not init: + hb.disconnect(source, nulls[0]) + hb.disconnect(nulls[1], nulls[2]) + hb.disconnect(nulls[2], sink) + while nulls: nulls.pop() + hb.connect(source, sink) + else: + if not init: hb.disconnect(source, sink) + nulls.extend([gr.null_sink(size), gr.null_source(size), gr.head(size, 0)]) + hb.connect(source, nulls[0]) + hb.connect(nulls[1], nulls[2], sink) + if not init: hb.unlock() + callback(False, init=True) #initially connect + bind_to_visible_event(win, callback) + +class wxgui_hb(object): + def wxgui_connect(self, *points): + """ + Use wxgui connect when the first point is the self source of the hb. + The win property of this object should be set to the wx window. + When this method tries to connect self to the next point, + it will conditionally make this connection based on the visibility state. + """ + try: + assert points[0] == self or points[0][0] == self + conditional_connect( + points[0], points[1], + win=self.win, hb=self, + size=self._hb.input_signature().sizeof_stream_item(0), + ) + if len(points[1:]) > 1: self.connect(*points[1:]) + except (AssertionError, IndexError): self.connect(*points) - def run(self): - while self.keep_running: self._handle_msg(self.msgq.delete_head().to_string()) +#A macro to apply an index to a key +index_key = lambda key, i: "%s_%d"%(key, i+1) -################################################## -# WX Shared Classes -################################################## -class LabelText(wx.StaticText): +def _register_access_method(destination, controller, key): """ - Label text to give the wx plots a uniform look. - Get the default label text and set the font bold. + Helper function for register access methods. + This helper creates distinct set and get methods for each key + and adds them to the destination object. """ - def __init__(self, parent, label): - wx.StaticText.__init__(self, parent, -1, label) - font = self.GetFont() - font.SetWeight(wx.FONTWEIGHT_BOLD) - self.SetFont(font) + def set(value): controller[key] = value + setattr(destination, 'set_'+key, set) + def get(): return controller[key] + setattr(destination, 'get_'+key, get) -class IncrDecrButtons(wx.BoxSizer): +def register_access_methods(destination, controller): """ - A horizontal box sizer with a increment and a decrement button. + Register setter and getter functions in the destination object for all keys in the controller. + @param destination the object to get new setter and getter methods + @param controller the pubsub controller """ - def __init__(self, parent, on_incr, on_decr): - """ - @param parent the parent window - @param on_incr the event handler for increment - @param on_decr the event handler for decrement - """ - wx.BoxSizer.__init__(self, wx.HORIZONTAL) - self._incr_button = wx.Button(parent, -1, '+', style=wx.BU_EXACTFIT) - self._incr_button.Bind(wx.EVT_BUTTON, on_incr) - self.Add(self._incr_button, 0, wx.ALIGN_CENTER_VERTICAL) - self._decr_button = wx.Button(parent, -1, ' - ', style=wx.BU_EXACTFIT) - self._decr_button.Bind(wx.EVT_BUTTON, on_decr) - self.Add(self._decr_button, 0, wx.ALIGN_CENTER_VERTICAL) + for key in controller.keys(): _register_access_method(destination, controller, key) - def Disable(self, disable=True): self.Enable(not disable) - def Enable(self, enable=True): - if enable: - self._incr_button.Enable() - self._decr_button.Enable() - else: - self._incr_button.Disable() - self._decr_button.Disable() - -class ToggleButtonController(wx.Button): - def __init__(self, parent, controller, control_key, true_label, false_label): - self._controller = controller - self._control_key = control_key - wx.Button.__init__(self, parent, -1, '', style=wx.BU_EXACTFIT) - self.Bind(wx.EVT_BUTTON, self._evt_button) - controller.subscribe(control_key, lambda x: self.SetLabel(x and true_label or false_label)) - - def _evt_button(self, e): - self._controller[self._control_key] = not self._controller[self._control_key] - -class CheckBoxController(wx.CheckBox): - def __init__(self, parent, label, controller, control_key): - self._controller = controller - self._control_key = control_key - wx.CheckBox.__init__(self, parent, style=wx.CHK_2STATE, label=label) - self.Bind(wx.EVT_CHECKBOX, self._evt_checkbox) - controller.subscribe(control_key, lambda x: self.SetValue(bool(x))) - - def _evt_checkbox(self, e): - self._controller[self._control_key] = bool(e.IsChecked()) +################################################## +# Input Watcher Thread +################################################## +from gnuradio import gru -class LogSliderController(wx.BoxSizer): +class input_watcher(gru.msgq_runner): """ - Log slider controller with display label and slider. - Gives logarithmic scaling to slider operation. + Input watcher thread runs forever. + Read messages from the message queue. + Forward messages to the message handler. """ - def __init__(self, parent, label, min_exp, max_exp, slider_steps, controller, control_key, formatter=lambda x: ': %.6f'%x): - wx.BoxSizer.__init__(self, wx.VERTICAL) - self._label = wx.StaticText(parent, -1, label + formatter(1/3.0)) - self.Add(self._label, 0, wx.EXPAND) - self._slider = wx.Slider(parent, -1, 0, 0, slider_steps, style=wx.SL_HORIZONTAL) - self.Add(self._slider, 0, wx.EXPAND) - def _on_slider_event(event): - controller[control_key] = \ - 10**(float(max_exp-min_exp)*self._slider.GetValue()/slider_steps + min_exp) - self._slider.Bind(wx.EVT_SLIDER, _on_slider_event) - def _on_controller_set(value): - self._label.SetLabel(label + formatter(value)) - slider_value = slider_steps*(math.log10(value)-min_exp)/(max_exp-min_exp) - slider_value = min(max(0, slider_value), slider_steps) - if abs(slider_value - self._slider.GetValue()) > 1: - self._slider.SetValue(slider_value) - controller.subscribe(control_key, _on_controller_set) - - def Disable(self, disable=True): self.Enable(not disable) - def Enable(self, enable=True): - if enable: - self._slider.Enable() - self._label.Enable() - else: - self._slider.Disable() - self._label.Disable() + def __init__ (self, msgq, controller, msg_key, arg1_key='', arg2_key=''): + self._controller = controller + self._msg_key = msg_key + self._arg1_key = arg1_key + self._arg2_key = arg2_key + gru.msgq_runner.__init__(self, msgq, self.handle_msg) -class DropDownController(wx.BoxSizer): - """ - Drop down controller with label and chooser. - Srop down selection from a set of choices. - """ - def __init__(self, parent, label, choices, controller, control_key, size=(-1, -1)): - """ - @param parent the parent window - @param label the label for the drop down - @param choices a list of tuples -> (label, value) - @param controller the prop val controller - @param control_key the prop key for this control - """ - wx.BoxSizer.__init__(self, wx.HORIZONTAL) - self._label = wx.StaticText(parent, -1, ' %s '%label) - self.Add(self._label, 1, wx.ALIGN_CENTER_VERTICAL) - self._chooser = wx.Choice(parent, -1, choices=[c[0] for c in choices], size=size) - def _on_chooser_event(event): - controller[control_key] = choices[self._chooser.GetSelection()][1] - self._chooser.Bind(wx.EVT_CHOICE, _on_chooser_event) - self.Add(self._chooser, 0, wx.ALIGN_CENTER_VERTICAL) - def _on_controller_set(value): - #only set the chooser if the value is a possible choice - for i, choice in enumerate(choices): - if value == choice[1]: self._chooser.SetSelection(i) - controller.subscribe(control_key, _on_controller_set) + def handle_msg(self, msg): + if self._arg1_key: self._controller[self._arg1_key] = msg.arg1() + if self._arg2_key: self._controller[self._arg2_key] = msg.arg2() + self._controller[self._msg_key] = msg.to_string() - def Disable(self, disable=True): self.Enable(not disable) - def Enable(self, enable=True): - if enable: - self._chooser.Enable() - self._label.Enable() - else: - self._chooser.Disable() - self._label.Disable() ################################################## # Shared Functions ################################################## +import numpy +import math + def get_exp(num): """ Get the exponent of the number in base 10. @@ -209,8 +164,7 @@ def get_clean_num(num): @return the closest number """ if num == 0: return 0 - if num > 0: sign = 1 - else: sign = -1 + sign = num > 0 and 1 or -1 exp = get_exp(num) nums = numpy.array((1, 2, 5, 10))*(10**exp) return sign*nums[numpy.argmin(numpy.abs(nums - abs(num)))] @@ -259,53 +213,26 @@ def get_min_max(samples): """ scale_factor = 3 mean = numpy.average(samples) - rms = scale_factor*((numpy.sum((samples-mean)**2)/len(samples))**.5) - min = mean - rms - max = mean + rms - return min, max + rms = numpy.max([scale_factor*((numpy.sum((samples-mean)**2)/len(samples))**.5), .1]) + min_val = mean - rms + max_val = mean + rms + return min_val, max_val -def get_si_components(num): - """ - Get the SI units for the number. - Extract the coeff and exponent of the number. - The exponent will be a multiple of 3. - @param num the floating point number - @return the tuple coeff, exp, prefix +def get_min_max_fft(fft_samps): """ - exp = get_exp(num) - exp -= exp%3 - exp = min(max(exp, -24), 24) #bounds on SI table below - prefix = { - 24: 'Y', 21: 'Z', - 18: 'E', 15: 'P', - 12: 'T', 9: 'G', - 6: 'M', 3: 'K', - 0: '', - -3: 'm', -6: 'u', - -9: 'n', -12: 'p', - -15: 'f', -18: 'a', - -21: 'z', -24: 'y', - }[exp] - coeff = num/10**exp - return coeff, exp, prefix - -def label_format(num): - """ - Format a floating point number into a presentable string. - If the number has an small enough exponent, use regular decimal. - Otherwise, format the number with floating point notation. - Exponents are normalized to multiples of 3. - In the case where the exponent was found to be -3, - it is best to display this as a regular decimal, with a 0 to the left. - @param num the number to format - @return a label string + Get the minimum and maximum bounds for an array of fft samples. + @param samples the array of real values + @return a tuple of min, max """ - coeff, exp, prefix = get_si_components(num) - if -3 <= exp < 3: return '%g'%num - return '%se%d'%('%.3g'%coeff, exp) - -if __name__ == '__main__': - import random - for i in range(-25, 25): - num = random.random()*10**i - print num, ':', get_si_components(num) + #get the peak level (max of the samples) + peak_level = numpy.max(fft_samps) + #separate noise samples + noise_samps = numpy.sort(fft_samps)[:len(fft_samps)/2] + #get the noise floor + noise_floor = numpy.average(noise_samps) + #get the noise deviation + noise_dev = numpy.std(noise_samps) + #determine the maximum and minimum levels + max_level = peak_level + min_level = noise_floor - abs(2*noise_dev) + return min_level, max_level