Cpp-Taskflow  2.0.0
workstealing_threadpool.hpp
// 2019/01/03 - modified by Tsung-Wei Huang
//   - updated the load balancing strategy
//
// 2018/12/24 - modified by Tsung-Wei Huang
//   - refined the work balancing strategy
//
// 2018/12/06 - modified by Tsung-Wei Huang
//   - refactored the code
//   - added load balancing strategy
//   - removed the storage alignment in WorkStealingQueue
//
// 2018/12/03 - created by Tsung-Wei Huang
//   - added WorkStealingQueue class

#pragma once

#include <iostream>
#include <vector>
#include <cstdlib>
#include <cstdio>
#include <cstdint>
#include <atomic>
#include <memory>
#include <cassert>
#include <deque>
#include <optional>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <unordered_map>
#include <algorithm>
#include <set>
#include <numeric>

namespace tf {

/**
@class WorkStealingQueue

@brief Lock-free unbounded single-producer multiple-consumer queue.

This class implements the work-stealing queue described in the paper,
"Correct and Efficient Work-Stealing for Weak Memory Models."

Only the queue owner can perform pop and push operations,
while others can steal data from the queue.

@tparam T data type
*/
template <typename T>
class WorkStealingQueue {

  //constexpr static int64_t cacheline_size = 64;

  //using storage_type = std::aligned_storage_t<sizeof(T), cacheline_size>;

  struct Array {

    int64_t C;   // capacity (always a power of two)
    int64_t M;   // mask for fast modulo indexing (C - 1)
    //storage_type* S;
    T* S;

    Array(int64_t c) :
      C {c},
      M {c-1},
      //S {new storage_type[C]} {
      S {new T[static_cast<size_t>(C)]} {
      //for(int64_t i=0; i<C; ++i) {
      //  ::new (std::addressof(S[i])) T();
      //}
    }

    ~Array() {
      //for(int64_t i=0; i<C; ++i) {
      //  reinterpret_cast<T*>(std::addressof(S[i]))->~T();
      //}
      delete [] S;
    }

    int64_t capacity() const noexcept {
      return C;
    }

    template <typename O>
    void push(int64_t i, O&& o) noexcept {
      //T* ptr = reinterpret_cast<T*>(std::addressof(S[i & M]));
      //*ptr = std::forward<O>(o);
      S[i & M] = std::forward<O>(o);
    }

    T pop(int64_t i) noexcept {
      //return *reinterpret_cast<T*>(std::addressof(S[i & M]));
      return S[i & M];
    }

    // doubles the capacity and copies the items in [t, b) over
    Array* resize(int64_t b, int64_t t) {
      Array* ptr = new Array {2*C};
      for(int64_t i=t; i!=b; ++i) {
        ptr->push(i, pop(i));
      }
      return ptr;
    }

  };

  std::atomic<int64_t> _top;
  std::atomic<int64_t> _bottom;
  std::atomic<Array*> _array;
  std::vector<Array*> _garbage;
  //char _padding[cacheline_size];

  public:

    /**
    @brief constructs the queue with a given capacity

    @param capacity the capacity of the queue (must be a power of 2)
    */
    WorkStealingQueue(int64_t capacity = 4096);

    /**
    @brief destructs the queue
    */
    ~WorkStealingQueue();

    /**
    @brief queries if the queue is empty at the time of this call
    */
    bool empty() const noexcept;

    /**
    @brief queries the number of items at the time of this call
    */
    int64_t size() const noexcept;

    /**
    @brief queries the capacity of the queue
    */
    int64_t capacity() const noexcept;

    /**
    @brief inserts an item to the queue

    Only the owner thread can insert an item to the queue.
    The operation can trigger the queue to resize its capacity
    if more space is required.
    */
    template <typename O>
    void push(O&& item);

    /**
    @brief pops out an item from the queue

    Only the owner thread can pop out an item from the queue.
    The return is a std::nullopt if the queue is empty.
    */
    std::optional<T> pop();

    /**
    @brief steals an item from the queue

    Any thread can steal an item from the queue.
    The return is a std::nullopt if the operation fails
    (the queue is empty or another thread won the race).
    */
    std::optional<T> steal();
};

// Constructor
template <typename T>
WorkStealingQueue<T>::WorkStealingQueue(int64_t c) {
  assert(c && (!(c & (c-1))));  // capacity must be a non-zero power of two
  _top.store(0, std::memory_order_relaxed);
  _bottom.store(0, std::memory_order_relaxed);
  _array.store(new Array{c}, std::memory_order_relaxed);
  _garbage.reserve(32);
}

// Destructor
template <typename T>
WorkStealingQueue<T>::~WorkStealingQueue() {
  for(auto a : _garbage) {
    delete a;
  }
  delete _array.load();
}

// Function: empty
template <typename T>
bool WorkStealingQueue<T>::empty() const noexcept {
  int64_t b = _bottom.load(std::memory_order_relaxed);
  int64_t t = _top.load(std::memory_order_relaxed);
  return b <= t;
}

// Function: size
template <typename T>
int64_t WorkStealingQueue<T>::size() const noexcept {
  int64_t b = _bottom.load(std::memory_order_relaxed);
  int64_t t = _top.load(std::memory_order_relaxed);
  return b - t;
}

// Function: push
template <typename T>
template <typename O>
void WorkStealingQueue<T>::push(O&& o) {
  int64_t b = _bottom.load(std::memory_order_relaxed);
  int64_t t = _top.load(std::memory_order_acquire);
  Array* a = _array.load(std::memory_order_relaxed);

  // queue is full: grow the array before pushing
  if(a->capacity() - 1 < (b - t)) {
    Array* tmp = a->resize(b, t);
    _garbage.push_back(a);
    std::swap(a, tmp);
    _array.store(a, std::memory_order_relaxed);
  }

  a->push(b, std::forward<O>(o));
  std::atomic_thread_fence(std::memory_order_release);
  _bottom.store(b + 1, std::memory_order_relaxed);
}

// Function: pop
template <typename T>
std::optional<T> WorkStealingQueue<T>::pop() {
  int64_t b = _bottom.load(std::memory_order_relaxed) - 1;
  Array* a = _array.load(std::memory_order_relaxed);
  _bottom.store(b, std::memory_order_relaxed);
  std::atomic_thread_fence(std::memory_order_seq_cst);
  int64_t t = _top.load(std::memory_order_relaxed);

  std::optional<T> item;

  if(t <= b) {
    item = a->pop(b);
    if(t == b) {
      // race against thieves for the last item
      if(!_top.compare_exchange_strong(t, t+1,
                                       std::memory_order_seq_cst,
                                       std::memory_order_relaxed)) {
        // the last item just got stolen
        item = std::nullopt;
      }
      _bottom.store(b + 1, std::memory_order_relaxed);
    }
  }
  else {
    _bottom.store(b + 1, std::memory_order_relaxed);
  }

  return item;
}

// Function: steal
template <typename T>
std::optional<T> WorkStealingQueue<T>::steal() {
  int64_t t = _top.load(std::memory_order_acquire);
  std::atomic_thread_fence(std::memory_order_seq_cst);
  int64_t b = _bottom.load(std::memory_order_acquire);

  std::optional<T> item;

  if(t < b) {
    Array* a = _array.load(std::memory_order_consume);
    item = a->pop(t);
    if(!_top.compare_exchange_strong(t, t+1,
                                     std::memory_order_seq_cst,
                                     std::memory_order_relaxed)) {
      // another thief (or the owner) won the race
      return std::nullopt;
    }
  }

  return item;
}

// Function: capacity
template <typename T>
int64_t WorkStealingQueue<T>::capacity() const noexcept {
  return _array.load(std::memory_order_relaxed)->capacity();
}
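
// Example (editorial sketch, not part of the original source): basic use of
// WorkStealingQueue. Only the owner thread may call push and pop; any other
// thread may call steal. All names below are illustrative.
//
//   tf::WorkStealingQueue<int> queue;        // capacity defaults to 4096
//
//   queue.push(1);                           // owner pushes at the bottom
//   queue.push(2);
//
//   std::thread thief([&] () {
//     std::optional<int> s = queue.steal();  // thief takes from the top (1)
//   });
//   thief.join();
//
//   std::optional<int> p = queue.pop();      // owner pops the bottom (2)
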
// ----------------------------------------------------------------------------

/**
@class WorkStealingThreadpool

@brief Executor that implements an efficient work stealing algorithm.

@tparam Closure closure type
*/
template <typename Closure>
class WorkStealingThreadpool {

  struct Worker {
    std::condition_variable cv;
    WorkStealingQueue<Closure> queue;
    std::optional<Closure> cache;
    bool exit  {false};
    bool ready {false};
    uint64_t seed;
    unsigned last_victim;
  };

  public:

    /**
    @brief constructs the executor with a given number of worker threads
    */
    explicit WorkStealingThreadpool(unsigned N);

    /**
    @brief destructs the executor

    Destructing the executor forces all worker threads to stop;
    unfinished tasks are not guaranteed to run.
    */
    ~WorkStealingThreadpool();

    /**
    @brief queries the number of worker threads
    */
    size_t num_workers() const;

    /**
    @brief queries if the caller is the owner of the executor
    */
    bool is_owner() const;

    /**
    @brief constructs the closure in place in the executor

    @param args arguments to forward to the constructor of the closure
    */
    template <typename... ArgsT>
    void emplace(ArgsT&&... args);

    /**
    @brief moves a batch of closures to the executor

    @param closures a vector of closures to move into the executor
    */
    void batch(std::vector<Closure>&& closures);

  private:

    const std::thread::id _owner {std::this_thread::get_id()};

    mutable std::mutex _mutex;

    std::vector<Worker> _workers;
    std::vector<Worker*> _idlers;
    std::vector<std::thread> _threads;

    std::unordered_map<std::thread::id, unsigned> _worker_maps;

    WorkStealingQueue<Closure> _queue;

    void _spawn(unsigned);
    void _shutdown();
    void _balance_load(unsigned);

    unsigned _randomize(uint64_t&) const;
    unsigned _fast_modulo(unsigned, unsigned) const;

    std::optional<Closure> _steal(unsigned);
};

// Constructor
template <typename Closure>
WorkStealingThreadpool<Closure>::WorkStealingThreadpool(unsigned N) : _workers {N} {
  _worker_maps.reserve(N);
  _spawn(N);
}

// Destructor
template <typename Closure>
WorkStealingThreadpool<Closure>::~WorkStealingThreadpool() {
  _shutdown();
}

// Procedure: _shutdown
template <typename Closure>
void WorkStealingThreadpool<Closure>::_shutdown() {

  assert(is_owner());

  {
    std::scoped_lock lock(_mutex);
    for(auto& w : _workers){
      w.exit = true;
      w.cv.notify_one();
    }
  }

  for(auto& t : _threads){
    t.join();
  }

  //_threads.clear();
  //_workers.clear();
  //_worker_maps.clear();
}

// Function: _randomize
template <typename Closure>
unsigned WorkStealingThreadpool<Closure>::_randomize(uint64_t& state) const {
  uint64_t current = state;
  // Advance the 64-bit LCG state
  state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
  // Generate the random output (using the PCG-XSH-RS scheme)
  return static_cast<unsigned>((current ^ (current >> 22)) >> (22 + (current >> 61)));
}

// http://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
template <typename Closure>
unsigned WorkStealingThreadpool<Closure>::_fast_modulo(unsigned x, unsigned N) const {
  return ((uint64_t) x * (uint64_t) N) >> 32;
}
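
// Example (editorial sketch): how the two helpers above combine to pick a
// random victim among N workers, as in the commented-out probability test in
// _balance_load below. The names seed/N are illustrative.
//
//   uint64_t seed   = i + 1;               // per-worker PRNG state
//   unsigned r      = _randomize(seed);    // uniform 32-bit value
//   unsigned victim = _fast_modulo(r, N);  // mapped into [0, N)
//
// _fast_modulo trades the division in (r % N) for a multiply and a shift:
// e.g., with x = 2^31 and N = 6, ((uint64_t)x * N) >> 32 == 3, the same
// bucket that the fraction x / 2^32 = 0.5 selects in [0, 6).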

// Procedure: _spawn
template <typename Closure>
void WorkStealingThreadpool<Closure>::_spawn(unsigned N) {

  // Lock to synchronize all workers before creating _worker_maps
  std::scoped_lock lock(_mutex);

  for(unsigned i=0; i<N; ++i) {
    _threads.emplace_back([this, i, N] () -> void {

      std::optional<Closure> t;
      Worker& w = _workers[i];
      w.last_victim = (i + 1) % N;
      w.seed = i + 1;

      std::unique_lock lock(_mutex, std::defer_lock);

      while(!w.exit) {

        // pop a task from my own queue
        if(t = w.queue.pop(); !t) {
          // steal a task from others
          t = _steal(i);
        }

        // no task: sleep until a waker hands over a task or shutdown begins
        if(!t) {
          if(lock.try_lock()) {  // avoid contention
            if(_queue.empty()) {
              w.ready = false;
              _idlers.push_back(&w);
              while(!w.ready && !w.exit) {
                w.cv.wait(lock);
              }
            }
            lock.unlock();
          }

          // a waker may have placed a task directly in my cache
          if(w.cache) {
            std::swap(t, w.cache);
          }
        }

        // drain the cache chain: running a closure may refill w.cache
        while(t) {
          (*t)();
          if(w.cache) {
            t = std::move(w.cache);
            w.cache = std::nullopt;
          }
          else {
            t = std::nullopt;
          }
        }

        // balance load by waking up an idler to share the work
        _balance_load(i);

      } // End of while ------------------------------------------------------

    });

    _worker_maps.insert({_threads.back().get_id(), i});
  }
}

// Function: is_owner
template <typename Closure>
bool WorkStealingThreadpool<Closure>::is_owner() const {
  return std::this_thread::get_id() == _owner;
}

// Function: num_workers
template <typename Closure>
size_t WorkStealingThreadpool<Closure>::num_workers() const {
  return _threads.size();
}

// Procedure: _balance_load
template <typename Closure>
void WorkStealingThreadpool<Closure>::_balance_load(unsigned me) {

  auto n = _workers[me].queue.size();

  // return if no idler - this might not be the most up-to-date value,
  // but it doesn't affect correctness
  if(_idlers.empty() || n <= 1) {
    return;
  }

  // try with probability 1/n
  //if(_fast_modulo(_randomize(_workers[me].seed), n) == 0u) {
  // wake up an idler to help me balance the load
  if(_mutex.try_lock()) {
    if(!_idlers.empty()) {
      Worker* w = _idlers.back();
      _idlers.pop_back();
      w->ready = true;
      w->cache = _workers[me].queue.pop();
      w->cv.notify_one();
      w->last_victim = me;
    }
    _mutex.unlock();
  }
  //}
}
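
// Worked trace (editorial note): suppose worker 0 finishes a closure with
// three more closures in its queue while worker 1 sits in _idlers. On worker
// 0's next pass through the worker loop, _balance_load(0) pops one closure
// from worker 0's queue directly into worker 1's cache, marks it ready,
// notifies its condition variable, and points its last_victim at worker 0 so
// the woken worker steals from the busy queue first.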

// Function: _steal
template <typename Closure>
std::optional<Closure> WorkStealingThreadpool<Closure>::_steal(unsigned thief) {

  std::optional<Closure> task;

  for(int round=0; round<1024; ++round) {

    // try getting a task from the centralized queue
    if(task = _queue.steal(); task) {
      return task;
    }

    // try stealing a task from other workers, starting with the last victim
    unsigned victim = _workers[thief].last_victim;

    for(unsigned i=0; i<_workers.size(); i++){

      if(victim != thief) {
        if(task = _workers[victim].queue.steal(); task){
          _workers[thief].last_victim = victim;
          return task;
        }
      }

      if(++victim; victim == _workers.size()){
        victim = 0;
      }
    }

    // nothing happened this round; yield before the next attempt
    std::this_thread::yield();
  }

  return std::nullopt;
}
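
// Worked trace (editorial note): the victim scan starts at last_victim and
// walks the workers in a ring, skipping the thief itself. With four workers,
// a thief of index 2 whose last_victim is 3 probes the queues in the order
// 3, 0, 1 (index 2 is skipped), after first checking the centralized _queue.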

// Procedure: emplace
template <typename Closure>
template <typename... ArgsT>
void WorkStealingThreadpool<Closure>::emplace(ArgsT&&... args){

  // no worker thread available: run the closure in the caller's thread
  if(num_workers() == 0){
    Closure{std::forward<ArgsT>(args)...}();
    return;
  }

  // caller is not the owner
  if(auto tid = std::this_thread::get_id(); tid != _owner){

    // the caller is a worker of this threadpool
    if(auto itr = _worker_maps.find(tid); itr != _worker_maps.end()){

      unsigned me = itr->second;

      // dfs speculation: cache the closure to run it next
      if(!_workers[me].cache){
        _workers[me].cache.emplace(std::forward<ArgsT>(args)...);
      }
      // bfs load balancing: expose the closure to thieves
      else {
        _workers[me].queue.push(Closure{std::forward<ArgsT>(args)...});
      }
      return;
    }
  }

  std::scoped_lock lock(_mutex);

  if(_idlers.empty()){
    _queue.push(Closure{std::forward<ArgsT>(args)...});
  }
  else{
    Worker* w = _idlers.back();
    _idlers.pop_back();
    w->ready = true;
    w->cache.emplace(std::forward<ArgsT>(args)...);
    w->cv.notify_one();
  }
}
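
// Example (editorial sketch): submitting a closure to the executor. Closure
// may be any callable type constructible from the emplace arguments;
// std::function<void()> is used here for illustration.
//
//   tf::WorkStealingThreadpool<std::function<void()>> pool(4);
//   pool.emplace([] () { std::printf("hello from a worker\n"); });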

// Procedure: batch
template <typename Closure>
void WorkStealingThreadpool<Closure>::batch(std::vector<Closure>&& tasks) {

  if(tasks.empty()) {
    return;
  }

  // no worker thread available: run the closures in the caller's thread
  if(num_workers() == 0){
    for(auto& t : tasks){
      t();
    }
    return;
  }

  // caller is not the owner
  if(auto tid = std::this_thread::get_id(); tid != _owner){

    // the caller is a worker of this threadpool
    if(auto itr = _worker_maps.find(tid); itr != _worker_maps.end()){

      unsigned me = itr->second;

      size_t i = 0;

      // cache the first task for dfs speculation
      if(!_workers[me].cache) {
        _workers[me].cache = std::move(tasks[i++]);
      }

      // push the remaining tasks to my queue for thieves
      for(; i<tasks.size(); ++i) {
        _workers[me].queue.push(std::move(tasks[i]));
      }

      return;
    }
  }

  std::scoped_lock lock(_mutex);

  // hand the first N tasks directly to idlers;
  // the rest go to the centralized queue
  size_t N = std::min(tasks.size(), _idlers.size());

  for(size_t k=N; k<tasks.size(); ++k) {
    _queue.push(std::move(tasks[k]));
  }

  for(size_t i=0; i<N; ++i) {
    Worker* w = _idlers.back();
    _idlers.pop_back();
    w->ready = true;
    w->cache = std::move(tasks[i]);
    w->cv.notify_one();
  }
}
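
// Example (editorial sketch): moving a pre-built vector of closures into the
// executor in one call, reusing the pool from the example above.
//
//   std::vector<std::function<void()>> closures;
//   for(int i=0; i<100; ++i) {
//     closures.emplace_back([i] () { std::printf("task %d\n", i); });
//   }
//   pool.batch(std::move(closures));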

}  // end of namespace tf. ---------------------------------------------------