From: Don Ward Date: Tue, 4 May 2010 16:41:52 +0000 (-0400) Subject: Rework UDP source and sink, with incompatible API changes X-Git-Url: https://git.gag.com/?p=debian%2Fgnuradio;a=commitdiff_plain;h=d702e27d1f3b0e76ef3734ee6b5b6ac1333cdbff Rework UDP source and sink, with incompatible API changes Remove source address specifications for sink; add connect() and disconnect() to sink; add get_port() to source; add optional EOF signaling (using zero-length packets) to sink and source; modify dial_tone, vector, and audio examples to match new code; add qa test case. --- diff --git a/gnuradio-core/src/lib/io/gr_udp_sink.cc b/gnuradio-core/src/lib/io/gr_udp_sink.cc index 263d3dd4..a9cb87a2 100755 --- a/gnuradio-core/src/lib/io/gr_udp_sink.cc +++ b/gnuradio-core/src/lib/io/gr_udp_sink.cc @@ -30,6 +30,8 @@ #include #include #if defined(HAVE_NETDB_H) +#include +#include //usually included by ? typedef void* optval_t; #elif defined(HAVE_WINDOWS_H) // if not posix, assume winsock @@ -84,18 +86,14 @@ static void report_error( const char *msg1, const char *msg2 ) } gr_udp_sink::gr_udp_sink (size_t itemsize, - const char *src, unsigned short port_src, - const char *dst, unsigned short port_dst, - int payload_size) + const char *host, unsigned short port, + int payload_size, bool eof) : gr_sync_block ("udp_sink", gr_make_io_signature (1, 1, itemsize), gr_make_io_signature (0, 0, 0)), - d_itemsize (itemsize), d_payload_size(payload_size) + d_itemsize (itemsize), d_payload_size(payload_size), d_eof(eof), + d_connected(false) { - int ret = 0; - struct addrinfo *ip_src; // store the source ip info - struct addrinfo *ip_dst; // store the destination ip info - #if defined(USING_WINSOCK) // for Windows (with MinGW) // initialize winsock DLL WSADATA wsaData; @@ -104,41 +102,13 @@ gr_udp_sink::gr_udp_sink (size_t itemsize, report_error( "gr_udp_source WSAStartup", "can't open socket" ); } #endif - - // Set up the address stucture for the source address and port numbers - // Get the source IP address from the host name - struct addrinfo hints; - memset( (void*)&hints, 0, sizeof(hints) ); - hints.ai_family = AF_INET; - hints.ai_socktype = SOCK_DGRAM; - hints.ai_protocol = IPPROTO_UDP; - char port_str[7]; - sprintf( port_str, "%d", port_src ); - ret = getaddrinfo( src, port_str, &hints, &ip_src ); - if( ret != 0 ) - report_error("gr_udp_source/getaddrinfo", - "can't initialize source socket" ); - - // Get the destination IP address from the host name - sprintf( port_str, "%d", port_dst ); - ret = getaddrinfo( dst, port_str, &hints, &ip_dst ); - if( ret != 0 ) - report_error("gr_udp_source/getaddrinfo", - "can't initialize destination socket" ); // create socket - d_socket = socket(ip_src->ai_family, ip_src->ai_socktype, - ip_src->ai_protocol); + d_socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); if(d_socket == -1) { report_error("socket open","can't open socket"); } - // Turn on reuse address - int opt_val = true; - if(setsockopt(d_socket, SOL_SOCKET, SO_REUSEADDR, (optval_t)&opt_val, sizeof(int)) == -1) { - report_error("SO_REUSEADDR","can't set socket option SO_REUSEADDR"); - } - // Don't wait when shutting down linger lngr; lngr.l_onoff = 1; @@ -149,36 +119,27 @@ gr_udp_sink::gr_udp_sink (size_t itemsize, } } - // bind socket to an address and port number to listen on - if(bind (d_socket, ip_src->ai_addr, ip_src->ai_addrlen) == -1) { - report_error("socket bind","can't bind socket"); - } - - // Not sure if we should throw here or allow retries - if(connect(d_socket, ip_dst->ai_addr, ip_dst->ai_addrlen) == -1) { - report_error("socket connect","can't connect to socket"); - } - - freeaddrinfo(ip_src); - freeaddrinfo(ip_dst); + // Get the destination address + connect(host, port); } // public constructor that returns a shared_ptr gr_udp_sink_sptr gr_make_udp_sink (size_t itemsize, - const char *src, unsigned short port_src, - const char *dst, unsigned short port_dst, - int payload_size) + const char *host, unsigned short port, + int payload_size, bool eof) { return gr_udp_sink_sptr (new gr_udp_sink (itemsize, - src, port_src, - dst, port_dst, - payload_size)); + host, port, + payload_size, eof)); } gr_udp_sink::~gr_udp_sink () { + if (d_connected) + disconnect(); + if (d_socket){ shutdown(d_socket, SHUT_RDWR); #if defined(USING_WINSOCK) @@ -208,22 +169,28 @@ gr_udp_sink::work (int noutput_items, printf("Entered udp_sink\n"); #endif + gruel::scoped_lock guard(d_mutex); // protect d_socket + while(bytes_sent < total_size) { bytes_to_send = std::min((ssize_t)d_payload_size, (total_size-bytes_sent)); - r = send(d_socket, (in+bytes_sent), bytes_to_send, 0); - if(r == -1) { // error on send command - if( is_error(ECONNREFUSED) ) - r = bytes_to_send; // discard data until receiver is started - else { - report_error("udp_sink",NULL); // there should be no error case where - return -1; // this function should not exit immediately + if(d_connected) { + r = send(d_socket, (in+bytes_sent), bytes_to_send, 0); + if(r == -1) { // error on send command + if( is_error(ECONNREFUSED) ) + r = bytes_to_send; // discard data until receiver is started + else { + report_error("udp_sink",NULL); // there should be no error case where + return -1; // this function should not exit immediately + } } } + else + r = bytes_to_send; // discarded for lack of connection bytes_sent += r; #if SNK_VERBOSE - printf("\tbyte sent: %d bytes\n", bytes); + printf("\tbyte sent: %d bytes\n", r); #endif } @@ -233,3 +200,71 @@ gr_udp_sink::work (int noutput_items, return noutput_items; } + +void gr_udp_sink::connect( const char *host, unsigned short port ) +{ + if(d_connected) + disconnect(); + + if(host != NULL ) { + // Get the destination address + struct addrinfo *ip_dst; + struct addrinfo hints; + memset( (void*)&hints, 0, sizeof(hints) ); + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_DGRAM; + hints.ai_protocol = IPPROTO_UDP; + char port_str[12]; + sprintf( port_str, "%d", port ); + int ret = getaddrinfo( host, port_str, &hints, &ip_dst ); + if( ret != 0 ) + report_error("gr_udp_source/getaddrinfo", + "can't initialize destination socket" ); + + // don't need d_mutex lock when !d_connected + if(::connect(d_socket, ip_dst->ai_addr, ip_dst->ai_addrlen) == -1) { + report_error("socket connect","can't connect to socket"); + } + d_connected = true; + + freeaddrinfo(ip_dst); + } + + return; +} + +void gr_udp_sink::disconnect() +{ + if(!d_connected) + return; + + #if SNK_VERBOSE + printf("gr_udp_sink disconnecting\n"); + #endif + + gruel::scoped_lock guard(d_mutex); // protect d_socket from work() + + // Send a few zero-length packets to signal receiver we are done + if(d_eof) { + int i; + for( i = 0; i < 3; i++ ) + (void) send( d_socket, NULL, 0, 0 ); // ignore errors + } + + // Since I can't find any way to disconnect a datagram socket in Cygwin, + // we just leave it connected but disable sending. +#if 0 + // zeroed address structure should reset connection + struct sockaddr addr; + memset( (void*)&addr, 0, sizeof(addr) ); + // addr.sa_family = AF_UNSPEC; // doesn't work on Cygwin + // addr.sa_family = AF_INET; // doesn't work on Cygwin + + if(::connect(d_socket, &addr, sizeof(addr)) == -1) + report_error("socket connect","can't connect to socket"); +#endif + + d_connected = false; + + return; +} diff --git a/gnuradio-core/src/lib/io/gr_udp_sink.h b/gnuradio-core/src/lib/io/gr_udp_sink.h old mode 100644 new mode 100755 index 6b6ee40f..421d514a --- a/gnuradio-core/src/lib/io/gr_udp_sink.h +++ b/gnuradio-core/src/lib/io/gr_udp_sink.h @@ -24,14 +24,6 @@ #define INCLUDED_GR_UDP_SINK_H #include -#if defined(HAVE_NETDB_H) -#include -#include // usually #included by ? -#elif defined(HAVE_WINDOWS_H) -#include -#include -#endif - #include class gr_udp_sink; @@ -39,55 +31,52 @@ typedef boost::shared_ptr gr_udp_sink_sptr; gr_udp_sink_sptr gr_make_udp_sink (size_t itemsize, - const char *src, unsigned short port_src, - const char *dst, unsigned short port_dst, - int payload_size=1472); + const char *host, unsigned short port, + int payload_size=1472, bool eof=true); /*! * \brief Write stream to an UDP socket. * \ingroup sink_blk * * \param itemsize The size (in bytes) of the item datatype - * \param src The source address as either the host name or the 'numbers-and-dots' - * IP address - * \param port_src Destination port to bind to (0 allows socket to choose an appropriate port) - * \param dst The destination address as either the host name or the 'numbers-and-dots' - * IP address - * \param port_dst Destination port to connect to - * \param payload_size UDP payload size by default set to - * 1472 = (1500 MTU - (8 byte UDP header) - (20 byte IP header)) + * \param host The name or IP address of the receiving host; use + * NULL or None for no connection + * \param port Destination port to connect to on receiving host + * \param payload_size UDP payload size by default set to 1472 = + * (1500 MTU - (8 byte UDP header) - (20 byte IP header)) + * \param eof Send zero-length packet on disconnect */ class gr_udp_sink : public gr_sync_block { friend gr_udp_sink_sptr gr_make_udp_sink (size_t itemsize, - const char *src, unsigned short port_src, - const char *dst, unsigned short port_dst, - int payload_size); + const char *host, + unsigned short port, + int payload_size, bool eof); private: size_t d_itemsize; - int d_payload_size; // maximum transmission unit (packet length) - int d_socket; // handle to socket + int d_payload_size; // maximum transmission unit (packet length) + bool d_eof; // send zero-length packet on disconnect + int d_socket; // handle to socket + bool d_connected; // are we connected? + gruel::mutex d_mutex; // protects d_socket and d_connected protected: /*! * \brief UDP Sink Constructor * * \param itemsize The size (in bytes) of the item datatype - * \param src The source address as either the host name or the 'numbers-and-dots' - * IP address - * \param port_src Destination port to bind to (0 allows socket to choose an appropriate port) - * \param dst The destination address as either the host name or the 'numbers-and-dots' - * IP address - * \param port_dst Destination port to connect to + * \param host The name or IP address of the receiving host; use + * NULL or None for no connection + * \param port Destination port to connect to on receiving host * \param payload_size UDP payload size by default set to * 1472 = (1500 MTU - (8 byte UDP header) - (20 byte IP header)) + * \param eof Send zero-length packet on disconnect */ gr_udp_sink (size_t itemsize, - const char *src, unsigned short port_src, - const char *dst, unsigned short port_dst, - int payload_size); + const char *host, unsigned short port, + int payload_size, bool eof); public: ~gr_udp_sink (); @@ -95,6 +84,23 @@ class gr_udp_sink : public gr_sync_block /*! \brief return the PAYLOAD_SIZE of the socket */ int payload_size() { return d_payload_size; } + /*! \brief Change the connection to a new destination + * + * \param host The name or IP address of the receiving host; use + * NULL or None to break the connection without closing + * \param port Destination port to connect to on receiving host + * + * Calls disconnect() to terminate any current connection first. + */ + void connect( const char *host, unsigned short port ); + + /*! \brief Send zero-length packet (if eof is requested) then stop sending + * + * Zero-byte packets can be interpreted as EOF by gr_udp_source. Note that + * disconnect occurs automatically when the sink is destroyed, but not when + * its top_block stops.*/ + void disconnect(); + // should we export anything else? int work (int noutput_items, diff --git a/gnuradio-core/src/lib/io/gr_udp_sink.i b/gnuradio-core/src/lib/io/gr_udp_sink.i old mode 100644 new mode 100755 index 0f37b477..fc8059f3 --- a/gnuradio-core/src/lib/io/gr_udp_sink.i +++ b/gnuradio-core/src/lib/io/gr_udp_sink.i @@ -1,6 +1,6 @@ /* -*- c++ -*- */ /* - * Copyright 2007 Free Software Foundation, Inc. + * Copyright 2007,2010 Free Software Foundation, Inc. * * This file is part of GNU Radio * @@ -25,22 +25,21 @@ GR_SWIG_BLOCK_MAGIC(gr,udp_sink) gr_udp_sink_sptr gr_make_udp_sink (size_t itemsize, - const char *src, unsigned short port_src, - const char *dst, unsigned short port_dst, - int payload_size=1472); + const char *host, unsigned short port, + int payload_size=1472, bool eof=true); class gr_udp_sink : public gr_sync_block { protected: gr_udp_sink (size_t itemsize, - const char *src, unsigned short port_src, - const char *dst, unsigned short port_dst, - int payload_size); - - bool open(); - void close(); - int payload_size() { return d_payload_size; } + const char *host, unsigned short port, + int payload_size, bool eof); public: ~gr_udp_sink (); + + int payload_size() { return d_payload_size; } + void connect( const char *host, unsigned short port ); + void disconnect(); + }; diff --git a/gnuradio-core/src/lib/io/gr_udp_source.cc b/gnuradio-core/src/lib/io/gr_udp_source.cc index ce870d48..880388e5 100755 --- a/gnuradio-core/src/lib/io/gr_udp_source.cc +++ b/gnuradio-core/src/lib/io/gr_udp_source.cc @@ -29,9 +29,19 @@ #include #include #include + #if defined(HAVE_NETDB_H) #include typedef void* optval_t; + +// ntohs() on FreeBSD may require both netinet/in.h and arpa/inet.h, in order +#if defined(HAVE_NETINET_IN_H) +#include +#endif +#if defined(HAVE_ARPA_INET_H) +#include +#endif + #elif defined(HAVE_WINDOWS_H) // if not posix, assume winsock #define USING_WINSOCK @@ -67,7 +77,7 @@ static int is_error( int perr ) #endif } -static void report_error( char *msg1, char *msg2 ) +static void report_error( const char *msg1, const char *msg2 ) { // Deal with errors, both posix and winsock #if defined(USING_WINSOCK) @@ -81,16 +91,16 @@ static void report_error( char *msg1, char *msg2 ) return; } -gr_udp_source::gr_udp_source(size_t itemsize, const char *src, - unsigned short port_src, int payload_size, - bool wait) +gr_udp_source::gr_udp_source(size_t itemsize, const char *host, + unsigned short port, int payload_size, + bool eof, bool wait) : gr_sync_block ("udp_source", gr_make_io_signature(0, 0, 0), gr_make_io_signature(1, 1, itemsize)), - d_itemsize(itemsize), d_payload_size(payload_size), d_wait(wait), d_residual(0), d_temp_offset(0) + d_itemsize(itemsize), d_payload_size(payload_size), + d_eof(eof), d_wait(wait), d_residual(0), d_temp_offset(0) { int ret = 0; - struct addrinfo *ip_src; // store the source IP address to use #if defined(USING_WINSOCK) // for Windows (with MinGW) // initialize winsock DLL @@ -103,14 +113,16 @@ gr_udp_source::gr_udp_source(size_t itemsize, const char *src, // Set up the address stucture for the source address and port numbers // Get the source IP address from the host name + struct addrinfo *ip_src; // store the source IP address to use struct addrinfo hints; memset( (void*)&hints, 0, sizeof(hints) ); hints.ai_family = AF_INET; hints.ai_socktype = SOCK_DGRAM; hints.ai_protocol = IPPROTO_UDP; - char port_str[7]; - sprintf( port_str, "%d", port_src ); - ret = getaddrinfo( src, port_str, &hints, &ip_src ); + hints.ai_flags = AI_PASSIVE; + char port_str[12]; + sprintf( port_str, "%d", port ); + ret = getaddrinfo( host, port_str, &hints, &ip_src ); if( ret != 0 ) report_error("gr_udp_source/getaddrinfo", "can't initialize source socket" ); @@ -166,10 +178,10 @@ gr_udp_source::gr_udp_source(size_t itemsize, const char *src, gr_udp_source_sptr gr_make_udp_source (size_t itemsize, const char *ipaddr, - unsigned short port, int payload_size, bool wait) + unsigned short port, int payload_size, bool eof, bool wait) { return gr_udp_source_sptr (new gr_udp_source (itemsize, ipaddr, - port, payload_size, wait)); + port, payload_size, eof, wait)); } gr_udp_source::~gr_udp_source () @@ -279,6 +291,22 @@ gr_udp_source::work (int noutput_items, return -1; } } + else if(r==0) { + if(d_eof) { + // zero-length packet interpreted as EOF + + #if SNK_VERBOSE + printf("\tzero-length packet received; returning EOF\n"); + #endif + + return -1; + } + else{ + // do we need to allow boost thread interrupt? + boost::this_thread::interruption_point(); + continue; + } + } else { // Calculate the number of bytes we can take from the buffer in this call nbytes = std::min(r, total_bytes-bytes_received); @@ -316,3 +344,15 @@ gr_udp_source::work (int noutput_items, return bytes_received/d_itemsize; } +// Return port number of d_socket +int gr_udp_source::get_port(void) +{ + sockaddr_in name; + socklen_t len = sizeof(name); + int ret = getsockname( d_socket, (sockaddr*)&name, &len ); + if( ret ) { + report_error("gr_udp_source/getsockname",NULL); + return -1; + } + return ntohs(name.sin_port); +} diff --git a/gnuradio-core/src/lib/io/gr_udp_source.h b/gnuradio-core/src/lib/io/gr_udp_source.h index b06536d6..e23231aa 100755 --- a/gnuradio-core/src/lib/io/gr_udp_source.h +++ b/gnuradio-core/src/lib/io/gr_udp_source.h @@ -24,49 +24,48 @@ #define INCLUDED_GR_UDP_SOURCE_H #include -#if defined(HAVE_NETDB_H) -#include -#include // usually #included by ? -#elif defined(HAVE_WINDOWS_H) -#include -#include -#endif - #include class gr_udp_source; typedef boost::shared_ptr gr_udp_source_sptr; -gr_udp_source_sptr gr_make_udp_source(size_t itemsize, const char *src, - unsigned short port_src, - int payload_size=1472, bool wait=true); +gr_udp_source_sptr gr_make_udp_source(size_t itemsize, const char *host, + unsigned short port, + int payload_size=1472, + bool eof=true, bool wait=true); /*! * \brief Read stream from an UDP socket. * \ingroup source_blk * * \param itemsize The size (in bytes) of the item datatype - * \param src The source address as either the host name or the 'numbers-and-dots' - * IP address - * \param port_src The port number on which the socket listens for data - * \param payload_size UDP payload size by default set to - * 1472 = (1500 MTU - (8 byte UDP header) - (20 byte IP header)) - * \param wait Wait for data if not immediately available (default: true) + * \param host The name or IP address of the receiving host; can be + * NULL, None, or "0.0.0.0" to allow reading from any + * interface on the host + * \param port The port number on which to receive data; use 0 to + * have the system assign an unused port number + * \param payload_size UDP payload size by default set to 1472 = + * (1500 MTU - (8 byte UDP header) - (20 byte IP header)) + * \param eof Interpret zero-length packet as EOF (default: true) + * \param wait Wait for data if not immediately available + * (default: true) * */ class gr_udp_source : public gr_sync_block { - friend gr_udp_source_sptr gr_make_udp_source(size_t itemsize, const char *src, - unsigned short port_src, - int payload_size, bool wait); + friend gr_udp_source_sptr gr_make_udp_source(size_t itemsize, + const char *host, + unsigned short port, + int payload_size, + bool eof, bool wait); private: size_t d_itemsize; - - int d_payload_size; // maximum transmission unit (packet length) - bool d_wait; // wait if data if not immediately available - int d_socket; // handle to socket + int d_payload_size; // maximum transmission unit (packet length) + bool d_eof; // zero-length packet is EOF + bool d_wait; // wait if data if not immediately available + int d_socket; // handle to socket char *d_temp_buff; // hold buffer between calls ssize_t d_residual; // hold information about number of bytes stored in the temp buffer size_t d_temp_offset; // point to temp buffer location offset @@ -76,15 +75,19 @@ class gr_udp_source : public gr_sync_block * \brief UDP Source Constructor * * \param itemsize The size (in bytes) of the item datatype - * \param src The source address as either the host name or the 'numbers-and-dots' - * IP address - * \param port_src The port number on which the socket listens for data - * \param payload_size UDP payload size by default set to - * 1472 = (1500 MTU - (8 byte UDP header) - (20 byte IP header)) - * \param wait Wait for data if not immediately available (default: true) + * \param host The name or IP address of the receiving host; can be + * NULL, None, or "0.0.0.0" to allow reading from any + * interface on the host + * \param port The port number on which to receive data; use 0 to + * have the system assign an unused port number + * \param payload_size UDP payload size by default set to 1472 = + * (1500 MTU - (8 byte UDP header) - (20 byte IP header)) + * \param eof Interpret zero-length packet as EOF (default: true) + * \param wait Wait for data if not immediately available + * (default: true) */ - gr_udp_source(size_t itemsize, const char *src, unsigned short port_src, - int payload_size, bool wait); + gr_udp_source(size_t itemsize, const char *host, unsigned short port, + int payload_size, bool eof, bool wait); public: ~gr_udp_source(); @@ -92,6 +95,9 @@ class gr_udp_source : public gr_sync_block /*! \brief return the PAYLOAD_SIZE of the socket */ int payload_size() { return d_payload_size; } + /*! \breif return the port number of the socket */ + int get_port(); + // should we export anything else? int work(int noutput_items, diff --git a/gnuradio-core/src/lib/io/gr_udp_source.i b/gnuradio-core/src/lib/io/gr_udp_source.i index efaa57c2..e1b23074 100755 --- a/gnuradio-core/src/lib/io/gr_udp_source.i +++ b/gnuradio-core/src/lib/io/gr_udp_source.i @@ -1,6 +1,6 @@ /* -*- c++ -*- */ /* - * Copyright 2007 Free Software Foundation, Inc. + * Copyright 2007,2010 Free Software Foundation, Inc. * * This file is part of GNU Radio * @@ -23,19 +23,19 @@ GR_SWIG_BLOCK_MAGIC(gr,udp_source) gr_udp_source_sptr -gr_make_udp_source (size_t itemsize, const char *src, - unsigned short port_src, int payload_size=1472, - bool wait=true); +gr_make_udp_source (size_t itemsize, const char *host, + unsigned short port, int payload_size=1472, + bool eof=true, bool wait=true); class gr_udp_source : public gr_sync_block { protected: - gr_udp_source (size_t itemsize, const char *src, - unsigned short port_src, int payload_size, bool wait); + gr_udp_source (size_t itemsize, const char *host, + unsigned short port, int payload_size, bool eof, bool wait); public: ~gr_udp_source (); int payload_size() { return d_payload_size; } - + int get_port(); }; diff --git a/gnuradio-core/src/python/gnuradio/gr/Makefile.am b/gnuradio-core/src/python/gnuradio/gr/Makefile.am old mode 100644 new mode 100755 index 3aff89ee..74c46afb --- a/gnuradio-core/src/python/gnuradio/gr/Makefile.am +++ b/gnuradio-core/src/python/gnuradio/gr/Makefile.am @@ -97,4 +97,5 @@ noinst_PYTHON = \ qa_unpack_k_bits.py \ qa_repeat.py \ qa_scrambler.py \ + qa_udp_sink_source.py \ qa_vector_sink_source.py diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_udp_sink_source.py b/gnuradio-core/src/python/gnuradio/gr/qa_udp_sink_source.py new file mode 100755 index 00000000..e85d6eeb --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_udp_sink_source.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python +# +# Copyright 2008 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +from threading import Timer + +class test_sink_source(gr_unittest.TestCase): + + def setUp(self): + self.tb_snd = gr.top_block() + self.tb_rcv = gr.top_block() + + def tearDown(self): + self.tb_rcv = None + self.tb_snd = None + + def test_001(self): + port = 65500 + + n_data = 16 + src_data = [float(x) for x in range(n_data)] + expected_result = tuple(src_data) + src = gr.vector_source_f(src_data) + udp_snd = gr.udp_sink( gr.sizeof_float, 'localhost', port ) + self.tb_snd.connect( src, udp_snd ) + + udp_rcv = gr.udp_source( gr.sizeof_float, 'localhost', port ) + dst = gr.vector_sink_f() + self.tb_rcv.connect( udp_rcv, dst ) + + self.tb_rcv.start() + self.tb_snd.run() + udp_snd.disconnect() + self.timeout = False + q = Timer(3.0,self.stop_rcv) + q.start() + self.tb_rcv.wait() + q.cancel() + + result_data = dst.data() + self.assertEqual(expected_result, result_data) + self.assert_(not self.timeout) + + def test_002(self): + udp_rcv = gr.udp_source( gr.sizeof_float, '0.0.0.0', 0, eof=False ) + rcv_port = udp_rcv.get_port() + + udp_snd = gr.udp_sink( gr.sizeof_float, '127.0.0.1', 65500 ) + udp_snd.connect( 'localhost', rcv_port ) + + n_data = 16 + src_data = [float(x) for x in range(n_data)] + expected_result = tuple(src_data) + src = gr.vector_source_f(src_data) + dst = gr.vector_sink_f() + + self.tb_snd.connect( src, udp_snd ) + self.tb_rcv.connect( udp_rcv, dst ) + + self.tb_rcv.start() + self.tb_snd.run() + udp_snd.disconnect() + self.timeout = False + q = Timer(3.0,self.stop_rcv) + q.start() + self.tb_rcv.wait() + q.cancel() + + result_data = dst.data() + self.assertEqual(expected_result, result_data) + self.assert_(self.timeout) # source ignores EOF? + + def stop_rcv(self): + self.timeout = True + self.tb_rcv.stop() + #print "tb_rcv stopped by Timer" + +if __name__ == '__main__': + gr_unittest.main () + diff --git a/gnuradio-examples/python/network/audio_sink.py b/gnuradio-examples/python/network/audio_sink.py index e59d5083..eb18a75a 100755 --- a/gnuradio-examples/python/network/audio_sink.py +++ b/gnuradio-examples/python/network/audio_sink.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2006,2007 Free Software Foundation, Inc. +# Copyright 2006,2007,2010 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -25,30 +25,36 @@ from gnuradio.eng_option import eng_option from optparse import OptionParser class audio_sink(gr.top_block): - def __init__(self, src, port, pkt_size, sample_rate): + def __init__(self, host, port, pkt_size, sample_rate, eof, wait): gr.top_block.__init__(self, "audio_sink") - src = gr.udp_source(gr.sizeof_float, src, port, pkt_size) + src = gr.udp_source(gr.sizeof_float, host, port, pkt_size, + eof=eof, wait=wait) dst = audio.sink(sample_rate) self.connect(src, dst) if __name__ == '__main__': parser = OptionParser(option_class=eng_option) - parser.add_option("", "--src-name", type="string", default="localhost", + parser.add_option("", "--host", type="string", default="0.0.0.0", help="local host name (domain name or IP address)") - parser.add_option("", "--src-port", type="int", default=65500, + parser.add_option("", "--port", type="int", default=65500, help="port value to listen to for connection") parser.add_option("", "--packet-size", type="int", default=1472, help="packet size.") parser.add_option("-r", "--sample-rate", type="int", default=32000, help="audio signal sample rate [default=%default]") + parser.add_option("", "--no-eof", action="store_true", default=False, + help="don't send EOF on disconnect") + parser.add_option("", "--no-wait", action="store_true", default=False, + help="don't wait for source") (options, args) = parser.parse_args() if len(args) != 0: parser.print_help() raise SystemExit, 1 # Create an instance of a hierarchical block - top_block = audio_sink(options.src_name, options.src_port, - options.packet_size, options.sample_rate) + top_block = audio_sink(options.host, options.port, + options.packet_size, options.sample_rate, + not options.no_eof, not options.no_wait) try: # Run forever diff --git a/gnuradio-examples/python/network/audio_source.py b/gnuradio-examples/python/network/audio_source.py index d7f4f6d9..5818ccbd 100755 --- a/gnuradio-examples/python/network/audio_source.py +++ b/gnuradio-examples/python/network/audio_source.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2006,2007 Free Software Foundation, Inc. +# Copyright 2006,2007,2010 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -25,32 +25,33 @@ from gnuradio.eng_option import eng_option from optparse import OptionParser class audio_source(gr.top_block): - def __init__(self, src, dst, port, pkt_size, sample_rate): + def __init__(self, host, port, pkt_size, sample_rate, eof): gr.top_block.__init__(self, "audio_source") self.audio = audio.source(sample_rate) - self.sink = gr.udp_sink(gr.sizeof_float, src, 0, dst, port, pkt_size) + self.sink = gr.udp_sink(gr.sizeof_float, host, port, pkt_size, eof=eof) self.connect(self.audio, self.sink) if __name__ == '__main__': parser = OptionParser(option_class=eng_option) - parser.add_option("", "--src-name", type="string", default="localhost", - help="local host name (domain name or IP address)") - parser.add_option("", "--dst-name", type="string", default="localhost", + parser.add_option("", "--host", type="string", default="localhost", help="Remote host name (domain name or IP address") - parser.add_option("", "--dst-port", type="int", default=65500, - help="port value to connect to") + parser.add_option("", "--port", type="int", default=65500, + help="port number to connect to") parser.add_option("", "--packet-size", type="int", default=1472, help="packet size.") parser.add_option("-r", "--sample-rate", type="int", default=32000 , help="audio signal sample rate [default=%default]") + parser.add_option("", "--no-eof", action="store_true", default=False, + help="don't send EOF on disconnect") (options, args) = parser.parse_args() if len(args) != 0: parser.print_help() raise SystemExit, 1 # Create an instance of a hierarchical block - top_block = audio_source(options.src_name, options.dst_name, options.dst_port, - options.packet_size, options.sample_rate) + top_block = audio_source(options.host, options.port, + options.packet_size, options.sample_rate, + not options.no_eof) try: # Run forever diff --git a/gnuradio-examples/python/network/dial_tone_sink.py b/gnuradio-examples/python/network/dial_tone_sink.py index 82e925ba..1b900955 100755 --- a/gnuradio-examples/python/network/dial_tone_sink.py +++ b/gnuradio-examples/python/network/dial_tone_sink.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2006,2007 Free Software Foundation, Inc. +# Copyright 2006,2007,2010 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -25,23 +25,26 @@ from gnuradio.eng_option import eng_option from optparse import OptionParser class dial_tone_sink(gr.top_block): - def __init__(self, src, port, pkt_size, sample_rate, wait): + def __init__(self, host, port, pkt_size, sample_rate, eof, wait): gr.top_block.__init__(self, "dial_tone_sink") - udp = gr.udp_source(gr.sizeof_float, src, port, pkt_size, wait=wait) + udp = gr.udp_source(gr.sizeof_float, host, port, pkt_size, + eof=eof, wait=wait) sink = audio.sink(sample_rate) self.connect(udp, sink) if __name__ == '__main__': parser = OptionParser(option_class=eng_option) - parser.add_option("", "--src-name", type="string", default="localhost", + parser.add_option("", "--host", type="string", default="0.0.0.0", help="local host name (domain name or IP address)") - parser.add_option("", "--src-port", type="int", default=65500, + parser.add_option("", "--port", type="int", default=65500, help="port value to listen to for connection") parser.add_option("", "--packet-size", type="int", default=1472, help="packet size.") parser.add_option("-r", "--sample-rate", type="int", default=8000, help="audio signal sample rate [default=%default]") - parser.add_option("-n", "--no-wait", action="store_true", default=False, + parser.add_option("", "--no-eof", action="store_true", default=False, + help="don't send EOF on disconnect") + parser.add_option("", "--no-wait", action="store_true", default=False, help="don't wait for source") (options, args) = parser.parse_args() if len(args) != 0: @@ -49,9 +52,9 @@ if __name__ == '__main__': raise SystemExit, 1 # Create an instance of a hierarchical block - top_block = dial_tone_sink(options.src_name, options.src_port, + top_block = dial_tone_sink(options.host, options.port, options.packet_size, options.sample_rate, - not options.no_wait) + not options.no_eof, not options.no_wait) try: # Run forever diff --git a/gnuradio-examples/python/network/dial_tone_source.py b/gnuradio-examples/python/network/dial_tone_source.py index 835f9aaf..766ecf16 100755 --- a/gnuradio-examples/python/network/dial_tone_source.py +++ b/gnuradio-examples/python/network/dial_tone_source.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2006,2007 Free Software Foundation, Inc. +# Copyright 2006,2007,2010 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -25,7 +25,7 @@ from gnuradio.eng_option import eng_option from optparse import OptionParser class dial_tone_source(gr.top_block): - def __init__(self, src, dst, port, pkt_size, sample_rate): + def __init__(self, host, port, pkt_size, sample_rate, eof): gr.top_block.__init__(self, "dial_tone_source") amplitude = 0.3 @@ -35,31 +35,32 @@ class dial_tone_source(gr.top_block): # Throttle needed here to account for the other side's audio card sampling rate thr = gr.throttle(gr.sizeof_float, sample_rate) - sink = gr.udp_sink(gr.sizeof_float, src, 0, dst, port, pkt_size) + sink = gr.udp_sink(gr.sizeof_float, host, port, pkt_size, eof=eof) self.connect(src0, (add, 0)) self.connect(src1, (add, 1)) self.connect(add, thr, sink) if __name__ == '__main__': parser = OptionParser(option_class=eng_option) - parser.add_option("", "--src-name", type="string", default="localhost", - help="local host name (domain name or IP address)") - parser.add_option("", "--dst-name", type="string", default="localhost", + parser.add_option("", "--host", type="string", default="localhost", help="Remote host name (domain name or IP address") - parser.add_option("", "--dst-port", type="int", default=65500, - help="port value to connect to") + parser.add_option("", "--port", type="int", default=65500, + help="port number to connect to") parser.add_option("", "--packet-size", type="int", default=1472, help="packet size.") parser.add_option("-r", "--sample-rate", type="int", default=8000, help="audio signal sample rate [default=%default]") + parser.add_option("", "--no-eof", action="store_true", default=False, + help="don't send EOF on disconnect") (options, args) = parser.parse_args() if len(args) != 0: parser.print_help() raise SystemExit, 1 # Create an instance of a hierarchical block - top_block = dial_tone_source(options.src_name, options.dst_name, options.dst_port, - options.packet_size, options.sample_rate) + top_block = dial_tone_source(options.host, options.port, + options.packet_size, options.sample_rate, + not options.no_eof) try: # Run forever @@ -67,4 +68,3 @@ if __name__ == '__main__': except KeyboardInterrupt: # Ctrl-C exits pass - diff --git a/gnuradio-examples/python/network/vector_sink.py b/gnuradio-examples/python/network/vector_sink.py index 981cc598..5d73858a 100755 --- a/gnuradio-examples/python/network/vector_sink.py +++ b/gnuradio-examples/python/network/vector_sink.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2006 Free Software Foundation, Inc. +# Copyright 2006,2010 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -25,28 +25,35 @@ from gnuradio.eng_option import eng_option from optparse import OptionParser class vector_sink(gr.top_block): - def __init__(self, src, port, pkt_size): + def __init__(self, host, port, pkt_size, eof, wait): gr.top_block.__init__(self, "vector_sink") - udp = gr.udp_source(gr.sizeof_float, src, port, pkt_size) + udp = gr.udp_source(gr.sizeof_float, host, port, pkt_size, + eof=eof, wait=wait) sink = gr.file_sink(gr.sizeof_float, "received.dat") self.connect(udp, sink) if __name__ == "__main__": parser = OptionParser(option_class=eng_option) - parser.add_option("", "--src-name", type="string", default="localhost", + parser.add_option("", "--host", type="string", default="0.0.0.0", help="local host name (domain name or IP address)") - parser.add_option("", "--src-port", type="int", default=65500, + parser.add_option("", "--port", type="int", default=65500, help="port value to listen to for connection") parser.add_option("", "--packet-size", type="int", default=1471, help="packet size.") + parser.add_option("", "--no-eof", action="store_true", default=False, + help="don't send EOF on disconnect") + parser.add_option("", "--no-wait", action="store_true", default=False, + help="don't wait for source") (options, args) = parser.parse_args() if len(args) != 0: parser.print_help() raise SystemExit, 1 # Create an instance of a hierarchical block - top_block = vector_sink(options.src_name, options.src_port, options.packet_size) + top_block = vector_sink(options.host, options.port, + options.packet_size, + not options.no_eof, not options.no_wait) try: # Run forever diff --git a/gnuradio-examples/python/network/vector_source.py b/gnuradio-examples/python/network/vector_source.py index e7ec2a46..0e7d6784 100755 --- a/gnuradio-examples/python/network/vector_source.py +++ b/gnuradio-examples/python/network/vector_source.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2006 Free Software Foundation, Inc. +# Copyright 2006,2010 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -25,31 +25,31 @@ from gnuradio.eng_option import eng_option from optparse import OptionParser class vector_source(gr.top_block): - def __init__(self, src, dst, port, pkt_size): + def __init__(self, host, port, pkt_size, eof): gr.top_block.__init__(self, "vector_source") data = [i*0.01 for i in range(1000)] vec = gr.vector_source_f(data, True) - udp = gr.udp_sink(gr.sizeof_float, src, 0, dst, port, pkt_size) + udp = gr.udp_sink(gr.sizeof_float, host, port, pkt_size, eof=eof) self.connect(vec, udp) if __name__ == '__main__': parser = OptionParser(option_class=eng_option) - parser.add_option("", "--src-name", type="string", default="localhost", - help="local host name (domain name or IP address)") - parser.add_option("", "--dst-name", type="string", default="localhost", + parser.add_option("", "--host", type="string", default="localhost", help="Remote host name (domain name or IP address") - parser.add_option("", "--dst-port", type="int", default=65500, - help="port value to connect to") + parser.add_option("", "--port", type="int", default=65500, + help="port number to connect to") parser.add_option("", "--packet-size", type="int", default=1471, help="packet size.") + parser.add_option("", "--no-eof", action="store_true", default=False, + help="don't send EOF on disconnect") (options, args) = parser.parse_args() if len(args) != 0: parser.print_help() raise SystemExit, 1 # Create an instance of a hierarchical block - top_block = vector_source(options.src_name, options.dst_name, - options.dst_port, options.packet_size) + top_block = vector_source(options.host, options.port, options.packet_size, + not options.no_eof) try: # Run forever