2ee6463847b8f8b5b772ef6ba5c65ee6b1f6979c
[debian/gnuradio] / usrp / limbo / inband / gen_test_packets.py
1 #!/usr/bin/env python
2
3 import random
4 import struct
5 from pprint import pprint
6 from usb_packet import *
7
# Size in bytes of the payload area of every generated packet.  It is also
# the largest valid payload length; shorter payloads are padded up to it.
MAX_PAYLOAD = 504

# Passed as the last argument of make_header() for every packet.
# NOTE(review): presumably a timestamp sentinel meaning "send immediately";
# confirm against usb_packet.make_header.
TIME_NOW = 0xFFFFFFFF
10
11
class sequence_generator(object):
    """Callable counter: successive calls return 0, 1, 2, ..."""

    def __init__(self):
        # Value that the next call will hand out.
        self.i = 0

    def __call__(self):
        """Return the current count, then advance it by one."""
        current, self.i = self.i, self.i + 1
        return current
20
def gen_shuffled_lengths():
    """Return every valid payload length in random order.

    The valid lengths are the multiples of 4 from 0 up to and including
    MAX_PAYLOAD.  A new list is returned on every call, so the caller is
    free to consume or modify it.  The order depends on the state of the
    module-level ``random`` generator.
    """
    # random.shuffle() works in place and needs a mutable sequence.  On
    # Python 3 range() is immutable, so materialise it as a list first
    # (on Python 2 this is just a copy of the list range() already returns).
    valid_lengths = list(range(0, MAX_PAYLOAD + 1, 4))  # [0, 4, 8, ... 504]
    random.shuffle(valid_lengths)
    return valid_lengths
25
26
class packet_sequence_generator(object):
    """Write packets for one channel, one packet per call.

    Payload lengths are taken from the front of ``lengths``; sample values
    come from a private sequence_generator (exposed as ``self.next``), so
    the numbering continues from one packet to the next.
    """

    def __init__(self, channel, lengths):
        self.channel = channel
        # Not copied: __call__ removes entries from the caller's list.
        self.lengths = lengths
        self.next = sequence_generator()

    def __call__(self, output_file):
        """Write the next packet to output_file and consume its length."""
        payload_len = self.lengths[0]
        gen_packet(output_file, self.channel, self.next, payload_len)
        # Only drop the length once the packet has been written.
        del self.lengths[0]
36
37
def gen_packet(output_file, channel, content_generator, payload_len):
    """Write one fixed-size test packet (header, then payload) to output_file.

    output_file       -- object with a write() method; receives the header
                         returned by make_header() followed by the packed
                         payload.
    channel           -- passed unchanged to make_header().
    content_generator -- zero-argument callable returning the next sample
                         value; called twice (I, then Q) for every 4 bytes
                         of payload_len.
    payload_len       -- number of valid payload bytes; must be a multiple
                         of 4 and no larger than MAX_PAYLOAD.

    The payload area always occupies MAX_PAYLOAD bytes: the samples come
    first and the remainder is filled with the word pair 0x0000, 0xffff.
    Every word is written as a little-endian 16-bit value.

    Raises AssertionError if payload_len is not a multiple of 4 or exceeds
    MAX_PAYLOAD.
    """
    assert (payload_len % 4) == 0
    n_iq = payload_len // 4
    n_pad = MAX_PAYLOAD // 4 - n_iq     # <= 0 produces no padding at all

    payload = []
    for _ in range(n_iq):
        payload.append(content_generator())  # I
        payload.append(content_generator())  # Q
    payload.extend([0x0000, 0xffff] * n_pad)

    assert (len(payload) == MAX_PAYLOAD // 2)

    output_file.write(make_header(FL_START_OF_BURST|FL_END_OF_BURST,
                                  channel, payload_len, TIME_NOW))

    # The 0xffff padding word does not fit the signed 'h' format, for which
    # struct raises struct.error.  Pack the 16-bit two's-complement pattern
    # of each word as unsigned 'H' instead: the bytes are identical for
    # every value 'h' accepts, and 0xffff becomes ff ff.
    words = [word & 0xffff for word in payload]
    output_file.write(struct.pack('<%dH' % (MAX_PAYLOAD // 2), *words))
57
58
def gen_all_valid_packet_lengths_1_channel(output_file):
    """Write one packet of every valid payload length on channel 0.

    The lengths are emitted in the shuffled order produced by
    gen_shuffled_lengths(); all packets go to output_file.
    """
    shuffled = gen_shuffled_lengths()
    emit_packet = packet_sequence_generator(0, shuffled)

    # The packet generator consumes `shuffled` as it goes, so fix the
    # number of packets we'll generate before the first one is written.
    remaining = len(shuffled)
    while remaining > 0:
        emit_packet(output_file)
        remaining -= 1

    # Lengths 0, 4, ..., 504 carry 0, 1, ..., 126 I/Q pairs of two samples
    # each, so 2 * sum(0, 1, ..., 126) == 126 * 127 samples were drawn.
    assert emit_packet.next() == 16002
67
68
def gen_all_valid_packet_lengths_2_channels(output_file):
    """Write every valid payload length on channels 0 and 0x1f, interleaved.

    Each channel gets its own shuffled list of lengths and its own sample
    sequence; the order in which the two channels take turns is random.
    """
    lengths_ch0 = gen_shuffled_lengths()
    per_channel = len(lengths_ch0)      # number of packets we'll generate on each stream
    gen_ch0 = packet_sequence_generator(0, lengths_ch0)
    gen_ch1 = packet_sequence_generator(0x1f, gen_shuffled_lengths())
    generators = (gen_ch0, gen_ch1)

    # One schedule entry per packet: 0 selects channel 0, 1 selects 0x1f.
    schedule = [0] * per_channel + [1] * per_channel
    random.shuffle(schedule)

    for selected in schedule:
        generators[selected](output_file)

    # Per channel: 2 * sum(0, 1, ..., 126) == 126 * 127 samples were drawn.
    assert gen_ch0.next() == 16002
    assert gen_ch1.next() == 16002
84
if __name__ == '__main__':
    # Fixed seed so the shuffles are repeatable from run to run.
    random.seed(0)

    # Order matters: both generators draw from the same seeded random
    # stream, so the 1-channel file must be produced first.
    for generate, filename in (
            (gen_all_valid_packet_lengths_1_channel,
             "all_valid_packet_lengths_1_channel.dat"),
            (gen_all_valid_packet_lengths_2_channels,
             "all_valid_packet_lengths_2_channels.dat")):
        # Binary mode: the packets are raw packed bytes, and text mode may
        # translate newline bytes (or reject bytes outright on Python 3).
        output_file = open(filename, "wb")
        try:
            generate(output_file)
        finally:
            # Close explicitly so the data is flushed even if generation fails.
            output_file.close()