3 * Copyright 2006 Free Software Foundation, Inc.
5 * This file is part of GNU Radio.
7 * GNU Radio is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 3, or (at your option)
12 * GNU Radio is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with GNU Radio; see the file COPYING. If not, write to
19 * the Free Software Foundation, Inc., 51 Franklin Street,
20 * Boston, MA 02110-1301, USA.
23 #ifndef _CIRCULAR_BUFFER_H_
24 #define _CIRCULAR_BUFFER_H_
26 #include "mld_threads.h"
31 template <class T> class circular_buffer
37 // the following are in Items (type T)
38 UInt32 d_bufLen_I, d_readNdx_I, d_writeNdx_I;
39 UInt32 d_n_avail_write_I, d_n_avail_read_I;
41 // stuff to control access to class internals
42 mld_mutex_ptr d_internal;
43 mld_condition_ptr d_readBlock, d_writeBlock;
45 // booleans to decide how to control reading, writing, and aborting
46 bool d_doWriteBlock, d_doFullRead, d_doAbort;
48 void delete_mutex_cond () {
64 circular_buffer (UInt32 bufLen_I,
65 bool doWriteBlock = true, bool doFullRead = false) {
67 throw std::runtime_error ("circular_buffer(): "
68 "Number of items to buffer must be > 0.\n");
69 d_bufLen_I = bufLen_I;
70 d_buffer = (T*) new T[d_bufLen_I];
71 d_doWriteBlock = doWriteBlock;
72 d_doFullRead = doFullRead;
74 d_readBlock = d_writeBlock = NULL;
77 fprintf (stderr, "c_b(): buf len (items) = %ld, "
78 "doWriteBlock = %s, doFullRead = %s\n", d_bufLen_I,
79 (d_doWriteBlock ? "true" : "false"),
80 (d_doFullRead ? "true" : "false"));
89 inline UInt32 n_avail_write_items () {
91 UInt32 retVal = d_n_avail_write_I;
92 d_internal->unlock ();
96 inline UInt32 n_avail_read_items () {
98 UInt32 retVal = d_n_avail_read_I;
99 d_internal->unlock ();
103 inline UInt32 buffer_length_items () {return (d_bufLen_I);};
104 inline bool do_write_block () {return (d_doWriteBlock);};
105 inline bool do_full_read () {return (d_doFullRead);};
109 bzero (d_buffer, d_bufLen_I * sizeof (T));
110 d_readNdx_I = d_writeNdx_I = d_n_avail_read_I = 0;
111 d_n_avail_write_I = d_bufLen_I;
112 delete_mutex_cond ();
113 d_internal = new mld_mutex ();
114 d_readBlock = new mld_condition ();
115 d_writeBlock = new mld_condition ();
119 * enqueue: add the given buffer of item-length to the queue,
120 * first-in-first-out (FIFO).
123 * buf: a pointer to the buffer holding the data
125 * bufLen_I: the buffer length in items (of the instantiated type)
128 * -1: on overflow (write is not blocking, and data is being
129 * written faster than it is being read)
130 * 0: if nothing to do (0 length buffer)
132 * 2: in the process of aborting, do doing nothing
134 * will throw runtime errors if inputs are improper:
135 * buffer pointer is NULL
136 * buffer length is larger than the instantiated buffer length
139 int enqueue (T* buf, UInt32 bufLen_I) {
141 fprintf (stderr, "enqueue: buf = %X, bufLen = %ld, #av_wr = %ld, "
142 "#av_rd = %ld.\n", (unsigned int)buf, bufLen_I,
143 d_n_avail_write_I, d_n_avail_read_I);
145 if (bufLen_I > d_bufLen_I) {
146 fprintf (stderr, "cannot add buffer longer (%ld"
147 ") than instantiated length (%ld"
148 ").\n", bufLen_I, d_bufLen_I);
149 throw std::runtime_error ("circular_buffer::enqueue()");
155 throw std::runtime_error ("circular_buffer::enqueue(): "
156 "input buffer is NULL.\n");
159 d_internal->unlock ();
162 // set the return value to 1: success; change if needed
164 if (bufLen_I > d_n_avail_write_I) {
165 if (d_doWriteBlock) {
166 while (bufLen_I > d_n_avail_write_I) {
168 fprintf (stderr, "enqueue: #len > #a, waiting.\n");
170 d_internal->unlock ();
171 d_writeBlock->wait ();
174 d_internal->unlock ();
176 fprintf (stderr, "enqueue: #len > #a, aborting.\n");
181 fprintf (stderr, "enqueue: #len > #a, done waiting.\n");
185 d_n_avail_read_I = d_bufLen_I - bufLen_I;
186 d_n_avail_write_I = bufLen_I;
188 fprintf (stderr, "circular_buffer::enqueue: overflow\n");
193 UInt32 n_now_I = d_bufLen_I - d_writeNdx_I, n_start_I = 0;
194 if (n_now_I > bufLen_I)
196 else if (n_now_I < bufLen_I)
197 n_start_I = bufLen_I - n_now_I;
198 bcopy (buf, &(d_buffer[d_writeNdx_I]), n_now_I * sizeof (T));
200 bcopy (&(buf[n_now_I]), d_buffer, n_start_I * sizeof (T));
201 d_writeNdx_I = n_start_I;
203 d_writeNdx_I += n_now_I;
204 d_n_avail_read_I += bufLen_I;
205 d_n_avail_write_I -= bufLen_I;
206 d_readBlock->signal ();
207 d_internal->unlock ();
212 * dequeue: removes from the queue the number of items requested, or
213 * available, into the given buffer on a FIFO basis.
216 * buf: a pointer to the buffer into which to copy the data
218 * bufLen_I: pointer to the number of items to remove in items
219 * (of the instantiated type)
222 * 0: if nothing to do (0 length buffer)
224 * 2: in the process of aborting, do doing nothing
226 * will throw runtime errors if inputs are improper:
227 * buffer pointer is NULL
228 * buffer length pointer is NULL
229 * buffer length is larger than the instantiated buffer length
232 int dequeue (T* buf, UInt32* bufLen_I) {
234 fprintf (stderr, "dequeue: buf = %X, *bufLen = %ld, #av_wr = %ld, "
235 "#av_rd = %ld.\n", (unsigned int)buf, *bufLen_I,
236 d_n_avail_write_I, d_n_avail_read_I);
239 throw std::runtime_error ("circular_buffer::dequeue(): "
240 "input bufLen pointer is NULL.\n");
242 throw std::runtime_error ("circular_buffer::dequeue(): "
243 "input buffer pointer is NULL.\n");
244 UInt32 l_bufLen_I = *bufLen_I;
247 if (l_bufLen_I > d_bufLen_I) {
248 fprintf (stderr, "cannot remove buffer longer (%ld"
249 ") than instantiated length (%ld"
250 ").\n", l_bufLen_I, d_bufLen_I);
251 throw std::runtime_error ("circular_buffer::dequeue()");
256 d_internal->unlock ();
260 while (d_n_avail_read_I < l_bufLen_I) {
262 fprintf (stderr, "dequeue: #a < #len, waiting.\n");
264 d_internal->unlock ();
265 d_readBlock->wait ();
268 d_internal->unlock ();
270 fprintf (stderr, "dequeue: #a < #len, aborting.\n");
275 fprintf (stderr, "dequeue: #a < #len, done waiting.\n");
279 while (d_n_avail_read_I == 0) {
281 fprintf (stderr, "dequeue: #a == 0, waiting.\n");
283 d_internal->unlock ();
284 d_readBlock->wait ();
287 d_internal->unlock ();
289 fprintf (stderr, "dequeue: #a == 0, aborting.\n");
294 fprintf (stderr, "dequeue: #a == 0, done waiting.\n");
298 if (l_bufLen_I > d_n_avail_read_I)
299 l_bufLen_I = d_n_avail_read_I;
300 UInt32 n_now_I = d_bufLen_I - d_readNdx_I, n_start_I = 0;
301 if (n_now_I > l_bufLen_I)
302 n_now_I = l_bufLen_I;
303 else if (n_now_I < l_bufLen_I)
304 n_start_I = l_bufLen_I - n_now_I;
305 bcopy (&(d_buffer[d_readNdx_I]), buf, n_now_I * sizeof (T));
307 bcopy (d_buffer, &(buf[n_now_I]), n_start_I * sizeof (T));
308 d_readNdx_I = n_start_I;
310 d_readNdx_I += n_now_I;
311 *bufLen_I = l_bufLen_I;
312 d_n_avail_read_I -= l_bufLen_I;
313 d_n_avail_write_I += l_bufLen_I;
314 d_writeBlock->signal ();
315 d_internal->unlock ();
322 d_writeBlock->signal ();
323 d_readBlock->signal ();
324 d_internal->unlock ();
328 #endif /* _CIRCULAR_BUFFER_H_ */