Merge remote branch 'gnuradio/wip/udp_source_sink'
authorJohnathan Corgan <jcorgan@corganenterprises.com>
Sun, 23 May 2010 20:36:44 +0000 (13:36 -0700)
committerJohnathan Corgan <jcorgan@corganenterprises.com>
Sun, 23 May 2010 20:36:44 +0000 (13:36 -0700)
* gnuradio/wip/udp_source_sink:
  gnuradio-core: update copyrights
  gnuradio-core: allow swig to handle exceptions in UDP source/sink
  grc: update UDP source and sink block wrappers
  Simplify USE_SELECT usage
  Return immediately when using d_residual.
  Defend against a peer that sends an invalid message length.
  Move initialization of select timeout
  Correct update of d_temp_offset (parallel construction)
  Identify memory leaks that occur on error conditions
  Use -1 as file descriptor "not open" value instead of 0
  Add additional conditionalization of networking includes
  Flush pending errors in gr_udp_sink on disconnect()
  Rework UDP source and sink, with incompatible API changes
  Updates to udp source/sink (select(), wait, cleanup)
  Discard data in gr_udp_sink until receiver is started.
  Use getaddrinfo in gr_udp_{source,sink}
  Changes to gr_udp_{source,sink} for MinGW
  Ignore ENOPROTOOPT return from setsockopt(SO_LINGER)

1  2 
configure.ac
gnuradio-core/src/lib/io/gr_udp_sink.cc
gnuradio-core/src/lib/io/gr_udp_sink.h
gnuradio-core/src/lib/io/gr_udp_sink.i
gnuradio-core/src/lib/io/gr_udp_source.cc
gnuradio-core/src/lib/io/gr_udp_source.h
gnuradio-core/src/lib/io/gr_udp_source.i

diff --combined configure.ac
index 0b651fd345d3c3c45127f91f4483dde73cd6eec2,5200059e4e3e2ffe3a67665366a421ee1bb45cc5..29a84cb62167c534dd4b966b813d5d1efe936e42
@@@ -124,7 -124,7 +124,7 @@@ dnl AC_DISABLE_SHARED      dnl don't build s
  AC_ENABLE_SHARED      dnl do build shared libraries
  AC_DISABLE_STATIC     dnl don't build static libraries
  m4_ifdef([LT_INIT],[LT_INIT],[AC_PROG_LIBTOOL])
 -GR_FORTRAN
 +dnl GR_FORTRAN
  
  GR_NO_UNDEFINED               dnl do we need the -no-undefined linker flag
  GR_SCRIPTING
@@@ -170,9 -170,9 +170,9 @@@ AC_HEADER_SYS_WAI
  AC_CHECK_HEADERS(fcntl.h limits.h strings.h time.h sys/ioctl.h sys/time.h unistd.h)
  AC_CHECK_HEADERS(linux/ppdev.h dev/ppbus/ppi.h sys/mman.h sys/select.h sys/types.h)
  AC_CHECK_HEADERS(sys/resource.h stdint.h sched.h signal.h sys/syscall.h malloc.h)
- AC_CHECK_HEADERS(netinet/in.h)
  AC_CHECK_HEADERS(windows.h)
  AC_CHECK_HEADERS(vec_types.h)
+ AC_CHECK_HEADERS(netdb.h netinet/in.h arpa/inet.h sys/types.h sys/socket.h)
  
  dnl Checks for typedefs, structures, and compiler characteristics.
  AC_C_CONST
@@@ -340,6 -340,7 +340,6 @@@ GRC_GCEL
  GRC_GNURADIO_CORE
  GRC_USRP
  GRC_USRP2
 -GRC_VRT
  GRC_GR_USRP                   dnl this must come after GRC_USRP
  GRC_GR_USRP2
  GRC_GR_GCELL                  dnl this must come after GRC_GCELL and GRC_GNURADIO_CORE
