18 #include "notifier.hpp"
19 #include "observer.hpp"
20 #include "taskflow.hpp"
38 Notifier::Waiter* waiter;
40 TaskQueue<Node*> wsq[NUM_DOMAINS];
41 Node* cache {nullptr};
45 Worker* worker {nullptr};
63 unsigned M = cuda_num_devices()
141 template<typename P, typename C>
191 template<typename Observer, typename... Args>
206 size_t _num_topologies {0};
212 Notifier _notifier[NUM_DOMAINS];
214 TaskQueue<Node*> _wsq[NUM_DOMAINS];
222 unsigned _find_victim(unsigned);
224 PerThread& _per_thread() const;
226 bool _wait_for_task(Worker&, Node*&);
228 void _spawn(unsigned, Domain);
229 void _worker_loop(Worker&);
230 void _exploit_task(Worker&, Node*&);
231 void _explore_task(Worker&, Node*&);
232 void _schedule(Node*, bool);
233 void _schedule(PassiveVector<Node*>&);
234 void _invoke(Worker&, Node*);
235 void _invoke_static_work(Worker&, Node*);
236 void _invoke_dynamic_work(Worker&, Node*, Subflow&);
237 void _invoke_condition_work(Worker&, Node*, int&);
239 #ifdef TF_ENABLE_CUDA
240 void _invoke_cudaflow_work(Worker&, Node*);
241 void _invoke_cudaflow_work_impl(Worker&, Node*);
244 void _set_up_module_work(Node*, bool&);
245 void _set_up_topology(Topology*);
246 void _tear_down_topology(Topology**);
247 void _increment_topology();
248 void _decrement_topology();
249 void _decrement_topology_and_notify();
253 #ifdef TF_ENABLE_CUDA
257 _notifier {Notifier(N), Notifier(M)} {
260 TF_THROW("no cpu workers to execute taskflows");
264 TF_THROW("no gpu workers to execute cudaflows");
267 for(int i=0; i<NUM_DOMAINS; ++i) {
268 _num_actives[i].store(0, std::memory_order_relaxed);
269 _num_thieves[i].store(0, std::memory_order_relaxed);
280 _notifier {Notifier(N)} {
283 TF_THROW("no cpu workers to execute taskflows");
286 for(int i=0; i<NUM_DOMAINS; ++i) {
287 _num_actives[i].store(0, std::memory_order_relaxed);
288 _num_thieves[i].store(0, std::memory_order_relaxed);
304 for(int i=0; i<NUM_DOMAINS; ++i) {
305 _notifier[i].notify(true);
308 for(auto& t : _threads){
315 return _workers.size();
325 return _num_topologies;
329 inline Executor::PerThread& Executor::_per_thread() const {
330 thread_local PerThread pt;
336 auto worker = _per_thread().worker;
337 return worker ? static_cast<int>(worker->id) : -1;
341 inline void Executor::_spawn(unsigned N, Domain d) {
343 auto id = _threads.size();
345 for(unsigned i=0; i<N; ++i, ++id) {
347 _workers[id].id = id;
348 _workers[id].domain = d;
349 _workers[id].executor = this;
350 _workers[id].waiter = &_notifier[d]._waiters[i];
352 _threads.emplace_back([this] (Worker& w) -> void {
354 PerThread& pt = _per_thread();
366 if(_wait_for_task(w, t) == false) {
377 inline void Executor::_explore_task(Worker& thief, Node*& t) {
382 const auto d = thief.domain;
384 const unsigned l = 0;
385 const unsigned r = static_cast<unsigned>(_workers.size()) - 1;
387 const size_t F = (_workers.size() + 1) << 1;
388 const size_t Y = 100;
398 t = (vtm == thief.id) ? _wsq[d].steal() : _workers[vtm].wsq[d].steal();
415 inline void Executor::_exploit_task(Worker& w, Node*& t) {
421 const auto d = w.domain;
423 if(_num_actives[d].fetch_add(1) == 0 && _num_thieves[d] == 0) {
424 _notifier[d].notify(false);
427 auto tpg = t->_topology;
428 auto par = t->_parent;
429 auto exe = size_t{1};
443 if(t->_parent == par) {
450 t->_topology->_join_counter.fetch_sub(exe);
453 auto ret = par->_join_counter.fetch_sub(exe);
455 if(par->domain() == d) {
459 _schedule(par, false);
470 if(tpg->_join_counter.fetch_sub(exe) == exe) {
472 _tear_down_topology(&tpg);
482 if(par->_join_counter.fetch_sub(exe) == exe) {
483 if(par->domain() == d) {
489 _schedule(par, false);
502 inline bool Executor::_wait_for_task(Worker& worker, Node*& t) {
504 const auto d = worker.domain;
514 _explore_task(worker, t);
517 if(_num_thieves[d].fetch_sub(1) == 1) {
518 _notifier[d].notify(false);
523 _notifier[d].prepare_wait(worker.waiter);
526 if(!_wsq[d].empty()) {
528 _notifier[d].cancel_wait(worker.waiter);
533 if(_num_thieves[d].fetch_sub(1) == 1) {
534 _notifier[d].notify(false);
544 _notifier[d].cancel_wait(worker.waiter);
545 for(int i=0; i<NUM_DOMAINS; ++i) {
546 _notifier[i].notify(true);
552 if(_num_thieves[d].fetch_sub(1) == 1) {
553 if(_num_actives[d]) {
554 _notifier[d].cancel_wait(worker.waiter);
558 for(auto& w : _workers) {
559 if(!w.wsq[d].empty()) {
560 _notifier[d].cancel_wait(worker.waiter);
567 _notifier[d].commit_wait(worker.waiter);
573 template<typename Observer, typename... Args>
576 auto tmp = std::make_unique<Observer>(std::forward<Args>(args)...);
577 tmp->set_up(_workers.size());
578 _observer = std::move(tmp);
579 return static_cast<Observer*>(_observer.get());
590 inline void Executor::_schedule(Node* node, bool bypass_hint) {
594 const auto d = node->domain();
597 auto worker = _per_thread().worker;
599 if(worker != nullptr && worker->executor == this) {
601 assert(!worker->cache);
602 worker->cache = node;
605 worker->wsq[d].push(node);
606 if(worker->domain != d) {
607 if(_num_actives[d] == 0 && _num_thieves[d] == 0) {
608 _notifier[d].notify(false);
621 _notifier[d].notify(false);
627 inline void Executor::_schedule(PassiveVector<Node*>& nodes) {
633 const auto num_nodes = nodes.size();
640 auto worker = _per_thread().worker;
643 size_t tcount[NUM_DOMAINS] = {0};
645 if(worker != nullptr && worker->executor == this) {
646 for(size_t i=0; i<num_nodes; ++i) {
647 const auto d = nodes[i]->domain();
648 worker->wsq[d].push(nodes[i]);
652 for(int d=0; d<NUM_DOMAINS; ++d) {
653 if(tcount[d] && d != worker->domain) {
654 if(_num_actives[d] == 0 && _num_thieves[d] == 0) {
655 _notifier[d].notify_n(tcount[d]);
666 for(size_t k=0; k<num_nodes; ++k) {
667 const auto d = nodes[k]->domain();
668 _wsq[d].push(nodes[k]);
673 for(int d=0; d<NUM_DOMAINS; ++d) {
674 _notifier[d].notify_n(tcount[d]);
680 inline void Executor::_invoke(Worker& worker, Node* node) {
686 const auto num_successors = node->num_successors();
689 auto& c = (node->_parent) ? node->_parent->_join_counter :
690 node->_topology->_join_counter;
693 switch(node->_handle.index()) {
695 case Node::STATIC_WORK:{
696 _invoke_static_work(worker, node);
700 case Node::MODULE_WORK: {
701 bool first_time = !node->_has_state(Node::SPAWNED);
702 bool emptiness = false;
703 _set_up_module_work(node, emptiness);
704 if(first_time && !emptiness) {
710 case Node::DYNAMIC_WORK: {
713 if(!node->_has_state(Node::SPAWNED)) {
715 auto& subgraph = nstd::get<Node::DynamicWork>(node->_handle).subgraph;
718 Subflow fb(subgraph);
720 _invoke_dynamic_work(worker, node, fb);
722 node->_set_state(Node::SPAWNED);
724 if(!subgraph.empty()) {
726 PassiveVector<Node*> src;
728 for(auto n : subgraph._nodes) {
730 n->_topology = node->_topology;
731 n->_set_up_join_counter();
737 if(n->num_dependents() == 0) {
742 const bool join = fb.joined();
745 node->_topology->_join_counter.fetch_add(src.size());
749 node->_join_counter.fetch_add(src.size());
765 case Node::CONDITION_WORK: {
767 if(node->_has_state(Node::BRANCH)) {
768 node->_join_counter = node->num_strong_dependents();
771 node->_join_counter = node->num_dependents();
775 _invoke_condition_work(worker, node, id);
777 if(id >= 0 && static_cast<size_t>(id) < num_successors) {
778 auto s = node->_successors[id];
779 s->_join_counter.store(0);
781 if(s->domain() == worker.domain) {
793 #ifdef TF_ENABLE_CUDA
794 case Node::CUDAFLOW_WORK: {
795 _invoke_cudaflow_work(worker, node);
810 if(node->_has_state(Node::BRANCH)) {
812 node->_join_counter = node->num_strong_dependents();
815 node->_join_counter = node->num_dependents();
818 node->_unset_state(Node::SPAWNED);
821 Node* cache {nullptr};
823 for(size_t i=0; i<num_successors; ++i) {
824 if(--(node->_successors[i]->_join_counter) == 0) {
825 if(node->_successors[i]->domain() != worker.domain) {
827 _schedule(node->_successors[i], false);
832 _schedule(cache, false);
834 cache = node->_successors[i];
840 _schedule(cache, true);
845 inline void Executor::_invoke_static_work(Worker& worker, Node* node) {
847 _observer->on_entry(worker.id, TaskView(node));
848 nstd::get<Node::StaticWork>(node->_handle).work();
849 _observer->on_exit(worker.id, TaskView(node));
852 nstd::get<Node::StaticWork>(node->_handle).work();
857 inline void Executor::_invoke_dynamic_work(Worker& worker, Node* node, Subflow& sf) {
859 _observer->on_entry(worker.id, TaskView(node));
860 nstd::get<Node::DynamicWork>(node->_handle).work(sf);
861 _observer->on_exit(worker.id, TaskView(node));
864 nstd::get<Node::DynamicWork>(node->_handle).work(sf);
869 inline void Executor::_invoke_condition_work(Worker& worker, Node* node, int& id) {
871 _observer->on_entry(worker.id, TaskView(node));
872 id = nstd::get<Node::ConditionWork>(node->_handle).work();
873 _observer->on_exit(worker.id, TaskView(node));
876 id = nstd::get<Node::ConditionWork>(node->_handle).work();
880 #ifdef TF_ENABLE_CUDA
882 inline void Executor::_invoke_cudaflow_work(Worker& worker, Node* node) {
884 assert(worker.domain == node->domain());
887 _observer->on_entry(worker.id, TaskView(node));
888 _invoke_cudaflow_work_impl(worker, node);
889 _observer->on_exit(worker.id, TaskView(node));
892 _invoke_cudaflow_work_impl(worker, node);
897 inline void Executor::_invoke_cudaflow_work_impl(Worker&, Node* node) {
899 auto& h = nstd::get<Node::cudaFlowWork>(node->_handle);
903 cudaFlow cf(h.graph);
907 h.graph._make_native_graph(cf._device);
909 cudaGraphExec_t exec;
911 cudaGraphInstantiate(&exec, h.graph._native_handle, nullptr, nullptr, 0),
912 "failed to create an executable cudaGraph"
914 TF_CHECK_CUDA(cudaGraphLaunch(exec, cf._stream), "failed to launch cudaGraph");
915 TF_CHECK_CUDA(cudaStreamSynchronize(cf._stream), "failed to sync cudaStream");
917 cudaGraphExecDestroy(exec), "failed to destroy an executable cudaGraph"
923 inline void Executor::_set_up_module_work(Node* node, bool& ept) {
926 if(node->_has_state(Node::SPAWNED)) {
931 node->_set_state(Node::SPAWNED);
933 auto module = nstd::get<Node::ModuleWork>(node->_handle).module;
935 if(module->empty()) {
940 PassiveVector<Node*> src;
942 for(auto n: module->_graph._nodes) {
944 n->_topology = node->_topology;
946 n->_set_up_join_counter();
948 if(n->num_dependents() == 0) {
953 node->_join_counter.fetch_add(src.size());
955 if(node->_parent == nullptr) {
956 node->_topology->_join_counter.fetch_add(1);
959 node->_parent->_join_counter.fetch_add(1);
968 return run_n(f, 1, [](){});
972 template <typename C>
974 return run_n(f, 1, std::forward<C>(c));
979 return run_n(f, repeat, [](){});
983 template <typename C>
985 return run_until(f, [repeat]() mutable { return repeat-- == 0; }, std::forward<C>(c));
991 return run_until(f, std::forward<P>(pred), [](){});
995 inline void Executor::_set_up_topology(Topology* tpg) {
997 tpg->_sources.clear();
1000 for(auto node : tpg->_taskflow._graph._nodes) {
1002 node->_topology = tpg;
1003 node->_clear_state();
1005 if(node->num_dependents() == 0) {
1006 tpg->_sources.push_back(node);
1009 node->_set_up_join_counter();
1012 tpg->_join_counter.store(tpg->_sources.size(), std::memory_order_relaxed);
1016 inline void Executor::_tear_down_topology(Topology** tpg) {
1018 auto &f = (*tpg)->_taskflow;
1023 if(! (*tpg)->_pred() ) {
1026 assert((*tpg)->_join_counter == 0);
1027 (*tpg)->_join_counter = (*tpg)->_sources.size();
1029 _schedule((*tpg)->_sources);
1034 if((*tpg)->_call != nullptr) {
1041 if(f._topologies.size() > 1) {
1043 assert((*tpg)->_join_counter == 0);
1046 (*tpg)->_promise.set_value();
1047 f._topologies.pop_front();
1051 _decrement_topology();
1053 *tpg = &(f._topologies.front());
1055 _set_up_topology(*tpg);
1056 _schedule((*tpg)->_sources);
1068 assert(f._topologies.size() == 1);
1074 f._topologies.pop_front();
1081 _decrement_topology_and_notify();
1090 template <typename P, typename C>
1093 _increment_topology();
1096 if(f.empty() || pred()) {
1098 promise.set_value();
1099 _decrement_topology_and_notify();
1100 return promise.get_future();
1104 bool run_now {false};
1113 f._topologies.emplace_back(f, std::forward<P>(pred), std::forward<C>(c));
1114 tpg = &(f._topologies.back());
1115 future = tpg->_promise.get_future();
1117 if(f._topologies.size() == 1) {
1127 _set_up_topology(tpg);
1128 _schedule(tpg->_sources);
1135 inline void Executor::_increment_topology() {
1141 inline void Executor::_decrement_topology_and_notify() {
1143 if(--_num_topologies == 0) {
1144 _topology_cv.notify_all();
1149 inline void Executor::_decrement_topology() {
1157 _topology_cv.wait(lock, [&](){ return _num_topologies == 0; });
int this_worker_id() const
queries the id of the caller thread in this executor
Definition: executor.hpp:335
std::future< void > run(Taskflow &taskflow)
runs the taskflow once
Definition: executor.hpp:967
void remove_observer()
removes the associated observer
Definition: executor.hpp:583
std::future< void > run_until(Taskflow &taskflow, P &&pred)
runs the taskflow multiple times until the predicate becomes true and then invokes a callback ...
Definition: executor.hpp:990
~Executor()
destructs the executor
Definition: executor.hpp:296
Observer * make_observer(Args &&... args)
constructs an observer to inspect the activities of worker threads
Definition: executor.hpp:574
tf::Taskflow
main entry to create a task dependency graph
Definition: taskflow.hpp:18
bool empty() const
queries the emptiness of the taskflow
Definition: taskflow.hpp:132
size_t num_domains() const
queries the number of worker domains
Definition: executor.hpp:319
Executor(unsigned N=std::thread::hardware_concurrency(), unsigned M=cuda_num_devices())
constructs the executor with N/M cpu/gpu worker threads
Definition: executor.hpp:255
tf::Executor
execution interface for running a taskflow graph
Definition: executor.hpp:32
size_t num_workers() const
queries the number of worker threads (can be zero)
Definition: executor.hpp:314
size_t num_topologies() const
queries the number of running topologies at the time of this call
Definition: executor.hpp:324
tf::Subflow
building methods of a subflow graph in dynamic tasking
Definition: flow_builder.hpp:860
std::future< void > run_n(Taskflow &taskflow, size_t N)
runs the taskflow for N times
Definition: executor.hpp:978
void wait_for_all()
wait for all pending graphs to complete
Definition: executor.hpp:1155
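The member briefs above describe the public interface of tf::Executor. Below is a minimal usage sketch, not taken from this header: it assumes the usual Taskflow front end (tf::Taskflow::emplace to create tasks and Task::precede to add dependencies, provided by the included taskflow.hpp/flow_builder headers) and exercises only the run, run_n, run_until, and wait_for_all calls documented here.

#include <taskflow/taskflow.hpp>  // assumed umbrella header providing tf::Taskflow and tf::Executor

int main() {
  tf::Executor executor;    // defaults to std::thread::hardware_concurrency() cpu workers
  tf::Taskflow taskflow;

  // A small diamond: A precedes B and C, which both precede D.
  auto A = taskflow.emplace([] () { /* work of A */ });
  auto B = taskflow.emplace([] () { /* work of B */ });
  auto C = taskflow.emplace([] () { /* work of C */ });
  auto D = taskflow.emplace([] () { /* work of D */ });
  A.precede(B, C);
  B.precede(D);
  C.precede(D);

  executor.run(taskflow);          // run the graph once; returns a std::future<void>
  executor.run_n(taskflow, 4);     // run it four more times
  executor.run_until(taskflow, [i = 0] () mutable { return i++ == 2; });  // repeat until the predicate returns true
  executor.wait_for_all();         // block until every submitted run has finished

  return 0;
}

As the listing shows (line 985), run_n is a thin wrapper over run_until with a countdown predicate, and every run variant is non-blocking; synchronization happens through the returned future or through wait_for_all.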