packio
rpc.h
1 // This Source Code Form is subject to the terms of the Mozilla Public
2 // License, v. 2.0. If a copy of the MPL was not distributed with this
3 // file, You can obtain one at https://mozilla.org/MPL/2.0/.
4 
5 #ifndef PACKIO_MSGPACK_RPC_RPC_H
6 #define PACKIO_MSGPACK_RPC_RPC_H
7 
8 #include <msgpack.hpp>
9 
10 #include "../arg.h"
11 #include "../internal/config.h"
12 #include "../internal/log.h"
13 #include "../internal/rpc.h"
14 
15 namespace packio {
16 namespace msgpack_rpc {
17 namespace internal {
18 
19 template <typename... Args>
20 constexpr bool positional_args_v = (!is_arg_v<Args> && ...);
21 
22 enum class msgpack_rpc_type { request = 0, response = 1, notification = 2 };
23 
24 using id_type = uint32_t;
25 using native_type = ::msgpack::object;
26 
28 struct request {
29  call_type type;
30  id_type id;
31  std::string method;
32  native_type args;
33 
34  std::unique_ptr<::msgpack::zone> zone;
35 };
36 
38 struct response {
39  id_type id;
40  native_type result;
41  native_type error;
42 
43  std::unique_ptr<::msgpack::zone> zone;
44 };
45 
48 public:
49  incremental_parser() : unpacker_{std::make_unique<::msgpack::unpacker>()} {}
50 
51  std::optional<request> get_request()
52  {
53  try_parse_object();
54  if (!parsed_) {
55  return std::nullopt;
56  }
57  auto object = std::move(*parsed_);
58  parsed_.reset();
59  return parse_request(std::move(object));
60  }
61 
62  std::optional<response> get_response()
63  {
64  try_parse_object();
65  if (!parsed_) {
66  return std::nullopt;
67  }
68  auto object = std::move(*parsed_);
69  parsed_.reset();
70  return parse_response(std::move(object));
71  }
72 
73  char* buffer() const
74  { //
75  return unpacker_->buffer();
76  }
77 
78  std::size_t buffer_capacity() const { return unpacker_->buffer_capacity(); }
79 
80  void buffer_consumed(std::size_t bytes)
81  {
82  unpacker_->buffer_consumed(bytes);
83  }
84 
85  void reserve_buffer(std::size_t bytes) { unpacker_->reserve_buffer(bytes); }
86 
87 private:
88  void try_parse_object()
89  {
90  if (parsed_) {
91  return;
92  }
93  ::msgpack::object_handle object;
94  if (unpacker_->next(object)) {
95  parsed_ = std::move(object);
96  }
97  }
98 
99  static std::optional<response> parse_response(::msgpack::object_handle&& res)
100  {
101  if (res->type != ::msgpack::type::ARRAY) {
102  PACKIO_ERROR("unexpected message type: {}", res->type);
103  return std::nullopt;
104  }
105  if (res->via.array.size != 4) {
106  PACKIO_ERROR("unexpected message size: {}", res->via.array.size);
107  return std::nullopt;
108  }
109  int type = res->via.array.ptr[0].as<int>();
110  if (type != static_cast<int>(msgpack_rpc_type::response)) {
111  PACKIO_ERROR("unexpected type: {}", type);
112  return std::nullopt;
113  }
114 
115  std::optional<response> parsed{std::in_place};
116  parsed->zone = std::move(res.zone());
117  const auto& array = res->via.array.ptr;
118 
119  parsed->id = array[1].as<id_type>();
120  if (array[2].type != ::msgpack::type::NIL) {
121  parsed->error = array[2];
122  }
123  else {
124  parsed->result = array[3];
125  }
126  return parsed;
127  }
128 
129  static std::optional<request> parse_request(::msgpack::object_handle&& req)
130  {
131  if (req->type != ::msgpack::type::ARRAY || req->via.array.size < 3) {
132  PACKIO_ERROR("unexpected message type: {}", req->type);
133  return std::nullopt;
134  }
135 
136  std::optional<request> parsed{std::in_place};
137  parsed->zone = std::move(req.zone());
138  const auto& array = req->via.array.ptr;
139  auto array_size = req->via.array.size;
140  ;
141 
142  try {
143  int idx = 0;
144  msgpack_rpc_type type = static_cast<msgpack_rpc_type>(
145  array[idx++].as<int>());
146 
147  std::size_t expected_size;
148  switch (type) {
149  case msgpack_rpc_type::request:
150  parsed->id = array[idx++].as<id_type>();
151  expected_size = 4;
152  parsed->type = call_type::request;
153  break;
154  case msgpack_rpc_type::notification:
155  expected_size = 3;
156  parsed->type = call_type::notification;
157  break;
158  default:
159  PACKIO_ERROR("unexpected type: {}", type);
160  return std::nullopt;
161  }
162 
163  if (array_size != expected_size) {
164  PACKIO_ERROR("unexpected message size: {}", array_size);
165  return std::nullopt;
166  }
167 
168  parsed->method = array[idx++].as<std::string>();
169  parsed->args = array[idx++];
170 
171  return parsed;
172  }
173  catch (::msgpack::type_error& exc) {
174  PACKIO_ERROR("unexpected message content: {}", exc.what());
175  (void)exc;
176  return std::nullopt;
177  }
178  }
179 
180  std::optional<::msgpack::object_handle> parsed_;
181  std::unique_ptr<::msgpack::unpacker> unpacker_;
182 };
183 
184 } // internal
185 
187 class rpc {
188 public:
190  using id_type = internal::id_type;
191 
193  using native_type = internal::native_type;
194 
197 
200 
203 
204  static std::string format_id(const id_type& id)
205  {
206  return std::to_string(id);
207  }
208 
209  template <typename... Args>
210  static auto serialize_notification(std::string_view method, Args&&... args)
211  -> std::enable_if_t<internal::positional_args_v<Args...>, ::msgpack::sbuffer>
212  {
213  ::msgpack::sbuffer buffer;
214  ::msgpack::pack(
215  buffer,
216  std::forward_as_tuple(
217  static_cast<int>(internal::msgpack_rpc_type::notification),
218  method,
219  std::forward_as_tuple(std::forward<Args>(args)...)));
220  return buffer;
221  }
222 
223  template <typename... Args>
224  static auto serialize_notification(std::string_view, Args&&...)
225  -> std::enable_if_t<!internal::positional_args_v<Args...>, ::msgpack::sbuffer>
226  {
227  static_assert(
228  internal::positional_args_v<Args...>,
229  "msgpack-RPC does not support named arguments");
230  }
231 
232  template <typename... Args>
233  static auto serialize_request(id_type id, std::string_view method, Args&&... args)
234  -> std::enable_if_t<internal::positional_args_v<Args...>, ::msgpack::sbuffer>
235  {
236  ::msgpack::sbuffer buffer;
237  ::msgpack::pack(
238  buffer,
239  std::forward_as_tuple(
240  static_cast<int>(internal::msgpack_rpc_type::request),
241  id,
242  method,
243  std::forward_as_tuple(std::forward<Args>(args)...)));
244  return buffer;
245  }
246 
247  template <typename... Args>
248  static auto serialize_request(id_type, std::string_view, Args&&...)
249  -> std::enable_if_t<!internal::positional_args_v<Args...>, ::msgpack::sbuffer>
250  {
251  static_assert(
252  internal::positional_args_v<Args...>,
253  "msgpack-RPC does not support named arguments");
254  }
255 
256  static ::msgpack::sbuffer serialize_response(id_type id)
257  {
258  return serialize_response(id, ::msgpack::object{});
259  }
260 
261  template <typename T>
262  static ::msgpack::sbuffer serialize_response(id_type id, T&& value)
263  {
264  ::msgpack::sbuffer buffer;
265  ::msgpack::pack(
266  buffer,
267  std::forward_as_tuple(
268  static_cast<int>(internal::msgpack_rpc_type::response),
269  id,
270  ::msgpack::object{},
271  std::forward<T>(value)));
272  return buffer;
273  }
274 
275  template <typename T>
276  static ::msgpack::sbuffer serialize_error_response(id_type id, T&& value)
277  {
278  ::msgpack::sbuffer buffer;
279  ::msgpack::pack(
280  buffer,
281  std::forward_as_tuple(
282  static_cast<int>(internal::msgpack_rpc_type::response),
283  id,
284  std::forward<T>(value),
285  ::msgpack::object{}));
286  return buffer;
287  }
288 
289  static net::const_buffer buffer(const ::msgpack::sbuffer& buf)
290  {
291  return net::const_buffer(buf.data(), buf.size());
292  }
293 
294  template <typename T, typename NamesContainer>
295  static std::optional<T> extract_args(
296  const ::msgpack::object& args,
297  const NamesContainer&)
298  {
299  if (args.type != ::msgpack::type::ARRAY) {
300  PACKIO_ERROR("arguments is not an array");
301  return std::nullopt;
302  }
303 
304  if (args.via.array.size != std::tuple_size_v<T>) {
305  // keep this check otherwise msgpack unpacker
306  // may silently drop arguments
307  return std::nullopt;
308  }
309 
310  try {
311  return args.as<T>();
312  }
313  catch (::msgpack::type_error&) {
314  return std::nullopt;
315  }
316  }
317 };
318 
319 } // msgpack_rpc
320 } // packio
321 
322 #endif // PACKIO_MSGPACK_RPC_RPC_H
packio::msgpack_rpc::rpc::native_type
internal::native_type native_type
The native type of the serialization library.
Definition: rpc.h:193
packio::msgpack_rpc::internal::response::zone
std::unique_ptr<::msgpack::zone > zone
Msgpack zone storing error and result.
Definition: rpc.h:43
packio::msgpack_rpc::rpc
The msgpack RPC protocol implementation.
Definition: rpc.h:187
packio::msgpack_rpc::internal::incremental_parser
The incremental parser for msgpack-RPC objects.
Definition: rpc.h:47
packio
Definition: arg.h:14
packio::msgpack_rpc::internal::request
The object representing a client request.
Definition: rpc.h:28
packio::msgpack_rpc::internal::request::zone
std::unique_ptr<::msgpack::zone > zone
Msgpack zone storing the args.
Definition: rpc.h:34
packio::msgpack_rpc::internal::response
The object representing the response to a call.
Definition: rpc.h:38
packio::msgpack_rpc::rpc::id_type
internal::id_type id_type
Type of the call ID.
Definition: rpc.h:190