packio
dispatcher.h
Go to the documentation of this file.
1 // This Source Code Form is subject to the terms of the Mozilla Public
2 // License, v. 2.0. If a copy of the MPL was not distributed with this
3 // file, You can obtain one at https://mozilla.org/MPL/2.0/.
4 
5 #ifndef PACKIO_DISPATCHER_H
6 #define PACKIO_DISPATCHER_H
7 
10 
11 #include <functional>
12 #include <memory>
13 #include <mutex>
14 #include <optional>
15 #include <string_view>
16 #include <tuple>
17 
18 #include "handler.h"
19 #include "internal/config.h"
20 #include "internal/movable_function.h"
21 #include "internal/rpc.h"
22 #include "internal/utils.h"
23 #include "traits.h"
24 
25 namespace packio {
26 
31 template <typename Rpc, template <class...> class Map = default_map, typename Lockable = default_mutex>
32 class dispatcher {
33 public:
35  using rpc_type = Rpc;
37  using mutex_type = Lockable;
39  using args_type = decltype(typename rpc_type::request_type{}.args);
41  using function_type = internal::movable_function<
44  using function_ptr_type = std::shared_ptr<function_type>;
45 
50  template <
51  typename SyncProcedure,
52  std::size_t N = internal::func_traits<SyncProcedure>::args_count>
53  bool add(
54  std::string_view name,
55  const std::array<std::string, N>& arguments_names,
56  SyncProcedure&& fct)
57  {
58  PACKIO_STATIC_ASSERT_TRAIT(SyncProcedure);
59  std::unique_lock lock{map_mutex_};
60  return function_map_
61  .emplace(
62  name,
63  std::make_shared<function_type>(wrap_sync(
64  std::forward<SyncProcedure>(fct), arguments_names)))
65  .second;
66  }
67 
69  template <typename SyncProcedure>
70  bool add(std::string_view name, SyncProcedure&& fct)
71  {
72  return add<SyncProcedure, 0>(name, {}, std::forward<SyncProcedure>(fct));
73  }
74 
79  template <
80  typename AsyncProcedure,
81  std::size_t N = internal::func_traits<AsyncProcedure>::args_count - 1>
82  bool add_async(
83  std::string_view name,
84  const std::array<std::string, N>& arguments_names,
85  AsyncProcedure&& fct)
86  {
87  PACKIO_STATIC_ASSERT_TTRAIT(AsyncProcedure, rpc_type);
88  std::unique_lock lock{map_mutex_};
89  return function_map_
90  .emplace(
91  name,
92  std::make_shared<function_type>(wrap_async(
93  std::forward<AsyncProcedure>(fct), arguments_names)))
94  .second;
95  }
96 
98  template <typename AsyncProcedure>
99  bool add_async(std::string_view name, AsyncProcedure&& fct)
100  {
101  return add_async<AsyncProcedure, 0>(
102  name, {}, std::forward<AsyncProcedure>(fct));
103  }
104 
105 #if defined(PACKIO_HAS_CO_AWAIT)
106  template <
112  typename Executor,
113  typename CoroProcedure,
114  std::size_t N = internal::func_traits<CoroProcedure>::args_count>
115  bool add_coro(
116  std::string_view name,
117  const Executor& executor,
118  const std::array<std::string, N>& arguments_names,
119  CoroProcedure&& coro)
120  {
121  PACKIO_STATIC_ASSERT_TRAIT(CoroProcedure);
122  std::unique_lock lock{map_mutex_};
123  return function_map_
124  .emplace(
125  name,
126  std::make_shared<function_type>(wrap_coro(
127  executor, std::forward<CoroProcedure>(coro), arguments_names)))
128  .second;
129  }
130 
132  template <typename Executor, typename CoroProcedure>
133  bool add_coro(std::string_view name, const Executor& executor, CoroProcedure&& coro)
134  {
135  return add_coro<Executor, CoroProcedure, 0>(
136  name, executor, {}, std::forward<CoroProcedure>(coro));
137  }
138 
140  template <
141  typename ExecutionContext,
142  typename CoroProcedure,
143  std::size_t N = internal::func_traits<CoroProcedure>::args_count>
144  bool add_coro(
145  std::string_view name,
146  ExecutionContext& ctx,
147  const std::array<std::string, N>& arguments_names,
148  CoroProcedure&& coro)
149  {
150  return add_coro<decltype(ctx.get_executor()), CoroProcedure, N>(
151  name,
152  ctx.get_executor(),
153  arguments_names,
154  std::forward<CoroProcedure>(coro));
155  }
156 
158  template <typename ExecutionContext, typename CoroProcedure>
159  bool add_coro(std::string_view name, ExecutionContext& ctx, CoroProcedure&& coro)
160  {
161  return add_coro<ExecutionContext, CoroProcedure, 0>(
162  name, ctx, {}, std::forward<CoroProcedure>(coro));
163  }
164 #endif // defined(PACKIO_HAS_CO_AWAIT)
165 
169  bool remove(const std::string& name)
170  {
171  std::unique_lock lock{map_mutex_};
172  return function_map_.erase(name);
173  }
174 
178  bool has(const std::string& name) const
179  {
180  std::unique_lock lock{map_mutex_};
181  return function_map_.find(name) != function_map_.end();
182  }
183 
186  size_t clear()
187  {
188  std::unique_lock lock{map_mutex_};
189  size_t size = function_map_.size();
190  function_map_.clear();
191  return size;
192  }
193 
196  std::vector<std::string> known() const
197  {
198  std::unique_lock lock{map_mutex_};
199  std::vector<std::string> names;
200  names.reserve(function_map_.size());
201  std::transform(
202  function_map_.begin(),
203  function_map_.end(),
204  std::back_inserter(names),
205  [](const typename decltype(function_map_)::value_type& pair) {
206  return pair.first;
207  });
208  return names;
209  }
210 
211  function_ptr_type get(const std::string& name) const
212  {
213  std::unique_lock lock{map_mutex_};
214  auto it = function_map_.find(name);
215  if (it == function_map_.end()) {
216  return {};
217  }
218  else {
219  return it->second;
220  }
221  }
222 
223 private:
224  using function_map_type = Map<std::string, function_ptr_type>;
225 
226  template <typename TArgs, std::size_t NNamedArgs>
227  static void static_assert_arguments_name_and_count()
228  {
229  static_assert(
230  NNamedArgs == 0 || std::tuple_size_v<TArgs> == NNamedArgs,
231  "incompatible arguments count and names");
232  }
233 
234  template <typename F, std::size_t N>
235  auto wrap_sync(F&& fct, const std::array<std::string, N>& args_names)
236  {
237  using value_args =
238  internal::decay_tuple_t<typename internal::func_traits<F>::args_type>;
239  using result_type = typename internal::func_traits<F>::result_type;
240  static_assert_arguments_name_and_count<value_args, N>();
241 
242  return
243  [fct = std::forward<F>(fct), args_names](
244  completion_handler<rpc_type> handler, args_type&& args) mutable {
245  auto typed_args = rpc_type::template extract_args<value_args>(
246  std::move(args), args_names);
247  if (!typed_args) {
248  PACKIO_DEBUG("incompatible arguments");
249  handler.set_error("Incompatible arguments");
250  return;
251  }
252 
253  if constexpr (std::is_void_v<result_type>) {
254  std::apply(fct, std::move(*typed_args));
255  handler();
256  }
257  else {
258  handler(std::apply(fct, std::move(*typed_args)));
259  }
260  };
261  }
262 
263  template <typename F, std::size_t N>
264  auto wrap_async(F&& fct, const std::array<std::string, N>& args_names)
265  {
266  using args = typename internal::func_traits<F>::args_type;
267  using value_args = internal::decay_tuple_t<internal::shift_tuple_t<args>>;
268  static_assert_arguments_name_and_count<value_args, N>();
269 
270  return
271  [fct = std::forward<F>(fct), args_names](
272  completion_handler<rpc_type> handler, args_type&& args) mutable {
273  auto typed_args = rpc_type::template extract_args<value_args>(
274  std::move(args), args_names);
275  if (!typed_args) {
276  PACKIO_DEBUG("incompatible arguments");
277  handler.set_error("Incompatible arguments");
278  return;
279  }
280 
281  std::apply(
282  [&](auto&&... args) {
283  fct(std::move(handler),
284  std::forward<decltype(args)>(args)...);
285  },
286  std::move(*typed_args));
287  };
288  }
289 
290 #if defined(PACKIO_HAS_CO_AWAIT)
291  template <typename E, typename C, std::size_t N>
292  auto wrap_coro(
293  const E& executor,
294  C&& coro,
295  const std::array<std::string, N>& args_names)
296  {
297  using value_args =
298  internal::decay_tuple_t<typename internal::func_traits<C>::args_type>;
299  using result_type =
300  typename internal::func_traits<C>::result_type::value_type;
301  static_assert_arguments_name_and_count<value_args, N>();
302 
303  return [executor, coro = std::forward<C>(coro), args_names](
304  completion_handler<rpc_type> handler,
305  args_type&& args) mutable {
306  auto typed_args = rpc_type::template extract_args<value_args>(
307  std::move(args), args_names);
308  if (!typed_args) {
309  PACKIO_DEBUG("incompatible arguments");
310  handler.set_error("Incompatible arguments");
311  return;
312  }
313 
314  net::co_spawn(
315  executor,
316  [typed_args = std::move(*typed_args),
317  handler = std::move(handler),
318  coro = std::forward<C>(coro)]() mutable -> net::awaitable<void> {
319  if constexpr (std::is_void_v<result_type>) {
320  co_await std::apply(coro, std::move(typed_args));
321  handler();
322  }
323  else {
324  handler(co_await std::apply(coro, std::move(typed_args)));
325  }
326  },
327  [](std::exception_ptr exc) {
328  if (exc) {
329  std::rethrow_exception(exc);
330  }
331  });
332  };
333  }
334 #endif // defined(PACKIO_HAS_CO_AWAIT)
335 
336  mutable mutex_type map_mutex_;
337  function_map_type function_map_;
338 };
339 
340 } // packio
341 
342 #endif // PACKIO_DISPATCHER_H
packio::dispatcher::function_type
internal::movable_function< void(completion_handler< rpc_type >, args_type &&args)> function_type
The type of function stored in the dispatcher.
Definition: dispatcher.h:42
packio::dispatcher::has
bool has(const std::string &name) const
Check if a procedure is registered.
Definition: dispatcher.h:178
packio::dispatcher::rpc_type
Rpc rpc_type
The RPC protocol type.
Definition: dispatcher.h:35
handler.h
packio::dispatcher::add_coro
bool add_coro(std::string_view name, ExecutionContext &ctx, CoroProcedure &&coro)
This is an overloaded member function, provided for convenience. It differs from the above function o...
Definition: dispatcher.h:159
packio::dispatcher
The dispatcher class, used to store and dispatch procedures.
Definition: dispatcher.h:32
packio::completion_handler
The completion_handler class.
Definition: handler.h:27
packio::dispatcher::add
bool add(std::string_view name, const std::array< std::string, N > &arguments_names, SyncProcedure &&fct)
Add a synchronous procedure to the dispatcher.
Definition: dispatcher.h:53
packio::dispatcher::add_coro
bool add_coro(std::string_view name, ExecutionContext &ctx, const std::array< std::string, N > &arguments_names, CoroProcedure &&coro)
This is an overloaded member function, provided for convenience. It differs from the above function o...
Definition: dispatcher.h:144
packio::dispatcher::add
bool add(std::string_view name, SyncProcedure &&fct)
This is an overloaded member function, provided for convenience. It differs from the above function o...
Definition: dispatcher.h:70
packio::dispatcher::function_ptr_type
std::shared_ptr< function_type > function_ptr_type
A shared pointer to function_type.
Definition: dispatcher.h:44
packio
Definition: arg.h:14
packio::dispatcher::add_async
bool add_async(std::string_view name, AsyncProcedure &&fct)
This is an overloaded member function, provided for convenience. It differs from the above function o...
Definition: dispatcher.h:99
packio::dispatcher::add_coro
bool add_coro(std::string_view name, const Executor &executor, CoroProcedure &&coro)
This is an overloaded member function, provided for convenience. It differs from the above function o...
Definition: dispatcher.h:133
packio::dispatcher::add_coro
bool add_coro(std::string_view name, const Executor &executor, const std::array< std::string, N > &arguments_names, CoroProcedure &&coro)
Add a coroutine to the dispatcher.
Definition: dispatcher.h:115
packio::dispatcher::clear
size_t clear()
Remove all procedures.
Definition: dispatcher.h:186
packio::dispatcher::remove
bool remove(const std::string &name)
Remove a procedure from the dispatcher.
Definition: dispatcher.h:169
packio::dispatcher::args_type
decltype(typename rpc_type::request_type{}.args) args_type
The type of the arguments used by the RPC protocol.
Definition: dispatcher.h:39
packio::dispatcher::mutex_type
Lockable mutex_type
The mutex type used to protect the procedure map.
Definition: dispatcher.h:37
traits.h
packio::dispatcher::add_async
bool add_async(std::string_view name, const std::array< std::string, N > &arguments_names, AsyncProcedure &&fct)
Add an asynchronous procedure to the dispatcher.
Definition: dispatcher.h:82
packio::dispatcher::known
std::vector< std::string > known() const
Get the name of all known procedures.
Definition: dispatcher.h:196