Cpp-Taskflow  2.1.0
flow_builder.hpp
1 #pragma once
2 
3 #include "task.hpp"
4 
5 namespace tf {
6 
13 class FlowBuilder {
14 
15  public:
16 
17  FlowBuilder(Graph&);
18 
28  template <typename C>
29  auto emplace(C&& callable);
30 
40  template <typename... C, std::enable_if_t<(sizeof...(C)>1), void>* = nullptr>
41  auto emplace(C&&... callables);
42 
46  template <typename C>
47  auto silent_emplace(C&& callable);
48 
52  template <typename... C, std::enable_if_t<(sizeof...(C)>1), void>* = nullptr>
53  auto silent_emplace(C&&... callables);
54 
72  template <typename I, typename C>
73  std::pair<Task, Task> parallel_for(I beg, I end, C&& callable, size_t chunk = 0);
74 
92  template <typename I, typename C, std::enable_if_t<std::is_arithmetic_v<I>, void>* = nullptr >
93  std::pair<Task, Task> parallel_for(I beg, I end, I step, C&& callable, size_t chunk = 0);
94 
112  template <typename I, typename T, typename B>
113  std::pair<Task, Task> reduce(I beg, I end, T& result, B&& bop);
114 
130  template <typename I, typename T>
131  std::pair<Task, Task> reduce_min(I beg, I end, T& result);
132 
148  template <typename I, typename T>
149  std::pair<Task, Task> reduce_max(I beg, I end, T& result);
150 
172  template <typename I, typename T, typename B, typename U>
173  std::pair<Task, Task> transform_reduce(I beg, I end, T& result, B&& bop, U&& uop);
174 
199  template <typename I, typename T, typename B, typename P, typename U>
200  std::pair<Task, Task> transform_reduce(I beg, I end, T& result, B&& bop1, P&& bop2, U&& uop);
201 
207  Task placeholder();
208 
215  void precede(Task A, Task B);
216 
222  void linearize(std::vector<Task>& tasks);
223 
230 
237  void broadcast(Task A, std::vector<Task>& others);
238 
246 
253  void gather(std::vector<Task>& others, Task A);
254 
261  void gather(std::initializer_list<Task> others, Task A);
262 
263  private:
264 
265  Graph& _graph;
266 
267  template <typename L>
268  void _linearize(L&);
269 
270  template <typename I>
271  size_t _estimate_chunk_size(I, I, I);
272 };
273 
274 // Constructor
275 inline FlowBuilder::FlowBuilder(Graph& graph) :
276  _graph {graph} {
277 }
278 
279 // Procedure: precede
280 inline void FlowBuilder::precede(Task from, Task to) {
281  from._node->precede(*(to._node));
282 }
283 
284 // Procedure: broadcast
286  from.precede(keys);
287 }
288 
289 // Procedure: broadcast
291  from.precede(keys);
292 }
293 
294 // Function: gather
296  to.gather(keys);
297 }
298 
299 // Function: gather
301  to.gather(keys);
302 }
303 
304 // Function: placeholder
306  auto& node = _graph.emplace_back();
307  return Task(node);
308 }
309 
310 // Function: parallel_for
311 template <typename I, typename C>
312 std::pair<Task, Task> FlowBuilder::parallel_for(I beg, I end, C&& c, size_t g) {
313 
314  using category = typename std::iterator_traits<I>::iterator_category;
315 
316  if(g == 0) {
317  auto d = std::distance(beg, end);
318  auto w = std::max(unsigned{1}, std::thread::hardware_concurrency());
319  g = (d + w - 1) / w;
320  }
321 
322  auto source = placeholder();
323  auto target = placeholder();
324 
325  while(beg != end) {
326 
327  auto e = beg;
328 
329  // Case 1: random access iterator
330  if constexpr(std::is_same_v<category, std::random_access_iterator_tag>) {
331  size_t r = std::distance(beg, end);
332  std::advance(e, std::min(r, g));
333  }
334  // Case 2: non-random access iterator
335  else {
336  for(size_t i=0; i<g && e != end; ++e, ++i);
337  }
338 
339  // Create a task
340  auto task = emplace([beg, e, c] () mutable {
341  std::for_each(beg, e, c);
342  });
343  source.precede(task);
344  task.precede(target);
345 
346  // adjust the pointer
347  beg = e;
348  }
349 
350  return std::make_pair(source, target);
351 }
352 
353 // Function: parallel_for
354 template <
355  typename I,
356  typename C,
357  std::enable_if_t<std::is_arithmetic_v<I>, void>*
358 >
359 std::pair<Task, Task> FlowBuilder::parallel_for(I beg, I end, I s, C&& c, size_t g) {
360 
361  using T = std::decay_t<I>;
362 
363  if((s == 0) || (beg < end && s <= 0) || (beg > end && s >=0) ) {
364  TF_THROW(Error::FLOW_BUILDER,
365  "invalid range [", beg, ", ", end, ") with step size ", s
366  );
367  }
368 
369  auto source = placeholder();
370  auto target = placeholder();
371 
372  if(g == 0) {
373  g = _estimate_chunk_size(beg, end, s);
374  }
375 
376  // Integer indices
377  if constexpr(std::is_integral_v<T>) {
378 
379  auto offset = static_cast<T>(g) * s;
380 
381  // positive case
382  if(beg < end) {
383  while(beg != end) {
384  auto e = std::min(beg + offset, end);
385  auto task = emplace([=] () mutable {
386  for(auto i=beg; i<e; i+=s) {
387  c(i);
388  }
389  });
390  source.precede(task);
391  task.precede(target);
392  beg = e;
393  }
394  }
395  // negative case
396  else if(beg > end) {
397  while(beg != end) {
398  auto e = std::max(beg + offset, end);
399  auto task = emplace([=] () mutable {
400  for(auto i=beg; i>e; i+=s) {
401  c(i);
402  }
403  });
404  source.precede(task);
405  task.precede(target);
406  beg = e;
407  }
408  }
409  }
410  // We enumerate the entire sequence to avoid floating error
411  else if constexpr(std::is_floating_point_v<T>) {
412  size_t N = 0;
413  auto B = beg;
414  for(auto i=beg; (beg<end ? i<end : i>end); i+=s, ++N) {
415  if(N == g) {
416  auto task = emplace([=] () mutable {
417  auto b = B;
418  for(size_t n=0; n<N; ++n) {
419  c(b);
420  b += s;
421  }
422  });
423  N = 0;
424  B = i;
425  source.precede(task);
426  task.precede(target);
427  }
428  }
429 
430  // the last pices
431  if(N != 0) {
432  auto task = emplace([=] () mutable {
433  auto b = B;
434  for(size_t n=0; n<N; ++n) {
435  c(b);
436  b += s;
437  }
438  });
439  source.precede(task);
440  task.precede(target);
441  }
442  }
443 
444  return std::make_pair(source, target);
445 }
446 
447 // Function: reduce_min
448 // Find the minimum element over a range of items.
449 template <typename I, typename T>
451  return reduce(beg, end, result, [] (const auto& l, const auto& r) {
452  return std::min(l, r);
453  });
454 }
455 
456 // Function: reduce_max
457 // Find the maximum element over a range of items.
458 template <typename I, typename T>
460  return reduce(beg, end, result, [] (const auto& l, const auto& r) {
461  return std::max(l, r);
462  });
463 }
464 
465 // Function: transform_reduce
466 template <typename I, typename T, typename B, typename U>
467 std::pair<Task, Task> FlowBuilder::transform_reduce(I beg, I end, T& result, B&& bop, U&& uop) {
468 
469  using category = typename std::iterator_traits<I>::iterator_category;
470 
471  // Even partition
472  size_t d = std::distance(beg, end);
473  size_t w = std::max(unsigned{1}, std::thread::hardware_concurrency());
474  size_t g = std::max((d + w - 1) / w, size_t{2});
475 
476  auto source = placeholder();
477  auto target = placeholder();
478 
479  //std::vector<std::future<T>> futures;
480  auto g_results = std::make_unique<T[]>(w);
481  size_t id {0};
482 
483  while(beg != end) {
484 
485  auto e = beg;
486 
487  // Case 1: random access iterator
488  if constexpr(std::is_same_v<category, std::random_access_iterator_tag>) {
489  size_t r = std::distance(beg, end);
490  std::advance(e, std::min(r, g));
491  }
492  // Case 2: non-random access iterator
493  else {
494  for(size_t i=0; i<g && e != end; ++e, ++i);
495  }
496 
497  // Create a task
498  auto task = emplace([beg, e, bop, uop, res=&(g_results[id])] () mutable {
499  *res = uop(*beg);
500  for(++beg; beg != e; ++beg) {
501  *res = bop(std::move(*res), uop(*beg));
502  }
503  });
504 
505  source.precede(task);
506  task.precede(target);
507 
508  // adjust the pointer
509  beg = e;
510  id ++;
511  }
512 
513  // target synchronizer
514  target.work([&result, bop, res=MoC{std::move(g_results)}, w=id] () {
515  for(auto i=0u; i<w; i++) {
516  result = bop(std::move(result), res.object[i]);
517  }
518  });
519 
520  return std::make_pair(source, target);
521 }
522 
523 // Function: transform_reduce
524 template <typename I, typename T, typename B, typename P, typename U>
526  I beg, I end, T& result, B&& bop, P&& pop, U&& uop
527 ) {
528 
529  using category = typename std::iterator_traits<I>::iterator_category;
530 
531  // Even partition
532  size_t d = std::distance(beg, end);
533  size_t w = std::max(unsigned{1}, std::thread::hardware_concurrency());
534  size_t g = std::max((d + w - 1) / w, size_t{2});
535 
536  auto source = placeholder();
537  auto target = placeholder();
538 
539  auto g_results = std::make_unique<T[]>(w);
540 
541  size_t id {0};
542  while(beg != end) {
543 
544  auto e = beg;
545 
546  // Case 1: random access iterator
547  if constexpr(std::is_same_v<category, std::random_access_iterator_tag>) {
548  size_t r = std::distance(beg, end);
549  std::advance(e, std::min(r, g));
550  }
551  // Case 2: non-random access iterator
552  else {
553  for(size_t i=0; i<g && e != end; ++e, ++i);
554  }
555 
556  // Create a task
557  auto task = emplace([beg, e, uop, pop, res= &g_results[id]] () mutable {
558  *res = uop(*beg);
559  for(++beg; beg != e; ++beg) {
560  *res = pop(std::move(*res), *beg);
561  }
562  });
563  //auto [task, future] = emplace([beg, e, uop, pop] () mutable {
564  // auto init = uop(*beg);
565  // for(++beg; beg != e; ++beg) {
566  // init = pop(std::move(init), *beg);
567  // }
568  // return init;
569  //});
570  source.precede(task);
571  task.precede(target);
572  //futures.push_back(std::move(future));
573 
574  // adjust the pointer
575  beg = e;
576  id ++;
577  }
578 
579  // target synchronizer
580  target.work([&result, bop, g_results=MoC{std::move(g_results)}, w=id] () {
581  for(auto i=0u; i<w; i++) {
582  result = bop(std::move(result), std::move(g_results.object[i]));
583  }
584  });
585  //target.work([&result, futures=MoC{std::move(futures)}, bop] () {
586  // for(auto& fu : futures.object) {
587  // result = bop(std::move(result), fu.get());
588  // }
589  //});
590 
591  return std::make_pair(source, target);
592 }
593 
594 // Function: _estimate_chunk_size
595 template <typename I>
596 size_t FlowBuilder::_estimate_chunk_size(I beg, I end, I step) {
597 
598  using T = std::decay_t<I>;
599 
600  size_t w = std::max(unsigned{1}, std::thread::hardware_concurrency());
601  size_t N = 0;
602 
603  if constexpr(std::is_integral_v<T>) {
604  if(beg <= end) {
605  N = (end - beg + step - 1) / step;
606  }
607  else {
608  N = (end - beg + step + 1) / step;
609  }
610  }
611  else if constexpr(std::is_floating_point_v<T>) {
612  N = static_cast<size_t>(std::ceil((end - beg) / step));
613  }
614  else {
615  static_assert(dependent_false_v<T>, "can't deduce chunk size");
616  }
617 
618  return (N + w - 1) / w;
619 }
620 
621 
622 // Procedure: _linearize
623 template <typename L>
624 void FlowBuilder::_linearize(L& keys) {
625 
626  auto itr = keys.begin();
627  auto end = keys.end();
628 
629  if(itr == end) {
630  return;
631  }
632 
633  auto nxt = itr;
634 
635  for(++nxt; nxt != end; ++nxt, ++itr) {
636  itr->_node->precede(*(nxt->_node));
637  }
638 }
639 
640 // Procedure: linearize
642  _linearize(keys);
643 }
644 
645 // Procedure: linearize
647  _linearize(keys);
648 }
649 
650 // Proceduer: reduce
651 template <typename I, typename T, typename B>
652 std::pair<Task, Task> FlowBuilder::reduce(I beg, I end, T& result, B&& op) {
653 
654  using category = typename std::iterator_traits<I>::iterator_category;
655 
656  size_t d = std::distance(beg, end);
657  size_t w = std::max(unsigned{1}, std::thread::hardware_concurrency());
658  size_t g = std::max((d + w - 1) / w, size_t{2});
659 
660  auto source = placeholder();
661  auto target = placeholder();
662 
663  //T* g_results = static_cast<T*>(malloc(sizeof(T)*w));
664  auto g_results = std::make_unique<T[]>(w);
665  //std::vector<std::future<T>> futures;
666 
667  size_t id {0};
668  while(beg != end) {
669 
670  auto e = beg;
671 
672  // Case 1: random access iterator
673  if constexpr(std::is_same_v<category, std::random_access_iterator_tag>) {
674  size_t r = std::distance(beg, end);
675  std::advance(e, std::min(r, g));
676  }
677  // Case 2: non-random access iterator
678  else {
679  for(size_t i=0; i<g && e != end; ++e, ++i);
680  }
681 
682  // Create a task
683  //auto [task, future] = emplace([beg, e, op] () mutable {
684  auto task = emplace([beg, e, op, res = &g_results[id]] () mutable {
685  *res = *beg;
686  for(++beg; beg != e; ++beg) {
687  *res = op(std::move(*res), *beg);
688  }
689  //auto init = *beg;
690  //for(++beg; beg != e; ++beg) {
691  // init = op(std::move(init), *beg);
692  //}
693  //return init;
694  });
695  source.precede(task);
696  task.precede(target);
697  //futures.push_back(std::move(future));
698 
699  // adjust the pointer
700  beg = e;
701  id ++;
702  }
703 
704  // target synchronizer
705  //target.work([&result, futures=MoC{std::move(futures)}, op] () {
706  // for(auto& fu : futures.object) {
707  // result = op(std::move(result), fu.get());
708  // }
709  //});
710  target.work([g_results=MoC{std::move(g_results)}, &result, op, w=id] () {
711  for(auto i=0u; i<w; i++) {
712  result = op(std::move(result), g_results.object[i]);
713  }
714  });
715 
716  return std::make_pair(source, target);
717 }
718 
719 // ----------------------------------------------------------------------------
720 
726 class SubflowBuilder : public FlowBuilder {
727 
728  public:
729 
730  template <typename... Args>
731  SubflowBuilder(Args&&...);
732 
736  void join();
737 
741  void detach();
742 
746  bool detached() const;
747 
751  bool joined() const;
752 
753  private:
754 
755  bool _detached {false};
756 };
757 
758 // Constructor
759 template <typename... Args>
760 SubflowBuilder::SubflowBuilder(Args&&... args) :
761  FlowBuilder {std::forward<Args>(args)...} {
762 }
763 
764 // Procedure: join
765 inline void SubflowBuilder::join() {
766  _detached = false;
767 }
768 
769 // Procedure: detach
770 inline void SubflowBuilder::detach() {
771  _detached = true;
772 }
773 
774 // Function: detached
775 inline bool SubflowBuilder::detached() const {
776  return _detached;
777 }
778 
779 // Function: joined
780 inline bool SubflowBuilder::joined() const {
781  return !_detached;
782 }
783 
784 // -----
785 
786 // Function: emplace
787 template <typename... C, std::enable_if_t<(sizeof...(C)>1), void>*>
788 auto FlowBuilder::emplace(C&&... cs) {
789  return std::make_tuple(emplace(std::forward<C>(cs))...);
790 }
791 
792 // Function: emplace
793 template <typename C>
794 auto FlowBuilder::emplace(C&& c) {
795  // dynamic tasking
796  if constexpr(std::is_invocable_v<C, SubflowBuilder&>) {
797  auto& n = _graph.emplace_back(
798  [c=std::forward<C>(c)] (SubflowBuilder& fb) mutable {
799  // first time execution
800  if(fb._graph.empty()) {
801  c(fb);
802  }
803  });
804  return Task(n);
805  }
806  // static tasking
807  else if constexpr(std::is_invocable_v<C>) {
808  auto& n = _graph.emplace_back(std::forward<C>(c));
809  return Task(n);
810  }
811  else {
812  static_assert(dependent_false_v<C>, "invalid task work type");
813  }
814 }
815 
816 // Function: silent_emplace
817 template <typename... C, std::enable_if_t<(sizeof...(C)>1), void>*>
819  return std::make_tuple(emplace(std::forward<C>(cs))...);
820 }
821 
822 // Function: silent_emplace
823 template <typename C>
825  return emplace(std::forward<C>(c));
826 }
827 
828 // ---------- deprecated ----------
829 
831 //template <typename C>
832 //auto FlowBuilder::emplace(C&& c) {
833 // // subflow task
834 // if constexpr(std::is_invocable_v<C, SubflowBuilder&>) {
835 //
836 // using R = std::invoke_result_t<C, SubflowBuilder&>;
837 // std::promise<R> p;
838 // auto fu = p.get_future();
839 //
840 // if constexpr(std::is_same_v<void, R>) {
841 // auto& node = _graph.emplace_back([p=MoC(std::move(p)), c=std::forward<C>(c)]
842 // (SubflowBuilder& fb) mutable {
843 // if(fb._graph.empty()) {
844 // c(fb);
845 // // if subgraph is detached or empty after invoked
846 // if(fb.detached() || fb._graph.empty()) {
847 // p.get().set_value();
848 // }
849 // }
850 // else {
851 // p.get().set_value();
852 // }
853 // });
854 // return std::make_pair(Task(node), std::move(fu));
855 // }
856 // else {
857 // auto& node = _graph.emplace_back(
858 // [p=MoC(std::move(p)), c=std::forward<C>(c), r=std::optional<R>()]
859 // (SubflowBuilder& fb) mutable {
860 // if(fb._graph.empty()) {
861 // r.emplace(c(fb));
862 // if(fb.detached() || fb._graph.empty()) {
863 // p.get().set_value(std::move(*r));
864 // }
865 // }
866 // else {
867 // assert(r);
868 // p.get().set_value(std::move(*r));
869 // }
870 // });
871 // return std::make_pair(Task(node), std::move(fu));
872 // }
873 // }
874 // // regular task
875 // else if constexpr(std::is_invocable_v<C>) {
876 //
877 // using R = std::invoke_result_t<C>;
878 // std::promise<R> p;
879 // auto fu = p.get_future();
880 //
881 // if constexpr(std::is_same_v<void, R>) {
882 // auto& node = _graph.emplace_back(
883 // [p=MoC(std::move(p)), c=std::forward<C>(c)]() mutable {
884 // c();
885 // p.get().set_value();
886 // }
887 // );
888 // return std::make_pair(Task(node), std::move(fu));
889 // }
890 // else {
891 // auto& node = _graph.emplace_back(
892 // [p=MoC(std::move(p)), c=std::forward<C>(c)]() mutable {
893 // p.get().set_value(c());
894 // }
895 // );
896 // return std::make_pair(Task(node), std::move(fu));
897 // }
898 // }
899 // else {
900 // static_assert(dependent_false_v<C>, "invalid task work type");
901 // }
902 //}
903 
904 } // end of namespace tf. ---------------------------------------------------
bool joined() const
queries if the subflow will join its parent task
Definition: flow_builder.hpp:780
void linearize(std::vector< Task > &tasks)
adds adjacent dependency links to a linear list of tasks
Definition: flow_builder.hpp:641
bool detached() const
queries if the subflow will be detached from its parent task
Definition: flow_builder.hpp:775
void detach()
enables the subflow to detach from its parent task
Definition: flow_builder.hpp:770
T ceil(T... args)
void broadcast(Task A, std::vector< Task > &others)
adds dependency links from one task A to many tasks
Definition: flow_builder.hpp:285
std::pair< Task, Task > transform_reduce(I beg, I end, T &result, B &&bop, U &&uop)
constructs a task dependency graph of parallel transformation and reduction
Definition: flow_builder.hpp:467
Task & gather(Ts &&... tasks)
adds precedence links from other tasks to this
Definition: task.hpp:224
T end(T... args)
Definition: taskflow.hpp:6
auto silent_emplace(C &&callable)
the same as tf::FlowBuilder::emplace (starting at 2.1.0)
Definition: flow_builder.hpp:824
T hardware_concurrency(T... args)
Task placeholder()
creates an empty task
Definition: flow_builder.hpp:305
std::pair< Task, Task > reduce_max(I beg, I end, T &result)
constructs a task dependency graph of parallel reduction through std::max
Definition: flow_builder.hpp:459
The building blocks of dynamic tasking.
Definition: flow_builder.hpp:726
std::pair< Task, Task > parallel_for(I beg, I end, C &&callable, size_t chunk=0)
constructs a task dependency graph of range-based parallel_for
Definition: flow_builder.hpp:312
auto emplace(C &&callable)
creates a task from a given callable object
Definition: flow_builder.hpp:794
void precede(Task A, Task B)
adds a dependency link from task A to task B
Definition: flow_builder.hpp:280
void gather(std::vector< Task > &others, Task A)
adds dependency links from many tasks to one task A
Definition: flow_builder.hpp:295
The building blocks of task dependency graphs.
Definition: flow_builder.hpp:13
void join()
enables the subflow to join its parent task
Definition: flow_builder.hpp:765
Handle to modify and access a task.
Definition: task.hpp:18
Task & precede(Ts &&... tasks)
adds precedence links from this to other tasks
Definition: task.hpp:205
std::pair< Task, Task > reduce(I beg, I end, T &result, B &&bop)
construct a task dependency graph of parallel reduction
Definition: flow_builder.hpp:652
std::pair< Task, Task > reduce_min(I beg, I end, T &result)
constructs a task dependency graph of parallel reduction through std::min
Definition: flow_builder.hpp:450