63 S {new T[static_cast<size_t>(C)]} {
81 void push(int64_t i, O&& o) noexcept {
84 S[i & M] = std::forward<O>(o);
87 T pop(int64_t i) noexcept {
92 Array* resize(int64_t b, int64_t t) {
93 Array* ptr = new Array {2*C};
94 for(int64_t i=t; i!=b; ++i) {
125 bool empty() const noexcept;
130 int64_t size() const noexcept;
148 template <typename O>
157 std::optional<T> pop();
165 std::optional<T> steal();
169 template <typename T>
171 assert(c && (!(c & (c-1))));
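// the capacity must be a nonzero power of two: Array indexes its storage with (i & M) instead of a modulo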
172 _top.store(0, std::memory_order_relaxed);
173 _bottom.store(0, std::memory_order_relaxed);
174 _array.store(new Array{c}, std::memory_order_relaxed);
175 _garbage.reserve(32);
179 template <typename T>
181 for(auto a : _garbage) {
184 delete _array.load();
188 template <typename T>
190 int64_t b = _bottom.load(std::memory_order_relaxed);
191 int64_t t = _top.load(std::memory_order_relaxed);
196 template <typename T>
198 int64_t b = _bottom.load(std::memory_order_relaxed);
199 int64_t t = _top.load(std::memory_order_relaxed);
204 template <typename T>
205 template <typename O>
207 int64_t b = _bottom.load(std::memory_order_relaxed);
208 int64_t t = _top.load(std::memory_order_acquire);
209 Array* a = _array.load(std::memory_order_relaxed);
212 if(a->capacity() - 1 < (b - t)) {
213 Array* tmp = a->resize(b, t);
214 _garbage.push_back(a);
216 _array.store(a, std::memory_order_relaxed);
219 a->push(b, std::forward<O>(o));
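// release fence: makes the item written above visible before the new _bottom value published below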
220 std::atomic_thread_fence(std::memory_order_release);
221 _bottom.store(b + 1, std::memory_order_relaxed);
225 template <typename T>
227 int64_t b = _bottom.load(std::memory_order_relaxed) - 1;
228 Array* a = _array.load(std::memory_order_relaxed);
229 _bottom.store(b, std::memory_order_relaxed);
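// full fence: the decremented _bottom must be visible to concurrent steal() calls before _top is read,
// so the owner and a thief cannot both claim the last item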
230 std::atomic_thread_fence(std::memory_order_seq_cst);
231 int64_t t = _top.load(std::memory_order_relaxed);
233 std::optional<T> item;
239 if(!_top.compare_exchange_strong(t, t+1,
240 std::memory_order_seq_cst,
241 std::memory_order_relaxed)) {
244 _bottom.store(b + 1, std::memory_order_relaxed);
248 _bottom.store(b + 1, std::memory_order_relaxed);
255 template <typename T>
257 int64_t t = _top.load(std::memory_order_acquire);
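// full fence, pairing with the one in pop(): orders the load of _top above before the load of _bottom below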
258 std::atomic_thread_fence(std::memory_order_seq_cst);
259 int64_t b = _bottom.load(std::memory_order_acquire);
261 std::optional<T> item;
264 Array* a = _array.load(std::memory_order_consume);
266 if(!_top.compare_exchange_strong(t, t+1,
267 std::memory_order_seq_cst,
268 std::memory_order_relaxed)) {
277 template <typename T>
279 return _array.load(std::memory_order_relaxed)->capacity();
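A minimal usage sketch of the queue above, assuming it lives in namespace tf as in the rest of the library and that this header is includable on its own; the function name queue_demo is illustrative. push() and pop() stay on the single owner thread, while steal() may be called from any other thread.

// sketch only; WorkStealingQueue, the tf namespace, and the include path are taken from the listing above
#include <cstdio>
#include <thread>
#include "workstealing_threadpool.hpp"

void queue_demo() {
  tf::WorkStealingQueue<int> queue;        // default capacity 4096, grows on demand

  // a thief thread may only call steal(); it gets std::nullopt if the queue looks empty or it loses the race
  std::thread thief([&queue](){
    if(auto item = queue.steal(); item) {
      std::printf("stole %d\n", *item);
    }
  });

  // the owner thread is the only one allowed to push() and pop()
  for(int i=0; i<4; ++i) {
    queue.push(i);
  }
  while(auto item = queue.pop()) {         // LIFO order on the owner side
    std::printf("popped %d\n", *item);
  }

  thief.join();
}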
293 template <typename Closure>
299 std::optional<Closure> cache;
303 unsigned last_victim;
326 size_t num_workers() const;
331 bool is_owner() const;
340 template <typename... ArgsT>
341 void emplace(ArgsT&&... args);
364 void _spawn(unsigned);
366 void _balance_load(unsigned);
368 unsigned _randomize(uint64_t&) const;
369 unsigned _fast_modulo(unsigned, unsigned) const;
371 std::optional<Closure> _steal(unsigned);
375 template <typename Closure>
377 _worker_maps.reserve(N);
382 template <typename Closure>
388 template <typename Closure>
394 std::scoped_lock lock(_mutex);
395 for(auto& w : _workers){
401 for(auto& t : _threads){
411 template <typename Closure>
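// _randomize: advance a 64-bit linear-congruential state and scramble it (xorshift plus a variable shift)
// into a 32-bit value used to pick steal victims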
413 uint64_t current = state;
414 state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
416 return static_cast<unsigned>((current ^ (current >> 22)) >> (22 + (current >> 61)));
420 template <typename Closure>
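// _fast_modulo: maps a 32-bit value x into [0, N) with one multiply and a shift instead of an integer modulo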
422 return ((uint64_t) x * (uint64_t) N) >> 32;
426 template <typename Closure>
430 std::scoped_lock lock(_mutex);
432 for(unsigned i=0; i<N; ++i) {
433 _threads.emplace_back([this, i, N] () -> void {
435 std::optional<Closure> t;
436 Worker& w = (_workers[i]);
437 w.last_victim = (i + 1) % N;
445 if(t = w.queue.pop(); !t) {
452 if(lock.try_lock()) {
455 _idlers.push_back(&w);
456 while(!w.ready && !w.exit) {
464 std::swap(t, w.cache);
471 t = std::move(w.cache);
472 w.cache = std::nullopt;
486 _worker_maps.insert({_threads.back().get_id(), i});
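// remember which worker index belongs to this thread id, so emplace()/batch() can tell whether the caller is one of our workers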
491 template <typename Closure>
497 template <typename Closure>
499 return _threads.size();
503 template <typename Closure>
506 auto n = _workers[me].queue.size();
510 if(_idlers.empty() || n <= 1) {
517 if(_mutex.try_lock()) {
518 if(!_idlers.empty()) {
519 Worker* w = _idlers.back();
522 w->cache = _workers[me].queue.pop();
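// hand one surplus task straight to a sleeping worker's cache before waking it, avoiding a queue round trip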
532 template <typename Closure>
535 std::optional<Closure> task;
537 for(int round=0; round<1024; ++round) {
540 if(task = _queue.steal(); task) {
545 unsigned victim = _workers[thief].last_victim;
547 for(unsigned i=0; i<_workers.size(); i++){
549 if(victim != thief) {
550 if(task = _workers[victim].queue.steal(); task){
551 _workers[thief].last_victim = victim;
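// remember the victim that yielded a task; the next round of stealing starts from it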
556 if(++victim; victim == _workers.size()){
569 template <typename Closure>
570 template <typename... ArgsT>
575 Closure{std::forward<ArgsT>(args)...}();
583 if(auto itr = _worker_maps.find(tid); itr != _worker_maps.end()){
585 unsigned me = itr->second;
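// the caller is one of our workers: use its private cache slot if empty, otherwise push onto its own queue (no lock needed)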
588 if(!_workers[me].cache){
589 _workers[me].cache.emplace(std::forward<ArgsT>(args)...);
593 _workers[me].queue.push(Closure{std::forward<ArgsT>(args)...});
599 std::scoped_lock lock(_mutex);
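// the caller is an external thread: under the lock, either push to the shared queue or hand the closure to an idle worker's cache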
602 _queue.push(Closure{std::forward<ArgsT>(args)...});
605 Worker* w = _idlers.back();
608 w->cache.emplace(std::forward<ArgsT>(args)...);
614 template <typename Closure>
633 if(auto itr = _worker_maps.find(tid); itr != _worker_maps.end()){
635 unsigned me = itr->second;
639 if(!_workers[me].cache) {
640 _workers[me].cache = std::move(tasks[i++]);
643 for(; i<tasks.size(); ++i) {
644 _workers[me].queue.push(std::move(tasks[i]));
651 std::scoped_lock lock(_mutex);
653 size_t N = std::min(tasks.size(), _idlers.size());
655 for(size_t k=N; k<tasks.size(); ++k) {
656 _queue.push(std::move(tasks[k]));
659 for(size_t i=0; i<N; ++i) {
660 Worker* w = _idlers.back();
663 w->cache = std::move(tasks[i]);
bool is_owner() const - queries whether the caller is the owner of the executor. (Definition: workstealing_threadpool.hpp:492)
bool empty() const noexcept - queries whether the queue is empty at the time of this call. (Definition: workstealing_threadpool.hpp:189)
~WorkStealingQueue() - destructs the queue. (Definition: workstealing_threadpool.hpp:180)
void push(O &&item) - inserts an item into the queue. (Definition: workstealing_threadpool.hpp:206)
WorkStealingThreadpool(unsigned N) - constructs the executor with a given number of worker threads. (Definition: workstealing_threadpool.hpp:376)
void batch(std::vector<Closure> &&closures) - moves a batch of closures to the executor. (Definition: workstealing_threadpool.hpp:615)
~WorkStealingThreadpool() - destructs the executor. (Definition: workstealing_threadpool.hpp:383)
WorkStealingThreadpool - executor that implements an efficient work-stealing algorithm. (Definition: workstealing_threadpool.hpp:294)
size_t num_workers() const - queries the number of worker threads. (Definition: workstealing_threadpool.hpp:498)
int64_t capacity() const noexcept - queries the capacity of the queue. (Definition: workstealing_threadpool.hpp:278)
int64_t size() const noexcept - queries the number of items at the time of this call. (Definition: workstealing_threadpool.hpp:197)
std::optional<T> pop() - pops an item out of the queue. (Definition: workstealing_threadpool.hpp:226)
void emplace(ArgsT &&... args) - constructs the closure in place in the executor. (Definition: workstealing_threadpool.hpp:571)
WorkStealingQueue - lock-free unbounded single-producer multiple-consumer queue. (Definition: workstealing_threadpool.hpp:46)
std::optional<T> steal() - steals an item from the queue. (Definition: workstealing_threadpool.hpp:256)
WorkStealingQueue(int64_t capacity=4096) - constructs the queue with a given capacity. (Definition: workstealing_threadpool.hpp:170)