3 from gnuradio import gr, gr_unittest
class test_simple_flowgraph(gr_unittest.TestCase):
    """Unit tests for gr.simple_flowgraph.

    Covers the three operations used below: define_component(),
    connect() and validate(). Error cases assert the exception type
    each call is expected to raise (ValueError from define_component()
    and connect(), RuntimeError from validate()).
    """

    # ------------------------------------------------------------------
    # Private helpers (not collected as tests: names do not start with
    # "test").
    # ------------------------------------------------------------------

    def _make_graph(self, sources=(), nops=(), sinks=()):
        """Return a new simple_flowgraph with int-sized components defined.

        Components are defined in the order sources, nops, sinks, each
        group in the order given.

        sources -- names to define as gr.null_source(gr.sizeof_int)
        nops    -- names to define as gr.nop(gr.sizeof_int)
        sinks   -- names to define as gr.null_sink(gr.sizeof_int)
        """
        sfg = gr.simple_flowgraph()
        for name in sources:
            sfg.define_component(name, gr.null_source(gr.sizeof_int))
        for name in nops:
            sfg.define_component(name, gr.nop(gr.sizeof_int))
        for name in sinks:
            sfg.define_component(name, gr.null_sink(gr.sizeof_int))
        return sfg

    def _make_nop_fanout(self, second_in_port, second_out_port):
        """Return the src1 -> nop1 -> (dst1, dst2) graph used by tests 014-016.

        Port 0 of src1 feeds nop1 input 0 and nop1 input `second_in_port`;
        nop1 output 0 feeds dst1 and nop1 output `second_out_port` feeds
        dst2. The graph is returned without being validated.
        """
        sfg = self._make_graph(sources=["src1"], nops=["nop1"],
                               sinks=["dst1", "dst2"])
        sfg.connect("src1", 0, "nop1", 0)
        sfg.connect("src1", 0, "nop1", second_in_port)
        sfg.connect("nop1", 0, "dst1", 0)
        sfg.connect("nop1", second_out_port, "dst2", 0)
        return sfg

    # ------------------------------------------------------------------
    # define_component()
    # ------------------------------------------------------------------

    def test_001_define_component(self):
        """Defining two components under distinct names must not raise."""
        sfg = gr.simple_flowgraph()
        sfg.define_component("src", gr.null_source(gr.sizeof_int))
        sfg.define_component("dst", gr.null_sink(gr.sizeof_int))

    def test_002_define_component_name_in_use(self):
        """Re-using the name "src" for a second component raises ValueError."""
        sfg = gr.simple_flowgraph()
        sfg.define_component("src", gr.null_source(gr.sizeof_int))
        self.assertRaises(ValueError,
                          sfg.define_component,
                          "src", gr.null_sink(gr.sizeof_int))

    # ------------------------------------------------------------------
    # connect()
    # ------------------------------------------------------------------

    def test_003_connect(self):
        """Connecting src port 0 to dst port 0 must not raise."""
        sfg = self._make_graph(sources=["src"], sinks=["dst"])
        sfg.connect("src", 0, "dst", 0)

    # NOTE: the name lacks the underscore after "004" that its siblings
    # have; it is kept as-is so the test id stays stable.
    def test_004connect_unknown_src(self):
        """Connecting from an undefined "src" raises ValueError."""
        sfg = self._make_graph(sinks=["dst"])
        self.assertRaises(ValueError, sfg.connect, "src", 0, "dst", 0)

    def test_005_connect_unknown_dst(self):
        """Connecting to an undefined "dst" raises ValueError."""
        sfg = self._make_graph(sources=["src"])
        self.assertRaises(ValueError, sfg.connect, "src", 0, "dst", 0)

    def test_006_connect_invalid_src_port_neg(self):
        """A source port of -1 raises ValueError."""
        sfg = self._make_graph(sources=["src"], sinks=["dst"])
        self.assertRaises(ValueError, sfg.connect, "src", -1, "dst", 0)

    def test_007_connect_invalid_src_port_exceeds(self):
        """Source port 1 on a null_source raises ValueError."""
        sfg = self._make_graph(sources=["src"], sinks=["dst"])
        self.assertRaises(ValueError, sfg.connect, "src", 1, "dst", 0)

    def test_008_connect_invalid_dst_port_neg(self):
        """A destination port of -1 raises ValueError."""
        sfg = self._make_graph(sources=["src"], sinks=["dst"])
        self.assertRaises(ValueError, sfg.connect, "src", 0, "dst", -1)

    def test_009_connect_invalid_dst_port_exceeds(self):
        """Destination port 1 on a null_sink raises ValueError."""
        sfg = self._make_graph(sources=["src"], sinks=["dst"])
        self.assertRaises(ValueError, sfg.connect, "src", 0, "dst", 1)

    def test_010_connect_invalid_dst_port_in_use(self):
        """A second connection into an already-connected dst port raises
        ValueError."""
        sfg = self._make_graph(sources=["src1", "src2"], sinks=["dst"])
        sfg.connect("src1", 0, "dst", 0)
        self.assertRaises(ValueError, sfg.connect, "src2", 0, "dst", 0)

    def test_011_connect_one_src_two_dst(self):
        """One source port may be connected to two different sinks."""
        sfg = self._make_graph(sources=["src"], sinks=["dst1", "dst2"])
        sfg.connect("src", 0, "dst1", 0)
        sfg.connect("src", 0, "dst2", 0)

    def test_012_connect_type_mismatch(self):
        """Connecting an int-sized source to a char-sized sink raises
        ValueError."""
        sfg = gr.simple_flowgraph()
        sfg.define_component("src", gr.null_source(gr.sizeof_int))
        # Built explicitly: the sink item size differs from the helper's.
        sfg.define_component("dst", gr.null_sink(gr.sizeof_char))
        self.assertRaises(ValueError, sfg.connect, "src", 0, "dst", 0)

    # ------------------------------------------------------------------
    # validate()
    # ------------------------------------------------------------------

    def test_013_validate(self):
        """A source fanned out to two sinks is expected to validate."""
        sfg = self._make_graph(sources=["src"], sinks=["dst1", "dst2"])
        sfg.connect("src", 0, "dst1", 0)
        sfg.connect("src", 0, "dst2", 0)
        # Without this call the test never exercised validate() at all.
        sfg.validate()

    def test_014_validate(self):
        """nop1 with inputs 0,1 and outputs 0,1 connected is expected to
        validate."""
        sfg = self._make_nop_fanout(second_in_port=1, second_out_port=1)
        # Without this call the test never exercised validate() at all.
        sfg.validate()

    def test_015_validate(self):
        """nop1 with inputs 0 and 2 connected (input 1 left unconnected)
        makes validate() raise RuntimeError."""
        sfg = self._make_nop_fanout(second_in_port=2, second_out_port=1)
        self.assertRaises(RuntimeError, sfg.validate)

    def test_016_validate(self):
        """nop1 with outputs 0 and 2 connected (output 1 left unconnected)
        makes validate() raise RuntimeError."""
        sfg = self._make_nop_fanout(second_in_port=1, second_out_port=2)
        self.assertRaises(RuntimeError, sfg.validate)
141 if __name__ == "__main__":