Merge branch 'vrt' of http://gnuradio.org/git/jblum
[debian/gnuradio] / gr-audio-osx / src / circular_buffer.h
1 /* -*- c++ -*- */
2 /*
3  * Copyright 2006,2009 Free Software Foundation, Inc.
4  * 
5  * This file is part of GNU Radio.
6  *
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3, or (at your option)
10  * any later version.
11  * 
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  * 
17  * You should have received a copy of the GNU General Public License
18  * along with GNU Radio; see the file COPYING.  If not, write to
19  * the Free Software Foundation, Inc., 51 Franklin Street,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 #ifndef _CIRCULAR_BUFFER_H_
24 #define _CIRCULAR_BUFFER_H_
25
26 #include "mld_threads.h"
27 #include <iostream>
28 #include <stdexcept>
29
30 #ifndef DO_DEBUG
31 #define DO_DEBUG 0
32 #endif
33
34 #if DO_DEBUG
35 #define DEBUG(X) do{X} while(0);
36 #else
37 #define DEBUG(X) do{} while(0);
38 #endif
39
40 template <class T> class circular_buffer
41 {
42 private:
43 // the buffer to use
44   T* d_buffer;
45
46 // the following are in Items (type T)
47   size_t d_bufLen_I, d_readNdx_I, d_writeNdx_I;
48   size_t d_n_avail_write_I, d_n_avail_read_I;
49
50 // stuff to control access to class internals
51   mld_mutex_ptr d_internal;
52   mld_condition_ptr d_readBlock, d_writeBlock;
53
54 // booleans to decide how to control reading, writing, and aborting
55   bool d_doWriteBlock, d_doFullRead, d_doAbort;
56
57   void delete_mutex_cond () {
58     if (d_internal) {
59       delete d_internal;
60       d_internal = NULL;
61     }
62     if (d_readBlock) {
63       delete d_readBlock;
64       d_readBlock = NULL;
65     }
66     if (d_writeBlock) {
67       delete d_writeBlock;
68       d_writeBlock = NULL;
69     }
70   };
71
72 public:
73   circular_buffer (size_t bufLen_I,
74                    bool doWriteBlock = true, bool doFullRead = false) {
75     if (bufLen_I == 0)
76       throw std::runtime_error ("circular_buffer(): "
77                                 "Number of items to buffer must be > 0.\n");
78     d_bufLen_I = bufLen_I;
79     d_buffer = (T*) new T[d_bufLen_I];
80     d_doWriteBlock = doWriteBlock;
81     d_doFullRead = doFullRead;
82     d_internal = NULL;
83     d_readBlock = d_writeBlock = NULL;
84     reset ();
85     DEBUG (std::cerr << "c_b(): buf len (items) = " << d_bufLen_
86            << ", doWriteBlock = " << (d_doWriteBlock ? "true" : "false")
87            << ", doFullRead = " << (d_doFullRead ? "true" : "false")
88            << std::endl);
89   };
90
91   ~circular_buffer () {
92     delete_mutex_cond ();
93     delete [] d_buffer;
94   };
95
96   inline size_t n_avail_write_items () {
97     d_internal->lock ();
98     size_t retVal = d_n_avail_write_I;
99     d_internal->unlock ();
100     return (retVal);
101   };
102
103   inline size_t n_avail_read_items () {
104     d_internal->lock ();
105     size_t retVal = d_n_avail_read_I;
106     d_internal->unlock ();
107     return (retVal);
108   };
109
110   inline size_t buffer_length_items () {return (d_bufLen_I);};
111   inline bool do_write_block () {return (d_doWriteBlock);};
112   inline bool do_full_read () {return (d_doFullRead);};
113
114   void reset () {
115     d_doAbort = false;
116     bzero (d_buffer, d_bufLen_I * sizeof (T));
117     d_readNdx_I = d_writeNdx_I = d_n_avail_read_I = 0;
118     d_n_avail_write_I = d_bufLen_I;
119     delete_mutex_cond ();
120     // create a mutex to handle contention of shared resources;
121     // any routine needed access to shared resources uses lock()
122     // before doing anything, then unlock() when finished.
123     d_internal = new mld_mutex ();
124     // link the internal mutex to the read and write conditions;
125     // when wait() is called, the internal mutex will automatically
126     // be unlock()'ed.  Upon return (from a signal() to the condition),
127     // the internal mutex will be lock()'ed.
128     d_readBlock = new mld_condition (d_internal);
129     d_writeBlock = new mld_condition (d_internal);
130   };
131
132 /*
133  * enqueue: add the given buffer of item-length to the queue,
134  *     first-in-first-out (FIFO).
135  *
136  * inputs:
137  *     buf: a pointer to the buffer holding the data
138  *
139  *     bufLen_I: the buffer length in items (of the instantiated type)
140  *
141  * returns:
142  *    -1: on overflow (write is not blocking, and data is being
143  *                     written faster than it is being read)
144  *     0: if nothing to do (0 length buffer)
145  *     1: if success
146  *     2: in the process of aborting, do doing nothing
147  *
148  * will throw runtime errors if inputs are improper:
149  *     buffer pointer is NULL
150  *     buffer length is larger than the instantiated buffer length
151  */
152
153   int enqueue (T* buf, size_t bufLen_I) {
154     DEBUG (std::cerr << "enqueue: buf = " << (void*) buf
155            << ", bufLen = " << bufLen_I
156            << ", #av_wr = " << d_n_avail_write_I
157            << ", #av_rd = " << d_n_avail_read_I << std::endl);
158     if (bufLen_I > d_bufLen_I) {
159       std::cerr << "ERROR: cannot add buffer longer ("
160                 << bufLen_I << ") than instantiated length ("
161                 << d_bufLen_I << ")." << std::endl;
162       throw std::runtime_error ("circular_buffer::enqueue()");
163     }
164
165     if (bufLen_I == 0)
166       return (0);
167     if (!buf)
168       throw std::runtime_error ("circular_buffer::enqueue(): "
169                                 "input buffer is NULL.\n");
170     d_internal->lock ();
171     if (d_doAbort) {
172       d_internal->unlock ();
173       return (2);
174     }
175     // set the return value to 1: success; change if needed
176     int retval = 1;
177     if (bufLen_I > d_n_avail_write_I) {
178       if (d_doWriteBlock) {
179         while (bufLen_I > d_n_avail_write_I) {
180           DEBUG (std::cerr << "enqueue: #len > #a, waiting." << std::endl);
181           // wait will automatically unlock() the internal mutex
182           d_writeBlock->wait ();
183           // and lock() it here.
184           if (d_doAbort) {
185             d_internal->unlock ();
186             DEBUG (std::cerr << "enqueue: #len > #a, aborting." << std::endl);
187             return (2);
188           }
189           DEBUG (std::cerr << "enqueue: #len > #a, done waiting." << std::endl);
190         }
191       } else {
192         d_n_avail_read_I = d_bufLen_I - bufLen_I;
193         d_n_avail_write_I = bufLen_I;
194         DEBUG (std::cerr << "circular_buffer::enqueue: overflow" << std::endl);
195         retval = -1;
196       }
197     }
198     size_t n_now_I = d_bufLen_I - d_writeNdx_I, n_start_I = 0;
199     if (n_now_I > bufLen_I)
200       n_now_I = bufLen_I;
201     else if (n_now_I < bufLen_I)
202       n_start_I = bufLen_I - n_now_I;
203     bcopy (buf, &(d_buffer[d_writeNdx_I]), n_now_I * sizeof (T));
204     if (n_start_I) {
205       bcopy (&(buf[n_now_I]), d_buffer, n_start_I * sizeof (T));
206       d_writeNdx_I = n_start_I;
207     } else
208       d_writeNdx_I += n_now_I;
209     d_n_avail_read_I += bufLen_I;
210     d_n_avail_write_I -= bufLen_I;
211     d_readBlock->signal ();
212     d_internal->unlock ();
213     return (retval);
214   };
215
216 /*
217  * dequeue: removes from the queue the number of items requested, or
218  *     available, into the given buffer on a FIFO basis.
219  *
220  * inputs:
221  *     buf: a pointer to the buffer into which to copy the data
222  *
223  *     bufLen_I: pointer to the number of items to remove in items
224  *         (of the instantiated type)
225  *
226  * returns:
227  *     0: if nothing to do (0 length buffer)
228  *     1: if success
229  *     2: in the process of aborting, do doing nothing
230  *
231  * will throw runtime errors if inputs are improper:
232  *     buffer pointer is NULL
233  *     buffer length pointer is NULL
234  *     buffer length is larger than the instantiated buffer length
235  */
236
237   int dequeue (T* buf, size_t* bufLen_I) {
238     DEBUG (std::cerr << "dequeue: buf = " << ((void*) buf)
239            << ", *bufLen = " << (*bufLen_I)
240            << ", #av_wr = " <<  d_n_avail_write_I
241            << ", #av_rd = " << d_n_avail_read_I << std::endl);
242     if (!bufLen_I)
243       throw std::runtime_error ("circular_buffer::dequeue(): "
244                                 "input bufLen pointer is NULL.\n");
245     if (!buf)
246       throw std::runtime_error ("circular_buffer::dequeue(): "
247                                 "input buffer pointer is NULL.\n");
248     size_t l_bufLen_I = *bufLen_I;
249     if (l_bufLen_I == 0)
250       return (0);
251     if (l_bufLen_I > d_bufLen_I) {
252       std::cerr << "ERROR: cannot remove buffer longer ("
253                 << l_bufLen_I << ") than instantiated length ("
254                 << d_bufLen_I << ")." << std::endl;
255       throw std::runtime_error ("circular_buffer::dequeue()");
256     }
257
258     d_internal->lock ();
259     if (d_doAbort) {
260       d_internal->unlock ();
261       return (2);
262     }
263     if (d_doFullRead) {
264       while (d_n_avail_read_I < l_bufLen_I) {
265         DEBUG (std::cerr << "dequeue: #a < #len, waiting." << std::endl);
266         // wait will automatically unlock() the internal mutex
267         d_readBlock->wait ();
268         // and lock() it here.
269         if (d_doAbort) {
270           d_internal->unlock ();
271           DEBUG (std::cerr << "dequeue: #a < #len, aborting." << std::endl);
272           return (2);
273         }
274         DEBUG (std::cerr << "dequeue: #a < #len, done waiting." << std::endl);
275      }
276     } else {
277       while (d_n_avail_read_I == 0) {
278         DEBUG (std::cerr << "dequeue: #a == 0, waiting." << std::endl);
279         // wait will automatically unlock() the internal mutex
280         d_readBlock->wait ();
281         // and lock() it here.
282         if (d_doAbort) {
283           d_internal->unlock ();
284           DEBUG (std::cerr << "dequeue: #a == 0, aborting." << std::endl);
285           return (2);
286         }
287         DEBUG (std::cerr << "dequeue: #a == 0, done waiting." << std::endl);
288       }
289     }
290     if (l_bufLen_I > d_n_avail_read_I)
291       l_bufLen_I = d_n_avail_read_I;
292     size_t n_now_I = d_bufLen_I - d_readNdx_I, n_start_I = 0;
293     if (n_now_I > l_bufLen_I)
294       n_now_I = l_bufLen_I;
295     else if (n_now_I < l_bufLen_I)
296       n_start_I = l_bufLen_I - n_now_I;
297     bcopy (&(d_buffer[d_readNdx_I]), buf, n_now_I * sizeof (T));
298     if (n_start_I) {
299       bcopy (d_buffer, &(buf[n_now_I]), n_start_I * sizeof (T));
300       d_readNdx_I = n_start_I;
301     } else
302       d_readNdx_I += n_now_I;
303     *bufLen_I = l_bufLen_I;
304     d_n_avail_read_I -= l_bufLen_I;
305     d_n_avail_write_I += l_bufLen_I;
306     d_writeBlock->signal ();
307     d_internal->unlock ();
308     return (1);
309   };
310
311   void abort () {
312     d_internal->lock ();
313     d_doAbort = true;
314     d_writeBlock->signal ();
315     d_readBlock->signal ();
316     d_internal->unlock ();
317   };
318 };
319
320 #endif /* _CIRCULAR_BUFFER_H_ */