Tacopie  3.0.0
Tacopie is a TCP Client & Server C++11 library.
io_service.hpp
1 // MIT License
2 //
3 // Copyright (c) 2016-2017 Simon Ninon <simon.ninon@gmail.com>
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // The above copyright notice and this permission notice shall be included in all
13 // copies or substantial portions of the Software.
14 //
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 // SOFTWARE.
22 
23 #pragma once
24 
25 #include <atomic>
26 #include <condition_variable>
27 #include <functional>
28 #include <memory>
29 #include <mutex>
30 #include <thread>
31 #include <unordered_map>
32 #include <vector>
33 
34 #ifdef _WIN32
35 #include <Winsock2.h>
36 #else
37 #include <sys/select.h>
38 #endif /* _WIN32 */
39 
40 #include <tacopie/network/self_pipe.hpp>
41 #include <tacopie/network/tcp_socket.hpp>
42 #include <tacopie/utils/thread_pool.hpp>
43 
44 #ifndef __TACOPIE_IO_SERVICE_NB_WORKERS
45 #define __TACOPIE_IO_SERVICE_NB_WORKERS 1
46 #endif /* __TACOPIE_IO_SERVICE_NB_WORKERS */
47 
48 namespace tacopie {
49 
54 class io_service {
55 public:
61  io_service(std::size_t nb_threads = __TACOPIE_IO_SERVICE_NB_WORKERS);
62 
64  ~io_service(void);
65 
67  io_service(const io_service&) = delete;
69  io_service& operator=(const io_service&) = delete;
70 
71 public:
79  void set_nb_workers(std::size_t nb_threads);
80 
81 public:
84  typedef std::function<void(fd_t)> event_callback_t;
85 
95  void track(const tcp_socket& socket, const event_callback_t& rd_callback = nullptr, const event_callback_t& wr_callback = nullptr);
96 
104  void set_rd_callback(const tcp_socket& socket, const event_callback_t& event_callback);
105 
113  void set_wr_callback(const tcp_socket& socket, const event_callback_t& event_callback);
114 
125  void untrack(const tcp_socket& socket);
126 
133  void wait_for_removal(const tcp_socket& socket);
134 
135 private:
146  struct tracked_socket {
148  tracked_socket(void)
149  : rd_callback(nullptr)
150  , wr_callback(nullptr) {}
151 
153  event_callback_t rd_callback;
154  std::atomic<bool> is_executing_rd_callback = ATOMIC_VAR_INIT(false);
155 
157  event_callback_t wr_callback;
158  std::atomic<bool> is_executing_wr_callback = ATOMIC_VAR_INIT(false);
159 
161  std::atomic<bool> marked_for_untrack = ATOMIC_VAR_INIT(false);
162  };
163 
164 private:
169  void poll(void);
170 
177  int init_poll_fds_info(void);
178 
183  void process_events(void);
184 
191  void process_rd_event(const fd_t& fd, tracked_socket& socket);
192 
199  void process_wr_event(const fd_t& fd, tracked_socket& socket);
200 
201 private:
205  std::unordered_map<fd_t, tracked_socket> m_tracked_sockets;
206 
210  std::atomic<bool> m_should_stop;
211 
215  std::thread m_poll_worker;
216 
220  utils::thread_pool m_callback_workers;
221 
225  std::mutex m_tracked_sockets_mtx;
226 
230  std::vector<fd_t> m_polled_fds;
231 
235  fd_set m_rd_set;
236 
240  fd_set m_wr_set;
241 
245  std::condition_variable m_wait_for_removal_condvar;
246 
250  tacopie::self_pipe m_notifier;
251 };
252 
261 const std::shared_ptr<io_service>& get_default_io_service(std::uint32_t num_io_workers = 1);
262 
268 void set_default_io_service(const std::shared_ptr<io_service>& service);
269 
270 } // namespace tacopie
void set_nb_workers(std::size_t nb_threads)
~io_service(void)
dtor
std::function< void(fd_t)> event_callback_t
Definition: io_service.hpp:84
io_service(std::size_t nb_threads=__TACOPIE_IO_SERVICE_NB_WORKERS)
void untrack(const tcp_socket &socket)
io_service & operator=(const io_service &)=delete
assignment operator
Definition: tcp_socket.hpp:38
Definition: io_service.hpp:54
void wait_for_removal(const tcp_socket &socket)
Definition: self_pipe.hpp:33
Definition: io_service.hpp:48
void set_wr_callback(const tcp_socket &socket, const event_callback_t &event_callback)
void track(const tcp_socket &socket, const event_callback_t &rd_callback=nullptr, const event_callback_t &wr_callback=nullptr)
Definition: thread_pool.hpp:40
void set_rd_callback(const tcp_socket &socket, const event_callback_t &event_callback)