3 * Copyright 2007 Free Software Foundation, Inc.
5 * This file is part of GNU Radio
7 * GNU Radio is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2, or (at your option)
12 * GNU Radio is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License along
18 * with this program; if not, write to the Free Software Foundation, Inc.,
19 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
25 #include <mb_mblock.h>
26 #include <mb_protocol_class.h>
27 #include <mb_message.h>
28 #include <mb_class_registry.h>
// File-scope PMT values shared by the QA mblocks in this file.
// s_in / s_data / s_send_batch are compared against message port ids and
// signals in the handlers below; s_long0 (a PMT long holding 0) is the
// initial bitmap that qa_bitset_src puts in each message it sends.
// NOTE(review): s_out and s_start are not referenced in the visible part of
// this file -- confirm whether they are still needed.
33 static pmt_t s_in = pmt_intern("in");
34 static pmt_t s_out = pmt_intern("out");
35 static pmt_t s_data = pmt_intern("data");
36 static pmt_t s_start = pmt_intern("start");
37 static pmt_t s_send_batch = pmt_intern("send-batch");
38 static pmt_t s_long0 = pmt_from_long(0);
49 * \brief mblock used for QA.
51 * Messages arriving on "in" consist of a pair containing a (long)
52 * message number in the car, and a (long) bitmap in the cdr. For
53 * each message received on "in", a new message is sent on "out". The
54 * new message is the same format as the input, but the bitmap in
55 * the cdr has a "1" or'd into it that corresponds to the bit number
56 * specified in the constructor.
58 * The bitmap can be used by the ultimate receiver to confirm
59 * traversal of a set of blocks, if the blocks are assigned unique bit numbers.
// qa_bitset: an mb_mblock subclass whose constructor takes the bit number in
// user_arg and whose message handler ORs that bit into the bitmap it receives.
// NOTE(review): the data members (d_bitno, d_in, d_out are assigned in the
// constructor) are declared on lines not visible in this view.
62 class qa_bitset : public mb_mblock
// Constructor: user_arg is converted with pmt_to_long() to obtain the bit number.
69 qa_bitset(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
// Called with each message delivered to this block.
70 void handle_message(mb_message_sptr msg);
73 qa_bitset::qa_bitset(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
74 : mb_mblock(runtime, instance_name, user_arg)
76 d_bitno = pmt_to_long(user_arg); // The bit we are to set
78 d_in = define_port("in", "qa-bitset", false, mb_port::EXTERNAL);
79 d_out = define_port("out", "qa-bitset", true, mb_port::EXTERNAL);
// Handle one message delivered to this block.
//
// Only messages whose port id is "in" and whose signal is "data" pass the
// guard below.  For those, a new pair is built: the car of the incoming data
// is kept unchanged and the cdr (read as a long) has bit d_bitno OR'd in.
// NOTE(review): the start of the statement that consumes this pair is not
// visible in this view; per the class description it is presumably sent on
// "out" -- confirm.
// NOTE(review): `1L << d_bitno` is undefined behaviour if d_bitno is negative
// or >= the bit width of long -- confirm the range of bit numbers used.
83 qa_bitset::handle_message(mb_message_sptr msg)
85 if (pmt_eq(msg->port_id(), s_in) && pmt_eq(msg->signal(), s_data)){
87 pmt_cons(pmt_car(msg->data()),
88 pmt_from_long((1L << d_bitno) | pmt_to_long(pmt_cdr(msg->data())))));
// Register qa_bitset so it can be named by class name (as define_component
// does with "qa_bitset" elsewhere in this file).
92 REGISTER_MBLOCK_CLASS(qa_bitset);
94 // ------------------------------------------------------------------------
97 * \brief mblock used for QA. Compose two qa_bitset mblocks.
// qa_bitset2: mb_mblock subclass that only has a constructor in this view;
// all of its work is the component wiring done there.
// NOTE(review): member declarations (d_in / d_out are assigned in the
// constructor) are on lines not visible here.
99 class qa_bitset2 : public mb_mblock
// Constructor: user_arg is converted with pmt_to_long() to get the first bit number.
105 qa_bitset2(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
108 qa_bitset2::qa_bitset2(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
109 : mb_mblock(runtime, instance_name, user_arg)
111 long bitno = pmt_to_long(user_arg); // The bit we are to set
113 d_in = define_port("in", "qa-bitset", false, mb_port::RELAY);
114 d_out = define_port("out", "qa-bitset", true, mb_port::RELAY);
116 define_component("bs0", "qa_bitset", pmt_from_long(bitno));
117 define_component("bs1", "qa_bitset", pmt_from_long(bitno + 1));
118 connect("self", "in", "bs0", "in");
119 connect("bs0", "out", "bs1", "in");
120 connect("bs1", "out", "self", "out");
123 REGISTER_MBLOCK_CLASS(qa_bitset2);
125 // ------------------------------------------------------------------------
128 * \brief mblock used for QA. Compose two qa_bitset2 mblocks.
// qa_bitset4: mb_mblock subclass that only has a constructor in this view;
// all of its work is the component wiring done there.
// NOTE(review): member declarations (d_in / d_out are assigned in the
// constructor) are on lines not visible here.
130 class qa_bitset4 : public mb_mblock
// Constructor: user_arg is converted with pmt_to_long() to get the first bit number.
136 qa_bitset4(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
139 qa_bitset4::qa_bitset4(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
140 : mb_mblock(runtime, instance_name, user_arg)
142 long bitno = pmt_to_long(user_arg); // The bit we are to set
144 d_in = define_port("in", "qa-bitset", false, mb_port::RELAY);
145 d_out = define_port("out", "qa-bitset", true, mb_port::RELAY);
147 define_component("bs0", "qa_bitset2", pmt_from_long(bitno));
148 define_component("bs1", "qa_bitset2", pmt_from_long(bitno + 2));
149 connect("self", "in", "bs0", "in");
150 connect("bs0", "out", "bs1", "in");
151 connect("bs1", "out", "self", "out");
154 REGISTER_MBLOCK_CLASS(qa_bitset4);
156 // ------------------------------------------------------------------------
159 * \brief mblock used for QA. Compose two qa_bitset4 mblocks.
// qa_bitset8: mb_mblock subclass that only has a constructor in this view;
// all of its work is the component wiring done there.
// NOTE(review): member declarations (d_in / d_out are assigned in the
// constructor) are on lines not visible here.
161 class qa_bitset8 : public mb_mblock
// Constructor: user_arg is converted with pmt_to_long() to get the first bit number.
167 qa_bitset8(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
170 qa_bitset8::qa_bitset8(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
171 : mb_mblock(runtime, instance_name, user_arg)
173 long bitno = pmt_to_long(user_arg); // The bit we are to set
175 d_in = define_port("in", "qa-bitset", false, mb_port::RELAY);
176 d_out = define_port("out", "qa-bitset", true, mb_port::RELAY);
178 define_component("bs0", "qa_bitset4", pmt_from_long(bitno));
179 define_component("bs1", "qa_bitset4", pmt_from_long(bitno + 4));
180 connect("self", "in", "bs0", "in");
181 connect("bs0", "out", "bs1", "in");
182 connect("bs1", "out", "self", "out");
185 REGISTER_MBLOCK_CLASS(qa_bitset8);
187 // ------------------------------------------------------------------------
190 * \brief mblock used for QA. Compose two qa_bitset8 mblocks.
// qa_bitset16: mb_mblock subclass that only has a constructor in this view;
// all of its work is the component wiring done there.
// NOTE(review): member declarations (d_in / d_out are assigned in the
// constructor) are on lines not visible here.
192 class qa_bitset16 : public mb_mblock
// Constructor: user_arg is converted with pmt_to_long() to get the first bit number.
198 qa_bitset16(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
201 qa_bitset16::qa_bitset16(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
202 : mb_mblock(runtime, instance_name, user_arg)
204 long bitno = pmt_to_long(user_arg); // The bit we are to set
206 d_in = define_port("in", "qa-bitset", false, mb_port::RELAY);
207 d_out = define_port("out", "qa-bitset", true, mb_port::RELAY);
209 define_component("bs0", "qa_bitset8", pmt_from_long(bitno));
210 define_component("bs1", "qa_bitset8", pmt_from_long(bitno + 8));
211 connect("self", "in", "bs0", "in");
212 connect("bs0", "out", "bs1", "in");
213 connect("bs1", "out", "self", "out");
216 REGISTER_MBLOCK_CLASS(qa_bitset16);
218 // ------------------------------------------------------------------------
221 * \brief mblock used for QA. Compose two qa_bitset16 mblocks.
// qa_bitset32: mb_mblock subclass that only has a constructor in this view;
// all of its work is the component wiring done there.
// NOTE(review): member declarations (d_in / d_out are assigned in the
// constructor) are on lines not visible here.
223 class qa_bitset32 : public mb_mblock
// Constructor: user_arg is converted with pmt_to_long() to get the first bit number.
229 qa_bitset32(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
232 qa_bitset32::qa_bitset32(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
233 : mb_mblock(runtime, instance_name, user_arg)
235 long bitno = pmt_to_long(user_arg); // The bit we are to set
237 d_in = define_port("in", "qa-bitset", false, mb_port::RELAY);
238 d_out = define_port("out", "qa-bitset", true, mb_port::RELAY);
240 define_component("bs0", "qa_bitset16", pmt_from_long(bitno));
241 define_component("bs1", "qa_bitset16", pmt_from_long(bitno + 16));
242 connect("self", "in", "bs0", "in");
243 connect("bs0", "out", "bs1", "in");
244 connect("bs1", "out", "self", "out");
247 REGISTER_MBLOCK_CLASS(qa_bitset32);
249 // ------------------------------------------------------------------------
// qa_bitset_src: mb_mblock that sends numbered "data" messages on its "out"
// port in response to "send-batch" messages on its "cs_top" or "cs" port
// (see handle_message / send_one below).
// NOTE(review): some member and method declarations (e.g. d_cs, d_out,
// send_batch, send_one) are on lines not visible in this view.
251 class qa_bitset_src : public mb_mblock
// Port defined under the name "cs_top" in the constructor.
253 mb_port_sptr d_cs_top;
258 long d_msg_number; // starting message number
259 long d_nmsgs_to_send; // # of messages to send
260 long d_batch_size; // # of messages to send per batch
// Constructor: user_arg is indexed with pmt_nth() at positions 0, 1 and 2.
263 qa_bitset_src(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
// Called with each message delivered to this block.
264 void handle_message(mb_message_sptr msg);
271 qa_bitset_src::qa_bitset_src(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
272 : mb_mblock(runtime, instance_name, user_arg)
274 d_msg_number = pmt_to_long(pmt_nth(0, user_arg));
275 d_nmsgs_to_send = pmt_to_long(pmt_nth(1, user_arg));
276 d_batch_size = pmt_to_long(pmt_nth(2, user_arg));
278 d_cs_top = define_port("cs_top", "qa-bitset-cs", true, mb_port::EXTERNAL);
279 d_cs = define_port("cs", "qa-bitset-cs", true, mb_port::EXTERNAL);
281 d_out = define_port("out", "qa-bitset", true, mb_port::EXTERNAL);
// Handle one message delivered to this block.
//
// The guard below accepts a message only if it arrived on the "cs_top" or
// "cs" port AND its signal is "send-batch".
// NOTE(review): the body executed when the guard passes is not visible in
// this view; presumably it calls send_batch() -- confirm.
285 qa_bitset_src::handle_message(mb_message_sptr msg)
287 if ((pmt_eq(msg->port_id(), d_cs_top->port_symbol())
288 || pmt_eq(msg->port_id(), d_cs->port_symbol()))
289 && pmt_eq(msg->signal(), s_send_batch)){
// Run the per-message step d_batch_size times.
// NOTE(review): the loop body is not visible in this view; presumably it
// calls send_one() -- confirm.
// NOTE(review): the counter is an int while d_batch_size is a long.
295 qa_bitset_src::send_batch()
297 for (int i = 0; i < d_batch_size; i++)
// Send at most one message on "out".
//
// While d_nmsgs_to_send is positive, a "data" message is sent whose payload
// is the pair (current message number, 0); d_msg_number is post-incremented
// so consecutive messages carry consecutive numbers.
// NOTE(review): the statement controlled by the final
// `--d_nmsgs_to_send <= 0` test, and the exact nesting of that test relative
// to the first `if`, are on lines not visible in this view.
302 qa_bitset_src::send_one()
304 if (d_nmsgs_to_send > 0){
305 pmt_t msg_number = pmt_from_long(d_msg_number++);
306 d_out->send(s_data, pmt_cons(msg_number, s_long0));
308 if (--d_nmsgs_to_send <= 0)
// Register qa_bitset_src so it can be named by class name (as
// define_component does with "qa_bitset_src" elsewhere in this file).
312 REGISTER_MBLOCK_CLASS(qa_bitset_src);
314 // ------------------------------------------------------------------------
// qa_bitset_sink: mb_mblock that receives "data" messages on ports
// "in0".."in3", checks each message's number and bitmap, and sends
// "send-batch" on ports "cs0".."cs3" (see receive_one below).
// NOTE(review): the port members and d_nrecvd are declared on lines not
// visible in this view.
316 class qa_bitset_sink : public mb_mblock
318 // Maximum number of messages we can track
319 static const size_t MAX_MSGS = 1 * 1024 * 1024;
331 long d_nmsgs_to_recv; // # of messages to receive
332 long d_batch_size; // # of messages to receive per batch
// Bitmap value every received message is required to carry.
333 uint32_t d_expected_mask;
// One bit per message number; set once that message number has been received.
335 std::bitset<MAX_MSGS> d_bitset;
// Constructor: user_arg is indexed with pmt_nth() at positions 0, 1 and 2.
339 qa_bitset_sink(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
// Called with each message delivered to this block.
340 void handle_message(mb_message_sptr msg);
// Validates and records a single received "data" message.
343 void receive_one(mb_message_sptr msg);
// Construct the sink.
//
// user_arg is read with pmt_nth() as:
//   element 0 -> d_nmsgs_to_recv  (# of messages to receive)
//   element 1 -> d_batch_size     (# of messages to receive per batch)
//   element 2 -> d_expected_mask  (converted from long to uint32_t)
//
// Throws std::out_of_range if d_nmsgs_to_recv exceeds MAX_MSGS or if
// d_batch_size is less than 1.
// NOTE(review): the base-class initializer ends with a comma, so at least
// one more member initializer follows on a line not visible in this view.
346 qa_bitset_sink::qa_bitset_sink(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
347 : mb_mblock(runtime, instance_name, user_arg),
350 d_nmsgs_to_recv = pmt_to_long(pmt_nth(0, user_arg));
351 d_batch_size = pmt_to_long(pmt_nth(1, user_arg));
352 d_expected_mask = pmt_to_long(pmt_nth(2, user_arg));
// d_bitset has room for only MAX_MSGS message numbers.
354 if (d_nmsgs_to_recv > (long) MAX_MSGS)
355 throw std::out_of_range("qa_bitset_sink: nmsgs_to_recv is too big");
// d_batch_size is used as a modulus in receive_one, so it must be positive.
357 if (d_batch_size < 1)
358 throw std::out_of_range("qa_bitset_sink: batch_size must be >= 1");
// Four "qa-bitset-cs" ports, on which receive_one sends "send-batch".
360 d_cs0 = define_port("cs0", "qa-bitset-cs", true, mb_port::EXTERNAL);
361 d_cs1 = define_port("cs1", "qa-bitset-cs", true, mb_port::EXTERNAL);
362 d_cs2 = define_port("cs2", "qa-bitset-cs", true, mb_port::EXTERNAL);
363 d_cs3 = define_port("cs3", "qa-bitset-cs", true, mb_port::EXTERNAL);
// Four "qa-bitset" ports, on which handle_message accepts "data" messages.
365 d_in0 = define_port("in0", "qa-bitset", false, mb_port::EXTERNAL);
366 d_in1 = define_port("in1", "qa-bitset", false, mb_port::EXTERNAL);
367 d_in2 = define_port("in2", "qa-bitset", false, mb_port::EXTERNAL);
368 d_in3 = define_port("in3", "qa-bitset", false, mb_port::EXTERNAL);
// Handle one message delivered to this block.
//
// The guard below accepts a message only if it arrived on one of the ports
// "in0".."in3" AND its signal is "data".
// NOTE(review): the body executed when the guard passes is not visible in
// this view; presumably it calls receive_one(msg) -- confirm.
372 qa_bitset_sink::handle_message(mb_message_sptr msg)
374 if ((pmt_eq(msg->port_id(), d_in0->port_symbol())
375 || pmt_eq(msg->port_id(), d_in1->port_symbol())
376 || pmt_eq(msg->port_id(), d_in2->port_symbol())
377 || pmt_eq(msg->port_id(), d_in3->port_symbol()))
378 && pmt_eq(msg->signal(), s_data)){
// Validate and record one received "data" message.
//
// The message data is a pair: car = message number (long), cdr = bitmap.
// Visible steps, in order:
//   1. When d_nrecvd % d_batch_size == d_batch_size - 1, send "send-batch"
//      on all four cs ports.
//   2. Report a message number >= d_nmsgs_to_recv on std::cerr.
//   3. Report a bitmap that differs from d_expected_mask.
//   4. Report a message number whose bit is already set in d_bitset.
//   5. Otherwise mark the message number in d_bitset and, once d_nrecvd
//      equals d_nmsgs_to_recv, call shutdown_all(PMT_T).
// NOTE(review): the statement that updates d_nrecvd and the statements that
// follow each error report (presumably a shutdown and early return) are on
// lines not visible in this view -- confirm.
// NOTE(review): a negative msg_number is not rejected by the `>=` check in
// step 2; cast to size_t it becomes a very large index, for which
// std::bitset::test/set throw std::out_of_range.
385 qa_bitset_sink::receive_one(mb_message_sptr msg)
387 long msg_number = pmt_to_long(pmt_car(msg->data()));
388 uint32_t mask = pmt_to_long(pmt_cdr(msg->data()));
390 // std::cout << msg->data() << std::endl;
// Once per batch, send "send-batch" on every cs port.
393 if (d_nrecvd % d_batch_size == d_batch_size - 1){
394 d_cs0->send(s_send_batch);
395 d_cs1->send(s_send_batch);
396 d_cs2->send(s_send_batch);
397 d_cs3->send(s_send_batch);
// Message numbers must be below the configured total.
400 if (msg_number >= d_nmsgs_to_recv){
401 std::cerr << "qa_bitset_sink::receive_one: msg_number too big ("
402 << msg_number << ")\n";
// The bitmap must match the expected mask exactly.
406 if (mask != d_expected_mask){
408 "qa_bitset_sink::receive_one: Wrong mask. Expected 0x%08x, got 0x%08x\n",
409 d_expected_mask, mask);
// Each message number may be seen only once.
414 if (d_bitset.test((size_t) msg_number)){
415 std::cerr << "qa_bitset_sink::receive_one: duplicate msg_number ("
416 << msg_number << ")\n";
// Record this message number as received.
421 d_bitset.set((size_t) msg_number);
422 if (d_nrecvd == d_nmsgs_to_recv)
423 shutdown_all(PMT_T); // we're done!
// Register qa_bitset_sink so it can be named by class name (as
// define_component does with "qa_bitset_sink" elsewhere in this file).
426 REGISTER_MBLOCK_CLASS(qa_bitset_sink);
428 // ------------------------------------------------------------------------
// qa_bitset_top: top-level mb_mblock that builds NPIPES source/pipeline pairs
// feeding one sink (see the constructor) and sends the first "send-batch"
// messages in initial_transition().
430 class qa_bitset_top : public mb_mblock
// Number of source + pipeline pairs built by the constructor.
432 static const int NPIPES = 4;
// Ports "cs0".."cs<NPIPES-1>", each connected to the matching source's "cs_top" port.
434 std::vector<mb_port_sptr> d_cs;
436 long d_nmsgs; // # of messages to send
437 long d_batch_size; // # of messages to receive per batch
// Constructor: user_arg is indexed with pmt_nth() at positions 0 and 1.
440 qa_bitset_top(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
// Sends "send-batch" twice on every port in d_cs.
441 void initial_transition();
444 qa_bitset_top::qa_bitset_top(mb_runtime *runtime,
445 const std::string &instance_name, pmt_t user_arg)
446 : mb_mblock(runtime, instance_name, user_arg)
448 d_nmsgs = pmt_to_long(pmt_nth(0, user_arg));
449 d_nmsgs = (d_nmsgs / NPIPES) * NPIPES;
450 d_batch_size = pmt_to_long(pmt_nth(1, user_arg));
453 * We build NPIPES sources which feed NPIPES pipelines, each of which
454 * consists of 8-mblocks. All pipelines feed into a single sink
455 * which keeps track the results.
457 for (int i = 0; i < NPIPES; i++){
458 d_cs.push_back(define_port("cs"+str(i), "qa-bitset-cs", false, mb_port::INTERNAL));
460 // sources of test messages
461 define_component("src"+str(i), "qa_bitset_src",
462 pmt_list3(pmt_from_long(i * d_nmsgs/NPIPES),
463 pmt_from_long(d_nmsgs/NPIPES),
464 pmt_from_long(d_batch_size)));
466 // 8-mblock processing pipelines
467 define_component("pipeline"+str(i), "qa_bitset8", pmt_from_long(0));
470 // sink for output of pipelines
471 define_component("sink", "qa_bitset_sink",
472 pmt_list3(pmt_from_long(d_nmsgs),
473 pmt_from_long(d_batch_size * NPIPES),
474 pmt_from_long(0x000000ff)));
476 for (int i = 0; i < NPIPES; i++){
477 connect("self", "cs"+str(i), "src"+str(i), "cs_top");
478 connect("src"+str(i), "out", "pipeline"+str(i), "in");
479 connect("src"+str(i), "cs", "sink", "cs"+str(i));
480 connect("pipeline"+str(i), "out", "sink", "in"+str(i));
485 qa_bitset_top::initial_transition()
487 for (int i = 0; i < NPIPES; i++){
488 d_cs[i]->send(s_send_batch); // prime the pump
489 d_cs[i]->send(s_send_batch);
493 REGISTER_MBLOCK_CLASS(qa_bitset_top);