Convert gr-audio-portaudio to Boost via gruel
[debian/gnuradio] / mblock / src / lib / qa_bitset.cc
1 /* -*- c++ -*- */
2 /*
3  * Copyright 2007,2008 Free Software Foundation, Inc.
4  * 
5  * This file is part of GNU Radio
6  * 
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3, or (at your option)
10  * any later version.
11  * 
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  * 
17  * You should have received a copy of the GNU General Public License along
18  * with this program; if not, write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20  */
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25 #include <mblock/mblock.h>
26 #include <mblock/protocol_class.h>
27 #include <mblock/message.h>
28 #include <mblock/class_registry.h>
29 #include <iostream>
30 #include <cstdio>
31 #include <sstream>
32 #include <bitset>
33
34 using namespace pmt;
35
36 static pmt_t s_in = pmt_intern("in");
37 static pmt_t s_out = pmt_intern("out");
38 static pmt_t s_data = pmt_intern("data");
39 static pmt_t s_start = pmt_intern("start");
40 static pmt_t s_send_batch = pmt_intern("send-batch");
41 static pmt_t s_long0 = pmt_from_long(0);
42
43 static std::string
44 str(long x)
45 {
46   std::ostringstream s;
47   s << x;
48   return s.str();
49 }
50
51 /*!
52  * \brief mblock used for QA.
53  *
54  * Messages arriving on "in" consist of a pair containing a (long)
55  * message number in the car, and a (long) bitmap in the cdr.  For
56  * each message received on "in", a new message is sent on "out".  The
57  * new message is the same format as the input, but the bitmap in
58  * the cdr has a "1" or'd into it that corresponds to the bit number
59  * specified in the constructor.
60  *
61  * The bitmap can be used by the ultimate receiver to confirm
62  * traversal of a set of blocks, if the blocks are assigned unique bit
63  * numbers.
64  */
65 class qa_bitset : public mb_mblock
66 {
67   mb_port_sptr  d_in;
68   mb_port_sptr  d_out;
69   int           d_bitno;
70
71 public:
72   qa_bitset(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
73   void handle_message(mb_message_sptr msg);
74 };
75
76 qa_bitset::qa_bitset(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
77   : mb_mblock(runtime, instance_name, user_arg)
78 {
79   d_bitno = pmt_to_long(user_arg);      // The bit we are to set
80
81   d_in  = define_port("in", "qa-bitset", false, mb_port::EXTERNAL);
82   d_out = define_port("out", "qa-bitset", true, mb_port::EXTERNAL);
83 }
84
85 void
86 qa_bitset::handle_message(mb_message_sptr msg)
87 {
88   if (pmt_eq(msg->port_id(), s_in) && pmt_eq(msg->signal(), s_data)){
89     d_out->send(s_data,
90                 pmt_cons(pmt_car(msg->data()),
91                          pmt_from_long((1L << d_bitno) | pmt_to_long(pmt_cdr(msg->data())))));
92   }
93 }
94
95 REGISTER_MBLOCK_CLASS(qa_bitset);
96
97 // ------------------------------------------------------------------------
98
99 /*!
100  * \brief mblock used for QA.  Compose two qa_bitset mblocks.
101  */
102 class qa_bitset2 : public mb_mblock
103 {
104   mb_port_sptr  d_in;
105   mb_port_sptr  d_out;
106
107 public:
108   qa_bitset2(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
109 };
110
111 qa_bitset2::qa_bitset2(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
112   : mb_mblock(runtime, instance_name, user_arg)
113 {
114   long bitno = pmt_to_long(user_arg);   // The bit we are to set
115
116   d_in  = define_port("in", "qa-bitset", false, mb_port::RELAY);
117   d_out = define_port("out", "qa-bitset", true, mb_port::RELAY);
118
119   define_component("bs0", "qa_bitset", pmt_from_long(bitno));
120   define_component("bs1", "qa_bitset", pmt_from_long(bitno + 1));
121   connect("self", "in", "bs0", "in");
122   connect("bs0", "out", "bs1", "in");
123   connect("bs1", "out", "self", "out");
124 }
125
126 REGISTER_MBLOCK_CLASS(qa_bitset2);
127
128 // ------------------------------------------------------------------------
129
130 /*!
131  * \brief mblock used for QA.  Compose two qa_bitset2 mblocks.
132  */
133 class qa_bitset4 : public mb_mblock
134 {
135   mb_port_sptr  d_in;
136   mb_port_sptr  d_out;
137
138 public:
139   qa_bitset4(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
140 };
141
142 qa_bitset4::qa_bitset4(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
143   : mb_mblock(runtime, instance_name, user_arg)
144 {
145   long bitno = pmt_to_long(user_arg);   // The bit we are to set
146
147   d_in  = define_port("in", "qa-bitset", false, mb_port::RELAY);
148   d_out = define_port("out", "qa-bitset", true, mb_port::RELAY);
149
150   define_component("bs0", "qa_bitset2", pmt_from_long(bitno));
151   define_component("bs1", "qa_bitset2", pmt_from_long(bitno + 2));
152   connect("self", "in", "bs0", "in");
153   connect("bs0", "out", "bs1", "in");
154   connect("bs1", "out", "self", "out");
155 }
156
157 REGISTER_MBLOCK_CLASS(qa_bitset4);
158
159 // ------------------------------------------------------------------------
160
161 /*!
162  * \brief mblock used for QA.  Compose two qa_bitset4 mblocks.
163  */
164 class qa_bitset8 : public mb_mblock
165 {
166   mb_port_sptr  d_in;
167   mb_port_sptr  d_out;
168
169 public:
170   qa_bitset8(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
171 };
172
173 qa_bitset8::qa_bitset8(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
174   : mb_mblock(runtime, instance_name, user_arg)
175 {
176   long bitno = pmt_to_long(user_arg);   // The bit we are to set
177
178   d_in  = define_port("in", "qa-bitset", false, mb_port::RELAY);
179   d_out = define_port("out", "qa-bitset", true, mb_port::RELAY);
180
181   define_component("bs0", "qa_bitset4", pmt_from_long(bitno));
182   define_component("bs1", "qa_bitset4", pmt_from_long(bitno + 4));
183   connect("self", "in", "bs0", "in");
184   connect("bs0", "out", "bs1", "in");
185   connect("bs1", "out", "self", "out");
186 }
187
188 REGISTER_MBLOCK_CLASS(qa_bitset8);
189
190 // ------------------------------------------------------------------------
191
192 /*!
193  * \brief mblock used for QA.  Compose two qa_bitset8 mblocks.
194  */
195 class qa_bitset16 : public mb_mblock
196 {
197   mb_port_sptr  d_in;
198   mb_port_sptr  d_out;
199
200 public:
201   qa_bitset16(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
202 };
203
204 qa_bitset16::qa_bitset16(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
205   : mb_mblock(runtime, instance_name, user_arg)
206 {
207   long bitno = pmt_to_long(user_arg);   // The bit we are to set
208
209   d_in  = define_port("in", "qa-bitset", false, mb_port::RELAY);
210   d_out = define_port("out", "qa-bitset", true, mb_port::RELAY);
211
212   define_component("bs0", "qa_bitset8", pmt_from_long(bitno));
213   define_component("bs1", "qa_bitset8", pmt_from_long(bitno + 8));
214   connect("self", "in", "bs0", "in");
215   connect("bs0", "out", "bs1", "in");
216   connect("bs1", "out", "self", "out");
217 }
218
219 REGISTER_MBLOCK_CLASS(qa_bitset16);
220
221 // ------------------------------------------------------------------------
222
223 /*!
224  * \brief mblock used for QA.  Compose two qa_bitset16 mblocks.
225  */
226 class qa_bitset32 : public mb_mblock
227 {
228   mb_port_sptr  d_in;
229   mb_port_sptr  d_out;
230
231 public:
232   qa_bitset32(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
233 };
234
235 qa_bitset32::qa_bitset32(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
236   : mb_mblock(runtime, instance_name, user_arg)
237 {
238   long bitno = pmt_to_long(user_arg);   // The bit we are to set
239
240   d_in  = define_port("in", "qa-bitset", false, mb_port::RELAY);
241   d_out = define_port("out", "qa-bitset", true, mb_port::RELAY);
242
243   define_component("bs0", "qa_bitset16", pmt_from_long(bitno));
244   define_component("bs1", "qa_bitset16", pmt_from_long(bitno + 16));
245   connect("self", "in", "bs0", "in");
246   connect("bs0", "out", "bs1", "in");
247   connect("bs1", "out", "self", "out");
248 }
249
250 REGISTER_MBLOCK_CLASS(qa_bitset32);
251
252 // ------------------------------------------------------------------------
253
254 class qa_bitset_src : public mb_mblock
255 {
256   mb_port_sptr  d_cs_top;
257   mb_port_sptr  d_cs;
258   
259   mb_port_sptr  d_out;
260
261   long          d_msg_number;           // starting message number
262   long          d_nmsgs_to_send;        // # of messages to send
263   long          d_batch_size;           // # of messages to send per batch
264   
265 public:
266   qa_bitset_src(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
267   void handle_message(mb_message_sptr msg);
268
269 protected:
270   void send_one();
271   void send_batch();
272 };
273
274 qa_bitset_src::qa_bitset_src(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
275   : mb_mblock(runtime, instance_name, user_arg)
276 {
277   d_msg_number    = pmt_to_long(pmt_nth(0, user_arg));
278   d_nmsgs_to_send = pmt_to_long(pmt_nth(1, user_arg));
279   d_batch_size    = pmt_to_long(pmt_nth(2, user_arg));
280
281   d_cs_top = define_port("cs_top", "qa-bitset-cs", true, mb_port::EXTERNAL);
282   d_cs = define_port("cs", "qa-bitset-cs", true, mb_port::EXTERNAL);
283
284   d_out = define_port("out", "qa-bitset", true, mb_port::EXTERNAL);
285 }
286
287 void
288 qa_bitset_src::handle_message(mb_message_sptr msg)
289 {
290   if ((pmt_eq(msg->port_id(), d_cs_top->port_symbol())
291        || pmt_eq(msg->port_id(), d_cs->port_symbol()))
292       && pmt_eq(msg->signal(), s_send_batch)){
293     send_batch();
294   }
295 }
296
297 void
298 qa_bitset_src::send_batch()
299 {
300   for (int i = 0; i < d_batch_size; i++)
301     send_one();
302 }
303
304 void
305 qa_bitset_src::send_one()
306 {
307   if (d_nmsgs_to_send > 0){
308     pmt_t msg_number = pmt_from_long(d_msg_number++);
309     d_out->send(s_data, pmt_cons(msg_number, s_long0));
310   }
311   if (--d_nmsgs_to_send <= 0)
312     exit();
313 }
314
315 REGISTER_MBLOCK_CLASS(qa_bitset_src);
316
317 // ------------------------------------------------------------------------
318
319 class qa_bitset_sink : public mb_mblock
320 {
321   // Maximum number of messages we can track
322   static const size_t MAX_MSGS = 1 * 1024 * 1024; 
323   
324   mb_port_sptr  d_cs0;
325   mb_port_sptr  d_cs1;
326   mb_port_sptr  d_cs2;
327   mb_port_sptr  d_cs3;
328   
329   mb_port_sptr  d_in0;
330   mb_port_sptr  d_in1;
331   mb_port_sptr  d_in2;
332   mb_port_sptr  d_in3;
333
334   long                  d_nmsgs_to_recv; // # of messages to receive
335   long                  d_batch_size;    // # of messages to receive per batch
336   uint32_t              d_expected_mask;
337
338   std::bitset<MAX_MSGS> d_bitset;
339   long                  d_nrecvd;
340   
341 public:
342   qa_bitset_sink(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
343   void handle_message(mb_message_sptr msg);
344
345 protected:
346   void receive_one(mb_message_sptr msg);
347 };
348
349 qa_bitset_sink::qa_bitset_sink(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg)
350   : mb_mblock(runtime, instance_name, user_arg),
351     d_nrecvd(0)
352 {
353   d_nmsgs_to_recv = pmt_to_long(pmt_nth(0, user_arg));
354   d_batch_size    = pmt_to_long(pmt_nth(1, user_arg));
355   d_expected_mask = pmt_to_long(pmt_nth(2, user_arg));
356
357   if (d_nmsgs_to_recv > (long) MAX_MSGS)
358     throw std::out_of_range("qa_bitset_sink: nmsgs_to_recv is too big");
359
360   if (d_batch_size < 1)
361     throw std::out_of_range("qa_bitset_sink: batch_size must be >= 1");
362
363   d_cs0 = define_port("cs0", "qa-bitset-cs", true, mb_port::EXTERNAL);
364   d_cs1 = define_port("cs1", "qa-bitset-cs", true, mb_port::EXTERNAL);
365   d_cs2 = define_port("cs2", "qa-bitset-cs", true, mb_port::EXTERNAL);
366   d_cs3 = define_port("cs3", "qa-bitset-cs", true, mb_port::EXTERNAL);
367
368   d_in0 = define_port("in0", "qa-bitset", false, mb_port::EXTERNAL);
369   d_in1 = define_port("in1", "qa-bitset", false, mb_port::EXTERNAL);
370   d_in2 = define_port("in2", "qa-bitset", false, mb_port::EXTERNAL);
371   d_in3 = define_port("in3", "qa-bitset", false, mb_port::EXTERNAL);
372 }
373
374 void
375 qa_bitset_sink::handle_message(mb_message_sptr msg)
376 {
377   if ((pmt_eq(msg->port_id(), d_in0->port_symbol())
378        || pmt_eq(msg->port_id(), d_in1->port_symbol())
379        || pmt_eq(msg->port_id(), d_in2->port_symbol())
380        || pmt_eq(msg->port_id(), d_in3->port_symbol()))
381       && pmt_eq(msg->signal(), s_data)){
382
383     receive_one(msg);
384   }
385 }
386
387 void
388 qa_bitset_sink::receive_one(mb_message_sptr msg)
389 {
390   long msg_number = pmt_to_long(pmt_car(msg->data()));
391   uint32_t mask = pmt_to_long(pmt_cdr(msg->data()));
392
393   // std::cout << msg->data() << std::endl;
394
395   d_nrecvd++;
396   if (d_nrecvd % d_batch_size == d_batch_size - 1){
397     d_cs0->send(s_send_batch);
398     d_cs1->send(s_send_batch);
399     d_cs2->send(s_send_batch);
400     d_cs3->send(s_send_batch);
401   }
402
403   if (msg_number >= d_nmsgs_to_recv){
404     std::cerr << "qa_bitset_sink::receive_one: msg_number too big ("
405               << msg_number << ")\n";
406     shutdown_all(PMT_F);
407     return;
408   }
409   if (mask != d_expected_mask){
410     fprintf(stderr,
411             "qa_bitset_sink::receive_one: Wrong mask.  Expected 0x%08x, got 0x%08x\n",
412             d_expected_mask, mask);
413     shutdown_all(PMT_F);
414     return;
415   }
416
417   if (d_bitset.test((size_t) msg_number)){
418     std::cerr << "qa_bitset_sink::receive_one: duplicate msg_number ("
419               << msg_number << ")\n";
420     shutdown_all(PMT_F);
421     return;
422   }
423
424   d_bitset.set((size_t) msg_number);
425   if (d_nrecvd == d_nmsgs_to_recv)
426     shutdown_all(PMT_T);                // we're done!
427 }
428
429 REGISTER_MBLOCK_CLASS(qa_bitset_sink);
430
431 // ------------------------------------------------------------------------
432
433 class qa_bitset_top : public mb_mblock
434 {
435   static const int NPIPES = 4;
436
437   std::vector<mb_port_sptr>     d_cs;
438   
439   long                  d_nmsgs;         // # of messages to send
440   long                  d_batch_size;    // # of messages to receive per batch
441
442 public:
443   qa_bitset_top(mb_runtime *runtime, const std::string &instance_name, pmt_t user_arg);
444   void initial_transition();
445 };
446
447 qa_bitset_top::qa_bitset_top(mb_runtime *runtime,
448                              const std::string &instance_name, pmt_t user_arg)
449   : mb_mblock(runtime, instance_name, user_arg)
450 {
451   d_nmsgs      = pmt_to_long(pmt_nth(0, user_arg));
452   d_nmsgs = (d_nmsgs / NPIPES) * NPIPES;
453   d_batch_size = pmt_to_long(pmt_nth(1, user_arg));
454
455   /*
456    * We build NPIPES sources which feed NPIPES pipelines, each of which
457    * consists of 8-mblocks.  All pipelines feed into a single sink
458    * which keeps track the results.
459    */
460   for (int i = 0; i < NPIPES; i++){
461     d_cs.push_back(define_port("cs"+str(i), "qa-bitset-cs", false, mb_port::INTERNAL));
462   
463     // sources of test messages
464     define_component("src"+str(i), "qa_bitset_src",
465                      pmt_list3(pmt_from_long(i * d_nmsgs/NPIPES),
466                                pmt_from_long(d_nmsgs/NPIPES),
467                                pmt_from_long(d_batch_size)));
468
469     // 8-mblock processing pipelines
470     define_component("pipeline"+str(i), "qa_bitset8", pmt_from_long(0));
471   }
472
473   // sink for output of pipelines
474   define_component("sink", "qa_bitset_sink",
475                    pmt_list3(pmt_from_long(d_nmsgs),
476                              pmt_from_long(d_batch_size * NPIPES),
477                              pmt_from_long(0x000000ff)));
478
479   for (int i = 0; i < NPIPES; i++){
480     connect("self", "cs"+str(i), "src"+str(i), "cs_top");
481     connect("src"+str(i), "out", "pipeline"+str(i), "in");
482     connect("src"+str(i), "cs", "sink", "cs"+str(i));
483     connect("pipeline"+str(i), "out", "sink", "in"+str(i));
484   }
485 }
486
487 void
488 qa_bitset_top::initial_transition()
489 {
490   for (int i = 0; i < NPIPES; i++){
491     d_cs[i]->send(s_send_batch);        // prime the pump
492     d_cs[i]->send(s_send_batch);
493   }
494 }
495
496 REGISTER_MBLOCK_CLASS(qa_bitset_top);