6ed211b9ae31ea3d39f813045ac5d9f571a5715c
[debian/gnuradio] / vrt / lib / socket_rx_buffer.cc
1 /* -*- c++ -*- */
2 /*
3  * Copyright 2008,2009 Free Software Foundation, Inc.
4  * 
5  * This file is part of GNU Radio
6  * 
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3, or (at your option)
10  * any later version.
11  * 
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  * 
17  * You should have received a copy of the GNU General Public License along
18  * with this program; if not, write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20  */
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include "socket_rx_buffer.h"
27 #include "data_handler.h"
28 #include <linux/if_packet.h>
29 #include <sys/socket.h>
30 #include <sys/mman.h>
31 #include <sys/poll.h>
32 #include <iostream>
33 #include <cmath>
34 #include <errno.h>
35 #include <stdexcept>
36 #include <string.h>
37 #include <fcntl.h>
38 #include <cstdio>
39
40
41 #define SOCKET_RX_BUFFER_DEBUG      1 // define to 0 or 1
42 #if SOCKET_RX_BUFFER_DEBUG
43 #define DEBUG_LOG(x) ::write(2, (x), 1)
44 #else
45 #define DEBUG_LOG(X)
46 #endif
47
48 #define DEFAULT_MEM_SIZE 62.5e6 // ~0.5s @ 125 MB/s
49 #define MAX_MEM_SIZE     1000e6 // ~10.00s @ 100 MB/s. 
50 #define MAX_SLAB_SIZE    131072 // 128 KB (FIXME fish out of /proc/slabinfo)
51
52
53 namespace vrt {
54
55   const unsigned int socket_rx_buffer::MAX_PKTLEN = 8192;
56   const unsigned int socket_rx_buffer::MIN_PKTLEN = 64;
57   
58   socket_rx_buffer::socket_rx_buffer(int socket_fd, size_t rx_bufsize)
59     : d_fd(socket_fd), d_using_tpring(false), d_buflen(0), d_buf(0), d_frame_nr(0),
60       d_frame_size(0), d_head(0), d_ring(0)
61   {
62     if (rx_bufsize == 0)
63       d_buflen = (size_t)DEFAULT_MEM_SIZE;
64     else
65       d_buflen = std::min((size_t)MAX_MEM_SIZE, rx_bufsize);
66
67     if (!open()){
68       throw std::runtime_error("socket_rx_buffer::open failed");
69     }
70   }
71
72   socket_rx_buffer::~socket_rx_buffer()
73   {
74     close();
75   }
76   
77   bool 
78   socket_rx_buffer::open()
79   {
80     if (try_packet_ring()){
81       d_using_tpring = true;
82       // fprintf(stderr, "socket_rx_buffer: using memory mapped interface\n");
83     }
84     else {
85       d_using_tpring = false;
86       // fprintf(stderr, "socket_rx_buffer: NOT using memory mapped interface\n");
87
88       // Increase socket buffer if possible
89
90       int rcvbuf_size = d_buflen;
91 #if defined(SO_RCVBUFFORCE)
92       if (setsockopt(d_fd, SOL_SOCKET, SO_RCVBUFFORCE, &rcvbuf_size, sizeof(rcvbuf_size)) != 0){
93         perror("setsockopt(SO_RCVBUFFORCE)");
94         fprintf(stderr, "Are you running as root?  If not, please do.\n");
95       }
96       else {
97         fprintf(stderr, "SO_RCVBUFFORCE = %zd\n", d_buflen);
98       }
99 #endif
100     }
101
102     return true;
103   }
104
105   bool
106   socket_rx_buffer::try_packet_ring()
107   {
108     struct tpacket_req req;
109     size_t page_size = getpagesize();
110
111     // Calculate minimum power-of-two aligned size for frames
112     req.tp_frame_size =
113       (unsigned int)rint(pow(2, ceil(log2(TPACKET_ALIGN(TPACKET_HDRLEN)+TPACKET_ALIGN(MAX_PKTLEN)))));
114     d_frame_size = req.tp_frame_size;
115
116     // Calculate minimum contiguous pages needed to enclose a frame
117     int npages = (page_size > req.tp_frame_size) ? 1 : ((req.tp_frame_size+page_size-1)/page_size);
118     req.tp_block_size = page_size << (int)ceil(log2(npages));
119
120     // Calculate number of blocks
121     req.tp_block_nr = (int)(d_buflen/req.tp_block_size);
122                                
123
124     // Recalculate buffer length
125     d_buflen = req.tp_block_nr*req.tp_block_size;
126
127     // Finally, calculate total number of frames.  Since frames, blocks,
128     // and pages are all power-of-two aligned, frames are contiguous
129     req.tp_frame_nr = d_buflen/req.tp_frame_size;
130     d_frame_nr = req.tp_frame_nr;
131
132 #if 0
133     if (SOCKET_RX_BUFFER_DEBUG)
134       std::cerr << "socket_rx_buffer:" 
135                 << " frame_size=" << req.tp_frame_size
136                 << " block_size=" << req.tp_block_size
137                 << " block_nr=" << req.tp_block_nr
138                 << " frame_nr=" << req.tp_frame_nr
139                 << " buflen=" << d_buflen
140                 << std::endl;
141 #endif
142
143     // Try to get kernel shared memory buffer
144     if (setsockopt(d_fd, SOL_PACKET, PACKET_RX_RING, (void *)&req, sizeof(req)) != 0){
145       // perror("socket_rx_buffer: setsockopt");
146       return false;
147     }
148
149     void *p = mmap(0, d_buflen, PROT_READ|PROT_WRITE, MAP_SHARED, d_fd, 0);
150     if (p == MAP_FAILED){
151       perror("socket_rx_buffer: mmap");
152       return false;
153     }
154     d_buf = (uint8_t *) p;
155
156     // Initialize our pointers into the packet ring
157     d_ring.resize(req.tp_frame_nr);
158     for (unsigned int i=0; i < req.tp_frame_nr; i++)
159       d_ring[i] = (uint8_t *)(d_buf+i*req.tp_frame_size);
160
161     return true;
162   }
163
164   bool
165   socket_rx_buffer::close()
166   {
167     return true;
168   }
169
170   inline bool
171   socket_rx_buffer::frame_available()
172   {
173     return (((tpacket_hdr *)d_ring[d_head])->tp_status != TP_STATUS_KERNEL);
174   }
175   
176   socket_rx_buffer::result
177   socket_rx_buffer::rx_frames(data_handler *f, int timeout_in_ms)
178   {
179     if (!d_using_tpring){
180
181       // ----------------------------------------------------------------
182       // Use recv instead of kernel Rx packet ring
183       // ----------------------------------------------------------------
184
185       unsigned char buf[MAX_PKTLEN];
186       bool dont_wait = timeout_in_ms == 0;      // FIXME treating timeout as 0 or inf
187       int flags = dont_wait ? MSG_DONTWAIT : 0;
188
189       ssize_t rr = recv(d_fd, buf, sizeof(buf), flags);
190       if (rr == -1){            // error?
191         if (errno == EAGAIN){   // non-blocking, nothing there
192           return EB_WOULD_BLOCK;
193         }
194         perror("rx_frames: recv");
195         return EB_ERROR;
196       }
197
198       // Got first packet.  Call handler
199
200       data_handler::result r = (*f)(buf, rr);
201       if (r & data_handler::DONE)
202         return EB_OK;
203
204       // Now do as many as we can without blocking
205
206       while (1){
207         rr = recv(d_fd, buf, sizeof(buf), MSG_DONTWAIT);
208         if (rr == -1){          // error?
209           if (errno == EAGAIN)  // non-blocking, nothing there
210             return EB_OK;       // return OK; we've processed >= 1 packets
211           perror("rx_frames: recv");
212           return EB_ERROR;
213         }
214         
215         r = (*f)(buf, rr);
216         if (r & data_handler::DONE)
217           break;
218       }
219       return EB_OK;
220     }
221
222     // ----------------------------------------------------------------
223     // Use kernel Rx packet ring
224     // ----------------------------------------------------------------
225
226     DEBUG_LOG("\n");
227       
228     while (!frame_available()) {
229       if (timeout_in_ms == 0) {
230         DEBUG_LOG("w");
231         return EB_WOULD_BLOCK;
232       }
233       
234       struct pollfd pfd;
235       pfd.fd = d_fd;
236       pfd.revents = 0;
237       pfd.events = POLLIN;
238
239       // DEBUG_LOG("P");
240
241       int pres = poll(&pfd, 1, timeout_in_ms);
242       if (pres == -1) {
243         perror("poll");
244         return EB_ERROR;
245       }
246
247       if (pres == 0) {
248         DEBUG_LOG("t");
249         return EB_TIMED_OUT;
250       }
251     }
252
253     // Iterate through available packets
254     while (frame_available()) {
255       // Get start of ethernet frame and length
256       tpacket_hdr *hdr = (tpacket_hdr *)d_ring[d_head];
257       void *base = (uint8_t *)hdr+hdr->tp_mac;
258       size_t len = hdr->tp_len;
259       
260       if (1)
261         fprintf(stderr, "socket_rx_buffer: base = %p  tp_mac = %3d  tp_net = %3d\n",
262                 base, hdr->tp_mac, hdr->tp_net);
263
264       // Invoke data handler
265       data_handler::result r = (*f)(base, len);
266       hdr->tp_status = TP_STATUS_KERNEL; // mark it free
267
268       inc_head();
269
270       if (r & data_handler::DONE)
271         break;
272     }
273
274     DEBUG_LOG("|");
275     return EB_OK;
276   }
277
278 } // namespace vrt