cpp_redis  4.0.0
cpp_redis is a C++11 Asynchronous Multi-Platform Lightweight Redis Client, with support for synchronous operations and pipelining.
subscriber.hpp
Go to the documentation of this file.
1 // The MIT License (MIT)
2 //
3 // Copyright (c) 2015-2017 Simon Ninon <simon.ninon@gmail.com>
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // The above copyright notice and this permission notice shall be included in all
13 // copies or substantial portions of the Software.
14 //
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 // SOFTWARE.
22 
23 #pragma once
24 
25 #include <functional>
26 #include <map>
27 #include <mutex>
28 #include <string>
29 
33 
34 namespace cpp_redis {
35 
36 class subscriber {
37 public:
38 #ifndef __CPP_REDIS_USE_CUSTOM_TCP_CLIENT
39  subscriber(void);
41 #endif /* __CPP_REDIS_USE_CUSTOM_TCP_CLIENT */
42 
48  explicit subscriber(const std::shared_ptr<network::tcp_client_iface>& tcp_client);
49 
51  ~subscriber(void);
52 
54  subscriber(const subscriber&) = delete;
56  subscriber& operator=(const subscriber&) = delete;
57 
58 public:
69  enum class connect_state {
70  dropped,
71  start,
72  sleeping,
73  ok,
74  failed,
76  stopped
77  };
78 
79 public:
83  typedef std::function<void(const std::string& host, std::size_t port, connect_state status)> connect_callback_t;
84 
95  void connect(
96  const std::string& host = "127.0.0.1",
97  std::size_t port = 6379,
98  const connect_callback_t& connect_callback = nullptr,
99  std::uint32_t timeout_msecs = 0,
100  std::int32_t max_reconnects = 0,
101  std::uint32_t reconnect_interval_msecs = 0);
102 
112  void connect(
113  const std::string& name,
114  const connect_callback_t& connect_callback = nullptr,
115  std::uint32_t timeout_msecs = 0,
116  std::int32_t max_reconnects = 0,
117  std::uint32_t reconnect_interval_msecs = 0);
118 
122  bool is_connected(void) const;
123 
129  void disconnect(bool wait_for_removal = false);
130 
134  bool is_reconnecting(void) const;
135 
139  void cancel_reconnect(void);
140 
141 public:
146  typedef std::function<void(reply&)> reply_callback_t;
147 
157  subscriber& auth(const std::string& password, const reply_callback_t& reply_callback = nullptr);
158 
163  typedef std::function<void(const std::string&, const std::string&)> subscribe_callback_t;
164 
169  typedef std::function<void(int64_t)> acknowledgement_callback_t;
170 
182  subscriber& subscribe(const std::string& channel, const subscribe_callback_t& callback, const acknowledgement_callback_t& acknowledgement_callback = nullptr);
183 
195  subscriber& psubscribe(const std::string& pattern, const subscribe_callback_t& callback, const acknowledgement_callback_t& acknowledgement_callback = nullptr);
196 
204  subscriber& unsubscribe(const std::string& channel);
205 
213  subscriber& punsubscribe(const std::string& pattern);
214 
221  subscriber& commit(void);
222 
223 public:
230  void add_sentinel(const std::string& host, std::size_t port);
231 
237  const sentinel& get_sentinel(void) const;
238 
245  sentinel& get_sentinel(void);
246 
250  void clear_sentinels(void);
251 
252 private:
256  struct callback_holder {
257  subscribe_callback_t subscribe_callback;
258  acknowledgement_callback_t acknowledgement_callback;
259  };
260 
261 private:
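269  void connection_receive_handler(network::redis_connection& connection, reply& reply);
270 
276  void connection_disconnection_handler(network::redis_connection& connection);
277 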
283  void handle_acknowledgement_reply(const std::vector<reply>& reply);
284 
291  void handle_subscribe_reply(const std::vector<reply>& reply);
292 
299  void handle_psubscribe_reply(const std::vector<reply>& reply);
300 
309  void call_acknowledgement_callback(const std::string& channel, const std::map<std::string, callback_holder>& channels, std::mutex& channels_mtx, int64_t nb_chans);
310 
311 private:
316  void reconnect(void);
317 
321  void re_auth(void);
322 
326  void re_subscribe(void);
327 
331  bool should_reconnect(void) const;
332 
337  void sleep_before_next_reconnect_attempt(void);
338 
341  void clear_subscriptions(void);
342 
343 private:
352  void unprotected_subscribe(const std::string& channel, const subscribe_callback_t& callback, const acknowledgement_callback_t& acknowledgement_callback);
353 
362  void unprotected_psubscribe(const std::string& pattern, const subscribe_callback_t& callback, const acknowledgement_callback_t& acknowledgement_callback);
363 
364 private:
368  std::string m_redis_server;
372  std::size_t m_redis_port = 0;
376  std::string m_master_name;
380  std::string m_password;
381 
385  network::redis_connection m_client;
386 
390  cpp_redis::sentinel m_sentinel;
391 
395  std::uint32_t m_connect_timeout_msecs = 0;
399  std::int32_t m_max_reconnects = 0;
407  std::uint32_t m_reconnect_interval_msecs = 0;
408 
412  std::atomic_bool m_reconnecting = ATOMIC_VAR_INIT(false);
416  std::atomic_bool m_cancel = ATOMIC_VAR_INIT(false);
417 
421  std::map<std::string, callback_holder> m_subscribed_channels;
425  std::map<std::string, callback_holder> m_psubscribed_channels;
426 
430  connect_callback_t m_connect_callback;
431 
435  std::mutex m_psubscribed_channels_mutex;
439  std::mutex m_subscribed_channels_mutex;
440 
444  reply_callback_t m_auth_reply_callback;
445 };
446 
447 } // namespace cpp_redis
void call_acknowledgement_callback(const std::string &channel, const std::map< std::string, callback_holder > &channels, std::mutex &channels_mtx, int64_t nb_chans)
void clear_sentinels(void)
network::redis_connection m_client
Definition: subscriber.hpp:385
cpp_redis::network::redis_connection
Definition: redis_connection.hpp:45
cpp_redis::subscriber
Definition: subscriber.hpp:36
void sleep_before_next_reconnect_attempt(void)
std::size_t m_redis_port
Definition: subscriber.hpp:372
cpp_redis::subscriber::callback_holder
Definition: subscriber.hpp:256
std::function< void(const std::string &host, std::size_t port, connect_state status)> connect_callback_t
Definition: subscriber.hpp:83
std::atomic_bool m_reconnecting
Definition: subscriber.hpp:412
void clear_subscriptions(void)
std::mutex m_subscribed_channels_mutex
Definition: subscriber.hpp:439
bool should_reconnect(void) const
void connection_receive_handler(network::redis_connection &connection, reply &reply)
void unprotected_subscribe(const std::string &channel, const subscribe_callback_t &callback, const acknowledgement_callback_t &acknowledgement_callback)
void unprotected_psubscribe(const std::string &pattern, const subscribe_callback_t &callback, const acknowledgement_callback_t &acknowledgement_callback)
std::int32_t m_max_reconnects
Definition: subscriber.hpp:399
std::uint32_t m_connect_timeout_msecs
Definition: subscriber.hpp:395
std::int32_t m_current_reconnect_attempts
Definition: subscriber.hpp:403
subscriber & psubscribe(const std::string &pattern, const subscribe_callback_t &callback, const acknowledgement_callback_t &acknowledgement_callback=nullptr)
subscribe_callback_t subscribe_callback
Definition: subscriber.hpp:257
subscriber & commit(void)
cpp_redis::reply
Definition: reply.hpp:33
std::function< void(reply &)> reply_callback_t
Definition: subscriber.hpp:146
acknowledgement_callback_t acknowledgement_callback
Definition: subscriber.hpp:258
subscriber & operator=(const subscriber &)=delete
assignment operator
connect_callback_t m_connect_callback
Definition: subscriber.hpp:430
std::mutex m_psubscribed_channels_mutex
Definition: subscriber.hpp:435
std::map< std::string, callback_holder > m_subscribed_channels
Definition: subscriber.hpp:421
std::atomic_bool m_cancel
Definition: subscriber.hpp:416
std::string m_redis_server
Definition: subscriber.hpp:368
void handle_psubscribe_reply(const std::vector< reply > &reply)
bool is_reconnecting(void) const
cpp_redis::sentinel m_sentinel
Definition: subscriber.hpp:390
std::function< void(int64_t)> acknowledgement_callback_t
Definition: subscriber.hpp:169
connect_state
Definition: subscriber.hpp:69
void cancel_reconnect(void)
subscriber & auth(const std::string &password, const reply_callback_t &reply_callback=nullptr)
cpp_redis::sentinel
Definition: sentinel.hpp:34
std::uint32_t m_reconnect_interval_msecs
Definition: subscriber.hpp:407
void disconnect(bool wait_for_removal=false)
const sentinel & get_sentinel(void) const
subscriber & punsubscribe(const std::string &pattern)
void handle_acknowledgement_reply(const std::vector< reply > &reply)
std::string m_master_name
Definition: subscriber.hpp:376
subscriber & unsubscribe(const std::string &channel)
reply_callback_t m_auth_reply_callback
Definition: subscriber.hpp:444
void handle_subscribe_reply(const std::vector< reply > &reply)
void add_sentinel(const std::string &host, std::size_t port)
void connect(const std::string &host="127.0.0.1", std::size_t port=6379, const connect_callback_t &connect_callback=nullptr, std::uint32_t timeout_msecs=0, std::int32_t max_reconnects=0, std::uint32_t reconnect_interval_msecs=0)
std::function< void(const std::string &, const std::string &)> subscribe_callback_t
Definition: subscriber.hpp:163
std::string m_password
Definition: subscriber.hpp:380
std::map< std::string, callback_holder > m_psubscribed_channels
Definition: subscriber.hpp:425
bool is_connected(void) const
void connection_disconnection_handler(network::redis_connection &connection)
subscriber & subscribe(const std::string &channel, const subscribe_callback_t &callback, const acknowledgement_callback_t &acknowledgement_callback=nullptr)
Definition: array_builder.hpp:29