5 #ifndef PACKIO_SERVER_SESSION_H
6 #define PACKIO_SERVER_SESSION_H
15 #include "internal/config.h"
16 #include "internal/log.h"
17 #include "internal/manual_strand.h"
18 #include "internal/rpc.h"
19 #include "internal/utils.h"
24 template <
typename Rpc,
typename Socket,
typename Dispatcher>
26 :
public std::enable_shared_from_this<server_session<Rpc, Socket, Dispatcher>> {
30 typename socket_type::protocol_type;
32 typename socket_type::executor_type;
33 using std::enable_shared_from_this<server_session<Rpc, Socket, Dispatcher>>::shared_from_this;
39 : socket_{std::move(sock)},
40 dispatcher_ptr_{std::move(dispatcher_ptr)},
41 wstrand_{socket_.get_executor()}
56 buffer_reserve_size_ = size;
61 return buffer_reserve_size_;
65 void start() { this->async_read(parser_type{}); }
68 using parser_type =
typename Rpc::incremental_parser_type;
69 using request_type =
typename Rpc::request_type;
71 void async_read(parser_type&& parser)
74 if (!socket_.is_open()) {
78 parser.reserve_buffer(buffer_reserve_size_);
79 auto buffer = net::buffer(parser.buffer(), parser.buffer_capacity());
80 socket_.async_read_some(
82 [
self = shared_from_this(), parser = std::move(parser)](
83 error_code ec,
size_t length)
mutable {
85 PACKIO_WARN(
"read error: {}", ec.message());
86 self->close_connection();
90 PACKIO_TRACE(
"read: {}", length);
91 parser.buffer_consumed(length);
93 while (
auto request = parser.get_request()) {
95 self->close_connection();
103 self->get_executor(),
104 [
self, request = std::move(*request)]()
mutable {
105 self->async_handle_request(std::move(request));
109 self->async_read(std::move(parser));
113 void async_handle_request(request_type&& request)
115 completion_handler<Rpc> handler(
117 [type = request.type,
id = request.id,
self = shared_from_this()](
118 auto&& response_buffer) {
119 if (type == call_type::request) {
120 PACKIO_TRACE(
"result (id={})", Rpc::format_id(
id));
122 self->async_send_response(std::move(response_buffer));
126 const auto function = dispatcher_ptr_->get(request.method);
129 "call: {} (id={})", request.method, Rpc::format_id(request.id));
130 (*function)(std::move(handler), std::move(request.args));
133 PACKIO_DEBUG(
"unknown function {}", request.method);
134 handler.set_error(
"Unknown function");
138 template <
typename Buffer>
139 void async_send_response(Buffer&& response_buffer)
142 if (!socket_.is_open()) {
146 auto message_ptr = internal::to_unique_ptr(std::move(response_buffer));
149 self = shared_from_this(),
150 message_ptr = std::move(message_ptr)]()
mutable {
151 auto buf = Rpc::buffer(*message_ptr);
155 [
self = std::move(
self), message_ptr = std::move(message_ptr)](
156 error_code ec,
size_t length) {
157 self->wstrand_.next();
160 PACKIO_WARN(
"write error: {}", ec.message());
161 self->close_connection();
165 PACKIO_TRACE(
"write: {}", length);
171 void close_connection()
176 PACKIO_WARN(
"close error: {}", ec.message());
182 std::shared_ptr<Dispatcher> dispatcher_ptr_;
183 internal::manual_strand<typename socket_type::executor_type> wstrand_;
188 #endif // PACKIO_SERVER_SESSION_H