Merge remote branch 'gnuradio/wip/udp_source_sink'
[debian/gnuradio] / gnuradio-core / src / lib / io / gr_udp_source.cc
old mode 100755 (executable)
new mode 100644 (file)
index ce870d4..fea9a26
 #include <errno.h>
 #include <stdio.h>
 #include <string.h>
+
 #if defined(HAVE_NETDB_H)
 #include <netdb.h>
+#ifdef HAVE_SYS_TYPES_H
+#include <sys/types.h>
+#endif
+#ifdef HAVE_SYS_SOCKET_H
+#include <sys/socket.h>
+#endif
 typedef void* optval_t;
+
+// ntohs() on FreeBSD may require both netinet/in.h and arpa/inet.h, in order
+#if defined(HAVE_NETINET_IN_H)
+#include <netinet/in.h>
+#endif
+#if defined(HAVE_ARPA_INET_H)
+#include <arpa/inet.h>
+#endif
+
 #elif defined(HAVE_WINDOWS_H)
 // if not posix, assume winsock
 #define USING_WINSOCK
@@ -67,7 +83,7 @@ static int is_error( int perr )
 #endif
 }
 
-static void report_error( char *msg1, char *msg2 )
+static void report_error( const char *msg1, const char *msg2 )
 {
   // Deal with errors, both posix and winsock
 #if defined(USING_WINSOCK)
@@ -81,16 +97,16 @@ static void report_error( char *msg1, char *msg2 )
   return;
 }
 
-gr_udp_source::gr_udp_source(size_t itemsize, const char *src
-                            unsigned short port_src, int payload_size,
-                            bool wait)
+gr_udp_source::gr_udp_source(size_t itemsize, const char *host
+                            unsigned short port, int payload_size,
+                            bool eof, bool wait)
   : gr_sync_block ("udp_source",
                   gr_make_io_signature(0, 0, 0),
                   gr_make_io_signature(1, 1, itemsize)),
-    d_itemsize(itemsize), d_payload_size(payload_size), d_wait(wait), d_residual(0), d_temp_offset(0)
+    d_itemsize(itemsize), d_payload_size(payload_size),
+    d_eof(eof), d_wait(wait), d_socket(-1), d_residual(0), d_temp_offset(0)
 {
   int ret = 0;
-  struct addrinfo *ip_src;      // store the source IP address to use
 
 #if defined(USING_WINSOCK) // for Windows (with MinGW)
   // initialize winsock DLL
@@ -103,18 +119,23 @@ gr_udp_source::gr_udp_source(size_t itemsize, const char *src,
   
   // Set up the address stucture for the source address and port numbers
   // Get the source IP address from the host name
+  struct addrinfo *ip_src;      // store the source IP address to use
   struct addrinfo hints;
   memset( (void*)&hints, 0, sizeof(hints) );
   hints.ai_family = AF_INET;
   hints.ai_socktype = SOCK_DGRAM;
   hints.ai_protocol = IPPROTO_UDP;
-  char port_str[7];
-  sprintf( port_str, "%d", port_src );
-  ret = getaddrinfo( src, port_str, &hints, &ip_src );
+  hints.ai_flags = AI_PASSIVE;
+  char port_str[12];
+  sprintf( port_str, "%d", port );
+
+  // FIXME leaks if report_error throws below
+  ret = getaddrinfo( host, port_str, &hints, &ip_src );
   if( ret != 0 )
     report_error("gr_udp_source/getaddrinfo",
                 "can't initialize source socket" );
 
+  // FIXME leaks if report_error throws below
   d_temp_buff = new char[d_payload_size];   // allow it to hold up to payload_size bytes
 
   // create socket
@@ -166,24 +187,24 @@ gr_udp_source::gr_udp_source(size_t itemsize, const char *src,
 
 gr_udp_source_sptr
 gr_make_udp_source (size_t itemsize, const char *ipaddr, 
-                   unsigned short port, int payload_size, bool wait)
+                   unsigned short port, int payload_size, bool eof, bool wait)
 {
   return gr_udp_source_sptr (new gr_udp_source (itemsize, ipaddr, 
-                                               port, payload_size, wait));
+                                               port, payload_size, eof, wait));
 }
 
 gr_udp_source::~gr_udp_source ()
 {
   delete [] d_temp_buff;
 
-  if (d_socket){
+  if (d_socket != -1){
     shutdown(d_socket, SHUT_RDWR);
 #if defined(USING_WINSOCK)
     closesocket(d_socket);
 #else
     ::close(d_socket);
 #endif
-    d_socket = 0;
+    d_socket = -1;
   }
 
 #if defined(USING_WINSOCK) // for Windows (with MinGW)
@@ -221,23 +242,23 @@ gr_udp_source::work (int noutput_items,
     
     // Update indexing of amount of bytes left in the buffer
     d_residual -= nbytes;
-    d_temp_offset = d_temp_offset+d_residual;
+    d_temp_offset += nbytes;
+
+    // Return now with what we've got.
+    assert(nbytes % d_itemsize == 0);
+    return nbytes/d_itemsize;
   }
 
-#if USE_SELECT
-  // Use select() to determine when socket is readable
-  fd_set readfds;
-  timeval timeout;
-  timeout.tv_sec = 1;
-  timeout.tv_usec = 0;
-#endif
-  
   while(1) {
     // get the data into our output buffer and record the number of bytes
 
 #if USE_SELECT
     // RCV_TIMEO doesn't work on all systems (e.g., Cygwin)
     // use select() instead of, or in addition to RCV_TIMEO
+    fd_set readfds;
+    timeval timeout;
+    timeout.tv_sec = 1;          // Init timeout each iteration.  Select can modify it.
+    timeout.tv_usec = 0;
     FD_ZERO(&readfds);
     FD_SET(d_socket, &readfds);
     r = select(FD_SETSIZE, &readfds, NULL, NULL, &timeout);
@@ -259,6 +280,11 @@ gr_udp_source::work (int noutput_items,
     // This is a non-blocking call with a timeout set in the constructor
     r = recv(d_socket, d_temp_buff, d_payload_size, 0);  // get the entire payload or the what's available
 
+    // If r > 0, round it down to a multiple of d_itemsize 
+    // (If sender is broken, don't propagate problem)
+    if (r > 0)
+      r = (r/d_itemsize) * d_itemsize;
+
     // Check if there was a problem; forget it if the operation just timed out
     if(r == -1) {
       if( is_error(EAGAIN) ) {  // handle non-blocking call timeout
@@ -279,6 +305,22 @@ gr_udp_source::work (int noutput_items,
        return -1;
       }
     }
+    else if(r==0) {
+      if(d_eof) {
+       // zero-length packet interpreted as EOF
+
+       #if SNK_VERBOSE
+       printf("\tzero-length packet received; returning EOF\n");
+       #endif
+
+       return -1;
+      }
+      else{
+       // do we need to allow boost thread interrupt?
+       boost::this_thread::interruption_point();
+       continue;
+      }
+    }
     else {
       // Calculate the number of bytes we can take from the buffer in this call
       nbytes = std::min(r, total_bytes-bytes_received);
@@ -316,3 +358,15 @@ gr_udp_source::work (int noutput_items,
   return bytes_received/d_itemsize;
 }
 
+// Return port number of d_socket
+int gr_udp_source::get_port(void)
+{
+  sockaddr_in name;
+  socklen_t len = sizeof(name);
+  int ret = getsockname( d_socket, (sockaddr*)&name, &len );
+  if( ret ) {
+    report_error("gr_udp_source/getsockname",NULL);
+    return -1;
+  }
+  return ntohs(name.sin_port);
+}