Updated license from GPL version 2 or later to GPL version 3 or later.
[debian/gnuradio] / gr-atsc / src / python / qa_atsc.py
1 #!/usr/bin/env python
2 #
3 # Copyright 2004,2006 Free Software Foundation, Inc.
4
5 # This file is part of GNU Radio
6
7 # GNU Radio is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3, or (at your option)
10 # any later version.
11
12 # GNU Radio is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16
17 # You should have received a copy of the GNU General Public License
18 # along with GNU Radio; see the file COPYING.  If not, write to
19 # the Free Software Foundation, Inc., 51 Franklin Street,
20 # Boston, MA 02110-1301, USA.
21
22
23 from gnuradio import gr, gr_unittest
24 import atsc                    # qa code needs to run without being installed
25 #from gnuradio import atsc
26 from atsc_utils import *
27 import sys
28
29
30 class memoize(object):
31     def __init__(self, thunk):
32         self.thunk = thunk
33         self.cached = False
34         self.value = None
35
36     def __call__(self):
37         if self.cached:
38             return self.value
39         self.value = self.thunk()
40         self.cached = True
41         return self.value
42
43
44 """
45 Make a fake transport stream that's big enough for our purposes.
46 We generate 8 full fields.  This is relatively expensive.  It
47 takes about 2 seconds to execute.  
48 """
49 make_transport_stream = \
50     memoize(lambda : tuple(make_fake_transport_stream_packet(8 * atsc.ATSC_DSEGS_PER_FIELD)))
51
52
53 def pad_transport_stream(src):
54     """
55     An MPEG transport stream packet is 188 bytes long.  Internally we use a packet
56     that is 256 bytes long to help with buffer alignment.  This function adds the
57     appropriate trailing padding to convert each packet from 188 to 256 bytes.
58     """
59     return pad_stream(src, atsc.sizeof_atsc_mpeg_packet, atsc.sizeof_atsc_mpeg_packet_pad)
60
61
62 def depad_transport_stream(src):
63     """
64     An MPEG transport stream packet is 188 bytes long.  Internally we use a packet
65     that is 256 bytes long to help with buffer alignment.  This function removes the
66     trailing padding to convert each packet from 256 back to 188 bytes.
67     """
68     return depad_stream(src, atsc.sizeof_atsc_mpeg_packet, atsc.sizeof_atsc_mpeg_packet_pad)
69
70
71 class vector_source_ts(gr.hier_block):
72     """
73     MPEG Transport stream source for testing.
74     """
75     def __init__(self, fg, ts):
76         """
77         Pad tranport stream packets to 256 bytes and reformat appropriately.
78         
79         @param fg: flow graph
80         @param ts: MPEG transport stream.
81         @type  ts: sequence of ints in [0,255]; len(ts) % 188 == 0
82         """
83         src = gr.vector_source_b(pad_transport_stream(ts))
84         s2v = gr.stream_to_vector(gr.sizeof_char, atsc.sizeof_atsc_mpeg_packet)
85         fg.connect(src, s2v)
86         gr.hier_block.__init__(self, fg, None, s2v)
87
88
89 class vector_sink_ts(gr.hier_block):
90     """
91     MPEG Transport stream sink for testing.
92     """
93     def __init__(self, fg):
94         """
95         @param fg: flow graph
96         """
97         v2s = gr.vector_to_stream(gr.sizeof_char, atsc.sizeof_atsc_mpeg_packet)
98         self.sink = gr.vector_sink_b()
99         fg.connect(v2s, self.sink)
100         gr.hier_block.__init__(self, fg, v2s, None)
101
102     def data(self):
103         """
104         Extracts tranport stream from sink and returns it to python.
105
106         Depads tranport stream packets from 256 back to 188 bytes.
107         @rtype: tuple of ints in [0,255]; len(result) % 188 == 0
108         """
109         return tuple(depad_transport_stream(self.sink.data()))
110
111
112
113 class qa_atsc(gr_unittest.TestCase):
114
115     def setUp(self):
116         self.fg = gr.flow_graph()
117
118     def tearDown(self):
119         self.fg = None
120
121
122     # The tests are run in alphabetical order
123
124     def test_loopback_000(self):
125         """
126         Loopback randomizer to derandomizer
127         """
128         src_data = make_transport_stream()
129         expected_result = src_data
130
131         src = vector_source_ts(self.fg, src_data)
132         rand = atsc.randomizer()
133         derand = atsc.derandomizer()
134         dst = vector_sink_ts(self.fg)
135         self.fg.connect(src, rand, derand, dst)
136         self.fg.run ()
137         result_data = dst.data ()
138         self.assertEqual (expected_result, result_data)
139
140     def test_loopback_001(self):
141         """
142         Loopback randomizer/rs_encoder to rs_decoder/derandomizer
143         """
144         src_data = make_transport_stream()
145         expected_result = src_data
146
147         src = vector_source_ts(self.fg, src_data)
148         rand = atsc.randomizer()
149         rs_enc = atsc.rs_encoder()
150         rs_dec = atsc.rs_decoder()
151         derand = atsc.derandomizer()
152         dst = vector_sink_ts(self.fg)
153         self.fg.connect(src, rand, rs_enc, rs_dec, derand, dst)
154         self.fg.run ()
155         result_data = dst.data ()
156         self.assertEqual (expected_result, result_data)
157
158     def test_loopback_002(self):
159         """
160         Loopback randomizer/rs_encoder/interleaver to
161         deinterleaver/rs_decoder/derandomizer 
162         """
163         src_data = make_transport_stream()
164         interleaver_delay = 52
165         expected_result = src_data[0:len(src_data)-(interleaver_delay*atsc.ATSC_MPEG_PKT_LENGTH)]
166
167         src = vector_source_ts(self.fg, src_data)
168         rand = atsc.randomizer()
169         rs_enc = atsc.rs_encoder()
170         inter = atsc.interleaver()
171         deinter = atsc.deinterleaver()
172         rs_dec = atsc.rs_decoder()
173         derand = atsc.derandomizer()
174         dst = vector_sink_ts(self.fg)
175         self.fg.connect(src, rand, rs_enc, inter, deinter, rs_dec, derand, dst)
176         self.fg.run ()
177         result_data = dst.data ()
178         result_data = result_data[(interleaver_delay*atsc.ATSC_MPEG_PKT_LENGTH):len(result_data)]
179         self.assertEqual (expected_result, result_data)
180
181
182     def test_loopback_003(self):
183         """
184         Loopback randomizer/rs_encoder/interleaver/trellis_encoder
185         via ds_to_softds to
186         viterbi_decoder/deinterleaver/rs_decoder/derandomizer 
187         """
188         src_data = make_transport_stream()
189         interleaver_delay = 52
190         viterbi_delay = 12
191         expected_result = src_data[0:len(src_data)-((interleaver_delay+viterbi_delay)*atsc.ATSC_MPEG_PKT_LENGTH)]
192
193         src = vector_source_ts(self.fg, src_data)
194         rand = atsc.randomizer()
195         rs_enc = atsc.rs_encoder()
196         inter = atsc.interleaver()
197         trellis = atsc.trellis_encoder()
198         softds = atsc.ds_to_softds()
199         viterbi = atsc.viterbi_decoder()
200         deinter = atsc.deinterleaver()
201         rs_dec = atsc.rs_decoder()
202         derand = atsc.derandomizer()
203         dst = vector_sink_ts(self.fg)
204         self.fg.connect(src, rand, rs_enc, inter, trellis, softds, viterbi, deinter, rs_dec, derand, dst)
205         self.fg.run ()
206         result_data = dst.data ()[((interleaver_delay+viterbi_delay)*atsc.ATSC_MPEG_PKT_LENGTH):len(dst.data())]
207         self.assertEqual (expected_result, result_data)
208
209         
210 if __name__ == '__main__':
211     gr_unittest.main()
212
213
214
215
216
217