18 #include "notifier.hpp"
19 #include "observer.hpp"
20 #include "taskflow.hpp"
38   TaskQueue<Node*> queue;
39   Node* cache {nullptr};
45   Worker* worker {nullptr};
124   template<typename P, typename C>
152   template<typename Observer, typename... Args>
174   unsigned _num_topologies {0};
181   TaskQueue<Node*> _queue;
191   unsigned _find_victim(unsigned);
193   PerThread& _per_thread() const;
195   bool _wait_for_task(Worker&, Node*&);
197   void _spawn(unsigned);
198   void _exploit_task(Worker&, Node*&);
199   void _explore_task(Worker&, Node*&);
200   void _schedule(Node*, bool);
201   void _schedule(PassiveVector<Node*>&);
202   void _invoke(Worker&, Node*);
203   void _invoke_static_work(Worker&, Node*);
204   void _invoke_dynamic_work(Worker&, Node*, Subflow&);
205   void _invoke_condition_work(Worker&, Node*, int&);
206   void _set_up_module_work(Node*, bool&);
207   void _set_up_topology(Topology*);
208   void _tear_down_topology(Topology**);
209   void _increment_topology();
210   void _decrement_topology();
211   void _decrement_topology_and_notify();
218   _notifier {_waiters} {
221     TF_THROW("no workers to execute the graph");
235   _notifier.notify(true);
237   for(auto& t : _threads) {
244   return _workers.size();
248 inline Executor::PerThread& Executor::_per_thread() const {
249   thread_local PerThread pt;
255   auto worker = _per_thread().worker;
256   return worker ? static_cast<int>(worker->id) : -1;
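this_worker_id() resolves the caller through a function-local thread_local slot that each worker fills in once at spawn. A self-contained sketch of the same idiom, with illustrative names rather than Taskflow's:

#include <iostream>
#include <thread>

struct Worker { int id; };
struct PerThread { Worker* worker {nullptr}; };

// One PerThread slot per OS thread; a worker registers its Worker pointer
// once at spawn, so every later lookup is a cheap thread_local access.
PerThread& per_thread() {
  static thread_local PerThread pt;
  return pt;
}

int this_worker_id() {
  auto w = per_thread().worker;
  return w ? w->id : -1;  // -1 means the caller is not a worker thread
}

int main() {
  std::cout << this_worker_id() << '\n';  // -1: main is not a worker
  Worker w {0};
  std::thread t([&w](){
    per_thread().worker = &w;               // registration at spawn
    std::cout << this_worker_id() << '\n';  // 0
  });
  t.join();
}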
260 inline void Executor::_spawn(unsigned N) {
263   for(unsigned i=0; i<N; ++i) {
267     _threads.emplace_back([this] (Worker& w) -> void {
269       PerThread& pt = _per_thread();
281       if(_wait_for_task(w, t) == false) {
291 inline unsigned Executor::_find_victim(unsigned thief) {
313   for(unsigned vtm=0; vtm<_workers.size(); ++vtm) {
314     if((thief == vtm && !_queue.empty()) ||
315        (thief != vtm && !_workers[vtm].queue.empty())) {
320   return static_cast<unsigned>(_workers.size());
324 inline void Executor::_explore_task(Worker& thief, Node*& t) {
329   const unsigned l = 0;
330   const unsigned r = static_cast<unsigned>(_workers.size()) - 1;
332   const size_t F = (_workers.size() + 1) << 1;
333   const size_t Y = 100;
343     t = (vtm == thief.id) ? _queue.steal() : _workers[vtm].queue.steal();
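Only the setup of the stealing loop survives above: l and r bound the victim range, F caps failed steal attempts, and Y caps the yields before the thief gives up and falls back to the sleep path in _wait_for_task. One plausible reading of that retry discipline as a self-contained sketch; the toy queue type and function name are assumptions, not Taskflow's internals:

#include <deque>
#include <mutex>
#include <random>
#include <thread>
#include <vector>

struct ToyQueue {
  std::mutex m;
  std::deque<int> q;
  bool steal(int& out) {
    std::lock_guard<std::mutex> lk(m);
    if(q.empty()) return false;
    out = q.front();
    q.pop_front();
    return true;
  }
};

// Try up to F failed steals per round; yield between rounds and give up
// after Y rounds so the caller can park the thread instead of spinning.
bool steal_one(unsigned me, std::vector<ToyQueue>& queues, int& out,
               size_t F, size_t Y) {
  static thread_local std::default_random_engine eng{std::random_device{}()};
  std::uniform_int_distribution<unsigned> dist(
    0, static_cast<unsigned>(queues.size()) - 1
  );
  for(size_t round = 0; round < Y; ++round) {
    for(size_t failures = 0; failures < F; ++failures) {
      unsigned vtm = dist(eng);
      if(vtm != me && queues[vtm].steal(out)) {
        return true;
      }
    }
    std::this_thread::yield();  // back off before trying another round
  }
  return false;  // caller falls back to the sleep path
}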
375 inline void Executor::_exploit_task(Worker& w, Node*& t) {
380     if(_num_actives.fetch_add(1) == 0 && _num_thieves == 0) {
381       _notifier.notify(false);
384     auto tpg = t->_topology;
385     auto par = t->_parent;
416       if(t->_parent == par) {
423           t->_topology->_join_counter.fetch_sub(w.num_executed);
426           auto ret = par->_join_counter.fetch_sub(w.num_executed);
427           if(ret == w.num_executed) {
439       if(tpg->_join_counter.fetch_sub(w.num_executed) == w.num_executed) {
441         _tear_down_topology(&tpg);
451       if(par->_join_counter.fetch_sub(w.num_executed) == w.num_executed) {
466 inline bool Executor::_wait_for_task(Worker& worker, Node*& t) {
476   _explore_task(worker, t);
479     auto N = _num_thieves.fetch_sub(1);
481       _notifier.notify(false);
486   auto waiter = &_waiters[worker.id];
488   _notifier.prepare_wait(waiter);
491   if(!_queue.empty()) {
493     _notifier.cancel_wait(waiter);
498       auto N = _num_thieves.fetch_sub(1);
500         _notifier.notify(false);
510     _notifier.cancel_wait(waiter);
511     _notifier.notify(true);
516   if(_num_thieves.fetch_sub(1) == 1 && _num_actives) {
517     _notifier.cancel_wait(waiter);
522   _notifier.commit_wait(waiter);
528 template<typename Observer, typename... Args>
531   auto tmp = std::make_unique<Observer>(std::forward<Args>(args)...);
532   tmp->set_up(_workers.size());
534   return static_cast<Observer*>(_observer.get());
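make_observer forwards its arguments to the observer's constructor, hands it the worker count through set_up, and stores it in _observer, whose on_entry/on_exit hooks bracket every task invocation below. A usage sketch: MyObserver is hypothetical, and its hook signatures are assumed from the calls visible in this listing and from the tf::ExecutorObserver interface mentioned in the TaskView summary at the end.

#include <iostream>
#include <taskflow/taskflow.hpp>  // assumed include path

// Hypothetical observer; the hooks mirror the calls in this listing:
// set_up(num_workers), on_entry/on_exit(worker_id, TaskView).
struct MyObserver : public tf::ExecutorObserverInterface {
  void set_up(unsigned num_workers) override {
    std::cout << "observing " << num_workers << " workers\n";
  }
  void on_entry(unsigned worker_id, tf::TaskView tv) override {
    std::cout << "worker " << worker_id << " starts " << tv.name() << '\n';
  }
  void on_exit(unsigned worker_id, tf::TaskView tv) override {
    std::cout << "worker " << worker_id << " ends " << tv.name() << '\n';
  }
};

int main() {
  tf::Executor executor;
  executor.make_observer<MyObserver>();  // owned by the executor

  tf::Taskflow taskflow;
  taskflow.emplace([](){ /* observed task */ });
  executor.run(taskflow);
  executor.wait_for_all();

  executor.remove_observer();  // drops the observer again
}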
545 inline void Executor::_schedule(Node* node, bool bypass) {
550   auto worker = _per_thread().worker;
552   if(worker != nullptr) {
554       worker->queue.push(node);
557       assert(!worker->cache);
558       worker->cache = node;
569   _notifier.notify(false);
575 inline void Executor::_schedule(PassiveVector<Node*>& nodes) {
581   const auto num_nodes = nodes.size();
588   auto worker = _per_thread().worker;
590   if(worker != nullptr) {
591     for(size_t i=0; i<num_nodes; ++i) {
592       worker->queue.push(nodes[i]);
600     for(size_t k=0; k<num_nodes; ++k) {
601       _queue.push(nodes[k]);
605   if(num_nodes >= _workers.size()) {
606     _notifier.notify(true);
609     for(size_t k=0; k<num_nodes; ++k) {
610       _notifier.notify(false);
617 inline void Executor::_invoke(Worker& worker, Node* node) {
623   const auto num_successors = node->num_successors();
626   if(node->_handle.index() == Node::STATIC_WORK) {
627     _invoke_static_work(worker, node);
630   else if(node->_handle.index() == Node::MODULE_WORK) {
631     bool first_time = !node->_has_state(Node::SPAWNED);
632     bool emptiness = false;
633     _set_up_module_work(node, emptiness);
634     if(first_time && !emptiness) {
639   else if(node->_handle.index() == Node::DYNAMIC_WORK) {
641     auto& subgraph = nstd::get<Node::DynamicWork>(node->_handle).subgraph;
644     if(!node->_has_state(Node::SPAWNED)) {
650     _invoke_dynamic_work(worker, node, fb);
653     if(!node->_has_state(Node::SPAWNED)) {
654       node->_set_state(Node::SPAWNED);
655       if(!subgraph.empty()) {
657         PassiveVector<Node*> src;
659         for(auto n : subgraph._nodes) {
661           n->_topology = node->_topology;
662           n->_set_up_join_counter();
668           if(n->num_dependents() == 0) {
673         const bool join = fb.joined();
676           node->_topology->_join_counter.fetch_add(src.size());
680           node->_join_counter.fetch_add(src.size());
683           if(node->_parent == nullptr) {
684             node->_topology->_join_counter.fetch_add(1);
687             node->_parent->_join_counter.fetch_add(1);
700   else if(node->_handle.index() == Node::CONDITION_WORK) {
702     if(node->_has_state(Node::BRANCH)) {
703       node->_join_counter = node->num_strong_dependents();
706       node->_join_counter = node->num_dependents();
710     _invoke_condition_work(worker, node, id);
712     if(id >= 0 && static_cast<size_t>(id) < num_successors) {
713       node->_successors[id]->_join_counter.store(0);
714       _schedule(node->_successors[id], true);
723   if(node->_has_state(Node::BRANCH)) {
725     node->_join_counter = node->num_strong_dependents();
728     node->_join_counter = node->num_dependents();
731   node->_unset_state(Node::SPAWNED);
734   Node* cache {nullptr};
735   size_t num_spawns {0};
737   auto& c = (node->_parent) ? node->_parent->_join_counter : node->_topology->_join_counter;
739   for(size_t i=0; i<num_successors; ++i) {
740     if(--(node->_successors[i]->_join_counter) == 0) {
742       if(num_spawns == 0) {
743         c.fetch_add(num_successors);
746         _schedule(cache, false);
748       cache = node->_successors[i];
753   worker.num_executed += (node->_successors.size() - num_spawns);
757   _schedule(cache, true);
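The CONDITION_WORK branch above shows the mechanism: the condition callable returns an integer id, and if that id addresses a valid successor, the successor's join counter is cleared and the task is scheduled directly, which is what makes branches and loops possible. A minimal user-side sketch, assuming this version's emplace treats an int-returning callable as a condition task; the include path and task names are illustrative:

#include <iostream>
#include <taskflow/taskflow.hpp>  // assumed include path

int main() {
  tf::Taskflow taskflow;
  tf::Executor executor;

  int i = 0;
  auto init = taskflow.emplace([&](){ i = 0; });
  auto body = taskflow.emplace([&](){ ++i; });
  auto cond = taskflow.emplace([&](){ return i < 3 ? 0 : 1; });  // int return: condition task
  auto done = taskflow.emplace([&](){ std::cout << "i=" << i << '\n'; });

  init.precede(body);
  body.precede(cond);
  cond.precede(body, done);  // returned 0 loops back to body, returned 1 proceeds to done

  executor.run(taskflow);
  executor.wait_for_all();
}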
762 inline void Executor::_invoke_static_work(Worker& worker, Node* node) {
764     _observer->on_entry(worker.id, TaskView(node));
765     nstd::get<Node::StaticWork>(node->_handle).work();
766     _observer->on_exit(worker.id, TaskView(node));
769     nstd::get<Node::StaticWork>(node->_handle).work();
774 inline void Executor::_invoke_dynamic_work(Worker& worker, Node* node, Subflow& sf) {
776     _observer->on_entry(worker.id, TaskView(node));
777     nstd::get<Node::DynamicWork>(node->_handle).work(sf);
778     _observer->on_exit(worker.id, TaskView(node));
781     nstd::get<Node::DynamicWork>(node->_handle).work(sf);
786 inline void Executor::_invoke_condition_work(Worker& worker, Node* node, int& id) {
788     _observer->on_entry(worker.id, TaskView(node));
789     id = nstd::get<Node::ConditionWork>(node->_handle).work();
790     _observer->on_exit(worker.id, TaskView(node));
793     id = nstd::get<Node::ConditionWork>(node->_handle).work();
798 inline void Executor::_set_up_module_work(Node* node, bool& ept) {
801   if(node->_has_state(Node::SPAWNED)) {
806   node->_set_state(Node::SPAWNED);
808   auto module = nstd::get<Node::ModuleWork>(node->_handle).module;
810   if(module->empty()) {
815   PassiveVector<Node*> src;
817   for(auto n : module->_graph._nodes) {
819     n->_topology = node->_topology;
821     n->_set_up_join_counter();
823     if(n->num_dependents() == 0) {
828   node->_join_counter.fetch_add(src.size());
830   if(node->_parent == nullptr) {
831     node->_topology->_join_counter.fetch_add(1);
834     node->_parent->_join_counter.fetch_add(1);
843   return run_n(f, 1, [](){});
847 template <typename C>
849   return run_n(f, 1, std::forward<C>(c));
854   return run_n(f, repeat, [](){});
858 template <typename C>
860   return run_until(f, [repeat]() mutable { return repeat-- == 0; }, std::forward<C>(c));
866   return run_until(f, std::forward<P>(pred), [](){});
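The wrappers above reduce every public run variant to run_until: run is run_n with one iteration, and run_n is run_until with a countdown predicate. A usage sketch of the whole family; the include path is an assumption:

#include <iostream>
#include <taskflow/taskflow.hpp>  // assumed include path

int main() {
  tf::Taskflow taskflow;
  taskflow.emplace([](){ std::cout << "tick\n"; });

  tf::Executor executor;
  executor.run(taskflow);                                        // one run
  executor.run_n(taskflow, 3);                                   // three more runs
  executor.run_n(taskflow, 2, [](){ std::cout << "done\n"; });   // callback after the last run
  executor.run_until(taskflow, [n=2]() mutable { return n-- == 0; });  // two more runs
  executor.wait_for_all();  // block until every queued run has finished
}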
870 inline void Executor::_set_up_topology(Topology* tpg) {
872   tpg->_sources.clear();
875   for(auto node : tpg->_taskflow._graph._nodes) {
877     node->_topology = tpg;
878     node->_clear_state();
880     if(node->num_dependents() == 0) {
881       tpg->_sources.push_back(node);
884     node->_set_up_join_counter();
887   tpg->_join_counter.store(tpg->_sources.size(), std::memory_order_relaxed);
891 inline void Executor::_tear_down_topology(Topology** tpg) {
893   auto& f = (*tpg)->_taskflow;
898   if(!(*tpg)->_pred()) {
901     assert((*tpg)->_join_counter == 0);
902     (*tpg)->_join_counter = (*tpg)->_sources.size();
904     _schedule((*tpg)->_sources);
909     if((*tpg)->_call != nullptr) {
916     if(f._topologies.size() > 1) {
918       assert((*tpg)->_join_counter == 0);
921       (*tpg)->_promise.set_value();
922       f._topologies.pop_front();
926       _decrement_topology();
928       *tpg = &(f._topologies.front());
930       _set_up_topology(*tpg);
931       _schedule((*tpg)->_sources);
943       assert(f._topologies.size() == 1);
949       f._topologies.pop_front();
956       _decrement_topology_and_notify();
965 template <typename P, typename C>
968   _increment_topology();
971   if(f.empty() || pred()) {
974     _decrement_topology_and_notify();
975     return promise.get_future();
1014   bool run_now {false};
1023     f._topologies.emplace_back(f, std::forward<P>(pred), std::forward<C>(c));
1024     tpg = &(f._topologies.back());
1025     future = tpg->_promise.get_future();
1027     if(f._topologies.size() == 1) {
1037     _set_up_topology(tpg);
1038     _schedule(tpg->_sources);
1045 inline void Executor::_increment_topology() {
1051 inline void Executor::_decrement_topology_and_notify() {
1053   if(--_num_topologies == 0) {
1054     _topology_cv.notify_all();
1059 inline void Executor::_decrement_topology() {
1067   _topology_cv.wait(lock, [&](){ return _num_topologies == 0; });
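These topology counters follow the usual counter-plus-condition-variable idiom: increments and decrements happen under a mutex, and the decrement that reaches zero wakes every thread blocked in wait_for_all. A self-contained sketch of the idiom with an illustrative class name:

#include <condition_variable>
#include <mutex>

// Sketch of the pattern behind _increment_topology, _decrement_topology,
// _decrement_topology_and_notify, and wait_for_all.
class PendingCounter {
  std::mutex _mutex;
  std::condition_variable _cv;
  unsigned _count {0};
public:
  void increment() {
    std::lock_guard<std::mutex> lock(_mutex);
    ++_count;
  }
  void decrement() {
    std::lock_guard<std::mutex> lock(_mutex);
    --_count;
  }
  void decrement_and_notify() {
    std::lock_guard<std::mutex> lock(_mutex);
    if(--_count == 0) {
      _cv.notify_all();  // wake every thread blocked in wait_for_all
    }
  }
  void wait_for_all() {
    std::unique_lock<std::mutex> lock(_mutex);
    _cv.wait(lock, [&](){ return _count == 0; });
  }
};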
Referenced members:

Executor(unsigned n = std::thread::hardware_concurrency())
  constructs the executor with n worker threads (Definition: executor.hpp:215)

~Executor()
  destructs the executor (Definition: executor.hpp:228)

size_t num_workers() const
  queries the number of worker threads (can be zero) (Definition: executor.hpp:243)

int this_worker_id() const
  queries the id of the caller thread in this executor (Definition: executor.hpp:254)

std::future<void> run(Taskflow& taskflow)
  runs the taskflow once (Definition: executor.hpp:842)

std::future<void> run_n(Taskflow& taskflow, size_t N)
  runs the taskflow N times (Definition: executor.hpp:853)

std::future<void> run_until(Taskflow& taskflow, P&& pred)
  runs the taskflow repeatedly until the predicate becomes true, then invokes a callback (Definition: executor.hpp:865)

void wait_for_all()
  waits for all pending graphs to complete (Definition: executor.hpp:1065)

Observer* make_observer(Args&&... args)
  constructs an observer to inspect the activities of worker threads (Definition: executor.hpp:529)

void remove_observer()
  removes the associated observer (Definition: executor.hpp:538)

tf::Executor
  the executor class to run a taskflow graph (Definition: executor.hpp:33)

tf::Taskflow
  the class to create a task dependency graph (Definition: core/taskflow.hpp:18)

bool Taskflow::empty() const
  queries the emptiness of the taskflow (Definition: core/taskflow.hpp:139)

tf::Subflow
  the building blocks of dynamic tasking (Definition: flow_builder.hpp:344)

bool Subflow::joined() const
  queries if the subflow will join its parent task (Definition: flow_builder.hpp:401)

bool Subflow::detached() const
  queries if the subflow will be detached from its parent task (Definition: flow_builder.hpp:396)

tf::TaskView
  an immutable accessor class to a task node, mainly used in the tf::ExecutorObserver interface (Definition: task.hpp:396)

tf (namespace)
  Definition: taskflow.hpp:5
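To tie the tf::Subflow entries (joined, detached) back to the DYNAMIC_WORK branch of _invoke: a dynamic task receives a Subflow, builds an inner graph at run time, and by default joins its parent, which is the fb.joined() path above. A minimal sketch; the include path and task names are assumptions:

#include <iostream>
#include <taskflow/taskflow.hpp>  // assumed include path

int main() {
  tf::Taskflow taskflow;
  tf::Executor executor;

  // The lambda's Subflow parameter marks this as a dynamic task; the inner
  // graph is spawned when the task runs and joins its parent by default.
  taskflow.emplace([](tf::Subflow& sf){
    auto a = sf.emplace([](){ std::cout << "inner A\n"; });
    auto b = sf.emplace([](){ std::cout << "inner B\n"; });
    a.precede(b);
    // sf.detach() would instead detach the subflow from its parent task.
  });

  executor.run(taskflow);
  executor.wait_for_all();
}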