cpp_redis  4.0.0
cpp_redis is a C++11 Asynchronous Multi-Platform Lightweight Redis Client, with support for synchronous operations and pipelining.
Classes | Public Types | Public Member Functions | Private Member Functions | Private Attributes | List of all members
cpp_redis::subscriber Class Reference

#include <subscriber.hpp>

Classes

struct  callback_holder
 

Public Types

enum  connect_state {
  connect_state::dropped, connect_state::start, connect_state::sleeping, connect_state::ok,
  connect_state::failed, connect_state::lookup_failed, connect_state::stopped
}
 
typedef std::function< void(const std::string &host, std::size_t port, connect_state status)> connect_callback_t
 
typedef std::function< void(reply &)> reply_callback_t
 
typedef std::function< void(const std::string &, const std::string &)> subscribe_callback_t
 
typedef std::function< void(int64_t)> acknowledgement_callback_t
 

Public Member Functions

 subscriber (void)
 ctor More...
 
 subscriber (const std::shared_ptr< network::tcp_client_iface > &tcp_client)
 
 ~subscriber (void)
 dtor More...
 
 subscriber (const subscriber &)=delete
 copy ctor More...
 
subscriberoperator= (const subscriber &)=delete
 assignment operator More...
 
void connect (const std::string &host="127.0.0.1", std::size_t port=6379, const connect_callback_t &connect_callback=nullptr, std::uint32_t timeout_msecs=0, std::int32_t max_reconnects=0, std::uint32_t reconnect_interval_msecs=0)
 
void connect (const std::string &name, const connect_callback_t &connect_callback=nullptr, std::uint32_t timeout_msecs=0, std::int32_t max_reconnects=0, std::uint32_t reconnect_interval_msecs=0)
 
bool is_connected (void) const
 
void disconnect (bool wait_for_removal=false)
 
bool is_reconnecting (void) const
 
void cancel_reconnect (void)
 
subscriberauth (const std::string &password, const reply_callback_t &reply_callback=nullptr)
 
subscribersubscribe (const std::string &channel, const subscribe_callback_t &callback, const acknowledgement_callback_t &acknowledgement_callback=nullptr)
 
subscriberpsubscribe (const std::string &pattern, const subscribe_callback_t &callback, const acknowledgement_callback_t &acknowledgement_callback=nullptr)
 
subscriberunsubscribe (const std::string &channel)
 
subscriberpunsubscribe (const std::string &pattern)
 
subscribercommit (void)
 
void add_sentinel (const std::string &host, std::size_t port)
 
const sentinelget_sentinel (void) const
 
sentinelget_sentinel (void)
 
void clear_sentinels (void)
 

Private Member Functions

void connection_receive_handler (network::redis_connection &connection, reply &reply)
 
void connection_disconnection_handler (network::redis_connection &connection)
 
void handle_acknowledgement_reply (const std::vector< reply > &reply)
 
void handle_subscribe_reply (const std::vector< reply > &reply)
 
void handle_psubscribe_reply (const std::vector< reply > &reply)
 
void call_acknowledgement_callback (const std::string &channel, const std::map< std::string, callback_holder > &channels, std::mutex &channels_mtx, int64_t nb_chans)
 
void reconnect (void)
 
void re_auth (void)
 
void re_subscribe (void)
 
bool should_reconnect (void) const
 
void sleep_before_next_reconnect_attempt (void)
 
void clear_subscriptions (void)
 
void unprotected_subscribe (const std::string &channel, const subscribe_callback_t &callback, const acknowledgement_callback_t &acknowledgement_callback)
 
void unprotected_psubscribe (const std::string &pattern, const subscribe_callback_t &callback, const acknowledgement_callback_t &acknowledgement_callback)
 

Private Attributes

std::string m_redis_server
 
std::size_t m_redis_port = 0
 
std::string m_master_name
 
std::string m_password
 
network::redis_connection m_client
 
cpp_redis::sentinel m_sentinel
 
std::uint32_t m_connect_timeout_msecs = 0
 
std::int32_t m_max_reconnects = 0
 
std::int32_t m_current_reconnect_attempts = 0
 
std::uint32_t m_reconnect_interval_msecs = 0
 
std::atomic_bool m_reconnecting = ATOMIC_VAR_INIT(false)
 
std::atomic_bool m_cancel = ATOMIC_VAR_INIT(false)
 
std::map< std::string, callback_holderm_subscribed_channels
 
std::map< std::string, callback_holderm_psubscribed_channels
 
connect_callback_t m_connect_callback
 
std::mutex m_psubscribed_channels_mutex
 
