Cpp-Taskflow  2.1.0
basic_taskflow.hpp
1 // 2019/02/11 - modified by Tsung-Wei Huang
2 // - refactored run_until
3 // - added allocator to topologies
4 // - changed to list for topologies
5 //
6 // 2019/02/10 - modified by Chun-Xun Lin
7 // - added run_n to execute framework
8 // - finished first peer-review with TW
9 //
10 // 2018/07 - 2019/02/09 - missing logs
11 //
12 // 2018/06/30 - created by Tsung-Wei Huang
13 // - added BasicTaskflow template
14 
15 // TODO items:
16 // 1. come up with a better way to remove the "joined" links
17 // during the execution of a static node (1st layer)
18 //
19 
20 #pragma once
21 
22 #include "topology.hpp"
23 
24 namespace tf {
25 
37 template <template <typename...> typename E>
38 class BasicTaskflow : public FlowBuilder {
39 
40  using StaticWork = typename Node::StaticWork;
41  using DynamicWork = typename Node::DynamicWork;
42 
43  // Closure
 // Callable unit of work: pairs a taskflow with one node of its graph.
 // The Executor alias declared in this class is E<Closure>, i.e. the executor
 // template is instantiated with this struct as its template argument.
 // Both members are raw pointers and the struct declares no destructor, so it
 // does not manage the lifetime of the objects it points to.
44  struct Closure {
45 
 // Default construction leaves both pointers null (see member initializers).
46  Closure() = default;
47  Closure(const Closure&) = default;
 // Binds the closure to a taskflow and a node; the out-of-class definition
 // stores the addresses of both arguments.
48  Closure(BasicTaskflow&, Node&);
49 
50  Closure& operator = (const Closure&) = default;
51 
 // Executes the bound node (defined out of class).
52  void operator ()() const;
53 
 // Taskflow used to schedule further nodes from inside operator().
54  BasicTaskflow* taskflow {nullptr};
 // Node whose work operator() runs.
55  Node* node {nullptr};
56  };
57 
58  public:
59 
65  using Executor = E<Closure>;
66 
70  explicit BasicTaskflow();
71 
75  explicit BasicTaskflow(unsigned N);
76 
80  explicit BasicTaskflow(std::shared_ptr<Executor> executor);
81 
89 
96 
103 
109  template <typename C>
111 
115  void silent_dispatch();
116 
122  template <typename C>
123  void silent_dispatch(C&& callable);
124 
128  void wait_for_all();
129 
134  void wait_for_topologies();
135 
141  void dump(std::ostream& ostream) const;
142 
148  void dump_topologies(std::ostream& ostream) const;
149 
153  size_t num_nodes() const;
154 
158  size_t num_workers() const;
159 
163  size_t num_topologies() const;
164 
168  std::string dump() const;
169 
174 
183 
192  template<typename C>
193  std::shared_future<void> run(Framework& framework, C&& callable);
194 
203  std::shared_future<void> run_n(Framework& framework, size_t N);
204 
214  template<typename C>
215  std::shared_future<void> run_n(Framework& framework, size_t N, C&& callable);
216 
225  template<typename P>
226  std::shared_future<void> run_until(Framework& framework, P&& predicate);
227 
237  template<typename P, typename C>
238  std::shared_future<void> run_until(Framework& framework, P&& predicate, C&& callable);
239 
240  private:
241 
242  Graph _graph;
243 
244  std::shared_ptr<Executor> _executor;
245 
247 
248  void _schedule(Node&);
249  void _schedule(PassiveVector<Node*>&);
250 };
251 
252 // ============================================================================
253 // BasicTaskflow::Closure Method Definitions
254 // ============================================================================
255 
256 // Function: run
257 template <template <typename...> typename E>
259  return run_n(f, 1, [](){});
260 }
261 
262 // Function: run
263 template <template <typename...> typename E>
264 template <typename C>
266  static_assert(std::is_invocable<C>::value);
267  return run_n(f, 1, std::forward<C>(c));
268 }
269 
270 // Function: run_n
271 template <template <typename...> typename E>
273  return run_n(f, repeat, [](){});
274 }
275 
276 // Function: run_n
277 template <template <typename...> typename E>
278 template <typename C>
280  return run_until(f, [repeat]() mutable { return repeat-- == 0; }, std::forward<C>(c));
281 }
282 
283 
284 // Function: run_until
285 template <template <typename...> typename E>
286 template <typename P>
288  return run_until(f, std::forward<P>(predicate), [](){});
289 }
290 
291 // Function: run_until
292 template <template <typename...> typename E>
293 template <typename P, typename C>
295 
296  // Predicate must return a boolean value
297  static_assert(std::is_invocable_v<C> && std::is_same_v<bool, std::invoke_result_t<P>>);
298 
299  if(std::invoke(predicate)) {
300  return std::async(std::launch::deferred, [](){}).share();
301  }
302 
303  // create a topology for this run
304  auto &tpg = _topologies.emplace_back(f, std::forward<P>(predicate));
305 
306  // Iterative execution to avoid stack overflow
307  if(num_workers() == 0) {
308 
309  // Clear last execution data & Build precedence between nodes and target
310  tpg._bind(f._graph);
311 
312  do {
313  _schedule(tpg._sources);
314  tpg._recover_num_sinks();
315  } while(!std::invoke(tpg._predicate));
316 
317  std::invoke(c);
318  tpg._promise.set_value();
319 
320  return tpg._future;
321  }
322 
323  // Multi-threaded execution.
324  std::scoped_lock lock(f._mtx);
325 
326  f._topologies.push_back(&tpg);
327 
328  bool run_now = (f._topologies.size() == 1);
329 
330  if(run_now) {
331  tpg._bind(f._graph);
332  }
333 
334  tpg._work = [&f, c=std::forward<C>(c), this] () mutable {
335 
336  // case 1: we still need to run the topology again
337  if(!std::invoke(f._topologies.front()->_predicate)) {
338  f._topologies.front()->_recover_num_sinks();
339  _schedule(f._topologies.front()->_sources);
340  }
341  // case 2: the final run of this topology
342  else {
343  std::invoke(c);
344 
345  f._mtx.lock();
346 
347  // If there is another run (interleave between lock)
348  if(f._topologies.size() > 1) {
349 
350  // Set the promise
351  f._topologies.front()->_promise.set_value();
352  f._topologies.pop_front();
353  f._topologies.front()->_bind(f._graph);
354  f._mtx.unlock();
355  _schedule(f._topologies.front()->_sources);
356  }
357  else {
358  assert(f._topologies.size() == 1);
359  // Need to back up the promise first here because the framework might be
360  // destroyed before the taskflow leaves
361  auto &p = f._topologies.front()->_promise;
362  f._topologies.pop_front();
363  f._mtx.unlock();
364 
365  // We set the promise in the end in case framework leaves before taskflow
366  p.set_value();
367  }
368  }
369  };
370 
371  if(run_now) {
372  _schedule(tpg._sources);
373  }
374 
375  // -----------------------------
376 
377 
378  /*// case 1: the previous execution is still running
379  if(f._topologies.size() > 1) {
380  tpg._work = std::forward<C>(c);
381  }
382  // case 2: this epoch should run
383  else {
384 
385  tpg._bind(f._graph);
386 
387  //Set up target node's work
388  tpg._work = [
389  &f,
390  c=std::function<void()>{std::forward<C>(c)},
391  num_sinks=tpg._num_sinks.load(std::memory_order_relaxed),
392  this
393  ] () mutable {
394 
395  // f._topologies.front is myself
396 
397  // case 1: we still need to run the topology again
398  if(!std::invoke(f._topologies.front()->_predicate)) {
399  f._topologies.front()->_num_sinks = num_sinks;
400  _schedule(f._topologies.front()->_sources);
401  }
402  // case 2: the final run of this topology
403  // notice that there can be another new run request before we acquire the lock
404  else {
405  std::invoke(c);
406 
407  f._mtx.lock();
408 
409  // If there is another run
410  if(f._topologies.size() > 1) {
411 
412  // Set the promise
413  f._topologies.front()->_promise.set_value();
414 
415  // PV (2/10): why not just using the next_tpg other than moving all
416  // things to the previous one
417  auto next_tpg = std::next(f._topologies.begin());
418  c = std::move((*next_tpg)->_work);
419 
420  f._topologies.front()->_predicate = std::move((*next_tpg)->_predicate);
421  f._topologies.front()->_promise = std::move((*next_tpg)->_promise);
422  f._topologies.erase(next_tpg);
423 
424  f._mtx.unlock();
425  // The graph should be exactly the same as previous dispatch
426  f._topologies.front()->_num_sinks = num_sinks;
427 
428  _schedule(f._topologies.front()->_sources);
429  }
430  else {
431  assert(f._topologies.size() == 1);
432  // Need to back up the promise first here because the framework might be
433  // destroyed before the taskflow leaves
434  auto &p = f._topologies.front()->_promise;
435  f._topologies.pop_front();
436  f._mtx.unlock();
437 
438  // We set the promise in the end in case framework leaves before taskflow
439  p.set_value();
440  }
441  }
442  }; // End of target's work ------------------------------------------------
443 
444  _schedule(tpg._sources);
445  } */
446 
447 
448  return tpg._future;
449 }
450 
451 // Constructor
452 template <template <typename...> typename E>
454  taskflow{&t}, node {&n} {
455 }
456 
457 // Operator ()
// Runs the node held by this closure and then propagates its completion.
//
// Steps, in the order the body performs them:
//  1. Cache node->num_successors() before anything else runs.
//  2. Execute the node's work. Alternative 0 of node->_work is invoked as a
//     StaticWork callable (skipped when it compares equal to nullptr); any
//     other alternative is invoked as a DynamicWork callable that receives a
//     SubflowBuilder over node->_subgraph.
//  3. For a node that is not a subtask, restore _num_dependents from
//     _dependents.size() and clear the node status.
//  4. Decrement _num_dependents of every successor and schedule each one whose
//     counter reaches zero.
//  5. For a node without successors, decrement the topology's _num_sinks; the
//     call that brings it to zero invokes the topology's _work (if non-null)
//     and sets the topology's promise unless the handle holds alternative 1.
//
// Early exit: when a dynamic node spawns a non-empty subgraph that is not
// detached, the function returns right after scheduling the subgraph sources,
// so steps 3-5 are skipped for that call. The subgraph sinks were linked with
// n.precede(*node) -- presumably so that this node is scheduled again once
// they finish; confirm against Node::precede.
458 template <template <typename...> typename E>
459 void BasicTaskflow<E>::Closure::operator () () const {
460 
461  // Fetch num_successors first to avoid the invalid memory access that a
462  // topology clear could otherwise cause.
463  const auto num_successors = node->num_successors();
464 
465  // Regular node type.
466  // The default node work type: only execute the callback, if there is one.
467  if(auto index=node->_work.index(); index == 0) {
468  if(auto &f = std::get<StaticWork>(node->_work); f != nullptr){
469  std::invoke(f);
470  }
471  }
472  // Subflow node type.
473  else {
474 
475  // Start from a fresh subgraph unless this node has already spawned one.
476  if(!node->is_spawned()) {
477  node->_subgraph.emplace();
478  }
479 
480  SubflowBuilder fb(*(node->_subgraph));
481 
 // NOTE(review): this invoke is not guarded by is_spawned(), so the dynamic
 // work runs on every execution of this closure -- confirm that this is
 // intended for a node that has already spawned its subgraph.
482  std::invoke(std::get<DynamicWork>(node->_work), fb);
483 
484  // A subflow is created only the first time and only if the subgraph is not empty.
485  if(!node->is_spawned()) {
486  node->set_spawned();
487  if(!node->_subgraph->empty()) {
488  // Collects the source nodes (those without dependents) of the subgraph.
489  PassiveVector<Node*> src;
490  for(auto& n : *(node->_subgraph)) {
 // Every spawned node joins the parent's topology and is marked as a subtask.
491  n._topology = node->_topology;
492  n.set_subtask();
 // A subgraph node without successors is either counted as an extra sink of
 // the topology (detached) or made to precede the parent node (not detached).
493  if(n.num_successors() == 0) {
494  if(fb.detached()) {
495  node->_topology->_num_sinks ++;
496  }
497  else {
498  n.precede(*node);
499  }
500  }
501  if(n.num_dependents() == 0) {
502  src.push_back(&n);
503  }
504  }
505 
506  taskflow->_schedule(src);
507 
 // Not detached: stop here; the successors of this node are not notified
 // by this call.
508  if(!fb.detached()) {
509  return;
510  }
511  }
512  }
513  } // End of DynamicWork -----------------------------------------------------
514 
515  // Recover the runtime changes made by dynamic tasking, except for the target & spawned tasks.
516  // This must be done before scheduling the successors; otherwise it might cause
517  // a race condition on _dependents.
518  //if(num_successors && !node->_subtask) {
519  if(!node->is_subtask()) {
520  // Only dynamic tasking needs to restore _dependents: drop the trailing
521  // entries that are subtasks. TODO:
522  if(node->_work.index() == 1 && !node->_subgraph->empty()) {
523  while(!node->_dependents.empty() && node->_dependents.back()->is_subtask()) {
524  node->_dependents.pop_back();
525  }
526  }
527  node->_num_dependents = node->_dependents.size();
528  node->clear_status();
529  }
530 
531  // At this point, the node storage might be destructed.
532  for(size_t i=0; i<num_successors; ++i) {
533  if(--(node->_successors[i]->_num_dependents) == 0) {
534  taskflow->_schedule(*(node->_successors[i]));
535  }
536  }
537 
538  // A node without any successor should check for the termination of the topology.
539  if(num_successors == 0) {
540  if(--(node->_topology->_num_sinks) == 0) {
541 
542  // This is the last executing node. The handle alternative is read before
 // _work runs; the promise is set here only for non-framework topologies.
543  bool is_framework = node->_topology->_handle.index() == 1;
544  if(node->_topology->_work != nullptr) {
545  std::invoke(node->_topology->_work);
546  }
547  if(!is_framework) {
548  node->_topology->_promise.set_value();
549  }
550  }
551  }
552 }
553 
554 // ============================================================================
555 // BasicTaskflow Method Definitions
556 // ============================================================================
557 
558 // Constructor
559 template <template <typename...> typename E>
561  FlowBuilder {_graph},
562  _executor {std::make_shared<Executor>(std::thread::hardware_concurrency())} {
563 }
564 
565 // Constructor
566 template <template <typename...> typename E>
568  FlowBuilder {_graph},
569  _executor {std::make_shared<Executor>(N)} {
570 }
571 
572 // Constructor
573 template <template <typename...> typename E>
575  FlowBuilder {_graph},
576  _executor {std::move(e)} {
577 
578  if(_executor == nullptr) {
579  TF_THROW(Error::EXECUTOR,
580  "failed to construct taskflow (executor cannot be null)"
581  );
582  }
583 }
584 
585 // Destructor
586 template <template <typename...> typename E>
588  wait_for_topologies();
589 }
590 
591 // Function: num_nodes
592 template <template <typename...> typename E>
594  return _graph.size();
595 }
596 
597 // Function: num_workers
598 template <template <typename...> typename E>
600  return _executor->num_workers();
601 }
602 
603 // Function: num_topologies
604 template <template <typename...> typename E>
606  return _topologies.size();
607 }
608 
609 // Function: share_executor
610 template <template <typename...> typename E>
612  return _executor;
613 }
614 
615 // Procedure: silent_dispatch
616 template <template <typename...> typename E>
618 
619  if(_graph.empty()) return;
620 
621  auto& topology = _topologies.emplace_back(std::move(_graph));
622 
623  _schedule(topology._sources);
624 }
625 
626 
627 // Procedure: silent_dispatch with registered callback
628 template <template <typename...> typename E>
629 template <typename C>
631 
632  if(_graph.empty()) {
633  c();
634  return;
635  }
636 
637  auto& topology = _topologies.emplace_back(std::move(_graph), std::forward<C>(c));
638 
639  _schedule(topology._sources);
640 }
641 
642 // Procedure: dispatch
643 template <template <typename...> typename E>
645 
646  if(_graph.empty()) {
647  return std::async(std::launch::deferred, [](){}).share();
648  }
649 
650  auto& topology = _topologies.emplace_back(std::move(_graph));
651 
652  _schedule(topology._sources);
653 
654  return topology._future;
655 }
656 
657 
658 // Procedure: dispatch with registered callback
659 template <template <typename...> typename E>
660 template <typename C>
662 
663  if(_graph.empty()) {
664  c();
665  return std::async(std::launch::deferred, [](){}).share();
666  }
667 
668  auto& topology = _topologies.emplace_back(std::move(_graph), std::forward<C>(c));
669 
670  _schedule(topology._sources);
671 
672  return topology._future;
673 }
674 
675 // Procedure: wait_for_all
676 template <template <typename...> typename E>
678  if(!_graph.empty()) {
679  silent_dispatch();
680  }
681  wait_for_topologies();
682 }
683 
684 // Procedure: wait_for_topologies
685 template <template <typename...> typename E>
687  for(auto& t: _topologies){
688  t._future.get();
689  }
690  _topologies.clear();
691 }
692 
693 // Procedure: _schedule
694 // The main procedure to schedule a given task node.
695 // Each task node has two types of tasks - regular and subflow.
// Schedules a single node: passes this taskflow and the node to the
// executor's emplace(). The arguments match the Closure(BasicTaskflow&, Node&)
// constructor -- presumably the executor builds the Closure in place; confirm
// against the executor implementation.
696 template <template <typename...> typename E>
697 void BasicTaskflow<E>::_schedule(Node& node) {
698  _executor->emplace(*this, node);
699 }
700 
701 
702 // Procedure: _schedule
703 // The main procedure to schedule a set of task nodes.
704 // Each task node has two types of tasks - regular and subflow.
// Schedules a set of nodes in one executor call: builds one Closure per node
// pointer (each bound to this taskflow) and hands the whole vector to the
// executor's batch().
705 template <template <typename...> typename E>
706 void BasicTaskflow<E>::_schedule(PassiveVector<Node*>& nodes) {
707  std::vector<Closure> closures;
 // Reserve once so emplace_back below does not reallocate.
708  closures.reserve(nodes.size());
709  for(auto src : nodes) {
710  closures.emplace_back(*this, *src);
711  }
 // The vector is moved into batch(); it is not used afterwards.
712  _executor->batch(std::move(closures));
713 }
714 
715 // Function: dump_topologies
716 template <template <typename...> typename E>
718 
720 
721  for(const auto& tpg : _topologies) {
722  tpg.dump(os);
723  }
724 
725  return os.str();
726 }
727 
728 // Function: dump_topologies
729 template <template <typename...> typename E>
731  for(const auto& tpg : _topologies) {
732  tpg.dump(os);
733  }
734 }
735 
736 // Function: dump
737 template <template <typename...> typename E>
739 
740  os << "digraph Taskflow {\n";
741 
742  for(const auto& node : _graph) {
743  node.dump(os);
744  }
745 
746  os << "}\n";
747 }
748 
749 // Function: dump
750 // Dumps the taskflow in graphviz.
751 // The result can be viewed at http://www.webgraphviz.com/.
752 template <template <typename...> typename E>
755  dump(os);
756  return os.str();
757 }
758 
759 } // end of namespace tf ----------------------------------------------------
760 
std::shared_future< void > run_until(Framework &framework, P &&predicate)
runs the framework multiple times until the predicate becomes true and invokes a callback ...
Definition: basic_taskflow.hpp:287
The base class to derive a taskflow class.
Definition: basic_taskflow.hpp:38
std::shared_ptr< Executor > share_executor()
shares ownership of the executor associated with this taskflow object
Definition: basic_taskflow.hpp:611
size_t num_nodes() const
queries the number of nodes in the present task dependency graph
Definition: basic_taskflow.hpp:593
std::shared_future< void > dispatch()
dispatches the present graph to threads and returns immediately
Definition: basic_taskflow.hpp:644
Definition: taskflow.hpp:6
void silent_dispatch()
dispatches the present graph to threads and returns immediately
Definition: basic_taskflow.hpp:617
std::shared_future< void > run_n(Framework &framework, size_t N)
runs the framework for N times
Definition: basic_taskflow.hpp:272
T hardware_concurrency(T... args)
BasicTaskflow()
constructs the taskflow with std::thread::hardware_concurrency worker threads
Definition: basic_taskflow.hpp:560
A reusable task dependency graph.
Definition: framework.hpp:17
size_t num_topologies() const
queries the number of existing topologies
Definition: basic_taskflow.hpp:605
auto emplace(C &&callable)
creates a task from a given callable object
Definition: flow_builder.hpp:794
void wait_for_topologies()
blocks until all running topologies complete and then cleans up all associated storages ...
Definition: basic_taskflow.hpp:686
std::shared_future< void > run(Framework &framework)
runs the framework once
Definition: basic_taskflow.hpp:258
void wait_for_all()
dispatches the present graph to threads and waits for all topologies to complete
Definition: basic_taskflow.hpp:677
The building blocks of task dependency graphs.
Definition: flow_builder.hpp:13
size_t num_workers() const
queries the number of worker threads in the associated executor
Definition: basic_taskflow.hpp:599
E< Closure > Executor
alias of executor type
Definition: basic_taskflow.hpp:65
~BasicTaskflow()
destructs the taskflow
Definition: basic_taskflow.hpp:587
std::string dump() const
dumps the present task dependency graph in DOT format to a std::string
Definition: basic_taskflow.hpp:753
std::string dump_topologies() const
dumps the existing topologies in DOT format to a std::string
Definition: basic_taskflow.hpp:717