Imported Upstream version 3.0
[debian/gnuradio] / gnuradio-core / src / lib / runtime / gr_msg_queue.cc
1 /* -*- c++ -*- */
2 /*
3  * Copyright 2005 Free Software Foundation, Inc.
4  * 
5  * This file is part of GNU Radio
6  * 
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 2, or (at your option)
10  * any later version.
11  * 
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  * 
17  * You should have received a copy of the GNU General Public License
18  * along with GNU Radio; see the file COPYING.  If not, write to
19  * the Free Software Foundation, Inc., 51 Franklin Street,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26 #include <gr_msg_queue.h>
27 #include <stdexcept>
28
29
30 gr_msg_queue_sptr
31 gr_make_msg_queue(unsigned int limit)
32 {
33   return gr_msg_queue_sptr (new gr_msg_queue(limit));
34 }
35
36
37 gr_msg_queue::gr_msg_queue(unsigned int limit)
38   : d_not_empty(&d_mutex), d_not_full(&d_mutex),
39     /*d_head(0), d_tail(0),*/ d_count(0), d_limit(limit)
40 {
41 }
42
43 gr_msg_queue::~gr_msg_queue()
44 {
45   flush ();
46 }
47
48 void
49 gr_msg_queue::insert_tail(gr_message_sptr msg)
50 {
51   if (msg->d_next)
52     throw std::invalid_argument("gr_msg_queue::insert_tail: msg already in queue");
53
54   omni_mutex_lock       l(d_mutex);
55
56   while (full_p())
57     d_not_full.wait();
58
59   if (d_tail == 0){
60     d_tail = d_head = msg;
61     //msg->d_next = 0;
62     msg->d_next.reset();
63   }
64   else {
65     d_tail->d_next = msg;
66     d_tail = msg;
67     //msg->d_next = 0;
68     msg->d_next.reset();
69   }
70   d_count++;
71   d_not_empty.signal();
72 }
73
74 gr_message_sptr
75 gr_msg_queue::delete_head()
76 {
77   omni_mutex_lock       l(d_mutex);
78   gr_message_sptr       m;
79
80   while ((m = d_head) == 0)
81     d_not_empty.wait();
82
83   d_head = m->d_next;
84   if (d_head == 0){
85     //d_tail = 0;
86     d_tail.reset();
87   }
88
89   d_count--;
90   // m->d_next = 0;
91   m->d_next.reset();
92   d_not_full.signal();
93   return m;
94 }
95
96 gr_message_sptr
97 gr_msg_queue::delete_head_nowait()
98 {
99   omni_mutex_lock       l(d_mutex);
100   gr_message_sptr       m;
101
102   if ((m = d_head) == 0){
103     //return 0;
104     return gr_message_sptr();
105   }
106
107   d_head = m->d_next;
108   if (d_head == 0){
109     //d_tail = 0;
110     d_tail.reset();
111   }
112
113   d_count--;
114   //m->d_next = 0;
115   m->d_next.reset();
116   d_not_full.signal();
117   return m;
118 }
119
120 void
121 gr_msg_queue::flush()
122 {
123   gr_message_sptr       m;
124
125   while ((m = delete_head_nowait ()) != 0)
126     ;
127 }