std::mutex m_subscribed_channels_mutex
 
reply_callback_t m_auth_reply_callback
 

Member Typedef Documentation

◆ acknowledgement_callback_t

typedef std::function<void(int64_t)> cpp_redis::subscriber::acknowledgement_callback_t

acknowledgement callback called whenever a subscribe completes takes as parameter the int returned by the redis server (usually the number of channels you are subscribed to)

◆ connect_callback_t

typedef std::function<void(const std::string& host, std::size_t port, connect_state status)> cpp_redis::subscriber::connect_callback_t

connect handler, called whenever a new connection even occurred

◆ reply_callback_t

typedef std::function<void(reply&)> cpp_redis::subscriber::reply_callback_t

reply callback called whenever a reply is received takes as parameter the received reply

◆ subscribe_callback_t

typedef std::function<void(const std::string&, const std::string&)> cpp_redis::subscriber::subscribe_callback_t

subscribe callback, called whenever a new message is published on a subscribed channel takes as parameter the channel and the message

Member Enumeration Documentation

◆ connect_state

high availability (re)connection states

  • dropped: connection has dropped
  • start: attemp of connection has started
  • sleeping: sleep between two attemps
  • ok: connected
  • failed: failed to connect
  • lookup failed: failed to retrieve master sentinel
  • stopped: stop to try to reconnect
Enumerator
dropped 
start 
sleeping 
ok 
failed 
lookup_failed 
stopped 

Constructor & Destructor Documentation

◆ subscriber() [1/3]

cpp_redis::subscriber::subscriber ( void  )

ctor

◆ subscriber() [2/3]

cpp_redis::subscriber::subscriber ( const std::shared_ptr< network::tcp_client_iface > &  tcp_client)
explicit

custom ctor to specify custom tcp_client

Parameters
tcp_clienttcp client to be used for network communications

◆ ~subscriber()

cpp_redis::subscriber::~subscriber ( void  )

dtor

◆ subscriber() [3/3]

cpp_redis::subscriber::subscriber ( const subscriber )
delete

copy ctor

Member Function Documentation

◆ add_sentinel()

void cpp_redis::subscriber::add_sentinel ( const std::string &  host,
std::size_t  port 
)

add a sentinel definition. Required for connect() or get_master_addr_by_name() when autoconnect is enabled.

Parameters
hostsentinel host
portsentinel port

◆ auth()

subscriber& cpp_redis::subscriber::auth ( const std::string &  password,
const reply_callback_t reply_callback = nullptr 
)

ability to authenticate on the redis server if necessary this method should not be called repeatedly as the storage of reply_callback is NOT threadsafe (only one reply callback is stored for the subscriber client) calling repeatedly auth() is undefined concerning the execution of the associated callbacks

Parameters
passwordpassword to be used for authentication
reply_callbackcallback to be called on auth completion (nullable)
Returns
current instance

◆ call_acknowledgement_callback()

void cpp_redis::subscriber::call_acknowledgement_callback ( const std::string &  channel,
const std::map< std::string, callback_holder > &  channels,
std::mutex &  channels_mtx,
int64_t  nb_chans 
)
private

find channel or pattern that is associated to the reply and call its ack callback

Parameters
channelchannel or pattern that caused the issuance of this reply
channelslist of channels or patterns to be searched for the received channel
channels_mtxchannels or patterns mtx to be locked for race condition
nb_chansredis server ack reply

◆ cancel_reconnect()

void cpp_redis::subscriber::cancel_reconnect ( void  )

stop any reconnect in progress

◆ clear_sentinels()

void cpp_redis::subscriber::clear_sentinels ( void  )

clear all existing sentinels.

◆ clear_subscriptions()

void cpp_redis::subscriber::clear_subscriptions ( void  )
private

clear all subscriptions (dirty way, no unsub/punsub commands send: mostly used for cleaning in disconnection condition)

◆ commit()

subscriber& cpp_redis::subscriber::commit ( void  )

commit pipelined transaction that is, send to the network all commands pipelined by calling send() / subscribe() / ...

Returns
current instance

◆ connect() [1/2]

void cpp_redis::subscriber::connect ( const std::string &  host = "127.0.0.1",
std::size_t  port = 6379,
const connect_callback_t connect_callback = nullptr,
std::uint32_t  timeout_msecs = 0,
std::int32_t  max_reconnects = 0,
std::uint32_t  reconnect_interval_msecs = 0 
)

Connect to redis server

