Identify memory leaks that occur on error conditions
[debian/gnuradio] / gnuradio-core / src / lib / io / gr_udp_sink.cc
1 /* -*- c++ -*- */
2 /*
3  * Copyright 2007,2008,2009,2010 Free Software Foundation, Inc.
4  * 
5  * This file is part of GNU Radio
6  * 
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3, or (at your option)
10  * any later version.
11  * 
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  * 
17  * You should have received a copy of the GNU General Public License
18  * along with GNU Radio; see the file COPYING.  If not, write to
19  * the Free Software Foundation, Inc., 51 Franklin Street,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26 #include <gr_udp_sink.h>
27 #include <gr_io_signature.h>
28 #include <stdexcept>
29 #include <errno.h>
30 #include <stdio.h>
31 #include <string.h>
32 #if defined(HAVE_NETDB_H)
33 #include <netdb.h>
34 #ifdef HAVE_SYS_TYPES_H
35 #include <sys/types.h>
36 #endif
37 #ifdef HAVE_SYS_SOCKET_H
38 #include <sys/socket.h>  //usually included by <netdb.h>?
39 #endif
40 typedef void* optval_t;
41 #elif defined(HAVE_WINDOWS_H)
42 // if not posix, assume winsock
43 #define USING_WINSOCK
44 #include <winsock2.h>
45 #include <ws2tcpip.h>
46 #define SHUT_RDWR 2
47 typedef char* optval_t;
48 #endif
49
50 #include <gruel/thread.h>
51
52 #define SNK_VERBOSE 0
53
54 static int is_error( int perr )
55 {
56   // Compare error to posix error code; return nonzero if match.
57 #if defined(USING_WINSOCK)
58 #define ENOPROTOOPT 109
59 #define ECONNREFUSED 111
60   // All codes to be checked for must be defined below
61   int werr = WSAGetLastError();
62   switch( werr ) {
63   case WSAETIMEDOUT:
64     return( perr == EAGAIN );
65   case WSAENOPROTOOPT:
66     return( perr == ENOPROTOOPT );
67   case WSAECONNREFUSED:
68     return( perr == ECONNREFUSED );
69   default:
70     fprintf(stderr,"gr_udp_source/is_error: unknown error %d\n", perr );
71     throw std::runtime_error("internal error");
72   }
73   return 0;
74 #else
75   return( perr == errno );
76 #endif
77 }
78
79 static void report_error( const char *msg1, const char *msg2 )
80 {
81   // Deal with errors, both posix and winsock
82 #if defined(USING_WINSOCK)
83   int werr = WSAGetLastError();
84   fprintf(stderr, "%s: winsock error %d\n", msg1, werr );
85 #else
86   perror(msg1);
87 #endif
88   if( msg2 != NULL )
89     throw std::runtime_error(msg2);
90   return;
91 }
92
93 gr_udp_sink::gr_udp_sink (size_t itemsize, 
94                           const char *host, unsigned short port,
95                           int payload_size, bool eof)
96   : gr_sync_block ("udp_sink",
97                    gr_make_io_signature (1, 1, itemsize),
98                    gr_make_io_signature (0, 0, 0)),
99     d_itemsize (itemsize), d_payload_size(payload_size), d_eof(eof),
100     d_socket(-1), d_connected(false)
101 {
102 #if defined(USING_WINSOCK) // for Windows (with MinGW)
103   // initialize winsock DLL
104   WSADATA wsaData;
105   int iResult = WSAStartup( MAKEWORD(2,2), &wsaData );
106   if( iResult != NO_ERROR ) {
107     report_error( "gr_udp_source WSAStartup", "can't open socket" );
108   }
109 #endif
110
111   // create socket
112   d_socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
113   if(d_socket == -1) {
114     report_error("socket open","can't open socket");
115   }
116
117   // Don't wait when shutting down
118   linger lngr;
119   lngr.l_onoff  = 1;
120   lngr.l_linger = 0;
121   if(setsockopt(d_socket, SOL_SOCKET, SO_LINGER, (optval_t)&lngr, sizeof(linger)) == -1) {
122     if( !is_error(ENOPROTOOPT) ) {  // no SO_LINGER for SOCK_DGRAM on Windows
123       report_error("SO_LINGER","can't set socket option SO_LINGER");
124     }
125   }
126
127   // Get the destination address
128   connect(host, port);
129 }
130
131 // public constructor that returns a shared_ptr
132
133 gr_udp_sink_sptr
134 gr_make_udp_sink (size_t itemsize, 
135                   const char *host, unsigned short port,
136                   int payload_size, bool eof)
137 {
138   return gr_udp_sink_sptr (new gr_udp_sink (itemsize, 
139                                             host, port,
140                                             payload_size, eof));
141 }
142
143 gr_udp_sink::~gr_udp_sink ()
144 {
145   if (d_connected)
146     disconnect();
147
148   if (d_socket != -1){
149     shutdown(d_socket, SHUT_RDWR);
150 #if defined(USING_WINSOCK)
151     closesocket(d_socket);
152 #else
153     ::close(d_socket);
154 #endif
155     d_socket = -1;
156   }
157
158 #if defined(USING_WINSOCK) // for Windows (with MinGW)
159   // free winsock resources
160   WSACleanup();
161 #endif
162 }
163
164 int 
165 gr_udp_sink::work (int noutput_items,
166                    gr_vector_const_void_star &input_items,
167                    gr_vector_void_star &output_items)
168 {
169   const char *in = (const char *) input_items[0];
170   ssize_t r=0, bytes_sent=0, bytes_to_send=0;
171   ssize_t total_size = noutput_items*d_itemsize;
172
173   #if SNK_VERBOSE
174   printf("Entered udp_sink\n");
175   #endif
176
177   gruel::scoped_lock guard(d_mutex);  // protect d_socket
178
179   while(bytes_sent <  total_size) {
180     bytes_to_send = std::min((ssize_t)d_payload_size, (total_size-bytes_sent));
181   
182     if(d_connected) {
183       r = send(d_socket, (in+bytes_sent), bytes_to_send, 0);
184       if(r == -1) {         // error on send command
185         if( is_error(ECONNREFUSED) )
186           r = bytes_to_send;  // discard data until receiver is started
187         else {
188           report_error("udp_sink",NULL); // there should be no error case where
189           return -1;                  // this function should not exit immediately
190         }
191       }
192     }
193     else
194       r = bytes_to_send;  // discarded for lack of connection
195     bytes_sent += r;
196     
197     #if SNK_VERBOSE
198     printf("\tbyte sent: %d bytes\n", r);
199     #endif
200   }
201
202   #if SNK_VERBOSE
203   printf("Sent: %d bytes (noutput_items: %d)\n", bytes_sent, noutput_items);
204   #endif
205
206   return noutput_items;
207 }
208
209 void gr_udp_sink::connect( const char *host, unsigned short port )
210 {
211   if(d_connected)
212     disconnect();
213
214   if(host != NULL ) {
215     // Get the destination address
216     struct addrinfo *ip_dst;
217     struct addrinfo hints;
218     memset( (void*)&hints, 0, sizeof(hints) );
219     hints.ai_family = AF_INET;
220     hints.ai_socktype = SOCK_DGRAM;
221     hints.ai_protocol = IPPROTO_UDP;
222     char port_str[12];
223     sprintf( port_str, "%d", port );
224
225     // FIXME leaks if report_error throws below
226     int ret = getaddrinfo( host, port_str, &hints, &ip_dst );
227     if( ret != 0 )
228       report_error("gr_udp_source/getaddrinfo",
229                    "can't initialize destination socket" );
230
231     // don't need d_mutex lock when !d_connected
232     if(::connect(d_socket, ip_dst->ai_addr, ip_dst->ai_addrlen) == -1) {
233       report_error("socket connect","can't connect to socket");
234     }
235     d_connected = true;
236
237     freeaddrinfo(ip_dst);
238   }
239
240   return;
241 }
242
243 void gr_udp_sink::disconnect()
244 {
245   if(!d_connected)
246     return;
247
248   #if SNK_VERBOSE
249   printf("gr_udp_sink disconnecting\n");
250   #endif
251
252   gruel::scoped_lock guard(d_mutex);  // protect d_socket from work()
253
254   // Send a few zero-length packets to signal receiver we are done
255   if(d_eof) {
256     int i;
257     for( i = 0; i < 3; i++ )
258       (void) send( d_socket, NULL, 0, 0 );  // ignore errors
259   }
260
261   // Sending EOF can produce ERRCONNREFUSED errors that won't show up
262   //  until the next send or recv, which might confuse us if it happens
263   //  on a new connection.  The following does a nonblocking recv to
264   //  clear any such errors.
265   timeval timeout;
266   timeout.tv_sec = 0;    // zero time for immediate return
267   timeout.tv_usec = 0;
268   fd_set readfds;
269   FD_ZERO(&readfds);
270   FD_SET(d_socket, &readfds);
271   int r = select(FD_SETSIZE, &readfds, NULL, NULL, &timeout);
272   if(r < 0) {
273       #if SNK_VERBOSE
274       report_error("udp_sink/select",NULL);
275       #endif
276   }
277   else if(r > 0) {  // call recv() to get error return
278     r = recv(d_socket, (char*)&readfds, sizeof(readfds), 0);
279     if(r < 0) {
280         #if SNK_VERBOSE
281         report_error("udp_sink/recv",NULL);
282         #endif
283     }
284   }
285
286   // Since I can't find any way to disconnect a datagram socket in Cygwin,
287   // we just leave it connected but disable sending.
288 #if 0
289   // zeroed address structure should reset connection
290   struct sockaddr addr;
291   memset( (void*)&addr, 0, sizeof(addr) );
292   // addr.sa_family = AF_UNSPEC;  // doesn't work on Cygwin
293   // addr.sa_family = AF_INET;  // doesn't work on Cygwin
294
295   if(::connect(d_socket, &addr, sizeof(addr)) == -1)
296     report_error("socket connect","can't connect to socket");
297 #endif
298
299   d_connected = false;
300
301   return;
302 }