Cpp-Taskflow  2.2.0
flow_builder.hpp
1 #pragma once
2 
3 #include "task.hpp"
4 
5 namespace tf {
6 
13 class FlowBuilder {
14 
15  public:
16 
22  FlowBuilder(Graph& graph);
23 
33  template <typename C>
34  Task emplace(C&& callable);
35 
45  template <typename... C, std::enable_if_t<(sizeof...(C)>1), void>* = nullptr>
46  auto emplace(C&&... callables);
47 
51  template <typename C>
52  Task silent_emplace(C&& callable);
53 
57  template <typename... C, std::enable_if_t<(sizeof...(C)>1), void>* = nullptr>
58  auto silent_emplace(C&&... callables);
59 
77  template <typename I, typename C>
78  std::pair<Task, Task> parallel_for(I beg, I end, C&& callable, size_t chunk = 0);
79 
97  template <typename I, typename C, std::enable_if_t<std::is_arithmetic_v<I>, void>* = nullptr >
98  std::pair<Task, Task> parallel_for(I beg, I end, I step, C&& callable, size_t chunk = 0);
99 
117  template <typename I, typename T, typename B>
118  std::pair<Task, Task> reduce(I beg, I end, T& result, B&& bop);
119 
135  template <typename I, typename T>
136  std::pair<Task, Task> reduce_min(I beg, I end, T& result);
137 
153  template <typename I, typename T>
154  std::pair<Task, Task> reduce_max(I beg, I end, T& result);
155 
177  template <typename I, typename T, typename B, typename U>
178  std::pair<Task, Task> transform_reduce(I beg, I end, T& result, B&& bop, U&& uop);
179 
204  template <typename I, typename T, typename B, typename P, typename U>
205  std::pair<Task, Task> transform_reduce(I beg, I end, T& result, B&& bop1, P&& bop2, U&& uop);
206 
212  Task placeholder();
213 
220  void precede(Task A, Task B);
221 
227  void linearize(std::vector<Task>& tasks);
228 
235 
242  void broadcast(Task A, std::vector<Task>& others);
243 
251 
258  void gather(std::vector<Task>& others, Task A);
259 
266  void gather(std::initializer_list<Task> others, Task A);
267 
268  private:
269 
270  Graph& _graph;
271 
272  template <typename L>
273  void _linearize(L&);
274 
275  template <typename I>
276  size_t _estimate_chunk_size(I, I, I);
277 };
278 
279 // Constructor
280 inline FlowBuilder::FlowBuilder(Graph& graph) :
281  _graph {graph} {
282 }
283 
284 // Procedure: precede
285 inline void FlowBuilder::precede(Task from, Task to) {
286  from._node->precede(*(to._node));
287 }
288 
289 // Procedure: broadcast
291  from.precede(keys);
292 }
293 
294 // Procedure: broadcast
296  from.precede(keys);
297 }
298 
299 // Function: gather
301  to.gather(keys);
302 }
303 
304 // Function: gather
306  to.gather(keys);
307 }
308 
309 // Function: placeholder
311  auto& node = _graph.emplace_back();
312  return Task(node);
313 }
314 
315 // Function: parallel_for
316 template <typename I, typename C>
317 std::pair<Task, Task> FlowBuilder::parallel_for(I beg, I end, C&& c, size_t g) {
318 
319  using category = typename std::iterator_traits<I>::iterator_category;
320 
321  if(g == 0) {
322  auto d = std::distance(beg, end);
323  auto w = std::max(unsigned{1}, std::thread::hardware_concurrency());
324  g = (d + w - 1) / w;
325  }
326 
327  auto source = placeholder();
328  auto target = placeholder();
329 
330  while(beg != end) {
331 
332  auto e = beg;
333 
334  // Case 1: random access iterator
335  if constexpr(std::is_same_v<category, std::random_access_iterator_tag>) {
336  size_t r = std::distance(beg, end);
337  std::advance(e, std::min(r, g));
338  }
339  // Case 2: non-random access iterator
340  else {
341  for(size_t i=0; i<g && e != end; ++e, ++i);
342  }
343 
344  // Create a task
345  auto task = emplace([beg, e, c] () mutable {
346  std::for_each(beg, e, c);
347  });
348  source.precede(task);
349  task.precede(target);
350 
351  // adjust the pointer
352  beg = e;
353  }
354 
355  return std::make_pair(source, target);
356 }
357 
358 // Function: parallel_for
359 template <
360  typename I,
361  typename C,
362  std::enable_if_t<std::is_arithmetic_v<I>, void>*
363 >
364 std::pair<Task, Task> FlowBuilder::parallel_for(I beg, I end, I s, C&& c, size_t g) {
365 
366  using T = std::decay_t<I>;
367 
368  if((s == 0) || (beg < end && s <= 0) || (beg > end && s >=0) ) {
369  TF_THROW(Error::TASKFLOW,
370  "invalid range [", beg, ", ", end, ") with step size ", s
371  );
372  }
373 
374  auto source = placeholder();
375  auto target = placeholder();
376 
377  if(g == 0) {
378  g = _estimate_chunk_size(beg, end, s);
379  }
380 
381  // Integer indices
382  if constexpr(std::is_integral_v<T>) {
383 
384  auto offset = static_cast<T>(g) * s;
385 
386  // positive case
387  if(beg < end) {
388  while(beg != end) {
389  auto e = std::min(beg + offset, end);
390  auto task = emplace([=] () mutable {
391  for(auto i=beg; i<e; i+=s) {
392  c(i);
393  }
394  });
395  source.precede(task);
396  task.precede(target);
397  beg = e;
398  }
399  }
400  // negative case
401  else if(beg > end) {
402  while(beg != end) {
403  auto e = std::max(beg + offset, end);
404  auto task = emplace([=] () mutable {
405  for(auto i=beg; i>e; i+=s) {
406  c(i);
407  }
408  });
409  source.precede(task);
410  task.precede(target);
411  beg = e;
412  }
413  }
414  }
415  // We enumerate the entire sequence to avoid floating error
416  else if constexpr(std::is_floating_point_v<T>) {
417  size_t N = 0;
418  auto B = beg;
419  for(auto i=beg; (beg<end ? i<end : i>end); i+=s, ++N) {
420  if(N == g) {
421  auto task = emplace([=] () mutable {
422  auto b = B;
423  for(size_t n=0; n<N; ++n) {
424  c(b);
425  b += s;
426  }
427  });
428  N = 0;
429  B = i;
430  source.precede(task);
431  task.precede(target);
432  }
433  }
434 
435  // the last pices
436  if(N != 0) {
437  auto task = emplace([=] () mutable {
438  auto b = B;
439  for(size_t n=0; n<N; ++n) {
440  c(b);
441  b += s;
442  }
443  });
444  source.precede(task);
445  task.precede(target);
446  }
447  }
448 
449  return std::make_pair(source, target);
450 }
451 
452 // Function: reduce_min
453 // Find the minimum element over a range of items.
454 template <typename I, typename T>
456  return reduce(beg, end, result, [] (const auto& l, const auto& r) {
457  return std::min(l, r);
458  });
459 }
460 
461 // Function: reduce_max
462 // Find the maximum element over a range of items.
463 template <typename I, typename T>
465  return reduce(beg, end, result, [] (const auto& l, const auto& r) {
466  return std::max(l, r);
467  });
468 }
469 
470 // Function: transform_reduce
471 template <typename I, typename T, typename B, typename U>
472 std::pair<Task, Task> FlowBuilder::transform_reduce(I beg, I end, T& result, B&& bop, U&& uop) {
473 
474  using category = typename std::iterator_traits<I>::iterator_category;
475 
476  // Even partition
477  size_t d = std::distance(beg, end);
478  size_t w = std::max(unsigned{1}, std::thread::hardware_concurrency());
479  size_t g = std::max((d + w - 1) / w, size_t{2});
480 
481  auto source = placeholder();
482  auto target = placeholder();
483 
484  //std::vector<std::future<T>> futures;
485  auto g_results = std::make_unique<T[]>(w);
486  size_t id {0};
487 
488  while(beg != end) {
489 
490  auto e = beg;
491 
492  // Case 1: random access iterator
493  if constexpr(std::is_same_v<category, std::random_access_iterator_tag>) {
494  size_t r = std::distance(beg, end);
495  std::advance(e, std::min(r, g));
496  }
497  // Case 2: non-random access iterator
498  else {
499  for(size_t i=0; i<g && e != end; ++e, ++i);
500  }
501 
502  // Create a task
503  auto task = emplace([beg, e, bop, uop, res=&(g_results[id])] () mutable {
504  *res = uop(*beg);
505  for(++beg; beg != e; ++beg) {
506  *res = bop(std::move(*res), uop(*beg));
507  }
508  });
509 
510  source.precede(task);
511  task.precede(target);
512 
513  // adjust the pointer
514  beg = e;
515  id ++;
516  }
517 
518  // target synchronizer
519  target.work([&result, bop, res=MoC{std::move(g_results)}, w=id] () {
520  for(auto i=0u; i<w; i++) {
521  result = bop(std::move(result), res.object[i]);
522  }
523  });
524 
525  return std::make_pair(source, target);
526 }
527 
528 // Function: transform_reduce
529 template <typename I, typename T, typename B, typename P, typename U>
531  I beg, I end, T& result, B&& bop, P&& pop, U&& uop
532 ) {
533 
534  using category = typename std::iterator_traits<I>::iterator_category;
535 
536  // Even partition
537  size_t d = std::distance(beg, end);
538  size_t w = std::max(unsigned{1}, std::thread::hardware_concurrency());
539  size_t g = std::max((d + w - 1) / w, size_t{2});
540 
541  auto source = placeholder();
542  auto target = placeholder();
543 
544  auto g_results = std::make_unique<T[]>(w);
545 
546  size_t id {0};
547  while(beg != end) {
548 
549  auto e = beg;
550 
551  // Case 1: random access iterator
552  if constexpr(std::is_same_v<category, std::random_access_iterator_tag>) {
553  size_t r = std::distance(beg, end);
554  std::advance(e, std::min(r, g));
555  }
556  // Case 2: non-random access iterator
557  else {
558  for(size_t i=0; i<g && e != end; ++e, ++i);
559  }
560 
561  // Create a task
562  auto task = emplace([beg, e, uop, pop, res= &g_results[id]] () mutable {
563  *res = uop(*beg);
564  for(++beg; beg != e; ++beg) {
565  *res = pop(std::move(*res), *beg);
566  }
567  });
568  //auto [task, future] = emplace([beg, e, uop, pop] () mutable {
569  // auto init = uop(*beg);
570  // for(++beg; beg != e; ++beg) {
571  // init = pop(std::move(init), *beg);
572  // }
573  // return init;
574  //});
575  source.precede(task);
576  task.precede(target);
577  //futures.push_back(std::move(future));
578 
579  // adjust the pointer
580  beg = e;
581  id ++;
582  }
583 
584  // target synchronizer
585  target.work([&result, bop, g_results=MoC{std::move(g_results)}, w=id] () {
586  for(auto i=0u; i<w; i++) {
587  result = bop(std::move(result), std::move(g_results.object[i]));
588  }
589  });
590  //target.work([&result, futures=MoC{std::move(futures)}, bop] () {
591  // for(auto& fu : futures.object) {
592  // result = bop(std::move(result), fu.get());
593  // }
594  //});
595 
596  return std::make_pair(source, target);
597 }
598 
599 // Function: _estimate_chunk_size
600 template <typename I>
601 size_t FlowBuilder::_estimate_chunk_size(I beg, I end, I step) {
602 
603  using T = std::decay_t<I>;
604 
605  size_t w = std::max(unsigned{1}, std::thread::hardware_concurrency());
606  size_t N = 0;
607 
608  if constexpr(std::is_integral_v<T>) {
609  if(beg <= end) {
610  N = (end - beg + step - 1) / step;
611  }
612  else {
613  N = (end - beg + step + 1) / step;
614  }
615  }
616  else if constexpr(std::is_floating_point_v<T>) {
617  N = static_cast<size_t>(std::ceil((end - beg) / step));
618  }
619  else {
620  static_assert(dependent_false_v<T>, "can't deduce chunk size");
621  }
622 
623  return (N + w - 1) / w;
624 }
625 
626 
627 // Procedure: _linearize
628 template <typename L>
629 void FlowBuilder::_linearize(L& keys) {
630 
631  auto itr = keys.begin();
632  auto end = keys.end();
633 
634  if(itr == end) {
635  return;
636  }
637 
638  auto nxt = itr;
639 
640  for(++nxt; nxt != end; ++nxt, ++itr) {
641  itr->_node->precede(*(nxt->_node));
642  }
643 }
644 
645 // Procedure: linearize
647  _linearize(keys);
648 }
649 
650 // Procedure: linearize
652  _linearize(keys);
653 }
654 
655 // Proceduer: reduce
656 template <typename I, typename T, typename B>
657 std::pair<Task, Task> FlowBuilder::reduce(I beg, I end, T& result, B&& op) {
658 
659  using category = typename std::iterator_traits<I>::iterator_category;
660 
661  size_t d = std::distance(beg, end);
662  size_t w = std::max(unsigned{1}, std::thread::hardware_concurrency());
663  size_t g = std::max((d + w - 1) / w, size_t{2});
664 
665  auto source = placeholder();
666  auto target = placeholder();
667 
668  //T* g_results = static_cast<T*>(malloc(sizeof(T)*w));
669  auto g_results = std::make_unique<T[]>(w);
670  //std::vector<std::future<T>> futures;
671 
672  size_t id {0};
673  while(beg != end) {
674 
675  auto e = beg;
676 
677  // Case 1: random access iterator
678  if constexpr(std::is_same_v<category, std::random_access_iterator_tag>) {
679  size_t r = std::distance(beg, end);
680  std::advance(e, std::min(r, g));
681  }
682  // Case 2: non-random access iterator
683  else {
684  for(size_t i=0; i<g && e != end; ++e, ++i);
685  }
686 
687  // Create a task
688  //auto [task, future] = emplace([beg, e, op] () mutable {
689  auto task = emplace([beg, e, op, res = &g_results[id]] () mutable {
690  *res = *beg;
691  for(++beg; beg != e; ++beg) {
692  *res = op(std::move(*res), *beg);
693  }
694  //auto init = *beg;
695  //for(++beg; beg != e; ++beg) {
696  // init = op(std::move(init), *beg);
697  //}
698  //return init;
699  });
700  source.precede(task);
701  task.precede(target);
702  //futures.push_back(std::move(future));
703 
704  // adjust the pointer
705  beg = e;
706  id ++;
707  }
708 
709  // target synchronizer
710  //target.work([&result, futures=MoC{std::move(futures)}, op] () {
711  // for(auto& fu : futures.object) {
712  // result = op(std::move(result), fu.get());
713  // }
714  //});
715  target.work([g_results=MoC{std::move(g_results)}, &result, op, w=id] () {
716  for(auto i=0u; i<w; i++) {
717  result = op(std::move(result), g_results.object[i]);
718  }
719  });
720 
721  return std::make_pair(source, target);
722 }
723 
724 // ----------------------------------------------------------------------------
725 
731 class Subflow : public FlowBuilder {
732 
733  public:
734 
738  template <typename... Args>
739  Subflow(Args&&... args);
740 
744  void join();
745 
749  void detach();
750 
754  bool detached() const;
755 
759  bool joined() const;
760 
761  private:
762 
763  bool _detached {false};
764 };
765 
766 // Constructor
767 template <typename... Args>
768 Subflow::Subflow(Args&&... args) :
769  FlowBuilder {std::forward<Args>(args)...} {
770 }
771 
772 // Procedure: join
773 inline void Subflow::join() {
774  _detached = false;
775 }
776 
777 // Procedure: detach
778 inline void Subflow::detach() {
779  _detached = true;
780 }
781 
782 // Function: detached
783 inline bool Subflow::detached() const {
784  return _detached;
785 }
786 
787 // Function: joined
788 inline bool Subflow::joined() const {
789  return !_detached;
790 }
791 
792 // -----
793 
794 // Function: emplace
795 template <typename... C, std::enable_if_t<(sizeof...(C)>1), void>*>
796 auto FlowBuilder::emplace(C&&... cs) {
797  return std::make_tuple(emplace(std::forward<C>(cs))...);
798 }
799 
800 // Function: emplace
801 template <typename C>
803  // dynamic tasking
804  if constexpr(std::is_invocable_v<C, Subflow&>) {
805  auto& n = _graph.emplace_back(
806  [c=std::forward<C>(c)] (Subflow& fb) mutable {
807  // first time execution
808  if(fb._graph.empty()) {
809  c(fb);
810  }
811  });
812  return Task(n);
813  }
814  // static tasking
815  else if constexpr(std::is_invocable_v<C>) {
816  auto& n = _graph.emplace_back(std::forward<C>(c));
817  return Task(n);
818  }
819  else {
820  static_assert(dependent_false_v<C>, "invalid task work type");
821  }
822 }
823 
824 // Function: silent_emplace
825 template <typename... C, std::enable_if_t<(sizeof...(C)>1), void>*>
827  return std::make_tuple(emplace(std::forward<C>(cs))...);
828 }
829 
830 // Function: silent_emplace
831 template <typename C>
833  return emplace(std::forward<C>(c));
834 }
835 
836 // ---------- deprecated ----------
837 
839 //template <typename C>
840 //auto FlowBuilder::emplace(C&& c) {
841 // // subflow task
842 // if constexpr(std::is_invocable_v<C, Subflow&>) {
843 //
844 // using R = std::invoke_result_t<C, Subflow&>;
845 // std::promise<R> p;
846 // auto fu = p.get_future();
847 //
848 // if constexpr(std::is_same_v<void, R>) {
849 // auto& node = _graph.emplace_back([p=MoC(std::move(p)), c=std::forward<C>(c)]
850 // (Subflow& fb) mutable {
851 // if(fb._graph.empty()) {
852 // c(fb);
853 // // if subgraph is detached or empty after invoked
854 // if(fb.detached() || fb._graph.empty()) {
855 // p.get().set_value();
856 // }
857 // }
858 // else {
859 // p.get().set_value();
860 // }
861 // });
862 // return std::make_pair(Task(node), std::move(fu));
863 // }
864 // else {
865 // auto& node = _graph.emplace_back(
866 // [p=MoC(std::move(p)), c=std::forward<C>(c), r=std::optional<R>()]
867 // (Subflow& fb) mutable {
868 // if(fb._graph.empty()) {
869 // r.emplace(c(fb));
870 // if(fb.detached() || fb._graph.empty()) {
871 // p.get().set_value(std::move(*r));
872 // }
873 // }
874 // else {
875 // assert(r);
876 // p.get().set_value(std::move(*r));
877 // }
878 // });
879 // return std::make_pair(Task(node), std::move(fu));
880 // }
881 // }
882 // // regular task
883 // else if constexpr(std::is_invocable_v<C>) {
884 //
885 // using R = std::invoke_result_t<C>;
886 // std::promise<R> p;
887 // auto fu = p.get_future();
888 //
889 // if constexpr(std::is_same_v<void, R>) {
890 // auto& node = _graph.emplace_back(
891 // [p=MoC(std::move(p)), c=std::forward<C>(c)]() mutable {
892 // c();
893 // p.get().set_value();
894 // }
895 // );
896 // return std::make_pair(Task(node), std::move(fu));
897 // }
898 // else {
899 // auto& node = _graph.emplace_back(
900 // [p=MoC(std::move(p)), c=std::forward<C>(c)]() mutable {
901 // p.get().set_value(c());
902 // }
903 // );
904 // return std::make_pair(Task(node), std::move(fu));
905 // }
906 // }
907 // else {
908 // static_assert(dependent_false_v<C>, "invalid task work type");
909 // }
910 //}
911 
912 using SubflowBuilder = Subflow;
913 
914 } // end of namespace tf. ---------------------------------------------------
void linearize(std::vector< Task > &tasks)
adds adjacent dependency links to a linear list of tasks
Definition: flow_builder.hpp:646
Task emplace(C &&callable)
creates a task from a given callable object
Definition: flow_builder.hpp:802
void broadcast(Task A, std::vector< Task > &others)
adds dependency links from one task A to many tasks
Definition: flow_builder.hpp:290
std::pair< Task, Task > transform_reduce(I beg, I end, T &result, B &&bop, U &&uop)
constructs a task dependency graph of parallel transformation and reduction
Definition: flow_builder.hpp:472
Task & gather(Ts &&... tasks)
adds precedence links from other tasks to this
Definition: task.hpp:201
Definition: taskflow.hpp:5
T hardware_concurrency(T... args)
Task placeholder()
creates an empty task
Definition: flow_builder.hpp:310
Subflow(Args &&... args)
constructs a subflow builder object
Definition: flow_builder.hpp:768
void detach()
enables the subflow to detach from its parent task
Definition: flow_builder.hpp:778
bool detached() const
queries if the subflow will be detached from its parent task
Definition: flow_builder.hpp:783
std::pair< Task, Task > reduce_max(I beg, I end, T &result)
constructs a task dependency graph of parallel reduction through std::max
Definition: flow_builder.hpp:464
std::pair< Task, Task > parallel_for(I beg, I end, C &&callable, size_t chunk=0)
constructs a task dependency graph of range-based parallel_for
Definition: flow_builder.hpp:317
void precede(Task A, Task B)
adds a dependency link from task A to task B
Definition: flow_builder.hpp:285
void gather(std::vector< Task > &others, Task A)
adds dependency links from many tasks to one task A
Definition: flow_builder.hpp:300
FlowBuilder(Graph &graph)
construct a flow builder object
Definition: flow_builder.hpp:280
Building blocks of a task dependency graph.
Definition: flow_builder.hpp:13
bool joined() const
queries if the subflow will join its parent task
Definition: flow_builder.hpp:788
Handle to modify and access a task.
Definition: task.hpp:18
Task silent_emplace(C &&callable)
the same as tf::FlowBuilder::emplace (starting at 2.1.0)
Definition: flow_builder.hpp:832
Task & precede(Ts &&... tasks)
adds precedence links from this to other tasks
Definition: task.hpp:182
std::pair< Task, Task > reduce(I beg, I end, T &result, B &&bop)
construct a task dependency graph of parallel reduction
Definition: flow_builder.hpp:657
Task & work(C &&callable)
assigns a new callable object to the task
Definition: task.hpp:248
The building blocks of dynamic tasking.
Definition: flow_builder.hpp:731
void join()
enables the subflow to join its parent task
Definition: flow_builder.hpp:773
std::pair< Task, Task > reduce_min(I beg, I end, T &result)
constructs a task dependency graph of parallel reduction through std::min
Definition: flow_builder.hpp:455