Updates to udp source/sink (select(), wait, cleanup)
authorDon Ward <don2387ward@sprynet.com>
Fri, 30 Apr 2010 18:48:56 +0000 (14:48 -0400)
committerDon Ward <don2387ward@sprynet.com>
Fri, 30 Apr 2010 18:48:56 +0000 (14:48 -0400)
Use select() to avoid blocking on recv() in gr_udp_source (only known
way to avoid blocking on Cygwin).

Add wait argument to gr_udp_source to allow waiting for connection
or accepting lack of connection as EOF; add --no-wait option to
dial_tone_sink.py.

Remove system dependencies from .h files; remove unused data members
and (useless?) public open and close functions.

gnuradio-core/src/lib/io/gr_udp_sink.cc
gnuradio-core/src/lib/io/gr_udp_sink.h
gnuradio-core/src/lib/io/gr_udp_source.cc
gnuradio-core/src/lib/io/gr_udp_source.h
gnuradio-core/src/lib/io/gr_udp_source.i [changed mode: 0644->0755]
gnuradio-examples/python/network/dial_tone_sink.py

index b447dd3b3a89abb9cadb0a64b49f411df8b58477..263d3dd4f171157ae93527a1aab1df5155dcc53b 100755 (executable)
@@ -1,6 +1,6 @@
 /* -*- c++ -*- */
 /*
- * Copyright 2007,2008,2009 Free Software Foundation, Inc.
+ * Copyright 2007,2008,2009,2010 Free Software Foundation, Inc.
  * 
  * This file is part of GNU Radio
  * 
 #include <string.h>
 #if defined(HAVE_NETDB_H)
 typedef void* optval_t;
-#else
+#elif defined(HAVE_WINDOWS_H)
 // if not posix, assume winsock
 #define USING_WINSOCK
+#include <winsock2.h>
+#include <ws2tcpip.h>
 #define SHUT_RDWR 2
 typedef char* optval_t;
 #endif
@@ -88,11 +90,13 @@ gr_udp_sink::gr_udp_sink (size_t itemsize,
   : gr_sync_block ("udp_sink",
                   gr_make_io_signature (1, 1, itemsize),
                   gr_make_io_signature (0, 0, 0)),
-    d_itemsize (itemsize), d_updated(false), d_payload_size(payload_size)
+    d_itemsize (itemsize), d_payload_size(payload_size)
 {
   int ret = 0;
+  struct addrinfo *ip_src;        // store the source ip info
+  struct addrinfo *ip_dst;        // store the destination ip info
 
-#if !defined(HAVE_SOCKET) // for Windows (with MinGW)
+#if defined(USING_WINSOCK) // for Windows (with MinGW)
   // initialize winsock DLL
   WSADATA wsaData;
   int iResult = WSAStartup( MAKEWORD(2,2), &wsaData );
@@ -110,55 +114,21 @@ gr_udp_sink::gr_udp_sink (size_t itemsize,
   hints.ai_protocol = IPPROTO_UDP;
   char port_str[7];
   sprintf( port_str, "%d", port_src );
-  ret = getaddrinfo( src, port_str, &hints, &d_ip_src );
+  ret = getaddrinfo( src, port_str, &hints, &ip_src );
   if( ret != 0 )
     report_error("gr_udp_source/getaddrinfo",
                 "can't initialize source socket" );
 
   // Get the destination IP address from the host name
   sprintf( port_str, "%d", port_dst );
-  ret = getaddrinfo( dst, port_str, &hints, &d_ip_dst );
+  ret = getaddrinfo( dst, port_str, &hints, &ip_dst );
   if( ret != 0 )
     report_error("gr_udp_source/getaddrinfo",
                 "can't initialize destination socket" );
-  
-  open();
-}
-
-// public constructor that returns a shared_ptr
-
-gr_udp_sink_sptr
-gr_make_udp_sink (size_t itemsize, 
-                 const char *src, unsigned short port_src,
-                 const char *dst, unsigned short port_dst,
-                 int payload_size)
-{
-  return gr_udp_sink_sptr (new gr_udp_sink (itemsize, 
-                                           src, port_src,
-                                           dst, port_dst,
-                                           payload_size));
-}
-
-gr_udp_sink::~gr_udp_sink ()
-{
-  freeaddrinfo(d_ip_src);
-  freeaddrinfo(d_ip_dst);
-  close();
-
-#if !defined(HAVE_SOCKET) // for Windows (with MinGW)
-  // free winsock resources
-  WSACleanup();
-#endif
-}
-
-bool
-gr_udp_sink::open()
-{
-  gruel::scoped_lock guard(d_mutex);   // hold mutex for duration of this function
 
   // create socket
-  d_socket = socket(d_ip_src->ai_family, d_ip_src->ai_socktype,
-                   d_ip_src->ai_protocol);
+  d_socket = socket(ip_src->ai_family, ip_src->ai_socktype,
+                   ip_src->ai_protocol);
   if(d_socket == -1) {
     report_error("socket open","can't open socket");
   }
@@ -180,24 +150,35 @@ gr_udp_sink::open()
   }
 
   // bind socket to an address and port number to listen on
-  if(bind (d_socket, d_ip_src->ai_addr, d_ip_src->ai_addrlen) == -1) {
+  if(bind (d_socket, ip_src->ai_addr, ip_src->ai_addrlen) == -1) {
     report_error("socket bind","can't bind socket");
   }
 
   // Not sure if we should throw here or allow retries
-  if(connect(d_socket, d_ip_dst->ai_addr, d_ip_dst->ai_addrlen) == -1) {
+  if(connect(d_socket, ip_dst->ai_addr, ip_dst->ai_addrlen) == -1) {
     report_error("socket connect","can't connect to socket");
   }
 
-  d_updated = true;
-  return d_socket != 0;
+  freeaddrinfo(ip_src);
+  freeaddrinfo(ip_dst);
 }
 
-void
-gr_udp_sink::close()
+// public constructor that returns a shared_ptr
+
+gr_udp_sink_sptr
+gr_make_udp_sink (size_t itemsize, 
+                 const char *src, unsigned short port_src,
+                 const char *dst, unsigned short port_dst,
+                 int payload_size)
 {
-  gruel::scoped_lock guard(d_mutex);   // hold mutex for duration of this function
+  return gr_udp_sink_sptr (new gr_udp_sink (itemsize, 
+                                           src, port_src,
+                                           dst, port_dst,
+                                           payload_size));
+}
 
+gr_udp_sink::~gr_udp_sink ()
+{
   if (d_socket){
     shutdown(d_socket, SHUT_RDWR);
 #if defined(USING_WINSOCK)
@@ -207,7 +188,11 @@ gr_udp_sink::close()
 #endif
     d_socket = 0;
   }
-  d_updated = true;
+
+#if defined(USING_WINSOCK) // for Windows (with MinGW)
+  // free winsock resources
+  WSACleanup();
+#endif
 }
 
 int 
index 9f50ed7f03c23aa05f79f1d1cc983e4caf212c37..6b6ee40fe856a6df4c88bc77f2e643da6f4937e5 100644 (file)
@@ -1,6 +1,6 @@
 /* -*- c++ -*- */
 /*
- * Copyright 2007,2008,2009 Free Software Foundation, Inc.
+ * Copyright 2007,2008,2009,2010 Free Software Foundation, Inc.
  * 
  * This file is part of GNU Radio
  * 
@@ -66,13 +66,9 @@ class gr_udp_sink : public gr_sync_block
                                            int payload_size);
  private:
   size_t       d_itemsize;
-  bool         d_updated;
-  gruel::mutex d_mutex;
 
   int            d_payload_size;    // maximum transmission unit (packet length)
   int            d_socket;          // handle to socket
-  struct addrinfo *d_ip_src;        // store the source ip info
-  struct addrinfo *d_ip_dst;        // store the destination ip info
 
  protected:
   /*!
@@ -96,21 +92,6 @@ class gr_udp_sink : public gr_sync_block
  public:
   ~gr_udp_sink ();
 
-  /*!
-   * \brief open a socket specified by the port and ip address info
-   *
-   * Opens a socket, binds to the address, and makes connectionless association
-   * over UDP. If any of these fail, the fuction retuns the error and exits.
-   */
-  bool open();
-
-  /*!
-   * \brief Close current socket.
-   *
-   * Shuts down read/write on the socket
-   */
-  void close();
-
   /*! \brief return the PAYLOAD_SIZE of the socket */
   int payload_size() { return d_payload_size; }
 
