packio
server_session.h
Go to the documentation of this file.
1 // This Source Code Form is subject to the terms of the Mozilla Public
2 // License, v. 2.0. If a copy of the MPL was not distributed with this
3 // file, You can obtain one at https://mozilla.org/MPL/2.0/.
4 
5 #ifndef PACKIO_SERVER_SESSION_H
6 #define PACKIO_SERVER_SESSION_H
7 
10 
11 #include <memory>
12 #include <queue>
13 
14 #include "handler.h"
15 #include "internal/config.h"
16 #include "internal/log.h"
17 #include "internal/manual_strand.h"
18 #include "internal/rpc.h"
19 #include "internal/utils.h"
20 
21 namespace packio {
22 
24 template <typename Rpc, typename Socket, typename Dispatcher>
26  : public std::enable_shared_from_this<server_session<Rpc, Socket, Dispatcher>> {
27 public:
28  using socket_type = Socket;
29  using protocol_type =
30  typename socket_type::protocol_type;
31  using executor_type =
32  typename socket_type::executor_type;
33  using std::enable_shared_from_this<server_session<Rpc, Socket, Dispatcher>>::shared_from_this;
34 
36  static constexpr size_t kDefaultBufferReserveSize = 4096;
37 
38  server_session(socket_type sock, std::shared_ptr<Dispatcher> dispatcher_ptr)
39  : socket_{std::move(sock)},
40  dispatcher_ptr_{std::move(dispatcher_ptr)},
41  wstrand_{socket_.get_executor()}
42  {
43  }
44 
46  socket_type& socket() { return socket_; }
48  const socket_type& socket() const { return socket_; }
49 
51  executor_type get_executor() { return socket().get_executor(); }
52 
54  void set_buffer_reserve_size(std::size_t size) noexcept
55  {
56  buffer_reserve_size_ = size;
57  }
59  std::size_t get_buffer_reserve_size() const noexcept
60  {
61  return buffer_reserve_size_;
62  }
63 
65  void start() { this->async_read(parser_type{}); }
66 
67 private:
68  using parser_type = typename Rpc::incremental_parser_type;
69  using request_type = typename Rpc::request_type;
70 
71  void async_read(parser_type&& parser)
72  {
73  // abort R/W on error
74  if (!socket_.is_open()) {
75  return;
76  }
77 
78  parser.reserve_buffer(buffer_reserve_size_);
79  auto buffer = net::buffer(parser.buffer(), parser.buffer_capacity());
80  socket_.async_read_some(
81  buffer,
82  [self = shared_from_this(), parser = std::move(parser)](
83  error_code ec, size_t length) mutable {
84  if (ec) {
85  PACKIO_WARN("read error: {}", ec.message());
86  self->close_connection();
87  return;
88  }
89 
90  PACKIO_TRACE("read: {}", length);
91  parser.buffer_consumed(length);
92 
93  while (auto request = parser.get_request()) {
94  if (!request) {
95  self->close_connection();
96  continue;
97  }
98  // handle the call asynchronously (post)
99  // to schedule the next read immediately
100  // this will allow parallel call handling
101  // in multi-threaded environments
102  net::post(
103  self->get_executor(),
104  [self, request = std::move(*request)]() mutable {
105  self->async_handle_request(std::move(request));
106  });
107  }
108 
109  self->async_read(std::move(parser));
110  });
111  }
112 
113  void async_handle_request(request_type&& request)
114  {
115  completion_handler<Rpc> handler(
116  request.id,
117  [type = request.type, id = request.id, self = shared_from_this()](
118  auto&& response_buffer) {
119  if (type == call_type::request) {
120  PACKIO_TRACE("result (id={})", Rpc::format_id(id));
121  (void)id;
122  self->async_send_response(std::move(response_buffer));
123  }
124  });
125 
126  const auto function = dispatcher_ptr_->get(request.method);
127  if (function) {
128  PACKIO_TRACE(
129  "call: {} (id={})", request.method, Rpc::format_id(request.id));
130  (*function)(std::move(handler), std::move(request.args));
131  }
132  else {
133  PACKIO_DEBUG("unknown function {}", request.method);
134  handler.set_error("Unknown function");
135  }
136  }
137 
138  template <typename Buffer>
139  void async_send_response(Buffer&& response_buffer)
140  {
141  // abort R/W on error
142  if (!socket_.is_open()) {
143  return;
144  }
145 
146  auto message_ptr = internal::to_unique_ptr(std::move(response_buffer));
147 
148  wstrand_.push([this,
149  self = shared_from_this(),
150  message_ptr = std::move(message_ptr)]() mutable {
151  auto buf = Rpc::buffer(*message_ptr);
152  net::async_write(
153  socket_,
154  buf,
155  [self = std::move(self), message_ptr = std::move(message_ptr)](
156  error_code ec, size_t length) {
157  self->wstrand_.next();
158 
159  if (ec) {
160  PACKIO_WARN("write error: {}", ec.message());
161  self->close_connection();
162  return;
163  }
164 
165  PACKIO_TRACE("write: {}", length);
166  (void)length;
167  });
168  });
169  }
170 
171  void close_connection()
172  {
173  error_code ec;
174  socket_.close(ec);
175  if (ec) {
176  PACKIO_WARN("close error: {}", ec.message());
177  }
178  }
179 
180  socket_type socket_;
181  std::size_t buffer_reserve_size_{kDefaultBufferReserveSize};
182  std::shared_ptr<Dispatcher> dispatcher_ptr_;
183  internal::manual_strand<typename socket_type::executor_type> wstrand_;
184 };
185 
186 } // packio
187 
188 #endif // PACKIO_SERVER_SESSION_H
handler.h
packio::server_session::get_executor
executor_type get_executor()
Get the executor associated with the object.
Definition: server_session.h:51
packio::server_session::socket_type
Socket socket_type
The socket type.
Definition: server_session.h:28
packio::server_session::protocol_type
typename socket_type::protocol_type protocol_type
The protocol type.
Definition: server_session.h:30
packio::server_session::kDefaultBufferReserveSize
static constexpr size_t kDefaultBufferReserveSize
The default size reserved by the reception buffer.
Definition: server_session.h:36
packio::server_session::set_buffer_reserve_size
void set_buffer_reserve_size(std::size_t size) noexcept
Set the size reserved by the reception buffer.
Definition: server_session.h:54
packio::server_session::start
void start()
Start the session.
Definition: server_session.h:65
packio::server_session
The server_session class, created by the server.
Definition: server_session.h:25
packio
Definition: arg.h:14
packio::server_session::socket
const socket_type & socket() const
Get the underlying socket, const.
Definition: server_session.h:48
packio::server_session::socket
socket_type & socket()
Get the underlying socket.
Definition: server_session.h:46
packio::server_session::executor_type
typename socket_type::executor_type executor_type
The executor type.
Definition: server_session.h:32
packio::server_session::get_buffer_reserve_size
std::size_t get_buffer_reserve_size() const noexcept
Get the size reserved by the reception buffer.
Definition: server_session.h:59