Merge branch 'master' into udp
authorDon Ward <don2387ward@sprynet.com>
Fri, 30 Apr 2010 21:00:33 +0000 (17:00 -0400)
committerDon Ward <don2387ward@sprynet.com>
Fri, 30 Apr 2010 21:00:33 +0000 (17:00 -0400)
gnuradio-core/src/lib/io/gr_udp_sink.cc [changed mode: 0644->0755]
gnuradio-core/src/lib/io/gr_udp_sink.h
gnuradio-core/src/lib/io/gr_udp_source.cc [changed mode: 0644->0755]
gnuradio-core/src/lib/io/gr_udp_source.h [changed mode: 0644->0755]
gnuradio-core/src/lib/io/gr_udp_source.i [changed mode: 0644->0755]
gnuradio-examples/python/network/dial_tone_sink.py
usrp2/host/include/usrp2/usrp2.h

old mode 100644 (file)
new mode 100755 (executable)
index d37adfb..263d3dd
@@ -1,6 +1,6 @@
 /* -*- c++ -*- */
 /*
- * Copyright 2007,2008,2009 Free Software Foundation, Inc.
+ * Copyright 2007,2008,2009,2010 Free Software Foundation, Inc.
  * 
  * This file is part of GNU Radio
  * 
 #include <gr_udp_sink.h>
 #include <gr_io_signature.h>
 #include <stdexcept>
-#if defined(HAVE_SOCKET)
-#include <netdb.h>
+#include <errno.h>
 #include <stdio.h>
+#include <string.h>
+#if defined(HAVE_NETDB_H)
 typedef void* optval_t;
-#else
+#elif defined(HAVE_WINDOWS_H)
+// if not posix, assume winsock
+#define USING_WINSOCK
+#include <winsock2.h>
+#include <ws2tcpip.h>
 #define SHUT_RDWR 2
-#define inet_aton(N,A) ( (A)->s_addr = inet_addr(N), ( (A)->s_addr != INADDR_NONE ) )
 typedef char* optval_t;
 #endif
 
@@ -40,6 +44,45 @@ typedef char* optval_t;
 
 #define SNK_VERBOSE 0
 
+static int is_error( int perr )
+{
+  // Compare error to posix error code; return nonzero if match.
+#if defined(USING_WINSOCK)
+#define ENOPROTOOPT 109
+#define ECONNREFUSED 111
+  // All codes to be checked for must be defined below
+  int werr = WSAGetLastError();
+  switch( werr ) {
+  case WSAETIMEDOUT:
+    return( perr == EAGAIN );
+  case WSAENOPROTOOPT:
+    return( perr == ENOPROTOOPT );
+  case WSAECONNREFUSED:
+    return( perr == ECONNREFUSED );
+  default:
+    fprintf(stderr,"gr_udp_source/is_error: unknown error %d\n", perr );
+    throw std::runtime_error("internal error");
+  }
+  return 0;
+#else
+  return( perr == errno );
+#endif
+}
+
+static void report_error( const char *msg1, const char *msg2 )
+{
+  // Deal with errors, both posix and winsock
+#if defined(USING_WINSOCK)
+  int werr = WSAGetLastError();
+  fprintf(stderr, "%s: winsock error %d\n", msg1, werr );
+#else
+  perror(msg1);
+#endif
+  if( msg2 != NULL )
+    throw std::runtime_error(msg2);
+  return;
+}
+
 gr_udp_sink::gr_udp_sink (size_t itemsize, 
                          const char *src, unsigned short port_src,
                          const char *dst, unsigned short port_dst,
@@ -47,84 +90,53 @@ gr_udp_sink::gr_udp_sink (size_t itemsize,
   : gr_sync_block ("udp_sink",
                   gr_make_io_signature (1, 1, itemsize),
                   gr_make_io_signature (0, 0, 0)),
-    d_itemsize (itemsize), d_updated(false), d_payload_size(payload_size)
+    d_itemsize (itemsize), d_payload_size(payload_size)
 {
   int ret = 0;
+  struct addrinfo *ip_src;        // store the source ip info
+  struct addrinfo *ip_dst;        // store the destination ip info
+
+#if defined(USING_WINSOCK) // for Windows (with MinGW)
+  // initialize winsock DLL
+  WSADATA wsaData;
+  int iResult = WSAStartup( MAKEWORD(2,2), &wsaData );
+  if( iResult != NO_ERROR ) {
+    report_error( "gr_udp_source WSAStartup", "can't open socket" );
+  }
+#endif
   
   // Set up the address stucture for the source address and port numbers
   // Get the source IP address from the host name
-  struct hostent *hsrc = gethostbyname(src);
-  if(hsrc) {   // if the source was provided as a host namex
-    d_ip_src = *(struct in_addr*)hsrc->h_addr_list[0];    
-  }
-  else { // assume it was specified as an IP address
-    if((ret=inet_aton(src, &d_ip_src)) == 0) {            // format IP address
-      perror("Not a valid source IP address or host name");
-      throw std::runtime_error("can't initialize source socket");
-    }
-  }
+  struct addrinfo hints;
+  memset( (void*)&hints, 0, sizeof(hints) );
+  hints.ai_family = AF_INET;
+  hints.ai_socktype = SOCK_DGRAM;
+  hints.ai_protocol = IPPROTO_UDP;
+  char port_str[7];
+  sprintf( port_str, "%d", port_src );
+  ret = getaddrinfo( src, port_str, &hints, &ip_src );
+  if( ret != 0 )
+    report_error("gr_udp_source/getaddrinfo",
+                "can't initialize source socket" );
 
   // Get the destination IP address from the host name
-  struct hostent *hdst = gethostbyname(dst);
-  if(hdst) {   // if the source was provided as a host namex
-    d_ip_dst = *(struct in_addr*)hdst->h_addr_list[0];    
-  }
-  else { // assume it was specified as an IP address
-    if((ret=inet_aton(dst, &d_ip_dst)) == 0) {            // format IP address
-      perror("Not a valid destination IP address or host name");
-      throw std::runtime_error("can't initialize destination socket");
-    }
-  }
-
-  d_port_src = htons(port_src);           // format port number
-  d_port_dst = htons(port_dst);           // format port number
-
-  d_sockaddr_src.sin_family = AF_INET;
-  d_sockaddr_src.sin_addr   = d_ip_src;
-  d_sockaddr_src.sin_port   = d_port_src;
-
-  d_sockaddr_dst.sin_family = AF_INET;
-  d_sockaddr_dst.sin_addr   = d_ip_dst;
-  d_sockaddr_dst.sin_port   = d_port_dst;
-  
-  open();
-}
-
-// public constructor that returns a shared_ptr
-
-gr_udp_sink_sptr
-gr_make_udp_sink (size_t itemsize, 
-                 const char *src, unsigned short port_src,
-                 const char *dst, unsigned short port_dst,
-                 int payload_size)
-{
-  return gr_udp_sink_sptr (new gr_udp_sink (itemsize, 
-                                           src, port_src,
-                                           dst, port_dst,
-                                           payload_size));
-}
-
-gr_udp_sink::~gr_udp_sink ()
-{
-  close();
-}
-
-bool
-gr_udp_sink::open()
-{
-  gruel::scoped_lock guard(d_mutex);   // hold mutex for duration of this function
+  sprintf( port_str, "%d", port_dst );
+  ret = getaddrinfo( dst, port_str, &hints, &ip_dst );
+  if( ret != 0 )
+    report_error("gr_udp_source/getaddrinfo",
+                "can't initialize destination socket" );
 
   // create socket
-  if((d_socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) {
-    perror("socket open");
-    throw std::runtime_error("can't open socket");
+  d_socket = socket(ip_src->ai_family, ip_src->ai_socktype,
+                   ip_src->ai_protocol);
+  if(d_socket == -1) {
+    report_error("socket open","can't open socket");
   }
 
   // Turn on reuse address
   int opt_val = true;
   if(setsockopt(d_socket, SOL_SOCKET, SO_REUSEADDR, (optval_t)&opt_val, sizeof(int)) == -1) {
-    perror("SO_REUSEADDR");
-    throw std::runtime_error("can't set socket option SO_REUSEADDR");
+    report_error("SO_REUSEADDR","can't set socket option SO_REUSEADDR");
   }
 
   // Don't wait when shutting down
@@ -132,36 +144,55 @@ gr_udp_sink::open()
   lngr.l_onoff  = 1;
   lngr.l_linger = 0;
   if(setsockopt(d_socket, SOL_SOCKET, SO_LINGER, (optval_t)&lngr, sizeof(linger)) == -1) {
-    perror("SO_LINGER");
-    throw std::runtime_error("can't set socket option SO_LINGER");
+    if( !is_error(ENOPROTOOPT) ) {  // no SO_LINGER for SOCK_DGRAM on Windows
+      report_error("SO_LINGER","can't set socket option SO_LINGER");
+    }
   }
 
   // bind socket to an address and port number to listen on
-  if(bind (d_socket, (sockaddr*)&d_sockaddr_src, sizeof(struct sockaddr)) == -1) {
-    perror("socket bind");
-    throw std::runtime_error("can't bind socket");
+  if(bind (d_socket, ip_src->ai_addr, ip_src->ai_addrlen) == -1) {
+    report_error("socket bind","can't bind socket");
   }
 
   // Not sure if we should throw here or allow retries
-  if(connect(d_socket, (sockaddr*)&d_sockaddr_dst, sizeof(struct sockaddr)) == -1) {
-    perror("socket connect");
-    throw std::runtime_error("can't connect to socket");
+  if(connect(d_socket, ip_dst->ai_addr, ip_dst->ai_addrlen) == -1) {
+    report_error("socket connect","can't connect to socket");
   }
 
-  d_updated = true;
-  return d_socket != 0;
+  freeaddrinfo(ip_src);
+  freeaddrinfo(ip_dst);
 }
 
-void
-gr_udp_sink::close()
+// public constructor that returns a shared_ptr
+
+gr_udp_sink_sptr
+gr_make_udp_sink (size_t itemsize, 
+                 const char *src, unsigned short port_src,
+                 const char *dst, unsigned short port_dst,
+                 int payload_size)
 {
-  gruel::scoped_lock guard(d_mutex);   // hold mutex for duration of this function
+  return gr_udp_sink_sptr (new gr_udp_sink (itemsize, 
+                                           src, port_src,
+                                           dst, port_dst,
+                                           payload_size));
+}
 
+gr_udp_sink::~gr_udp_sink ()
+{
   if (d_socket){
     shutdown(d_socket, SHUT_RDWR);
+#if defined(USING_WINSOCK)
+    closesocket(d_socket);
+#else
+    ::close(d_socket);
+#endif
     d_socket = 0;
   }
-  d_updated = true;
+
+#if defined(USING_WINSOCK) // for Windows (with MinGW)
+  // free winsock resources
+  WSACleanup();
+#endif
 }
 
 int 
@@ -174,7 +205,7 @@ gr_udp_sink::work (int noutput_items,
   ssize_t total_size = noutput_items*d_itemsize;
 
   #if SNK_VERBOSE
-  printf("Entered upd_sink\n");
+  printf("Entered udp_sink\n");
   #endif
 
   while(bytes_sent <  total_size) {
@@ -182,8 +213,12 @@ gr_udp_sink::work (int noutput_items,
   
     r = send(d_socket, (in+bytes_sent), bytes_to_send, 0);
     if(r == -1) {         // error on send command
-      perror("udp_sink"); // there should be no error case where this function 
-      return -1;          // should not exit immediately
+      if( is_error(ECONNREFUSED) )
+       r = bytes_to_send;  // discard data until receiver is started
+      else {
+       report_error("udp_sink",NULL); // there should be no error case where
+       return -1;                  // this function should not exit immediately
+      }
     }
     bytes_sent += r;
     
index f22b92dd035dd5e17f78c37ef3b9686486f61257..6b6ee40fe856a6df4c88bc77f2e643da6f4937e5 100644 (file)
@@ -1,6 +1,6 @@
 /* -*- c++ -*- */
 /*
- * Copyright 2007,2008,2009 Free Software Foundation, Inc.
+ * Copyright 2007,2008,2009,2010 Free Software Foundation, Inc.
  * 
  * This file is part of GNU Radio
  * 
 #define INCLUDED_GR_UDP_SINK_H
 
 #include <gr_sync_block.h>
-#include <boost/thread.hpp>
-#if defined(HAVE_SOCKET)
-#include <sys/socket.h>
-#include <arpa/inet.h>
+#if defined(HAVE_NETDB_H)
+#include <netdb.h>
+#include <sys/socket.h>  // usually #included by <netdb.h>?
 #elif defined(HAVE_WINDOWS_H)
 #include <winsock2.h>
-#include <windows.h>
-#endif
-#if defined(HAVE_NETINET_IN_H)
-#include <netinet/in.h>
+#include <ws2tcpip.h>
 #endif
 
 #include <gruel/thread.h>
@@ -70,18 +66,9 @@ class gr_udp_sink : public gr_sync_block
                                            int payload_size);
  private:
   size_t       d_itemsize;
-  bool         d_updated;
-  gruel::mutex d_mutex;
 
   int            d_payload_size;    // maximum transmission unit (packet length)
   int            d_socket;          // handle to socket
-  int            d_socket_rcv;      // handle to socket retuned in the accept call
-  struct in_addr d_ip_src;          // store the source ip info
-  struct in_addr d_ip_dst;          // store the destination ip info
-  unsigned short d_port_src;        // the port number to open for connections to this service
-  unsigned short d_port_dst;        // port number of the remove system
-  struct sockaddr_in    d_sockaddr_src;    // store the source sockaddr data (formatted IP address and port number)
-  struct sockaddr_in    d_sockaddr_dst;    // store the destination sockaddr data (formatted IP address and port number)
 
  protected:
   /*!
@@ -105,21 +92,6 @@ class gr_udp_sink : public gr_sync_block
  public:
   ~gr_udp_sink ();
 
-  /*!
-   * \brief open a socket specified by the port and ip address info
-   *
-   * Opens a socket, binds to the address, and makes connectionless association
-   * over UDP. If any of these fail, the fuction retuns the error and exits.
-   */
-  bool open();
-
-  /*!
-   * \brief Close current socket.
-   *
-   * Shuts down read/write on the socket
-   */
-  void close();
-
   /*! \brief return the PAYLOAD_SIZE of the socket */
   int payload_size() { return d_payload_size; }
 
