3 * Copyright 2006 Free Software Foundation, Inc.
5 * This file is part of GNU Radio
7 * GNU Radio is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2, or (at your option)
12 * GNU Radio is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with GNU Radio; see the file COPYING. If not, write to
19 * the Free Software Foundation, Inc., 51 Franklin Street,
20 * Boston, MA 02110-1301, USA.
27 #include <gr_udp_source.h>
28 #include <gr_io_signature.h>
30 #include <sys/types.h>
37 gr_udp_source::gr_udp_source(size_t itemsize, const char *ipaddr,
38 unsigned short port, unsigned int mtu)
39 : gr_sync_block ("udp_source",
40 gr_make_io_signature(0, 0, 0),
41 gr_make_io_signature(1, 1, itemsize)),
42 d_itemsize(itemsize), d_updated(false), d_mtu(mtu)
44 // Set up the address stucture for the local address and port numbers
45 inet_aton(ipaddr, &d_ipaddr_local); // format IP address
46 d_port_local = htons(port); // format port number
48 d_sockaddr_local.sin_family = AF_INET;
49 d_sockaddr_local.sin_addr = d_ipaddr_local;
50 d_sockaddr_local.sin_port = d_port_local;
56 gr_make_udp_source (size_t itemsize, const char *ipaddr,
57 unsigned short port, unsigned int mtu)
59 return gr_udp_source_sptr (new gr_udp_source (itemsize, ipaddr,
63 gr_udp_source::~gr_udp_source ()
71 omni_mutex_lock l(d_mutex); // hold mutex for duration of this function
74 d_socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
76 perror("socket open");
77 throw std::runtime_error("can't open socket");
80 // Turn on reuse address
82 if(setsockopt(d_socket, SOL_SOCKET, SO_REUSEADDR, (void*)&opt_val, sizeof(int))) {
83 perror("SO_REUSEADDR");
84 throw std::runtime_error("can't set socket option SO_REUSEADDR");
87 // Don't wait when shutting down
91 if(setsockopt(d_socket, SOL_SOCKET, SO_LINGER, (void*)&lngr, sizeof(linger))) {
93 throw std::runtime_error("can't set socket option SO_LINGER");
96 // Set a timeout on the receive function to not block indefinitely
97 // This value can (and probably should) be changed
101 if(setsockopt(d_socket, SOL_SOCKET, SO_RCVTIMEO, (void*)&timeout, sizeof(timeout))) {
102 perror("SO_RCVTIMEO");
103 throw std::runtime_error("can't set socket option SO_RCVTIMEO");
106 // bind socket to an address and port number to listen on
107 if(bind (d_socket, (sockaddr*)&d_sockaddr_local, sizeof(struct sockaddr))) {
108 perror("socket bind");
109 throw std::runtime_error("can't bind socket");
113 return d_socket != 0;
117 gr_udp_source::close()
119 omni_mutex_lock l(d_mutex); // hold mutex for duration of this function
122 shutdown(d_socket, SHUT_RDWR);
129 gr_udp_source::work (int noutput_items,
130 gr_vector_const_void_star &input_items,
131 gr_vector_void_star &output_items)
133 char *out = (char *) output_items[0];
134 socklen_t bytes_to_receive=0, bytes_received=0;
137 while((bytes_received < (unsigned)noutput_items) && (bytes>-1)) {
138 // caclulate the number of byte left if we can fit in all d_mtu bytes
139 bytes_to_receive = (bytes_received+d_mtu < noutput_items ?
140 d_mtu : noutput_items-bytes_received);
142 // get the data into our output buffer and record the number of bytes
143 // This is a blocking call, but it's timeout has been set in the constructor
144 bytes = recv(d_socket, out, bytes_to_receive, 0);
147 // keep track of the total number of bytes received
148 bytes_received += bytes;
150 // increment the pointer
156 printf("\nTotal Bytes Received: %d (noutput_items=%d)\n", bytes_received, noutput_items);
159 return int(bytes_received / d_itemsize);