updated copyright info, and snuck a little something else in (you didn't see anything...)
[debian/gnuradio] / gnuradio-core / src / lib / io / gr_udp_source.cc
1 /* -*- c++ -*- */
2 /*
3  * Copyright 2006 Free Software Foundation, Inc.
4  * 
5  * This file is part of GNU Radio
6  * 
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 2, or (at your option)
10  * any later version.
11  * 
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  * 
17  * You should have received a copy of the GNU General Public License
18  * along with GNU Radio; see the file COPYING.  If not, write to
19  * the Free Software Foundation, Inc., 51 Franklin Street,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26
27 #include <gr_udp_source.h>
28 #include <gr_io_signature.h>
29 #include <cstdio>
30 #include <sys/types.h>
31 #include <sys/stat.h>
32 #include <fcntl.h>
33 #include <stdexcept>
34
35 #define SRC_VERBOSE 0
36
37 gr_udp_source::gr_udp_source(size_t itemsize, const char *ipaddr, 
38                              unsigned short port, unsigned int mtu)
39   : gr_sync_block ("udp_source",
40                    gr_make_io_signature(0, 0, 0),
41                    gr_make_io_signature(1, 1, itemsize)),
42     d_itemsize(itemsize), d_updated(false), d_mtu(mtu)
43 {
44   // Set up the address stucture for the local address and port numbers
45   inet_aton(ipaddr, &d_ipaddr_local);     // format IP address
46   d_port_local = htons(port);             // format port number
47   
48   d_sockaddr_local.sin_family = AF_INET;
49   d_sockaddr_local.sin_addr   = d_ipaddr_local;
50   d_sockaddr_local.sin_port   = d_port_local;
51   
52   open();
53 }
54
55 gr_udp_source_sptr
56 gr_make_udp_source (size_t itemsize, const char *ipaddr, 
57                     unsigned short port, unsigned int mtu)
58 {
59   return gr_udp_source_sptr (new gr_udp_source (itemsize, ipaddr, 
60                                                 port, mtu));
61 }
62
63 gr_udp_source::~gr_udp_source ()
64 {
65   close();
66 }
67
68 bool
69 gr_udp_source::open()
70 {
71   omni_mutex_lock l(d_mutex);   // hold mutex for duration of this function
72    
73   // create socket
74   d_socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
75   if(d_socket == 0) {
76     perror("socket open");
77     throw std::runtime_error("can't open socket");
78   }
79
80   // Turn on reuse address
81   bool opt_val = true;
82   if(setsockopt(d_socket, SOL_SOCKET, SO_REUSEADDR, (void*)&opt_val, sizeof(int))) {
83     perror("SO_REUSEADDR");
84     throw std::runtime_error("can't set socket option SO_REUSEADDR");
85   }
86
87   // Don't wait when shutting down
88   linger lngr;
89   lngr.l_onoff  = 1;
90   lngr.l_linger = 0;
91   if(setsockopt(d_socket, SOL_SOCKET, SO_LINGER, (void*)&lngr, sizeof(linger))) {
92     perror("SO_LINGER");
93     throw std::runtime_error("can't set socket option SO_LINGER");
94   }
95
96   // Set a timeout on the receive function to not block indefinitely
97   // This value can (and probably should) be changed
98   timeval timeout;
99   timeout.tv_sec = 1;
100   timeout.tv_usec = 0;
101   if(setsockopt(d_socket, SOL_SOCKET, SO_RCVTIMEO, (void*)&timeout, sizeof(timeout))) {
102     perror("SO_RCVTIMEO");
103     throw std::runtime_error("can't set socket option SO_RCVTIMEO");
104   }
105
106   // bind socket to an address and port number to listen on
107   if(bind (d_socket, (sockaddr*)&d_sockaddr_local, sizeof(struct sockaddr))) {
108     perror("socket bind");
109     throw std::runtime_error("can't bind socket");
110   }
111   
112   d_updated = true;
113   return d_socket != 0;
114 }
115
116 void
117 gr_udp_source::close()
118 {
119   omni_mutex_lock l(d_mutex);   // hold mutex for duration of this function
120
121   if (d_socket){
122     shutdown(d_socket, SHUT_RDWR);
123     d_socket = 0;
124   }
125   d_updated = true;
126 }
127
128 int 
129 gr_udp_source::work (int noutput_items,
130                      gr_vector_const_void_star &input_items,
131                      gr_vector_void_star &output_items)
132 {
133   char *out = (char *) output_items[0];
134   socklen_t bytes_to_receive=0, bytes_received=0;
135   int bytes=0;
136
137   while((bytes_received < (unsigned)noutput_items) && (bytes>-1)) {
138     // caclulate the number of byte left if we can fit in all d_mtu bytes
139     bytes_to_receive = (bytes_received+d_mtu < noutput_items ? 
140                         d_mtu : noutput_items-bytes_received);
141     
142     // get the data into our output buffer and record the number of bytes
143     // This is a blocking call, but it's timeout has been set in the constructor
144     bytes = recv(d_socket, out, bytes_to_receive, 0);
145
146     if(bytes > 0) {
147       // keep track of the total number of bytes received
148       bytes_received += bytes;
149
150       // increment the pointer
151       out += bytes;
152     }
153   }
154
155   #if SRC_VERBOSE
156   printf("\nTotal Bytes Received: %d (noutput_items=%d)\n", bytes_received, noutput_items); 
157   #endif
158
159   return int(bytes_received / d_itemsize);
160 }