4 from gnuradio import gr, gr_unittest
6 import pygsl.wavelet as wavelet
# Test-case class for this QA file; it derives from gr_unittest.TestCase.
# NOTE(review): this view is an excerpt -- method headers and a number of
# statements between the visible lines are not shown.
21 class qa_classify(gr_unittest.TestCase):
# Creates a fresh gr.top_block and keeps it on the instance as self.tb; the
# test fragments below wire their blocks together through self.tb.connect().
# NOTE(review): the enclosing method is not visible -- presumably setUp; confirm.
24 self.tb = gr.top_block()
# Disabled template test, kept commented out: a vector source wired straight
# into a vector sink, with the sink data paired against an all-zero target.
# NOTE(review): dead code; consider deleting it if the template is no longer needed.
29 # def test_000_(self):
30 # src_data = numpy.zeros(10)
31 # trg_data = numpy.zeros(10)
32 # src = gr.vector_source_f(src_data)
33 # dst = gr.vector_sink_f()
34 # self.tb.connect(src, dst)
36 # rsl_data = dst.data()
38 # for (u,v) in zip(trg_data, rsl_data):
41 # sum /= float(len(trg_data))
# Fragment of a test that exercises gr.rail_ff.
# NOTE(review): the method header, the statement that runs the flowgraph, the
# assignment of rsl_data and the loop body are not visible in this excerpt.
# Input alternates between -1.0 and 1.0.
45 src_data = numpy.array([-1.0, 1.0, -1.0, 1.0])
# Expected output is the input scaled by 0.5, i.e. alternating -0.5 and 0.5.
46 trg_data = src_data * 0.5
47 src = gr.vector_source_f(src_data)
48 dst = gr.vector_sink_f()
# rail_ff is built with (-0.5, 0.5) -- presumably the lower and upper limits
# the block clips to; confirm against the block's documentation.
49 rail = gr.rail_ff(-0.5, 0.5)
# Flowgraph: src -> rail -> dst.
50 self.tb.connect(src, rail)
51 self.tb.connect(rail, dst)
# Walks expected and result samples pairwise; the loop body is not shown here.
55 for (u, v) in zip(trg_data, rsl_data):
# Divides the accumulator by the number of expected samples (a per-sample
# average). NOTE(review): the name `sum` shadows the builtin sum().
58 sum /= float(len(trg_data))
# Fragment of a test that exercises gr.stretch_ff.
# NOTE(review): the method header, the remainder of the array literal opened
# on the next line, the flowgraph run step, the assignment of rsl_data and the
# loop body are not visible in this excerpt.
62 src_data = numpy.array([-1.0,
# The target starts out as a deep copy of the input; any adjustment applied to
# it afterwards is not visible here.
67 trg_data = copy.deepcopy(src_data)
# Source and sink are both given len(src_data) as an extra argument --
# presumably the vector length, so the whole array travels as a single item,
# with False presumably disabling repeat; confirm against the GNU Radio API.
69 src = gr.vector_source_f(src_data, False, len(src_data))
# First argument evaluates to -0.2; its meaning to stretch_ff is not shown in
# this file (TODO confirm). The second argument is the input length.
70 st = gr.stretch_ff(-1.0/5.0, len(src_data))
71 dst = gr.vector_sink_f(len(src_data))
# Flowgraph: src -> st -> dst.
72 self.tb.connect(src, st)
73 self.tb.connect(st, dst)
# Walks expected and result samples pairwise; the loop body is not shown here.
77 for (u, v) in zip(trg_data, rsl_data):
# Divides the accumulator by the number of expected samples (a per-sample
# average). NOTE(review): the name `sum` shadows the builtin sum().
80 sum /= float(len(trg_data))
# Fragment of a test that exercises gr.squash_ff.
# NOTE(review): the method header, the flowgraph run step, the assignment of
# rsl_data and the loop body are not visible in this excerpt.
# Five grid points, 0.0 through 4.0.
84 src_grid = (0.0, 1.0, 2.0, 3.0, 4.0)
# The target grid is a copy of the source grid, so both grids are identical.
85 trg_grid = copy.deepcopy(src_grid)
# One sample per grid point.
86 src_data = (0.0, 1.0, 0.0, 1.0, 0.0)
# The extra length arguments on source and sink are presumably the vector
# lengths (one vector per grid); confirm against the GNU Radio API.
88 src = gr.vector_source_f(src_data, False, len(src_grid))
89 sq = gr.squash_ff(src_grid, trg_grid)
90 dst = gr.vector_sink_f(len(trg_grid))
# Flowgraph: src -> sq -> dst.
91 self.tb.connect(src, sq)
92 self.tb.connect(sq, dst)
# The result is paired with the input itself rather than a separate target,
# which matches the grids being identical. The loop body is not shown here.
96 for (u, v) in zip(src_data, rsl_data):
# Divides the accumulator by the number of input samples (a per-sample
# average). NOTE(review): the name `sum` shadows the builtin sum().
99 sum /= float(len(src_data))
# Fragment of a test that exercises gr.wavelet_ff against a reference
# transform computed through the `wavelet` module.
# NOTE(review): the method header and the assignments of n, o, a and e are not
# visible in this excerpt; neither is the flowgraph run step or the loop body.
# Workspace and Daubechies wavelet object for the reference transform,
# parameterised by n and o respectively.
106 ws = wavelet.workspace(n)
107 w = wavelet.daubechies(o)
# Test signal: sine of a scaled by pi/16.
110 b = numpy.sin(a*numpy.pi/16.0)
# Reference forward transform of b, and the inverse transform of that result.
# How d is used afterwards is not visible here.
111 c = w.transform_forward(b, ws)
112 d = w.transform_inverse(c, ws)
# The same signal is fed to the block under test; n is passed to source, block
# and sink -- presumably as the vector length (TODO confirm).
114 src = gr.vector_source_f(b, False, n)
# Third argument True presumably selects the forward transform; confirm
# against the block's documentation.
115 wv = gr.wavelet_ff(n, o, True)
117 dst = gr.vector_sink_f(n)
# Flowgraph: src -> wv -> dst.
118 self.tb.connect(src, wv)
119 self.tb.connect(wv, dst)
# Pairs the reference coefficients c with e -- presumably the sink output;
# verify against the full file. The loop body is not shown here.
124 for (u, v) in zip(c, e):
# Fragment of a test that exercises gr.wvps_ff against a reference computed
# with numpy.
# NOTE(review): the method header, the assignments/updates of k, the flowgraph
# run step and the final loop body are not visible in this excerpt.
# Eight input samples.
132 src_data = (1.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0)
134 dwav = numpy.array(src_data)
# Three reference output bins, initialised to zero.
135 wvps = numpy.zeros(3)
136 # wavelet power spectrum
# Scale factor: reciprocal of sqr() applied to the first sample.
# NOTE(review): sqr is not defined in the visible lines -- presumably an
# element-wise square helper; confirm.
137 scl = 1.0/sqr(dwav[0])
# For each bin e, sums sqr() over a slice of dwav that starts at k and is
# (1 << e) elements wide, then applies the scale factor.
# NOTE(review): `01` is a Python 2 octal-style literal; Python 3 rejects
# leading zeros in decimal integer literals, so this line is a SyntaxError
# there. Writing `1 << e` keeps the value and parses on both versions.
139 for e in range(len(wvps)):
140 wvps[e] = scl*sqr(dwav[k:k+(01<<e)]).sum()
143 src = gr.vector_source_f(src_data, False, len(src_data))
144 kon = gr.wvps_ff(len(src_data))
# Sink length is ceil(log2(len(src_data))), which is 3 for the eight samples
# above and so matches len(wvps).
145 dst = gr.vector_sink_f(int(math.ceil(math.log(len(src_data), 2))))
# Flowgraph: src -> kon -> dst.
147 self.tb.connect(src, kon)
148 self.tb.connect(kon, dst)
151 snk_data = dst.data()
# Pairs the block output with the reference spectrum; the loop body is not
# shown here.
154 for (u,v) in zip(snk_data, wvps):
# Divides the accumulator by the number of output samples (a per-sample
# average). NOTE(review): the name `sum` shadows the builtin sum().
157 sum /= float(len(snk_data))
160 if __name__ == '__main__':