old mode 100644 (file)
new mode 100755 (executable)
index d76d0ee..ce870d4
@@ -1,6 +1,6 @@
 /* -*- c++ -*- */
 /*
- * Copyright 2007,2008,2009 Free Software Foundation, Inc.
+ * Copyright 2007,2008,2009,2010 Free Software Foundation, Inc.
  * 
  * This file is part of GNU Radio
  * 
 #include <errno.h>
 #include <stdio.h>
 #include <string.h>
-#if defined(HAVE_SOCKET)
+#if defined(HAVE_NETDB_H)
 #include <netdb.h>
 typedef void* optval_t;
-#else
+#elif defined(HAVE_WINDOWS_H)
+// if not posix, assume winsock
+#define USING_WINSOCK
+#include <winsock2.h>
+#include <ws2tcpip.h>
 #define SHUT_RDWR 2
-#define inet_aton(N,A) ( (A)->s_addr = inet_addr(N), ( (A)->s_addr != INADDR_NONE ) )
 typedef char* optval_t;
 #endif
 
+#define USE_SELECT    1  // non-blocking receive on all platforms
+#define USE_RCV_TIMEO 0  // non-blocking receive on all but Cygwin
 #define SRC_VERBOSE 0
 
+static int is_error( int perr )
+{
+  // Compare error to posix error code; return nonzero if match.
+#if defined(USING_WINSOCK)
+#define ENOPROTOOPT 109
+  // All codes to be checked for must be defined below
+  int werr = WSAGetLastError();
+  switch( werr ) {
+  case WSAETIMEDOUT:
+    return( perr == EAGAIN );
+  case WSAENOPROTOOPT:
+    return( perr == ENOPROTOOPT );
+  default:
+    fprintf(stderr,"gr_udp_source/is_error: unknown error %d\n", perr );
+    throw std::runtime_error("internal error");
+  }
+  return 0;
+#else
+  return( perr == errno );
+#endif
+}
+
+static void report_error( char *msg1, char *msg2 )
+{
+  // Deal with errors, both posix and winsock
+#if defined(USING_WINSOCK)
+  int werr = WSAGetLastError();
+  fprintf(stderr, "%s: winsock error %d\n", msg1, werr );
+#else
+  perror(msg1);
+#endif
+  if( msg2 != NULL )
+    throw std::runtime_error(msg2);
+  return;
+}
+
 gr_udp_source::gr_udp_source(size_t itemsize, const char *src, 
-                            unsigned short port_src, int payload_size)
+                            unsigned short port_src, int payload_size,
+                            bool wait)
   : gr_sync_block ("udp_source",
                   gr_make_io_signature(0, 0, 0),
                   gr_make_io_signature(1, 1, itemsize)),
-    d_itemsize(itemsize), d_updated(false), d_payload_size(payload_size), d_residual(0), d_temp_offset(0)
+    d_itemsize(itemsize), d_payload_size(payload_size), d_wait(wait), d_residual(0), d_temp_offset(0)
 {
   int ret = 0;
+  struct addrinfo *ip_src;      // store the source IP address to use
+
+#if defined(USING_WINSOCK) // for Windows (with MinGW)
+  // initialize winsock DLL
+  WSADATA wsaData;
+  int iResult = WSAStartup( MAKEWORD(2,2), &wsaData );
+  if( iResult != NO_ERROR ) {
+    report_error( "gr_udp_source WSAStartup", "can't open socket" );
+  }
+#endif
   
   // Set up the address stucture for the source address and port numbers
   // Get the source IP address from the host name
-  struct hostent *hsrc = gethostbyname(src);
-  if(hsrc) {   // if the source was provided as a host namex
-    d_ip_src = *(struct in_addr*)hsrc->h_addr_list[0];    
-  }
-  else { // assume it was specified as an IP address
-    if((ret=inet_aton(src, &d_ip_src)) == 0) {            // format IP address
-      perror("Not a valid source IP address or host name");
-      throw std::runtime_error("can't initialize source socket");
-    }
-  }
-
-  d_port_src = htons(port_src);     // format port number
-  
-  d_sockaddr_src.sin_family = AF_INET;
-  d_sockaddr_src.sin_addr   = d_ip_src;
-  d_sockaddr_src.sin_port   = d_port_src;
+  struct addrinfo hints;
+  memset( (void*)&hints, 0, sizeof(hints) );
+  hints.ai_family = AF_INET;
+  hints.ai_socktype = SOCK_DGRAM;
+  hints.ai_protocol = IPPROTO_UDP;
+  char port_str[7];
+  sprintf( port_str, "%d", port_src );
+  ret = getaddrinfo( src, port_str, &hints, &ip_src );
+  if( ret != 0 )
+    report_error("gr_udp_source/getaddrinfo",
+                "can't initialize source socket" );
 
   d_temp_buff = new char[d_payload_size];   // allow it to hold up to payload_size bytes
-  
-  open();
-}
-
-gr_udp_source_sptr
-gr_make_udp_source (size_t itemsize, const char *ipaddr, 
-                   unsigned short port, int payload_size)
-{
-  return gr_udp_source_sptr (new gr_udp_source (itemsize, ipaddr, 
-                                               port, payload_size));
-}
 
-gr_udp_source::~gr_udp_source ()
-{
-  delete [] d_temp_buff;
-  close();
-}
-
-bool
-gr_udp_source::open()
-{
-  gruel::scoped_lock guard(d_mutex);   // hold mutex for duration of this function
   // create socket
-  d_socket = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
+  d_socket = socket(ip_src->ai_family, ip_src->ai_socktype,
+                   ip_src->ai_protocol);
   if(d_socket == -1) {
-    perror("socket open");
-    throw std::runtime_error("can't open socket");
+    report_error("socket open","can't open socket");
   }
 
   // Turn on reuse address
   int opt_val = 1;
   if(setsockopt(d_socket, SOL_SOCKET, SO_REUSEADDR, (optval_t)&opt_val, sizeof(int)) == -1) {
-    perror("SO_REUSEADDR");
-    throw std::runtime_error("can't set socket option SO_REUSEADDR");
+    report_error("SO_REUSEADDR","can't set socket option SO_REUSEADDR");
   }
 
   // Don't wait when shutting down
@@ -110,40 +135,61 @@ gr_udp_source::open()
   lngr.l_onoff  = 1;
   lngr.l_linger = 0;
   if(setsockopt(d_socket, SOL_SOCKET, SO_LINGER, (optval_t)&lngr, sizeof(linger)) == -1) {
-    perror("SO_LINGER");
-    throw std::runtime_error("can't set socket option SO_LINGER");
+    if( !is_error(ENOPROTOOPT) ) {  // no SO_LINGER for SOCK_DGRAM on Windows
+      report_error("SO_LINGER","can't set socket option SO_LINGER");
+    }
   }
 
+#if USE_RCV_TIMEO
   // Set a timeout on the receive function to not block indefinitely
   // This value can (and probably should) be changed
+  // Ignored on Cygwin
+#if defined(USING_WINSOCK)
+  DWORD timeout = 1000;  // milliseconds
+#else
   timeval timeout;
   timeout.tv_sec = 1;
   timeout.tv_usec = 0;
+#endif
   if(setsockopt(d_socket, SOL_SOCKET, SO_RCVTIMEO, (optval_t)&timeout, sizeof(timeout)) == -1) {
-    perror("SO_RCVTIMEO");
-    throw std::runtime_error("can't set socket option SO_RCVTIMEO");
+    report_error("SO_RCVTIMEO","can't set socket option SO_RCVTIMEO");
   }
+#endif // USE_RCV_TIMEO
 
   // bind socket to an address and port number to listen on
-  if(bind (d_socket, (sockaddr*)&d_sockaddr_src, sizeof(struct sockaddr)) == -1) {
-    perror("socket bind");
-    throw std::runtime_error("can't bind socket");
+  if(bind (d_socket, ip_src->ai_addr, ip_src->ai_addrlen) == -1) {
+    report_error("socket bind","can't bind socket");
   }
-  
-  d_updated = true;
-  return d_socket != 0;
+  freeaddrinfo(ip_src);
+
+}
+
+gr_udp_source_sptr
+gr_make_udp_source (size_t itemsize, const char *ipaddr, 
+                   unsigned short port, int payload_size, bool wait)
+{
+  return gr_udp_source_sptr (new gr_udp_source (itemsize, ipaddr, 
+                                               port, payload_size, wait));
 }
 
-void
-gr_udp_source::close()
+gr_udp_source::~gr_udp_source ()
 {
-  gruel::scoped_lock guard(d_mutex);   // hold mutex for duration of this function
+  delete [] d_temp_buff;
 
   if (d_socket){
     shutdown(d_socket, SHUT_RDWR);
+#if defined(USING_WINSOCK)
+    closesocket(d_socket);
+#else
+    ::close(d_socket);
+#endif
     d_socket = 0;
   }
-  d_updated = true;
+
+#if defined(USING_WINSOCK) // for Windows (with MinGW)
+  // free winsock resources
+  WSACleanup();
+#endif
 }
 
 int 
@@ -178,23 +224,58 @@ gr_udp_source::work (int noutput_items,
     d_temp_offset = d_temp_offset+d_residual;
   }
 
+#if USE_SELECT
+  // Use select() to determine when socket is readable
+  fd_set readfds;
+  timeval timeout;
+  timeout.tv_sec = 1;
+  timeout.tv_usec = 0;
+#endif
+  
   while(1) {
     // get the data into our output buffer and record the number of bytes
+
+#if USE_SELECT
+    // RCV_TIMEO doesn't work on all systems (e.g., Cygwin)
+    // use select() instead of, or in addition to RCV_TIMEO
+    FD_ZERO(&readfds);
+    FD_SET(d_socket, &readfds);
+    r = select(FD_SETSIZE, &readfds, NULL, NULL, &timeout);
+    if(r < 0) {
+       report_error("udp_source/select",NULL);
+       return -1;
+    }
+    else if(r == 0 ) {  // timed out
+      if( d_wait ) {
+       // Allow boost thread interrupt, then try again
+       boost::this_thread::interruption_point();
+       continue;
+      }
+      else
+       return -1;
+    }
+#endif // USE_SELECT
+
     // This is a non-blocking call with a timeout set in the constructor
     r = recv(d_socket, d_temp_buff, d_payload_size, 0);  // get the entire payload or the what's available
 
     // Check if there was a problem; forget it if the operation just timed out
     if(r == -1) {
-      if(errno == EAGAIN) {  // handle non-blocking call timeout
+      if( is_error(EAGAIN) ) {  // handle non-blocking call timeout
         #if SRC_VERBOSE
        printf("UDP receive timed out\n"); 
         #endif
 
-       // Break here to allow the rest of the flow graph time to run and so ctrl-C breaks
-       break;
+       if( d_wait ) {
+         // Allow boost thread interrupt, then try again
+         boost::this_thread::interruption_point();
+         continue;
+       }
+       else
+         return -1;
       }
       else {
-       perror("udp_source");
+       report_error("udp_source/recv",NULL);
        return -1;
       }
     }
old mode 100644 (file)
new mode 100755 (executable)
index 61d719e..b06536d
@@ -1,6 +1,6 @@
 /* -*- c++ -*- */
 /*
- * Copyright 2007,2008,2009 Free Software Foundation, Inc.
+ * Copyright 2007,2008,2009,2010 Free Software Foundation, Inc.
  * 
  * This file is part of GNU Radio
  * 
 #define INCLUDED_GR_UDP_SOURCE_H
 
 #include <gr_sync_block.h>
-#if defined(HAVE_SOCKET)
-#include <sys/socket.h>
-#include <arpa/inet.h>
+#if defined(HAVE_NETDB_H)
+#include <netdb.h>
+#include <sys/socket.h>  // usually #included by <netdb.h>?
 #elif defined(HAVE_WINDOWS_H)
 #include <winsock2.h>
-#include <windows.h>
-#endif
-#if defined(HAVE_NETINET_IN_H)
-#include <netinet/in.h>
+#include <ws2tcpip.h>
 #endif
 
 #include <gruel/thread.h>
@@ -41,7 +38,8 @@ class gr_udp_source;
 typedef boost::shared_ptr<gr_udp_source> gr_udp_source_sptr;
 
 gr_udp_source_sptr gr_make_udp_source(size_t itemsize, const char *src, 
-                                     unsigned short port_src, int payload_size=1472);
+                                     unsigned short port_src,
+                                     int payload_size=1472, bool wait=true);
 
 /*! 
  * \brief Read stream from an UDP socket.
@@ -53,25 +51,22 @@ gr_udp_source_sptr gr_make_udp_source(size_t itemsize, const char *src,
  * \param port_src     The port number on which the socket listens for data
  * \param payload_size UDP payload size by default set to 
  *                     1472 = (1500 MTU - (8 byte UDP header) - (20 byte IP header))
+ * \param wait         Wait for data if not immediately available (default: true)
  *
 */
 
 class gr_udp_source : public gr_sync_block
 {
   friend gr_udp_source_sptr gr_make_udp_source(size_t itemsize, const char *src, 
-                                              unsigned short port_src, int payload_size);
+                                              unsigned short port_src,
+                                              int payload_size, bool wait);
 
  private:
   size_t       d_itemsize;
