30 #include <cpp_redis/core/sentinel.hpp> 31 #include <cpp_redis/network/redis_connection.hpp> 32 #include <cpp_redis/network/tcp_client_iface.hpp> 44 #ifndef __CPP_REDIS_USE_CUSTOM_TCP_CLIENT 54 explicit subscriber(
const std::shared_ptr<network::tcp_client_iface>& tcp_client);
89 typedef std::function<void(const std::string& host, std::size_t port, connect_state status)>
connect_callback_t;
102 const std::string& host =
"127.0.0.1",
103 std::size_t port = 6379,
104 const connect_callback_t& connect_callback =
nullptr,
105 std::uint32_t timeout_msecs = 0,
106 std::int32_t max_reconnects = 0,
107 std::uint32_t reconnect_interval_msecs = 0);
119 const std::string& name,
120 const connect_callback_t& connect_callback =
nullptr,
121 std::uint32_t timeout_msecs = 0,
122 std::int32_t max_reconnects = 0,
123 std::uint32_t reconnect_interval_msecs = 0);
//! Disconnect.
//!
//! \param wait_for_removal defaults to false.
//! NOTE(review): presumably makes the call block until the underlying client
//! has been removed; confirm against the implementation.
void disconnect(bool wait_for_removal = false);
163 subscriber&
auth(
const std::string& password,
const reply_callback_t& reply_callback =
nullptr);
188 subscriber&
subscribe(
const std::string& channel,
const subscribe_callback_t& callback,
const acknowledgement_callback_t& acknowledgement_callback =
nullptr);
201 subscriber&
psubscribe(
const std::string& pattern,
const subscribe_callback_t& callback,
const acknowledgement_callback_t& acknowledgement_callback =
nullptr);
//! Register a sentinel by host and port.
//!
//! \param host          sentinel host
//! \param port          sentinel port
//! \param timeout_msecs timeout in milliseconds, defaults to 0
//!                      (NOTE(review): meaning of 0 not visible here; confirm)
void add_sentinel(const std::string& host, std::size_t port, std::uint32_t timeout_msecs = 0);
263 struct callback_holder {
264 subscribe_callback_t subscribe_callback;
265 acknowledgement_callback_t acknowledgement_callback;
290 void handle_acknowledgement_reply(
const std::vector<reply>&
reply);
298 void handle_subscribe_reply(
const std::vector<reply>&
reply);
306 void handle_psubscribe_reply(
const std::vector<reply>&
reply);
316 void call_acknowledgement_callback(
const std::string& channel,
const std::map<std::string, callback_holder>& channels, std::mutex& channels_mtx, int64_t nb_chans);
//! Reconnect.
//! NOTE(review): retry policy and target selection are not visible here;
//! confirm against the implementation.
void reconnect(void);
//! Re-issue subscriptions.
//! NOTE(review): presumably replays the stored channel and pattern
//! subscriptions after a reconnection; confirm against the implementation.
void re_subscribe(void);
338 bool should_reconnect(
void)
const;
//! Pause before the next reconnection attempt.
//! NOTE(review): presumably sleeps for the configured reconnect interval;
//! confirm against the implementation.
void sleep_before_next_reconnect_attempt(void);
//! Clear the stored subscriptions.
//! NOTE(review): presumably empties both the channel and pattern maps;
//! confirm against the implementation.
void clear_subscriptions(void);
359 void unprotected_subscribe(
const std::string& channel,
const subscribe_callback_t& callback,
const acknowledgement_callback_t& acknowledgement_callback);
369 void unprotected_psubscribe(
const std::string& pattern,
const subscribe_callback_t& callback,
const acknowledgement_callback_t& acknowledgement_callback);
//! NOTE(review): the roles described below are inferred from member names and
//! types only; confirm against the implementation.

//! Server host.
std::string m_redis_server;
//! Server port; starts at 0.
std::size_t m_redis_port = 0;
//! Master name (presumably used for sentinel-based lookup).
std::string m_master_name;
//! Password (presumably the one passed to auth()).
std::string m_password;

//! Connection timeout in milliseconds; starts at 0.
std::uint32_t m_connect_timeout_msecs = 0;
//! Maximum number of reconnection attempts; starts at 0.
std::int32_t m_max_reconnects = 0;
//! Reconnection attempts made so far; starts at 0.
std::int32_t m_current_reconnect_attempts = 0;
//! Interval between reconnection attempts in milliseconds; starts at 0.
std::uint32_t m_reconnect_interval_msecs = 0;
//! Flag presumably set while a reconnection is in progress (inferred from the name; confirm).
//! Brace-initialized to false so the flag never starts with an indeterminate
//! value: a default-constructed std::atomic is uninitialized before C++20.
//! A constructor's member initializer, if any, still takes precedence.
std::atomic_bool m_reconnecting{false};
//! Flag presumably used to cancel pending reconnection attempts (inferred from the name; confirm).
//! Brace-initialized to false for the same reason as m_reconnecting.
std::atomic_bool m_cancel{false};
428 std::map<std::string, callback_holder> m_subscribed_channels;
432 std::map<std::string, callback_holder> m_psubscribed_channels;
437 connect_callback_t m_connect_callback;
442 std::mutex m_psubscribed_channels_mutex;
446 std::mutex m_subscribed_channels_mutex;
451 reply_callback_t m_auth_reply_callback;
void clear_sentinels(void)
Definition: redis_connection.hpp:45
Definition: subscriber.hpp:42
std::function< void(const std::string &host, std::size_t port, connect_state status)> connect_callback_t
Definition: subscriber.hpp:89
subscriber & psubscribe(const std::string &pattern, const subscribe_callback_t &callback, const acknowledgement_callback_t &acknowledgement_callback=nullptr)
subscriber & commit(void)
std::function< void(reply &)> reply_callback_t
Definition: subscriber.hpp:152
subscriber & operator=(const subscriber &)=delete
assignment operator
bool is_reconnecting(void) const
std::function< void(int64_t)> acknowledgement_callback_t
Definition: subscriber.hpp:175
connect_state
Definition: subscriber.hpp:75
void cancel_reconnect(void)
subscriber & auth(const std::string &password, const reply_callback_t &reply_callback=nullptr)
Definition: sentinel.hpp:39
void add_sentinel(const std::string &host, std::size_t port, std::uint32_t timeout_msecs=0)
void disconnect(bool wait_for_removal=false)
const sentinel & get_sentinel(void) const
subscriber & punsubscribe(const std::string &pattern)
subscriber & unsubscribe(const std::string &channel)
void connect(const std::string &host="127.0.0.1", std::size_t port=6379, const connect_callback_t &connect_callback=nullptr, std::uint32_t timeout_msecs=0, std::int32_t max_reconnects=0, std::uint32_t reconnect_interval_msecs=0)
std::function< void(const std::string &, const std::string &)> subscribe_callback_t
Definition: subscriber.hpp:169
bool is_connected(void) const
subscriber & subscribe(const std::string &channel, const subscribe_callback_t &callback, const acknowledgement_callback_t &acknowledgement_callback=nullptr)
Definition: array_builder.hpp:29