3 * Copyright 2006,2009 Free Software Foundation, Inc.
5 * This file is part of GNU Radio.
7 * GNU Radio is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 3, or (at your option)
12 * GNU Radio is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with GNU Radio; see the file COPYING. If not, write to
19 * the Free Software Foundation, Inc., 51 Franklin Street,
20 * Boston, MA 02110-1301, USA.
// Include guard for this header.
23 #ifndef _CIRCULAR_BUFFER_H_
24 #define _CIRCULAR_BUFFER_H_
// Project header; the mld_mutex / mld_condition types used below are
// presumably declared there -- confirm.
26 #include "mld_threads.h"
// DEBUG(X): two alternative definitions follow. The first places X inside a
// do/while(0) wrapper so it executes; the second discards X entirely.
// NOTE(review): the preprocessor conditional that selects between the two is
// not visible in this listing -- confirm against the full file.
// NOTE(review): both definitions end in ';', and the call sites visible in
// this file pass an expression with no trailing ';' inside the parentheses,
// so the first form would expand to "do{expr} while(0);" -- confirm that it
// compiles when debugging output is enabled.
35 #define DEBUG(X) do{X} while(0);
37 #define DEBUG(X) do{} while(0);
// circular_buffer<T>: a ring of items of type T with separate read and write
// indices and running counts of how many items may be read or written.
// Access to that state goes through an internal mutex and two conditions
// (one for readers, one for writers).
// NOTE(review): the declaration of the storage pointer d_buffer used by the
// methods below is not visible in this listing.
40 template <class T> class circular_buffer
46 // the following are in Items (type T)
// capacity of the ring, next index to read from, next index to write to
47 size_t d_bufLen_I, d_readNdx_I, d_writeNdx_I;
// number of items that can currently be written / read
48 size_t d_n_avail_write_I, d_n_avail_read_I;
50 // stuff to control access to class internals
// mutex released by the accessors, enqueue and dequeue when they finish
51 mld_mutex_ptr d_internal;
// conditions that dequeue (d_readBlock) and enqueue (d_writeBlock) wait on
52 mld_condition_ptr d_readBlock, d_writeBlock;
54 // booleans to decide how to control reading, writing, and aborting
// d_doWriteBlock: enqueue waits for space instead of overflowing.
// d_doFullRead: set from the constructor argument of the same name.
// d_doAbort: no assignment or read of this flag is visible in this listing;
// presumably it is what makes blocked callers give up -- confirm.
55 bool d_doWriteBlock, d_doFullRead, d_doAbort;
// Private helper taking no arguments. It is called by the reset code just
// before new mutex and condition objects are allocated.
// NOTE(review): the body is not visible in this listing; presumably it
// deletes d_internal, d_readBlock and d_writeBlock -- confirm.
57 void delete_mutex_cond () {
// Constructor.
//   bufLen_I:     capacity of the buffer, in items of type T.
//   doWriteBlock: stored in d_doWriteBlock (default true).
//   doFullRead:   stored in d_doFullRead (default false).
// Throws std::runtime_error with the message below; the guarding condition
// is not visible in this listing, presumably bufLen_I == 0 -- confirm.
73 circular_buffer (size_t bufLen_I,
74 bool doWriteBlock = true, bool doFullRead = false) {
76 throw std::runtime_error ("circular_buffer(): "
77 "Number of items to buffer must be > 0.\n");
78 d_bufLen_I = bufLen_I;
// allocate storage for exactly d_bufLen_I items
79 d_buffer = (T*) new T[d_bufLen_I];
80 d_doWriteBlock = doWriteBlock;
81 d_doFullRead = doFullRead;
// start with no condition objects
83 d_readBlock = d_writeBlock = NULL;
// NOTE(review): the statement below streams "d_bufLen_", which is not a
// member declared in the visible code (the member is d_bufLen_I), and the end
// of the statement is not visible in this listing -- confirm against the
// full file.
85 DEBUG (std::cerr << "c_b(): buf len (items) = " << d_bufLen_
86 << ", doWriteBlock = " << (d_doWriteBlock ? "true" : "false")
87 << ", doFullRead = " << (d_doFullRead ? "true" : "false")
// Reports how many items can currently be written.
// The count is copied into a local before the internal mutex is released.
// NOTE(review): the matching lock() call and the return statement are not
// visible in this listing; presumably retVal is returned -- confirm.
96 inline size_t n_avail_write_items () {
98 size_t retVal = d_n_avail_write_I;
99 d_internal->unlock ();
// Reports how many items can currently be read.
// The count is copied into a local before the internal mutex is released.
// NOTE(review): the matching lock() call and the return statement are not
// visible in this listing; presumably retVal is returned -- confirm.
103 inline size_t n_avail_read_items () {
105 size_t retVal = d_n_avail_read_I;
106 d_internal->unlock ();
// Simple accessors for values that the visible constructor code sets.
// None of them touches the internal mutex.
// Returns the buffer capacity in items.
110 inline size_t buffer_length_items () {return (d_bufLen_I);};
// Returns whether enqueue waits for space rather than overflowing.
111 inline bool do_write_block () {return (d_doWriteBlock);};
// Returns the doFullRead flag.
112 inline bool do_full_read () {return (d_doFullRead);};
// Puts the buffer back into its empty state and recreates the
// synchronization objects.
// NOTE(review): the enclosing function header is not visible in this
// listing; presumably this is the body of a reset routine -- confirm.
// Zero the whole storage area byte-wise.
// NOTE(review): byte-wise zeroing assumes an all-zero bit pattern is a valid
// T -- confirm for every instantiated type.
116 bzero (d_buffer, d_bufLen_I * sizeof (T));
// empty buffer: both indices at 0, nothing to read, full capacity writable
117 d_readNdx_I = d_writeNdx_I = d_n_avail_read_I = 0;
118 d_n_avail_write_I = d_bufLen_I;
// call the helper before allocating fresh objects
119 delete_mutex_cond ();
120 // create a mutex to handle contention of shared resources;
121 // any routine needing access to shared resources uses lock()
122 // before doing anything, then unlock() when finished.
123 d_internal = new mld_mutex ();
124 // link the internal mutex to the read and write conditions;
125 // when wait() is called, the internal mutex will automatically
126 // be unlock()'ed. Upon return (from a signal() to the condition),
127 // the internal mutex will be lock()'ed.
128 d_readBlock = new mld_condition (d_internal);
129 d_writeBlock = new mld_condition (d_internal);
133 * enqueue: add the given buffer of item-length to the queue,
134 * first-in-first-out (FIFO).
137 * buf: a pointer to the buffer holding the data
139 * bufLen_I: the buffer length in items (of the instantiated type)
142 * -1: on overflow (write is not blocking, and data is being
143 * written faster than it is being read)
144 * 0: if nothing to do (0 length buffer)
146 * 2: in the process of aborting, so doing nothing
148 * will throw runtime errors if inputs are improper:
149 * buffer pointer is NULL
150 * buffer length is larger than the instantiated buffer length
// enqueue: copy bufLen_I items from buf into the ring at the write index,
// wrapping to the start of the storage when the end is reached, then signal
// the read condition and release the internal mutex.
//   buf:      source items.
//   bufLen_I: number of items to add; must not exceed d_bufLen_I.
// Throws std::runtime_error when bufLen_I > d_bufLen_I, and with an
// "input buffer is NULL" message (that guard condition is not visible here).
// NOTE(review): several lines of this function (guard conditions, lock(),
// return statements, closing braces) are not visible in this listing; the
// comments below describe only the lines that are shown.
153 int enqueue (T* buf, size_t bufLen_I) {
154 DEBUG (std::cerr << "enqueue: buf = " << (void*) buf
155 << ", bufLen = " << bufLen_I
156 << ", #av_wr = " << d_n_avail_write_I
157 << ", #av_rd = " << d_n_avail_read_I << std::endl);
// a request larger than the whole ring can never be satisfied
158 if (bufLen_I > d_bufLen_I) {
159 std::cerr << "ERROR: cannot add buffer longer ("
160 << bufLen_I << ") than instantiated length ("
161 << d_bufLen_I << ")." << std::endl;
162 throw std::runtime_error ("circular_buffer::enqueue()");
168 throw std::runtime_error ("circular_buffer::enqueue(): "
169 "input buffer is NULL.\n");
// NOTE(review): this unlock presumably belongs to an early-exit path whose
// condition is not visible here -- confirm.
172 d_internal->unlock ();
175 // set the return value to 1: success; change if needed
// not enough free space for the whole request
177 if (bufLen_I > d_n_avail_write_I) {
// blocking mode: wait until a reader has freed enough space
178 if (d_doWriteBlock) {
179 while (bufLen_I > d_n_avail_write_I) {
180 DEBUG (std::cerr << "enqueue: #len > #a, waiting." << std::endl);
181 // wait will automatically unlock() the internal mutex
182 d_writeBlock->wait ();
183 // and lock() it here.
// NOTE(review): the condition guarding this abort path is not visible here.
185 d_internal->unlock ();
186 DEBUG (std::cerr << "enqueue: #len > #a, aborting." << std::endl);
189 DEBUG (std::cerr << "enqueue: #len > #a, done waiting." << std::endl);
// non-blocking overflow: preset the counts so that the += / -= adjustments
// further down leave d_n_avail_read_I == d_bufLen_I and
// d_n_avail_write_I == 0.
// NOTE(review): d_readNdx_I is not modified in the visible overflow lines
// even though unread items are overwritten -- confirm this is intended.
192 d_n_avail_read_I = d_bufLen_I - bufLen_I;
193 d_n_avail_write_I = bufLen_I;
194 DEBUG (std::cerr << "circular_buffer::enqueue: overflow" << std::endl);
// n_now_I: items that fit between the write index and the end of storage;
// n_start_I: remaining items, which wrap around to index 0.
198 size_t n_now_I = d_bufLen_I - d_writeNdx_I, n_start_I = 0;
199 if (n_now_I > bufLen_I)
201 else if (n_now_I < bufLen_I)
202 n_start_I = bufLen_I - n_now_I;
// first segment: from the write index towards the end of storage
203 bcopy (buf, &(d_buffer[d_writeNdx_I]), n_now_I * sizeof (T));
// second segment: the wrapped remainder, written at the start of storage
205 bcopy (&(buf[n_now_I]), d_buffer, n_start_I * sizeof (T));
206 d_writeNdx_I = n_start_I;
208 d_writeNdx_I += n_now_I;
// account for the newly written items
209 d_n_avail_read_I += bufLen_I;
210 d_n_avail_write_I -= bufLen_I;
// signal the read condition that dequeue waits on
211 d_readBlock->signal ();
212 d_internal->unlock ();
217 * dequeue: removes from the queue the number of items requested, or
218 * available, into the given buffer on a FIFO basis.
221 * buf: a pointer to the buffer into which to copy the data
223 * bufLen_I: pointer to the number of items to remove in items
224 * (of the instantiated type)
227 * 0: if nothing to do (0 length buffer)
229 * 2: in the process of aborting, so doing nothing
231 * will throw runtime errors if inputs are improper:
232 * buffer pointer is NULL
233 * buffer length pointer is NULL
234 * buffer length is larger than the instantiated buffer length
// dequeue: copy up to *bufLen_I items from the ring, starting at the read
// index and wrapping to the start of the storage when the end is reached,
// into buf; then signal the write condition and release the internal mutex.
//   buf:      destination for the items.
//   bufLen_I: in: number of items requested (must not exceed d_bufLen_I);
//             out: number of items actually copied.
// Throws std::runtime_error for a NULL length pointer, a NULL buffer pointer
// (guard conditions not visible here), or a request larger than d_bufLen_I.
// NOTE(review): several lines of this function (guard conditions, lock(),
// return statements, closing braces) are not visible in this listing; the
// comments below describe only the lines that are shown.
// NOTE(review): the DEBUG statement dereferences bufLen_I before the
// NULL-pointer check that follows it -- confirm this is acceptable when
// debugging output is enabled.
237 int dequeue (T* buf, size_t* bufLen_I) {
238 DEBUG (std::cerr << "dequeue: buf = " << ((void*) buf)
239 << ", *bufLen = " << (*bufLen_I)
240 << ", #av_wr = " << d_n_avail_write_I
241 << ", #av_rd = " << d_n_avail_read_I << std::endl);
243 throw std::runtime_error ("circular_buffer::dequeue(): "
244 "input bufLen pointer is NULL.\n");
246 throw std::runtime_error ("circular_buffer::dequeue(): "
247 "input buffer pointer is NULL.\n");
// work on a local copy of the requested count
248 size_t l_bufLen_I = *bufLen_I;
// a request larger than the whole ring can never be satisfied
251 if (l_bufLen_I > d_bufLen_I) {
252 std::cerr << "ERROR: cannot remove buffer longer ("
253 << l_bufLen_I << ") than instantiated length ("
254 << d_bufLen_I << ")." << std::endl;
255 throw std::runtime_error ("circular_buffer::dequeue()");
// NOTE(review): this unlock presumably belongs to an early-exit path whose
// condition is not visible here -- confirm.
260 d_internal->unlock ();
// wait until the full requested count is available
// NOTE(review): presumably this loop is selected by d_doFullRead and the
// "== 0" loop below is the alternative -- the selecting condition is not
// visible here.
264 while (d_n_avail_read_I < l_bufLen_I) {
265 DEBUG (std::cerr << "dequeue: #a < #len, waiting." << std::endl);
266 // wait will automatically unlock() the internal mutex
267 d_readBlock->wait ();
268 // and lock() it here.
270 d_internal->unlock ();
271 DEBUG (std::cerr << "dequeue: #a < #len, aborting." << std::endl);
274 DEBUG (std::cerr << "dequeue: #a < #len, done waiting." << std::endl);
// wait only until at least one item is available
277 while (d_n_avail_read_I == 0) {
278 DEBUG (std::cerr << "dequeue: #a == 0, waiting." << std::endl);
279 // wait will automatically unlock() the internal mutex
280 d_readBlock->wait ();
281 // and lock() it here.
283 d_internal->unlock ();
284 DEBUG (std::cerr << "dequeue: #a == 0, aborting." << std::endl);
287 DEBUG (std::cerr << "dequeue: #a == 0, done waiting." << std::endl);
// never hand out more than is currently readable
290 if (l_bufLen_I > d_n_avail_read_I)
291 l_bufLen_I = d_n_avail_read_I;
// n_now_I: items available between the read index and the end of storage;
// n_start_I: remaining items, which wrap around to index 0.
292 size_t n_now_I = d_bufLen_I - d_readNdx_I, n_start_I = 0;
293 if (n_now_I > l_bufLen_I)
294 n_now_I = l_bufLen_I;
295 else if (n_now_I < l_bufLen_I)
296 n_start_I = l_bufLen_I - n_now_I;
// first segment: from the read index towards the end of storage
297 bcopy (&(d_buffer[d_readNdx_I]), buf, n_now_I * sizeof (T));
// second segment: the wrapped remainder, read from the start of storage
299 bcopy (d_buffer, &(buf[n_now_I]), n_start_I * sizeof (T));
300 d_readNdx_I = n_start_I;
302 d_readNdx_I += n_now_I;
// report how many items were actually copied
303 *bufLen_I = l_bufLen_I;
// account for the items just removed
304 d_n_avail_read_I -= l_bufLen_I;
305 d_n_avail_write_I += l_bufLen_I;
// signal the write condition that enqueue waits on
306 d_writeBlock->signal ();
307 d_internal->unlock ();
// Signal both conditions, then release the internal mutex.
// NOTE(review): the enclosing function header and any preceding statements
// are not visible in this listing; presumably this is the tail of an abort
// routine that wakes blocked enqueue/dequeue callers -- confirm.
314 d_writeBlock->signal ();
315 d_readBlock->signal ();
316 d_internal->unlock ();
320 #endif /* _CIRCULAR_BUFFER_H_ */