transwarp
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Pages
transwarp.h
1 /// @mainpage transwarp is a header-only C++ library for task concurrency
2 /// @details https://github.com/bloomen/transwarp
3 /// @version 1.7.0
4 /// @author Christian Blume, Guan Wang
5 /// @date 2018
6 /// @copyright MIT http://www.opensource.org/licenses/mit-license.php
7 #pragma once
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <string>
#include <thread>
#include <tuple>
#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>
24 
25 
26 /// The transwarp namespace
27 namespace transwarp {
28 
29 
/// Enumerates the kinds of task, i.e. how a task's functor relates to its parents
enum class task_type {
    /// No parents at all
    root,
    /// The functor accepts the futures of all parents
    accept,
    /// The functor accepts the future of whichever parent becomes ready first
    accept_any,
    /// The functor consumes the results of all parents
    consume,
    /// The functor consumes the result of whichever parent becomes ready first
    consume_any,
    /// The functor takes no arguments but all parents must have finished
    wait,
    /// The functor takes no arguments but the first parent must have finished
    wait_any,
};
40 
41 
/// Common base of the exception types defined in this header
class transwarp_error : public std::runtime_error {
public:
    /// The given message is what what() reports
    explicit transwarp_error(const std::string& message)
    : std::runtime_error{message}
    {}
};
49 
50 
51 /// Exception thrown when a task is canceled
53 public:
54  explicit task_canceled(const std::string& node_repr)
55  : transwarp::transwarp_error("Task canceled: " + node_repr)
56  {}
57 };
58 
59 
60 /// Exception thrown when a task was destroyed prematurely
62 public:
63  explicit task_destroyed(const std::string& node_repr)
64  : transwarp::transwarp_error("Task destroyed: " + node_repr)
65  {}
66 };
67 
68 
69 /// Exception thrown when an invalid parameter was passed to a function
71 public:
72  explicit invalid_parameter(const std::string& parameter)
73  : transwarp::transwarp_error("Invalid parameter: " + parameter)
74  {}
75 };
76 
77 
78 /// Exception thrown when a task is used in unintended ways
80 public:
81  explicit control_error(const std::string& message)
82  : transwarp::transwarp_error("Control error: " + message)
83  {}
84 };
85 
86 
87 /// String conversion for the task_type enumeration
88 inline std::string to_string(const transwarp::task_type& type) {
89  switch (type) {
90  case transwarp::task_type::root: return "root";
91  case transwarp::task_type::accept: return "accept";
92  case transwarp::task_type::accept_any: return "accept_any";
93  case transwarp::task_type::consume: return "consume";
94  case transwarp::task_type::consume_any: return "consume_any";
95  case transwarp::task_type::wait: return "wait";
96  case transwarp::task_type::wait_any: return "wait_any";
97  default: throw transwarp::invalid_parameter("task type");
98  }
99 }
100 
101 
102 /// The root type. Used for tag dispatch
103 struct root_type : std::integral_constant<transwarp::task_type, transwarp::task_type::root> {};
104 constexpr transwarp::root_type root{}; ///< The root task tag
105 
106 /// The accept type. Used for tag dispatch
107 struct accept_type : std::integral_constant<transwarp::task_type, transwarp::task_type::accept> {};
108 constexpr transwarp::accept_type accept{}; ///< The accept task tag
109 
110 /// The accept_any type. Used for tag dispatch
111 struct accept_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::accept_any> {};
112 constexpr transwarp::accept_any_type accept_any{}; ///< The accept_any task tag
113 
114 /// The consume type. Used for tag dispatch
115 struct consume_type : std::integral_constant<transwarp::task_type, transwarp::task_type::consume> {};
116 constexpr transwarp::consume_type consume{}; ///< The consume task tag
117 
118 /// The consume_any type. Used for tag dispatch
119 struct consume_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::consume_any> {};
120 constexpr transwarp::consume_any_type consume_any{}; ///< The consume_any task tag
121 
122 /// The wait type. Used for tag dispatch
123 struct wait_type : std::integral_constant<transwarp::task_type, transwarp::task_type::wait> {};
124 constexpr transwarp::wait_type wait{}; ///< The wait task tag
125 
126 /// The wait_any type. Used for tag dispatch
127 struct wait_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::wait_any> {};
128 constexpr transwarp::wait_any_type wait_any{}; ///< The wait_any task tag
129 
130 
/// Detail namespace for internal functionality only.
/// Forward declarations of the helpers that are named as friends further down
namespace detail {

struct visit_depth;
struct unvisit;
struct final_visitor;
struct schedule_visitor;
struct node_manip;
template<bool>
struct assign_node_if_impl;

} // detail
143 
144 
145 /// A node carrying meta-data of a task
146 class node {
147 public:
148 
149  node() = default;
150 
151  // delete copy/move semantics
152  node(const node&) = delete;
153  node& operator=(const node&) = delete;
154  node(node&&) = delete;
155  node& operator=(node&&) = delete;
156 
157  /// The task ID
158  std::size_t get_id() const noexcept {
159  return id_;
160  }
161 
162  /// The task level
163  std::size_t get_level() const noexcept {
164  return level_;
165  }
166 
167  /// The task type
168  transwarp::task_type get_type() const noexcept {
169  return type_;
170  }
171 
172  /// The optional task name (may be null)
173  const std::shared_ptr<std::string>& get_name() const noexcept {
174  return name_;
175  }
176 
177  /// The optional, task-specific executor (may be null)
178  const std::shared_ptr<std::string>& get_executor() const noexcept {
179  return executor_;
180  }
181 
182  /// The task's parents (may be empty)
183  const std::vector<std::shared_ptr<node>>& get_parents() const noexcept {
184  return parents_;
185  }
186 
187  /// The task priority (defaults to 0)
188  std::size_t get_priority() const noexcept {
189  return priority_;
190  }
191 
192  /// The custom task data (may be null)
193  const std::shared_ptr<void>& get_custom_data() const noexcept {
194  return custom_data_;
195  }
196 
197  /// Returns whether the associated task is canceled
198  bool is_canceled() const noexcept {
199  return canceled_.load();
200  }
201 
202 private:
203  friend struct transwarp::detail::node_manip;
204 
205  std::size_t id_ = 0;
206  std::size_t level_ = 0;
208  std::shared_ptr<std::string> name_;
209  std::shared_ptr<std::string> executor_;
210  std::vector<std::shared_ptr<node>> parents_;
211  std::size_t priority_ = 0;
212  std::shared_ptr<void> custom_data_;
213  std::atomic_bool canceled_{false};
214 };
215 
216 /// String conversion for the node class
217 inline std::string to_string(const transwarp::node& node, const std::string& seperator="\n") {
218  std::string s;
219  s += '"';
220  const std::shared_ptr<std::string>& name = node.get_name();
221  if (name) {
222  s += "<" + *name + ">" + seperator;
223  }
224  s += transwarp::to_string(node.get_type());
225  s += " id=" + std::to_string(node.get_id());
226  s += " lev=" + std::to_string(node.get_level());
227  const std::shared_ptr<std::string>& exec = node.get_executor();
228  if (exec) {
229  s += seperator + "<" + *exec + ">";
230  }
231  s += '"';
232  return s;
233 }
234 
235 
236 /// An edge between two nodes
237 class edge {
238 public:
239  // cppcheck-suppress passedByValue
240  edge(std::shared_ptr<transwarp::node> parent, std::shared_ptr<transwarp::node> child) noexcept
241  : parent_(std::move(parent)), child_(std::move(child))
242  {}
243 
244  // default copy/move semantics
245  edge(const edge&) = default;
246  edge& operator=(const edge&) = default;
247  edge(edge&&) = default;
248  edge& operator=(edge&&) = default;
249 
250  /// Returns the parent node
251  const std::shared_ptr<transwarp::node>& get_parent() const noexcept {
252  return parent_;
253  }
254 
255  /// Returns the child node
256  const std::shared_ptr<transwarp::node>& get_child() const noexcept {
257  return child_;
258  }
259 
260 private:
261  std::shared_ptr<transwarp::node> parent_;
262  std::shared_ptr<transwarp::node> child_;
263 };
264 
265 /// String conversion for the edge class
266 inline std::string to_string(const transwarp::edge& edge, const std::string& separator="\n") {
267  return transwarp::to_string(*edge.get_parent(), separator) + " -> " + transwarp::to_string(*edge.get_child(), separator);
268 }
269 
270 
271 /// Creates a dot-style string from the given graph
272 inline std::string to_string(const std::vector<transwarp::edge>& graph, const std::string& separator="\n") {
273  std::string dot = "digraph {" + separator;
274  for (const transwarp::edge& edge : graph) {
275  dot += transwarp::to_string(edge, separator) + separator;
276  }
277  dot += "}";
278  return dot;
279 }
280 
281 
282 /// The executor interface used to perform custom task execution
283 class executor {
284 public:
285  virtual ~executor() = default;
286 
287  /// Returns the name of the executor
288  virtual std::string get_name() const = 0;
289 
290  /// Runs a task which is wrapped by the given functor. The functor only
291  /// captures one shared pointer and can hence be copied at low cost.
292  /// node represents the task that the functor belongs to.
293  /// This function is only ever called on the thread of the caller to schedule().
294  /// The implementer needs to ensure that this never throws exceptions
295  virtual void execute(const std::function<void()>& functor, const std::shared_ptr<transwarp::node>& node) = 0;
296 };
297 
298 
/// The task events a listener can subscribe to. The thread on which
/// handle_event is called depends on the event
enum class event_type {
    /// Just before a task is scheduled (thread of the caller to schedule())
    before_scheduled,
    /// Just before a task starts running (thread that the task is run on)
    before_started,
    /// Just before a task's functor is invoked (thread that the task is run on)
    before_invoked,
    /// Just after a task has finished running (thread that the task is run on)
    after_finished,
    /// Just after a task was canceled (thread that the task is run on)
    after_canceled,
    /// Not an event; follows the last event and therefore equals the number of events
    count,
};
308 
309 
310 /// The listener interface to listen to events raised by tasks
311 class listener {
312 public:
313  virtual ~listener() = default;
314 
315  /// This may be called from arbitrary threads depending on the event type (see transwarp::event_type).
316  /// The implementer needs to ensure that this never throws exceptions
317  virtual void handle_event(transwarp::event_type event, const std::shared_ptr<transwarp::node>& node) = 0;
318 };
319 
320 
/// The order in which the tasks of a graph are scheduled
enum class schedule_type {
    /// Breadth-first search order (default)
    breadth,
    /// Depth-first search order
    depth,
};
326 
327 
328 /// An interface for the task class
329 class itask {
330 public:
331  virtual ~itask() = default;
332 
333  virtual void set_executor(std::shared_ptr<transwarp::executor> executor) = 0;
334  virtual void set_executor_all(std::shared_ptr<transwarp::executor> executor) = 0;
335  virtual void remove_executor() = 0;
336  virtual void remove_executor_all() = 0;
337  virtual void set_priority(std::size_t priority) = 0;
338  virtual void set_priority_all(std::size_t priority) = 0;
339  virtual void reset_priority() = 0;
340  virtual void reset_priority_all() = 0;
341  virtual void set_custom_data(std::shared_ptr<void> custom_data) = 0;
342  virtual void set_custom_data_all(std::shared_ptr<void> custom_data) = 0;
343  virtual void remove_custom_data() = 0;
344  virtual void remove_custom_data_all() = 0;
345  virtual const std::shared_ptr<transwarp::node>& get_node() const noexcept = 0;
346  virtual void add_listener(std::shared_ptr<transwarp::listener> listener) = 0;
347  virtual void add_listener(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) = 0;
348  virtual void remove_listener(const std::shared_ptr<transwarp::listener>& listener) = 0;
349  virtual void remove_listener(transwarp::event_type event, const std::shared_ptr<transwarp::listener>& listener) = 0;
350  virtual void remove_listeners(transwarp::event_type event) = 0;
351  virtual void remove_listeners() = 0;
352  virtual void schedule() = 0;
353  virtual void schedule(transwarp::executor& executor) = 0;
354  virtual void schedule(bool reset) = 0;
355  virtual void schedule(transwarp::executor& executor, bool reset) = 0;
356  virtual void schedule_all() = 0;
357  virtual void schedule_all(transwarp::executor& executor) = 0;
358  virtual void schedule_all(bool reset_all) = 0;
359  virtual void schedule_all(transwarp::executor& executor, bool reset_all) = 0;
360  virtual void schedule_all(transwarp::schedule_type type) = 0;
361  virtual void schedule_all(transwarp::executor& executor, transwarp::schedule_type type) = 0;
362  virtual void schedule_all(transwarp::schedule_type type, bool reset_all) = 0;
363  virtual void schedule_all(transwarp::executor& executor, transwarp::schedule_type type, bool reset_all) = 0;
364  virtual void set_exception(std::exception_ptr exception) = 0;
365  virtual bool was_scheduled() const noexcept = 0;
366  virtual void wait() const = 0;
367  virtual bool is_ready() const = 0;
368  virtual bool has_result() const = 0;
369  virtual void reset() = 0;
370  virtual void reset_all() = 0;
371  virtual void cancel(bool enabled) noexcept = 0;
372  virtual void cancel_all(bool enabled) noexcept = 0;
373  virtual std::vector<transwarp::edge> get_graph() const = 0;
374 
375 protected:
376  virtual void schedule_impl(bool reset, transwarp::executor* executor=nullptr) = 0;
377 
378 private:
379  friend struct transwarp::detail::visit_depth;
380  friend struct transwarp::detail::unvisit;
381  friend struct transwarp::detail::final_visitor;
383 
384  virtual void visit_depth(const std::function<void(itask&)>& visitor) = 0;
385  virtual void unvisit() noexcept = 0;
386  virtual void set_node_id(std::size_t id) noexcept = 0;
387 };
388 
389 
/// Strips the reference and then a top-level const from T.
/// Unlike std::decay this leaves volatile, arrays and functions untouched
template<typename T>
struct decay {
private:
    using unreferenced = typename std::remove_reference<T>::type;
public:
    using type = typename std::remove_const<unreferenced>::type;
};
395 
396 
/// Returns the result type of a std::shared_future<T>, i.e. the type that
/// std::shared_future<T>::get() returns: const T& in general, T& for a
/// reference T and void for void.
/// Deduced with decltype rather than std::result_of, which is deprecated in
/// C++17 and removed in C++20
template<typename T>
struct result {
    using type = decltype(std::declval<const std::shared_future<T>&>().get());
};
402 
403 
404 /// The task class
405 template<typename ResultType>
406 class task : public transwarp::itask {
407 public:
408  using result_type = ResultType;
409 
410  virtual ~task() = default;
411 
412  virtual void set_value(const typename transwarp::decay<result_type>::type& value) = 0;
413  virtual void set_value(typename transwarp::decay<result_type>::type&& value) = 0;
414  virtual const std::shared_future<result_type>& get_future() const noexcept = 0;
415  virtual typename transwarp::result<result_type>::type get() const = 0;
416 };
417 
418 /// The task class (reference result type)
419 template<typename ResultType>
420 class task<ResultType&> : public transwarp::itask {
421 public:
422  using result_type = ResultType&;
423 
424  virtual ~task() = default;
425 
426  virtual void set_value(typename transwarp::decay<result_type>::type& value) = 0;
427  virtual const std::shared_future<result_type>& get_future() const noexcept = 0;
428  virtual typename transwarp::result<result_type>::type get() const = 0;
429 };
430 
431 /// The task class (void result type)
432 template<>
433 class task<void> : public transwarp::itask {
434 public:
435  using result_type = void;
436 
437  virtual ~task() = default;
438 
439  virtual void set_value() = 0;
440  virtual const std::shared_future<result_type>& get_future() const noexcept = 0;
441  virtual result_type get() const = 0;
442 };
443 
444 
445 /// A base class for a user-defined functor that needs access to the node associated
446 /// to the task or a cancel point to stop a task while it's running
447 class functor {
448 public:
449 
450  virtual ~functor() = default;
451 
452 protected:
453 
454  /// The node associated to the task
455  const std::shared_ptr<transwarp::node>& transwarp_node() const noexcept {
456  return transwarp_node_;
457  }
458 
459  /// If the associated task is canceled then this will throw transwarp::task_canceled
460  /// which will stop the task while it's running
461  void transwarp_cancel_point() const {
462  if (transwarp_node_->is_canceled()) {
463  throw transwarp::task_canceled(std::to_string(transwarp_node_->get_id()));
464  }
465  }
466 
467 private:
468  template<bool>
470 
471  std::shared_ptr<transwarp::node> transwarp_node_;
472 };
473 
474 
475 /// Detail namespace for internal functionality only
476 namespace detail {
477 
478 
479 /// Node manipulation
480 struct node_manip {
481 
482  static void set_id(transwarp::node& node, std::size_t id) noexcept {
483  node.id_ = id;
484  }
485 
486  static void set_level(transwarp::node& node, std::size_t level) noexcept {
487  node.level_ = level;
488  }
489 
490  static void set_type(transwarp::node& node, transwarp::task_type type) noexcept {
491  node.type_ = type;
492  }
493 
494  static void set_name(transwarp::node& node, std::shared_ptr<std::string> name) noexcept {
495  node.name_ = name;
496  }
497 
498  static void set_executor(transwarp::node& node, std::shared_ptr<std::string> executor) noexcept {
499  if (executor) {
500  node.executor_ = std::move(executor);
501  } else {
502  node.executor_.reset();
503  }
504  }
505 
506  static void add_parent(transwarp::node& node, std::shared_ptr<transwarp::node> parent) {
507  node.parents_.push_back(std::move(parent));
508  }
509 
510  static void set_priority(transwarp::node& node, std::size_t priority) noexcept {
511  node.priority_ = priority;
512  }
513 
514  static void set_custom_data(transwarp::node& node, std::shared_ptr<void> custom_data) {
515  if (custom_data) {
516  node.custom_data_ = std::move(custom_data);
517  } else {
518  node.custom_data_.reset();
519  }
520  }
521 
522  static void set_canceled(transwarp::node& node, bool enabled) noexcept {
523  node.canceled_ = enabled;
524  }
525 
526 };
527 
528 
529 /// A simple thread pool used to execute tasks in parallel
530 class thread_pool {
531 public:
532 
533  explicit thread_pool(std::size_t n_threads)
534  : done_(false)
535  {
536  if (n_threads == 0) {
537  throw transwarp::invalid_parameter("number of threads");
538  }
539  const std::size_t n_target = threads_.size() + n_threads;
540  while (threads_.size() < n_target) {
541  std::thread thread;
542  try {
543  thread = std::thread(&thread_pool::worker, this);
544  } catch (...) {
545  shutdown();
546  throw;
547  }
548  try {
549  threads_.push_back(std::move(thread));
550  } catch (...) {
551  shutdown();
552  thread.join();
553  throw;
554  }
555  }
556  }
557 
558  // delete copy/move semantics
559  thread_pool(const thread_pool&) = delete;
560  thread_pool& operator=(const thread_pool&) = delete;
561  thread_pool(thread_pool&&) = delete;
562  thread_pool& operator=(thread_pool&&) = delete;
563 
564  ~thread_pool() {
565  shutdown();
566  }
567 
568  void push(const std::function<void()>& functor) {
569  {
570  std::lock_guard<std::mutex> lock(mutex_);
571  functors_.push(functor);
572  }
573  cond_var_.notify_one();
574  }
575 
576 private:
577 
578  void worker() {
579  for (;;) {
580  std::function<void()> functor;
581  {
582  std::unique_lock<std::mutex> lock(mutex_);
583  cond_var_.wait(lock, [this]{
584  return done_ || !functors_.empty();
585  });
586  if (done_ && functors_.empty()) {
587  break;
588  }
589  functor = functors_.front();
590  functors_.pop();
591  }
592  functor();
593  }
594  }
595 
596  void shutdown() {
597  {
598  std::lock_guard<std::mutex> lock(mutex_);
599  done_ = true;
600  }
601  cond_var_.notify_all();
602  for (std::thread& thread : threads_) {
603  thread.join();
604  }
605  threads_.clear();
606  }
607 
608  bool done_;
609  std::vector<std::thread> threads_;
610  std::queue<std::function<void()>> functors_;
611  std::condition_variable cond_var_;
612  std::mutex mutex_;
613 };
614 
615 
616 template<int offset, typename... ParentResults>
618  static void work(const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& source, std::tuple<std::shared_future<ParentResults>...>& target) {
619  std::get<offset>(target) = std::get<offset>(source)->get_future();
621  }
622 };
623 
624 template<typename... ParentResults>
625 struct assign_futures_impl<-1, ParentResults...> {
626  static void work(const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>&, std::tuple<std::shared_future<ParentResults>...>&) {}
627 };
628 
629 /// Returns the futures from the given tuple of tasks
630 template<typename... ParentResults>
631 std::tuple<std::shared_future<ParentResults>...> get_futures(const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& input) {
632  std::tuple<std::shared_future<ParentResults>...> result;
633  assign_futures_impl<static_cast<int>(sizeof...(ParentResults)) - 1, ParentResults...>::work(input, result);
634  return result;
635 }
636 
637 /// Returns the futures from the given vector of tasks
638 template<typename ParentResultType>
639 std::vector<std::shared_future<ParentResultType>> get_futures(const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& input) {
640  std::vector<std::shared_future<ParentResultType>> result;
641  result.reserve(input.size());
642  for (const std::shared_ptr<transwarp::task<ParentResultType>>& task : input) {
643  result.emplace_back(task->get_future());
644  }
645  return result;
646 }
647 
648 
649 /// Runs the task with the given arguments, hence, invoking the task's functor
650 template<typename Result, typename Task, typename... Args>
651 Result run_task(std::size_t node_id, const std::weak_ptr<Task>& task, Args&&... args) {
652  const std::shared_ptr<Task> t = task.lock();
653  if (!t) {
654  throw transwarp::task_destroyed(std::to_string(node_id));
655  }
656  if (t->node_->is_canceled()) {
657  throw transwarp::task_canceled(std::to_string(node_id));
658  }
660  return t->functor_(std::forward<Args>(args)...);
661 }
662 
663 
664 inline void wait_for_all() {}
665 
666 /// Waits for all parents to finish
667 template<typename ParentResult, typename... ParentResults>
668 void wait_for_all(const std::shared_ptr<transwarp::task<ParentResult>>& parent, const std::shared_ptr<transwarp::task<ParentResults>>& ...parents) {
669  parent->get_future().wait();
671 }
672 
673 
674 /// Waits for all parents to finish
675 template<typename ParentResultType>
676 void wait_for_all(const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
677  for (const std::shared_ptr<transwarp::task<ParentResultType>>& parent : parents) {
678  parent->get_future().wait();
679  }
680 }
681 
682 
683 template<typename Parent>
684 Parent wait_for_any_impl() {
685  return {};
686 }
687 
688 template<typename Parent, typename ParentResult, typename... ParentResults>
689 Parent wait_for_any_impl(const std::shared_ptr<transwarp::task<ParentResult>>& parent, const std::shared_ptr<transwarp::task<ParentResults>>& ...parents) {
690  const std::future_status status = parent->get_future().wait_for(std::chrono::microseconds(1));
691  if (status == std::future_status::ready) {
692  return parent;
693  }
694  return transwarp::detail::wait_for_any_impl<Parent>(parents...);
695 }
696 
697 /// Waits for the first parent to finish
698 template<typename Parent, typename... ParentResults>
699 Parent wait_for_any(const std::shared_ptr<transwarp::task<ParentResults>>& ...parents) {
700  for (;;) {
701  Parent parent = transwarp::detail::wait_for_any_impl<Parent>(parents...);
702  if (parent) {
703  return parent;
704  }
705  }
706 }
707 
708 
709 /// Waits for the first parent to finish
710 template<typename ParentResultType>
711 std::shared_ptr<transwarp::task<ParentResultType>> wait_for_any(const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
712  for (;;) {
713  for (const std::shared_ptr<transwarp::task<ParentResultType>>& parent : parents) {
714  const std::future_status status = parent->get_future().wait_for(std::chrono::microseconds(1));
715  if (status == std::future_status::ready) {
716  return parent;
717  }
718  }
719  }
720 }
721 
722 
723 template<typename OneResult>
724 void cancel_all_but_one(const std::shared_ptr<transwarp::task<OneResult>>&) {}
725 
726 /// Cancels all tasks but one
727 template<typename OneResult, typename ParentResult, typename... ParentResults>
728 void cancel_all_but_one(const std::shared_ptr<transwarp::task<OneResult>>& one, const std::shared_ptr<transwarp::task<ParentResult>>& parent, const std::shared_ptr<transwarp::task<ParentResults>>& ...parents) {
729  if (one != parent) {
730  parent->cancel(true);
731  }
733 }
734 
735 
736 /// Cancels all tasks but one
737 template<typename OneResult, typename ParentResultType>
738 void cancel_all_but_one(const std::shared_ptr<transwarp::task<OneResult>>& one, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
739  for (const std::shared_ptr<transwarp::task<ParentResultType>>& parent : parents) {
740  if (one != parent) {
741  parent->cancel(true);
742  }
743  }
744 }
745 
746 
747 template<typename TaskType, bool done, int total, int... n>
748 struct call_impl {
749  template<typename Result, typename Task, typename... ParentResults>
750  static Result work(std::size_t node_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
751  return call_impl<TaskType, total == 1 + static_cast<int>(sizeof...(n)), total, n..., static_cast<int>(sizeof...(n))>::template
752  work<Result>(node_id, task, parents);
753  }
754 };
755 
/// Counterpart of call_impl for parents given as a vector. Declared only;
/// there is one full specialization per task type
template<typename TaskType>
struct call_impl_vector;

