Cpp-Taskflow  2.3.0
flow_builder.hpp
1 #pragma once
2 
3 #include "task.hpp"
4 
5 namespace tf {
6 
13 class FlowBuilder {
14 
15  friend class Task;
16 
17  public:
18 
24  FlowBuilder(Graph& graph);
25 
35  template <typename C>
36  std::enable_if_t<is_static_task_v<C>, Task> emplace(C&& callable);
37 
47  template <typename C>
48  std::enable_if_t<is_dynamic_task_v<C>, Task> emplace(C&& callable);
49 
59  template <typename C>
60  std::enable_if_t<is_condition_task_v<C>, Task> emplace(C&& callable);
61 
71  template <typename... C, std::enable_if_t<(sizeof...(C)>1), void>* = nullptr>
72  auto emplace(C&&... callables);
73 
80  Task composed_of(Taskflow& taskflow);
81 
99  template <typename I, typename C>
100  std::pair<Task, Task> parallel_for(I beg, I end, C&& callable, size_t chunk=1);
101 
119  template <
120  typename I,
121  typename C,
122  std::enable_if_t<std::is_integral<std::decay_t<I>>::value, void>* = nullptr
123  >
125  I beg, I end, I step, C&& callable, size_t chunk = 1
126  );
127 
145  template <
146  typename I,
147  typename C,
148  std::enable_if_t<std::is_floating_point<std::decay_t<I>>::value, void>* = nullptr
149  >
151  I beg, I end, I step, C&& callable, size_t chunk = 1
152  );
153 
171  template <typename I, typename T, typename B>
172  std::pair<Task, Task> reduce(I beg, I end, T& result, B&& bop);
173 
189  template <typename I, typename T>
190  std::pair<Task, Task> reduce_min(I beg, I end, T& result);
191 
207  template <typename I, typename T>
208  std::pair<Task, Task> reduce_max(I beg, I end, T& result);
209 
231  template <typename I, typename T, typename B, typename U>
232  std::pair<Task, Task> transform_reduce(I beg, I end, T& result, B&& bop, U&& uop);
233 
258  template <typename I, typename T, typename B, typename P, typename U>
260  I beg, I end, T& result, B&& bop1, P&& bop2, U&& uop
261  );
262 
268  Task placeholder();
269 
276  void precede(Task A, Task B);
277 
283  void linearize(std::vector<Task>& tasks);
284 
291 
298  void broadcast(Task A, std::vector<Task>& others);
299 
307 
314  void gather(std::vector<Task>& others, Task A);
315 
322  void gather(std::initializer_list<Task> others, Task A);
323 
324  private:
325 
326  Graph& _graph;
327 
328  template <typename L>
329  void _linearize(L&);
330 };
331 
332 // Constructor
333 inline FlowBuilder::FlowBuilder(Graph& graph) :
334  _graph {graph} {
335 }
336 
337 // ----------------------------------------------------------------------------
338 
344 class Subflow : public FlowBuilder {
345 
346  public:
347 
351  template <typename... Args>
352  Subflow(Args&&... args);
353 
357  void join();
358 
362  void detach();
363 
367  bool detached() const;
368 
372  bool joined() const;
373 
374  private:
375 
376  bool _detached {false};
377 };
378 
379 // Constructor
380 template <typename... Args>
381 Subflow::Subflow(Args&&... args) :
382  FlowBuilder {std::forward<Args>(args)...} {
383 }
384 
385 // Procedure: join
386 inline void Subflow::join() {
387  _detached = false;
388 }
389 
390 // Procedure: detach
391 inline void Subflow::detach() {
392  _detached = true;
393 }
394 
395 // Function: detached
396 inline bool Subflow::detached() const {
397  return _detached;
398 }
399 
400 // Function: joined
401 inline bool Subflow::joined() const {
402  return !_detached;
403 }
404 
405 // ----------------------------------------------------------------------------
406 // Member definition of FlowBuilder
407 // ----------------------------------------------------------------------------
408 
409 // Function: emplace
410 template <typename... C, std::enable_if_t<(sizeof...(C)>1), void>*>
411 auto FlowBuilder::emplace(C&&... cs) {
412  return std::make_tuple(emplace(std::forward<C>(cs))...);
413 }
414 
415 // Function: emplace
416 // emplaces a static task
417 template <typename C>
418 std::enable_if_t<is_static_task_v<C>, Task> FlowBuilder::emplace(C&& c) {
419  auto n = _graph.emplace_back(
420  nstd::in_place_type_t<Node::StaticWork>{}, std::forward<C>(c)
421  );
422  return Task(n);
423 }
424 
425 // Function: emplace
426 // emplaces a dynamic task
427 template <typename C>
428 std::enable_if_t<is_dynamic_task_v<C>, Task> FlowBuilder::emplace(C&& c) {
429  auto n = _graph.emplace_back(nstd::in_place_type_t<Node::DynamicWork>{},
430  [c=std::forward<C>(c)] (Subflow& fb) mutable {
431  // first time execution
432  if(fb._graph.empty()) {
433  c(fb);
434  }
435  });
436  return Task(n);
437 }
438 
439 // Function: emplace
440 // emplades a condition task
441 template <typename C>
442 std::enable_if_t<is_condition_task_v<C>, Task> FlowBuilder::emplace(C&& c) {
443  auto n = _graph.emplace_back(
444  nstd::in_place_type_t<Node::ConditionWork>{}, std::forward<C>(c)
445  );
446  return Task(n);
447 }
448 
449 //template <typename C>
450 //Task FlowBuilder::emplace(C&& c) {
451 //
452 // // static task
453 // //if constexpr(is_static_task_v<C>) {
454 // if constexpr (is_invocable_r_v<void, C> && !is_invocable_r_v<int, C>) {
455 // auto n = _graph.emplace_back(
456 // nstd::in_place_type_t<Node::StaticWork>{}, std::forward<C>(c)
457 // );
458 // return Task(n);
459 // }
460 // // condition task
461 // else if constexpr(is_condition_task_v<C>) {
462 // auto n = _graph.emplace_back(
463 // nstd::in_place_type_t<Node::ConditionWork>{}, std::forward<C>(c)
464 // );
465 // return Task(n);
466 // }
467 // // dynamic task
468 // else if constexpr(is_dynamic_task_v<C>) {
469 // auto n = _graph.emplace_back(nstd::in_place_type_t<Node::DynamicWork>{},
470 // [c=std::forward<C>(c)] (Subflow& fb) mutable {
471 // // first time execution
472 // if(fb._graph.empty()) {
473 // c(fb);
474 // }
475 // });
476 // return Task(n);
477 // }
478 // else {
479 // static_assert(dependent_false_v<C>, "invalid task work type");
480 // }
481 //}
482 
483 // Function: composed_of
485  auto node = _graph.emplace_back(
486  nstd::in_place_type_t<Node::ModuleWork>{}, &taskflow
487  );
488  return Task(node);
489 }
490 
491 // Procedure: precede
492 inline void FlowBuilder::precede(Task from, Task to) {
493  from._node->_precede(to._node);
494 }
495 
496 // Procedure: broadcast
498  for(auto to : tos) {
499  from.precede(to);
500  }
501 }
502 
503 // Procedure: broadcast
505  for(auto to : tos) {
506  from.precede(to);
507  }
508 }
509 
510 // Function: gather
511 inline void FlowBuilder::gather(std::vector<Task>& froms, Task to) {
512  for(auto from : froms) {
513  to.succeed(from);
514  }
515 }
516 
517 // Function: gather
519  for(auto from : froms) {
520  to.succeed(from);
521  }
522 }
523 
524 // Function: placeholder
526  auto node = _graph.emplace_back();
527  return Task(node);
528 }
529 
530 // Function: parallel_for
531 template <typename I, typename C>
533  I beg, I end, C&& c, size_t chunk
534 ){
535 
536  //using category = typename std::iterator_traits<I>::iterator_category;
537 
538  auto S = placeholder();
539  auto T = placeholder();
540 
541  // default partition equals to the worker count
542  if(chunk == 0) {
543  chunk = 1;
544  }
545 
546  size_t remain = std::distance(beg, end);
547 
548  while(beg != end) {
549 
550  auto e = beg;
551 
552  auto x = std::min(remain, chunk);
553  std::advance(e, x);
554  remain -= x;
555 
556  // Create a task
557  auto task = emplace([beg, e, c] () mutable {
558  std::for_each(beg, e, c);
559  });
560 
561  S.precede(task);
562  task.precede(T);
563 
564  // adjust the pointer
565  beg = e;
566  }
567 
568  // special case
569  if(S.num_successors() == 0) {
570  S.precede(T);
571  }
572 
573  return std::make_pair(S, T);
574 }
575 
576 // Function: parallel_for
577 template <
578  typename I,
579  typename C,
580  std::enable_if_t<std::is_integral<std::decay_t<I>>::value, void>*
581 >
582 std::pair<Task, Task> FlowBuilder::parallel_for(I beg, I end, I s, C&& c, size_t chunk) {
583 
584  if((s == 0) || (beg < end && s <= 0) || (beg > end && s >=0) ) {
585  TF_THROW("invalid range [", beg, ", ", end, ") with step size ", s);
586  }
587 
588  // source and target
589  auto source = placeholder();
590  auto target = placeholder();
591 
592  if(chunk == 0) {
593  chunk = 1;
594  }
595 
596  // positive case
597  if(beg < end) {
598  while(beg != end) {
599  auto o = static_cast<I>(chunk) * s;
600  auto e = std::min(beg + o, end);
601  auto task = emplace([=] () mutable {
602  for(auto i=beg; i<e; i+=s) {
603  c(i);
604  }
605  });
606  source.precede(task);
607  task.precede(target);
608  beg = e;
609  }
610  }
611  // negative case
612  else if(beg > end) {
613  while(beg != end) {
614  auto o = static_cast<I>(chunk) * s;
615  auto e = std::max(beg + o, end);
616  auto task = emplace([=] () mutable {
617  for(auto i=beg; i>e; i+=s) {
618  c(i);
619  }
620  });
621  source.precede(task);
622  task.precede(target);
623  beg = e;
624  }
625  }
626 
627  if(source.num_successors() == 0) {
628  source.precede(target);
629  }
630 
631  return std::make_pair(source, target);
632 }
633 
634 // Function: parallel_for
635 template <typename I, typename C,
636  std::enable_if_t<std::is_floating_point<std::decay_t<I>>::value, void>*
637 >
638 std::pair<Task, Task> FlowBuilder::parallel_for(I beg, I end, I s, C&& c, size_t chunk) {
639 
640  if((s == 0) || (beg < end && s <= 0) || (beg > end && s >=0) ) {
641  TF_THROW("invalid range [", beg, ", ", end, ") with step size ", s);
642  }
643 
644  // source and target
645  auto source = placeholder();
646  auto target = placeholder();
647 
648  if(chunk == 0) {
649  chunk = 1;
650  }
651 
652  // positive case
653  if(beg < end) {
654  size_t N=0;
655  I b = beg;
656  for(I e=beg; e<end; e+=s) {
657  if(++N == chunk) {
658  auto task = emplace([=] () mutable {
659  for(size_t i=0; i<N; ++i, b+=s) {
660  c(b);
661  }
662  });
663  source.precede(task);
664  task.precede(target);
665  N = 0;
666  b = e;
667  }
668  }
669 
670  if(N) {
671  auto task = emplace([=] () mutable {
672  for(size_t i=0; i<N; ++i, b+=s) {
673  c(b);
674  }
675  });
676  source.precede(task);
677  task.precede(target);
678  }
679  }
680  else if(beg > end) {
681  size_t N=0;
682  I b = beg;
683  for(I e=beg; e>end; e+=s) {
684  if(++N == chunk) {
685  auto task = emplace([=] () mutable {
686  for(size_t i=0; i<N; ++i, b+=s) {
687  c(b);
688  }
689  });
690  source.precede(task);
691  task.precede(target);
692  N = 0;
693  b = e;
694  }
695  }
696 
697  if(N) {
698  auto task = emplace([=] () mutable {
699  for(size_t i=0; i<N; ++i, b+=s) {
700  c(b);
701  }
702  });
703  source.precede(task);
704  task.precede(target);
705  }
706  }
707 
708  if(source.num_successors() == 0) {
709  source.precede(target);
710  }
711 
712  return std::make_pair(source, target);
713 }
714 
715 // Function: reduce_min
716 // Find the minimum element over a range of items.
717 template <typename I, typename T>
719  return reduce(beg, end, result, [] (const auto& l, const auto& r) {
720  return std::min(l, r);
721  });
722 }
723 
724 // Function: reduce_max
725 // Find the maximum element over a range of items.
726 template <typename I, typename T>
728  return reduce(beg, end, result, [] (const auto& l, const auto& r) {
729  return std::max(l, r);
730  });
731 }
732 
733 // Function: transform_reduce
734 template <typename I, typename T, typename B, typename U>
736  I beg, I end, T& result, B&& bop, U&& uop
737 ) {
738 
739  //using category = typename std::iterator_traits<I>::iterator_category;
740 
741  // Even partition
742  size_t d = std::distance(beg, end);
743  size_t w = std::max(unsigned{1}, std::thread::hardware_concurrency());
744  size_t g = std::max((d + w - 1) / w, size_t{2});
745 
746  auto source = placeholder();
747  auto target = placeholder();
748 
749  //std::vector<std::future<T>> futures;
750  auto g_results = std::make_unique<T[]>(w);
751  size_t id {0};
752 
753  size_t remain = d;
754 
755  while(beg != end) {
756 
757  auto e = beg;
758 
759  size_t x = std::min(remain, g);
760  std::advance(e, x);
761  remain -= x;
762 
763  // Create a task
764  auto task = emplace([beg, e, bop, uop, res=&(g_results[id])] () mutable {
765  *res = uop(*beg);
766  for(++beg; beg != e; ++beg) {
767  *res = bop(std::move(*res), uop(*beg));
768  }
769  });
770 
771  source.precede(task);
772  task.precede(target);
773 
774  // adjust the pointer
775  beg = e;
776  id ++;
777  }
778 
779  // target synchronizer
780  target.work([&result, bop, res=make_moc(std::move(g_results)), w=id] () {
781  for(auto i=0u; i<w; i++) {
782  result = bop(std::move(result), res.object[i]);
783  }
784  });
785 
786  return std::make_pair(source, target);
787 }
788 
789 // Function: transform_reduce
790 template <typename I, typename T, typename B, typename P, typename U>
792  I beg, I end, T& result, B&& bop, P&& pop, U&& uop
793 ) {
794 
795  //using category = typename std::iterator_traits<I>::iterator_category;
796 
797  // Even partition
798  size_t d = std::distance(beg, end);
799  size_t w = std::max(unsigned{1}, std::thread::hardware_concurrency());
800  size_t g = std::max((d + w - 1) / w, size_t{2});
801 
802  auto source = placeholder();
803  auto target = placeholder();
804 
805  auto g_results = std::make_unique<T[]>(w);
806 
807  size_t id {0};
808  size_t remain = d;
809 
810  while(beg != end) {
811 
812  auto e = beg;
813 
814  size_t x = std::min(remain, g);
815  std::advance(e, x);
816  remain -= x;
817 
818  // Create a task
819  auto task = emplace([beg, e, uop, pop, res= &g_results[id]] () mutable {
820  *res = uop(*beg);
821  for(++beg; beg != e; ++beg) {
822  *res = pop(std::move(*res), *beg);
823  }
824  });
825  source.precede(task);
826  task.precede(target);
827 
828  // adjust the pointer
829  beg = e;
830  id ++;
831  }
832 
833  // target synchronizer
834  target.work([&result, bop, g_results=make_moc(std::move(g_results)), w=id] () {
835  for(auto i=0u; i<w; i++) {
836  result = bop(std::move(result), std::move(g_results.object[i]));
837  }
838  });
839 
840  return std::make_pair(source, target);
841 }
842 
843 // Procedure: _linearize
844 template <typename L>
845 void FlowBuilder::_linearize(L& keys) {
846 
847  auto itr = keys.begin();
848  auto end = keys.end();
849 
850  if(itr == end) {
851  return;
852  }
853 
854  auto nxt = itr;
855 
856  for(++nxt; nxt != end; ++nxt, ++itr) {
857  itr->_node->_precede(nxt->_node);
858  }
859 }
860 
861 // Procedure: linearize
863  _linearize(keys);
864 }
865 
866 // Procedure: linearize
868  _linearize(keys);
869 }
870 
871 // Proceduer: reduce
872 template <typename I, typename T, typename B>
873 std::pair<Task, Task> FlowBuilder::reduce(I beg, I end, T& result, B&& op) {
874 
875  //using category = typename std::iterator_traits<I>::iterator_category;
876 
877  size_t d = std::distance(beg, end);
878  size_t w = std::max(unsigned{1}, std::thread::hardware_concurrency());
879  size_t g = std::max((d + w - 1) / w, size_t{2});
880 
881  auto source = placeholder();
882  auto target = placeholder();
883 
884  //T* g_results = static_cast<T*>(malloc(sizeof(T)*w));
885  auto g_results = std::make_unique<T[]>(w);
886  //std::vector<std::future<T>> futures;
887 
888  size_t id {0};
889  size_t remain = d;
890 
891  while(beg != end) {
892 
893  auto e = beg;
894 
895  size_t x = std::min(remain, g);
896  std::advance(e, x);
897  remain -= x;
898 
899  // Create a task
900  //auto [task, future] = emplace([beg, e, op] () mutable {
901  auto task = emplace([beg, e, op, res = &g_results[id]] () mutable {
902  *res = *beg;
903  for(++beg; beg != e; ++beg) {
904  *res = op(std::move(*res), *beg);
905  }
906  //auto init = *beg;
907  //for(++beg; beg != e; ++beg) {
908  // init = op(std::move(init), *beg);
909  //}
910  //return init;
911  });
912  source.precede(task);
913  task.precede(target);
914  //futures.push_back(std::move(future));
915 
916  // adjust the pointer
917  beg = e;
918  id ++;
919  }
920 
921  // target synchronizer
922  //target.work([&result, futures=MoC{std::move(futures)}, op] () {
923  // for(auto& fu : futures.object) {
924  // result = op(std::move(result), fu.get());
925  // }
926  //});
927  target.work([g_results=make_moc(std::move(g_results)), &result, op, w=id] () {
928  for(auto i=0u; i<w; i++) {
929  result = op(std::move(result), g_results.object[i]);
930  }
931  });
932 
933  return std::make_pair(source, target);
934 }
935 
936 // ----------------------------------------------------------------------------
937 // Cyclic Dependency: Task
938 // ----------------------------------------------------------------------------
939 
940 // Function: work
941 // assign a static work
942 template <typename C>
943 std::enable_if_t<is_static_task_v<C>, Task>& Task::work(C&& c) {
944  _node->_handle.emplace<Node::StaticWork>(std::forward<C>(c));
945  return *this;
946 }
947 
948 // Function: work
949 // assigns a dynamic work
950 template <typename C>
951 std::enable_if_t<is_dynamic_task_v<C>, Task>& Task::work(C&& c) {
952  _node->_handle.emplace<Node::DynamicWork>(
953  [c=std::forward<C>(c)] (Subflow& fb) mutable {
954  // first time execution
955  if(fb._graph.empty()) {
956  c(fb);
957  }
958  });
959  return *this;
960 }
961 
962 // Function: work
963 // assigns a condition work
964 template <typename C>
965 std::enable_if_t<is_condition_task_v<C>, Task>& Task::work(C&& c) {
966  _node->_handle.emplace<Node::ConditionWork>(std::forward<C>(c));
967  return *this;
968 }
969 
970 
971 // Function: work
972 //template <typename C>
973 //Task& Task::work(C&& c) {
974 //
975 // // static tasking
976 // if constexpr(is_static_task_v<C>) {
977 // _node->_handle.emplace<Node::StaticWork>(std::forward<C>(c));
978 // }
979 // // condition tasking
980 // else if constexpr(is_condition_task_v<C>) {
981 // _node->_handle.emplace<Node::ConditionWork>(std::forward<C>(c));
982 // }
983 // // dynamic tasking
984 // else if constexpr(is_dynamic_task_v<C>) {
985 // _node->_handle.emplace<Node::DynamicWork>(
986 // [c=std::forward<C>(c)] (Subflow& fb) mutable {
987 // // first time execution
988 // if(fb._graph.empty()) {
989 // c(fb);
990 // }
991 // });
992 // }
993 // else {
994 // static_assert(dependent_false_v<C>, "invalid task work type");
995 // }
996 //
997 // return *this;
998 //}
999 
1000 // ----------------------------------------------------------------------------
1001 // Legacy code
1002 // ----------------------------------------------------------------------------
1003 
1004 
1005 using SubflowBuilder = Subflow;
1006 
1007 } // end of namespace tf. ---------------------------------------------------
1008 
1009 
void linearize(std::vector< Task > &tasks)
adds adjacent dependency links to a linear list of tasks
Definition: flow_builder.hpp:862
T distance(T... args)
void broadcast(Task A, std::vector< Task > &others)
adds dependency links from one task A to many tasks
Definition: flow_builder.hpp:497
std::pair< Task, Task > transform_reduce(I beg, I end, T &result, B &&bop, U &&uop)
constructs a task dependency graph of parallel transformation and reduction
Definition: flow_builder.hpp:735
T advance(T... args)
T make_tuple(T... args)
Definition: taskflow.hpp:5
T hardware_concurrency(T... args)
std::enable_if_t< is_static_task_v< C >, Task > & work(C &&callable)
assigns a static task
Definition: flow_builder.hpp:943
Task placeholder()
creates an empty task
Definition: flow_builder.hpp:525
Subflow(Args &&... args)
constructs a subflow builder object
Definition: flow_builder.hpp:381
void detach()
enables the subflow to detach from its parent task
Definition: flow_builder.hpp:391
T min(T... args)
bool detached() const
queries if the subflow will be detached from its parent task
Definition: flow_builder.hpp:396
std::pair< Task, Task > reduce_max(I beg, I end, T &result)
constructs a task dependency graph of parallel reduction through std::max
Definition: flow_builder.hpp:727
Task & succeed(Ts &&... tasks)
adds precedence links from other tasks to this
Definition: task.hpp:274
std::enable_if_t< is_static_task_v< C >, Task > emplace(C &&callable)
creates a static task from a given callable object
Definition: flow_builder.hpp:418
std::pair< Task, Task > parallel_for(I beg, I end, C &&callable, size_t chunk=1)
constructs a task dependency graph of range-based parallel_for
Definition: flow_builder.hpp:532
Task composed_of(Taskflow &taskflow)
creates a module task from a taskflow
Definition: flow_builder.hpp:484
void precede(Task A, Task B)
adds a dependency link from task A to task B
Definition: flow_builder.hpp:492
T make_pair(T... args)
the class to create a task dependency graph
Definition: core/taskflow.hpp:18
void gather(std::vector< Task > &others, Task A)
adds dependency links from many tasks to one task A
Definition: flow_builder.hpp:511
T max(T... args)
T move(T... args)
FlowBuilder(Graph &graph)
construct a flow builder object
Definition: flow_builder.hpp:333
Building blocks of a task dependency graph.
Definition: flow_builder.hpp:13
bool joined() const
queries if the subflow will join its parent task
Definition: flow_builder.hpp:401
task handle to a node in a task dependency graph
Definition: task.hpp:51
Task & precede(Ts &&... tasks)
adds precedence links from this to other tasks
Definition: task.hpp:253
std::pair< Task, Task > reduce(I beg, I end, T &result, B &&bop)
construct a task dependency graph of parallel reduction
Definition: flow_builder.hpp:873
T for_each(T... args)
The building blocks of dynamic tasking.
Definition: flow_builder.hpp:344
void join()
enables the subflow to join its parent task
Definition: flow_builder.hpp:386
std::pair< Task, Task > reduce_min(I beg, I end, T &result)
constructs a task dependency graph of parallel reduction through std::min
Definition: flow_builder.hpp:718