Rework UDP source and sink, with incompatible API changes
[debian/gnuradio] / gnuradio-core / src / lib / io / gr_udp_source.cc
index ce870d481eff17b99b8ccd436b0bd27ebe0b8fc0..880388e5e3e90fbf958616f7dcb779eed8eddf4d 100755 (executable)
 #include <errno.h>
 #include <stdio.h>
 #include <string.h>
+
 #if defined(HAVE_NETDB_H)
 #include <netdb.h>
 typedef void* optval_t;
+
+// ntohs() on FreeBSD may require both netinet/in.h and arpa/inet.h, in order
+#if defined(HAVE_NETINET_IN_H)
+#include <netinet/in.h>
+#endif
+#if defined(HAVE_ARPA_INET_H)
+#include <arpa/inet.h>
+#endif
+
 #elif defined(HAVE_WINDOWS_H)
 // if not posix, assume winsock
 #define USING_WINSOCK
@@ -67,7 +77,7 @@ static int is_error( int perr )
 #endif
 }
 
-static void report_error( char *msg1, char *msg2 )
+static void report_error( const char *msg1, const char *msg2 )
 {
   // Deal with errors, both posix and winsock
 #if defined(USING_WINSOCK)
@@ -81,16 +91,16 @@ static void report_error( char *msg1, char *msg2 )
   return;
 }
 
-gr_udp_source::gr_udp_source(size_t itemsize, const char *src
-                            unsigned short port_src, int payload_size,
-                            bool wait)
+gr_udp_source::gr_udp_source(size_t itemsize, const char *host
+                            unsigned short port, int payload_size,
+                            bool eof, bool wait)
   : gr_sync_block ("udp_source",
                   gr_make_io_signature(0, 0, 0),
                   gr_make_io_signature(1, 1, itemsize)),
-    d_itemsize(itemsize), d_payload_size(payload_size), d_wait(wait), d_residual(0), d_temp_offset(0)
+    d_itemsize(itemsize), d_payload_size(payload_size),
+    d_eof(eof), d_wait(wait), d_residual(0), d_temp_offset(0)
 {
   int ret = 0;
-  struct addrinfo *ip_src;      // store the source IP address to use
 
 #if defined(USING_WINSOCK) // for Windows (with MinGW)
   // initialize winsock DLL
@@ -103,14 +113,16 @@ gr_udp_source::gr_udp_source(size_t itemsize, const char *src,
   
   // Set up the address stucture for the source address and port numbers
   // Get the source IP address from the host name
+  struct addrinfo *ip_src;      // store the source IP address to use
   struct addrinfo hints;
   memset( (void*)&hints, 0, sizeof(hints) );
   hints.ai_family = AF_INET;
   hints.ai_socktype = SOCK_DGRAM;
   hints.ai_protocol = IPPROTO_UDP;
-  char port_str[7];
-  sprintf( port_str, "%d", port_src );
-  ret = getaddrinfo( src, port_str, &hints, &ip_src );
+  hints.ai_flags = AI_PASSIVE;
+  char port_str[12];
+  sprintf( port_str, "%d", port );
+  ret = getaddrinfo( host, port_str, &hints, &ip_src );
   if( ret != 0 )
     report_error("gr_udp_source/getaddrinfo",
                 "can't initialize source socket" );
@@ -166,10 +178,10 @@ gr_udp_source::gr_udp_source(size_t itemsize, const char *src,
 
 gr_udp_source_sptr
 gr_make_udp_source (size_t itemsize, const char *ipaddr, 
-                   unsigned short port, int payload_size, bool wait)
+                   unsigned short port, int payload_size, bool eof, bool wait)
 {
   return gr_udp_source_sptr (new gr_udp_source (itemsize, ipaddr, 
-                                               port, payload_size, wait));
+                                               port, payload_size, eof, wait));
 }
 
 gr_udp_source::~gr_udp_source ()
@@ -279,6 +291,22 @@ gr_udp_source::work (int noutput_items,
        return -1;
       }
     }
+    else if(r==0) {
+      if(d_eof) {
+       // zero-length packet interpreted as EOF
+
+       #if SNK_VERBOSE
+       printf("\tzero-length packet received; returning EOF\n");
+       #endif
+
+       return -1;
+      }
+      else{
+       // do we need to allow boost thread interrupt?
+       boost::this_thread::interruption_point();
+       continue;
+      }
+    }
     else {
       // Calculate the number of bytes we can take from the buffer in this call
       nbytes = std::min(r, total_bytes-bytes_received);
@@ -316,3 +344,15 @@ gr_udp_source::work (int noutput_items,
   return bytes_received/d_itemsize;
 }
 
+// Return port number of d_socket
+int gr_udp_source::get_port(void)
+{
+  sockaddr_in name;
+  socklen_t len = sizeof(name);
+  int ret = getsockname( d_socket, (sockaddr*)&name, &len );
+  if( ret ) {
+    report_error("gr_udp_source/getsockname",NULL);
+    return -1;
+  }
+  return ntohs(name.sin_port);
+}