Cpp-Taskflow  2.2.0
spmc_queue.hpp
1 // 2019/05/15 - created by Tsung-Wei Huang
2 // - isolated from the original workstealing executor
3 
4 #pragma once
5 
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <utility>
#include <vector>
9 
10 namespace tf {
11 
28 template <typename T>
30 
31  //constexpr static int64_t cacheline_size = 64;
32 
33  //using storage_type = std::aligned_storage_t<sizeof(T), cacheline_size>;
34 
35  struct Array {
36 
37  int64_t C;
38  int64_t M;
39  //storage_type* S;
40  T* S;
41 
42  Array(int64_t c) :
43  C {c},
44  M {c-1},
45  //S {new storage_type[C]} {
46  S {new T[static_cast<size_t>(C)]} {
47  //for(int64_t i=0; i<C; ++i) {
48  // ::new (std::addressof(S[i])) T();
49  //}
50  }
51 
52  ~Array() {
53  //for(int64_t i=0; i<C; ++i) {
54  // reinterpret_cast<T*>(std::addressof(S[i]))->~T();
55  //}
56  delete [] S;
57  }
58 
59  int64_t capacity() const noexcept {
60  return C;
61  }
62 
63  template <typename O>
64  void push(int64_t i, O&& o) noexcept {
65  //T* ptr = reinterpret_cast<T*>(std::addressof(S[i & M]));
66  //*ptr = std::forward<O>(o);
67  S[i & M] = std::forward<O>(o);
68  }
69 
70  T pop(int64_t i) noexcept {
71  //return *reinterpret_cast<T*>(std::addressof(S[i & M]));
72  return S[i & M];
73  }
74 
75  Array* resize(int64_t b, int64_t t) {
76  Array* ptr = new Array {2*C};
77  for(int64_t i=t; i!=b; ++i) {
78  ptr->push(i, pop(i));
79  }
80  return ptr;
81  }
82 
83  };
84 
86  std::atomic<int64_t> _bottom;
87  std::atomic<Array*> _array;
88  std::vector<Array*> _garbage;
89  //char _padding[cacheline_size];
90 
91  public:
92 
98  WorkStealingQueue(int64_t capacity = 1024);
99 
104 
108  bool empty() const noexcept;
109 
113  size_t size() const noexcept;
114 
118  int64_t capacity() const noexcept;
119 
131  template <typename O>
132  void push(O&& item);
133 
140  std::optional<T> pop();
141 
150  std::optional<T> unsync_pop();
151 
158  std::optional<T> steal();
159 };
160 
161 // Constructor
162 template <typename T>
164  assert(c && (!(c & (c-1))));
165  _top.store(0, std::memory_order_relaxed);
166  _bottom.store(0, std::memory_order_relaxed);
167  _array.store(new Array{c}, std::memory_order_relaxed);
168  _garbage.reserve(32);
169 }
170 
171 // Destructor
172 template <typename T>
174  for(auto a : _garbage) {
175  delete a;
176  }
177  delete _array.load();
178 }
179 
180 // Function: empty
181 template <typename T>
182 bool WorkStealingQueue<T>::empty() const noexcept {
183  int64_t b = _bottom.load(std::memory_order_relaxed);
184  int64_t t = _top.load(std::memory_order_relaxed);
185  return b <= t;
186 }
187 
188 // Function: size
189 template <typename T>
190 size_t WorkStealingQueue<T>::size() const noexcept {
191  int64_t b = _bottom.load(std::memory_order_relaxed);
192  int64_t t = _top.load(std::memory_order_relaxed);
193  return static_cast<size_t>(b >= t ? b - t : 0);
194 }
195 
196 // Function: push
197 template <typename T>
198 template <typename O>
200  int64_t b = _bottom.load(std::memory_order_relaxed);
201  int64_t t = _top.load(std::memory_order_acquire);
202  Array* a = _array.load(std::memory_order_relaxed);
203 
204  // queue is full
205  if(a->capacity() - 1 < (b - t)) {
206  Array* tmp = a->resize(b, t);
207  _garbage.push_back(a);
208  std::swap(a, tmp);
209  _array.store(a, std::memory_order_relaxed);
210  }
211 
212  a->push(b, std::forward<O>(o));
213  std::atomic_thread_fence(std::memory_order_release);
214  _bottom.store(b + 1, std::memory_order_relaxed);
215 }
216 
217 // Function: pop
//
// Removes and returns the item at the bottom end of the queue, i.e. the
// slot at index _bottom - 1 (the most recently pushed item).
//
// Returns std::nullopt if the queue was empty, or if exactly one item
// remained and a concurrent steal() won the race for it.
//
// NOTE(review): presumably reserved for the single producer/owner thread
// (the class is documented as single-producer multiple-consumer). _bottom is
// updated here with a plain relaxed load + store, not an atomic
// read-modify-write, so two concurrent pop() calls would race.
218 template <typename T>
219 std::optional<T> WorkStealingQueue<T>::pop() {
 // Tentatively reserve slot b by lowering _bottom before looking at _top.
220  int64_t b = _bottom.load(std::memory_order_relaxed) - 1;
221  Array* a = _array.load(std::memory_order_relaxed);
222  _bottom.store(b, std::memory_order_relaxed);
 // Full fence: orders the store to _bottom above before the load of _top
 // below. steal() places a matching seq_cst fence between its loads of
 // _top and _bottom.
223  std::atomic_thread_fence(std::memory_order_seq_cst);
224  int64_t t = _top.load(std::memory_order_relaxed);
225 
226  std::optional<T> item;
227 
228  if(t <= b) {
 // At least one item is present; copy it out before deciding ownership.
229  item = a->pop(b);
230  if(t == b) {
231  // only one item left: thieves may be racing for this same slot, so
 // claim it by advancing _top with a CAS. If the CAS fails, a thief
 // already took the item and this pop returns nothing.
232  if(!_top.compare_exchange_strong(t, t+1,
233  std::memory_order_seq_cst,
234  std::memory_order_relaxed)) {
235  item = std::nullopt;
236  }
 // The last item has been consumed (here or by a thief); put _bottom
 // back to b + 1.
237  _bottom.store(b + 1, std::memory_order_relaxed);
238  }
239  }
240  else {
 // The queue was already empty: undo the reservation of slot b.
241  _bottom.store(b + 1, std::memory_order_relaxed);
242  }
243 
244  return item;
245 }
246 
247 // Function: unsync_pop
248 template <typename T>
250 
251  int64_t t = _top.load(std::memory_order_relaxed);
252  int64_t b = _bottom.fetch_sub(1, std::memory_order_relaxed) - 1;
253  Array* a = _array.load(std::memory_order_relaxed);
254 
255  std::optional<T> item;
256 
257  if(t <= b) {
258  item = a->pop(b);
259  }
260  else {
261  _bottom.store(b + 1, std::memory_order_relaxed);
262  }
263 
264  return item;
265 }
266 
267 // Function: steal
268 template <typename T>
269 std::optional<T> WorkStealingQueue<T>::steal() {
270  int64_t t = _top.load(std::memory_order_acquire);
271  std::atomic_thread_fence(std::memory_order_seq_cst);
272  int64_t b = _bottom.load(std::memory_order_acquire);
273 
274  std::optional<T> item;
275 
276  if(t < b) {
277  Array* a = _array.load(std::memory_order_consume);
278  item = a->pop(t);
279  if(!_top.compare_exchange_strong(t, t+1,
280  std::memory_order_seq_cst,
281  std::memory_order_relaxed)) {
282  return std::nullopt;
283  }
284  }
285 
286  return item;
287 }
288 
289 // Function: capacity
290 template <typename T>
291 int64_t WorkStealingQueue<T>::capacity() const noexcept {
292  return _array.load(std::memory_order_relaxed)->capacity();
293 }
294 
295 } // end of namespace tf -----------------------------------------------------
bool empty() const noexcept
queries if the queue is empty at the time of this call
Definition: spmc_queue.hpp:182
~WorkStealingQueue()
destructs the queue
Definition: spmc_queue.hpp:173
void push(O &&item)
inserts an item to the queue
Definition: spmc_queue.hpp:199
Definition: taskflow.hpp:5
size_t size() const noexcept
queries the number of items at the time of this call
Definition: spmc_queue.hpp:190
std::optional< T > unsync_pop()
pops out an item from the queue without synchronization with thieves
Definition: spmc_queue.hpp:249
int64_t capacity() const noexcept
queries the capacity of the queue
Definition: spmc_queue.hpp:291
std::optional< T > pop()
pops out an item from the queue
Definition: spmc_queue.hpp:219
Lock-free unbounded single-producer multiple-consumer queue.
Definition: spmc_queue.hpp:29
std::optional< T > steal()
steals an item from the queue
Definition: spmc_queue.hpp:269
WorkStealingQueue(int64_t capacity=1024)
constructs the queue with a given capacity
Definition: spmc_queue.hpp:163