Merged r9481:9518 on jblum/grc_reorganize into trunk. Reorganized grc source under...
[debian/gnuradio] / grc / src / grc_gnuradio / blks2 / queue.py
1 # Copyright 2008 Free Software Foundation, Inc.
2 #
3 # This file is part of GNU Radio
4 #
5 # GNU Radio is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3, or (at your option)
8 # any later version.
9 #
10 # GNU Radio is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with GNU Radio; see the file COPYING.  If not, write to
17 # the Free Software Foundation, Inc., 51 Franklin Street,
18 # Boston, MA 02110-1301, USA.
19 #
20
21 from gnuradio import gr
22 import gnuradio.gr.gr_threading as _threading
23 import numpy
24
25 #######################################################################################
26 ## Queue Sink Thread
27 #######################################################################################
28 class queue_sink_thread(_threading.Thread):
29         """
30         Read samples from the queue sink and execute the callback.
31         """
32
33         def __init__(self, queue_sink, callback):
34                 """
35                 Queue sink thread contructor.
36                 @param queue_sink the queue to pop messages from
37                 @param callback the function of one argument
38                 """
39                 self._queue_sink = queue_sink
40                 self._callback = callback
41                 _threading.Thread.__init__(self)
42                 self.setDaemon(1)
43                 self.keep_running = True
44                 self.start()
45
46         def run(self):
47                 while self.keep_running:
48                         self._callback(self._queue_sink.pop())
49
50 #######################################################################################
51 ## Queue Sink
52 #######################################################################################
53 class _queue_sink_base(gr.hier_block2):
54         """
55         Queue sink base, a queue sink for any size queue.
56         Easy read access to a gnuradio data stream from python.
57         Call pop to read a sample from a gnuradio data stream.
58         Samples are cast as python data types, complex, float, or int.
59         """
60
61         def __init__(self, vlen=1):
62                 """
63                 Queue sink base contructor.
64                 @param vlen the vector length
65                 """
66                 self._vlen = vlen
67                 #initialize hier2
68                 gr.hier_block2.__init__(
69                         self,
70                         "queue_sink",
71                         gr.io_signature(1, 1, self._item_size*self._vlen), # Input signature
72                         gr.io_signature(0, 0, 0) # Output signature
73                 )
74                 #create message sink
75                 self._msgq = gr.msg_queue(1)
76                 message_sink = gr.message_sink(self._item_size*self._vlen, self._msgq, False) #False -> blocking
77                 #connect
78                 self.connect(self, message_sink)
79                 self.arr = ''
80
81         def pop(self):
82                 """
83                 Pop a new sample off the front of the queue.
84                 @return a new sample
85                 """
86                 while len(self.arr) < self._item_size*self._vlen:
87                         msg = self._msgq.delete_head()
88                         self.arr = self.arr + msg.to_string()
89                 sample = self.arr[:self._item_size*self._vlen]
90                 self.arr = self.arr[self._item_size*self._vlen:]
91                 sample = map(self._cast, numpy.fromstring(sample, self._numpy))
92                 if self._vlen == 1: return sample[0]
93                 return sample
94
95 class queue_sink_c(_queue_sink_base):
96         _item_size = gr.sizeof_gr_complex
97         _numpy = numpy.complex64
98         def _cast(self, arg): return complex(arg.real, arg.imag)
99
100 class queue_sink_f(_queue_sink_base):
101         _item_size = gr.sizeof_float
102         _numpy = numpy.float32
103         _cast = float
104
105 class queue_sink_i(_queue_sink_base):
106         _item_size = gr.sizeof_int
107         _numpy = numpy.int32
108         _cast = int
109
110 class queue_sink_s(_queue_sink_base):
111         _item_size = gr.sizeof_short
112         _numpy = numpy.int16
113         _cast = int
114
115 class queue_sink_b(_queue_sink_base):
116         _item_size = gr.sizeof_char
117         _numpy = numpy.int8
118         _cast = int
119
120 #######################################################################################
121 ## Queue Source
122 #######################################################################################
123 class _queue_source_base(gr.hier_block2):
124         """
125         Queue source base, a queue source for any size queue.
126         Easy write access to a gnuradio data stream from python.
127         Call push to to write a sample into the gnuradio data stream.
128         """
129
130         def __init__(self, vlen=1):
131                 """
132                 Queue source base contructor.
133                 @param vlen the vector length
134                 """
135                 self._vlen = vlen
136                 #initialize hier2
137                 gr.hier_block2.__init__(
138                         self,
139                         "queue_source",
140                         gr.io_signature(0, 0, 0), # Input signature
141                         gr.io_signature(1, 1, self._item_size*self._vlen) # Output signature
142                 )
143                 #create message sink
144                 message_source = gr.message_source(self._item_size*self._vlen, 1)
145                 self._msgq = message_source.msgq()
146                 #connect
147                 self.connect(message_source, self)
148
149         def push(self, item):
150                 """
151                 Push an item into the back of the queue.
152                 @param item the item
153                 """
154                 if self._vlen == 1: item = [item]
155                 arr = numpy.array(item, self._numpy)
156                 msg = gr.message_from_string(arr.tostring(), 0, self._item_size, self._vlen)
157                 self._msgq.insert_tail(msg)
158
159 class queue_source_c(_queue_source_base):
160         _item_size = gr.sizeof_gr_complex
161         _numpy = numpy.complex64
162
163 class queue_source_f(_queue_source_base):
164         _item_size = gr.sizeof_float
165         _numpy = numpy.float32
166
167 class queue_source_i(_queue_source_base):
168         _item_size = gr.sizeof_int
169         _numpy = numpy.int32
170
171 class queue_source_s(_queue_source_base):
172         _item_size = gr.sizeof_short
173         _numpy = numpy.int16
174
175 class queue_source_b(_queue_source_base):
176         _item_size = gr.sizeof_char
177         _numpy = numpy.int8