18 #include "notifier.hpp"
19 #include "observer.hpp"
20 #include "taskflow.hpp"

39 Node* cache {nullptr};

45 Worker* worker {nullptr};

124 template<typename P, typename C>

152 template<typename Observer, typename... Args>
174 unsigned _num_topologies {0};
191 unsigned _find_victim(unsigned);
193 PerThread& _per_thread() const;
195 bool _wait_for_task(Worker&, Node*&);
197 void _spawn(unsigned);
198 void _exploit_task(Worker&, Node*&);
199 void _explore_task(Worker&, Node*&);
200 void _schedule(Node*, bool);
201 void _schedule(PassiveVector<Node*>&);
202 void _invoke(Worker&, Node*);
203 void _invoke_static_work(Worker&, Node*);
204 void _invoke_dynamic_work(Worker&, Node*, Subflow&);
205 void _invoke_condition_work(Worker&, Node*, int&);
207 #ifdef TF_ENABLE_CUDA
208 void _invoke_cudaflow_work(Worker&, Node*);
209 void _invoke_cudaflow_work_impl(Worker&, Node*);
212 void _set_up_module_work(Node*, bool&);
213 void _set_up_topology(Topology*);
214 void _tear_down_topology(Topology**);
215 void _increment_topology();
216 void _decrement_topology();
217 void _decrement_topology_and_notify();
224 _notifier {_waiters} {
227 TF_THROW("no workers to execute the graph");
241 _notifier.notify(true);
243 for(auto& t : _threads){
250 return _workers.size();
254 inline Executor::PerThread& Executor::_per_thread() const {
255 thread_local PerThread pt;
261 auto worker = _per_thread().worker;
262 return worker ? static_cast<int>(worker->id) : -1;
266 inline void Executor::_spawn(unsigned N) {
269 for(unsigned i=0; i<N; ++i) {
273 _threads.emplace_back([this] (Worker& w) -> void {
275 PerThread& pt = _per_thread();
287 if(_wait_for_task(w, t) == false) {
297 inline unsigned Executor::_find_victim(unsigned thief) {
319 for(unsigned vtm=0; vtm<_workers.size(); ++vtm){
320 if((thief == vtm && !_queue.empty()) ||
321    (thief != vtm && !_workers[vtm].queue.empty())) {
326 return static_cast<unsigned>(_workers.size());
330 inline void Executor::_explore_task(Worker& thief, Node*& t) {
335 const unsigned l = 0;
336 const unsigned r = static_cast<unsigned>(_workers.size()) - 1;
338 const size_t F = (_workers.size() + 1) << 1;
339 const size_t Y = 100;
349 t = (vtm == thief.id) ? _queue.steal() : _workers[vtm].queue.steal();
381 inline void Executor::_exploit_task(Worker& w, Node*& t) {
386 if(_num_actives.fetch_add(1) == 0 && _num_thieves == 0) {
387 _notifier.notify(false);
390 auto tpg = t->_topology;
391 auto par = t->_parent;
422 if(t->_parent == par) {
429 t->_topology->_join_counter.fetch_sub(w.num_executed);
432 auto ret = par->_join_counter.fetch_sub(w.num_executed);
433 if(ret == w.num_executed) {
445 if(tpg->_join_counter.fetch_sub(w.num_executed) == w.num_executed) {
447 _tear_down_topology(&tpg);
457 if(par->_join_counter.fetch_sub(w.num_executed) == w.num_executed) {
472 inline bool Executor::_wait_for_task(Worker& worker, Node*& t) {
482 _explore_task(worker, t);
485 auto N = _num_thieves.fetch_sub(1);
487 _notifier.notify(false);
492 auto waiter = &_waiters[worker.id];
494 _notifier.prepare_wait(waiter);
497 if(!_queue.empty()) {
499 _notifier.cancel_wait(waiter);
504 auto N = _num_thieves.fetch_sub(1);
506 _notifier.notify(false);
516 _notifier.cancel_wait(waiter);
517 _notifier.notify(true);
522 if(_num_thieves.fetch_sub(1) == 1 && _num_actives) {
523 _notifier.cancel_wait(waiter);
528 _notifier.commit_wait(waiter);
534 template<typename Observer, typename... Args>
537 auto tmp = std::make_unique<Observer>(std::forward<Args>(args)...);
538 tmp->set_up(_workers.size());
540 return static_cast<Observer*>(_observer.get());
551 inline void Executor::_schedule(Node* node, bool bypass) {
556 auto worker = _per_thread().worker;
558 if(worker != nullptr) {
560 worker->queue.push(node);
563 assert(!worker->cache);
564 worker->cache = node;
575 _notifier.notify(false);
581 inline void Executor::_schedule(PassiveVector<Node*>& nodes) {
587 const auto num_nodes = nodes.size();
594 auto worker = _per_thread().worker;
596 if(worker != nullptr) {
597 for(size_t i=0; i<num_nodes; ++i) {
598 worker->queue.push(nodes[i]);
606 for(size_t k=0; k<num_nodes; ++k) {
607 _queue.push(nodes[k]);
611 if(num_nodes >= _workers.size()) {
612 _notifier.notify(true);
615 for(size_t k=0; k<num_nodes; ++k) {
616 _notifier.notify(false);
623 inline void Executor::_invoke(Worker& worker, Node* node) {
629 const auto num_successors = node->num_successors();
632 switch(node->_handle.index()) {
634 case Node::STATIC_WORK:{
635 _invoke_static_work(worker, node);
639 case Node::MODULE_WORK: {
640 bool first_time = !node->_has_state(Node::SPAWNED);
641 bool emptiness = false;
642 _set_up_module_work(node, emptiness);
643 if(first_time && !emptiness) {
649 case Node::DYNAMIC_WORK: {
651 auto& subgraph = nstd::get<Node::DynamicWork>(node->_handle).subgraph;
654 if(!node->_has_state(Node::SPAWNED)) {
658 Subflow fb(subgraph);
660 _invoke_dynamic_work(worker, node, fb);
663 if(!node->_has_state(Node::SPAWNED)) {
664 node->_set_state(Node::SPAWNED);
665 if(!subgraph.empty()) {
667 PassiveVector<Node*> src;
669 for(auto n : subgraph._nodes) {
671 n->_topology = node->_topology;
672 n->_set_up_join_counter();
678 if(n->num_dependents() == 0) {
683 const bool join = fb.joined();
686 node->_topology->_join_counter.fetch_add(src.size());
690 node->_join_counter.fetch_add(src.size());
693 if(node->_parent == nullptr) {
694 node->_topology->_join_counter.fetch_add(1);
697 node->_parent->_join_counter.fetch_add(1);
711 case Node::CONDITION_WORK: {
713 if(node->_has_state(Node::BRANCH)) {
714 node->_join_counter = node->num_strong_dependents();
717 node->_join_counter = node->num_dependents();
721 _invoke_condition_work(worker, node, id);
723 if(id >= 0 && static_cast<size_t>(id) < num_successors) {
724 node->_successors[id]->_join_counter.store(0);
725 _schedule(node->_successors[id], true);
731 #ifdef TF_ENABLE_CUDA
732 case Node::CUDAFLOW_WORK: {
733 _invoke_cudaflow_work(worker, node);
747 if(node->_has_state(Node::BRANCH)) {
749 node->_join_counter = node->num_strong_dependents();
752 node->_join_counter = node->num_dependents();
755 node->_unset_state(Node::SPAWNED);
758 Node* cache {nullptr};
759 size_t num_spawns {0};
761 auto& c = (node->_parent) ? node->_parent->_join_counter :
762 node->_topology->_join_counter;
764 for(size_t i=0; i<num_successors; ++i) {
765 if(--(node->_successors[i]->_join_counter) == 0) {
767 if(num_spawns == 0) {
768 c.fetch_add(num_successors);
771 _schedule(cache, false);
773 cache = node->_successors[i];
778 worker.num_executed += (node->_successors.size() - num_spawns);
782 _schedule(cache, true);
787 inline void Executor::_invoke_static_work(Worker& worker, Node* node) {
789 _observer->on_entry(worker.id, TaskView(node));
790 nstd::get<Node::StaticWork>(node->_handle).work();
791 _observer->on_exit(worker.id, TaskView(node));
794 nstd::get<Node::StaticWork>(node->_handle).work();
799 inline void Executor::_invoke_dynamic_work(Worker& worker, Node* node, Subflow& sf) {
801 _observer->on_entry(worker.id, TaskView(node));
802 nstd::get<Node::DynamicWork>(node->_handle).work(sf);
803 _observer->on_exit(worker.id, TaskView(node));
806 nstd::get<Node::DynamicWork>(node->_handle).work(sf);
811 inline void Executor::_invoke_condition_work(Worker& worker, Node* node, int& id) {
813 _observer->on_entry(worker.id, TaskView(node));
814 id = nstd::get<Node::ConditionWork>(node->_handle).work();
815 _observer->on_exit(worker.id, TaskView(node));
818 id = nstd::get<Node::ConditionWork>(node->_handle).work();
822 #ifdef TF_ENABLE_CUDA

824 inline void Executor::_invoke_cudaflow_work(Worker& worker, Node* node) {
826 _observer->on_entry(worker.id, TaskView(node));
827 _invoke_cudaflow_work_impl(worker, node);
828 _observer->on_exit(worker.id, TaskView(node));
831 _invoke_cudaflow_work_impl(worker, node);
836 inline void Executor::_invoke_cudaflow_work_impl(Worker&, Node* node) {
838 auto& h = nstd::get<Node::cudaFlowWork>(node->_handle);
842 cudaFlow cf(h.graph);
846 cudaGraphExec_t exec;
848 cudaGraphInstantiate(&exec, h.graph._handle, nullptr, nullptr, 0),
849 "failed to create an exec cudaGraph"

851 TF_CHECK_CUDA(cudaGraphLaunch(exec, 0), "failed to launch cudaGraph");
852 TF_CHECK_CUDA(cudaStreamSynchronize(0), "failed to sync cudaStream");

854 cudaGraphExecDestroy(exec), "failed to destroy an exec cudaGraph"
860 inline void Executor::_set_up_module_work(Node* node, bool& ept) {
863 if(node->_has_state(Node::SPAWNED)) {
868 node->_set_state(Node::SPAWNED);
870 auto module = nstd::get<Node::ModuleWork>(node->_handle).module;
872 if(module->empty()) {
877 PassiveVector<Node*> src;
879 for(auto n: module->_graph._nodes) {
881 n->_topology = node->_topology;
883 n->_set_up_join_counter();
885 if(n->num_dependents() == 0) {
890 node->_join_counter.fetch_add(src.size());
892 if(node->_parent == nullptr) {
893 node->_topology->_join_counter.fetch_add(1);
896 node->_parent->_join_counter.fetch_add(1);
905 return run_n(f, 1, [](){});
909 template <typename C>
911 return run_n(f, 1, std::forward<C>(c));
916 return run_n(f, repeat, [](){});
920 template <typename C>
922 return run_until(f, [repeat]() mutable { return repeat-- == 0; }, std::forward<C>(c));
928 return run_until(f, std::forward<P>(pred), [](){});
932 inline void Executor::_set_up_topology(Topology* tpg) {
934 tpg->_sources.clear();
937 for(auto node : tpg->_taskflow._graph._nodes) {
939 node->_topology = tpg;
940 node->_clear_state();
942 if(node->num_dependents() == 0) {
943 tpg->_sources.push_back(node);
946 node->_set_up_join_counter();
949 tpg->_join_counter.store(tpg->_sources.size(), std::memory_order_relaxed);
953 inline void Executor::_tear_down_topology(Topology** tpg) {
955 auto &f = (*tpg)->_taskflow;
960 if(! (*tpg)->_pred() ) {
963 assert((*tpg)->_join_counter == 0);
964 (*tpg)->_join_counter = (*tpg)->_sources.size();
966 _schedule((*tpg)->_sources);
971 if((*tpg)->_call != nullptr) {
978 if(f._topologies.size() > 1) {
980 assert((*tpg)->_join_counter == 0);
983 (*tpg)->_promise.set_value();
984 f._topologies.pop_front();
988 _decrement_topology();
990 *tpg = &(f._topologies.front());
992 _set_up_topology(*tpg);
993 _schedule((*tpg)->_sources);
1005 assert(f._topologies.size() == 1);
1011 f._topologies.pop_front();
1018 _decrement_topology_and_notify();
1027 template <typename P, typename C>
1030 _increment_topology();
1033 if(f.empty() || pred()) {
1035 promise.set_value();
1036 _decrement_topology_and_notify();
1037 return promise.get_future();
1076 bool run_now {false};
1085 f._topologies.emplace_back(f, std::forward<P>(pred), std::forward<C>(c));
1086 tpg = &(f._topologies.back());
1087 future = tpg->_promise.get_future();
1089 if(f._topologies.size() == 1) {
1099 _set_up_topology(tpg);
1100 _schedule(tpg->_sources);
1107 inline void Executor::_increment_topology() {
1113 inline void Executor::_decrement_topology_and_notify() {
1115 if(--_num_topologies == 0) {
1116 _topology_cv.notify_all();
1121 inline void Executor::_decrement_topology() {
1129 _topology_cv.wait(lock, [&](){ return _num_topologies == 0; });
int this_worker_id() const
queries the id of the caller thread in this executor
Definition: executor.hpp:260
std::future< void > run(Taskflow &taskflow)
runs the taskflow once
Definition: executor.hpp:904
void remove_observer()
removes the associated observer
Definition: executor.hpp:544
std::future< void > run_until(Taskflow &taskflow, P &&pred)
runs the taskflow multiple times until the predicate becomes true and then invokes a callback ...
Definition: executor.hpp:927
~Executor()
destructs the executor
Definition: executor.hpp:234
Lock-free unbounded single-producer multiple-consumer queue.
Definition: tsq.hpp:27
bool empty() const noexcept
queries if the queue is empty at the time of this call
Definition: tsq.hpp:155
Observer * make_observer(Args &&... args)
constructs an observer to inspect the activities of worker threads
Definition: executor.hpp:535
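From the listing above, the executor only invokes three hooks on the observer it owns: set_up(num_workers) once at construction time, and on_entry(worker_id, TaskView) / on_exit(worker_id, TaskView) around each executed task. A minimal sketch of a custom observer follows, assuming the base interface declared in observer.hpp is named ExecutorObserverInterface with exactly those virtuals; the class name CountingObserver is hypothetical and not part of this file.

// Sketch only: counts tasks executed per worker; each worker touches only its own slot.
#include <cstddef>
#include <vector>

struct CountingObserver : tf::ExecutorObserverInterface {   // assumed interface name
  std::vector<std::size_t> counts;

  void set_up(unsigned num_workers) override {
    counts.assign(num_workers, 0);                          // one counter per worker
  }
  void on_entry(unsigned worker_id, tf::TaskView) override {
    ++counts[worker_id];                                    // a worker begins a task
  }
  void on_exit(unsigned, tf::TaskView) override {}          // nothing to do on exit
};

// usage (hypothetical): auto* obs = executor.make_observer<CountingObserver>();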
the class to create a task dependency graph
Definition: taskflow.hpp:18
void push(T item)
inserts an item to the queue
Definition: tsq.hpp:171
bool empty() const
queries the emptiness of the taskflow
Definition: taskflow.hpp:132
T steal()
steals an item from the queue
Definition: tsq.hpp:221
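Taken together, push, steal, and empty describe the owner/thief protocol the executor relies on: only the worker that owns a queue pushes to it, while any other worker may steal from the opposite end. A small sketch of that protocol, assuming the queue template in tsq.hpp is named tf::TaskQueue and also provides an owner-side pop(), which does not appear in this listing:

// Sketch only: single-producer (owner) / multiple-consumer (thief) usage.
int a = 1, b = 2, c = 3;
tf::TaskQueue<int*> queue;          // assumed class name from tsq.hpp

// owner thread: the only thread allowed to push (and pop)
queue.push(&a);
queue.push(&b);
queue.push(&c);

// any other thread: steal from the other end; nullptr means the queue looked empty
int* stolen = queue.steal();

// owner drains whatever is left
while(!queue.empty()) {
  int* item = queue.pop();          // assumption: owner-side pop, not shown above
  (void)item;
}
(void)stolen;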
The executor class to run a taskflow graph.
Definition: executor.hpp:33
size_t num_workers() const
queries the number of worker threads (can be zero)
Definition: executor.hpp:249
Executor(unsigned n=std::thread::hardware_concurrency())
constructs the executor with N worker threads
Definition: executor.hpp:221
The building blocks of dynamic tasking.
Definition: flow_builder.hpp:358
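The DYNAMIC_WORK path above hands each dynamic task a Subflow so it can spawn a nested graph at run time, with the children inheriting the parent's topology. A brief usage sketch; the task bodies and names are illustrative only:

// Sketch only: a dynamic task that builds a two-node subflow while it runs.
tf::Taskflow taskflow;

taskflow.emplace([](tf::Subflow& sf){
  auto C1 = sf.emplace([](){ /* child 1 */ });
  auto C2 = sf.emplace([](){ /* child 2 */ });
  C1.precede(C2);        // child 2 runs after child 1
  // by default the parent joins its subflow before completing;
  // sf.detach() would instead let the children run independently
});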
std::future< void > run_n(Taskflow &taskflow, size_t N)
runs the taskflow for N times
Definition: executor.hpp:915
void wait_for_all()
waits for all pending graphs to complete
Definition: executor.hpp:1127
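Putting the public interface above together, a minimal end-to-end sketch; the two task bodies and the run counts are placeholders, not taken from this file:

#include <taskflow/taskflow.hpp>

int main() {
  tf::Executor executor;            // defaults to std::thread::hardware_concurrency() workers
  tf::Taskflow taskflow;

  auto [A, B] = taskflow.emplace(
    [](){ /* task A */ },
    [](){ /* task B */ }
  );
  A.precede(B);                     // A must finish before B starts

  executor.run(taskflow);                                               // run once
  executor.run_n(taskflow, 4);                                          // run four more times
  executor.run_until(taskflow, [i = 3]() mutable { return i-- == 0; }); // run until the predicate is true
  executor.wait_for_all();          // block until every submitted run has completed

  return 0;
}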