Merge remote branch 'gnuradio/wip/udp_source_sink'
[debian/gnuradio] / gnuradio-core / src / lib / io / gr_udp_source.cc
old mode 100755 (executable)
new mode 100644 (file)
index 880388e..fea9a26
 
 #if defined(HAVE_NETDB_H)
 #include <netdb.h>
+#ifdef HAVE_SYS_TYPES_H
+#include <sys/types.h>
+#endif
+#ifdef HAVE_SYS_SOCKET_H
+#include <sys/socket.h>
+#endif
 typedef void* optval_t;
 
 // ntohs() on FreeBSD may require both netinet/in.h and arpa/inet.h, in order
@@ -98,7 +104,7 @@ gr_udp_source::gr_udp_source(size_t itemsize, const char *host,
                   gr_make_io_signature(0, 0, 0),
                   gr_make_io_signature(1, 1, itemsize)),
     d_itemsize(itemsize), d_payload_size(payload_size),
-    d_eof(eof), d_wait(wait), d_residual(0), d_temp_offset(0)
+    d_eof(eof), d_wait(wait), d_socket(-1), d_residual(0), d_temp_offset(0)
 {
   int ret = 0;
 
@@ -122,11 +128,14 @@ gr_udp_source::gr_udp_source(size_t itemsize, const char *host,
   hints.ai_flags = AI_PASSIVE;
   char port_str[12];
   sprintf( port_str, "%d", port );
+
+  // FIXME leaks if report_error throws below
   ret = getaddrinfo( host, port_str, &hints, &ip_src );
   if( ret != 0 )
     report_error("gr_udp_source/getaddrinfo",
                 "can't initialize source socket" );
 
+  // FIXME leaks if report_error throws below
   d_temp_buff = new char[d_payload_size];   // allow it to hold up to payload_size bytes
 
   // create socket
@@ -188,14 +197,14 @@ gr_udp_source::~gr_udp_source ()
 {
   delete [] d_temp_buff;
 
-  if (d_socket){
+  if (d_socket != -1){
     shutdown(d_socket, SHUT_RDWR);
 #if defined(USING_WINSOCK)
     closesocket(d_socket);
 #else
     ::close(d_socket);
 #endif
-    d_socket = 0;
+    d_socket = -1;
   }
 
 #if defined(USING_WINSOCK) // for Windows (with MinGW)
@@ -233,23 +242,23 @@ gr_udp_source::work (int noutput_items,
     
     // Update indexing of amount of bytes left in the buffer
     d_residual -= nbytes;
-    d_temp_offset = d_temp_offset+d_residual;
+    d_temp_offset += nbytes;
+
+    // Return now with what we've got.
+    assert(nbytes % d_itemsize == 0);
+    return nbytes/d_itemsize;
   }
 
-#if USE_SELECT
-  // Use select() to determine when socket is readable
-  fd_set readfds;
-  timeval timeout;
-  timeout.tv_sec = 1;
-  timeout.tv_usec = 0;
-#endif
-  
   while(1) {
     // get the data into our output buffer and record the number of bytes
 
 #if USE_SELECT
     // RCV_TIMEO doesn't work on all systems (e.g., Cygwin)
     // use select() instead of, or in addition to RCV_TIMEO
+    fd_set readfds;
+    timeval timeout;
+    timeout.tv_sec = 1;          // Init timeout each iteration.  Select can modify it.
+    timeout.tv_usec = 0;
     FD_ZERO(&readfds);
     FD_SET(d_socket, &readfds);
     r = select(FD_SETSIZE, &readfds, NULL, NULL, &timeout);
@@ -271,6 +280,11 @@ gr_udp_source::work (int noutput_items,
     // This is a non-blocking call with a timeout set in the constructor
     r = recv(d_socket, d_temp_buff, d_payload_size, 0);  // get the entire payload or the what's available
 
+    // If r > 0, round it down to a multiple of d_itemsize 
+    // (If sender is broken, don't propagate problem)
+    if (r > 0)
+      r = (r/d_itemsize) * d_itemsize;
+
     // Check if there was a problem; forget it if the operation just timed out
     if(r == -1) {
       if( is_error(EAGAIN) ) {  // handle non-blocking call timeout