import math
class _logpwrfft_base(gr.hier_block2):
- """!
+ """
Create a log10(abs(fft)) stream chain, with real or complex input.
"""
def __init__(self, sample_rate, fft_size, ref_scale, frame_rate, avg_alpha, average):
- """!
+ """
Create an log10(abs(fft)) stream chain.
Provide access to the setting the filter and sample rate.
@param sample_rate Incoming stream sample rate
self.set_average(average)
def set_decimation(self, decim):
- """!
+ """
Set the decimation on stream decimator.
@param decim the new decimation
"""
self._sd.set_decimation(decim)
def set_vec_rate(self, vec_rate):
- """!
+ """
Set the vector rate on stream decimator.
@param vec_rate the new vector rate
"""
self._sd.set_vec_rate(vec_rate)
def set_sample_rate(self, sample_rate):
- """!
+ """
Set the new sampling rate
@param sample_rate the new rate
"""
self._sd.set_sample_rate(sample_rate)
def set_average(self, average):
- """!
+ """
Set the averaging filter on/off.
@param average true to set averaging on
"""
self._avg.set_taps(1.0)
def set_avg_alpha(self, avg_alpha):
- """!
+ """
Set the average alpha and set the taps if average was on.
@param avg_alpha the new iir filter tap
"""
self.set_average(self._average)
def sample_rate(self):
- """!
+ """
Return the current sample rate.
"""
return self._sd.sample_rate()
def decimation(self):
- """!
+ """
Return the current decimation.
"""
return self._sd.decimation()
def frame_rate(self):
- """!
+ """
Return the current frame rate.
"""
return self._sd.frame_rate()
def average(self):
- """!
+ """
Return whether or not averaging is being performed.
"""
return self._average
def avg_alpha(self):
- """!
+ """
Return averaging filter constant.
"""
return self._avg_alpha
class logpwrfft_f(_logpwrfft_base):
- """!
+ """
Create an fft block chain, with real input.
"""
_name = "logpwrfft_f"
_fft_block = (gr.fft_vfc, )
class logpwrfft_c(_logpwrfft_base):
- """!
+ """
Create an fft block chain, with complex input.
"""
_name = "logpwrfft_c"
from gnuradio import gr
class stream_to_vector_decimator(gr.hier_block2):
- """!
+ """
Convert the stream to a vector, decimate the vector stream to achieve the vector rate.
"""
def __init__(self, item_size, sample_rate, vec_rate, vec_len):
- """!
+ """
Create the block chain.
@param item_size the number of bytes per sample
@param sample_rate the rate of incoming samples
self.connect(self, s2v, self.one_in_n, self)
def set_sample_rate(self, sample_rate):
- """!
+ """
Set the new sampling rate and update the decimator.
@param sample_rate the new rate
"""
self._update_decimator()
def set_vec_rate(self, vec_rate):
- """!
+ """
Set the new vector rate and update the decimator.
@param vec_rate the new rate
"""
self._update_decimator()
def set_decimation(self, decim):
- """!
+ """
Set the decimation parameter directly.
@param decim the new decimation
"""
self.one_in_n.set_n(self._decim)
def decimation(self):
- """!
+ """
Returns the actual decimation.
"""
return self._decim
def sample_rate(self):
- """!
+ """
Returns configured sample rate.
"""
return self._sample_rate
def frame_rate(self):
- """!
+ """
Returns actual frame rate
"""
return self._sample_rate/self._vec_len/self._decim
# Input Watcher Thread
##################################################
class input_watcher(threading.Thread):
- """!
+ """
Input watcher thread runs forever.
Read messages from the message queue.
Forward messages to the message handler.
# WX Shared Classes
##################################################
class LabelText(wx.StaticText):
- """!
+ """
Label text to give the wx plots a uniform look.
Get the default label text and set the font bold.
"""
self.SetFont(font)
class IncrDecrButtons(wx.BoxSizer):
- """!
+ """
A horizontal box sizer with a increment and a decrement button.
"""
def __init__(self, parent, on_incr, on_decr):
- """!
+ """
@param parent the parent window
@param on_incr the event handler for increment
@param on_decr the event handler for decrement
self._controller[self._control_key] = bool(e.IsChecked())
class LogSliderController(wx.BoxSizer):
- """!
+ """
Log slider controller with display label and slider.
Gives logarithmic scaling to slider operation.
"""
self._label.Disable()
class DropDownController(wx.BoxSizer):
- """!
+ """
Drop down controller with label and chooser.
Srop down selection from a set of choices.
"""
def __init__(self, parent, label, choices, controller, control_key, size=(-1, -1)):
- """!
+ """
@param parent the parent window
@param label the label for the drop down
@param choices a list of tuples -> (label, value)
# Shared Functions
##################################################
def get_exp(num):
- """!
+ """
Get the exponent of the number in base 10.
@param num the floating point number
@return the exponent as an integer
return int(math.floor(math.log10(abs(num))))
def get_clean_num(num):
- """!
+ """
Get the closest clean number match to num with bases 1, 2, 5.
@param num the number
@return the closest number
return sign*nums[numpy.argmin(numpy.abs(nums - abs(num)))]
def get_clean_incr(num):
- """!
+ """
Get the next higher clean number with bases 1, 2, 5.
@param num the number
@return the next higher number
}[coeff]*(10**exp)
def get_clean_decr(num):
- """!
+ """
Get the next lower clean number with bases 1, 2, 5.
@param num the number
@return the next lower number
}[coeff]*(10**exp)
def get_min_max(samples):
- """!
+ """
Get the minimum and maximum bounds for an array of samples.
@param samples the array of real values
@return a tuple of min, max
return min, max
def get_si_components(num):
- """!
+ """
Get the SI units for the number.
Extract the coeff and exponent of the number.
The exponent will be a multiple of 3.
return coeff, exp, prefix
def label_format(num):
- """!
+ """
Format a floating point number into a presentable string.
If the number has an small enough exponent, use regular decimal.
Otherwise, format the number with floating point notation.
# Constellation window control panel
##################################################
class control_panel(wx.Panel):
- """!
+ """
A control panel with wx widgits to control the plotter.
"""
def __init__(self, parent):
- """!
+ """
Create a new control panel.
@param parent the wx parent window
"""
self.update_grid()
def handle_msg(self, msg):
- """!
+ """
Plot the samples onto the complex grid.
@param msg the array of complex samples
"""
# Constellation sink block (wrapper for old wxgui)
##################################################
class const_sink_c(gr.hier_block2, common.prop_setter):
- """!
+ """
A constellation block with a gui window.
"""
# FFT window control panel
##################################################
class control_panel(wx.Panel):
- """!
+ """
A control panel with wx widgits to control the plotter and fft block chain.
"""
def __init__(self, parent):
- """!
+ """
Create a new control panel.
@param parent the wx parent window
"""
self.update_grid()
def autoscale(self, *args):
- """!
+ """
Autoscale the fft plot to the last frame.
Set the dynamic range and reference level.
"""
def _reset_peak_vals(self): self.peak_vals = NO_PEAK_VALS
def handle_msg(self, msg):
- """!
+ """
Handle the message from the fft sink message queue.
If complex, reorder the fft samples so the negative bins come first.
If real, keep take only the positive bins.
self.plotter.update()
def update_grid(self, *args):
- """!
+ """
Update the plotter grid.
This update method is dependent on the variables below.
Determine the x and y axis grid parameters.
# FFT sink block (wrapper for old wxgui)
##################################################
class _fft_sink_base(gr.hier_block2, common.prop_setter):
- """!
+ """
An fft block with real/complex inputs and a gui window.
"""
self.update()
def update(self):
- """!
+ """
Read the state of the fft plot settings and update the control panel.
"""
#update checkboxes
# Number window control panel
##################################################
class control_panel(wx.Panel):
- """!
+ """
A control panel with wx widgits to control the averaging.
"""
def __init__(self, parent):
- """!
+ """
Create a new control panel.
@param parent the wx parent window
"""
self.ext_controller.subscribe(msg_key, self.handle_msg)
def show_gauges(self, show_gauge):
- """!
+ """
Show or hide the gauges.
If this is real, never show the imaginary gauge.
@param show_gauge true to show
else: self.gauge_imag.Hide()
def handle_msg(self, msg):
- """!
+ """
Handle a message from the message queue.
Convert the string based message into a float or complex.
If more than one number was read, only take the last number.
# Number sink block (wrapper for old wxgui)
##################################################
class _number_sink_base(gr.hier_block2, common.prop_setter):
- """!
+ """
An decimator block with a number window display
"""
class channel_plotter(grid_plotter_base):
def __init__(self, parent):
- """!
+ """
Create a new channel plotter.
"""
#init
self.enable_legend(False)
def _gl_init(self):
- """!
+ """
Run gl initialization tasks.
"""
glEnableClientState(GL_VERTEX_ARRAY)
self._grid_compiled_list_id = glGenLists(1)
def enable_legend(self, enable=None):
- """!
+ """
Enable/disable the legend.
@param enable true to enable
@return the enable state when None
self.unlock()
def draw(self):
- """!
+ """
Draw the grid and waveforms.
"""
self.lock()
self.unlock()
def _draw_waveforms(self):
- """!
+ """
Draw the waveforms for each channel.
Scale the waveform data to the grid using gl matrix operations.
"""
glPopMatrix()
def _populate_point_label(self, x_val, y_val):
- """!
+ """
Get the text the will populate the point label.
Give X and Y values for the current point.
Give values for the channel at the X coordinate.
return label_str
def _draw_legend(self):
- """!
+ """
Draw the legend in the upper right corner.
For each channel, draw a rectangle out of the channel color,
and overlay the channel text on top of the rectangle.
x_off -= w + 4*LEGEND_BOX_PADDING
def set_waveform(self, channel, samples, color_spec, marker=None):
- """!
+ """
Set the waveform for a given channel.
@param channel the channel key
@param samples the waveform samples
# OpenGL WX Plotter Canvas
##################################################
class _plotter_base(wx.glcanvas.GLCanvas):
- """!
+ """
Plotter base class for all plot types.
"""
def __init__(self, parent):
- """!
+ """
Create a new plotter base.
Initialize the GLCanvas with double buffering.
Initialize various plotter flags.
def unlock(self): self._global_lock.release()
def _on_size(self, event):
- """!
+ """
Flag the resize event.
The paint event will handle the actual resizing.
"""
self._resized_flag = True
def _on_paint(self, event):
- """!
+ """
Respond to paint events, call update.
Initialize GL if this is the first paint event.
"""
self.draw()
def update(self):
- """!
+ """
Force a paint event.
Record the timestamp.
"""
def clear(self): glClear(GL_COLOR_BUFFER_BIT)
def changed(self, state=None):
- """!
+ """
Set the changed flag if state is not None.
Otherwise return the changed flag.
"""
self.Bind(wx.EVT_LEAVE_WINDOW, self._on_leave_window)
def _on_motion(self, event):
- """!
+ """
Mouse motion, record the position X, Y.
"""
self.lock()
self.unlock()
def _on_leave_window(self, event):
- """!
+ """
Mouse leave window, set the position to None.
"""
self.lock()
self.unlock()
def enable_point_label(self, enable=None):
- """!
+ """
Enable/disable the point label.
@param enable true to enable
@return the enable state when None
self.unlock()
def set_title(self, title):
- """!
+ """
Set the title.
@param title the title string
"""
self.unlock()
def set_x_label(self, x_label, x_units=''):
- """!
+ """
Set the x label and units.
@param x_label the x label string
@param x_units the x units string
self.unlock()
def set_y_label(self, y_label, y_units=''):
- """!
+ """
Set the y label and units.
@param y_label the y label string
@param y_units the y units string
self.unlock()
def set_x_grid(self, x_min, x_max, x_step, x_scalar=1.0):
- """!
+ """
Set the x grid parameters.
@param x_min the left-most value
@param x_max the right-most value
self.unlock()
def set_y_grid(self, y_min, y_max, y_step, y_scalar=1.0):
- """!
+ """
Set the y grid parameters.
@param y_min the bottom-most value
@param y_max the top-most value
self.unlock()
def _draw_grid(self):
- """!
+ """
Draw the border, grid, title, and units.
"""
##################################################
)
def _get_tick_label(self, tick):
- """!
+ """
Format the tick value and create a gl text.
@param tick the floating point tick value
@return the tick label text
return gltext.Text(tick_str, font_size=TICK_TEXT_FONT_SIZE)
def _get_ticks(self, min, max, step, scalar):
- """!
+ """
Determine the positions for the ticks.
@param min the lower bound
@param max the upper bound
return [i*step*scalar for i in range(start, stop+1)]
def _draw_line(self, coor1, coor2):
- """!
+ """
Draw a line from coor1 to coor2.
@param corr1 a tuple of x, y, z
@param corr2 a tuple of x, y, z
glEnd()
def _draw_rect(self, x, y, width, height, fill=True):
- """!
+ """
Draw a rectangle on the x, y plane.
X and Y are the top-left corner.
@param x the left position of the rectangle
glEnd()
def _draw_point_label(self):
- """!
+ """
Draw the point label for the last mouse motion coordinate.
The mouse coordinate must be an X, Y tuple.
The label will be drawn at the X, Y coordinate.
ceil_log2 = lambda x: 2**int(math.ceil(math.log(x)/math.log(2)))
def _get_rbga(red_pts, green_pts, blue_pts, alpha_pts=[(0, 0), (1, 0)]):
- """!
+ """
Get an array of 256 rgba values where each index maps to a color.
The scaling for red, green, blue, alpha are specified in piece-wise functions.
The piece-wise functions consist of a set of x, y coordinates.
##################################################
class waterfall_plotter(grid_plotter_base):
def __init__(self, parent):
- """!
+ """
Create a new channel plotter.
"""
#init
self.set_color_mode(COLORS.keys()[0])
def _gl_init(self):
- """!
+ """
Run gl initialization tasks.
"""
self._grid_compiled_list_id = glGenLists(1)
self._waterfall_texture = glGenTextures(1)
def draw(self):
- """!
+ """
Draw the grid and waveforms.
"""
self.lock()
self.unlock()
def _draw_waterfall(self):
- """!
+ """
Draw the waterfall from the texture.
The texture is circularly filled and will wrap around.
Use matrix modeling to shift and scale the texture onto the coordinate plane.
glDisable(GL_TEXTURE_2D)
def _populate_point_label(self, x_val, y_val):
- """!
+ """
Get the text the will populate the point label.
Give the X value for the current point.
@param x_val the current x value
return '%s: %s %s'%(self.x_label, common.label_format(x_val), self.x_units)
def _draw_legend(self):
- """!
+ """
Draw the color scale legend.
"""
if not self._color_mode: return
txt.draw_text(wx.Point(x, y))
def _resize_texture(self, flag=None):
- """!
+ """
Create the texture to fit the fft_size X num_lines.
@param flag the set/unset or update flag
"""
self._resize_texture_flag = False
def set_color_mode(self, color_mode):
- """!
+ """
Set the color mode.
New samples will be converted to the new color mode.
Old samples will not be recolorized.
self.unlock()
def set_num_lines(self, num_lines):
- """!
+ """
Set number of lines.
Powers of two only.
@param num_lines the new number of lines
self.unlock()
def set_samples(self, samples, minimum, maximum):
- """!
+ """
Set the samples to the waterfall.
Convert the samples to color data.
@param samples the array of floats
# Boston, MA 02110-1301, USA.
#
-"""!
+"""
Abstract GNU Radio publisher/subscriber interface
This is a proof of concept implementation, will likely change significantly.
# Scope window control panel
##################################################
class control_panel(wx.Panel):
- """!
+ """
A control panel with wx widgits to control the plotter and scope block.
"""
def __init__(self, parent):
- """!
+ """
Create a new control panel.
@param parent the wx parent window
"""
#self.update_grid()
def handle_msg(self, msg):
- """!
+ """
Handle the message from the scope sink message queue.
Plot the list of arrays of samples onto the grid.
Each samples array gets its own channel.
self.frame_rate_ts = time.time()
def handle_samples(self):
- """!
+ """
Handle the cached samples from the scope input.
Perform ac coupling, triggering, and auto ranging.
"""
self.plotter.update()
def update_grid(self, *args):
- """!
+ """
Update the grid to reflect the current settings:
xy divisions, xy offset, xy mode setting
"""
# Scope sink block (wrapper for old wxgui)
##################################################
class _scope_sink_base(gr.hier_block2, common.prop_setter):
- """!
+ """
A scope block with a gui window.
"""
# Waterfall window control panel
##################################################
class control_panel(wx.Panel):
- """!
+ """
A control panel with wx widgits to control the plotter and fft block chain.
"""
def __init__(self, parent):
- """!
+ """
Create a new control panel.
@param parent the wx parent window
"""
self.update_grid()
def autoscale(self, *args):
- """!
+ """
Autoscale the waterfall plot to the last frame.
Set the dynamic range and reference level.
Does not affect the current data in the waterfall.
self.set_dynamic_range(peak_level - noise_floor)
def handle_msg(self, msg):
- """!
+ """
Handle the message from the fft sink message queue.
If complex, reorder the fft samples so the negative bins come first.
If real, keep take only the positive bins.
self.plotter.update()
def update_grid(self, *args):
- """!
+ """
Update the plotter grid.
This update method is dependent on the variables below.
Determine the x and y axis grid parameters.
# Waterfall sink block (wrapper for old wxgui)
##################################################
class _waterfall_sink_base(gr.hier_block2, common.prop_setter):
- """!
+ """
An fft block with real/complex inputs and a gui window.
"""