Imported Upstream version 3.2.2
[debian/gnuradio] / mblock / src / lib / qa_bitset.cc
1 /* -*- c++ -*- */
2 /*
3  * Copyright 2007,2008 Free Software Foundation, Inc.
4  * 
5  * This file is part of GNU Radio
6  * 
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3, or (at your option)
10  * any later version.
11  * 
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  * 
17  * You should have received a copy of the GNU General Public License along
18  * with this program; if not, write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20  */
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25 #include <mblock/mblock.h>
26 #include <mblock/protocol_class.h>
27 #include <mblock/message.h>
28 #include <mblock/class_registry.h>
29 #include <iostream>
30 #include <cstdio>
31 #include <sstream>
32 #include <bitset>
33
34 static pmt_t s_in = pmt_intern("in");
35 static pmt_t s_out = pmt_intern("out");
36 static pmt_t s_data = pmt_intern("data");
37 static pmt_t s_start = pmt_intern("start");
38 static pmt_t s_send_batch = pmt_intern("send-batch");
39 static pmt_t s_long0 = pmt_from_long(0);
40
41 static std::string
42 str(long x)
43 {
44   std::ostringstream s;
45   s << x;
46   return s.str();
47 }
48
49 /*!
50  * \brief mblock used for QA.
51  *
52  * Messages arriving on "in" consist of a pair containing a (long)
53  * message number in the car, and a (long) bitmap in the cdr.  For
54  * each message received on "in", a new message is sent on "out".  The
55  * new message is the same format as the input, but the bitmap in
56  * the cdr has a "1" or'd into it that corresponds to the bit number
57  * specified in the constructor.
58  *
59  * The bitmap can be used by the ultimate receiver to confirm
60  * traversal of a set of blocks, if the blocks are assigned unique bit
61  * numbers.
62  */
63 class qa_bitset : public mb_mblock
64 {
65   mb_port_sptr  d_in;
66   mb_port_sptr  d_out;
67   int           d_bitno;
68
69 public:
70   qa_bitset(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
71   void handle_message(mb_message_sptr msg);
72 };
73
74 qa_bitset::qa_bitset(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
75   : mb_mblock(runtime, instance_name, user_arg)
76 {
77   d_bitno = pmt_to_long(user_arg);      // The bit we are to set
78
79   d_in  = define_port("in", "qa-bitset", false, mb_port::EXTERNAL);
80   d_out = define_port("out", "qa-bitset", true, mb_port::EXTERNAL);
81 }
82
83 void
84 qa_bitset::handle_message(mb_message_sptr msg)
85 {
86   if (pmt_eq(msg->port_id(), s_in) && pmt_eq(msg->signal(), s_data)){
87     d_out->send(s_data,
88                 pmt_cons(pmt_car(msg->data()),
89                          pmt_from_long((1L << d_bitno) | pmt_to_long(pmt_cdr(msg->data())))));
90   }
91 }
92
93 REGISTER_MBLOCK_CLASS(qa_bitset);
94
95 // ------------------------------------------------------------------------
96
97 /*!
98  * \brief mblock used for QA.  Compose two qa_bitset mblocks.
99  */
100 class qa_bitset2 : public mb_mblock
101 {
102   mb_port_sptr  d_in;
103   mb_port_sptr  d_out;
104
105 public:
106   qa_bitset2(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
107 };
108
109 qa_bitset2::qa_bitset2(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
110   : mb_mblock(runtime, instance_name, user_arg)
111 {
112   long bitno = pmt_to_long(user_arg);   // The bit we are to set
113
114   d_in  = define_port("in", "qa-bitset", false, mb_port::RELAY);
115   d_out = define_port("out", "qa-bitset", true, mb_port::RELAY);
116
117   define_component("bs0", "qa_bitset", pmt_from_long(bitno));
118   define_component("bs1", "qa_bitset", pmt_from_long(bitno + 1));
119   connect("self", "in", "bs0", "in");
120   connect("bs0", "out", "bs1", "in");
121   connect("bs1", "out", "self", "out");
122 }
123
124 REGISTER_MBLOCK_CLASS(qa_bitset2);
125
126 // ------------------------------------------------------------------------
127
128 /*!
129  * \brief mblock used for QA.  Compose two qa_bitset2 mblocks.
130  */
131 class qa_bitset4 : public mb_mblock
132 {
133   mb_port_sptr  d_in;
134   mb_port_sptr  d_out;
135
136 public:
137   qa_bitset4(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
138 };
139
140 qa_bitset4::qa_bitset4(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
141   : mb_mblock(runtime, instance_name, user_arg)
142 {
143   long bitno = pmt_to_long(user_arg);   // The bit we are to set
144
145   d_in  = define_port("in", "qa-bitset", false, mb_port::RELAY);
146   d_out = define_port("out", "qa-bitset", true, mb_port::RELAY);
147
148   define_component("bs0", "qa_bitset2", pmt_from_long(bitno));
149   define_component("bs1", "qa_bitset2", pmt_from_long(bitno + 2));
150   connect("self", "in", "bs0", "in");
151   connect("bs0", "out", "bs1", "in");
152   connect("bs1", "out", "self", "out");
153 }
154
155 REGISTER_MBLOCK_CLASS(qa_bitset4);
156
157 // ------------------------------------------------------------------------
158
159 /*!
160  * \brief mblock used for QA.  Compose two qa_bitset4 mblocks.
161  */
162 class qa_bitset8 : public mb_mblock
163 {
164   mb_port_sptr  d_in;
165   mb_port_sptr  d_out;
166
167 public:
168   qa_bitset8(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
169 };
170
171 qa_bitset8::qa_bitset8(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
172   : mb_mblock(runtime, instance_name, user_arg)
173 {
174   long bitno = pmt_to_long(user_arg);   // The bit we are to set
175
176   d_in  = define_port("in", "qa-bitset", false, mb_port::RELAY);
177   d_out = define_port("out", "qa-bitset", true, mb_port::RELAY);
178
179   define_component("bs0", "qa_bitset4", pmt_from_long(bitno));
180   define_component("bs1", "qa_bitset4", pmt_from_long(bitno + 4));
181   connect("self", "in", "bs0", "in");
182   connect("bs0", "out", "bs1", "in");
183   connect("bs1", "out", "self", "out");
184 }
185
186 REGISTER_MBLOCK_CLASS(qa_bitset8);
187
188 // ------------------------------------------------------------------------
189
190 /*!
191  * \brief mblock used for QA.  Compose two qa_bitset8 mblocks.
192  */
193 class qa_bitset16 : public mb_mblock
194 {
195   mb_port_sptr  d_in;
196   mb_port_sptr  d_out;
197
198 public:
199   qa_bitset16(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
200 };
201
202 qa_bitset16::qa_bitset16(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
203   : mb_mblock(runtime, instance_name, user_arg)
204 {
205   long bitno = pmt_to_long(user_arg);   // The bit we are to set
206
207   d_in  = define_port("in", "qa-bitset", false, mb_port::RELAY);
208   d_out = define_port("out", "qa-bitset", true, mb_port::RELAY);
209
210   define_component("bs0", "qa_bitset8", pmt_from_long(bitno));
211   define_component("bs1", "qa_bitset8", pmt_from_long(bitno + 8));
212   connect("self", "in", "bs0", "in");
213   connect("bs0", "out", "bs1", "in");
214   connect("bs1", "out", "self", "out");
215 }
216
217 REGISTER_MBLOCK_CLASS(qa_bitset16);
218
219 // ------------------------------------------------------------------------
220
221 /*!
222  * \brief mblock used for QA.  Compose two qa_bitset16 mblocks.
223  */
224 class qa_bitset32 : public mb_mblock
225 {
226   mb_port_sptr  d_in;
227   mb_port_sptr  d_out;
228
229 public:
230   qa_bitset32(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
231 };
232
233 qa_bitset32::qa_bitset32(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
234   : mb_mblock(runtime, instance_name, user_arg)
235 {
236   long bitno = pmt_to_long(user_arg);   // The bit we are to set
237
238   d_in  = define_port("in", "qa-bitset", false, mb_port::RELAY);
239   d_out = define_port("out", "qa-bitset", true, mb_port::RELAY);
240
241   define_component("bs0", "qa_bitset16", pmt_from_long(bitno));
242   define_component("bs1", "qa_bitset16", pmt_from_long(bitno + 16));
243   connect("self", "in", "bs0", "in");
244   connect("bs0", "out", "bs1", "in");
245   connect("bs1", "out", "self", "out");
246 }
247
248 REGISTER_MBLOCK_CLASS(qa_bitset32);
249
250 // ------------------------------------------------------------------------
251
252 class qa_bitset_src : public mb_mblock
253 {
254   mb_port_sptr  d_cs_top;
255   mb_port_sptr  d_cs;
256   
257   mb_port_sptr  d_out;
258
259   long          d_msg_number;           // starting message number
260   long          d_nmsgs_to_send;        // # of messages to send
261   long          d_batch_size;           // # of messages to send per batch
262   
263 public:
264   qa_bitset_src(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
265   void handle_message(mb_message_sptr msg);
266
267 protected:
268   void send_one();
269   void send_batch();
270 };
271
272 qa_bitset_src::qa_bitset_src(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
273   : mb_mblock(runtime, instance_name, user_arg)
274 {
275   d_msg_number    = pmt_to_long(pmt_nth(0, user_arg));
276   d_nmsgs_to_send = pmt_to_long(pmt_nth(1, user_arg));
277   d_batch_size    = pmt_to_long(pmt_nth(2, user_arg));
278
279   d_cs_top = define_port("cs_top", "qa-bitset-cs", true, mb_port::EXTERNAL);
280   d_cs = define_port("cs", "qa-bitset-cs", true, mb_port::EXTERNAL);
281
282   d_out = define_port("out", "qa-bitset", true, mb_port::EXTERNAL);
283 }
284
285 void
286 qa_bitset_src::handle_message(mb_message_sptr msg)
287 {
288   if ((pmt_eq(msg->port_id(), d_cs_top->port_symbol())
289        || pmt_eq(msg->port_id(), d_cs->port_symbol()))
290       && pmt_eq(msg->signal(), s_send_batch)){
291     send_batch();
292   }
293 }
294
295 void
296 qa_bitset_src::send_batch()
297 {
298   for (int i = 0; i < d_batch_size; i++)
299     send_one();
300 }
301
302 void
303 qa_bitset_src::send_one()
304 {
305   if (d_nmsgs_to_send > 0){
306     pmt_t msg_number = pmt_from_long(d_msg_number++);
307     d_out->send(s_data, pmt_cons(msg_number, s_long0));
308   }
309   if (--d_nmsgs_to_send <= 0)
310     exit();
311 }
312
313 REGISTER_MBLOCK_CLASS(qa_bitset_src);
314
315 // ------------------------------------------------------------------------
316
317 class qa_bitset_sink : public mb_mblock
318 {
319   // Maximum number of messages we can track
320   static const size_t MAX_MSGS = 1 * 1024 * 1024; 
321   
322   mb_port_sptr  d_cs0;
323   mb_port_sptr  d_cs1;
324   mb_port_sptr  d_cs2;
325   mb_port_sptr  d_cs3;
326   
327   mb_port_sptr  d_in0;
328   mb_port_sptr  d_in1;
329   mb_port_sptr  d_in2;
330   mb_port_sptr  d_in3;
331
332   long                  d_nmsgs_to_recv; // # of messages to receive
333   long                  d_batch_size;    // # of messages to receive per batch
334   uint32_t              d_expected_mask;
335
336   std::bitset<MAX_MSGS> d_bitset;
337   long                  d_nrecvd;
338   
339 public:
340   qa_bitset_sink(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
341   void handle_message(mb_message_sptr msg);
342
343 protected:
344   void receive_one(mb_message_sptr msg);
345 };
346
347 qa_bitset_sink::qa_bitset_sink(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
348   : mb_mblock(runtime, instance_name, user_arg),
349     d_nrecvd(0)
350 {
351   d_nmsgs_to_recv = pmt_to_long(pmt_nth(0, user_arg));
352   d_batch_size    = pmt_to_long(pmt_nth(1, user_arg));
353   d_expected_mask = pmt_to_long(pmt_nth(2, user_arg));
354
355   if (d_nmsgs_to_recv > (long) MAX_MSGS)
356     throw std::out_of_range("qa_bitset_sink: nmsgs_to_recv is too big");
357
358   if (d_batch_size < 1)
359     throw std::out_of_range("qa_bitset_sink: batch_size must be >= 1");
360
361   d_cs0 = define_port("cs0", "qa-bitset-cs", true, mb_port::EXTERNAL);
362   d_cs1 = define_port("cs1", "qa-bitset-cs", true, mb_port::EXTERNAL);
363   d_cs2 = define_port("cs2", "qa-bitset-cs", true, mb_port::EXTERNAL);
364   d_cs3 = define_port("cs3", "qa-bitset-cs", true, mb_port::EXTERNAL);
365
366   d_in0 = define_port("in0", "qa-bitset", false, mb_port::EXTERNAL);
367   d_in1 = define_port("in1", "qa-bitset", false, mb_port::EXTERNAL);
368   d_in2 = define_port("in2", "qa-bitset", false, mb_port::EXTERNAL);
369   d_in3 = define_port("in3", "qa-bitset", false, mb_port::EXTERNAL);
370 }
371
372 void
373 qa_bitset_sink::handle_message(mb_message_sptr msg)
374 {
375   if ((pmt_eq(msg->port_id(), d_in0->port_symbol())
376        || pmt_eq(msg->port_id(), d_in1->port_symbol())
377        || pmt_eq(msg->port_id(), d_in2->port_symbol())
378        || pmt_eq(msg->port_id(), d_in3->port_symbol()))
379       && pmt_eq(msg->signal(), s_data)){
380
381     receive_one(msg);
382   }
383 }
384
385 void
386 qa_bitset_sink::receive_one(mb_message_sptr msg)
387 {
388   long msg_number = pmt_to_long(pmt_car(msg->data()));
389   uint32_t mask = pmt_to_long(pmt_cdr(msg->data()));
390
391   // std::cout << msg->data() << std::endl;
392
393   d_nrecvd++;
394   if (d_nrecvd % d_batch_size == d_batch_size - 1){
395     d_cs0->send(s_send_batch);
396     d_cs1->send(s_send_batch);
397     d_cs2->send(s_send_batch);
398     d_cs3->send(s_send_batch);
399   }
400
401   if (msg_number >= d_nmsgs_to_recv){
402     std::cerr << "qa_bitset_sink::receive_one: msg_number too big ("
403               << msg_number << ")\n";
404     shutdown_all(PMT_F);
405     return;
406   }
407   if (mask != d_expected_mask){
408     fprintf(stderr,
409             "qa_bitset_sink::receive_one: Wrong mask.  Expected 0x%08x, got 0x%08x\n",
410             d_expected_mask, mask);
411     shutdown_all(PMT_F);
412     return;
413   }
414
415   if (d_bitset.test((size_t) msg_number)){
416     std::cerr << "qa_bitset_sink::receive_one: duplicate msg_number ("
417               << msg_number << ")\n";
418     shutdown_all(PMT_F);
419     return;
420   }
421
422   d_bitset.set((size_t) msg_number);
423   if (d_nrecvd == d_nmsgs_to_recv)
424     shutdown_all(PMT_T);                // we're done!
425 }
426
427 REGISTER_MBLOCK_CLASS(qa_bitset_sink);
428
429 // ------------------------------------------------------------------------
430
431 class qa_bitset_top : public mb_mblock
432 {
433   static const int NPIPES = 4;
434
435   std::vector<mb_port_sptr>     d_cs;
436   
437   long                  d_nmsgs;         // # of messages to send
438   long                  d_batch_size;    // # of messages to receive per batch
439
440 public:
441   qa_bitset_top(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
442   void initial_transition();
443 };
444
445 qa_bitset_top::qa_bitset_top(mb_runtime *runtime,
446                              const std::string &instance_name, pmt_t user_arg)
447   : mb_mblock(runtime, instance_name, user_arg)
448 {
449   d_nmsgs      = pmt_to_long(pmt_nth(0, user_arg));
450   d_nmsgs = (d_nmsgs / NPIPES) * NPIPES;
451   d_batch_size = pmt_to_long(pmt_nth(1, user_arg));
452
453   /*
454    * We build NPIPES sources which feed NPIPES pipelines, each of which
455    * consists of 8-mblocks.  All pipelines feed into a single sink
456    * which keeps track the results.
457    */
458   for (int i = 0; i < NPIPES; i++){
459     d_cs.push_back(define_port("cs"+str(i), "qa-bitset-cs", false, mb_port::INTERNAL));
460   
461     // sources of test messages
462     define_component("src"+str(i), "qa_bitset_src",
463                      pmt_list3(pmt_from_long(i * d_nmsgs/NPIPES),
464                                pmt_from_long(d_nmsgs/NPIPES),
465                                pmt_from_long(d_batch_size)));
466
467     // 8-mblock processing pipelines
468     define_component("pipeline"+str(i), "qa_bitset8", pmt_from_long(0));
469   }
470
471   // sink for output of pipelines
472   define_component("sink", "qa_bitset_sink",
473                    pmt_list3(pmt_from_long(d_nmsgs),
474                              pmt_from_long(d_batch_size * NPIPES),
475                              pmt_from_long(0x000000ff)));
476
477   for (int i = 0; i < NPIPES; i++){
478     connect("self", "cs"+str(i), "src"+str(i), "cs_top");
479     connect("src"+str(i), "out", "pipeline"+str(i), "in");
480     connect("src"+str(i), "cs", "sink", "cs"+str(i));
481     connect("pipeline"+str(i), "out", "sink", "in"+str(i));
482   }
483 }
484
485 void
486 qa_bitset_top::initial_transition()
487 {
488   for (int i = 0; i < NPIPES; i++){
489     d_cs[i]->send(s_send_batch);        // prime the pump
490     d_cs[i]->send(s_send_batch);
491   }
492 }
493
494 REGISTER_MBLOCK_CLASS(qa_bitset_top);