-  bool         d_updated;
-  gruel::mutex d_mutex;
 
   int            d_payload_size;  // maximum transmission unit (packet length)
+  bool           d_wait;          // wait if data if not immediately available
   int            d_socket;        // handle to socket
-  int            d_socket_rcv;    // handle to socket retuned in the accept call
-  struct in_addr d_ip_src;        // store the source IP address to use
-  unsigned short d_port_src;      // the port number to open for connections to this service
-  struct sockaddr_in    d_sockaddr_src;  // store the source sockaddr data (formatted IP address and port number)
   char *d_temp_buff;    // hold buffer between calls
   ssize_t d_residual;   // hold information about number of bytes stored in the temp buffer
   size_t d_temp_offset; // point to temp buffer location offset
@@ -86,27 +81,14 @@ class gr_udp_source : public gr_sync_block
    * \param port_src     The port number on which the socket listens for data
    * \param payload_size UDP payload size by default set to 
    *                     1472 = (1500 MTU - (8 byte UDP header) - (20 byte IP header))
+   * \param wait         Wait for data if not immediately available (default: true)
    */
-  gr_udp_source(size_t itemsize, const char *src, unsigned short port_src, int payload_size);
+  gr_udp_source(size_t itemsize, const char *src, unsigned short port_src,
+               int payload_size, bool wait);
 
  public:
   ~gr_udp_source();
 
