Updates to udp source/sink (select(), wait, cleanup)
[debian/gnuradio] / gnuradio-core / src / lib / io / gr_udp_sink.cc
1 /* -*- c++ -*- */
2 /*
3  * Copyright 2007,2008,2009,2010 Free Software Foundation, Inc.
4  * 
5  * This file is part of GNU Radio
6  * 
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3, or (at your option)
10  * any later version.
11  * 
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  * 
17  * You should have received a copy of the GNU General Public License
18  * along with GNU Radio; see the file COPYING.  If not, write to
19  * the Free Software Foundation, Inc., 51 Franklin Street,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26 #include <gr_udp_sink.h>
27 #include <gr_io_signature.h>
28 #include <stdexcept>
29 #include <errno.h>
30 #include <stdio.h>
31 #include <string.h>
32 #if defined(HAVE_NETDB_H)
33 typedef void* optval_t;
34 #elif defined(HAVE_WINDOWS_H)
35 // if not posix, assume winsock
36 #define USING_WINSOCK
37 #include <winsock2.h>
38 #include <ws2tcpip.h>
39 #define SHUT_RDWR 2
40 typedef char* optval_t;
41 #endif
42
43 #include <gruel/thread.h>
44
45 #define SNK_VERBOSE 0
46
47 static int is_error( int perr )
48 {
49   // Compare error to posix error code; return nonzero if match.
50 #if defined(USING_WINSOCK)
51 #define ENOPROTOOPT 109
52 #define ECONNREFUSED 111
53   // All codes to be checked for must be defined below
54   int werr = WSAGetLastError();
55   switch( werr ) {
56   case WSAETIMEDOUT:
57     return( perr == EAGAIN );
58   case WSAENOPROTOOPT:
59     return( perr == ENOPROTOOPT );
60   case WSAECONNREFUSED:
61     return( perr == ECONNREFUSED );
62   default:
63     fprintf(stderr,"gr_udp_source/is_error: unknown error %d\n", perr );
64     throw std::runtime_error("internal error");
65   }
66   return 0;
67 #else
68   return( perr == errno );
69 #endif
70 }
71
72 static void report_error( const char *msg1, const char *msg2 )
73 {
74   // Deal with errors, both posix and winsock
75 #if defined(USING_WINSOCK)
76   int werr = WSAGetLastError();
77   fprintf(stderr, "%s: winsock error %d\n", msg1, werr );
78 #else
79   perror(msg1);
80 #endif
81   if( msg2 != NULL )
82     throw std::runtime_error(msg2);
83   return;
84 }
85
86 gr_udp_sink::gr_udp_sink (size_t itemsize, 
87                           const char *src, unsigned short port_src,
88                           const char *dst, unsigned short port_dst,
89                           int payload_size)
90   : gr_sync_block ("udp_sink",
91                    gr_make_io_signature (1, 1, itemsize),
92                    gr_make_io_signature (0, 0, 0)),
93     d_itemsize (itemsize), d_payload_size(payload_size)
94 {
95   int ret = 0;
96   struct addrinfo *ip_src;        // store the source ip info
97   struct addrinfo *ip_dst;        // store the destination ip info
98
99 #if defined(USING_WINSOCK) // for Windows (with MinGW)
100   // initialize winsock DLL
101   WSADATA wsaData;
102   int iResult = WSAStartup( MAKEWORD(2,2), &wsaData );
103   if( iResult != NO_ERROR ) {
104     report_error( "gr_udp_source WSAStartup", "can't open socket" );
105   }
106 #endif
107   
108   // Set up the address stucture for the source address and port numbers
109   // Get the source IP address from the host name
110   struct addrinfo hints;
111   memset( (void*)&hints, 0, sizeof(hints) );
112   hints.ai_family = AF_INET;
113   hints.ai_socktype = SOCK_DGRAM;
114   hints.ai_protocol = IPPROTO_UDP;
115   char port_str[7];
116   sprintf( port_str, "%d", port_src );
117   ret = getaddrinfo( src, port_str, &hints, &ip_src );
118   if( ret != 0 )
119     report_error("gr_udp_source/getaddrinfo",
120                  "can't initialize source socket" );
121
122   // Get the destination IP address from the host name
123   sprintf( port_str, "%d", port_dst );
124   ret = getaddrinfo( dst, port_str, &hints, &ip_dst );
125   if( ret != 0 )
126     report_error("gr_udp_source/getaddrinfo",
127                  "can't initialize destination socket" );
128
129   // create socket
130   d_socket = socket(ip_src->ai_family, ip_src->ai_socktype,
131                     ip_src->ai_protocol);
132   if(d_socket == -1) {
133     report_error("socket open","can't open socket");
134   }
135
136   // Turn on reuse address
137   int opt_val = true;
138   if(setsockopt(d_socket, SOL_SOCKET, SO_REUSEADDR, (optval_t)&opt_val, sizeof(int)) == -1) {
139     report_error("SO_REUSEADDR","can't set socket option SO_REUSEADDR");
140   }
141
142   // Don't wait when shutting down
143   linger lngr;
144   lngr.l_onoff  = 1;
145   lngr.l_linger = 0;
146   if(setsockopt(d_socket, SOL_SOCKET, SO_LINGER, (optval_t)&lngr, sizeof(linger)) == -1) {
147     if( !is_error(ENOPROTOOPT) ) {  // no SO_LINGER for SOCK_DGRAM on Windows
148       report_error("SO_LINGER","can't set socket option SO_LINGER");
149     }
150   }
151
152   // bind socket to an address and port number to listen on
153   if(bind (d_socket, ip_src->ai_addr, ip_src->ai_addrlen) == -1) {
154     report_error("socket bind","can't bind socket");
155   }
156
157   // Not sure if we should throw here or allow retries
158   if(connect(d_socket, ip_dst->ai_addr, ip_dst->ai_addrlen) == -1) {
159     report_error("socket connect","can't connect to socket");
160   }
161
162   freeaddrinfo(ip_src);
163   freeaddrinfo(ip_dst);
164 }
165
166 // public constructor that returns a shared_ptr
167
168 gr_udp_sink_sptr
169 gr_make_udp_sink (size_t itemsize, 
170                   const char *src, unsigned short port_src,
171                   const char *dst, unsigned short port_dst,
172                   int payload_size)
173 {
174   return gr_udp_sink_sptr (new gr_udp_sink (itemsize, 
175                                             src, port_src,
176                                             dst, port_dst,
177                                             payload_size));
178 }
179
180 gr_udp_sink::~gr_udp_sink ()
181 {
182   if (d_socket){
183     shutdown(d_socket, SHUT_RDWR);
184 #if defined(USING_WINSOCK)
185     closesocket(d_socket);
186 #else
187     ::close(d_socket);
188 #endif
189     d_socket = 0;
190   }
191
192 #if defined(USING_WINSOCK) // for Windows (with MinGW)
193   // free winsock resources
194   WSACleanup();
195 #endif
196 }
197
198 int 
199 gr_udp_sink::work (int noutput_items,
200                    gr_vector_const_void_star &input_items,
201                    gr_vector_void_star &output_items)
202 {
203   const char *in = (const char *) input_items[0];
204   ssize_t r=0, bytes_sent=0, bytes_to_send=0;
205   ssize_t total_size = noutput_items*d_itemsize;
206
207   #if SNK_VERBOSE
208   printf("Entered udp_sink\n");
209   #endif
210
211   while(bytes_sent <  total_size) {
212     bytes_to_send = std::min((ssize_t)d_payload_size, (total_size-bytes_sent));
213   
214     r = send(d_socket, (in+bytes_sent), bytes_to_send, 0);
215     if(r == -1) {         // error on send command
216       if( is_error(ECONNREFUSED) )
217         r = bytes_to_send;  // discard data until receiver is started
218       else {
219         report_error("udp_sink",NULL); // there should be no error case where
220         return -1;                  // this function should not exit immediately
221       }
222     }
223     bytes_sent += r;
224     
225     #if SNK_VERBOSE
226     printf("\tbyte sent: %d bytes\n", bytes);
227     #endif
228   }
229
230   #if SNK_VERBOSE
231   printf("Sent: %d bytes (noutput_items: %d)\n", bytes_sent, noutput_items);
232   #endif
233
234   return noutput_items;
235 }