Merge branch 'dfsg-orig'
[debian/gnuradio] / gr-wxgui / src / python / common.py
index d555a1f055a9c3b824e2b71cb0c793c0ec562fcb..17a7dc0de550fa8ae5fa32da1f938d3f5fd7edd8 100644 (file)
@@ -1,5 +1,5 @@
 #
-# Copyright 2008 Free Software Foundation, Inc.
+# Copyright 2008, 2009 Free Software Foundation, Inc.
 #
 # This file is part of GNU Radio
 #
 # Boston, MA 02110-1301, USA.
 #
 
+##################################################
+# conditional disconnections of wx flow graph
+##################################################
+import wx
+from gnuradio import gr
+
+class wxgui_hb(object):
+       """
+       The wxgui hier block helper/wrapper class:
+       A hier block should inherit from this class to make use of the wxgui connect method.
+       To use, call wxgui_connect in place of regular connect; self.win must be defined.
+       The implementation will conditionally enable the copy block after the source (self).
+       This condition depends on weather or not the window is visible with the parent notebooks.
+       This condition will be re-checked on every ui update event.
+       """
+
+       def wxgui_connect(self, *points):
+               """
+               Use wxgui connect when the first point is the self source of the hb.
+               The win property of this object should be set to the wx window.
+               When this method tries to connect self to the next point,
+               it will conditionally make this connection based on the visibility state.
+               All other points will be connected normally.
+               """
+               try:
+                       assert points[0] == self or points[0][0] == self
+                       copy = gr.copy(self._hb.input_signature().sizeof_stream_item(0))
+                       handler = self._handler_factory(copy.set_enabled)
+                       handler(False) #initially disable the copy block
+                       self._bind_to_visible_event(win=self.win, handler=handler)
+                       points = list(points)
+                       points.insert(1, copy) #insert the copy block into the chain
+               except (AssertionError, IndexError): pass
+               self.connect(*points) #actually connect the blocks
+
+       @staticmethod
+       def _handler_factory(handler):
+               """
+               Create a function that will cache the visibility flag,
+               and only call the handler when that flag changes.
+               @param handler the function to call on a change
+               @return a function of 1 argument
+               """
+               cache = [None]
+               def callback(visible):
+                       if cache[0] == visible: return
+                       cache[0] = visible
+                       #print visible, handler
+                       handler(visible)
+               return callback
+
+       @staticmethod
+       def _bind_to_visible_event(win, handler):
+               """
+               Bind a handler to a window when its visibility changes.
+               Specifically, call the handler when the window visibility changes.
+               This condition is checked on every update ui event.
+               @param win the wx window
+               @param handler a function of 1 param
+               """
+               #is the window visible in the hierarchy
+               def is_wx_window_visible(my_win):
+                       while True:
+                               parent = my_win.GetParent()
+                               if not parent: return True #reached the top of the hierarchy
+                               #if we are hidden, then finish, otherwise keep traversing up
+                               if isinstance(parent, wx.Notebook) and parent.GetCurrentPage() != my_win: return False
+                               my_win = parent
+               #call the handler, the arg is shown or not
+               def handler_factory(my_win, my_handler):
+                       def callback(evt):
+                               my_handler(is_wx_window_visible(my_win))
+                               evt.Skip() #skip so all bound handlers are called
+                       return callback
+               handler = handler_factory(win, handler)
+               #bind the handler to all the parent notebooks
+               win.Bind(wx.EVT_UPDATE_UI, handler)
+
+##################################################
+# Helpful Functions
+##################################################
+
 #A macro to apply an index to a key
 index_key = lambda key, i: "%s_%d"%(key, i+1)
 
@@ -134,9 +216,29 @@ def get_min_max(samples):
        @param samples the array of real values
        @return a tuple of min, max
        """
-       scale_factor = 3
+       factor = 2.0
        mean = numpy.average(samples)
-       rms = numpy.max([scale_factor*((numpy.sum((samples-mean)**2)/len(samples))**.5), .1])
-       min = mean - rms
-       max = mean + rms
-       return min, max
+       std = numpy.std(samples)
+       fft = numpy.abs(numpy.fft.fft(samples - mean))
+       envelope = 2*numpy.max(fft)/len(samples)
+       ampl = max(std, envelope) or 0.1
+       return mean - factor*ampl, mean + factor*ampl
+
+def get_min_max_fft(fft_samps):
+       """
+       Get the minimum and maximum bounds for an array of fft samples.
+       @param samples the array of real values
+       @return a tuple of min, max
+       """
+       #get the peak level (max of the samples)
+       peak_level = numpy.max(fft_samps)
+       #separate noise samples
+       noise_samps = numpy.sort(fft_samps)[:len(fft_samps)/2]
+       #get the noise floor
+       noise_floor = numpy.average(noise_samps)
+       #get the noise deviation
+       noise_dev = numpy.std(noise_samps)
+       #determine the maximum and minimum levels
+       max_level = peak_level
+       min_level = noise_floor - abs(2*noise_dev)
+       return min_level, max_level