C3: Create a Parallel For-loop Graph

Running a for-loop in parallel is one of the most fundamental building blocks in parallel programming. In this chapter, we demonstrate how to use Cpp-Taskflow to create a task dependency graph for a parallel for-loop.

Range-based For-loop

Cpp-Taskflow has a STL-style method tf::Taskflow::parallel_for(I beg, I end, C&& callable, size_t chunk) that takes a range of items and applies a callable to each item in parallel. The method constructs a task dependency graph representing this workload and returns a pair of tasks that serve as the two synchronization points of the graph.

1: tf::Taskflow taskflow;
2:
3: std::vector<int> items {1, 2, 3, 4, 5, 6, 7, 8};
4:
5: auto [S, T] = taskflow.parallel_for(items.begin(), items.end(), [] (int item) {
6:   std::cout << std::this_thread::get_id() << " runs " << item << std::endl;
7: });
8:
9: S.work([](){ std::cout << "S\n"; }).name("S");
10: T.work([](){ std::cout << "T\n"; }).name("T");
11:
12: taskflow.dump(std::cout);
13:
14: tf::Executor executor(4);
15: executor.run(taskflow).get();

The above code generates the following task dependency graph. The label 0x56* represents an internal task node that executes the callable object. By default (chunk=0), Cpp-Taskflow evenly partitions and distributes the workload across all worker threads. In our example of eight items and four workers, each internal node is responsible for two items.

[Figure parallel_for1.png: task dependency graph of the parallel for-loop]

Debrief:

  • Line 1 creates a taskflow object to build the task dependency graph
  • Line 3 creates a vector container of eight items
  • Lines 5-7 create a parallel execution graph using the method parallel_for
  • Lines 9-10 assign work to and name the two synchronization tasks S and T
  • Line 12 dumps the graph in DOT format, which can be visualized through GraphViz Online; a file-based variant is sketched after this list
  • Lines 14-15 create an executor of four worker threads and run the taskflow to completion
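
If you prefer a file over standard output, tf::Taskflow::dump accepts any std::ostream, so you can pass a file stream instead of std::cout. A minimal sketch (the file name here is only an illustration):

#include <fstream>

std::ofstream ofs("parallel_for.dot");  // illustrative output file
taskflow.dump(ofs);                     // write the same DOT description to the file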

Here is one possible output of this program. Notice that each thread runs two consecutive items, reflecting the default even partition:

S
139931471636224 runs 1
139931471636224 runs 2
139931480028928 runs 7
139931480028928 runs 8
139931496814336 runs 3
139931496814336 runs 4
139931488421632 runs 5
139931488421632 runs 6
T

Partition the Workload Explicitly

By default, Cpp-Taskflow partitions the workload evenly across the workers. In some cases, it is useful to disable this feature and apply a user-specified partition. The method parallel_for takes an unsigned integer chunk as the number of items in each partition.

auto [S, T] = taskflow.parallel_for(items.begin(), items.end(), [] (int item) {
  std::cout << std::this_thread::get_id() << " runs " << item << std::endl;
}, 1);

The above example forces each partition to run exactly one item. This can be useful when you have an unbalanced workload and would like more efficient parallelization; the sketch after the figure shows one such case.

[Figure parallel_for2.png: task dependency graph with one item per partition]
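
For instance, suppose the cost of processing an item grows with its value. With a chunk size of 1, every item becomes its own task, so workers that finish their cheap items early can pick up the remaining ones. A minimal sketch (the costs vector and the sleep are illustrative stand-ins for real work):

#include <chrono>
#include <thread>

std::vector<int> costs {1, 1, 1, 1, 100, 1, 1, 1};  // one item is far more expensive
auto [S2, T2] = taskflow.parallel_for(costs.begin(), costs.end(), [] (int c) {
  std::this_thread::sleep_for(std::chrono::milliseconds(c));  // simulate uneven work
}, 1);  // one item per partition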

You can leave the chunk size as 0 to use the default even-partition strategy.
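
Passing 0 explicitly has the same effect as omitting the argument; a minimal sketch:

auto [S0, T0] = taskflow.parallel_for(items.begin(), items.end(), [] (int item) {
  std::cout << item << std::endl;
}, 0);  // chunk = 0: evenly partition the items across the worker threads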

Construct the Graph Explicitly

You can explicitly construct a dependency graph that represents a parallel execution of a for-loop using only the basic methods tf::Taskflow::emplace and tf::Task::precede.

tf::Task S = taskflow.emplace([](){}).name("S");
tf::Task T = taskflow.emplace([](){}).name("T");
for(auto item : items) {
  tf::Task task = taskflow.emplace([item] () {
    std::cout << std::this_thread::get_id() << " runs " << item << std::endl;
  });
  S.precede(task);
  task.precede(T);
}
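
This explicit construction creates one task per item plus the two synchronization tasks, which is essentially what parallel_for produces with a chunk size of 1. To execute the graph, submit the taskflow to an executor as in the other examples:

tf::Executor executor;
executor.run(taskflow).get();  // block until S, every per-item task, and T finish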

Index-based For-loop

Cpp-Taskflow provides an overload tf::Taskflow::parallel_for(I beg, I end, I step, C&& callable, size_t chunk) to parallelize an index-based for-loop. It takes three numbers beg, end, and step of the same type I and applies the callable to each index in the range [beg, end) with the given step size.

taskflow.parallel_for(0, 10, 2, [] (int i) {
  std::cout << "parallel on " << i << std::endl;
});
// print 0, 2, 4, 6, 8

It also works in the opposite direction with a negative step size.

taskflow.parallel_for(10, 0, -2, [] (int i) {
  std::cout << "parallel on " << i << std::endl;
});
// print 10, 8, 6, 4, 2

Similarly, you can specify a chunk size to group the indices handled by each internal task.

taskflow.parallel_for(0, 10, 2, [] (int i) {
  std::cout << "parallel on " << i << std::endl;
}, 2);
// print 0, 2, 4, 6, 8 (three groups: {0, 2}, {4, 6}, {8})

By default, Cpp-Taskflow performs an even partition across the available worker threads if no chunk size is given.
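
If you want roughly one internal task per worker, one option is to derive the chunk size from the iteration count and the hardware concurrency. A minimal sketch (the ceiling division here is an illustration, not the library's internal policy):

#include <algorithm>
#include <thread>

size_t num_iters = 5;  // [0, 10) with step 2 yields indices {0, 2, 4, 6, 8}
size_t num_workers = std::max(1u, std::thread::hardware_concurrency());
size_t chunk = (num_iters + num_workers - 1) / num_workers;  // ceiling division

taskflow.parallel_for(0, 10, 2, [] (int i) {
  std::cout << "parallel on " << i << std::endl;
}, chunk);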

Example 1: Parallel Map

This example demonstrates how to use tf::Taskflow::parallel_for to create a parallel map pattern. The map operation sets each item in the container to one if it is an odd number, or to zero if it is an even number.

#include <taskflow/taskflow.hpp>

#include <iostream>
#include <vector>

int main() {

  tf::Executor executor;
  tf::Taskflow taskflow;

  std::vector<int> items {1, 2, 3, 4, 5, 6, 7, 8};

  taskflow.parallel_for(items.begin(), items.end(), [] (int& item) {
    item = (item & 1) ? 1 : 0;  // map odd numbers to one and even numbers to zero
  });

  executor.run(taskflow).get();

  for(auto item : items) {
    std::cout << item << " ";
  }
  std::cout << std::endl;

  return 0;
}

The program outputs the following:

1 0 1 0 1 0 1 0

Example 2: Pipeline a Parallel For-loop

This example demonstrates how to pipeline a parallel-for workload with other tasks.

1: #include <taskflow/taskflow.hpp>
2:
3: int main() {
4:
5:   tf::Taskflow taskflow;
6:
7:   std::vector<int> items(1024);
8:   std::atomic<int> sum {0};
9:
10:   tf::Task T1 = taskflow.emplace([&] () {  // create a modifier task
11:     for(auto& item : items) {
12:       item = 1;
13:     }
14:   }).name("Create Items");
15:
16:   auto [S, T] = taskflow.parallel_for(items.begin(), items.end(), [&] (int item) {
17:     sum.fetch_add(item, std::memory_order_relaxed);
18:   }, 128);
19:
20:   tf::Task T2 = taskflow.emplace([&] () {  // create an output task
21:     std::cout << "sum is: " << sum << std::endl;
22:   }).name("Print Sum");
23:
24:   T1.precede(S);  // modifier precedes parallel-for
25:   T.precede(T2);  // parallel-for precedes the output task
26:
27:   tf::Executor().run(taskflow).get();
28:
29:   return 0;
30: }

[Figure parallel_for_example2.png: task dependency graph of the pipelined parallel for-loop]

The output of this program is:

sum is: 1024

Debrief:

  • Line 5 creates a taskflow object
  • Line 7 creates a vector of 1024 uninitialized integers
  • Line 8 creates an atomic integer to accumulate the sum
  • Lines 10-14 create a task that captures the vector and initializes all items to one
  • Lines 16-18 sum up all items, with each internal task handling a partition of 128 items (1024/128 = 8 partitions in total)
  • Lines 20-22 create a task that outputs the summation value
  • Lines 24-25 pipeline the parallel-for workload with the two tasks
  • Line 27 dispatches the graph to an executor and blocks until the execution completes