Cpp-Taskflow  2.3.1
tsq.hpp
1 // 2020/02/24 - created by twhuang
2 // - specialized work stealing queue for pointer
3 
4 #pragma once
5 
6 #include <atomic>
7 #include <vector>
8 #include <cassert>
9 
10 namespace tf {
11 
26 template <typename T>
27 class TaskQueue {
28 
29  static_assert(std::is_pointer<T>::value, "T must be a pointer type");
30 
31  struct Array {
32 
33  int64_t C;
34  int64_t M;
35  std::atomic<T>* S;
36 
37  explicit Array(int64_t c) :
38  C {c},
39  M {c-1},
40  S {new std::atomic<T>[static_cast<size_t>(C)]} {
41  }
42 
43  ~Array() {
44  delete [] S;
45  }
46 
47  int64_t capacity() const noexcept {
48  return C;
49  }
50 
51  template <typename O>
52  void push(int64_t i, O&& o) noexcept {
53  S[i & M].store(std::forward<O>(o), std::memory_order_relaxed);
54  }
55 
56  T pop(int64_t i) noexcept {
57  return S[i & M].load(std::memory_order_relaxed);
58  }
59 
60  Array* resize(int64_t b, int64_t t) {
61  Array* ptr = new Array {2*C};
62  for(int64_t i=t; i!=b; ++i) {
63  ptr->push(i, pop(i));
64  }
65  return ptr;
66  }
67 
68  };
69 
71  std::atomic<int64_t> _bottom;
72  std::atomic<Array*> _array;
73  std::vector<Array*> _garbage;
74 
75  public:
76 
82  explicit TaskQueue(int64_t capacity = 1024);
83 
87  ~TaskQueue();
88 
92  bool empty() const noexcept;
93 
97  size_t size() const noexcept;
98 
102  int64_t capacity() const noexcept;
103 
115  void push(T item);
116 
123  T pop();
124 
131  T steal();
132 };
133 
134 // Constructor
135 template <typename T>
137  assert(c && (!(c & (c-1))));
138  _top.store(0, std::memory_order_relaxed);
139  _bottom.store(0, std::memory_order_relaxed);
140  _array.store(new Array{c}, std::memory_order_relaxed);
141  _garbage.reserve(32);
142 }
143 
144 // Destructor
145 template <typename T>
147  for(auto a : _garbage) {
148  delete a;
149  }
150  delete _array.load();
151 }
152 
153 // Function: empty
154 template <typename T>
155 bool TaskQueue<T>::empty() const noexcept {
156  int64_t b = _bottom.load(std::memory_order_relaxed);
157  int64_t t = _top.load(std::memory_order_relaxed);
158  return b <= t;
159 }
160 
161 // Function: size
162 template <typename T>
163 size_t TaskQueue<T>::size() const noexcept {
164  int64_t b = _bottom.load(std::memory_order_relaxed);
165  int64_t t = _top.load(std::memory_order_relaxed);
166  return static_cast<size_t>(b >= t ? b - t : 0);
167 }
168 
169 // Function: push
170 template <typename T>
172  int64_t b = _bottom.load(std::memory_order_relaxed);
173  int64_t t = _top.load(std::memory_order_acquire);
174  Array* a = _array.load(std::memory_order_relaxed);
175 
176  // queue is full
177  if(a->capacity() - 1 < (b - t)) {
178  Array* tmp = a->resize(b, t);
179  _garbage.push_back(a);
180  std::swap(a, tmp);
181  _array.store(a, std::memory_order_relaxed);
182  }
183 
184  a->push(b, o);
185  std::atomic_thread_fence(std::memory_order_release);
186  _bottom.store(b + 1, std::memory_order_relaxed);
187 }
188 
189 // Function: pop
190 template <typename T>
192  int64_t b = _bottom.load(std::memory_order_relaxed) - 1;
193  Array* a = _array.load(std::memory_order_relaxed);
194  _bottom.store(b, std::memory_order_relaxed);
195  std::atomic_thread_fence(std::memory_order_seq_cst);
196  int64_t t = _top.load(std::memory_order_relaxed);
197 
198  T item {nullptr};
199 
200  if(t <= b) {
201  item = a->pop(b);
202  if(t == b) {
203  // the last item just got stolen
204  if(!_top.compare_exchange_strong(t, t+1,
205  std::memory_order_seq_cst,
206  std::memory_order_relaxed)) {
207  item = nullptr;
208  }
209  _bottom.store(b + 1, std::memory_order_relaxed);
210  }
211  }
212  else {
213  _bottom.store(b + 1, std::memory_order_relaxed);
214  }
215 
216  return item;
217 }
218 
219 // Function: steal
220 template <typename T>
222  int64_t t = _top.load(std::memory_order_acquire);
223  std::atomic_thread_fence(std::memory_order_seq_cst);
224  int64_t b = _bottom.load(std::memory_order_acquire);
225 
226  T item {nullptr};
227 
228  if(t < b) {
229  Array* a = _array.load(std::memory_order_consume);
230  item = a->pop(t);
231  if(!_top.compare_exchange_strong(t, t+1,
232  std::memory_order_seq_cst,
233  std::memory_order_relaxed)) {
234  return nullptr;
235  }
236  }
237 
238  return item;
239 }
240 
241 // Function: capacity
242 template <typename T>
243 int64_t TaskQueue<T>::capacity() const noexcept {
244  return _array.load(std::memory_order_relaxed)->capacity();
245 }
246 
247 } // end of namespace tf -----------------------------------------------------
TaskQueue(int64_t capacity=1024)
constructs the queue with a given capacity
Definition: tsq.hpp:136
T swap(T... args)
Definition: error.hpp:9
size_t size() const noexcept
queries the number of items at the time of this call
Definition: tsq.hpp:163
Lock-free unbounded single-producer multiple-consumer queue.
Definition: tsq.hpp:27
bool empty() const noexcept
queries if the queue is empty at the time of this call
Definition: tsq.hpp:155
T pop()
pops out an item from the queue
Definition: tsq.hpp:191
void push(T item)
inserts an item to the queue
Definition: tsq.hpp:171
int64_t capacity() const noexcept
queries the capacity of the queue
Definition: tsq.hpp:243
T steal()
steals an item from the queue
Definition: tsq.hpp:221
T atomic_thread_fence(T... args)
~TaskQueue()
destructs the queue
Definition: tsq.hpp:146