index d37adfb8a8cf780f996ba6a1b1f7cc566d06e450,3084a848be7a78c537af79e069703ca404533052..3084a848be7a78c537af79e069703ca404533052
mode 100644,100755..100644
@@@ -1,6 -1,6 +1,6 @@@
  /* -*- c++ -*- */
  /*
-  * Copyright 2007,2008,2009 Free Software Foundation, Inc.
+  * Copyright 2007,2008,2009,2010 Free Software Foundation, Inc.
   * 
   * This file is part of GNU Radio
   * 
  #include <gr_udp_sink.h>
  #include <gr_io_signature.h>
  #include <stdexcept>
- #if defined(HAVE_SOCKET)
- #include <netdb.h>
+ #include <errno.h>
  #include <stdio.h>
+ #include <string.h>
+ #if defined(HAVE_NETDB_H)
+ #include <netdb.h>
+ #ifdef HAVE_SYS_TYPES_H
+ #include <sys/types.h>
+ #endif
+ #ifdef HAVE_SYS_SOCKET_H
+ #include <sys/socket.h>  //usually included by <netdb.h>?
+ #endif
  typedef void* optval_t;
- #else
+ #elif defined(HAVE_WINDOWS_H)
+ // if not posix, assume winsock
+ #define USING_WINSOCK
+ #include <winsock2.h>
+ #include <ws2tcpip.h>
  #define SHUT_RDWR 2
- #define inet_aton(N,A) ( (A)->s_addr = inet_addr(N), ( (A)->s_addr != INADDR_NONE ) )
  typedef char* optval_t;
  #endif
  
  
  #define SNK_VERBOSE 0
  
- gr_udp_sink::gr_udp_sink (size_t itemsize, 
-                         const char *src, unsigned short port_src,
-                         const char *dst, unsigned short port_dst,
-                         int payload_size)
-   : gr_sync_block ("udp_sink",
-                  gr_make_io_signature (1, 1, itemsize),
-                  gr_make_io_signature (0, 0, 0)),
-     d_itemsize (itemsize), d_updated(false), d_payload_size(payload_size)
+ static int is_error( int perr )
  {
-   int ret = 0;
-   
-   // Set up the address stucture for the source address and port numbers
-   // Get the source IP address from the host name
-   struct hostent *hsrc = gethostbyname(src);
-   if(hsrc) {   // if the source was provided as a host namex
-     d_ip_src = *(struct in_addr*)hsrc->h_addr_list[0];    
+   // Compare error to posix error code; return nonzero if match.
+ #if defined(USING_WINSOCK)
+ #define ENOPROTOOPT 109
+ #define ECONNREFUSED 111
+   // All codes to be checked for must be defined below
+   int werr = WSAGetLastError();
+   switch( werr ) {
+   case WSAETIMEDOUT:
+     return( perr == EAGAIN );
+   case WSAENOPROTOOPT:
+     return( perr == ENOPROTOOPT );
+   case WSAECONNREFUSED:
+     return( perr == ECONNREFUSED );
+   default:
+     fprintf(stderr,"gr_udp_source/is_error: unknown error %d\n", perr );
+     throw std::runtime_error("internal error");
    }
-   else { // assume it was specified as an IP address
-     if((ret=inet_aton(src, &d_ip_src)) == 0) {            // format IP address
-       perror("Not a valid source IP address or host name");
-       throw std::runtime_error("can't initialize source socket");
-     }
-   }
-   // Get the destination IP address from the host name
-   struct hostent *hdst = gethostbyname(dst);
-   if(hdst) {   // if the source was provided as a host namex
-     d_ip_dst = *(struct in_addr*)hdst->h_addr_list[0];    
-   }
-   else { // assume it was specified as an IP address
-     if((ret=inet_aton(dst, &d_ip_dst)) == 0) {            // format IP address
-       perror("Not a valid destination IP address or host name");
-       throw std::runtime_error("can't initialize destination socket");
-     }
-   }
-   d_port_src = htons(port_src);           // format port number
-   d_port_dst = htons(port_dst);           // format port number
-   d_sockaddr_src.sin_family = AF_INET;
-   d_sockaddr_src.sin_addr   = d_ip_src;
-   d_sockaddr_src.sin_port   = d_port_src;
-   d_sockaddr_dst.sin_family = AF_INET;
-   d_sockaddr_dst.sin_addr   = d_ip_dst;
-   d_sockaddr_dst.sin_port   = d_port_dst;
-   
-   open();
- }
- // public constructor that returns a shared_ptr
- gr_udp_sink_sptr
- gr_make_udp_sink (size_t itemsize, 
-                 const char *src, unsigned short port_src,
-                 const char *dst, unsigned short port_dst,
-                 int payload_size)
- {
-   return gr_udp_sink_sptr (new gr_udp_sink (itemsize, 
-                                           src, port_src,
-                                           dst, port_dst,
-                                           payload_size));
+   return 0;
+ #else
+   return( perr == errno );
+ #endif
  }
  
gr_udp_sink::~gr_udp_sink ()
static void report_error( const char *msg1, const char *msg2 )
  {
-   close();
+   // Deal with errors, both posix and winsock
+ #if defined(USING_WINSOCK)
+   int werr = WSAGetLastError();
+   fprintf(stderr, "%s: winsock error %d\n", msg1, werr );
+ #else
+   perror(msg1);
+ #endif
+   if( msg2 != NULL )
+     throw std::runtime_error(msg2);
+   return;
  }
  
- bool
- gr_udp_sink::open()
+ gr_udp_sink::gr_udp_sink (size_t itemsize, 
+                         const char *host, unsigned short port,
+                         int payload_size, bool eof)
+   : gr_sync_block ("udp_sink",
+                  gr_make_io_signature (1, 1, itemsize),
+                  gr_make_io_signature (0, 0, 0)),
+     d_itemsize (itemsize), d_payload_size(payload_size), d_eof(eof),
+     d_socket(-1), d_connected(false)
  {
-   gruel::scoped_lock guard(d_mutex);  // hold mutex for duration of this function
-   // create socket
-   if((d_socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) {
-     perror("socket open");
-     throw std::runtime_error("can't open socket");
+ #if defined(USING_WINSOCK) // for Windows (with MinGW)
+   // initialize winsock DLL
+   WSADATA wsaData;
+   int iResult = WSAStartup( MAKEWORD(2,2), &wsaData );
+   if( iResult != NO_ERROR ) {
+     report_error( "gr_udp_source WSAStartup", "can't open socket" );
    }
+ #endif
  
-   // Turn on reuse address
-   int opt_val = true;
-   if(setsockopt(d_socket, SOL_SOCKET, SO_REUSEADDR, (optval_t)&opt_val, sizeof(int)) == -1) {
-     perror("SO_REUSEADDR");
-     throw std::runtime_error("can't set socket option SO_REUSEADDR");
+   // create socket
+   d_socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
+   if(d_socket == -1) {
+     report_error("socket open","can't open socket");
    }
  
    // Don't wait when shutting down
    lngr.l_onoff  = 1;
    lngr.l_linger = 0;
    if(setsockopt(d_socket, SOL_SOCKET, SO_LINGER, (optval_t)&lngr, sizeof(linger)) == -1) {
-     perror("SO_LINGER");
-     throw std::runtime_error("can't set socket option SO_LINGER");
+     if( !is_error(ENOPROTOOPT) ) {  // no SO_LINGER for SOCK_DGRAM on Windows
+       report_error("SO_LINGER","can't set socket option SO_LINGER");
+     }
    }
  
-   // bind socket to an address and port number to listen on
-   if(bind (d_socket, (sockaddr*)&d_sockaddr_src, sizeof(struct sockaddr)) == -1) {
-     perror("socket bind");
-     throw std::runtime_error("can't bind socket");
-   }
+   // Get the destination address
+   connect(host, port);
+ }
  
-   // Not sure if we should throw here or allow retries
-   if(connect(d_socket, (sockaddr*)&d_sockaddr_dst, sizeof(struct sockaddr)) == -1) {
-     perror("socket connect");
-     throw std::runtime_error("can't connect to socket");
-   }
+ // public constructor that returns a shared_ptr
  
-   d_updated = true;
-   return d_socket != 0;
+ gr_udp_sink_sptr
+ gr_make_udp_sink (size_t itemsize, 
+                 const char *host, unsigned short port,
+                 int payload_size, bool eof)
+ {
+   return gr_udp_sink_sptr (new gr_udp_sink (itemsize, 
+                                           host, port,
+                                           payload_size, eof));
  }
  
- void
- gr_udp_sink::close()
+ gr_udp_sink::~gr_udp_sink ()
  {
-   gruel::scoped_lock guard(d_mutex);  // hold mutex for duration of this function
+   if (d_connected)
+     disconnect();
  
-   if (d_socket){
+   if (d_socket != -1){
      shutdown(d_socket, SHUT_RDWR);
-     d_socket = 0;
+ #if defined(USING_WINSOCK)
+     closesocket(d_socket);
+ #else
+     ::close(d_socket);
+ #endif
+     d_socket = -1;
    }
-   d_updated = true;
+ #if defined(USING_WINSOCK) // for Windows (with MinGW)
+   // free winsock resources
+   WSACleanup();
+ #endif
  }
  
  int 
@@@ -174,21 -171,31 +171,31 @@@ gr_udp_sink::work (int noutput_items
    ssize_t total_size = noutput_items*d_itemsize;
  
    #if SNK_VERBOSE
-   printf("Entered upd_sink\n");
+   printf("Entered udp_sink\n");
    #endif
  
+   gruel::scoped_lock guard(d_mutex);  // protect d_socket
    while(bytes_sent <  total_size) {
      bytes_to_send = std::min((ssize_t)d_payload_size, (total_size-bytes_sent));
    
-     r = send(d_socket, (in+bytes_sent), bytes_to_send, 0);
-     if(r == -1) {         // error on send command
-       perror("udp_sink"); // there should be no error case where this function 
-       return -1;          // should not exit immediately
+     if(d_connected) {
+       r = send(d_socket, (in+bytes_sent), bytes_to_send, 0);
+       if(r == -1) {         // error on send command
+       if( is_error(ECONNREFUSED) )
+         r = bytes_to_send;  // discard data until receiver is started
+       else {
+         report_error("udp_sink",NULL); // there should be no error case where
+         return -1;                  // this function should not exit immediately
+       }
+       }
      }
+     else
+       r = bytes_to_send;  // discarded for lack of connection
      bytes_sent += r;
      
      #if SNK_VERBOSE
-     printf("\tbyte sent: %d bytes\n", bytes);
+     printf("\tbyte sent: %d bytes\n", r);
      #endif
    }
  
  
    return noutput_items;
  }
+ void gr_udp_sink::connect( const char *host, unsigned short port )
+ {
+   if(d_connected)
+     disconnect();
+   if(host != NULL ) {
+     // Get the destination address
+     struct addrinfo *ip_dst;
+     struct addrinfo hints;
+     memset( (void*)&hints, 0, sizeof(hints) );
+     hints.ai_family = AF_INET;
+     hints.ai_socktype = SOCK_DGRAM;
+     hints.ai_protocol = IPPROTO_UDP;
+     char port_str[12];
+     sprintf( port_str, "%d", port );
+     // FIXME leaks if report_error throws below
+     int ret = getaddrinfo( host, port_str, &hints, &ip_dst );
+     if( ret != 0 )
+       report_error("gr_udp_source/getaddrinfo",
+                  "can't initialize destination socket" );
+     // don't need d_mutex lock when !d_connected
+     if(::connect(d_socket, ip_dst->ai_addr, ip_dst->ai_addrlen) == -1) {
+       report_error("socket connect","can't connect to socket");
+     }
+     d_connected = true;
+     freeaddrinfo(ip_dst);
+   }
+   return;
+ }
+ void gr_udp_sink::disconnect()
+ {
+   if(!d_connected)
+     return;
+   #if SNK_VERBOSE
+   printf("gr_udp_sink disconnecting\n");
+   #endif
+   gruel::scoped_lock guard(d_mutex);  // protect d_socket from work()
+   // Send a few zero-length packets to signal receiver we are done
+   if(d_eof) {
+     int i;
+     for( i = 0; i < 3; i++ )
+       (void) send( d_socket, NULL, 0, 0 );  // ignore errors
+   }
+   // Sending EOF can produce ERRCONNREFUSED errors that won't show up
+   //  until the next send or recv, which might confuse us if it happens
+   //  on a new connection.  The following does a nonblocking recv to
+   //  clear any such errors.
+   timeval timeout;
+   timeout.tv_sec = 0;    // zero time for immediate return
+   timeout.tv_usec = 0;
+   fd_set readfds;
+   FD_ZERO(&readfds);
+   FD_SET(d_socket, &readfds);
+   int r = select(FD_SETSIZE, &readfds, NULL, NULL, &timeout);
+   if(r < 0) {
+       #if SNK_VERBOSE
+       report_error("udp_sink/select",NULL);
+       #endif
+   }
+   else if(r > 0) {  // call recv() to get error return
+     r = recv(d_socket, (char*)&readfds, sizeof(readfds), 0);
+     if(r < 0) {
+       #if SNK_VERBOSE
+       report_error("udp_sink/recv",NULL);
+       #endif
+     }
+   }
+   // Since I can't find any way to disconnect a datagram socket in Cygwin,
+   // we just leave it connected but disable sending.
+ #if 0
+   // zeroed address structure should reset connection
+   struct sockaddr addr;
+   memset( (void*)&addr, 0, sizeof(addr) );
+   // addr.sa_family = AF_UNSPEC;  // doesn't work on Cygwin
+   // addr.sa_family = AF_INET;  // doesn't work on Cygwin
+   if(::connect(d_socket, &addr, sizeof(addr)) == -1)
+     report_error("socket connect","can't connect to socket");
+ #endif
+   d_connected = false;
+   return;
+ }
index f22b92dd035dd5e17f78c37ef3b9686486f61257,421d514a4db270872f218bddbc116cc8b293f054..421d514a4db270872f218bddbc116cc8b293f054
mode 100644,100755..100644
@@@ -1,6 -1,6 +1,6 @@@
  /* -*- c++ -*- */
  /*
-  * Copyright 2007,2008,2009 Free Software Foundation, Inc.
+  * Copyright 2007,2008,2009,2010 Free Software Foundation, Inc.
   * 
   * This file is part of GNU Radio
   * 
  #define INCLUDED_GR_UDP_SINK_H
  
  #include <gr_sync_block.h>
- #include <boost/thread.hpp>
- #if defined(HAVE_SOCKET)
- #include <sys/socket.h>
- #include <arpa/inet.h>
- #elif defined(HAVE_WINDOWS_H)
- #include <winsock2.h>
- #include <windows.h>
- #endif
- #if defined(HAVE_NETINET_IN_H)
- #include <netinet/in.h>
- #endif
  #include <gruel/thread.h>
  
  class gr_udp_sink;
@@@ -43,85 -31,75 +31,75 @@@ typedef boost::shared_ptr<gr_udp_sink> 
  
  gr_udp_sink_sptr
  gr_make_udp_sink (size_t itemsize, 
-                 const char *src, unsigned short port_src,
-                 const char *dst, unsigned short port_dst,
-                 int payload_size=1472);
+                 const char *host, unsigned short port,
+                 int payload_size=1472, bool eof=true);
  
  /*!
   * \brief Write stream to an UDP socket.
   * \ingroup sink_blk
   * 
   * \param itemsize     The size (in bytes) of the item datatype
-  * \param src          The source address as either the host name or the 'numbers-and-dots'
-  *                     IP address
-  * \param port_src     Destination port to bind to (0 allows socket to choose an appropriate port)
-  * \param dst          The destination address as either the host name or the 'numbers-and-dots'
-  *                     IP address
-  * \param port_dst     Destination port to connect to
-  * \param payload_size UDP payload size by default set to 
-  *                     1472 = (1500 MTU - (8 byte UDP header) - (20 byte IP header))
+  * \param host         The name or IP address of the receiving host; use
+  *                     NULL or None for no connection
+  * \param port         Destination port to connect to on receiving host
+  * \param payload_size UDP payload size by default set to 1472 =
+  *                     (1500 MTU - (8 byte UDP header) - (20 byte IP header))
+  * \param eof          Send zero-length packet on disconnect
   */
  
  class gr_udp_sink : public gr_sync_block
  {
    friend gr_udp_sink_sptr gr_make_udp_sink (size_t itemsize, 
-                                           const char *src, unsigned short port_src,
-                                           const char *dst, unsigned short port_dst,
-                                           int payload_size);
+                                           const char *host,
+                                           unsigned short port,
+                                           int payload_size, bool eof);
   private:
    size_t      d_itemsize;
