merged changeset r4281:4292 on trondeau/ethernet into trunk
[debian/gnuradio] / gnuradio-core / src / lib / io / gr_udp_sink.cc
1 /* -*- c++ -*- */
2 /*
3  * Copyright 2004 Free Software Foundation, Inc.
4  * 
5  * This file is part of GNU Radio
6  * 
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 2, or (at your option)
10  * any later version.
11  * 
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  * 
17  * You should have received a copy of the GNU General Public License
18  * along with GNU Radio; see the file COPYING.  If not, write to
19  * the Free Software Foundation, Inc., 51 Franklin Street,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26
27 #include <gr_udp_sink.h>
28 #include <gr_io_signature.h>
29 #include <cstdio>
30 #include <sys/types.h>
31 #include <sys/stat.h>
32 #include <fcntl.h>
33 #include <stdexcept>
34
35 #define SNK_VERBOSE 0
36
37 gr_udp_sink::gr_udp_sink (size_t itemsize, 
38                           const char *ipaddrl, unsigned short portl,
39                           const char *ipaddrr, unsigned short portr,
40                           unsigned int mtu)
41   : gr_sync_block ("udp_sink",
42                    gr_make_io_signature (1, 1, itemsize),
43                    gr_make_io_signature (0, 0, 0)),
44     d_itemsize (itemsize), d_updated(false), d_mtu(mtu)
45 {
46   // Set up the address stucture for the local address and port numbers
47   inet_aton(ipaddrl, &d_ipaddr_local);     // format IP address
48   inet_aton(ipaddrr, &d_ipaddr_remote);    // format IP address
49   d_port_local  = htons(portl);            // format port number
50   d_port_remote = htons(portr);            // format port number
51
52   d_sockaddr_local.sin_family = AF_INET;
53   d_sockaddr_local.sin_addr   = d_ipaddr_local;
54   d_sockaddr_local.sin_port   = d_port_local;
55
56   d_sockaddr_remote.sin_family = AF_INET;
57   d_sockaddr_remote.sin_addr   = d_ipaddr_remote;
58   d_sockaddr_remote.sin_port   = d_port_remote;
59   
60   open();
61 }
62
63 // public constructor that returns a shared_ptr
64
65 gr_udp_sink_sptr
66 gr_make_udp_sink (size_t itemsize, 
67                   const char *ipaddrl, unsigned short portl,
68                   const char *ipaddrr, unsigned short portr,
69                   unsigned int mtu)
70 {
71   return gr_udp_sink_sptr (new gr_udp_sink (itemsize, 
72                                                       ipaddrl, portl,
73                                                       ipaddrr, portr,
74                                                       mtu));
75 }
76
77 gr_udp_sink::~gr_udp_sink ()
78 {
79   close();
80 }
81
82 bool
83 gr_udp_sink::open()
84 {
85   omni_mutex_lock l(d_mutex);   // hold mutex for duration of this function
86
87   // create socket
88   if((d_socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == NULL) {
89     perror("socket open");
90     throw std::runtime_error("can't open socket");
91   }
92
93   // Turn on reuse address
94   bool opt_val = true;
95   if(setsockopt(d_socket, SOL_SOCKET, SO_REUSEADDR, (void*)&opt_val, sizeof(int))) {
96     perror("SO_REUSEADDR");
97     throw std::runtime_error("can't set socket option SO_REUSEADDR");
98   }
99
100   // Don't wait when shutting down
101   linger lngr;
102   lngr.l_onoff  = 1;
103   lngr.l_linger = 0;
104   if(setsockopt(d_socket, SOL_SOCKET, SO_LINGER, (void*)&lngr, sizeof(linger))) {
105     perror("SO_LINGER");
106     throw std::runtime_error("can't set socket option SO_LINGER");
107   }
108
109   // bind socket to an address and port number to listen on
110   if(bind (d_socket, (sockaddr*)&d_sockaddr_local, sizeof(struct sockaddr))) {
111     perror("socket bind");
112     throw std::runtime_error("can't bind socket");
113   }
114
115   // Not sure if we should throw here or allow retries
116   if(connect(d_socket, (sockaddr*)&d_sockaddr_remote, sizeof(struct sockaddr))) {
117     perror("socket connect");
118     throw std::runtime_error("can't connect to socket");
119   }
120
121   d_updated = true;
122   return d_socket != 0;
123 }
124
125 void
126 gr_udp_sink::close()
127 {
128   omni_mutex_lock l(d_mutex);   // hold mutex for duration of this function
129
130   if (d_socket){
131     shutdown(d_socket, SHUT_RDWR);
132     d_socket = 0;
133   }
134   d_updated = true;
135 }
136
137 int 
138 gr_udp_sink::work (int noutput_items,
139                    gr_vector_const_void_star &input_items,
140                    gr_vector_void_star &output_items)
141 {
142   char *in = (char *) input_items[0];
143   socklen_t bytes=0, bytes_sent=0, bytes_to_send=0;
144   unsigned int total_size = noutput_items*d_itemsize;
145
146   while(bytes_sent < total_size) {
147     bytes_to_send = (bytes_sent+d_mtu < total_size ? d_mtu : total_size-bytes_sent);
148     bytes =send(d_socket, (in+bytes_sent), bytes_to_send, MSG_DONTWAIT);
149     bytes_sent += bytes;
150   }
151
152   #if SNK_VERBOSE
153   printf("Sent: %d bytes (noutput_items: %d)\n", bytes_sent, noutput_items);
154   #endif
155
156   return noutput_items;
157 }