Parameters
hosthost to be connected to
portport to be connected to
connect_callbackconnect handler to be called on connect events (may be null)
timeout_msecsmaximum time to connect
max_reconnectsmaximum attemps of reconnection if connection dropped
reconnect_interval_msecstime between two attemps of reconnection

◆ connect() [2/2]

void cpp_redis::subscriber::connect ( const std::string &  name,
const connect_callback_t connect_callback = nullptr,
std::uint32_t  timeout_msecs = 0,
std::int32_t  max_reconnects = 0,
std::uint32_t  reconnect_interval_msecs = 0 
)

Connect to redis server

Parameters
namesentinel name
connect_callbackconnect handler to be called on connect events (may be null)
timeout_msecsmaximum time to connect
max_reconnectsmaximum attemps of reconnection if connection dropped
reconnect_interval_msecstime between two attemps of reconnection

◆ connection_disconnection_handler()

void cpp_redis::subscriber::connection_disconnection_handler ( network::redis_connection connection)
private

redis_connection disconnection handler, triggered whenever a disconnection occured

Parameters
connectionredis_connection instance

◆ connection_receive_handler()

void cpp_redis::subscriber::connection_receive_handler ( network::redis_connection connection,
reply reply 
)
private

redis connection receive handler, triggered whenever a reply has been read by the redis connection

Parameters
connectionredis_connection instance
replyparsed reply

◆ disconnect()

void cpp_redis::subscriber::disconnect ( bool  wait_for_removal = false)

disconnect from redis server

Parameters
wait_for_removalwhen sets to true, disconnect blocks until the underlying TCP client has been effectively removed from the io_service and that all the underlying callbacks have completed.

◆ get_sentinel() [1/2]

const sentinel& cpp_redis::subscriber::get_sentinel ( void  ) const

retrieve sentinel for current client

Returns
sentinel associated to current client

◆ get_sentinel() [2/2]

sentinel& cpp_redis::subscriber::get_sentinel ( void  )

retrieve sentinel for current client non-const version

Returns
sentinel associated to current client

◆ handle_acknowledgement_reply()

void cpp_redis::subscriber::handle_acknowledgement_reply ( const std::vector< reply > &  reply)
private

trigger the ack callback for matching channel/pattern check if reply is valid

Parameters
replyreceived reply

◆ handle_psubscribe_reply()

void cpp_redis::subscriber::handle_psubscribe_reply ( const std::vector< reply > &  reply)
private

trigger the sub callback for all matching channels/patterns check if reply is valid

Parameters
replyreceived reply

◆ handle_subscribe_reply()

void cpp_redis::subscriber::handle_subscribe_reply ( const std::vector< reply > &  reply)
private

trigger the sub callback for all matching channels/patterns check if reply is valid

Parameters
replyreceived reply

◆ is_connected()

bool cpp_redis::subscriber::is_connected ( void  ) const
Returns
whether we are connected to the redis server

◆ is_reconnecting()

bool cpp_redis::subscriber::is_reconnecting ( void  ) const
Returns
whether an attemp to reconnect is in progress

◆ operator=()

subscriber& cpp_redis::subscriber::operator= ( const subscriber )
delete

assignment operator

◆ psubscribe()

subscriber& cpp_redis::subscriber::psubscribe ( const std::string &  pattern,
const subscribe_callback_t callback,
const acknowledgement_callback_t acknowledgement_callback = nullptr 
)

PSubscribes to the given channel and:

  • calls acknowledgement_callback once the server has acknowledged about the subscription.
  • calls subscribe_callback each time a message is published on this channel. The command is not effectively sent immediately but stored in an internal buffer until commit() is called.
Parameters
patternpattern to psubscribe
callbackcallback to be called whenever a message is received for this pattern
acknowledgement_callbackcallback to be called on subscription completion (nullable)
Returns
current instance

◆ punsubscribe()

subscriber& cpp_redis::subscriber::punsubscribe ( const std::string &  pattern)

punsubscribe from the given pattern The command is not effectively sent immediately, but stored inside an internal buffer until commit() is called.

Parameters
patternpattern to punsubscribe from
Returns
current instance

◆ re_auth()

void cpp_redis::subscriber::re_auth ( void  )
private

re authenticate to redis server based on previously used password

◆ re_subscribe()

void cpp_redis::subscriber::re_subscribe ( void  )
private

resubscribe (sub and psub) to previously subscribed channels/patterns

◆ reconnect()

void cpp_redis::subscriber::reconnect ( void  )
private

reconnect to the previously connected host automatically re authenticate and resubscribe to subscribed channel in case of success

◆ should_reconnect()

bool cpp_redis::subscriber::should_reconnect ( void  ) const
private
Returns
whether a reconnection attempt should be performed

