#if defined(HAVE_NETDB_H)
#include <netdb.h>
+#ifdef HAVE_SYS_TYPES_H
+#include <sys/types.h>
+#endif
+#ifdef HAVE_SYS_SOCKET_H
+#include <sys/socket.h>
+#endif
typedef void* optval_t;
// ntohs() on FreeBSD may require both netinet/in.h and arpa/inet.h, in order
gr_make_io_signature(0, 0, 0),
gr_make_io_signature(1, 1, itemsize)),
d_itemsize(itemsize), d_payload_size(payload_size),
- d_eof(eof), d_wait(wait), d_residual(0), d_temp_offset(0)
+ d_eof(eof), d_wait(wait), d_socket(-1), d_residual(0), d_temp_offset(0)
{
int ret = 0;
hints.ai_flags = AI_PASSIVE;
char port_str[12];
sprintf( port_str, "%d", port );
+
+ // FIXME leaks if report_error throws below
ret = getaddrinfo( host, port_str, &hints, &ip_src );
if( ret != 0 )
report_error("gr_udp_source/getaddrinfo",
"can't initialize source socket" );
+ // FIXME leaks if report_error throws below
d_temp_buff = new char[d_payload_size]; // allow it to hold up to payload_size bytes
// create socket
{
delete [] d_temp_buff;
- if (d_socket){
+ if (d_socket != -1){
shutdown(d_socket, SHUT_RDWR);
#if defined(USING_WINSOCK)
closesocket(d_socket);
#else
::close(d_socket);
#endif
- d_socket = 0;
+ d_socket = -1;
}
#if defined(USING_WINSOCK) // for Windows (with MinGW)
// Update indexing of amount of bytes left in the buffer
d_residual -= nbytes;
- d_temp_offset = d_temp_offset+d_residual;
+ d_temp_offset += nbytes;
+
+ // Return now with what we've got.
+ assert(nbytes % d_itemsize == 0);
+ return nbytes/d_itemsize;
}
-#if USE_SELECT
- // Use select() to determine when socket is readable
- fd_set readfds;
- timeval timeout;
- timeout.tv_sec = 1;
- timeout.tv_usec = 0;
-#endif
-
while(1) {
// get the data into our output buffer and record the number of bytes
#if USE_SELECT
// RCV_TIMEO doesn't work on all systems (e.g., Cygwin)
// use select() instead of, or in addition to RCV_TIMEO
+ fd_set readfds;
+ timeval timeout;
+ timeout.tv_sec = 1; // Init timeout each iteration. Select can modify it.
+ timeout.tv_usec = 0;
FD_ZERO(&readfds);
FD_SET(d_socket, &readfds);
r = select(FD_SETSIZE, &readfds, NULL, NULL, &timeout);
// This is a non-blocking call with a timeout set in the constructor
r = recv(d_socket, d_temp_buff, d_payload_size, 0); // get the entire payload or the what's available
+ // If r > 0, round it down to a multiple of d_itemsize
+ // (If sender is broken, don't propagate problem)
+ if (r > 0)
+ r = (r/d_itemsize) * d_itemsize;
+
// Check if there was a problem; forget it if the operation just timed out
if(r == -1) {
if( is_error(EAGAIN) ) { // handle non-blocking call timeout