transwarp
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Pages
transwarp.h
1 /// @mainpage transwarp is a header-only C++ library for task concurrency
2 /// @details https://github.com/bloomen/transwarp
3 /// @version 1.9.0
4 /// @author Christian Blume, Guan Wang
5 /// @date 2018
6 /// @copyright MIT http://www.opensource.org/licenses/mit-license.php
7 #pragma once
8 #include <algorithm>
9 #include <atomic>
10 #include <chrono>
11 #include <cstddef>
12 #include <functional>
13 #include <future>
14 #include <memory>
15 #include <mutex>
16 #include <queue>
17 #include <stdexcept>
18 #include <string>
19 #include <thread>
20 #include <tuple>
21 #include <type_traits>
22 #include <unordered_map>
23 #include <vector>
24 
25 
26 /// The transwarp namespace
27 namespace transwarp {
28 
29 
30 /// The possible task types
31 enum class task_type {
32  root, ///< The task has no parents
33  accept, ///< The task's functor accepts all parent futures
34  accept_any, ///< The task's functor accepts the first parent future that becomes ready
35  consume, ///< The task's functor consumes all parent results
36  consume_any, ///< The task's functor consumes the first parent result that becomes ready
37  wait, ///< The task's functor takes no arguments but waits for all parents to finish
38  wait_any, ///< The task's functor takes no arguments but waits for the first parent to finish
39 };
40 
41 
42 /// Base class for exceptions
43 class transwarp_error : public std::runtime_error {
44 public:
45  explicit transwarp_error(const std::string& message)
46  : std::runtime_error(message)
47  {}
48 };
49 
50 
51 /// Exception thrown when a task is canceled
53 public:
54  explicit task_canceled(const std::string& node_repr)
55  : transwarp::transwarp_error("Task canceled: " + node_repr)
56  {}
57 };
58 
59 
60 /// Exception thrown when a task was destroyed prematurely
62 public:
63  explicit task_destroyed(const std::string& node_repr)
64  : transwarp::transwarp_error("Task destroyed: " + node_repr)
65  {}
66 };
67 
68 
69 /// Exception thrown when an invalid parameter was passed to a function
71 public:
72  explicit invalid_parameter(const std::string& parameter)
73  : transwarp::transwarp_error("Invalid parameter: " + parameter)
74  {}
75 };
76 
77 
78 /// Exception thrown when a task is used in unintended ways
80 public:
81  explicit control_error(const std::string& message)
82  : transwarp::transwarp_error("Control error: " + message)
83  {}
84 };
85 
86 
87 /// String conversion for the task_type enumeration
88 inline std::string to_string(const transwarp::task_type& type) {
89  switch (type) {
90  case transwarp::task_type::root: return "root";
91  case transwarp::task_type::accept: return "accept";
92  case transwarp::task_type::accept_any: return "accept_any";
93  case transwarp::task_type::consume: return "consume";
94  case transwarp::task_type::consume_any: return "consume_any";
95  case transwarp::task_type::wait: return "wait";
96  case transwarp::task_type::wait_any: return "wait_any";
97  default: throw transwarp::invalid_parameter("task type");
98  }
99 }
100 
101 
102 /// The root type. Used for tag dispatch
103 struct root_type : std::integral_constant<transwarp::task_type, transwarp::task_type::root> {};
104 constexpr transwarp::root_type root{}; ///< The root task tag
105 
106 /// The accept type. Used for tag dispatch
107 struct accept_type : std::integral_constant<transwarp::task_type, transwarp::task_type::accept> {};
108 constexpr transwarp::accept_type accept{}; ///< The accept task tag
109 
110 /// The accept_any type. Used for tag dispatch
111 struct accept_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::accept_any> {};
112 constexpr transwarp::accept_any_type accept_any{}; ///< The accept_any task tag
113 
114 /// The consume type. Used for tag dispatch
115 struct consume_type : std::integral_constant<transwarp::task_type, transwarp::task_type::consume> {};
116 constexpr transwarp::consume_type consume{}; ///< The consume task tag
117 
118 /// The consume_any type. Used for tag dispatch
119 struct consume_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::consume_any> {};
120 constexpr transwarp::consume_any_type consume_any{}; ///< The consume_any task tag
121 
122 /// The wait type. Used for tag dispatch
123 struct wait_type : std::integral_constant<transwarp::task_type, transwarp::task_type::wait> {};
124 constexpr transwarp::wait_type wait{}; ///< The wait task tag
125 
126 /// The wait_any type. Used for tag dispatch
127 struct wait_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::wait_any> {};
128 constexpr transwarp::wait_any_type wait_any{}; ///< The wait_any task tag
129 
130 
131 /// Detail namespace for internal functionality only
132 namespace detail {
133 
134 struct visit_depth_visitor;
135 struct unvisit_visitor;
136 struct final_visitor;
137 struct schedule_visitor;
138 struct node_manip;
139 template<bool>
141 
142 } // detail
143 
144 
145 /// A node carrying meta-data of a task
146 class node {
147 public:
148 
149  node() = default;
150 
151  // delete copy/move semantics
152  node(const node&) = delete;
153  node& operator=(const node&) = delete;
154  node(node&&) = delete;
155  node& operator=(node&&) = delete;
156 
157  /// The task ID
158  std::size_t get_id() const noexcept {
159  return id_;
160  }
161 
162  /// The task level
163  std::size_t get_level() const noexcept {
164  return level_;
165  }
166 
167  /// The task type
168  transwarp::task_type get_type() const noexcept {
169  return type_;
170  }
171 
172  /// The optional task name (may be null)
173  const std::shared_ptr<std::string>& get_name() const noexcept {
174  return name_;
175  }
176 
177  /// The optional, task-specific executor (may be null)
178  const std::shared_ptr<std::string>& get_executor() const noexcept {
179  return executor_;
180  }
181 
182  /// The task's parents (may be empty)
183  const std::vector<std::shared_ptr<node>>& get_parents() const noexcept {
184  return parents_;
185  }
186 
187  /// The task priority (defaults to 0)
188  std::size_t get_priority() const noexcept {
189  return priority_;
190  }
191 
192  /// The custom task data (may be null)
193  const std::shared_ptr<void>& get_custom_data() const noexcept {
194  return custom_data_;
195  }
196 
197  /// Returns whether the associated task is canceled
198  bool is_canceled() const noexcept {
199  return canceled_.load();
200  }
201 
202  /// Returns the average idletime in microseconds (-1 if never set)
203  std::int64_t get_avg_idletime_us() const noexcept {
204  return avg_idletime_us_.load();
205  }
206 
207  /// Returns the average waittime in microseconds (-1 if never set)
208  std::int64_t get_avg_waittime_us() const noexcept {
209  return avg_waittime_us_.load();
210  }
211 
212  /// Returns the average runtime in microseconds (-1 if never set)
213  std::int64_t get_avg_runtime_us() const noexcept {
214  return avg_runtime_us_.load();
215  }
216 
217 private:
218  friend struct transwarp::detail::node_manip;
219 
220  std::size_t id_ = 0;
221  std::size_t level_ = 0;
223  std::shared_ptr<std::string> name_;
224  std::shared_ptr<std::string> executor_;
225  std::vector<std::shared_ptr<node>> parents_;
226  std::size_t priority_ = 0;
227  std::shared_ptr<void> custom_data_;
228  std::atomic<bool> canceled_{false};
229  std::atomic<std::int64_t> avg_idletime_us_{-1};
230  std::atomic<std::int64_t> avg_waittime_us_{-1};
231  std::atomic<std::int64_t> avg_runtime_us_{-1};
232 };
233 
234 /// String conversion for the node class
235 inline std::string to_string(const transwarp::node& node, const std::string& separator="\n") {
236  std::string s;
237  s += '"';
238  const std::shared_ptr<std::string>& name = node.get_name();
239  if (name) {
240  s += "<" + *name + ">" + separator;
241  }
242  s += transwarp::to_string(node.get_type());
243  s += " id=" + std::to_string(node.get_id());
244  s += " lev=" + std::to_string(node.get_level());
245  const std::shared_ptr<std::string>& exec = node.get_executor();
246  if (exec) {
247  s += separator + "<" + *exec + ">";
248  }
249  const std::int64_t avg_idletime_us = node.get_avg_idletime_us();
250  if (avg_idletime_us >= 0) {
251  s += separator + "avg-idle-us=" + std::to_string(avg_idletime_us);
252  }
253  const std::int64_t avg_waittime_us = node.get_avg_waittime_us();
254  if (avg_waittime_us >= 0) {
255  s += separator + "avg-wait-us=" + std::to_string(avg_waittime_us);
256  }
257  const std::int64_t avg_runtime_us = node.get_avg_runtime_us();
258  if (avg_runtime_us >= 0) {
259  s += separator + "avg-run-us=" + std::to_string(avg_runtime_us);
260  }
261  s += '"';
262  return s;
263 }
264 
265 
266 /// An edge between two nodes
267 class edge {
268 public:
269  edge(std::shared_ptr<transwarp::node> parent, std::shared_ptr<transwarp::node> child) noexcept
270  : parent_(std::move(parent)), child_(std::move(child))
271  {}
272 
273  // default copy/move semantics
274  edge(const edge&) = default;
275  edge& operator=(const edge&) = default;
276  edge(edge&&) = default;
277  edge& operator=(edge&&) = default;
278 
279  /// Returns the parent node
280  const std::shared_ptr<transwarp::node>& get_parent() const noexcept {
281  return parent_;
282  }
283 
284  /// Returns the child node
285  const std::shared_ptr<transwarp::node>& get_child() const noexcept {
286  return child_;
287  }
288 
289 private:
290  std::shared_ptr<transwarp::node> parent_;
291  std::shared_ptr<transwarp::node> child_;
292 };
293 
294 /// String conversion for the edge class
295 inline std::string to_string(const transwarp::edge& edge, const std::string& separator="\n") {
296  return transwarp::to_string(*edge.get_parent(), separator) + " -> " + transwarp::to_string(*edge.get_child(), separator);
297 }
298 
299 
300 /// Creates a dot-style string from the given graph
301 inline std::string to_string(const std::vector<transwarp::edge>& graph, const std::string& separator="\n") {
302  std::string dot = "digraph {" + separator;
303  for (const transwarp::edge& edge : graph) {
304  dot += transwarp::to_string(edge, separator) + separator;
305  }
306  dot += "}";
307  return dot;
308 }
309 
310 
311 /// The executor interface used to perform custom task execution
312 class executor {
313 public:
314  virtual ~executor() = default;
315 
316  /// Returns the name of the executor
317  virtual std::string get_name() const = 0;
318 
319  /// Runs a task which is wrapped by the given functor. The functor only
320  /// captures one shared pointer and can hence be copied at low cost.
321  /// node represents the task that the functor belongs to.
322  /// This function is only ever called on the thread of the caller to schedule().
323  /// The implementer needs to ensure that this never throws exceptions
324  virtual void execute(const std::function<void()>& functor, const std::shared_ptr<transwarp::node>& node) = 0;
325 };
326 
327 
328 /// The task events that can be subscribed to using the listener interface
329 enum class event_type {
330  before_scheduled, ///< Just before a task is scheduled (handle_event called on thread of caller to schedule())
331  before_started, ///< Just before a task starts running (handle_event called on thread that task is run on)
332  before_invoked, ///< Just before a task's functor is invoked (handle_event called on thread that task is run on)
333  after_finished, ///< Just after a task has finished running (handle_event called on thread that task is run on)
334  after_canceled, ///< Just after a task was canceled (handle_event called on thread that task is run on)
335  count,
336 };
337 
338 
339 /// The listener interface to listen to events raised by tasks
340 class listener {
341 public:
342  virtual ~listener() = default;
343 
344  /// This may be called from arbitrary threads depending on the event type (see transwarp::event_type).
345  /// The implementer needs to ensure that this never throws exceptions
346  virtual void handle_event(transwarp::event_type event, const std::shared_ptr<transwarp::node>& node) = 0;
347 };
348 
349 
350 /// Determines in which order tasks are scheduled in the graph
351 enum class schedule_type {
352  breadth, ///< Scheduling according to a breadth-first search (default)
353  depth, ///< Scheduling according to a depth-first search
354 };
355 
356 
357 /// An interface for the task class
358 class itask {
359 public:
360  virtual ~itask() = default;
361 
362  virtual void set_executor(std::shared_ptr<transwarp::executor> executor) = 0;
363  virtual void set_executor_all(std::shared_ptr<transwarp::executor> executor) = 0;
364  virtual void remove_executor() = 0;
365  virtual void remove_executor_all() = 0;
366  virtual void set_priority(std::size_t priority) = 0;
367  virtual void set_priority_all(std::size_t priority) = 0;
368  virtual void reset_priority() = 0;
369  virtual void reset_priority_all() = 0;
370  virtual void set_custom_data(std::shared_ptr<void> custom_data) = 0;
371  virtual void set_custom_data_all(std::shared_ptr<void> custom_data) = 0;
372  virtual void remove_custom_data() = 0;
373  virtual void remove_custom_data_all() = 0;
374  virtual const std::shared_ptr<transwarp::node>& get_node() const noexcept = 0;
375  virtual void add_listener(std::shared_ptr<transwarp::listener> listener) = 0;
376  virtual void add_listener(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) = 0;
377  virtual void add_listener_all(std::shared_ptr<transwarp::listener> listener) = 0;
378  virtual void add_listener_all(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) = 0;
379  virtual void remove_listener(const std::shared_ptr<transwarp::listener>& listener) = 0;
380  virtual void remove_listener(transwarp::event_type event, const std::shared_ptr<transwarp::listener>& listener) = 0;
381  virtual void remove_listener_all(const std::shared_ptr<transwarp::listener>& listener) = 0;
382  virtual void remove_listener_all(transwarp::event_type event, const std::shared_ptr<transwarp::listener>& listener) = 0;
383  virtual void remove_listeners() = 0;
384  virtual void remove_listeners(transwarp::event_type event) = 0;
385  virtual void remove_listeners_all() = 0;
386  virtual void remove_listeners_all(transwarp::event_type event) = 0;
387  virtual void schedule() = 0;
388  virtual void schedule(transwarp::executor& executor) = 0;
389  virtual void schedule(bool reset) = 0;
390  virtual void schedule(transwarp::executor& executor, bool reset) = 0;
391  virtual void schedule_all() = 0;
392  virtual void schedule_all(transwarp::executor& executor) = 0;
393  virtual void schedule_all(bool reset_all) = 0;
394  virtual void schedule_all(transwarp::executor& executor, bool reset_all) = 0;
395  virtual void schedule_all(transwarp::schedule_type type) = 0;
396  virtual void schedule_all(transwarp::executor& executor, transwarp::schedule_type type) = 0;
397  virtual void schedule_all(transwarp::schedule_type type, bool reset_all) = 0;
398  virtual void schedule_all(transwarp::executor& executor, transwarp::schedule_type type, bool reset_all) = 0;
399  virtual void set_exception(std::exception_ptr exception) = 0;
400  virtual bool was_scheduled() const noexcept = 0;
401  virtual void wait() const = 0;
402  virtual bool is_ready() const = 0;
403  virtual bool has_result() const = 0;
404  virtual void reset() = 0;
405  virtual void reset_all() = 0;
406  virtual void cancel(bool enabled) noexcept = 0;
407  virtual void cancel_all(bool enabled) noexcept = 0;
408  virtual std::size_t get_parent_count() const noexcept = 0;
409  virtual std::size_t get_task_count() const noexcept = 0;
410  virtual std::vector<transwarp::edge> get_graph() const = 0;
411 
412 protected:
413  virtual void schedule_impl(bool reset, transwarp::executor* executor=nullptr) = 0;
414 
415 private:
418  friend struct transwarp::detail::final_visitor;
420 
421  virtual void visit_depth(const std::function<void(itask&)>& visitor) = 0;
422  virtual void unvisit() noexcept = 0;
423  virtual void set_node_id(std::size_t id) noexcept = 0;
424 };
425 
426 
427 /// Removes reference and const from a type
428 template<typename T>
429 struct decay {
430  using type = typename std::remove_const<typename std::remove_reference<T>::type>::type;
431 };
432 
433 
434 /// Returns the result type of a std::shared_future<T>
435 template<typename T>
436 struct result {
437  using type = typename std::result_of<decltype(&std::shared_future<T>::get)(std::shared_future<T>)>::type;
438 };
439 
440 
441 /// The task class
442 template<typename ResultType>
443 class task : public transwarp::itask {
444 public:
445  using result_type = ResultType;
446 
447  virtual ~task() = default;
448 
449  virtual void set_value(const typename transwarp::decay<result_type>::type& value) = 0;
450  virtual void set_value(typename transwarp::decay<result_type>::type&& value) = 0;
451  virtual const std::shared_future<result_type>& get_future() const noexcept = 0;
452  virtual typename transwarp::result<result_type>::type get() const = 0;
453 };
454 
455 /// The task class (reference result type)
456 template<typename ResultType>
457 class task<ResultType&> : public transwarp::itask {
458 public:
459  using result_type = ResultType&;
460 
461  virtual ~task() = default;
462 
463  virtual void set_value(typename transwarp::decay<result_type>::type& value) = 0;
464  virtual const std::shared_future<result_type>& get_future() const noexcept = 0;
465  virtual typename transwarp::result<result_type>::type get() const = 0;
466 };
467 
468 /// The task class (void result type)
469 template<>
470 class task<void> : public transwarp::itask {
471 public:
472  using result_type = void;
473 
474  virtual ~task() = default;
475 
476  virtual void set_value() = 0;
477  virtual const std::shared_future<result_type>& get_future() const noexcept = 0;
478  virtual result_type get() const = 0;
479 };
480 
481 
482 /// A base class for a user-defined functor that needs access to the node associated
483 /// to the task or a cancel point to stop a task while it's running
484 class functor {
485 public:
486 
487  virtual ~functor() = default;
488 
489 protected:
490 
491  /// The node associated to the task
492  const std::shared_ptr<transwarp::node>& transwarp_node() const noexcept {
493  return transwarp_node_;
494  }
495 
496  /// If the associated task is canceled then this will throw transwarp::task_canceled
497  /// which will stop the task while it's running
498  void transwarp_cancel_point() const {
499  if (transwarp_node_->is_canceled()) {
500  throw transwarp::task_canceled(std::to_string(transwarp_node_->get_id()));
501  }
502  }
503 
504 private:
505  template<bool>
507 
508  std::shared_ptr<transwarp::node> transwarp_node_;
509 };
510 
511 
512 /// Detail namespace for internal functionality only
513 namespace detail {
514 
515 
516 /// Node manipulation
517 struct node_manip {
518 
519  static void set_id(transwarp::node& node, std::size_t id) noexcept {
520  node.id_ = id;
521  }
522 
523  static void set_level(transwarp::node& node, std::size_t level) noexcept {
524  node.level_ = level;
525  }
526 
527  static void set_type(transwarp::node& node, transwarp::task_type type) noexcept {
528  node.type_ = type;
529  }
530 
531  static void set_name(transwarp::node& node, std::shared_ptr<std::string> name) noexcept {
532  node.name_ = std::move(name);
533  }
534 
535  static void set_executor(transwarp::node& node, std::shared_ptr<std::string> executor) noexcept {
536  if (executor) {
537  node.executor_ = std::move(executor);
538  } else {
539  node.executor_.reset();
540  }
541  }
542 
543  static void add_parent(transwarp::node& node, std::shared_ptr<transwarp::node> parent) {
544  node.parents_.push_back(std::move(parent));
545  }
546 
547  static void set_priority(transwarp::node& node, std::size_t priority) noexcept {
548  node.priority_ = priority;
549  }
550 
551  static void set_custom_data(transwarp::node& node, std::shared_ptr<void> custom_data) {
552  if (custom_data) {
553  node.custom_data_ = std::move(custom_data);
554  } else {
555  node.custom_data_.reset();
556  }
557  }
558 
559  static void set_canceled(transwarp::node& node, bool enabled) noexcept {
560  node.canceled_ = enabled;
561  }
562 
563  static void set_avg_idletime_us(transwarp::node& node, std::int64_t idletime) noexcept {
564  node.avg_idletime_us_ = idletime;
565  }
566 
567  static void set_avg_waittime_us(transwarp::node& node, std::int64_t waittime) noexcept {
568  node.avg_waittime_us_ = waittime;
569  }
570 
571  static void set_avg_runtime_us(transwarp::node& node, std::int64_t runtime) noexcept {
572  node.avg_runtime_us_ = runtime;
573  }
574 
575 };
576 
577 
578 /// A simple thread pool used to execute tasks in parallel
579 class thread_pool {
580 public:
581 
582  explicit thread_pool(std::size_t n_threads)
583  : done_(false)
584  {
585  if (n_threads == 0) {
586  throw transwarp::invalid_parameter("number of threads");
587  }
588  const std::size_t n_target = threads_.size() + n_threads;
589  while (threads_.size() < n_target) {
590  std::thread thread;
591  try {
592  thread = std::thread(&thread_pool::worker, this);
593  } catch (...) {
594  shutdown();
595  throw;
596  }
597  try {
598  threads_.push_back(std::move(thread));
599  } catch (...) {
600  shutdown();
601  thread.join();
602  throw;
603  }
604  }
605  }
606 
607  // delete copy/move semantics
608  thread_pool(const thread_pool&) = delete;
609  thread_pool& operator=(const thread_pool&) = delete;
610  thread_pool(thread_pool&&) = delete;
611  thread_pool& operator=(thread_pool&&) = delete;
612 
613  ~thread_pool() {
614  shutdown();
615  }
616 
617  void push(const std::function<void()>& functor) {
618  {
619  std::lock_guard<std::mutex> lock(mutex_);
620  functors_.push(functor);
621  }
622  cond_var_.notify_one();
623  }
624 
625 private:
626 
627  void worker() {
628  for (;;) {
629  std::function<void()> functor;
630  {
631  std::unique_lock<std::mutex> lock(mutex_);
632  cond_var_.wait(lock, [this]{
633  return done_ || !functors_.empty();
634  });
635  if (done_ && functors_.empty()) {
636  break;
637  }
638  functor = functors_.front();
639  functors_.pop();
640  }
641  functor();
642  }
643  }
644 
645  void shutdown() {
646  {
647  std::lock_guard<std::mutex> lock(mutex_);
648  done_ = true;
649  }
650  cond_var_.notify_all();
651  for (std::thread& thread : threads_) {
652  thread.join();
653  }
654  threads_.clear();
655  }
656 
657  bool done_;
658  std::vector<std::thread> threads_;
659  std::queue<std::function<void()>> functors_;
660  std::condition_variable cond_var_;
661  std::mutex mutex_;
662 };
663 
664 
665 template<int offset, typename... ParentResults>
667  static void work(const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& source, std::tuple<std::shared_future<ParentResults>...>& target) {
668  std::get<offset>(target) = std::get<offset>(source)->get_future();
670  }
671 };
672 
673 template<typename... ParentResults>
674 struct assign_futures_impl<-1, ParentResults...> {
675  static void work(const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>&, std::tuple<std::shared_future<ParentResults>...>&) {}
676 };
677 
678 /// Returns the futures from the given tuple of tasks
679 template<typename... ParentResults>
680 std::tuple<std::shared_future<ParentResults>...> get_futures(const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& input) {
681  std::tuple<std::shared_future<ParentResults>...> result;
682  assign_futures_impl<static_cast<int>(sizeof...(ParentResults)) - 1, ParentResults...>::work(input, result);
683  return result;
684 }
685 
686 /// Returns the futures from the given vector of tasks
687 template<typename ParentResultType>
688 std::vector<std::shared_future<ParentResultType>> get_futures(const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& input) {
689  std::vector<std::shared_future<ParentResultType>> result;
690  result.reserve(input.size());
691  for (const std::shared_ptr<transwarp::task<ParentResultType>>& task : input) {
692  result.emplace_back(task->get_future());
693  }
694  return result;
695 }
696 
697 
698 /// Runs the task with the given arguments, hence, invoking the task's functor
699 template<typename Result, typename Task, typename... Args>
700 Result run_task(std::size_t node_id, const std::weak_ptr<Task>& task, Args&&... args) {
701  const std::shared_ptr<Task> t = task.lock();
702  if (!t) {
703  throw transwarp::task_destroyed(std::to_string(node_id));
704  }
705  if (t->node_->is_canceled()) {
706  throw transwarp::task_canceled(std::to_string(node_id));
707  }
709  return t->functor_(std::forward<Args>(args)...);
710 }
711 
712 
713 inline void wait_for_all() {}
714 
715 /// Waits for all parents to finish
716 template<typename ParentResult, typename... ParentResults>
717 void wait_for_all(const std::shared_ptr<transwarp::task<ParentResult>>& parent, const std::shared_ptr<transwarp::task<ParentResults>>& ...parents) {
718  parent->get_future().wait();
720 }
721 
722 
723 /// Waits for all parents to finish
724 template<typename ParentResultType>
725 void wait_for_all(const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
726  for (const std::shared_ptr<transwarp::task<ParentResultType>>& parent : parents) {
727  parent->get_future().wait();
728  }
729 }
730 
731 
732 template<typename Parent>
733 Parent wait_for_any_impl() {
734  return {};
735 }
736 
737 template<typename Parent, typename ParentResult, typename... ParentResults>
738 Parent wait_for_any_impl(const std::shared_ptr<transwarp::task<ParentResult>>& parent, const std::shared_ptr<transwarp::task<ParentResults>>& ...parents) {
739  const std::future_status status = parent->get_future().wait_for(std::chrono::microseconds(1));
740  if (status == std::future_status::ready) {
741  return parent;
742  }
743  return transwarp::detail::wait_for_any_impl<Parent>(parents...);
744 }
745 
746 /// Waits for the first parent to finish
747 template<typename Parent, typename... ParentResults>
748 Parent wait_for_any(const std::shared_ptr<transwarp::task<ParentResults>>& ...parents) {
749  for (;;) {
750  Parent parent = transwarp::detail::wait_for_any_impl<Parent>(parents...);
751  if (parent) {
752  return parent;
753  }
754  }
755 }
756 
757 
758 /// Waits for the first parent to finish
759 template<typename ParentResultType>
760 std::shared_ptr<transwarp::task<ParentResultType>> wait_for_any(const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
761  for (;;) {
762  for (const std::shared_ptr<transwarp::task<ParentResultType>>& parent : parents) {
763  const std::future_status status = parent->get_future().wait_for(std::chrono::microseconds(1));
764  if (status == std::future_status::ready) {
765  return parent;
766  }
767  }
768  }
769 }
770 
771 
772 template<typename OneResult>
773 void cancel_all_but_one(const std::shared_ptr<transwarp::task<OneResult>>&) {}
774 
775 /// Cancels all tasks but one
776 template<typename OneResult, typename ParentResult, typename... ParentResults>
777 void cancel_all_but_one(const std::shared_ptr<transwarp::task<OneResult>>& one, const std::shared_ptr<transwarp::task<ParentResult>>& parent, const std::shared_ptr<transwarp::task<ParentResults>>& ...parents) {
778  if (one != parent) {
779  parent->cancel(true);
780  }
782 }
783 
784 
785 /// Cancels all tasks but one
786 template<typename OneResult, typename ParentResultType>
787 void cancel_all_but_one(const std::shared_ptr<transwarp::task<OneResult>>& one, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
788  for (const std::shared_ptr<transwarp::task<ParentResultType>>& parent : parents) {
789  if (one != parent) {
790  parent->cancel(true);
791  }
792  }
793 }
794 
795 
796 template<typename TaskType, bool done, int total, int... n>
797 struct call_impl {
798  template<typename Result, typename Task, typename... ParentResults>
799  static Result work(std::size_t node_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
800  return call_impl<TaskType, total == 1 + static_cast<int>(sizeof...(n)), total, n..., static_cast<int>(sizeof...(n))>::template
801  work<Result>(node_id, task, parents);
802  }
803 };
804 
805 template<typename TaskType>
807 
808 template<int total, int... n>
809 struct call_impl<transwarp::root_type, true, total, n...> {
810  template<typename Result, typename Task, typename... ParentResults>
811  static Result work(std::size_t node_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>&) {
812  return transwarp::detail::run_task<Result>(node_id, task);
813  }
814 };
815 
816 template<>
817 struct call_impl_vector<transwarp::root_type> {
818  template<typename Result, typename Task, typename ParentResultType>
819  static Result work(std::size_t node_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>&) {
820  return transwarp::detail::run_task<Result>(node_id, task);
821  }
822 };
823 
824 template<int total, int... n>
825 struct call_impl<transwarp::accept_type, true, total, n...> {
826  template<typename Result, typename Task, typename... ParentResults>
827  static Result work(std::size_t node_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
828  transwarp::detail::wait_for_all(std::get<n>(parents)...);
829  const std::tuple<std::shared_future<ParentResults>...> futures = transwarp::detail::get_futures(parents);
830  return transwarp::detail::run_task<Result>(node_id, task, std::get<n>(futures)...);
831  }
832 };
833 
834 template<>
835 struct call_impl_vector<transwarp::accept_type> {
836  template<typename Result, typename Task, typename ParentResultType>
837  static Result work(std::size_t node_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
839  return transwarp::detail::run_task<Result>(node_id, task, transwarp::detail::get_futures(parents));
840  }
841 };
842 
843 template<int total, int... n>
844 struct call_impl<transwarp::accept_any_type, true, total, n...> {
845  template<typename Result, typename Task, typename... ParentResults>
846  static Result work(std::size_t node_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
847  using parent_t = typename std::remove_reference<decltype(std::get<0>(parents))>::type; // Use first type as reference
848  parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(parents)...);
849  transwarp::detail::cancel_all_but_one(parent, std::get<n>(parents)...);
850  return transwarp::detail::run_task<Result>(node_id, task, parent->get_future());
851  }
852 };
853 
854 template<>
855 struct call_impl_vector<transwarp::accept_any_type> {
856  template<typename Result, typename Task, typename ParentResultType>
857  static Result work(std::size_t node_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
858  std::shared_ptr<transwarp::task<ParentResultType>> parent = transwarp::detail::wait_for_any(parents);
860  return transwarp::detail::run_task<Result>(node_id, task, parent->get_future());
861  }
862 };
863 
864 template<int total, int... n>
865 struct call_impl<transwarp::consume_type, true, total, n...> {
866  template<typename Result, typename Task, typename... ParentResults>
867  static Result work(std::size_t node_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
868  transwarp::detail::wait_for_all(std::get<n>(parents)...);
869  return transwarp::detail::run_task<Result>(node_id, task, std::get<n>(parents)->get_future().get()...);
870  }
871 };
872 
873 template<>
874 struct call_impl_vector<transwarp::consume_type> {
875  template<typename Result, typename Task, typename ParentResultType>
876  static Result work(std::size_t node_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
878  std::vector<ParentResultType> results;
879  results.reserve(parents.size());
880  for (const std::shared_ptr<transwarp::task<ParentResultType>>& parent : parents) {
881  results.emplace_back(parent->get_future().get());
882  }
883  return transwarp::detail::run_task<Result>(node_id, task, std::move(results));
884  }
885 };
886 
887 template<int total, int... n>
888 struct call_impl<transwarp::consume_any_type, true, total, n...> {
889  template<typename Result, typename Task, typename... ParentResults>
890  static Result work(std::size_t node_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
891  using parent_t = typename std::remove_reference<decltype(std::get<0>(parents))>::type; /// Use first type as reference
892  parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(parents)...);
893  transwarp::detail::cancel_all_but_one(parent, std::get<n>(parents)...);
894  return transwarp::detail::run_task<Result>(node_id, task, parent->get_future().get());
895  }
896 };
897 
898 template<>
899 struct call_impl_vector<transwarp::consume_any_type> {
900  template<typename Result, typename Task, typename ParentResultType>
901  static Result work(std::size_t node_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
902  std::shared_ptr<transwarp::task<ParentResultType>> parent = transwarp::detail::wait_for_any(parents);
904  return transwarp::detail::run_task<Result>(node_id, task, parent->get_future().get());
905  }
906 };
907 
908 template<int total, int... n>
909 struct call_impl<transwarp::wait_type, true, total, n...> {
910  template<typename Result, typename Task, typename... ParentResults>
911  static Result work(std::size_t node_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
912  transwarp::detail::wait_for_all(std::get<n>(parents)...);
913  get_all(std::get<n>(parents)...); // Ensures that exceptions are propagated
914  return transwarp::detail::run_task<Result>(node_id, task);
915  }
916  template<typename T, typename... Args>
917  static void get_all(const T& arg, const Args& ...args) {
918  arg->get_future().get();
919  get_all(args...);
920  }
921  static void get_all() {}
922 };
923 
924 template<>
925 struct call_impl_vector<transwarp::wait_type> {
926  template<typename Result, typename Task, typename ParentResultType>
927  static Result work(std::size_t node_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
929  for (const std::shared_ptr<transwarp::task<ParentResultType>>& parent : parents) {
930  parent->get_future().get(); // Ensures that exceptions are propagated
931  }
932  return transwarp::detail::run_task<Result>(node_id, task);
933  }
934 };
935 
936 template<int total, int... n>
937 struct call_impl<transwarp::wait_any_type, true, total, n...> {
938  template<typename Result, typename Task, typename... ParentResults>
939  static Result work(std::size_t node_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
940  using parent_t = typename std::remove_reference<decltype(std::get<0>(parents))>::type; // Use first type as reference
941  parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(parents)...);
942  transwarp::detail::cancel_all_but_one(parent, std::get<n>(parents)...);
943  parent->get_future().get(); // Ensures that exceptions are propagated
944  return transwarp::detail::run_task<Result>(node_id, task);
945  }
946 };
947 
948 template<>
949 struct call_impl_vector<transwarp::wait_any_type> {
950  template<typename Result, typename Task, typename ParentResultType>
951  static Result work(std::size_t node_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
952  std::shared_ptr<transwarp::task<ParentResultType>> parent = transwarp::detail::wait_for_any(parents);
954  parent->get_future().get(); // Ensures that exceptions are propagated
955  return transwarp::detail::run_task<Result>(node_id, task);
956  }
957 };
958 
959 /// Calls the functor of the given task with the results from the tuple of parents.
960 /// Throws transwarp::task_canceled if the task is canceled.
961 /// Throws transwarp::task_destroyed in case the task was destroyed prematurely.
962 template<typename TaskType, typename Result, typename Task, typename... ParentResults>
963 Result call(std::size_t node_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
964  constexpr std::size_t n = std::tuple_size<std::tuple<std::shared_future<ParentResults>...>>::value;
966  work<Result>(node_id, task, parents);
967 }
968 
969 /// Calls the functor of the given task with the results from the vector of parents.
970 /// Throws transwarp::task_canceled if the task is canceled.
971 /// Throws transwarp::task_destroyed in case the task was destroyed prematurely.
972 template<typename TaskType, typename Result, typename Task, typename ParentResultType>
973 Result call(std::size_t node_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
975  work<Result>(node_id, task, parents);
976 }
977 
978 template<std::size_t...> struct indices {};
979 
980 template<std::size_t...> struct construct_range;
981 
982 template<std::size_t end, std::size_t idx, std::size_t... i>
983 struct construct_range<end, idx, i...> : construct_range<end, idx + 1, i..., idx> {};
984 
985 template<std::size_t end, std::size_t... i>
986 struct construct_range<end, end, i...> {
987  using type = transwarp::detail::indices<i...>;
988 };
989 
990 template<std::size_t b, std::size_t e>
991 struct index_range {
993 };
994 
995 template<typename Functor, typename... ParentResults>
996 void call_with_each_index(transwarp::detail::indices<>, const Functor&, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>&) {}
997 
998 template<std::size_t i, std::size_t... j, typename Functor, typename... ParentResults>
999 void call_with_each_index(transwarp::detail::indices<i, j...>, const Functor& f, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& t) {
1000  auto ptr = std::get<i>(t);
1001  if (!ptr) {
1002  throw transwarp::invalid_parameter("task pointer");
1003  }
1004  f(*ptr);
1005  transwarp::detail::call_with_each_index(transwarp::detail::indices<j...>(), f, t);
1006 }
1007 
1008 /// Calls the functor with every element in the tuple
1009 template<typename Functor, typename... ParentResults>
1010 void call_with_each(const Functor& f, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& t) {
1011  constexpr std::size_t n = std::tuple_size<std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>>::value;
1012  using index_t = typename transwarp::detail::index_range<0, n>::type;
1013  transwarp::detail::call_with_each_index(index_t(), f, t);
1014 }
1015 
1016 /// Calls the functor with every element in the vector
1017 template<typename Functor, typename ParentResultType>
1018 void call_with_each(const Functor& f, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& v) {
1019  for (const std::shared_ptr<transwarp::task<ParentResultType>>& ptr : v) {
1020  if (!ptr) {
1021  throw transwarp::invalid_parameter("task pointer");
1022  }
1023  f(*ptr);
1024  }
1025 }
1026 
1027 
1028 /// Sets parents and level of the node
1030  explicit parent_visitor(transwarp::node& node) noexcept
1031  : node_(node) {}
1032 
1033  void operator()(const transwarp::itask& task) const {
1034  transwarp::detail::node_manip::add_parent(node_, task.get_node());
1035  if (node_.get_level() <= task.get_node()->get_level()) {
1036  /// A child's level is always larger than any of its parents' levels
1037  transwarp::detail::node_manip::set_level(node_, task.get_node()->get_level() + 1);
1038  }
1039  }
1040 
1041  transwarp::node& node_;
1042 };
1043 
1044 /// Applies final bookkeeping to the task
1046  explicit final_visitor(std::size_t& count) noexcept
1047  : count_(count), id_(0) {}
1048 
1049  void operator()(transwarp::itask& task) noexcept {
1050  ++count_;
1051  task.set_node_id(id_++);
1052  }
1053 
1054  std::size_t& count_;
1055  std::size_t id_;
1056 };
1057 
1058 /// Generates a graph
1060  explicit graph_visitor(std::vector<transwarp::edge>& graph) noexcept
1061  : graph_(graph) {}
1062 
1063  void operator()(const transwarp::itask& task) {
1064  const std::shared_ptr<transwarp::node>& node = task.get_node();
1065  for (const std::shared_ptr<transwarp::node>& parent : node->get_parents()) {
1066  graph_.emplace_back(parent, node);
1067  }
1068  }
1069 
1070  std::vector<transwarp::edge>& graph_;
1071 };
1072 
1073 /// Schedules using the given executor
1075  schedule_visitor(bool reset, transwarp::executor* executor) noexcept
1076  : reset_(reset), executor_(executor) {}
1077 
1078  void operator()(transwarp::itask& task) {
1079  task.schedule_impl(reset_, executor_);
1080  }
1081 
1082  bool reset_;
1083  transwarp::executor* executor_;
1084 };
1085 
1086 /// Resets the given task
1088 
1089  void operator()(transwarp::itask& task) const {
1090  task.reset();
1091  }
1092 };
1093 
1094 /// Cancels or resumes the given task
1096  explicit cancel_visitor(bool enabled) noexcept
1097  : enabled_(enabled) {}
1098 
1099  void operator()(transwarp::itask& task) const noexcept {
1100  task.cancel(enabled_);
1101  }
1102 
1103  bool enabled_;
1104 };
1105 
1106 /// Assigns an executor to the given task
1108  explicit set_executor_visitor(std::shared_ptr<transwarp::executor> executor) noexcept
1109  : executor_(std::move(executor)) {}
1110 
1111  void operator()(transwarp::itask& task) const noexcept {
1112  task.set_executor(executor_);
1113  }
1114 
1115  std::shared_ptr<transwarp::executor> executor_;
1116 };
1117 
1118 /// Removes the executor from the given task
1120 
1121  void operator()(transwarp::itask& task) const noexcept {
1122  task.remove_executor();
1123  }
1124 };
1125 
1126 /// Assigns a priority to the given task
1128  explicit set_priority_visitor(std::size_t priority) noexcept
1129  : priority_(priority) {}
1130 
1131  void operator()(transwarp::itask& task) const noexcept {
1132  task.set_priority(priority_);
1133  }
1134 
1135  std::size_t priority_;
1136 };
1137 
1138 /// Resets the priority of the given task
1140 
1141  void operator()(transwarp::itask& task) const noexcept {
1142  task.reset_priority();
1143  }
1144 };
1145 
1146 /// Assigns custom data to the given task
1148  explicit set_custom_data_visitor(std::shared_ptr<void> custom_data) noexcept
1149  : custom_data_(std::move(custom_data)) {}
1150 
1151  void operator()(transwarp::itask& task) const noexcept {
1152  task.set_custom_data(custom_data_);
1153  }
1154 
1155  std::shared_ptr<void> custom_data_;
1156 };
1157 
1158 /// Removes custom data from the given task
1160 
1161  void operator()(transwarp::itask& task) const noexcept {
1162  task.remove_custom_data();
1163  }
1164 };
1165 
1166 /// Pushes the given task into the vector of tasks
1168  explicit push_task_visitor(std::vector<transwarp::itask*>& tasks)
1169  : tasks_(tasks) {}
1170 
1171  void operator()(transwarp::itask& task) {
1172  tasks_.push_back(&task);
1173  }
1174 
1175  std::vector<transwarp::itask*>& tasks_;
1176 };
1177 
1178 /// Adds a new listener to the given task
1180  explicit add_listener_visitor(std::shared_ptr<transwarp::listener> listener)
1181  : listener_(std::move(listener))
1182  {}
1183 
1184  void operator()(transwarp::itask& task) {
1185  task.add_listener(listener_);
1186  }
1187 
1188  std::shared_ptr<transwarp::listener> listener_;
1189 };
1190 
1191 /// Adds a new listener per event type to the given task
1193  add_listener_per_event_visitor(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener)
1194  : event_(event), listener_(std::move(listener))
1195  {}
1196 
1197  void operator()(transwarp::itask& task) {
1198  task.add_listener(event_, listener_);
1199  }
1200 
1201  transwarp::event_type event_;
1202  std::shared_ptr<transwarp::listener> listener_;
1203 };
1204 
1205 /// Removes a listener from the given task
1207  explicit remove_listener_visitor(std::shared_ptr<transwarp::listener> listener)
1208  : listener_(std::move(listener))
1209  {}
1210 
1211  void operator()(transwarp::itask& task) {
1212  task.remove_listener(listener_);
1213  }
1214 
1215  std::shared_ptr<transwarp::listener> listener_;
1216 };
1217 
1218 /// Removes a listener per event type from the given task
1220  remove_listener_per_event_visitor(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener)
1221  : event_(event), listener_(std::move(listener))
1222  {}
1223 
1224  void operator()(transwarp::itask& task) {
1225  task.remove_listener(event_, listener_);
1226  }
1227 
1228  transwarp::event_type event_;
1229  std::shared_ptr<transwarp::listener> listener_;
1230 };
1231 
1232 /// Removes all listeners from the given task
1234 
1235  void operator()(transwarp::itask& task) {
1236  task.remove_listeners();
1237  }
1238 
1239 };
1240 
1241 /// Removes all listeners per event type from the given task
1244  : event_(event)
1245  {}
1246 
1247  void operator()(transwarp::itask& task) {
1248  task.remove_listeners(event_);
1249  }
1250 
1251  transwarp::event_type event_;
1252 };
1253 
1254 /// Visits the given task using the visitor given in the constructor
1256  explicit visit_depth_visitor(const std::function<void(transwarp::itask&)>& visitor) noexcept
1257  : visitor_(visitor) {}
1258 
1259  void operator()(transwarp::itask& task) const {
1260  task.visit_depth(visitor_);
1261  }
1262 
1263  const std::function<void(transwarp::itask&)>& visitor_;
1264 };
1265 
1266 /// Unvisits the given task
1268 
1269  void operator()(transwarp::itask& task) const noexcept {
1270  task.unvisit();
1271  }
1272 };
1273 
1274 /// Determines the result type of the Functor dispatching on the task type
1275 template<typename TaskType, typename Functor, typename... ParentResults>
1277  static_assert(std::is_same<TaskType, transwarp::root_type>::value ||
1278  std::is_same<TaskType, transwarp::accept_type>::value ||
1279  std::is_same<TaskType, transwarp::accept_any_type>::value ||
1280  std::is_same<TaskType, transwarp::consume_type>::value ||
1281  std::is_same<TaskType, transwarp::consume_any_type>::value ||
1282  std::is_same<TaskType, transwarp::wait_type>::value ||
1283  std::is_same<TaskType, transwarp::wait_any_type>::value,
1284  "Invalid task type, must be one of: root, accept, accept_any, consume, consume_any, wait, wait_any");
1285 };
1286 
1287 template<typename Functor, typename... ParentResults>
1288 struct functor_result<transwarp::root_type, Functor, ParentResults...> {
1289  static_assert(sizeof...(ParentResults) == 0, "A root task cannot have parent tasks");
1290  using type = decltype(std::declval<Functor>()());
1291 };
1292 
1293 template<typename Functor, typename... ParentResults>
1294 struct functor_result<transwarp::accept_type, Functor, ParentResults...> {
1295  static_assert(sizeof...(ParentResults) > 0, "An accept task must have at least one parent");
1296  using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<ParentResults>>()...));
1297 };
1298 
1299 template<typename Functor, typename ParentResultType>
1300 struct functor_result<transwarp::accept_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1301  using type = decltype(std::declval<Functor>()(std::declval<std::vector<std::shared_future<ParentResultType>>>()));
1302 };
1303 
1304 template<typename Functor, typename... ParentResults>
1305 struct functor_result<transwarp::accept_any_type, Functor, ParentResults...> {
1306  static_assert(sizeof...(ParentResults) > 0, "An accept_any task must have at least one parent");
1307  using arg_t = typename std::tuple_element<0, std::tuple<ParentResults...>>::type; // Using first type as reference
1308  using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<arg_t>>()));
1309 };
1310 
1311 template<typename Functor, typename ParentResultType>
1312 struct functor_result<transwarp::accept_any_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1313  using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<ParentResultType>>()));
1314 };
1315 
1316 template<typename Functor, typename... ParentResults>
1317 struct functor_result<transwarp::consume_type, Functor, ParentResults...> {
1318  static_assert(sizeof...(ParentResults) > 0, "A consume task must have at least one parent");
1319  using type = decltype(std::declval<Functor>()(std::declval<ParentResults>()...));
1320 };
1321 
1322 template<typename Functor, typename ParentResultType>
1323 struct functor_result<transwarp::consume_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1324  using type = decltype(std::declval<Functor>()(std::declval<std::vector<ParentResultType>>()));
1325 };
1326 
1327 template<typename Functor, typename... ParentResults>
1328 struct functor_result<transwarp::consume_any_type, Functor, ParentResults...> {
1329  static_assert(sizeof...(ParentResults) > 0, "A consume_any task must have at least one parent");
1330  using arg_t = typename std::tuple_element<0, std::tuple<ParentResults...>>::type; // Using first type as reference
1331  using type = decltype(std::declval<Functor>()(std::declval<arg_t>()));
1332 };
1333 
1334 template<typename Functor, typename ParentResultType>
1335 struct functor_result<transwarp::consume_any_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1336  using type = decltype(std::declval<Functor>()(std::declval<ParentResultType>()));
1337 };
1338 
1339 template<typename Functor, typename... ParentResults>
1340 struct functor_result<transwarp::wait_type, Functor, ParentResults...> {
1341  static_assert(sizeof...(ParentResults) > 0, "A wait task must have at least one parent");
1342  using type = decltype(std::declval<Functor>()());
1343 };
1344 
1345 template<typename Functor, typename ParentResultType>
1346 struct functor_result<transwarp::wait_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1347  using type = decltype(std::declval<Functor>()());
1348 };
1349 
1350 template<typename Functor, typename... ParentResults>
1351 struct functor_result<transwarp::wait_any_type, Functor, ParentResults...> {
1352  static_assert(sizeof...(ParentResults) > 0, "A wait_any task must have at least one parent");
1353  using type = decltype(std::declval<Functor>()());
1354 };
1355 
1356 template<typename Functor, typename ParentResultType>
1357 struct functor_result<transwarp::wait_any_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1358  using type = decltype(std::declval<Functor>()());
1359 };
1360 
1361 template<bool is_transwarp_functor>
1362 struct assign_node_if_impl;
1363 
1364 template<>
1365 struct assign_node_if_impl<true> {
1366  template<typename Functor>
1367  void operator()(Functor& functor, std::shared_ptr<transwarp::node> node) const noexcept {
1368  functor.transwarp_node_ = std::move(node);
1369  }
1370 };
1371 
1372 template<>
1373 struct assign_node_if_impl<false> {
1374  template<typename Functor>
1375  void operator()(Functor&, std::shared_ptr<transwarp::node>) const noexcept {}
1376 };
1377 
1378 /// Assigns the node to the given functor if the functor is a subclass of transwarp::functor
1379 template<typename Functor>
1380 void assign_node_if(Functor& functor, std::shared_ptr<transwarp::node> node) noexcept {
1382 }
1383 
1384 /// Returns a ready future with the given value as its state
1385 template<typename ResultType, typename Value>
1386 std::shared_future<ResultType> make_future_with_value(Value&& value) {
1387  std::promise<ResultType> promise;
1388  promise.set_value(std::forward<Value>(value));
1389  return promise.get_future();
1390 }
1391 
1392 /// Returns a ready future
1393 inline std::shared_future<void> make_ready_future() {
1394  std::promise<void> promise;
1395  promise.set_value();
1396  return promise.get_future();
1397 }
1398 
1399 /// Returns a ready future with the given exception as its state
1400 template<typename ResultType>
1401 std::shared_future<ResultType> make_future_with_exception(std::exception_ptr exception) {
1402  if (!exception) {
1403  throw transwarp::invalid_parameter("exception pointer");
1404  }
1405  std::promise<ResultType> promise;
1406  promise.set_exception(exception);
1407  return promise.get_future();
1408 }
1409 
1410 
1411 /// Determines the type of the parents
1412 template<typename... ParentResults>
1413 struct parents {
1414  using type = std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>;
1415  static std::size_t size(const type&) {
1416  return std::tuple_size<type>::value;
1417  }
1418 };
1419 
1420 /// Determines the type of the parents. Specialization for vector parents
1421 template<typename ParentResultType>
1422 struct parents<std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1423  using type = std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>;
1424  static std::size_t size(const type& obj) {
1425  return obj.size();
1426  }
1427 };
1428 
1429 
1430 template<typename ResultType, typename TaskType>
1432 protected:
1433 
1434  template<typename Task, typename Parents>
1435  void call(std::size_t node_id,
1436  const std::weak_ptr<Task>& task,
1437  const Parents& parents) {
1438  promise_.set_value(transwarp::detail::call<TaskType, ResultType>(node_id, task, parents));
1439  }
1440 
1441  std::promise<ResultType> promise_;
1442 };
1443 
1444 template<typename TaskType>
1445 class base_runner<void, TaskType> {
1446 protected:
1447 
1448  template<typename Task, typename Parents>
1449  void call(std::size_t node_id,
1450  const std::weak_ptr<Task>& task,
1451  const Parents& parents) {
1452  transwarp::detail::call<TaskType, void>(node_id, task, parents);
1453  promise_.set_value();
1454  }
1455 
1456  std::promise<void> promise_;
1457 };
1458 
1459 /// A callable to run a task given its parents
1460 template<typename ResultType, typename TaskType, typename Task, typename Parents>
1461 class runner : public transwarp::detail::base_runner<ResultType, TaskType> {
1462 public:
1463 
1464  runner(std::size_t node_id,
1465  const std::weak_ptr<Task>& task,
1466  const typename transwarp::decay<Parents>::type& parents)
1467  : node_id_(node_id),
1468  task_(task),
1469  parents_(parents)
1470  {}
1471 
1472  std::future<ResultType> get_future() {
1473  return this->promise_.get_future();
1474  }
1475 
1476  void operator()() {
1477  if (const std::shared_ptr<Task> t = task_.lock()) {
1478  t->raise_event(transwarp::event_type::before_started);
1479  }
1480  try {
1481  this->call(node_id_, task_, parents_);
1482  } catch (const transwarp::task_canceled&) {
1483  this->promise_.set_exception(std::current_exception());
1484  if (const std::shared_ptr<Task> t = task_.lock()) {
1485  t->raise_event(transwarp::event_type::after_canceled);
1486  }
1487  } catch (...) {
1488  this->promise_.set_exception(std::current_exception());
1489  }
1490  if (const std::shared_ptr<Task> t = task_.lock()) {
1491  t->raise_event(transwarp::event_type::after_finished);
1492  }
1493  }
1494 
1495 private:
1496  const std::size_t node_id_;
1497  const std::weak_ptr<Task> task_;
1498  const typename transwarp::decay<Parents>::type parents_;
1499 };
1500 
1501 
1502 /// A simple circular buffer (FIFO).
1503 /// ValueType must support default construction. The buffer lets you push
1504 /// new values onto the back and pop old values off the front.
1505 template<typename ValueType>
1507 public:
1508 
1509  static_assert(std::is_default_constructible<ValueType>::value, "ValueType must be default constructible");
1510 
1511  using value_type = ValueType;
1512 
1513  /// Constructs a circular buffer with a given fixed capacity
1514  explicit
1516  : data_(capacity)
1517  {
1518  if (capacity < 1) {
1519  throw transwarp::invalid_parameter("capacity");
1520  }
1521  }
1522 
1523  // delete copy/move semantics
1524  circular_buffer(const circular_buffer&) = delete;
1525  circular_buffer& operator=(const circular_buffer&) = delete;
1526  circular_buffer(circular_buffer&& other) = delete;
1527  circular_buffer& operator=(circular_buffer&&) = delete;
1528 
1529  /// Pushes a new value onto the end of the buffer. If that exceeds the capacity
1530  /// of the buffer then the oldest value gets dropped (the one at the front).
1531  template<typename T, typename = typename std::enable_if<std::is_same<typename std::decay<T>::type, value_type>::value>::type>
1532  void push(T&& value) {
1533  data_[end_] = std::forward<T>(value);
1534  increment();
1535  }
1536 
1537  /// Returns the value at the front of the buffer (the oldest value).
1538  /// This is undefined if the buffer is empty
1539  const value_type& front() const {
1540  return data_[front_];
1541  }
1542 
1543  /// Removes the value at the front of the buffer (the oldest value)
1544  void pop() {
1545  if (!empty()) {
1546  data_[front_] = ValueType{};
1547  decrement();
1548  }
1549  }
1550 
1551  /// Returns the capacity of the buffer
1552  std::size_t capacity() const {
1553  return data_.size();
1554  }
1555 
1556  /// Returns the number of populated values of the buffer. Its maximum value
1557  /// equals the capacity of the buffer
1558  std::size_t size() const {
1559  return size_;
1560  }
1561 
1562  /// Returns whether the buffer is empty
1563  bool empty() const {
1564  return size_ == 0;
1565  }
1566 
1567  /// Returns whether the buffer is full
1568  bool full() const {
1569  return size_ == data_.size();
1570  }
1571 
1572  /// Swaps this buffer with the given buffer
1573  void swap(circular_buffer& buffer) {
1574  std::swap(end_, buffer.end_);
1575  std::swap(front_, buffer.front_);
1576  std::swap(size_, buffer.size_);
1577  std::swap(data_, buffer.data_);
1578  }
1579 
1580 private:
1581 
1582  void increment_or_wrap(std::size_t& value) const {
1583  if (value == data_.size() - 1) {
1584  value = 0;
1585  } else {
1586  ++value;
1587  }
1588  }
1589 
1590  void increment() {
1591  increment_or_wrap(end_);
1592  if (full()) {
1593  increment_or_wrap(front_);
1594  } else {
1595  ++size_;
1596  }
1597  }
1598 
1599  void decrement() {
1600  increment_or_wrap(front_);
1601  --size_;
1602  }
1603 
1604  std::size_t end_{};
1605  std::size_t front_{};
1606  std::size_t size_{};
1607  std::vector<value_type> data_;
1608 };
1609 
1610 
1611 class spinlock {
1612 public:
1613 
1614  void lock() noexcept {
1615  while (locked_.test_and_set(std::memory_order_acquire));
1616  }
1617 
1618  void unlock() noexcept {
1619  locked_.clear(std::memory_order_release);
1620  }
1621 
1622 private:
1623  std::atomic_flag locked_ = ATOMIC_FLAG_INIT;
1624 };
1625 
1626 
1627 } // detail
1628 
1629 
1630 /// A functor not doing nothing
1632  void operator()() const noexcept {}
1633 };
1634 
1635 /// An object to use in places where a no-op functor is required
1636 constexpr no_op_functor no_op{};
1637 
1638 
1639 /// Executor for sequential execution. Runs functors sequentially on the same thread
1641 public:
1642 
1643  sequential() = default;
1644 
1645  // delete copy/move semantics
1646  sequential(const sequential&) = delete;
1647  sequential& operator=(const sequential&) = delete;
1648  sequential(sequential&&) = delete;
1649  sequential& operator=(sequential&&) = delete;
1650 
1651  /// Returns the name of the executor
1652  std::string get_name() const override {
1653  return "transwarp::sequential";
1654  }
1655 
1656  /// Runs the functor on the current thread
1657  void execute(const std::function<void()>& functor, const std::shared_ptr<transwarp::node>&) override {
1658  functor();
1659  }
1660 };
1661 
1662 
1663 /// Executor for parallel execution. Uses a simple thread pool
1665 public:
1666 
1667  explicit parallel(std::size_t n_threads)
1668  : pool_(n_threads)
1669  {}
1670 
1671  // delete copy/move semantics
1672  parallel(const parallel&) = delete;
1673  parallel& operator=(const parallel&) = delete;
1674  parallel(parallel&&) = delete;
1675  parallel& operator=(parallel&&) = delete;
1676 
1677  /// Returns the name of the executor
1678  std::string get_name() const override {
1679  return "transwarp::parallel";
1680  }
1681 
1682  /// Pushes the functor into the thread pool for asynchronous execution
1683  void execute(const std::function<void()>& functor, const std::shared_ptr<transwarp::node>&) override {
1684  pool_.push(functor);
1685  }
1686 
1687 private:
1689 };
1690 
1691 
1692 /// Detail namespace for internal functionality only
1693 namespace detail {
1694 
1695 /// The base task class that contains the functionality that can be used
1696 /// with all result types (void and non-void).
1697 template<typename ResultType, typename TaskType, typename Functor, typename... ParentResults>
1698 class task_impl_base : public transwarp::task<ResultType>,
1699  public std::enable_shared_from_this<task_impl_base<ResultType, TaskType, Functor, ParentResults...>> {
1700 public:
1701  /// The task type
1702  using task_type = TaskType;
1703 
1704  /// The result type of this task
1705  using result_type = ResultType;
1706 
1707  /// Assigns an executor to this task which takes precedence over
1708  /// the executor provided in schedule() or schedule_all()
1709  void set_executor(std::shared_ptr<transwarp::executor> executor) override {
1711  if (!executor) {
1712  throw transwarp::invalid_parameter("executor pointer");
1713  }
1714  executor_ = std::move(executor);
1715  transwarp::detail::node_manip::set_executor(*node_, std::shared_ptr<std::string>(new std::string(executor_->get_name())));
1716  }
1717 
1718  /// Assigns an executor to all tasks which takes precedence over
1719  /// the executor provided in schedule() or schedule_all()
1720  void set_executor_all(std::shared_ptr<transwarp::executor> executor) override {
1722  transwarp::detail::set_executor_visitor visitor(std::move(executor));
1723  visit_all(visitor);
1724  }
1725 
1726  /// Removes the executor from this task
1727  void remove_executor() override {
1729  executor_.reset();
1730  transwarp::detail::node_manip::set_executor(*node_, nullptr);
1731  }
1732 
1733  /// Removes the executor from all tasks
1734  void remove_executor_all() override {
1737  visit_all(visitor);
1738  }
1739 
1740  /// Sets a task priority (defaults to 0). transwarp will not directly use this.
1741  /// This is only useful if something else is using the priority (e.g. a custom executor)
1742  void set_priority(std::size_t priority) override {
1744  transwarp::detail::node_manip::set_priority(*node_, priority);
1745  }
1746 
1747  /// Sets a priority to all tasks (defaults to 0). transwarp will not directly use this.
1748  /// This is only useful if something else is using the priority (e.g. a custom executor)
1749  void set_priority_all(std::size_t priority) override {
1751  transwarp::detail::set_priority_visitor visitor(priority);
1752  visit_all(visitor);
1753  }
1754 
1755  /// Resets the task priority to 0
1756  void reset_priority() override {
1758  transwarp::detail::node_manip::set_priority(*node_, 0);
1759  }
1760 
1761  /// Resets the priority of all tasks to 0
1762  void reset_priority_all() override {
1765  visit_all(visitor);
1766  }
1767 
1768  /// Assigns custom data to this task. transwarp will not directly use this.
1769  /// This is only useful if something else is using this custom data (e.g. a custom executor)
1770  void set_custom_data(std::shared_ptr<void> custom_data) override {
1772  if (!custom_data) {
1773  throw transwarp::invalid_parameter("custom data pointer");
1774  }
1775  transwarp::detail::node_manip::set_custom_data(*node_, std::move(custom_data));
1776  }
1777 
1778  /// Assigns custom data to all tasks. transwarp will not directly use this.
1779  /// This is only useful if something else is using this custom data (e.g. a custom executor)
1780  void set_custom_data_all(std::shared_ptr<void> custom_data) override {
1782  transwarp::detail::set_custom_data_visitor visitor(std::move(custom_data));
1783  visit_all(visitor);
1784  }
1785 
1786  /// Removes custom data from this task
1787  void remove_custom_data() override {
1789  transwarp::detail::node_manip::set_custom_data(*node_, nullptr);
1790  }
1791 
1792  /// Removes custom data from all tasks
1793  void remove_custom_data_all() override {
1796  visit_all(visitor);
1797  }
1798 
1799  /// Returns the future associated to the underlying execution
1800  const std::shared_future<result_type>& get_future() const noexcept override {
1801  return future_;
1802  }
1803 
1804  /// Returns the associated node
1805  const std::shared_ptr<transwarp::node>& get_node() const noexcept override {
1806  return node_;
1807  }
1808 
1809  /// Adds a new listener for all event types
1810  void add_listener(std::shared_ptr<transwarp::listener> listener) override {
1812  check_listener(listener);
1813  for (std::vector<std::shared_ptr<transwarp::listener>>& l : listeners_) {
1814  l.push_back(listener);
1815  }
1816  }
1817 
1818  /// Adds a new listener for the given event type only
1819  void add_listener(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) override {
1821  check_listener(listener);
1822  listeners_[get_event_index(event)].push_back(std::move(listener));
1823  }
1824 
1825  /// Adds a new listener for all event types and for all parents
1826  void add_listener_all(std::shared_ptr<transwarp::listener> listener) override {
1828  transwarp::detail::add_listener_visitor visitor(std::move(listener));
1829  visit_all(visitor);
1830  }
1831 
1832  /// Adds a new listener for the given event type only and for all parents
1833  void add_listener_all(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) override {
1835  transwarp::detail::add_listener_per_event_visitor visitor(event, std::move(listener));
1836  visit_all(visitor);
1837  }
1838 
1839  /// Removes the listener for all event types
1840  void remove_listener(const std::shared_ptr<transwarp::listener>& listener) override {
1842  check_listener(listener);
1843  for (std::vector<std::shared_ptr<transwarp::listener>>& l : listeners_) {
1844  l.erase(std::remove(l.begin(), l.end(), listener), l.end());
1845  }
1846  }
1847 
1848  /// Removes the listener for the given event type only
1849  void remove_listener(transwarp::event_type event, const std::shared_ptr<transwarp::listener>& listener) override {
1851  check_listener(listener);
1852  std::vector<std::shared_ptr<transwarp::listener>>& l = listeners_[get_event_index(event)];
1853  l.erase(std::remove(l.begin(), l.end(), listener), l.end());
1854  }
1855 
1856  /// Removes the listener for all event types and for all parents
1857  void remove_listener_all(const std::shared_ptr<transwarp::listener>& listener) override {
1859  transwarp::detail::remove_listener_visitor visitor(std::move(listener));
1860  visit_all(visitor);
1861  }
1862 
1863  /// Removes the listener for the given event type only and for all parents
1864  void remove_listener_all(transwarp::event_type event, const std::shared_ptr<transwarp::listener>& listener) override {
1866  transwarp::detail::remove_listener_per_event_visitor visitor(event, std::move(listener));
1867  visit_all(visitor);
1868  }
1869 
1870  /// Removes all listeners
1871  void remove_listeners() override {
1873  for (std::vector<std::shared_ptr<transwarp::listener>>& l : listeners_) {
1874  l.clear();
1875  }
1876  }
1877 
1878  /// Removes all listeners for the given event type
1881  listeners_[get_event_index(event)].clear();
1882  }
1883 
1884  /// Removes all listeners and for all parents
1885  void remove_listeners_all() override {
1888  visit_all(visitor);
1889  }
1890 
1891  /// Removes all listeners for the given event type and for all parents
1895  visit_all(visitor);
1896  }
1897 
1898  /// Schedules this task for execution on the caller thread.
1899  /// The task-specific executor gets precedence if it exists.
1900  /// This overload will reset the underlying future.
1901  void schedule() override {
1903  this->schedule_impl(true);
1904  }
1905 
1906  /// Schedules this task for execution on the caller thread.
1907  /// The task-specific executor gets precedence if it exists.
1908  /// reset denotes whether schedule should reset the underlying
1909  /// future and schedule even if the future is already valid.
1910  void schedule(bool reset) override {
1912  this->schedule_impl(reset);
1913  }
1914 
1915  /// Schedules this task for execution using the provided executor.
1916  /// The task-specific executor gets precedence if it exists.
1917  /// This overload will reset the underlying future.
1920  this->schedule_impl(true, &executor);
1921  }
1922 
1923  /// Schedules this task for execution using the provided executor.
1924  /// The task-specific executor gets precedence if it exists.
1925  /// reset denotes whether schedule should reset the underlying
1926  /// future and schedule even if the future is already valid.
1927  void schedule(transwarp::executor& executor, bool reset) override {
1929  this->schedule_impl(reset, &executor);
1930  }
1931 
1932  /// Schedules all tasks in the graph for execution on the caller thread.
1933  /// The task-specific executors get precedence if they exist.
1934  /// This overload will reset the underlying futures.
1935  void schedule_all() override {
1937  schedule_all_impl(true, transwarp::schedule_type::breadth);
1938  }
1939 
1940  /// Schedules all tasks in the graph for execution using the provided executor.
1941  /// The task-specific executors get precedence if they exist.
1942  /// This overload will reset the underlying futures.
1945  schedule_all_impl(true, transwarp::schedule_type::breadth, &executor);
1946  }
1947 
1948  /// Schedules all tasks in the graph for execution on the caller thread.
1949  /// The task-specific executors get precedence if they exist.
1950  /// reset_all denotes whether schedule_all should reset the underlying
1951  /// futures and schedule even if the futures are already present.
1952  void schedule_all(bool reset_all) override {
1954  schedule_all_impl(reset_all, transwarp::schedule_type::breadth);
1955  }
1956 
1957  /// Schedules all tasks in the graph for execution using the provided executor.
1958  /// The task-specific executors get precedence if they exist.
1959  /// reset_all denotes whether schedule_all should reset the underlying
1960  /// futures and schedule even if the futures are already present.
1963  schedule_all_impl(reset_all, transwarp::schedule_type::breadth, &executor);
1964  }
1965 
1966  /// Schedules all tasks in the graph for execution on the caller thread.
1967  /// The task-specific executors get precedence if they exist.
1968  /// This overload will reset the underlying futures.
1971  schedule_all_impl(true, type);
1972  }
1973 
1974  /// Schedules all tasks in the graph for execution using the provided executor.
1975  /// The task-specific executors get precedence if they exist.
1976  /// This overload will reset the underlying futures.
1979  schedule_all_impl(true, type, &executor);
1980  }
1981 
1982  /// Schedules all tasks in the graph for execution on the caller thread.
1983  /// The task-specific executors get precedence if they exist.
1984  /// reset_all denotes whether schedule_all should reset the underlying
1985  /// futures and schedule even if the futures are already present.
1988  schedule_all_impl(reset_all, type);
1989  }
1990 
1991  /// Schedules all tasks in the graph for execution using the provided executor.
1992  /// The task-specific executors get precedence if they exist.
1993  /// reset_all denotes whether schedule_all should reset the underlying
1994  /// futures and schedule even if the futures are already present.
1997  schedule_all_impl(reset_all, type, &executor);
1998  }
1999 
2000  /// Assigns an exception to this task. Scheduling will have no effect after an exception
2001  /// has been set. Calling reset() will remove the exception and re-enable scheduling.
2002  void set_exception(std::exception_ptr exception) override {
2004  future_ = transwarp::detail::make_future_with_exception<result_type>(exception);
2005  schedule_mode_ = false;
2006  }
2007 
2008  /// Returns whether the task was scheduled and not reset afterwards.
2009  /// This means that the underlying future is valid
2010  bool was_scheduled() const noexcept override {
2011  return future_.valid();
2012  }
2013 
2014  /// Waits for the task to complete. Should only be called if was_scheduled()
2015  /// is true, throws transwarp::control_error otherwise
2016  void wait() const override {
2018  future_.wait();
2019  }
2020 
2021  /// Returns whether the task has finished processing. Should only be called
2022  /// if was_scheduled() is true, throws transwarp::control_error otherwise
2023  bool is_ready() const override {
2025  return future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
2026  }
2027 
2028  /// Returns whether this task contains a result
2029  bool has_result() const noexcept override {
2030  return was_scheduled() && future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
2031  }
2032 
2033  /// Resets this task
2034  void reset() override {
2036  future_ = std::shared_future<result_type>();
2037  transwarp::detail::node_manip::set_canceled(*node_, false);
2038  schedule_mode_ = true;
2039  }
2040 
2041  /// Resets all tasks in the graph
2042  void reset_all() override {
2045  visit_all(visitor);
2046  }
2047 
2048  /// If enabled then this task is canceled which will
2049  /// throw transwarp::task_canceled when retrieving the task result.
2050  /// Passing false is equivalent to resume.
2051  void cancel(bool enabled) noexcept override {
2052  transwarp::detail::node_manip::set_canceled(*node_, enabled);
2053  }
2054 
2055  /// If enabled then all pending tasks in the graph are canceled which will
2056  /// throw transwarp::task_canceled when retrieving the task result.
2057  /// Passing false is equivalent to resume.
2058  void cancel_all(bool enabled) noexcept override {
2059  transwarp::detail::cancel_visitor visitor(enabled);
2060  visit_all(visitor);
2061  }
2062 
2063  /// Returns the number of direct parents of this task
2064  std::size_t get_parent_count() const noexcept override {
2066  }
2067 
2068  /// Returns the number of tasks in the graph
2069  std::size_t get_task_count() const noexcept override {
2070  return task_count_;
2071  }
2072 
2073  /// Returns the graph of the task structure. This is mainly for visualizing
2074  /// the tasks and their interdependencies. Pass the result into transwarp::to_string
2075  /// to retrieve a dot-style graph representation for easy viewing.
2076  std::vector<transwarp::edge> get_graph() const override {
2077  std::vector<transwarp::edge> graph;
2078  transwarp::detail::graph_visitor visitor(graph);
2079  const_cast<task_impl_base*>(this)->visit_depth(visitor);
2080  const_cast<task_impl_base*>(this)->unvisit();
2081  return graph;
2082  }
2083 
2084 protected:
2085 
2086  template<typename F>
2087  task_impl_base(bool has_name, std::string name, F&& functor, std::shared_ptr<transwarp::task<ParentResults>>... parents)
2088  : node_(new transwarp::node),
2089  functor_(std::forward<F>(functor)),
2090  parents_(std::move(parents)...)
2091  {
2092  init(has_name, std::move(name));
2093  }
2094 
2095  template<typename F, typename P>
2096  task_impl_base(bool has_name, std::string name, F&& functor, std::vector<std::shared_ptr<transwarp::task<P>>> parents)
2097  : node_(new transwarp::node),
2098  functor_(std::forward<F>(functor)),
2099  parents_(std::move(parents))
2100  {
2101  if (parents_.empty()) {
2102  throw transwarp::invalid_parameter("parents are empty");
2103  }
2104  init(has_name, std::move(name));
2105  }
2106 
2107  void init(bool has_name, std::string name) {
2108  transwarp::detail::node_manip::set_type(*node_, task_type::value);
2109  transwarp::detail::node_manip::set_name(*node_, (has_name ? std::shared_ptr<std::string>(new std::string(std::move(name))) : nullptr));
2110  transwarp::detail::assign_node_if(functor_, node_);
2112  transwarp::detail::final_visitor visitor(task_count_);
2113  visit_depth(visitor);
2114  unvisit();
2115  }
2116 
2117  /// Checks if the task is currently running and throws transwarp::control_error if it is
2119  if (future_.valid() && future_.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
2120  throw transwarp::control_error("task currently running: " + transwarp::to_string(*node_, " "));
2121  }
2122  }
2123 
2124  /// Checks if the task was scheduled and throws transwarp::control_error if it's not
2126  if (!future_.valid()) {
2127  throw transwarp::control_error("task was not scheduled: " + transwarp::to_string(*node_, " "));
2128  }
2129  }
2130 
2131  bool schedule_mode_ = true;
2132  std::shared_future<result_type> future_;
2133 
2134 private:
2135 
2136  template<typename R, typename Y, typename T, typename P>
2137  friend class transwarp::detail::runner;
2138 
2139  template<typename R, typename T, typename... A>
2140  friend R transwarp::detail::run_task(std::size_t, const std::weak_ptr<T>&, A&&...);
2141 
2142  /// Assigns the given id to the node
2143  void set_node_id(std::size_t id) noexcept override {
2144  transwarp::detail::node_manip::set_id(*node_, id);
2145  }
2146 
2147  /// Schedules this task for execution using the provided executor.
2148  /// The task-specific executor gets precedence if it exists.
2149  /// Runs the task on the same thread as the caller if neither the global
2150  /// nor the task-specific executor is found.
2151  void schedule_impl(bool reset, transwarp::executor* executor=nullptr) override {
2152  if (schedule_mode_ && (reset || !future_.valid())) {
2153  if (reset) {
2154  transwarp::detail::node_manip::set_canceled(*node_, false);
2155  }
2156  std::weak_ptr<task_impl_base> self = this->shared_from_this();
2158  std::shared_ptr<runner_t> runner = std::shared_ptr<runner_t>(new runner_t(node_->get_id(), self, parents_));
2160  future_ = runner->get_future();
2161  if (executor_) {
2162  executor_->execute([runner]{ (*runner)(); }, node_);
2163  } else if (executor) {
2164  executor->execute([runner]{ (*runner)(); }, node_);
2165  } else {
2166  (*runner)();
2167  }
2168  }
2169  }
2170 
2171  /// Schedules all tasks in the graph for execution using the provided executor.
2172  /// The task-specific executors get precedence if they exist.
2173  /// Runs tasks on the same thread as the caller if neither the global
2174  /// nor a task-specific executor is found.
2175  void schedule_all_impl(bool reset_all, transwarp::schedule_type type, transwarp::executor* executor=nullptr) {
2176  transwarp::detail::schedule_visitor visitor(reset_all, executor);
2177  switch (type) {
2179  visit_breadth_all(visitor);
2180  break;
2182  visit_depth_all(visitor);
2183  break;
2184  default:
2185  throw transwarp::invalid_parameter("schedule type");
2186  }
2187  }
2188 
2189  /// Collects all tasks in depth order
2190  std::vector<transwarp::itask*> tasks_in_depth_order() const {
2191  std::vector<transwarp::itask*> tasks;
2192  const_cast<task_impl_base*>(this)->visit_depth(transwarp::detail::push_task_visitor(tasks));
2193  const_cast<task_impl_base*>(this)->unvisit();
2194  return tasks;
2195  }
2196 
2197  /// Visits all tasks
2198  template<typename Visitor>
2199  void visit_all(Visitor& visitor) {
2200  if (!breadth_tasks_.empty()) {
2201  visit_breadth_all(visitor);
2202  } else if (!depth_tasks_.empty()) {
2203  visit_depth_all(visitor);
2204  } else {
2205  visit_breadth_all(visitor);
2206  }
2207  }
2208 
2209  /// Visits all tasks in a breadth-first traversal.
2210  template<typename Visitor>
2211  void visit_breadth_all(Visitor& visitor) {
2212  if (breadth_tasks_.empty()) {
2213  breadth_tasks_ = tasks_in_depth_order();
2214  auto compare = [](const transwarp::itask* const l, const transwarp::itask* const r) {
2215  const std::size_t l_level = l->get_node()->get_level();
2216  const std::size_t l_id = l->get_node()->get_id();
2217  const std::size_t r_level = r->get_node()->get_level();
2218  const std::size_t r_id = r->get_node()->get_id();
2219  return std::tie(l_level, l_id) < std::tie(r_level, r_id);
2220  };
2221  std::sort(breadth_tasks_.begin(), breadth_tasks_.end(), compare);
2222  }
2223  for (transwarp::itask* task : breadth_tasks_) {
2224  visitor(*task);
2225  }
2226  }
2227 
2228  /// Visits all tasks in a depth-first traversal.
2229  template<typename Visitor>
2230  void visit_depth_all(Visitor& visitor) {
2231  if (depth_tasks_.empty()) {
2232  depth_tasks_ = tasks_in_depth_order();
2233  }
2234  for (transwarp::itask* task : depth_tasks_) {
2235  visitor(*task);
2236  }
2237  }
2238 
2239  /// Visits each task in a depth-first traversal.
2240  void visit_depth(const std::function<void(transwarp::itask&)>& visitor) override {
2241  if (!visited_) {
2243  visitor(*this);
2244  visited_ = true;
2245  }
2246  }
2247 
2248  /// Traverses through each task and marks them as not visited.
2249  void unvisit() noexcept override {
2250  if (visited_) {
2251  visited_ = false;
2253  }
2254  }
2255 
2256  /// Returns the index for a given event type
2257  std::size_t get_event_index(transwarp::event_type event) const {
2258  const std::size_t index = static_cast<std::size_t>(event);
2259  if (index >= static_cast<std::size_t>(transwarp::event_type::count)) {
2260  throw transwarp::invalid_parameter("event type");
2261  }
2262  return index;
2263  }
2264 
2265  /// Raises the given event to all listeners
2266  void raise_event(transwarp::event_type event) const {
2267  for (const std::shared_ptr<transwarp::listener>& listener : listeners_[static_cast<std::size_t>(event)]) {
2268  listener->handle_event(event, node_);
2269  }
2270  }
2271 
2272  /// Check for non-null listener pointer
2273  void check_listener(const std::shared_ptr<transwarp::listener>& listener) const {
2274  if (!listener) {
2275  throw transwarp::invalid_parameter("listener pointer");
2276  }
2277  }
2278 
2279  std::size_t task_count_ = 0;
2280  std::shared_ptr<transwarp::node> node_;
2281  Functor functor_;
2282  typename transwarp::detail::parents<ParentResults...>::type parents_;
2283  bool visited_ = false;
2284  std::shared_ptr<transwarp::executor> executor_;
2285  std::vector<std::shared_ptr<transwarp::listener>> listeners_[static_cast<std::size_t>(transwarp::event_type::count)];
2286  std::vector<transwarp::itask*> depth_tasks_;
2287  std::vector<transwarp::itask*> breadth_tasks_;
2288 };
2289 
2290 
2291 /// A task proxy
2292 template<typename ResultType, typename TaskType, typename Functor, typename... ParentResults>
2293 class task_impl_proxy : public transwarp::detail::task_impl_base<ResultType, TaskType, Functor, ParentResults...> {
2294 public:
2295  /// The task type
2296  using task_type = TaskType;
2297 
2298  /// The result type of this task
2299  using result_type = ResultType;
2300 
2301  /// Assigns a value to this task. Scheduling will have no effect after a value
2302  /// has been set. Calling reset() will remove the value and re-enable scheduling.
2303  void set_value(const typename transwarp::decay<result_type>::type& value) override {
2304  this->ensure_task_not_running();
2305  this->future_ = transwarp::detail::make_future_with_value<result_type>(value);
2306  this->schedule_mode_ = false;
2307  }
2308 
2309  /// Assigns a value to this task. Scheduling will have no effect after a value
2310  /// has been set. Calling reset() will remove the value and re-enable scheduling.
2311  void set_value(typename transwarp::decay<result_type>::type&& value) override {
2312  this->ensure_task_not_running();
2313  this->future_ = transwarp::detail::make_future_with_value<result_type>(std::move(value));
2314  this->schedule_mode_ = false;
2315  }
2316 
2317  /// Returns the result of this task. Throws any exceptions that the underlying
2318  /// functor throws. Should only be called if was_scheduled() is true,
2319  /// throws transwarp::control_error otherwise
2320  typename transwarp::result<result_type>::type get() const override {
2321  this->ensure_task_was_scheduled();
2322  return this->future_.get();
2323  }
2324 
2325 protected:
2326 
2327  template<typename F>
2328  task_impl_proxy(bool has_name, std::string name, F&& functor, std::shared_ptr<transwarp::task<ParentResults>>... parents)
2329  : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>(has_name, std::move(name), std::forward<F>(functor), std::move(parents)...)
2330  {}
2331 
2332  template<typename F, typename P>
2333  task_impl_proxy(bool has_name, std::string name, F&& functor, std::vector<std::shared_ptr<transwarp::task<P>>> parents)
2334  : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>(has_name, std::move(name), std::forward<F>(functor), std::move(parents))
2335  {}
2336 
2337 };
2338 
2339 /// A task proxy for reference result type.
2340 template<typename ResultType, typename TaskType, typename Functor, typename... ParentResults>
2341 class task_impl_proxy<ResultType&, TaskType, Functor, ParentResults...> : public transwarp::detail::task_impl_base<ResultType&, TaskType, Functor, ParentResults...> {
2342 public:
2343  /// The task type
2344  using task_type = TaskType;
2345 
2346  /// The result type of this task
2347  using result_type = ResultType&;
2348 
2349  /// Assigns a value to this task. Scheduling will have no effect after a value
2350  /// has been set. Calling reset() will remove the value and re-enable scheduling.
2351  void set_value(typename transwarp::decay<result_type>::type& value) override {
2352  this->ensure_task_not_running();
2353  this->future_ = transwarp::detail::make_future_with_value<result_type>(value);
2354  this->schedule_mode_ = false;
2355  }
2356 
2357  /// Returns the result of this task. Throws any exceptions that the underlying
2358  /// functor throws. Should only be called if was_scheduled() is true,
2359  /// throws transwarp::control_error otherwise
2360  typename transwarp::result<result_type>::type get() const override {
2361  this->ensure_task_was_scheduled();
2362  return this->future_.get();
2363  }
2364 
2365 protected:
2366 
2367  template<typename F>
2368  task_impl_proxy(bool has_name, std::string name, F&& functor, std::shared_ptr<transwarp::task<ParentResults>>... parents)
2369  : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>(has_name, std::move(name), std::forward<F>(functor), std::move(parents)...)
2370  {}
2371 
2372  template<typename F, typename P>
2373  task_impl_proxy(bool has_name, std::string name, F&& functor, std::vector<std::shared_ptr<transwarp::task<P>>> parents)
2374  : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>(has_name, std::move(name), std::forward<F>(functor), std::move(parents))
2375  {}
2376 
2377 };
2378 
2379 /// A task proxy for void result type.
2380 template<typename TaskType, typename Functor, typename... ParentResults>
2381 class task_impl_proxy<void, TaskType, Functor, ParentResults...> : public transwarp::detail::task_impl_base<void, TaskType, Functor, ParentResults...> {
2382 public:
2383  /// The task type
2384  using task_type = TaskType;
2385 
2386  /// The result type of this task
2387  using result_type = void;
2388 
2389  /// Assigns a value to this task. Scheduling will have no effect after a call
2390  /// to this. Calling reset() will reset this and re-enable scheduling.
2391  void set_value() override {
2392  this->ensure_task_not_running();
2393  this->future_ = transwarp::detail::make_ready_future();
2394  this->schedule_mode_ = false;
2395  }
2396 
2397  /// Blocks until the task finishes. Throws any exceptions that the underlying
2398  /// functor throws. Should only be called if was_scheduled() is true,
2399  /// throws transwarp::control_error otherwise
2400  void get() const override {
2401  this->ensure_task_was_scheduled();
2402  this->future_.get();
2403  }
2404 
2405 protected:
2406 
2407  template<typename F>
2408  task_impl_proxy(bool has_name, std::string name, F&& functor, std::shared_ptr<transwarp::task<ParentResults>>... parents)
2409  : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>(has_name, std::move(name), std::forward<F>(functor), std::move(parents)...)
2410  {}
2411 
2412  template<typename F, typename P>
2413  task_impl_proxy(bool has_name, std::string name, F&& functor, std::vector<std::shared_ptr<transwarp::task<P>>> parents)
2414  : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>(has_name, std::move(name), std::forward<F>(functor), std::move(parents))
2415  {}
2416 
2417 };
2418 
2419 } // detail
2420 
2421 
2422 /// A task representing a piece of work given by functor and parent tasks.
2423 /// By connecting tasks a directed acyclic graph is built.
2424 /// Tasks should be created using the make_task factory functions.
2425 template<typename TaskType, typename Functor, typename... ParentResults>
2426 class task_impl : public transwarp::detail::task_impl_proxy<typename transwarp::detail::functor_result<TaskType, Functor, ParentResults...>::type, TaskType, Functor, ParentResults...> {
2427 public:
2428  /// The task type
2429  using task_type = TaskType;
2430 
2431  /// The result type of this task
2432  using result_type = typename transwarp::detail::functor_result<TaskType, Functor, ParentResults...>::type;
2433 
2434  /// A task is defined by name, functor, and parent tasks.
2435  /// Note: Don't use this constructor directly, use transwarp::make_task
2436  template<typename F>
2437  task_impl(bool has_name, std::string name, F&& functor, std::shared_ptr<transwarp::task<ParentResults>>... parents)
2438  : transwarp::detail::task_impl_proxy<result_type, task_type, Functor, ParentResults...>(has_name, std::move(name), std::forward<F>(functor), std::move(parents)...)
2439  {}
2440 
2441  /// A task is defined by name, functor, and parent tasks.
2442  /// Note: Don't use this constructor directly, use transwarp::make_task
2443  template<typename F, typename P>
2444  task_impl(bool has_name, std::string name, F&& functor, std::vector<std::shared_ptr<transwarp::task<P>>> parents)
2445  : transwarp::detail::task_impl_proxy<result_type, task_type, Functor, ParentResults...>(has_name, std::move(name), std::forward<F>(functor), std::move(parents))
2446  {}
2447 
2448  // delete copy/move semantics
2449  task_impl(const task_impl&) = delete;
2450  task_impl& operator=(const task_impl&) = delete;
2451  task_impl(task_impl&&) = delete;
2452  task_impl& operator=(task_impl&&) = delete;
2453 
2454  /// Creates a continuation to this task
2455  template<typename TaskType_, typename Functor_>
2456  std::shared_ptr<transwarp::task_impl<TaskType_, typename std::decay<Functor_>::type, result_type>>
2457  then(TaskType_, std::string name, Functor_&& functor) const {
2459  return std::shared_ptr<task_t>(new task_t(true, std::move(name), std::forward<Functor_>(functor), std::dynamic_pointer_cast<transwarp::task<result_type>>(const_cast<task_impl*>(this)->shared_from_this())));
2460  }
2461 
2462  /// Creates a continuation to this task. Overload for omitting for task name
2463  template<typename TaskType_, typename Functor_>
2464  std::shared_ptr<transwarp::task_impl<TaskType_, typename std::decay<Functor_>::type, result_type>>
2465  then(TaskType_, Functor_&& functor) const {
2467  return std::shared_ptr<task_t>(new task_t(false, "", std::forward<Functor_>(functor), std::dynamic_pointer_cast<transwarp::task<result_type>>(const_cast<task_impl*>(this)->shared_from_this())));
2468  }
2469 
2470 };
2471 
2472 
2473 /// A value task that stores a single value and doesn't require scheduling.
2474 /// Value tasks should be created using the make_value_task factory functions.
2475 template<typename ResultType>
2476 class value_task : public transwarp::task<ResultType>,
2477  public std::enable_shared_from_this<value_task<ResultType>> {
2478 public:
2479  /// The task type
2481 
2482  /// The result type of this task
2483  using result_type = ResultType;
2484 
2485  /// A value task is defined by name and value.
2486  /// Note: Don't use this constructor directly, use transwarp::make_value_task
2487  template<typename T>
2488  value_task(bool has_name, std::string name, T&& value)
2489  : node_(new transwarp::node),
2490  future_(transwarp::detail::make_future_with_value<result_type>(std::forward<T>(value)))
2491  {
2492  transwarp::detail::node_manip::set_type(*node_, task_type::value);
2493  transwarp::detail::node_manip::set_name(*node_, (has_name ? std::shared_ptr<std::string>(new std::string(std::move(name))) : nullptr));
2494  }
2495 
2496  // delete copy/move semantics
2497  value_task(const value_task&) = delete;
2498  value_task& operator=(const value_task&) = delete;
2499  value_task(value_task&&) = delete;
2500  value_task& operator=(value_task&&) = delete;
2501 
2502  /// Creates a continuation to this task
2503  template<typename TaskType_, typename Functor_>
2504  std::shared_ptr<transwarp::task_impl<TaskType_, typename std::decay<Functor_>::type, result_type>>
2505  then(TaskType_, std::string name, Functor_&& functor) const {
2507  return std::shared_ptr<task_t>(new task_t(true, std::move(name), std::forward<Functor_>(functor), std::dynamic_pointer_cast<transwarp::task<result_type>>(const_cast<value_task*>(this)->shared_from_this())));
2508  }
2509 
2510  /// Creates a continuation to this task. Overload for omitting the task name
2511  template<typename TaskType_, typename Functor_>
2512  std::shared_ptr<transwarp::task_impl<TaskType_, typename std::decay<Functor_>::type, result_type>>
2513  then(TaskType_, Functor_&& functor) const {
2515  return std::shared_ptr<task_t>(new task_t(false, "", std::forward<Functor_>(functor), std::dynamic_pointer_cast<transwarp::task<result_type>>(const_cast<value_task*>(this)->shared_from_this())));
2516  }
2517 
2518  /// No-op because a value task never runs
2519  void set_executor(std::shared_ptr<transwarp::executor>) override {}
2520 
2521  /// No-op because a value task never runs and doesn't have parents
2522  void set_executor_all(std::shared_ptr<transwarp::executor>) override {}
2523 
2524  /// No-op because a value task never runs
2525  void remove_executor() override {}
2526 
2527  /// No-op because a value task never runs and doesn't have parents
2528  void remove_executor_all() override {}
2529 
2530  /// Sets a task priority (defaults to 0). transwarp will not directly use this.
2531  /// This is only useful if something else is using the priority
2532  void set_priority(std::size_t priority) override {
2533  transwarp::detail::node_manip::set_priority(*node_, priority);
2534  }
2535 
2536  /// Sets a priority to all tasks (defaults to 0). transwarp will not directly use this.
2537  /// This is only useful if something else is using the priority
2538  void set_priority_all(std::size_t priority) override {
2539  set_priority(priority);
2540  }
2541 
2542  /// Resets the task priority to 0
2543  void reset_priority() override {
2544  transwarp::detail::node_manip::set_priority(*node_, 0);
2545  }
2546 
2547  /// Resets the priority of all tasks to 0
2548  void reset_priority_all() override {
2549  reset_priority();
2550  }
2551 
2552  /// Assigns custom data to this task. transwarp will not directly use this.
2553  /// This is only useful if something else is using this custom data
2554  void set_custom_data(std::shared_ptr<void> custom_data) override {
2555  if (!custom_data) {
2556  throw transwarp::invalid_parameter("custom data pointer");
2557  }
2558  transwarp::detail::node_manip::set_custom_data(*node_, std::move(custom_data));
2559  }
2560 
2561  /// Assigns custom data to all tasks. transwarp will not directly use this.
2562  /// This is only useful if something else is using this custom data
2563  void set_custom_data_all(std::shared_ptr<void> custom_data) override {
2564  set_custom_data(std::move(custom_data));
2565  }
2566 
2567  /// Removes custom data from this task
2568  void remove_custom_data() override {
2569  transwarp::detail::node_manip::set_custom_data(*node_, nullptr);
2570  }
2571 
2572  /// Removes custom data from all tasks
2573  void remove_custom_data_all() override {
2575  }
2576 
2577  /// Returns the future associated to the underlying execution
2578  const std::shared_future<result_type>& get_future() const noexcept override {
2579  return future_;
2580  }
2581 
2582  /// Returns the associated node
2583  const std::shared_ptr<transwarp::node>& get_node() const noexcept override {
2584  return node_;
2585  }
2586 
2587  /// No-op because a value task doesn't raise events
2588  void add_listener(std::shared_ptr<transwarp::listener>) override {}
2589 
2590  /// No-op because a value task doesn't raise events
2591  void add_listener(transwarp::event_type, std::shared_ptr<transwarp::listener>) override {}
2592 
2593  /// No-op because a value task doesn't raise events
2594  void add_listener_all(std::shared_ptr<transwarp::listener>) override {}
2595 
2596  /// No-op because a value task doesn't raise events
2597  void add_listener_all(transwarp::event_type, std::shared_ptr<transwarp::listener>) override {}
2598 
2599  /// No-op because a value task doesn't raise events
2600  void remove_listener(const std::shared_ptr<transwarp::listener>&) override {}
2601 
2602  /// No-op because a value task doesn't raise events
2603  void remove_listener(transwarp::event_type, const std::shared_ptr<transwarp::listener>&) override {}
2604 
2605  /// No-op because a value task doesn't raise events
2606  void remove_listener_all(const std::shared_ptr<transwarp::listener>&) override {}
2607 
2608  /// No-op because a value task doesn't raise events
2609  void remove_listener_all(transwarp::event_type, const std::shared_ptr<transwarp::listener>&) override {}
2610 
2611  /// No-op because a value task doesn't raise events
2612  void remove_listeners() override {}
2613 
2614  /// No-op because a value task doesn't raise events
2616 
2617  /// No-op because a value task doesn't raise events
2618  void remove_listeners_all() override {}
2619 
2620  /// No-op because a value task doesn't raise events
2622 
2623  /// No-op because a value task never runs
2624  void schedule() override {}
2625 
2626  /// No-op because a value task never runs
2627  void schedule(transwarp::executor&) override {}
2628 
2629  /// No-op because a value task never runs
2630  void schedule(bool) override {}
2631 
2632  /// No-op because a value task never runs
2633  void schedule(transwarp::executor&, bool) override {}
2634 
2635  /// No-op because a value task never runs and doesn't have parents
2636  void schedule_all() override {}
2637 
2638  /// No-op because a value task never runs and doesn't have parents
2640 
2641  /// No-op because a value task never runs and doesn't have parents
2642  void schedule_all(bool) override {}
2643 
2644  /// No-op because a value task never runs and doesn't have parents
2645  void schedule_all(transwarp::executor&, bool) override {}
2646 
2647  /// No-op because a value task never runs and doesn't have parents
2649 
2650  /// No-op because a value task never runs and doesn't have parents
2652 
2653  /// No-op because a value task never runs and doesn't have parents
2654  void schedule_all(transwarp::schedule_type, bool) override {}
2655 
2656  /// No-op because a value task never runs and doesn't have parents
2658 
2659  /// Assigns a value to this task
2660  void set_value(const typename transwarp::decay<result_type>::type& value) override {
2661  future_ = transwarp::detail::make_future_with_value<result_type>(value);
2662  }
2663 
2664  /// Assigns a value to this task
2665  void set_value(typename transwarp::decay<result_type>::type&& value) override {
2666  future_ = transwarp::detail::make_future_with_value<result_type>(std::move(value));
2667  };
2668 
2669  /// Assigns an exception to this task
2670  void set_exception(std::exception_ptr exception) override {
2671  future_ = transwarp::detail::make_future_with_exception<result_type>(exception);
2672  }
2673 
2674  /// Returns true because a value task is scheduled once on construction
2675  bool was_scheduled() const noexcept override {
2676  return true;
2677  }
2678 
2679  /// No-op because a value task never runs
2680  void wait() const override {}
2681 
2682  /// Returns true because a value task is always ready
2683  bool is_ready() const override {
2684  return true;
2685  }
2686 
2687  /// Returns true because a value task always contains a result
2688  bool has_result() const noexcept override {
2689  return true;
2690  }
2691 
2692  /// Returns the result of this task
2693  typename transwarp::result<result_type>::type get() const override {
2694  return future_.get();
2695  }
2696 
2697  /// No-op because a value task never runs
2698  void reset() override {}
2699 
2700  /// No-op because a value task never runs and doesn't have parents
2701  void reset_all() override {}
2702 
2703  /// No-op because a value task never runs
2704  void cancel(bool) noexcept override {}
2705 
2706  /// No-op because a value task never runs and doesn't have parents
2707  void cancel_all(bool) noexcept override {}
2708 
2709  /// Returns the number of direct parents of this task
2710  std::size_t get_parent_count() const noexcept override {
2711  return 0;
2712  }
2713 
2714  /// Returns the number of tasks in the graph
2715  std::size_t get_task_count() const noexcept override {
2716  return 1;
2717  }
2718 
2719  /// Returns an empty graph because a value task doesn't have parents
2720  std::vector<transwarp::edge> get_graph() const override {
2721  return {};
2722  }
2723 
2724 private:
2725 
2726  /// Assigns the given id to the node
2727  void set_node_id(std::size_t id) noexcept override {
2728  transwarp::detail::node_manip::set_id(*node_, id);
2729  }
2730 
2731  /// No-op because a value task never runs
2732  void schedule_impl(bool, transwarp::executor*) override {}
2733 
2734  /// Visits this task
2735  void visit_depth(const std::function<void(transwarp::itask&)>& visitor) override {
2736  if (!visited_) {
2737  visitor(*this);
2738  visited_ = true;
2739  }
2740  }
2741 
2742  /// Marks this task as not visited
2743  void unvisit() noexcept override {
2744  visited_ = false;
2745  }
2746 
2747  std::shared_ptr<transwarp::node> node_;
2748  std::shared_future<result_type> future_;
2749  bool visited_ = false;
2750 };
2751 
2752 
2753 /// A factory function to create a new task
2754 template<typename TaskType, typename Functor, typename... Parents>
2755 std::shared_ptr<transwarp::task_impl<TaskType, typename std::decay<Functor>::type, typename Parents::result_type...>>
2756 make_task(TaskType, std::string name, Functor&& functor, std::shared_ptr<Parents>... parents) {
2757  using task_t = transwarp::task_impl<TaskType, typename std::decay<Functor>::type, typename Parents::result_type...>;
2758  return std::shared_ptr<task_t>(new task_t(true, std::move(name), std::forward<Functor>(functor), std::move(parents)...));
2759 }
2760 
2761 /// A factory function to create a new task. Overload for omitting the task name
2762 template<typename TaskType, typename Functor, typename... Parents>
2763 std::shared_ptr<transwarp::task_impl<TaskType, typename std::decay<Functor>::type, typename Parents::result_type...>>
2764 make_task(TaskType, Functor&& functor, std::shared_ptr<Parents>... parents) {
2765  using task_t = transwarp::task_impl<TaskType, typename std::decay<Functor>::type, typename Parents::result_type...>;
2766  return std::shared_ptr<task_t>(new task_t(false, "", std::forward<Functor>(functor), std::move(parents)...));
2767 }
2768 
2769 
2770 /// A factory function to create a new task with vector parents
2771 template<typename TaskType, typename Functor, typename ParentType>
2772 std::shared_ptr<transwarp::task_impl<TaskType, typename std::decay<Functor>::type, std::vector<ParentType>>>
2773 make_task(TaskType, std::string name, Functor&& functor, std::vector<ParentType> parents) {
2774  using task_t = transwarp::task_impl<TaskType, typename std::decay<Functor>::type, std::vector<ParentType>>;
2775  return std::shared_ptr<task_t>(new task_t(true, std::move(name), std::forward<Functor>(functor), std::move(parents)));
2776 }
2777 
2778 /// A factory function to create a new task with vector parents. Overload for omitting the task name
2779 template<typename TaskType, typename Functor, typename ParentType>
2780 std::shared_ptr<transwarp::task_impl<TaskType, typename std::decay<Functor>::type, std::vector<ParentType>>>
2781 make_task(TaskType, Functor&& functor, std::vector<ParentType> parents) {
2782  using task_t = transwarp::task_impl<TaskType, typename std::decay<Functor>::type, std::vector<ParentType>>;
2783  return std::shared_ptr<task_t>(new task_t(false, "", std::forward<Functor>(functor), std::move(parents)));
2784 }
2785 
2786 
2787 /// A factory function to create a new value task
2788 template<typename Value>
2789 std::shared_ptr<transwarp::value_task<typename transwarp::decay<Value>::type>>
2790 make_value_task(std::string name, Value&& value) {
2792  return std::shared_ptr<task_t>(new task_t(true, std::move(name), std::forward<Value>(value)));
2793 }
2794 
2795 /// A factory function to create a new value task. Overload for omitting the task name
2796 template<typename Value>
2797 std::shared_ptr<transwarp::value_task<typename transwarp::decay<Value>::type>>
2798 make_value_task(Value&& value) {
2800  return std::shared_ptr<task_t>(new task_t(false, "", std::forward<Value>(value)));
2801 }
2802 
2803 
2804 /// A function similar to std::for_each but returning a transwarp task for
2805 /// deferred, possibly asynchronous execution. This function creates a graph
2806 /// with std::distance(first, last) root nodes
2807 template<typename InputIt, typename UnaryOperation>
2808 std::shared_ptr<transwarp::task_impl<transwarp::wait_type, transwarp::no_op_functor, std::vector<std::shared_ptr<transwarp::task<void>>>>>
2809 for_each(InputIt first, InputIt last, UnaryOperation unary_op) {
2810  const auto distance = std::distance(first, last);
2811  if (distance <= 0) {
2812  throw transwarp::invalid_parameter("first or last");
2813  }
2814  std::vector<std::shared_ptr<transwarp::task<void>>> tasks;
2815  tasks.reserve(static_cast<std::size_t>(distance));
2816  for (; first != last; ++first) {
2817  tasks.push_back(transwarp::make_task(transwarp::root, [unary_op,first]{ unary_op(*first); }));
2818  }
2820 }
2821 
2822 /// A function similar to std::for_each but returning a transwarp task for
2823 /// deferred, possibly asynchronous execution. This function creates a graph
2824 /// with std::distance(first, last) root nodes.
2825 /// Overload for automatic scheduling by passing an executor.
2826 template<typename InputIt, typename UnaryOperation>
2827 std::shared_ptr<transwarp::task_impl<transwarp::wait_type, transwarp::no_op_functor, std::vector<std::shared_ptr<transwarp::task<void>>>>>
2828 for_each(transwarp::executor& executor, InputIt first, InputIt last, UnaryOperation unary_op) {
2829  auto task = transwarp::for_each(first, last, unary_op);
2830  task->schedule_all(executor);
2831  return task;
2832 }
2833 
2834 
2835 /// A function similar to std::transform but returning a transwarp task for
2836 /// deferred, possibly asynchronous execution. This function creates a graph
2837 /// with std::distance(first1, last1) root nodes
2838 template<typename InputIt, typename OutputIt, typename UnaryOperation>
2839 std::shared_ptr<transwarp::task_impl<transwarp::wait_type, transwarp::no_op_functor, std::vector<std::shared_ptr<transwarp::task<void>>>>>
2840 transform(InputIt first1, InputIt last1, OutputIt d_first, UnaryOperation unary_op) {
2841  const auto distance = std::distance(first1, last1);
2842  if (distance <= 0) {
2843  throw transwarp::invalid_parameter("first1 or last1");
2844  }
2845  std::vector<std::shared_ptr<transwarp::task<void>>> tasks;
2846  tasks.reserve(static_cast<std::size_t>(distance));
2847  for (; first1 != last1; ++first1, ++d_first) {
2848  tasks.push_back(transwarp::make_task(transwarp::root, [unary_op,first1,d_first]{ *d_first = unary_op(*first1); }));
2849  }
2851 }
2852 
2853 /// A function similar to std::transform but returning a transwarp task for
2854 /// deferred, possibly asynchronous execution. This function creates a graph
2855 /// with std::distance(first1, last1) root nodes.
2856 /// Overload for automatic scheduling by passing an executor.
2857 template<typename InputIt, typename OutputIt, typename UnaryOperation>
2858 std::shared_ptr<transwarp::task_impl<transwarp::wait_type, transwarp::no_op_functor, std::vector<std::shared_ptr<transwarp::task<void>>>>>
2859 transform(transwarp::executor& executor, InputIt first1, InputIt last1, OutputIt d_first, UnaryOperation unary_op) {
2860  auto task = transwarp::transform(first1, last1, d_first, unary_op);
2861  task->schedule_all(executor);
2862  return task;
2863 }
2864 
2865 
2866 /// A graph interface giving access to the final task as required by transwarp::graph_pool
2867 template<typename ResultType>
2868 class graph {
2869 public:
2870  using result_type = ResultType;
2871 
2872  virtual ~graph() = default;
2873 
2874  /// Returns the final task of the graph
2875  virtual const std::shared_ptr<transwarp::task<result_type>>& final_task() const = 0;
2876 };
2877 
2878 
2879 /// A graph pool that allows running multiple instances of the same graph in parallel.
2880 /// Graph must be a sub-class of transwarp::graph
2881 template<typename Graph>
2882 class graph_pool {
2883 public:
2884 
2885  static_assert(std::is_base_of<transwarp::graph<typename Graph::result_type>, Graph>::value,
2886  "Graph must be a sub-class of transwarp::graph");
2887 
2888  /// Constructs a graph pool by passing a generator to create a new graph
2889  /// and a minimum and maximum size of the pool. The minimum size is used as
2890  /// the initial size of the pool. Graph should be a subclass of transwarp::graph
2891  graph_pool(std::function<std::shared_ptr<Graph>()> generator,
2892  std::size_t minimum_size,
2893  std::size_t maximum_size)
2894  : generator_(std::move(generator)),
2895  minimum_(minimum_size),
2896  maximum_(maximum_size),
2897  finished_(maximum_size)
2898  {
2899  if (minimum_ < 1) {
2900  throw transwarp::invalid_parameter("minimum size");
2901  }
2902  if (minimum_ > maximum_) {
2903  throw transwarp::invalid_parameter("minimum or maximum size");
2904  }
2905  for (std::size_t i=0; i<minimum_; ++i) {
2906  idle_.push(generate());
2907  }
2908  }
2909 
2910  // delete copy/move semantics
2911  graph_pool(const graph_pool&) = delete;
2912  graph_pool& operator=(const graph_pool&) = delete;
2913  graph_pool(graph_pool&&) = delete;
2914  graph_pool& operator=(graph_pool&&) = delete;
2915 
2916  /// Returns the next idle graph.
2917  /// If there are no idle graphs then it will attempt to double the
2918  /// pool size. If that fails then it will return a nullptr. On successful
2919  /// retrieval of an idle graph the function will mark that graph as busy.
2920  std::shared_ptr<Graph> next_idle_graph(bool maybe_resize=true) {
2921  std::shared_ptr<transwarp::node> finished_node;
2922  {
2923  std::lock_guard<transwarp::detail::spinlock> lock(spinlock_);
2924  if (!finished_.empty()) {
2925  finished_node = finished_.front(); finished_.pop();
2926  }
2927  }
2928 
2929  std::shared_ptr<Graph> g;
2930  if (finished_node) {
2931  g = busy_.find(finished_node)->second;
2932  } else {
2933  if (maybe_resize && idle_.empty()) {
2934  resize(size() * 2); // double pool size
2935  }
2936  if (idle_.empty()) {
2937  return nullptr;
2938  }
2939  g = idle_.front(); idle_.pop();
2940  busy_.emplace(g->final_task()->get_node(), g);
2941  }
2942 
2943  const auto& future = g->final_task()->get_future();
2944  if (future.valid()) {
2945  future.wait(); // will return immediately
2946  }
2947  return g;
2948  }
2949 
2950  /// Just like next_idle_graph() but waits for a graph to become available.
2951  /// The returned graph will always be a valid pointer
2952  std::shared_ptr<Graph> wait_for_next_idle_graph(bool maybe_resize=true) {
2953  for (;;) {
2954  std::shared_ptr<Graph> g = next_idle_graph(maybe_resize);
2955  if (g) {
2956  return g;
2957  }
2958  }
2959  }
2960 
2961  /// Returns the current total size of the pool (sum of idle and busy graphs)
2962  std::size_t size() const {
2963  return idle_.size() + busy_.size();
2964  }
2965 
2966  /// Returns the minimum size of the pool
2967  std::size_t minimum_size() const {
2968  return minimum_;
2969  }
2970 
2971  /// Returns the maximum size of the pool
2972  std::size_t maximum_size() const {
2973  return maximum_;
2974  }
2975 
2976  /// Returns the number of idle graphs in the pool
2977  std::size_t idle_count() const {
2978  std::lock_guard<transwarp::detail::spinlock> lock(spinlock_);
2979  return idle_.size() + finished_.size();
2980  }
2981 
2982  /// Returns the number of busy graphs in the pool
2983  std::size_t busy_count() const {
2984  std::lock_guard<transwarp::detail::spinlock> lock(spinlock_);
2985  return busy_.size() - finished_.size();
2986  }
2987 
2988  /// Resizes the graph pool to the given new size if possible
2989  void resize(std::size_t new_size) {
2990  reclaim();
2991  if (new_size > size()) { // grow
2992  const std::size_t count = new_size - size();
2993  for (std::size_t i=0; i<count; ++i) {
2994  if (size() == maximum_) {
2995  break;
2996  }
2997  idle_.push(generate());
2998  }
2999  } else if (new_size < size()) { // shrink
3000  const std::size_t count = size() - new_size;
3001  for (std::size_t i=0; i<count; ++i) {
3002  if (idle_.empty() || size() == minimum_) {
3003  break;
3004  }
3005  idle_.pop();
3006  }
3007  }
3008  }
3009 
3010  /// Reclaims finished graphs by marking them as idle again
3011  void reclaim() {
3012  decltype(finished_) finished{finished_.capacity()};
3013  {
3014  std::lock_guard<transwarp::detail::spinlock> lock(spinlock_);
3015  finished_.swap(finished);
3016  }
3017  while (!finished.empty()) {
3018  const std::shared_ptr<transwarp::node> node = finished.front(); finished.pop();
3019  const auto it = busy_.find(node);
3020  idle_.push(it->second);
3021  busy_.erase(it);
3022  }
3023  }
3024 
3025 private:
3026 
3027  class finished_listener : public transwarp::listener {
3028  public:
3029 
3030  explicit
3031  finished_listener(graph_pool<Graph>& pool)
3032  : pool_(pool)
3033  {}
3034 
3035  // Called on a potentially high-priority thread
3036  void handle_event(transwarp::event_type, const std::shared_ptr<transwarp::node>& node) override {
3037  std::lock_guard<transwarp::detail::spinlock> lock(pool_.spinlock_);
3038  pool_.finished_.push(node);
3039  }
3040 
3041  private:
3042  graph_pool<Graph>& pool_;
3043  };
3044 
3045  std::shared_ptr<Graph> generate() {
3046  std::shared_ptr<Graph> graph = generator_();
3047  graph->final_task()->add_listener(transwarp::event_type::after_finished, listener_);
3048  return graph;
3049  }
3050 
3051  std::function<std::shared_ptr<Graph>()> generator_;
3052  std::size_t minimum_;
3053  std::size_t maximum_;
3054  mutable transwarp::detail::spinlock spinlock_; // protecting finished_
3056  std::queue<std::shared_ptr<Graph>> idle_;
3057  std::unordered_map<std::shared_ptr<transwarp::node>, std::shared_ptr<Graph>> busy_;
3058  std::shared_ptr<transwarp::listener> listener_{new finished_listener(*this)};
3059 };
3060 
3061 
3062 /// A timer that tracks the average idle, wait, and run time of each task it listens to.
3063 /// - idle = time between scheduling and starting the task (executor dependent)
3064 /// - wait = time between starting and invoking the task's functor, i.e. wait for parent tasks to finish
3065 /// - run = time between invoking and finishing the task's computations
3066 class timer : public transwarp::listener {
3067 public:
3068  timer() = default;
3069 
3070  // delete copy/move semantics
3071  timer(const timer&) = delete;
3072  timer& operator=(const timer&) = delete;
3073  timer(timer&&) = delete;
3074  timer& operator=(timer&&) = delete;
3075 
3076  /// Performs the actual timing and populates the node's timing members
3077  void handle_event(transwarp::event_type event, const std::shared_ptr<transwarp::node>& node) override {
3078  switch (event) {
3080  const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3081  std::lock_guard<transwarp::detail::spinlock> lock(spinlock_);
3082  auto& track = tracks_[node];
3083  track.startidle = now;
3084  }
3085  break;
3087  const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3088  track_idletime(node, now);
3089  std::lock_guard<transwarp::detail::spinlock> lock(spinlock_);
3090  auto& track = tracks_[node];
3091  track.startwait = now;
3092  }
3093  break;
3095  const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3096  track_waittime(node, now);
3097  }
3098  break;
3100  const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3101  track_waittime(node, now);
3102  std::lock_guard<transwarp::detail::spinlock> lock(spinlock_);
3103  auto& track = tracks_[node];
3104  track.running = true;
3105  track.startrun = now;
3106  }
3107  break;
3109  const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3110  track_runtime(node, now);
3111  }
3112  break;
3113  default: break;
3114  }
3115  }
3116 
3117  /// Resets all timing information
3118  void reset() {
3119  std::lock_guard<transwarp::detail::spinlock> lock(spinlock_);
3120  tracks_.clear();
3121  }
3122 
3123 private:
3124 
3125  void track_idletime(const std::shared_ptr<transwarp::node>& node, const std::chrono::time_point<std::chrono::steady_clock>& now) {
3126  std::int64_t avg_idletime_us;
3127  {
3128  std::lock_guard<transwarp::detail::spinlock> lock(spinlock_);
3129  auto& track = tracks_[node];
3130  track.idletime += std::chrono::duration_cast<std::chrono::microseconds>(now - track.startidle).count();
3131  ++track.idlecount;
3132  avg_idletime_us = static_cast<std::int64_t>(track.idletime / track.idlecount);
3133  }
3134  transwarp::detail::node_manip::set_avg_idletime_us(*node, avg_idletime_us);
3135  };
3136 
3137  void track_waittime(const std::shared_ptr<transwarp::node>& node, const std::chrono::time_point<std::chrono::steady_clock>& now) {
3138  std::int64_t avg_waittime_us;
3139  {
3140  std::lock_guard<transwarp::detail::spinlock> lock(spinlock_);
3141  auto& track = tracks_[node];
3142  track.waittime += std::chrono::duration_cast<std::chrono::microseconds>(now - track.startwait).count();
3143  ++track.waitcount;
3144  avg_waittime_us = static_cast<std::int64_t>(track.waittime / track.waitcount);
3145  }
3146  transwarp::detail::node_manip::set_avg_waittime_us(*node, avg_waittime_us);
3147  };
3148 
3149  void track_runtime(const std::shared_ptr<transwarp::node>& node, const std::chrono::time_point<std::chrono::steady_clock>& now) {
3150  std::int64_t avg_runtime_us;
3151  {
3152  std::lock_guard<transwarp::detail::spinlock> lock(spinlock_);
3153  auto& track = tracks_[node];
3154  if (!track.running) {
3155  return;
3156  }
3157  track.running = false;
3158  track.runtime += std::chrono::duration_cast<std::chrono::microseconds>(now - track.startrun).count();
3159  ++track.runcount;
3160  avg_runtime_us = static_cast<std::int64_t>(track.runtime / track.runcount);
3161  }
3162  transwarp::detail::node_manip::set_avg_runtime_us(*node, avg_runtime_us);
3163  }
3164 
3165  struct track {
3166  bool running = false;
3167  std::chrono::time_point<std::chrono::steady_clock> startidle;
3168  std::chrono::time_point<std::chrono::steady_clock> startwait;
3169  std::chrono::time_point<std::chrono::steady_clock> startrun;
3170  std::chrono::microseconds::rep idletime = 0;
3171  std::chrono::microseconds::rep idlecount = 0;
3172  std::chrono::microseconds::rep waittime = 0;
3173  std::chrono::microseconds::rep waitcount = 0;
3174  std::chrono::microseconds::rep runtime = 0;
3175  std::chrono::microseconds::rep runcount = 0;
3176  };
3177 
3178  transwarp::detail::spinlock spinlock_; // protecting tracks_
3179  std::unordered_map<std::shared_ptr<transwarp::node>, track> tracks_;
3180 };
3181 
3182 
3183 } // transwarp
void set_custom_data_all(std::shared_ptr< void > custom_data) override
Assigns custom data to all tasks. transwarp will not directly use this. This is only useful if someth...
Definition: transwarp.h:1780
constexpr transwarp::wait_any_type wait_any
The wait_any task tag.
Definition: transwarp.h:128
void remove_custom_data_all() override
Removes custom data from all tasks.
Definition: transwarp.h:2573
The executor interface used to perform custom task execution.
Definition: transwarp.h:312
Exception thrown when a task was destroyed prematurely.
Definition: transwarp.h:61
Adds a new listener to the given task.
Definition: transwarp.h:1179
A simple thread pool used to execute tasks in parallel.
Definition: transwarp.h:579
void cancel_all(bool) noexceptoverride
No-op because a value task never runs and doesn&#39;t have parents.
Definition: transwarp.h:2707
void remove_listener_all(const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for all event types and for all parents.
Definition: transwarp.h:1857
void operator()(const transwarp::itask &task) const
Definition: transwarp.h:1033
Node manipulation.
Definition: transwarp.h:517
Removes a listener from the given task.
Definition: transwarp.h:1206
void remove_listeners(transwarp::event_type) override
No-op because a value task doesn&#39;t raise events.
Definition: transwarp.h:2615
Generates a graph.
Definition: transwarp.h:1059
The consume type. Used for tag dispatch.
Definition: transwarp.h:115
Determines the result type of the Functor dispatching on the task type.
Definition: transwarp.h:1276
void schedule_all(transwarp::executor &) override
No-op because a value task never runs and doesn&#39;t have parents.
Definition: transwarp.h:2639
Removes the executor from the given task.
Definition: transwarp.h:1119
Definition: transwarp.h:980
void reset_priority_all() override
Resets the priority of all tasks to 0.
Definition: transwarp.h:1762
Definition: transwarp.h:140
The task has no parents.
Just after a task was canceled (handle_event called on thread that task is run on) ...
TaskType task_type
The task type.
Definition: transwarp.h:2384
A callable to run a task given its parents.
Definition: transwarp.h:1461
void set_exception(std::exception_ptr exception) override
Assigns an exception to this task.
Definition: transwarp.h:2670
void schedule_all(bool reset_all) override
Schedules all tasks in the graph for execution on the caller thread. The task-specific executors get ...
Definition: transwarp.h:1952
Adds a new listener per event type to the given task.
Definition: transwarp.h:1192
void ensure_task_was_scheduled() const
Checks if the task was scheduled and throws transwarp::control_error if it&#39;s not. ...
Definition: transwarp.h:2125
Sets parents and level of the node.
Definition: transwarp.h:1029
std::shared_ptr< transwarp::task_impl< TaskType_, typename std::decay< Functor_ >::type, result_type > > then(TaskType_, std::string name, Functor_ &&functor) const
Creates a continuation to this task.
Definition: transwarp.h:2457
std::shared_ptr< transwarp::value_task< typename transwarp::decay< Value >::type > > make_value_task(std::string name, Value &&value)
A factory function to create a new value task.
Definition: transwarp.h:2790
std::size_t get_parent_count() const noexceptoverride
Returns the number of direct parents of this task.
Definition: transwarp.h:2710
void reset()
Resets all timing information.
Definition: transwarp.h:3118
Definition: transwarp.h:1431
void set_executor(std::shared_ptr< transwarp::executor >) override
No-op because a value task never runs.
Definition: transwarp.h:2519
constexpr no_op_functor no_op
An object to use in places where a no-op functor is required.
Definition: transwarp.h:1636
void set_executor_all(std::shared_ptr< transwarp::executor > executor) override
Assigns an executor to all tasks which takes precedence over the executor provided in schedule() or s...
Definition: transwarp.h:1720
bool was_scheduled() const noexceptoverride
Returns whether the task was scheduled and not reset afterwards. This means that the underlying futur...
Definition: transwarp.h:2010
void cancel(bool) noexceptoverride
No-op because a value task never runs.
Definition: transwarp.h:2704
Just before a task is scheduled (handle_event called on thread of caller to schedule()) ...
void schedule_all() override
No-op because a value task never runs and doesn&#39;t have parents.
Definition: transwarp.h:2636
void schedule_all(transwarp::executor &executor, bool reset_all) override
Schedules all tasks in the graph for execution using the provided executor. The task-specific executo...
Definition: transwarp.h:1961
std::size_t minimum_size() const
Returns the minimum size of the pool.
Definition: transwarp.h:2967
std::string get_name() const override
Returns the name of the executor.
Definition: transwarp.h:1678
void add_listener_all(std::shared_ptr< transwarp::listener >) override
No-op because a value task doesn&#39;t raise events.
Definition: transwarp.h:2594
Assigns a priority to the given task.
Definition: transwarp.h:1127
void set_value(const typename transwarp::decay< result_type >::type &value) override
Assigns a value to this task.
Definition: transwarp.h:2660
void push(T &&value)
Pushes a new value onto the end of the buffer. If that exceeds the capacity of the buffer then the ol...
Definition: transwarp.h:1532
void schedule(transwarp::executor &executor) override
Schedules this task for execution using the provided executor. The task-specific executor gets preced...
Definition: transwarp.h:1918
Assigns an executor to the given task.
Definition: transwarp.h:1107
std::size_t capacity() const
Returns the capacity of the buffer.
Definition: transwarp.h:1552
A simple circular buffer (FIFO). ValueType must support default construction. The buffer lets you pus...
Definition: transwarp.h:1506
void set_priority(std::size_t priority) override
Sets a task priority (defaults to 0). transwarp will not directly use this. This is only useful if so...
Definition: transwarp.h:2532
void cancel_all(bool enabled) noexceptoverride
If enabled then all pending tasks in the graph are canceled which will throw transwarp::task_canceled...
Definition: transwarp.h:2058
std::size_t get_parent_count() const noexceptoverride
Returns the number of direct parents of this task.
Definition: transwarp.h:2064
void remove_custom_data() override
Removes custom data from this task.
Definition: transwarp.h:1787
void set_custom_data_all(std::shared_ptr< void > custom_data) override
Assigns custom data to all tasks. transwarp will not directly use this. This is only useful if someth...
Definition: transwarp.h:2563
std::tuple< std::shared_future< ParentResults >...> get_futures(const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &input)
Returns the futures from the given tuple of tasks.
Definition: transwarp.h:680
void set_value(typename transwarp::decay< result_type >::type &&value) override
Assigns a value to this task.
Definition: transwarp.h:2665
void schedule_all(transwarp::schedule_type type, bool reset_all) override
Schedules all tasks in the graph for execution on the caller thread. The task-specific executors get ...
Definition: transwarp.h:1986
Scheduling according to a breadth-first search (default)
void remove_listeners_all(transwarp::event_type) override
No-op because a value task doesn&#39;t raise events.
Definition: transwarp.h:2621
void schedule_all() override
Schedules all tasks in the graph for execution on the caller thread. The task-specific executors get ...
Definition: transwarp.h:1935
void set_executor(std::shared_ptr< transwarp::executor > executor) override
Assigns an executor to this task which takes precedence over the executor provided in schedule() or s...
Definition: transwarp.h:1709
value_task(bool has_name, std::string name, T &&value)
A value task is defined by name and value. Note: Don&#39;t use this constructor directly, use transwarp::make_value_task.
Definition: transwarp.h:2488
void set_custom_data(std::shared_ptr< void > custom_data) override
Assigns custom data to this task. transwarp will not directly use this. This is only useful if someth...
Definition: transwarp.h:2554
void remove_listener_all(transwarp::event_type event, const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for the given event type only and for all parents.
Definition: transwarp.h:1864
The task class.
Definition: transwarp.h:443
Definition: transwarp.h:1611
A functor not doing nothing.
Definition: transwarp.h:1631
Definition: transwarp.h:991
void schedule_all(transwarp::executor &executor, transwarp::schedule_type type, bool reset_all) override
Schedules all tasks in the graph for execution using the provided executor. The task-specific executo...
Definition: transwarp.h:1995
void reset_priority_all() override
Resets the priority of all tasks to 0.
Definition: transwarp.h:2548
void set_value(const typename transwarp::decay< result_type >::type &value) override
Assigns a value to this task. Scheduling will have no effect after a value has been set...
Definition: transwarp.h:2303
void set_priority(std::size_t priority) override
Sets a task priority (defaults to 0). transwarp will not directly use this. This is only useful if so...
Definition: transwarp.h:1742
const std::shared_ptr< std::string > & get_executor() const noexcept
The optional, task-specific executor (may be null)
Definition: transwarp.h:178
void assign_node_if(Functor &functor, std::shared_ptr< transwarp::node > node) noexcept
Assigns the node to the given functor if the functor is a subclass of transwarp::functor.
Definition: transwarp.h:1380
void reset() override
Resets this task.
Definition: transwarp.h:2034
const std::vector< std::shared_ptr< node > > & get_parents() const noexcept
The task&#39;s parents (may be empty)
Definition: transwarp.h:183
Just before a task&#39;s functor is invoked (handle_event called on thread that task is run on) ...
Definition: transwarp.h:978
The task&#39;s functor accepts the first parent future that becomes ready.
std::shared_ptr< transwarp::task_impl< TaskType_, typename std::decay< Functor_ >::type, result_type > > then(TaskType_, Functor_ &&functor) const
Creates a continuation to this task. Overload for omitting for task name.
Definition: transwarp.h:2465
The consume_any type. Used for tag dispatch.
Definition: transwarp.h:119
ResultType result_type
The result type of this task.
Definition: transwarp.h:2299
void set_custom_data(std::shared_ptr< void > custom_data) override
Assigns custom data to this task. transwarp will not directly use this. This is only useful if someth...
Definition: transwarp.h:1770
bool was_scheduled() const noexceptoverride
Returns true because a value task is scheduled once on construction.
Definition: transwarp.h:2675
std::size_t get_task_count() const noexceptoverride
Returns the number of tasks in the graph.
Definition: transwarp.h:2069
Unvisits the given task.
Definition: transwarp.h:1267
void execute(const std::function< void()> &functor, const std::shared_ptr< transwarp::node > &) override
Pushes the functor into the thread pool for asynchronous execution.
Definition: transwarp.h:1683
void wait_for_all(const std::vector< std::shared_ptr< transwarp::task< ParentResultType >>> &parents)
Waits for all parents to finish.
Definition: transwarp.h:725
void remove_listeners_all() override
No-op because a value task doesn&#39;t raise events.
Definition: transwarp.h:2618
The accept type. Used for tag dispatch.
Definition: transwarp.h:107
void add_listener_all(transwarp::event_type, std::shared_ptr< transwarp::listener >) override
No-op because a value task doesn&#39;t raise events.
Definition: transwarp.h:2597
const std::shared_future< result_type > & get_future() const noexceptoverride
Returns the future associated to the underlying execution.
Definition: transwarp.h:1800
constexpr transwarp::accept_type accept
The accept task tag.
Definition: transwarp.h:108
void remove_listener_all(transwarp::event_type, const std::shared_ptr< transwarp::listener > &) override
No-op because a value task doesn&#39;t raise events.
Definition: transwarp.h:2609
void schedule_all(transwarp::executor &, transwarp::schedule_type, bool) override
No-op because a value task never runs and doesn&#39;t have parents.
Definition: transwarp.h:2657
void remove_listeners() override
No-op because a value task doesn&#39;t raise events.
Definition: transwarp.h:2612
bool has_result() const noexceptoverride
Returns whether this task contains a result.
Definition: transwarp.h:2029
std::size_t maximum_size() const
Returns the maximum size of the pool.
Definition: transwarp.h:2972
Removes all listeners from the given task.
Definition: transwarp.h:1233
The listener interface to listen to events raised by tasks.
Definition: transwarp.h:340
std::size_t get_task_count() const noexceptoverride
Returns the number of tasks in the graph.
Definition: transwarp.h:2715
Exception thrown when a task is canceled.
Definition: transwarp.h:52
TaskType task_type
The task type.
Definition: transwarp.h:2429
void wait() const override
Waits for the task to complete. Should only be called if was_scheduled() is true, throws transwarp::c...
Definition: transwarp.h:2016
Scheduling according to a depth-first search.
void add_listener(transwarp::event_type event, std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for the given event type only.
Definition: transwarp.h:1819
std::size_t get_level() const noexcept
The task level.
Definition: transwarp.h:163
std::shared_future< ResultType > make_future_with_value(Value &&value)
Returns a ready future with the given value as its state.
Definition: transwarp.h:1386
std::string get_name() const override
Returns the name of the executor.
Definition: transwarp.h:1652
void schedule() override
Schedules this task for execution on the caller thread. The task-specific executor gets precedence if...
Definition: transwarp.h:1901
void add_listener(std::shared_ptr< transwarp::listener >) override
No-op because a value task doesn&#39;t raise events.
Definition: transwarp.h:2588
void add_listener(std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for all event types.
Definition: transwarp.h:1810
std::shared_ptr< Graph > next_idle_graph(bool maybe_resize=true)
Returns the next idle graph. If there are no idle graphs then it will attempt to double the pool size...
Definition: transwarp.h:2920
void ensure_task_not_running() const
Checks if the task is currently running and throws transwarp::control_error if it is...
Definition: transwarp.h:2118
Just before a task starts running (handle_event called on thread that task is run on) ...
Removes custom data from the given task.
Definition: transwarp.h:1159
void schedule_all(transwarp::executor &, bool) override
No-op because a value task never runs and doesn&#39;t have parents.
Definition: transwarp.h:2645
Resets the given task.
Definition: transwarp.h:1087
const std::shared_ptr< std::string > & get_name() const noexcept
The optional task name (may be null)
Definition: transwarp.h:173
void remove_listeners_all(transwarp::event_type event) override
Removes all listeners for the given event type and for all parents.
Definition: transwarp.h:1892
bool full() const
Returns whether the buffer is full.
Definition: transwarp.h:1568
A base class for a user-defined functor that needs access to the node associated to the task or a can...
Definition: transwarp.h:484
constexpr transwarp::consume_any_type consume_any
The consume_any task tag.
Definition: transwarp.h:120
Removes a listener per event type from the given task.
Definition: transwarp.h:1219
bool is_canceled() const noexcept
Returns whether the associated task is canceled.
Definition: transwarp.h:198
A node carrying meta-data of a task.
Definition: transwarp.h:146
std::shared_ptr< transwarp::task_impl< transwarp::wait_type, transwarp::no_op_functor, std::vector< std::shared_ptr< transwarp::task< void > > > > > for_each(InputIt first, InputIt last, UnaryOperation unary_op)
A function similar to std::for_each but returning a transwarp task for deferred, possibly asynchronou...
Definition: transwarp.h:2809
Exception thrown when a task is used in unintended ways.
Definition: transwarp.h:79
void resize(std::size_t new_size)
Resizes the graph pool to the given new size if possible.
Definition: transwarp.h:2989
Resets the priority of the given task.
Definition: transwarp.h:1139
void schedule() override
No-op because a value task never runs.
Definition: transwarp.h:2624
std::size_t size() const
Returns the number of populated values of the buffer. Its maximum value equals the capacity of the bu...
Definition: transwarp.h:1558
Returns the result type of a std::shared_future&lt;T&gt;
Definition: transwarp.h:436
void add_listener_all(std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for all event types and for all parents.
Definition: transwarp.h:1826
void add_listener(transwarp::event_type, std::shared_ptr< transwarp::listener >) override
No-op because a value task doesn&#39;t raise events.
Definition: transwarp.h:2591
void wait() const override
No-op because a value task never runs.
Definition: transwarp.h:2680
void schedule_all(transwarp::executor &executor, transwarp::schedule_type type) override
Schedules all tasks in the graph for execution using the provided executor. The task-specific executo...
Definition: transwarp.h:1977
std::size_t get_priority() const noexcept
The task priority (defaults to 0)
Definition: transwarp.h:188
const std::shared_ptr< transwarp::node > & get_parent() const noexcept
Returns the parent node.
Definition: transwarp.h:280
A task proxy.
Definition: transwarp.h:2293
void transwarp_cancel_point() const
If the associated task is canceled then this will throw transwarp::task_canceled which will stop the ...
Definition: transwarp.h:498
std::shared_ptr< transwarp::task_impl< transwarp::wait_type, transwarp::no_op_functor, std::vector< std::shared_ptr< transwarp::task< void > > > > > transform(InputIt first1, InputIt last1, OutputIt d_first, UnaryOperation unary_op)
A function similar to std::transform but returning a transwarp task for deferred, possibly asynchrono...
Definition: transwarp.h:2840
The base task class that contains the functionality that can be used with all result types (void and ...
Definition: transwarp.h:1698
Definition: transwarp.h:666
The task&#39;s functor consumes all parent results.
std::shared_ptr< Graph > wait_for_next_idle_graph(bool maybe_resize=true)
Just like next_idle_graph() but waits for a graph to become available. The returned graph will always...
Definition: transwarp.h:2952
void remove_listeners_all() override
Removes all listeners and for all parents.
Definition: transwarp.h:1885
Just after a task has finished running (handle_event called on thread that task is run on) ...
void remove_listener(const std::shared_ptr< transwarp::listener > &) override
No-op because a value task doesn&#39;t raise events.
Definition: transwarp.h:2600
void set_priority_all(std::size_t priority) override
Sets a priority to all tasks (defaults to 0). transwarp will not directly use this. This is only useful if something else is using the priority.
Definition: transwarp.h:2538
Base class for exceptions.
Definition: transwarp.h:43
void schedule_all(transwarp::schedule_type) override
No-op because a value task never runs and doesn&#39;t have parents.
Definition: transwarp.h:2648
std::size_t size() const
Returns the current total size of the pool (sum of idle and busy graphs)
Definition: transwarp.h:2962
void reclaim()
Reclaims finished graphs by marking them as idle again.
Definition: transwarp.h:3011
constexpr transwarp::consume_type consume
The consume task tag.
Definition: transwarp.h:116
std::size_t get_id() const noexcept
The task ID.
Definition: transwarp.h:158
void schedule(transwarp::executor &, bool) override
No-op because a value task never runs.
Definition: transwarp.h:2633
A value task that stores a single value and doesn&#39;t require scheduling. Value tasks should be created...
Definition: transwarp.h:2476
void reset_priority() override
Resets the task priority to 0.
Definition: transwarp.h:2543
void add_listener_all(transwarp::event_type event, std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for the given event type only and for all parents.
Definition: transwarp.h:1833
void remove_listeners(transwarp::event_type event) override
Removes all listeners for the given event type.
Definition: transwarp.h:1879
void remove_listener(const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for all event types.
Definition: transwarp.h:1840
Determines the type of the parents.
Definition: transwarp.h:1413
const std::shared_ptr< transwarp::node > & transwarp_node() const noexcept
The node associated to the task.
Definition: transwarp.h:492
Parent wait_for_any(const std::shared_ptr< transwarp::task< ParentResults >> &...parents)
Waits for the first parent to finish.
Definition: transwarp.h:748
void reset_all() override
No-op because a value task never runs and doesn&#39;t have parents.
Definition: transwarp.h:2701
void remove_listener(transwarp::event_type event, const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for the given event type only.
Definition: transwarp.h:1849
Assigns custom data to the given task.
Definition: transwarp.h:1147
The task&#39;s functor takes no arguments but waits for all parents to finish.
void set_exception(std::exception_ptr exception) override
Assigns an exception to this task. Scheduling will have no effect after an exception has been set...
Definition: transwarp.h:2002
void cancel_all_but_one(const std::shared_ptr< transwarp::task< OneResult >> &one, const std::vector< std::shared_ptr< transwarp::task< ParentResultType >>> &parents)
Cancels all tasks but one.
Definition: transwarp.h:787
Definition: transwarp.h:806
void handle_event(transwarp::event_type event, const std::shared_ptr< transwarp::node > &node) override
Performs the actual timing and populates the node&#39;s timing members.
Definition: transwarp.h:3077
Removes reference and const from a type.
Definition: transwarp.h:429
void remove_listeners() override
Removes all listeners.
Definition: transwarp.h:1871
const std::shared_ptr< void > & get_custom_data() const noexcept
The custom task data (may be null)
Definition: transwarp.h:193
std::shared_ptr< transwarp::task_impl< TaskType_, typename std::decay< Functor_ >::type, result_type > > then(TaskType_, std::string name, Functor_ &&functor) const
Creates a continuation to this task.
Definition: transwarp.h:2505
void call_with_each(const Functor &f, const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &t)
Calls the functor with every element in the tuple.
Definition: transwarp.h:1010
void reset_priority() override
Resets the task priority to 0.
Definition: transwarp.h:1756
void schedule_all(transwarp::executor &, transwarp::schedule_type) override
No-op because a value task never runs and doesn&#39;t have parents.
Definition: transwarp.h:2651
void schedule_all(transwarp::schedule_type, bool) override
No-op because a value task never runs and doesn&#39;t have parents.
Definition: transwarp.h:2654
An edge between two nodes.
Definition: transwarp.h:267
const std::shared_ptr< transwarp::node > & get_child() const noexcept
Returns the child node.
Definition: transwarp.h:285
std::int64_t get_avg_runtime_us() const noexcept
Returns the average runtime in microseconds (-1 if never set)
Definition: transwarp.h:213
A timer that tracks the average idle, wait, and run time of each task it listens to.
Definition: transwarp.h:3066
void schedule_all(transwarp::schedule_type type) override
Schedules all tasks in the graph for execution on the caller thread. The task-specific executors get ...
Definition: transwarp.h:1969
std::vector< transwarp::edge > get_graph() const override
Returns the graph of the task structure. This is mainly for visualizing the tasks and their interdepe...
Definition: transwarp.h:2076
graph_pool(std::function< std::shared_ptr< Graph >()> generator, std::size_t minimum_size, std::size_t maximum_size)
Constructs a graph pool by passing a generator to create a new graph and a minimum and maximum size o...
Definition: transwarp.h:2891
void set_value(typename transwarp::decay< result_type >::type &value) override
Assigns a value to this task. Scheduling will have no effect after a value has been set...
Definition: transwarp.h:2351
void schedule(transwarp::executor &) override
No-op because a value task never runs.
Definition: transwarp.h:2627
void reset_all() override
Resets all tasks in the graph.
Definition: transwarp.h:2042
std::int64_t get_avg_waittime_us() const noexcept
Returns the average waittime in microseconds (-1 if never set)
Definition: transwarp.h:208
A graph interface giving access to the final task as required by transwarp::graph_pool.
Definition: transwarp.h:2868
std::size_t busy_count() const
Returns the number of busy graphs in the pool.
Definition: transwarp.h:2983
circular_buffer(std::size_t capacity)
Constructs a circular buffer with a given fixed capacity.
Definition: transwarp.h:1515
Definition: transwarp.h:797
ResultType result_type
The result type of this task.
Definition: transwarp.h:2483
Executor for parallel execution. Uses a simple thread pool.
Definition: transwarp.h:1664
The wait_any type. Used for tag dispatch.
Definition: transwarp.h:127
void set_priority_all(std::size_t priority) override
Sets a priority to all tasks (defaults to 0). transwarp will not directly use this. This is only useful if something else is using the priority (e.g. a custom executor)
Definition: transwarp.h:1749
void schedule(bool reset) override
Schedules this task for execution on the caller thread. The task-specific executor gets precedence if...
Definition: transwarp.h:1910
virtual std::string get_name() const =0
Returns the name of the executor.
std::string to_string(const transwarp::task_type &type)
String conversion for the task_type enumeration.
Definition: transwarp.h:88
virtual void execute(const std::function< void()> &functor, const std::shared_ptr< transwarp::node > &node)=0
Runs a task which is wrapped by the given functor. The functor only captures one shared pointer and c...
void execute(const std::function< void()> &functor, const std::shared_ptr< transwarp::node > &) override
Runs the functor on the current thread.
Definition: transwarp.h:1657
void remove_executor() override
Removes the executor from this task.
Definition: transwarp.h:1727
The accept_any type. Used for tag dispatch.
Definition: transwarp.h:111
transwarp::task_type get_type() const noexcept
The task type.
Definition: transwarp.h:168
std::shared_ptr< transwarp::task_impl< TaskType, typename std::decay< Functor >::type, typename Parents::result_type...> > make_task(TaskType, std::string name, Functor &&functor, std::shared_ptr< Parents >...parents)
A factory function to create a new task.
Definition: transwarp.h:2756
virtual void handle_event(transwarp::event_type event, const std::shared_ptr< transwarp::node > &node)=0
This may be called from arbitrary threads depending on the event type (see transwarp::event_type). The implementer needs to ensure that this never throws exceptions.
bool is_ready() const override
Returns true because a value task is always ready.
Definition: transwarp.h:2683
constexpr transwarp::wait_type wait
The wait task tag.
Definition: transwarp.h:124
The root type. Used for tag dispatch.
Definition: transwarp.h:103
The task&#39;s functor consumes the first parent result that becomes ready.
A task representing a piece of work given by functor and parent tasks. By connecting tasks a directed...
Definition: transwarp.h:2426
void remove_listener(transwarp::event_type, const std::shared_ptr< transwarp::listener > &) override
No-op because a value task doesn&#39;t raise events.
Definition: transwarp.h:2603
The wait type. Used for tag dispatch.
Definition: transwarp.h:123
void cancel(bool enabled) noexceptoverride
If enabled then this task is canceled which will throw transwarp::task_canceled when retrieving the t...
Definition: transwarp.h:2051
Schedules using the given executor.
Definition: transwarp.h:1074
void swap(circular_buffer &buffer)
Swaps this buffer with the given buffer.
Definition: transwarp.h:1573
Result call(std::size_t node_id, const Task &task, const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &parents)
Calls the functor of the given task with the results from the tuple of parents. Throws transwarp::tas...
Definition: transwarp.h:963
Visits the given task using the visitor given in the constructor.
Definition: transwarp.h:1255
constexpr transwarp::root_type root
The root task tag.
Definition: transwarp.h:104
void remove_custom_data() override
Removes custom data from this task.
Definition: transwarp.h:2568
void set_value(typename transwarp::decay< result_type >::type &&value) override
Assigns a value to this task. Scheduling will have no effect after a value has been set...
Definition: transwarp.h:2311
void schedule_all(transwarp::executor &executor) override
Schedules all tasks in the graph for execution using the provided executor. The task-specific executo...
Definition: transwarp.h:1943
const value_type & front() const
Returns the value at the front of the buffer (the oldest value). This is undefined if the buffer is e...
Definition: transwarp.h:1539
static Result work(std::size_t node_id, const Task &task, const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &parents)
Definition: transwarp.h:890
schedule_type
Determines in which order tasks are scheduled in the graph.
Definition: transwarp.h:351
The task&#39;s functor accepts all parent futures.
A graph pool that allows running multiple instances of the same graph in parallel. Graph must be a sub-class of transwarp::graph.
Definition: transwarp.h:2882
Applies final bookkeeping to the task.
Definition: transwarp.h:1045
void remove_executor_all() override
No-op because a value task never runs and doesn&#39;t have parents.
Definition: transwarp.h:2528
typename transwarp::detail::functor_result< TaskType, Functor, ParentResults...>::type result_type
The result type of this task.
Definition: transwarp.h:2432
void schedule(transwarp::executor &executor, bool reset) override
Schedules this task for execution using the provided executor. The task-specific executor gets preced...
Definition: transwarp.h:1927
std::vector< transwarp::edge > get_graph() const override
Returns an empty graph because a value task doesn&#39;t have parents.
Definition: transwarp.h:2720
const std::shared_ptr< transwarp::node > & get_node() const noexceptoverride
Returns the associated node.
Definition: transwarp.h:1805
task_impl(bool has_name, std::string name, F &&functor, std::vector< std::shared_ptr< transwarp::task< P >>> parents)
A task is defined by name, functor, and parent tasks. Note: Don&#39;t use this constructor directly...
Definition: transwarp.h:2444
task_type
The possible task types.
Definition: transwarp.h:31
void set_value() override
Assigns a value to this task. Scheduling will have no effect after a call to this. Calling reset() will reset this and re-enable scheduling.
Definition: transwarp.h:2391
std::shared_ptr< transwarp::task_impl< TaskType_, typename std::decay< Functor_ >::type, result_type > > then(TaskType_, Functor_ &&functor) const
Creates a continuation to this task. Overload for omitting the task name.
Definition: transwarp.h:2513
void pop()
Removes the value at the front of the buffer (the oldest value)
Definition: transwarp.h:1544
event_type
The task events that can be subscribed to using the listener interface.
Definition: transwarp.h:329
Removes all listeners per event type from the given task.
Definition: transwarp.h:1242
std::int64_t get_avg_idletime_us() const noexcept
Returns the average idletime in microseconds (-1 if never set)
Definition: transwarp.h:203
The task&#39;s functor takes no arguments but waits for the first parent to finish.
Exception thrown when an invalid parameter was passed to a function.
Definition: transwarp.h:70
void remove_custom_data_all() override
Removes custom data from all tasks.
Definition: transwarp.h:1793
void reset() override
No-op because a value task never runs.
Definition: transwarp.h:2698
std::shared_future< void > make_ready_future()
Returns a ready future.
Definition: transwarp.h:1393
void set_executor_all(std::shared_ptr< transwarp::executor >) override
No-op because a value task never runs and doesn&#39;t have parents.
Definition: transwarp.h:2522
constexpr transwarp::accept_any_type accept_any
The accept_any task tag.
Definition: transwarp.h:112
bool empty() const
Returns whether the buffer is empty.
Definition: transwarp.h:1563
const std::shared_future< result_type > & get_future() const noexceptoverride
Returns the future associated to the underlying execution.
Definition: transwarp.h:2578
Cancels or resumes the given task.
Definition: transwarp.h:1095
void remove_listener_all(const std::shared_ptr< transwarp::listener > &) override
No-op because a value task doesn&#39;t raise events.
Definition: transwarp.h:2606
std::size_t idle_count() const
Returns the number of idle graphs in the pool.
Definition: transwarp.h:2977
void remove_executor() override
No-op because a value task never runs.
Definition: transwarp.h:2525
void schedule(bool) override
No-op because a value task never runs.
Definition: transwarp.h:2630
virtual const std::shared_ptr< transwarp::task< result_type > > & final_task() const =0
Returns the final task of the graph.
void schedule_all(bool) override
No-op because a value task never runs and doesn&#39;t have parents.
Definition: transwarp.h:2642
Pushes the given task into the vector of tasks.
Definition: transwarp.h:1167
Executor for sequential execution. Runs functors sequentially on the same thread. ...
Definition: transwarp.h:1640
bool has_result() const noexceptoverride
Returns true because a value task always contains a result.
Definition: transwarp.h:2688
task_impl(bool has_name, std::string name, F &&functor, std::shared_ptr< transwarp::task< ParentResults >>...parents)
A task is defined by name, functor, and parent tasks. Note: Don&#39;t use this constructor directly...
Definition: transwarp.h:2437
std::shared_future< ResultType > make_future_with_exception(std::exception_ptr exception)
Returns a ready future with the given exception as its state.
Definition: transwarp.h:1401
void remove_executor_all() override
Removes the executor from all tasks.
Definition: transwarp.h:1734
const std::shared_ptr< transwarp::node > & get_node() const noexceptoverride
Returns the associated node.
Definition: transwarp.h:2583
bool is_ready() const override
Returns whether the task has finished processing. Should only be called if was_scheduled() is true...
Definition: transwarp.h:2023
An interface for the task class.
Definition: transwarp.h:358
Result run_task(std::size_t node_id, const std::weak_ptr< Task > &task, Args &&...args)
Runs the task with the given arguments, hence, invoking the task&#39;s functor.
Definition: transwarp.h:700