Cpp-Taskflow  2.3.1
flow_builder.hpp
1 #pragma once
2 
3 #include "task.hpp"
4 
5 namespace tf {
6 
13 class FlowBuilder {
14 
15  friend class Task;
16 
17  public:
18 
24  FlowBuilder(Graph& graph);
25 
35  template <typename C>
36  std::enable_if_t<is_static_task_v<C>, Task> emplace(C&& callable);
37 
47  template <typename C>
48  std::enable_if_t<is_dynamic_task_v<C>, Task> emplace(C&& callable);
49 
59  template <typename C>
60  std::enable_if_t<is_condition_task_v<C>, Task> emplace(C&& callable);
61 
62 #ifdef TF_ENABLE_CUDA
63 
72  template <typename C>
73  std::enable_if_t<is_cudaflow_task_v<C>, Task> emplace(C&& callable);
74 #endif
75 
85  template <typename... C, std::enable_if_t<(sizeof...(C)>1), void>* = nullptr>
86  auto emplace(C&&... callables);
87 
94  Task composed_of(Taskflow& taskflow);
95 
113  template <typename I, typename C>
114  std::pair<Task, Task> parallel_for(I beg, I end, C&& callable, size_t chunk=1);
115 
133  template <
134  typename I,
135  typename C,
136  std::enable_if_t<std::is_integral<std::decay_t<I>>::value, void>* = nullptr
137  >
139  I beg, I end, I step, C&& callable, size_t chunk = 1
140  );
141 
159  template <
160  typename I,
161  typename C,
162  std::enable_if_t<std::is_floating_point<std::decay_t<I>>::value, void>* = nullptr
163  >
165  I beg, I end, I step, C&& callable, size_t chunk = 1
166  );
167 
185  template <typename I, typename T, typename B>
186  std::pair<Task, Task> reduce(I beg, I end, T& result, B&& bop);
187 
203  template <typename I, typename T>
204  std::pair<Task, Task> reduce_min(I beg, I end, T& result);
205 
221  template <typename I, typename T>
222  std::pair<Task, Task> reduce_max(I beg, I end, T& result);
223 
245  template <typename I, typename T, typename B, typename U>
246  std::pair<Task, Task> transform_reduce(I beg, I end, T& result, B&& bop, U&& uop);
247 
272  template <typename I, typename T, typename B, typename P, typename U>
274  I beg, I end, T& result, B&& bop1, P&& bop2, U&& uop
275  );
276 
282  Task placeholder();
283 
290  void precede(Task A, Task B);
291 
297  void linearize(std::vector<Task>& tasks);
298 
305 
312  void broadcast(Task A, std::vector<Task>& others);
313 
321 
328  void gather(std::vector<Task>& others, Task A);
329 
336  void gather(std::initializer_list<Task> others, Task A);
337 
338  private:
339 
340  Graph& _graph;
341 
342  template <typename L>
343  void _linearize(L&);
344 };
345 
346 // Constructor
347 inline FlowBuilder::FlowBuilder(Graph& graph) :
348  _graph {graph} {
349 }
350 
351 // ----------------------------------------------------------------------------
352 
358 class Subflow : public FlowBuilder {
359 
360  public:
361 
365  template <typename... Args>
366  Subflow(Args&&... args);
367 
371  void join();
372 
376  void detach();
377 
381  bool detached() const;
382 
386  bool joined() const;
387 
388  private:
389 
390  bool _detached {false};
391 };
392 
393 // Constructor
394 template <typename... Args>
395 Subflow::Subflow(Args&&... args) :
396  FlowBuilder {std::forward<Args>(args)...} {
397 }
398 
399 // Procedure: join
400 inline void Subflow::join() {
401  _detached = false;
402 }
403 
404 // Procedure: detach
405 inline void Subflow::detach() {
406  _detached = true;
407 }
408 
409 // Function: detached
410 inline bool Subflow::detached() const {
411  return _detached;
412 }
413 
414 // Function: joined
415 inline bool Subflow::joined() const {
416  return !_detached;
417 }
418 
419 // ----------------------------------------------------------------------------
420 // Member definition of FlowBuilder
421 // ----------------------------------------------------------------------------
422 
423 // Function: emplace
424 template <typename... C, std::enable_if_t<(sizeof...(C)>1), void>*>
425 auto FlowBuilder::emplace(C&&... cs) {
426  return std::make_tuple(emplace(std::forward<C>(cs))...);
427 }
428 
429 // Function: emplace
430 // emplaces a static task
431 template <typename C>
432 std::enable_if_t<is_static_task_v<C>, Task> FlowBuilder::emplace(C&& c) {
433  auto n = _graph.emplace_back(
434  nstd::in_place_type_t<Node::StaticWork>{}, std::forward<C>(c)
435  );
436  return Task(n);
437 }
438 
439 // Function: emplace
440 // emplaces a dynamic task
441 template <typename C>
442 std::enable_if_t<is_dynamic_task_v<C>, Task> FlowBuilder::emplace(C&& c) {
443  auto n = _graph.emplace_back(nstd::in_place_type_t<Node::DynamicWork>{},
444  [c=std::forward<C>(c)] (Subflow& fb) mutable {
445  // first time execution
446  if(fb._graph.empty()) {
447  c(fb);
448  }
449  });
450  return Task(n);
451 }
452 
453 // Function: emplace
454 // emplaces a condition task
455 template <typename C>
456 std::enable_if_t<is_condition_task_v<C>, Task> FlowBuilder::emplace(C&& c) {
457  auto n = _graph.emplace_back(
458  nstd::in_place_type_t<Node::ConditionWork>{}, std::forward<C>(c)
459  );
460  return Task(n);
461 }
462 
#ifdef TF_ENABLE_CUDA
// Function: emplace
// emplaces a cudaflow task: the callable is forwarded into a new
// Node::cudaFlowWork node and a handle to that node is returned
// (only compiled when TF_ENABLE_CUDA is defined)
template <typename C>
std::enable_if_t<is_cudaflow_task_v<C>, Task> FlowBuilder::emplace(C&& c) {
  return Task(_graph.emplace_back(
    nstd::in_place_type_t<Node::cudaFlowWork>{}, std::forward<C>(c)
  ));
}
#endif
474 
475 //template <typename C>
476 //Task FlowBuilder::emplace(C&& c) {
477 //
478 // // static task
479 // //if constexpr(is_static_task_v<C>) {
480 // if constexpr (is_invocable_r_v<void, C> && !is_invocable_r_v<int, C>) {
481 // auto n = _graph.emplace_back(
482 // nstd::in_place_type_t<Node::StaticWork>{}, std::forward<C>(c)
483 // );
484 // return Task(n);
485 // }
486 // // condition task
487 // else if constexpr(is_condition_task_v<C>) {
488 // auto n = _graph.emplace_back(
489 // nstd::in_place_type_t<Node::ConditionWork>{}, std::forward<C>(c)
490 // );
491 // return Task(n);
492 // }
493 // // dynamic task
494 // else if constexpr(is_dynamic_task_v<C>) {
495 // auto n = _graph.emplace_back(nstd::in_place_type_t<Node::DynamicWork>{},
496 // [c=std::forward<C>(c)] (Subflow& fb) mutable {
497 // // first time execution
498 // if(fb._graph.empty()) {
499 // c(fb);
500 // }
501 // });
502 // return Task(n);
503 // }
504 // else {
505 // static_assert(dependent_false_v<C>, "invalid task work type");
506 // }
507 //}
508 
509 // Function: composed_of
511  auto node = _graph.emplace_back(
512  nstd::in_place_type_t<Node::ModuleWork>{}, &taskflow
513  );
514  return Task(node);
515 }
516 
517 // Procedure: precede
518 inline void FlowBuilder::precede(Task from, Task to) {
519  from._node->_precede(to._node);
520 }
521 
522 // Procedure: broadcast
524  for(auto to : tos) {
525  from.precede(to);
526  }
527 }
528 
529 // Procedure: broadcast
531  for(auto to : tos) {
532  from.precede(to);
533  }
534 }
535 
536 // Function: gather
537 inline void FlowBuilder::gather(std::vector<Task>& froms, Task to) {
538  for(auto from : froms) {
539  to.succeed(from);
540  }
541 }
542 
543 // Function: gather
545  for(auto from : froms) {
546  to.succeed(from);
547  }
548 }
549 
550 // Function: placeholder
552  auto node = _graph.emplace_back();
553  return Task(node);
554 }
555 
556 // Function: parallel_for
557 template <typename I, typename C>
559  I beg, I end, C&& c, size_t chunk
560 ){
561 
562  //using category = typename std::iterator_traits<I>::iterator_category;
563 
564  auto S = placeholder();
565  auto T = placeholder();
566 
567  // default partition equals to the worker count
568  if(chunk == 0) {
569  chunk = 1;
570  }
571 
572  size_t remain = std::distance(beg, end);
573 
574  while(beg != end) {
575 
576  auto e = beg;
577 
578  auto x = std::min(remain, chunk);
579  std::advance(e, x);
580  remain -= x;
581 
582  // Create a task
583  auto task = emplace([beg, e, c] () mutable {
584  std::for_each(beg, e, c);
585  });
586 
587  S.precede(task);
588  task.precede(T);
589 
590  // adjust the pointer
591  beg = e;
592  }
593 
594  // special case
595  if(S.num_successors() == 0) {
596  S.precede(T);
597  }
598 
599  return std::make_pair(S, T);
600 }
601 
602 // Function: parallel_for
603 template <
604  typename I,
605  typename C,
606  std::enable_if_t<std::is_integral<std::decay_t<I>>::value, void>*
607 >
608 std::pair<Task, Task> FlowBuilder::parallel_for(I beg, I end, I s, C&& c, size_t chunk) {
609 
610  if((s == 0) || (beg < end && s <= 0) || (beg > end && s >=0) ) {
611  TF_THROW("invalid range [", beg, ", ", end, ") with step size ", s);
612  }
613 
614  // source and target
615  auto source = placeholder();
616  auto target = placeholder();
617 
618  if(chunk == 0) {
619  chunk = 1;
620  }
621 
622  // positive case
623  if(beg < end) {
624  while(beg != end) {
625  auto o = static_cast<I>(chunk) * s;
626  auto e = std::min(beg + o, end);
627  auto task = emplace([=] () mutable {
628  for(auto i=beg; i<e; i+=s) {
629  c(i);
630  }
631  });
632  source.precede(task);
633  task.precede(target);
634  beg = e;
635  }
636  }
637  // negative case
638  else if(beg > end) {
639  while(beg != end) {
640  auto o = static_cast<I>(chunk) * s;
641  auto e = std::max(beg + o, end);
642  auto task = emplace([=] () mutable {
643  for(auto i=beg; i>e; i+=s) {
644  c(i);
645  }
646  });
647  source.precede(task);
648  task.precede(target);
649  beg = e;
650  }
651  }
652 
653  if(source.num_successors() == 0) {
654  source.precede(target);
655  }
656 
657  return std::make_pair(source, target);
658 }
659 
660 // Function: parallel_for
661 template <typename I, typename C,
662  std::enable_if_t<std::is_floating_point<std::decay_t<I>>::value, void>*
663 >
664 std::pair<Task, Task> FlowBuilder::parallel_for(I beg, I end, I s, C&& c, size_t chunk) {
665 
666  if((s == 0) || (beg < end && s <= 0) || (beg > end && s >=0) ) {
667  TF_THROW("invalid range [", beg, ", ", end, ") with step size ", s);
668  }
669 
670  // source and target
671  auto source = placeholder();
672  auto target = placeholder();
673 
674  if(chunk == 0) {
675  chunk = 1;
676  }
677 
678  // positive case
679  if(beg < end) {
680  size_t N=0;
681  I b = beg;
682  for(I e=beg; e<end; e+=s) {
683  if(++N == chunk) {
684  auto task = emplace([=] () mutable {
685  for(size_t i=0; i<N; ++i, b+=s) {
686  c(b);
687  }
688  });
689  source.precede(task);
690  task.precede(target);
691  N = 0;
692  b = e;
693  }
694  }
695 
696  if(N) {
697  auto task = emplace([=] () mutable {
698  for(size_t i=0; i<N; ++i, b+=s) {
699  c(b);
700  }
701  });
702  source.precede(task);
703  task.precede(target);
704  }
705  }
706  else if(beg > end) {
707  size_t N=0;
708  I b = beg;
709  for(I e=beg; e>end; e+=s) {
710  if(++N == chunk) {
711  auto task = emplace([=] () mutable {
712  for(size_t i=0; i<N; ++i, b+=s) {
713  c(b);
714  }
715  });
716  source.precede(task);
717  task.precede(target);
718  N = 0;
719  b = e;
720  }
721  }
722 
723  if(N) {
724  auto task = emplace([=] () mutable {
725  for(size_t i=0; i<N; ++i, b+=s) {
726  c(b);
727  }
728  });
729  source.precede(task);
730  task.precede(target);
731  }
732  }
733 
734  if(source.num_successors() == 0) {
735  source.precede(target);
736  }
737 
738  return std::make_pair(source, target);
739 }
740 
741 // Function: reduce_min
742 // Find the minimum element over a range of items.
743 template <typename I, typename T>
745  return reduce(beg, end, result, [] (const auto& l, const auto& r) {
746  return std::min(l, r);
747  });
748 }
749 
750 // Function: reduce_max
751 // Find the maximum element over a range of items.
752 template <typename I, typename T>
754  return reduce(beg, end, result, [] (const auto& l, const auto& r) {
755  return std::max(l, r);
756  });
757 }
758 
759 // Function: transform_reduce
760 template <typename I, typename T, typename B, typename U>
762  I beg, I end, T& result, B&& bop, U&& uop
763 ) {
764 
765  //using category = typename std::iterator_traits<I>::iterator_category;
766 
767  // Even partition
768  size_t d = std::distance(beg, end);
769  size_t w = std::max(unsigned{1}, std::thread::hardware_concurrency());
770  size_t g = std::max((d + w - 1) / w, size_t{2});
771 
772  auto source = placeholder();
773  auto target = placeholder();
774 
775  //std::vector<std::future<T>> futures;
776  auto g_results = std::make_unique<T[]>(w);
777  size_t id {0};
778 
779  size_t remain = d;
780 
781  while(beg != end) {
782 
783  auto e = beg;
784 
785  size_t x = std::min(remain, g);
786  std::advance(e, x);
787  remain -= x;
788 
789  // Create a task
790  auto task = emplace([beg, e, bop, uop, res=&(g_results[id])] () mutable {
791  *res = uop(*beg);
792  for(++beg; beg != e; ++beg) {
793  *res = bop(std::move(*res), uop(*beg));
794  }
795  });
796 
797  source.precede(task);
798  task.precede(target);
799 
800  // adjust the pointer
801  beg = e;
802  id ++;
803  }
804 
805  // target synchronizer
806  target.work([&result, bop, res=make_moc(std::move(g_results)), w=id] () {
807  for(auto i=0u; i<w; i++) {
808  result = bop(std::move(result), res.object[i]);
809  }
810  });
811 
812  return std::make_pair(source, target);
813 }
814 
815 // Function: transform_reduce
816 template <typename I, typename T, typename B, typename P, typename U>
818  I beg, I end, T& result, B&& bop, P&& pop, U&& uop
819 ) {
820 
821  //using category = typename std::iterator_traits<I>::iterator_category;
822 
823  // Even partition
824  size_t d = std::distance(beg, end);
825  size_t w = std::max(unsigned{1}, std::thread::hardware_concurrency());
826  size_t g = std::max((d + w - 1) / w, size_t{2});
827 
828  auto source = placeholder();
829  auto target = placeholder();
830 
831  auto g_results = std::make_unique<T[]>(w);
832 
833  size_t id {0};
834  size_t remain = d;
835 
836  while(beg != end) {
837 
838  auto e = beg;
839 
840  size_t x = std::min(remain, g);
841  std::advance(e, x);
842  remain -= x;
843 
844  // Create a task
845  auto task = emplace([beg, e, uop, pop, res= &g_results[id]] () mutable {
846  *res = uop(*beg);
847  for(++beg; beg != e; ++beg) {
848  *res = pop(std::move(*res), *beg);
849  }
850  });
851  source.precede(task);
852  task.precede(target);
853 
854  // adjust the pointer
855  beg = e;
856  id ++;
857  }
858 
859  // target synchronizer
860  target.work([&result, bop, g_results=make_moc(std::move(g_results)), w=id] () {
861  for(auto i=0u; i<w; i++) {
862  result = bop(std::move(result), std::move(g_results.object[i]));
863  }
864  });
865 
866  return std::make_pair(source, target);
867 }
868 
869 // Procedure: _linearize
870 template <typename L>
871 void FlowBuilder::_linearize(L& keys) {
872 
873  auto itr = keys.begin();
874  auto end = keys.end();
875 
876  if(itr == end) {
877  return;
878  }
879 
880  auto nxt = itr;
881 
882  for(++nxt; nxt != end; ++nxt, ++itr) {
883  itr->_node->_precede(nxt->_node);
884  }
885 }
886 
887 // Procedure: linearize
889  _linearize(keys);
890 }
891 
892 // Procedure: linearize
894  _linearize(keys);
895 }
896 
897 // Proceduer: reduce
898 template <typename I, typename T, typename B>
899 std::pair<Task, Task> FlowBuilder::reduce(I beg, I end, T& result, B&& op) {
900 
901  //using category = typename std::iterator_traits<I>::iterator_category;
902 
903  size_t d = std::distance(beg, end);
904  size_t w = std::max(unsigned{1}, std::thread::hardware_concurrency());
905  size_t g = std::max((d + w - 1) / w, size_t{2});
906 
907  auto source = placeholder();
908  auto target = placeholder();
909 
910  //T* g_results = static_cast<T*>(malloc(sizeof(T)*w));
911  auto g_results = std::make_unique<T[]>(w);
912  //std::vector<std::future<T>> futures;
913 
914  size_t id {0};
915  size_t remain = d;
916 
917  while(beg != end) {
918 
919  auto e = beg;
920 
921  size_t x = std::min(remain, g);
922  std::advance(e, x);
923  remain -= x;
924 
925  // Create a task
926  //auto [task, future] = emplace([beg, e, op] () mutable {
927  auto task = emplace([beg, e, op, res = &g_results[id]] () mutable {
928  *res = *beg;
929  for(++beg; beg != e; ++beg) {
930  *res = op(std::move(*res), *beg);
931  }
932  //auto init = *beg;
933  //for(++beg; beg != e; ++beg) {
934  // init = op(std::move(init), *beg);
935  //}
936  //return init;
937  });
938  source.precede(task);
939  task.precede(target);
940  //futures.push_back(std::move(future));
941 
942  // adjust the pointer
943  beg = e;
944  id ++;
945  }
946 
947  // target synchronizer
948  //target.work([&result, futures=MoC{std::move(futures)}, op] () {
949  // for(auto& fu : futures.object) {
950  // result = op(std::move(result), fu.get());
951  // }
952  //});
953  target.work([g_results=make_moc(std::move(g_results)), &result, op, w=id] () {
954  for(auto i=0u; i<w; i++) {
955  result = op(std::move(result), g_results.object[i]);
956  }
957  });
958 
959  return std::make_pair(source, target);
960 }
961 
962 // ----------------------------------------------------------------------------
963 // Cyclic Dependency: Task
964 // ----------------------------------------------------------------------------
965 
966 // Function: work
967 // assign a static work
968 template <typename C>
969 std::enable_if_t<is_static_task_v<C>, Task>& Task::work(C&& c) {
970  _node->_handle.emplace<Node::StaticWork>(std::forward<C>(c));
971  return *this;
972 }
973 
974 // Function: work
975 // assigns a dynamic work
976 template <typename C>
977 std::enable_if_t<is_dynamic_task_v<C>, Task>& Task::work(C&& c) {
978  _node->_handle.emplace<Node::DynamicWork>(
979  [c=std::forward<C>(c)] (Subflow& fb) mutable {
980  // first time execution
981  if(fb._graph.empty()) {
982  c(fb);
983  }
984  });
985  return *this;
986 }
987 
988 // Function: work
989 // assigns a condition work
990 template <typename C>
991 std::enable_if_t<is_condition_task_v<C>, Task>& Task::work(C&& c) {
992  _node->_handle.emplace<Node::ConditionWork>(std::forward<C>(c));
993  return *this;
994 }
995 
996 
997 // Function: work
998 //template <typename C>
999 //Task& Task::work(C&& c) {
1000 //
1001 // // static tasking
1002 // if constexpr(is_static_task_v<C>) {
1003 // _node->_handle.emplace<Node::StaticWork>(std::forward<C>(c));
1004 // }
1005 // // condition tasking
1006 // else if constexpr(is_condition_task_v<C>) {
1007 // _node->_handle.emplace<Node::ConditionWork>(std::forward<C>(c));
1008 // }
1009 // dynamic tasking
1010 // else if constexpr(is_dynamic_task_v<C>) {
1011 // _node->_handle.emplace<Node::DynamicWork>(
1012 // [c=std::forward<C>(c)] (Subflow& fb) mutable {
1013 // // first time execution
1014 // if(fb._graph.empty()) {
1015 // c(fb);
1016 // }
1017 // });
1018 // }
1019 // else {
1020 // static_assert(dependent_false_v<C>, "invalid task work type");
1021 // }
1022 //
1023 // return *this;
1024 //}
1025 
1026 // ----------------------------------------------------------------------------
1027 // Legacy code
1028 // ----------------------------------------------------------------------------
1029 
1030 
1031 using SubflowBuilder = Subflow;
1032 
1033 } // end of namespace tf. ---------------------------------------------------
1034 
1035 
void linearize(std::vector< Task > &tasks)
adds adjacent dependency links to a linear list of tasks
Definition: flow_builder.hpp:888
T distance(T... args)
void broadcast(Task A, std::vector< Task > &others)
adds dependency links from one task A to many tasks
Definition: flow_builder.hpp:523
std::pair< Task, Task > transform_reduce(I beg, I end, T &result, B &&bop, U &&uop)
constructs a task dependency graph of parallel transformation and reduction
Definition: flow_builder.hpp:761
T advance(T... args)
T make_tuple(T... args)
T end(T... args)
Definition: error.hpp:9
T hardware_concurrency(T... args)
std::enable_if_t< is_static_task_v< C >, Task > & work(C &&callable)
assigns a static task
Definition: flow_builder.hpp:969
Task placeholder()
creates an empty task
Definition: flow_builder.hpp:551
Subflow(Args &&... args)
constructs a subflow builder object
Definition: flow_builder.hpp:395
void detach()
enables the subflow to detach from its parent task
Definition: flow_builder.hpp:405
T min(T... args)
bool detached() const
queries if the subflow will be detached from its parent task
Definition: flow_builder.hpp:410
std::pair< Task, Task > reduce_max(I beg, I end, T &result)
constructs a task dependency graph of parallel reduction through std::max
Definition: flow_builder.hpp:753
Task & succeed(Ts &&... tasks)
adds precedence links from other tasks to this
Definition: task.hpp:276
std::enable_if_t< is_static_task_v< C >, Task > emplace(C &&callable)
creates a static task from a given callable object
Definition: flow_builder.hpp:432
std::pair< Task, Task > parallel_for(I beg, I end, C &&callable, size_t chunk=1)
constructs a task dependency graph of range-based parallel_for
Definition: flow_builder.hpp:558
Task composed_of(Taskflow &taskflow)
creates a module task from a taskflow
Definition: flow_builder.hpp:510
void precede(Task A, Task B)
adds a dependency link from task A to task B
Definition: flow_builder.hpp:518
T make_pair(T... args)
the class to create a task dependency graph
Definition: taskflow.hpp:18
void gather(std::vector< Task > &others, Task A)
adds dependency links from many tasks to one task A
Definition: flow_builder.hpp:537
T max(T... args)
T move(T... args)
FlowBuilder(Graph &graph)
constructs a flow builder object
Definition: flow_builder.hpp:347
Building methods of a task dependency graph.
Definition: flow_builder.hpp:13
bool joined() const
queries if the subflow will join its parent task
Definition: flow_builder.hpp:415
handle to a node in a task dependency graph
Definition: task.hpp:51
Task & precede(Ts &&... tasks)
adds precedence links from this to other tasks
Definition: task.hpp:253
std::pair< Task, Task > reduce(I beg, I end, T &result, B &&bop)
construct a task dependency graph of parallel reduction
Definition: flow_builder.hpp:899
T for_each(T... args)
The building blocks of dynamic tasking.
Definition: flow_builder.hpp:358
void join()
enables the subflow to join its parent task
Definition: flow_builder.hpp:400
std::pair< Task, Task > reduce_min(I beg, I end, T &result)
constructs a task dependency graph of parallel reduction through std::min
Definition: flow_builder.hpp:744