packio
incremental_buffers.h
1 // This Source Code Form is subject to the terms of the Mozilla Public
2 // License, v. 2.0. If a copy of the MPL was not distributed with this
3 // file, You can obtain one at https://mozilla.org/MPL/2.0/.
4 
5 #ifndef PACKIO_NL_JSON_RPC_INCREMENTAL_BUFFERS_H
6 #define PACKIO_NL_JSON_RPC_INCREMENTAL_BUFFERS_H
7 
8 #include <cassert>
9 #include <deque>
10 #include <optional>
11 #include <string>
12 
13 namespace packio {
14 namespace nl_json_rpc {
15 
// Splits a byte stream made of back-to-back JSON objects/arrays into one
// std::string per top-level value.
//
// Bytes are supplied either with feed(), or by writing them directly at
// in_place_buffer() (after reserve_in_place_buffer()) and then calling
// in_place_buffer_consumed(). The stream may be cut into chunks at any
// position. The JSON itself is not validated: only the opening/closing
// delimiter of the current top-level value and double quotes are tracked.
// Every time the nesting depth drops back to zero, the bytes accumulated so
// far are queued and become available through get_parsed_buffer().
//
// Bytes located between two values are kept as a prefix of the next value when
// they arrive in the same chunk as (or are followed in the buffer by) its
// opening delimiter; if no value has started once a chunk has been scanned,
// they are discarded.
class incremental_buffers {
public:
    // Returns the number of complete values waiting to be retrieved.
    std::size_t available_buffers() const
    {
        return serialized_objects_.size();
    }

    // Removes and returns the oldest complete value, or std::nullopt when
    // nothing is queued.
    std::optional<std::string> get_parsed_buffer()
    {
        if (serialized_objects_.empty()) {
            return std::nullopt;
        }

        auto buffer = std::move(serialized_objects_.front());
        serialized_objects_.pop_front();
        return buffer;
    }

    // Copies `data` at the end of the pending bytes and scans it.
    void feed(std::string_view data)
    {
        reserve_in_place_buffer(data.size());
        std::copy(data.begin(), data.end(), in_place_buffer());
        in_place_buffer_consumed(data.size());
    }

    // Returns the address at which the next incoming bytes must be written.
    // At most in_place_buffer_capacity() bytes may be written there.
    char* in_place_buffer()
    {
        return raw_buffer_.data() + buffer_.size();
    }

    // Returns how many bytes can currently be written at in_place_buffer().
    std::size_t in_place_buffer_capacity() const
    {
        return raw_buffer_.size() - buffer_.size();
    }

    // Declares that `bytes` bytes have been written at in_place_buffer() and
    // scans them. `bytes` must not exceed in_place_buffer_capacity().
    void in_place_buffer_consumed(std::size_t bytes)
    {
        assert(bytes <= in_place_buffer_capacity());
        if (bytes == 0) {
            return;
        }
        incremental_parse(bytes);
    }

    // Makes sure that at least `bytes` bytes can be written at
    // in_place_buffer(). The address returned by in_place_buffer() may change.
    void reserve_in_place_buffer(std::size_t bytes)
    {
        if (in_place_buffer_capacity() >= bytes) {
            return;
        }

        const std::size_t pending = buffer_.size();
        raw_buffer_.resize(pending + bytes);
        // resize() may have moved the storage: re-seat the view so that it
        // never refers to released memory.
        buffer_ = std::string_view{raw_buffer_.data(), pending};
    }

private:
    // Scans the `bytes` bytes that were just appended after the pending data
    // and queues every top-level value they complete.
    void incremental_parse(std::size_t bytes)
    {
        // Index of the first byte that has not been examined yet. Everything
        // before it was already scanned by a previous call.
        std::size_t scan_pos = buffer_.size();
        buffer_ = std::string_view{raw_buffer_.data(), buffer_.size() + bytes};

        while (true) {
            if (depth_ == 0) {
                // Between two values: look for the next opening delimiter and
                // select the token set matching its kind (object or array).
                const std::size_t start = buffer_.find_first_of("{[", scan_pos);
                if (start == std::string_view::npos) {
                    // No value has started: drop the bytes so that the buffer
                    // does not grow on input that contains no value.
                    buffer_ = std::string_view{};
                    return;
                }
                initialize(buffer_[start]);
                // The opening delimiter is already counted by initialize().
                scan_pos = start + 1;
            }

            const std::size_t token_pos =
                buffer_.find_first_of(tokens_, scan_pos);
            if (token_pos == std::string_view::npos) {
                return;
            }
            scan_pos = token_pos + 1;

            const char token = buffer_[token_pos];
            if (token == '"') {
                // A quote always opens a string; it closes one only when it
                // is not escaped.
                if (!in_string_ || !is_escaped(token_pos)) {
                    in_string_ = !in_string_;
                }
                continue;
            }

            if (in_string_) {
                // Delimiters inside a string are plain characters.
                continue;
            }

            if (token == first_char_) {
                ++depth_;
                continue;
            }

            assert(token == last_char_);
            if (--depth_ > 0) {
                continue;
            }

            // Found a complete value: queue the bytes up to and including the
            // closing delimiter...
            const std::size_t value_size = token_pos + 1;
            const std::size_t bytes_left = buffer_.size() - value_size;
            std::string remainder = raw_buffer_.substr(value_size);
            raw_buffer_.resize(value_size);
            serialized_objects_.push_back(std::move(raw_buffer_));
            // ...then restart from the bytes that followed it.
            raw_buffer_ = std::move(remainder);
            buffer_ = std::string_view{raw_buffer_.data(), bytes_left};
            scan_pos = 0;
        }
    }

    // Returns true when the character at `pos` is preceded by an odd number
    // of consecutive backslashes.
    bool is_escaped(std::size_t pos)
    {
        bool escaped = false;
        while (pos-- > 0u) {
            if (buffer_[pos] == '\\') {
                escaped = !escaped;
            }
            else {
                break;
            }
        }
        return escaped;
    }

    // Starts tracking a new top-level value opened by `first_char`, which
    // must be '{' or '['.
    void initialize(char first_char)
    {
        first_char_ = first_char;
        if (first_char_ == '{') {
            last_char_ = '}';
            tokens_ = "{}\"";
        }
        else {
            assert(first_char_ == '[');
            last_char_ = ']';
            tokens_ = "[]\"";
        }
        depth_ = 1;
        in_string_ = false;
    }

    // True while the scan position is inside a JSON string.
    bool in_string_{false};
    // Nesting depth of the current value; 0 means no value is in progress.
    int depth_{0};
    // Opening and closing delimiters of the current top-level value.
    char first_char_{'\0'};
    char last_char_{'\0'};
    // Characters the scanner stops on for the current top-level value.
    const char* tokens_{""};

    // View over the already received bytes of raw_buffer_. Between calls only
    // its size is relied upon; it is re-seated on raw_buffer_ before being
    // dereferenced.
    std::string_view buffer_;
    // Storage: received bytes followed by the room reserved for in-place writes.
    std::string raw_buffer_;

    // Complete values, oldest first.
    std::deque<std::string> serialized_objects_;
};
161 
162 } // nl_json_rpc
163 } // packio
164 
165 #endif // PACKIO_NL_JSON_RPC_INCREMENTAL_BUFFERS_H
packio
Definition: arg.h:14