Tacopie  3.0.0
Tacopie is a TCP Client & Server C++11 library.
io_service.hpp
Go to the documentation of this file.
1 // MIT License
2 //
3 // Copyright (c) 2016-2017 Simon Ninon <simon.ninon@gmail.com>
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // The above copyright notice and this permission notice shall be included in all
13 // copies or substantial portions of the Software.
14 //
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 // SOFTWARE.
22 
23 #pragma once
24 
25 #include <atomic>
26 #include <condition_variable>
27 #include <functional>
28 #include <memory>
29 #include <mutex>
30 #include <thread>
31 #include <unordered_map>
32 #include <vector>
33 
34 #ifdef _WIN32
35 #include <Winsock2.h>
36 #else
37 #include <sys/select.h>
38 #endif /* _WIN32 */
39 
43 
44 #ifndef __TACOPIE_IO_SERVICE_NB_WORKERS
45 #define __TACOPIE_IO_SERVICE_NB_WORKERS 1
46 #endif /* __TACOPIE_IO_SERVICE_NB_WORKERS */
47 
48 namespace tacopie {
49 
54 class io_service {
55 public:
61  io_service(std::size_t nb_threads = __TACOPIE_IO_SERVICE_NB_WORKERS);
62 
64  ~io_service(void);
65 
67  io_service(const io_service&) = delete;
69  io_service& operator=(const io_service&) = delete;
70 
71 public:
79  void set_nb_workers(std::size_t nb_threads);
80 
81 public:
84  typedef std::function<void(fd_t)> event_callback_t;
85 
87  void track(const tcp_socket& socket, const event_callback_t& rd_callback = nullptr, const event_callback_t& wr_callback = nullptr);
88  void set_rd_callback(const tcp_socket& socket, const event_callback_t& event_callback);
89  void set_wr_callback(const tcp_socket& socket, const event_callback_t& event_callback);
90  void untrack(const tcp_socket& socket);
91 
94  void wait_for_removal(const tcp_socket& socket);
95 
96 private:
107  struct tracked_socket {
110  : rd_callback(nullptr)
111  , wr_callback(nullptr) {}
112 
114  event_callback_t rd_callback;
115  std::atomic<bool> is_executing_rd_callback = ATOMIC_VAR_INIT(false);
116 
118  event_callback_t wr_callback;
119  std::atomic<bool> is_executing_wr_callback = ATOMIC_VAR_INIT(false);
120 
122  std::atomic<bool> marked_for_untrack = ATOMIC_VAR_INIT(false);
123  };
124 
125 private:
130  void poll(void);
131 
138  int init_poll_fds_info(void);
139 
144  void process_events(void);
145 
152  void process_rd_event(const fd_t& fd, tracked_socket& socket);
153 
160  void process_wr_event(const fd_t& fd, tracked_socket& socket);
161 
162 private:
166  std::unordered_map<fd_t, tracked_socket> m_tracked_sockets;
167 
171  std::atomic<bool> m_should_stop;
172 
176  std::thread m_poll_worker;
177 
182 
187 
191  std::vector<fd_t> m_polled_fds;
192 
196  fd_set m_rd_set;
197 
201  fd_set m_wr_set;
202 
206  std::condition_variable m_wait_for_removal_condvar;
207 
212 };
213 
222 const std::shared_ptr<io_service>& get_default_io_service(std::uint32_t num_io_workers = 1);
223 
229 void set_default_io_service(const std::shared_ptr<io_service>& service);
230 
231 } // namespace tacopie
int init_poll_fds_info(void)
std::atomic< bool > is_executing_wr_callback
Definition: io_service.hpp:119
void set_default_io_service(const std::shared_ptr< io_service > &service)
void set_nb_workers(std::size_t nb_threads)
#define __TACOPIE_IO_SERVICE_NB_WORKERS
Definition: io_service.hpp:45
std::atomic< bool > marked_for_untrack
marked for untrack
Definition: io_service.hpp:122
tracked_socket(void)
ctor
Definition: io_service.hpp:109
~io_service(void)
dtor
std::function< void(fd_t)> event_callback_t
Definition: io_service.hpp:84
io_service(std::size_t nb_threads=__TACOPIE_IO_SERVICE_NB_WORKERS)
tacopie::self_pipe m_notifier
Definition: io_service.hpp:211
void untrack(const tcp_socket &socket)
void process_wr_event(const fd_t &fd, tracked_socket &socket)
io_service & operator=(const io_service &)=delete
assignment operator
Definition: tcp_socket.hpp:38
Definition: io_service.hpp:54
std::atomic< bool > m_should_stop
Definition: io_service.hpp:171
utils::thread_pool m_callback_workers
Definition: io_service.hpp:181
std::condition_variable m_wait_for_removal_condvar
Definition: io_service.hpp:206
fd_set m_rd_set
Definition: io_service.hpp:196
void wait_for_removal(const tcp_socket &socket)
std::mutex m_tracked_sockets_mtx
Definition: io_service.hpp:186
Definition: self_pipe.hpp:29
std::thread m_poll_worker
Definition: io_service.hpp:176
std::vector< fd_t > m_polled_fds
Definition: io_service.hpp:191
Definition: io_service.hpp:48
void process_rd_event(const fd_t &fd, tracked_socket &socket)
std::unordered_map< fd_t, tracked_socket > m_tracked_sockets
Definition: io_service.hpp:166
void set_wr_callback(const tcp_socket &socket, const event_callback_t &event_callback)
Definition: io_service.hpp:107
fd_set m_wr_set
Definition: io_service.hpp:201
void process_events(void)
event_callback_t rd_callback
rd event
Definition: io_service.hpp:114
void track(const tcp_socket &socket, const event_callback_t &rd_callback=nullptr, const event_callback_t &wr_callback=nullptr)
track & untrack socket
Definition: thread_pool.hpp:37
int fd_t
file descriptor platform type
Definition: typedefs.hpp:36
void set_rd_callback(const tcp_socket &socket, const event_callback_t &event_callback)
const std::shared_ptr< io_service > & get_default_io_service(std::uint32_t num_io_workers=1)
event_callback_t wr_callback
wr event
Definition: io_service.hpp:118
std::atomic< bool > is_executing_rd_callback
Definition: io_service.hpp:115