EEROS  1.0.0.0
API for the EEROS Real-Time Robotics Framework
SocketClient.hpp
Go to the documentation of this file.
1 #ifndef ORG_EEROS_SOCKET_CLIENT_HPP_
2 #define ORG_EEROS_SOCKET_CLIENT_HPP_
3 
4 #include <eeros/core/Thread.hpp>
5 #include <atomic>
6 #include <array>
7 #include <unistd.h>
8 #include <netdb.h>
9 #include <arpa/inet.h> /* inet_ntoa() to format IP address */
10 #include <string.h>
11 #include <sys/socket.h>
12 #include <fcntl.h>
13 #include <eeros/core/Fault.hpp>
14 #include <iostream>
15 #include <cstring>
16 #include <signal.h>
17 
18 namespace eeros {
19  namespace sockets {
20 
21  template < uint32_t BufInLen, typename inT, uint32_t BufOutLen, typename outT >
22  class SocketClient : public eeros::Thread {
23  public:
24  SocketClient(std::string serverIP, uint16_t port, double period = 0.01, double timeout = 1.0) : read1({0}), read2({0}), read3({0}) {
25  this->port = port;
26  this->period = period;
27  this->timeout = timeout;
28  this->serverIP = serverIP;
29  signal(SIGPIPE, sigPipeHandler); // make sure, that a broken pipe does not stop application
30  read_ptr.store(&read1);
31  send_ptr.store(&send1);
32  running = false;
33  }
34 
35  virtual ~SocketClient() {
36  join();
37  }
38 
39  virtual void stop() {
40  running = false;
41  }
42 
43  virtual bool isRunning() {
44  return running;
45  }
46 
47  virtual std::array<outT, BufOutLen>& getReceiveBuffer() {
48  return *read_ptr.load();
49  }
50 
51  virtual void setSendBuffer(std::array<inT, BufInLen>& data) {
52  auto p = send_ptr.load();
53  if (p == &send1) send1 = data;
54  else if (p == &send2) send2 = data;
55  else if (p == &send3) send3 = data;
56  }
57 
58  outT readbuffer;
59  bool newData = false;
60 
61  private:
62  virtual void run() {
63  log.info() << "SocketClient thread started";
64  running = true;
65  while (running) {
66  struct sockaddr_in servAddr;
67  int sockfd = socket(AF_INET, SOCK_STREAM, 0);
68  if (sockfd < 0) throw Fault("ERROR opening socket");
69 
70  auto server = gethostbyname(serverIP.c_str());
71  if (server == NULL) {
72  throw Fault("Server ip not found");
73  }
74  bzero((char *) &servAddr, sizeof(servAddr));
75  servAddr.sin_family = AF_INET;
76  bcopy((char *)server->h_addr,(char *)&servAddr.sin_addr.s_addr, server->h_length);
77  servAddr.sin_port = htons(port);
78 
79  using seconds = std::chrono::duration<double, std::chrono::seconds::period>;
80  auto next_cycle = std::chrono::steady_clock::now() + seconds(period);
81  while (connect(sockfd, (struct sockaddr *) &servAddr, sizeof(servAddr)) < 0) {
82  std::this_thread::sleep_until(next_cycle);
83  next_cycle += seconds(period);
84  }
85  log.info() << "Client connected to ip=" << serverIP;
86  inT b_write[BufInLen]; outT b_read[BufOutLen];
87  bool connected = true;
88 
89  while (connected) {
90  std::this_thread::sleep_until(next_cycle);
91 
92  // write
93  std::array<inT, BufInLen> &sendValue = getNextSendBuffer();
94  for(int i = 0; i < BufInLen; i++) b_write[i] = sendValue[i];
95 // log.trace() << "try to write " << b_write[0];
96  int n = write(sockfd, b_write, BufInLen * sizeof(inT));
97  if (n < 0) {
98  log.trace() << "error = " << std::strerror(errno);
99  connected = false;
100  }
101 
102  // read
103  size_t count = BufOutLen * sizeof(outT);
104  uint8_t* ptr = (uint8_t *)b_read;
105  auto endTime = std::chrono::steady_clock::now() + seconds(timeout);
106  while (connected && count) {
107  if (std::chrono::steady_clock::now() > endTime) {
108  log.trace() << "error = socket read timed out";
109  connected = false;
110  }
111 // log.trace() << "try to read " << count << " Bytes";
112  n = read(sockfd, ptr, count);
113  if (n < 0) {
114  log.trace() << "error = " << std::strerror(errno);
115  connected = false;
116  }
117  ptr += n;
118  count -= n;
119  }
120  std::array<outT, BufOutLen> &readValue = getNextReceiveBuffer();
121  for(int i = 0; i < BufOutLen; i++) readValue[i] = b_read[i];
122  newData = true;
123  flip();
124  next_cycle += seconds(period);
125  }
126  close(sockfd);
127  // if disconnected clear receive buffer
128  std::array<outT, BufOutLen> &readValue = getNextReceiveBuffer();
129  for(int i = 0; i < BufOutLen; i++) readValue[i] = 0;
130  newData = true;
131  flip();
132  }
133  }
134 
135  std::array<outT, BufOutLen>& getNextReceiveBuffer() {
136  auto p = read_ptr.load();
137  if (p == &read1) return read2;
138  else if (p == &read2) return read3;
139  else if (p == &read3) return read1;
140  }
141 
142  std::array<inT, BufInLen>& getNextSendBuffer() {
143  auto p = send_ptr.load();
144  if (p == &send1) return send2;
145  else if (p == &send2) return send3;
146  else if (p == &send3) return send1;
147  }
148 
149  void flip() {
150  read_ptr.store(&getNextReceiveBuffer());
151  send_ptr.store(&getNextSendBuffer());
152  }
153 
154  bool running;
155  std::string serverIP;
156  uint16_t port;
157  double period;
158  double timeout; // time which thread tries to read until socket read timed out
159  struct hostent *server;
160  int sockfd;
161 
162  std::array<outT, BufOutLen> read1, read2, read3;
163  std::array<inT, BufInLen> send1, send2, send3;
164  std::atomic< std::array<outT, BufOutLen>* > read_ptr;
165  std::atomic< std::array<inT, BufInLen>* > send_ptr;
166  };
167 
168  // specialization used when server doesn't receive data from its client
169  template < uint32_t BufInLen, typename inT >
170  class SocketClient<BufInLen, inT, 0, std::nullptr_t> : public eeros::Thread {
171  public:
172  SocketClient(std::string serverIP, uint16_t port, double period = 0.01) {
173  this->port = port;
174  this->period = period;
175  this->serverIP = serverIP;
176  signal(SIGPIPE, sigPipeHandler); // make sure, that a broken pipe does not stop application
177  send_ptr.store(&send1);
178  running = false;
179  }
180 
181  virtual ~SocketClient() {
182  join();
183  }
184 
185  virtual void stop() {
186  running = false;
187  }
188 
189  virtual bool isRunning() {
190  return running;
191  }
192 
193  virtual void setSendBuffer(std::array<inT, BufInLen>& data) {
194  auto p = send_ptr.load();
195  if (p == &send1) send1 = data;
196  else if (p == &send2) send2 = data;
197  else if (p == &send3) send3 = data;
198  }
199 
200  private:
201  virtual void run() {
202  log.info() << "SocketClient thread started";
203  running = true;
204  while (running) {
205  struct sockaddr_in servAddr;
206  int sockfd = socket(AF_INET, SOCK_STREAM, 0);
207  if (sockfd < 0) throw Fault("ERROR opening socket");
208 
209  auto server = gethostbyname(serverIP.c_str());
210  if (server == NULL) {
211  throw Fault("Server ip not found");
212  }
213  bzero((char *) &servAddr, sizeof(servAddr));
214  servAddr.sin_family = AF_INET;
215  bcopy((char *)server->h_addr,(char *)&servAddr.sin_addr.s_addr, server->h_length);
216  servAddr.sin_port = htons(port);
217 
218  while (connect(sockfd, (struct sockaddr *) &servAddr, sizeof(servAddr)) < 0) ;
219  log.info() << "Client connected to ip=" << serverIP;
220  inT b_write[BufInLen];
221  using seconds = std::chrono::duration<double, std::chrono::seconds::period>;
222  auto next_cycle = std::chrono::steady_clock::now() + seconds(period);
223  bool connected = true;
224 
225  while (connected) {
226  std::this_thread::sleep_until(next_cycle);
227 
228  // write
229  std::array<inT, BufInLen> &sendValue = getNextSendBuffer();
230  for(int i = 0; i < BufInLen; i++) b_write[i] = sendValue[i];
231 // log.trace() << "try to write " << b_write[0];
232  int n = write(sockfd, b_write, BufInLen * sizeof(inT));
233  if (n < 0) {
234  log.trace() << "error = " << std::strerror(errno);
235  connected = false;
236  }
237 
238  flip();
239  next_cycle += seconds(period);
240  }
241  close(sockfd);
242  }
243  }
244 
245  std::array<inT, BufInLen>& getNextSendBuffer() {
246  auto p = send_ptr.load();
247  if (p == &send1) return send2;
248  else if (p == &send2) return send3;
249  else if (p == &send3) return send1;
250  }
251 
252  void flip(){
253  send_ptr.store(&getNextSendBuffer());
254  }
255 
256  bool running;
257  std::string serverIP;
258  uint16_t port;
259  double period;
260  struct hostent *server;
261  int sockfd;
262 
263  std::array<inT, BufInLen> send1, send2, send3;
264  std::atomic< std::array<inT, BufInLen>* > send_ptr;
265  };
266 
267  };
268 };
269 
270 #endif // ORG_EEROS_SOCKET_CLIENT_HPP_
virtual void setSendBuffer(std::array< inT, BufInLen > &data)
Definition: SocketClient.hpp:51
bool newData
Definition: SocketClient.hpp:59
SocketClient(std::string serverIP, uint16_t port, double period=0.01)
Definition: SocketClient.hpp:172
virtual bool isRunning()
Definition: SocketClient.hpp:189
virtual std::array< outT, BufOutLen > & getReceiveBuffer()
Definition: SocketClient.hpp:47
Definition: Config.hpp:14
virtual void setSendBuffer(std::array< inT, BufInLen > &data)
Definition: SocketClient.hpp:193
virtual void stop()
Definition: SocketClient.hpp:185
virtual ~SocketClient()
Definition: SocketClient.hpp:35
Definition: Thread.hpp:12
Definition: Fault.hpp:9
LogEntry info()
Definition: Logger.hpp:27
volatile bool running
Definition: Executor.cpp:25
virtual void stop()
Definition: SocketClient.hpp:39
virtual void join()
Definition: Thread.cpp:29
eeros::logger::Logger log
Definition: Thread.hpp:26
SocketClient(std::string serverIP, uint16_t port, double period=0.01, double timeout=1.0)
Definition: SocketClient.hpp:24
Definition: SocketClient.hpp:22
virtual ~SocketClient()
Definition: SocketClient.hpp:181
virtual bool isRunning()
Definition: SocketClient.hpp:43
void sigPipeHandler(int signum)
Definition: SocketServer.cpp:6
LogEntry trace()
Definition: Logger.hpp:28
outT readbuffer
Definition: SocketClient.hpp:58