Cpp-Taskflow 2.1.0
workstealing_threadpool.hpp
// 2019/02/10 - modified by Tsung-Wei Huang
//   - modified WorkStealingThreadpool with notifier
//   - modified the stealing loop
//   - improved the performance
//
// 2019/01/03 - modified by Tsung-Wei Huang
//   - updated the load balancing strategy
//
// 2018/12/24 - modified by Tsung-Wei Huang
//   - refined the work balancing strategy
//
// 2018/12/06 - modified by Tsung-Wei Huang
//   - refactored the code
//   - added load balancing strategy
//   - removed the storage alignment in WorkStealingQueue
//
// 2018/12/03 - created by Tsung-Wei Huang
//   - added WorkStealingQueue class

#pragma once

#include "notifier.hpp"

// standard headers used by this file
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <mutex>
#include <optional>
#include <thread>
#include <utility>
#include <vector>

namespace tf {

/**
@class WorkStealingQueue

@brief Lock-free unbounded single-producer multiple-consumer queue.

Only the owner thread may push and pop items; any other thread may steal.

@tparam T item type
*/
template <typename T>
class WorkStealingQueue {

  //constexpr static int64_t cacheline_size = 64;

  //using storage_type = std::aligned_storage_t<sizeof(T), cacheline_size>;

  // Circular array of a power-of-two capacity C; the mask M = C-1 turns
  // "index % C" into the cheaper "index & M".
  struct Array {

    int64_t C;
    int64_t M;
    //storage_type* S;
    T* S;

    Array(int64_t c) :
      C {c},
      M {c-1},
      //S {new storage_type[C]} {
      S {new T[static_cast<size_t>(C)]} {
      //for(int64_t i=0; i<C; ++i) {
      //  ::new (std::addressof(S[i])) T();
      //}
    }

    ~Array() {
      //for(int64_t i=0; i<C; ++i) {
      //  reinterpret_cast<T*>(std::addressof(S[i]))->~T();
      //}
      delete [] S;
    }

    int64_t capacity() const noexcept {
      return C;
    }

    template <typename O>
    void push(int64_t i, O&& o) noexcept {
      //T* ptr = reinterpret_cast<T*>(std::addressof(S[i & M]));
      //*ptr = std::forward<O>(o);
      S[i & M] = std::forward<O>(o);
    }

    T pop(int64_t i) noexcept {
      //return *reinterpret_cast<T*>(std::addressof(S[i & M]));
      return S[i & M];
    }

    // copies the live items in [t, b) into a new array of twice the capacity
    Array* resize(int64_t b, int64_t t) {
      Array* ptr = new Array {2*C};
      for(int64_t i=t; i!=b; ++i) {
        ptr->push(i, pop(i));
      }
      return ptr;
    }

  };

  std::atomic<int64_t> _top;
  std::atomic<int64_t> _bottom;
  std::atomic<Array*> _array;
  std::vector<Array*> _garbage;
  //char _padding[cacheline_size];

  public:

  /**
  @brief constructs the queue with a given capacity
  @param capacity the capacity of the queue (must be a power of two)
  */
  WorkStealingQueue(int64_t capacity = 4096);

  /**
  @brief destructs the queue
  */
  ~WorkStealingQueue();

  /**
  @brief queries if the queue is empty at the time of this call
  */
  bool empty() const noexcept;

  /**
  @brief queries the number of items at the time of this call
  */
  int64_t size() const noexcept;

  /**
  @brief queries the capacity of the queue
  */
  int64_t capacity() const noexcept;

  /**
  @brief inserts an item to the queue; only the owner thread may call this
  */
  template <typename O>
  void push(O&& item);

  /**
  @brief pops out an item from the queue; only the owner thread may call this
  */
  std::optional<T> pop();

  /**
  @brief steals an item from the queue; any thread may call this
  */
  std::optional<T> steal();
};
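
// Example (editor's sketch, not part of the original file): typical use of
// WorkStealingQueue. A single owner thread pushes and pops at the bottom,
// while other threads steal from the top. The owner/thief names below are
// illustrative only.
//
//   WorkStealingQueue<int> queue;
//
//   std::thread owner([&](){
//     for(int i=0; i<1000; ++i) {
//       queue.push(i);                          // owner-only operation
//     }
//     while(std::optional<int> item = queue.pop()) {
//       // process *item ...
//     }
//   });
//
//   std::thread thief([&](){
//     if(std::optional<int> item = queue.steal()) {
//       // process *item ...                    // safe from any thread
//     }
//   });
//
//   owner.join();
//   thief.join();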

// Constructor
template <typename T>
WorkStealingQueue<T>::WorkStealingQueue(int64_t c) {
  // the capacity must be a positive power of two
  assert(c && (!(c & (c-1))));
  _top.store(0, std::memory_order_relaxed);
  _bottom.store(0, std::memory_order_relaxed);
  _array.store(new Array{c}, std::memory_order_relaxed);
  _garbage.reserve(32);
}

// Destructor
template <typename T>
WorkStealingQueue<T>::~WorkStealingQueue() {
  for(auto a : _garbage) {
    delete a;
  }
  delete _array.load();
}

// Function: empty
template <typename T>
bool WorkStealingQueue<T>::empty() const noexcept {
  int64_t b = _bottom.load(std::memory_order_relaxed);
  int64_t t = _top.load(std::memory_order_relaxed);
  return b <= t;
}

// Function: size
template <typename T>
int64_t WorkStealingQueue<T>::size() const noexcept {
  int64_t b = _bottom.load(std::memory_order_relaxed);
  int64_t t = _top.load(std::memory_order_relaxed);
  return b - t;
}

// Function: push
template <typename T>
template <typename O>
void WorkStealingQueue<T>::push(O&& o) {
  int64_t b = _bottom.load(std::memory_order_relaxed);
  int64_t t = _top.load(std::memory_order_acquire);
  Array* a = _array.load(std::memory_order_relaxed);

  // queue is full; grow the array (the old one is retired to _garbage since
  // concurrent thieves may still be reading from it)
  if(a->capacity() - 1 < (b - t)) {
    Array* tmp = a->resize(b, t);
    _garbage.push_back(a);
    std::swap(a, tmp);
    _array.store(a, std::memory_order_relaxed);
  }

  a->push(b, std::forward<O>(o));
  std::atomic_thread_fence(std::memory_order_release);
  _bottom.store(b + 1, std::memory_order_relaxed);
}

// Function: pop
template <typename T>
std::optional<T> WorkStealingQueue<T>::pop() {
  int64_t b = _bottom.load(std::memory_order_relaxed) - 1;
  Array* a = _array.load(std::memory_order_relaxed);
  _bottom.store(b, std::memory_order_relaxed);
  std::atomic_thread_fence(std::memory_order_seq_cst);
  int64_t t = _top.load(std::memory_order_relaxed);

  std::optional<T> item;

  if(t <= b) {
    item = a->pop(b);
    if(t == b) {
      // this is the last item; race against concurrent steals by advancing
      // _top, and give up the item if a thief wins the CAS
      if(!_top.compare_exchange_strong(t, t+1,
                                       std::memory_order_seq_cst,
                                       std::memory_order_relaxed)) {
        item = std::nullopt;
      }
      _bottom.store(b + 1, std::memory_order_relaxed);
    }
  }
  else {
    _bottom.store(b + 1, std::memory_order_relaxed);
  }

  return item;
}

// Function: steal
template <typename T>
std::optional<T> WorkStealingQueue<T>::steal() {
  int64_t t = _top.load(std::memory_order_acquire);
  std::atomic_thread_fence(std::memory_order_seq_cst);
  int64_t b = _bottom.load(std::memory_order_acquire);

  std::optional<T> item;

  if(t < b) {
    Array* a = _array.load(std::memory_order_consume);
    item = a->pop(t);
    if(!_top.compare_exchange_strong(t, t+1,
                                     std::memory_order_seq_cst,
                                     std::memory_order_relaxed)) {
      return std::nullopt;
    }
  }

  return item;
}
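
// Editor's note on the synchronization above: this is the classic Chase-Lev
// deque. The seq_cst fences in pop() and steal() order the update of _bottom
// against the read of _top (and vice versa), so the owner and a thief cannot
// both conclude that the same last item belongs to them; whoever wins the
// CAS on _top keeps it. steal() reads the item before the CAS because a
// successful CAS immediately licenses the owner to overwrite that slot.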

// Function: capacity
template <typename T>
int64_t WorkStealingQueue<T>::capacity() const noexcept {
  return _array.load(std::memory_order_relaxed)->capacity();
}

// ----------------------------------------------------------------------------

/**
@class WorkStealingThreadpool

@brief Executor that implements an efficient work stealing algorithm.

@tparam Closure closure type
*/
template <typename Closure>
class WorkStealingThreadpool {

  struct Worker {
    WorkStealingQueue<Closure> queue;
    std::optional<Closure> cache;
    bool exit {false};
    unsigned last_victim;
  };

  struct PerThread {
    WorkStealingThreadpool* pool {nullptr};
    int thread_id {-1};
  };

  public:

  /**
  @brief constructs the executor with a given number of worker threads
  */
  explicit WorkStealingThreadpool(unsigned N);

  /**
  @brief destructs the executor
  */
  ~WorkStealingThreadpool();

  /**
  @brief queries the number of worker threads
  */
  size_t num_workers() const;

  /**
  @brief queries if the caller is the owner of the executor
  */
  bool is_owner() const;

  /**
  @brief constructs the closure in place in the executor
  */
  template <typename... ArgsT>
  void emplace(ArgsT&&... args);

  /**
  @brief moves a batch of closures to the executor
  */
  void batch(std::vector<Closure>&& closures);

  private:

  const std::thread::id _owner {std::this_thread::get_id()};

  std::mutex _mutex;

  std::vector<Worker> _workers;
  std::vector<std::thread> _threads;
  std::vector<Notifier::Waiter> _waiters;

  WorkStealingQueue<Closure> _queue;

  Notifier _notifier;

  std::atomic<size_t> _num_idlers {0};
  std::atomic<bool> _spinning {false};

  void _spawn(unsigned);

  unsigned _randomize(uint64_t&) const;
  unsigned _fast_modulo(unsigned, unsigned) const;

  PerThread& _per_thread() const;

  std::optional<Closure> _steal(unsigned);
};
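
// Example (editor's sketch, not part of the original file): using the
// executor with a simple callable closure type. std::function<void()> is
// chosen here for illustration; any move-constructible callable taking no
// arguments works.
//
//   WorkStealingThreadpool<std::function<void()>> pool(4);
//
//   // construct a closure in place
//   pool.emplace([](){ /* do some work */ });
//
//   // move a batch of closures into the pool at once
//   std::vector<std::function<void()>> closures;
//   for(int i=0; i<100; ++i) {
//     closures.emplace_back([i](){ /* task i */ });
//   }
//   pool.batch(std::move(closures));
//
//   // the destructor joins all worker threads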

// Constructor
template <typename Closure>
WorkStealingThreadpool<Closure>::WorkStealingThreadpool(unsigned N) :
  _workers {N},
  _waiters {N},
  _notifier{_waiters} {

  _spawn(N);
}

// Destructor
template <typename Closure>
WorkStealingThreadpool<Closure>::~WorkStealingThreadpool() {

  // request all workers to exit
  {
    std::scoped_lock lock(_mutex);
    for(auto& w : _workers){
      w.exit = true;
    }
  }

  // wake up everyone and wait for all threads to finish
  _notifier.notify(true);

  for(auto& t : _threads){
    t.join();
  }
}

// Function: _per_thread
template <typename Closure>
typename WorkStealingThreadpool<Closure>::PerThread&
WorkStealingThreadpool<Closure>::_per_thread() const {
  thread_local PerThread pt;
  return pt;
}

// Function: _randomize
template <typename Closure>
unsigned WorkStealingThreadpool<Closure>::_randomize(uint64_t& state) const {
  uint64_t current = state;
  state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
  // generate the random output (using the PCG-XSH-RS scheme)
  return static_cast<unsigned>((current ^ (current >> 22)) >> (22 + (current >> 61)));
}

// Function: _fast_modulo
// http://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
template <typename Closure>
unsigned WorkStealingThreadpool<Closure>::_fast_modulo(unsigned x, unsigned N) const {
  return ((uint64_t) x * (uint64_t) N) >> 32;
}
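
// Editor's note: _fast_modulo maps a uniformly random 32-bit x into [0, N)
// without a division, by scaling: (x * N) / 2^32. For example, with N = 4
// and x = 0x80000000 (i.e., 2^31), the product is 2^33 and the right shift
// by 32 yields 2, exactly the bucket that x/2^32 = 0.5 falls into.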

// Procedure: _spawn
template <typename Closure>
void WorkStealingThreadpool<Closure>::_spawn(unsigned N) {

  // create the worker threads
  for(unsigned i=0; i<N; ++i) {
    _threads.emplace_back([this, i, N] () -> void {

      PerThread& pt = _per_thread();
      pt.pool = this;
      pt.thread_id = static_cast<int>(i);

      auto& waiter = _waiters[i];
      auto& worker = _workers[i];

      worker.last_victim = (i + 1) % N;

      std::optional<Closure> t;

      while(!worker.exit) {

        // pop from my own queue
        if(t = worker.queue.pop(); !t) {
          // steal from others
          t = _steal(i);
        }

        // leave one thread spinning to reduce the wake-up latency
        if(!t && !_spinning && !_spinning.exchange(true)) {
          for(int r=0; r<1024 && !worker.exit && !t; r++) {
            t = _steal(i);
          }
          _spinning = false;
        }

        // now we are going to preempt this worker thread
        if(!t) {

          _notifier.prepare_wait(&waiter);

          bool commit {true};

          // recheck the exit flag and the centralized queue before sleeping,
          // so that a task enqueued in between is not missed
          if(_mutex.try_lock()) {
            if(worker.exit) {
              commit = false;
            }
            else {
              if(!_queue.empty()) {
                commit = false;
                t = _queue.pop();
              }
            }
            _mutex.unlock();
          }
          else {
            commit = false;
          }

          // commit the wait if the flag is on
          if(commit) {
            _num_idlers++;
            _notifier.commit_wait(&waiter);
            _num_idlers--;
          }
          else {
            _notifier.cancel_wait(&waiter);
          }
        }

        // run the task, followed by any task the closure cached on this worker
        while(t) {
          (*t)();
          if(worker.cache) {
            t = std::move(worker.cache);
            worker.cache = std::nullopt;
          }
          else {
            t = std::nullopt;
          }
        }
      } // End of while ------------------------------------------------------

    });
  }
}
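
// Editor's note on the sleep path above: prepare_wait/commit_wait form a
// two-phase wait (in the style of Eigen's EventCount). A worker first
// registers itself as a prospective waiter, then rechecks for work; only if
// nothing arrived does it commit the wait. Any notify() issued after
// prepare_wait() is guaranteed to be observed, which closes the classic
// lost-wakeup window between "checked the queue" and "went to sleep".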

// Function: is_owner
template <typename Closure>
bool WorkStealingThreadpool<Closure>::is_owner() const {
  return std::this_thread::get_id() == _owner;
}

// Function: num_workers
template <typename Closure>
size_t WorkStealingThreadpool<Closure>::num_workers() const {
  return _workers.size();
}

// Function: _steal
template <typename Closure>
std::optional<Closure> WorkStealingThreadpool<Closure>::_steal(unsigned thief) {

  std::optional<Closure> task;

  // try getting a task from the centralized queue
  if(task = _queue.steal(); task) {
    return task;
  }

  // try stealing a task from other workers, starting from the last victim
  unsigned victim = _workers[thief].last_victim;

  for(unsigned i=0; i<_workers.size(); i++){

    if(victim != thief) {
      if(task = _workers[victim].queue.steal(); task){
        _workers[thief].last_victim = victim;
        return task;
      }
    }

    if(++victim == _workers.size()){
      victim = 0;
    }
  }

  return std::nullopt;
}

// Procedure: emplace
template <typename Closure>
template <typename... ArgsT>
void WorkStealingThreadpool<Closure>::emplace(ArgsT&&... args) {

  // no worker thread available; run the closure synchronously
  if(num_workers() == 0){
    Closure{std::forward<ArgsT>(args)...}();
    return;
  }

  auto& pt = _per_thread();

  // caller is a worker of this pool
  if(pt.pool == this) {
    if(!_workers[pt.thread_id].cache) {
      _workers[pt.thread_id].cache.emplace(std::forward<ArgsT>(args)...);
      return;
    }
    else {
      _workers[pt.thread_id].queue.push(Closure{std::forward<ArgsT>(args)...});
    }
  }
  // caller is an external thread; go through the centralized queue
  else {
    std::scoped_lock lock(_mutex);
    _queue.push(Closure{std::forward<ArgsT>(args)...});
  }

  _notifier.notify(false);
}
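
// Editor's note on the worker fast path above: when a worker emplaces a
// closure and its cache slot is empty, the closure is parked in the cache
// and no notification is issued; the worker's execution loop picks it up
// right after the current task finishes. This keeps short task chains on
// one thread and avoids queue and notifier traffic entirely.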

// Procedure: batch
template <typename Closure>
void WorkStealingThreadpool<Closure>::batch(std::vector<Closure>&& tasks) {

  if(tasks.empty()) {
    return;
  }

  // no worker thread available; run the closures synchronously
  if(num_workers() == 0){
    for(auto& t : tasks){
      t();
    }
    return;
  }

  auto& pt = _per_thread();

  // caller is a worker of this pool
  if(pt.pool == this) {

    size_t i = 0;

    // park the first task in the cache slot if it is empty
    if(!_workers[pt.thread_id].cache) {
      _workers[pt.thread_id].cache = std::move(tasks[i++]);
    }

    for(; i<tasks.size(); ++i) {
      _workers[pt.thread_id].queue.push(std::move(tasks[i]));
      _notifier.notify(false);
    }

    return;
  }

  // caller is an external thread; go through the centralized queue
  {
    std::scoped_lock lock(_mutex);

    for(size_t k=0; k<tasks.size(); ++k) {
      _queue.push(std::move(tasks[k]));
    }
  }

  // we need to wake up at least one thread because _num_idlers may not be
  // up-to-date
  size_t N = std::max(size_t{1}, std::min(_num_idlers.load(), tasks.size()));

  for(size_t i=0; i<N; ++i) {
    _notifier.notify(false);
  }
}

}  // end of namespace tf. ---------------------------------------------------