controllerclientcpp  0.6.1
 全て クラス ネームスペース ファイル 関数 変数 型定義 列挙型 列挙型の値 マクロ定義 ページ
mujinzmq.hpp
説明を見る。
1 // -*- coding: utf-8 -*-
2 // Copyright (C) 2012-2015 MUJIN Inc.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
17 #ifndef MUJIN_ZMQ_H
18 #define MUJIN_ZMQ_H
19 
20 #include <boost/shared_ptr.hpp>
21 #include <boost/thread.hpp>
22 #include <sstream>
23 #include <iostream>
24 #include <exception>
25 
27 
28 #ifndef MUJIN_TIME
29 #define MUJIN_TIME
30 #include <time.h>
31 
32 #ifndef _WIN32
33 #if !(defined(CLOCK_GETTIME_FOUND) && (POSIX_TIMERS > 0 || _POSIX_TIMERS > 0))
34 #include <sys/time.h>
35 #endif
36 #else
37 #define WIN32_LEAN_AND_MEAN
38 #include <windows.h>
39 #include <sys/timeb.h> // ftime(), struct timeb
40 inline void usleep(unsigned long microseconds) {
41  Sleep((microseconds+999)/1000);
42 }
43 #endif
44 
45 #ifdef _WIN32
46 inline uint64_t GetMilliTime()
47 {
48  LARGE_INTEGER count, freq;
49  QueryPerformanceCounter(&count);
50  QueryPerformanceFrequency(&freq);
51  return (uint64_t)((count.QuadPart * 1000) / freq.QuadPart);
52 }
53 
54 inline uint64_t GetMicroTime()
55 {
56  LARGE_INTEGER count, freq;
57  QueryPerformanceCounter(&count);
58  QueryPerformanceFrequency(&freq);
59  return (count.QuadPart * 1000000) / freq.QuadPart;
60 }
61 
62 inline uint64_t GetNanoTime()
63 {
64  LARGE_INTEGER count, freq;
65  QueryPerformanceCounter(&count);
66  QueryPerformanceFrequency(&freq);
67  return (count.QuadPart * 1000000000) / freq.QuadPart;
68 }
69 
70 inline static uint64_t GetNanoPerformanceTime() {
71  return GetNanoTime();
72 }
73 
74 #else
75 
/// \brief Reads the current wall-clock time.
///
/// Uses clock_gettime(CLOCK_REALTIME) when the build advertises it, otherwise gettimeofday().
/// \param[out] sec seconds part of the current time, stored into 32 bits
/// \param[out] nsec sub-second part expressed in nanoseconds
inline void GetWallTime(uint32_t& sec, uint32_t& nsec)
{
#if defined(CLOCK_GETTIME_FOUND) && (POSIX_TIMERS > 0 || _POSIX_TIMERS > 0)
    struct timespec now;
    clock_gettime(CLOCK_REALTIME, &now);
    nsec = now.tv_nsec;
    sec = now.tv_sec;
#else
    struct timeval now;
    gettimeofday(&now, NULL);
    // gettimeofday() only has microsecond resolution, scale it up to nanoseconds
    nsec = now.tv_usec * 1000;
    sec = now.tv_sec;
#endif
}
90 
/// \brief Returns the time reported by gettimeofday() in milliseconds.
/// \return seconds * 1000 plus the microsecond part divided down to milliseconds
inline uint64_t GetMilliTimeOfDay()
{
    struct timeval now;
    gettimeofday(&now, NULL);
    const uint64_t fromseconds = static_cast<uint64_t>(now.tv_sec) * 1000;
    const uint64_t frommicroseconds = static_cast<uint64_t>(now.tv_usec) / 1000;
    return fromseconds + frommicroseconds;
}
97 
98 inline uint64_t GetNanoTime()
99 {
100  uint32_t sec,nsec;
101  GetWallTime(sec,nsec);
102  return (uint64_t)sec*1000000000 + (uint64_t)nsec;
103 }
104 
105 inline uint64_t GetMicroTime()
106 {
107  uint32_t sec,nsec;
108  GetWallTime(sec,nsec);
109  return (uint64_t)sec*1000000 + (uint64_t)nsec/1000;
110 }
111 
112 inline uint64_t GetMilliTime()
113 {
114  uint32_t sec,nsec;
115  GetWallTime(sec,nsec);
116  return (uint64_t)sec*1000 + (uint64_t)nsec/1000000;
117 }
118 
119 inline static uint64_t GetNanoPerformanceTime()
120 {
121 #if defined(CLOCK_GETTIME_FOUND) && (POSIX_TIMERS > 0 || _POSIX_TIMERS > 0) && defined(_POSIX_MONOTONIC_CLOCK)
122  struct timespec start;
123  uint32_t sec, nsec;
124  clock_gettime(CLOCK_MONOTONIC, &start);
125  sec = start.tv_sec;
126  nsec = start.tv_nsec;
127  return (uint64_t)sec*1000000000 + (uint64_t)nsec;
128 #else
129  return GetNanoTime();
130 #endif
131 }
132 #endif
133 #endif
134 
// Logging helpers: stream `msg` (anything that can follow `<<`) to stdout / stderr and flush.
// The do { } while (0) wrapper makes each macro expand to exactly one statement, so it can be
// used safely in an unbraced if/else; the caller supplies the terminating semicolon.
#define MUJIN_LOG_INFO(msg) do { std::cout << msg << std::endl; } while (0)
#define MUJIN_LOG_ERROR(msg) do { std::cerr << msg << std::endl; } while (0)
137 
138 namespace mujinzmq
139 {
140 
141 #include "mujincontrollerclient/config.h"
142 
145 class MUJINCLIENT_API ZmqSubscriber
146 {
147 public:
148  ZmqSubscriber(const std::string& host, const unsigned int port)
149  {
150  _host = host;
151  _port = port;
152  }
153 
154  virtual ~ZmqSubscriber()
155  {
156  _DestroySocket();
157  }
158 
159 protected:
160  void _InitializeSocket(boost::shared_ptr<zmq::context_t> context)
161  {
162  if (!!context) {
163  _context = context;
164  _sharedcontext = true;
165  } else {
166  _context.reset(new zmq::context_t(1));
167  _sharedcontext = false;
168  }
169  _socket.reset(new zmq::socket_t ((*_context.get()), ZMQ_SUB));
170 
171  std::ostringstream port_stream;
172  port_stream << _port;
173  _socket->setsockopt(ZMQ_SUBSCRIBE, "", 0);
174  int val = 2;
175  _socket->setsockopt(ZMQ_SNDHWM, &val, sizeof(val));
176  _socket->connect (("tcp://" + _host + ":" + port_stream.str()).c_str());
177  }
178 
179  void _DestroySocket()
180  {
181  if (!!_socket) {
182  _socket->close();
183  _socket.reset();
184  }
185  if (!!_context && !_sharedcontext) {
186  _context->close();
187  _context.reset();
188  }
189  }
190 
191  boost::shared_ptr<zmq::context_t> _context;
192  boost::shared_ptr<zmq::socket_t> _socket;
193  std::string _host;
194  unsigned int _port;
195  bool _sharedcontext;
196 };
197 
200 class MUJINCLIENT_API ZmqPublisher
201 {
202 public:
203  ZmqPublisher(const unsigned int port)
204  {
205  _port = port;
206  }
207 
208  virtual ~ZmqPublisher()
209  {
210  _DestroySocket();
211  }
212 
213  bool Publish(const std::string& messagestr)
214  {
215  zmq::message_t message(messagestr.size());
216  memcpy(message.data(), messagestr.data(), messagestr.size());
217  return _socket->send(message);
218  }
219 
220 protected:
221  void _InitializeSocket(boost::shared_ptr<zmq::context_t> context)
222  {
223  if (!!context) {
224  _context = context;
225  _sharedcontext = true;
226  } else {
227  _context.reset(new zmq::context_t(1));
228  _sharedcontext = false;
229  }
230  _socket.reset(new zmq::socket_t ((*(zmq::context_t*)_context.get()), ZMQ_PUB));
231  int val = 2;
232  _socket->setsockopt(ZMQ_SNDHWM, &val, sizeof(val));
233  std::ostringstream port_stream;
234  port_stream << _port;
235  _socket->bind (("tcp://*:" + port_stream.str()).c_str());
236  }
237 
238  void _DestroySocket()
239  {
240  if (!!_socket) {
241  _socket->close();
242  _socket.reset();
243  }
244  if (!!_context && !_sharedcontext) {
245  _context->close();
246  _context.reset();
247  }
248  }
249 
250  boost::shared_ptr<zmq::context_t> _context;
251  boost::shared_ptr<zmq::socket_t> _socket;
252  unsigned int _port;
253  bool _sharedcontext;
254 };
255 
258 class MUJINCLIENT_API ZmqClient
259 {
260 public:
261  ZmqClient(const std::string& host, const unsigned int port)
262  {
263  _host = host;
264  _port = port;
265  }
266 
267  virtual ~ZmqClient()
268  {
269  _DestroySocket();
270  }
271 
272  std::string Call(const std::string& msg, const double timeout=5.0/*secs*/)
273  {
274  //send
275  zmq::message_t request (msg.size());
276  // std::cout << msg.size() << std::endl;
277  // std::cout << msg << std::endl;
278  memcpy ((void *) request.data (), msg.c_str(), msg.size());
279 
280  uint64_t starttime = GetMilliTime();
281  bool recreatedonce = false;
282  while (GetMilliTime() - starttime < timeout*1000.0) {
283  try {
284  _socket->send(request);
285  break;
286  } catch (const zmq::error_t& e) {
287  if (e.num() == EAGAIN) {
288  MUJIN_LOG_ERROR("failed to send request, try again");
289  boost::this_thread::sleep(boost::posix_time::milliseconds(100));
290  continue;
291  } else {
292  std::stringstream errss;
293  errss << "failed to send msg: ";
294  if (msg.length() > 1000) {
295  errss << msg.substr(0, 1000) << "...";
296  } else {
297  errss << msg;
298  }
299  MUJIN_LOG_ERROR(errss.str());
300  }
301  if (!recreatedonce) {
302  MUJIN_LOG_INFO("re-creating zmq socket and trying again");
303  if (!!_socket) {
304  _socket->close();
305  _socket.reset();
306  }
307  _InitializeSocket(_context);
308  recreatedonce = true;
309  } else{
310  std::stringstream ss;
311  ss << "failed to send request after re-creating socket";
312  throw std::runtime_error(ss.str());;
313  }
314  }
315  }
316  if (GetMilliTime() - starttime > timeout*1000.0) {
317  std::stringstream ss;
318  ss << "timed out trying to send request";
319  MUJIN_LOG_ERROR(ss.str());
320  MUJIN_LOG_INFO(msg);
321  throw std::runtime_error(ss.str());
322  }
323 
324  //recv
325  starttime = GetMilliTime();
326  recreatedonce = false;
327  zmq::message_t reply;
328  while (GetMilliTime() - starttime < timeout * 1000.0) {
329  try {
330 
331  zmq::pollitem_t pollitem;
332  memset(&pollitem, 0, sizeof(zmq::pollitem_t));
333  pollitem.socket = _socket->operator void*();
334  pollitem.events = ZMQ_POLLIN;
335 
336  // if timeout param is 0, caller means infinite
337  long timeoutms = -1;
338  if (timeout > 0) {
339  timeoutms = timeout * 1000.0;
340  }
341 
342  zmq::poll(&pollitem, 1, timeoutms);
343  if (pollitem.revents & ZMQ_POLLIN) {
344  _socket->recv(&reply);
345  std::string replystring((char *) reply.data (), (size_t) reply.size());
346  return replystring;
347  }
348  else{
349  std::stringstream ss;
350  if (msg.length() > 1000) {
351  ss << "Timed out receiving response of command " << msg.substr(0, 1000) << "... after " << timeout << " seconds";
352  } else {
353  ss << "Timed out receiving response of command " << msg << " after " << timeout << " seconds";
354  }
355  std::string errstr = ss.str();
356  boost::replace_all(errstr, "\"", ""); // need to remove " in the message so that json parser works
357  boost::replace_all(errstr, "\\", ""); // need to remove \ in the message so that json parser works
358  throw std::runtime_error(errstr);
359  }
360 
361  } catch (const zmq::error_t& e) {
362  if (e.num() == EAGAIN) {
363  MUJIN_LOG_ERROR("failed to receive reply, zmq::EAGAIN");
364  boost::this_thread::sleep(boost::posix_time::milliseconds(100));
365  continue;
366  } else {
367  MUJIN_LOG_INFO("failed to send");
368  MUJIN_LOG_INFO(msg);
369  }
370  if (!recreatedonce) {
371  MUJIN_LOG_INFO("re-creating zmq socket and trying again");
372  if (!!_socket) {
373  _socket->close();
374  _socket.reset();
375  }
376  _InitializeSocket(_context);
377  recreatedonce = true;
378  } else{
379  std::stringstream ss;
380  ss << "failed to receive response after re-creating socket";
381  throw std::runtime_error(ss.str());;
382  }
383  }
384  }
385  if (GetMilliTime() - starttime > timeout*1000.0) {
386  std::stringstream ss;
387  ss << "timed out trying to receive request";
388  MUJIN_LOG_ERROR(ss.str());
389  MUJIN_LOG_INFO(msg);
390  throw std::runtime_error(ss.str());
391  }
392  }
393 
394 protected:
395  void _InitializeSocket(boost::shared_ptr<zmq::context_t> context)
396  {
397  if (!!context) {
398  _context = context;
399  _sharedcontext = true;
400  } else {
401  _context.reset(new zmq::context_t(1));
402  _sharedcontext = false;
403  }
404  _socket.reset(new zmq::socket_t ((*(zmq::context_t*)_context.get()), ZMQ_REQ));
405  std::ostringstream port_stream;
406  port_stream << _port;
407  std::stringstream ss;
408  ss << "connecting to socket at " << _host << ":" << _port;
409  MUJIN_LOG_INFO(ss.str());
410  _socket->connect (("tcp://" + _host + ":" + port_stream.str()).c_str());
411  }
412 
413  void _DestroySocket()
414  {
415  if (!!_socket) {
416  _socket->close();
417  _socket.reset();
418  }
419  if (!!_context && !_sharedcontext) {
420  _context->close();
421  _context.reset();
422  }
423  }
424 
425  unsigned int _port;
426  std::string _host;
427  boost::shared_ptr<zmq::context_t> _context;
428  boost::shared_ptr<zmq::socket_t> _socket;
429  bool _sharedcontext;
430 };
431 
434 class MUJINCLIENT_API ZmqServer
435 {
436 public:
437  ZmqServer(const unsigned int port) : _sharedcontext(false), _port(port) {
438  }
439 
440  virtual ~ZmqServer() {
441  _DestroySocket();
442  }
443 
444  virtual unsigned int Recv(std::string& data, long timeout=0)
445  {
446  // wait timeout in millisecond for message
447  if (timeout > 0) {
448  zmq::poll(&_pollitem, 1, timeout);
449  if ((_pollitem.revents & ZMQ_POLLIN) == 0)
450  {
451  // did not receive anything
452  return 0;
453  }
454  }
455 
456  _socket->recv(&_reply, ZMQ_NOBLOCK);
457  data.resize(_reply.size());
458  std::copy((uint8_t*)_reply.data(), (uint8_t*)_reply.data() + _reply.size(), data.begin());
459  return _reply.size();
460  }
461 
462  virtual void Send(const std::string& message)
463  {
464  zmq::message_t request(message.size());
465  memcpy((void *)request.data(), message.c_str(), message.size());
466  _socket->send(request);
467  }
468 
469 protected:
470 
471  virtual void _InitializeSocket(boost::shared_ptr<zmq::context_t> context)
472  {
473  if (!!context) {
474  _context = context;
475  _sharedcontext = true;
476  } else {
477  _context.reset(new zmq::context_t(1));
478  _sharedcontext = false;
479  }
480 
481  _socket.reset(new zmq::socket_t((*(zmq::context_t*)_context.get()), ZMQ_REP));
482 
483  // setup the pollitem
484  memset(&_pollitem, 0, sizeof(_pollitem));
485  _pollitem.socket = _socket->operator void*();
486  _pollitem.events = ZMQ_POLLIN;
487 
488  std::ostringstream endpoint;
489  endpoint << "tcp://*:" << _port;
490  _socket->bind(endpoint.str().c_str());
491  std::stringstream ss;
492  ss << "binded to " << endpoint;
493  MUJIN_LOG_INFO(ss.str());
494  }
495 
496  virtual void _DestroySocket()
497  {
498  if (!!_socket) {
499  _socket->close();
500  _socket.reset();
501  }
502  if (!!_context && !_sharedcontext) {
503  _context->close();
504  _context.reset();
505  }
506  memset(&_pollitem, 0, sizeof(_pollitem));
507  }
508 
509  bool _sharedcontext;
510  unsigned int _port;
511  boost::shared_ptr<zmq::context_t> _context;
512  boost::shared_ptr<zmq::socket_t> _socket;
513  zmq::message_t _reply;
514  zmq::pollitem_t _pollitem;
515 
516 };
517 
518 } // namespace mujinzmq
519 #endif // MUJIN_ZMQ_H