759 template<int total, int... n>
760 struct call_impl<transwarp::root_type, true, total, n...> {
761  template<typename Result, typename Task, typename... ParentResults>
762  static Result work(std::size_t node_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>&) {
763  return transwarp::detail::run_task<Result>(node_id, task);
764  }
765 };
766 
767 template<>
768 struct call_impl_vector<transwarp::root_type> {
769  template<typename Result, typename Task, typename ParentResultType>
770  static Result work(std::size_t node_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>&) {
771  return transwarp::detail::run_task<Result>(node_id, task);
772  }
773 };
774 
775 template<int total, int... n>
776 struct call_impl<transwarp::accept_type, true, total, n...> {
777  template<typename Result, typename Task, typename... ParentResults>
778  static Result work(std::size_t node_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
779  transwarp::detail::wait_for_all(std::get<n>(parents)...);
780  const std::tuple<std::shared_future<ParentResults>...> futures = transwarp::detail::get_futures(parents);
781  return transwarp::detail::run_task<Result>(node_id, task, std::get<n>(futures)...);
782  }
783 };
784 
785 template<>
786 struct call_impl_vector<transwarp::accept_type> {
787  template<typename Result, typename Task, typename ParentResultType>
788  static Result work(std::size_t node_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
790  return transwarp::detail::run_task<Result>(node_id, task, transwarp::detail::get_futures(parents));
791  }
792 };
793 
794 template<int total, int... n>
795 struct call_impl<transwarp::accept_any_type, true, total, n...> {
796  template<typename Result, typename Task, typename... ParentResults>
797  static Result work(std::size_t node_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
798  using parent_t = typename std::remove_reference<decltype(std::get<0>(parents))>::type; // Use first type as reference
799  parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(parents)...);
800  transwarp::detail::cancel_all_but_one(parent, std::get<n>(parents)...);
801  return transwarp::detail::run_task<Result>(node_id, task, parent->get_future());
802  }
803 };
804 
805 template<>
806 struct call_impl_vector<transwarp::accept_any_type> {
807  template<typename Result, typename Task, typename ParentResultType>
808  static Result work(std::size_t node_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
809  std::shared_ptr<transwarp::task<ParentResultType>> parent = transwarp::detail::wait_for_any(parents);
811  return transwarp::detail::run_task<Result>(node_id, task, parent->get_future());
812  }
813 };
814 
815 template<int total, int... n>
816 struct call_impl<transwarp::consume_type, true, total, n...> {
817  template<typename Result, typename Task, typename... ParentResults>
818  static Result work(std::size_t node_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
819  transwarp::detail::wait_for_all(std::get<n>(parents)...);
820  return transwarp::detail::run_task<Result>(node_id, task, std::get<n>(parents)->get_future().get()...);
821  }
822 };
823 
824 template<>
825 struct call_impl_vector<transwarp::consume_type> {
826  template<typename Result, typename Task, typename ParentResultType>
827  static Result work(std::size_t node_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
829  std::vector<ParentResultType> results;
830  results.reserve(parents.size());
831  for (const std::shared_ptr<transwarp::task<ParentResultType>>& parent : parents) {
832  results.emplace_back(parent->get_future().get());
833  }
834  return transwarp::detail::run_task<Result>(node_id, task, std::move(results));
835  }
836 };
837 
838 template<int total, int... n>
839 struct call_impl<transwarp::consume_any_type, true, total, n...> {
840  template<typename Result, typename Task, typename... ParentResults>
841  static Result work(std::size_t node_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
842  using parent_t = typename std::remove_reference<decltype(std::get<0>(parents))>::type; /// Use first type as reference
843  parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(parents)...);
844  transwarp::detail::cancel_all_but_one(parent, std::get<n>(parents)...);
845  return transwarp::detail::run_task<Result>(node_id, task, parent->get_future().get());
846  }
847 };
848 
849 template<>
850 struct call_impl_vector<transwarp::consume_any_type> {
851  template<typename Result, typename Task, typename ParentResultType>
852  static Result work(std::size_t node_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
853  std::shared_ptr<transwarp::task<ParentResultType>> parent = transwarp::detail::wait_for_any(parents);
855  return transwarp::detail::run_task<Result>(node_id, task, parent->get_future().get());
856  }
857 };
858 
859 template<int total, int... n>
860 struct call_impl<transwarp::wait_type, true, total, n...> {
861  template<typename Result, typename Task, typename... ParentResults>
862  static Result work(std::size_t node_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
863  transwarp::detail::wait_for_all(std::get<n>(parents)...);
864  get_all(std::get<n>(parents)...); // Ensures that exceptions are propagated
865  return transwarp::detail::run_task<Result>(node_id, task);
866  }
867  template<typename T, typename... Args>
868  static void get_all(const T& arg, const Args& ...args) {
869  arg->get_future().get();
870  get_all(args...);
871  }
872  static void get_all() {}
873 };
874 
875 template<>
876 struct call_impl_vector<transwarp::wait_type> {
877  template<typename Result, typename Task, typename ParentResultType>
878  static Result work(std::size_t node_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
880  for (const std::shared_ptr<transwarp::task<ParentResultType>>& parent : parents) {
881  parent->get_future().get(); // Ensures that exceptions are propagated
882  }
883  return transwarp::detail::run_task<Result>(node_id, task);
884  }
885 };
886 
887 template<int total, int... n>
888 struct call_impl<transwarp::wait_any_type, true, total, n...> {
889  template<typename Result, typename Task, typename... ParentResults>
890  static Result work(std::size_t node_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
891  using parent_t = typename std::remove_reference<decltype(std::get<0>(parents))>::type; // Use first type as reference
892  parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(parents)...);
893  transwarp::detail::cancel_all_but_one(parent, std::get<n>(parents)...);
894  parent->get_future().get(); // Ensures that exceptions are propagated
895  return transwarp::detail::run_task<Result>(node_id, task);
896  }
897 };
898 
899 template<>
900 struct call_impl_vector<transwarp::wait_any_type> {
901  template<typename Result, typename Task, typename ParentResultType>
902  static Result work(std::size_t node_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
903  std::shared_ptr<transwarp::task<ParentResultType>> parent = transwarp::detail::wait_for_any(parents);
905  parent->get_future().get(); // Ensures that exceptions are propagated
906  return transwarp::detail::run_task<Result>(node_id, task);
907  }
908 };
909 
910 /// Calls the functor of the given task with the results from the tuple of parents.
911 /// Throws transwarp::task_canceled if the task is canceled.
912 /// Throws transwarp::task_destroyed in case the task was destroyed prematurely.
913 template<typename TaskType, typename Result, typename Task, typename... ParentResults>
914 Result call(std::size_t node_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
915  constexpr std::size_t n = std::tuple_size<std::tuple<std::shared_future<ParentResults>...>>::value;
917  work<Result>(node_id, task, parents);
918 }
919 
920 /// Calls the functor of the given task with the results from the vector of parents.
921 /// Throws transwarp::task_canceled if the task is canceled.
922 /// Throws transwarp::task_destroyed in case the task was destroyed prematurely.
923 template<typename TaskType, typename Result, typename Task, typename ParentResultType>
924 Result call(std::size_t node_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
926  work<Result>(node_id, task, parents);
927 }
928 
/// Compile-time list of indices
template<std::size_t...> struct indices {};

