20 #include <boost/shared_ptr.hpp>
21 #include <boost/thread.hpp>
33 #if !(defined(CLOCK_GETTIME_FOUND) && (POSIX_TIMERS > 0 || _POSIX_TIMERS > 0))
37 #define WIN32_LEAN_AND_MEAN
39 #include <sys/timeb.h>
40 inline void usleep(
unsigned long microseconds) {
41 Sleep((microseconds+999)/1000);
48 LARGE_INTEGER count, freq;
49 QueryPerformanceCounter(&count);
50 QueryPerformanceFrequency(&freq);
51 return (uint64_t)((count.QuadPart * 1000) / freq.QuadPart);
56 LARGE_INTEGER count, freq;
57 QueryPerformanceCounter(&count);
58 QueryPerformanceFrequency(&freq);
59 return (count.QuadPart * 1000000) / freq.QuadPart;
64 LARGE_INTEGER count, freq;
65 QueryPerformanceCounter(&count);
66 QueryPerformanceFrequency(&freq);
67 return (count.QuadPart * 1000000000) / freq.QuadPart;
78 #if defined(CLOCK_GETTIME_FOUND) && (POSIX_TIMERS > 0 || _POSIX_TIMERS > 0)
79 struct timespec start;
80 clock_gettime(CLOCK_REALTIME, &start);
84 struct timeval timeofday;
85 gettimeofday(&timeofday,NULL);
86 sec = timeofday.tv_sec;
87 nsec = timeofday.tv_usec * 1000;
93 struct timeval timeofday;
94 gettimeofday(&timeofday,NULL);
95 return (uint64_t)timeofday.tv_sec*1000+(uint64_t)timeofday.tv_usec/1000;
102 return (uint64_t)sec*1000000000 + (uint64_t)nsec;
109 return (uint64_t)sec*1000000 + (uint64_t)nsec/1000;
116 return (uint64_t)sec*1000 + (uint64_t)nsec/1000000;
121 #if defined(CLOCK_GETTIME_FOUND) && (POSIX_TIMERS > 0 || _POSIX_TIMERS > 0) && defined(_POSIX_MONOTONIC_CLOCK)
122 struct timespec start;
124 clock_gettime(CLOCK_MONOTONIC, &start);
126 nsec = start.tv_nsec;
127 return (uint64_t)sec*1000000000 + (uint64_t)nsec;
// Minimal stream-based logging fallbacks: INFO goes to std::cout, ERROR to
// std::cerr, each followed by std::endl.
// `msg` is deliberately left unparenthesized so callers can pass a stream
// chain such as `"x=" << x`.
// Each expansion is wrapped in do { ... } while (0) and carries no trailing
// semicolon of its own, so `MUJIN_LOG_INFO(...);` is exactly one statement and
// can be used as the unbraced body of an if/else.
#define MUJIN_LOG_INFO(msg) do { std::cout << msg << std::endl; } while (0)
#define MUJIN_LOG_ERROR(msg) do { std::cerr << msg << std::endl; } while (0)
141 #include "mujincontrollerclient/config.h"
145 class MUJINCLIENT_API ZmqSubscriber
148 ZmqSubscriber(
const std::string& host,
const unsigned int port)
154 virtual ~ZmqSubscriber()
160 void _InitializeSocket(boost::shared_ptr<zmq::context_t> context)
164 _sharedcontext =
true;
167 _sharedcontext =
false;
169 _socket.reset(
new zmq::socket_t ((*_context.get()), ZMQ_SUB));
171 std::ostringstream port_stream;
172 port_stream << _port;
173 _socket->setsockopt(ZMQ_SUBSCRIBE,
"", 0);
175 _socket->setsockopt(ZMQ_SNDHWM, &val,
sizeof(val));
176 _socket->connect ((
"tcp://" + _host +
":" + port_stream.str()).c_str());
179 void _DestroySocket()
185 if (!!_context && !_sharedcontext) {
191 boost::shared_ptr<zmq::context_t> _context;
192 boost::shared_ptr<zmq::socket_t> _socket;
200 class MUJINCLIENT_API ZmqPublisher
213 bool Publish(
const std::string& messagestr)
216 memcpy(message.data(), messagestr.data(), messagestr.size());
217 return _socket->send(message);
221 void _InitializeSocket(boost::shared_ptr<zmq::context_t> context)
225 _sharedcontext =
true;
228 _sharedcontext =
false;
232 _socket->setsockopt(ZMQ_SNDHWM, &val,
sizeof(val));
233 std::ostringstream port_stream;
234 port_stream << _port;
235 _socket->bind ((
"tcp://*:" + port_stream.str()).c_str());
238 void _DestroySocket()
244 if (!!_context && !_sharedcontext) {
250 boost::shared_ptr<zmq::context_t> _context;
251 boost::shared_ptr<zmq::socket_t> _socket;
258 class MUJINCLIENT_API ZmqClient
261 ZmqClient(
const std::string& host,
const unsigned int port)
272 std::string Call(
const std::string& msg,
const double timeout=5.0)
278 memcpy ((
void *) request.data (), msg.c_str(), msg.size());
281 bool recreatedonce =
false;
284 _socket->send(request);
287 if (e.
num() == EAGAIN) {
289 boost::this_thread::sleep(boost::posix_time::milliseconds(100));
292 std::stringstream errss;
293 errss <<
"failed to send msg: ";
294 if (msg.length() > 1000) {
295 errss << msg.substr(0, 1000) <<
"...";
301 if (!recreatedonce) {
307 _InitializeSocket(_context);
308 recreatedonce =
true;
310 std::stringstream ss;
311 ss <<
"failed to send request after re-creating socket";
312 throw std::runtime_error(ss.str());;
317 std::stringstream ss;
318 ss <<
"timed out trying to send request";
321 throw std::runtime_error(ss.str());
326 recreatedonce =
false;
333 pollitem.socket = _socket->operator
void*();
334 pollitem.events = ZMQ_POLLIN;
339 timeoutms = timeout * 1000.0;
343 if (pollitem.revents & ZMQ_POLLIN) {
344 _socket->recv(&reply);
345 std::string replystring((
char *) reply.
data (), (size_t) reply.
size());
349 std::stringstream ss;
350 if (msg.length() > 1000) {
351 ss <<
"Timed out receiving response of command " << msg.substr(0, 1000) <<
"... after " << timeout <<
" seconds";
353 ss <<
"Timed out receiving response of command " << msg <<
" after " << timeout <<
" seconds";
355 std::string errstr = ss.str();
356 boost::replace_all(errstr,
"\"",
"");
357 boost::replace_all(errstr,
"\\",
"");
358 throw std::runtime_error(errstr);
362 if (e.
num() == EAGAIN) {
364 boost::this_thread::sleep(boost::posix_time::milliseconds(100));
370 if (!recreatedonce) {
376 _InitializeSocket(_context);
377 recreatedonce =
true;
379 std::stringstream ss;
380 ss <<
"failed to receive response after re-creating socket";
381 throw std::runtime_error(ss.str());;
386 std::stringstream ss;
387 ss <<
"timed out trying to receive request";
390 throw std::runtime_error(ss.str());
395 void _InitializeSocket(boost::shared_ptr<zmq::context_t> context)
399 _sharedcontext =
true;
402 _sharedcontext =
false;
405 std::ostringstream port_stream;
406 port_stream << _port;
407 std::stringstream ss;
408 ss <<
"connecting to socket at " << _host <<
":" << _port;
410 _socket->connect ((
"tcp://" + _host +
":" + port_stream.str()).c_str());
413 void _DestroySocket()
419 if (!!_context && !_sharedcontext) {
427 boost::shared_ptr<zmq::context_t> _context;
428 boost::shared_ptr<zmq::socket_t> _socket;
434 class MUJINCLIENT_API ZmqServer
437 ZmqServer(
const unsigned int port) : _sharedcontext(false), _port(port) {
444 virtual unsigned int Recv(std::string& data,
long timeout=0)
449 if ((_pollitem.revents & ZMQ_POLLIN) == 0)
456 _socket->recv(&_reply, ZMQ_NOBLOCK);
457 data.resize(_reply.size());
458 std::copy((uint8_t*)_reply.data(), (uint8_t*)_reply.data() + _reply.size(), data.begin());
459 return _reply.size();
462 virtual void Send(
const std::string& message)
465 memcpy((
void *)request.data(), message.c_str(), message.size());
466 _socket->send(request);
471 virtual void _InitializeSocket(boost::shared_ptr<zmq::context_t> context)
475 _sharedcontext =
true;
478 _sharedcontext =
false;
484 memset(&_pollitem, 0,
sizeof(_pollitem));
485 _pollitem.socket = _socket->operator
void*();
486 _pollitem.events = ZMQ_POLLIN;
488 std::ostringstream endpoint;
489 endpoint <<
"tcp://*:" << _port;
490 _socket->bind(endpoint.str().c_str());
491 std::stringstream ss;
492 ss <<
"binded to " << endpoint;
496 virtual void _DestroySocket()
502 if (!!_context && !_sharedcontext) {
506 memset(&_pollitem, 0,
sizeof(_pollitem));
511 boost::shared_ptr<zmq::context_t> _context;
512 boost::shared_ptr<zmq::socket_t> _socket;
519 #endif // MUJIN_ZMQ_H