index 56499258c3557e765a1ab50ca23a0f19ee76a20a..ce870d481eff17b99b8ccd436b0bd27ebe0b8fc0 100755 (executable)
@@ -1,6 +1,6 @@
 /* -*- c++ -*- */
 /*
- * Copyright 2007,2008,2009 Free Software Foundation, Inc.
+ * Copyright 2007,2008,2009,2010 Free Software Foundation, Inc.
  * 
  * This file is part of GNU Radio
  * 
 #include <stdio.h>
 #include <string.h>
 #if defined(HAVE_NETDB_H)
+#include <netdb.h>
 typedef void* optval_t;
-#else
+#elif defined(HAVE_WINDOWS_H)
 // if not posix, assume winsock
 #define USING_WINSOCK
+#include <winsock2.h>
+#include <ws2tcpip.h>
 #define SHUT_RDWR 2
 typedef char* optval_t;
 #endif
 
+#define USE_SELECT    1  // non-blocking receive on all platforms
+#define USE_RCV_TIMEO 0  // non-blocking receive on all but Cygwin
 #define SRC_VERBOSE 0
 
 static int is_error( int perr )
@@ -77,15 +82,17 @@ static void report_error( char *msg1, char *msg2 )
 }
 
 gr_udp_source::gr_udp_source(size_t itemsize, const char *src, 
-                            unsigned short port_src, int payload_size)
+                            unsigned short port_src, int payload_size,
+                            bool wait)
   : gr_sync_block ("udp_source",
                   gr_make_io_signature(0, 0, 0),
                   gr_make_io_signature(1, 1, itemsize)),
-    d_itemsize(itemsize), d_updated(false), d_payload_size(payload_size), d_residual(0), d_temp_offset(0)
+    d_itemsize(itemsize), d_payload_size(payload_size), d_wait(wait), d_residual(0), d_temp_offset(0)
 {
   int ret = 0;
+  struct addrinfo *ip_src;      // store the source IP address to use
 
-#if !defined(HAVE_SOCKET) // for Windows (with MinGW)
+#if defined(USING_WINSOCK) // for Windows (with MinGW)
   // initialize winsock DLL
   WSADATA wsaData;
   int iResult = WSAStartup( MAKEWORD(2,2), &wsaData );
@@ -103,43 +110,16 @@ gr_udp_source::gr_udp_source(size_t itemsize, const char *src,
   hints.ai_protocol = IPPROTO_UDP;
   char port_str[7];
   sprintf( port_str, "%d", port_src );
-  ret = getaddrinfo( src, port_str, &hints, &d_ip_src );
+  ret = getaddrinfo( src, port_str, &hints, &ip_src );
   if( ret != 0 )
     report_error("gr_udp_source/getaddrinfo",
                 "can't initialize source socket" );
 
   d_temp_buff = new char[d_payload_size];   // allow it to hold up to payload_size bytes
-  
-  open();
-}
-
-gr_udp_source_sptr
-gr_make_udp_source (size_t itemsize, const char *ipaddr, 
-                   unsigned short port, int payload_size)
-{
-  return gr_udp_source_sptr (new gr_udp_source (itemsize, ipaddr, 
-                                               port, payload_size));
-}
-
-gr_udp_source::~gr_udp_source ()
-{
-  freeaddrinfo(d_ip_src);
-  delete [] d_temp_buff;
-  close();
 
-#if !defined(HAVE_SOCKET) // for Windows (with MinGW)
-  // free winsock resources
-  WSACleanup();
-#endif
-}
-
-bool
-gr_udp_source::open()
-{
-  gruel::scoped_lock guard(d_mutex);   // hold mutex for duration of this function
   // create socket
-  d_socket = socket(d_ip_src->ai_family, d_ip_src->ai_socktype,
-                   d_ip_src->ai_protocol);
+  d_socket = socket(ip_src->ai_family, ip_src->ai_socktype,
+                   ip_src->ai_protocol);
   if(d_socket == -1) {
     report_error("socket open","can't open socket");
   }
@@ -160,6 +140,7 @@ gr_udp_source::open()
     }
   }
 
+#if USE_RCV_TIMEO
   // Set a timeout on the receive function to not block indefinitely
   // This value can (and probably should) be changed
   // Ignored on Cygwin
@@ -173,20 +154,27 @@ gr_udp_source::open()
   if(setsockopt(d_socket, SOL_SOCKET, SO_RCVTIMEO, (optval_t)&timeout, sizeof(timeout)) == -1) {
     report_error("SO_RCVTIMEO","can't set socket option SO_RCVTIMEO");
   }
+#endif // USE_RCV_TIMEO
 
   // bind socket to an address and port number to listen on
-  if(bind (d_socket, d_ip_src->ai_addr, d_ip_src->ai_addrlen) == -1) {
+  if(bind (d_socket, ip_src->ai_addr, ip_src->ai_addrlen) == -1) {
     report_error("socket bind","can't bind socket");
   }
+  freeaddrinfo(ip_src);
 
-  d_updated = true;
-  return d_socket != 0;
 }
 
-void
-gr_udp_source::close()
+gr_udp_source_sptr
+gr_make_udp_source (size_t itemsize, const char *ipaddr, 
+                   unsigned short port, int payload_size, bool wait)
 {
-  gruel::scoped_lock guard(d_mutex);   // hold mutex for duration of this function
+  return gr_udp_source_sptr (new gr_udp_source (itemsize, ipaddr, 
+                                               port, payload_size, wait));
+}
+
+gr_udp_source::~gr_udp_source ()
+{
+  delete [] d_temp_buff;
 
   if (d_socket){
     shutdown(d_socket, SHUT_RDWR);
@@ -197,7 +185,11 @@ gr_udp_source::close()
 #endif
     d_socket = 0;
   }
-  d_updated = true;
+
+#if defined(USING_WINSOCK) // for Windows (with MinGW)
+  // free winsock resources
+  WSACleanup();
+#endif
 }
 
 int 
@@ -232,8 +224,38 @@ gr_udp_source::work (int noutput_items,
     d_temp_offset = d_temp_offset+d_residual;
   }
 
+#if USE_SELECT
+  // Use select() to determine when socket is readable
+  fd_set readfds;
+  timeval timeout;
+  timeout.tv_sec = 1;
+  timeout.tv_usec = 0;
+#endif
+  
   while(1) {
     // get the data into our output buffer and record the number of bytes
+
+#if USE_SELECT
+    // RCV_TIMEO doesn't work on all systems (e.g., Cygwin)
+    // use select() instead of, or in addition to RCV_TIMEO
+    FD_ZERO(&readfds);
+    FD_SET(d_socket, &readfds);
+    r = select(FD_SETSIZE, &readfds, NULL, NULL, &timeout);
+    if(r < 0) {
+       report_error("udp_source/select",NULL);
+       return -1;
+    }
+    else if(r == 0 ) {  // timed out
+      if( d_wait ) {
+       // Allow boost thread interrupt, then try again
+       boost::this_thread::interruption_point();
+       continue;
+      }
+      else
+       return -1;
+    }
+#endif // USE_SELECT
+
     // This is a non-blocking call with a timeout set in the constructor
     r = recv(d_socket, d_temp_buff, d_payload_size, 0);  // get the entire payload or the what's available
 
@@ -244,11 +266,16 @@ gr_udp_source::work (int noutput_items,
        printf("UDP receive timed out\n"); 
         #endif
 
-       // Break here to allow the rest of the flow graph time to run and so ctrl-C breaks
-       break;
+       if( d_wait ) {
+         // Allow boost thread interrupt, then try again
+         boost::this_thread::interruption_point();
+         continue;
+       }
+       else
+         return -1;
       }
       else {
-       report_error("udp_source",NULL);
+       report_error("udp_source/recv",NULL);
        return -1;
       }
     }
index 14d521dac7c8df63d948d03044d4e8c7b4e5226d..b06536d6a8bd038aa33a6313bc5a3e6f4013edf2 100755 (executable)
@@ -1,6 +1,6 @@
 /* -*- c++ -*- */
 /*
- * Copyright 2007,2008,2009 Free Software Foundation, Inc.
+ * Copyright 2007,2008,2009,2010 Free Software Foundation, Inc.
  * 
  * This file is part of GNU Radio
  * 
@@ -38,7 +38,8 @@ class gr_udp_source;
 typedef boost::shared_ptr<gr_udp_source> gr_udp_source_sptr;
 
 gr_udp_source_sptr gr_make_udp_source(size_t itemsize, const char *src, 
-                                     unsigned short port_src, int payload_size=1472);
+                                     unsigned short port_src,
+                                     int payload_size=1472, bool wait=true);
 
 /*! 
  * \brief Read stream from an UDP socket.
@@ -50,22 +51,22 @@ gr_udp_source_sptr gr_make_udp_source(size_t itemsize, const char *src,
  * \param port_src     The port number on which the socket listens for data
  * \param payload_size UDP payload size by default set to 
  *                     1472 = (1500 MTU - (8 byte UDP header) - (20 byte IP header))
+ * \param wait         Wait for data if not immediately available (default: true)
  *
 */
 
 class gr_udp_source : public gr_sync_block
 {
   friend gr_udp_source_sptr gr_make_udp_source(size_t itemsize, const char *src, 
-                                              unsigned short port_src, int payload_size);
+                                              unsigned short port_src,
+                                              int payload_size, bool wait);
 
  private:
   size_t       d_itemsize;