-   bool                d_updated;
-   gruel::mutex        d_mutex;
  
-   int            d_payload_size;    // maximum transmission unit (packet length)
-   int            d_socket;          // handle to socket
-   int            d_socket_rcv;      // handle to socket retuned in the accept call
-   struct in_addr d_ip_src;          // store the source ip info
-   struct in_addr d_ip_dst;          // store the destination ip info
-   unsigned short d_port_src;        // the port number to open for connections to this service
-   unsigned short d_port_dst;        // port number of the remove system
-   struct sockaddr_in    d_sockaddr_src;    // store the source sockaddr data (formatted IP address and port number)
-   struct sockaddr_in    d_sockaddr_dst;    // store the destination sockaddr data (formatted IP address and port number)
+   int           d_payload_size;    // maximum transmission unit (packet length)
+   bool          d_eof;             // send zero-length packet on disconnect
+   int           d_socket;          // handle to socket
+   bool          d_connected;       // are we connected?
+   gruel::mutex  d_mutex;           // protects d_socket and d_connected
  
   protected:
    /*!
     * \brief UDP Sink Constructor
     * 
     * \param itemsize     The size (in bytes) of the item datatype
-    * \param src          The source address as either the host name or the 'numbers-and-dots'
-    *                     IP address
-    * \param port_src     Destination port to bind to (0 allows socket to choose an appropriate port)
-    * \param dst          The destination address as either the host name or the 'numbers-and-dots'
-    *                     IP address
-    * \param port_dst     Destination port to connect to
+    * \param host         The name or IP address of the receiving host; use
+    *                     NULL or None for no connection
+    * \param port         Destination port to connect to on receiving host
     * \param payload_size UDP payload size by default set to 
     *                     1472 = (1500 MTU - (8 byte UDP header) - (20 byte IP header))
+    * \param eof          Send zero-length packet on disconnect
     */
    gr_udp_sink (size_t itemsize, 
-              const char *src, unsigned short port_src,
-              const char *dst, unsigned short port_dst,
-              int payload_size);
+              const char *host, unsigned short port,
+              int payload_size, bool eof);
  
   public:
    ~gr_udp_sink ();
  