◆ sleep_before_next_reconnect_attempt()

void cpp_redis::subscriber::sleep_before_next_reconnect_attempt ( void  )
private

sleep between two reconnect attemps if necessary

◆ subscribe()

subscriber& cpp_redis::subscriber::subscribe ( const std::string &  channel,
const subscribe_callback_t callback,
const acknowledgement_callback_t acknowledgement_callback = nullptr 
)

Subscribes to the given channel and:

  • calls acknowledgement_callback once the server has acknowledged about the subscription.
  • calls subscribe_callback each time a message is published on this channel. The command is not effectively sent immediately but stored in an internal buffer until commit() is called.
Parameters
channelchannel to subscribe
callbackcallback to be called whenever a message is received for this channel
acknowledgement_callbackcallback to be called on subscription completion (nullable)
Returns
current instance

◆ unprotected_psubscribe()

void cpp_redis::subscriber::unprotected_psubscribe ( const std::string &  pattern,
const subscribe_callback_t callback,
const acknowledgement_callback_t acknowledgement_callback 
)
private

unprotected psub same as psubscribe, but without any mutex lock

Parameters
patternpattern to psubscribe
callbackcallback to be called whenever a message is received for this pattern
acknowledgement_callbackcallback to be called on subscription completion (nullable)

◆ unprotected_subscribe()

void cpp_redis::subscriber::unprotected_subscribe ( const std::string &  channel,
const subscribe_callback_t callback,
const acknowledgement_callback_t acknowledgement_callback 
)
private

unprotected sub same as subscribe, but without any mutex lock

Parameters
channelchannel to subscribe
callbackcallback to be called whenever a message is received for this channel
acknowledgement_callbackcallback to be called on subscription completion (nullable)

◆ unsubscribe()

subscriber& cpp_redis::subscriber::unsubscribe ( const std::string &  channel)

unsubscribe from the given channel The command is not effectively sent immediately, but stored inside an internal buffer until commit() is called.

Parameters
channelchannel to unsubscribe from
Returns
current instance

Member Data Documentation

◆ m_auth_reply_callback

reply_callback_t cpp_redis::subscriber::m_auth_reply_callback
private

auth reply callback

◆ m_cancel

std::atomic_bool cpp_redis::subscriber::m_cancel = ATOMIC_VAR_INIT(false)
private

to force cancel reconnection

◆ m_client

network::redis_connection cpp_redis::subscriber::m_client
private

tcp client for redis connection

◆ m_connect_callback

connect_callback_t cpp_redis::subscriber::m_connect_callback
private

connect handler

◆ m_connect_timeout_msecs

std::uint32_t cpp_redis::subscriber::m_connect_timeout_msecs = 0
private

max time to connect

◆ m_current_reconnect_attempts

std::int32_t cpp_redis::subscriber::m_current_reconnect_attempts = 0
private

current number of attemps to reconect

◆ m_master_name

std::string cpp_redis::subscriber::m_master_name
private

master name (if we are using sentinel) we are connected to

◆ m_max_reconnects

std::int32_t cpp_redis::subscriber::m_max_reconnects = 0
private

max number of reconnection attemps

◆ m_password

std::string cpp_redis::subscriber::m_password
private

password used to authenticate

◆ m_psubscribed_channels

std::map<std::string, callback_holder> cpp_redis::subscriber::m_psubscribed_channels
private

psubscribed channels and their associated channels

◆ m_psubscribed_channels_mutex

std::mutex cpp_redis::subscriber::m_psubscribed_channels_mutex
private

sub chans thread safety

◆ m_reconnect_interval_msecs

std::uint32_t cpp_redis::subscriber::m_reconnect_interval_msecs = 0
private

time between two reconnection attemps

◆ m_reconnecting

std::atomic_bool cpp_redis::subscriber::m_reconnecting = ATOMIC_VAR_INIT(false)
private

reconnection status

◆ m_redis_port

std::size_t cpp_redis::subscriber::m_redis_port = 0
private

port we are connected to

◆ m_redis_server

std::string cpp_redis::subscriber::m_redis_server
private

server we are connected to

◆ m_sentinel

cpp_redis::sentinel cpp_redis::subscriber::m_sentinel
private

redis sentinel

◆ m_subscribed_channels

std::map<std::string, callback_holder> cpp_redis::subscriber::m_subscribed_channels
private

subscribed channels and their associated channels

◆ m_subscribed_channels_mutex

std::mutex cpp_redis::subscriber::m_subscribed_channels_mutex
private

psub chans thread safety


The documentation for this class was generated from the following file: