5 #ifndef PACKIO_CLIENT_H
6 #define PACKIO_CLIENT_H
16 #include <string_view>
17 #include <type_traits>
19 #include "internal/config.h"
20 #include "internal/manual_strand.h"
21 #include "internal/movable_function.h"
22 #include "internal/rpc.h"
23 #include "internal/utils.h"
// Class template `client`: parameterized on an RPC type, a socket type and a
// map template that defaults to `default_map`.
32 template <
typename Rpc,
typename Socket,
template <
class...>
class Map = default_map>
// Inherits std::enable_shared_from_this so member functions can obtain a
// shared_ptr to this client (see the `self = shared_from_this()` captures).
33 class client :
public std::enable_shared_from_this<client<Rpc, Socket, Map>> {
// Call identifier type, taken from `rpc_type` (alias declared outside this excerpt).
38 using id_type =
typename rpc_type::id_type;
// Re-exports shared_from_this from the dependent base class so it can be
// called unqualified inside the class.
47 using std::enable_shared_from_this<client<Rpc, Socket, Map>>::shared_from_this;
// Constructor member-initializer list: the socket is moved into socket_, and
// both wstrand_ and call_strand_ are built from that socket's executor.
55 : socket_{std::move(
socket)},
56 wstrand_{socket_.get_executor()},
57 call_strand_{socket_.get_executor()}
// Setter statement: stores the requested size; async_read later passes this
// value to parser.reserve_buffer().
70 buffer_reserve_size_ = size;
// Getter statement: returns the currently configured reserve size.
75 return buffer_reserve_size_;
// Dispatches onto call_strand_ and completes the call identified by `id` with
// operation_aborted and an empty (`{}`) response argument. The closure holds a
// shared_ptr to the client so it stays alive until the handler has run.
87 net::dispatch(call_strand_, [
self = shared_from_this(),
id] {
88 auto ec = make_error_code(net::error::operation_aborted);
89 self->async_call_handler(
id, ec, {});
// Dispatches onto call_strand_ and cancels every pending call there;
// cancel_all_calls() asserts that it runs on that strand.
98 net::dispatch(call_strand_, [
self = shared_from_this()] {
99 self->cancel_all_calls();
// Template-parameter fragment: NotifyHandler is declared as a completion token
// for the signature void(error_code). The PACKIO_* macros are defined elsewhere;
// presumably they supply the default completion token for executor_type.
112 PACKIO_COMPLETION_TOKEN_FOR(
void(error_code))
113 NotifyHandler PACKIO_DEFAULT_COMPLETION_TOKEN_TYPE(
executor_type),
// SFINAE guard: this overload participates only when ArgsTuple is a tuple.
115 typename = std::enable_if_t<internal::is_tuple_v<ArgsTuple>>>
117 std::string_view name,
119 NotifyHandler&& handler PACKIO_DEFAULT_COMPLETION_TOKEN(
executor_type))
// Hands the operation to initiate_async_notify through net::async_initiate,
// perfectly forwarding the argument tuple.
121 return net::async_initiate<NotifyHandler, void(error_code)>(
122 initiate_async_notify(
this),
125 std::forward<ArgsTuple>(args));
// Template-parameter fragment of the overload that takes no argument tuple.
130 PACKIO_COMPLETION_TOKEN_FOR(
void(error_code))
131 NotifyHandler PACKIO_DEFAULT_COMPLETION_TOKEN_TYPE(
executor_type),
// SFINAE guard: this overload participates only when NotifyHandler is not a
// tuple (presumably to keep it distinct from the overload taking ArgsTuple).
132 typename = std::enable_if_t<!internal::is_tuple_v<NotifyHandler>>>
134 std::string_view name,
135 NotifyHandler&& handler PACKIO_DEFAULT_COMPLETION_TOKEN(
executor_type))
// Passes an empty std::tuple as the arguments and forwards the handler; the
// callee name is on a line outside this excerpt.
138 name, std::tuple{}, std::forward<NotifyHandler>(handler));
// Template-parameter fragment: CallHandler is the completion token type; the
// PACKIO_* macro (defined elsewhere) takes executor_type as its argument.
150 CallHandler PACKIO_DEFAULT_COMPLETION_TOKEN_TYPE(
executor_type),
// SFINAE guard: this overload participates only when ArgsTuple is a tuple.
152 typename = std::enable_if_t<internal::is_tuple_v<ArgsTuple>>>
154 std::string_view name,
156 CallHandler&& handler PACKIO_DEFAULT_COMPLETION_TOKEN(
executor_type),
// Optional out-parameter: when a reference is supplied, initiate_async_call
// writes the generated call id into it. Defaults to std::nullopt.
157 std::optional<std::reference_wrapper<id_type>> call_id = std::nullopt)
// Hands the operation to initiate_async_call through net::async_initiate; the
// completion signature is void(error_code, response_type).
159 return net::async_initiate<CallHandler, void(error_code, response_type)>(
160 initiate_async_call(
this),
163 std::forward<ArgsTuple>(args),
// Template-parameter fragment of the overload that takes no argument tuple.
170 CallHandler PACKIO_DEFAULT_COMPLETION_TOKEN_TYPE(
executor_type),
// SFINAE guard: this overload participates only when CallHandler is not a tuple.
171 typename = std::enable_if_t<!internal::is_tuple_v<CallHandler>>>
173 std::string_view name,
174 CallHandler&& handler PACKIO_DEFAULT_COMPLETION_TOKEN(
executor_type),
// Optional reference that receives the call id; defaults to std::nullopt.
175 std::optional<std::reference_wrapper<id_type>> call_id = std::nullopt)
// Passes an empty std::tuple as the arguments and forwards the handler and
// call_id; the callee name is on a line outside this excerpt.
178 name, std::tuple{}, std::forward<CallHandler>(handler), call_id);
// Incremental parser provided by the RPC type; async_read feeds it the bytes
// read from the socket and pulls responses out of it.
182 using parser_type =
typename rpc_type::incremental_parser_type;
// Type stored as the mapped value of pending_ (its definition continues on a
// line outside this excerpt).
183 using async_call_handler_type =
// Completes every pending call with operation_aborted.
// Must be called from call_strand_ (checked by the assert).
186 void cancel_all_calls()
188 assert(call_strand_.running_in_this_thread());
189 auto ec = make_error_code(net::error::operation_aborted);
// NOTE(review): the loop only terminates if async_call_handler removes the
// entry before returning. The erase happens inside a handler dispatched to
// call_strand_, so this presumably relies on dispatch running inline when
// already on the strand -- confirm.
190 while (!pending_.empty()) {
191 async_call_handler(pending_.begin()->first, ec, {});
// Acts only when a read is in progress (reading_) and no call is pending.
// Must be called from call_strand_ (checked by the assert).
195 void maybe_stop_reading()
197 assert(call_strand_.running_in_this_thread());
198 if (reading_ && pending_.empty()) {
199 PACKIO_DEBUG(
"stop reading");
// A failed cancel is only logged. The cancel call itself is on a line outside
// this excerpt -- presumably a cancel on socket_; confirm.
203 PACKIO_WARN(
"cancel failed: {}", ec.message());
// Queues a write of *buffer_ptr on wstrand_ and arranges for `handler` to be
// carried into the write's completion handler.
// buffer_ptr: owning pointer to the serialized message; ownership is moved
//             into the queued closure.
// handler:    write handler; the completion handler below takes
//             (error_code, size_t).
208 template <
typename Buffer,
typename WriteHandler>
209 void async_send(std::unique_ptr<Buffer>&& buffer_ptr, WriteHandler&& handler)
// The closure pushed on wstrand_ keeps the client alive through `self` and
// owns both the buffer and the handler.
211 wstrand_.push([
self = shared_from_this(),
212 buffer_ptr = std::move(buffer_ptr),
213 handler = std::forward<WriteHandler>(handler)]()
mutable {
214 internal::set_no_delay(self->socket_);
// `buf` is built from the pointee before buffer_ptr is moved into the inner
// closure; moving a unique_ptr does not relocate the pointee.
216 auto buf = rpc_type::buffer(*buffer_ptr);
// Ownership of the buffer moves into the completion handler, presumably so the
// storage outlives the write (the write call is outside this excerpt).
221 buffer_ptr = std::move(buffer_ptr),
// NOTE(review): `handler` here is the outer closure's own copy, so
// std::forward<WriteHandler> copies it when WriteHandler is an lvalue
// reference type; std::move may be the intent -- confirm.
222 handler = std::forward<WriteHandler>(handler)](
223 error_code ec,
size_t length)
mutable {
// Tells wstrand_ that this write has completed.
224 self->wstrand_.next();
// Starts one async_read_some on the socket into the parser's buffer. In the
// completion handler it feeds the parser, passes each parsed response to
// async_call_handler, and calls async_read again (see the last line).
// Must be called from call_strand_ (checked by the assert).
230 void async_read(parser_type&& parser)
// Reserve buffer_reserve_size_ in the parser before building the buffer view.
232 parser.reserve_buffer(buffer_reserve_size_);
// NOTE(review): `buffer` refers to the parser's storage while the parser is
// moved into the closure below; this assumes moving a parser keeps the
// underlying storage address stable -- confirm.
233 auto buffer = net::buffer(parser.buffer(), parser.buffer_capacity());
235 assert(call_strand_.running_in_this_thread());
237 PACKIO_TRACE(
"reading ... {} call(s) pending", pending_.size());
238 socket_.async_read_some(
// The completion handler owns the parser and keeps the client alive.
242 [
self = shared_from_this(), parser = std::move(parser)](
243 error_code ec,
size_t length)
mutable {
245 assert(self->call_strand_.running_in_this_thread());
// Read-error path: log, clear the reading flag and cancel all pending calls.
248 PACKIO_WARN(
"read error: {}", ec.message());
249 self->reading_ =
false;
252 self->cancel_all_calls();
256 PACKIO_TRACE(
"read: {}", length);
// Tell the parser how many bytes were written into its buffer.
257 parser.buffer_consumed(length);
// Pull responses from the parser for as long as it yields one.
259 while (
auto response = parser.get_response()) {
261 PACKIO_ERROR(
"bad response");
264 self->async_call_handler(std::move(*response));
// When no call is pending, clear the reading flag.
267 if (self->pending_.empty()) {
268 PACKIO_TRACE(
"done reading, no more pending calls");
269 self->reading_ =
false;
// Issue the next read, handing the parser over again.
273 self->async_read(std::move(parser));
// The id is copied out first because `response` is moved from in the call below.
279 auto id = response.id;
// Delegates to the (id, error_code, response) overload with an empty error code.
280 return async_call_handler(
id, {}, std::move(response));
// Capture-list fragment: the closure keeps the client alive and owns the response.
289 self = shared_from_this(),
290 response = std::move(response)]()
mutable {
292 "calling handler for id: {}", rpc_type::format_id(
id));
// NOTE(review): call_strand_ is used unqualified inside the lambda, which
// needs `this` to be captured; the rest of the capture list is outside this
// excerpt. Other lambdas use `self->call_strand_` -- confirm.
294 assert(call_strand_.running_in_this_thread());
295 auto it =
self->pending_.find(
id);
// An id that is not in pending_ is only logged.
296 if (it == self->pending_.end()) {
297 PACKIO_WARN(
"unexisting id: {}", rpc_type::format_id(
id));
// Move the handler out and erase the entry before invoking it.
301 auto handler = std::move(it->second);
302 self->pending_.erase(it);
303 self->maybe_stop_reading();
// The user handler is invoked from a separate closure associated with the
// socket's executor (the scheduling call is outside this excerpt).
310 self->socket_.get_executor(),
312 handler = std::move(handler),
313 response = std::move(response)]()
mutable {
314 handler(ec, std::move(response));
// Initiation object passed to net::async_initiate for notifications. It holds
// a raw, non-owning pointer to the client.
319 class initiate_async_notify {
323 explicit initiate_async_notify(
client*
self) : self_(self) {}
// Exposes the client's executor.
327 return self_->get_executor();
330 template <
typename NotifyHandler,
typename ArgsTuple>
332 NotifyHandler&& handler,
333 std::string_view name,
334 ArgsTuple&& args)
const
336 PACKIO_STATIC_ASSERT_TRAIT(NotifyHandler);
337 PACKIO_DEBUG(
"async_notify: {}", name);
// Expands the argument tuple into rpc_type::serialize_notification and wraps
// the result with internal::to_unique_ptr.
339 auto packer_buf = internal::to_unique_ptr(std::apply(
340 [&name](
auto&&... args) {
341 return rpc_type::serialize_notification(
342 name, std::forward<decltype(args)>(args)...);
344 std::forward<ArgsTuple>(args))
// The serialized buffer is moved into the send call together with a completion
// handler that owns the user handler.
348 std::move(packer_buf),
349 [handler = std::forward<NotifyHandler>(handler)](
350 error_code ec, std::size_t length)
mutable {
352 PACKIO_WARN(
"write error: {}", ec.message());
355 PACKIO_TRACE(
"write: {}", length);
// Initiation object passed to net::async_initiate for calls. It holds a raw,
// non-owning pointer to the client.
367 class initiate_async_call {
371 explicit initiate_async_call(
client*
self) : self_(self) {}
// Exposes the client's executor.
375 return self_->get_executor();
378 template <
typename CallHandler,
typename ArgsTuple>
380 CallHandler&& handler,
381 std::string_view name,
383 std::optional<std::reference_wrapper<id_type>> opt_call_id)
const
385 PACKIO_STATIC_ASSERT_TTRAIT(CallHandler,
rpc_type);
386 PACKIO_DEBUG(
"async_call: {}", name);
// Takes the next call id from the client's atomic counter.
388 id_type call_id = self_->id_.fetch_add(1, std::memory_order_acq_rel);
// Reports the id back through the caller's reference (the guard that checks
// opt_call_id has a value is on a line outside this excerpt).
390 opt_call_id->get() = call_id;
// Expands the argument tuple into rpc_type::serialize_request and wraps the
// result with internal::to_unique_ptr.
393 auto packer_buf = internal::to_unique_ptr(std::apply(
394 [&name, &call_id](
auto&&... args) {
395 return rpc_type::serialize_request(
396 call_id, name, std::forward<decltype(args)>(args)...);
398 std::forward<ArgsTuple>(args)));
// This closure owns the handler and the buffer and keeps the client alive; the
// assert below shows it runs on call_strand_.
402 [
self = self_->shared_from_this(),
404 handler = std::forward<CallHandler>(handler),
405 packer_buf = std::move(packer_buf)]()
mutable {
408 assert(self->call_strand_.running_in_this_thread());
// Registers the handler under its call id.
409 self->pending_.try_emplace(call_id, std::move(handler));
// Starts a read with a fresh parser if none is in progress.
412 if (!self->reading_) {
413 PACKIO_DEBUG(
"start reading");
414 self->async_read(parser_type{});
// The serialized request is moved into the send call with a write-completion
// handler.
419 std::move(packer_buf),
420 [
self = std::move(
self), call_id](
421 error_code ec, std::size_t length)
mutable {
// On a write error the pending call is completed with that error and an empty
// response.
423 PACKIO_WARN(
"write error: {}", ec.message());
424 self->async_call_handler(call_id, ec, {});
427 PACKIO_TRACE(
"write: {}", length);
// Size passed to parser.reserve_buffer() before each read.
439 std::size_t buffer_reserve_size_{kDefaultBufferReserveSize};
// Counter from which call ids are taken with fetch_add.
440 std::atomic<uint64_t> id_{0};
// Queue of write closures; async_send pushes onto it and calls next() when a
// write completes.
442 internal::manual_strand<executor_type> wstrand_;
// Strand on which pending_ and reading_ are accessed (see the asserts).
444 net::strand<executor_type> call_strand_;
// Handlers of calls awaiting a response, keyed by call id.
445 Map<id_type, async_call_handler_type> pending_;
// True while a read is in progress.
446 bool reading_{
false};
// Factory fragment: builds a shared_ptr-owned client<Rpc, Socket, Map> from a
// forwarded socket.
453 template <
typename Rpc,
typename Socket,
template <
class...>
class Map = default_map>
// NOTE(review): if Socket is deduced from a forwarding reference, an lvalue
// argument would make Socket a reference type and instantiate
// client<Rpc, Socket&, Map>; the signature line is outside this excerpt -- confirm.
456 return std::make_shared<client<Rpc, Socket, Map>>(
457 std::forward<Socket>(socket));
462 #endif // PACKIO_CLIENT_H