-  /*!
-   * \brief open a socket specified by the port and ip address info
-   *
-   * Opens a socket, binds to the address, and waits for a connection
-   * over UDP. If any of these fail, the fuction retuns the error and exits.
-   */
-  bool open();
-
-  /*!
-   * \brief Close current socket.
-   *
-   * Shuts down read/write on the socket
-   */
-  void close();
-
   /*! \brief return the PAYLOAD_SIZE of the socket */
   int payload_size() { return d_payload_size; }
 
old mode 100644 (file)
new mode 100755 (executable)
index fb39dad..efaa57c
@@ -24,19 +24,18 @@ GR_SWIG_BLOCK_MAGIC(gr,udp_source)
 
 gr_udp_source_sptr 
 gr_make_udp_source (size_t itemsize, const char *src, 
-                   unsigned short port_src, int payload_size=1472);
+                   unsigned short port_src, int payload_size=1472,
+                   bool wait=true);
 
 class gr_udp_source : public gr_sync_block
 {
  protected:
   gr_udp_source (size_t itemsize, const char *src, 
-                unsigned short port_src, int payload_size);
+                unsigned short port_src, int payload_size, bool wait);
 
  public:
   ~gr_udp_source ();
 
-  bool open();
-  void close();
   int payload_size() { return d_payload_size; }
 
 };
