Cpp-Taskflow  2.0.0
basic_taskflow.hpp
1 #pragma once
2 
3 #include "task.hpp"
4 #include "framework.hpp"
5 
6 namespace tf {
7 
// The base class to derive a taskflow class.
// Template parameter E is an executor class template; it is instantiated below
// as E<Closure>, so the executor's unit of work is a Closure.
// NOTE(review): this listing omits several original lines (gaps in the embedded
// line numbers); the Doxygen comments and some declarations are not visible.
19 template <template <typename...> typename E>
20 class BasicTaskflow : public FlowBuilder {
21 
22  using StaticWork = typename Node::StaticWork;
23  using DynamicWork = typename Node::DynamicWork;
24 
25  // Closure: the callable handed to the executor. It pairs a taskflow pointer
// with a node pointer; both default to nullptr.
26  struct Closure {
27 
28  Closure() = default;
29  Closure(const Closure&) = default;
// Stores the addresses of the given taskflow and node.
30  Closure(BasicTaskflow&, Node&);
31 
32  Closure& operator = (const Closure&) = default;
33 
// Runs the node's work and schedules successors whose dependents count
// drops to zero.
34  void operator ()() const;
35 
36  BasicTaskflow* taskflow {nullptr};
37  Node* node {nullptr};
38  };
39 
40  public:
41 
// Alias of the executor type.
47  using Executor = E<Closure>;
48 
// Constructs the taskflow with std::thread::hardware_concurrency worker threads.
52  explicit BasicTaskflow();
53 
// Constructs the taskflow with an executor created from N.
57  explicit BasicTaskflow(unsigned N);
58 
// Constructs the taskflow on top of an existing executor; the definition
// raises an error through TF_THROW when the executor is null.
62  explicit BasicTaskflow(std::shared_ptr<Executor> executor);
63 
// NOTE(review): original lines 64-92 are largely absent from this listing.
// The destructor, share_executor(), dispatch() and dispatch(C&&) are defined
// further down, so their declarations presumably sat here — confirm against
// the upstream header. In particular the `template <typename C>` line below
// has lost the declaration that followed it (original line 92).
71 
78 
85 
91  template <typename C>
93 
// Dispatches the present graph to threads and returns immediately.
97  void silent_dispatch();
98 
// Same as silent_dispatch(), with a callable. The definition invokes the
// callable right away when the graph is empty and otherwise forwards it to the
// new topology.
104  template <typename C>
105  void silent_dispatch(C&& callable);
106 
// Dispatches the present graph to threads and waits for all topologies to complete.
110  void wait_for_all();
111 
// Blocks until all running topologies complete, then clears the topology list.
116  void wait_for_topologies();
117 
// Writes the present (not yet dispatched) graph to the stream as a
// "digraph Taskflow" block.
123  void dump(std::ostream& ostream) const;
124 
// Writes every existing topology to the stream by calling dump on each.
130  void dump_topologies(std::ostream& ostream) const;
131 
// Queries the number of nodes in the present task dependency graph.
135  size_t num_nodes() const;
136 
// Queries the number of worker threads in the associated executor.
140  size_t num_workers() const;
141 
// Queries the number of existing topologies.
145  size_t num_topologies() const;
146 
// Dumps the present task dependency graph in DOT format to a std::string.
150  std::string dump() const;
151 
156 
157  private:
158 
// The graph under construction; moved into a Topology on dispatch.
159  Graph _graph;
160 
// Executor that runs the closures; checked for null only by the constructor
// that accepts an executor.
161  std::shared_ptr<Executor> _executor;
162 
// Dispatched topologies; new ones are added at the front.
163  std::forward_list<Topology> _topologies;
164 
// Schedule a single node / a batch of nodes on the executor.
165  void _schedule(Node&);
166  void _schedule(std::vector<Node*>&);
167 };
168 
169 // ============================================================================
170 // BasicTaskflow::Closure Method Definitions
171 // ============================================================================
172 
173 // Constructor
// Binds the closure to a taskflow and a node by storing their addresses.
// NOTE(review): original line 175 (the signature line) is absent from this
// listing; the class declares this constructor as Closure(BasicTaskflow&, Node&),
// and the initializers below name the parameters `t` and `n`.
174 template <template <typename...> typename E>
176  taskflow{&t}, node {&n} {
177 }
178 
179 // Operator ()
// Executes the work attached to `node`, then decrements the dependents counter
// of each successor and schedules every successor whose counter reaches zero.
// A static node (variant index 0) just runs its callable if one is set.
// A dynamic node builds a subgraph through a SubflowBuilder and schedules the
// subgraph's source nodes.
// NOTE(review): original line 181 (the signature line) is absent from this
// listing; the class declares this member as `void operator ()() const`.
180 template <template <typename...> typename E>
182 
183  //assert(taskflow && node);
184 
185  // Here we need to fetch the num_successors first to avoid the invalid memory
186  // access caused by topology clear.
187  const auto num_successors = node->num_successors();
188 
189  // regular node type
190  // The default node work type. We only need to execute the callback if any.
191  if(auto index=node->_work.index(); index == 0) {
192  if(auto &f = std::get<StaticWork>(node->_work); f != nullptr){
193  std::invoke(f);
194  }
195  }
196  // subflow node type
197  else {
198 
199  bool first_time {false}; // Set only when the subgraph is created in this call, so the subflow is spawned at most once
200 
201  // The first time we enter into the subflow context, subgraph must be nullopt.
202  if(!(node->_subgraph)){
203  node->_subgraph.emplace(); // Initialize the _subgraph
204  first_time = true;
205  }
206 
207  SubflowBuilder fb(*(node->_subgraph));
208 
// NOTE(review): the dynamic work is invoked on every entry, not only the first;
// presumably the stored callable guards against rebuilding the subgraph when
// the node is re-entered — confirm against where DynamicWork is created.
209  std::invoke(std::get<DynamicWork>(node->_work), fb);
210 
211  // Need to create a subflow if first time & subgraph is not empty
212  if(first_time && !(node->_subgraph->empty())) {
214  //if(std::next(node->_subgraph->begin()) == node->_subgraph->end()){
215  // // If the subgraph has only one node, directly execute this static task
216  // // regardless of detached or join since this task will be eventually executed
217  // // some time before the end of the graph.
218  // if(auto &f = node->_subgraph->front(); f._work.index() == 0) {
219  // std::invoke(std::get<StaticWork>(f._work));
220  // }
221  // else {
222  // f.precede(fb.detached() ? node->_topology->_target : *node);
223  // f._topology = node->_topology;
224  // Closure c(*taskflow, f);
225  // c();
226  // // The reason to return here is this f might spawn new subflows (grandchildren)
227  // // and we need to make sure grandchildren finish before f's parent. So we need to
228  // // return here. Otherwise, we might execute f's parent even grandchildren are not scheduled
229  // if(!fb.detached()) {
230  // return;
231  // }
232  // }
233  //}
234  //else {
235  // For storing the source nodes
236  std::vector<Node*> src;
// Every subgraph node joins the parent's topology. Nodes without successors
// are made to precede either the topology's target (detached subflow) or this
// node itself (joined subflow); nodes without dependents are the sources.
237  for(auto n = node->_subgraph->begin(); n != node->_subgraph->end(); ++n) {
238  n->_topology = node->_topology;
239  if(n->num_successors() == 0) {
240  n->precede(fb.detached() ? node->_topology->_target : *node);
241  }
242  if(n->num_dependents() == 0) {
243  src.emplace_back(&(*n));
244  }
245  }
246 
247  for(auto& n: src) {
248  taskflow->_schedule(*n);
249  }
250 
// Joined subflow: stop here without releasing the successors.
// NOTE(review): presumably this node is scheduled again once the subgraph's
// sink nodes (which now precede it) finish, and the successors are released on
// that second pass — confirm.
251  if(!fb.detached()) {
252  return;
253  }
254  //}
255  }
256  }
257 
258  // At this point, the node/node storage might be destructed.
// Only the first num_successors entries (the count captured at the top) are
// visited here.
259  for(size_t i=0; i<num_successors; ++i) {
260  if(--(node->_successors[i]->_dependents) == 0) {
261  taskflow->_schedule(*(node->_successors[i]));
262  }
263  }
264 }
265 
266 // ============================================================================
267 // BasicTaskflow Method Definitions
268 // ============================================================================
269 
270 // Constructor
// Default constructor: creates a new executor, passing it
// std::thread::hardware_concurrency() as its argument.
// NOTE(review): original line 272 (the signature line) is absent from this listing.
// NOTE(review): the FlowBuilder base is initialized with _graph before that
// member is constructed (bases are initialized first); presumably FlowBuilder
// only stores a reference to it — confirm.
271 template <template <typename...> typename E>
273  FlowBuilder {_graph},
274  _executor {std::make_shared<Executor>(std::thread::hardware_concurrency())} {
275 }
276 
277 // Constructor
// Creates a new executor, passing N as its constructor argument.
// NOTE(review): original line 279 (the signature line) is absent from this
// listing; the class declares this overload as `explicit BasicTaskflow(unsigned N)`.
278 template <template <typename...> typename E>
280  FlowBuilder {_graph},
281  _executor {std::make_shared<Executor>(N)} {
282 }
283 
284 // Constructor
// Adopts an existing executor by moving the shared pointer `e` into _executor.
// Raises Error::EXECUTOR through TF_THROW when the resulting pointer is null.
// NOTE(review): original line 286 (the signature line) is absent from this
// listing; the class declares this overload as taking std::shared_ptr<Executor>.
285 template <template <typename...> typename E>
287  FlowBuilder {_graph},
288  _executor {std::move(e)} {
289 
// The check runs after the move, so it inspects the member rather than `e`.
290  if(_executor == nullptr) {
291  TF_THROW(Error::EXECUTOR,
292  "failed to construct taskflow (executor cannot be null)"
293  );
294  }
295 }
296 
297 // Destructor
// NOTE(review): original lines 299-300 (the signature line and whatever the
// body contained) are absent from this listing, so the destructor's behavior
// cannot be confirmed from here — check the upstream header.
298 template <template <typename...> typename E>
301 }
302 
303 // Function: num_nodes
// Returns the size of the present graph, i.e. the nodes not yet dispatched.
// NOTE(review): original line 305 (the signature line) is absent from this listing.
304 template <template <typename...> typename E>
306  return _graph.size();
307 }
308 
309 // Function: num_workers
// Returns the worker count reported by the associated executor.
// NOTE(review): original line 311 (the signature line) is absent from this listing.
310 template <template <typename...> typename E>
312  return _executor->num_workers();
313 }
314 
315 // Function: num_topologies
// Returns the number of topologies currently held. _topologies is a
// std::forward_list, which has no size(), so the count is obtained by walking
// the list with std::distance.
// NOTE(review): original line 317 (the signature line) is absent from this listing.
316 template <template <typename...> typename E>
318  return std::distance(_topologies.begin(), _topologies.end());
319 }
320 
321 // Function: share_executor
// Returns the executor shared pointer held by this taskflow.
// NOTE(review): original line 323 (the signature line) is absent from this listing.
322 template <template <typename...> typename E>
324  return _executor;
325 }
326 
327 // Procedure: silent_dispatch
// Dispatches the present graph without returning a future. Does nothing when
// the graph is empty.
// NOTE(review): original line 329 (the signature line) is absent from this listing.
328 template <template <typename...> typename E>
330 
331  if(_graph.empty()) return;
332 
// The graph is moved into a new Topology placed at the front of _topologies.
// NOTE(review): presumably _graph is left empty afterwards — confirm for Graph.
333  auto& topology = _topologies.emplace_front(std::move(_graph));
334 
335  _schedule(topology._sources);
336 }
337 
338 
339 // Procedure: silent_dispatch with registered callback
// Dispatches the present graph together with a callable `c`, without returning
// a future.
// NOTE(review): original line 342 (the signature line) is absent from this listing.
340 template <template <typename...> typename E>
341 template <typename C>
343 
// With nothing to dispatch, the callable is invoked immediately by the caller.
344  if(_graph.empty()) {
345  c();
346  return;
347  }
348 
// Otherwise the callable is forwarded to the Topology constructor.
// NOTE(review): presumably the topology invokes it on completion — confirm.
349  auto& topology = _topologies.emplace_front(std::move(_graph), std::forward<C>(c));
350 
351  _schedule(topology._sources);
352 }
353 
354 // Procedure: dispatch
// Dispatches the present graph and returns the new topology's _future member.
// NOTE(review): original line 356 (the signature line) is absent from this
// listing; the tooltip text gives the return type as std::shared_future<void>.
355 template <template <typename...> typename E>
357 
// With an empty graph there is no topology to wait on, so a shared future over
// a deferred no-op task is returned instead.
358  if(_graph.empty()) {
359  return std::async(std::launch::deferred, [](){}).share();
360  }
361 
362  auto& topology = _topologies.emplace_front(std::move(_graph));
363 
364  _schedule(topology._sources);
365 
366  return topology._future;
367 }
368 
369 
370 // Procedure: dispatch with registered callback
// Dispatches the present graph together with a callable `c` and returns the
// new topology's _future member.
// NOTE(review): original line 373 (the signature line) is absent from this listing.
371 template <template <typename...> typename E>
372 template <typename C>
374 
// With an empty graph the callable runs immediately on the calling thread and
// a shared future over a deferred no-op task is returned.
375  if(_graph.empty()) {
376  c();
377  return std::async(std::launch::deferred, [](){}).share();
378  }
379 
// Otherwise the callable is forwarded to the Topology constructor.
// NOTE(review): presumably the topology invokes it on completion — confirm.
380  auto& topology = _topologies.emplace_front(std::move(_graph), std::forward<C>(c));
381 
382  _schedule(topology._sources);
383 
384  return topology._future;
385 }
386 
387 // Procedure: wait_for_all
// Dispatches the present graph, if it is not empty, through silent_dispatch().
// NOTE(review): original lines 389 (the signature) and 393 are absent from
// this listing. The function is documented as waiting for all topologies to
// complete, so line 393 presumably calls wait_for_topologies() — confirm
// against the upstream header.
388 template <template <typename...> typename E>
390  if(!_graph.empty()) {
391  silent_dispatch();
392  }
394 }
395 
396 // Procedure: wait_for_topologies
// Calls get() on the future of every held topology, which blocks until each
// one is ready, and then clears the topology list.
// NOTE(review): original line 398 (the signature line) is absent from this listing.
397 template <template <typename...> typename E>
399  for(auto& t: _topologies){
400  t._future.get();
401  }
402  _topologies.clear();
403 }
404 
405 // Procedure: _schedule
406 // The main procedure to schedule a given task node.
407 // Each task node has two types of tasks - regular and subflow.
// Passes (*this, node) to the executor's emplace; these match the arguments of
// Closure(BasicTaskflow&, Node&).
// NOTE(review): presumably the executor constructs the Closure in place — confirm.
408 template <template <typename...> typename E>
409 void BasicTaskflow<E>::_schedule(Node& node) {
410  _executor->emplace(*this, node);
411 }
412 
413 
414 // Procedure: _schedule
415 // The main procedure to schedule a set of task nodes.
416 // Each task node has two types of tasks - regular and subflow.
// Builds one Closure per node and hands the whole vector to the executor in a
// single batch call.
// NOTE(review): original line 418 (the signature line) is absent from this
// listing; the class declares this overload as `void _schedule(std::vector<Node*>&)`
// and the body names the parameter `nodes`.
417 template <template <typename...> typename E>
419  std::vector<Closure> closures;
// Reserve up front so the emplace_back calls below do not reallocate.
420  closures.reserve(nodes.size());
421  for(auto src : nodes) {
422  closures.emplace_back(*this, *src);
423  }
424  _executor->batch(std::move(closures));
425 }
426 
427 // Function: dump_topologies
// Dumps every held topology into a string by calling dump(os) on each and
// returning os.str().
// NOTE(review): original lines 429 (the signature) and 431 are absent from
// this listing; `os` is presumably a std::ostringstream declared on line 431 —
// confirm against the upstream header.
428 template <template <typename...> typename E>
430 
432 
433  for(const auto& tpg : _topologies) {
434  tpg.dump(os);
435  }
436 
437  return os.str();
438 }
439 
440 // Function: dump_topologies
// Stream overload: writes every held topology to `os` by calling dump on each,
// in list order.
// NOTE(review): original line 442 (the signature line) is absent from this
// listing; the class declares this overload as
// `void dump_topologies(std::ostream& ostream) const`.
441 template <template <typename...> typename E>
443  for(const auto& tpg : _topologies) {
444  tpg.dump(os);
445  }
446 }
447 
448 // Function: dump
// Stream overload: writes the present graph to `os` as a "digraph Taskflow"
// block, calling dump on each node between the opening and closing lines.
// NOTE(review): original line 450 (the signature line) is absent from this
// listing; the class declares this overload as `void dump(std::ostream& ostream) const`.
449 template <template <typename...> typename E>
451 
452  os << "digraph Taskflow {\n";
453 
454  for(const auto& node : _graph) {
455  node.dump(os);
456  }
457 
458  os << "}\n";
459 }
460 
461 // Function: dump
462 // Dumps the taskflow in graphviz.
463 // The result can be viewed at http://www.webgraphviz.com/.
// String overload: delegates to dump(os) and returns os.str().
// NOTE(review): original lines 465-466 (the signature and, presumably, the
// declaration of `os` as a std::ostringstream) are absent from this listing —
// confirm against the upstream header.
464 template <template <typename...> typename E>
467  dump(os);
468  return os.str();
469 }
470 
471 }; // end of namespace tf ----------------------------------------------------
472 
The base class to derive a taskflow class.
Definition: basic_taskflow.hpp:20
std::shared_ptr< Executor > share_executor()
shares ownership of the executor associated with this taskflow object
Definition: basic_taskflow.hpp:323
size_t num_nodes() const
queries the number of nodes in the present task dependency graph
Definition: basic_taskflow.hpp:305
std::shared_future< void > dispatch()
dispatches the present graph to threads and returns immediately
Definition: basic_taskflow.hpp:356
Definition: taskflow.hpp:6
void silent_dispatch()
dispatches the present graph to threads and returns immediately
Definition: basic_taskflow.hpp:329
T hardware_concurrency(T... args)
BasicTaskflow()
constructs the taskflow with std::thread::hardware_concurrency worker threads
Definition: basic_taskflow.hpp:272
The building blocks of dynamic tasking.
Definition: flow_builder.hpp:713
size_t num_topologies() const
queries the number of existing topologies
Definition: basic_taskflow.hpp:317
void wait_for_topologies()
blocks until all running topologies complete and then cleans up all associated storages ...
Definition: basic_taskflow.hpp:398
void wait_for_all()
dispatches the present graph to threads and waits for all topologies to complete
Definition: basic_taskflow.hpp:389
The building blocks of task dependency graphs.
Definition: flow_builder.hpp:13
size_t num_workers() const
queries the number of worker threads in the associated executor
Definition: basic_taskflow.hpp:311
E< Closure > Executor
alias of executor type
Definition: basic_taskflow.hpp:47
~BasicTaskflow()
destructs the taskflow
Definition: basic_taskflow.hpp:299
std::string dump() const
dumps the present task dependency graph in DOT format to a std::string
Definition: basic_taskflow.hpp:465
std::string dump_topologies() const
dumps the existing topologies in DOT format to a std::string
Definition: basic_taskflow.hpp:429