1 #ifndef ORG_EEROS_SOCKET_SERVER_HPP_ 2 #define ORG_EEROS_SOCKET_SERVER_HPP_ 11 #include <sys/socket.h> 23 template < u
int32_t BufInLen,
typename inT, u
int32_t BufOutLen,
typename outT >
// Constructor of the send/receive server variant.
//   port    - TCP port; the socket setup elsewhere in this class uses htons(port) when binding.
//   period  - cycle time in seconds; the thread loop advances its wake-up time by seconds(period).
//   timeout - time budget in seconds for reading one complete receive frame.
// The three receive buffers are initialised from {0} in the initializer list.
// NOTE(review): send1..send3 are not in the initializer list; unless they are filled
// elsewhere before the first transmission they start with indeterminate contents — confirm.
26 SocketServer(uint16_t port,
double period = 0.01,
double timeout = 1.0) : read1({0}), read2({0}), read3({0}) {
28 this->period = period;
29 this->timeout = timeout;
// Publish the first buffer of each triple as the current one.
31 read_ptr.store(&read1);
32 send_ptr.store(&send1);
// Atomically loads read_ptr and hands back the receive buffer it currently designates.
49 return *read_ptr.load();
// Copies `data` into whichever of send1/send2/send3 send_ptr currently designates.
// If the loaded pointer matches none of the three buffers, nothing is written.
// NOTE(review): the transmit code in this class copies from getNextSendBuffer(), not from
// the buffer written here — confirm the intended hand-off order between writer and sender.
53 auto p = send_ptr.load();
54 if (p == &send1) send1 = data;
55 else if (p == &send2) send2 = data;
56 else if (p == &send3) send3 = data;
64 log.
info() <<
"SocketServer thread started";
65 struct sockaddr_in servAddr;
66 sockfd = socket(AF_INET, SOCK_STREAM, 0);
67 if (sockfd < 0)
throw Fault(
"ERROR opening socket");
69 bzero((
char *) &servAddr,
sizeof(servAddr));
70 servAddr.sin_port = htons(port);
71 servAddr.sin_family = AF_INET;
72 servAddr.sin_addr.s_addr = htonl(INADDR_ANY) ;
74 if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes,
sizeof(yes)) == -1)
75 throw Fault(
"ERROR on set socket option");
76 if (bind(sockfd, (
struct sockaddr *) &servAddr,
sizeof(servAddr)) < 0)
77 throw Fault(
"ERROR on socket binding");
81 struct sockaddr_in cliAddr;
82 clilen =
sizeof(cliAddr);
86 newsockfd = accept(sockfd, (
struct sockaddr *) &cliAddr, &clilen);
87 if (newsockfd < 0)
throw Fault(
"ERROR on socket accept");
88 bool connected =
true;
89 char cliName[INET6_ADDRSTRLEN];
90 getnameinfo((
struct sockaddr*)&cliAddr,
sizeof cliAddr, cliName,
sizeof(cliName), NULL, 0, NI_NUMERICHOST|NI_NUMERICSERV);
91 log.
info() <<
"Client connection from ip=" << cliName <<
" accepted";
92 inT b_write[BufInLen]; outT b_read[BufOutLen];
93 using seconds = std::chrono::duration<double, std::chrono::seconds::period>;
94 auto next_cycle = std::chrono::steady_clock::now() + seconds(period);
97 std::this_thread::sleep_until(next_cycle);
100 std::array<inT, BufInLen> &sendValue = getNextSendBuffer();
101 for(
int i = 0; i < BufInLen; i++) b_write[i] = sendValue[i];
103 int n = write(newsockfd, b_write, BufInLen *
sizeof(inT));
105 log.
trace() <<
"error = " << std::strerror(errno);
110 size_t count = BufOutLen *
sizeof(outT);
111 uint8_t* ptr = (uint8_t *)b_read;
112 auto endTime = std::chrono::steady_clock::now() + seconds(timeout);
113 while (connected && count) {
114 if (std::chrono::steady_clock::now() > endTime) {
115 log.
trace() <<
"error = socket read timed out";
119 n = read(newsockfd, ptr, count);
121 log.
trace() <<
"error = " << std::strerror(errno);
127 std::array<outT, BufOutLen> &readValue = getNextReceiveBuffer();
128 for(
int i = 0; i < BufOutLen; i++) readValue[i] = b_read[i];
131 next_cycle += seconds(period);
135 std::array<outT, BufOutLen> &readValue = getNextReceiveBuffer();
136 for(
int i = 0; i < BufOutLen; i++) readValue[i] = 0;
143 std::array<outT, BufOutLen>& getNextReceiveBuffer() {
144 auto p = read_ptr.load();
145 if (p == &read1)
return read2;
146 else if (p == &read2)
return read3;
147 else if (p == &read3)
return read1;
150 std::array<inT, BufInLen>& getNextSendBuffer() {
151 auto p = send_ptr.load();
152 if (p == &send1)
return send2;
153 else if (p == &send2)
return send3;
154 else if (p == &send3)
return send1;
158 read_ptr.store(&getNextReceiveBuffer());
159 send_ptr.store(&getNextSendBuffer());
166 struct hostent *server;
// Three receive buffers; getNextReceiveBuffer() cycles through them in the order
// read1 -> read2 -> read3 -> read1.
170 std::array<outT, BufOutLen> read1, read2, read3;
// Three send buffers; getNextSendBuffer() cycles through them in the order
// send1 -> send2 -> send3 -> send1.
171 std::array<inT, BufInLen> send1, send2, send3;
// Address of the receive buffer currently handed out by getReceiveBuffer().
172 std::atomic< std::array<outT, BufOutLen>* > read_ptr;
// Address of the send buffer currently written by setSendBuffer().
173 std::atomic< std::array<inT, BufInLen>* > send_ptr;
177 template < u
int32_t BufInLen,
typename inT >
// Store the cycle time (seconds) used to pace the thread loop.
182 this->period = period;
// Publish send1 as the buffer currently designated by send_ptr.
184 send_ptr.store(&send1);
// Copies `data` into whichever of send1/send2/send3 send_ptr currently designates.
// If the loaded pointer matches none of the three buffers, nothing is written.
// NOTE(review): the transmit code in this class copies from getNextSendBuffer(), not from
// the buffer written here — confirm the intended hand-off order between writer and sender.
201 auto p = send_ptr.load();
202 if (p == &send1) send1 = data;
203 else if (p == &send2) send2 = data;
204 else if (p == &send3) send3 = data;
209 log.
info() <<
"SocketServer thread started";
210 struct sockaddr_in servAddr;
211 sockfd = socket(AF_INET, SOCK_STREAM, 0);
212 if (sockfd < 0)
throw Fault(
"ERROR opening socket");
214 bzero((
char *) &servAddr,
sizeof(servAddr));
215 servAddr.sin_port = htons(port);
216 servAddr.sin_family = AF_INET;
217 servAddr.sin_addr.s_addr = htonl(INADDR_ANY) ;
219 if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes,
sizeof(yes)) == -1)
220 throw Fault(
"ERROR on set socket option");
221 if (bind(sockfd, (
struct sockaddr *) &servAddr,
sizeof(servAddr)) < 0)
222 throw Fault(
"ERROR on socket binding");
226 struct sockaddr_in cliAddr;
227 clilen =
sizeof(cliAddr);
231 newsockfd = accept(sockfd, (
struct sockaddr *) &cliAddr, &clilen);
232 if (newsockfd < 0)
throw Fault(
"ERROR on accept");
233 bool connected =
true;
234 char cliName[INET6_ADDRSTRLEN];
235 getnameinfo((
struct sockaddr*)&cliAddr,
sizeof cliAddr, cliName,
sizeof(cliName), NULL, 0, NI_NUMERICHOST|NI_NUMERICSERV);
236 log.
info() <<
"Client connection from ip=" << cliName <<
" accepted";
237 inT b_write[BufInLen];
238 using seconds = std::chrono::duration<double, std::chrono::seconds::period>;
239 auto next_cycle = std::chrono::steady_clock::now() + seconds(period);
242 std::this_thread::sleep_until(next_cycle);
245 std::array<inT, BufInLen> &sendValue = getNextSendBuffer();
246 for(
int i = 0; i < BufInLen; i++) b_write[i] = sendValue[i];
248 int n = write(newsockfd, b_write, BufInLen *
sizeof(inT));
250 log.
trace() <<
"error = " << std::strerror(errno);
255 next_cycle += seconds(period);
262 std::array<inT, BufInLen>& getNextSendBuffer() {
263 auto p = send_ptr.load();
264 if (p == &send1)
return send2;
265 else if (p == &send2)
return send3;
266 else if (p == &send3)
return send1;
270 send_ptr.store(&getNextSendBuffer());
276 struct hostent *server;
// Three send buffers; getNextSendBuffer() cycles through them in the order
// send1 -> send2 -> send3 -> send1.
280 std::array<inT, BufInLen> send1, send2, send3;
// Address of the send buffer currently written by setSendBuffer().
281 std::atomic< std::array<inT, BufInLen>* > send_ptr;
287 #endif // ORG_EEROS_SOCKET_SERVER_HPP_
virtual ~SocketServer()
Definition: SocketServer.hpp:188
SocketServer(uint16_t port, double period=0.01)
Definition: SocketServer.hpp:180
virtual void stop()
Definition: SocketServer.hpp:192
outT readbuffer
Definition: SocketServer.hpp:59
virtual ~SocketServer()
Definition: SocketServer.hpp:36
Definition: Config.hpp:14
Definition: Thread.hpp:12
LogEntry info()
Definition: Logger.hpp:27
virtual std::array< outT, BufOutLen > & getReceiveBuffer()
Definition: SocketServer.hpp:48
volatile bool running
Definition: Executor.cpp:25
virtual void join()
Definition: Thread.cpp:29
eeros::logger::Logger log
Definition: Thread.hpp:26
virtual bool isRunning()
Definition: SocketServer.hpp:196
virtual void setSendBuffer(std::array< inT, BufInLen > &data)
Definition: SocketServer.hpp:52
void sigPipeHandler(int signum)
Definition: SocketServer.cpp:6
virtual void stop()
Definition: SocketServer.hpp:40
LogEntry trace()
Definition: Logger.hpp:28
Definition: SocketServer.hpp:24
virtual bool isRunning()
Definition: SocketServer.hpp:44
bool newData
Definition: SocketServer.hpp:60
virtual void setSendBuffer(std::array< inT, BufInLen > &data)
Definition: SocketServer.hpp:200
SocketServer(uint16_t port, double period=0.01, double timeout=1.0)
Definition: SocketServer.hpp:26