#include <errno.h>
#include <stdio.h>
#include <string.h>
+
#if defined(HAVE_NETDB_H)
#include <netdb.h>
+#ifdef HAVE_SYS_TYPES_H
+#include <sys/types.h>
+#endif
+#ifdef HAVE_SYS_SOCKET_H
+#include <sys/socket.h>
+#endif
typedef void* optval_t;
+
+// ntohs() on FreeBSD may require both netinet/in.h and arpa/inet.h, in order
+#if defined(HAVE_NETINET_IN_H)
+#include <netinet/in.h>
+#endif
+#if defined(HAVE_ARPA_INET_H)
+#include <arpa/inet.h>
+#endif
+
#elif defined(HAVE_WINDOWS_H)
// if not posix, assume winsock
#define USING_WINSOCK
#endif
}
-static void report_error( char *msg1, char *msg2 )
+static void report_error( const char *msg1, const char *msg2 )
{
// Deal with errors, both posix and winsock
#if defined(USING_WINSOCK)
return;
}
-gr_udp_source::gr_udp_source(size_t itemsize, const char *src,
- unsigned short port_src, int payload_size,
- bool wait)
+gr_udp_source::gr_udp_source(size_t itemsize, const char *host,
+ unsigned short port, int payload_size,
+ bool eof, bool wait)
: gr_sync_block ("udp_source",
gr_make_io_signature(0, 0, 0),
gr_make_io_signature(1, 1, itemsize)),
- d_itemsize(itemsize), d_payload_size(payload_size), d_wait(wait), d_residual(0), d_temp_offset(0)
+ d_itemsize(itemsize), d_payload_size(payload_size),
+ d_eof(eof), d_wait(wait), d_socket(-1), d_residual(0), d_temp_offset(0)
{
int ret = 0;
- struct addrinfo *ip_src; // store the source IP address to use
#if defined(USING_WINSOCK) // for Windows (with MinGW)
// initialize winsock DLL
// Set up the address stucture for the source address and port numbers
// Get the source IP address from the host name
+ struct addrinfo *ip_src; // store the source IP address to use
struct addrinfo hints;
memset( (void*)&hints, 0, sizeof(hints) );
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_DGRAM;
hints.ai_protocol = IPPROTO_UDP;
- char port_str[7];
- sprintf( port_str, "%d", port_src );
- ret = getaddrinfo( src, port_str, &hints, &ip_src );
+ hints.ai_flags = AI_PASSIVE;
+ char port_str[12];
+ sprintf( port_str, "%d", port );
+
+ // FIXME leaks if report_error throws below
+ ret = getaddrinfo( host, port_str, &hints, &ip_src );
if( ret != 0 )
report_error("gr_udp_source/getaddrinfo",
"can't initialize source socket" );
+ // FIXME leaks if report_error throws below
d_temp_buff = new char[d_payload_size]; // allow it to hold up to payload_size bytes
// create socket
gr_udp_source_sptr
gr_make_udp_source (size_t itemsize, const char *ipaddr,
- unsigned short port, int payload_size, bool wait)
+ unsigned short port, int payload_size, bool eof, bool wait)
{
return gr_udp_source_sptr (new gr_udp_source (itemsize, ipaddr,
- port, payload_size, wait));
+ port, payload_size, eof, wait));
}
gr_udp_source::~gr_udp_source ()
{
delete [] d_temp_buff;
- if (d_socket){
+ if (d_socket != -1){
shutdown(d_socket, SHUT_RDWR);
#if defined(USING_WINSOCK)
closesocket(d_socket);
#else
::close(d_socket);
#endif
- d_socket = 0;
+ d_socket = -1;
}
#if defined(USING_WINSOCK) // for Windows (with MinGW)
// Update indexing of amount of bytes left in the buffer
d_residual -= nbytes;
- d_temp_offset = d_temp_offset+d_residual;
+ d_temp_offset += nbytes;
+
+ // Return now with what we've got.
+ assert(nbytes % d_itemsize == 0);
+ return nbytes/d_itemsize;
}
-#if USE_SELECT
- // Use select() to determine when socket is readable
- fd_set readfds;
- timeval timeout;
- timeout.tv_sec = 1;
- timeout.tv_usec = 0;
-#endif
-
while(1) {
// get the data into our output buffer and record the number of bytes
#if USE_SELECT
// RCV_TIMEO doesn't work on all systems (e.g., Cygwin)
// use select() instead of, or in addition to RCV_TIMEO
+ fd_set readfds;
+ timeval timeout;
+ timeout.tv_sec = 1; // Init timeout each iteration. Select can modify it.
+ timeout.tv_usec = 0;
FD_ZERO(&readfds);
FD_SET(d_socket, &readfds);
r = select(FD_SETSIZE, &readfds, NULL, NULL, &timeout);
// This is a non-blocking call with a timeout set in the constructor
r = recv(d_socket, d_temp_buff, d_payload_size, 0); // get the entire payload or the what's available
+ // If r > 0, round it down to a multiple of d_itemsize
+ // (If sender is broken, don't propagate problem)
+ if (r > 0)
+ r = (r/d_itemsize) * d_itemsize;
+
// Check if there was a problem; forget it if the operation just timed out
if(r == -1) {
if( is_error(EAGAIN) ) { // handle non-blocking call timeout
return -1;
}
}
+ else if(r==0) {
+ if(d_eof) {
+ // zero-length packet interpreted as EOF
+
+ #if SNK_VERBOSE
+ printf("\tzero-length packet received; returning EOF\n");
+ #endif
+
+ return -1;
+ }
+ else{
+ // do we need to allow boost thread interrupt?
+ boost::this_thread::interruption_point();
+ continue;
+ }
+ }
else {
// Calculate the number of bytes we can take from the buffer in this call
nbytes = std::min(r, total_bytes-bytes_received);
return bytes_received/d_itemsize;
}
+// Return port number of d_socket
+int gr_udp_source::get_port(void)
+{
+ sockaddr_in name;
+ socklen_t len = sizeof(name);
+ int ret = getsockname( d_socket, (sockaddr*)&name, &len );
+ if( ret ) {
+ report_error("gr_udp_source/getsockname",NULL);
+ return -1;
+ }
+ return ntohs(name.sin_port);
+}