22 #include "topology.hpp" 37 template <
template <
typename...>
typename E>
40 using StaticWork =
typename Node::StaticWork;
41 using DynamicWork =
typename Node::DynamicWork;
47 Closure(
const Closure&) =
default;
50 Closure& operator = (
const Closure&) =
default;
52 void operator ()()
const;
109 template <
typename C>
122 template <
typename C>
237 template<
typename P,
typename C>
248 void _schedule(Node&);
249 void _schedule(PassiveVector<Node*>&);
257 template <
template <
typename...>
typename E>
259 return run_n(f, 1, [](){});
263 template <
template <
typename...>
typename E>
264 template <
typename C>
266 static_assert(std::is_invocable<C>::value);
267 return run_n(f, 1, std::forward<C>(c));
271 template <
template <
typename...>
typename E>
273 return run_n(f, repeat, [](){});
277 template <
template <
typename...>
typename E>
278 template <
typename C>
280 return run_until(f, [repeat]()
mutable {
return repeat-- == 0; }, std::forward<C>(c));
285 template <
template <
typename...>
typename E>
286 template <
typename P>
288 return run_until(f, std::forward<P>(predicate), [](){});
292 template <
template <
typename...>
typename E>
293 template <
typename P,
typename C>
297 static_assert(std::is_invocable_v<C> && std::is_same_v<
bool, std::invoke_result_t<P>>);
299 if(std::invoke(predicate)) {
300 return std::async(std::launch::deferred, [](){}).share();
304 auto &tpg = _topologies.emplace_back(f, std::forward<P>(predicate));
307 if(num_workers() == 0) {
313 _schedule(tpg._sources);
314 tpg._recover_num_sinks();
315 }
while(!std::invoke(tpg._predicate));
318 tpg._promise.set_value();
324 std::scoped_lock lock(f._mtx);
326 f._topologies.push_back(&tpg);
328 bool run_now = (f._topologies.size() == 1);
334 tpg._work = [&f, c=std::forward<C>(c),
this] ()
mutable {
337 if(!std::invoke(f._topologies.front()->_predicate)) {
338 f._topologies.front()->_recover_num_sinks();
339 _schedule(f._topologies.front()->_sources);
348 if(f._topologies.size() > 1) {
351 f._topologies.front()->_promise.set_value();
352 f._topologies.pop_front();
353 f._topologies.front()->_bind(f._graph);
355 _schedule(f._topologies.front()->_sources);
358 assert(f._topologies.size() == 1);
361 auto &p = f._topologies.front()->_promise;
362 f._topologies.pop_front();
372 _schedule(tpg._sources);
452 template <
template <
typename...>
typename E>
454 taskflow{&t}, node {&n} {
458 template <
template <
typename...>
typename E>
459 void BasicTaskflow<E>::Closure::operator () ()
const {
463 const auto num_successors = node->num_successors();
467 if(
auto index=node->_work.index(); index == 0) {
468 if(
auto &f = std::get<StaticWork>(node->_work); f !=
nullptr){
476 if(!node->is_spawned()) {
477 node->_subgraph.emplace();
480 SubflowBuilder fb(*(node->_subgraph));
482 std::invoke(std::get<DynamicWork>(node->_work), fb);
485 if(!node->is_spawned()) {
487 if(!node->_subgraph->empty()) {
489 PassiveVector<Node*> src;
490 for(
auto& n : *(node->_subgraph)) {
491 n._topology = node->_topology;
493 if(n.num_successors() == 0) {
495 node->_topology->_num_sinks ++;
501 if(n.num_dependents() == 0) {
506 taskflow->_schedule(src);
519 if(!node->is_subtask()) {
522 if(node->_work.index() == 1 && !node->_subgraph->empty()) {
523 while(!node->_dependents.empty() && node->_dependents.back()->is_subtask()) {
524 node->_dependents.pop_back();
527 node->_num_dependents = node->_dependents.size();
528 node->clear_status();
532 for(
size_t i=0; i<num_successors; ++i) {
533 if(--(node->_successors[i]->_num_dependents) == 0) {
534 taskflow->_schedule(*(node->_successors[i]));
539 if(num_successors == 0) {
540 if(--(node->_topology->_num_sinks) == 0) {
543 bool is_framework = node->_topology->_handle.index() == 1;
544 if(node->_topology->_work !=
nullptr) {
545 std::invoke(node->_topology->_work);
548 node->_topology->_promise.set_value();
559 template <
template <
typename...>
typename E>
566 template <
template <
typename...>
typename E>
569 _executor {std::make_shared<Executor>(N)} {
573 template <
template <
typename...>
typename E>
576 _executor {std::move(e)} {
578 if(_executor ==
nullptr) {
579 TF_THROW(Error::EXECUTOR,
580 "failed to construct taskflow (executor cannot be null)" 586 template <
template <
typename...>
typename E>
588 wait_for_topologies();
592 template <
template <
typename...>
typename E>
594 return _graph.size();
598 template <
template <
typename...>
typename E>
600 return _executor->num_workers();
604 template <
template <
typename...>
typename E>
606 return _topologies.size();
610 template <
template <
typename...>
typename E>
616 template <
template <
typename...>
typename E>
619 if(_graph.empty())
return;
621 auto& topology = _topologies.emplace_back(std::move(_graph));
623 _schedule(topology._sources);
628 template <
template <
typename...>
typename E>
629 template <
typename C>
637 auto& topology = _topologies.emplace_back(std::move(_graph), std::forward<C>(c));
639 _schedule(topology._sources);
643 template <
template <
typename...>
typename E>
647 return std::async(std::launch::deferred, [](){}).share();
650 auto& topology = _topologies.emplace_back(std::move(_graph));
652 _schedule(topology._sources);
654 return topology._future;
659 template <
template <
typename...>
typename E>
660 template <
typename C>
665 return std::async(std::launch::deferred, [](){}).share();
668 auto& topology = _topologies.emplace_back(std::move(_graph), std::forward<C>(c));
670 _schedule(topology._sources);
672 return topology._future;
676 template <
template <
typename...>
typename E>
678 if(!_graph.empty()) {
681 wait_for_topologies();
685 template <
template <
typename...>
typename E>
687 for(
auto& t: _topologies){
696 template <
template <
typename...>
typename E>
698 _executor->
emplace(*
this, node);
705 template <
template <
typename...>
typename E>
706 void BasicTaskflow<E>::_schedule(PassiveVector<Node*>& nodes) {
708 closures.reserve(nodes.size());
709 for(
auto src : nodes) {
710 closures.emplace_back(*
this, *src);
712 _executor->batch(std::move(closures));
716 template <
template <
typename...>
typename E>
721 for(
const auto& tpg : _topologies) {
729 template <
template <
typename...>
typename E>
731 for(
const auto& tpg : _topologies) {
737 template <
template <
typename...>
typename E>
740 os <<
"digraph Taskflow {\n";
742 for(
const auto& node : _graph) {
752 template <
template <
typename...>
typename E>
std::shared_future< void > run_until(Framework &framework, P &&predicate)
runs the framework multiple times until the predicate becomes true and invoke a callback ...
Definition: basic_taskflow.hpp:287
The base class to derive a taskflow class.
Definition: basic_taskflow.hpp:38
std::shared_ptr< Executor > share_executor()
shares ownership of the executor associated with this taskflow object
Definition: basic_taskflow.hpp:611
size_t num_nodes() const
queries the number of nodes in the present task dependency graph
Definition: basic_taskflow.hpp:593
std::shared_future< void > dispatch()
dispatches the present graph to threads and returns immediately
Definition: basic_taskflow.hpp:644
Definition: taskflow.hpp:6
void silent_dispatch()
dispatches the present graph to threads and returns immediately
Definition: basic_taskflow.hpp:617
std::shared_future< void > run_n(Framework &framework, size_t N)
runs the framework for N times
Definition: basic_taskflow.hpp:272
T hardware_concurrency(T... args)
BasicTaskflow()
constructs the taskflow with std::thread::hardware_concurrency worker threads
Definition: basic_taskflow.hpp:560
A reusable task dependency graph.
Definition: framework.hpp:17
size_t num_topologies() const
queries the number of existing topologies
Definition: basic_taskflow.hpp:605
auto emplace(C &&callable)
creates a task from a given callable object
Definition: flow_builder.hpp:794
void wait_for_topologies()
blocks until all running topologies complete and then cleans up all associated storages ...
Definition: basic_taskflow.hpp:686
std::shared_future< void > run(Framework &framework)
runs the framework once
Definition: basic_taskflow.hpp:258
void wait_for_all()
dispatches the present graph to threads and wait for all topologies to complete
Definition: basic_taskflow.hpp:677
The building blocks of task dependency graphs.
Definition: flow_builder.hpp:13
size_t num_workers() const
queries the number of worker threads in the associated executor
Definition: basic_taskflow.hpp:599
E< Closure > Executor
alias of executor type
Definition: basic_taskflow.hpp:65
~BasicTaskflow()
destructs the taskflow
Definition: basic_taskflow.hpp:587
std::string dump() const
dumps the present task dependency graph in DOT format to a std::string
Definition: basic_taskflow.hpp:753
std::string dump_topologies() const
dumps the existing topologies in DOT format to a std::string
Definition: basic_taskflow.hpp:717