-   /*!
-    * \brief open a socket specified by the port and ip address info
-    *
-    * Opens a socket, binds to the address, and makes connectionless association
-    * over UDP. If any of these fail, the fuction retuns the error and exits.
-    */
-   bool open();
+   /*! \brief return the PAYLOAD_SIZE of the socket */
+   int payload_size() { return d_payload_size; }
  
-   /*!
-    * \brief Close current socket.
+   /*! \brief Change the connection to a new destination
+    *
+    * \param host         The name or IP address of the receiving host; use
+    *                     NULL or None to break the connection without closing
+    * \param port         Destination port to connect to on receiving host
     *
-    * Shuts down read/write on the socket
+    * Calls disconnect() to terminate any current connection first.
     */
-   void close();
+   void connect( const char *host, unsigned short port );
  
-   /*! \brief return the PAYLOAD_SIZE of the socket */
-   int payload_size() { return d_payload_size; }
+   /*! \brief Send zero-length packet (if eof is requested) then stop sending
+    *
+    * Zero-byte packets can be interpreted as EOF by gr_udp_source.  Note that
+    * disconnect occurs automatically when the sink is destroyed, but not when
+    * its top_block stops.*/
+   void disconnect();
  
    // should we export anything else?
  
index 0f37b477b2c8f5a6b6512c67350a4d976bf4b53b,a71006ae03e9e47a200500f0339c0371d310c536..a71006ae03e9e47a200500f0339c0371d310c536
mode 100644,100755..100644
@@@ -1,6 -1,6 +1,6 @@@
  /* -*- c++ -*- */
  /*
-  * Copyright 2007 Free Software Foundation, Inc.
+  * Copyright 2007,2010 Free Software Foundation, Inc.
   * 
   * This file is part of GNU Radio
   * 
@@@ -25,22 -25,22 +25,22 @@@ GR_SWIG_BLOCK_MAGIC(gr,udp_sink
  
  gr_udp_sink_sptr 
  gr_make_udp_sink (size_t itemsize, 
-                 const char *src, unsigned short port_src,
-                 const char *dst, unsigned short port_dst,
-                 int payload_size=1472);
+                 const char *host, unsigned short port,
+                 int payload_size=1472, bool eof=true) throw (std::runtime_error);
  
  class gr_udp_sink : public gr_sync_block
  {
   protected:
    gr_udp_sink (size_t itemsize, 
-              const char *src, unsigned short port_src,
-              const char *dst, unsigned short port_dst,
-              int payload_size);
-   bool open();
-   void close();
-   int payload_size() { return d_payload_size; }
+              const char *host, unsigned short port,
+              int payload_size, bool eof) 
+     throw (std::runtime_error);
  
   public:
    ~gr_udp_sink ();
+   int payload_size() { return d_payload_size; }
+   void connect( const char *host, unsigned short port );
+   void disconnect();
  };
index d76d0ee32fcd5ef5ab21d2b4a2ab5373d4485f46,fea9a26ba40083c3fb25768ed08badae5763fc1b..fea9a26ba40083c3fb25768ed08badae5763fc1b
mode 100644,100755..100644
@@@ -1,6 -1,6 +1,6 @@@
  /* -*- c++ -*- */
  /*
-  * Copyright 2007,2008,2009 Free Software Foundation, Inc.
+  * Copyright 2007,2008,2009,2010 Free Software Foundation, Inc.
   * 
   * This file is part of GNU Radio
   * 
  #include <errno.h>
  #include <stdio.h>
  #include <string.h>
- #if defined(HAVE_SOCKET)
+ #if defined(HAVE_NETDB_H)
  #include <netdb.h>
+ #ifdef HAVE_SYS_TYPES_H
+ #include <sys/types.h>
+ #endif
+ #ifdef HAVE_SYS_SOCKET_H
+ #include <sys/socket.h>
+ #endif
  typedef void* optval_t;
- #else
+ // ntohs() on FreeBSD may require both netinet/in.h and arpa/inet.h, in order
+ #if defined(HAVE_NETINET_IN_H)
+ #include <netinet/in.h>
+ #endif
+ #if defined(HAVE_ARPA_INET_H)
+ #include <arpa/inet.h>
+ #endif
+ #elif defined(HAVE_WINDOWS_H)
+ // if not posix, assume winsock
+ #define USING_WINSOCK
+ #include <winsock2.h>
+ #include <ws2tcpip.h>
  #define SHUT_RDWR 2
- #define inet_aton(N,A) ( (A)->s_addr = inet_addr(N), ( (A)->s_addr != INADDR_NONE ) )
  typedef char* optval_t;
  #endif
  
+ #define USE_SELECT    1  // non-blocking receive on all platforms
+ #define USE_RCV_TIMEO 0  // non-blocking receive on all but Cygwin
  #define SRC_VERBOSE 0
  
- gr_udp_source::gr_udp_source(size_t itemsize, const char *src, 
-                            unsigned short port_src, int payload_size)
+ static int is_error( int perr )
+ {
+   // Compare error to posix error code; return nonzero if match.
+ #if defined(USING_WINSOCK)
+ #define ENOPROTOOPT 109
+   // All codes to be checked for must be defined below
+   int werr = WSAGetLastError();
+   switch( werr ) {
+   case WSAETIMEDOUT:
+     return( perr == EAGAIN );
+   case WSAENOPROTOOPT:
+     return( perr == ENOPROTOOPT );
+   default:
+     fprintf(stderr,"gr_udp_source/is_error: unknown error %d\n", perr );
+     throw std::runtime_error("internal error");
+   }
+   return 0;
+ #else
+   return( perr == errno );
+ #endif
+ }
+ static void report_error( const char *msg1, const char *msg2 )
+ {
+   // Deal with errors, both posix and winsock
+ #if defined(USING_WINSOCK)
+   int werr = WSAGetLastError();
+   fprintf(stderr, "%s: winsock error %d\n", msg1, werr );
+ #else
+   perror(msg1);
+ #endif
+   if( msg2 != NULL )
+     throw std::runtime_error(msg2);
+   return;
+ }
+ gr_udp_source::gr_udp_source(size_t itemsize, const char *host, 
+                            unsigned short port, int payload_size,
+                            bool eof, bool wait)
    : gr_sync_block ("udp_source",
                   gr_make_io_signature(0, 0, 0),
                   gr_make_io_signature(1, 1, itemsize)),
-     d_itemsize(itemsize), d_updated(false), d_payload_size(payload_size), d_residual(0), d_temp_offset(0)
+     d_itemsize(itemsize), d_payload_size(payload_size),
+     d_eof(eof), d_wait(wait), d_socket(-1), d_residual(0), d_temp_offset(0)
  {
    int ret = 0;
+ #if defined(USING_WINSOCK) // for Windows (with MinGW)
+   // initialize winsock DLL
+   WSADATA wsaData;
+   int iResult = WSAStartup( MAKEWORD(2,2), &wsaData );
+   if( iResult != NO_ERROR ) {
+     report_error( "gr_udp_source WSAStartup", "can't open socket" );
+   }
+ #endif
    
    // Set up the address stucture for the source address and port numbers
    // Get the source IP address from the host name
-   struct hostent *hsrc = gethostbyname(src);
-   if(hsrc) {   // if the source was provided as a host namex
-     d_ip_src = *(struct in_addr*)hsrc->h_addr_list[0];    
-   }
-   else { // assume it was specified as an IP address
-     if((ret=inet_aton(src, &d_ip_src)) == 0) {            // format IP address
-       perror("Not a valid source IP address or host name");
-       throw std::runtime_error("can't initialize source socket");
-     }
-   }
+   struct addrinfo *ip_src;      // store the source IP address to use
+   struct addrinfo hints;
+   memset( (void*)&hints, 0, sizeof(hints) );
+   hints.ai_family = AF_INET;
+   hints.ai_socktype = SOCK_DGRAM;
+   hints.ai_protocol = IPPROTO_UDP;
+   hints.ai_flags = AI_PASSIVE;
+   char port_str[12];
+   sprintf( port_str, "%d", port );
  
-   d_port_src = htons(port_src);     // format port number
-   
-   d_sockaddr_src.sin_family = AF_INET;
-   d_sockaddr_src.sin_addr   = d_ip_src;
-   d_sockaddr_src.sin_port   = d_port_src;
+   // FIXME leaks if report_error throws below
+   ret = getaddrinfo( host, port_str, &hints, &ip_src );
+   if( ret != 0 )
+     report_error("gr_udp_source/getaddrinfo",
+                "can't initialize source socket" );
  
+   // FIXME leaks if report_error throws below
    d_temp_buff = new char[d_payload_size];   // allow it to hold up to payload_size bytes
-   
-   open();
- }
  
- gr_udp_source_sptr
- gr_make_udp_source (size_t itemsize, const char *ipaddr, 
-                   unsigned short port, int payload_size)
- {
-   return gr_udp_source_sptr (new gr_udp_source (itemsize, ipaddr, 
-                                               port, payload_size));
- }
- gr_udp_source::~gr_udp_source ()
- {
-   delete [] d_temp_buff;
-   close();
- }
- bool
- gr_udp_source::open()
- {
-   gruel::scoped_lock guard(d_mutex);  // hold mutex for duration of this function
    // create socket
-   d_socket = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
+   d_socket = socket(ip_src->ai_family, ip_src->ai_socktype,
+                   ip_src->ai_protocol);
    if(d_socket == -1) {
-     perror("socket open");
-     throw std::runtime_error("can't open socket");
+     report_error("socket open","can't open socket");
    }
  
    // Turn on reuse address
    int opt_val = 1;
    if(setsockopt(d_socket, SOL_SOCKET, SO_REUSEADDR, (optval_t)&opt_val, sizeof(int)) == -1) {
-     perror("SO_REUSEADDR");
-     throw std::runtime_error("can't set socket option SO_REUSEADDR");
+     report_error("SO_REUSEADDR","can't set socket option SO_REUSEADDR");
    }
  
    // Don't wait when shutting down
    lngr.l_onoff  = 1;
    lngr.l_linger = 0;
    if(setsockopt(d_socket, SOL_SOCKET, SO_LINGER, (optval_t)&lngr, sizeof(linger)) == -1) {
-     perror("SO_LINGER");
-     throw std::runtime_error("can't set socket option SO_LINGER");
+     if( !is_error(ENOPROTOOPT) ) {  // no SO_LINGER for SOCK_DGRAM on Windows
+       report_error("SO_LINGER","can't set socket option SO_LINGER");
+     }
    }
  
+ #if USE_RCV_TIMEO
    // Set a timeout on the receive function to not block indefinitely
    // This value can (and probably should) be changed
+   // Ignored on Cygwin
+ #if defined(USING_WINSOCK)
+   DWORD timeout = 1000;  // milliseconds
+ #else
    timeval timeout;
    timeout.tv_sec = 1;
    timeout.tv_usec = 0;
+ #endif
    if(setsockopt(d_socket, SOL_SOCKET, SO_RCVTIMEO, (optval_t)&timeout, sizeof(timeout)) == -1) {
-     perror("SO_RCVTIMEO");
-     throw std::runtime_error("can't set socket option SO_RCVTIMEO");
+     report_error("SO_RCVTIMEO","can't set socket option SO_RCVTIMEO");
    }
+ #endif // USE_RCV_TIMEO
  
    // bind socket to an address and port number to listen on
-   if(bind (d_socket, (sockaddr*)&d_sockaddr_src, sizeof(struct sockaddr)) == -1) {
-     perror("socket bind");
-     throw std::runtime_error("can't bind socket");
+   if(bind (d_socket, ip_src->ai_addr, ip_src->ai_addrlen) == -1) {
+     report_error("socket bind","can't bind socket");
    }
-   
-   d_updated = true;
-   return d_socket != 0;
+   freeaddrinfo(ip_src);
  }
  
- void
- gr_udp_source::close()
+ gr_udp_source_sptr
+ gr_make_udp_source (size_t itemsize, const char *ipaddr, 
+                   unsigned short port, int payload_size, bool eof, bool wait)
  {
-   gruel::scoped_lock guard(d_mutex);  // hold mutex for duration of this function
+   return gr_udp_source_sptr (new gr_udp_source (itemsize, ipaddr, 
+                                               port, payload_size, eof, wait));
+ }
+ gr_udp_source::~gr_udp_source ()
+ {
+   delete [] d_temp_buff;
  
-   if (d_socket){
+   if (d_socket != -1){
      shutdown(d_socket, SHUT_RDWR);
-     d_socket = 0;
+ #if defined(USING_WINSOCK)
+     closesocket(d_socket);
+ #else
+     ::close(d_socket);
+ #endif
+     d_socket = -1;
    }
-   d_updated = true;
+ #if defined(USING_WINSOCK) // for Windows (with MinGW)
+   // free winsock resources
+   WSACleanup();
+ #endif
  }
  
  int 
@@@ -175,29 -242,85 +242,85 @@@ gr_udp_source::work (int noutput_items
      
      // Update indexing of amount of bytes left in the buffer
      d_residual -= nbytes;
-     d_temp_offset = d_temp_offset+d_residual;
+     d_temp_offset += nbytes;
+     // Return now with what we've got.
+     assert(nbytes % d_itemsize == 0);
+     return nbytes/d_itemsize;
    }
  
    while(1) {
      // get the data into our output buffer and record the number of bytes
+ #if USE_SELECT
+     // RCV_TIMEO doesn't work on all systems (e.g., Cygwin)
+     // use select() instead of, or in addition to RCV_TIMEO
+     fd_set readfds;
+     timeval timeout;
+     timeout.tv_sec = 1;         // Init timeout each iteration.  Select can modify it.
+     timeout.tv_usec = 0;
+     FD_ZERO(&readfds);
+     FD_SET(d_socket, &readfds);
+     r = select(FD_SETSIZE, &readfds, NULL, NULL, &timeout);
+     if(r < 0) {
+       report_error("udp_source/select",NULL);
+       return -1;
+     }
+     else if(r == 0 ) {  // timed out
+       if( d_wait ) {
+       // Allow boost thread interrupt, then try again
+       boost::this_thread::interruption_point();
+       continue;
+       }
+       else
+       return -1;
+     }
+ #endif // USE_SELECT
      // This is a non-blocking call with a timeout set in the constructor
      r = recv(d_socket, d_temp_buff, d_payload_size, 0);  // get the entire payload or the what's available
  
+     // If r > 0, round it down to a multiple of d_itemsize 
+     // (If sender is broken, don't propagate problem)
+     if (r > 0)
+       r = (r/d_itemsize) * d_itemsize;
      // Check if there was a problem; forget it if the operation just timed out
      if(r == -1) {
-       if(errno == EAGAIN) {  // handle non-blocking call timeout
+       if( is_error(EAGAIN) ) {  // handle non-blocking call timeout
          #if SRC_VERBOSE
        printf("UDP receive timed out\n"); 
          #endif
  
-       // Break here to allow the rest of the flow graph time to run and so ctrl-C breaks
-       break;
+       if( d_wait ) {
+         // Allow boost thread interrupt, then try again
+         boost::this_thread::interruption_point();
+         continue;
+       }
+       else
+         return -1;
        }
        else {
-       perror("udp_source");
+       report_error("udp_source/recv",NULL);
        return -1;
        }
      }
+     else if(r==0) {
+       if(d_eof) {
+       // zero-length packet interpreted as EOF
+       #if SNK_VERBOSE
+       printf("\tzero-length packet received; returning EOF\n");
+       #endif
+       return -1;
+       }
+       else{
+       // do we need to allow boost thread interrupt?
+       boost::this_thread::interruption_point();
+       continue;
+       }
+     }
      else {
        // Calculate the number of bytes we can take from the buffer in this call
        nbytes = std::min(r, total_bytes-bytes_received);
    return bytes_received/d_itemsize;
  }
  
+ // Return port number of d_socket
+ int gr_udp_source::get_port(void)
+ {
+   sockaddr_in name;
+   socklen_t len = sizeof(name);
+   int ret = getsockname( d_socket, (sockaddr*)&name, &len );
+   if( ret ) {
+     report_error("gr_udp_source/getsockname",NULL);
+     return -1;
+   }
+   return ntohs(name.sin_port);
+ }
index 61d719e4da88e86be806fa98357facace22124ea,e23231aa77adf5aa296a70fbd479a9411596c557..e23231aa77adf5aa296a70fbd479a9411596c557
mode 100644,100755..100644
@@@ -1,6 -1,6 +1,6 @@@
  /* -*- c++ -*- */
  /*
-  * Copyright 2007,2008,2009 Free Software Foundation, Inc.
+  * Copyright 2007,2008,2009,2010 Free Software Foundation, Inc.
   * 
   * This file is part of GNU Radio
   * 
  #define INCLUDED_GR_UDP_SOURCE_H
  
  #include <gr_sync_block.h>
- #if defined(HAVE_SOCKET)
- #include <sys/socket.h>
- #include <arpa/inet.h>
- #elif defined(HAVE_WINDOWS_H)
- #include <winsock2.h>
- #include <windows.h>
- #endif
- #if defined(HAVE_NETINET_IN_H)
- #include <netinet/in.h>
- #endif
  #include <gruel/thread.h>
  
  class gr_udp_source;
  typedef boost::shared_ptr<gr_udp_source> gr_udp_source_sptr;
  
- gr_udp_source_sptr gr_make_udp_source(size_t itemsize, const char *src, 
-                                     unsigned short port_src, int payload_size=1472);
+ gr_udp_source_sptr gr_make_udp_source(size_t itemsize, const char *host, 
+                                     unsigned short port,
+                                     int payload_size=1472,
+                                     bool eof=true, bool wait=true);
  
  /*! 
   * \brief Read stream from an UDP socket.
   * \ingroup source_blk
   *
   * \param itemsize     The size (in bytes) of the item datatype
-  * \param src          The source address as either the host name or the 'numbers-and-dots'
-  *                     IP address
-  * \param port_src     The port number on which the socket listens for data
-  * \param payload_size UDP payload size by default set to 
-  *                     1472 = (1500 MTU - (8 byte UDP header) - (20 byte IP header))
+  * \param host         The name or IP address of the receiving host; can be
+  *                     NULL, None, or "0.0.0.0" to allow reading from any
+  *                     interface on the host
+  * \param port         The port number on which to receive data; use 0 to
+  *                     have the system assign an unused port number
+  * \param payload_size UDP payload size by default set to 1472 =
+  *                     (1500 MTU - (8 byte UDP header) - (20 byte IP header))
+  * \param eof          Interpret zero-length packet as EOF (default: true)
+  * \param wait         Wait for data if not immediately available
+  *                     (default: true)
   *
  */
  
  class gr_udp_source : public gr_sync_block
  {
-   friend gr_udp_source_sptr gr_make_udp_source(size_t itemsize, const char *src, 
-                                              unsigned short port_src, int payload_size);
+   friend gr_udp_source_sptr gr_make_udp_source(size_t itemsize,
+                                              const char *host, 
+                                              unsigned short port,
+                                              int payload_size,
+                                              bool eof, bool wait);
  
   private:
    size_t      d_itemsize;
-   bool                d_updated;
-   gruel::mutex        d_mutex;
-   int            d_payload_size;  // maximum transmission unit (packet length)
-   int            d_socket;        // handle to socket
-   int            d_socket_rcv;    // handle to socket retuned in the accept call
-   struct in_addr d_ip_src;        // store the source IP address to use
-   unsigned short d_port_src;      // the port number to open for connections to this service
-   struct sockaddr_in    d_sockaddr_src;  // store the source sockaddr data (formatted IP address and port number)
+   int           d_payload_size;  // maximum transmission unit (packet length)
+   bool          d_eof;           // zero-length packet is EOF
+   bool          d_wait;          // wait if data if not immediately available
+   int           d_socket;        // handle to socket
    char *d_temp_buff;    // hold buffer between calls
    ssize_t d_residual;   // hold information about number of bytes stored in the temp buffer
    size_t d_temp_offset; // point to temp buffer location offset
     * \brief UDP Source Constructor
     * 
     * \param itemsize     The size (in bytes) of the item datatype
-    * \param src          The source address as either the host name or the 'numbers-and-dots'
-    *                     IP address
-    * \param port_src     The port number on which the socket listens for data
-    * \param payload_size UDP payload size by default set to 
-    *                     1472 = (1500 MTU - (8 byte UDP header) - (20 byte IP header))
+    * \param host         The name or IP address of the receiving host; can be
+    *                     NULL, None, or "0.0.0.0" to allow reading from any
+    *                     interface on the host
+    * \param port         The port number on which to receive data; use 0 to
+    *                     have the system assign an unused port number
+    * \param payload_size UDP payload size by default set to 1472 =
+    *                     (1500 MTU - (8 byte UDP header) - (20 byte IP header))
+    * \param eof          Interpret zero-length packet as EOF (default: true)
+    * \param wait         Wait for data if not immediately available
+    *                     (default: true)
     */
-   gr_udp_source(size_t itemsize, const char *src, unsigned short port_src, int payload_size);
+   gr_udp_source(size_t itemsize, const char *host, unsigned short port,
+               int payload_size, bool eof, bool wait);
  
   public:
    ~gr_udp_source();
  
-   /*!
-    * \brief open a socket specified by the port and ip address info
-    *
-    * Opens a socket, binds to the address, and waits for a connection
-    * over UDP. If any of these fail, the fuction retuns the error and exits.
-    */
-   bool open();
-   /*!
-    * \brief Close current socket.
-    *
-    * Shuts down read/write on the socket
-    */
-   void close();
    /*! \brief return the PAYLOAD_SIZE of the socket */
    int payload_size() { return d_payload_size; }
  
+   /*! \breif return the port number of the socket */
+   int get_port();
    // should we export anything else?
  
    int work(int noutput_items,
index fb39dad68f509aed3635a445ea764c952db1c461,2001f33e9cc8ec3cd2cc2a22fe7af7acbd71e412..2001f33e9cc8ec3cd2cc2a22fe7af7acbd71e412
mode 100644,100755..100644
@@@ -1,6 -1,6 +1,6 @@@
  /* -*- c++ -*- */
  /*
-  * Copyright 2007 Free Software Foundation, Inc.
+  * Copyright 2007,2010 Free Software Foundation, Inc.
   * 
   * This file is part of GNU Radio
   * 
  GR_SWIG_BLOCK_MAGIC(gr,udp_source)
  
  gr_udp_source_sptr 
- gr_make_udp_source (size_t itemsize, const char *src, 
-                   unsigned short port_src, int payload_size=1472);
+ gr_make_udp_source (size_t itemsize, const char *host, 
+                   unsigned short port, int payload_size=1472,
+                   bool eof=true, bool wait=true) throw (std::runtime_error);
  
  class gr_udp_source : public gr_sync_block
  {
   protected:
-   gr_udp_source (size_t itemsize, const char *src
-                unsigned short port_src, int payload_size);
+   gr_udp_source (size_t itemsize, const char *host
+                unsigned short port, int payload_size, bool eof, bool wait) throw (std::runtime_error);
  
   public:
    ~gr_udp_source ();
  
-   bool open();
-   void close();
    int payload_size() { return d_payload_size; }
+   int get_port();
  };