Cpp-Taskflow  2.4-master-branch
flow_builder.hpp
1 #pragma once
2 
3 #include "task.hpp"
4 
5 namespace tf {
6 
13 class FlowBuilder {
14 
15  friend class Task;
16 
17  public:
18 
24  FlowBuilder(Graph& graph);
25 
35  template <typename C>
36  std::enable_if_t<is_static_task_v<C>, Task> emplace(C&& callable);
37 
47  template <typename C>
48  std::enable_if_t<is_dynamic_task_v<C>, Task> emplace(C&& callable);
49 
59  template <typename C>
60  std::enable_if_t<is_condition_task_v<C>, Task> emplace(C&& callable);
61 
62 #ifdef TF_ENABLE_CUDA
63 
72  template <typename C>
73  std::enable_if_t<is_cudaflow_task_v<C>, Task> emplace(C&& callable);
74 #endif
75 
85  template <typename... C, std::enable_if_t<(sizeof...(C)>1), void>* = nullptr>
86  auto emplace(C&&... callables);
87 
94  Task composed_of(Taskflow& taskflow);
95 
113  template <typename I, typename C>
114  std::pair<Task, Task> parallel_for(I beg, I end, C&& callable, size_t chunk=1);
115 
133  template <
134  typename I,
135  typename C,
136  std::enable_if_t<std::is_integral<std::decay_t<I>>::value, void>* = nullptr
137  >
139  I beg, I end, I step, C&& callable, size_t chunk = 1
140  );
141 
159  template <
160  typename I,
161  typename C,
162  std::enable_if_t<std::is_floating_point<std::decay_t<I>>::value, void>* = nullptr
163  >
165  I beg, I end, I step, C&& callable, size_t chunk = 1
166  );
167 
185  template <typename I, typename T, typename B>
186  std::pair<Task, Task> reduce(I beg, I end, T& result, B&& bop);
187 
203  template <typename I, typename T>
204  std::pair<Task, Task> reduce_min(I beg, I end, T& result);
205 
221  template <typename I, typename T>
222  std::pair<Task, Task> reduce_max(I beg, I end, T& result);
223 
245  template <typename I, typename T, typename B, typename U>
246  std::pair<Task, Task> transform_reduce(I beg, I end, T& result, B&& bop, U&& uop);
247 
272  template <typename I, typename T, typename B, typename P, typename U>
274  I beg, I end, T& result, B&& bop1, P&& bop2, U&& uop
275  );
276 
282  Task placeholder();
283 
290  void precede(Task A, Task B);
291 
297  void linearize(std::vector<Task>& tasks);
298 
305 
312  void broadcast(Task A, std::vector<Task>& others);
313 
321 
328  void succeed(std::vector<Task>& others, Task A);
329 
336  void succeed(std::initializer_list<Task> others, Task A);
337 
338  private:
339 
340  Graph& _graph;
341 
342  template <typename L>
343  void _linearize(L&);
344 };
345 
346 // Constructor
347 inline FlowBuilder::FlowBuilder(Graph& graph) :
348  _graph {graph} {
349 }
350 
351 // Function: emplace
352 template <typename... C, std::enable_if_t<(sizeof...(C)>1), void>*>
353 auto FlowBuilder::emplace(C&&... cs) {
354  return std::make_tuple(emplace(std::forward<C>(cs))...);
355 }
356 
357 // Function: emplace
358 // emplaces a static task
359 template <typename C>
360 std::enable_if_t<is_static_task_v<C>, Task> FlowBuilder::emplace(C&& c) {
361  auto n = _graph.emplace_back(
362  nstd::in_place_type_t<Node::StaticWork>{}, std::forward<C>(c)
363  );
364  return Task(n);
365 }
366 
367 // Function: emplace
368 // emplaces a dynamic task
369 template <typename C>
370 std::enable_if_t<is_dynamic_task_v<C>, Task> FlowBuilder::emplace(C&& c) {
371  auto n = _graph.emplace_back(
372  nstd::in_place_type_t<Node::DynamicWork>{}, std::forward<C>(c)
373  );
374  return Task(n);
375 }
376 
377 // Function: emplace
378 // emplaces a condition task
379 template <typename C>
380 std::enable_if_t<is_condition_task_v<C>, Task> FlowBuilder::emplace(C&& c) {
381  auto n = _graph.emplace_back(
382  nstd::in_place_type_t<Node::ConditionWork>{}, std::forward<C>(c)
383  );
384  return Task(n);
385 }
386 
#ifdef TF_ENABLE_CUDA
// Function: emplace
// Emplaces a cudaflow task (compiled only when TF_ENABLE_CUDA is defined):
// the callable is forwarded into a new graph node tagged as
// Node::cudaFlowWork and a handle to that node is returned.
template <typename C>
std::enable_if_t<is_cudaflow_task_v<C>, Task> FlowBuilder::emplace(C&& c) {
  using work_type = Node::cudaFlowWork;
  auto node = _graph.emplace_back(
    nstd::in_place_type_t<work_type>{},
    std::forward<C>(c)
  );
  return Task(node);
}
#endif
398 
399 // Function: composed_of
401  auto node = _graph.emplace_back(
402  nstd::in_place_type_t<Node::ModuleWork>{}, &taskflow
403  );
404  return Task(node);
405 }
406 
407 // Procedure: precede
408 inline void FlowBuilder::precede(Task from, Task to) {
409  from._node->_precede(to._node);
410 }
411 
412 // Procedure: broadcast
414  for(auto to : tos) {
415  from.precede(to);
416  }
417 }
418 
419 // Procedure: broadcast
421  for(auto to : tos) {
422  from.precede(to);
423  }
424 }
425 
426 // Function: succeed
427 inline void FlowBuilder::succeed(std::vector<Task>& froms, Task to) {
428  for(auto from : froms) {
429  to.succeed(from);
430  }
431 }
432 
433 // Function: succeed
435  for(auto from : froms) {
436  to.succeed(from);
437  }
438 }
439 
440 // Function: placeholder
442  auto node = _graph.emplace_back();
443  return Task(node);
444 }
445 
446 // Function: parallel_for
447 template <typename I, typename C>
449  I beg, I end, C&& c, size_t chunk
450 ){
451 
452  //using category = typename std::iterator_traits<I>::iterator_category;
453 
454  auto S = placeholder();
455  auto T = placeholder();
456 
457  // default partition equals to the worker count
458  if(chunk == 0) {
459  chunk = 1;
460  }
461 
462  size_t remain = std::distance(beg, end);
463 
464  while(beg != end) {
465 
466  auto e = beg;
467 
468  auto x = std::min(remain, chunk);
469  std::advance(e, x);
470  remain -= x;
471 
472  // Create a task
473  auto task = emplace([beg, e, c] () mutable {
474  std::for_each(beg, e, c);
475  });
476 
477  S.precede(task);
478  task.precede(T);
479 
480  // adjust the pointer
481  beg = e;
482  }
483 
484  // special case
485  if(S.num_successors() == 0) {
486  S.precede(T);
487  }
488 
489  return std::make_pair(S, T);
490 }
491 
492 // Function: parallel_for
493 template <
494  typename I,
495  typename C,
496  std::enable_if_t<std::is_integral<std::decay_t<I>>::value, void>*
497 >
498 std::pair<Task, Task> FlowBuilder::parallel_for(I beg, I end, I s, C&& c, size_t chunk) {
499 
500  if((s == 0) || (beg < end && s <= 0) || (beg > end && s >=0) ) {
501  TF_THROW("invalid range [", beg, ", ", end, ") with step size ", s);
502  }
503 
504  // source and target
505  auto source = placeholder();
506  auto target = placeholder();
507 
508  if(chunk == 0) {
509  chunk = 1;
510  }
511 
512  // positive case
513  if(beg < end) {
514  while(beg != end) {
515  auto o = static_cast<I>(chunk) * s;
516  auto e = std::min(beg + o, end);
517  auto task = emplace([=] () mutable {
518  for(auto i=beg; i<e; i+=s) {
519  c(i);
520  }
521  });
522  source.precede(task);
523  task.precede(target);
524  beg = e;
525  }
526  }
527  // negative case
528  else if(beg > end) {
529  while(beg != end) {
530  auto o = static_cast<I>(chunk) * s;
531  auto e = std::max(beg + o, end);
532  auto task = emplace([=] () mutable {
533  for(auto i=beg; i>e; i+=s) {
534  c(i);
535  }
536  });
537  source.precede(task);
538  task.precede(target);
539  beg = e;
540  }
541  }
542 
543  if(source.num_successors() == 0) {
544  source.precede(target);
545  }
546 
547  return std::make_pair(source, target);
548 }
549 
550 // Function: parallel_for
551 template <typename I, typename C,
552  std::enable_if_t<std::is_floating_point<std::decay_t<I>>::value, void>*
553 >
554 std::pair<Task, Task> FlowBuilder::parallel_for(I beg, I end, I s, C&& c, size_t chunk) {
555 
556  if((s == 0) || (beg < end && s <= 0) || (beg > end && s >=0) ) {
557  TF_THROW("invalid range [", beg, ", ", end, ") with step size ", s);
558  }
559 
560  // source and target
561  auto source = placeholder();
562  auto target = placeholder();
563 
564  if(chunk == 0) {
565  chunk = 1;
566  }
567 
568  // positive case
569  if(beg < end) {
570  size_t N=0;
571  I b = beg;
572  for(I e=beg; e<end; e+=s) {
573  if(++N == chunk) {
574  auto task = emplace([=] () mutable {
575  for(size_t i=0; i<N; ++i, b+=s) {
576  c(b);
577  }
578  });
579  source.precede(task);
580  task.precede(target);
581  N = 0;
582  b = e;
583  }
584  }
585 
586  if(N) {
587  auto task = emplace([=] () mutable {
588  for(size_t i=0; i<N; ++i, b+=s) {
589  c(b);
590  }
591  });
592  source.precede(task);
593  task.precede(target);
594  }
595  }
596  else if(beg > end) {
597  size_t N=0;
598  I b = beg;
599  for(I e=beg; e>end; e+=s) {
600  if(++N == chunk) {
601  auto task = emplace([=] () mutable {
602  for(size_t i=0; i<N; ++i, b+=s) {
603  c(b);
604  }
605  });
606  source.precede(task);
607  task.precede(target);
608  N = 0;
609  b = e;
610  }
611  }
612 
613  if(N) {
614  auto task = emplace([=] () mutable {
615  for(size_t i=0; i<N; ++i, b+=s) {
616  c(b);
617  }
618  });
619  source.precede(task);
620  task.precede(target);
621  }
622  }
623 
624  if(source.num_successors() == 0) {
625  source.precede(target);
626  }
627 
628  return std::make_pair(source, target);
629 }
630 
631 // Function: reduce_min
632 // Find the minimum element over a range of items.
633 template <typename I, typename T>
635  return reduce(beg, end, result, [] (const auto& l, const auto& r) {
636  return std::min(l, r);
637  });
638 }
639 
640 // Function: reduce_max
641 // Find the maximum element over a range of items.
642 template <typename I, typename T>
644  return reduce(beg, end, result, [] (const auto& l, const auto& r) {
645  return std::max(l, r);
646  });
647 }
648 
649 // Function: transform_reduce
650 template <typename I, typename T, typename B, typename U>
652  I beg, I end, T& result, B&& bop, U&& uop
653 ) {
654 
655  //using category = typename std::iterator_traits<I>::iterator_category;
656 
657  // Even partition
658  size_t d = std::distance(beg, end);
659  size_t w = std::max(unsigned{1}, std::thread::hardware_concurrency());
660  size_t g = std::max((d + w - 1) / w, size_t{2});
661 
662  auto source = placeholder();
663  auto target = placeholder();
664 
665  //std::vector<std::future<T>> futures;
666  auto g_results = std::make_unique<T[]>(w);
667  size_t id {0};
668 
669  size_t remain = d;
670 
671  while(beg != end) {
672 
673  auto e = beg;
674 
675  size_t x = std::min(remain, g);
676  std::advance(e, x);
677  remain -= x;
678 
679  // Create a task
680  auto task = emplace([beg, e, bop, uop, res=&(g_results[id])] () mutable {
681  *res = uop(*beg);
682  for(++beg; beg != e; ++beg) {
683  *res = bop(std::move(*res), uop(*beg));
684  }
685  });
686 
687  source.precede(task);
688  task.precede(target);
689 
690  // adjust the pointer
691  beg = e;
692  id ++;
693  }
694 
695  // target synchronizer
696  target.work([&result, bop, res=make_moc(std::move(g_results)), w=id] () {
697  for(auto i=0u; i<w; i++) {
698  result = bop(std::move(result), res.object[i]);
699  }
700  });
701 
702  return std::make_pair(source, target);
703 }
704 
705 // Function: transform_reduce
706 template <typename I, typename T, typename B, typename P, typename U>
708  I beg, I end, T& result, B&& bop, P&& pop, U&& uop
709 ) {
710 
711  //using category = typename std::iterator_traits<I>::iterator_category;
712 
713  // Even partition
714  size_t d = std::distance(beg, end);
715  size_t w = std::max(unsigned{1}, std::thread::hardware_concurrency());
716  size_t g = std::max((d + w - 1) / w, size_t{2});
717 
718  auto source = placeholder();
719  auto target = placeholder();
720 
721  auto g_results = std::make_unique<T[]>(w);
722 
723  size_t id {0};
724  size_t remain = d;
725 
726  while(beg != end) {
727 
728  auto e = beg;
729 
730  size_t x = std::min(remain, g);
731  std::advance(e, x);
732  remain -= x;
733 
734  // Create a task
735  auto task = emplace([beg, e, uop, pop, res= &g_results[id]] () mutable {
736  *res = uop(*beg);
737  for(++beg; beg != e; ++beg) {
738  *res = pop(std::move(*res), *beg);
739  }
740  });
741  source.precede(task);
742  task.precede(target);
743 
744  // adjust the pointer
745  beg = e;
746  id ++;
747  }
748 
749  // target synchronizer
750  target.work([&result, bop, g_results=make_moc(std::move(g_results)), w=id] () {
751  for(auto i=0u; i<w; i++) {
752  result = bop(std::move(result), std::move(g_results.object[i]));
753  }
754  });
755 
756  return std::make_pair(source, target);
757 }
758 
759 // Procedure: _linearize
760 template <typename L>
761 void FlowBuilder::_linearize(L& keys) {
762 
763  auto itr = keys.begin();
764  auto end = keys.end();
765 
766  if(itr == end) {
767  return;
768  }
769 
770  auto nxt = itr;
771 
772  for(++nxt; nxt != end; ++nxt, ++itr) {
773  itr->_node->_precede(nxt->_node);
774  }
775 }
776 
777 // Procedure: linearize
779  _linearize(keys);
780 }
781 
782 // Procedure: linearize
784  _linearize(keys);
785 }
786 
787 // Proceduer: reduce
788 template <typename I, typename T, typename B>
789 std::pair<Task, Task> FlowBuilder::reduce(I beg, I end, T& result, B&& op) {
790 
791  //using category = typename std::iterator_traits<I>::iterator_category;
792 
793  size_t d = std::distance(beg, end);
794  size_t w = std::max(unsigned{1}, std::thread::hardware_concurrency());
795  size_t g = std::max((d + w - 1) / w, size_t{2});
796 
797  auto source = placeholder();
798  auto target = placeholder();
799 
800  //T* g_results = static_cast<T*>(malloc(sizeof(T)*w));
801  auto g_results = std::make_unique<T[]>(w);
802  //std::vector<std::future<T>> futures;
803 
804  size_t id {0};
805  size_t remain = d;
806 
807  while(beg != end) {
808 
809  auto e = beg;
810 
811  size_t x = std::min(remain, g);
812  std::advance(e, x);
813  remain -= x;
814 
815  // Create a task
816  //auto [task, future] = emplace([beg, e, op] () mutable {
817  auto task = emplace([beg, e, op, res = &g_results[id]] () mutable {
818  *res = *beg;
819  for(++beg; beg != e; ++beg) {
820  *res = op(std::move(*res), *beg);
821  }
822  //auto init = *beg;
823  //for(++beg; beg != e; ++beg) {
824  // init = op(std::move(init), *beg);
825  //}
826  //return init;
827  });
828  source.precede(task);
829  task.precede(target);
830  //futures.push_back(std::move(future));
831 
832  // adjust the pointer
833  beg = e;
834  id ++;
835  }
836 
837  // target synchronizer
838  //target.work([&result, futures=MoC{std::move(futures)}, op] () {
839  // for(auto& fu : futures.object) {
840  // result = op(std::move(result), fu.get());
841  // }
842  //});
843  target.work([g_results=make_moc(std::move(g_results)), &result, op, w=id] () {
844  for(auto i=0u; i<w; i++) {
845  result = op(std::move(result), g_results.object[i]);
846  }
847  });
848 
849  return std::make_pair(source, target);
850 }
851 
852 // ----------------------------------------------------------------------------
853 
860 class Subflow : public FlowBuilder {
861 
862  public:
863 
867  template <typename... Args>
868  Subflow(Args&&... args);
869 
873  void join();
874 
878  void detach();
879 
883  bool detached() const;
884 
888  bool joined() const;
889 
890  private:
891 
892  bool _detached {false};
893 };
894 
895 // Constructor
896 template <typename... Args>
897 Subflow::Subflow(Args&&... args) :
898  FlowBuilder {std::forward<Args>(args)...} {
899 }
900 
901 // Procedure: join
902 inline void Subflow::join() {
903  _detached = false;
904 }
905 
906 // Procedure: detach
907 inline void Subflow::detach() {
908  _detached = true;
909 }
910 
911 // Function: detached
912 inline bool Subflow::detached() const {
913  return _detached;
914 }
915 
916 // Function: joined
917 inline bool Subflow::joined() const {
918  return !_detached;
919 }
920 
921 
922 // ----------------------------------------------------------------------------
923 // Legacy code
924 // ----------------------------------------------------------------------------
925 
926 using SubflowBuilder = Subflow;
927 
928 } // end of namespace tf. ---------------------------------------------------
929 
930 
void linearize(std::vector< Task > &tasks)
adds adjacent dependency links to a linear list of tasks
Definition: flow_builder.hpp:778
void broadcast(Task A, std::vector< Task > &others)
adds dependency links from one task A to many tasks
Definition: flow_builder.hpp:413
std::pair< Task, Task > transform_reduce(I beg, I end, T &result, B &&bop, U &&uop)
constructs a task dependency graph of parallel transformation and reduction
Definition: flow_builder.hpp:651
T end(T... args)
Definition: error.hpp:9
void succeed(std::vector< Task > &others, Task A)
adds dependency links from many tasks to one task A
Definition: flow_builder.hpp:427
T hardware_concurrency(T... args)
Task placeholder()
creates an empty task
Definition: flow_builder.hpp:441
Subflow(Args &&... args)
constructs a subflow builder object
Definition: flow_builder.hpp:897
void detach()
enables the subflow to detach from its parent task
Definition: flow_builder.hpp:907
bool detached() const
queries if the subflow will be detached from its parent task
Definition: flow_builder.hpp:912
std::pair< Task, Task > reduce_max(I beg, I end, T &result)
constructs a task dependency graph of parallel reduction through std::max
Definition: flow_builder.hpp:643
Task & succeed(Ts &&... tasks)
adds precedence links from other tasks to this
Definition: task.hpp:313
std::enable_if_t< is_static_task_v< C >, Task > emplace(C &&callable)
creates a static task from a given callable object
Definition: flow_builder.hpp:360
std::pair< Task, Task > parallel_for(I beg, I end, C &&callable, size_t chunk=1)
constructs a task dependency graph of range-based parallel_for
Definition: flow_builder.hpp:448
Task composed_of(Taskflow &taskflow)
creates a module task from a taskflow
Definition: flow_builder.hpp:400
void precede(Task A, Task B)
adds a dependency link from task A to task B
Definition: flow_builder.hpp:408
T make_pair(T... args)
main entry to create a task dependency graph
Definition: taskflow.hpp:18
FlowBuilder(Graph &graph)
constructs a flow builder object
Definition: flow_builder.hpp:347
building methods of a task dependency graph
Definition: flow_builder.hpp:13
bool joined() const
queries if the subflow will join its parent task
Definition: flow_builder.hpp:917
handle to a node in a task dependency graph
Definition: task.hpp:68
Task & precede(Ts &&... tasks)
adds precedence links from this to other tasks
Definition: task.hpp:290
std::pair< Task, Task > reduce(I beg, I end, T &result, B &&bop)
constructs a task dependency graph of parallel reduction
Definition: flow_builder.hpp:789
building methods of a subflow graph in dynamic tasking
Definition: flow_builder.hpp:860
void join()
enables the subflow to join its parent task
Definition: flow_builder.hpp:902
std::pair< Task, Task > reduce_min(I beg, I end, T &result)
constructs a task dependency graph of parallel reduction through std::min
Definition: flow_builder.hpp:634