Imported Upstream version 3.2.2
[debian/gnuradio] / mblock / src / lib / mb_msg_queue.cc
1 /* -*- c++ -*- */
2 /*
3  * Copyright 2007,2008 Free Software Foundation, Inc.
4  * 
5  * This file is part of GNU Radio
6  * 
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3, or (at your option)
10  * any later version.
11  * 
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  * 
17  * You should have received a copy of the GNU General Public License along
18  * with this program; if not, write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20  */
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25 #include <mblock/msg_queue.h>
26 #include <mblock/message.h>
27
28
29 mb_msg_queue::mb_msg_queue()
30   : d_not_empty(&d_mutex)
31 {
32 }
33
34 mb_msg_queue::~mb_msg_queue()
35 {
36 }
37
38 void
39 mb_msg_queue::insert(mb_message_sptr msg)
40 {
41   omni_mutex_lock       l(d_mutex);
42   
43   mb_pri_t q = mb_pri_clamp(msg->priority());
44
45   if (d_queue[q].empty_p()){
46     d_queue[q].tail = d_queue[q].head = msg;
47     msg->d_next.reset();        //msg->d_next = 0;
48   }
49   else {
50     d_queue[q].tail->d_next = msg;
51     d_queue[q].tail = msg;
52     msg->d_next.reset();        // msg->d_next = 0;
53   }
54
55   // FIXME set bit in bitmap
56
57   d_not_empty.signal();
58 }
59
60 /*
61  * Delete highest pri message from the queue and return it.
62  * Returns equivalent of zero pointer if queue is empty.
63  *
64  * Caller must be holding d_mutex
65  */
66 mb_message_sptr
67 mb_msg_queue::get_highest_pri_msg_helper()
68 {
69   // FIXME use bitmap and ffz to find best queue in O(1)
70
71   for (mb_pri_t q = 0; q <= MB_PRI_WORST; q++){
72
73     if (!d_queue[q].empty_p()){
74       mb_message_sptr msg = d_queue[q].head;
75       d_queue[q].head = msg->d_next;
76       if (d_queue[q].head == 0){
77         d_queue[q].tail.reset();        // d_queue[q].tail = 0;
78         // FIXME clear bit in bitmap
79       }
80
81       msg->d_next.reset();              // msg->d_next = 0;
82       return msg;
83     }
84   }
85
86   return mb_message_sptr();             // eqv to a zero pointer
87 }
88
89
90 mb_message_sptr
91 mb_msg_queue::get_highest_pri_msg_nowait()
92 {
93   omni_mutex_lock       l(d_mutex);
94
95   return get_highest_pri_msg_helper();
96 }
97
98 mb_message_sptr
99 mb_msg_queue::get_highest_pri_msg()
100 {
101   omni_mutex_lock l(d_mutex);
102
103   while (1){
104     mb_message_sptr msg = get_highest_pri_msg_helper();
105     if (msg)                    // Got one; return it
106       return msg;
107
108     d_not_empty.wait();         // Wait for something
109   }
110 }
111
112 mb_message_sptr
113 mb_msg_queue::get_highest_pri_msg_timedwait(const mb_time &abs_time)
114 {
115   unsigned long secs  = abs_time.d_secs;
116   unsigned long nsecs = abs_time.d_nsecs;
117
118   omni_mutex_lock l(d_mutex);
119
120   while (1){
121     mb_message_sptr msg = get_highest_pri_msg_helper();
122     if (msg)                    // Got one; return it
123       return msg;
124
125     if (!d_not_empty.timedwait(secs, nsecs))    // timed out
126       return mb_message_sptr();                 // eqv to zero pointer
127   }
128 }