-  bool         d_updated;
-  gruel::mutex d_mutex;
 
   int            d_payload_size;  // maximum transmission unit (packet length)
+  bool           d_wait;          // wait if data if not immediately available
   int            d_socket;        // handle to socket
-  struct addrinfo *d_ip_src;      // store the source IP address to use
   char *d_temp_buff;    // hold buffer between calls
   ssize_t d_residual;   // hold information about number of bytes stored in the temp buffer
   size_t d_temp_offset; // point to temp buffer location offset
@@ -80,27 +81,14 @@ class gr_udp_source : public gr_sync_block
    * \param port_src     The port number on which the socket listens for data
    * \param payload_size UDP payload size by default set to 
    *                     1472 = (1500 MTU - (8 byte UDP header) - (20 byte IP header))
+   * \param wait         Wait for data if not immediately available (default: true)
    */
-  gr_udp_source(size_t itemsize, const char *src, unsigned short port_src, int payload_size);
+  gr_udp_source(size_t itemsize, const char *src, unsigned short port_src,
+               int payload_size, bool wait);
 
  public:
   ~gr_udp_source();
 
-  /*!
-   * \brief open a socket specified by the port and ip address info
-   *
-   * Opens a socket, binds to the address, and waits for a connection
-   * over UDP. If any of these fail, the fuction retuns the error and exits.
-   */
-  bool open();
-
-  /*!
-   * \brief Close current socket.
-   *
-   * Shuts down read/write on the socket
-   */
-  void close();
-
   /*! \brief return the PAYLOAD_SIZE of the socket */
   int payload_size() { return d_payload_size; }
 
