controllerclientcpp  0.6.1
 全て クラス ネームスペース ファイル 関数 変数 型定義 列挙型 列挙型の値 マクロ定義 ページ
zmq.hpp
説明を見る。
1 /*
2  Copyright (c) 2009-2011 250bpm s.r.o.
3  Copyright (c) 2011 Botond Ballo
4  Copyright (c) 2007-2009 iMatix Corporation
5 
6  Permission is hereby granted, free of charge, to any person obtaining a copy
7  of this software and associated documentation files (the "Software"), to
8  deal in the Software without restriction, including without limitation the
9  rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10  sell copies of the Software, and to permit persons to whom the Software is
11  furnished to do so, subject to the following conditions:
12 
13  The above copyright notice and this permission notice shall be included in
14  all copies or substantial portions of the Software.
15 
16  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22  IN THE SOFTWARE.
23 */
24 
25 #ifndef __ZMQ_HPP_INCLUDED__
26 #define __ZMQ_HPP_INCLUDED__
27 
28 #include <zmq.h>
29 
30 #include <algorithm>
31 #include <cassert>
32 #include <cstring>
33 #include <string>
34 #include <exception>
35 
36 // Detect whether the compiler supports C++11 rvalue references.
37 #if (defined(__GNUC__) && (__GNUC__ > 4 || \
38  (__GNUC__ == 4 && __GNUC_MINOR__ > 2)) && \
39  defined(__GXX_EXPERIMENTAL_CXX0X__))
40  #define ZMQ_HAS_RVALUE_REFS
41  #define ZMQ_DELETED_FUNCTION = delete
42 #elif defined(__clang__)
43  #if __has_feature(cxx_rvalue_references)
44  #define ZMQ_HAS_RVALUE_REFS
45  #endif
46 
47  #if __has_feature(cxx_deleted_functions)
48  #define ZMQ_DELETED_FUNCTION = delete
49  #else
50  #define ZMQ_DELETED_FUNCTION
51  #endif
52 #elif defined(_MSC_VER) && (_MSC_VER >= 1600)
53  #define ZMQ_HAS_RVALUE_REFS
54  #define ZMQ_DELETED_FUNCTION
55 #else
56  #define ZMQ_DELETED_FUNCTION
57 #endif
58 
59 #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(3, 3, 0)
60 #define ZMQ_NEW_MONITOR_EVENT_LAYOUT
61 #endif
62 
63 // In order to prevent unused variable warnings when building in non-debug
64 // mode use this macro to make assertions.
65 #ifndef NDEBUG
66 # define ZMQ_ASSERT(expression) assert(expression)
67 #else
68 # define ZMQ_ASSERT(expression) (void)(expression)
69 #endif
70 
71 namespace zmq
72 {
73 
74  typedef zmq_free_fn free_fn;
75  typedef zmq_pollitem_t pollitem_t;
76 
77  class error_t : public std::exception
78  {
79  public:
80 
81  error_t () : errnum (zmq_errno ()) {}
82 
83  virtual const char *what () const throw ()
84  {
85  return zmq_strerror (errnum);
86  }
87 
88  int num () const
89  {
90  return errnum;
91  }
92 
93  private:
94 
95  int errnum;
96  };
97 
98  inline int poll (zmq_pollitem_t *items_, int nitems_, long timeout_ = -1)
99  {
100  int rc = zmq_poll (items_, nitems_, timeout_);
101  if (rc < 0)
102  throw error_t ();
103  return rc;
104  }
105 
106  inline void proxy (void *frontend, void *backend, void *capture)
107  {
108  int rc = zmq_proxy (frontend, backend, capture);
109  if (rc != 0)
110  throw error_t ();
111  }
112 
113  inline void version (int *major_, int *minor_, int *patch_)
114  {
115  zmq_version (major_, minor_, patch_);
116  }
117 
118  class message_t
119  {
120  friend class socket_t;
121 
122  public:
123 
124  inline message_t ()
125  {
126  int rc = zmq_msg_init (&msg);
127  if (rc != 0)
128  throw error_t ();
129  }
130 
131  inline explicit message_t (size_t size_)
132  {
133  int rc = zmq_msg_init_size (&msg, size_);
134  if (rc != 0)
135  throw error_t ();
136  }
137 
138  inline message_t (void *data_, size_t size_, free_fn *ffn_,
139  void *hint_ = NULL)
140  {
141  int rc = zmq_msg_init_data (&msg, data_, size_, ffn_, hint_);
142  if (rc != 0)
143  throw error_t ();
144  }
145 
146 #ifdef ZMQ_HAS_RVALUE_REFS
147  inline message_t (message_t &&rhs) : msg (rhs.msg)
148  {
149  int rc = zmq_msg_init (&rhs.msg);
150  if (rc != 0)
151  throw error_t ();
152  }
153 
154  inline message_t &operator = (message_t &&rhs)
155  {
156  std::swap (msg, rhs.msg);
157  return *this;
158  }
159 #endif
160 
161  inline ~message_t ()
162  {
163  int rc = zmq_msg_close (&msg);
164  ZMQ_ASSERT (rc == 0);
165  }
166 
167  inline void rebuild ()
168  {
169  int rc = zmq_msg_close (&msg);
170  if (rc != 0)
171  throw error_t ();
172  rc = zmq_msg_init (&msg);
173  if (rc != 0)
174  throw error_t ();
175  }
176 
177  inline void rebuild (size_t size_)
178  {
179  int rc = zmq_msg_close (&msg);
180  if (rc != 0)
181  throw error_t ();
182  rc = zmq_msg_init_size (&msg, size_);
183  if (rc != 0)
184  throw error_t ();
185  }
186 
187  inline void rebuild (void *data_, size_t size_, free_fn *ffn_,
188  void *hint_ = NULL)
189  {
190  int rc = zmq_msg_close (&msg);
191  if (rc != 0)
192  throw error_t ();
193  rc = zmq_msg_init_data (&msg, data_, size_, ffn_, hint_);
194  if (rc != 0)
195  throw error_t ();
196  }
197 
198  inline void move (message_t *msg_)
199  {
200  int rc = zmq_msg_move (&msg, &(msg_->msg));
201  if (rc != 0)
202  throw error_t ();
203  }
204 
205  inline void copy (message_t *msg_)
206  {
207  int rc = zmq_msg_copy (&msg, &(msg_->msg));
208  if (rc != 0)
209  throw error_t ();
210  }
211 
212  inline bool more ()
213  {
214  int rc = zmq_msg_more (&msg);
215  return rc != 0;
216  }
217 
218  inline void *data ()
219  {
220  return zmq_msg_data (&msg);
221  }
222 
223  inline const void* data () const
224  {
225  return zmq_msg_data (const_cast<zmq_msg_t*>(&msg));
226  }
227 
228  inline size_t size () const
229  {
230  return zmq_msg_size (const_cast<zmq_msg_t*>(&msg));
231  }
232 
233  // enabled
234  //message_t (const message_t& copy_me)
235  //{
236  // msg = copy_me.msg;
237  //}
238  //void operator = (const message_t&)
239  //{
240  //}
241 
242  private:
243 
244  // The underlying message
245  zmq_msg_t msg;
246 
247  // Disable implicit message copying, so that users won't use shared
248  // messages (less efficient) without being aware of the fact.
249  //message_t (const message_t&);
250  //void operator = (const message_t&);
251  };
252 
253  class context_t
254  {
255  friend class socket_t;
256 
257  public:
258  inline context_t ()
259  {
260  ptr = zmq_ctx_new ();
261  if (ptr == NULL)
262  throw error_t ();
263  }
264 
265 
266  inline explicit context_t (int io_threads_)
267  {
268  ptr = zmq_ctx_new ();
269  if (ptr == NULL)
270  throw error_t ();
271 
272  int rc = zmq_ctx_set (ptr, ZMQ_IO_THREADS, io_threads_);
273  ZMQ_ASSERT (rc == 0);
274  }
275 
276 #ifdef ZMQ_HAS_RVALUE_REFS
277  inline context_t (context_t &&rhs) : ptr (rhs.ptr)
278  {
279  rhs.ptr = NULL;
280  }
281  inline context_t &operator = (context_t &&rhs)
282  {
283  std::swap (ptr, rhs.ptr);
284  return *this;
285  }
286 #endif
287 
288  inline ~context_t ()
289  {
290  close();
291  }
292 
293  inline void close()
294  {
295  if (ptr == NULL)
296  return;
297  int rc = zmq_ctx_destroy (ptr);
298  ZMQ_ASSERT (rc == 0);
299  ptr = NULL;
300  }
301 
302  // Be careful with this, it's probably only useful for
303  // using the C api together with an existing C++ api.
304  // Normally you should never need to use this.
305  inline operator void* ()
306  {
307  return ptr;
308  }
309 
310  private:
311 
312  void *ptr;
313 
314  context_t (const context_t&);
315  void operator = (const context_t&);
316  };
317 
318  class socket_t
319  {
320  friend class monitor_t;
321  public:
322 
323  inline socket_t (context_t &context_, int type_)
324  {
325  ctxptr = context_.ptr;
326  ptr = zmq_socket (context_.ptr, type_);
327  if (ptr == NULL)
328  throw error_t ();
329  }
330 
331 #ifdef ZMQ_HAS_RVALUE_REFS
332  inline socket_t(socket_t&& rhs) : ptr(rhs.ptr)
333  {
334  rhs.ptr = NULL;
335  }
336  inline socket_t& operator=(socket_t&& rhs)
337  {
338  std::swap(ptr, rhs.ptr);
339  return *this;
340  }
341 #endif
342 
343  inline ~socket_t ()
344  {
345  close();
346  }
347 
348  inline operator void* ()
349  {
350  return ptr;
351  }
352 
353  inline void close()
354  {
355  if(ptr == NULL)
356  // already closed
357  return ;
358  int rc = zmq_close (ptr);
359  ZMQ_ASSERT (rc == 0);
360  ptr = 0 ;
361  }
362 
363  inline void setsockopt (int option_, const void *optval_,
364  size_t optvallen_)
365  {
366  int rc = zmq_setsockopt (ptr, option_, optval_, optvallen_);
367  if (rc != 0)
368  throw error_t ();
369  }
370 
371  inline void getsockopt (int option_, void *optval_,
372  size_t *optvallen_)
373  {
374  int rc = zmq_getsockopt (ptr, option_, optval_, optvallen_);
375  if (rc != 0)
376  throw error_t ();
377  }
378 
379  inline void bind (const char *addr_)
380  {
381  int rc = zmq_bind (ptr, addr_);
382  if (rc != 0)
383  throw error_t ();
384  }
385 
386  inline void unbind (const char *addr_)
387  {
388  int rc = zmq_unbind (ptr, addr_);
389  if (rc != 0)
390  throw error_t ();
391  }
392 
393  inline void connect (const char *addr_)
394  {
395  int rc = zmq_connect (ptr, addr_);
396  if (rc != 0)
397  throw error_t ();
398  }
399 
400  inline void disconnect (const char *addr_)
401  {
402  int rc = zmq_disconnect (ptr, addr_);
403  if (rc != 0)
404  throw error_t ();
405  }
406 
407  inline bool connected()
408  {
409  return(ptr != NULL);
410  }
411 
412  inline size_t send (const void *buf_, size_t len_, int flags_ = 0)
413  {
414  int nbytes = zmq_send (ptr, buf_, len_, flags_);
415  if (nbytes >= 0)
416  return (size_t) nbytes;
417  if (zmq_errno () == EAGAIN)
418  return 0;
419  throw error_t ();
420  }
421 
422  inline bool send (message_t &msg_, int flags_ = 0)
423  {
424  int nbytes = zmq_msg_send (&(msg_.msg), ptr, flags_);
425  if (nbytes >= 0)
426  return true;
427  if (zmq_errno () == EAGAIN)
428  return false;
429  throw error_t ();
430  }
431 
432  inline size_t recv (void *buf_, size_t len_, int flags_ = 0)
433  {
434  int nbytes = zmq_recv (ptr, buf_, len_, flags_);
435  if (nbytes >= 0)
436  return (size_t) nbytes;
437  if (zmq_errno () == EAGAIN)
438  return 0;
439  throw error_t ();
440  }
441 
442  inline bool recv (message_t *msg_, int flags_ = 0)
443  {
444  int nbytes = zmq_msg_recv (&(msg_->msg), ptr, flags_);
445  if (nbytes >= 0)
446  return true;
447  if (zmq_errno () == EAGAIN)
448  return false;
449  throw error_t ();
450  }
451 
452  private:
453  void *ptr;
454  void *ctxptr;
455 
457  void operator = (const socket_t&) ZMQ_DELETED_FUNCTION;
458  };
459 
460  class monitor_t
461  {
462  public:
463  monitor_t() : socketPtr(NULL) {}
464  virtual ~monitor_t() {}
465 
466  void monitor(socket_t &socket, const char *addr_, int events = ZMQ_EVENT_ALL)
467  {
468  int rc = zmq_socket_monitor(socket.ptr, addr_, events);
469  if (rc != 0)
470  throw error_t ();
471 
472  socketPtr = socket.ptr;
473  void *s = zmq_socket (socket.ctxptr, ZMQ_PAIR);
474  assert (s);
475 
476  rc = zmq_connect (s, addr_);
477  assert (rc == 0);
478 
479  on_monitor_started();
480 
481  while (true) {
482  zmq_msg_t eventMsg;
483  zmq_msg_init (&eventMsg);
484  rc = zmq_recvmsg (s, &eventMsg, 0);
485  if (rc == -1 && zmq_errno() == ETERM)
486  break;
487  assert (rc != -1);
488  zmq_event_t* event = static_cast<zmq_event_t*>(zmq_msg_data (&eventMsg));
489 
490 #ifdef ZMQ_NEW_MONITOR_EVENT_LAYOUT
491  zmq_msg_t addrMsg;
492  zmq_msg_init (&addrMsg);
493  rc = zmq_recvmsg (s, &addrMsg, 0);
494  if (rc == -1 && zmq_errno() == ETERM)
495  break;
496  assert (rc != -1);
497  const char* str = static_cast<const char*>(zmq_msg_data (&addrMsg));
498  std::string address(str, str + zmq_msg_size(&addrMsg));
499  zmq_msg_close (&addrMsg);
500 #else
501  // Bit of a hack, but all events in the zmq_event_t union have the same layout so this will work for all event types.
502  std::string address = event->data.connected.addr;
503 #endif
504 
505 #ifdef ZMQ_EVENT_MONITOR_STOPPED
506  if (event->event == ZMQ_EVENT_MONITOR_STOPPED)
507  break;
508 #endif
509 
510  switch (event->event) {
511  case ZMQ_EVENT_CONNECTED:
512  on_event_connected(*event, address.c_str());
513  break;
514  case ZMQ_EVENT_CONNECT_DELAYED:
515  on_event_connect_delayed(*event, address.c_str());
516  break;
517  case ZMQ_EVENT_CONNECT_RETRIED:
518  on_event_connect_retried(*event, address.c_str());
519  break;
520  case ZMQ_EVENT_LISTENING:
521  on_event_listening(*event, address.c_str());
522  break;
523  case ZMQ_EVENT_BIND_FAILED:
524  on_event_bind_failed(*event, address.c_str());
525  break;
526  case ZMQ_EVENT_ACCEPTED:
527  on_event_accepted(*event, address.c_str());
528  break;
529  case ZMQ_EVENT_ACCEPT_FAILED:
530  on_event_accept_failed(*event, address.c_str());
531  break;
532  case ZMQ_EVENT_CLOSED:
533  on_event_closed(*event, address.c_str());
534  break;
535  case ZMQ_EVENT_CLOSE_FAILED:
536  on_event_close_failed(*event, address.c_str());
537  break;
538  case ZMQ_EVENT_DISCONNECTED:
539  on_event_disconnected(*event, address.c_str());
540  break;
541  default:
542  on_event_unknown(*event, address.c_str());
543  break;
544  }
545  zmq_msg_close (&eventMsg);
546  }
547  zmq_close (s);
548  socketPtr = NULL;
549  }
550 
551 #ifdef ZMQ_EVENT_MONITOR_STOPPED
552  void abort()
553  {
554  if (socketPtr)
555  zmq_socket_monitor(socketPtr, NULL, 0);
556  }
557 #endif
558  virtual void on_monitor_started() {}
559  virtual void on_event_connected(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; }
560  virtual void on_event_connect_delayed(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; }
561  virtual void on_event_connect_retried(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; }
562  virtual void on_event_listening(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; }
563  virtual void on_event_bind_failed(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; }
564  virtual void on_event_accepted(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; }
565  virtual void on_event_accept_failed(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; }
566  virtual void on_event_closed(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; }
567  virtual void on_event_close_failed(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; }
568  virtual void on_event_disconnected(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; }
569  virtual void on_event_unknown(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; }
570  private:
571  void* socketPtr;
572  };
573 }
574 
575 #endif