X-Git-Url: https://git.gag.com/?a=blobdiff_plain;f=gnuradio-core%2Fsrc%2Flib%2Fio%2Fgr_udp_source.cc;h=fea9a26ba40083c3fb25768ed08badae5763fc1b;hb=6c0f2f5a5e4eddefc52c272c4b92065a225be3c5;hp=ce870d481eff17b99b8ccd436b0bd27ebe0b8fc0;hpb=dda6ed353551d3493983bd56e0ca8ee8ed4407c5;p=debian%2Fgnuradio diff --git a/gnuradio-core/src/lib/io/gr_udp_source.cc b/gnuradio-core/src/lib/io/gr_udp_source.cc old mode 100755 new mode 100644 index ce870d48..fea9a26b --- a/gnuradio-core/src/lib/io/gr_udp_source.cc +++ b/gnuradio-core/src/lib/io/gr_udp_source.cc @@ -29,9 +29,25 @@ #include #include #include + #if defined(HAVE_NETDB_H) #include +#ifdef HAVE_SYS_TYPES_H +#include +#endif +#ifdef HAVE_SYS_SOCKET_H +#include +#endif typedef void* optval_t; + +// ntohs() on FreeBSD may require both netinet/in.h and arpa/inet.h, in order +#if defined(HAVE_NETINET_IN_H) +#include +#endif +#if defined(HAVE_ARPA_INET_H) +#include +#endif + #elif defined(HAVE_WINDOWS_H) // if not posix, assume winsock #define USING_WINSOCK @@ -67,7 +83,7 @@ static int is_error( int perr ) #endif } -static void report_error( char *msg1, char *msg2 ) +static void report_error( const char *msg1, const char *msg2 ) { // Deal with errors, both posix and winsock #if defined(USING_WINSOCK) @@ -81,16 +97,16 @@ static void report_error( char *msg1, char *msg2 ) return; } -gr_udp_source::gr_udp_source(size_t itemsize, const char *src, - unsigned short port_src, int payload_size, - bool wait) +gr_udp_source::gr_udp_source(size_t itemsize, const char *host, + unsigned short port, int payload_size, + bool eof, bool wait) : gr_sync_block ("udp_source", gr_make_io_signature(0, 0, 0), gr_make_io_signature(1, 1, itemsize)), - d_itemsize(itemsize), d_payload_size(payload_size), d_wait(wait), d_residual(0), d_temp_offset(0) + d_itemsize(itemsize), d_payload_size(payload_size), + d_eof(eof), d_wait(wait), d_socket(-1), d_residual(0), d_temp_offset(0) { int ret = 0; - struct addrinfo *ip_src; // store the source IP address to use #if defined(USING_WINSOCK) // for Windows (with MinGW) // initialize winsock DLL @@ -103,18 +119,23 @@ gr_udp_source::gr_udp_source(size_t itemsize, const char *src, // Set up the address stucture for the source address and port numbers // Get the source IP address from the host name + struct addrinfo *ip_src; // store the source IP address to use struct addrinfo hints; memset( (void*)&hints, 0, sizeof(hints) ); hints.ai_family = AF_INET; hints.ai_socktype = SOCK_DGRAM; hints.ai_protocol = IPPROTO_UDP; - char port_str[7]; - sprintf( port_str, "%d", port_src ); - ret = getaddrinfo( src, port_str, &hints, &ip_src ); + hints.ai_flags = AI_PASSIVE; + char port_str[12]; + sprintf( port_str, "%d", port ); + + // FIXME leaks if report_error throws below + ret = getaddrinfo( host, port_str, &hints, &ip_src ); if( ret != 0 ) report_error("gr_udp_source/getaddrinfo", "can't initialize source socket" ); + // FIXME leaks if report_error throws below d_temp_buff = new char[d_payload_size]; // allow it to hold up to payload_size bytes // create socket @@ -166,24 +187,24 @@ gr_udp_source::gr_udp_source(size_t itemsize, const char *src, gr_udp_source_sptr gr_make_udp_source (size_t itemsize, const char *ipaddr, - unsigned short port, int payload_size, bool wait) + unsigned short port, int payload_size, bool eof, bool wait) { return gr_udp_source_sptr (new gr_udp_source (itemsize, ipaddr, - port, payload_size, wait)); + port, payload_size, eof, wait)); } gr_udp_source::~gr_udp_source () { delete [] d_temp_buff; - if (d_socket){ + if (d_socket != -1){ shutdown(d_socket, SHUT_RDWR); #if defined(USING_WINSOCK) closesocket(d_socket); #else ::close(d_socket); #endif - d_socket = 0; + d_socket = -1; } #if defined(USING_WINSOCK) // for Windows (with MinGW) @@ -221,23 +242,23 @@ gr_udp_source::work (int noutput_items, // Update indexing of amount of bytes left in the buffer d_residual -= nbytes; - d_temp_offset = d_temp_offset+d_residual; + d_temp_offset += nbytes; + + // Return now with what we've got. + assert(nbytes % d_itemsize == 0); + return nbytes/d_itemsize; } -#if USE_SELECT - // Use select() to determine when socket is readable - fd_set readfds; - timeval timeout; - timeout.tv_sec = 1; - timeout.tv_usec = 0; -#endif - while(1) { // get the data into our output buffer and record the number of bytes #if USE_SELECT // RCV_TIMEO doesn't work on all systems (e.g., Cygwin) // use select() instead of, or in addition to RCV_TIMEO + fd_set readfds; + timeval timeout; + timeout.tv_sec = 1; // Init timeout each iteration. Select can modify it. + timeout.tv_usec = 0; FD_ZERO(&readfds); FD_SET(d_socket, &readfds); r = select(FD_SETSIZE, &readfds, NULL, NULL, &timeout); @@ -259,6 +280,11 @@ gr_udp_source::work (int noutput_items, // This is a non-blocking call with a timeout set in the constructor r = recv(d_socket, d_temp_buff, d_payload_size, 0); // get the entire payload or the what's available + // If r > 0, round it down to a multiple of d_itemsize + // (If sender is broken, don't propagate problem) + if (r > 0) + r = (r/d_itemsize) * d_itemsize; + // Check if there was a problem; forget it if the operation just timed out if(r == -1) { if( is_error(EAGAIN) ) { // handle non-blocking call timeout @@ -279,6 +305,22 @@ gr_udp_source::work (int noutput_items, return -1; } } + else if(r==0) { + if(d_eof) { + // zero-length packet interpreted as EOF + + #if SNK_VERBOSE + printf("\tzero-length packet received; returning EOF\n"); + #endif + + return -1; + } + else{ + // do we need to allow boost thread interrupt? + boost::this_thread::interruption_point(); + continue; + } + } else { // Calculate the number of bytes we can take from the buffer in this call nbytes = std::min(r, total_bytes-bytes_received); @@ -316,3 +358,15 @@ gr_udp_source::work (int noutput_items, return bytes_received/d_itemsize; } +// Return port number of d_socket +int gr_udp_source::get_port(void) +{ + sockaddr_in name; + socklen_t len = sizeof(name); + int ret = getsockname( d_socket, (sockaddr*)&name, &len ); + if( ret ) { + report_error("gr_udp_source/getsockname",NULL); + return -1; + } + return ntohs(name.sin_port); +}