Cpp-Taskflow  2.2.0
executor.hpp
1 // 2019/06/11 - modified by Tsung-Wei Huang
2 // - fixed a bug in calling the observer while the user
3 // may clear the data
4 // - added object pool for nodes
5 //
6 // 2019/05/17 - modified by Chun-Xun Lin
7 // - moved topology to taskflow
8 // - TODO: can we use an aggressive find_victim method
9 // to replace the spin?
10 // - TODO: need to check why one worker runs slower
11 // than the sequential version
12 //
13 // 2019/05/14 - modified by Tsung-Wei Huang
14 // - isolated the executor from the taskflow
15 //
16 // 2019/04/09 - modified by Tsung-Wei Huang
17 // - removed silent_dispatch method
18 //
19 // 2019/03/12 - modified by Chun-Xun Lin
20 // - added taskflow
21 //
22 // 2019/02/11 - modified by Tsung-Wei Huang
23 // - refactored run_until
24 // - added allocator to topologies
25 // - changed to list for topologies
26 //
27 // 2019/02/10 - modified by Chun-Xun Lin
28 // - added run_n to execute taskflow
29 // - finished first peer-review with TW
30 //
31 // 2018/07 - 2019/02/09 - missing logs
32 //
33 // 2018/06/30 - created by Tsung-Wei Huang
34 // - added BasicTaskflow template
35 
36 // TODO items:
37 // 1. come up with a better way to remove the "joined" links
38 // during the execution of a static node (1st layer)
39 //
40 
41 #pragma once
42 
43 #include <iostream>
44 #include <vector>
45 #include <cstdlib>
46 #include <cstdio>
47 #include <random>
48 #include <atomic>
49 #include <memory>
50 #include <deque>
51 #include <optional>
52 #include <thread>
53 #include <algorithm>
54 #include <set>
55 #include <numeric>
56 #include <cassert>
57 
58 #include "spmc_queue.hpp"
59 #include "notifier.hpp"
60 #include "observer.hpp"
61 #include "taskflow.hpp"
62 
63 namespace tf {
64 
73 class Executor {
74 
75  struct Worker {
76  std::mt19937 rdgen { std::random_device{}() };
77  WorkStealingQueue<Node*> queue;
78  };
79 
80  struct PerThread {
81  Executor* pool {nullptr};
82  int worker_id {-1};
83  };
84 
85  public:
86 
90  explicit Executor(unsigned n = std::thread::hardware_concurrency());
91 
95  ~Executor();
96 
104  std::future<void> run(Taskflow& taskflow);
105 
114  template<typename C>
115  std::future<void> run(Taskflow& taskflow, C&& callable);
116 
125  std::future<void> run_n(Taskflow& taskflow, size_t N);
126 
136  template<typename C>
137  std::future<void> run_n(Taskflow& taskflow, size_t N, C&& callable);
138 
148  template<typename P>
149  std::future<void> run_until(Taskflow& taskflow, P&& pred);
150 
161  template<typename P, typename C>
162  std::future<void> run_until(Taskflow& taskflow, P&& pred, C&& callable);
163 
167  void wait_for_all();
168 
174  size_t num_workers() const;
175 
189  template<typename Observer, typename... Args>
190  Observer* make_observer(Args&&... args);
191 
195  void remove_observer();
196 
197  private:
198 
199  std::condition_variable _topology_cv;
200  std::mutex _topology_mutex;
201 
202  unsigned _num_topologies {0};
203 
204  // scheduler field
205  std::vector<Worker> _workers;
206  std::vector<Notifier::Waiter> _waiters;
207  std::vector<std::thread> _threads;
208 
209  WorkStealingQueue<Node*> _queue;
210 
211  std::atomic<size_t> _num_actives {0};
212  std::atomic<size_t> _num_thieves {0};
213  //std::atomic<size_t> _num_idlers {0};
214  std::atomic<bool> _done {false};
215 
216  Notifier _notifier;
217 
218  std::unique_ptr<ExecutorObserverInterface> _observer;
219 
220  unsigned _find_victim(unsigned);
221 
222  PerThread& _per_thread() const;
223 
224  bool _wait_for_tasks(unsigned, std::optional<Node*>&);
225 
226  void _spawn(unsigned);
227  void _exploit_task(unsigned, std::optional<Node*>&);
228  void _explore_task(unsigned, std::optional<Node*>&);
229  void _schedule(Node*);
230  void _schedule(PassiveVector<Node*>&);
231  void _invoke(unsigned, Node*);
232  void _invoke_static_work(unsigned, Node*);
233  void _invoke_dynamic_work(unsigned, Node*, Subflow&);
234  void _init_module_node(Node*);
235  void _tear_down_topology(Topology*);
236 };
237 
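// Quick usage sketch (editorial addition, not part of the original header).
// The Taskflow-side calls (Taskflow::emplace, Task::precede) are assumed from
// taskflow.hpp; only the Executor members used below are declared in this file:
//
//   tf::Taskflow taskflow;
//   auto A = taskflow.emplace([] () { std::cout << "task A\n"; });
//   auto B = taskflow.emplace([] () { std::cout << "task B\n"; });
//   A.precede(B);                  // run A before B
//
//   tf::Executor executor;         // one worker per hardware thread by default
//   executor.run(taskflow);        // returns a std::future<void>
//   executor.run_n(taskflow, 2);   // queue two more runs of the same graph
//   executor.wait_for_all();       // block until all submitted runs finish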
238 // Constructor
239 inline Executor::Executor(unsigned N) :
240  _workers {N},
241  _waiters {N},
242  _notifier {_waiters} {
243  _spawn(N);
244 }
245 
246 // Destructor
247 inline Executor::~Executor() {
248 
249  // wait for all topologies to complete
250  wait_for_all();
251 
252  // shut down the scheduler
253  _done = true;
254  _notifier.notify(true);
255 
256  for(auto& t : _threads){
257  t.join();
258  }
259 }
260 
261 // Function: num_workers
262 inline size_t Executor::num_workers() const {
263  return _workers.size();
264 }
265 
266 // Function: _per_thread
267 inline Executor::PerThread& Executor::_per_thread() const {
268  thread_local PerThread pt;
269  return pt;
270 }
271 
272 // Procedure: _spawn
273 inline void Executor::_spawn(unsigned N) {
274 
275  // spawn N worker threads, each running the scheduling loop below
276  for(unsigned i=0; i<N; ++i) {
277  _threads.emplace_back([this, i] () -> void {
278 
279  PerThread& pt = _per_thread();
280  pt.pool = this;
281  pt.worker_id = i;
282 
283  std::optional<Node*> t;
284 
285  // must use 1 as condition instead of !done
286  while(1) {
287 
288  // execute the tasks.
289  run_task:
290  _exploit_task(i, t);
291 
292  // steal loop
293  if(_explore_task(i, t); t) {
294  goto run_task;
295  }
296 
297  // wait for tasks
298  if(_wait_for_tasks(i, t) == false) {
299  break;
300  }
301  }
302 
303  });
304  }
305 }
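// Note (editorial): each worker spawned above cycles through three phases:
// _exploit_task drains the worker's own queue, _explore_task steals from the
// other workers and from the executor-level _queue, and _wait_for_tasks parks
// the thread on the Notifier until new work arrives or _done is set. The goto
// in the lambda merely restarts the exploit phase after a successful steal.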
306 
307 // Function: _find_victim
308 inline unsigned Executor::_find_victim(unsigned thief) {
309 
310  unsigned l = 0;
311  unsigned r = _workers.size() - 1;
312  unsigned vtm = std::uniform_int_distribution<unsigned>{l, r}(
313  _workers[thief].rdgen
314  );
315 
316  // try to look for a task from other workers
317  for(unsigned i=0; i<_workers.size(); ++i){
318 
319  if((thief == vtm && !_queue.empty()) ||
320  (thief != vtm && !_workers[vtm].queue.empty())) {
321  return vtm;
322  }
323 
324  if(++vtm; vtm == _workers.size()) {
325  vtm = 0;
326  }
327  }
328 
329  /*// try to look for a task from other workers
330  for(unsigned vtm=0; vtm<_workers.size(); ++vtm){
331  if((thief == vtm && !_queue.empty()) ||
332  (thief != vtm && !_workers[vtm].queue.empty())) {
333  return vtm;
334  }
335  }*/
336 
337  return _workers.size();
338 }
339 
340 // Function: _explore_task
341 inline void Executor::_explore_task(unsigned thief, std::optional<Node*>& t) {
342 
343  //assert(_workers[thief].queue.empty());
344  assert(!t);
345 
346  //const unsigned l = 0;
347  //const unsigned r = _workers.size() - 1;
348 
349  const size_t F = (_workers.size() + 1);
350  const size_t Y = 100;
351 
352  steal_loop:
353 
354  size_t f = 0;
355  size_t y = 0;
356 
357  ++_num_thieves;
358 
359  // explore
360  while(!_done) {
361 
362  /*unsigned vtm = std::uniform_int_distribution<unsigned>{l, r}(
363  _workers[thief].rdgen
364  );
365 
366  t = (vtm == thief) ? _queue.steal() : _workers[vtm].queue.steal();
367 
368  if(t) {
369  break;
370  }
371 
372  if(f++ > F) {
373  if(std::this_thread::yield(); y++ > 100) {
374  break;
375  }
376  }*/
377 
378  if(auto vtm = _find_victim(thief); vtm != _workers.size()) {
379  t = (vtm == thief) ? _queue.steal() : _workers[vtm].queue.steal();
380  // successful thief
381  if(t) {
382  break;
383  }
384  }
385  else {
386  if(f++ > F) {
387  if(std::this_thread::yield(); y++ > Y) {
388  break;
389  }
390  }
391  }
392  }
393 
394  // We need to keep at least one thief looking for tasks whenever there is
395  // an active worker; otherwise newly pushed tasks could be missed
396  if(auto N = --_num_thieves; N == 0) {
397  if(t != std::nullopt) {
398  _notifier.notify(false);
399  return;
400  }
401  else if(_num_actives > 0) {
402  goto steal_loop;
403  }
404  }
405 }
406 
407 // Procedure: _exploit_task
408 inline void Executor::_exploit_task(unsigned i, std::optional<Node*>& t) {
409 
410  if(t) {
411 
412  auto& worker = _workers[i];
413 
414  if(++_num_actives; _num_thieves == 0) {
415  _notifier.notify(false);
416  }
417 
418  do {
419  _invoke(i, *t);
420  t = worker.queue.pop();
421  } while(t);
422 
423  --_num_actives;
424  }
425 }
426 
427 // Function: _wait_for_tasks
428 inline bool Executor::_wait_for_tasks(unsigned me, std::optional<Node*>& t) {
429 
430  assert(!t);
431 
432  _notifier.prepare_wait(&_waiters[me]);
433 
434  if(auto vtm = _find_victim(me); vtm != _workers.size()) {
435  _notifier.cancel_wait(&_waiters[me]);
436  t = (vtm == me) ? _queue.steal() : _workers[vtm].queue.steal();
437  return true;
438  }
439 
440  //if(size_t I = ++_num_idlers; _done && I == _workers.size()) {
441  // _notifier.cancel_wait(&_waiters[me]);
442  // //if(_find_victim(me) != _workers.size()) {
443  // // --_num_idlers;
444  // // return true;
445  // //}
446  // _notifier.notify(true);
447  // return false;
448  //}
449 
450  if(_done) {
451  _notifier.cancel_wait(&_waiters[me]);
452  _notifier.notify(true);
453  return false;
454  }
455 
456  // Now I really need to relinquish myself to others
457  _notifier.commit_wait(&_waiters[me]);
458  //--_num_idlers;
459 
460  return true;
461 }
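// Note (editorial): the wait above follows the Notifier's two-phase protocol:
// prepare_wait announces the intent to sleep, the worker then re-checks for a
// victim and for _done, and either cancel_wait backs out or commit_wait blocks
// for real. The re-check between prepare_wait and commit_wait is what prevents
// a lost wakeup between the last failed steal and going to sleep.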
462 
463 // Function: make_observer
464 template<typename Observer, typename... Args>
465 Observer* Executor::make_observer(Args&&... args) {
466  // construct the observer in a local variable first, then publish it
467  auto tmp = std::make_unique<Observer>(std::forward<Args>(args)...);
468  tmp->set_up(std::max(size_t{1}, _workers.size()));
469  _observer = std::move(tmp);
470  return static_cast<Observer*>(_observer.get());
471 }
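// Usage sketch (editorial addition): an observer type passed to make_observer
// only needs the hooks invoked in this file, i.e. set_up(num_workers),
// on_entry(worker, TaskView), and on_exit(worker, TaskView). The library's own
// tf::ExecutorObserver from observer.hpp is one such type:
//
//   tf::Executor executor(4);
//   auto* observer = executor.make_observer<tf::ExecutorObserver>();
//   executor.run(taskflow).wait();
//   // observer now holds the per-worker execution records of this run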
472 
473 // Procedure: remove_observer
474 inline void Executor::remove_observer() {
475  _observer.reset();
476 }
477 
478 // Procedure: _schedule
479 // The main procedure to schedule a given task node.
480 // Each task node carries one of two work types - static (regular) or subflow.
481 inline void Executor::_schedule(Node* node) {
482 
483  // a module node needs an extra initialization step
484  if(node->_module != nullptr && !node->is_spawned()) {
485  _init_module_node(node);
486  }
487 
488  // no worker thread available
489  if(_workers.size() == 0){
490  _invoke(0, node);
491  return;
492  }
493 
494  // the caller is a worker of this executor
495  if(auto& pt = _per_thread(); pt.pool == this) {
496  _workers[pt.worker_id].queue.push(node);
497  return;
498  }
499 
500  // the caller is an external (master) thread
501  _queue.push(node);
502 
503  _notifier.notify(false);
504 }
505 
506 // Procedure: _schedule
507 // The main procedure to schedule a set of task nodes.
508 // Each task node carries one of two work types - static (regular) or subflow.
509 inline void Executor::_schedule(PassiveVector<Node*>& nodes) {
510 
511  // We need to capture the node count first to avoid accessing the nodes
512  // vector after the parent topology has been removed!
513  const auto num_nodes = nodes.size();
514 
515  if(num_nodes == 0) {
516  return;
517  }
518 
519  for(auto node : nodes) {
520  if(node->_module != nullptr && !node->is_spawned()) {
521  _init_module_node(node);
522  }
523  }
524 
525  // no worker thread available
526  if(_workers.size() == 0){
527  for(auto node: nodes){
528  _invoke(0, node);
529  }
530  return;
531  }
532 
533  // worker thread
534  if(auto& pt = _per_thread(); pt.pool == this) {
535  for(size_t i=0; i<num_nodes; ++i) {
536  _workers[pt.worker_id].queue.push(nodes[i]);
537  }
538  return;
539  }
540 
541  // master thread
542  for(size_t k=0; k<num_nodes; ++k) {
543  _queue.push(nodes[k]);
544  _notifier.notify(false);
545  }
546 
547  //size_t N = std::max(size_t{1}, std::min(_num_idlers.load(), num_nodes));
548 
549  //if(N >= _workers.size()) {
550  // _notifier.notify(true);
551  //}
552  //else {
553  // for(size_t k=0; k<N; ++k) {
554  // _notifier.notify(false);
555  // }
556  //}
557 }
558 
559 // Procedure: _init_module_node
560 inline void Executor::_init_module_node(Node* node) {
561 
562  node->_work = [node=node, this, tgt{PassiveVector<Node*>()}] () mutable {
563 
564  // second time to enter this context
565  if(node->is_spawned()) {
566  node->_dependents.resize(node->_dependents.size()-tgt.size());
567  for(auto& t: tgt) {
568  t->_successors.clear();
569  }
570  return ;
571  }
572 
573  // first time to enter this context
574  node->set_spawned();
575 
576  PassiveVector<Node*> src;
577 
578  for(auto n: node->_module->_graph.nodes()) {
579  n->_topology = node->_topology;
580  if(n->num_dependents() == 0) {
581  src.push_back(n);
582  }
583  if(n->num_successors() == 0) {
584  n->precede(*node);
585  tgt.push_back(n);
586  }
587  }
588 
589  _schedule(src);
590  };
591 }
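// Note (editorial): a "module" node is one whose _module points to another
// taskflow, i.e. a node created through taskflow composition. The lambda
// installed above expands that taskflow lazily: on the first entry it wires
// the composed graph's sources and sinks into the surrounding graph and
// schedules the sources; on the second entry it removes those temporary links
// again. A hypothetical composition call (the actual API lives in taskflow.hpp,
// not in this file) might look like:
//
//   tf::Taskflow core, top;
//   core.emplace([] () { /* ... */ });
//   top.composed_of(core);   // assumes a composed_of-style interface
//   executor.run(top);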
592 
593 // Procedure: _invoke
594 inline void Executor::_invoke(unsigned me, Node* node) {
595 
596  // We need to fetch num_successors first to avoid an invalid memory
597  // access caused by the topology being cleared.
598  const auto num_successors = node->num_successors();
599 
600  // regular node type
601  // The default node work type. We only need to execute the callback if any.
602  if(auto index=node->_work.index(); index == 0) {
603  if(node->_module != nullptr) {
604  bool first_time = !node->is_spawned();
605  _invoke_static_work(me, node);
606  if(first_time) {
607  return ;
608  }
609  }
610  else {
611  if(auto &f = std::get<Node::StaticWork>(node->_work); f != nullptr){
612  _invoke_static_work(me, node);
613  }
614  }
615  }
616  // subflow node type
617  else {
618 
619  // Clear the subgraph before the task execution
620  if(!node->is_spawned()) {
621  node->_subgraph.emplace();
622  }
623 
624  Subflow fb(*(node->_subgraph));
625 
626  _invoke_dynamic_work(me, node, fb);
627 
628  // Need to create a subflow if first time & subgraph is not empty
629  if(!node->is_spawned()) {
630  node->set_spawned();
631  if(!node->_subgraph->empty()) {
632  // For storing the source nodes
633  PassiveVector<Node*> src;
634  for(auto n: node->_subgraph->nodes()) {
635  n->_topology = node->_topology;
636  n->set_subtask();
637  if(n->num_successors() == 0) {
638  if(fb.detached()) {
639  node->_topology->_num_sinks ++;
640  }
641  else {
642  n->precede(*node);
643  }
644  }
645  if(n->num_dependents() == 0) {
646  src.push_back(n);
647  }
648  }
649 
650  _schedule(src);
651 
652  if(!fb.detached()) {
653  return;
654  }
655  }
656  }
657  } // End of DynamicWork -----------------------------------------------------
658 
659  // Recover the runtime changes due to dynamic tasking, except for the target & spawned tasks.
660  // This must be done before scheduling the successors; otherwise it might cause
661  // a race condition on _dependents.
662  //if(num_successors && !node->_subtask) {
663  if(!node->is_subtask()) {
664  // Only dynamic tasking needs to restore _dependents
665  // TODO:
666  if(node->_work.index() == 1 && !node->_subgraph->empty()) {
667  while(!node->_dependents.empty() && node->_dependents.back()->is_subtask()) {
668  node->_dependents.pop_back();
669  }
670  }
671  node->_num_dependents = static_cast<int>(node->_dependents.size());
672  node->unset_spawned();
673  }
674 
675  // At this point, the node storage might be destructed.
676  for(size_t i=0; i<num_successors; ++i) {
677  if(--(node->_successors[i]->_num_dependents) == 0) {
678  _schedule(node->_successors[i]);
679  }
680  }
681 
682  // A node without any successors should check whether its topology has finished
683  if(num_successors == 0) {
684  if(--(node->_topology->_num_sinks) == 0) {
685  if(_workers.size() > 0) { // finishing this topology
686  _tear_down_topology(node->_topology);
687  }
688  }
689  }
690 }
691 
692 // Procedure: _invoke_static_work
693 inline void Executor::_invoke_static_work(unsigned me, Node* node) {
694  if(_observer) {
695  _observer->on_entry(me, TaskView(node));
696  std::invoke(std::get<Node::StaticWork>(node->_work));
697  _observer->on_exit(me, TaskView(node));
698  }
699  else {
700  std::invoke(std::get<Node::StaticWork>(node->_work));
701  }
702 }
703 
704 // Procedure: _invoke_dynamic_work
705 inline void Executor::_invoke_dynamic_work(unsigned me, Node* node, Subflow& sf) {
706  if(_observer) {
707  _observer->on_entry(me, TaskView(node));
708  std::invoke(std::get<Node::DynamicWork>(node->_work), sf);
709  _observer->on_exit(me, TaskView(node));
710  }
711  else {
712  std::invoke(std::get<Node::DynamicWork>(node->_work), sf);
713  }
714 }
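// Usage sketch (editorial addition): a dynamic task receives a tf::Subflow and
// spawns its subgraph at runtime; the join/detach handling of that subgraph is
// what the subflow branch of _invoke above implements. Subflow::emplace,
// Task::precede, and Subflow::detach are assumed from flow_builder.hpp:
//
//   taskflow.emplace([] (tf::Subflow& sf) {
//     auto X = sf.emplace([] () { std::cout << "X\n"; });
//     auto Y = sf.emplace([] () { std::cout << "Y\n"; });
//     X.precede(Y);
//     // sf.detach();   // uncomment to detach the subgraph from its parent
//   });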
715 
716 // Function: run
717 inline std::future<void> Executor::run(Taskflow& f) {
718  return run_n(f, 1, [](){});
719 }
720 
721 // Function: run
722 template <typename C>
723 std::future<void> Executor::run(Taskflow& f, C&& c) {
724  static_assert(std::is_invocable<C>::value);
725  return run_n(f, 1, std::forward<C>(c));
726 }
727 
728 // Function: run_n
729 inline std::future<void> Executor::run_n(Taskflow& f, size_t repeat) {
730  return run_n(f, repeat, [](){});
731 }
732 
733 // Function: run_n
734 template <typename C>
735 std::future<void> Executor::run_n(Taskflow& f, size_t repeat, C&& c) {
736  return run_until(f, [repeat]() mutable { return repeat-- == 0; }, std::forward<C>(c));
737 }
738 
739 // Function: run_until
740 template<typename P>
741 std::future<void> Executor::run_until(Taskflow& f, P&& pred) {
742  return run_until(f, std::forward<P>(pred), [](){});
743 }
744 
745 // Function: _tear_down_topology
746 inline void Executor::_tear_down_topology(Topology* tpg) {
747 
748  auto &f = tpg->_taskflow;
749 
750  //assert(&tpg == &(f._topologies.front()));
751 
752  // case 1: we still need to run the topology again
753  if(!std::invoke(tpg->_pred)) {
754  tpg->_recover_num_sinks();
755  _schedule(tpg->_sources);
756  }
757  // case 2: the final run of this topology
758  else {
759 
760  if(tpg->_call != nullptr) {
761  std::invoke(tpg->_call);
762  }
763 
764  f._mtx.lock();
765 
766  // If another run was queued in the meantime (interleaved with this lock)
767  if(f._topologies.size() > 1) {
768 
769  // Set the promise
770  tpg->_promise.set_value();
771  f._topologies.pop_front();
772  f._mtx.unlock();
773 
774  {
775  std::scoped_lock lock(_topology_mutex);
776  _num_topologies--;
777  }
778 
779  f._topologies.front()._bind(f._graph);
780  _schedule(f._topologies.front()._sources);
781  }
782  else {
783  assert(f._topologies.size() == 1);
784  // Need to back up the promise first here because the taskflow might be
785  // destroyed before this function leaves
786  auto p {std::move(tpg->_promise)};
787 
788  f._topologies.pop_front();
789 
790  f._mtx.unlock();
791 
792  // We set the promise at the end in case the taskflow is destroyed before this function returns
793  p.set_value();
794  {
795  std::scoped_lock lock(_topology_mutex);
796  _num_topologies--;
797  }
798  _topology_cv.notify_one();
799  }
800  }
801 }
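// Note (editorial): _tear_down_topology gives run_n/run_until their semantics.
// When all sinks of a topology finish, the stored predicate decides whether to
// reset the sink count and reschedule the sources (another iteration), or to
// invoke the callback, fulfill the promise, pop the topology from the
// taskflow's list, and start the next pending run, if any.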
802 
803 // Function: run_until
804 template <typename P, typename C>
805 std::future<void> Executor::run_until(Taskflow& f, P&& pred, C&& c) {
806 
807  // The predicate (returning bool) and the callback must both be invocable with no arguments
808  static_assert(std::is_invocable_v<C> && std::is_invocable_v<P>);
809 
810  if(std::invoke(pred)) {
811  return std::async(std::launch::deferred, [](){});
812  }
813 
814  // Special case of zero workers needs
815  // - iterative execution to avoid stack overflow
816  // - avoid execution of last_work
817  if(_workers.size() == 0) {
818 
819  Topology tpg(f, std::forward<P>(pred), std::forward<C>(c));
820 
821  // Clear last execution data & Build precedence between nodes and target
822  tpg._bind(f._graph);
823 
824  do {
825  _schedule(tpg._sources);
826  tpg._recover_num_sinks();
827  } while(!std::invoke(tpg._pred));
828 
829  if(tpg._call != nullptr) {
830  std::invoke(tpg._call);
831  }
832 
833  return std::async(std::launch::deferred, [](){});
834  }
835 
836  // has worker(s)
837  {
838  std::scoped_lock lock(_topology_mutex);
839  _num_topologies++;
840  }
841 
842  // Multi-threaded execution.
843  bool run_now {false};
844  Topology* tpg;
845  std::future<void> future;
846 
847  {
848  std::scoped_lock lock(f._mtx);
849 
850  // create a topology for this run
851  tpg = &(f._topologies.emplace_back(f, std::forward<P>(pred), std::forward<C>(c)));
852  future = tpg->_promise.get_future();
853 
854  if(f._topologies.size() == 1) {
855  run_now = true;
856  //tpg->_bind(f._graph);
857  //_schedule(tpg->_sources);
858  }
859  }
860 
861  // Note that calling _schedule here may cause the topology to be removed
862  // before this function leaves.
863  if(run_now) {
864  tpg->_bind(f._graph);
865  _schedule(tpg->_sources);
866  }
867 
868  return future;
869 }
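// Usage sketch (editorial addition): with zero workers the executor spawns no
// threads; the zero-worker branch of run_until above executes the whole graph
// iteratively on the calling thread and returns a deferred, no-op future:
//
//   tf::Executor sequential(0);        // no worker threads
//   sequential.run(taskflow).get();    // the graph already ran inline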
870 
871 // Procedure: wait_for_all
872 inline void Executor::wait_for_all() {
873  std::unique_lock lock(_topology_mutex);
874  _topology_cv.wait(lock, [&](){ return _num_topologies == 0; });
875 }
876 
877 } // end of namespace tf -----------------------------------------------------
878 
879 