EEROS  1.0.0.0
API for the EEROS Real-Time Robotics Framework
SocketServer.hpp
Go to the documentation of this file.
1 #ifndef ORG_EEROS_SOCKET_SERVER_HPP_
2 #define ORG_EEROS_SOCKET_SERVER_HPP_
3 
4 #include <eeros/core/Thread.hpp>
5 #include <atomic>
6 #include <array>
7 #include <unistd.h>
8 #include <netdb.h>
9 #include <arpa/inet.h> /* inet_ntoa() to format IP address */
10 #include <string.h>
11 #include <sys/socket.h>
12 #include <fcntl.h>
13 #include <eeros/core/Fault.hpp>
14 #include <iostream>
15 #include <cstring>
16 #include <signal.h>
17 
18 namespace eeros {
19  namespace sockets {
20 
21  void sigPipeHandler(int signum);
22 
23  template < uint32_t BufInLen, typename inT, uint32_t BufOutLen, typename outT >
24  class SocketServer : public eeros::Thread {
25  public:
26  SocketServer(uint16_t port, double period = 0.01, double timeout = 1.0) : read1({0}), read2({0}), read3({0}) {
27  this->port = port;
28  this->period = period;
29  this->timeout = timeout;
30  signal(SIGPIPE, sigPipeHandler); // make sure, that a broken pipe does not stop application
31  read_ptr.store(&read1);
32  send_ptr.store(&send1);
33  running = false;
34  }
35 
36  virtual ~SocketServer() {
37  join();
38  }
39 
40  virtual void stop() {
41  running = false;
42  }
43 
44  virtual bool isRunning() {
45  return running;
46  }
47 
48  virtual std::array<outT, BufOutLen>& getReceiveBuffer() {
49  return *read_ptr.load();
50  }
51 
52  virtual void setSendBuffer(std::array<inT, BufInLen>& data) {
53  auto p = send_ptr.load();
54  if (p == &send1) send1 = data;
55  else if (p == &send2) send2 = data;
56  else if (p == &send3) send3 = data;
57  }
58 
59  outT readbuffer;
60  bool newData = false;
61 
62  private:
63  virtual void run() {
64  log.info() << "SocketServer thread started";
65  struct sockaddr_in servAddr;
66  sockfd = socket(AF_INET, SOCK_STREAM, 0);
67  if (sockfd < 0) throw Fault("ERROR opening socket");
68 
69  bzero((char *) &servAddr, sizeof(servAddr));
70  servAddr.sin_port = htons(port);
71  servAddr.sin_family = AF_INET;
72  servAddr.sin_addr.s_addr = htonl(INADDR_ANY) ;
73  int yes = 1;
74  if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) == -1)
75  throw Fault("ERROR on set socket option");
76  if (bind(sockfd, (struct sockaddr *) &servAddr, sizeof(servAddr)) < 0)
77  throw Fault("ERROR on socket binding");
78 
79  socklen_t clilen;
80  listen(sockfd,1);
81  struct sockaddr_in cliAddr;
82  clilen = sizeof(cliAddr);
83 
84  running = true;
85  while (running) {
86  newsockfd = accept(sockfd, (struct sockaddr *) &cliAddr, &clilen);
87  if (newsockfd < 0) throw Fault("ERROR on socket accept");
88  bool connected = true;
89  char cliName[INET6_ADDRSTRLEN];
90  getnameinfo((struct sockaddr*)&cliAddr, sizeof cliAddr, cliName, sizeof(cliName), NULL, 0, NI_NUMERICHOST|NI_NUMERICSERV);
91  log.info() << "Client connection from ip=" << cliName << " accepted";
92  inT b_write[BufInLen]; outT b_read[BufOutLen];
93  using seconds = std::chrono::duration<double, std::chrono::seconds::period>;
94  auto next_cycle = std::chrono::steady_clock::now() + seconds(period);
95 
96  while (connected) {
97  std::this_thread::sleep_until(next_cycle);
98 
99  // write
100  std::array<inT, BufInLen> &sendValue = getNextSendBuffer();
101  for(int i = 0; i < BufInLen; i++) b_write[i] = sendValue[i];
102 // log.trace() << "try to write " << b_write[0];
103  int n = write(newsockfd, b_write, BufInLen * sizeof(inT));
104  if (n < 0) {
105  log.trace() << "error = " << std::strerror(errno);
106  connected = false;
107  }
108 
109  // read
110  size_t count = BufOutLen * sizeof(outT);
111  uint8_t* ptr = (uint8_t *)b_read;
112  auto endTime = std::chrono::steady_clock::now() + seconds(timeout);
113  while (connected && count) {
114  if (std::chrono::steady_clock::now() > endTime) {
115  log.trace() << "error = socket read timed out";
116  connected = false;
117  }
118 // log.trace() << "try to read " << count << " Bytes";
119  n = read(newsockfd, ptr, count);
120  if (n < 0) {
121  log.trace() << "error = " << std::strerror(errno);
122  connected = false;
123  }
124  ptr += n;
125  count -= n;
126  }
127  std::array<outT, BufOutLen> &readValue = getNextReceiveBuffer();
128  for(int i = 0; i < BufOutLen; i++) readValue[i] = b_read[i];
129  newData = true;
130  flip();
131  next_cycle += seconds(period);
132  }
133  close(newsockfd);
134  // if disconnected clear receive buffer
135  std::array<outT, BufOutLen> &readValue = getNextReceiveBuffer();
136  for(int i = 0; i < BufOutLen; i++) readValue[i] = 0;
137  newData = true;
138  flip();
139  }
140  close(sockfd);
141  }
142 
143  std::array<outT, BufOutLen>& getNextReceiveBuffer() {
144  auto p = read_ptr.load();
145  if (p == &read1) return read2;
146  else if (p == &read2) return read3;
147  else if (p == &read3) return read1;
148  }
149 
150  std::array<inT, BufInLen>& getNextSendBuffer() {
151  auto p = send_ptr.load();
152  if (p == &send1) return send2;
153  else if (p == &send2) return send3;
154  else if (p == &send3) return send1;
155  }
156 
157  void flip(){
158  read_ptr.store(&getNextReceiveBuffer());
159  send_ptr.store(&getNextSendBuffer());
160  }
161 
162  bool running;
163  uint16_t port;
164  double period;
165  double timeout; // time which thread tries to read until socket read timed out
166  struct hostent *server;
167  int sockfd;
168  int newsockfd;
169 
170  std::array<outT, BufOutLen> read1, read2, read3;
171  std::array<inT, BufInLen> send1, send2, send3;
172  std::atomic< std::array<outT, BufOutLen>* > read_ptr;
173  std::atomic< std::array<inT, BufInLen>* > send_ptr;
174  };
175 
176  // specialization used when server doesn't receive data from its client
177  template < uint32_t BufInLen, typename inT >
178  class SocketServer<BufInLen, inT, 0, std::nullptr_t> : public eeros::Thread {
179  public:
180  SocketServer(uint16_t port, double period = 0.01) {
181  this->port = port;
182  this->period = period;
183  signal(SIGPIPE, sigPipeHandler); // make sure, that a broken pipe does not stop application
184  send_ptr.store(&send1);
185  running = false;
186  }
187 
188  virtual ~SocketServer() {
189  join();
190  }
191 
192  virtual void stop(){
193  running = false;
194  }
195 
196  virtual bool isRunning() {
197  return running;
198  }
199 
200  virtual void setSendBuffer(std::array<inT, BufInLen>& data) {
201  auto p = send_ptr.load();
202  if (p == &send1) send1 = data;
203  else if (p == &send2) send2 = data;
204  else if (p == &send3) send3 = data;
205  }
206 
207  private:
208  virtual void run() {
209  log.info() << "SocketServer thread started";
210  struct sockaddr_in servAddr;
211  sockfd = socket(AF_INET, SOCK_STREAM, 0);
212  if (sockfd < 0) throw Fault("ERROR opening socket");
213 
214  bzero((char *) &servAddr, sizeof(servAddr));
215  servAddr.sin_port = htons(port);
216  servAddr.sin_family = AF_INET;
217  servAddr.sin_addr.s_addr = htonl(INADDR_ANY) ;
218  int yes = 1;
219  if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) == -1)
220  throw Fault("ERROR on set socket option");
221  if (bind(sockfd, (struct sockaddr *) &servAddr, sizeof(servAddr)) < 0)
222  throw Fault("ERROR on socket binding");
223 
224  socklen_t clilen;
225  listen(sockfd,1);
226  struct sockaddr_in cliAddr;
227  clilen = sizeof(cliAddr);
228 
229  running = true;
230  while (running) {
231  newsockfd = accept(sockfd, (struct sockaddr *) &cliAddr, &clilen);
232  if (newsockfd < 0) throw Fault("ERROR on accept");
233  bool connected = true;
234  char cliName[INET6_ADDRSTRLEN];
235  getnameinfo((struct sockaddr*)&cliAddr, sizeof cliAddr, cliName, sizeof(cliName), NULL, 0, NI_NUMERICHOST|NI_NUMERICSERV);
236  log.info() << "Client connection from ip=" << cliName << " accepted";
237  inT b_write[BufInLen];
238  using seconds = std::chrono::duration<double, std::chrono::seconds::period>;
239  auto next_cycle = std::chrono::steady_clock::now() + seconds(period);
240 
241  while (connected) {
242  std::this_thread::sleep_until(next_cycle);
243 
244  // write
245  std::array<inT, BufInLen> &sendValue = getNextSendBuffer();
246  for(int i = 0; i < BufInLen; i++) b_write[i] = sendValue[i];
247 // log.trace() << "try to write " << b_write[0];
248  int n = write(newsockfd, b_write, BufInLen * sizeof(inT));
249  if (n < 0) {
250  log.trace() << "error = " << std::strerror(errno);
251  connected = false;
252  }
253 
254  flip();
255  next_cycle += seconds(period);
256  }
257  close(newsockfd);
258  }
259  close(sockfd);
260  }
261 
262  std::array<inT, BufInLen>& getNextSendBuffer() {
263  auto p = send_ptr.load();
264  if (p == &send1) return send2;
265  else if (p == &send2) return send3;
266  else if (p == &send3) return send1;
267  }
268 
269  void flip(){
270  send_ptr.store(&getNextSendBuffer());
271  }
272 
273  bool running;
274  uint16_t port;
275  double period;
276  struct hostent *server;
277  int sockfd;
278  int newsockfd;
279 
280  std::array<inT, BufInLen> send1, send2, send3;
281  std::atomic< std::array<inT, BufInLen>* > send_ptr;
282  };
283 
284  };
285 };
286 
287 #endif // ORG_EEROS_SOCKET_SERVER_HPP_
virtual ~SocketServer()
Definition: SocketServer.hpp:188
SocketServer(uint16_t port, double period=0.01)
Definition: SocketServer.hpp:180
virtual void stop()
Definition: SocketServer.hpp:192
outT readbuffer
Definition: SocketServer.hpp:59
virtual ~SocketServer()
Definition: SocketServer.hpp:36
Definition: Config.hpp:14
Definition: Thread.hpp:12
Definition: Fault.hpp:9
LogEntry info()
Definition: Logger.hpp:27
virtual std::array< outT, BufOutLen > & getReceiveBuffer()
Definition: SocketServer.hpp:48
volatile bool running
Definition: Executor.cpp:25
virtual void join()
Definition: Thread.cpp:29
eeros::logger::Logger log
Definition: Thread.hpp:26
virtual bool isRunning()
Definition: SocketServer.hpp:196
virtual void setSendBuffer(std::array< inT, BufInLen > &data)
Definition: SocketServer.hpp:52
void sigPipeHandler(int signum)
Definition: SocketServer.cpp:6
virtual void stop()
Definition: SocketServer.hpp:40
LogEntry trace()
Definition: Logger.hpp:28
Definition: SocketServer.hpp:24
virtual bool isRunning()
Definition: SocketServer.hpp:44
bool newData
Definition: SocketServer.hpp:60
virtual void setSendBuffer(std::array< inT, BufInLen > &data)
Definition: SocketServer.hpp:200
SocketServer(uint16_t port, double period=0.01, double timeout=1.0)
Definition: SocketServer.hpp:26