Cpp-Taskflow  2.0.0
flow_builder.hpp
1 #pragma once
2 
3 #include "task.hpp"
4 
5 namespace tf {
6 
13 class FlowBuilder {
14 
15  public:
16 
17  FlowBuilder(Graph&);
18 
28  template <typename C>
29  auto emplace(C&& callable);
30 
40  template <typename... C, std::enable_if_t<(sizeof...(C)>1), void>* = nullptr>
41  auto emplace(C&&... callables);
42 
52  template <typename C>
53  auto silent_emplace(C&& callable);
54 
64  template <typename... C, std::enable_if_t<(sizeof...(C)>1), void>* = nullptr>
65  auto silent_emplace(C&&... callables);
66 
84  template <typename I, typename C>
85  std::pair<Task, Task> parallel_for(I beg, I end, C&& callable, size_t chunk = 0);
86 
104  template <typename I, typename C, std::enable_if_t<std::is_arithmetic_v<I>, void>* = nullptr >
105  std::pair<Task, Task> parallel_for(I beg, I end, I step, C&& callable, size_t chunk = 0);
106 
124  template <typename I, typename T, typename B>
125  std::pair<Task, Task> reduce(I beg, I end, T& result, B&& bop);
126 
142  template <typename I, typename T>
143  std::pair<Task, Task> reduce_min(I beg, I end, T& result);
144 
160  template <typename I, typename T>
161  std::pair<Task, Task> reduce_max(I beg, I end, T& result);
162 
184  template <typename I, typename T, typename B, typename U>
185  std::pair<Task, Task> transform_reduce(I beg, I end, T& result, B&& bop, U&& uop);
186 
211  template <typename I, typename T, typename B, typename P, typename U>
212  std::pair<Task, Task> transform_reduce(I beg, I end, T& result, B&& bop1, P&& bop2, U&& uop);
213 
219  Task placeholder();
220 
227  void precede(Task A, Task B);
228 
234  void linearize(std::vector<Task>& tasks);
235 
242 
249  void broadcast(Task A, std::vector<Task>& others);
250 
258 
265  void gather(std::vector<Task>& others, Task A);
266 
273  void gather(std::initializer_list<Task> others, Task A);
274 
275  private:
276 
277  Graph& _graph;
278 
279  template <typename L>
280  void _linearize(L&);
281 
282  template <typename I>
283  size_t _estimate_chunk_size(I, I, I);
284 };
285 
286 // Constructor
287 inline FlowBuilder::FlowBuilder(Graph& graph) :
288  _graph {graph} {
289 }
290 
291 // Procedure: precede
292 inline void FlowBuilder::precede(Task from, Task to) {
293  from._node->precede(*(to._node));
294 }
295 
296 // Procedure: broadcast
298  from.precede(keys);
299 }
300 
301 // Procedure: broadcast
303  from.precede(keys);
304 }
305 
306 // Function: gather
308  to.gather(keys);
309 }
310 
311 // Function: gather
313  to.gather(keys);
314 }
315 
316 // Function: placeholder
318  auto& node = _graph.emplace_back();
319  return Task(node);
320 }
321 
322 // Function: silent_emplace
323 template <typename... C, std::enable_if_t<(sizeof...(C)>1), void>*>
325  return std::make_tuple(silent_emplace(std::forward<C>(cs))...);
326 }
327 
328 // Function: parallel_for
329 template <typename I, typename C>
330 std::pair<Task, Task> FlowBuilder::parallel_for(I beg, I end, C&& c, size_t g) {
331 
332  using category = typename std::iterator_traits<I>::iterator_category;
333 
334  if(g == 0) {
335  auto d = std::distance(beg, end);
336  auto w = std::max(unsigned{1}, std::thread::hardware_concurrency());
337  g = (d + w - 1) / w;
338  }
339 
340  auto source = placeholder();
341  auto target = placeholder();
342 
343  while(beg != end) {
344 
345  auto e = beg;
346 
347  // Case 1: random access iterator
348  if constexpr(std::is_same_v<category, std::random_access_iterator_tag>) {
349  size_t r = std::distance(beg, end);
350  std::advance(e, std::min(r, g));
351  }
352  // Case 2: non-random access iterator
353  else {
354  for(size_t i=0; i<g && e != end; ++e, ++i);
355  }
356 
357  // Create a task
358  auto task = silent_emplace([beg, e, c] () mutable {
359  std::for_each(beg, e, c);
360  });
361  source.precede(task);
362  task.precede(target);
363 
364  // adjust the pointer
365  beg = e;
366  }
367 
368  return std::make_pair(source, target);
369 }
370 
371 // Function: parallel_for
372 template <
373  typename I,
374  typename C,
375  std::enable_if_t<std::is_arithmetic_v<I>, void>*
376 >
377 std::pair<Task, Task> FlowBuilder::parallel_for(I beg, I end, I s, C&& c, size_t g) {
378 
379  using T = std::decay_t<I>;
380 
381  if((s == 0) || (beg < end && s <= 0) || (beg > end && s >=0) ) {
382  TF_THROW(Error::FLOW_BUILDER,
383  "invalid range [", beg, ", ", end, ") with step size ", s
384  );
385  }
386 
387  auto source = placeholder();
388  auto target = placeholder();
389 
390  if(g == 0) {
391  g = _estimate_chunk_size(beg, end, s);
392  }
393 
394  // Integer indices
395  if constexpr(std::is_integral_v<T>) {
396 
397  auto offset = static_cast<T>(g) * s;
398 
399  // positive case
400  if(beg < end) {
401  while(beg != end) {
402  auto e = std::min(beg + offset, end);
403  auto task = silent_emplace([=] () mutable {
404  for(auto i=beg; i<e; i+=s) {
405  c(i);
406  }
407  });
408  source.precede(task);
409  task.precede(target);
410  beg = e;
411  }
412  }
413  // negative case
414  else if(beg > end) {
415  while(beg != end) {
416  auto e = std::max(beg + offset, end);
417  auto task = silent_emplace([=] () mutable {
418  for(auto i=beg; i>e; i+=s) {
419  c(i);
420  }
421  });
422  source.precede(task);
423  task.precede(target);
424  beg = e;
425  }
426  }
427  }
428  // We enumerate the entire sequence to avoid floating error
429  else if constexpr(std::is_floating_point_v<T>) {
430  size_t N = 0;
431  auto B = beg;
432  for(auto i=beg; (beg<end ? i<end : i>end); i+=s, ++N) {
433  if(N == g) {
434  auto task = silent_emplace([=] () mutable {
435  auto b = B;
436  for(size_t n=0; n<N; ++n) {
437  c(b);
438  b += s;
439  }
440  });
441  N = 0;
442  B = i;
443  source.precede(task);
444  task.precede(target);
445  }
446  }
447 
448  // the last pices
449  if(N != 0) {
450  auto task = silent_emplace([=] () mutable {
451  auto b = B;
452  for(size_t n=0; n<N; ++n) {
453  c(b);
454  b += s;
455  }
456  });
457  source.precede(task);
458  task.precede(target);
459  }
460  }
461 
462  return std::make_pair(source, target);
463 }
464 
465 // Function: reduce_min
466 // Find the minimum element over a range of items.
467 template <typename I, typename T>
469  return reduce(beg, end, result, [] (const auto& l, const auto& r) {
470  return std::min(l, r);
471  });
472 }
473 
474 // Function: reduce_max
475 // Find the maximum element over a range of items.
476 template <typename I, typename T>
478  return reduce(beg, end, result, [] (const auto& l, const auto& r) {
479  return std::max(l, r);
480  });
481 }
482 
483 // Function: transform_reduce
484 template <typename I, typename T, typename B, typename U>
485 std::pair<Task, Task> FlowBuilder::transform_reduce(I beg, I end, T& result, B&& bop, U&& uop) {
486 
487  using category = typename std::iterator_traits<I>::iterator_category;
488 
489  // Even partition
490  size_t d = std::distance(beg, end);
491  size_t w = std::max(unsigned{1}, std::thread::hardware_concurrency());
492  size_t g = std::max((d + w - 1) / w, size_t{2});
493 
494  auto source = placeholder();
495  auto target = placeholder();
496 
498 
499  while(beg != end) {
500 
501  auto e = beg;
502 
503  // Case 1: random access iterator
504  if constexpr(std::is_same_v<category, std::random_access_iterator_tag>) {
505  size_t r = std::distance(beg, end);
506  std::advance(e, std::min(r, g));
507  }
508  // Case 2: non-random access iterator
509  else {
510  for(size_t i=0; i<g && e != end; ++e, ++i);
511  }
512 
513  // Create a task
514  auto [task, future] = emplace([beg, e, bop, uop] () mutable {
515  auto init = uop(*beg);
516  for(++beg; beg != e; ++beg) {
517  init = bop(std::move(init), uop(*beg));
518  }
519  return init;
520  });
521  source.precede(task);
522  task.precede(target);
523  futures.push_back(std::move(future));
524 
525  // adjust the pointer
526  beg = e;
527  }
528 
529  // target synchronizer
530  target.work([&result, futures=MoC{std::move(futures)}, bop] () {
531  for(auto& fu : futures.object) {
532  result = bop(std::move(result), fu.get());
533  }
534  });
535 
536  return std::make_pair(source, target);
537 }
538 
539 // Function: transform_reduce
540 template <typename I, typename T, typename B, typename P, typename U>
541 std::pair<Task, Task> FlowBuilder::transform_reduce(I beg, I end, T& result, B&& bop, P&& pop, U&& uop) {
542 
543  using category = typename std::iterator_traits<I>::iterator_category;
544 
545  // Even partition
546  size_t d = std::distance(beg, end);
547  size_t w = std::max(unsigned{1}, std::thread::hardware_concurrency());
548  size_t g = std::max((d + w - 1) / w, size_t{2});
549 
550  auto source = placeholder();
551  auto target = placeholder();
552 
554 
555  while(beg != end) {
556 
557  auto e = beg;
558 
559  // Case 1: random access iterator
560  if constexpr(std::is_same_v<category, std::random_access_iterator_tag>) {
561  size_t r = std::distance(beg, end);
562  std::advance(e, std::min(r, g));
563  }
564  // Case 2: non-random access iterator
565  else {
566  for(size_t i=0; i<g && e != end; ++e, ++i);
567  }
568 
569  // Create a task
570  auto [task, future] = emplace([beg, e, uop, pop] () mutable {
571  auto init = uop(*beg);
572  for(++beg; beg != e; ++beg) {
573  init = pop(std::move(init), *beg);
574  }
575  return init;
576  });
577  source.precede(task);
578  task.precede(target);
579  futures.push_back(std::move(future));
580 
581  // adjust the pointer
582  beg = e;
583  }
584 
585  // target synchronizer
586  target.work([&result, futures=MoC{std::move(futures)}, bop] () {
587  for(auto& fu : futures.object) {
588  result = bop(std::move(result), fu.get());
589  }
590  });
591 
592  return std::make_pair(source, target);
593 }
594 
595 // Function: _estimate_chunk_size
596 template <typename I>
597 size_t FlowBuilder::_estimate_chunk_size(I beg, I end, I step) {
598 
599  using T = std::decay_t<I>;
600 
601  size_t w = std::max(unsigned{1}, std::thread::hardware_concurrency());
602  size_t N = 0;
603 
604  if constexpr(std::is_integral_v<T>) {
605  if(beg <= end) {
606  N = (end - beg + step - 1) / step;
607  }
608  else {
609  N = (end - beg + step + 1) / step;
610  }
611  }
612  else if constexpr(std::is_floating_point_v<T>) {
613  N = static_cast<size_t>(std::ceil((end - beg) / step));
614  }
615  else {
616  static_assert(dependent_false_v<T>, "can't deduce chunk size");
617  }
618 
619  return (N + w - 1) / w;
620 }
621 
622 
623 // Procedure: _linearize
624 template <typename L>
625 void FlowBuilder::_linearize(L& keys) {
626 
627  auto itr = keys.begin();
628  auto end = keys.end();
629 
630  if(itr == end) {
631  return;
632  }
633 
634  auto nxt = itr;
635 
636  for(++nxt; nxt != end; ++nxt, ++itr) {
637  itr->_node->precede(*(nxt->_node));
638  }
639 }
640 
641 // Procedure: linearize
643  _linearize(keys);
644 }
645 
646 // Procedure: linearize
648  _linearize(keys);
649 }
650 
651 // Proceduer: reduce
652 template <typename I, typename T, typename B>
653 std::pair<Task, Task> FlowBuilder::reduce(I beg, I end, T& result, B&& op) {
654 
655  using category = typename std::iterator_traits<I>::iterator_category;
656 
657  size_t d = std::distance(beg, end);
658  size_t w = std::max(unsigned{1}, std::thread::hardware_concurrency());
659  size_t g = std::max((d + w - 1) / w, size_t{2});
660 
661  auto source = placeholder();
662  auto target = placeholder();
663 
665 
666  while(beg != end) {
667 
668  auto e = beg;
669 
670  // Case 1: random access iterator
671  if constexpr(std::is_same_v<category, std::random_access_iterator_tag>) {
672  size_t r = std::distance(beg, end);
673  std::advance(e, std::min(r, g));
674  }
675  // Case 2: non-random access iterator
676  else {
677  for(size_t i=0; i<g && e != end; ++e, ++i);
678  }
679 
680  // Create a task
681  auto [task, future] = emplace([beg, e, op] () mutable {
682  auto init = *beg;
683  for(++beg; beg != e; ++beg) {
684  init = op(std::move(init), *beg);
685  }
686  return init;
687  });
688  source.precede(task);
689  task.precede(target);
690  futures.push_back(std::move(future));
691 
692  // adjust the pointer
693  beg = e;
694  }
695 
696  // target synchronizer
697  target.work([&result, futures=MoC{std::move(futures)}, op] () {
698  for(auto& fu : futures.object) {
699  result = op(std::move(result), fu.get());
700  }
701  });
702 
703  return std::make_pair(source, target);
704 }
705 
706 // ----------------------------------------------------------------------------
707 
713 class SubflowBuilder : public FlowBuilder {
714 
715  public:
716 
717  template <typename... Args>
718  SubflowBuilder(Args&&...);
719 
723  void join();
724 
728  void detach();
729 
733  bool detached() const;
734 
738  bool joined() const;
739 
740  private:
741 
742  bool _detached {false};
743 };
744 
745 // Constructor
746 template <typename... Args>
747 SubflowBuilder::SubflowBuilder(Args&&... args) :
748  FlowBuilder {std::forward<Args>(args)...} {
749 }
750 
751 // Procedure: join
752 inline void SubflowBuilder::join() {
753  _detached = false;
754 }
755 
756 // Procedure: detach
757 inline void SubflowBuilder::detach() {
758  _detached = true;
759 }
760 
761 // Function: detached
762 inline bool SubflowBuilder::detached() const {
763  return _detached;
764 }
765 
766 // Function: joined
767 inline bool SubflowBuilder::joined() const {
768  return !_detached;
769 }
770 
771 // Function: emplace
772 template <typename C>
773 auto FlowBuilder::emplace(C&& c) {
774 
775  // subflow task
776  if constexpr(std::is_invocable_v<C, SubflowBuilder&>) {
777 
778  using R = std::invoke_result_t<C, SubflowBuilder&>;
779  std::promise<R> p;
780  auto fu = p.get_future();
781 
782  if constexpr(std::is_same_v<void, R>) {
783  auto& node = _graph.emplace_back([p=MoC(std::move(p)), c=std::forward<C>(c)]
784  (SubflowBuilder& fb) mutable {
785  if(fb._graph.empty()) {
786  c(fb);
787  // if subgraph is detached or empty after invoked
788  if(fb.detached() || fb._graph.empty()) {
789  p.get().set_value();
790  }
791  }
792  else {
793  p.get().set_value();
794  }
795  });
796  return std::make_pair(Task(node), std::move(fu));
797  }
798  else {
799  auto& node = _graph.emplace_back(
800  [p=MoC(std::move(p)), c=std::forward<C>(c), r=std::optional<R>()]
801  (SubflowBuilder& fb) mutable {
802  if(fb._graph.empty()) {
803  r.emplace(c(fb));
804  if(fb.detached() || fb._graph.empty()) {
805  p.get().set_value(std::move(*r));
806  }
807  }
808  else {
809  assert(r);
810  p.get().set_value(std::move(*r));
811  }
812  });
813  return std::make_pair(Task(node), std::move(fu));
814  }
815  }
816  // regular task
817  else if constexpr(std::is_invocable_v<C>) {
818 
819  using R = std::invoke_result_t<C>;
820  std::promise<R> p;
821  auto fu = p.get_future();
822 
823  if constexpr(std::is_same_v<void, R>) {
824  auto& node = _graph.emplace_back(
825  [p=MoC(std::move(p)), c=std::forward<C>(c)]() mutable {
826  c();
827  p.get().set_value();
828  }
829  );
830  return std::make_pair(Task(node), std::move(fu));
831  }
832  else {
833  auto& node = _graph.emplace_back(
834  [p=MoC(std::move(p)), c=std::forward<C>(c)]() mutable {
835  p.get().set_value(c());
836  }
837  );
838  return std::make_pair(Task(node), std::move(fu));
839  }
840  }
841  else {
842  static_assert(dependent_false_v<C>, "invalid task work type");
843  }
844 }
845 
846 // Function: emplace
847 template <typename... C, std::enable_if_t<(sizeof...(C)>1), void>*>
848 auto FlowBuilder::emplace(C&&... cs) {
849  return std::make_tuple(emplace(std::forward<C>(cs))...);
850 }
851 
852 // Function: silent_emplace
853 template <typename C>
855  // dynamic tasking
856  if constexpr(std::is_invocable_v<C, SubflowBuilder&>) {
857  auto& n = _graph.emplace_back(
858  [c=std::forward<C>(c)] (SubflowBuilder& fb) {
859  // first time execution
860  if(fb._graph.empty()) {
861  c(fb);
862  }
863  });
864  return Task(n);
865  }
866  // static tasking
867  else if constexpr(std::is_invocable_v<C>) {
868  auto& n = _graph.emplace_back(std::forward<C>(c));
869  return Task(n);
870  }
871  else {
872  static_assert(dependent_false_v<C>, "invalid task work type");
873  }
874 }
875 
876 }; // end of namespace tf. ---------------------------------------------------
bool joined() const
queries if the subflow will join its parent task
Definition: flow_builder.hpp:767
void linearize(std::vector< Task > &tasks)
adds adjacent dependency links to a linear list of tasks
Definition: flow_builder.hpp:642
bool detached() const
queries if the subflow will be detached from its parent task
Definition: flow_builder.hpp:762
void detach()
enables the subflow to detach from its parent task
Definition: flow_builder.hpp:757
void broadcast(Task A, std::vector< Task > &others)
adds dependency links from one task A to many tasks
Definition: flow_builder.hpp:297
std::pair< Task, Task > transform_reduce(I beg, I end, T &result, B &&bop, U &&uop)
constructs a task dependency graph of parallel transformation and reduction
Definition: flow_builder.hpp:485
Task & gather(Ts &&... tasks)
adds precedence links from other tasks to this
Definition: task.hpp:224
Definition: taskflow.hpp:6
auto silent_emplace(C &&callable)
creates a task from a given callable object without access to the result
Definition: flow_builder.hpp:854
T hardware_concurrency(T... args)
Task placeholder()
creates an empty task
Definition: flow_builder.hpp:317
std::pair< Task, Task > reduce_max(I beg, I end, T &result)
constructs a task dependency graph of parallel reduction through std::max
Definition: flow_builder.hpp:477
The building blocks of dynamic tasking.
Definition: flow_builder.hpp:713
std::pair< Task, Task > parallel_for(I beg, I end, C &&callable, size_t chunk=0)
constructs a task dependency graph of range-based parallel_for
Definition: flow_builder.hpp:330
auto emplace(C &&callable)
creates a task from a given callable object
Definition: flow_builder.hpp:773
void precede(Task A, Task B)
adds a dependency link from task A to task B
Definition: flow_builder.hpp:292
void gather(std::vector< Task > &others, Task A)
adds dependency links from many tasks to one task A
Definition: flow_builder.hpp:307
The building blocks of task dependency graphs.
Definition: flow_builder.hpp:13
void join()
enables the subflow to join its parent task
Definition: flow_builder.hpp:752
Handle to modify and access a task.
Definition: task.hpp:18
Task & precede(Ts &&... tasks)
adds precedence links from this to other tasks
Definition: task.hpp:205
std::pair< Task, Task > reduce(I beg, I end, T &result, B &&bop)
construct a task dependency graph of parallel reduction
Definition: flow_builder.hpp:653
std::pair< Task, Task > reduce_min(I beg, I end, T &result)
constructs a task dependency graph of parallel reduction through std::min
Definition: flow_builder.hpp:468