/// Builds indices<idx, idx + 1, ..., end - 1>
template<std::size_t...> struct construct_range;

/// Recursive step: append idx to the pack and continue with idx + 1
template<std::size_t end, std::size_t idx, std::size_t... i>
struct construct_range<end, idx, i...> : construct_range<end, idx + 1, i..., idx> {};

/// Terminal step: idx has reached end, expose the accumulated pack
template<std::size_t end, std::size_t... i>
struct construct_range<end, end, i...> {
    using type = indices<i...>;
};

/// Provides indices<b, ..., e - 1> as its member type
template<std::size_t b, std::size_t e>
struct index_range {
    // This member was dropped from the listing; call_with_each relies on it
    using type = typename construct_range<e, b>::type;
};
945 
946 template<typename Functor, typename... ParentResults>
947 void call_with_each_index(transwarp::detail::indices<>, const Functor&, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>&) {}
948 
949 template<std::size_t i, std::size_t... j, typename Functor, typename... ParentResults>
950 void call_with_each_index(transwarp::detail::indices<i, j...>, const Functor& f, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& t) {
951  auto ptr = std::get<i>(t);
952  if (!ptr) {
953  throw transwarp::invalid_parameter("task pointer");
954  }
955  f(*ptr);
956  transwarp::detail::call_with_each_index(transwarp::detail::indices<j...>(), f, t);
957 }
958 
959 /// Calls the functor with every element in the tuple
960 template<typename Functor, typename... ParentResults>
961 void call_with_each(const Functor& f, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& t) {
962  constexpr std::size_t n = std::tuple_size<std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>>::value;
963  using index_t = typename transwarp::detail::index_range<0, n>::type;
964  transwarp::detail::call_with_each_index(index_t(), f, t);
965 }
966 
967 /// Calls the functor with every element in the vector
968 template<typename Functor, typename ParentResultType>
969 void call_with_each(const Functor& f, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& v) {
970  for (const std::shared_ptr<transwarp::task<ParentResultType>>& ptr : v) {
971  if (!ptr) {
972  throw transwarp::invalid_parameter("task pointer");
973  }
974  f(*ptr);
975  }
976 }
977 
978 
979 /// Sets parents and level of the node
981  explicit parent_visitor(transwarp::node& node) noexcept
982  : node_(node) {}
983 
984  void operator()(const transwarp::itask& task) const {
985  transwarp::detail::node_manip::add_parent(node_, task.get_node());
986  if (node_.get_level() <= task.get_node()->get_level()) {
987  /// A child's level is always larger than any of its parents' levels
988  transwarp::detail::node_manip::set_level(node_, task.get_node()->get_level() + 1);
989  }
990  }
991 
992  transwarp::node& node_;
993 };
994 
995 /// Applies final bookkeeping to the task
997  final_visitor() noexcept
998  : id_(0) {}
999 
1000  void operator()(transwarp::itask& task) noexcept {
1001  task.set_node_id(id_++);
1002  }
1003 
1004  std::size_t id_;
1005 };
1006 
1007 /// Generates a graph
1009  explicit graph_visitor(std::vector<transwarp::edge>& graph) noexcept
1010  : graph_(graph) {}
1011 
1012  void operator()(const transwarp::itask& task) {
1013  const std::shared_ptr<transwarp::node>& node = task.get_node();
1014  for (const std::shared_ptr<transwarp::node>& parent : node->get_parents()) {
1015  graph_.emplace_back(parent, node);
1016  }
1017  }
1018 
1019  std::vector<transwarp::edge>& graph_;
1020 };
1021 
1022 /// Schedules using the given executor
1024  schedule_visitor(bool reset, transwarp::executor* executor) noexcept
1025  : reset_(reset), executor_(executor) {}
1026 
1027  void operator()(transwarp::itask& task) {
1028  task.schedule_impl(reset_, executor_);
1029  }
1030 
1031  bool reset_;
1032  transwarp::executor* executor_;
1033 };
1034 
1035 /// Resets the given task
1037 
1038  void operator()(transwarp::itask& task) const {
1039  task.reset();
1040  }
1041 };
1042 
1043 /// Cancels or resumes the given task
1045  explicit cancel_visitor(bool enabled) noexcept
1046  : enabled_(enabled) {}
1047 
1048  void operator()(transwarp::itask& task) const noexcept {
1049  task.cancel(enabled_);
1050  }
1051 
1052  bool enabled_;
1053 };
1054 
1055 /// Assigns an executor to the given task
1057  explicit set_executor_visitor(std::shared_ptr<transwarp::executor> executor) noexcept
1058  : executor_(std::move(executor)) {}
1059 
1060  void operator()(transwarp::itask& task) const noexcept {
1061  task.set_executor(executor_);
1062  }
1063 
1064  std::shared_ptr<transwarp::executor> executor_;
1065 };
1066 
1067 /// Removes the executor from the given task
1069 
1070  void operator()(transwarp::itask& task) const noexcept {
1071  task.remove_executor();
1072  }
1073 };
1074 
1075 /// Assigns a priority to the given task
1077  explicit set_priority_visitor(std::size_t priority) noexcept
1078  : priority_(priority) {}
1079 
1080  void operator()(transwarp::itask& task) const noexcept {
1081  task.set_priority(priority_);
1082  }
1083 
1084  std::size_t priority_;
1085 };
1086 
1087 /// Resets the priority of the given task
1089 
1090  void operator()(transwarp::itask& task) const noexcept {
1091  task.reset_priority();
1092  }
1093 };
1094 
1095 /// Assigns custom data to the given task
1097  explicit set_custom_data_visitor(std::shared_ptr<void> custom_data) noexcept
1098  : custom_data_(std::move(custom_data)) {}
1099 
1100  void operator()(transwarp::itask& task) const noexcept {
1101  task.set_custom_data(custom_data_);
1102  }
1103 
1104  std::shared_ptr<void> custom_data_;
1105 };
1106 
1107 /// Removes custom data from the given task
1109 
1110  void operator()(transwarp::itask& task) const noexcept {
1111  task.remove_custom_data();
1112  }
1113 };
1114 
1115 /// Pushes the given task into the vector of tasks
1117  explicit push_task_visitor(std::vector<transwarp::itask*>& tasks)
1118  : tasks_(tasks) {}
1119 
1120  void operator()(transwarp::itask& task) {
1121  tasks_.push_back(&task);
1122  }
1123 
1124  std::vector<transwarp::itask*>& tasks_;
1125 };
1126 
1127 /// Visits the given task using the visitor given in the constructor
1128 struct visit_depth {
1129  explicit visit_depth(const std::function<void(transwarp::itask&)>& visitor) noexcept
1130  : visitor_(visitor) {}
1131 
1132  void operator()(transwarp::itask& task) const {
1133  task.visit_depth(visitor_);
1134  }
1135 
1136  const std::function<void(transwarp::itask&)>& visitor_;
1137 };
1138 
1139 /// Unvisits the given task
1140 struct unvisit {
1141 
1142  void operator()(transwarp::itask& task) const noexcept {
1143  task.unvisit();
1144  }
1145 };
1146 
1147 /// Determines the result type of the Functor dispatching on the task type
1148 template<typename TaskType, typename Functor, typename... ParentResults>
1150  static_assert(std::is_same<TaskType, transwarp::root_type>::value ||
1151  std::is_same<TaskType, transwarp::accept_type>::value ||
1152  std::is_same<TaskType, transwarp::accept_any_type>::value ||
1153  std::is_same<TaskType, transwarp::consume_type>::value ||
1154  std::is_same<TaskType, transwarp::consume_any_type>::value ||
1155  std::is_same<TaskType, transwarp::wait_type>::value ||
1156  std::is_same<TaskType, transwarp::wait_any_type>::value,
1157  "Invalid task type, must be one of: root, accept, accept_any, consume, consume_any, wait, wait_any");
1158 };
1159 
1160 template<typename Functor, typename... ParentResults>
1161 struct functor_result<transwarp::root_type, Functor, ParentResults...> {
1162  static_assert(sizeof...(ParentResults) == 0, "A root task cannot have parent tasks");
1163  using type = decltype(std::declval<Functor>()());
1164 };
1165 
1166 template<typename Functor, typename... ParentResults>
1167 struct functor_result<transwarp::accept_type, Functor, ParentResults...> {
1168  static_assert(sizeof...(ParentResults) > 0, "An accept task must have at least one parent");
1169  using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<ParentResults>>()...));
1170 };
1171 
1172 template<typename Functor, typename ParentResultType>
1173 struct functor_result<transwarp::accept_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1174  using type = decltype(std::declval<Functor>()(std::declval<std::vector<std::shared_future<ParentResultType>>>()));
1175 };
1176 
1177 template<typename Functor, typename... ParentResults>
1178 struct functor_result<transwarp::accept_any_type, Functor, ParentResults...> {
1179  static_assert(sizeof...(ParentResults) > 0, "An accept_any task must have at least one parent");
1180  using arg_t = typename std::tuple_element<0, std::tuple<ParentResults...>>::type; // Using first type as reference
1181  using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<arg_t>>()));
1182 };
1183 
1184 template<typename Functor, typename ParentResultType>
1185 struct functor_result<transwarp::accept_any_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1186  using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<ParentResultType>>()));
1187 };
1188 
1189 template<typename Functor, typename... ParentResults>
1190 struct functor_result<transwarp::consume_type, Functor, ParentResults...> {
1191  static_assert(sizeof...(ParentResults) > 0, "A consume task must have at least one parent");
1192  using type = decltype(std::declval<Functor>()(std::declval<ParentResults>()...));
1193 };
1194 
1195 template<typename Functor, typename ParentResultType>
1196 struct functor_result<transwarp::consume_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1197  using type = decltype(std::declval<Functor>()(std::declval<std::vector<ParentResultType>>()));
1198 };
1199 
1200 template<typename Functor, typename... ParentResults>
1201 struct functor_result<transwarp::consume_any_type, Functor, ParentResults...> {
1202  static_assert(sizeof...(ParentResults) > 0, "A consume_any task must have at least one parent");
1203  using arg_t = typename std::tuple_element<0, std::tuple<ParentResults...>>::type; // Using first type as reference
1204  using type = decltype(std::declval<Functor>()(std::declval<arg_t>()));
1205 };
1206 
1207 template<typename Functor, typename ParentResultType>
1208 struct functor_result<transwarp::consume_any_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1209  using type = decltype(std::declval<Functor>()(std::declval<ParentResultType>()));
1210 };
1211 
1212 template<typename Functor, typename... ParentResults>
1213 struct functor_result<transwarp::wait_type, Functor, ParentResults...> {
1214  static_assert(sizeof...(ParentResults) > 0, "A wait task must have at least one parent");
1215  using type = decltype(std::declval<Functor>()());
1216 };
1217 
1218 template<typename Functor, typename ParentResultType>
1219 struct functor_result<transwarp::wait_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1220  using type = decltype(std::declval<Functor>()());
1221 };
1222 
1223 template<typename Functor, typename... ParentResults>
1224 struct functor_result<transwarp::wait_any_type, Functor, ParentResults...> {
1225  static_assert(sizeof...(ParentResults) > 0, "A wait_any task must have at least one parent");
1226  using type = decltype(std::declval<Functor>()());
1227 };
1228 
1229 template<typename Functor, typename ParentResultType>
1230 struct functor_result<transwarp::wait_any_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1231  using type = decltype(std::declval<Functor>()());
1232 };
1233 
1234 template<bool is_transwarp_functor>
1235 struct assign_node_if_impl;
1236 
1237 template<>
1238 struct assign_node_if_impl<true> {
1239  template<typename Functor>
1240  void operator()(Functor& functor, std::shared_ptr<transwarp::node> node) const noexcept {
1241  functor.transwarp_node_ = std::move(node);
1242  }
1243 };
1244 
1245 template<>
1246 struct assign_node_if_impl<false> {
1247  template<typename Functor>
1248  void operator()(Functor&, std::shared_ptr<transwarp::node>) const noexcept {}
1249 };
1250 
1251 /// Assigns the node to the given functor if the functor is a subclass of transwarp::functor
1252 template<typename Functor>
1253 void assign_node_if(Functor& functor, std::shared_ptr<transwarp::node> node) noexcept {
1255 }
1256 
/// Returns a ready future with the given value as its state.
/// The value is perfectly forwarded into a local promise and the promise's
/// future is returned as a shared_future.
template<typename ResultType, typename Value>
std::shared_future<ResultType> make_future_with_value(Value&& value) {
    std::promise<ResultType> promise;
    promise.set_value(std::forward<Value>(value));
    // share() makes the future -> shared_future conversion explicit
    return promise.get_future().share();
}
1264 
/// Returns a ready future.
/// A local promise is satisfied immediately and its future is returned
/// as a shared_future.
inline std::shared_future<void> make_ready_future() {
    std::promise<void> promise;
    promise.set_value();
    // share() makes the future -> shared_future conversion explicit
    return promise.get_future().share();
}
1271 
1272 /// Returns a ready future with the given exception as its state
1273 template<typename ResultType>
1274 std::shared_future<ResultType> make_future_with_exception(std::exception_ptr exception) {
1275  if (!exception) {
1276  throw transwarp::invalid_parameter("exception pointer");
1277  }
1278  std::promise<ResultType> promise;
1279  promise.set_exception(exception);
1280  return promise.get_future();
1281 }
1282 
1283 
1284 /// Determines the type of the parents
1285 template<typename... ParentResults>
1286 struct parents {
1287  using type = std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>;
1288 };
1289 
1290 /// Determines the type of the parents. Specialization for vector parents
1291 template<typename ParentResultType>
1292 struct parents<std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1293  using type = std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>;
1294 };
1295 
1296 
1297 template<typename ResultType, typename TaskType>
1299 protected:
1300 
1301  template<typename Task, typename Parents>
1302  void call(std::size_t node_id,
1303  const std::weak_ptr<Task>& task,
1304  const Parents& parents) {
1305  promise_.set_value(transwarp::detail::call<TaskType, ResultType>(node_id, task, parents));
1306  }
1307 
1308  std::promise<ResultType> promise_;
1309 };
1310 
1311 template<typename TaskType>
1312 class base_runner<void, TaskType> {
1313 protected:
1314 
1315  template<typename Task, typename Parents>
1316  void call(std::size_t node_id,
1317  const std::weak_ptr<Task>& task,
1318  const Parents& parents) {
1319  transwarp::detail::call<TaskType, void>(node_id, task, parents);
1320  promise_.set_value();
1321  }
1322 
1323  std::promise<void> promise_;
1324 };
1325 
1326 /// A callable to run a task given its parents
1327 template<typename ResultType, typename TaskType, typename Task, typename Parents>
1328 class runner : public transwarp::detail::base_runner<ResultType, TaskType> {
1329 public:
1330 
1331  runner(std::size_t node_id,
1332  const std::weak_ptr<Task>& task,
1333  const typename transwarp::decay<Parents>::type& parents)
1334  : node_id_(node_id),
1335  task_(task),
1336  parents_(parents)
1337  {}
1338 
1339  std::future<ResultType> get_future() {
1340  return this->promise_.get_future();
1341  }
1342 
1343  void operator()() {
1344  if (const std::shared_ptr<Task> t = task_.lock()) {
1345  t->raise_event(transwarp::event_type::before_started);
1346  }
1347  try {
1348  this->call(node_id_, task_, parents_);
1349  } catch (const transwarp::task_canceled&) {
1350  this->promise_.set_exception(std::current_exception());
1351  if (const std::shared_ptr<Task> t = task_.lock()) {
1352  t->raise_event(transwarp::event_type::after_canceled);
1353  }
1354  } catch (...) {
1355  this->promise_.set_exception(std::current_exception());
1356  }
1357  if (const std::shared_ptr<Task> t = task_.lock()) {
1358  t->raise_event(transwarp::event_type::after_finished);
1359  }
1360  }
1361 
1362 private:
1363  const std::size_t node_id_;
1364  const std::weak_ptr<Task> task_;
1365  const typename transwarp::decay<Parents>::type parents_;
1366 };
1367 
1368 
1369 /// A simple circular buffer (FIFO).
1370 /// ValueType must support default construction. The buffer lets you push
1371 /// new values onto the back and pop old values off the front.
1372 template<typename ValueType>
1374 public:
1375 
1376  static_assert(std::is_default_constructible<ValueType>::value, "ValueType must be default constructible");
1377 
1378  using value_type = ValueType;
1379 
1380  /// Constructs a circular buffer with a given fixed capacity
1381  explicit
1383  : data_(capacity)
1384  {
1385  if (capacity < 1) {
1386  throw transwarp::invalid_parameter("capacity");
1387  }
1388  }
1389 
1390  // delete copy/move semantics
1391  circular_buffer(const circular_buffer&) = delete;
1392  circular_buffer& operator=(const circular_buffer&) = delete;
1393  circular_buffer(circular_buffer&& other) = delete;
1394  circular_buffer& operator=(circular_buffer&&) = delete;
1395 
1396  /// Pushes a new value onto the end of the buffer. If that exceeds the capacity
1397  /// of the buffer then the oldest value gets dropped (the one at the front).
1398  template<typename T, typename = typename std::enable_if<std::is_same<typename std::decay<T>::type, value_type>::value>::type>
1399  void push(T&& value) {
1400  data_[end_] = std::forward<T>(value);
1401  increment();
1402  }
1403 
1404  /// Returns the value at the front of the buffer (the oldest value).
1405  /// This is undefined if the buffer is empty
1406  const value_type& front() const {
1407  return data_[front_];
1408  }
1409 
1410  /// Removes the value at the front of the buffer (the oldest value)
1411  void pop() {
1412  if (!empty()) {
1413  data_[front_] = ValueType{};
1414  decrement();
1415  }
1416  }
1417 
1418  /// Returns the capacity of the buffer
1419  std::size_t capacity() const {
1420  return data_.size();
1421  }
1422 
1423  /// Returns the number of populated values of the buffer. Its maximum value
1424  /// equals the capacity of the buffer
1425  std::size_t size() const {
1426  return size_;
1427  }
1428 
1429  /// Returns whether the buffer is empty
1430  bool empty() const {
1431  return size_ == 0;
1432  }
1433 
1434  /// Returns whether the buffer is full
1435  bool full() const {
1436  return size_ == data_.size();
1437  }
1438 
1439  /// Swaps this buffer with the given buffer
1440  void swap(circular_buffer& buffer) {
1441  std::swap(end_, buffer.end_);
1442  std::swap(front_, buffer.front_);
1443  std::swap(size_, buffer.size_);
1444  std::swap(data_, buffer.data_);
1445  }
1446 
1447 private:
1448 
1449  void increment_or_wrap(std::size_t& value) const {
1450  if (value == data_.size() - 1) {
1451  value = 0;
1452  } else {
1453  ++value;
1454  }
1455  }
1456 
1457  void increment() {
1458  increment_or_wrap(end_);
1459  if (full()) {
1460  increment_or_wrap(front_);
1461  } else {
1462  ++size_;
1463  }
1464  }
1465 
1466  void decrement() {
1467  increment_or_wrap(front_);
1468  --size_;
1469  }
1470 
1471  std::size_t end_{};
1472  std::size_t front_{};
1473  std::size_t size_{};
1474  std::vector<value_type> data_;
1475 };
1476 
1477 
1478 } // detail
1479 
1480 
1481 /// Executor for sequential execution. Runs functors sequentially on the same thread
1483 public:
1484 
1485  sequential() = default;
1486 
1487  // delete copy/move semantics
1488  sequential(const sequential&) = delete;
1489  sequential& operator=(const sequential&) = delete;
1490  sequential(sequential&&) = delete;
1491  sequential& operator=(sequential&&) = delete;
1492 
1493  /// Returns the name of the executor
1494  std::string get_name() const override {
1495  return "transwarp::sequential";
1496  }
1497 
1498  /// Runs the functor on the current thread
1499  void execute(const std::function<void()>& functor, const std::shared_ptr<transwarp::node>&) override {
1500  functor();
1501  }
1502 };
1503 
1504 
1505 /// Executor for parallel execution. Uses a simple thread pool
1507 public:
1508 
1509  explicit parallel(std::size_t n_threads)
1510  : pool_(n_threads)
1511  {}
1512 
1513  // delete copy/move semantics
1514  parallel(const parallel&) = delete;
1515  parallel& operator=(const parallel&) = delete;
1516  parallel(parallel&&) = delete;
1517  parallel& operator=(parallel&&) = delete;
1518 
1519  /// Returns the name of the executor
1520  std::string get_name() const override {
1521  return "transwarp::parallel";
1522  }
1523 
1524  /// Pushes the functor into the thread pool for asynchronous execution
1525  void execute(const std::function<void()>& functor, const std::shared_ptr<transwarp::node>&) override {
1526  pool_.push(functor);
1527  }
1528 
1529 private:
1531 };
1532 
1533 
1534 /// Detail namespace for internal functionality only
1535 namespace detail {
1536 
1537 /// The base task class that contains the functionality that can be used
1538 /// with all result types (void and non-void).
1539 template<typename ResultType, typename TaskType, typename Functor, typename... ParentResults>
1540 class task_impl_base : public transwarp::task<ResultType>,
1541  public std::enable_shared_from_this<task_impl_base<ResultType, TaskType, Functor, ParentResults...>> {
1542 public:
1543  /// The task type
1544  using task_type = TaskType;
1545 
1546  /// The result type of this task
1547  using result_type = ResultType;
1548 
1549  /// Assigns an executor to this task which takes precedence over
1550  /// the executor provided in schedule() or schedule_all()
1551  void set_executor(std::shared_ptr<transwarp::executor> executor) override {
1553  if (!executor) {
1554  throw transwarp::invalid_parameter("executor pointer");
1555  }
1556  executor_ = std::move(executor);
1557  transwarp::detail::node_manip::set_executor(*node_, std::shared_ptr<std::string>(new std::string(executor_->get_name())));
1558  }
1559 
1560  /// Assigns an executor to all tasks which takes precedence over
1561  /// the executor provided in schedule() or schedule_all()
1562  void set_executor_all(std::shared_ptr<transwarp::executor> executor) override {
1564  transwarp::detail::set_executor_visitor visitor(std::move(executor));
1565  visit_depth_all(visitor);
1566  }
1567 
1568  /// Removes the executor from this task
1569  void remove_executor() override {
1571  executor_.reset();
1572  transwarp::detail::node_manip::set_executor(*node_, nullptr);
1573  }
1574 
1575  /// Removes the executor from all tasks
1576  void remove_executor_all() override {
1579  visit_depth_all(visitor);
1580  }
1581 
1582  /// Sets a task priority (defaults to 0). transwarp will not directly use this.
1583  /// This is only useful if something else is using the priority (e.g. a custom executor)
1584  void set_priority(std::size_t priority) override {
1586  transwarp::detail::node_manip::set_priority(*node_, priority);
1587  }
1588 
1589  /// Sets a priority to all tasks (defaults to 0). transwarp will not directly use this.
1590  /// This is only useful if something else is using the priority (e.g. a custom executor)
1591  void set_priority_all(std::size_t priority) override {
1593  transwarp::detail::set_priority_visitor visitor(priority);
1594  visit_depth_all(visitor);
1595  }
1596 
1597  /// Resets the task priority to 0
1598  void reset_priority() override {
1600  transwarp::detail::node_manip::set_priority(*node_, 0);
1601  }
1602 
1603  /// Resets the priority of all tasks to 0
1604  void reset_priority_all() override {
1607  visit_depth_all(visitor);
1608  }
1609 
1610  /// Assigns custom data to this task. transwarp will not directly use this.
1611  /// This is only useful if something else is using this custom data (e.g. a custom executor)
1612  void set_custom_data(std::shared_ptr<void> custom_data) override {
1614  if (!custom_data) {
1615  throw transwarp::invalid_parameter("custom data pointer");
1616  }
1617  transwarp::detail::node_manip::set_custom_data(*node_, std::move(custom_data));
1618  }
1619 
1620  /// Assigns custom data to all tasks. transwarp will not directly use this.
1621  /// This is only useful if something else is using this custom data (e.g. a custom executor)
1622  void set_custom_data_all(std::shared_ptr<void> custom_data) override {
1624  transwarp::detail::set_custom_data_visitor visitor(std::move(custom_data));
1625  visit_depth_all(visitor);
1626  }
1627 
1628  /// Removes custom data from this task
1629  void remove_custom_data() override {
1631  transwarp::detail::node_manip::set_custom_data(*node_, nullptr);
1632  }
1633 
1634  /// Removes custom data from all tasks
1635  void remove_custom_data_all() override {
1638  visit_depth_all(visitor);
1639  }
1640 
1641  /// Returns the future associated to the underlying execution
1642  const std::shared_future<result_type>& get_future() const noexcept override {
1643  return future_;
1644  }
1645 
1646  /// Returns the associated node
1647  const std::shared_ptr<transwarp::node>& get_node() const noexcept override {
1648  return node_;
1649  }
1650 
1651  /// Adds a new listener for all event types
1652  void add_listener(std::shared_ptr<transwarp::listener> listener) override {
1654  check_listener(listener);
1655  for (std::vector<std::shared_ptr<transwarp::listener>>& l : listeners_) {
1656  l.push_back(listener);
1657  }
1658  }
1659 
1660  /// Adds a new listener for the given event type only
1661  void add_listener(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) override {
1663  check_listener(listener);
1664  listeners_[get_event_index(event)].push_back(std::move(listener));
1665  }
1666 
1667  /// Removes the listener for all event types
1668  void remove_listener(const std::shared_ptr<transwarp::listener>& listener) override {
1670  check_listener(listener);
1671  for (std::vector<std::shared_ptr<transwarp::listener>>& l : listeners_) {
1672  l.erase(std::remove(l.begin(), l.end(), listener), l.end());
1673  }
1674  }
1675 
1676  /// Removes the listener for the given event type only
1677  void remove_listener(transwarp::event_type event, const std::shared_ptr<transwarp::listener>& listener) override {
1679  check_listener(listener);
1680  std::vector<std::shared_ptr<transwarp::listener>>& l = listeners_[get_event_index(event)];
1681  l.erase(std::remove(l.begin(), l.end(), listener), l.end());
1682  }
1683 
1684  /// Removes all listeners for the given event type
1687  listeners_[get_event_index(event)].clear();
1688  }
1689 
1690  /// Removes all listeners
1691  void remove_listeners() override {
1693  for (std::vector<std::shared_ptr<transwarp::listener>>& l : listeners_) {
1694  l.clear();
1695  }
1696  }
1697 
1698  /// Schedules this task for execution on the caller thread.
1699  /// The task-specific executor gets precedence if it exists.
1700  /// This overload will reset the underlying future.
1701  void schedule() override {
1703  this->schedule_impl(true);
1704  }
1705 
1706  /// Schedules this task for execution on the caller thread.
1707  /// The task-specific executor gets precedence if it exists.
1708  /// reset denotes whether schedule should reset the underlying
1709  /// future and schedule even if the future is already valid.
1710  void schedule(bool reset) override {
1712  this->schedule_impl(reset);
1713  }
1714 
1715  /// Schedules this task for execution using the provided executor.
1716  /// The task-specific executor gets precedence if it exists.
1717  /// This overload will reset the underlying future.
1720  this->schedule_impl(true, &executor);
1721  }
1722 
1723  /// Schedules this task for execution using the provided executor.
1724  /// The task-specific executor gets precedence if it exists.
1725  /// reset denotes whether schedule should reset the underlying
1726  /// future and schedule even if the future is already valid.
1727  void schedule(transwarp::executor& executor, bool reset) override {
1729  this->schedule_impl(reset, &executor);
1730  }
1731 
1732  /// Schedules all tasks in the graph for execution on the caller thread.
1733  /// The task-specific executors get precedence if they exist.
1734  /// This overload will reset the underlying futures.
1735  void schedule_all() override {
1737  schedule_all_impl(true, transwarp::schedule_type::breadth);
1738  }
1739 
1740  /// Schedules all tasks in the graph for execution using the provided executor.
1741  /// The task-specific executors get precedence if they exist.
1742  /// This overload will reset the underlying futures.
1745  schedule_all_impl(true, transwarp::schedule_type::breadth, &executor);
1746  }
1747 
1748  /// Schedules all tasks in the graph for execution on the caller thread.
1749  /// The task-specific executors get precedence if they exist.
1750  /// reset_all denotes whether schedule_all should reset the underlying
1751  /// futures and schedule even if the futures are already present.
1752  void schedule_all(bool reset_all) override {
1754  schedule_all_impl(reset_all, transwarp::schedule_type::breadth);
1755  }
1756 
1757  /// Schedules all tasks in the graph for execution using the provided executor.
1758  /// The task-specific executors get precedence if they exist.
1759  /// reset_all denotes whether schedule_all should reset the underlying
1760  /// futures and schedule even if the futures are already present.
1763  schedule_all_impl(reset_all, transwarp::schedule_type::breadth, &executor);
1764  }
1765 
1766  /// Schedules all tasks in the graph for execution on the caller thread.
1767  /// The task-specific executors get precedence if they exist.
1768  /// This overload will reset the underlying futures.
1771  schedule_all_impl(true, type);
1772  }
1773 
1774  /// Schedules all tasks in the graph for execution using the provided executor.
1775  /// The task-specific executors get precedence if they exist.
1776  /// This overload will reset the underlying futures.
1779  schedule_all_impl(true, type, &executor);
1780  }
1781 
1782  /// Schedules all tasks in the graph for execution on the caller thread.
1783  /// The task-specific executors get precedence if they exist.
1784  /// reset_all denotes whether schedule_all should reset the underlying
1785  /// futures and schedule even if the futures are already present.
1788  schedule_all_impl(reset_all, type);
1789  }
1790 
1791  /// Schedules all tasks in the graph for execution using the provided executor.
1792  /// The task-specific executors get precedence if they exist.
1793  /// reset_all denotes whether schedule_all should reset the underlying
1794  /// futures and schedule even if the futures are already present.
1797  schedule_all_impl(reset_all, type, &executor);
1798  }
1799 
1800  /// Assigns an exception to this task. Scheduling will have no effect after an exception
1801  /// has been set. Calling reset() will remove the exception and re-enable scheduling.
1802  void set_exception(std::exception_ptr exception) override {
1804  future_ = transwarp::detail::make_future_with_exception<result_type>(exception);
1805  schedule_mode_ = false;
1806  }
1807 
1808  /// Returns whether the task was scheduled and not reset afterwards.
1809  /// This means that the underlying future is valid
1810  bool was_scheduled() const noexcept override {
1811  return future_.valid();
1812  }
1813 
1814  /// Waits for the task to complete. Should only be called if was_scheduled()
1815  /// is true, throws transwarp::control_error otherwise
1816  void wait() const override {
1818  future_.wait();
1819  }
1820 
1821  /// Returns whether the task has finished processing. Should only be called
1822  /// if was_scheduled() is true, throws transwarp::control_error otherwise
1823  bool is_ready() const override {
1825  return future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
1826  }
1827 
1828  /// Returns whether this task contains a result
1829  bool has_result() const noexcept override {
1830  return was_scheduled() && is_ready();
1831  }
1832 
1833  /// Resets this task
1834  void reset() override {
1836  future_ = std::shared_future<result_type>();
1837  transwarp::detail::node_manip::set_canceled(*node_, false);
1838  schedule_mode_ = true;
1839  }
1840 
1841  /// Resets all tasks in the graph
1842  void reset_all() override {
1845  visit_depth_all(visitor);
1846  }
1847 
1848  /// If enabled then this task is canceled which will
1849  /// throw transwarp::task_canceled when retrieving the task result.
1850  /// Passing false is equivalent to resume.
1851  void cancel(bool enabled) noexcept override {
1852  transwarp::detail::node_manip::set_canceled(*node_, enabled);
1853  }
1854 
1855  /// If enabled then all pending tasks in the graph are canceled which will
1856  /// throw transwarp::task_canceled when retrieving the task result.
1857  /// Passing false is equivalent to resume.
1858  void cancel_all(bool enabled) noexcept override {
1859  transwarp::detail::cancel_visitor visitor(enabled);
1860  visit_depth_all(visitor);
1861  }
1862 
1863  /// Returns the graph of the task structure. This is mainly for visualizing
1864  /// the tasks and their interdependencies. Pass the result into transwarp::to_string
1865  /// to retrieve a dot-style graph representation for easy viewing.
1866  std::vector<transwarp::edge> get_graph() const override {
1867  std::vector<transwarp::edge> graph;
1868  transwarp::detail::graph_visitor visitor(graph);
1869  const_cast<task_impl_base*>(this)->visit_depth_all(visitor);
1870  return graph;
1871  }
1872 
1873 protected:
1874 
1875  template<typename F>
1876  // cppcheck-suppress passedByValue
1877  task_impl_base(bool has_name, std::string name, F&& functor, std::shared_ptr<transwarp::task<ParentResults>>... parents)
1878  : node_(new transwarp::node),
1879  functor_(std::forward<F>(functor)),
1880  parents_(std::move(parents)...)
1881  {
1882  init(has_name, std::move(name));
1883  }
1884 
1885  template<typename F, typename P>
1886  // cppcheck-suppress passedByValue
1887  task_impl_base(bool has_name, std::string name, F&& functor, std::vector<std::shared_ptr<transwarp::task<P>>> parents)
1888  : node_(new transwarp::node),
1889  functor_(std::forward<F>(functor)),
1890  parents_(std::move(parents))
1891  {
1892  if (parents_.empty()) {
1893  throw transwarp::invalid_parameter("parents are empty");
1894  }
1895  init(has_name, std::move(name));
1896  }
1897 
// Shared set-up called by both constructors: stores the task type and the
// optional name on the node (a null name pointer when has_name is false),
// passes functor_ and node_ to assign_node_if, then runs one visit_depth pass
// followed by unvisit() to clear the visited flags again.
// NOTE(review): `visitor` is used below but is not declared in the visible
// lines, and the listing's own numbering skips two lines right before that
// call. The declaration appears to have been lost from this listing; recover
// the missing lines from the upstream header before compiling.
1898  void init(bool has_name, std::string name) {
1899  transwarp::detail::node_manip::set_type(*node_, task_type::value);
1900  transwarp::detail::node_manip::set_name(*node_, (has_name ? std::shared_ptr<std::string>(new std::string(std::move(name))) : nullptr));
1901  transwarp::detail::assign_node_if(functor_, node_);
1904  visit_depth(visitor);
1905  unvisit();
1906  }
1907 
1908  /// Checks if the task is currently running and throws transwarp::control_error if it is
1910  if (future_.valid() && future_.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
1911  throw transwarp::control_error("task currently running: " + transwarp::to_string(*node_, " "));
1912  }
1913  }
1914 
1915  /// Checks if the task was scheduled and throws transwarp::control_error if it's not
1917  if (!future_.valid()) {
1918  throw transwarp::control_error("task was not scheduled: " + transwarp::to_string(*node_, " "));
1919  }
1920  }
1921 
1922  bool schedule_mode_ = true;
1923  std::shared_future<result_type> future_;
1924 
1925 private:
1926 
1927  template<typename R, typename Y, typename T, typename P>
1928  friend class transwarp::detail::runner;
1929 
1930  template<typename R, typename T, typename... A>
1931  friend R transwarp::detail::run_task(std::size_t, const std::weak_ptr<T>&, A&&...);
1932 
1933  /// Assigns the given id to the node
1934  void set_node_id(std::size_t id) noexcept override {
1935  transwarp::detail::node_manip::set_id(*node_, id);
1936  }
1937 
1938  /// Schedules this task for execution using the provided executor.
1939  /// The task-specific executor gets precedence if it exists.
1940  /// Runs the task on the same thread as the caller if neither the global
1941  /// nor the task-specific executor is found.
1942  void schedule_impl(bool reset, transwarp::executor* executor=nullptr) override {
1943  if (schedule_mode_ && (reset || !future_.valid())) {
1944  if (reset) {
1945  transwarp::detail::node_manip::set_canceled(*node_, false);
1946  }
1947  std::weak_ptr<task_impl_base> self = this->shared_from_this();
1949  std::shared_ptr<runner_t> runner = std::shared_ptr<runner_t>(new runner_t(node_->get_id(), self, parents_));
1951  future_ = runner->get_future();
1952  if (executor_) {
1953  executor_->execute([runner]{ (*runner)(); }, node_);
1954  } else if (executor) {
1955  executor->execute([runner]{ (*runner)(); }, node_);
1956  } else {
1957  (*runner)();
1958  }
1959  }
1960  }
1961 
1962  /// Schedules all tasks in the graph for execution using the provided executor.
1963  /// The task-specific executors get precedence if they exist.
1964  /// Runs tasks on the same thread as the caller if neither the global
1965  /// nor a task-specific executor is found.
1966  void schedule_all_impl(bool reset_all, transwarp::schedule_type type, transwarp::executor* executor=nullptr) {
1967  transwarp::detail::schedule_visitor visitor(reset_all, executor);
1968  switch (type) {
1970  visit_breadth_all(visitor);
1971  break;
1973  visit_depth_all(visitor);
1974  break;
1975  default:
1976  throw transwarp::invalid_parameter("schedule type");
1977  }
1978  }
1979 
1980  /// Visits all tasks in a breadth-first traversal.
1981  template<typename Visitor>
1982  void visit_breadth_all(Visitor& visitor) {
1983  if (breadth_tasks_.empty()) {
1984  breadth_tasks_.reserve(node_->get_id() + 1);
1985  visit_depth(transwarp::detail::push_task_visitor(breadth_tasks_));
1986  unvisit();
1987  auto compare = [](const transwarp::itask* const l, const transwarp::itask* const r) {
1988  const std::size_t l_level = l->get_node()->get_level();
1989  const std::size_t l_id = l->get_node()->get_id();
1990  const std::size_t r_level = r->get_node()->get_level();
1991  const std::size_t r_id = r->get_node()->get_id();
1992  return std::tie(l_level, l_id) < std::tie(r_level, r_id);
1993  };
1994  std::sort(breadth_tasks_.begin(), breadth_tasks_.end(), compare);
1995  }
1996  for (transwarp::itask* task : breadth_tasks_) {
1997  visitor(*task);
1998  }
1999  }
2000 
2001  /// Visits all tasks in a depth-first traversal.
2002  template<typename Visitor>
2003  void visit_depth_all(Visitor& visitor) {
2004  if (depth_tasks_.empty()) {
2005  depth_tasks_.reserve(node_->get_id() + 1);
2006  visit_depth(transwarp::detail::push_task_visitor(depth_tasks_));
2007  unvisit();
2008  }
2009  for (transwarp::itask* task : depth_tasks_) {
2010  visitor(*task);
2011  }
2012  }
2013 
2014  /// Visits each task in a depth-first traversal.
// Calls the visitor on this task once; visited_ guards against repeated
// visits until unvisit() clears the flag.
// NOTE(review): the visible body never descends into parents_, although the
// description speaks of a traversal, and the listing's own numbering skips one
// line just before the visitor call. A line that forwards the visit to the
// parents appears to have been lost; confirm against the upstream header.
2015  void visit_depth(const std::function<void(transwarp::itask&)>& visitor) override {
2016  if (!visited_) {
2018  visitor(*this);
2019  visited_ = true;
2020  }
2021  }
2022 
2023  /// Traverses through each task and marks them as not visited.
// Clears this task's visited flag if it is set.
// NOTE(review): the visible body only touches this task, although the
// description speaks of traversing each task, and the listing's own numbering
// skips one line after the assignment. A line that forwards the call to
// parents_ appears to have been lost; confirm against the upstream header.
2024  void unvisit() noexcept override {
2025  if (visited_) {
2026  visited_ = false;
2028  }
2029  }
2030 
2031  /// Returns the index for a given event type
2032  std::size_t get_event_index(transwarp::event_type event) const {
2033  const std::size_t index = static_cast<std::size_t>(event);
2034  if (index >= static_cast<std::size_t>(transwarp::event_type::count)) {
2035  throw transwarp::invalid_parameter("event type");
2036  }
2037  return index;
2038  }
2039 
2040  /// Raises the given event to all listeners
2041  void raise_event(transwarp::event_type event) const {
2042  for (const std::shared_ptr<transwarp::listener>& listener : listeners_[static_cast<std::size_t>(event)]) {
2043  listener->handle_event(event, node_);
2044  }
2045  }
2046 
2047  /// Check for non-null listener pointer
2048  void check_listener(const std::shared_ptr<transwarp::listener>& listener) const {
2049  if (!listener) {
2050  throw transwarp::invalid_parameter("listener pointer");
2051  }
2052  }
2053 
2054  std::shared_ptr<transwarp::node> node_;
2055  Functor functor_;
2056  typename transwarp::detail::parents<ParentResults...>::type parents_;
2057  bool visited_ = false;
2058  std::shared_ptr<transwarp::executor> executor_;
2059  std::vector<std::shared_ptr<transwarp::listener>> listeners_[static_cast<std::size_t>(transwarp::event_type::count)];
2060  std::vector<transwarp::itask*> depth_tasks_;
2061  std::vector<transwarp::itask*> breadth_tasks_;
2062 };
2063 
2064 
2065 /// A task proxy
2066 template<typename ResultType, typename TaskType, typename Functor, typename... ParentResults>
2067 class task_impl_proxy : public transwarp::detail::task_impl_base<ResultType, TaskType, Functor, ParentResults...> {
2068 public:
2069  /// The task type
2070  using task_type = TaskType;
2071 
2072  /// The result type of this task
2073  using result_type = ResultType;
2074 
2075  /// Assigns a value to this task. Scheduling will have no effect after a value
2076  /// has been set. Calling reset() will remove the value and re-enable scheduling.
2077  void set_value(const typename transwarp::decay<result_type>::type& value) override {
2078  this->ensure_task_not_running();
2079  this->future_ = transwarp::detail::make_future_with_value<result_type>(value);
2080  this->schedule_mode_ = false;
2081  }
2082 
2083  /// Assigns a value to this task. Scheduling will have no effect after a value
2084  /// has been set. Calling reset() will remove the value and re-enable scheduling.
2085  void set_value(typename transwarp::decay<result_type>::type&& value) override {
2086  this->ensure_task_not_running();
2087  this->future_ = transwarp::detail::make_future_with_value<result_type>(std::move(value));
2088  this->schedule_mode_ = false;
2089  }
2090 
2091  /// Returns the result of this task. Throws any exceptions that the underlying
2092  /// functor throws. Should only be called if was_scheduled() is true,
2093  /// throws transwarp::control_error otherwise
2094  typename transwarp::result<result_type>::type get() const override {
2095  this->ensure_task_was_scheduled();
2096  return this->future_.get();
2097  }
2098 
2099 protected:
2100 
2101  template<typename F>
2102  // cppcheck-suppress passedByValue
2103  // cppcheck-suppress uninitMemberVar
2104  task_impl_proxy(bool has_name, std::string name, F&& functor, std::shared_ptr<transwarp::task<ParentResults>>... parents)
2105  : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>(has_name, std::move(name), std::forward<F>(functor), std::move(parents)...)
2106  {}
2107 
2108  template<typename F, typename P>
2109  // cppcheck-suppress passedByValue
2110  // cppcheck-suppress uninitMemberVar
2111  task_impl_proxy(bool has_name, std::string name, F&& functor, std::vector<std::shared_ptr<transwarp::task<P>>> parents)
2112  : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>(has_name, std::move(name), std::forward<F>(functor), std::move(parents))
2113  {}
2114 
2115 };
2116 
2117 /// A task proxy for reference result type.
2118 template<typename ResultType, typename TaskType, typename Functor, typename... ParentResults>
2119 class task_impl_proxy<ResultType&, TaskType, Functor, ParentResults...> : public transwarp::detail::task_impl_base<ResultType&, TaskType, Functor, ParentResults...> {
2120 public:
2121  /// The task type
2122  using task_type = TaskType;
2123 
2124  /// The result type of this task
2125  using result_type = ResultType&;
2126 
2127  /// Assigns a value to this task. Scheduling will have no effect after a value
2128  /// has been set. Calling reset() will remove the value and re-enable scheduling.
2129  void set_value(typename transwarp::decay<result_type>::type& value) override {
2130  this->ensure_task_not_running();
2131  this->future_ = transwarp::detail::make_future_with_value<result_type>(value);
2132  this->schedule_mode_ = false;
2133  }
2134 
2135  /// Returns the result of this task. Throws any exceptions that the underlying
2136  /// functor throws. Should only be called if was_scheduled() is true,
2137  /// throws transwarp::control_error otherwise
2138  typename transwarp::result<result_type>::type get() const override {
2139  this->ensure_task_was_scheduled();
2140  return this->future_.get();
2141  }
2142 
2143 protected:
2144 
2145  template<typename F>
2146  // cppcheck-suppress passedByValue
2147  // cppcheck-suppress uninitMemberVar
2148  task_impl_proxy(bool has_name, std::string name, F&& functor, std::shared_ptr<transwarp::task<ParentResults>>... parents)
2149  : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>(has_name, std::move(name), std::forward<F>(functor), std::move(parents)...)
2150  {}
2151 
2152  template<typename F, typename P>
2153  // cppcheck-suppress passedByValue
2154  // cppcheck-suppress uninitMemberVar
2155  task_impl_proxy(bool has_name, std::string name, F&& functor, std::vector<std::shared_ptr<transwarp::task<P>>> parents)
2156  : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>(has_name, std::move(name), std::forward<F>(functor), std::move(parents))
2157  {}
2158 
2159 };
2160 
2161 /// A task proxy for void result type.
2162 template<typename TaskType, typename Functor, typename... ParentResults>
2163 class task_impl_proxy<void, TaskType, Functor, ParentResults...> : public transwarp::detail::task_impl_base<void, TaskType, Functor, ParentResults...> {
2164 public:
2165  /// The task type
2166  using task_type = TaskType;
2167 
2168  /// The result type of this task
2169  using result_type = void;
2170 
2171  /// Assigns a value to this task. Scheduling will have no effect after a call
2172  /// to this. Calling reset() will reset this and re-enable scheduling.
2173  void set_value() override {
2174  this->ensure_task_not_running();
2175  this->future_ = transwarp::detail::make_ready_future();
2176  this->schedule_mode_ = false;
2177  }
2178 
2179  /// Blocks until the task finishes. Throws any exceptions that the underlying
2180  /// functor throws. Should only be called if was_scheduled() is true,
2181  /// throws transwarp::control_error otherwise
2182  void get() const override {
2183  this->ensure_task_was_scheduled();
2184  this->future_.get();
2185  }
2186 
2187 protected:
2188 
2189  template<typename F>
2190  // cppcheck-suppress passedByValue
2191  // cppcheck-suppress uninitMemberVar
2192  task_impl_proxy(bool has_name, std::string name, F&& functor, std::shared_ptr<transwarp::task<ParentResults>>... parents)
2193  : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>(has_name, std::move(name), std::forward<F>(functor), std::move(parents)...)
2194  {}
2195 
2196  template<typename F, typename P>
2197  // cppcheck-suppress passedByValue
2198  // cppcheck-suppress uninitMemberVar
2199  task_impl_proxy(bool has_name, std::string name, F&& functor, std::vector<std::shared_ptr<transwarp::task<P>>> parents)
2200  : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>(has_name, std::move(name), std::forward<F>(functor), std::move(parents))
2201  {}
2202 
2203 };
2204 
2205 } // detail
2206 
2207 
2208 /// A task representing a piece of work given by functor and parent tasks.
2209 /// By connecting tasks a directed acyclic graph is built.
2210 /// Tasks should be created using the make_task factory functions.
2211 template<typename TaskType, typename Functor, typename... ParentResults>
2212 class task_impl : public transwarp::detail::task_impl_proxy<typename transwarp::detail::functor_result<TaskType, Functor, ParentResults...>::type, TaskType, Functor, ParentResults...> {
2213 public:
2214  /// The task type
2215  using task_type = TaskType;
2216 
2217  /// The result type of this task
2218  using result_type = typename transwarp::detail::functor_result<TaskType, Functor, ParentResults...>::type;
2219 
2220  /// A task is defined by name, functor, and parent tasks.
2221  /// Note: Don't use this constructor directly, use transwarp::make_task
2222  template<typename F>
2223  // cppcheck-suppress passedByValue
2224  // cppcheck-suppress uninitMemberVar
2225  task_impl(bool has_name, std::string name, F&& functor, std::shared_ptr<transwarp::task<ParentResults>>... parents)
2226  : transwarp::detail::task_impl_proxy<result_type, task_type, Functor, ParentResults...>(has_name, std::move(name), std::forward<F>(functor), std::move(parents)...)
2227  {}
2228 
2229  /// A task is defined by name, functor, and parent tasks.
2230  /// Note: Don't use this constructor directly, use transwarp::make_task
2231  template<typename F, typename P>
2232  // cppcheck-suppress uninitMemberVar
2233  task_impl(bool has_name, std::string name, F&& functor, std::vector<std::shared_ptr<transwarp::task<P>>> parents)
2234  : transwarp::detail::task_impl_proxy<result_type, task_type, Functor, ParentResults...>(has_name, std::move(name), std::forward<F>(functor), std::move(parents))
2235  {}
2236 
2237  // delete copy/move semantics
2238  task_impl(const task_impl&) = delete;
2239  task_impl& operator=(const task_impl&) = delete;
2240  task_impl(task_impl&&) = delete;
2241  task_impl& operator=(task_impl&&) = delete;
2242 
2243  /// Creates a continuation to this task
2244  template<typename TaskType_, typename Functor_>
2245  std::shared_ptr<transwarp::task_impl<TaskType_, typename std::decay<Functor_>::type, result_type>>
2246  then(TaskType_, std::string name, Functor_&& functor) const {
2248  return std::shared_ptr<task_t>(new task_t(true, std::move(name), std::forward<Functor_>(functor), std::dynamic_pointer_cast<transwarp::task<result_type>>(const_cast<task_impl*>(this)->shared_from_this())));
2249  }
2250 
2251  /// Creates a continuation to this task. Overload for omitting for task name
2252  template<typename TaskType_, typename Functor_>
2253  std::shared_ptr<transwarp::task_impl<TaskType_, typename std::decay<Functor_>::type, result_type>>
2254  then(TaskType_, Functor_&& functor) const {
2256  return std::shared_ptr<task_t>(new task_t(false, "", std::forward<Functor_>(functor), std::dynamic_pointer_cast<transwarp::task<result_type>>(const_cast<task_impl*>(this)->shared_from_this())));
2257  }
2258 
2259 };
2260 
2261 
2262 /// A value task that stores a single value and doesn't require scheduling.
2263 /// Value tasks should be created using the make_value_task factory functions.
2264 template<typename ResultType>
2265 class value_task : public transwarp::task<ResultType>,
2266  public std::enable_shared_from_this<value_task<ResultType>> {
2267 public:
2268  /// The task type
2270 
2271  /// The result type of this task
2272  using result_type = ResultType;
2273 
2274  /// A value task is defined by name and value.
2275  /// Note: Don't use this constructor directly, use transwarp::make_value_task
2276  template<typename T>
2277  // cppcheck-suppress passedByValue
2278  // cppcheck-suppress uninitMemberVar
2279  value_task(bool has_name, std::string name, T&& value)
2280  : node_(new transwarp::node),
2281  future_(transwarp::detail::make_future_with_value<result_type>(std::forward<T>(value)))
2282  {
2283  transwarp::detail::node_manip::set_type(*node_, task_type::value);
2284  transwarp::detail::node_manip::set_name(*node_, (has_name ? std::shared_ptr<std::string>(new std::string(std::move(name))) : nullptr));
2285  }
2286 
2287  // delete copy/move semantics
2288  value_task(const value_task&) = delete;
2289  value_task& operator=(const value_task&) = delete;
2290  value_task(value_task&&) = delete;
2291  value_task& operator=(value_task&&) = delete;
2292 
2293  /// Creates a continuation to this task
2294  template<typename TaskType_, typename Functor_>
2295  std::shared_ptr<transwarp::task_impl<TaskType_, typename std::decay<Functor_>::type, result_type>>
2296  then(TaskType_, std::string name, Functor_&& functor) const {
2298  return std::shared_ptr<task_t>(new task_t(true, std::move(name), std::forward<Functor_>(functor), std::dynamic_pointer_cast<transwarp::task<result_type>>(const_cast<value_task*>(this)->shared_from_this())));
2299  }
2300 
2301  /// Creates a continuation to this task. Overload for omitting the task name
2302  template<typename TaskType_, typename Functor_>
2303  std::shared_ptr<transwarp::task_impl<TaskType_, typename std::decay<Functor_>::type, result_type>>
2304  then(TaskType_, Functor_&& functor) const {
2306  return std::shared_ptr<task_t>(new task_t(false, "", std::forward<Functor_>(functor), std::dynamic_pointer_cast<transwarp::task<result_type>>(const_cast<value_task*>(this)->shared_from_this())));
2307  }
2308 
2309  /// No-op because a value task never runs
2310  void set_executor(std::shared_ptr<transwarp::executor>) override {}
2311 
2312  /// No-op because a value task never runs and doesn't have parents
2313  void set_executor_all(std::shared_ptr<transwarp::executor>) override {}
2314 
2315  /// No-op because a value task never runs
2316  void remove_executor() override {}
2317 
2318  /// No-op because a value task never runs and doesn't have parents
2319  void remove_executor_all() override {}
2320 
2321  /// Sets a task priority (defaults to 0). transwarp will not directly use this.
2322  /// This is only useful if something else is using the priority
2323  void set_priority(std::size_t priority) override {
2324  transwarp::detail::node_manip::set_priority(*node_, priority);
2325  }
2326 
2327  /// Sets a priority to all tasks (defaults to 0). transwarp will not directly use this.
2328  /// This is only useful if something else is using the priority
2329  void set_priority_all(std::size_t priority) override {
2330  set_priority(priority);
2331  }
2332 
2333  /// Resets the task priority to 0
2334  void reset_priority() override {
2335  transwarp::detail::node_manip::set_priority(*node_, 0);
2336  }
2337 
2338  /// Resets the priority of all tasks to 0
2339  void reset_priority_all() override {
2340  reset_priority();
2341  }
2342 
2343  /// Assigns custom data to this task. transwarp will not directly use this.
2344  /// This is only useful if something else is using this custom data
2345  void set_custom_data(std::shared_ptr<void> custom_data) override {
2346  if (!custom_data) {
2347  throw transwarp::invalid_parameter("custom data pointer");
2348  }
2349  transwarp::detail::node_manip::set_custom_data(*node_, std::move(custom_data));
2350  }
2351 
2352  /// Assigns custom data to all tasks. transwarp will not directly use this.
2353  /// This is only useful if something else is using this custom data
2354  void set_custom_data_all(std::shared_ptr<void> custom_data) override {
2355  set_custom_data(std::move(custom_data));
2356  }
2357 
2358  /// Removes custom data from this task
2359  void remove_custom_data() override {
2360  transwarp::detail::node_manip::set_custom_data(*node_, nullptr);
2361  }
2362 
2363  /// Removes custom data from all tasks
2364  void remove_custom_data_all() override {
2366  }
2367 
2368  /// Returns the future associated to the underlying execution
2369  const std::shared_future<result_type>& get_future() const noexcept override {
2370  return future_;
2371  }
2372 
2373  /// Returns the associated node
2374  const std::shared_ptr<transwarp::node>& get_node() const noexcept override {
2375  return node_;
2376  }
2377 
2378  /// No-op because a value task doesn't raise events
2379  void add_listener(std::shared_ptr<transwarp::listener>) override {}
2380 
2381  /// No-op because a value task doesn't raise events
2382  void add_listener(transwarp::event_type, std::shared_ptr<transwarp::listener>) override {}
2383 
2384  /// No-op because a value task doesn't raise events
2385  void remove_listener(const std::shared_ptr<transwarp::listener>&) override {}
2386 
2387  /// No-op because a value task doesn't raise events
2388  void remove_listener(transwarp::event_type, const std::shared_ptr<transwarp::listener>&) override {}
2389 
2390  /// No-op because a value task doesn't raise events
2392 
2393  /// No-op because a value task doesn't raise events
2394  void remove_listeners() override {}
2395 
2396  /// No-op because a value task never runs
2397  void schedule() override {}
2398 
2399  /// No-op because a value task never runs
2400  void schedule(transwarp::executor&) override {}
2401 
2402  /// No-op because a value task never runs
2403  void schedule(bool) override {}
2404 
2405  /// No-op because a value task never runs
2406  void schedule(transwarp::executor&, bool) override {}
2407 
2408  /// No-op because a value task never runs and doesn't have parents
2409  void schedule_all() override {}
2410 
2411  /// No-op because a value task never runs and doesn't have parents
2413 
2414  /// No-op because a value task never runs and doesn't have parents
2415  void schedule_all(bool) override {}
2416 
2417  /// No-op because a value task never runs and doesn't have parents
2418  void schedule_all(transwarp::executor&, bool) override {}
2419 
2420  /// No-op because a value task never runs and doesn't have parents
2422 
2423  /// No-op because a value task never runs and doesn't have parents
2425 
2426  /// No-op because a value task never runs and doesn't have parents
2427  void schedule_all(transwarp::schedule_type, bool) override {}
2428 
2429  /// No-op because a value task never runs and doesn't have parents
2431 
2432  /// Assigns a value to this task
2433  void set_value(const typename transwarp::decay<result_type>::type& value) override {
2434  future_ = transwarp::detail::make_future_with_value<result_type>(value);
2435  }
2436 
2437  /// Assigns a value to this task
2438  void set_value(typename transwarp::decay<result_type>::type&& value) override {
2439  future_ = transwarp::detail::make_future_with_value<result_type>(std::move(value));
2440  };
2441 
2442  /// Assigns an exception to this task
2443  void set_exception(std::exception_ptr exception) override {
2444  future_ = transwarp::detail::make_future_with_exception<result_type>(exception);
2445  }
2446 
2447  /// Returns true because a value task is scheduled once on construction
2448  bool was_scheduled() const noexcept override {
2449  return true;
2450  }
2451 
2452  /// No-op because a value task never runs
2453  void wait() const override {}
2454 
2455  /// Returns true because a value task is always ready
2456  bool is_ready() const override {
2457  return true;
2458  }
2459 
2460  /// Returns true because a value task always contains a result
2461  bool has_result() const noexcept override {
2462  return true;
2463  }
2464 
2465  /// Returns the result of this task
2466  typename transwarp::result<result_type>::type get() const override {
2467  return future_.get();
2468  }
2469 
2470  /// No-op because a value task never runs
2471  void reset() override {}
2472 
2473  /// No-op because a value task never runs and doesn't have parents
2474  void reset_all() override {}
2475 
2476  /// No-op because a value task never runs
2477  void cancel(bool) noexcept override {}
2478 
2479  /// No-op because a value task never runs and doesn't have parents
2480  void cancel_all(bool) noexcept override {}
2481 
2482  /// Returns an empty graph because a value task doesn't have parents
2483  std::vector<transwarp::edge> get_graph() const override {
2484  return {};
2485  }
2486 
2487 private:
2488 
2489  /// Assigns the given id to the node
2490  void set_node_id(std::size_t id) noexcept override {
2491  transwarp::detail::node_manip::set_id(*node_, id);
2492  }
2493 
2494  /// No-op because a value task never runs
2495  void schedule_impl(bool, transwarp::executor*) override {}
2496 
2497  /// Visits this task
2498  void visit_depth(const std::function<void(transwarp::itask&)>& visitor) override {
2499  if (!visited_) {
2500  visitor(*this);
2501  visited_ = true;
2502  }
2503  }
2504 
2505  /// Marks this task as not visited
2506  void unvisit() noexcept override {
2507  visited_ = false;
2508  }
2509 
2510  std::shared_ptr<transwarp::node> node_;
2511  std::shared_future<result_type> future_;
2512  bool visited_ = false;
2513 };
2514 
2515 
2516 /// A factory function to create a new task
2517 template<typename TaskType, typename Functor, typename... Parents>
2518 std::shared_ptr<transwarp::task_impl<TaskType, typename std::decay<Functor>::type, typename Parents::result_type...>>
2519 make_task(TaskType, std::string name, Functor&& functor, std::shared_ptr<Parents>... parents) {
2520  using task_t = transwarp::task_impl<TaskType, typename std::decay<Functor>::type, typename Parents::result_type...>;
2521  return std::shared_ptr<task_t>(new task_t(true, std::move(name), std::forward<Functor>(functor), std::move(parents)...));
2522 }
2523 
2524 /// A factory function to create a new task. Overload for omitting the task name
2525 template<typename TaskType, typename Functor, typename... Parents>
2526 std::shared_ptr<transwarp::task_impl<TaskType, typename std::decay<Functor>::type, typename Parents::result_type...>>
2527 make_task(TaskType, Functor&& functor, std::shared_ptr<Parents>... parents) {
2528  using task_t = transwarp::task_impl<TaskType, typename std::decay<Functor>::type, typename Parents::result_type...>;
2529  return std::shared_ptr<task_t>(new task_t(false, "", std::forward<Functor>(functor), std::move(parents)...));
2530 }
2531 
2532 
2533 /// A factory function to create a new task with vector parents
2534 template<typename TaskType, typename Functor, typename ParentType>
2535 std::shared_ptr<transwarp::task_impl<TaskType, typename std::decay<Functor>::type, std::vector<ParentType>>>
2536 make_task(TaskType, std::string name, Functor&& functor, std::vector<ParentType> parents) {
2537  using task_t = transwarp::task_impl<TaskType, typename std::decay<Functor>::type, std::vector<ParentType>>;
2538  return std::shared_ptr<task_t>(new task_t(true, std::move(name), std::forward<Functor>(functor), std::move(parents)));
2539 }
2540 
2541 /// A factory function to create a new task with vector parents. Overload for omitting the task name
2542 template<typename TaskType, typename Functor, typename ParentType>
2543 std::shared_ptr<transwarp::task_impl<TaskType, typename std::decay<Functor>::type, std::vector<ParentType>>>
2544 make_task(TaskType, Functor&& functor, std::vector<ParentType> parents) {
2545  using task_t = transwarp::task_impl<TaskType, typename std::decay<Functor>::type, std::vector<ParentType>>;
2546  return std::shared_ptr<task_t>(new task_t(false, "", std::forward<Functor>(functor), std::move(parents)));
2547 }
2548 
2549 
2550 /// A factory function to create a new value task
2551 template<typename Value>
2552 std::shared_ptr<transwarp::value_task<typename transwarp::decay<Value>::type>>
2553 make_value_task(std::string name, Value&& value) {
2555  return std::shared_ptr<task_t>(new task_t(true, std::move(name), std::forward<Value>(value)));
2556 }
2557 
2558 /// A factory function to create a new value task. Overload for omitting the task name
2559 template<typename Value>
2560 std::shared_ptr<transwarp::value_task<typename transwarp::decay<Value>::type>>
2561 make_value_task(Value&& value) {
2563  return std::shared_ptr<task_t>(new task_t(false, "", std::forward<Value>(value)));
2564 }
2565 
2566 
2567 /// A graph interface giving access to the final task as required by transwarp::graph_pool
2568 template<typename FinalResultType>
2569 class graph {
2570 public:
2571 
2572  virtual ~graph() = default;
2573 
2574  /// Returns the final task of the graph
2575  virtual const std::shared_ptr<transwarp::task<FinalResultType>>& final_task() const = 0;
2576 };
2577 
2578 
2579 /// A graph pool that allows running multiple instances of the same graph in parallel
2580 template<typename Graph>
2581 class graph_pool {
2582 public:
2583 
2584  /// Constructs a graph pool by passing a generator to create a new graph
2585  /// and a minimum and maximum size of the pool. The minimum size is used as
2586  /// the initial size of the pool. Graph should be a subclass of transwarp::graph
2587  graph_pool(std::function<std::shared_ptr<Graph>()> generator,
2588  std::size_t minimum_size,
2589  std::size_t maximum_size)
2590  : generator_(std::move(generator)),
2591  minimum_(minimum_size),
2592  maximum_(maximum_size),
2593  finished_(maximum_size)
2594  {
2595  if (minimum_ < 1) {
2596  throw transwarp::invalid_parameter("minimum size");
2597  }
2598  if (minimum_ > maximum_) {
2599  throw transwarp::invalid_parameter("minimum or maximum size");
2600  }
2601  for (std::size_t i=0; i<minimum_; ++i) {
2602  idle_.push(generate());
2603  }
2604  }
2605 
2606  // delete copy/move semantics
2607  graph_pool(const graph_pool&) = delete;
2608  graph_pool& operator=(const graph_pool&) = delete;
2609  graph_pool(graph_pool&&) = delete;
2610  graph_pool& operator=(graph_pool&&) = delete;
2611 
2612  /// Returns the next idle graph.
2613  /// If there are no idle graphs then it will attempt to double the
2614  /// pool size. If that fails then it will return a nullptr. On successful
2615  /// retrieval of an idle graph the function will mark that graph as busy.
2616  std::shared_ptr<Graph> next_idle_graph(bool maybe_resize=true) {
2617  std::shared_ptr<transwarp::node> finished_node;
2618  {
2619  std::lock_guard<spinlock> lock(spinlock_);
2620  if (!finished_.empty()) {
2621  finished_node = finished_.front(); finished_.pop();
2622  }
2623  }
2624 
2625  std::shared_ptr<Graph> g;
2626  if (finished_node) {
2627  g = busy_.find(finished_node)->second;
2628  } else {
2629  if (maybe_resize && idle_.empty()) {
2630  resize(size() * 2); // double pool size
2631  }
2632  if (idle_.empty()) {
2633  return nullptr;
2634  }
2635  g = idle_.front(); idle_.pop();
2636  busy_.emplace(g->final_task()->get_node(), g);
2637  }
2638 
2639  const auto& future = g->final_task()->get_future();
2640  if (future.valid()) {
2641  future.wait(); // will return immediately
2642  }
2643  return g;
2644  }
2645 
2646  /// Just like next_idle_graph() but waits for a graph to become available.
2647  /// The returned graph will always be a valid pointer
2648  std::shared_ptr<Graph> wait_for_next_idle_graph(bool maybe_resize=true) {
2649  for (;;) {
2650  std::shared_ptr<Graph> g = next_idle_graph(maybe_resize);
2651  if (g) {
2652  return g;
2653  }
2654  }
2655  }
2656 
2657  /// Returns the current total size of the pool (sum of idle and busy graphs)
2658  std::size_t size() const {
2659  return idle_.size() + busy_.size();
2660  }
2661 
2662  /// Returns the minimum size of the pool
2663  std::size_t minimum_size() const {
2664  return minimum_;
2665  }
2666 
2667  /// Returns the maximum size of the pool
2668  std::size_t maximum_size() const {
2669  return maximum_;
2670  }
2671 
2672  /// Returns the number of idle graphs in the pool
2673  std::size_t idle_count() const {
2674  std::lock_guard<spinlock> lock(spinlock_);
2675  return idle_.size() + finished_.size();
2676  }
2677 
2678  /// Returns the number of busy graphs in the pool
2679  std::size_t busy_count() const {
2680  std::lock_guard<spinlock> lock(spinlock_);
2681  return busy_.size() - finished_.size();
2682  }
2683 
2684  /// Resizes the graph pool to the given new size if possible
2685  void resize(std::size_t new_size) {
2686  reclaim();
2687  if (new_size > size()) { // grow
2688  const std::size_t count = new_size - size();
2689  for (std::size_t i=0; i<count; ++i) {
2690  if (size() == maximum_) {
2691  break;
2692  }
2693  idle_.push(generate());
2694  }
2695  } else if (new_size < size()) { // shrink
2696  const std::size_t count = size() - new_size;
2697  for (std::size_t i=0; i<count; ++i) {
2698  if (idle_.empty() || size() == minimum_) {
2699  break;
2700  }
2701  idle_.pop();
2702  }
2703  }
2704  }
2705 
2706  /// Reclaims finished graphs by marking them as idle again
2707  void reclaim() {
2708  decltype(finished_) finished{finished_.capacity()};
2709  {
2710  std::lock_guard<spinlock> lock(spinlock_);
2711  finished_.swap(finished);
2712  }
2713  while (!finished.empty()) {
2714  const std::shared_ptr<transwarp::node> node = finished.front(); finished.pop();
2715  const auto it = busy_.find(node);
2716  idle_.push(it->second);
2717  busy_.erase(it);
2718  }
2719  }
2720 
2721 private:
2722 
2723  class spinlock {
2724  public:
2725 
2726  void lock() noexcept {
2727  while (locked_.test_and_set(std::memory_order_acquire));
2728  }
2729 
2730  void unlock() noexcept {
2731  locked_.clear(std::memory_order_release);
2732  }
2733 
2734  private:
2735  std::atomic_flag locked_ = ATOMIC_FLAG_INIT;
2736  };
2737 
2738  class finished_listener : public transwarp::listener {
2739  public:
2740 
2741  explicit
2742  finished_listener(graph_pool<Graph>& pool)
2743  : pool_(pool)
2744  {}
2745 
2746  // Called on a potentially high-priority thread
2747  void handle_event(transwarp::event_type, const std::shared_ptr<transwarp::node>& node) override {
2748  std::lock_guard<spinlock> lock(pool_.spinlock_);
2749  pool_.finished_.push(node);
2750  }
2751 
2752  private:
2753  graph_pool<Graph>& pool_;
2754  };
2755 
2756  std::shared_ptr<Graph> generate() {
2757  std::shared_ptr<Graph> graph = generator_();
2758  graph->final_task()->add_listener(transwarp::event_type::after_finished, listener_);
2759  return graph;
2760  }
2761 
2762  std::function<std::shared_ptr<Graph>()> generator_;
2763  std::size_t minimum_;
2764  std::size_t maximum_;
2765  mutable spinlock spinlock_; // protecting finished_
2767  std::queue<std::shared_ptr<Graph>> idle_;
2768  std::unordered_map<std::shared_ptr<transwarp::node>, std::shared_ptr<Graph>> busy_;
2769  std::shared_ptr<transwarp::listener> listener_{new finished_listener(*this)};
2770 };
2771 
2772 
2773 } // transwarp
void set_custom_data_all(std::shared_ptr< void > custom_data) override
Assigns custom data to all tasks. transwarp will not directly use this. This is only useful if someth...
Definition: transwarp.h:1622
constexpr transwarp::wait_any_type wait_any
The wait_any task tag.
Definition: transwarp.h:128
void remove_custom_data_all() override
Removes custom data from all tasks.
Definition: transwarp.h:2364
The executor interface used to perform custom task execution.
Definition: transwarp.h:283
Exception thrown when a task was destroyed prematurely.
Definition: transwarp.h:61
A simple thread pool used to execute tasks in parallel.
Definition: transwarp.h:530
void cancel_all(bool) noexcept override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2480
void operator()(const transwarp::itask &task) const
Definition: transwarp.h:984
Node manipulation.
Definition: transwarp.h:480
void remove_listeners(transwarp::event_type) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2391
Generates a graph.
Definition: transwarp.h:1008
The consume type. Used for tag dispatch.
Definition: transwarp.h:115
Determines the result type of the Functor dispatching on the task type.
Definition: transwarp.h:1149
void schedule_all(transwarp::executor &) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2412
Removes the executor from the given task.
Definition: transwarp.h:1068
Definition: transwarp.h:931
void reset_priority_all() override
Resets the priority of all tasks to 0.
Definition: transwarp.h:1604
Definition: transwarp.h:140
The task has no parents.
Just after a task was canceled (handle_event called on thread that task is run on) ...
TaskType task_type
The task type.
Definition: transwarp.h:2166
A callable to run a task given its parents.
Definition: transwarp.h:1328
void set_exception(std::exception_ptr exception) override
Assigns an exception to this task.
Definition: transwarp.h:2443
void schedule_all(bool reset_all) override
Schedules all tasks in the graph for execution on the caller thread. The task-specific executors get ...
Definition: transwarp.h:1752
void ensure_task_was_scheduled() const
Checks if the task was scheduled and throws transwarp::control_error if it's not. ...
Definition: transwarp.h:1916
Sets parents and level of the node.
Definition: transwarp.h:980
std::shared_ptr< transwarp::task_impl< TaskType_, typename std::decay< Functor_ >::type, result_type > > then(TaskType_, std::string name, Functor_ &&functor) const
Creates a continuation to this task.
Definition: transwarp.h:2246
std::shared_ptr< transwarp::value_task< typename transwarp::decay< Value >::type > > make_value_task(std::string name, Value &&value)
A factory function to create a new value task.
Definition: transwarp.h:2553
Definition: transwarp.h:1298
void set_executor(std::shared_ptr< transwarp::executor >) override
No-op because a value task never runs.
Definition: transwarp.h:2310
void set_executor_all(std::shared_ptr< transwarp::executor > executor) override
Assigns an executor to all tasks which takes precedence over the executor provided in schedule() or s...
Definition: transwarp.h:1562
bool was_scheduled() const noexcept override
Returns whether the task was scheduled and not reset afterwards. This means that the underlying futur...
Definition: transwarp.h:1810
void cancel(bool) noexcept override
No-op because a value task never runs.
Definition: transwarp.h:2477
Just before a task is scheduled (handle_event called on thread of caller to schedule()) ...
void schedule_all() override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2409
void schedule_all(transwarp::executor &executor, bool reset_all) override
Schedules all tasks in the graph for execution using the provided executor. The task-specific executo...
Definition: transwarp.h:1761
std::size_t minimum_size() const
Returns the minimum size of the pool.
Definition: transwarp.h:2663
std::string get_name() const override
Returns the name of the executor.
Definition: transwarp.h:1520
Assigns a priority to the given task.
Definition: transwarp.h:1076
void set_value(const typename transwarp::decay< result_type >::type &value) override
Assigns a value to this task.
Definition: transwarp.h:2433
void push(T &&value)
Pushes a new value onto the end of the buffer. If that exceeds the capacity of the buffer then the ol...
Definition: transwarp.h:1399
void schedule(transwarp::executor &executor) override
Schedules this task for execution using the provided executor. The task-specific executor gets preced...
Definition: transwarp.h:1718
Assigns an executor to the given task.
Definition: transwarp.h:1056
std::size_t capacity() const
Returns the capacity of the buffer.
Definition: transwarp.h:1419
A simple circular buffer (FIFO). ValueType must support default construction. The buffer lets you pus...
Definition: transwarp.h:1373
Unvisits the given task.
Definition: transwarp.h:1140
void set_priority(std::size_t priority) override
Sets a task priority (defaults to 0). transwarp will not directly use this. This is only useful if so...
Definition: transwarp.h:2323
void cancel_all(bool enabled) noexcept override
If enabled then all pending tasks in the graph are canceled which will throw transwarp::task_canceled...
Definition: transwarp.h:1858
void remove_custom_data() override
Removes custom data from this task.
Definition: transwarp.h:1629
void set_custom_data_all(std::shared_ptr< void > custom_data) override
Assigns custom data to all tasks. transwarp will not directly use this. This is only useful if someth...
Definition: transwarp.h:2354
std::tuple< std::shared_future< ParentResults >...> get_futures(const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &input)
Returns the futures from the given tuple of tasks.
Definition: transwarp.h:631
void set_value(typename transwarp::decay< result_type >::type &&value) override
Assigns a value to this task.
Definition: transwarp.h:2438
void schedule_all(transwarp::schedule_type type, bool reset_all) override
Schedules all tasks in the graph for execution on the caller thread. The task-specific executors get ...
Definition: transwarp.h:1786
Scheduling according to a breadth-first search (default)
void schedule_all() override
Schedules all tasks in the graph for execution on the caller thread. The task-specific executors get ...
Definition: transwarp.h:1735
void set_executor(std::shared_ptr< transwarp::executor > executor) override
Assigns an executor to this task which takes precedence over the executor provided in schedule() or s...
Definition: transwarp.h:1551
value_task(bool has_name, std::string name, T &&value)
A value task is defined by name and value. Note: Don't use this constructor directly, use transwarp::make_value_task.
Definition: transwarp.h:2279
void set_custom_data(std::shared_ptr< void > custom_data) override
Assigns custom data to this task. transwarp will not directly use this. This is only useful if someth...
Definition: transwarp.h:2345
The task class.
Definition: transwarp.h:406
Definition: transwarp.h:942
void schedule_all(transwarp::executor &executor, transwarp::schedule_type type, bool reset_all) override
Schedules all tasks in the graph for execution using the provided executor. The task-specific executo...
Definition: transwarp.h:1795
void reset_priority_all() override
Resets the priority of all tasks to 0.
Definition: transwarp.h:2339
void set_value(const typename transwarp::decay< result_type >::type &value) override
Assigns a value to this task. Scheduling will have no effect after a value has been set...
Definition: transwarp.h:2077
void set_priority(std::size_t priority) override
Sets a task priority (defaults to 0). transwarp will not directly use this. This is only useful if so...
Definition: transwarp.h:1584
const std::shared_ptr< std::string > & get_executor() const noexcept
The optional, task-specific executor (may be null)
Definition: transwarp.h:178
void assign_node_if(Functor &functor, std::shared_ptr< transwarp::node > node) noexcept
Assigns the node to the given functor if the functor is a subclass of transwarp::functor.
Definition: transwarp.h:1253
void reset() override
Resets this task.
Definition: transwarp.h:1834
const std::vector< std::shared_ptr< node > > & get_parents() const noexcept
The task's parents (may be empty)
Definition: transwarp.h:183
Just before a task's functor is invoked (handle_event called on thread that task is run on) ...
Definition: transwarp.h:929
The task's functor accepts the first parent future that becomes ready.
std::shared_ptr< transwarp::task_impl< TaskType_, typename std::decay< Functor_ >::type, result_type > > then(TaskType_, Functor_ &&functor) const
Creates a continuation to this task. Overload for omitting the task name.
Definition: transwarp.h:2254
The consume_any type. Used for tag dispatch.
Definition: transwarp.h:119
ResultType result_type
The result type of this task.
Definition: transwarp.h:2073
void set_custom_data(std::shared_ptr< void > custom_data) override
Assigns custom data to this task. transwarp will not directly use this. This is only useful if someth...
Definition: transwarp.h:1612
bool was_scheduled() const noexcept override
Returns true because a value task is scheduled once on construction.
Definition: transwarp.h:2448
void execute(const std::function< void()> &functor, const std::shared_ptr< transwarp::node > &) override
Pushes the functor into the thread pool for asynchronous execution.
Definition: transwarp.h:1525
void wait_for_all(const std::vector< std::shared_ptr< transwarp::task< ParentResultType >>> &parents)
Waits for all parents to finish.
Definition: transwarp.h:676
The accept type. Used for tag dispatch.
Definition: transwarp.h:107
virtual const std::shared_ptr< transwarp::task< FinalResultType > > & final_task() const =0
Returns the final task of the graph.
const std::shared_future< result_type > & get_future() const noexcept override
Returns the future associated to the underlying execution.
Definition: transwarp.h:1642
constexpr transwarp::accept_type accept
The accept task tag.
Definition: transwarp.h:108
void schedule_all(transwarp::executor &, transwarp::schedule_type, bool) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2430
void remove_listeners() override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2394
bool has_result() const noexcept override
Returns whether this task contains a result.
Definition: transwarp.h:1829
std::size_t maximum_size() const
Returns the maximum size of the pool.
Definition: transwarp.h:2668
The listener interface to listen to events raised by tasks.
Definition: transwarp.h:311
Exception thrown when a task is canceled.
Definition: transwarp.h:52
TaskType task_type
The task type.
Definition: transwarp.h:2215
void wait() const override
Waits for the task to complete. Should only be called if was_scheduled() is true, throws transwarp::c...
Definition: transwarp.h:1816
Scheduling according to a depth-first search.
void add_listener(transwarp::event_type event, std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for the given event type only.
Definition: transwarp.h:1661
std::size_t get_level() const noexcept
The task level.
Definition: transwarp.h:163
std::shared_future< ResultType > make_future_with_value(Value &&value)
Returns a ready future with the given value as its state.
Definition: transwarp.h:1259
std::string get_name() const override
Returns the name of the executor.
Definition: transwarp.h:1494
void schedule() override
Schedules this task for execution on the caller thread. The task-specific executor gets precedence if...
Definition: transwarp.h:1701
void add_listener(std::shared_ptr< transwarp::listener >) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2379
void add_listener(std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for all event types.
Definition: transwarp.h:1652
std::shared_ptr< Graph > next_idle_graph(bool maybe_resize=true)
Returns the next idle graph. If there are no idle graphs then it will attempt to double the pool size...
Definition: transwarp.h:2616
void ensure_task_not_running() const
Checks if the task is currently running and throws transwarp::control_error if it is...
Definition: transwarp.h:1909
Just before a task starts running (handle_event called on thread that task is run on) ...
Removes custom data from the given task.
Definition: transwarp.h:1108
void schedule_all(transwarp::executor &, bool) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2418
Resets the given task.
Definition: transwarp.h:1036
const std::shared_ptr< std::string > & get_name() const noexcept
The optional task name (may be null)
Definition: transwarp.h:173
bool full() const
Returns whether the buffer is full.
Definition: transwarp.h:1435
A base class for a user-defined functor that needs access to the node associated to the task or a can...
Definition: transwarp.h:447
constexpr transwarp::consume_any_type consume_any
The consume_any task tag.
Definition: transwarp.h:120
bool is_canceled() const noexcept
Returns whether the associated task is canceled.
Definition: transwarp.h:198
A node carrying meta-data of a task.
Definition: transwarp.h:146
Exception thrown when a task is used in unintended ways.
Definition: transwarp.h:79
void resize(std::size_t new_size)
Resizes the graph pool to the given new size if possible.
Definition: transwarp.h:2685
Resets the priority of the given task.
Definition: transwarp.h:1088
void schedule() override
No-op because a value task never runs.
Definition: transwarp.h:2397
std::size_t size() const
Returns the number of populated values of the buffer. Its maximum value equals the capacity of the bu...
Definition: transwarp.h:1425
Returns the result type of a std::shared_future<T>
Definition: transwarp.h:399
void add_listener(transwarp::event_type, std::shared_ptr< transwarp::listener >) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2382
void wait() const override
No-op because a value task never runs.
Definition: transwarp.h:2453
void schedule_all(transwarp::executor &executor, transwarp::schedule_type type) override
Schedules all tasks in the graph for execution using the provided executor. The task-specific executo...
Definition: transwarp.h:1777
std::size_t get_priority() const noexcept
The task priority (defaults to 0)
Definition: transwarp.h:188
const std::shared_ptr< transwarp::node > & get_parent() const noexcept
Returns the parent node.
Definition: transwarp.h:251
A task proxy.
Definition: transwarp.h:2067
void transwarp_cancel_point() const
If the associated task is canceled then this will throw transwarp::task_canceled which will stop the ...
Definition: transwarp.h:461
The base task class that contains the functionality that can be used with all result types (void and ...
Definition: transwarp.h:1540
Definition: transwarp.h:617
The task's functor consumes all parent results.
std::shared_ptr< Graph > wait_for_next_idle_graph(bool maybe_resize=true)
Just like next_idle_graph() but waits for a graph to become available. The returned graph will always...
Definition: transwarp.h:2648
Just after a task has finished running (handle_event called on thread that task is run on) ...
void remove_listener(const std::shared_ptr< transwarp::listener > &) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2385
void set_priority_all(std::size_t priority) override
Sets a priority to all tasks (defaults to 0). transwarp will not directly use this. This is only useful if something else is using the priority.
Definition: transwarp.h:2329
Base class for exceptions.
Definition: transwarp.h:43
void schedule_all(transwarp::schedule_type) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2421
std::size_t size() const
Returns the current total size of the pool (sum of idle and busy graphs)
Definition: transwarp.h:2658
void reclaim()
Reclaims finished graphs by marking them as idle again.
Definition: transwarp.h:2707
constexpr transwarp::consume_type consume
The consume task tag.
Definition: transwarp.h:116
std::size_t get_id() const noexcept
The task ID.
Definition: transwarp.h:158
void schedule(transwarp::executor &, bool) override
No-op because a value task never runs.
Definition: transwarp.h:2406
A value task that stores a single value and doesn't require scheduling. Value tasks should be created...
Definition: transwarp.h:2265
void reset_priority() override
Resets the task priority to 0.
Definition: transwarp.h:2334
void remove_listeners(transwarp::event_type event) override
Removes all listeners for the given event type.
Definition: transwarp.h:1685
void remove_listener(const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for all event types.
Definition: transwarp.h:1668
Determines the type of the parents.
Definition: transwarp.h:1286
const std::shared_ptr< transwarp::node > & transwarp_node() const noexcept
The node associated to the task.
Definition: transwarp.h:455
Parent wait_for_any(const std::shared_ptr< transwarp::task< ParentResults >> &...parents)
Waits for the first parent to finish.
Definition: transwarp.h:699
void reset_all() override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2474
void remove_listener(transwarp::event_type event, const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for the given event type only.
Definition: transwarp.h:1677
Assigns custom data to the given task.
Definition: transwarp.h:1096
The task's functor takes no arguments but waits for all parents to finish.
void set_exception(std::exception_ptr exception) override
Assigns an exception to this task. Scheduling will have no effect after an exception has been set...
Definition: transwarp.h:1802
void cancel_all_but_one(const std::shared_ptr< transwarp::task< OneResult >> &one, const std::vector< std::shared_ptr< transwarp::task< ParentResultType >>> &parents)
Cancels all tasks but one.
Definition: transwarp.h:738
Definition: transwarp.h:757
Removes reference and const from a type.
Definition: transwarp.h:392
void remove_listeners() override
Removes all listeners.
Definition: transwarp.h:1691
const std::shared_ptr< void > & get_custom_data() const noexcept
The custom task data (may be null)
Definition: transwarp.h:193
std::shared_ptr< transwarp::task_impl< TaskType_, typename std::decay< Functor_ >::type, result_type > > then(TaskType_, std::string name, Functor_ &&functor) const
Creates a continuation to this task.
Definition: transwarp.h:2296
void call_with_each(const Functor &f, const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &t)
Calls the functor with every element in the tuple.
Definition: transwarp.h:961
void reset_priority() override
Resets the task priority to 0.
Definition: transwarp.h:1598
void schedule_all(transwarp::executor &, transwarp::schedule_type) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2424
void schedule_all(transwarp::schedule_type, bool) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2427
An edge between two nodes.
Definition: transwarp.h:237
const std::shared_ptr< transwarp::node > & get_child() const noexcept
Returns the child node.
Definition: transwarp.h:256
void schedule_all(transwarp::schedule_type type) override
Schedules all tasks in the graph for execution on the caller thread. The task-specific executors get ...
Definition: transwarp.h:1769
std::vector< transwarp::edge > get_graph() const override
Returns the graph of the task structure. This is mainly for visualizing the tasks and their interdepe...
Definition: transwarp.h:1866
graph_pool(std::function< std::shared_ptr< Graph >()> generator, std::size_t minimum_size, std::size_t maximum_size)
Constructs a graph pool by passing a generator to create a new graph and a minimum and maximum size o...
Definition: transwarp.h:2587
void set_value(typename transwarp::decay< result_type >::type &value) override
Assigns a value to this task. Scheduling will have no effect after a value has been set...
Definition: transwarp.h:2129
void schedule(transwarp::executor &) override
No-op because a value task never runs.
Definition: transwarp.h:2400
void reset_all() override
Resets all tasks in the graph.
Definition: transwarp.h:1842
A graph interface giving access to the final task as required by transwarp::graph_pool.
Definition: transwarp.h:2569
std::size_t busy_count() const
Returns the number of busy graphs in the pool.
Definition: transwarp.h:2679
circular_buffer(std::size_t capacity)
Constructs a circular buffer with a given fixed capacity.
Definition: transwarp.h:1382
Definition: transwarp.h:748
ResultType result_type
The result type of this task.
Definition: transwarp.h:2272
Executor for parallel execution. Uses a simple thread pool.
Definition: transwarp.h:1506
The wait_any type. Used for tag dispatch.
Definition: transwarp.h:127
void set_priority_all(std::size_t priority) override
Sets a priority to all tasks (defaults to 0). transwarp will not directly use this. This is only useful if something else is using the priority (e.g. a custom executor)
Definition: transwarp.h:1591
void schedule(bool reset) override
Schedules this task for execution on the caller thread. The task-specific executor gets precedence if...
Definition: transwarp.h:1710
virtual std::string get_name() const =0
Returns the name of the executor.
std::string to_string(const transwarp::task_type &type)
String conversion for the task_type enumeration.
Definition: transwarp.h:88
virtual void execute(const std::function< void()> &functor, const std::shared_ptr< transwarp::node > &node)=0
Runs a task which is wrapped by the given functor. The functor only captures one shared pointer and c...
void execute(const std::function< void()> &functor, const std::shared_ptr< transwarp::node > &) override
Runs the functor on the current thread.
Definition: transwarp.h:1499
void remove_executor() override
Removes the executor from this task.
Definition: transwarp.h:1569
The accept_any type. Used for tag dispatch.
Definition: transwarp.h:111
transwarp::task_type get_type() const noexcept
The task type.
Definition: transwarp.h:168
std::shared_ptr< transwarp::task_impl< TaskType, typename std::decay< Functor >::type, typename Parents::result_type...> > make_task(TaskType, std::string name, Functor &&functor, std::shared_ptr< Parents >...parents)
A factory function to create a new task.
Definition: transwarp.h:2519
virtual void handle_event(transwarp::event_type event, const std::shared_ptr< transwarp::node > &node)=0
This may be called from arbitrary threads depending on the event type (see transwarp::event_type). The implementer needs to ensure that this never throws exceptions.
bool is_ready() const override
Returns true because a value task is always ready.
Definition: transwarp.h:2456
constexpr transwarp::wait_type wait
The wait task tag.
Definition: transwarp.h:124
The root type. Used for tag dispatch.
Definition: transwarp.h:103
The task's functor consumes the first parent result that becomes ready.
A task representing a piece of work given by functor and parent tasks. By connecting tasks a directed...
Definition: transwarp.h:2212
void remove_listener(transwarp::event_type, const std::shared_ptr< transwarp::listener > &) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2388
The wait type. Used for tag dispatch.
Definition: transwarp.h:123
void cancel(bool enabled) noexcept override
If enabled then this task is canceled which will throw transwarp::task_canceled when retrieving the t...
Definition: transwarp.h:1851
Schedules using the given executor.
Definition: transwarp.h:1023
void swap(circular_buffer &buffer)
Swaps this buffer with the given buffer.
Definition: transwarp.h:1440
Result call(std::size_t node_id, const Task &task, const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &parents)
Calls the functor of the given task with the results from the tuple of parents. Throws transwarp::tas...
Definition: transwarp.h:914
constexpr transwarp::root_type root
The root task tag.
Definition: transwarp.h:104
void remove_custom_data() override
Removes custom data from this task.
Definition: transwarp.h:2359
void set_value(typename transwarp::decay< result_type >::type &&value) override
Assigns a value to this task. Scheduling will have no effect after a value has been set...
Definition: transwarp.h:2085
void schedule_all(transwarp::executor &executor) override
Schedules all tasks in the graph for execution using the provided executor. The task-specific executo...
Definition: transwarp.h:1743
const value_type & front() const
Returns the value at the front of the buffer (the oldest value). This is undefined if the buffer is e...
Definition: transwarp.h:1406
static Result work(std::size_t node_id, const Task &task, const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &parents)
Definition: transwarp.h:841
schedule_type
Determines in which order tasks are scheduled in the graph.
Definition: transwarp.h:322
The task's functor accepts all parent futures.
A graph pool that allows running multiple instances of the same graph in parallel.
Definition: transwarp.h:2581
Applies final bookkeeping to the task.
Definition: transwarp.h:996
void remove_executor_all() override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2319
typename transwarp::detail::functor_result< TaskType, Functor, ParentResults...>::type result_type
The result type of this task.
Definition: transwarp.h:2218
void schedule(transwarp::executor &executor, bool reset) override
Schedules this task for execution using the provided executor. The task-specific executor gets preced...
Definition: transwarp.h:1727
std::vector< transwarp::edge > get_graph() const override
Returns an empty graph because a value task doesn't have parents.
Definition: transwarp.h:2483
const std::shared_ptr< transwarp::node > & get_node() const noexcept override
Returns the associated node.
Definition: transwarp.h:1647
task_impl(bool has_name, std::string name, F &&functor, std::vector< std::shared_ptr< transwarp::task< P >>> parents)
A task is defined by name, functor, and parent tasks. Note: Don't use this constructor directly...
Definition: transwarp.h:2233
task_type
The possible task types.
Definition: transwarp.h:31
void set_value() override
Assigns a value to this task. Scheduling will have no effect after a call to this. Calling reset() will reset this and re-enable scheduling.
Definition: transwarp.h:2173
std::shared_ptr< transwarp::task_impl< TaskType_, typename std::decay< Functor_ >::type, result_type > > then(TaskType_, Functor_ &&functor) const
Creates a continuation to this task. Overload for omitting the task name.
Definition: transwarp.h:2304
void pop()
Removes the value at the front of the buffer (the oldest value).
Definition: transwarp.h:1411
event_type
The task events that can be subscribed to using the listener interface.
Definition: transwarp.h:300
The task's functor takes no arguments but waits for the first parent to finish.
Exception thrown when an invalid parameter was passed to a function.
Definition: transwarp.h:70
void remove_custom_data_all() override
Removes custom data from all tasks.
Definition: transwarp.h:1635
void reset() override
No-op because a value task never runs.
Definition: transwarp.h:2471
std::shared_future< void > make_ready_future()
Returns a ready future.
Definition: transwarp.h:1266
void set_executor_all(std::shared_ptr< transwarp::executor >) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2313
constexpr transwarp::accept_any_type accept_any
The accept_any task tag.
Definition: transwarp.h:112
bool empty() const
Returns whether the buffer is empty.
Definition: transwarp.h:1430
const std::shared_future< result_type > & get_future() const noexcept override
Returns the future associated to the underlying execution.
Definition: transwarp.h:2369
Cancels or resumes the given task.
Definition: transwarp.h:1044
std::size_t idle_count() const
Returns the number of idle graphs in the pool.
Definition: transwarp.h:2673
void remove_executor() override
No-op because a value task never runs.
Definition: transwarp.h:2316
void schedule(bool) override
No-op because a value task never runs.
Definition: transwarp.h:2403
void schedule_all(bool) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2415
Visits the given task using the visitor given in the constructor.
Definition: transwarp.h:1128
Pushes the given task into the vector of tasks.
Definition: transwarp.h:1116
Executor for sequential execution. Runs functors sequentially on the same thread. ...
Definition: transwarp.h:1482
bool has_result() const noexcept override
Returns true because a value task always contains a result.
Definition: transwarp.h:2461
task_impl(bool has_name, std::string name, F &&functor, std::shared_ptr< transwarp::task< ParentResults >>...parents)
A task is defined by name, functor, and parent tasks. Note: Don't use this constructor directly...
Definition: transwarp.h:2225
std::shared_future< ResultType > make_future_with_exception(std::exception_ptr exception)
Returns a ready future with the given exception as its state.
Definition: transwarp.h:1274
void remove_executor_all() override
Removes the executor from all tasks.
Definition: transwarp.h:1576
const std::shared_ptr< transwarp::node > & get_node() const noexcept override
Returns the associated node.
Definition: transwarp.h:2374
bool is_ready() const override
Returns whether the task has finished processing. Should only be called if was_scheduled() is true...
Definition: transwarp.h:1823
An interface for the task class.
Definition: transwarp.h:329
Result run_task(std::size_t node_id, const std::weak_ptr< Task > &task, Args &&...args)
Runs the task with the given arguments, hence, invoking the task's functor.
Definition: transwarp.h:651