index 47d24b9bcfb006764e6180b63cc95503d0e81122..82e925baa7dd05d84e46d2a9782ab534f72981e8 100755 (executable)
@@ -25,9 +25,9 @@ from gnuradio.eng_option import eng_option
 from optparse import OptionParser
 
 class dial_tone_sink(gr.top_block):
-    def __init__(self, src, port, pkt_size, sample_rate):
+    def __init__(self, src, port, pkt_size, sample_rate, wait):
         gr.top_block.__init__(self, "dial_tone_sink")
-        udp = gr.udp_source(gr.sizeof_float, src, port, pkt_size)
+        udp = gr.udp_source(gr.sizeof_float, src, port, pkt_size, wait=wait)
         sink = audio.sink(sample_rate)
         self.connect(udp, sink)
         
@@ -41,6 +41,8 @@ if __name__ == '__main__':
                       help="packet size.")
     parser.add_option("-r", "--sample-rate", type="int", default=8000,
                       help="audio signal sample rate [default=%default]")
+    parser.add_option("-n", "--no-wait", action="store_true", default=False,
+                      help="don't wait for source")
     (options, args) = parser.parse_args()
     if len(args) != 0:
         parser.print_help()
@@ -48,7 +50,8 @@ if __name__ == '__main__':
 
     # Create an instance of a hierarchical block
     top_block = dial_tone_sink(options.src_name, options.src_port,
-                               options.packet_size, options.sample_rate)
+                               options.packet_size, options.sample_rate,
+                               not options.no_wait)
     
     try:    
         # Run forever
index 7069507cfb593fde10563f1aed7d853374c39d9d..e29caa33db1bacb71132b415ac6b4ac235c96a15 100644 (file)
@@ -21,6 +21,7 @@
 
 #include <boost/shared_ptr.hpp>
 #include <boost/utility.hpp>
+#include <boost/bind.hpp>
 #include <vector>
 #include <complex>
 #include <usrp2/rx_sample_handler.h>