packio
client.h
Go to the documentation of this file.
1 // This Source Code Form is subject to the terms of the Mozilla Public
2 // License, v. 2.0. If a copy of the MPL was not distributed with this
3 // file, You can obtain one at https://mozilla.org/MPL/2.0/.
4 
5 #ifndef PACKIO_CLIENT_H
6 #define PACKIO_CLIENT_H
7 
10 
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <queue>
#include <string_view>
#include <tuple>
#include <type_traits>
#include <utility>
18 
19 #include "internal/config.h"
20 #include "internal/manual_strand.h"
21 #include "internal/movable_function.h"
22 #include "internal/rpc.h"
23 #include "internal/utils.h"
24 #include "traits.h"
25 
26 namespace packio {
27 
32 template <typename Rpc, typename Socket, template <class...> class Map = default_map>
33 class client : public std::enable_shared_from_this<client<Rpc, Socket, Map>> {
34 public:
36  using rpc_type = Rpc;
38  using id_type = typename rpc_type::id_type;
40  using response_type = typename rpc_type::response_type;
42  using socket_type = Socket;
44  using protocol_type = typename socket_type::protocol_type;
46  using executor_type = typename socket_type::executor_type;
47  using std::enable_shared_from_this<client<Rpc, Socket, Map>>::shared_from_this;
48 
50  static constexpr size_t kDefaultBufferReserveSize = 4096;
51 
55  : socket_{std::move(socket)},
56  wstrand_{socket_.get_executor()},
57  call_strand_{socket_.get_executor()}
58  {
59  }
60 
62  socket_type& socket() noexcept { return socket_; }
63 
65  const socket_type& socket() const noexcept { return socket_; }
66 
68  void set_buffer_reserve_size(std::size_t size) noexcept
69  {
70  buffer_reserve_size_ = size;
71  }
73  std::size_t get_buffer_reserve_size() const noexcept
74  {
75  return buffer_reserve_size_;
76  }
77 
79  executor_type get_executor() { return socket().get_executor(); }
80 
85  void cancel(id_type id)
86  {
87  net::dispatch(call_strand_, [self = shared_from_this(), id] {
88  auto ec = make_error_code(net::error::operation_aborted);
89  self->async_call_handler(id, ec, {});
90  });
91  }
92 
96  void cancel()
97  {
98  net::dispatch(call_strand_, [self = shared_from_this()] {
99  self->cancel_all_calls();
100  });
101  }
102 
111  template <
112  PACKIO_COMPLETION_TOKEN_FOR(void(error_code))
113  NotifyHandler PACKIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type),
114  typename ArgsTuple,
115  typename = std::enable_if_t<internal::is_tuple_v<ArgsTuple>>>
117  std::string_view name,
118  ArgsTuple&& args,
119  NotifyHandler&& handler PACKIO_DEFAULT_COMPLETION_TOKEN(executor_type))
120  {
121  return net::async_initiate<NotifyHandler, void(error_code)>(
122  initiate_async_notify(this),
123  handler,
124  name,
125  std::forward<ArgsTuple>(args));
126  }
127 
129  template <
130  PACKIO_COMPLETION_TOKEN_FOR(void(error_code))
131  NotifyHandler PACKIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type),
132  typename = std::enable_if_t<!internal::is_tuple_v<NotifyHandler>>>
134  std::string_view name,
135  NotifyHandler&& handler PACKIO_DEFAULT_COMPLETION_TOKEN(executor_type))
136  {
137  return async_notify(
138  name, std::tuple{}, std::forward<NotifyHandler>(handler));
139  }
140 
148  template <
149  PACKIO_COMPLETION_TOKEN_FOR(void(error_code, response_type))
150  CallHandler PACKIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type),
151  typename ArgsTuple,
152  typename = std::enable_if_t<internal::is_tuple_v<ArgsTuple>>>
154  std::string_view name,
155  ArgsTuple&& args,
156  CallHandler&& handler PACKIO_DEFAULT_COMPLETION_TOKEN(executor_type),
157  std::optional<std::reference_wrapper<id_type>> call_id = std::nullopt)
158  {
159  return net::async_initiate<CallHandler, void(error_code, response_type)>(
160  initiate_async_call(this),
161  handler,
162  name,
163  std::forward<ArgsTuple>(args),
164  call_id);
165  }
166 
168  template <
169  PACKIO_COMPLETION_TOKEN_FOR(void(error_code, response_type))
170  CallHandler PACKIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type),
171  typename = std::enable_if_t<!internal::is_tuple_v<CallHandler>>>
173  std::string_view name,
174  CallHandler&& handler PACKIO_DEFAULT_COMPLETION_TOKEN(executor_type),
175  std::optional<std::reference_wrapper<id_type>> call_id = std::nullopt)
176  {
177  return async_call(
178  name, std::tuple{}, std::forward<CallHandler>(handler), call_id);
179  }
180 
181 private:
182  using parser_type = typename rpc_type::incremental_parser_type;
183  using async_call_handler_type =
184  internal::movable_function<void(error_code, response_type)>;
185 
186  void cancel_all_calls()
187  {
188  assert(call_strand_.running_in_this_thread());
189  auto ec = make_error_code(net::error::operation_aborted);
190  while (!pending_.empty()) {
191  async_call_handler(pending_.begin()->first, ec, {});
192  }
193  }
194 
195  void maybe_stop_reading()
196  {
197  assert(call_strand_.running_in_this_thread());
198  if (reading_ && pending_.empty()) {
199  PACKIO_DEBUG("stop reading");
200  error_code ec;
201  socket_.cancel(ec);
202  if (ec) {
203  PACKIO_WARN("cancel failed: {}", ec.message());
204  }
205  }
206  }
207 
208  template <typename Buffer, typename WriteHandler>
209  void async_send(std::unique_ptr<Buffer>&& buffer_ptr, WriteHandler&& handler)
210  {
211  wstrand_.push([self = shared_from_this(),
212  buffer_ptr = std::move(buffer_ptr),
213  handler = std::forward<WriteHandler>(handler)]() mutable {
214  internal::set_no_delay(self->socket_);
215 
216  auto buf = rpc_type::buffer(*buffer_ptr);
217  net::async_write(
218  self->socket_,
219  buf,
220  [self,
221  buffer_ptr = std::move(buffer_ptr),
222  handler = std::forward<WriteHandler>(handler)](
223  error_code ec, size_t length) mutable {
224  self->wstrand_.next();
225  handler(ec, length);
226  });
227  });
228  }
229 
230  void async_read(parser_type&& parser)
231  {
232  parser.reserve_buffer(buffer_reserve_size_);
233  auto buffer = net::buffer(parser.buffer(), parser.buffer_capacity());
234 
235  assert(call_strand_.running_in_this_thread());
236  reading_ = true;
237  PACKIO_TRACE("reading ... {} call(s) pending", pending_.size());
238  socket_.async_read_some(
239  buffer,
240  net::bind_executor(
241  call_strand_,
242  [self = shared_from_this(), parser = std::move(parser)](
243  error_code ec, size_t length) mutable {
244  // stop if there is an error or there is no more pending calls
245  assert(self->call_strand_.running_in_this_thread());
246 
247  if (ec) {
248  PACKIO_WARN("read error: {}", ec.message());
249  self->reading_ = false;
250 
251  // cancel all pending calls
252  self->cancel_all_calls();
253  return;
254  }
255 
256  PACKIO_TRACE("read: {}", length);
257  parser.buffer_consumed(length);
258 
259  while (auto response = parser.get_response()) {
260  if (!response) {
261  PACKIO_ERROR("bad response");
262  continue;
263  }
264  self->async_call_handler(std::move(*response));
265  }
266 
267  if (self->pending_.empty()) {
268  PACKIO_TRACE("done reading, no more pending calls");
269  self->reading_ = false;
270  return;
271  }
272 
273  self->async_read(std::move(parser));
274  }));
275  }
276 
277  void async_call_handler(response_type&& response)
278  {
279  auto id = response.id;
280  return async_call_handler(id, {}, std::move(response));
281  }
282 
283  void async_call_handler(id_type id, error_code ec, response_type&& response)
284  {
285  net::dispatch(
286  call_strand_,
287  [ec,
288  id,
289  self = shared_from_this(),
290  response = std::move(response)]() mutable {
291  PACKIO_DEBUG(
292  "calling handler for id: {}", rpc_type::format_id(id));
293 
294  assert(call_strand_.running_in_this_thread());
295  auto it = self->pending_.find(id);
296  if (it == self->pending_.end()) {
297  PACKIO_WARN("unexisting id: {}", rpc_type::format_id(id));
298  return;
299  }
300 
301  auto handler = std::move(it->second);
302  self->pending_.erase(it);
303  self->maybe_stop_reading();
304 
305  // handle the response asynchronously (post)
306  // to schedule the next read immediately
307  // this will allow parallel response handling
308  // in multi-threaded environments
309  net::post(
310  self->socket_.get_executor(),
311  [ec,
312  handler = std::move(handler),
313  response = std::move(response)]() mutable {
314  handler(ec, std::move(response));
315  });
316  });
317  }
318 
319  class initiate_async_notify {
320  public:
321  using executor_type = typename client::executor_type;
322 
323  explicit initiate_async_notify(client* self) : self_(self) {}
324 
325  executor_type get_executor() const noexcept
326  {
327  return self_->get_executor();
328  }
329 
330  template <typename NotifyHandler, typename ArgsTuple>
331  void operator()(
332  NotifyHandler&& handler,
333  std::string_view name,
334  ArgsTuple&& args) const
335  {
336  PACKIO_STATIC_ASSERT_TRAIT(NotifyHandler);
337  PACKIO_DEBUG("async_notify: {}", name);
338 
339  auto packer_buf = internal::to_unique_ptr(std::apply(
340  [&name](auto&&... args) {
341  return rpc_type::serialize_notification(
342  name, std::forward<decltype(args)>(args)...);
343  },
344  std::forward<ArgsTuple>(args))
345 
346  );
347  self_->async_send(
348  std::move(packer_buf),
349  [handler = std::forward<NotifyHandler>(handler)](
350  error_code ec, std::size_t length) mutable {
351  if (ec) {
352  PACKIO_WARN("write error: {}", ec.message());
353  }
354  else {
355  PACKIO_TRACE("write: {}", length);
356  (void)length;
357  }
358 
359  handler(ec);
360  });
361  }
362 
363  private:
364  client* self_;
365  };
366 
367  class initiate_async_call {
368  public:
369  using executor_type = typename client::executor_type;
370 
371  explicit initiate_async_call(client* self) : self_(self) {}
372 
373  executor_type get_executor() const noexcept
374  {
375  return self_->get_executor();
376  }
377 
378  template <typename CallHandler, typename ArgsTuple>
379  void operator()(
380  CallHandler&& handler,
381  std::string_view name,
382  ArgsTuple&& args,
383  std::optional<std::reference_wrapper<id_type>> opt_call_id) const
384  {
385  PACKIO_STATIC_ASSERT_TTRAIT(CallHandler, rpc_type);
386  PACKIO_DEBUG("async_call: {}", name);
387 
388  id_type call_id = self_->id_.fetch_add(1, std::memory_order_acq_rel);
389  if (opt_call_id) {
390  opt_call_id->get() = call_id;
391  }
392 
393  auto packer_buf = internal::to_unique_ptr(std::apply(
394  [&name, &call_id](auto&&... args) {
395  return rpc_type::serialize_request(
396  call_id, name, std::forward<decltype(args)>(args)...);
397  },
398  std::forward<ArgsTuple>(args)));
399 
400  net::dispatch(
401  self_->call_strand_,
402  [self = self_->shared_from_this(),
403  call_id,
404  handler = std::forward<CallHandler>(handler),
405  packer_buf = std::move(packer_buf)]() mutable {
406  // we must emplace the id and handler before sending data
407  // otherwise we might drop a fast response
408  assert(self->call_strand_.running_in_this_thread());
409  self->pending_.try_emplace(call_id, std::move(handler));
410 
411  // if we are not reading, start the read operation
412  if (!self->reading_) {
413  PACKIO_DEBUG("start reading");
414  self->async_read(parser_type{});
415  }
416 
417  // send the request buffer
418  self->async_send(
419  std::move(packer_buf),
420  [self = std::move(self), call_id](
421  error_code ec, std::size_t length) mutable {
422  if (ec) {
423  PACKIO_WARN("write error: {}", ec.message());
424  self->async_call_handler(call_id, ec, {});
425  }
426  else {
427  PACKIO_TRACE("write: {}", length);
428  (void)length;
429  }
430  });
431  });
432  }
433 
434  private:
435  client* self_;
436  };
437 
438  socket_type socket_;
439  std::size_t buffer_reserve_size_{kDefaultBufferReserveSize};
440  std::atomic<uint64_t> id_{0};
441 
442  internal::manual_strand<executor_type> wstrand_;
443 
444  net::strand<executor_type> call_strand_;
445  Map<id_type, async_call_handler_type> pending_;
446  bool reading_{false};
447 };
448 
453 template <typename Rpc, typename Socket, template <class...> class Map = default_map>
454 auto make_client(Socket&& socket)
455 {
456  return std::make_shared<client<Rpc, Socket, Map>>(
457  std::forward<Socket>(socket));
458 }
459 
460 } // packio
461 
462 #endif // PACKIO_CLIENT_H
packio::client::socket_type
Socket socket_type
The socket type.
Definition: client.h:42
packio::msgpack_rpc::client
::packio::client< rpc, Socket, Map > client
The client for msgpack-RPC.
Definition: msgpack_rpc.h:29
packio::client
The client class.
Definition: client.h:33
packio::client::socket
const socket_type & socket() const noexcept
Get the underlying socket, const.
Definition: client.h:65
packio::client::executor_type
typename socket_type::executor_type executor_type
The executor type.
Definition: client.h:46
packio::client::set_buffer_reserve_size
void set_buffer_reserve_size(std::size_t size) noexcept
Set the size reserved by the reception buffer.
Definition: client.h:68
packio::client::async_call
auto async_call(std::string_view name, ArgsTuple &&args, CallHandler &&handler=typename net::default_completion_token< executor_type >::type(), std::optional< std::reference_wrapper< id_type >> call_id=std::nullopt)
Call a remote procedure.
Definition: client.h:153
packio::client::id_type
typename rpc_type::id_type id_type
The call ID type.
Definition: client.h:38
packio::client::protocol_type
typename socket_type::protocol_type protocol_type
The protocol type.
Definition: client.h:44
packio::client::async_call
auto async_call(std::string_view name, CallHandler &&handler=typename net::default_completion_token< executor_type >::type(), std::optional< std::reference_wrapper< id_type >> call_id=std::nullopt)
This is an overloaded member function, provided for convenience. It differs from the above function only in what argument(s) it accepts.
Definition: client.h:172
packio::client::get_buffer_reserve_size
std::size_t get_buffer_reserve_size() const noexcept
Get the size reserved by the reception buffer.
Definition: client.h:73
packio
Definition: arg.h:14
packio::client::socket
socket_type & socket() noexcept
Get the underlying socket.
Definition: client.h:62
packio::client::async_notify
auto async_notify(std::string_view name, NotifyHandler &&handler=typename net::default_completion_token< executor_type >::type())
This is an overloaded member function, provided for convenience. It differs from the above function only in what argument(s) it accepts.
Definition: client.h:133
packio::client::kDefaultBufferReserveSize
static constexpr size_t kDefaultBufferReserveSize
The default size reserved by the reception buffer.
Definition: client.h:50
packio::client::rpc_type
Rpc rpc_type
The RPC protocol type.
Definition: client.h:36
packio::client::cancel
void cancel(id_type id)
Cancel a pending call.
Definition: client.h:85
packio::client::async_notify
auto async_notify(std::string_view name, ArgsTuple &&args, NotifyHandler &&handler=typename net::default_completion_token< executor_type >::type())
Send a notify request to the server with argument.
Definition: client.h:116
packio::client::cancel
void cancel()
Cancel all pending calls.
Definition: client.h:96
packio::client::response_type
typename rpc_type::response_type response_type
The response of a RPC call.
Definition: client.h:40
traits.h
packio::client::get_executor
executor_type get_executor()
Get the executor associated with the object.
Definition: client.h:79
packio::make_client
auto make_client(Socket &&socket)
Create a client from a socket.
Definition: client.h:454
packio::client::client
client(socket_type socket)
The constructor.
Definition: client.h:54