From 3ff43f7487b43436cd0f49de80ebff2c1ff1a188 Mon Sep 17 00:00:00 2001 From: Don Ward Date: Fri, 30 Apr 2010 14:48:56 -0400 Subject: [PATCH] Updates to udp source/sink (select(), wait, cleanup) Use select() to avoid blocking on recv() in gr_udp_source (only known way to avoid blocking on Cygwin). Add wait argument to gr_udp_source to allow waiting for connection or accepting lack of connection as EOF; add --no-wait option to dial_tone_sink.py. Remove system dependencies from .h files; remove unused data members and (useless?) public open and close functions. --- gnuradio-core/src/lib/io/gr_udp_sink.cc | 85 ++++++------- gnuradio-core/src/lib/io/gr_udp_sink.h | 21 +--- gnuradio-core/src/lib/io/gr_udp_source.cc | 117 +++++++++++------- gnuradio-core/src/lib/io/gr_udp_source.h | 32 ++--- gnuradio-core/src/lib/io/gr_udp_source.i | 7 +- .../python/network/dial_tone_sink.py | 9 +- 6 files changed, 127 insertions(+), 144 deletions(-) mode change 100644 => 100755 gnuradio-core/src/lib/io/gr_udp_source.i diff --git a/gnuradio-core/src/lib/io/gr_udp_sink.cc b/gnuradio-core/src/lib/io/gr_udp_sink.cc index b447dd3b..263d3dd4 100755 --- a/gnuradio-core/src/lib/io/gr_udp_sink.cc +++ b/gnuradio-core/src/lib/io/gr_udp_sink.cc @@ -1,6 +1,6 @@ /* -*- c++ -*- */ /* - * Copyright 2007,2008,2009 Free Software Foundation, Inc. + * Copyright 2007,2008,2009,2010 Free Software Foundation, Inc. * * This file is part of GNU Radio * @@ -31,9 +31,11 @@ #include #if defined(HAVE_NETDB_H) typedef void* optval_t; -#else +#elif defined(HAVE_WINDOWS_H) // if not posix, assume winsock #define USING_WINSOCK +#include +#include #define SHUT_RDWR 2 typedef char* optval_t; #endif @@ -88,11 +90,13 @@ gr_udp_sink::gr_udp_sink (size_t itemsize, : gr_sync_block ("udp_sink", gr_make_io_signature (1, 1, itemsize), gr_make_io_signature (0, 0, 0)), - d_itemsize (itemsize), d_updated(false), d_payload_size(payload_size) + d_itemsize (itemsize), d_payload_size(payload_size) { int ret = 0; + struct addrinfo *ip_src; // store the source ip info + struct addrinfo *ip_dst; // store the destination ip info -#if !defined(HAVE_SOCKET) // for Windows (with MinGW) +#if defined(USING_WINSOCK) // for Windows (with MinGW) // initialize winsock DLL WSADATA wsaData; int iResult = WSAStartup( MAKEWORD(2,2), &wsaData ); @@ -110,55 +114,21 @@ gr_udp_sink::gr_udp_sink (size_t itemsize, hints.ai_protocol = IPPROTO_UDP; char port_str[7]; sprintf( port_str, "%d", port_src ); - ret = getaddrinfo( src, port_str, &hints, &d_ip_src ); + ret = getaddrinfo( src, port_str, &hints, &ip_src ); if( ret != 0 ) report_error("gr_udp_source/getaddrinfo", "can't initialize source socket" ); // Get the destination IP address from the host name sprintf( port_str, "%d", port_dst ); - ret = getaddrinfo( dst, port_str, &hints, &d_ip_dst ); + ret = getaddrinfo( dst, port_str, &hints, &ip_dst ); if( ret != 0 ) report_error("gr_udp_source/getaddrinfo", "can't initialize destination socket" ); - - open(); -} - -// public constructor that returns a shared_ptr - -gr_udp_sink_sptr -gr_make_udp_sink (size_t itemsize, - const char *src, unsigned short port_src, - const char *dst, unsigned short port_dst, - int payload_size) -{ - return gr_udp_sink_sptr (new gr_udp_sink (itemsize, - src, port_src, - dst, port_dst, - payload_size)); -} - -gr_udp_sink::~gr_udp_sink () -{ - freeaddrinfo(d_ip_src); - freeaddrinfo(d_ip_dst); - close(); - -#if !defined(HAVE_SOCKET) // for Windows (with MinGW) - // free winsock resources - WSACleanup(); -#endif -} - -bool -gr_udp_sink::open() -{ - gruel::scoped_lock guard(d_mutex); // hold mutex for duration of this function // create socket - d_socket = socket(d_ip_src->ai_family, d_ip_src->ai_socktype, - d_ip_src->ai_protocol); + d_socket = socket(ip_src->ai_family, ip_src->ai_socktype, + ip_src->ai_protocol); if(d_socket == -1) { report_error("socket open","can't open socket"); } @@ -180,24 +150,35 @@ gr_udp_sink::open() } // bind socket to an address and port number to listen on - if(bind (d_socket, d_ip_src->ai_addr, d_ip_src->ai_addrlen) == -1) { + if(bind (d_socket, ip_src->ai_addr, ip_src->ai_addrlen) == -1) { report_error("socket bind","can't bind socket"); } // Not sure if we should throw here or allow retries - if(connect(d_socket, d_ip_dst->ai_addr, d_ip_dst->ai_addrlen) == -1) { + if(connect(d_socket, ip_dst->ai_addr, ip_dst->ai_addrlen) == -1) { report_error("socket connect","can't connect to socket"); } - d_updated = true; - return d_socket != 0; + freeaddrinfo(ip_src); + freeaddrinfo(ip_dst); } -void -gr_udp_sink::close() +// public constructor that returns a shared_ptr + +gr_udp_sink_sptr +gr_make_udp_sink (size_t itemsize, + const char *src, unsigned short port_src, + const char *dst, unsigned short port_dst, + int payload_size) { - gruel::scoped_lock guard(d_mutex); // hold mutex for duration of this function + return gr_udp_sink_sptr (new gr_udp_sink (itemsize, + src, port_src, + dst, port_dst, + payload_size)); +} +gr_udp_sink::~gr_udp_sink () +{ if (d_socket){ shutdown(d_socket, SHUT_RDWR); #if defined(USING_WINSOCK) @@ -207,7 +188,11 @@ gr_udp_sink::close() #endif d_socket = 0; } - d_updated = true; + +#if defined(USING_WINSOCK) // for Windows (with MinGW) + // free winsock resources + WSACleanup(); +#endif } int diff --git a/gnuradio-core/src/lib/io/gr_udp_sink.h b/gnuradio-core/src/lib/io/gr_udp_sink.h index 9f50ed7f..6b6ee40f 100644 --- a/gnuradio-core/src/lib/io/gr_udp_sink.h +++ b/gnuradio-core/src/lib/io/gr_udp_sink.h @@ -1,6 +1,6 @@ /* -*- c++ -*- */ /* - * Copyright 2007,2008,2009 Free Software Foundation, Inc. + * Copyright 2007,2008,2009,2010 Free Software Foundation, Inc. * * This file is part of GNU Radio * @@ -66,13 +66,9 @@ class gr_udp_sink : public gr_sync_block int payload_size); private: size_t d_itemsize; - bool d_updated; - gruel::mutex d_mutex; int d_payload_size; // maximum transmission unit (packet length) int d_socket; // handle to socket - struct addrinfo *d_ip_src; // store the source ip info - struct addrinfo *d_ip_dst; // store the destination ip info protected: /*! @@ -96,21 +92,6 @@ class gr_udp_sink : public gr_sync_block public: ~gr_udp_sink (); - /*! - * \brief open a socket specified by the port and ip address info - * - * Opens a socket, binds to the address, and makes connectionless association - * over UDP. If any of these fail, the fuction retuns the error and exits. - */ - bool open(); - - /*! - * \brief Close current socket. - * - * Shuts down read/write on the socket - */ - void close(); - /*! \brief return the PAYLOAD_SIZE of the socket */ int payload_size() { return d_payload_size; } diff --git a/gnuradio-core/src/lib/io/gr_udp_source.cc b/gnuradio-core/src/lib/io/gr_udp_source.cc index 56499258..ce870d48 100755 --- a/gnuradio-core/src/lib/io/gr_udp_source.cc +++ b/gnuradio-core/src/lib/io/gr_udp_source.cc @@ -1,6 +1,6 @@ /* -*- c++ -*- */ /* - * Copyright 2007,2008,2009 Free Software Foundation, Inc. + * Copyright 2007,2008,2009,2010 Free Software Foundation, Inc. * * This file is part of GNU Radio * @@ -30,14 +30,19 @@ #include #include #if defined(HAVE_NETDB_H) +#include typedef void* optval_t; -#else +#elif defined(HAVE_WINDOWS_H) // if not posix, assume winsock #define USING_WINSOCK +#include +#include #define SHUT_RDWR 2 typedef char* optval_t; #endif +#define USE_SELECT 1 // non-blocking receive on all platforms +#define USE_RCV_TIMEO 0 // non-blocking receive on all but Cygwin #define SRC_VERBOSE 0 static int is_error( int perr ) @@ -77,15 +82,17 @@ static void report_error( char *msg1, char *msg2 ) } gr_udp_source::gr_udp_source(size_t itemsize, const char *src, - unsigned short port_src, int payload_size) + unsigned short port_src, int payload_size, + bool wait) : gr_sync_block ("udp_source", gr_make_io_signature(0, 0, 0), gr_make_io_signature(1, 1, itemsize)), - d_itemsize(itemsize), d_updated(false), d_payload_size(payload_size), d_residual(0), d_temp_offset(0) + d_itemsize(itemsize), d_payload_size(payload_size), d_wait(wait), d_residual(0), d_temp_offset(0) { int ret = 0; + struct addrinfo *ip_src; // store the source IP address to use -#if !defined(HAVE_SOCKET) // for Windows (with MinGW) +#if defined(USING_WINSOCK) // for Windows (with MinGW) // initialize winsock DLL WSADATA wsaData; int iResult = WSAStartup( MAKEWORD(2,2), &wsaData ); @@ -103,43 +110,16 @@ gr_udp_source::gr_udp_source(size_t itemsize, const char *src, hints.ai_protocol = IPPROTO_UDP; char port_str[7]; sprintf( port_str, "%d", port_src ); - ret = getaddrinfo( src, port_str, &hints, &d_ip_src ); + ret = getaddrinfo( src, port_str, &hints, &ip_src ); if( ret != 0 ) report_error("gr_udp_source/getaddrinfo", "can't initialize source socket" ); d_temp_buff = new char[d_payload_size]; // allow it to hold up to payload_size bytes - - open(); -} - -gr_udp_source_sptr -gr_make_udp_source (size_t itemsize, const char *ipaddr, - unsigned short port, int payload_size) -{ - return gr_udp_source_sptr (new gr_udp_source (itemsize, ipaddr, - port, payload_size)); -} - -gr_udp_source::~gr_udp_source () -{ - freeaddrinfo(d_ip_src); - delete [] d_temp_buff; - close(); -#if !defined(HAVE_SOCKET) // for Windows (with MinGW) - // free winsock resources - WSACleanup(); -#endif -} - -bool -gr_udp_source::open() -{ - gruel::scoped_lock guard(d_mutex); // hold mutex for duration of this function // create socket - d_socket = socket(d_ip_src->ai_family, d_ip_src->ai_socktype, - d_ip_src->ai_protocol); + d_socket = socket(ip_src->ai_family, ip_src->ai_socktype, + ip_src->ai_protocol); if(d_socket == -1) { report_error("socket open","can't open socket"); } @@ -160,6 +140,7 @@ gr_udp_source::open() } } +#if USE_RCV_TIMEO // Set a timeout on the receive function to not block indefinitely // This value can (and probably should) be changed // Ignored on Cygwin @@ -173,20 +154,27 @@ gr_udp_source::open() if(setsockopt(d_socket, SOL_SOCKET, SO_RCVTIMEO, (optval_t)&timeout, sizeof(timeout)) == -1) { report_error("SO_RCVTIMEO","can't set socket option SO_RCVTIMEO"); } +#endif // USE_RCV_TIMEO // bind socket to an address and port number to listen on - if(bind (d_socket, d_ip_src->ai_addr, d_ip_src->ai_addrlen) == -1) { + if(bind (d_socket, ip_src->ai_addr, ip_src->ai_addrlen) == -1) { report_error("socket bind","can't bind socket"); } + freeaddrinfo(ip_src); - d_updated = true; - return d_socket != 0; } -void -gr_udp_source::close() +gr_udp_source_sptr +gr_make_udp_source (size_t itemsize, const char *ipaddr, + unsigned short port, int payload_size, bool wait) { - gruel::scoped_lock guard(d_mutex); // hold mutex for duration of this function + return gr_udp_source_sptr (new gr_udp_source (itemsize, ipaddr, + port, payload_size, wait)); +} + +gr_udp_source::~gr_udp_source () +{ + delete [] d_temp_buff; if (d_socket){ shutdown(d_socket, SHUT_RDWR); @@ -197,7 +185,11 @@ gr_udp_source::close() #endif d_socket = 0; } - d_updated = true; + +#if defined(USING_WINSOCK) // for Windows (with MinGW) + // free winsock resources + WSACleanup(); +#endif } int @@ -232,8 +224,38 @@ gr_udp_source::work (int noutput_items, d_temp_offset = d_temp_offset+d_residual; } +#if USE_SELECT + // Use select() to determine when socket is readable + fd_set readfds; + timeval timeout; + timeout.tv_sec = 1; + timeout.tv_usec = 0; +#endif + while(1) { // get the data into our output buffer and record the number of bytes + +#if USE_SELECT + // RCV_TIMEO doesn't work on all systems (e.g., Cygwin) + // use select() instead of, or in addition to RCV_TIMEO + FD_ZERO(&readfds); + FD_SET(d_socket, &readfds); + r = select(FD_SETSIZE, &readfds, NULL, NULL, &timeout); + if(r < 0) { + report_error("udp_source/select",NULL); + return -1; + } + else if(r == 0 ) { // timed out + if( d_wait ) { + // Allow boost thread interrupt, then try again + boost::this_thread::interruption_point(); + continue; + } + else + return -1; + } +#endif // USE_SELECT + // This is a non-blocking call with a timeout set in the constructor r = recv(d_socket, d_temp_buff, d_payload_size, 0); // get the entire payload or the what's available @@ -244,11 +266,16 @@ gr_udp_source::work (int noutput_items, printf("UDP receive timed out\n"); #endif - // Break here to allow the rest of the flow graph time to run and so ctrl-C breaks - break; + if( d_wait ) { + // Allow boost thread interrupt, then try again + boost::this_thread::interruption_point(); + continue; + } + else + return -1; } else { - report_error("udp_source",NULL); + report_error("udp_source/recv",NULL); return -1; } } diff --git a/gnuradio-core/src/lib/io/gr_udp_source.h b/gnuradio-core/src/lib/io/gr_udp_source.h index 14d521da..b06536d6 100755 --- a/gnuradio-core/src/lib/io/gr_udp_source.h +++ b/gnuradio-core/src/lib/io/gr_udp_source.h @@ -1,6 +1,6 @@ /* -*- c++ -*- */ /* - * Copyright 2007,2008,2009 Free Software Foundation, Inc. + * Copyright 2007,2008,2009,2010 Free Software Foundation, Inc. * * This file is part of GNU Radio * @@ -38,7 +38,8 @@ class gr_udp_source; typedef boost::shared_ptr gr_udp_source_sptr; gr_udp_source_sptr gr_make_udp_source(size_t itemsize, const char *src, - unsigned short port_src, int payload_size=1472); + unsigned short port_src, + int payload_size=1472, bool wait=true); /*! * \brief Read stream from an UDP socket. @@ -50,22 +51,22 @@ gr_udp_source_sptr gr_make_udp_source(size_t itemsize, const char *src, * \param port_src The port number on which the socket listens for data * \param payload_size UDP payload size by default set to * 1472 = (1500 MTU - (8 byte UDP header) - (20 byte IP header)) + * \param wait Wait for data if not immediately available (default: true) * */ class gr_udp_source : public gr_sync_block { friend gr_udp_source_sptr gr_make_udp_source(size_t itemsize, const char *src, - unsigned short port_src, int payload_size); + unsigned short port_src, + int payload_size, bool wait); private: size_t d_itemsize; - bool d_updated; - gruel::mutex d_mutex; int d_payload_size; // maximum transmission unit (packet length) + bool d_wait; // wait if data if not immediately available int d_socket; // handle to socket - struct addrinfo *d_ip_src; // store the source IP address to use char *d_temp_buff; // hold buffer between calls ssize_t d_residual; // hold information about number of bytes stored in the temp buffer size_t d_temp_offset; // point to temp buffer location offset @@ -80,27 +81,14 @@ class gr_udp_source : public gr_sync_block * \param port_src The port number on which the socket listens for data * \param payload_size UDP payload size by default set to * 1472 = (1500 MTU - (8 byte UDP header) - (20 byte IP header)) + * \param wait Wait for data if not immediately available (default: true) */ - gr_udp_source(size_t itemsize, const char *src, unsigned short port_src, int payload_size); + gr_udp_source(size_t itemsize, const char *src, unsigned short port_src, + int payload_size, bool wait); public: ~gr_udp_source(); - /*! - * \brief open a socket specified by the port and ip address info - * - * Opens a socket, binds to the address, and waits for a connection - * over UDP. If any of these fail, the fuction retuns the error and exits. - */ - bool open(); - - /*! - * \brief Close current socket. - * - * Shuts down read/write on the socket - */ - void close(); - /*! \brief return the PAYLOAD_SIZE of the socket */ int payload_size() { return d_payload_size; } diff --git a/gnuradio-core/src/lib/io/gr_udp_source.i b/gnuradio-core/src/lib/io/gr_udp_source.i old mode 100644 new mode 100755 index fb39dad6..efaa57c2 --- a/gnuradio-core/src/lib/io/gr_udp_source.i +++ b/gnuradio-core/src/lib/io/gr_udp_source.i @@ -24,19 +24,18 @@ GR_SWIG_BLOCK_MAGIC(gr,udp_source) gr_udp_source_sptr gr_make_udp_source (size_t itemsize, const char *src, - unsigned short port_src, int payload_size=1472); + unsigned short port_src, int payload_size=1472, + bool wait=true); class gr_udp_source : public gr_sync_block { protected: gr_udp_source (size_t itemsize, const char *src, - unsigned short port_src, int payload_size); + unsigned short port_src, int payload_size, bool wait); public: ~gr_udp_source (); - bool open(); - void close(); int payload_size() { return d_payload_size; } }; diff --git a/gnuradio-examples/python/network/dial_tone_sink.py b/gnuradio-examples/python/network/dial_tone_sink.py index 47d24b9b..82e925ba 100755 --- a/gnuradio-examples/python/network/dial_tone_sink.py +++ b/gnuradio-examples/python/network/dial_tone_sink.py @@ -25,9 +25,9 @@ from gnuradio.eng_option import eng_option from optparse import OptionParser class dial_tone_sink(gr.top_block): - def __init__(self, src, port, pkt_size, sample_rate): + def __init__(self, src, port, pkt_size, sample_rate, wait): gr.top_block.__init__(self, "dial_tone_sink") - udp = gr.udp_source(gr.sizeof_float, src, port, pkt_size) + udp = gr.udp_source(gr.sizeof_float, src, port, pkt_size, wait=wait) sink = audio.sink(sample_rate) self.connect(udp, sink) @@ -41,6 +41,8 @@ if __name__ == '__main__': help="packet size.") parser.add_option("-r", "--sample-rate", type="int", default=8000, help="audio signal sample rate [default=%default]") + parser.add_option("-n", "--no-wait", action="store_true", default=False, + help="don't wait for source") (options, args) = parser.parse_args() if len(args) != 0: parser.print_help() @@ -48,7 +50,8 @@ if __name__ == '__main__': # Create an instance of a hierarchical block top_block = dial_tone_sink(options.src_name, options.src_port, - options.packet_size, options.sample_rate) + options.packet_size, options.sample_rate, + not options.no_wait) try: # Run forever -- 2.30.2