// spmc_queue.hpp (excerpt)
#pragma once

#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <utility>
#include <vector>

// Lock-free unbounded single-producer multiple-consumer queue.
// Only the owner thread may push and pop; other threads may steal.
template <typename T>
class WorkStealingQueue {

  // Ring buffer backing the deque; the capacity C is a power of two and
  // M = C - 1 masks indices into the buffer.
  struct Array {

    int64_t C;
    int64_t M;
    T* S;

    explicit Array(int64_t c) : C{c}, M{c-1}, S{new T[static_cast<size_t>(C)]} {}
    ~Array() { delete [] S; }

    int64_t capacity() const noexcept { return C; }

    template <typename O>
    void push(int64_t i, O&& o) noexcept { S[i & M] = std::forward<O>(o); }

    T pop(int64_t i) noexcept { return S[i & M]; }

    // allocates a buffer of twice the capacity and copies the live range [t, b)
    Array* resize(int64_t b, int64_t t) {
      Array* ptr = new Array{2*C};
      for(int64_t i=t; i!=b; ++i) {
        ptr->push(i, pop(i));
      }
      return ptr;
    }
  };

  std::atomic<int64_t> _top;
  std::atomic<int64_t> _bottom;
  std::atomic<Array*> _array;
  std::vector<Array*> _garbage;

  public:

    // constructs the queue with a given capacity (a nonzero power of two)
    explicit WorkStealingQueue(int64_t capacity = 1024);

    // destructs the queue
    ~WorkStealingQueue();

    // queries if the queue is empty at the time of this call
    bool empty() const noexcept;

    // queries the number of items at the time of this call
    size_t size() const noexcept;

    // queries the capacity of the queue
    int64_t capacity() const noexcept;

    // inserts an item into the queue (owner thread only)
    template <typename O>
    void push(O&& item);

    // pops out an item from the queue (owner thread only)
    std::optional<T> pop();

    // pops out an item from the queue without synchronization with thieves
    std::optional<T> unsync_pop();

    // steals an item from the queue (callable from any thread)
    std::optional<T> steal();
};

// Constructor: the capacity must be a nonzero power of two.
template <typename T>
WorkStealingQueue<T>::WorkStealingQueue(int64_t c) {
  assert(c && (!(c & (c-1))));
  _top.store(0, std::memory_order_relaxed);
  _bottom.store(0, std::memory_order_relaxed);
  _array.store(new Array{c}, std::memory_order_relaxed);
  _garbage.reserve(32);
}

// Destructor: reclaims the retired buffers and the active one.
template <typename T>
WorkStealingQueue<T>::~WorkStealingQueue() {
  for(auto a : _garbage) {
    delete a;
  }
  delete _array.load();
}

// Function: empty
template <typename T>
bool WorkStealingQueue<T>::empty() const noexcept {
  int64_t b = _bottom.load(std::memory_order_relaxed);
  int64_t t = _top.load(std::memory_order_relaxed);
  return b <= t;
}

// Function: size
template <typename T>
size_t WorkStealingQueue<T>::size() const noexcept {
  int64_t b = _bottom.load(std::memory_order_relaxed);
  int64_t t = _top.load(std::memory_order_relaxed);
  return static_cast<size_t>(b >= t ? b - t : 0);
}

// Function: push (owner thread only)
template <typename T>
template <typename O>
void WorkStealingQueue<T>::push(O&& o) {
  int64_t b = _bottom.load(std::memory_order_relaxed);
  int64_t t = _top.load(std::memory_order_acquire);
  Array* a = _array.load(std::memory_order_relaxed);

  // the queue is full: grow the buffer and retire the old one
  if(a->capacity() - 1 < (b - t)) {
    Array* tmp = a->resize(b, t);
    _garbage.push_back(a);
    std::swap(a, tmp);
    _array.store(a, std::memory_order_relaxed);
  }

  a->push(b, std::forward<O>(o));
  std::atomic_thread_fence(std::memory_order_release);
  _bottom.store(b + 1, std::memory_order_relaxed);
}

// Function: pop (owner thread only)
template <typename T>
std::optional<T> WorkStealingQueue<T>::pop() {
  int64_t b = _bottom.load(std::memory_order_relaxed) - 1;
  Array* a = _array.load(std::memory_order_relaxed);
  _bottom.store(b, std::memory_order_relaxed);
  std::atomic_thread_fence(std::memory_order_seq_cst);
  int64_t t = _top.load(std::memory_order_relaxed);

  std::optional<T> item;

  if(t <= b) {
    item = a->pop(b);
    if(t == b) {
      // the last item is also visible to thieves; the CAS decides who gets it
      if(!_top.compare_exchange_strong(t, t+1,
                                       std::memory_order_seq_cst,
                                       std::memory_order_relaxed)) {
        item = std::nullopt;
      }
      _bottom.store(b + 1, std::memory_order_relaxed);
    }
  }
  else {
    _bottom.store(b + 1, std::memory_order_relaxed);
  }

  return item;
}

// Function: unsync_pop (no synchronization with thieves)
template <typename T>
std::optional<T> WorkStealingQueue<T>::unsync_pop() {
  int64_t t = _top.load(std::memory_order_relaxed);
  int64_t b = _bottom.fetch_sub(1, std::memory_order_relaxed) - 1;
  Array* a = _array.load(std::memory_order_relaxed);

  std::optional<T> item;

  if(t <= b) {
    item = a->pop(b);
  }
  else {
    _bottom.store(b + 1, std::memory_order_relaxed);
  }

  return item;
}

// Function: steal (callable from any thread)
template <typename T>
std::optional<T> WorkStealingQueue<T>::steal() {
  int64_t t = _top.load(std::memory_order_acquire);
  std::atomic_thread_fence(std::memory_order_seq_cst);
  int64_t b = _bottom.load(std::memory_order_acquire);

  std::optional<T> item;

  if(t < b) {
    Array* a = _array.load(std::memory_order_consume);
    item = a->pop(t);
    // another thief or the owner may race for the same slot; the CAS arbitrates
    if(!_top.compare_exchange_strong(t, t+1,
                                     std::memory_order_seq_cst,
                                     std::memory_order_relaxed)) {
      return std::nullopt;
    }
  }

  return item;
}

// Function: capacity
template <typename T>
int64_t WorkStealingQueue<T>::capacity() const noexcept {
  return _array.load(std::memory_order_relaxed)->capacity();
}
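
A minimal single-threaded sketch of the owner-side API, assuming the listing above is saved as spmc_queue.hpp; the file name, values, and printed output below are illustrative only.

// demo_owner.cpp (hypothetical file name)
#include "spmc_queue.hpp"
#include <iostream>

int main() {
  WorkStealingQueue<int> queue(2);        // capacity must be a power of two

  // pushing beyond the current capacity triggers Array::resize
  for(int i=0; i<5; ++i) {
    queue.push(i);
  }
  std::cout << queue.size() << '\n';      // 5
  std::cout << queue.capacity() << '\n';  // 8, grown from the initial 2

  // the owner pops in LIFO order from the bottom end
  while(auto item = queue.pop()) {
    std::cout << *item << ' ';            // 4 3 2 1 0
  }
  std::cout << '\n';
}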
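
For concurrent use, only the owner thread may call push() and pop(); any other thread may call steal(). The sketch below, again assuming the header is available as spmc_queue.hpp, pairs one owner with four thief threads; the thread and item counts are arbitrary. Because every pushed item is retrieved exactly once, by either the owner or a thief, the two counters sum to the number of pushes.

// demo_steal.cpp (hypothetical file name)
#include "spmc_queue.hpp"
#include <atomic>
#include <cstdio>
#include <thread>
#include <vector>

int main() {
  WorkStealingQueue<int> queue;           // default capacity of 1024
  std::atomic<int> stolen{0};
  std::atomic<bool> done{false};

  // thieves: repeatedly try to steal from the top end until told to stop
  std::vector<std::thread> thieves;
  for(int i=0; i<4; ++i) {
    thieves.emplace_back([&]() {
      while(!done.load(std::memory_order_relaxed)) {
        if(queue.steal()) {
          stolen.fetch_add(1, std::memory_order_relaxed);
        }
      }
    });
  }

  // owner: the single producer pushes, then drains its own bottom end
  for(int i=0; i<100000; ++i) {
    queue.push(i);
  }
  int popped = 0;
  while(queue.pop()) {
    ++popped;
  }
  done = true;

  for(auto& t : thieves) {
    t.join();
  }
  std::printf("popped=%d stolen=%d total=%d\n",
              popped, stolen.load(), popped + stolen.load());
}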