Updates to udp source/sink (select(), wait, cleanup)
[debian/gnuradio] / gnuradio-core / src / lib / io / gr_udp_source.cc
index 56499258c3557e765a1ab50ca23a0f19ee76a20a..ce870d481eff17b99b8ccd436b0bd27ebe0b8fc0 100755 (executable)
@@ -1,6 +1,6 @@
 /* -*- c++ -*- */
 /*
- * Copyright 2007,2008,2009 Free Software Foundation, Inc.
+ * Copyright 2007,2008,2009,2010 Free Software Foundation, Inc.
  * 
  * This file is part of GNU Radio
  * 
 #include <stdio.h>
 #include <string.h>
 #if defined(HAVE_NETDB_H)
+#include <netdb.h>
 typedef void* optval_t;
-#else
+#elif defined(HAVE_WINDOWS_H)
 // if not posix, assume winsock
 #define USING_WINSOCK
+#include <winsock2.h>
+#include <ws2tcpip.h>
 #define SHUT_RDWR 2
 typedef char* optval_t;
 #endif
 
+#define USE_SELECT    1  // non-blocking receive on all platforms
+#define USE_RCV_TIMEO 0  // non-blocking receive on all but Cygwin
 #define SRC_VERBOSE 0
 
 static int is_error( int perr )
@@ -77,15 +82,17 @@ static void report_error( char *msg1, char *msg2 )
 }
 
 gr_udp_source::gr_udp_source(size_t itemsize, const char *src, 
-                            unsigned short port_src, int payload_size)
+                            unsigned short port_src, int payload_size,
+                            bool wait)
   : gr_sync_block ("udp_source",
                   gr_make_io_signature(0, 0, 0),
                   gr_make_io_signature(1, 1, itemsize)),
-    d_itemsize(itemsize), d_updated(false), d_payload_size(payload_size), d_residual(0), d_temp_offset(0)
+    d_itemsize(itemsize), d_payload_size(payload_size), d_wait(wait), d_residual(0), d_temp_offset(0)
 {
   int ret = 0;
+  struct addrinfo *ip_src;      // store the source IP address to use
 
-#if !defined(HAVE_SOCKET) // for Windows (with MinGW)
+#if defined(USING_WINSOCK) // for Windows (with MinGW)
   // initialize winsock DLL
   WSADATA wsaData;
   int iResult = WSAStartup( MAKEWORD(2,2), &wsaData );
@@ -103,43 +110,16 @@ gr_udp_source::gr_udp_source(size_t itemsize, const char *src,
   hints.ai_protocol = IPPROTO_UDP;
   char port_str[7];
   sprintf( port_str, "%d", port_src );
-  ret = getaddrinfo( src, port_str, &hints, &d_ip_src );
+  ret = getaddrinfo( src, port_str, &hints, &ip_src );
   if( ret != 0 )
     report_error("gr_udp_source/getaddrinfo",
                 "can't initialize source socket" );
 
   d_temp_buff = new char[d_payload_size];   // allow it to hold up to payload_size bytes
-  
-  open();
-}
-
-gr_udp_source_sptr
-gr_make_udp_source (size_t itemsize, const char *ipaddr, 
-                   unsigned short port, int payload_size)
-{
-  return gr_udp_source_sptr (new gr_udp_source (itemsize, ipaddr, 
-                                               port, payload_size));
-}
-
-gr_udp_source::~gr_udp_source ()
-{
-  freeaddrinfo(d_ip_src);
-  delete [] d_temp_buff;
-  close();
 
-#if !defined(HAVE_SOCKET) // for Windows (with MinGW)
-  // free winsock resources
-  WSACleanup();
-#endif
-}
-
-bool
-gr_udp_source::open()
-{
-  gruel::scoped_lock guard(d_mutex);   // hold mutex for duration of this function
   // create socket
-  d_socket = socket(d_ip_src->ai_family, d_ip_src->ai_socktype,
-                   d_ip_src->ai_protocol);
+  d_socket = socket(ip_src->ai_family, ip_src->ai_socktype,
+                   ip_src->ai_protocol);
   if(d_socket == -1) {
     report_error("socket open","can't open socket");
   }
@@ -160,6 +140,7 @@ gr_udp_source::open()
     }
   }
 
+#if USE_RCV_TIMEO
   // Set a timeout on the receive function to not block indefinitely
   // This value can (and probably should) be changed
   // Ignored on Cygwin
@@ -173,20 +154,27 @@ gr_udp_source::open()
   if(setsockopt(d_socket, SOL_SOCKET, SO_RCVTIMEO, (optval_t)&timeout, sizeof(timeout)) == -1) {
     report_error("SO_RCVTIMEO","can't set socket option SO_RCVTIMEO");
   }
+#endif // USE_RCV_TIMEO
 
   // bind socket to an address and port number to listen on
-  if(bind (d_socket, d_ip_src->ai_addr, d_ip_src->ai_addrlen) == -1) {
+  if(bind (d_socket, ip_src->ai_addr, ip_src->ai_addrlen) == -1) {
     report_error("socket bind","can't bind socket");
   }
+  freeaddrinfo(ip_src);
 
-  d_updated = true;
-  return d_socket != 0;
 }
 
-void
-gr_udp_source::close()
+gr_udp_source_sptr
+gr_make_udp_source (size_t itemsize, const char *ipaddr, 
+                   unsigned short port, int payload_size, bool wait)
 {
-  gruel::scoped_lock guard(d_mutex);   // hold mutex for duration of this function
+  return gr_udp_source_sptr (new gr_udp_source (itemsize, ipaddr, 
+                                               port, payload_size, wait));
+}
+
+gr_udp_source::~gr_udp_source ()
+{
+  delete [] d_temp_buff;
 
   if (d_socket){
     shutdown(d_socket, SHUT_RDWR);
@@ -197,7 +185,11 @@ gr_udp_source::close()
 #endif
     d_socket = 0;
   }
-  d_updated = true;
+
+#if defined(USING_WINSOCK) // for Windows (with MinGW)
+  // free winsock resources
+  WSACleanup();
+#endif
 }
 
 int 
@@ -232,8 +224,38 @@ gr_udp_source::work (int noutput_items,
     d_temp_offset = d_temp_offset+d_residual;
   }
 
+#if USE_SELECT
+  // Use select() to determine when socket is readable
+  fd_set readfds;
+  timeval timeout;
+  timeout.tv_sec = 1;
+  timeout.tv_usec = 0;
+#endif
+  
   while(1) {
     // get the data into our output buffer and record the number of bytes
+
+#if USE_SELECT
+    // RCV_TIMEO doesn't work on all systems (e.g., Cygwin)
+    // use select() instead of, or in addition to RCV_TIMEO
+    FD_ZERO(&readfds);
+    FD_SET(d_socket, &readfds);
+    r = select(FD_SETSIZE, &readfds, NULL, NULL, &timeout);
+    if(r < 0) {
+       report_error("udp_source/select",NULL);
+       return -1;
+    }
+    else if(r == 0 ) {  // timed out
+      if( d_wait ) {
+       // Allow boost thread interrupt, then try again
+       boost::this_thread::interruption_point();
+       continue;
+      }
+      else
+       return -1;
+    }
+#endif // USE_SELECT
+
     // This is a non-blocking call with a timeout set in the constructor
     r = recv(d_socket, d_temp_buff, d_payload_size, 0);  // get the entire payload or the what's available
 
@@ -244,11 +266,16 @@ gr_udp_source::work (int noutput_items,
        printf("UDP receive timed out\n"); 
         #endif
 
-       // Break here to allow the rest of the flow graph time to run and so ctrl-C breaks
-       break;
+       if( d_wait ) {
+         // Allow boost thread interrupt, then try again
+         boost::this_thread::interruption_point();
+         continue;
+       }
+       else
+         return -1;
       }
       else {
-       report_error("udp_source",NULL);
+       report_error("udp_source/recv",NULL);
        return -1;
       }
     }