Cpp-Taskflow  2.3.1
executor.hpp
1 #pragma once
2 
3 #include <iostream>
4 #include <vector>
5 #include <cstdlib>
6 #include <cstdio>
7 #include <random>
8 #include <atomic>
9 #include <memory>
10 #include <deque>
11 #include <thread>
12 #include <algorithm>
13 #include <set>
14 #include <numeric>
15 #include <cassert>
16 
17 #include "tsq.hpp"
18 #include "notifier.hpp"
19 #include "observer.hpp"
20 #include "taskflow.hpp"
21 
22 namespace tf {
23 
33 class Executor {
34 
35  struct Worker {
36  unsigned id;
37  std::mt19937 rdgen { std::random_device{}() };
38  TaskQueue<Node*> queue;
39  Node* cache {nullptr};
40 
41  int num_executed {0};
42  };
43 
44  struct PerThread {
45  Worker* worker {nullptr};
46  };
47 
48  public:
49 
53  explicit Executor(unsigned n = std::thread::hardware_concurrency());
54 
58  ~Executor();
59 
67  std::future<void> run(Taskflow& taskflow);
68 
77  template<typename C>
78  std::future<void> run(Taskflow& taskflow, C&& callable);
79 
88  std::future<void> run_n(Taskflow& taskflow, size_t N);
89 
99  template<typename C>
100  std::future<void> run_n(Taskflow& taskflow, size_t N, C&& callable);
101 
111  template<typename P>
112  std::future<void> run_until(Taskflow& taskflow, P&& pred);
113 
124  template<typename P, typename C>
125  std::future<void> run_until(Taskflow& taskflow, P&& pred, C&& callable);
126 
130  void wait_for_all();
131 
137  size_t num_workers() const;
138 
152  template<typename Observer, typename... Args>
153  Observer* make_observer(Args&&... args);
154 
158  void remove_observer();
159 
166  int this_worker_id() const;
167 
168  private:
169 
170  std::condition_variable _topology_cv;
171  std::mutex _topology_mutex;
172  std::mutex _queue_mutex;
173 
174  unsigned _num_topologies {0};
175 
176  // scheduler field
177  std::vector<Worker> _workers;
178  std::vector<Notifier::Waiter> _waiters;
179  std::vector<std::thread> _threads;
180 
181  TaskQueue<Node*> _queue;
182 
183  std::atomic<size_t> _num_actives {0};
184  std::atomic<size_t> _num_thieves {0};
185  std::atomic<bool> _done {0};
186 
187  Notifier _notifier;
188 
189  std::unique_ptr<ExecutorObserverInterface> _observer;
190 
191  unsigned _find_victim(unsigned);
192 
193  PerThread& _per_thread() const;
194 
195  bool _wait_for_task(Worker&, Node*&);
196 
197  void _spawn(unsigned);
198  void _exploit_task(Worker&, Node*&);
199  void _explore_task(Worker&, Node*&);
200  void _schedule(Node*, bool);
201  void _schedule(PassiveVector<Node*>&);
202  void _invoke(Worker&, Node*);
203  void _invoke_static_work(Worker&, Node*);
204  void _invoke_dynamic_work(Worker&, Node*, Subflow&);
205  void _invoke_condition_work(Worker&, Node*, int&);
206 
207 #ifdef TF_ENABLE_CUDA
208  void _invoke_cudaflow_work(Worker&, Node*);
209  void _invoke_cudaflow_work_impl(Worker&, Node*);
210 #endif
211 
212  void _set_up_module_work(Node*, bool&);
213  void _set_up_topology(Topology*);
214  void _tear_down_topology(Topology**);
215  void _increment_topology();
216  void _decrement_topology();
217  void _decrement_topology_and_notify();
218 };
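// Example usage of the public interface above (a minimal sketch, assuming a
// Taskflow populated by the caller; not part of this header):
//
//   tf::Taskflow taskflow;
//   taskflow.emplace([](){ std::printf("hello from a task\n"); });
//
//   tf::Executor executor(4);          // four worker threads
//   executor.run(taskflow).wait();     // run once and block on the future
//   executor.run_n(taskflow, 2);       // queue two more runs
//   executor.wait_for_all();           // block until every submitted run finishes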
219 
220 // Constructor
221 inline Executor::Executor(unsigned N) :
222  _workers {N},
223  _waiters {N},
224  _notifier {_waiters} {
225 
226  if(N == 0) {
227  TF_THROW("no workers to execute the graph");
228  }
229 
230  _spawn(N);
231 }
232 
233 // Destructor
234 inline Executor::~Executor() {
235 
236  // wait for all topologies to complete
237  wait_for_all();
238 
239  // shut down the scheduler
240  _done = true;
241  _notifier.notify(true);
242 
243  for(auto& t : _threads){
244  t.join();
245  }
246 }
247 
248 // Function: num_workers
249 inline size_t Executor::num_workers() const {
250  return _workers.size();
251 }
252 
253 // Function: _per_thread
254 inline Executor::PerThread& Executor::_per_thread() const {
255  thread_local PerThread pt;
256  return pt;
257 }
258 
259 // Function: this_worker_id
260 inline int Executor::this_worker_id() const {
261  auto worker = _per_thread().worker;
262  return worker ? static_cast<int>(worker->id) : -1;
263 }
264 
265 // Procedure: _spawn
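// Each spawned worker binds itself to the thread-local PerThread slot and then
// runs an exploit/explore loop: _exploit_task drains its local queue, and
// _wait_for_task steals more work or puts the worker to sleep, returning false
// only at shutdown to break the loop.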
266 inline void Executor::_spawn(unsigned N) {
267 
268  // set up the worker ids and spawn the worker threads
269  for(unsigned i=0; i<N; ++i) {
270 
271  _workers[i].id = i;
272 
273  _threads.emplace_back([this] (Worker& w) -> void {
274 
275  PerThread& pt = _per_thread();
276  pt.worker = &w;
277 
278  Node* t = nullptr;
279 
280  // must use 1 as condition instead of !done
281  while(1) {
282 
283  // execute the tasks.
284  _exploit_task(w, t);
285 
286  // wait for tasks
287  if(_wait_for_task(w, t) == false) {
288  break;
289  }
290  }
291 
292  }, std::ref(_workers[i]));
293  }
294 }
295 
296 // Function: _find_victim
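// Linear scan over the shared queue and every worker queue; returns the index
// of a non-empty victim, or _workers.size() if no queue has work. In this
// version, only the commented-out code paths below still reference this helper.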
297 inline unsigned Executor::_find_victim(unsigned thief) {
298 
299  /*unsigned l = 0;
300  unsigned r = _workers.size() - 1;
301  unsigned vtm = std::uniform_int_distribution<unsigned>{l, r}(
302  _workers[thief].rdgen
303  );
304 
305  // try to look for a task from other workers
306  for(unsigned i=0; i<_workers.size(); ++i){
307 
308  if((thief == vtm && !_queue.empty()) ||
309  (thief != vtm && !_workers[vtm].queue.empty())) {
310  return vtm;
311  }
312 
313  if(++vtm; vtm == _workers.size()) {
314  vtm = 0;
315  }
316  } */
317 
318  // try to look for a task from other workers
319  for(unsigned vtm=0; vtm<_workers.size(); ++vtm){
320  if((thief == vtm && !_queue.empty()) ||
321  (thief != vtm && !_workers[vtm].queue.empty())) {
322  return vtm;
323  }
324  }
325 
326  return static_cast<unsigned>(_workers.size());
327 }
328 
329 // Function: _explore_task
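// Randomized work stealing: pick a victim uniformly at random and try to steal
// from its queue (or from the shared queue when the victim is the thief
// itself). After F consecutive failures the thief starts yielding between
// attempts, and after Y further failures it gives up with t left empty.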
330 inline void Executor::_explore_task(Worker& thief, Node*& t) {
331 
332  //assert(_workers[thief].queue.empty());
333  assert(!t);
334 
335  const unsigned l = 0;
336  const unsigned r = static_cast<unsigned>(_workers.size()) - 1;
337 
338  const size_t F = (_workers.size() + 1) << 1;
339  const size_t Y = 100;
340 
341  size_t f = 0;
342  size_t y = 0;
343 
344  // explore
345  while(!_done) {
346 
347  unsigned vtm = std::uniform_int_distribution<unsigned>{l, r}(thief.rdgen);
348 
349  t = (vtm == thief.id) ? _queue.steal() : _workers[vtm].queue.steal();
350 
351  if(t) {
352  break;
353  }
354 
355  if(f++ > F) {
356  std::this_thread::yield();
357  if(y++ > Y) {
358  break;
359  }
360  }
361 
362  /*if(auto vtm = _find_victim(thief); vtm != _workers.size()) {
363  t = (vtm == thief) ? _queue.steal() : _workers[vtm].queue.steal();
364  // successful thief
365  if(t) {
366  break;
367  }
368  }
369  else {
370  if(f++ > F) {
371  if(std::this_thread::yield(); y++ > Y) {
372  break;
373  }
374  }
375  }*/
376  }
377 
378 }
379 
380 // Procedure: _exploit_task
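// Execute t and keep draining the worker's own queue, preferring the bypass
// cache filled by _schedule. Join-counter updates for consecutive tasks that
// share the same parent are batched in w.num_executed and flushed when the
// parent changes or the local queue runs dry.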
381 inline void Executor::_exploit_task(Worker& w, Node*& t) {
382 
383  assert(!w.cache);
384 
385  if(t) {
386  if(_num_actives.fetch_add(1) == 0 && _num_thieves == 0) {
387  _notifier.notify(false);
388  }
389 
390  auto tpg = t->_topology;
391  auto par = t->_parent;
392  w.num_executed = 1;
393 
394  do {
395  // Only joined subflow will enter block
396  // Flush the num_executed if encountering a different subflow
397  //if((*t)->_parent != par) {
398  // if(par == nullptr) {
399  // (*t)->_topology->_join_counter.fetch_sub(w.num_executed);
400  // }
401  // else {
402  // auto ret = par->_join_counter.fetch_sub(w.num_executed);
403  // if(ret == w.num_executed) {
404  // _schedule(par, false);
405  // }
406  // }
407  // w.num_executed = 1;
408  // par = (*t)->_parent;
409  //}
410 
411  _invoke(w, t);
412 
413  if(w.cache) {
414  t = w.cache;
415  w.cache = nullptr;
416  }
417  else {
418  t = w.queue.pop();
419  if(t) {
420  // We only increment the counter when popping a task from the queue
421  // (NOT including cache!)
422  if(t->_parent == par) {
423  w.num_executed ++;
424  }
425  // joined subflow
426  else {
427  if(par == nullptr) {
428  // still have tasks so the topology join counter can't be zero
429  t->_topology->_join_counter.fetch_sub(w.num_executed);
430  }
431  else {
432  auto ret = par->_join_counter.fetch_sub(w.num_executed);
433  if(ret == w.num_executed) {
434  //_schedule(par, false);
435  w.queue.push(par);
436  }
437  }
438  w.num_executed = 1;
439  par = t->_parent;
440  }
441  }
442  else {
443  // If no more local tasks!
444  if(par == nullptr) {
445  if(tpg->_join_counter.fetch_sub(w.num_executed) == w.num_executed) {
446  // TODO: Store tpg in local variable not in w
447  _tear_down_topology(&tpg);
448  if(tpg != nullptr) {
449  t = w.queue.pop();
450  if(t) {
451  w.num_executed = 1;
452  }
453  }
454  }
455  }
456  else {
457  if(par->_join_counter.fetch_sub(w.num_executed) == w.num_executed) {
458  t = par;
459  w.num_executed = 1;
460  par = par->_parent;
461  }
462  }
463  }
464  }
465  } while(t);
466 
467  --_num_actives;
468  }
469 }
470 
471 // Function: _wait_for_task
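// Two-phase sleep on the Notifier: announce intent with prepare_wait, re-check
// the shared queue and the shutdown flag, then either cancel_wait (work or
// shutdown detected) or commit_wait to sleep. The re-check closes the window
// between the last failed steal and actually going to sleep.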
472 inline bool Executor::_wait_for_task(Worker& worker, Node*& t) {
473 
474  wait_for_task:
475 
476  assert(!t);
477 
478  ++_num_thieves;
479 
480  explore_task:
481 
482  _explore_task(worker, t);
483 
484  if(t) {
485  auto N = _num_thieves.fetch_sub(1);
486  if(N == 1) {
487  _notifier.notify(false);
488  }
489  return true;
490  }
491 
492  auto waiter = &_waiters[worker.id];
493 
494  _notifier.prepare_wait(waiter);
495 
496  //if(auto vtm = _find_victim(me); vtm != _workers.size()) {
497  if(!_queue.empty()) {
498 
499  _notifier.cancel_wait(waiter);
500  //t = (vtm == me) ? _queue.steal() : _workers[vtm].queue.steal();
501 
502  t = _queue.steal();
503  if(t) {
504  auto N = _num_thieves.fetch_sub(1);
505  if(N == 1) {
506  _notifier.notify(false);
507  }
508  return true;
509  }
510  else {
511  goto explore_task;
512  }
513  }
514 
515  if(_done) {
516  _notifier.cancel_wait(waiter);
517  _notifier.notify(true);
518  --_num_thieves;
519  return false;
520  }
521 
522  if(_num_thieves.fetch_sub(1) == 1 && _num_actives) {
523  _notifier.cancel_wait(waiter);
524  goto wait_for_task;
525  }
526 
527  // Now I really need to relinquish myself to others
528  _notifier.commit_wait(waiter);
529 
530  return true;
531 }
532 
533 // Function: make_observer
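// Only one observer is kept at a time; installing a new observer replaces and
// destroys any previously installed one.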
534 template<typename Observer, typename... Args>
535 Observer* Executor::make_observer(Args&&... args) {
536  // construct into a temporary and set it up before installing it as the observer
537  auto tmp = std::make_unique<Observer>(std::forward<Args>(args)...);
538  tmp->set_up(_workers.size());
539  _observer = std::move(tmp);
540  return static_cast<Observer*>(_observer.get());
541 }
542 
543 // Procedure: remove_observer
544 inline void Executor::remove_observer() {
545  _observer.reset();
546 }
547 
548 // Procedure: _schedule
549 // The main procedure to schedule a given task node.
550 // A task node carries one kind of work (static, dynamic, condition, module, or cudaFlow).
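// A worker thread pushes the node onto its own queue, or stashes it in the
// bypass cache when bypass is true so _exploit_task runs it next without a
// queue round-trip. A non-worker caller pushes to the shared queue under
// _queue_mutex and wakes one sleeping worker.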
551 inline void Executor::_schedule(Node* node, bool bypass) {
552 
553  //assert(_workers.size() != 0);
554 
555  // the caller is a worker of this executor
556  auto worker = _per_thread().worker;
557 
558  if(worker != nullptr) {
559  if(!bypass) {
560  worker->queue.push(node);
561  }
562  else {
563  assert(!worker->cache);
564  worker->cache = node;
565  }
566  return;
567  }
568 
569  // other threads
570  {
571  std::lock_guard<std::mutex> lock(_queue_mutex);
572  _queue.push(node);
573  }
574 
575  _notifier.notify(false);
576 }
577 
578 // Procedure: _schedule
579 // The main procedure to schedule a set of task nodes.
580 // A task node carries one kind of work (static, dynamic, condition, module, or cudaFlow).
581 inline void Executor::_schedule(PassiveVector<Node*>& nodes) {
582 
583  //assert(_workers.size() != 0);
584 
585  // We need to capture the node count to avoid accessing the nodes
586  // vector after the parent topology is removed!
587  const auto num_nodes = nodes.size();
588 
589  if(num_nodes == 0) {
590  return;
591  }
592 
593  // worker thread
594  auto worker = _per_thread().worker;
595 
596  if(worker != nullptr) {
597  for(size_t i=0; i<num_nodes; ++i) {
598  worker->queue.push(nodes[i]);
599  }
600  return;
601  }
602 
603  // other threads
604  {
605  std::lock_guard<std::mutex> lock(_queue_mutex);
606  for(size_t k=0; k<num_nodes; ++k) {
607  _queue.push(nodes[k]);
608  }
609  }
610 
611  if(num_nodes >= _workers.size()) {
612  _notifier.notify(true);
613  }
614  else {
615  for(size_t k=0; k<num_nodes; ++k) {
616  _notifier.notify(false);
617  }
618  }
619 }
620 
621 
622 // Procedure: _invoke
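// Dispatch on the node's handle type (static, module, dynamic, condition,
// cudaFlow), then restore the node's join counter for possible re-execution
// and schedule every successor whose join counter drops to zero, keeping the
// last ready successor in the bypass cache so the same worker runs it next.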
623 inline void Executor::_invoke(Worker& worker, Node* node) {
624 
625  //assert(_workers.size() != 0);
626 
627  // Here we need to fetch the num_successors first to avoid the invalid memory
628  // access caused by topology clear.
629  const auto num_successors = node->num_successors();
630 
631  // switch is faster than nested if-else due to jump table
632  switch(node->_handle.index()) {
633  // static task
634  case Node::STATIC_WORK:{
635  _invoke_static_work(worker, node);
636  }
637  break;
638  // module task
639  case Node::MODULE_WORK: {
640  bool first_time = !node->_has_state(Node::SPAWNED);
641  bool emptiness = false;
642  _set_up_module_work(node, emptiness);
643  if(first_time && !emptiness) {
644  return;
645  }
646  }
647  break;
648  // dynamic task
649  case Node::DYNAMIC_WORK: {
650 
651  auto& subgraph = nstd::get<Node::DynamicWork>(node->_handle).subgraph;
652 
653  // Clear the subgraph before the task execution
654  if(!node->_has_state(Node::SPAWNED)) {
655  subgraph.clear();
656  }
657 
658  Subflow fb(subgraph);
659 
660  _invoke_dynamic_work(worker, node, fb);
661 
662  // Need to create a subflow if first time & subgraph is not empty
663  if(!node->_has_state(Node::SPAWNED)) {
664  node->_set_state(Node::SPAWNED);
665  if(!subgraph.empty()) {
666  // For storing the source nodes
667  PassiveVector<Node*> src;
668 
669  for(auto n : subgraph._nodes) {
670 
671  n->_topology = node->_topology;
672  n->_set_up_join_counter();
673 
674  if(!fb.detached()) {
675  n->_parent = node;
676  }
677 
678  if(n->num_dependents() == 0) {
679  src.push_back(n);
680  }
681  }
682 
683  const bool join = fb.joined();
684  if(!join) {
685  // Detach mode
686  node->_topology->_join_counter.fetch_add(src.size());
687  }
688  else {
689  // Join mode
690  node->_join_counter.fetch_add(src.size());
691 
692  // the spawned node needs a second-round execution
693  if(node->_parent == nullptr) {
694  node->_topology->_join_counter.fetch_add(1);
695  }
696  else {
697  node->_parent->_join_counter.fetch_add(1);
698  }
699  }
700 
701  _schedule(src);
702 
703  if(join) {
704  return;
705  }
706  } // End of first time
707  }
708  }
709  break;
710  // condition task
711  case Node::CONDITION_WORK: {
712 
713  if(node->_has_state(Node::BRANCH)) {
714  node->_join_counter = node->num_strong_dependents();
715  }
716  else {
717  node->_join_counter = node->num_dependents();
718  }
719 
720  int id;
721  _invoke_condition_work(worker, node, id);
722 
723  if(id >= 0 && static_cast<size_t>(id) < num_successors) {
724  node->_successors[id]->_join_counter.store(0);
725  _schedule(node->_successors[id], true);
726  }
727  return ;
728  } // no need to add a break here due to the immediate return
729 
730  // cudaflow task
731 #ifdef TF_ENABLE_CUDA
732  case Node::CUDAFLOW_WORK: {
733  _invoke_cudaflow_work(worker, node);
734  }
735  break;
736 #endif
737 
738  // monostate
739  default:
740  break;
741  }
742 
743 
744  // We MUST recover the dependency since a condition node can loop back (cyclic).
745  // This must be done before scheduling the successors, otherwise it might cause
746  // a race condition on the _dependents
747  if(node->_has_state(Node::BRANCH)) {
748  // If this node is a branch target, we need to deduct its condition (weak) predecessors
749  node->_join_counter = node->num_strong_dependents();
750  }
751  else {
752  node->_join_counter = node->num_dependents();
753  }
754 
755  node->_unset_state(Node::SPAWNED);
756 
757  // At this point, the node storage might be destructed.
758  Node* cache {nullptr};
759  size_t num_spawns {0};
760 
761  auto& c = (node->_parent) ? node->_parent->_join_counter :
762  node->_topology->_join_counter;
763 
764  for(size_t i=0; i<num_successors; ++i) {
765  if(--(node->_successors[i]->_join_counter) == 0) {
766  if(cache) {
767  if(num_spawns == 0) {
768  c.fetch_add(num_successors);
769  }
770  num_spawns++;
771  _schedule(cache, false);
772  }
773  cache = node->_successors[i];
774  }
775  }
776 
777  if(num_spawns) {
778  worker.num_executed += (node->_successors.size() - num_spawns);
779  }
780 
781  if(cache) {
782  _schedule(cache, true);
783  }
784 }
785 
786 // Procedure: _invoke_static_work
787 inline void Executor::_invoke_static_work(Worker& worker, Node* node) {
788  if(_observer) {
789  _observer->on_entry(worker.id, TaskView(node));
790  nstd::get<Node::StaticWork>(node->_handle).work();
791  _observer->on_exit(worker.id, TaskView(node));
792  }
793  else {
794  nstd::get<Node::StaticWork>(node->_handle).work();
795  }
796 }
797 
798 // Procedure: _invoke_dynamic_work
799 inline void Executor::_invoke_dynamic_work(Worker& worker, Node* node, Subflow& sf) {
800  if(_observer) {
801  _observer->on_entry(worker.id, TaskView(node));
802  nstd::get<Node::DynamicWork>(node->_handle).work(sf);
803  _observer->on_exit(worker.id, TaskView(node));
804  }
805  else {
806  nstd::get<Node::DynamicWork>(node->_handle).work(sf);
807  }
808 }
809 
810 // Procedure: _invoke_condition_work
811 inline void Executor::_invoke_condition_work(Worker& worker, Node* node, int& id) {
812  if(_observer) {
813  _observer->on_entry(worker.id, TaskView(node));
814  id = nstd::get<Node::ConditionWork>(node->_handle).work();
815  _observer->on_exit(worker.id, TaskView(node));
816  }
817  else {
818  id = nstd::get<Node::ConditionWork>(node->_handle).work();
819  }
820 }
821 
822 #ifdef TF_ENABLE_CUDA
823 // Procedure: _invoke_cudaflow_work
824 inline void Executor::_invoke_cudaflow_work(Worker& worker, Node* node) {
825  if(_observer) {
826  _observer->on_entry(worker.id, TaskView(node));
827  _invoke_cudaflow_work_impl(worker, node);
828  _observer->on_exit(worker.id, TaskView(node));
829  }
830  else {
831  _invoke_cudaflow_work_impl(worker, node);
832  }
833 }
834 
835 // Procedure: _invoke_cudaflow_work_impl
836 inline void Executor::_invoke_cudaflow_work_impl(Worker&, Node* node) {
837 
838  auto& h = nstd::get<Node::cudaFlowWork>(node->_handle);
839 
840  h.graph.clear();
841 
842  cudaFlow cf(h.graph);
843 
844  h.work(cf);
845 
846  cudaGraphExec_t exec;
847  TF_CHECK_CUDA(
848  cudaGraphInstantiate(&exec, h.graph._handle, nullptr, nullptr, 0),
849  "failed to create an exec cudaGraph"
850  );
851  TF_CHECK_CUDA(cudaGraphLaunch(exec, 0), "failed to launch cudaGraph")
852  TF_CHECK_CUDA(cudaStreamSynchronize(0), "failed to sync cudaStream");
853  TF_CHECK_CUDA(
854  cudaGraphExecDestroy(exec), "failed to destroy an exec cudaGraph"
855  );
856 }
857 #endif
858 
859 // Procedure: _set_up_module_work
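// A module task runs a composed taskflow in place: on first entry the module's
// graph is wired into the current topology with this node as parent, its
// source nodes are scheduled, and the module node is re-invoked once they have
// all joined; on the second entry it simply falls through to its successors.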
860 inline void Executor::_set_up_module_work(Node* node, bool& ept) {
861 
862  // second time to enter this context
863  if(node->_has_state(Node::SPAWNED)) {
864  return;
865  }
866 
867  // first time to enter this context
868  node->_set_state(Node::SPAWNED);
869 
870  auto module = nstd::get<Node::ModuleWork>(node->_handle).module;
871 
872  if(module->empty()) {
873  ept = true;
874  return;
875  }
876 
877  PassiveVector<Node*> src;
878 
879  for(auto n: module->_graph._nodes) {
880 
881  n->_topology = node->_topology;
882  n->_parent = node;
883  n->_set_up_join_counter();
884 
885  if(n->num_dependents() == 0) {
886  src.push_back(n);
887  }
888  }
889 
890  node->_join_counter.fetch_add(src.size());
891 
892  if(node->_parent == nullptr) {
893  node->_topology->_join_counter.fetch_add(1);
894  }
895  else {
896  node->_parent->_join_counter.fetch_add(1);
897  }
898 
899  // src can't be empty here (the empty-module case returns early above)
900  _schedule(src);
901 }
902 
903 // Function: run
904 inline std::future<void> Executor::run(Taskflow& f) {
905  return run_n(f, 1, [](){});
906 }
907 
908 // Function: run
909 template <typename C>
910 std::future<void> Executor::run(Taskflow& f, C&& c) {
911  return run_n(f, 1, std::forward<C>(c));
912 }
913 
914 // Function: run_n
915 inline std::future<void> Executor::run_n(Taskflow& f, size_t repeat) {
916  return run_n(f, repeat, [](){});
917 }
918 
919 // Function: run_n
920 template <typename C>
921 std::future<void> Executor::run_n(Taskflow& f, size_t repeat, C&& c) {
922  return run_until(f, [repeat]() mutable { return repeat-- == 0; }, std::forward<C>(c));
923 }
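// Note: with repeat == 0 the predicate above is already true on the first
// check, so run_n returns a ready future without executing the graph.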
924 
925 // Function: run_until
926 template<typename P>
927 std::future<void> Executor::run_until(Taskflow& f, P&& pred) {
928  return run_until(f, std::forward<P>(pred), [](){});
929 }
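// Example (a sketch; "stop" is a caller-managed flag, not part of this API):
//
//   std::atomic<bool> stop {false};
//   auto fu = executor.run_until(taskflow, [&](){ return stop.load(); });
//   // ... later: stop = true;  fu.wait();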
930 
931 // Function: _set_up_topology
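// Reset per-node state for a (re)run: bind every node to the topology, clear
// its state bits, rebuild its join counter, and collect dependency-free nodes
// as sources; the topology's join counter starts at the number of sources.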
932 inline void Executor::_set_up_topology(Topology* tpg) {
933 
934  tpg->_sources.clear();
935 
936  // scan each node in the graph and build up the links
937  for(auto node : tpg->_taskflow._graph._nodes) {
938 
939  node->_topology = tpg;
940  node->_clear_state();
941 
942  if(node->num_dependents() == 0) {
943  tpg->_sources.push_back(node);
944  }
945 
946  node->_set_up_join_counter();
947  }
948 
949  tpg->_join_counter.store(tpg->_sources.size(), std::memory_order_relaxed);
950 }
951 
952 // Function: _tear_down_topology
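// Called when a topology's join counter reaches zero. If its predicate is not
// yet satisfied the same topology is rescheduled; otherwise its callback and
// promise are fulfilled, and either the next pending topology of the taskflow
// is set up and scheduled, or *tpg is reset to nullptr to tell the caller to
// stop.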
953 inline void Executor::_tear_down_topology(Topology** tpg) {
954 
955  auto &f = (*tpg)->_taskflow;
956 
957  //assert(&tpg == &(f._topologies.front()));
958 
959  // case 1: we still need to run the topology again
960  if(! (*tpg)->_pred() ) {
961  //tpg->_recover_num_sinks();
962 
963  assert((*tpg)->_join_counter == 0);
964  (*tpg)->_join_counter = (*tpg)->_sources.size();
965 
966  _schedule((*tpg)->_sources);
967  }
968  // case 2: the final run of this topology
969  else {
970 
971  if((*tpg)->_call != nullptr) {
972  (*tpg)->_call();
973  }
974 
975  f._mtx.lock();
976 
977  // If there is another run (interleave between lock)
978  if(f._topologies.size() > 1) {
979 
980  assert((*tpg)->_join_counter == 0);
981 
982  // Set the promise
983  (*tpg)->_promise.set_value();
984  f._topologies.pop_front();
985  f._mtx.unlock();
986 
987  // decrement the topology count; since this is not the last one, we don't notify
988  _decrement_topology();
989 
990  *tpg = &(f._topologies.front());
991 
992  _set_up_topology(*tpg);
993  _schedule((*tpg)->_sources);
994 
995  //f._topologies.front()._bind(f._graph);
996  //*tpg = &(f._topologies.front());
997 
998  //assert(f._topologies.front()._join_counter == 0);
999 
1000  //f._topologies.front()._join_counter = f._topologies.front()._sources.size();
1001 
1002  //_schedule(f._topologies.front()._sources);
1003  }
1004  else {
1005  assert(f._topologies.size() == 1);
1006 
1007  // Need to back up the promise first here because the taskflow might be
1008  // destroyed before this function returns
1009  auto p {std::move((*tpg)->_promise)};
1010 
1011  f._topologies.pop_front();
1012 
1013  f._mtx.unlock();
1014 
1015  // We set the promise at the end, since the caller may destroy the taskflow
1016  p.set_value();
1017 
1018  _decrement_topology_and_notify();
1019 
1020  // Reset topology so caller can stop execution
1021  *tpg = nullptr;
1022  }
1023  }
1024 }
1025 
1026 // Function: run_until
1027 template <typename P, typename C>
1028 std::future<void> Executor::run_until(Taskflow& f, P&& pred, C&& c) {
1029 
1030  _increment_topology();
1031 
1032  // Special case of predicate
1033  if(f.empty() || pred()) {
1034  std::promise<void> promise;
1035  promise.set_value();
1036  _decrement_topology_and_notify();
1037  return promise.get_future();
1038  }
1039 
1040 
1041 
1045  //if(_workers.size() == 0) {
1046  //
1047  // Topology tpg(f, std::forward<P>(pred), std::forward<C>(c));
1048 
1049  // // Clear last execution data & Build precedence between nodes and target
1050  // tpg._bind(f._graph);
1051 
1052  // std::stack<Node*> stack;
1053 
1054  // do {
1055  // _schedule_unsync(tpg._sources, stack);
1056  // while(!stack.empty()) {
1057  // auto node = stack.top();
1058  // stack.pop();
1059  // _invoke_unsync(node, stack);
1060  // }
1061  // tpg._recover_num_sinks();
1062  // } while(!std::invoke(tpg._pred));
1063 
1064  // if(tpg._call != nullptr) {
1065  // std::invoke(tpg._call);
1066  // }
1067 
1068  // tpg._promise.set_value();
1069  //
1070  // _decrement_topology_and_notify();
1071  //
1072  // return tpg._promise.get_future();
1073  //}
1074 
1075  // Multi-threaded execution.
1076  bool run_now {false};
1077  Topology* tpg;
1078  std::future<void> future;
1079 
1080  {
1081  std::lock_guard<std::mutex> lock(f._mtx);
1082 
1083  // create a topology for this run
1084  //tpg = &(f._topologies.emplace_back(f, std::forward<P>(pred), std::forward<C>(c)));
1085  f._topologies.emplace_back(f, std::forward<P>(pred), std::forward<C>(c));
1086  tpg = &(f._topologies.back());
1087  future = tpg->_promise.get_future();
1088 
1089  if(f._topologies.size() == 1) {
1090  run_now = true;
1091  //tpg->_bind(f._graph);
1092  //_schedule(tpg->_sources);
1093  }
1094  }
1095 
1096  // Note that calling _schedule here may cause the topology to be removed
1097  // before the function returns.
1098  if(run_now) {
1099  _set_up_topology(tpg);
1100  _schedule(tpg->_sources);
1101  }
1102 
1103  return future;
1104 }
1105 
1106 // Procedure: _increment_topology
1107 inline void Executor::_increment_topology() {
1108  std::lock_guard<std::mutex> lock(_topology_mutex);
1109  ++_num_topologies;
1110 }
1111 
1112 // Procedure: _decrement_topology_and_notify
1113 inline void Executor::_decrement_topology_and_notify() {
1114  std::lock_guard<std::mutex> lock(_topology_mutex);
1115  if(--_num_topologies == 0) {
1116  _topology_cv.notify_all();
1117  }
1118 }
1119 
1120 // Procedure: _decrement_topology
1121 inline void Executor::_decrement_topology() {
1122  std::lock_guard<std::mutex> lock(_topology_mutex);
1123  --_num_topologies;
1124 }
1125 
1126 // Procedure: wait_for_all
1127 inline void Executor::wait_for_all() {
1128  std::unique_lock<std::mutex> lock(_topology_mutex);
1129  _topology_cv.wait(lock, [&](){ return _num_topologies == 0; });
1130 }
1131 
1132 } // end of namespace tf -----------------------------------------------------
1133 
1134 