58 #include "spmc_queue.hpp" 59 #include "notifier.hpp" 60 #include "observer.hpp" 61 #include "taskflow.hpp" 161 template<
typename P,
typename C>
189 template<
typename Observer,
typename... Args>
202 unsigned _num_topologies {0};
220 unsigned _find_victim(
unsigned);
222 PerThread& _per_thread()
const;
224 bool _wait_for_tasks(
unsigned, std::optional<Node*>&);
226 void _spawn(
unsigned);
227 void _exploit_task(
unsigned, std::optional<Node*>&);
228 void _explore_task(
unsigned, std::optional<Node*>&);
229 void _schedule(Node*);
230 void _schedule(PassiveVector<Node*>&);
231 void _invoke(
unsigned, Node*);
232 void _invoke_static_work(
unsigned, Node*);
233 void _invoke_dynamic_work(
unsigned, Node*,
Subflow&);
234 void _init_module_node(Node*);
235 void _tear_down_topology(Topology*);
242 _notifier {_waiters} {
254 _notifier.notify(
true);
256 for(
auto& t : _threads){
263 return _workers.size();
267 inline Executor::PerThread& Executor::_per_thread()
const {
268 thread_local PerThread pt;
273 inline void Executor::_spawn(
unsigned N) {
276 for(
unsigned i=0; i<N; ++i) {
277 _threads.emplace_back([
this, i] () ->
void {
279 PerThread& pt = _per_thread();
283 std::optional<Node*> t;
293 if(_explore_task(i, t); t) {
298 if(_wait_for_tasks(i, t) ==
false) {
308 inline unsigned Executor::_find_victim(
unsigned thief) {
311 unsigned r = _workers.size() - 1;
313 _workers[thief].rdgen
317 for(
unsigned i=0; i<_workers.size(); ++i){
319 if((thief == vtm && !_queue.
empty()) ||
320 (thief != vtm && !_workers[vtm].queue.empty())) {
324 if(++vtm; vtm == _workers.size()) {
337 return _workers.size();
341 inline void Executor::_explore_task(
unsigned thief, std::optional<Node*>& t) {
349 const size_t F = (_workers.size() + 1);
350 const size_t Y = 100;
378 if(
auto vtm = _find_victim(thief); vtm != _workers.size()) {
379 t = (vtm == thief) ? _queue.
steal() : _workers[vtm].queue.steal();
396 if(
auto N = --_num_thieves; N == 0) {
397 if(t != std::nullopt) {
398 _notifier.notify(
false);
401 else if(_num_actives > 0) {
408 inline void Executor::_exploit_task(
unsigned i, std::optional<Node*>& t) {
412 auto& worker = _workers[i];
414 if(++_num_actives; _num_thieves == 0) {
415 _notifier.notify(
false);
420 t = worker.queue.pop();
428 inline bool Executor::_wait_for_tasks(
unsigned me, std::optional<Node*>& t) {
432 _notifier.prepare_wait(&_waiters[me]);
434 if(
auto vtm = _find_victim(me); vtm != _workers.size()) {
435 _notifier.cancel_wait(&_waiters[me]);
436 t = (vtm == me) ? _queue.
steal() : _workers[vtm].queue.steal();
451 _notifier.cancel_wait(&_waiters[me]);
452 _notifier.notify(
true);
457 _notifier.commit_wait(&_waiters[me]);
464 template<
typename Observer,
typename... Args>
467 auto tmp = std::make_unique<Observer>(std::forward<Args>(args)...);
468 tmp->set_up(std::max(
size_t{1}, _workers.size()));
469 _observer = std::move(tmp);
470 return static_cast<Observer*
>(_observer.get());
481 inline void Executor::_schedule(Node* node) {
484 if(node->_module !=
nullptr && !node->is_spawned()) {
485 _init_module_node(node);
489 if(_workers.size() == 0){
495 if(
auto& pt = _per_thread(); pt.pool ==
this) {
496 _workers[pt.worker_id].queue.push(node);
503 _notifier.notify(
false);
509 inline void Executor::_schedule(PassiveVector<Node*>& nodes) {
513 const auto num_nodes = nodes.size();
519 for(
auto node : nodes) {
520 if(node->_module !=
nullptr && !node->is_spawned()) {
521 _init_module_node(node);
526 if(_workers.size() == 0){
527 for(
auto node: nodes){
534 if(
auto& pt = _per_thread(); pt.pool ==
this) {
535 for(
size_t i=0; i<num_nodes; ++i) {
536 _workers[pt.worker_id].queue.push(nodes[i]);
542 for(
size_t k=0; k<num_nodes; ++k) {
543 _queue.
push(nodes[k]);
544 _notifier.notify(
false);
560 inline void Executor::_init_module_node(Node* node) {
562 node->_work = [node=node,
this, tgt{PassiveVector<Node*>()}] ()
mutable {
565 if(node->is_spawned()) {
566 node->_dependents.resize(node->_dependents.size()-tgt.size());
568 t->_successors.clear();
576 PassiveVector<Node*> src;
578 for(
auto n: node->_module->_graph.nodes()) {
579 n->_topology = node->_topology;
580 if(n->num_dependents() == 0) {
583 if(n->num_successors() == 0) {
594 inline void Executor::_invoke(
unsigned me, Node* node) {
598 const auto num_successors = node->num_successors();
602 if(
auto index=node->_work.index(); index == 0) {
603 if(node->_module !=
nullptr) {
604 bool first_time = !node->is_spawned();
605 _invoke_static_work(me, node);
611 if(
auto &f = std::get<Node::StaticWork>(node->_work); f !=
nullptr){
612 _invoke_static_work(me, node);
620 if(!node->is_spawned()) {
621 node->_subgraph.emplace();
624 Subflow fb(*(node->_subgraph));
626 _invoke_dynamic_work(me, node, fb);
629 if(!node->is_spawned()) {
631 if(!node->_subgraph->empty()) {
633 PassiveVector<Node*> src;
634 for(
auto n: node->_subgraph->nodes()) {
635 n->_topology = node->_topology;
637 if(n->num_successors() == 0) {
639 node->_topology->_num_sinks ++;
645 if(n->num_dependents() == 0) {
663 if(!node->is_subtask()) {
666 if(node->_work.index() == 1 && !node->_subgraph->empty()) {
667 while(!node->_dependents.empty() && node->_dependents.back()->is_subtask()) {
668 node->_dependents.pop_back();
671 node->_num_dependents =
static_cast<int>(node->_dependents.size());
672 node->unset_spawned();
676 for(
size_t i=0; i<num_successors; ++i) {
677 if(--(node->_successors[i]->_num_dependents) == 0) {
678 _schedule(node->_successors[i]);
683 if(num_successors == 0) {
684 if(--(node->_topology->_num_sinks) == 0) {
685 if(_workers.size() > 0) {
686 _tear_down_topology(node->_topology);
693 inline void Executor::_invoke_static_work(
unsigned me, Node* node) {
695 _observer->on_entry(me,
TaskView(node));
696 std::invoke(std::get<Node::StaticWork>(node->_work));
697 _observer->on_exit(me,
TaskView(node));
700 std::invoke(std::get<Node::StaticWork>(node->_work));
705 inline void Executor::_invoke_dynamic_work(
unsigned me, Node* node,
Subflow& sf) {
707 _observer->on_entry(me,
TaskView(node));
708 std::invoke(std::get<Node::DynamicWork>(node->_work), sf);
709 _observer->on_exit(me,
TaskView(node));
712 std::invoke(std::get<Node::DynamicWork>(node->_work), sf);
718 return run_n(f, 1, [](){});
722 template <
typename C>
724 static_assert(std::is_invocable<C>::value);
725 return run_n(f, 1, std::forward<C>(c));
730 return run_n(f, repeat, [](){});
734 template <
typename C>
736 return run_until(f, [repeat]()
mutable {
return repeat-- == 0; }, std::forward<C>(c));
742 return run_until(f, std::forward<P>(pred), [](){});
746 inline void Executor::_tear_down_topology(Topology* tpg) {
748 auto &f = tpg->_taskflow;
753 if(!std::invoke(tpg->_pred)) {
754 tpg->_recover_num_sinks();
755 _schedule(tpg->_sources);
760 if(tpg->_call !=
nullptr) {
761 std::invoke(tpg->_call);
767 if(f._topologies.size() > 1) {
770 tpg->_promise.set_value();
771 f._topologies.pop_front();
775 std::scoped_lock lock(_topology_mutex);
779 f._topologies.front()._bind(f._graph);
780 _schedule(f._topologies.front()._sources);
783 assert(f._topologies.size() == 1);
786 auto p {std::move(tpg->_promise)};
788 f._topologies.pop_front();
795 std::scoped_lock lock(_topology_mutex);
798 _topology_cv.notify_one();
804 template <
typename P,
typename C>
808 static_assert(std::is_invocable_v<C> && std::is_invocable_v<P>);
810 if(std::invoke(pred)) {
811 return std::async(std::launch::deferred, [](){});
817 if(_workers.size() == 0) {
819 Topology tpg(f, std::forward<P>(pred), std::forward<C>(c));
825 _schedule(tpg._sources);
826 tpg._recover_num_sinks();
827 }
while(!std::invoke(tpg._pred));
829 if(tpg._call !=
nullptr) {
830 std::invoke(tpg._call);
833 return std::async(std::launch::deferred, [](){});
838 std::scoped_lock lock(_topology_mutex);
843 bool run_now {
false};
848 std::scoped_lock lock(f._mtx);
851 tpg = &(f._topologies.emplace_back(f, std::forward<P>(pred), std::forward<C>(c)));
852 future = tpg->_promise.get_future();
854 if(f._topologies.size() == 1) {
864 tpg->_bind(f._graph);
865 _schedule(tpg->_sources);
874 _topology_cv.wait(lock, [&](){
return _num_topologies == 0; });
std::future< void > run(Taskflow &taskflow)
runs the taskflow once
Definition: executor.hpp:717
void remove_observer()
removes the associated observer
Definition: executor.hpp:474
bool empty() const noexcept
queries if the queue is empty at the time of this call
Definition: spmc_queue.hpp:182
std::future< void > run_until(Taskflow &taskflow, P &&pred)
runs the taskflow multiple times until the predicate becomes true and then invokes a callback ...
Definition: executor.hpp:741
~Executor()
destructs the executor
Definition: executor.hpp:247
void push(O &&item)
inserts an item to the queue
Definition: spmc_queue.hpp:199
Definition: taskflow.hpp:5
T hardware_concurrency(T... args)
bool detached() const
queries if the subflow will be detached from its parent task
Definition: flow_builder.hpp:783
Observer * make_observer(Args &&... args)
constructs an observer to inspect the activities of worker threads
Definition: executor.hpp:465
the class to create a task dependency graph
Definition: core/taskflow.hpp:15
A constant wrapper class to a task node, mainly used in the tf::ExecutorObserver interface.
Definition: task.hpp:300
Lock-free unbounded single-producer multiple-consumer queue.
Definition: spmc_queue.hpp:29
The executor class to run a taskflow graph.
Definition: executor.hpp:73
size_t num_workers() const
queries the number of worker threads (can be zero)
Definition: executor.hpp:262
std::optional< T > steal()
steals an item from the queue
Definition: spmc_queue.hpp:269
Executor(unsigned n=std::thread::hardware_concurrency())
constructs the executor with N worker threads
Definition: executor.hpp:239
The building blocks of dynamic tasking.
Definition: flow_builder.hpp:731
std::future< void > run_n(Taskflow &taskflow, size_t N)
runs the taskflow N times
Definition: executor.hpp:729
void wait_for_all()
waits for all pending graphs to complete
Definition: executor.hpp:872