old mode 100644 (file)
new mode 100755 (executable)
index fb39dad..efaa57c
@@ -24,19 +24,18 @@ GR_SWIG_BLOCK_MAGIC(gr,udp_source)
 
 gr_udp_source_sptr 
 gr_make_udp_source (size_t itemsize, const char *src, 
-                   unsigned short port_src, int payload_size=1472);
+                   unsigned short port_src, int payload_size=1472,
+                   bool wait=true);
 
 class gr_udp_source : public gr_sync_block
 {
  protected:
   gr_udp_source (size_t itemsize, const char *src, 
-                unsigned short port_src, int payload_size);
+                unsigned short port_src, int payload_size, bool wait);
 
  public:
   ~gr_udp_source ();
 
-  bool open();
-  void close();
   int payload_size() { return d_payload_size; }
 
 };
index 47d24b9bcfb006764e6180b63cc95503d0e81122..82e925baa7dd05d84e46d2a9782ab534f72981e8 100755 (executable)
@@ -25,9 +25,9 @@ from gnuradio.eng_option import eng_option
 from optparse import OptionParser
 
 class dial_tone_sink(gr.top_block):
-    def __init__(self, src, port, pkt_size, sample_rate):
+    def __init__(self, src, port, pkt_size, sample_rate, wait):
         gr.top_block.__init__(self, "dial_tone_sink")
-        udp = gr.udp_source(gr.sizeof_float, src, port, pkt_size)
+        udp = gr.udp_source(gr.sizeof_float, src, port, pkt_size, wait=wait)
         sink = audio.sink(sample_rate)
         self.connect(udp, sink)
         
@@ -41,6 +41,8 @@ if __name__ == '__main__':
                       help="packet size.")
     parser.add_option("-r", "--sample-rate", type="int", default=8000,
                       help="audio signal sample rate [default=%default]")
+    parser.add_option("-n", "--no-wait", action="store_true", default=False,
+                      help="don't wait for source")
     (options, args) = parser.parse_args()
     if len(args) != 0:
         parser.print_help()
@@ -48,7 +50,8 @@ if __name__ == '__main__':
 
     # Create an instance of a hierarchical block
     top_block = dial_tone_sink(options.src_name, options.src_port,
-                               options.packet_size, options.sample_rate)
+                               options.packet_size, options.sample_rate,
+                               not options.no_wait)
     
     try:    
         # Run forever