Cpp-Taskflow  2.3.0
executor.hpp
1 #pragma once
2 
3 #include <iostream>
4 #include <vector>
5 #include <cstdlib>
6 #include <cstdio>
7 #include <random>
8 #include <atomic>
9 #include <memory>
10 #include <deque>
11 #include <thread>
12 #include <algorithm>
13 #include <set>
14 #include <numeric>
15 #include <cassert>
16 
17 #include "tsq.hpp"
18 #include "notifier.hpp"
19 #include "observer.hpp"
20 #include "taskflow.hpp"
21 
22 namespace tf {
23 
33 class Executor {
34 
35  struct Worker {
36  unsigned id;
37  std::mt19937 rdgen { std::random_device{}() };
38  TaskQueue<Node*> queue;
39  Node* cache {nullptr};
40 
41  int num_executed {0};
42  };
43 
44  struct PerThread {
45  Worker* worker {nullptr};
46  };
47 
48  public:
49 
53  explicit Executor(unsigned n = std::thread::hardware_concurrency());
54 
58  ~Executor();
59 
67  std::future<void> run(Taskflow& taskflow);
68 
77  template<typename C>
78  std::future<void> run(Taskflow& taskflow, C&& callable);
79 
88  std::future<void> run_n(Taskflow& taskflow, size_t N);
89 
99  template<typename C>
100  std::future<void> run_n(Taskflow& taskflow, size_t N, C&& callable);
101 
111  template<typename P>
112  std::future<void> run_until(Taskflow& taskflow, P&& pred);
113 
124  template<typename P, typename C>
125  std::future<void> run_until(Taskflow& taskflow, P&& pred, C&& callable);
126 
130  void wait_for_all();
131 
137  size_t num_workers() const;
138 
152  template<typename Observer, typename... Args>
153  Observer* make_observer(Args&&... args);
154 
158  void remove_observer();
159 
166  int this_worker_id() const;
167 
168  private:
169 
170  std::condition_variable _topology_cv;
171  std::mutex _topology_mutex;
172  std::mutex _queue_mutex;
173 
174  unsigned _num_topologies {0};
175 
176  // scheduler field
177  std::vector<Worker> _workers;
178  std::vector<Notifier::Waiter> _waiters;
179  std::vector<std::thread> _threads;
180 
181  TaskQueue<Node*> _queue;
182 
183  std::atomic<size_t> _num_actives {0};
184  std::atomic<size_t> _num_thieves {0};
185  std::atomic<bool> _done {0};
186 
187  Notifier _notifier;
188 
189  std::unique_ptr<ExecutorObserverInterface> _observer;
190 
191  unsigned _find_victim(unsigned);
192 
193  PerThread& _per_thread() const;
194 
195  bool _wait_for_task(Worker&, Node*&);
196 
197  void _spawn(unsigned);
198  void _exploit_task(Worker&, Node*&);
199  void _explore_task(Worker&, Node*&);
200  void _schedule(Node*, bool);
201  void _schedule(PassiveVector<Node*>&);
202  void _invoke(Worker&, Node*);
203  void _invoke_static_work(Worker&, Node*);
204  void _invoke_dynamic_work(Worker&, Node*, Subflow&);
205  void _invoke_condition_work(Worker&, Node*, int&);
206  void _set_up_module_work(Node*, bool&);
207  void _set_up_topology(Topology*);
208  void _tear_down_topology(Topology**);
209  void _increment_topology();
210  void _decrement_topology();
211  void _decrement_topology_and_notify();
212 };
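
// Editorial example (not part of the original header): a minimal sketch of how
// the public interface declared above is typically used, assuming the umbrella
// header <taskflow/taskflow.hpp> and user-provided task callables.
//
//   #include <taskflow/taskflow.hpp>
//
//   int main() {
//     tf::Taskflow taskflow;
//     tf::Executor executor;    // defaults to std::thread::hardware_concurrency() workers
//
//     auto A = taskflow.emplace([] () { /* work of A */ });
//     auto B = taskflow.emplace([] () { /* work of B */ });
//     A.precede(B);             // B runs after A completes
//
//     executor.run(taskflow);            // schedule one run
//     executor.run_n(taskflow, 2);       // schedule two more runs
//     executor.wait_for_all();           // block until every scheduled run finishes
//     return 0;
//   }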
213 
214 // Constructor
215 inline Executor::Executor(unsigned N) :
216  _workers {N},
217  _waiters {N},
218  _notifier {_waiters} {
219 
220  if(N == 0) {
221  TF_THROW("no workers to execute the graph");
222  }
223 
224  _spawn(N);
225 }
226 
227 // Destructor
228 inline Executor::~Executor() {
229 
230  // wait for all topologies to complete
231  wait_for_all();
232 
233  // shut down the scheduler
234  _done = true;
235  _notifier.notify(true);
236 
237  for(auto& t : _threads){
238  t.join();
239  }
240 }
241 
242 // Function: num_workers
243 inline size_t Executor::num_workers() const {
244  return _workers.size();
245 }
246 
247 // Function: _per_thread
248 inline Executor::PerThread& Executor::_per_thread() const {
249  thread_local PerThread pt;
250  return pt;
251 }
252 
253 // Function: this_worker_id
254 inline int Executor::this_worker_id() const {
255  auto worker = _per_thread().worker;
256  return worker ? static_cast<int>(worker->id) : -1;
257 }
258 
259 // Procedure: _spawn
260 inline void Executor::_spawn(unsigned N) {
261 
262  // assign worker ids and spawn the worker threads
263  for(unsigned i=0; i<N; ++i) {
264 
265  _workers[i].id = i;
266 
267  _threads.emplace_back([this] (Worker& w) -> void {
268 
269  PerThread& pt = _per_thread();
270  pt.worker = &w;
271 
272  Node* t = nullptr;
273 
274  // must use 1 as condition instead of !done
275  while(1) {
276 
277  // execute the tasks.
278  _exploit_task(w, t);
279 
280  // wait for tasks
281  if(_wait_for_task(w, t) == false) {
282  break;
283  }
284  }
285 
286  }, std::ref(_workers[i]));
287  }
288 }
289 
290 // Function: _find_victim
291 inline unsigned Executor::_find_victim(unsigned thief) {
292 
293  /*unsigned l = 0;
294  unsigned r = _workers.size() - 1;
295  unsigned vtm = std::uniform_int_distribution<unsigned>{l, r}(
296  _workers[thief].rdgen
297  );
298 
299  // try to look for a task from other workers
300  for(unsigned i=0; i<_workers.size(); ++i){
301 
302  if((thief == vtm && !_queue.empty()) ||
303  (thief != vtm && !_workers[vtm].queue.empty())) {
304  return vtm;
305  }
306 
307  if(++vtm; vtm == _workers.size()) {
308  vtm = 0;
309  }
310  } */
311 
312  // try to look for a task from other workers
313  for(unsigned vtm=0; vtm<_workers.size(); ++vtm){
314  if((thief == vtm && !_queue.empty()) ||
315  (thief != vtm && !_workers[vtm].queue.empty())) {
316  return vtm;
317  }
318  }
319 
320  return static_cast<unsigned>(_workers.size());
321 }
322 
323 // Function: _explore_task
324 inline void Executor::_explore_task(Worker& thief, Node*& t) {
325 
326  //assert(_workers[thief].queue.empty());
327  assert(!t);
328 
329  const unsigned l = 0;
330  const unsigned r = static_cast<unsigned>(_workers.size()) - 1;
331 
332  const size_t F = (_workers.size() + 1) << 1;
333  const size_t Y = 100;
334 
335  size_t f = 0;
336  size_t y = 0;
337 
338  // explore
339  while(!_done) {
340 
341  unsigned vtm = std::uniform_int_distribution<unsigned>{l, r}(thief.rdgen);
342 
343  t = (vtm == thief.id) ? _queue.steal() : _workers[vtm].queue.steal();
344 
345  if(t) {
346  break;
347  }
348 
349  if(f++ > F) {
350  std::this_thread::yield();
351  if(y++ > Y) {
352  break;
353  }
354  }
355 
356  /*if(auto vtm = _find_victim(thief); vtm != _workers.size()) {
357  t = (vtm == thief) ? _queue.steal() : _workers[vtm].queue.steal();
358  // successful thief
359  if(t) {
360  break;
361  }
362  }
363  else {
364  if(f++ > F) {
365  if(std::this_thread::yield(); y++ > Y) {
366  break;
367  }
368  }
369  }*/
370  }
371 
372 }
373 
374 // Procedure: _exploit_task
375 inline void Executor::_exploit_task(Worker& w, Node*& t) {
376 
377  assert(!w.cache);
378 
379  if(t) {
380  if(_num_actives.fetch_add(1) == 0 && _num_thieves == 0) {
381  _notifier.notify(false);
382  }
383 
384  auto tpg = t->_topology;
385  auto par = t->_parent;
386  w.num_executed = 1;
387 
388  do {
389  // Only joined subflow will enter block
390  // Flush the num_executed if encountering a different subflow
391  //if((*t)->_parent != par) {
392  // if(par == nullptr) {
393  // (*t)->_topology->_join_counter.fetch_sub(w.num_executed);
394  // }
395  // else {
396  // auto ret = par->_join_counter.fetch_sub(w.num_executed);
397  // if(ret == w.num_executed) {
398  // _schedule(par, false);
399  // }
400  // }
401  // w.num_executed = 1;
402  // par = (*t)->_parent;
403  //}
404 
405  _invoke(w, t);
406 
407  if(w.cache) {
408  t = w.cache;
409  w.cache = nullptr;
410  }
411  else {
412  t = w.queue.pop();
413  if(t) {
414  // We only increment the counter when popping a task from the queue
415  // (NOT including cache!)
416  if(t->_parent == par) {
417  w.num_executed ++;
418  }
419  // joined subflow
420  else {
421  if(par == nullptr) {
422  // still have tasks so the topology join counter can't be zero
423  t->_topology->_join_counter.fetch_sub(w.num_executed);
424  }
425  else {
426  auto ret = par->_join_counter.fetch_sub(w.num_executed);
427  if(ret == w.num_executed) {
428  //_schedule(par, false);
429  w.queue.push(par);
430  }
431  }
432  w.num_executed = 1;
433  par = t->_parent;
434  }
435  }
436  else {
437  // If no more local tasks!
438  if(par == nullptr) {
439  if(tpg->_join_counter.fetch_sub(w.num_executed) == w.num_executed) {
440  // TODO: Store tpg in local variable not in w
441  _tear_down_topology(&tpg);
442  if(tpg != nullptr) {
443  t = w.queue.pop();
444  if(t) {
445  w.num_executed = 1;
446  }
447  }
448  }
449  }
450  else {
451  if(par->_join_counter.fetch_sub(w.num_executed) == w.num_executed) {
452  t = par;
453  w.num_executed = 1;
454  par = par->_parent;
455  }
456  }
457  }
458  }
459  } while(t);
460 
461  --_num_actives;
462  }
463 }
464 
465 // Function: _wait_for_task
466 inline bool Executor::_wait_for_task(Worker& worker, Node*& t) {
467 
468  wait_for_task:
469 
470  assert(!t);
471 
472  ++_num_thieves;
473 
474  explore_task:
475 
476  _explore_task(worker, t);
477 
478  if(t) {
479  auto N = _num_thieves.fetch_sub(1);
480  if(N == 1) {
481  _notifier.notify(false);
482  }
483  return true;
484  }
485 
486  auto waiter = &_waiters[worker.id];
487 
488  _notifier.prepare_wait(waiter);
489 
490  //if(auto vtm = _find_victim(me); vtm != _workers.size()) {
491  if(!_queue.empty()) {
492 
493  _notifier.cancel_wait(waiter);
494  //t = (vtm == me) ? _queue.steal() : _workers[vtm].queue.steal();
495 
496  t = _queue.steal();
497  if(t) {
498  auto N = _num_thieves.fetch_sub(1);
499  if(N == 1) {
500  _notifier.notify(false);
501  }
502  return true;
503  }
504  else {
505  goto explore_task;
506  }
507  }
508 
509  if(_done) {
510  _notifier.cancel_wait(waiter);
511  _notifier.notify(true);
512  --_num_thieves;
513  return false;
514  }
515 
516  if(_num_thieves.fetch_sub(1) == 1 && _num_actives) {
517  _notifier.cancel_wait(waiter);
518  goto wait_for_task;
519  }
520 
521  // Now I really need to relinquish myself to others
522  _notifier.commit_wait(waiter);
523 
524  return true;
525 }
526 
527 // Function: make_observer
528 template<typename Observer, typename... Args>
529 Observer* Executor::make_observer(Args&&... args) {
530  // use a local variable to mimic the constructor
531  auto tmp = std::make_unique<Observer>(std::forward<Args>(args)...);
532  tmp->set_up(_workers.size());
533  _observer = std::move(tmp);
534  return static_cast<Observer*>(_observer.get());
535 }
536 
537 // Procedure: remove_observer
538 inline void Executor::remove_observer() {
539  _observer.reset();
540 }
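
// Editorial example (not part of the original header): attaching and removing an
// observer. The sketch assumes tf::ExecutorObserver, the concrete observer class
// provided in observer.hpp; the dump() call is illustrative only and its exact
// signature should be checked against observer.hpp.
//
//   tf::Executor executor(4);
//   auto* observer = executor.make_observer<tf::ExecutorObserver>();
//
//   tf::Taskflow taskflow;
//   taskflow.emplace([] () { /* some task */ });
//   executor.run(taskflow).wait();
//
//   observer->dump(std::cout);   // assumed ExecutorObserver reporting interface
//   executor.remove_observer();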
541 
542 // Procedure: _schedule
543 // The main procedure to schedule a given task node.
544 // Each task node has two types of tasks - regular and subflow.
545 inline void Executor::_schedule(Node* node, bool bypass) {
546 
547  //assert(_workers.size() != 0);
548 
549  // caller is a worker to this pool
550  auto worker = _per_thread().worker;
551 
552  if(worker != nullptr) {
553  if(!bypass) {
554  worker->queue.push(node);
555  }
556  else {
557  assert(!worker->cache);
558  worker->cache = node;
559  }
560  return;
561  }
562 
563  // other threads
564  {
565  std::lock_guard<std::mutex> lock(_queue_mutex);
566  _queue.push(node);
567  }
568 
569  _notifier.notify(false);
570 }
571 
572 // Procedure: _schedule
573 // The main procedure to schedule a set of task nodes.
574 // Each task node has two types of tasks - regular and subflow.
575 inline void Executor::_schedule(PassiveVector<Node*>& nodes) {
576 
577  //assert(_workers.size() != 0);
578 
579  // We need to capture the node count first to avoid accessing the nodes
580  // vector after the parent topology has been removed!
581  const auto num_nodes = nodes.size();
582 
583  if(num_nodes == 0) {
584  return;
585  }
586 
587  // worker thread
588  auto worker = _per_thread().worker;
589 
590  if(worker != nullptr) {
591  for(size_t i=0; i<num_nodes; ++i) {
592  worker->queue.push(nodes[i]);
593  }
594  return;
595  }
596 
597  // other threads
598  {
599  std::lock_guard<std::mutex> lock(_queue_mutex);
600  for(size_t k=0; k<num_nodes; ++k) {
601  _queue.push(nodes[k]);
602  }
603  }
604 
605  if(num_nodes >= _workers.size()) {
606  _notifier.notify(true);
607  }
608  else {
609  for(size_t k=0; k<num_nodes; ++k) {
610  _notifier.notify(false);
611  }
612  }
613 }
614 
615 
616 // Procedure: _invoke
617 inline void Executor::_invoke(Worker& worker, Node* node) {
618 
619  //assert(_workers.size() != 0);
620 
621  // Here we need to fetch the num_successors first to avoid the invalid memory
622  // access caused by topology clear.
623  const auto num_successors = node->num_successors();
624 
625  // static task
626  if(node->_handle.index() == Node::STATIC_WORK) {
627  _invoke_static_work(worker, node);
628  }
629  // module task
630  else if(node->_handle.index() == Node::MODULE_WORK) {
631  bool first_time = !node->_has_state(Node::SPAWNED);
632  bool emptiness = false;
633  _set_up_module_work(node, emptiness);
634  if(first_time && !emptiness) {
635  return;
636  }
637  }
638  // dynamic task
639  else if (node->_handle.index() == Node::DYNAMIC_WORK) {
640 
641  auto& subgraph = nstd::get<Node::DynamicWork>(node->_handle).subgraph;
642 
643  // Clear the subgraph before the task execution
644  if(!node->_has_state(Node::SPAWNED)) {
645  subgraph.clear();
646  }
647 
648  Subflow fb(subgraph);
649 
650  _invoke_dynamic_work(worker, node, fb);
651 
652  // Need to create a subflow if first time & subgraph is not empty
653  if(!node->_has_state(Node::SPAWNED)) {
654  node->_set_state(Node::SPAWNED);
655  if(!subgraph.empty()) {
656  // For storing the source nodes
657  PassiveVector<Node*> src;
658 
659  for(auto n : subgraph._nodes) {
660 
661  n->_topology = node->_topology;
662  n->_set_up_join_counter();
663 
664  if(!fb.detached()) {
665  n->_parent = node;
666  }
667 
668  if(n->num_dependents() == 0) {
669  src.push_back(n);
670  }
671  }
672 
673  const bool join = fb.joined();
674  if(!join) {
675  // Detach mode
676  node->_topology->_join_counter.fetch_add(src.size());
677  }
678  else {
679  // Join mode
680  node->_join_counter.fetch_add(src.size());
681 
682  // spawned node needs another second-round execution
683  if(node->_parent == nullptr) {
684  node->_topology->_join_counter.fetch_add(1);
685  }
686  else {
687  node->_parent->_join_counter.fetch_add(1);
688  }
689  }
690 
691  _schedule(src);
692 
693  if(join) {
694  return;
695  }
696  } // End of first time
697  }
698  }
699  // condition task
700  else if(node->_handle.index() == Node::CONDITION_WORK) {
701 
702  if(node->_has_state(Node::BRANCH)) {
703  node->_join_counter = node->num_strong_dependents();
704  }
705  else {
706  node->_join_counter = node->num_dependents();
707  }
708 
709  int id;
710  _invoke_condition_work(worker, node, id);
711 
712  if(id >= 0 && static_cast<size_t>(id) < num_successors) {
713  node->_successors[id]->_join_counter.store(0);
714  _schedule(node->_successors[id], true);
715  }
716  return ;
717  }
718 
719 
720  // We MUST recover the dependency here since a condition node can loop back (cyclic).
721  // This must be done before scheduling the successors; otherwise it might cause
722  // a race condition on the _dependents
723  if(node->_has_state(Node::BRANCH)) {
724  // If this is a case node, we need to deduct condition predecessors
725  node->_join_counter = node->num_strong_dependents();
726  }
727  else {
728  node->_join_counter = node->num_dependents();
729  }
730 
731  node->_unset_state(Node::SPAWNED);
732 
733  // At this point, the node storage might be destructed.
734  Node* cache {nullptr};
735  size_t num_spawns {0};
736 
737  auto& c = (node->_parent) ? node->_parent->_join_counter : node->_topology->_join_counter;
738 
739  for(size_t i=0; i<num_successors; ++i) {
740  if(--(node->_successors[i]->_join_counter) == 0) {
741  if(cache) {
742  if(num_spawns == 0) {
743  c.fetch_add(num_successors);
744  }
745  num_spawns++;
746  _schedule(cache, false);
747  }
748  cache = node->_successors[i];
749  }
750  }
751 
752  if(num_spawns) {
753  worker.num_executed += (node->_successors.size() - num_spawns);
754  }
755 
756  if(cache) {
757  _schedule(cache, true);
758  }
759 }
760 
761 // Procedure: _invoke_static_work
762 inline void Executor::_invoke_static_work(Worker& worker, Node* node) {
763  if(_observer) {
764  _observer->on_entry(worker.id, TaskView(node));
765  nstd::get<Node::StaticWork>(node->_handle).work();
766  _observer->on_exit(worker.id, TaskView(node));
767  }
768  else {
769  nstd::get<Node::StaticWork>(node->_handle).work();
770  }
771 }
772 
773 // Procedure: _invoke_dynamic_work
774 inline void Executor::_invoke_dynamic_work(Worker& worker, Node* node, Subflow& sf) {
775  if(_observer) {
776  _observer->on_entry(worker.id, TaskView(node));
777  nstd::get<Node::DynamicWork>(node->_handle).work(sf);
778  _observer->on_exit(worker.id, TaskView(node));
779  }
780  else {
781  nstd::get<Node::DynamicWork>(node->_handle).work(sf);
782  }
783 }
784 
785 // Procedure: _invoke_condition_work
786 inline void Executor::_invoke_condition_work(Worker& worker, Node* node, int& id) {
787  if(_observer) {
788  _observer->on_entry(worker.id, TaskView(node));
789  id = nstd::get<Node::ConditionWork>(node->_handle).work();
790  _observer->on_exit(worker.id, TaskView(node));
791  }
792  else {
793  id = nstd::get<Node::ConditionWork>(node->_handle).work();
794  }
795 }
796 
797 // Procedure: _set_up_module_work
798 inline void Executor::_set_up_module_work(Node* node, bool& ept) {
799 
800  // second time to enter this context
801  if(node->_has_state(Node::SPAWNED)) {
802  return;
803  }
804 
805  // first time to enter this context
806  node->_set_state(Node::SPAWNED);
807 
808  auto module = nstd::get<Node::ModuleWork>(node->_handle).module;
809 
810  if(module->empty()) {
811  ept = true;
812  return;
813  }
814 
815  PassiveVector<Node*> src;
816 
817  for(auto n: module->_graph._nodes) {
818 
819  n->_topology = node->_topology;
820  n->_parent = node;
821  n->_set_up_join_counter();
822 
823  if(n->num_dependents() == 0) {
824  src.push_back(n);
825  }
826  }
827 
828  node->_join_counter.fetch_add(src.size());
829 
830  if(node->_parent == nullptr) {
831  node->_topology->_join_counter.fetch_add(1);
832  }
833  else {
834  node->_parent->_join_counter.fetch_add(1);
835  }
836 
837  // src cannot be empty (empty modules return early above)
838  _schedule(src);
839 }
840 
841 // Function: run
842 inline std::future<void> Executor::run(Taskflow& f) {
843  return run_n(f, 1, [](){});
844 }
845 
846 // Function: run
847 template <typename C>
848 std::future<void> Executor::run(Taskflow& f, C&& c) {
849  return run_n(f, 1, std::forward<C>(c));
850 }
851 
852 // Function: run_n
853 inline std::future<void> Executor::run_n(Taskflow& f, size_t repeat) {
854  return run_n(f, repeat, [](){});
855 }
856 
857 // Function: run_n
858 template <typename C>
859 std::future<void> Executor::run_n(Taskflow& f, size_t repeat, C&& c) {
860  return run_until(f, [repeat]() mutable { return repeat-- == 0; }, std::forward<C>(c));
861 }
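
// Editorial note: as the definition above shows, run_n(f, N, c) simply forwards to
// run_until with a counting predicate, so
//
//   executor.run_n(taskflow, 3);
//
// behaves like
//
//   executor.run_until(taskflow, [n=3] () mutable { return n-- == 0; });
//
// The predicate is evaluated once before the first run and once after each run.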
862 
863 // Function: run_until
864 template<typename P>
865 std::future<void> Executor::run_until(Taskflow& f, P&& pred) {
866  return run_until(f, std::forward<P>(pred), [](){});
867 }
868 
869 // Function: _set_up_topology
870 inline void Executor::_set_up_topology(Topology* tpg) {
871 
872  tpg->_sources.clear();
873 
874  // scan each node in the graph and build up the links
875  for(auto node : tpg->_taskflow._graph._nodes) {
876 
877  node->_topology = tpg;
878  node->_clear_state();
879 
880  if(node->num_dependents() == 0) {
881  tpg->_sources.push_back(node);
882  }
883 
884  node->_set_up_join_counter();
885  }
886 
887  tpg->_join_counter.store(tpg->_sources.size(), std::memory_order_relaxed);
888 }
889 
890 // Function: _tear_down_topology
891 inline void Executor::_tear_down_topology(Topology** tpg) {
892 
893  auto &f = (*tpg)->_taskflow;
894 
895  //assert(&tpg == &(f._topologies.front()));
896 
897  // case 1: we still need to run the topology again
898  if(! (*tpg)->_pred() ) {
899  //tpg->_recover_num_sinks();
900 
901  assert((*tpg)->_join_counter == 0);
902  (*tpg)->_join_counter = (*tpg)->_sources.size();
903 
904  _schedule((*tpg)->_sources);
905  }
906  // case 2: the final run of this topology
907  else {
908 
909  if((*tpg)->_call != nullptr) {
910  (*tpg)->_call();
911  }
912 
913  f._mtx.lock();
914 
915  // If there is another run (interleave between lock)
916  if(f._topologies.size() > 1) {
917 
918  assert((*tpg)->_join_counter == 0);
919 
920  // Set the promise
921  (*tpg)->_promise.set_value();
922  f._topologies.pop_front();
923  f._mtx.unlock();
924 
925  // decrement the topology but since this is not the last we don't notify
926  _decrement_topology();
927 
928  *tpg = &(f._topologies.front());
929 
930  _set_up_topology(*tpg);
931  _schedule((*tpg)->_sources);
932 
933  //f._topologies.front()._bind(f._graph);
934  //*tpg = &(f._topologies.front());
935 
936  //assert(f._topologies.front()._join_counter == 0);
937 
938  //f._topologies.front()._join_counter = f._topologies.front()._sources.size();
939 
940  //_schedule(f._topologies.front()._sources);
941  }
942  else {
943  assert(f._topologies.size() == 1);
944 
945  // Need to back up the promise first here because the taskflow might be
946  // destroyed before this function leaves
947  auto p {std::move((*tpg)->_promise)};
948 
949  f._topologies.pop_front();
950 
951  f._mtx.unlock();
952 
953  // We set the promise at the end, since fulfilling it may let the caller destroy the taskflow
954  p.set_value();
955 
956  _decrement_topology_and_notify();
957 
958  // Reset topology so caller can stop execution
959  *tpg = nullptr;
960  }
961  }
962 }
963 
964 // Function: run_until
965 template <typename P, typename C>
966 std::future<void> Executor::run_until(Taskflow& f, P&& pred, C&& c) {
967 
968  _increment_topology();
969 
970  // Special case of predicate
971  if(f.empty() || pred()) {
972  std::promise<void> promise;
973  promise.set_value();
974  _decrement_topology_and_notify();
975  return promise.get_future();
976  }
977 
978 
979 
983  //if(_workers.size() == 0) {
984  //
985  // Topology tpg(f, std::forward<P>(pred), std::forward<C>(c));
986 
987  // // Clear last execution data & Build precedence between nodes and target
988  // tpg._bind(f._graph);
989 
990  // std::stack<Node*> stack;
991 
992  // do {
993  // _schedule_unsync(tpg._sources, stack);
994  // while(!stack.empty()) {
995  // auto node = stack.top();
996  // stack.pop();
997  // _invoke_unsync(node, stack);
998  // }
999  // tpg._recover_num_sinks();
1000  // } while(!std::invoke(tpg._pred));
1001 
1002  // if(tpg._call != nullptr) {
1003  // std::invoke(tpg._call);
1004  // }
1005 
1006  // tpg._promise.set_value();
1007  //
1008  // _decrement_topology_and_notify();
1009  //
1010  // return tpg._promise.get_future();
1011  //}
1012 
1013  // Multi-threaded execution.
1014  bool run_now {false};
1015  Topology* tpg;
1016  std::future<void> future;
1017 
1018  {
1019  std::lock_guard<std::mutex> lock(f._mtx);
1020 
1021  // create a topology for this run
1022  //tpg = &(f._topologies.emplace_back(f, std::forward<P>(pred), std::forward<C>(c)));
1023  f._topologies.emplace_back(f, std::forward<P>(pred), std::forward<C>(c));
1024  tpg = &(f._topologies.back());
1025  future = tpg->_promise.get_future();
1026 
1027  if(f._topologies.size() == 1) {
1028  run_now = true;
1029  //tpg->_bind(f._graph);
1030  //_schedule(tpg->_sources);
1031  }
1032  }
1033 
1034  // Note that calling _schedule here may cause the topology to be removed
1035  // before the function leaves.
1036  if(run_now) {
1037  _set_up_topology(tpg);
1038  _schedule(tpg->_sources);
1039  }
1040 
1041  return future;
1042 }
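
// Editorial example (not part of the original header): run_until with a callback;
// _tear_down_topology invokes the callback after the final run of the topology.
//
//   executor.run_until(
//     taskflow,
//     [n=5] () mutable { return n-- == 0; },      // stop once the counter reaches zero
//     [] () { std::cout << "all runs done\n"; }   // invoked after the final run
//   );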
1043 
1044 // Procedure: _increment_topology
1045 inline void Executor::_increment_topology() {
1046  std::lock_guard<std::mutex> lock(_topology_mutex);
1047  ++_num_topologies;
1048 }
1049 
1050 // Procedure: _decrement_topology_and_notify
1051 inline void Executor::_decrement_topology_and_notify() {
1052  std::lock_guard<std::mutex> lock(_topology_mutex);
1053  if(--_num_topologies == 0) {
1054  _topology_cv.notify_all();
1055  }
1056 }
1057 
1058 // Procedure: _decrement_topology
1059 inline void Executor::_decrement_topology() {
1060  std::lock_guard<std::mutex> lock(_topology_mutex);
1061  --_num_topologies;
1062 }
1063 
1064 // Procedure: wait_for_all
1065 inline void Executor::wait_for_all() {
1066  std::unique_lock<std::mutex> lock(_topology_mutex);
1067  _topology_cv.wait(lock, [&](){ return _num_topologies == 0; });
1068 }
1069 
1070 } // end of namespace tf -----------------------------------------------------
1071 
1072 
Cross-references generated for this page:

Executor(unsigned n = std::thread::hardware_concurrency()) - constructs the executor with N worker threads (definition: executor.hpp:215)
~Executor() - destructs the executor (definition: executor.hpp:228)
size_t num_workers() const - queries the number of worker threads, which can be zero (definition: executor.hpp:243)
int this_worker_id() const - queries the id of the caller thread in this executor (definition: executor.hpp:254)
Observer* make_observer(Args&&... args) - constructs an observer to inspect the activities of worker threads (definition: executor.hpp:529)
void remove_observer() - removes the associated observer (definition: executor.hpp:538)
std::future<void> run(Taskflow& taskflow) - runs the taskflow once (definition: executor.hpp:842)
std::future<void> run_n(Taskflow& taskflow, size_t N) - runs the taskflow N times (definition: executor.hpp:853)
std::future<void> run_until(Taskflow& taskflow, P&& pred) - runs the taskflow repeatedly until the predicate becomes true, then invokes a callback (definition: executor.hpp:865)
void wait_for_all() - waits for all pending graphs to complete (definition: executor.hpp:1065)
tf::Executor - the executor class to run a taskflow graph (definition: executor.hpp:33)
tf::Taskflow - the class to create a task dependency graph (definition: core/taskflow.hpp:18)
bool Taskflow::empty() const - queries the emptiness of the taskflow (definition: core/taskflow.hpp:139)
tf::Subflow - the building blocks of dynamic tasking (definition: flow_builder.hpp:344)
bool Subflow::joined() const - queries if the subflow will join its parent task (definition: flow_builder.hpp:401)
bool Subflow::detached() const - queries if the subflow will be detached from its parent task (definition: flow_builder.hpp:396)
tf::TaskView - an immutable accessor class to a task node, mainly used in the tf::ExecutorObserver interface (definition: task.hpp:396)