#include "notifier.hpp"

// struct Array: the ring buffer backing WorkStealingQueue<T>.
// C is the capacity (a power of two), M = C-1 the index mask, S the storage.
Array(int64_t c) : C {c}, M {c-1}, S {new T[static_cast<size_t>(C)]} {
}

template <typename O>
void push(int64_t i, O&& o) noexcept {
  S[i & M] = std::forward<O>(o);
}

T pop(int64_t i) noexcept {
  return S[i & M];
}

// doubles the capacity, copying the live range [t, b) into the new array
Array* resize(int64_t b, int64_t t) {
  Array* ptr = new Array {2*C};
  for(int64_t i=t; i!=b; ++i) {
    ptr->push(i, pop(i));
  }
  return ptr;
}
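Because the capacity C is a power of two, the mask i & M above is equivalent to i % C for any non-negative index, so slots wrap around the ring with no division. A standalone sanity check of that identity (illustrative values, not part of the source):

#include <cassert>
#include <cstdint>

int main() {
  const int64_t C = 8, M = C - 1;   // a power-of-two capacity and its mask
  for(int64_t i = 0; i < 100; ++i) {
    assert((i & M) == (i % C));     // masking matches modulo for i >= 0
  }
}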
bool empty() const noexcept;

int64_t size() const noexcept;

template <typename O>
void push(O&& item);

std::optional<T> pop();

std::optional<T> steal();
// Constructor
template <typename T>
WorkStealingQueue<T>::WorkStealingQueue(int64_t c) {
  assert(c && (!(c & (c-1))));   // capacity must be nonzero and a power of two
  _top.store(0, std::memory_order_relaxed);
  _bottom.store(0, std::memory_order_relaxed);
  _array.store(new Array{c}, std::memory_order_relaxed);
  _garbage.reserve(32);
}
// Destructor
template <typename T>
WorkStealingQueue<T>::~WorkStealingQueue() {
  for(auto a : _garbage) {
    delete a;
  }
  delete _array.load();
}
// Function: empty
template <typename T>
bool WorkStealingQueue<T>::empty() const noexcept {
  int64_t b = _bottom.load(std::memory_order_relaxed);
  int64_t t = _top.load(std::memory_order_relaxed);
  return b <= t;
}
// Function: size
template <typename T>
int64_t WorkStealingQueue<T>::size() const noexcept {
  int64_t b = _bottom.load(std::memory_order_relaxed);
  int64_t t = _top.load(std::memory_order_relaxed);
  return b - t;
}
// Function: push
template <typename T>
template <typename O>
void WorkStealingQueue<T>::push(O&& o) {
  int64_t b = _bottom.load(std::memory_order_relaxed);
  int64_t t = _top.load(std::memory_order_acquire);
  Array* a = _array.load(std::memory_order_relaxed);

  // the queue is full: grow the array and retire the old one
  if(a->capacity() - 1 < (b - t)) {
    Array* tmp = a->resize(b, t);
    _garbage.push_back(a);
    std::swap(a, tmp);
    _array.store(a, std::memory_order_relaxed);
  }

  a->push(b, std::forward<O>(o));
  std::atomic_thread_fence(std::memory_order_release);
  _bottom.store(b + 1, std::memory_order_relaxed);
}
// Function: pop
template <typename T>
std::optional<T> WorkStealingQueue<T>::pop() {
  int64_t b = _bottom.load(std::memory_order_relaxed) - 1;
  Array* a = _array.load(std::memory_order_relaxed);
  _bottom.store(b, std::memory_order_relaxed);
  std::atomic_thread_fence(std::memory_order_seq_cst);
  int64_t t = _top.load(std::memory_order_relaxed);

  std::optional<T> item;

  if(t <= b) {
    item = a->pop(b);
    if(t == b) {
      // the last item: race against a concurrent steal through top
      if(!_top.compare_exchange_strong(t, t+1,
                                       std::memory_order_seq_cst,
                                       std::memory_order_relaxed)) {
        item = std::nullopt;   // lost the race
      }
      _bottom.store(b + 1, std::memory_order_relaxed);
    }
  }
  else {
    _bottom.store(b + 1, std::memory_order_relaxed);   // the queue was empty
  }
  return item;
}
// Function: steal
template <typename T>
std::optional<T> WorkStealingQueue<T>::steal() {
  int64_t t = _top.load(std::memory_order_acquire);
  std::atomic_thread_fence(std::memory_order_seq_cst);
  int64_t b = _bottom.load(std::memory_order_acquire);

  std::optional<T> item;

  if(t < b) {
    Array* a = _array.load(std::memory_order_consume);
    item = a->pop(t);
    // another thief or the owner may win the race on top
    if(!_top.compare_exchange_strong(t, t+1,
                                     std::memory_order_seq_cst,
                                     std::memory_order_relaxed)) {
      return std::nullopt;
    }
  }
  return item;
}
// Function: capacity
template <typename T>
int64_t WorkStealingQueue<T>::capacity() const noexcept {
  return _array.load(std::memory_order_relaxed)->capacity();
}
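A minimal usage sketch of the queue (assuming this header and <optional>/<thread> are included): the owner thread pushes and pops at the bottom in LIFO order, while any other thread steals the oldest item from the top.

#include <optional>
#include <thread>

void demo() {
  WorkStealingQueue<int> queue(1024);     // capacity must be a power of two

  // owner thread: LIFO access at the bottom
  queue.push(1);
  queue.push(2);
  std::optional<int> mine = queue.pop();  // 2 here, with no contention

  // any other thread steals from the top;
  // std::nullopt signals a lost race, not necessarily an empty queue
  std::thread thief([&queue](){
    std::optional<int> stolen = queue.steal();  // 1 here
  });
  thief.join();
}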
// Class: WorkStealingThreadpool
// Executor that implements an efficient work-stealing algorithm.
template <typename Closure>
class WorkStealingThreadpool {

  // per-worker state
  struct Worker {
    std::optional<Closure> cache;   // one-slot fast path that bypasses the queue
    WorkStealingQueue<Closure> queue;
    bool exit {false};
    unsigned last_victim;           // where the last successful steal came from
  };

  // ...

  public:

    // ...

    template <typename... ArgsT>
    void emplace(ArgsT&&... args);

    // ...

  private:

    // ...

    void _spawn(unsigned);
    unsigned _randomize(uint64_t&) const;
    unsigned _fast_modulo(unsigned, unsigned) const;
    PerThread& _per_thread() const;
    std::optional<Closure> _steal(unsigned);
};
// Constructor
template <typename Closure>
WorkStealingThreadpool<Closure>::WorkStealingThreadpool(unsigned N) :
  // ...
  _notifier {_waiters} {
  _spawn(N);
}

// Destructor
template <typename Closure>
WorkStealingThreadpool<Closure>::~WorkStealingThreadpool() {
  // signal all workers to exit under the lock
  {
    std::scoped_lock lock(_mutex);
    for(auto& w : _workers){
      w.exit = true;
    }
  }
  _notifier.notify(true);   // wake every sleeping worker
  for(auto& t : _threads){
    t.join();
  }
}
// Function: _per_thread
template <typename Closure>
typename WorkStealingThreadpool<Closure>::PerThread&
WorkStealingThreadpool<Closure>::_per_thread() const {
  thread_local PerThread pt;
  return pt;
}
// Function: _randomize
// advances a PCG-style generator and extracts a 32-bit output
template <typename Closure>
unsigned WorkStealingThreadpool<Closure>::_randomize(uint64_t& state) const {
  uint64_t current = state;
  state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
  return static_cast<unsigned>((current ^ (current >> 22)) >> (22 + (current >> 61)));
}
// Function: _fast_modulo
// maps x uniformly into [0, N) with a multiply and a shift instead of a division
template <typename Closure>
unsigned WorkStealingThreadpool<Closure>::_fast_modulo(unsigned x, unsigned N) const {
  return ((uint64_t) x * (uint64_t) N) >> 32;
}
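Treating x as a fixed-point fraction of 2^32, the multiply-and-shift scales it into [0, N); this is Lemire's fast range reduction, uniform whenever x is uniform, at the cost of one multiplication. A worked example with hypothetical values (a generator like _randomize above can feed this reduction to pick a victim index cheaply):

#include <cstdint>
#include <cstdio>

int main() {
  uint32_t x = 0x80000000u;   // halfway through the 32-bit range
  unsigned N = 6;             // say, six workers to choose from
  unsigned v = static_cast<unsigned>(((uint64_t)x * (uint64_t)N) >> 32);
  std::printf("%u\n", v);     // prints 3: halfway through [0, 6)
}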
// Procedure: _spawn
template <typename Closure>
void WorkStealingThreadpool<Closure>::_spawn(unsigned N) {

  for(unsigned i=0; i<N; ++i) {
    _threads.emplace_back([this, i, N] () -> void {

      PerThread& pt = _per_thread();
      pt.pool = this;
      pt.thread_id = i;

      auto& waiter = _waiters[i];
      auto& worker = _workers[i];

      worker.last_victim = (i + 1) % N;

      std::optional<Closure> t;

      while(!worker.exit) {

        // pop a task from the worker's own queue; steal one otherwise
        if(t = worker.queue.pop(); !t) {
          t = _steal(i);
        }

        // at most one worker spins on stealing at a time
        if (!t && !_spinning && !_spinning.exchange(true)) {
          for(int r = 0; r < 1024 && !worker.exit && !t; r++) {
            t = _steal(i);
          }
          // ...
        }

        // two-phase wait: announce intent, recheck the central queue,
        // then either sleep (commit) or back out (cancel)
        _notifier.prepare_wait(&waiter);
        if(_mutex.try_lock()) {
          if(!_queue.empty()) {
            // ...
          }
          // ...
        }
        // ... sleep if still no work:
        _notifier.commit_wait(&waiter);
        // ... otherwise back out of the wait:
        _notifier.cancel_wait(&waiter);
        // ... execute t, draining the one-slot cache between iterations:
        worker.cache = std::nullopt;
      }
    });
  }
}
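The prepare_wait / commit_wait / cancel_wait calls form a two-phase wait in the style of Eigen's EventCount: a worker first registers as a waiter, then rechecks the queues, and only commits to sleeping if nothing arrived, so a notification cannot slip between the last check and the block. A condition-variable sketch of the same discipline (a simplified analogue, not the notifier.hpp implementation):

#include <condition_variable>
#include <mutex>

std::mutex m;
std::condition_variable cv;
bool has_work = false;

// worker: announce, recheck, then sleep or back out
void wait_for_work() {
  std::unique_lock lk(m);               // "prepare_wait"
  if(has_work) {
    return;                             // "cancel_wait": work showed up
  }
  cv.wait(lk, []{ return has_work; });  // "commit_wait"
}

// producer: publish work under the lock, then notify
void post_work() {
  {
    std::lock_guard lk(m);
    has_work = true;
  }
  cv.notify_one();
}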
// Function: is_owner
template <typename Closure>
bool WorkStealingThreadpool<Closure>::is_owner() const {
  // ...
}

// Function: num_workers
template <typename Closure>
size_t WorkStealingThreadpool<Closure>::num_workers() const {
  return _workers.size();
}
// Function: _steal
template <typename Closure>
std::optional<Closure> WorkStealingThreadpool<Closure>::_steal(unsigned thief) {

  std::optional<Closure> task;

  // try the central queue first
  if(task = _queue.steal(); task) {
    return task;
  }

  // then cycle through the other workers, starting at the last victim
  unsigned victim = _workers[thief].last_victim;

  for(unsigned i=0; i<_workers.size(); i++){
    if(victim != thief) {
      if(task = _workers[victim].queue.steal(); task){
        _workers[thief].last_victim = victim;
        return task;
      }
    }
    if(++victim; victim == _workers.size()){
      victim = 0;
    }
  }
  return std::nullopt;
}
// Procedure: emplace
template <typename Closure>
template <typename... ArgsT>
void WorkStealingThreadpool<Closure>::emplace(ArgsT&&... args) {

  // no worker threads: run the closure immediately
  if(num_workers() == 0){
    Closure{std::forward<ArgsT>(args)...}();
    return;
  }

  auto& pt = _per_thread();

  // the caller is a worker of this pool: fill its cache, else its queue
  if(pt.pool == this) {
    if(!_workers[pt.thread_id].cache) {
      _workers[pt.thread_id].cache.emplace(std::forward<ArgsT>(args)...);
      return;
    }
    _workers[pt.thread_id].queue.push(Closure{std::forward<ArgsT>(args)...});
  }
  // the caller is an external thread: push to the central queue
  else {
    std::scoped_lock lock(_mutex);
    _queue.push(Closure{std::forward<ArgsT>(args)...});
  }

  _notifier.notify(false);
}
// Procedure: batch
template <typename Closure>
void WorkStealingThreadpool<Closure>::batch(std::vector<Closure>&& tasks) {

  // no worker threads: run the closures immediately
  if(num_workers() == 0){
    for(auto& t : tasks) {
      t();
    }
    return;
  }

  auto& pt = _per_thread();

  // the caller is a worker of this pool: fill its cache, then its queue
  if(pt.pool == this) {
    size_t i = 0;
    if(!_workers[pt.thread_id].cache) {
      _workers[pt.thread_id].cache = std::move(tasks[i++]);
    }
    for(; i<tasks.size(); ++i) {
      _workers[pt.thread_id].queue.push(std::move(tasks[i]));
      _notifier.notify(false);
    }
    return;
  }

  // the caller is an external thread: move the batch to the central queue
  {
    std::scoped_lock lock(_mutex);
    for(size_t k=0; k<tasks.size(); ++k) {
      _queue.push(std::move(tasks[k]));
    }
  }

  // wake up at most one idler per task
  size_t N = std::max(size_t{1}, std::min(_num_idlers.load(), tasks.size()));
  for(size_t i=0; i<N; ++i) {
    _notifier.notify(false);
  }
}
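A minimal usage sketch of the executor, assuming Closure is any callable type such as std::function<void()> (illustrative, not from the source):

#include <cstdio>
#include <functional>
#include <vector>

int main() {
  WorkStealingThreadpool<std::function<void()>> pool(4);   // four workers

  // construct one closure in place in the executor
  pool.emplace([](){ std::puts("hello from a worker"); });

  // move a batch of closures in one shot
  std::vector<std::function<void()>> tasks;
  for(int i=0; i<8; ++i) {
    tasks.emplace_back([i](){ std::printf("task %d\n", i); });
  }
  pool.batch(std::move(tasks));
}   // the destructor signals the workers to exit and joins them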
API summary

WorkStealingQueue: lock-free unbounded single-producer multiple-consumer queue.
  WorkStealingQueue(int64_t capacity = 4096)  - constructs the queue with a given capacity
  ~WorkStealingQueue()                        - destructs the queue
  bool empty() const noexcept                 - queries if the queue is empty at the time of this call
  int64_t size() const noexcept               - queries the number of items at the time of this call
  int64_t capacity() const noexcept           - queries the capacity of the queue
  void push(O&& item)                         - inserts an item to the queue
  std::optional<T> pop()                      - pops out an item from the queue
  std::optional<T> steal()                    - steals an item from the queue

WorkStealingThreadpool: executor that implements an efficient work-stealing algorithm.
  WorkStealingThreadpool(unsigned N)          - constructs the executor with a given number of worker threads
  ~WorkStealingThreadpool()                   - destructs the executor
  size_t num_workers() const                  - queries the number of worker threads
  bool is_owner() const                       - queries if the caller is the owner of the executor
  void emplace(ArgsT&&... args)               - constructs the closure in place in the executor
  void batch(std::vector<Closure>&& closures) - moves a batch of closures to the executor