Cpp-Taskflow  2.4-master-branch
executor.hpp
1 #pragma once
2 
3 #include <iostream>
4 #include <vector>
5 #include <cstdlib>
6 #include <cstdio>
7 #include <random>
8 #include <atomic>
9 #include <memory>
10 #include <deque>
11 #include <thread>
12 #include <algorithm>
13 #include <set>
14 #include <numeric>
15 #include <cassert>
16 
17 #include "tsq.hpp"
18 #include "notifier.hpp"
19 #include "observer.hpp"
20 #include "taskflow.hpp"
21 
22 namespace tf {
23 
32 class Executor {
33 
34  struct Worker {
35  unsigned id;
36  Domain domain;
37  Executor* executor;
38  Notifier::Waiter* waiter;
39  std::mt19937 rdgen { std::random_device{}() };
40  TaskQueue<Node*> wsq[NUM_DOMAINS];
41  Node* cache {nullptr};
42  };
43 
44  struct PerThread {
45  Worker* worker {nullptr};
46  };
47 
48 #ifdef TF_ENABLE_CUDA
49  struct CUDADevice {
50  int id {-1};
52  };
53 #endif
54 
55  public:
56 
57 #ifdef TF_ENABLE_CUDA
58 
61  explicit Executor(
62  unsigned N = std::thread::hardware_concurrency(),
63  unsigned M = cuda_num_devices()
64  );
65 #else
66 
69  explicit Executor(unsigned N = std::thread::hardware_concurrency());
70 #endif
71 
75  ~Executor();
76 
84  std::future<void> run(Taskflow& taskflow);
85 
94  template<typename C>
95  std::future<void> run(Taskflow& taskflow, C&& callable);
96 
105  std::future<void> run_n(Taskflow& taskflow, size_t N);
106 
116  template<typename C>
117  std::future<void> run_n(Taskflow& taskflow, size_t N, C&& callable);
118 
128  template<typename P>
129  std::future<void> run_until(Taskflow& taskflow, P&& pred);
130 
141  template<typename P, typename C>
142  std::future<void> run_until(Taskflow& taskflow, P&& pred, C&& callable);
143 
147  void wait_for_all();
148 
152  size_t num_workers() const;
153 
160  size_t num_topologies() const;
161 
168  size_t num_domains() const;
169 
176  int this_worker_id() const;
177 
191  template<typename Observer, typename... Args>
192  Observer* make_observer(Args&&... args);
193 
197  void remove_observer();
198 
199 
200  private:
201 
202  std::condition_variable _topology_cv;
203  std::mutex _topology_mutex;
204  std::mutex _wsq_mutex;
205 
206  size_t _num_topologies {0};
207 
208  // scheduler field
209  std::vector<Worker> _workers;
210  std::vector<std::thread> _threads;
211 
212  Notifier _notifier[NUM_DOMAINS];
213 
214  TaskQueue<Node*> _wsq[NUM_DOMAINS];
215 
216  std::atomic<size_t> _num_actives[NUM_DOMAINS];
217  std::atomic<size_t> _num_thieves[NUM_DOMAINS];
218  std::atomic<bool> _done {false};
219 
220  std::unique_ptr<ExecutorObserverInterface> _observer;
221 
222  unsigned _find_victim(unsigned);
223 
224  PerThread& _per_thread() const;
225 
226  bool _wait_for_task(Worker&, Node*&);
227 
228  void _spawn(unsigned, Domain);
229  void _worker_loop(Worker&);
230  void _exploit_task(Worker&, Node*&);
231  void _explore_task(Worker&, Node*&);
232  void _schedule(Node*, bool);
233  void _schedule(PassiveVector<Node*>&);
234  void _invoke(Worker&, Node*);
235  void _invoke_static_work(Worker&, Node*);
236  void _invoke_dynamic_work(Worker&, Node*, Subflow&);
237  void _invoke_condition_work(Worker&, Node*, int&);
238 
239 #ifdef TF_ENABLE_CUDA
240  void _invoke_cudaflow_work(Worker&, Node*);
241  void _invoke_cudaflow_work_impl(Worker&, Node*);
242 #endif
243 
244  void _set_up_module_work(Node*, bool&);
245  void _set_up_topology(Topology*);
246  void _tear_down_topology(Topology**);
247  void _increment_topology();
248  void _decrement_topology();
249  void _decrement_topology_and_notify();
250 };
251 
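// Example (sketch, not part of the original header): typical use of the public
// interface declared above, assuming the usual Taskflow construction API from
// taskflow.hpp (emplace/precede).
//
//   tf::Executor executor(4);                 // four CPU worker threads
//   tf::Taskflow taskflow;
//
//   auto [A, B] = taskflow.emplace(
//     [] () { std::cout << "A\n"; },
//     [] () { std::cout << "B\n"; }
//   );
//   A.precede(B);                             // A runs before B
//
//   executor.run(taskflow);                   // run the graph once
//   executor.run_n(taskflow, 3);              // run it three more times
//   executor.wait_for_all();                  // block until all runs finish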
252 
253 #ifdef TF_ENABLE_CUDA
254 // Constructor
255 inline Executor::Executor(unsigned N, unsigned M) :
256  _workers {N + M},
257  _notifier {Notifier(N), Notifier(M)} {
258 
259  if(N == 0) {
260  TF_THROW("no cpu workers to execute taskflows");
261  }
262 
263  if(M == 0) {
264  TF_THROW("no gpu workers to execute cudaflows");
265  }
266 
267  for(int i=0; i<NUM_DOMAINS; ++i) {
268  _num_actives[i].store(0, std::memory_order_relaxed);
269  _num_thieves[i].store(0, std::memory_order_relaxed);
270  }
271 
272  _spawn(N, HOST);
273  _spawn(M, CUDA);
274 }
275 
276 #else
277 // Constructor
278 inline Executor::Executor(unsigned N) :
279  _workers {N},
280  _notifier {Notifier(N)} {
281 
282  if(N == 0) {
283  TF_THROW("no cpu workers to execute taskflows");
284  }
285 
286  for(int i=0; i<NUM_DOMAINS; ++i) {
287  _num_actives[i].store(0, std::memory_order_relaxed);
288  _num_thieves[i].store(0, std::memory_order_relaxed);
289  }
290 
291  _spawn(N, HOST);
292 }
293 #endif
294 
295 // Destructor
296 inline Executor::~Executor() {
297 
298  // wait for all topologies to complete
299  wait_for_all();
300 
301  // shut down the scheduler
302  _done = true;
303 
304  for(int i=0; i<NUM_DOMAINS; ++i) {
305  _notifier[i].notify(true);
306  }
307 
308  for(auto& t : _threads){
309  t.join();
310  }
311 }
312 
313 // Function: num_workers
314 inline size_t Executor::num_workers() const {
315  return _workers.size();
316 }
317 
318 // Function: num_domains
319 inline size_t Executor::num_domains() const {
320  return NUM_DOMAINS;
321 }
322 
323 // Function: num_topologies
324 inline size_t Executor::num_topologies() const {
325  return _num_topologies;
326 }
327 
328 // Function: _per_thread
329 inline Executor::PerThread& Executor::_per_thread() const {
330  thread_local PerThread pt;
331  return pt;
332 }
333 
334 // Function: this_worker_id
335 inline int Executor::this_worker_id() const {
336  auto worker = _per_thread().worker;
337  return worker ? static_cast<int>(worker->id) : -1;
338 }
339 
340 // Procedure: _spawn
341 inline void Executor::_spawn(unsigned N, Domain d) {
342 
343  auto id = _threads.size();
344 
345  for(unsigned i=0; i<N; ++i, ++id) {
346 
347  _workers[id].id = id;
348  _workers[id].domain = d;
349  _workers[id].executor = this;
350  _workers[id].waiter = &_notifier[d]._waiters[i];
351 
352  _threads.emplace_back([this] (Worker& w) -> void {
353 
354  PerThread& pt = _per_thread();
355  pt.worker = &w;
356 
357  Node* t = nullptr;
358 
359  // must use 1 as the loop condition instead of !_done;
     // shutdown is handled inside _wait_for_task
360  while(1) {
361 
362  // execute the tasks.
363  _exploit_task(w, t);
364 
365  // wait for tasks
366  if(_wait_for_task(w, t) == false) {
367  break;
368  }
369  }
370 
371  }, std::ref(_workers[id]));
372  }
373 
374 }
375 
376 // Function: _explore_task
377 inline void Executor::_explore_task(Worker& thief, Node*& t) {
378 
379  //assert(_workers[thief].wsq.empty());
380  assert(!t);
381 
382  const auto d = thief.domain;
383 
384  const unsigned l = 0;
385  const unsigned r = static_cast<unsigned>(_workers.size()) - 1;
386 
387  const size_t F = (_workers.size() + 1) << 1;
388  const size_t Y = 100;
389 
390  size_t f = 0;
391  size_t y = 0;
392 
393  // explore
394  while(!_done) {
395 
396  unsigned vtm = std::uniform_int_distribution<unsigned>{l, r}(thief.rdgen);
397 
398  t = (vtm == thief.id) ? _wsq[d].steal() : _workers[vtm].wsq[d].steal();
399 
400  if(t) {
401  break;
402  }
403 
404  if(f++ > F) {
405  std::this_thread::yield();
406  if(y++ > Y) {
407  break;
408  }
409  }
410  }
411 
412 }
413 
414 // Procedure: _exploit_task
415 inline void Executor::_exploit_task(Worker& w, Node*& t) {
416 
417  assert(!w.cache);
418 
419  if(t) {
420 
421  const auto d = w.domain;
422 
423  if(_num_actives[d].fetch_add(1) == 0 && _num_thieves[d] == 0) {
424  _notifier[d].notify(false);
425  }
426 
427  auto tpg = t->_topology;
428  auto par = t->_parent;
429  auto exe = size_t{1};
430 
431  do {
432  _invoke(w, t);
433 
434  if(w.cache) {
435  t = w.cache;
436  w.cache = nullptr;
437  }
438  else {
439  t = w.wsq[d].pop();
440  if(t) {
441  // We only increment the counter when popping a task from the wsq
442  // (NOT including the cache!)
443  if(t->_parent == par) {
444  exe++;
445  }
446  // joined subflow
447  else {
448  if(par == nullptr) {
449  // still have tasks so the topology join counter can't be zero
450  t->_topology->_join_counter.fetch_sub(exe);
451  }
452  else {
453  auto ret = par->_join_counter.fetch_sub(exe);
454  if(ret == exe) {
455  if(par->domain() == d) {
456  w.wsq[d].push(par);
457  }
458  else {
459  _schedule(par, false);
460  }
461  }
462  }
463  exe = 1;
464  par = t->_parent;
465  }
466  }
467  else {
468  // If no more local tasks!
469  if(par == nullptr) {
470  if(tpg->_join_counter.fetch_sub(exe) == exe) {
471  // TODO: Store tpg in local variable not in w
472  _tear_down_topology(&tpg);
473  if(tpg != nullptr) {
474  t = w.wsq[d].pop();
475  if(t) {
476  exe = 1;
477  }
478  }
479  }
480  }
481  else {
482  if(par->_join_counter.fetch_sub(exe) == exe) {
483  if(par->domain() == d) {
484  t = par;
485  par = par->_parent;
486  exe = 1;
487  }
488  else {
489  _schedule(par, false);
490  }
491  }
492  }
493  }
494  }
495  } while(t);
496 
497  --_num_actives[d];
498  }
499 }
500 
501 // Function: _wait_for_task
502 inline bool Executor::_wait_for_task(Worker& worker, Node*& t) {
503 
504  const auto d = worker.domain;
505 
506  wait_for_task:
507 
508  assert(!t);
509 
510  ++_num_thieves[d];
511 
512  explore_task:
513 
514  _explore_task(worker, t);
515 
516  if(t) {
517  if(_num_thieves[d].fetch_sub(1) == 1) {
518  _notifier[d].notify(false);
519  }
520  return true;
521  }
522 
523  _notifier[d].prepare_wait(worker.waiter);
524 
525  //if(auto vtm = _find_victim(me); vtm != _workers.size()) {
526  if(!_wsq[d].empty()) {
527 
528  _notifier[d].cancel_wait(worker.waiter);
529  //t = (vtm == me) ? _wsq.steal() : _workers[vtm].wsq.steal();
530 
531  t = _wsq[d].steal();
532  if(t) {
533  if(_num_thieves[d].fetch_sub(1) == 1) {
534  _notifier[d].notify(false);
535  }
536  return true;
537  }
538  else {
539  goto explore_task;
540  }
541  }
542 
543  if(_done) {
544  _notifier[d].cancel_wait(worker.waiter);
545  for(int i=0; i<NUM_DOMAINS; ++i) {
546  _notifier[i].notify(true);
547  }
548  --_num_thieves[d];
549  return false;
550  }
551 
552  if(_num_thieves[d].fetch_sub(1) == 1) {
553  if(_num_actives[d]) {
554  _notifier[d].cancel_wait(worker.waiter);
555  goto wait_for_task;
556  }
557  // check all worker queues of this domain again
558  for(auto& w : _workers) {
559  if(!w.wsq[d].empty()) {
560  _notifier[d].cancel_wait(worker.waiter);
561  goto wait_for_task;
562  }
563  }
564  }
565 
566  // Now I really need to relinquish myself to others
567  _notifier[d].commit_wait(worker.waiter);
568 
569  return true;
570 }
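// Note on the two-phase wait above: a thief first announces itself via
// prepare_wait, then re-checks (1) the shared queue _wsq[d], (2) the shutdown
// flag _done, and (3), if it is the last thief, the per-worker queues of its
// domain. Any work or shutdown found in between cancels the wait; only if all
// checks come up empty does the worker commit_wait and actually sleep. This
// ordering prevents a missed wake-up between the emptiness check and the sleep.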
571 
572 // Function: make_observer
573 template<typename Observer, typename... Args>
574 Observer* Executor::make_observer(Args&&... args) {
575  // use a local variable to mimic the constructor
576  auto tmp = std::make_unique<Observer>(std::forward<Args>(args)...);
577  tmp->set_up(_workers.size());
578  _observer = std::move(tmp);
579  return static_cast<Observer*>(_observer.get());
580 }
581 
582 // Procedure: remove_observer
583 inline void Executor::remove_observer() {
584  _observer.reset();
585 }
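// Example (sketch, not part of the original header): attaching an observer to
// trace worker activity. tf::ExecutorObserver and its dump() method are
// assumed from observer.hpp; any type providing set_up(size_t),
// on_entry(unsigned, TaskView) and on_exit(unsigned, TaskView), as invoked in
// this file, works with make_observer.
//
//   auto* observer = executor.make_observer<tf::ExecutorObserver>();
//   executor.run(taskflow).wait();
//   observer->dump(std::cout);     // e.g., dump a timeline of executed tasks
//   executor.remove_observer();    // detach the observer when done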
586 
587 // Procedure: _schedule
588 // The main procedure to schedule a given task node.
589 // Each task node has two types of tasks - regular and subflow.
590 inline void Executor::_schedule(Node* node, bool bypass_hint) {
591 
592  //assert(_workers.size() != 0);
593 
594  const auto d = node->domain();
595 
596  // caller is a worker to this pool
597  auto worker = _per_thread().worker;
598 
599  if(worker != nullptr && worker->executor == this) {
600  if(bypass_hint) {
601  assert(!worker->cache);
602  worker->cache = node;
603  }
604  else {
605  worker->wsq[d].push(node);
606  if(worker->domain != d) {
607  if(_num_actives[d] == 0 && _num_thieves[d] == 0) {
608  _notifier[d].notify(false);
609  }
610  }
611  }
612  return;
613  }
614 
615  // other threads
616  {
617  std::lock_guard<std::mutex> lock(_wsq_mutex);
618  _wsq[d].push(node);
619  }
620 
621  _notifier[d].notify(false);
622 }
623 
624 // Procedure: _schedule
625 // The main procedure to schedule a set of task nodes.
626 // Each task node has two types of tasks - regular and subflow.
627 inline void Executor::_schedule(PassiveVector<Node*>& nodes) {
628 
629  //assert(_workers.size() != 0);
630 
631  // We need to capture the node count to avoid accessing the nodes
632  // vector while the parent topology is being removed!
633  const auto num_nodes = nodes.size();
634 
635  if(num_nodes == 0) {
636  return;
637  }
638 
639  // worker thread
640  auto worker = _per_thread().worker;
641 
642  // task counts
643  size_t tcount[NUM_DOMAINS] = {0};
644 
645  if(worker != nullptr && worker->executor == this) {
646  for(size_t i=0; i<num_nodes; ++i) {
647  const auto d = nodes[i]->domain();
648  worker->wsq[d].push(nodes[i]);
649  tcount[d]++;
650  }
651 
652  for(int d=0; d<NUM_DOMAINS; ++d) {
653  if(tcount[d] && d != worker->domain) {
654  if(_num_actives[d] == 0 && _num_thieves[d] == 0) {
655  _notifier[d].notify_n(tcount[d]);
656  }
657  }
658  }
659 
660  return;
661  }
662 
663  // other threads
664  {
665  std::lock_guard<std::mutex> lock(_wsq_mutex);
666  for(size_t k=0; k<num_nodes; ++k) {
667  const auto d = nodes[k]->domain();
668  _wsq[d].push(nodes[k]);
669  tcount[d]++;
670  }
671  }
672 
673  for(int d=0; d<NUM_DOMAINS; ++d) {
674  _notifier[d].notify_n(tcount[d]);
675  }
676 }
677 
678 
679 // Procedure: _invoke
680 inline void Executor::_invoke(Worker& worker, Node* node) {
681 
682  //assert(_workers.size() != 0);
683 
684  // Here we need to fetch the num_successors first to avoid the invalid memory
685  // access caused by topology clear.
686  const auto num_successors = node->num_successors();
687 
688  // acquire the parent flow counter
689  auto& c = (node->_parent) ? node->_parent->_join_counter :
690  node->_topology->_join_counter;
691 
692  // switch is faster than nested if-else due to jump table
693  switch(node->_handle.index()) {
694  // static task
695  case Node::STATIC_WORK:{
696  _invoke_static_work(worker, node);
697  }
698  break;
699  // module task
700  case Node::MODULE_WORK: {
701  bool first_time = !node->_has_state(Node::SPAWNED);
702  bool emptiness = false;
703  _set_up_module_work(node, emptiness);
704  if(first_time && !emptiness) {
705  return;
706  }
707  }
708  break;
709  // dynamic task
710  case Node::DYNAMIC_WORK: {
711 
712  // Need to create a subflow if it is the first time entering here
713  if(!node->_has_state(Node::SPAWNED)) {
714 
715  auto& subgraph = nstd::get<Node::DynamicWork>(node->_handle).subgraph;
716 
717  subgraph.clear();
718  Subflow fb(subgraph);
719 
720  _invoke_dynamic_work(worker, node, fb);
721 
722  node->_set_state(Node::SPAWNED);
723 
724  if(!subgraph.empty()) {
725 
726  PassiveVector<Node*> src;
727 
728  for(auto n : subgraph._nodes) {
729 
730  n->_topology = node->_topology;
731  n->_set_up_join_counter();
732 
733  if(!fb.detached()) {
734  n->_parent = node;
735  }
736 
737  if(n->num_dependents() == 0) {
738  src.push_back(n);
739  }
740  }
741 
742  const bool join = fb.joined();
743  if(!join) {
744  // Detach mode
745  node->_topology->_join_counter.fetch_add(src.size());
746  }
747  else {
748  // Join mode
749  node->_join_counter.fetch_add(src.size());
750 
751  // the spawned node needs a second-round execution
752  c.fetch_add(1);
753  }
754 
755  _schedule(src);
756 
757  if(join) {
758  return;
759  }
760  } // End of first time
761  }
762  }
763  break;
764  // condition task
765  case Node::CONDITION_WORK: {
766 
767  if(node->_has_state(Node::BRANCH)) {
768  node->_join_counter = node->num_strong_dependents();
769  }
770  else {
771  node->_join_counter = node->num_dependents();
772  }
773 
774  int id;
775  _invoke_condition_work(worker, node, id);
776 
777  if(id >= 0 && static_cast<size_t>(id) < num_successors) {
778  auto s = node->_successors[id];
779  s->_join_counter.store(0);
780 
781  if(s->domain() == worker.domain) {
782  _schedule(s, true);
783  }
784  else {
785  c.fetch_add(1);
786  _schedule(s, false);
787  }
788  }
789  return;
790  } // no need to add a break here due to the immediate return
791 
792  // cudaflow task
793 #ifdef TF_ENABLE_CUDA
794  case Node::CUDAFLOW_WORK: {
795  _invoke_cudaflow_work(worker, node);
796  }
797  break;
798 #endif
799 
800  // monostate
801  default:
802  break;
803  }
804 
805 
806  // We MUST restore the dependency count since a subflow may contain
807  // a condition node that loops back (cyclic).
808  // This must be done before scheduling the successors; otherwise it might
809  // cause a race condition on _dependents.
810  if(node->_has_state(Node::BRANCH)) {
811  // If this node is the target of a condition (branch), count only its strong dependents
812  node->_join_counter = node->num_strong_dependents();
813  }
814  else {
815  node->_join_counter = node->num_dependents();
816  }
817 
818  node->_unset_state(Node::SPAWNED);
819 
820  // At this point, the node storage might be destructed.
821  Node* cache {nullptr};
822 
823  for(size_t i=0; i<num_successors; ++i) {
824  if(--(node->_successors[i]->_join_counter) == 0) {
825  if(node->_successors[i]->domain() != worker.domain) {
826  c.fetch_add(1);
827  _schedule(node->_successors[i], false);
828  }
829  else {
830  if(cache) {
831  c.fetch_add(1);
832  _schedule(cache, false);
833  }
834  cache = node->_successors[i];
835  }
836  }
837  }
838 
839  if(cache) {
840  _schedule(cache, true);
841  }
842 }
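// Note on the cache/bypass path above: among the successors that become ready,
// at most one node of the worker's own domain is kept in `cache` and scheduled
// with bypass_hint == true. _schedule then stores it in worker.cache instead
// of pushing it to a queue, and _exploit_task executes it next, saving one
// push/pop round trip through the work-stealing queue.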
843 
844 // Procedure: _invoke_static_work
845 inline void Executor::_invoke_static_work(Worker& worker, Node* node) {
846  if(_observer) {
847  _observer->on_entry(worker.id, TaskView(node));
848  nstd::get<Node::StaticWork>(node->_handle).work();
849  _observer->on_exit(worker.id, TaskView(node));
850  }
851  else {
852  nstd::get<Node::StaticWork>(node->_handle).work();
853  }
854 }
855 
856 // Procedure: _invoke_dynamic_work
857 inline void Executor::_invoke_dynamic_work(Worker& worker, Node* node, Subflow& sf) {
858  if(_observer) {
859  _observer->on_entry(worker.id, TaskView(node));
860  nstd::get<Node::DynamicWork>(node->_handle).work(sf);
861  _observer->on_exit(worker.id, TaskView(node));
862  }
863  else {
864  nstd::get<Node::DynamicWork>(node->_handle).work(sf);
865  }
866 }
867 
868 // Procedure: _invoke_condition_work
869 inline void Executor::_invoke_condition_work(Worker& worker, Node* node, int& id) {
870  if(_observer) {
871  _observer->on_entry(worker.id, TaskView(node));
872  id = nstd::get<Node::ConditionWork>(node->_handle).work();
873  _observer->on_exit(worker.id, TaskView(node));
874  }
875  else {
876  id = nstd::get<Node::ConditionWork>(node->_handle).work();
877  }
878 }
879 
880 #ifdef TF_ENABLE_CUDA
881 // Procedure: _invoke_cudaflow_work
882 inline void Executor::_invoke_cudaflow_work(Worker& worker, Node* node) {
883 
884  assert(worker.domain == node->domain());
885 
886  if(_observer) {
887  _observer->on_entry(worker.id, TaskView(node));
888  _invoke_cudaflow_work_impl(worker, node);
889  _observer->on_exit(worker.id, TaskView(node));
890  }
891  else {
892  _invoke_cudaflow_work_impl(worker, node);
893  }
894 }
895 
896 // Procedure: _invoke_cudaflow_work_impl
897 inline void Executor::_invoke_cudaflow_work_impl(Worker&, Node* node) {
898 
899  auto& h = nstd::get<Node::cudaFlowWork>(node->_handle);
900 
901  h.graph.clear();
902 
903  cudaFlow cf(h.graph);
904 
905  h.work(cf);
906 
907  h.graph._make_native_graph(cf._device);
908 
909  cudaGraphExec_t exec;
910  TF_CHECK_CUDA(
911  cudaGraphInstantiate(&exec, h.graph._native_handle, nullptr, nullptr, 0),
912  "failed to create an executable cudaGraph"
913  );
914  TF_CHECK_CUDA(cudaGraphLaunch(exec, cf._stream), "failed to launch cudaGraph");
915  TF_CHECK_CUDA(cudaStreamSynchronize(cf._stream), "failed to sync cudaStream");
916  TF_CHECK_CUDA(
917  cudaGraphExecDestroy(exec), "failed to destroy an executable cudaGraph"
918  );
919 }
920 #endif
921 
922 // Procedure: _set_up_module_work
923 inline void Executor::_set_up_module_work(Node* node, bool& ept) {
924 
925  // second time to enter this context
926  if(node->_has_state(Node::SPAWNED)) {
927  return;
928  }
929 
930  // first time to enter this context
931  node->_set_state(Node::SPAWNED);
932 
933  auto module = nstd::get<Node::ModuleWork>(node->_handle).module;
934 
935  if(module->empty()) {
936  ept = true;
937  return;
938  }
939 
940  PassiveVector<Node*> src;
941 
942  for(auto n: module->_graph._nodes) {
943 
944  n->_topology = node->_topology;
945  n->_parent = node;
946  n->_set_up_join_counter();
947 
948  if(n->num_dependents() == 0) {
949  src.push_back(n);
950  }
951  }
952 
953  node->_join_counter.fetch_add(src.size());
954 
955  if(node->_parent == nullptr) {
956  node->_topology->_join_counter.fetch_add(1);
957  }
958  else {
959  node->_parent->_join_counter.fetch_add(1);
960  }
961 
962  // src cannot be empty here (the empty-module case returns early above)
963  _schedule(src);
964 }
965 
966 // Function: run
967 inline std::future<void> Executor::run(Taskflow& f) {
968  return run_n(f, 1, [](){});
969 }
970 
971 // Function: run
972 template <typename C>
973 std::future<void> Executor::run(Taskflow& f, C&& c) {
974  return run_n(f, 1, std::forward<C>(c));
975 }
976 
977 // Function: run_n
978 inline std::future<void> Executor::run_n(Taskflow& f, size_t repeat) {
979  return run_n(f, repeat, [](){});
980 }
981 
982 // Function: run_n
983 template <typename C>
984 std::future<void> Executor::run_n(Taskflow& f, size_t repeat, C&& c) {
985  return run_until(f, [repeat]() mutable { return repeat-- == 0; }, std::forward<C>(c));
986 }
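// Example (sketch, not part of the original header): run_n above simply wraps
// run_until with a counting predicate, so the two calls below are equivalent.
//
//   executor.run_n(taskflow, 5);
//   executor.run_until(taskflow, [n = 5] () mutable { return n-- == 0; });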
987 
988 // Function: run_until
989 template<typename P>
990 std::future<void> Executor::run_until(Taskflow& f, P&& pred) {
991  return run_until(f, std::forward<P>(pred), [](){});
992 }
993 
994 // Function: _set_up_topology
995 inline void Executor::_set_up_topology(Topology* tpg) {
996 
997  tpg->_sources.clear();
998 
999  // scan each node in the graph and build up the links
1000  for(auto node : tpg->_taskflow._graph._nodes) {
1001 
1002  node->_topology = tpg;
1003  node->_clear_state();
1004 
1005  if(node->num_dependents() == 0) {
1006  tpg->_sources.push_back(node);
1007  }
1008 
1009  node->_set_up_join_counter();
1010  }
1011 
1012  tpg->_join_counter.store(tpg->_sources.size(), std::memory_order_relaxed);
1013 }
1014 
1015 // Function: _tear_down_topology
1016 inline void Executor::_tear_down_topology(Topology** tpg) {
1017 
1018  auto &f = (*tpg)->_taskflow;
1019 
1020  //assert(&tpg == &(f._topologies.front()));
1021 
1022  // case 1: we still need to run the topology again
1023  if(! (*tpg)->_pred() ) {
1024  //tpg->_recover_num_sinks();
1025 
1026  assert((*tpg)->_join_counter == 0);
1027  (*tpg)->_join_counter = (*tpg)->_sources.size();
1028 
1029  _schedule((*tpg)->_sources);
1030  }
1031  // case 2: the final run of this topology
1032  else {
1033 
1034  if((*tpg)->_call != nullptr) {
1035  (*tpg)->_call();
1036  }
1037 
1038  f._mtx.lock();
1039 
1040  // If there is another run (interleave between lock)
1041  if(f._topologies.size() > 1) {
1042 
1043  assert((*tpg)->_join_counter == 0);
1044 
1045  // Set the promise
1046  (*tpg)->_promise.set_value();
1047  f._topologies.pop_front();
1048  f._mtx.unlock();
1049 
1050  // decrement the topology count; since this is not the last one, we don't notify
1051  _decrement_topology();
1052 
1053  *tpg = &(f._topologies.front());
1054 
1055  _set_up_topology(*tpg);
1056  _schedule((*tpg)->_sources);
1057 
1058  //f._topologies.front()._bind(f._graph);
1059  //*tpg = &(f._topologies.front());
1060 
1061  //assert(f._topologies.front()._join_counter == 0);
1062 
1063  //f._topologies.front()._join_counter = f._topologies.front()._sources.size();
1064 
1065  //_schedule(f._topologies.front()._sources);
1066  }
1067  else {
1068  assert(f._topologies.size() == 1);
1069 
1070  // Need to back up the promise first here because the taskflow might be
1071  // destroyed before this scope finishes
1072  auto p {std::move((*tpg)->_promise)};
1073 
1074  f._topologies.pop_front();
1075 
1076  f._mtx.unlock();
1077 
1078  // We set the promise at the very end, in case the taskflow is destroyed before then
1079  p.set_value();
1080 
1081  _decrement_topology_and_notify();
1082 
1083  // Reset topology so caller can stop execution
1084  *tpg = nullptr;
1085  }
1086  }
1087 }
1088 
1089 // Function: run_until
1090 template <typename P, typename C>
1091 std::future<void> Executor::run_until(Taskflow& f, P&& pred, C&& c) {
1092 
1093  _increment_topology();
1094 
1095  // Special case of predicate
1096  if(f.empty() || pred()) {
1097  std::promise<void> promise;
1098  promise.set_value();
1099  _decrement_topology_and_notify();
1100  return promise.get_future();
1101  }
1102 
1103  // Multi-threaded execution.
1104  bool run_now {false};
1105  Topology* tpg;
1106  std::future<void> future;
1107 
1108  {
1109  std::lock_guard<std::mutex> lock(f._mtx);
1110 
1111  // create a topology for this run
1112  //tpg = &(f._topologies.emplace_back(f, std::forward<P>(pred), std::forward<C>(c)));
1113  f._topologies.emplace_back(f, std::forward<P>(pred), std::forward<C>(c));
1114  tpg = &(f._topologies.back());
1115  future = tpg->_promise.get_future();
1116 
1117  if(f._topologies.size() == 1) {
1118  run_now = true;
1119  //tpg->_bind(f._graph);
1120  //_schedule(tpg->_sources);
1121  }
1122  }
1123 
1124  // Note that calling _schedule here may cause the topology to be removed
1125  // before this function returns.
1126  if(run_now) {
1127  _set_up_topology(tpg);
1128  _schedule(tpg->_sources);
1129  }
1130 
1131  return future;
1132 }
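// Example (sketch, not part of the original header): run_until with both a
// stopping predicate and a completion callback. The predicate is evaluated
// once before the first run and once after each completed run; the callback
// fires once, on the final run.
//
//   executor.run_until(
//     taskflow,
//     [n = 0] () mutable { return ++n > 4; },  // stop after four runs
//     [] () { std::cout << "all runs done\n"; }
//   ).wait();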
1133 
1134 // Procedure: _increment_topology
1135 inline void Executor::_increment_topology() {
1136  std::lock_guard<std::mutex> lock(_topology_mutex);
1137  ++_num_topologies;
1138 }
1139 
1140 // Procedure: _decrement_topology_and_notify
1141 inline void Executor::_decrement_topology_and_notify() {
1142  std::lock_guard<std::mutex> lock(_topology_mutex);
1143  if(--_num_topologies == 0) {
1144  _topology_cv.notify_all();
1145  }
1146 }
1147 
1148 // Procedure: _decrement_topology
1149 inline void Executor::_decrement_topology() {
1150  std::lock_guard<std::mutex> lock(_topology_mutex);
1151  --_num_topologies;
1152 }
1153 
1154 // Procedure: wait_for_all
1155 inline void Executor::wait_for_all() {
1156  std::unique_lock<std::mutex> lock(_topology_mutex);
1157  _topology_cv.wait(lock, [&](){ return _num_topologies == 0; });
1158 }
1159 
1160 } // end of namespace tf -----------------------------------------------------
1161 
1162 