transwarp
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Pages
transwarp.h
1 /// @mainpage transwarp is a header-only C++ library for task concurrency
2 /// @details https://github.com/bloomen/transwarp
3 /// @version 2.0.0
4 /// @author Christian Blume, Guan Wang
5 /// @date 2019
6 /// @copyright MIT http://www.opensource.org/licenses/mit-license.php
7 #pragma once
#include <algorithm>
#include <any>
#include <array>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <optional>
#include <queue>
#include <stdexcept>
#include <string>
#include <string_view>
#include <thread>
#include <tuple>
#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>
29 
30 
31 /// The transwarp namespace
32 namespace transwarp {
33 
34 
/// Enumerates the kinds of task. The kind decides what, if anything, the
/// task's functor is handed from the task's parents.
enum class task_type {
    root,        ///< No parents
    accept,      ///< Functor accepts the futures of all parents
    accept_any,  ///< Functor accepts the future of the first parent that becomes ready
    consume,     ///< Functor consumes the results of all parents
    consume_any, ///< Functor consumes the result of the first parent that becomes ready
    wait,        ///< Functor takes no arguments; all parents are waited for
    wait_any,    ///< Functor takes no arguments; the first parent to finish is waited for
};
45 
46 
/// Common base of all transwarp exceptions; a std::runtime_error that
/// carries the given message unchanged
class transwarp_error : public std::runtime_error {
public:
    /// @param message Text later reported by what()
    explicit transwarp_error(const std::string& message)
    : std::runtime_error(message)
    {}
};
54 
55 
56 /// Exception thrown when a task is canceled
58 public:
59  explicit task_canceled(const std::string& task_repr)
60  : transwarp::transwarp_error{"Task canceled: " + task_repr}
61  {}
62 };
63 
64 
65 /// Exception thrown when a task was destroyed prematurely
67 public:
68  explicit task_destroyed(const std::string& task_repr)
69  : transwarp::transwarp_error{"Task destroyed: " + task_repr}
70  {}
71 };
72 
73 
74 /// Exception thrown when an invalid parameter was passed to a function
76 public:
77  explicit invalid_parameter(const std::string& parameter)
78  : transwarp::transwarp_error{"Invalid parameter: " + parameter}
79  {}
80 };
81 
82 
83 /// Exception thrown when a task is used in unintended ways
85 public:
86  explicit control_error(const std::string& message)
87  : transwarp::transwarp_error{"Control error: " + message}
88  {}
89 };
90 
91 
92 /// The root type. Used for tag dispatch
93 struct root_type : std::integral_constant<transwarp::task_type, transwarp::task_type::root> {};
94 constexpr transwarp::root_type root{}; ///< The root task tag
95 
96 /// The accept type. Used for tag dispatch
97 struct accept_type : std::integral_constant<transwarp::task_type, transwarp::task_type::accept> {};
98 constexpr transwarp::accept_type accept{}; ///< The accept task tag
99 
100 /// The accept_any type. Used for tag dispatch
101 struct accept_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::accept_any> {};
102 constexpr transwarp::accept_any_type accept_any{}; ///< The accept_any task tag
103 
104 /// The consume type. Used for tag dispatch
105 struct consume_type : std::integral_constant<transwarp::task_type, transwarp::task_type::consume> {};
106 constexpr transwarp::consume_type consume{}; ///< The consume task tag
107 
108 /// The consume_any type. Used for tag dispatch
109 struct consume_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::consume_any> {};
110 constexpr transwarp::consume_any_type consume_any{}; ///< The consume_any task tag
111 
112 /// The wait type. Used for tag dispatch
113 struct wait_type : std::integral_constant<transwarp::task_type, transwarp::task_type::wait> {};
114 constexpr transwarp::wait_type wait{}; ///< The wait task tag
115 
116 /// The wait_any type. Used for tag dispatch
117 struct wait_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::wait_any> {};
118 constexpr transwarp::wait_any_type wait_any{}; ///< The wait_any task tag
119 
120 
/// Detail namespace for internal functionality only
namespace detail {

// Visitor types, declared ahead of their definitions so that they can be
// named (for instance in friend declarations) before they are defined.
struct visit_visitor;
struct unvisit_visitor;
struct final_visitor;
struct schedule_visitor;
struct parent_visitor;

} // detail
131 
132 
133 class itask;
134 
135 /// The executor interface used to perform custom task execution
136 class executor {
137 public:
138  virtual ~executor() = default;
139 
140  /// Returns the name of the executor
141  virtual std::string name() const = 0;
142 
143  /// Runs a task which is wrapped by the given functor. The functor only
144  /// captures one shared pointer and can hence be copied at low cost.
145  /// task represents the task that the functor belongs to.
146  /// This function is only ever called on the thread of the caller to schedule().
147  /// The implementer needs to ensure that this never throws exceptions
148  virtual void execute(const std::function<void()>& functor, const transwarp::itask& task) = 0;
149 };
150 
151 
/// Events raised by tasks; subscribe to them through the listener interface
enum class event_type {
    before_scheduled, ///< Right before a task is scheduled (handle_event runs on the thread of the caller to schedule())
    before_started,   ///< Right before a task starts running (handle_event runs on the thread the task runs on)
    before_invoked,   ///< Right before a task's functor is invoked (handle_event runs on the thread the task runs on)
    after_finished,   ///< Right after a task has finished running (handle_event runs on the thread the task runs on)
    after_canceled,   ///< Right after a task was canceled (handle_event runs on the thread the task runs on)
    count,            ///< Sentinel following the last event; its value equals the number of events listed above
};
161 
162 
163 /// The listener interface to listen to events raised by tasks
164 class listener {
165 public:
166  virtual ~listener() = default;
167 
168  /// This may be called from arbitrary threads depending on the event type (see transwarp::event_type).
169  /// The implementer needs to ensure that this never throws exceptions. The lifetime of the task
170  /// reference is not guaranteed beyond the duration of handle_event, and listeners must not retain
171  /// a copy of the task.
172  virtual void handle_event(transwarp::event_type event, const transwarp::itask& task) = 0;
173 };
174 
175 
176 /// An edge between two tasks
177 class edge {
178 public:
179  edge(const transwarp::itask& parent, const transwarp::itask& child) noexcept
180  : parent_{parent}, child_{child}
181  {}
182 
183  // default copy/move semantics
184  edge(const edge&) = default;
185  edge& operator=(const edge&) = default;
186  edge(edge&&) = default;
187  edge& operator=(edge&&) = default;
188 
189  /// Returns the parent task
190  const transwarp::itask& parent() const noexcept {
191  return parent_;
192  }
193 
194  /// Returns the child task
195  const transwarp::itask& child() const noexcept {
196  return child_;
197  }
198 
199 private:
200  const transwarp::itask& parent_;
201  const transwarp::itask& child_;
202 };
203 
204 
205 class timer;
206 
207 /// An interface for the task class
208 class itask {
209 public:
210  virtual ~itask() = default;
211 
212  virtual void finalize() = 0;
213  virtual std::size_t id() const noexcept = 0;
214  virtual std::size_t level() const noexcept = 0;
215  virtual transwarp::task_type type() const noexcept = 0;
216  virtual const std::optional<std::string>& name() const noexcept = 0;
217  virtual std::shared_ptr<transwarp::executor> executor() const noexcept = 0;
218  virtual std::int64_t priority() const noexcept = 0;
219  virtual const std::any& custom_data() const noexcept = 0;
220  virtual bool canceled() const noexcept = 0;
221  virtual std::int64_t avg_idletime_us() const noexcept = 0;
222  virtual std::int64_t avg_waittime_us() const noexcept = 0;
223  virtual std::int64_t avg_runtime_us() const noexcept = 0;
224  virtual void set_executor(std::shared_ptr<transwarp::executor> executor) = 0;
225  virtual void set_executor_all(std::shared_ptr<transwarp::executor> executor) = 0;
226  virtual void remove_executor() = 0;
227  virtual void remove_executor_all() = 0;
228  virtual void set_priority(std::int64_t priority) = 0;
229  virtual void set_priority_all(std::int64_t priority) = 0;
230  virtual void reset_priority() = 0;
231  virtual void reset_priority_all() = 0;
232  virtual void set_custom_data(std::any custom_data) = 0;
233  virtual void set_custom_data_all(std::any custom_data) = 0;
234  virtual void remove_custom_data() = 0;
235  virtual void remove_custom_data_all() = 0;
236  virtual void add_listener(std::shared_ptr<transwarp::listener> listener) = 0;
237  virtual void add_listener(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) = 0;
238  virtual void add_listener_all(std::shared_ptr<transwarp::listener> listener) = 0;
239  virtual void add_listener_all(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) = 0;
240  virtual void remove_listener(const std::shared_ptr<transwarp::listener>& listener) = 0;
241  virtual void remove_listener(transwarp::event_type event, const std::shared_ptr<transwarp::listener>& listener) = 0;
242  virtual void remove_listener_all(const std::shared_ptr<transwarp::listener>& listener) = 0;
243  virtual void remove_listener_all(transwarp::event_type event, const std::shared_ptr<transwarp::listener>& listener) = 0;
244  virtual void remove_listeners() = 0;
245  virtual void remove_listeners(transwarp::event_type event) = 0;
246  virtual void remove_listeners_all() = 0;
247  virtual void remove_listeners_all(transwarp::event_type event) = 0;
248  virtual void schedule() = 0;
249  virtual void schedule(transwarp::executor& executor) = 0;
250  virtual void schedule(bool reset) = 0;
251  virtual void schedule(transwarp::executor& executor, bool reset) = 0;
252  virtual void schedule_all() = 0;
253  virtual void schedule_all(transwarp::executor& executor) = 0;
254  virtual void schedule_all(bool reset_all) = 0;
255  virtual void schedule_all(transwarp::executor& executor, bool reset_all) = 0;
256  virtual void set_exception(std::exception_ptr exception) = 0;
257  virtual bool was_scheduled() const noexcept = 0;
258  virtual void wait() const = 0;
259  virtual bool is_ready() const = 0;
260  virtual bool has_result() const = 0;
261  virtual void reset() = 0;
262  virtual void reset_all() = 0;
263  virtual void cancel(bool enabled) noexcept = 0;
264  virtual void cancel_all(bool enabled) noexcept = 0;
265  virtual std::vector<itask*> parents() const = 0;
266  virtual const std::vector<itask*>& tasks() = 0;
267  virtual std::vector<transwarp::edge> edges() = 0;
268 
269 protected:
270  virtual void schedule_impl(bool reset, transwarp::executor* executor=nullptr) = 0;
271 
272 private:
273  friend struct transwarp::detail::visit_visitor;
275  friend struct transwarp::detail::final_visitor;
277  friend struct transwarp::detail::parent_visitor;
278  friend class timer;
279 
280  virtual void visit(const std::function<void(itask&)>& visitor) = 0;
281  virtual void unvisit() noexcept = 0;
282  virtual void set_id(std::size_t id) noexcept = 0;
283  virtual void set_level(std::size_t level) noexcept = 0;
284  virtual void set_type(transwarp::task_type type) noexcept = 0;
285  virtual void set_name(std::optional<std::string> name) noexcept = 0;
286  virtual void set_avg_idletime_us(std::int64_t idletime) noexcept = 0;
287  virtual void set_avg_waittime_us(std::int64_t waittime) noexcept = 0;
288  virtual void set_avg_runtime_us(std::int64_t runtime) noexcept = 0;
289 };
290 
291 
292 /// String conversion for the task_type enumeration
293 inline std::string to_string(const transwarp::task_type& type) {
294  switch (type) {
295  case transwarp::task_type::root: return "root";
296  case transwarp::task_type::accept: return "accept";
297  case transwarp::task_type::accept_any: return "accept_any";
298  case transwarp::task_type::consume: return "consume";
299  case transwarp::task_type::consume_any: return "consume_any";
300  case transwarp::task_type::wait: return "wait";
301  case transwarp::task_type::wait_any: return "wait_any";
302  }
303  throw transwarp::invalid_parameter{"task type"};
304 }
305 
306 
307 /// String conversion for the itask class
308 inline std::string to_string(const transwarp::itask& task, std::string_view separator="\n") {
309  std::string s;
310  s += '"';
311  const std::optional<std::string>& name = task.name();
312  if (name) {
313  s += std::string{"<"} + *name + std::string{">"} + separator.data();
314  }
315  s += transwarp::to_string(task.type());
316  s += std::string{" id="} + std::to_string(task.id());
317  s += std::string{" lev="} + std::to_string(task.level());
318  const std::shared_ptr<transwarp::executor> exec = task.executor();
319  if (exec) {
320  s += separator.data() + std::string{"<"} + exec->name() + std::string{">"};
321  }
322  const std::int64_t avg_idletime_us = task.avg_idletime_us();
323  if (avg_idletime_us >= 0) {
324  s += separator.data() + std::string{"avg-idle-us="} + std::to_string(avg_idletime_us);
325  }
326  const std::int64_t avg_waittime_us = task.avg_waittime_us();
327  if (avg_waittime_us >= 0) {
328  s += separator.data() + std::string{"avg-wait-us="} + std::to_string(avg_waittime_us);
329  }
330  const std::int64_t avg_runtime_us = task.avg_runtime_us();
331  if (avg_runtime_us >= 0) {
332  s += separator.data() + std::string{"avg-run-us="} + std::to_string(avg_runtime_us);
333  }
334  return s + '"';
335 }
336 
337 
338 /// String conversion for the edge class
339 inline std::string to_string(const transwarp::edge& edge, std::string_view separator="\n") {
340  return transwarp::to_string(edge.parent(), separator) + std::string{" -> "} + transwarp::to_string(edge.child(), separator);
341 }
342 
343 
344 /// Creates a dot-style string from the given edges
345 inline std::string to_string(const std::vector<transwarp::edge>& edges, std::string_view separator="\n") {
346  std::string dot = std::string{"digraph {"} + separator.data();
347  for (const transwarp::edge& edge : edges) {
348  dot += transwarp::to_string(edge, separator) + separator.data();
349  }
350  dot += std::string{"}"};
351  return dot;
352 }
353 
354 
/// Strips a reference and then a top-level const from T.
/// Unlike std::decay_t this leaves volatile, arrays and functions untouched.
template<typename T>
using decay_t = typename std::remove_const<typename std::remove_reference<T>::type>::type;
358 
359 
/// The type returned by std::shared_future<T>::get(): `const T&` for object
/// types, `T&` for reference types and `void` for void.
///
/// Spelled with decltype/declval rather than std::result_of_t, which is
/// deprecated in C++17 and removed in C++20, and without taking the address
/// of a standard-library member function.
template<typename T>
using result_t = decltype(std::declval<const std::shared_future<T>&>().get());
363 
364 
365 /// Detail namespace for internal functionality only
366 namespace detail {
367 
368 /// Clones a task
369 template<typename TaskType>
370 std::shared_ptr<TaskType> clone_task(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache, const std::shared_ptr<TaskType>& t) {
371  const auto original_task = std::dynamic_pointer_cast<transwarp::itask>(t);
372  const auto task_cache_it = task_cache.find(original_task);
373  if (task_cache_it != task_cache.cend()) {
374  return std::dynamic_pointer_cast<TaskType>(task_cache_it->second);
375  } else {
376  auto cloned_task = t->clone_impl(task_cache);
377  task_cache[original_task] = cloned_task;
378  return std::move(cloned_task);
379  }
380 }
381 
382 } // detail
383 
384 
385 /// The task class
386 template<typename ResultType>
387 class task : public transwarp::itask {
388 public:
389  using result_type = ResultType;
390 
391  virtual ~task() = default;
392 
393  std::shared_ptr<task> clone() const {
394  std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>> task_cache;
395  return clone_impl(task_cache);
396  }
397 
398  virtual void set_value(const transwarp::decay_t<result_type>& value) = 0;
399  virtual void set_value(transwarp::decay_t<result_type>&& value) = 0;
400  virtual const std::shared_future<result_type>& future() const noexcept = 0;
401  virtual transwarp::result_t<result_type> get() const = 0;
402 
403 private:
404  template<typename T>
405  friend std::shared_ptr<T> detail::clone_task(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache, const std::shared_ptr<T>& t);
406 
407  virtual std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache) const = 0;
408 };
409 
410 /// The task class (reference result type)
411 template<typename ResultType>
412 class task<ResultType&> : public transwarp::itask {
413 public:
414  using result_type = ResultType&;
415 
416  virtual ~task() = default;
417 
418  std::shared_ptr<task> clone() const {
419  std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>> task_cache;
420  return clone_impl(task_cache);
421  }
422 
423  virtual void set_value(transwarp::decay_t<result_type>& value) = 0;
424  virtual const std::shared_future<result_type>& future() const noexcept = 0;
425  virtual transwarp::result_t<result_type> get() const = 0;
426 
427 private:
428  template<typename T>
429  friend std::shared_ptr<T> detail::clone_task(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache, const std::shared_ptr<T>& t);
430 
431  virtual std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache) const = 0;
432 };
433 
434 /// The task class (void result type)
435 template<>
436 class task<void> : public transwarp::itask {
437 public:
438  using result_type = void;
439 
440  virtual ~task() = default;
441 
442  std::shared_ptr<task> clone() const {
443  std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>> task_cache;
444  return clone_impl(task_cache);
445  }
446 
447  virtual void set_value() = 0;
448  virtual const std::shared_future<result_type>& future() const noexcept = 0;
449  virtual result_type get() const = 0;
450 
451 private:
452  template<typename T>
453  friend std::shared_ptr<T> detail::clone_task(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache, const std::shared_ptr<T>& t);
454 
455  virtual std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache) const = 0;
456 };
457 
458 
459 /// Detail namespace for internal functionality only
460 namespace detail {
461 
462 template<typename F>
463 void assign_task_if(F&, const transwarp::itask&) noexcept;
464 
465 } // detail
466 
467 
468 /// A base class for a user-defined functor that needs access to the associated
469 /// task or a cancel point to stop a task while it's running
470 class functor {
471 public:
472 
473  virtual ~functor() = default;
474 
475 protected:
476 
477  /// The associated task
478  const transwarp::itask& transwarp_task() const noexcept {
479  return *transwarp_task_;
480  }
481 
482  /// If the associated task is canceled then this will throw transwarp::task_canceled
483  /// which will stop the task while it's running
484  void transwarp_cancel_point() const {
485  if (transwarp_task_->canceled()) {
486  throw transwarp::task_canceled(std::to_string(transwarp_task_->id()));
487  }
488  }
489 
490 private:
491  template<typename F>
492  friend void transwarp::detail::assign_task_if(F&, const transwarp::itask&) noexcept;
493 
494  const transwarp::itask* transwarp_task_{};
495 };
496 
497 
498 /// Detail namespace for internal functionality only
499 namespace detail {
500 
501 
502 /// A simple thread pool used to execute tasks in parallel
503 class thread_pool {
504 public:
505 
506  explicit thread_pool(std::size_t n_threads)
507  {
508  if (n_threads == 0) {
509  throw transwarp::invalid_parameter{"number of threads"};
510  }
511  const std::size_t n_target = threads_.size() + n_threads;
512  while (threads_.size() < n_target) {
513  std::thread thread;
514  try {
515  thread = std::thread(&thread_pool::worker, this);
516  } catch (...) {
517  shutdown();
518  throw;
519  }
520  try {
521  threads_.push_back(std::move(thread));
522  } catch (...) {
523  shutdown();
524  thread.join();
525  throw;
526  }
527  }
528  }
529 
530  // delete copy/move semantics
531  thread_pool(const thread_pool&) = delete;
532  thread_pool& operator=(const thread_pool&) = delete;
533  thread_pool(thread_pool&&) = delete;
534  thread_pool& operator=(thread_pool&&) = delete;
535 
536  ~thread_pool() {
537  shutdown();
538  }
539 
540  void push(const std::function<void()>& functor) {
541  {
542  std::lock_guard<std::mutex> lock{mutex_};
543  functors_.push(functor);
544  }
545  cond_var_.notify_one();
546  }
547 
548 private:
549 
550  void worker() {
551  for (;;) {
552  std::function<void()> functor;
553  {
554  std::unique_lock<std::mutex> lock{mutex_};
555  cond_var_.wait(lock, [this]{
556  return done_ || !functors_.empty();
557  });
558  if (done_ && functors_.empty()) {
559  break;
560  }
561  functor = functors_.front();
562  functors_.pop();
563  }
564  functor();
565  }
566  }
567 
568  void shutdown() {
569  {
570  std::lock_guard<std::mutex> lock{mutex_};
571  done_ = true;
572  }
573  cond_var_.notify_all();
574  for (std::thread& thread : threads_) {
575  thread.join();
576  }
577  threads_.clear();
578  }
579 
580  bool done_ = false;
581  std::vector<std::thread> threads_;
582  std::queue<std::function<void()>> functors_;
583  std::condition_variable cond_var_;
584  std::mutex mutex_;
585 };
586 
587 
/// Applies the functor to each element in the tuple, in element order.
///
/// @param f Callable invoked once per element; its return value is discarded
/// @param t Tuple-like object; its value category is forwarded to the elements
///
/// Each call is cast to void before being joined by the comma fold, so a
/// user-defined `operator,` on f's return type is never picked up.
template<typename Functor, typename Tuple>
void apply_to_each(Functor&& f, Tuple&& t) {
    std::apply(
        [&f](auto&&... element) {
            (static_cast<void>(f(std::forward<decltype(element)>(element))), ...);
        },
        std::forward<Tuple>(t));
}
593 
594 
595 template<int offset, typename... ParentResults>
597  static void work(const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& source, std::tuple<std::shared_future<ParentResults>...>& target) {
598  std::get<offset>(target) = std::get<offset>(source)->future();
600  }
601 };
602 
603 template<typename... ParentResults>
604 struct assign_futures_impl<-1, ParentResults...> {
605  static void work(const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>&, std::tuple<std::shared_future<ParentResults>...>&) {}
606 };
607 
608 /// Returns the futures from the given tuple of tasks
609 template<typename... ParentResults>
610 std::tuple<std::shared_future<ParentResults>...> get_futures(const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& input) {
611  std::tuple<std::shared_future<ParentResults>...> result;
612  assign_futures_impl<static_cast<int>(sizeof...(ParentResults)) - 1, ParentResults...>::work(input, result);
613  return result;
614 }
615 
616 /// Returns the futures from the given vector of tasks
617 template<typename ParentResultType>
618 std::vector<std::shared_future<ParentResultType>> get_futures(const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& input) {
619  std::vector<std::shared_future<ParentResultType>> result;
620  result.reserve(input.size());
621  for (const std::shared_ptr<transwarp::task<ParentResultType>>& task : input) {
622  result.emplace_back(task->future());
623  }
624  return result;
625 }
626 
627 
628 /// Runs the task with the given arguments, hence, invoking the task's functor
629 template<typename Result, typename Task, typename... Args>
630 Result run_task(std::size_t task_id, const std::weak_ptr<Task>& task, Args&&... args) {
631  const std::shared_ptr<Task> t = task.lock();
632  if (!t) {
633  throw transwarp::task_destroyed{std::to_string(task_id)};
634  }
635  if (t->canceled()) {
636  throw transwarp::task_canceled{std::to_string(task_id)};
637  }
639  return (*t->functor_)(std::forward<Args>(args)...);
640 }
641 
642 
643 /// Waits for all parents to finish
644 template<typename... ParentResults>
645 void wait_for_all(const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
646  transwarp::detail::apply_to_each([](auto& p){ p->future().wait(); }, parents);
647 }
648 
649 
650 /// Waits for all parents to finish
651 template<typename ParentResultType>
652 void wait_for_all(const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
653  for (const std::shared_ptr<transwarp::task<ParentResultType>>& parent : parents) {
654  parent->future().wait();
655  }
656 }
657 
658 
659 template<typename Parent>
660 Parent wait_for_any_impl() {
661  return {};
662 }
663 
664 template<typename Parent, typename ParentResult, typename... ParentResults>
665 Parent wait_for_any_impl(const std::shared_ptr<transwarp::task<ParentResult>>& parent, const std::shared_ptr<transwarp::task<ParentResults>>& ...parents) {
666  const std::future_status status = parent->future().wait_for(std::chrono::microseconds(1));
667  if (status == std::future_status::ready) {
668  return parent;
669  }
670  return transwarp::detail::wait_for_any_impl<Parent>(parents...);
671 }
672 
673 /// Waits for the first parent to finish
674 template<typename Parent, typename... ParentResults>
675 Parent wait_for_any(const std::shared_ptr<transwarp::task<ParentResults>>& ...parents) {
676  for (;;) {
677  Parent parent = transwarp::detail::wait_for_any_impl<Parent>(parents...);
678  if (parent) {
679  return parent;
680  }
681  }
682 }
683 
684 
685 /// Waits for the first parent to finish
686 template<typename ParentResultType>
687 std::shared_ptr<transwarp::task<ParentResultType>> wait_for_any(const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
688  for (;;) {
689  for (const std::shared_ptr<transwarp::task<ParentResultType>>& parent : parents) {
690  const std::future_status status = parent->future().wait_for(std::chrono::microseconds(1));
691  if (status == std::future_status::ready) {
692  return parent;
693  }
694  }
695  }
696 }
697 
698 
699 /// Cancels all tasks but one
700 template<typename OneResult, typename... ParentResults>
701 void cancel_all_but_one(const std::shared_ptr<transwarp::task<OneResult>>& one, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
702  auto callable = [&one](auto& parent) {
703  if (one != parent) {
704  parent->cancel(true);
705  }
706  };
708 }
709 
710 
711 /// Cancels all tasks but one
712 template<typename OneResult, typename ParentResultType>
713 void cancel_all_but_one(const std::shared_ptr<transwarp::task<OneResult>>& one, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
714  for (const std::shared_ptr<transwarp::task<ParentResultType>>& parent : parents) {
715  if (one != parent) {
716  parent->cancel(true);
717  }
718  }
719 }
720 
721 
722 template<typename TaskType, bool done, int total, int... n>
723 struct call_impl {
724  template<typename Result, typename Task, typename... ParentResults>
725  static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
726  return call_impl<TaskType, total == 1 + static_cast<int>(sizeof...(n)), total, n..., static_cast<int>(sizeof...(n))>::template
727  work<Result>(task_id, task, parents);
728  }
729 };
730 
/// Primary template, declared only; one specialization exists per task tag type
template<typename TaskType>
struct call_impl_vector;
733 
734 template<int total, int... n>
735 struct call_impl<transwarp::root_type, true, total, n...> {
736  template<typename Result, typename Task, typename... ParentResults>
737  static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>&) {
738  return transwarp::detail::run_task<Result>(task_id, task);
739  }
740 };
741 
742 template<>
743 struct call_impl_vector<transwarp::root_type> {
744  template<typename Result, typename Task, typename ParentResultType>
745  static Result work(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>&) {
746  return transwarp::detail::run_task<Result>(task_id, task);
747  }
748 };
749 
750 template<int total, int... n>
751 struct call_impl<transwarp::accept_type, true, total, n...> {
752  template<typename Result, typename Task, typename... ParentResults>
753  static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
755  const std::tuple<std::shared_future<ParentResults>...> futures = transwarp::detail::get_futures(parents);
756  return transwarp::detail::run_task<Result>(task_id, task, std::get<n>(futures)...);
757  }
758 };
759 
760 template<>
761 struct call_impl_vector<transwarp::accept_type> {
762  template<typename Result, typename Task, typename ParentResultType>
763  static Result work(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
765  return transwarp::detail::run_task<Result>(task_id, task, transwarp::detail::get_futures(parents));
766  }
767 };
768 
769 template<int total, int... n>
770 struct call_impl<transwarp::accept_any_type, true, total, n...> {
771  template<typename Result, typename Task, typename... ParentResults>
772  static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
773  using parent_t = std::remove_reference_t<decltype(std::get<0>(parents))>; // Use first type as reference
774  parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(parents)...);
776  return transwarp::detail::run_task<Result>(task_id, task, parent->future());
777  }
778 };
779 
780 template<>
781 struct call_impl_vector<transwarp::accept_any_type> {
782  template<typename Result, typename Task, typename ParentResultType>
783  static Result work(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
784  std::shared_ptr<transwarp::task<ParentResultType>> parent = transwarp::detail::wait_for_any(parents);
786  return transwarp::detail::run_task<Result>(task_id, task, parent->future());
787  }
788 };
789 
790 template<int total, int... n>
791 struct call_impl<transwarp::consume_type, true, total, n...> {
792  template<typename Result, typename Task, typename... ParentResults>
793  static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
795  return transwarp::detail::run_task<Result>(task_id, task, std::get<n>(parents)->future().get()...);
796  }
797 };
798 
799 template<>
800 struct call_impl_vector<transwarp::consume_type> {
801  template<typename Result, typename Task, typename ParentResultType>
802  static Result work(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
804  std::vector<ParentResultType> results;
805  results.reserve(parents.size());
806  for (const std::shared_ptr<transwarp::task<ParentResultType>>& parent : parents) {
807  results.emplace_back(parent->future().get());
808  }
809  return transwarp::detail::run_task<Result>(task_id, task, std::move(results));
810  }
811 };
812 
/// Dispatch for consume_any tasks with tuple parents (terminal case of the index expansion).
/// Selects one parent through wait_for_any and passes that parent's result to run_task.
813 template<int total, int... n>
814 struct call_impl<transwarp::consume_any_type, true, total, n...> {
815  template<typename Result, typename Task, typename... ParentResults>
816  static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
817  using parent_t = std::remove_reference_t<decltype(std::get<0>(parents))>; // Use first type as reference
818  parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(parents)...);
// NOTE(review): the embedded listing numbers jump from 818 to 820, so one statement appears to have been lost here -- confirm against the upstream header
820  return transwarp::detail::run_task<Result>(task_id, task, parent->future().get());
821  }
822 };
823 
/// Dispatch for consume_any tasks whose parents are given as a vector.
/// Selects one parent through wait_for_any and passes that parent's result to run_task.
824 template<>
825 struct call_impl_vector<transwarp::consume_any_type> {
826  template<typename Result, typename Task, typename ParentResultType>
827  static Result work(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
828  std::shared_ptr<transwarp::task<ParentResultType>> parent = transwarp::detail::wait_for_any(parents);
// NOTE(review): the embedded listing numbers jump from 828 to 830, so one statement appears to have been lost here -- confirm against the upstream header
830  return transwarp::detail::run_task<Result>(task_id, task, parent->future().get());
831  }
832 };
833 
/// Dispatch for wait tasks with tuple parents (terminal case of the index expansion).
/// Calls future().get() on every parent, then runs the functor without arguments.
834 template<int total, int... n>
835 struct call_impl<transwarp::wait_type, true, total, n...> {
836  template<typename Result, typename Task, typename... ParentResults>
837  static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
// NOTE(review): the embedded listing numbers jump from 837 to 839, so one statement appears to have been lost here -- confirm against the upstream header
839  transwarp::detail::apply_to_each([](auto& p){ p->future().get(); }, parents); // Ensures that exceptions are propagated
840  return transwarp::detail::run_task<Result>(task_id, task);
841  }
842 };
843 
/// Dispatch for wait tasks whose parents are given as a vector.
/// Calls future().get() on every parent, then runs the functor without arguments.
844 template<>
845 struct call_impl_vector<transwarp::wait_type> {
846  template<typename Result, typename Task, typename ParentResultType>
847  static Result work(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
// NOTE(review): the embedded listing numbers jump from 847 to 849, so one statement appears to have been lost here -- confirm against the upstream header
849  for (const std::shared_ptr<transwarp::task<ParentResultType>>& parent : parents) {
850  parent->future().get(); // Ensures that exceptions are propagated
851  }
852  return transwarp::detail::run_task<Result>(task_id, task);
853  }
854 };
855 
/// Dispatch for wait_any tasks with tuple parents (terminal case of the index expansion).
/// Selects one parent through wait_for_any, calls get() on its future and then runs
/// the functor without arguments.
856 template<int total, int... n>
857 struct call_impl<transwarp::wait_any_type, true, total, n...> {
858  template<typename Result, typename Task, typename... ParentResults>
859  static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
860  using parent_t = std::remove_reference_t<decltype(std::get<0>(parents))>; // Use first type as reference
861  parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(parents)...);
// NOTE(review): the embedded listing numbers jump from 861 to 863, so one statement appears to have been lost here -- confirm against the upstream header
863  parent->future().get(); // Ensures that exceptions are propagated
864  return transwarp::detail::run_task<Result>(task_id, task);
865  }
866 };
867 
/// Dispatch for wait_any tasks whose parents are given as a vector.
/// Selects one parent through wait_for_any, calls get() on its future and then runs
/// the functor without arguments.
868 template<>
869 struct call_impl_vector<transwarp::wait_any_type> {
870  template<typename Result, typename Task, typename ParentResultType>
871  static Result work(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
872  std::shared_ptr<transwarp::task<ParentResultType>> parent = transwarp::detail::wait_for_any(parents);
// NOTE(review): the embedded listing numbers jump from 872 to 874, so one statement appears to have been lost here -- confirm against the upstream header
874  parent->future().get(); // Ensures that exceptions are propagated
875  return transwarp::detail::run_task<Result>(task_id, task);
876  }
877 };
878 
879 /// Calls the functor of the given task with the results from the tuple of parents.
880 /// Throws transwarp::task_canceled if the task is canceled.
881 /// Throws transwarp::task_destroyed in case the task was destroyed prematurely.
882 template<typename TaskType, typename Result, typename Task, typename... ParentResults>
883 Result call(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
884  constexpr std::size_t n = std::tuple_size_v<std::tuple<std::shared_future<ParentResults>...>>;
886  work<Result>(task_id, task, parents);
887 }
888 
889 /// Calls the functor of the given task with the results from the vector of parents.
890 /// Throws transwarp::task_canceled if the task is canceled.
891 /// Throws transwarp::task_destroyed in case the task was destroyed prematurely.
892 template<typename TaskType, typename Result, typename Task, typename ParentResultType>
893 Result call(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
895  work<Result>(task_id, task, parents);
896 }
897 
898 
899 /// Calls the functor with every element in the tuple
900 template<typename Functor, typename... ParentResults>
901 void call_with_each(const Functor& f, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& t) {
902  auto callable = [&f](const auto& task) {
903  if (!task) {
904  throw transwarp::invalid_parameter{"task pointer"};
905  }
906  f(*task);
907  };
909 }
910 
911 /// Calls the functor with every element in the vector
912 template<typename Functor, typename ParentResultType>
913 void call_with_each(const Functor& f, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& v) {
914  for (const std::shared_ptr<transwarp::task<ParentResultType>>& ptr : v) {
915  if (!ptr) {
916  throw transwarp::invalid_parameter{"task pointer"};
917  }
918  f(*ptr);
919  }
920 }
921 
922 
923 /// Sets level of a task
925  explicit parent_visitor(transwarp::itask& task) noexcept
926  : task_(task) {}
927 
928  void operator()(const transwarp::itask& task) const {
929  if (task_.level() <= task.level()) {
930  /// A child's level is always larger than any of its parents' levels
931  task_.set_level(task.level() + 1);
932  }
933  }
934 
935  transwarp::itask& task_;
936 };
937 
938 /// Applies final bookkeeping to the task and collects the task
940  explicit final_visitor(std::vector<transwarp::itask*>& tasks) noexcept
941  : tasks_{tasks} {}
942 
943  void operator()(transwarp::itask& task) noexcept {
944  tasks_.push_back(&task);
945  task.set_id(id_++);
946  }
947 
948  std::vector<transwarp::itask*>& tasks_;
949  std::size_t id_ = 0;
950 };
951 
952 /// Generates edges
954  explicit edges_visitor(std::vector<transwarp::edge>& edges) noexcept
955  : edges_{edges} {}
956 
957  void operator()(const transwarp::itask& task) {
958  for (const transwarp::itask* parent : task.parents()) {
959  edges_.emplace_back(static_cast<const transwarp::itask&>(*parent), task);
960  }
961  }
962 
963  std::vector<transwarp::edge>& edges_;
964 };
965 
966 /// Schedules using the given executor
968  schedule_visitor(bool reset, transwarp::executor* executor) noexcept
969  : reset_{reset}, executor_{executor} {}
970 
971  void operator()(transwarp::itask& task) {
972  task.schedule_impl(reset_, executor_);
973  }
974 
975  bool reset_;
976  transwarp::executor* executor_;
977 };
978 
979 /// Resets the given task
981 
982  void operator()(transwarp::itask& task) const {
983  task.reset();
984  }
985 };
986 
987 /// Cancels or resumes the given task
989  explicit cancel_visitor(bool enabled) noexcept
990  : enabled_{enabled} {}
991 
992  void operator()(transwarp::itask& task) const noexcept {
993  task.cancel(enabled_);
994  }
995 
996  bool enabled_;
997 };
998 
999 /// Assigns an executor to the given task
1001  explicit set_executor_visitor(std::shared_ptr<transwarp::executor> executor) noexcept
1002  : executor_{std::move(executor)} {}
1003 
1004  void operator()(transwarp::itask& task) const noexcept {
1005  task.set_executor(executor_);
1006  }
1007 
1008  std::shared_ptr<transwarp::executor> executor_;
1009 };
1010 
1011 /// Removes the executor from the given task
1013 
1014  void operator()(transwarp::itask& task) const noexcept {
1015  task.remove_executor();
1016  }
1017 };
1018 
1019 /// Assigns a priority to the given task
1021  explicit set_priority_visitor(std::int64_t priority) noexcept
1022  : priority_{priority} {}
1023 
1024  void operator()(transwarp::itask& task) const noexcept {
1025  task.set_priority(priority_);
1026  }
1027 
1028  std::int64_t priority_;
1029 };
1030 
1031 /// Resets the priority of the given task
1033 
1034  void operator()(transwarp::itask& task) const noexcept {
1035  task.reset_priority();
1036  }
1037 };
1038 
1039 /// Assigns custom data to the given task
1041  explicit set_custom_data_visitor(std::any custom_data) noexcept
1042  : custom_data_{std::move(custom_data)} {}
1043 
1044  void operator()(transwarp::itask& task) const noexcept {
1045  task.set_custom_data(custom_data_);
1046  }
1047 
1048  std::any custom_data_;
1049 };
1050 
1051 /// Removes custom data from the given task
1053 
1054  void operator()(transwarp::itask& task) const noexcept {
1055  task.remove_custom_data();
1056  }
1057 };
1058 
1059 /// Pushes the given task into the vector of tasks
1061  explicit push_task_visitor(std::vector<transwarp::itask*>& tasks)
1062  : tasks_{tasks} {}
1063 
1064  void operator()(transwarp::itask& task) {
1065  tasks_.push_back(&task);
1066  }
1067 
1068  std::vector<transwarp::itask*>& tasks_;
1069 };
1070 
1071 /// Adds a new listener to the given task
1073  explicit add_listener_visitor(std::shared_ptr<transwarp::listener> listener)
1074  : listener_{std::move(listener)}
1075  {}
1076 
1077  void operator()(transwarp::itask& task) {
1078  task.add_listener(listener_);
1079  }
1080 
1081  std::shared_ptr<transwarp::listener> listener_;
1082 };
1083 
1084 /// Adds a new listener per event type to the given task
1086  add_listener_per_event_visitor(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener)
1087  : event_{event}, listener_{std::move(listener)}
1088  {}
1089 
1090  void operator()(transwarp::itask& task) {
1091  task.add_listener(event_, listener_);
1092  }
1093 
1094  transwarp::event_type event_;
1095  std::shared_ptr<transwarp::listener> listener_;
1096 };
1097 
1098 /// Removes a listener from the given task
1100  explicit remove_listener_visitor(std::shared_ptr<transwarp::listener> listener)
1101  : listener_{std::move(listener)}
1102  {}
1103 
1104  void operator()(transwarp::itask& task) {
1105  task.remove_listener(listener_);
1106  }
1107 
1108  std::shared_ptr<transwarp::listener> listener_;
1109 };
1110 
1111 /// Removes a listener per event type from the given task
1113  remove_listener_per_event_visitor(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener)
1114  : event_{event}, listener_{std::move(listener)}
1115  {}
1116 
1117  void operator()(transwarp::itask& task) {
1118  task.remove_listener(event_, listener_);
1119  }
1120 
1121  transwarp::event_type event_;
1122  std::shared_ptr<transwarp::listener> listener_;
1123 };
1124 
1125 /// Removes all listeners from the given task
1127 
1128  void operator()(transwarp::itask& task) {
1129  task.remove_listeners();
1130  }
1131 
1132 };
1133 
1134 /// Removes all listeners per event type from the given task
1137  : event_{event}
1138  {}
1139 
1140  void operator()(transwarp::itask& task) {
1141  task.remove_listeners(event_);
1142  }
1143 
1144  transwarp::event_type event_;
1145 };
1146 
1147 /// Visits the given task using the visitor given in the constructor
1149  explicit visit_visitor(const std::function<void(transwarp::itask&)>& visitor) noexcept
1150  : visitor_{visitor} {}
1151 
1152  void operator()(transwarp::itask& task) const {
1153  task.visit(visitor_);
1154  }
1155 
1156  const std::function<void(transwarp::itask&)>& visitor_;
1157 };
1158 
1159 /// Unvisits the given task
1161 
1162  void operator()(transwarp::itask& task) const noexcept {
1163  task.unvisit();
1164  }
1165 };
1166 
1167 /// Determines the result type of the Functor dispatching on the task type
1168 template<typename TaskType, typename Functor, typename... ParentResults>
1170  static_assert(std::is_same_v<TaskType, transwarp::root_type> ||
1171  std::is_same_v<TaskType, transwarp::accept_type> ||
1172  std::is_same_v<TaskType, transwarp::accept_any_type> ||
1173  std::is_same_v<TaskType, transwarp::consume_type> ||
1174  std::is_same_v<TaskType, transwarp::consume_any_type> ||
1175  std::is_same_v<TaskType, transwarp::wait_type> ||
1176  std::is_same_v<TaskType, transwarp::wait_any_type>,
1177  "Invalid task type, must be one of: root, accept, accept_any, consume, consume_any, wait, wait_any");
1178 };
1179 
1180 template<typename Functor, typename... ParentResults>
1181 struct functor_result<transwarp::root_type, Functor, ParentResults...> {
1182  static_assert(sizeof...(ParentResults) == 0, "A root task cannot have parent tasks");
1183  using type = decltype(std::declval<Functor>()());
1184 };
1185 
1186 template<typename Functor, typename... ParentResults>
1187 struct functor_result<transwarp::accept_type, Functor, ParentResults...> {
1188  static_assert(sizeof...(ParentResults) > 0, "An accept task must have at least one parent");
1189  using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<ParentResults>>()...));
1190 };
1191 
1192 template<typename Functor, typename ParentResultType>
1193 struct functor_result<transwarp::accept_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1194  using type = decltype(std::declval<Functor>()(std::declval<std::vector<std::shared_future<ParentResultType>>>()));
1195 };
1196 
1197 template<typename Functor, typename... ParentResults>
1198 struct functor_result<transwarp::accept_any_type, Functor, ParentResults...> {
1199  static_assert(sizeof...(ParentResults) > 0, "An accept_any task must have at least one parent");
1200  using arg_t = std::tuple_element_t<0, std::tuple<ParentResults...>>; // Using first type as reference
1201  using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<arg_t>>()));
1202 };
1203 
1204 template<typename Functor, typename ParentResultType>
1205 struct functor_result<transwarp::accept_any_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1206  using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<ParentResultType>>()));
1207 };
1208 
1209 template<typename Functor, typename... ParentResults>
1210 struct functor_result<transwarp::consume_type, Functor, ParentResults...> {
1211  static_assert(sizeof...(ParentResults) > 0, "A consume task must have at least one parent");
1212  using type = decltype(std::declval<Functor>()(std::declval<ParentResults>()...));
1213 };
1214 
1215 template<typename Functor, typename ParentResultType>
1216 struct functor_result<transwarp::consume_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1217  using type = decltype(std::declval<Functor>()(std::declval<std::vector<ParentResultType>>()));
1218 };
1219 
1220 template<typename Functor, typename... ParentResults>
1221 struct functor_result<transwarp::consume_any_type, Functor, ParentResults...> {
1222  static_assert(sizeof...(ParentResults) > 0, "A consume_any task must have at least one parent");
1223  using arg_t = std::tuple_element_t<0, std::tuple<ParentResults...>>; // Using first type as reference
1224  using type = decltype(std::declval<Functor>()(std::declval<arg_t>()));
1225 };
1226 
1227 template<typename Functor, typename ParentResultType>
1228 struct functor_result<transwarp::consume_any_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1229  using type = decltype(std::declval<Functor>()(std::declval<ParentResultType>()));
1230 };
1231 
1232 template<typename Functor, typename... ParentResults>
1233 struct functor_result<transwarp::wait_type, Functor, ParentResults...> {
1234  static_assert(sizeof...(ParentResults) > 0, "A wait task must have at least one parent");
1235  using type = decltype(std::declval<Functor>()());
1236 };
1237 
1238 template<typename Functor, typename ParentResultType>
1239 struct functor_result<transwarp::wait_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1240  using type = decltype(std::declval<Functor>()());
1241 };
1242 
1243 template<typename Functor, typename... ParentResults>
1244 struct functor_result<transwarp::wait_any_type, Functor, ParentResults...> {
1245  static_assert(sizeof...(ParentResults) > 0, "A wait_any task must have at least one parent");
1246  using type = decltype(std::declval<Functor>()());
1247 };
1248 
1249 template<typename Functor, typename ParentResultType>
1250 struct functor_result<transwarp::wait_any_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1251  using type = decltype(std::declval<Functor>()());
1252 };
1253 
1254 /// Determines the result type of the Functor dispatching on the task type
1255 template<typename TaskType, typename Functor, typename... ParentResults>
1256 using functor_result_t = typename transwarp::detail::functor_result<TaskType, Functor, ParentResults...>::type;
1257 
1258 
1259 /// Assigns the task to the given functor if the functor is a subclass of transwarp::functor
1260 template<typename Functor>
1261 void assign_task_if(Functor& functor, const transwarp::itask& task) noexcept {
1262  if constexpr (std::is_base_of_v<transwarp::functor, Functor>) {
1263  functor.transwarp_task_ = &task;
1264  }
1265 }
1266 
1267 
/// Creates a shared future that is already ready and holds the given value.
/// The value is perfectly forwarded into the promise.
template<typename ResultType, typename Value>
std::shared_future<ResultType> make_future_with_value(Value&& value) {
    std::promise<ResultType> ready;
    std::shared_future<ResultType> result = ready.get_future().share();
    ready.set_value(std::forward<Value>(value));
    return result;
}
1275 
/// Creates a shared future of void that is already ready
inline std::shared_future<void> make_ready_future() {
    std::promise<void> ready;
    std::shared_future<void> result = ready.get_future().share();
    ready.set_value();
    return result;
}
1282 
1283 /// Returns a ready future with the given exception as its state
1284 template<typename ResultType>
1285 std::shared_future<ResultType> make_future_with_exception(std::exception_ptr exception) {
1286  if (!exception) {
1287  throw transwarp::invalid_parameter{"exception pointer"};
1288  }
1289  std::promise<ResultType> promise;
1290  promise.set_exception(exception);
1291  return promise.get_future();
1292 }
1293 
1294 
1295 /// Determines the type of the parents
1296 template<typename... ParentResults>
1297 struct parents {
1298  using type = std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>;
1299  static std::size_t size(const type&) {
1300  return std::tuple_size_v<type>;
1301  }
1302  static type clone(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache, const type& obj) {
1303  type cloned = obj;
1305  [&task_cache](auto& t) {
1306  t = detail::clone_task(task_cache, t);
1307  }, cloned);
1308  return cloned;
1309  }
1310  static std::vector<transwarp::itask*> tasks(const type& parents) {
1311  std::vector<transwarp::itask*> tasks;
1313  [&tasks](auto& t) {
1314  tasks.push_back(t.get());
1315  }, parents);
1316  return tasks;
1317  }
1318 };
1319 
1320 /// Determines the type of the parents. Specialization for vector parents
1321 template<typename ParentResultType>
1322 struct parents<std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1323  using type = std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>;
1324  static std::size_t size(const type& obj) {
1325  return obj.size();
1326  }
1327  static type clone(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache, const type& obj) {
1328  type cloned = obj;
1329  for (auto& t : cloned) {
1330  t = detail::clone_task(task_cache, t);
1331  }
1332  return cloned;
1333  }
1334  static std::vector<transwarp::itask*> tasks(const type& parents) {
1335  std::vector<transwarp::itask*> tasks;
1336  for (auto& t : parents) {
1337  tasks.push_back(t.get());
1338  }
1339  return tasks;
1340  }
1341 };
1342 
1343 /// Determines the type of the parents
1344 template<typename... ParentResults>
1345 using parents_t = typename transwarp::detail::parents<ParentResults...>::type;
1346 
1347 
1348 template<typename ResultType, typename TaskType>
1350 protected:
1351 
1352  template<typename Task, typename Parents>
1353  void call(std::size_t task_id,
1354  const std::weak_ptr<Task>& task,
1355  const Parents& parents) {
1356  promise_.set_value(transwarp::detail::call<TaskType, ResultType>(task_id, task, parents));
1357  }
1358 
1359  std::promise<ResultType> promise_;
1360 };
1361 
1362 template<typename TaskType>
1363 class base_runner<void, TaskType> {
1364 protected:
1365 
1366  template<typename Task, typename Parents>
1367  void call(std::size_t task_id,
1368  const std::weak_ptr<Task>& task,
1369  const Parents& parents) {
1370  transwarp::detail::call<TaskType, void>(task_id, task, parents);
1371  promise_.set_value();
1372  }
1373 
1374  std::promise<void> promise_;
1375 };
1376 
1377 /// A callable to run a task given its parents
1378 template<typename ResultType, typename TaskType, typename Task, typename Parents>
1379 class runner : public transwarp::detail::base_runner<ResultType, TaskType> {
1380 public:
1381 
1382  runner(std::size_t task_id,
1383  const std::weak_ptr<Task>& task,
1385  : task_id_{task_id},
1386  task_{task},
1387  parents_{parents}
1388  {}
1389 
1390  std::future<ResultType> future() {
1391  return this->promise_.get_future();
1392  }
1393 
1394  void operator()() {
1395  if (const std::shared_ptr<Task> t = task_.lock()) {
1396  t->raise_event(transwarp::event_type::before_started);
1397  }
1398  try {
1399  this->call(task_id_, task_, parents_);
1400  } catch (const transwarp::task_canceled&) {
1401  this->promise_.set_exception(std::current_exception());
1402  if (const std::shared_ptr<Task> t = task_.lock()) {
1403  t->raise_event(transwarp::event_type::after_canceled);
1404  }
1405  } catch (...) {
1406  this->promise_.set_exception(std::current_exception());
1407  }
1408  if (const std::shared_ptr<Task> t = task_.lock()) {
1409  t->raise_event(transwarp::event_type::after_finished);
1410  }
1411  }
1412 
1413 private:
1414  const std::size_t task_id_;
1415  const std::weak_ptr<Task> task_;
1416  const transwarp::decay_t<Parents> parents_;
1417 };
1418 
1419 
1420 /// A simple circular buffer (FIFO).
1421 /// ValueType must support default construction. The buffer lets you push
1422 /// new values onto the back and pop old values off the front.
1423 template<typename ValueType>
1425 public:
1426 
1427  static_assert(std::is_default_constructible_v<ValueType>, "ValueType must be default constructible");
1428 
1429  using value_type = ValueType;
1430 
1431  /// Constructs a circular buffer with a given fixed capacity
1432  explicit
1433  circular_buffer(std::size_t capacity)
1434  : data_(capacity)
1435  {
1436  if (capacity < 1) {
1437  throw transwarp::invalid_parameter{"capacity"};
1438  }
1439  }
1440 
1441  // delete copy/move semantics
1442  circular_buffer(const circular_buffer&) = delete;
1443  circular_buffer& operator=(const circular_buffer&) = delete;
1444  circular_buffer(circular_buffer&& other) = delete;
1445  circular_buffer& operator=(circular_buffer&&) = delete;
1446 
1447  /// Pushes a new value onto the end of the buffer. If that exceeds the capacity
1448  /// of the buffer then the oldest value gets dropped (the one at the front).
1449  template<typename T, typename = std::enable_if_t<std::is_same_v<std::decay_t<T>, value_type>>>
1450  void push(T&& value) {
1451  data_[end_] = std::forward<T>(value);
1452  increment();
1453  }
1454 
1455  /// Returns the value at the front of the buffer (the oldest value).
1456  /// This is undefined if the buffer is empty
1457  const value_type& front() const {
1458  return data_[front_];
1459  }
1460 
1461  /// Removes the value at the front of the buffer (the oldest value)
1462  void pop() {
1463  if (!empty()) {
1464  data_[front_] = ValueType{};
1465  decrement();
1466  }
1467  }
1468 
1469  /// Returns the capacity of the buffer
1470  std::size_t capacity() const {
1471  return data_.size();
1472  }
1473 
1474  /// Returns the number of populated values of the buffer. Its maximum value
1475  /// equals the capacity of the buffer
1476  std::size_t size() const {
1477  return size_;
1478  }
1479 
1480  /// Returns whether the buffer is empty
1481  bool empty() const {
1482  return size_ == 0;
1483  }
1484 
1485  /// Returns whether the buffer is full
1486  bool full() const {
1487  return size_ == data_.size();
1488  }
1489 
1490  /// Swaps this buffer with the given buffer
1491  void swap(circular_buffer& buffer) {
1492  std::swap(end_, buffer.end_);
1493  std::swap(front_, buffer.front_);
1494  std::swap(size_, buffer.size_);
1495  std::swap(data_, buffer.data_);
1496  }
1497 
1498 private:
1499 
1500  void increment_or_wrap(std::size_t& value) const {
1501  if (value == data_.size() - 1) {
1502  value = 0;
1503  } else {
1504  ++value;
1505  }
1506  }
1507 
1508  void increment() {
1509  increment_or_wrap(end_);
1510  if (full()) {
1511  increment_or_wrap(front_);
1512  } else {
1513  ++size_;
1514  }
1515  }
1516 
1517  void decrement() {
1518  increment_or_wrap(front_);
1519  --size_;
1520  }
1521 
1522  std::size_t end_{};
1523  std::size_t front_{};
1524  std::size_t size_{};
1525  std::vector<value_type> data_;
1526 };
1527 
1528 
/// A minimal busy-waiting lock built on std::atomic_flag.
/// Provides lock() and unlock(), so it can be used with std::lock_guard.
class spinlock {
public:

    /// Spins until the flag could be set by this caller
    void lock() noexcept {
        for (;;) {
            const bool was_locked = locked_.test_and_set(std::memory_order_acquire);
            if (!was_locked) {
                return;
            }
        }
    }

    /// Clears the flag so that another caller can acquire the lock
    void unlock() noexcept {
        locked_.clear(std::memory_order_release);
    }

private:
    std::atomic_flag locked_ = ATOMIC_FLAG_INIT;
};
1543 
1544 
1545 } // detail
1546 
1547 
/// A functor doing nothing
struct no_op_functor {
    void operator()() const noexcept {}
};

/// An object to use in places where a no-op functor is required.
/// Declared inline so that every translation unit including this header
/// refers to the same object.
inline constexpr no_op_functor no_op{};
1555 
1556 
1557 /// Executor for sequential execution. Runs functors sequentially on the same thread
1559 public:
1560 
1561  sequential() = default;
1562 
1563  // delete copy/move semantics
1564  sequential(const sequential&) = delete;
1565  sequential& operator=(const sequential&) = delete;
1566  sequential(sequential&&) = delete;
1567  sequential& operator=(sequential&&) = delete;
1568 
1569  /// Returns the name of the executor
1570  std::string name() const override {
1571  return "transwarp::sequential";
1572  }
1573 
1574  /// Runs the functor on the current thread
1575  void execute(const std::function<void()>& functor, const transwarp::itask&) override {
1576  functor();
1577  }
1578 };
1579 
1580 
1581 /// Executor for parallel execution. Uses a simple thread pool
// NOTE(review): listing line 1582 (the class header) is missing from this extract.
// The `override` specifiers below suggest the class derives from transwarp::executor -- confirm upstream.
1583 public:
1584 
/// Constructs the executor, passing n_threads to the pool_ member
1585  explicit parallel(std::size_t n_threads)
1586  : pool_{n_threads}
1587  {}
1588 
1589  // delete copy/move semantics
1590  parallel(const parallel&) = delete;
1591  parallel& operator=(const parallel&) = delete;
1592  parallel(parallel&&) = delete;
1593  parallel& operator=(parallel&&) = delete;
1594 
1595  /// Returns the name of the executor
1596  std::string name() const override {
1597  return "transwarp::parallel";
1598  }
1599 
1600  /// Pushes the functor into the thread pool for asynchronous execution
// The task argument is not used
1601  void execute(const std::function<void()>& functor, const transwarp::itask&) override {
1602  pool_.push(functor);
1603  }
1604 
1605 private:
// NOTE(review): listing line 1606 (the declaration of pool_) is missing from this extract -- restore it from upstream
1607 };
1608 
1609 
1610 /// Detail namespace for internal functionality only
1611 namespace detail {
1612 
1613 /// The base task class that contains the functionality that can be used
1614 /// with all result types (void and non-void).
1615 template<typename ResultType, typename TaskType, typename Functor, typename... ParentResults>
1616 class task_impl_base : public transwarp::task<ResultType>,
1617  public std::enable_shared_from_this<task_impl_base<ResultType, TaskType, Functor, ParentResults...>> {
1618 public:
1619  /// The task type
1620  using task_type = TaskType;
1621 
1622  /// The result type of this task
1623  using result_type = ResultType;
1624 
1625  /// Can be called to explicitly finalize this task making this task
1626  /// the terminal task in the graph. This is also done implicitly when
1627  /// calling, e.g., any of the *_all methods. It should normally not be
1628  /// necessary to call this method directly
1629  void finalize() override {
1630  if (tasks_.empty()) {
1631  visit(transwarp::detail::final_visitor{tasks_});
1632  unvisit();
1633  auto compare = [](const transwarp::itask* const l, const transwarp::itask* const r) {
1634  const std::size_t l_level = l->level();
1635  const std::size_t l_id = l->id();
1636  const std::size_t r_level = r->level();
1637  const std::size_t r_id = r->id();
1638  return std::tie(l_level, l_id) < std::tie(r_level, r_id);
1639  };
1640  std::sort(tasks_.begin(), tasks_.end(), compare);
1641  }
1642  }
1643 
1644  /// The task's id
1645  std::size_t id() const noexcept override {
1646  return id_;
1647  }
1648 
1649  /// The task's level
1650  std::size_t level() const noexcept override {
1651  return level_;
1652  }
1653 
1654  /// The task's type
1655  transwarp::task_type type() const noexcept override {
1656  return type_;
1657  }
1658 
1659  /// The optional task name
1660  const std::optional<std::string>& name() const noexcept {
1661  return name_;
1662  }
1663 
1664  /// The task's executor (may be null)
1665  std::shared_ptr<transwarp::executor> executor() const noexcept override {
1666  return executor_;
1667  }
1668 
1669  /// The task priority (defaults to 0)
1670  std::int64_t priority() const noexcept override {
1671  return priority_;
1672  }
1673 
1674  /// The custom task data (may not hold a value)
1675  const std::any& custom_data() const noexcept override {
1676  return custom_data_;
1677  }
1678 
1679  /// Returns whether the associated task is canceled
1680  bool canceled() const noexcept override {
1681  return canceled_.load();
1682  }
1683 
1684  /// Returns the average idletime in microseconds (-1 if never set)
1685  std::int64_t avg_idletime_us() const noexcept override {
1686  return avg_idletime_us_.load();
1687  }
1688 
1689  /// Returns the average waittime in microseconds (-1 if never set)
1690  std::int64_t avg_waittime_us() const noexcept override {
1691  return avg_waittime_us_.load();
1692  }
1693 
1694  /// Returns the average runtime in microseconds (-1 if never set)
1695  std::int64_t avg_runtime_us() const noexcept override {
1696  return avg_runtime_us_.load();
1697  }
1698 
1699  /// Assigns an executor to this task which takes precedence over
1700  /// the executor provided in schedule() or schedule_all()
1701  void set_executor(std::shared_ptr<transwarp::executor> executor) override {
1702  ensure_task_not_running();
1703  if (!executor) {
1704  throw transwarp::invalid_parameter("executor pointer");
1705  }
1706  executor_ = std::move(executor);
1707  }
1708 
1709  /// Assigns an executor to all tasks which takes precedence over
1710  /// the executor provided in schedule() or schedule_all()
1711  void set_executor_all(std::shared_ptr<transwarp::executor> executor) override {
1712  ensure_task_not_running();
1713  transwarp::detail::set_executor_visitor visitor{std::move(executor)};
1714  visit_all(visitor);
1715  }
1716 
1717  /// Removes the executor from this task
1718  void remove_executor() override {
1719  ensure_task_not_running();
1720  executor_.reset();
1721  }
1722 
1723  /// Removes the executor from all tasks
1724  void remove_executor_all() override {
1725  ensure_task_not_running();
1727  visit_all(visitor);
1728  }
1729 
1730  /// Sets a task priority (defaults to 0). transwarp will not directly use this.
1731  /// This is only useful if something else is using the priority (e.g. a custom executor)
1732  void set_priority(std::int64_t priority) override {
1733  ensure_task_not_running();
1734  priority_ = priority;
1735  }
1736 
1737  /// Sets a priority to all tasks (defaults to 0). transwarp will not directly use this.
1738  /// This is only useful if something else is using the priority (e.g. a custom executor)
1739  void set_priority_all(std::int64_t priority) override {
1740  ensure_task_not_running();
1741  transwarp::detail::set_priority_visitor visitor{priority};
1742  visit_all(visitor);
1743  }
1744 
1745  /// Resets the task priority to 0
1746  void reset_priority() override {
1747  ensure_task_not_running();
1748  priority_ = 0;
1749  }
1750 
1751  /// Resets the priority of all tasks to 0
1752  void reset_priority_all() override {
1753  ensure_task_not_running();
1755  visit_all(visitor);
1756  }
1757 
1758  /// Assigns custom data to this task. transwarp will not directly use this.
1759  /// This is only useful if something else is using this custom data (e.g. a custom executor)
1760  void set_custom_data(std::any custom_data) override {
1761  ensure_task_not_running();
1762  if (!custom_data.has_value()) {
1763  throw transwarp::invalid_parameter{"custom data"};
1764  }
1765  custom_data_ = std::move(custom_data);
1766  }
1767 
1768  /// Assigns custom data to all tasks. transwarp will not directly use this.
1769  /// This is only useful if something else is using this custom data (e.g. a custom executor)
1770  void set_custom_data_all(std::any custom_data) override {
1771  ensure_task_not_running();
1772  transwarp::detail::set_custom_data_visitor visitor{std::move(custom_data)};
1773  visit_all(visitor);
1774  }
1775 
1776  /// Removes custom data from this task
1777  void remove_custom_data() override {
1778  ensure_task_not_running();
1779  custom_data_ = {};
1780  }
1781 
1782  /// Removes custom data from all tasks
1783  void remove_custom_data_all() override {
1784  ensure_task_not_running();
1786  visit_all(visitor);
1787  }
1788 
1789  /// Returns the future associated to the underlying execution
1790  const std::shared_future<result_type>& future() const noexcept override {
1791  return future_;
1792  }
1793 
1794  /// Adds a new listener for all event types
1795  void add_listener(std::shared_ptr<transwarp::listener> listener) override {
1796  ensure_task_not_running();
1797  check_listener(listener);
1798  for (std::vector<std::shared_ptr<transwarp::listener>>& l : listeners_) {
1799  l.push_back(listener);
1800  }
1801  }
1802 
1803  /// Adds a new listener for the given event type only
1804  void add_listener(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) override {
1805  ensure_task_not_running();
1806  check_listener(listener);
1807  listeners_[event_index(event)].push_back(std::move(listener));
1808  }
1809 
1810  /// Adds a new listener for all event types and for all parents
1811  void add_listener_all(std::shared_ptr<transwarp::listener> listener) override {
1812  ensure_task_not_running();
1813  transwarp::detail::add_listener_visitor visitor{std::move(listener)};
1814  visit_all(visitor);
1815  }
1816 
1817  /// Adds a new listener for the given event type only and for all parents
1818  void add_listener_all(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) override {
1819  ensure_task_not_running();
1820  transwarp::detail::add_listener_per_event_visitor visitor{event, std::move(listener)};
1821  visit_all(visitor);
1822  }
1823 
1824  /// Removes the listener for all event types
1825  void remove_listener(const std::shared_ptr<transwarp::listener>& listener) override {
1826  ensure_task_not_running();
1827  check_listener(listener);
1828  for (std::vector<std::shared_ptr<transwarp::listener>>& l : listeners_) {
1829  l.erase(std::remove(l.begin(), l.end(), listener), l.end());
1830  }
1831  }
1832 
1833  /// Removes the listener for the given event type only
1834  void remove_listener(transwarp::event_type event, const std::shared_ptr<transwarp::listener>& listener) override {
1835  ensure_task_not_running();
1836  check_listener(listener);
1837  std::vector<std::shared_ptr<transwarp::listener>>& l = listeners_[event_index(event)];
1838  l.erase(std::remove(l.begin(), l.end(), listener), l.end());
1839  }
1840 
1841  /// Removes the listener for all event types and for all parents
1842  void remove_listener_all(const std::shared_ptr<transwarp::listener>& listener) override {
1843  ensure_task_not_running();
1844  transwarp::detail::remove_listener_visitor visitor{std::move(listener)};
1845  visit_all(visitor);
1846  }
1847 
1848  /// Removes the listener for the given event type only and for all parents
1849  void remove_listener_all(transwarp::event_type event, const std::shared_ptr<transwarp::listener>& listener) override {
1850  ensure_task_not_running();
1851  transwarp::detail::remove_listener_per_event_visitor visitor{event, std::move(listener)};
1852  visit_all(visitor);
1853  }
1854 
1855  /// Removes all listeners
1856  void remove_listeners() override {
1857  ensure_task_not_running();
1858  for (std::vector<std::shared_ptr<transwarp::listener>>& l : listeners_) {
1859  l.clear();
1860  }
1861  }
1862 
1863  /// Removes all listeners for the given event type
1865  ensure_task_not_running();
1866  listeners_[event_index(event)].clear();
1867  }
1868 
1869  /// Removes all listeners and for all parents
1870  void remove_listeners_all() override {
1871  ensure_task_not_running();
1873  visit_all(visitor);
1874  }
1875 
1876  /// Removes all listeners for the given event type and for all parents
1878  ensure_task_not_running();
1880  visit_all(visitor);
1881  }
1882 
1883  /// Schedules this task for execution on the caller thread.
1884  /// The task-specific executor gets precedence if it exists.
1885  /// This overload will reset the underlying future.
1886  void schedule() override {
1887  ensure_task_not_running();
1888  this->schedule_impl(true);
1889  }
1890 
1891  /// Schedules this task for execution on the caller thread.
1892  /// The task-specific executor gets precedence if it exists.
1893  /// reset denotes whether schedule should reset the underlying
1894  /// future and schedule even if the future is already valid.
1895  void schedule(bool reset) override {
1896  ensure_task_not_running();
1897  this->schedule_impl(reset);
1898  }
1899 
1900  /// Schedules this task for execution using the provided executor.
1901  /// The task-specific executor gets precedence if it exists.
1902  /// This overload will reset the underlying future.
1904  ensure_task_not_running();
1905  this->schedule_impl(true, &executor);
1906  }
1907 
1908  /// Schedules this task for execution using the provided executor.
1909  /// The task-specific executor gets precedence if it exists.
1910  /// reset denotes whether schedule should reset the underlying
1911  /// future and schedule even if the future is already valid.
1912  void schedule(transwarp::executor& executor, bool reset) override {
1913  ensure_task_not_running();
1914  this->schedule_impl(reset, &executor);
1915  }
1916 
1917  /// Schedules all tasks in the graph for execution on the caller thread.
1918  /// The task-specific executors get precedence if they exist.
1919  /// This overload will reset the underlying futures.
1920  void schedule_all() override {
1921  ensure_task_not_running();
1922  schedule_all_impl(true);
1923  }
1924 
1925  /// Schedules all tasks in the graph for execution using the provided executor.
1926  /// The task-specific executors get precedence if they exist.
1927  /// This overload will reset the underlying futures.
1929  ensure_task_not_running();
1930  schedule_all_impl(true, &executor);
1931  }
1932 
1933  /// Schedules all tasks in the graph for execution on the caller thread.
1934  /// The task-specific executors get precedence if they exist.
1935  /// reset_all denotes whether schedule_all should reset the underlying
1936  /// futures and schedule even if the futures are already present.
1937  void schedule_all(bool reset_all) override {
1938  ensure_task_not_running();
1939  schedule_all_impl(reset_all);
1940  }
1941 
1942  /// Schedules all tasks in the graph for execution using the provided executor.
1943  /// The task-specific executors get precedence if they exist.
1944  /// reset_all denotes whether schedule_all should reset the underlying
1945  /// futures and schedule even if the futures are already present.
1946  void schedule_all(transwarp::executor& executor, bool reset_all) override {
1947  ensure_task_not_running();
1948  schedule_all_impl(reset_all, &executor);
1949  }
1950 
1951  /// Assigns an exception to this task. Scheduling will have no effect after an exception
1952  /// has been set. Calling reset() will remove the exception and re-enable scheduling.
1953  void set_exception(std::exception_ptr exception) override {
1954  ensure_task_not_running();
1955  future_ = transwarp::detail::make_future_with_exception<result_type>(exception);
1956  schedule_mode_ = false;
1957  }
1958 
1959  /// Returns whether the task was scheduled and not reset afterwards.
1960  /// This means that the underlying future is valid
1961  bool was_scheduled() const noexcept override {
1962  return future_.valid();
1963  }
1964 
1965  /// Waits for the task to complete. Should only be called if was_scheduled()
1966  /// is true, throws transwarp::control_error otherwise
1967  void wait() const override {
1968  ensure_task_was_scheduled();
1969  future_.wait();
1970  }
1971 
1972  /// Returns whether the task has finished processing. Should only be called
1973  /// if was_scheduled() is true, throws transwarp::control_error otherwise
1974  bool is_ready() const override {
1975  ensure_task_was_scheduled();
1976  return future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
1977  }
1978 
1979  /// Returns whether this task contains a result
1980  bool has_result() const noexcept override {
1981  return was_scheduled() && future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
1982  }
1983 
1984  /// Resets this task
1985  void reset() override {
1986  ensure_task_not_running();
1987  future_ = std::shared_future<result_type>{};
1988  cancel(false);
1989  schedule_mode_ = true;
1990  }
1991 
1992  /// Resets all tasks in the graph
1993  void reset_all() override {
1994  ensure_task_not_running();
1996  visit_all(visitor);
1997  }
1998 
1999  /// If enabled then this task is canceled which will
2000  /// throw transwarp::task_canceled when retrieving the task result.
2001  /// Passing false is equivalent to resume.
2002  void cancel(bool enabled) noexcept override {
2003  canceled_ = enabled;
2004  }
2005 
2006  /// If enabled then all pending tasks in the graph are canceled which will
2007  /// throw transwarp::task_canceled when retrieving the task result.
2008  /// Passing false is equivalent to resume.
2009  void cancel_all(bool enabled) noexcept override {
2010  transwarp::detail::cancel_visitor visitor{enabled};
2011  visit_all(visitor);
2012  }
2013 
2014  /// Returns the task's parents (may be empty)
2015  std::vector<transwarp::itask*> parents() const override {
2017  }
2018 
2019  /// Returns all tasks in the graph in breadth order
2020  const std::vector<transwarp::itask*>& tasks() override {
2021  finalize();
2022  return tasks_;
2023  }
2024 
2025  /// Returns all edges in the graph. This is mainly for visualizing
2026  /// the tasks and their interdependencies. Pass the result into transwarp::to_string
2027  /// to retrieve a dot-style graph representation for easy viewing.
2028  std::vector<transwarp::edge> edges() override {
2029  std::vector<transwarp::edge> edges;
2030  transwarp::detail::edges_visitor visitor{edges};
2031  visit_all(visitor);
2032  return edges;
2033  }
2034 
2035 protected:
2036 
2037  task_impl_base() = default;
2038 
2039  template<typename F>
2041  : functor_{new Functor{std::forward<F>(functor)}},
2042  parents_{std::move(parents)...}
2043  {
2044  init();
2045  }
2046 
2047  template<typename F, typename P>
2048  task_impl_base(F&& functor, std::vector<std::shared_ptr<transwarp::task<P>>> parents)
2049  : functor_{new Functor{std::forward<F>(functor)}},
2050  parents_{std::move(parents)}
2051  {
2052  if (parents_.empty()) {
2053  throw transwarp::invalid_parameter{"parents are empty"};
2054  }
2055  init();
2056  }
2057 
/// Shared initialization run by both non-default constructors.
2058  void init() {
// Record the runtime task type taken from the TaskType tag's static `value`.
2059  set_type(task_type::value);
// Hand the functor a reference to this task (presumably only when the
// functor type opts in, going by the helper's name - confirm).
2060  transwarp::detail::assign_task_if(*functor_, *this);
// NOTE(review): listing line 2061 is missing from this extraction; check the
// original header for the statement that belongs here.
2062  }
2063 
2064  /// Checks if the task is currently running and throws transwarp::control_error if it is
2066  if (future_.valid() && future_.wait_for(std::chrono::seconds{0}) != std::future_status::ready) {
2067  throw transwarp::control_error("task currently running: " + transwarp::to_string(*this, " "));
2068  }
2069  }
2070 
2071  /// Checks if the task was scheduled and throws transwarp::control_error if it's not
2073  if (!future_.valid()) {
2074  throw transwarp::control_error{"task was not scheduled: " + transwarp::to_string(*this, " ")};
2075  }
2076  }
2077 
2078  template<typename R, typename Y, typename T, typename P>
2079  friend class transwarp::detail::runner;
2080 
2081  template<typename R, typename T, typename... A>
2082  friend R transwarp::detail::run_task(std::size_t, const std::weak_ptr<T>&, A&&...);
2083 
2084  /// Assigns the given id
2085  void set_id(std::size_t id) noexcept override {
2086  id_ = id;
2087  }
2088 
2089  /// Assigns the given level
2090  void set_level(std::size_t level) noexcept override {
2091  level_ = level;
2092  }
2093 
2094  /// Assigns the given type
2095  void set_type(transwarp::task_type type) noexcept override {
2096  type_ = type;
2097  }
2098 
2099  /// Assigns the given name
2100  void set_name(std::optional<std::string> name) noexcept override {
2101  name_ = std::move(name);
2102  }
2103 
2104  /// Assigns the given idletime
2105  void set_avg_idletime_us(std::int64_t idletime) noexcept override {
2106  avg_idletime_us_ = idletime;
2107  }
2108 
2109  /// Assigns the given waittime
2110  void set_avg_waittime_us(std::int64_t waittime) noexcept override {
2111  avg_waittime_us_ = waittime;
2112  }
2113 
2114  /// Assigns the given runtime
2115  void set_avg_runtime_us(std::int64_t runtime) noexcept override {
2116  avg_runtime_us_ = runtime;
2117  }
2118 
2119  /// Schedules this task for execution using the provided executor.
2120  /// The task-specific executor gets precedence if it exists.
2121  /// Runs the task on the same thread as the caller if neither the global
2122  /// nor the task-specific executor is found.
/// Does nothing when scheduling is disabled (schedule_mode_ is false) or when
/// reset is false and the future is already valid.
2123  void schedule_impl(bool reset, transwarp::executor* executor=nullptr) override {
2124  if (schedule_mode_ && (reset || !future_.valid())) {
2125  if (reset) {
// A reset also clears any earlier cancel request.
2126  cancel(false);
2127  }
// The runner receives only a weak reference to this task.
2128  std::weak_ptr<task_impl_base> self = this->shared_from_this();
// NOTE(review): listing line 2129 is missing from this extraction; it
// presumably declares the `runner_t` alias used below - confirm.
2130  std::shared_ptr<runner_t> runner = std::shared_ptr<runner_t>{new runner_t{this->id(), self, parents_}};
// NOTE(review): listing line 2131 is missing from this extraction - confirm
// against the original header.
2132  future_ = runner->future();
// Executor precedence: task-specific, then the one passed in, then inline.
2133  if (this->executor_) {
2134  this->executor_->execute([runner]{ (*runner)(); }, *this);
2135  } else if (executor) {
2136  executor->execute([runner]{ (*runner)(); }, *this);
2137  } else {
2138  (*runner)();
2139  }
2140  }
2141  }
2142 
2143  /// Schedules all tasks in the graph for execution using the provided executor.
2144  /// The task-specific executors get precedence if they exist.
2145  /// Runs tasks on the same thread as the caller if neither the global
2146  /// nor a task-specific executor is found.
2147  void schedule_all_impl(bool reset_all, transwarp::executor* executor=nullptr) {
2148  transwarp::detail::schedule_visitor visitor{reset_all, executor};
2149  visit_all(visitor);
2150  }
2151 
2152  /// Visits each task in a depth-first traversal
/// The visited_ flag ensures the visitor is applied to this task at most once
/// until unvisit() clears the flag.
2153  void visit(const std::function<void(transwarp::itask&)>& visitor) override {
2154  if (!visited_) {
// NOTE(review): listing line 2155 is missing from this extraction; presumably
// it forwards the visit to the parents before this task is visited - confirm.
2156  visitor(*this);
2157  visited_ = true;
2158  }
2159  }
2160 
2161  /// Traverses through each task and marks them as not visited.
/// Only acts when this task is currently marked as visited.
2162  void unvisit() noexcept override {
2163  if (visited_) {
2164  visited_ = false;
// NOTE(review): listing line 2165 is missing from this extraction; presumably
// it propagates the unvisit to the parents - confirm.
2166  }
2167  }
2168 
2169  /// Visits all tasks
2170  template<typename Visitor>
2171  void visit_all(Visitor& visitor) {
2172  finalize();
2173  for (transwarp::itask* t : tasks_) {
2174  visitor(*t);
2175  }
2176  }
2177 
2178  /// Returns the index for a given event type
2179  std::size_t event_index(transwarp::event_type event) const {
2180  const std::size_t index = static_cast<std::size_t>(event);
2181  if (index >= static_cast<std::size_t>(transwarp::event_type::count)) {
2182  throw transwarp::invalid_parameter{"event type"};
2183  }
2184  return index;
2185  }
2186 
2187  /// Raises the given event to all listeners
2189  for (const std::shared_ptr<transwarp::listener>& listener : listeners_[static_cast<std::size_t>(event)]) {
2190  listener->handle_event(event, *this);
2191  }
2192  }
2193 
2194  /// Check for non-null listener pointer
2195  void check_listener(const std::shared_ptr<transwarp::listener>& listener) const {
2196  if (!listener) {
2197  throw transwarp::invalid_parameter{"listener pointer"};
2198  }
2199  }
2200 
2201  std::size_t id_ = 0;
2202  std::size_t level_ = 0;
2204  std::optional<std::string> name_;
2205  std::shared_ptr<transwarp::executor> executor_;
2206  std::int64_t priority_ = 0;
2207  std::any custom_data_;
2208  std::atomic<bool> canceled_{false};
2209  std::atomic<std::int64_t> avg_idletime_us_{-1};
2210  std::atomic<std::int64_t> avg_waittime_us_{-1};
2211  std::atomic<std::int64_t> avg_runtime_us_{-1};
2212  bool schedule_mode_ = true;
2213  std::shared_future<result_type> future_;
2214  std::unique_ptr<Functor> functor_;
2215  transwarp::detail::parents_t<ParentResults...> parents_;
2216  bool visited_ = false;
2217  std::array<std::vector<std::shared_ptr<transwarp::listener>>, static_cast<std::size_t>(transwarp::event_type::count)> listeners_;
2218  std::vector<transwarp::itask*> tasks_;
2219 };
2220 
2221 
2222 /// A task proxy
2223 template<typename ResultType, typename TaskType, typename Functor, typename... ParentResults>
2224 class task_impl_proxy : public transwarp::detail::task_impl_base<ResultType, TaskType, Functor, ParentResults...> {
2225 public:
2226  /// The task type
2227  using task_type = TaskType;
2228 
2229  /// The result type of this task
2230  using result_type = ResultType;
2231 
2232  /// Assigns a value to this task. Scheduling will have no effect after a value
2233  /// has been set. Calling reset() will remove the value and re-enable scheduling.
2234  void set_value(const transwarp::decay_t<result_type>& value) override {
2235  this->ensure_task_not_running();
2236  this->future_ = transwarp::detail::make_future_with_value<result_type>(value);
2237  this->schedule_mode_ = false;
2238  }
2239 
2240  /// Assigns a value to this task. Scheduling will have no effect after a value
2241  /// has been set. Calling reset() will remove the value and re-enable scheduling.
2243  this->ensure_task_not_running();
2244  this->future_ = transwarp::detail::make_future_with_value<result_type>(std::move(value));
2245  this->schedule_mode_ = false;
2246  }
2247 
2248  /// Returns the result of this task. Throws any exceptions that the underlying
2249  /// functor throws. Should only be called if was_scheduled() is true,
2250  /// throws transwarp::control_error otherwise
2251  transwarp::result_t<result_type> get() const override {
2252  this->ensure_task_was_scheduled();
2253  return this->future_.get();
2254  }
2255 
2256 protected:
2257 
2258  task_impl_proxy() = default;
2259 
2260  template<typename F>
2262  : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>{std::forward<F>(functor), std::move(parents)...}
2263  {}
2264 
2265  template<typename F, typename P>
2266  task_impl_proxy(F&& functor, std::vector<std::shared_ptr<transwarp::task<P>>> parents)
2267  : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>{std::forward<F>(functor), std::move(parents)}
2268  {}
2269 
2270 };
2271 
2272 /// A task proxy for reference result type.
2273 template<typename ResultType, typename TaskType, typename Functor, typename... ParentResults>
2274 class task_impl_proxy<ResultType&, TaskType, Functor, ParentResults...> : public transwarp::detail::task_impl_base<ResultType&, TaskType, Functor, ParentResults...> {
2275 public:
2276  /// The task type
2277  using task_type = TaskType;
2278 
2279  /// The result type of this task
2280  using result_type = ResultType&;
2281 
2282  /// Assigns a value to this task. Scheduling will have no effect after a value
2283  /// has been set. Calling reset() will remove the value and re-enable scheduling.
2285  this->ensure_task_not_running();
2286  this->future_ = transwarp::detail::make_future_with_value<result_type>(value);
2287  this->schedule_mode_ = false;
2288  }
2289 
2290  /// Returns the result of this task. Throws any exceptions that the underlying
2291  /// functor throws. Should only be called if was_scheduled() is true,
2292  /// throws transwarp::control_error otherwise
2293  transwarp::result_t<result_type> get() const override {
2294  this->ensure_task_was_scheduled();
2295  return this->future_.get();
2296  }
2297 
2298 protected:
2299 
2300  task_impl_proxy() = default;
2301 
2302  template<typename F>
2304  : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>{std::forward<F>(functor), std::move(parents)...}
2305  {}
2306 
2307  template<typename F, typename P>
2308  task_impl_proxy(F&& functor, std::vector<std::shared_ptr<transwarp::task<P>>> parents)
2309  : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>{std::forward<F>(functor), std::move(parents)}
2310  {}
2311 
2312 };
2313 
2314 /// A task proxy for void result type.
2315 template<typename TaskType, typename Functor, typename... ParentResults>
2316 class task_impl_proxy<void, TaskType, Functor, ParentResults...> : public transwarp::detail::task_impl_base<void, TaskType, Functor, ParentResults...> {
2317 public:
2318  /// The task type
2319  using task_type = TaskType;
2320 
2321  /// The result type of this task
2322  using result_type = void;
2323 
2324  /// Assigns a value to this task. Scheduling will have no effect after a call
2325  /// to this. Calling reset() will reset this and re-enable scheduling.
2326  void set_value() override {
2327  this->ensure_task_not_running();
2328  this->future_ = transwarp::detail::make_ready_future();
2329  this->schedule_mode_ = false;
2330  }
2331 
2332  /// Blocks until the task finishes. Throws any exceptions that the underlying
2333  /// functor throws. Should only be called if was_scheduled() is true,
2334  /// throws transwarp::control_error otherwise
2335  void get() const override {
2336  this->ensure_task_was_scheduled();
2337  this->future_.get();
2338  }
2339 
2340 protected:
2341 
2342  task_impl_proxy() = default;
2343 
2344  template<typename F>
2346  : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>{std::forward<F>(functor), std::move(parents)...}
2347  {}
2348 
2349  template<typename F, typename P>
2350  task_impl_proxy(F&& functor, std::vector<std::shared_ptr<transwarp::task<P>>> parents)
2351  : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>{std::forward<F>(functor), std::move(parents)}
2352  {}
2353 
2354 };
2355 
2356 } // detail
2357 
2358 
2359 /// A task representing a piece of work given by functor and parent tasks.
2360 /// By connecting tasks a directed acyclic graph is built.
2361 /// Tasks should be created using the make_task factory functions.
2362 template<typename TaskType, typename Functor, typename... ParentResults>
2363 class task_impl : public transwarp::detail::task_impl_proxy<transwarp::detail::functor_result_t<TaskType, Functor, ParentResults...>, TaskType, Functor, ParentResults...> {
2364 public:
2365  /// The task type
2366  using task_type = TaskType;
2367 
2368  /// The result type of this task
2369  using result_type = transwarp::detail::functor_result_t<TaskType, Functor, ParentResults...>;
2370 
2371  /// A task is defined functor and parent tasks.
2372  /// Note: Don't use this constructor directly, use transwarp::make_task
2373  template<typename F>
2374  task_impl(F&& functor, std::shared_ptr<transwarp::task<ParentResults>>... parents)
2375  : transwarp::detail::task_impl_proxy<result_type, task_type, Functor, ParentResults...>{std::forward<F>(functor), std::move(parents)...}
2376  {}
2377 
2378  /// A task is defined functor and parent tasks.
2379  /// Note: Don't use this constructor directly, use transwarp::make_task
2380  template<typename F, typename P>
2381  task_impl(F&& functor, std::vector<std::shared_ptr<transwarp::task<P>>> parents)
2382  : transwarp::detail::task_impl_proxy<result_type, task_type, Functor, ParentResults...>{std::forward<F>(functor), std::move(parents)}
2383  {}
2384 
2385  // delete copy/move semantics
2386  task_impl(const task_impl&) = delete;
2387  task_impl& operator=(const task_impl&) = delete;
2388  task_impl(task_impl&&) = delete;
2389  task_impl& operator=(task_impl&&) = delete;
2390 
2391  /// Gives this task a name and returns a ptr to itself
2392  std::shared_ptr<task_impl> named(std::string name) {
2393  this->set_name(std::make_optional(std::move(name)));
2394  return std::dynamic_pointer_cast<task_impl>(this->shared_from_this());
2395  }
2396 
2397  /// Creates a continuation to this task
2398  template<typename TaskType_, typename Functor_>
2399  auto then(TaskType_, Functor_&& functor) {
2401  return std::shared_ptr<task_t>{new task_t{std::forward<Functor_>(functor), this->shared_from_this()}};
2402  }
2403 
2404  /// Clones this task and casts the result to a ptr to task_impl
2405  std::shared_ptr<task_impl> clone_cast() const {
2406  return std::dynamic_pointer_cast<task_impl>(this->clone());
2407  }
2408 
2409 private:
2410 
2411  task_impl() = default;
2412 
2413  std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache) const override {
2414  auto t = std::shared_ptr<task_impl>{new task_impl};
2415  t->id_ = this->id_;
2416  t->level_ = this->level_;
2417  t->type_ = this->type_;
2418  t->name_ = this->name_;
2419  t->executor_ = this->executor_;
2420  t->priority_ = this->priority_;
2421  t->custom_data_ = this->custom_data_;
2422  t->canceled_ = this->canceled_.load();
2423  t->avg_idletime_us_ = this->avg_idletime_us_.load();
2424  t->avg_waittime_us_ = this->avg_waittime_us_.load();
2425  t->avg_runtime_us_ = this->avg_runtime_us_.load();
2426  t->schedule_mode_ = this->schedule_mode_;
2427  if (this->has_result()) {
2428  try {
2429  if constexpr (std::is_void_v<result_type>) {
2430  this->future_.get();
2431  t->future_ = transwarp::detail::make_ready_future();
2432  } else {
2433  t->future_ = transwarp::detail::make_future_with_value<result_type>(this->future_.get());
2434  }
2435  } catch (...) {
2436  t->future_ = transwarp::detail::make_future_with_exception<result_type>(std::current_exception());
2437  }
2438  }
2439  t->functor_ = std::unique_ptr<Functor>{new Functor{*this->functor_}};
2440  t->parents_ = transwarp::detail::parents<ParentResults...>::clone(task_cache, this->parents_);
2441  t->visited_ = this->visited_;
2442  t->executor_ = this->executor_;
2443  t->listeners_ = this->listeners_;
2444  return t;
2445  }
2446 
2447 };
2448 
2449 
2450 /// A value task that stores a single value and doesn't require scheduling.
2451 /// Value tasks should be created using the make_value_task factory functions.
2452 template<typename ResultType>
2453 class value_task : public transwarp::task<ResultType>,
2454  public std::enable_shared_from_this<value_task<ResultType>> {
2455 public:
2456  /// The task type
2458 
2459  /// The result type of this task
2460  using result_type = ResultType;
2461 
2462  /// A value task is defined by a given value.
2463  /// Note: Don't use this constructor directly, use transwarp::make_value_task
2464  template<typename T>
2465  value_task(T&& value)
2466  : future_{transwarp::detail::make_future_with_value<result_type>(std::forward<T>(value))}
2467  {}
2468 
2469  // delete copy/move semantics
2470  value_task(const value_task&) = delete;
2471  value_task& operator=(const value_task&) = delete;
2472  value_task(value_task&&) = delete;
2473  value_task& operator=(value_task&&) = delete;
2474 
2475  /// Gives this task a name and returns a ptr to itself
2476  std::shared_ptr<value_task> named(std::string name) {
2477  set_name(std::make_optional(std::move(name)));
2478  return this->shared_from_this();
2479  }
2480 
2481  /// Creates a continuation to this task
2482  template<typename TaskType_, typename Functor_>
2483  auto then(TaskType_, Functor_&& functor) {
2485  return std::shared_ptr<task_t>{new task_t{std::forward<Functor_>(functor), this->shared_from_this()}};
2486  }
2487 
2488  /// Clones this task and casts the result to a ptr to value_task
2489  std::shared_ptr<value_task> clone_cast() const {
2490  return std::dynamic_pointer_cast<value_task>(this->clone());
2491  }
2492 
2493  /// Nothing to be done to finalize a value task
2494  void finalize() override {}
2495 
2496  /// The task's id
2497  std::size_t id() const noexcept override {
2498  return id_;
2499  }
2500 
2501  /// The task's level
2502  std::size_t level() const noexcept override {
2503  return 0;
2504  }
2505 
2506  /// The task's type
2507  transwarp::task_type type() const noexcept override {
2509  }
2510 
2511  /// The optional task name
2512  const std::optional<std::string>& name() const noexcept {
2513  return name_;
2514  }
2515 
2516  /// Value tasks don't have executors as they don't run
2517  std::shared_ptr<transwarp::executor> executor() const noexcept override {
2518  return nullptr;
2519  }
2520 
2521  /// Returns the task priority
2522  std::int64_t priority() const noexcept override {
2523  return priority_;
2524  }
2525 
2526  /// The custom task data (may not hold a value)
2527  const std::any& custom_data() const noexcept override {
2528  return custom_data_;
2529  }
2530 
2531  /// Value tasks cannot be canceled
2532  bool canceled() const noexcept override {
2533  return false;
2534  }
2535 
2536  /// Returns -1 as value tasks don't run
2537  std::int64_t avg_idletime_us() const noexcept override {
2538  return -1;
2539  }
2540 
2541  /// Returns -1 as value tasks don't run
2542  std::int64_t avg_waittime_us() const noexcept override {
2543  return -1;
2544  }
2545 
2546  /// Returns -1 as value tasks don't run
2547  std::int64_t avg_runtime_us() const noexcept override {
2548  return -1;
2549  }
2550 
2551  /// No-op because a value task never runs
2552  void set_executor(std::shared_ptr<transwarp::executor>) override {}
2553 
2554  /// No-op because a value task never runs and doesn't have parents
2555  void set_executor_all(std::shared_ptr<transwarp::executor>) override {}
2556 
2557  /// No-op because a value task never runs
2558  void remove_executor() override {}
2559 
2560  /// No-op because a value task never runs and doesn't have parents
2561  void remove_executor_all() override {}
2562 
2563  /// Sets a task priority (defaults to 0). transwarp will not directly use this.
2564  /// This is only useful if something else is using the priority
2565  void set_priority(std::int64_t priority) override {
2566  priority_ = priority;
2567  }
2568 
2569  /// Sets a priority to all tasks (defaults to 0). transwarp will not directly use this.
2570  /// This is only useful if something else is using the priority
2571  void set_priority_all(std::int64_t priority) override {
2572  set_priority(priority);
2573  }
2574 
2575  /// Resets the task priority to 0
2576  void reset_priority() override {
2577  priority_ = 0;
2578  }
2579 
2580  /// Resets the priority of all tasks to 0
2581  void reset_priority_all() override {
2582  reset_priority();
2583  }
2584 
2585  /// Assigns custom data to this task. transwarp will not directly use this.
2586  /// This is only useful if something else is using this custom data
2587  void set_custom_data(std::any custom_data) override {
2588  if (!custom_data.has_value()) {
2589  throw transwarp::invalid_parameter{"custom data"};
2590  }
2591  custom_data_ = std::move(custom_data);
2592  }
2593 
2594  /// Assigns custom data to all tasks. transwarp will not directly use this.
2595  /// This is only useful if something else is using this custom data
2596  void set_custom_data_all(std::any custom_data) override {
2597  set_custom_data(std::move(custom_data));
2598  }
2599 
2600  /// Removes custom data from this task
2601  void remove_custom_data() override {
2602  custom_data_ = {};
2603  }
2604 
2605  /// Removes custom data from all tasks
2606  void remove_custom_data_all() override {
2607  remove_custom_data();
2608  }
2609 
2610  /// Returns the future associated to the underlying execution
2611  const std::shared_future<result_type>& future() const noexcept override {
2612  return future_;
2613  }
2614 
2615  /// No-op because a value task doesn't raise events
2616  void add_listener(std::shared_ptr<transwarp::listener>) override {}
2617 
2618  /// No-op because a value task doesn't raise events
2619  void add_listener(transwarp::event_type, std::shared_ptr<transwarp::listener>) override {}
2620 
2621  /// No-op because a value task doesn't raise events
2622  void add_listener_all(std::shared_ptr<transwarp::listener>) override {}
2623 
2624  /// No-op because a value task doesn't raise events
2625  void add_listener_all(transwarp::event_type, std::shared_ptr<transwarp::listener>) override {}
2626 
2627  /// No-op because a value task doesn't raise events
2628  void remove_listener(const std::shared_ptr<transwarp::listener>&) override {}
2629 
2630  /// No-op because a value task doesn't raise events
2631  void remove_listener(transwarp::event_type, const std::shared_ptr<transwarp::listener>&) override {}
2632 
2633  /// No-op because a value task doesn't raise events
2634  void remove_listener_all(const std::shared_ptr<transwarp::listener>&) override {}
2635 
2636  /// No-op because a value task doesn't raise events
2637  void remove_listener_all(transwarp::event_type, const std::shared_ptr<transwarp::listener>&) override {}
2638 
2639  /// No-op because a value task doesn't raise events
2640  void remove_listeners() override {}
2641 
2642  /// No-op because a value task doesn't raise events
2644 
2645  /// No-op because a value task doesn't raise events
2646  void remove_listeners_all() override {}
2647 
2648  /// No-op because a value task doesn't raise events
2650 
2651  /// No-op because a value task never runs
2652  void schedule() override {}
2653 
2654  /// No-op because a value task never runs
2655  void schedule(transwarp::executor&) override {}
2656 
2657  /// No-op because a value task never runs
2658  void schedule(bool) override {}
2659 
2660  /// No-op because a value task never runs
2661  void schedule(transwarp::executor&, bool) override {}
2662 
2663  /// No-op because a value task never runs and doesn't have parents
2664  void schedule_all() override {}
2665 
2666  /// No-op because a value task never runs and doesn't have parents
2668 
2669  /// No-op because a value task never runs and doesn't have parents
2670  void schedule_all(bool) override {}
2671 
2672  /// No-op because a value task never runs and doesn't have parents
2673  void schedule_all(transwarp::executor&, bool) override {}
2674 
2675  /// Assigns a value to this task
2676  void set_value(const transwarp::decay_t<result_type>& value) override {
2677  future_ = transwarp::detail::make_future_with_value<result_type>(value);
2678  }
2679 
2680  /// Assigns a value to this task
2682  future_ = transwarp::detail::make_future_with_value<result_type>(std::move(value));
2683  };
2684 
2685  /// Assigns an exception to this task
2686  void set_exception(std::exception_ptr exception) override {
2687  future_ = transwarp::detail::make_future_with_exception<result_type>(exception);
2688  }
2689 
2690  /// Returns true because a value task is scheduled once on construction
2691  bool was_scheduled() const noexcept override {
2692  return true;
2693  }
2694 
2695  /// No-op because a value task never runs
2696  void wait() const override {}
2697 
2698  /// Returns true because a value task is always ready
2699  bool is_ready() const override {
2700  return true;
2701  }
2702 
2703  /// Returns true because a value task always contains a result
2704  bool has_result() const noexcept override {
2705  return true;
2706  }
2707 
2708  /// Returns the result of this task
2709  transwarp::result_t<result_type> get() const override {
2710  return future_.get();
2711  }
2712 
2713  /// No-op because a value task never runs
2714  void reset() override {}
2715 
2716  /// No-op because a value task never runs and doesn't have parents
2717  void reset_all() override {}
2718 
2719  /// No-op because a value task never runs
2720  void cancel(bool) noexcept override {}
2721 
2722  /// No-op because a value task never runs and doesn't have parents
2723  void cancel_all(bool) noexcept override {}
2724 
2725  /// Empty because a value task doesn't have parents
2726  std::vector<transwarp::itask*> parents() const override {
2727  return {};
2728  }
2729 
2730  /// Returns all tasks in the graph in breadth order
2731  const std::vector<transwarp::itask*>& tasks() override {
2732  return tasks_;
2733  }
2734 
2735  /// Returns empty edges because a value task doesn't have parents
2736  std::vector<transwarp::edge> edges() override {
2737  return {};
2738  }
2739 
2740 private:
2741 
2742  value_task() = default;
2743 
2744  std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>&) const override {
2745  auto t = std::shared_ptr<value_task>{new value_task{}};
2746  t->id_ = id_;
2747  t->name_ = name_;
2748  t->priority_ = priority_;
2749  t->custom_data_ = custom_data_;
2750  try {
2751  t->set_value(future_.get());
2752  } catch (...) {
2753  t->set_exception(std::current_exception());
2754  }
2755  t->visited_ = visited_;
2756  return t;
2757  }
2758 
2759  /// Assigns the given id
2760  void set_id(std::size_t id) noexcept override {
2761  id_ = id;
2762  }
2763 
2764  /// No-op as value tasks are always at level 0
2765  void set_level(std::size_t) noexcept override {}
2766 
2767  /// No-op as value tasks are always root tasks
2768  void set_type(transwarp::task_type) noexcept override {}
2769 
2770  /// Assigns the given name
2771  void set_name(std::optional<std::string> name) noexcept override {
2772  name_ = std::move(name);
2773  }
2774 
2775  /// No-op as value tasks don't run
2776  void set_avg_idletime_us(std::int64_t) noexcept override {}
2777 
2778  /// No-op as value tasks don't run
2779  void set_avg_waittime_us(std::int64_t) noexcept override {}
2780 
2781  /// No-op as value tasks don't run
2782  void set_avg_runtime_us(std::int64_t) noexcept override {}
2783 
2784  /// No-op because a value task never runs
2785  void schedule_impl(bool, transwarp::executor*) override {}
2786 
2787  /// Visits this task
2788  void visit(const std::function<void(transwarp::itask&)>& visitor) override {
2789  if (!visited_) {
2790  visitor(*this);
2791  visited_ = true;
2792  }
2793  }
2794 
2795  /// Marks this task as not visited
2796  void unvisit() noexcept override {
2797  visited_ = false;
2798  }
2799 
2800  std::size_t id_ = 0;
2801  std::optional<std::string> name_;
2802  std::size_t priority_ = 0;
2803  std::any custom_data_;
2804  std::shared_future<result_type> future_;
2805  bool visited_ = false;
2806  std::vector<transwarp::itask*> tasks_{this};
2807 };
2808 
2809 
2810 /// A factory function to create a new task
2811 template<typename TaskType, typename Functor, typename... Parents>
2812 auto make_task(TaskType, Functor&& functor, std::shared_ptr<Parents>... parents) {
2813  using task_t = transwarp::task_impl<TaskType, std::decay_t<Functor>, typename Parents::result_type...>;
2814  return std::shared_ptr<task_t>{new task_t{std::forward<Functor>(functor), std::move(parents)...}};
2815 }
2816 
2817 
2818 /// A factory function to create a new task with vector parents
2819 template<typename TaskType, typename Functor, typename ParentType>
2820 auto make_task(TaskType, Functor&& functor, std::vector<ParentType> parents) {
2821  using task_t = transwarp::task_impl<TaskType, std::decay_t<Functor>, std::vector<ParentType>>;
2822  return std::shared_ptr<task_t>{new task_t{std::forward<Functor>(functor), std::move(parents)}};
2823 }
2824 
2825 
2826 /// A factory function to create a new value task
2827 template<typename Value>
2828 auto make_value_task(Value&& value) {
2830  return std::shared_ptr<task_t>{new task_t{std::forward<Value>(value)}};
2831 }
2832 
2833 
2834 /// A function similar to std::for_each but returning a transwarp task for
2835 /// deferred, possibly asynchronous execution. This function creates a graph
2836 /// with std::distance(first, last) root tasks
2837 template<typename InputIt, typename UnaryOperation>
2838 auto for_each(InputIt first, InputIt last, UnaryOperation unary_op) {
2839  const auto distance = std::distance(first, last);
2840  if (distance <= 0) {
2841  throw transwarp::invalid_parameter{"first or last"};
2842  }
2843  std::vector<std::shared_ptr<transwarp::task<void>>> tasks;
2844  tasks.reserve(static_cast<std::size_t>(distance));
2845  for (; first != last; ++first) {
2846  tasks.push_back(transwarp::make_task(transwarp::root, [unary_op,first]{ unary_op(*first); }));
2847  }
2849  final->finalize();
2850  return final;
2851 }
2852 
2853 /// A function similar to std::for_each but returning a transwarp task for
2854 /// deferred, possibly asynchronous execution. This function creates a graph
2855 /// with std::distance(first, last) root tasks.
2856 /// Overload for automatic scheduling by passing an executor.
2857 template<typename InputIt, typename UnaryOperation>
2858 auto for_each(transwarp::executor& executor, InputIt first, InputIt last, UnaryOperation unary_op) {
2859  auto task = transwarp::for_each(first, last, unary_op);
2860  task->schedule_all(executor);
2861  return task;
2862 }
2863 
2864 
2865 /// A function similar to std::transform but returning a transwarp task for
2866 /// deferred, possibly asynchronous execution. This function creates a graph
2867 /// with std::distance(first1, last1) root tasks
2868 template<typename InputIt, typename OutputIt, typename UnaryOperation>
2869 auto transform(InputIt first1, InputIt last1, OutputIt d_first, UnaryOperation unary_op) {
2870  const auto distance = std::distance(first1, last1);
2871  if (distance <= 0) {
2872  throw transwarp::invalid_parameter{"first1 or last1"};
2873  }
2874  std::vector<std::shared_ptr<transwarp::task<void>>> tasks;
2875  tasks.reserve(static_cast<std::size_t>(distance));
2876  for (; first1 != last1; ++first1, ++d_first) {
2877  tasks.push_back(transwarp::make_task(transwarp::root, [unary_op,first1,d_first]{ *d_first = unary_op(*first1); }));
2878  }
2880  final->finalize();
2881  return final;
2882 }
2883 
2884 /// A function similar to std::transform but returning a transwarp task for
2885 /// deferred, possibly asynchronous execution. This function creates a graph
2886 /// with std::distance(first1, last1) root tasks.
2887 /// Overload for automatic scheduling by passing an executor.
2888 template<typename InputIt, typename OutputIt, typename UnaryOperation>
2889 auto transform(transwarp::executor& executor, InputIt first1, InputIt last1, OutputIt d_first, UnaryOperation unary_op) {
2890  auto task = transwarp::transform(first1, last1, d_first, unary_op);
2891  task->schedule_all(executor);
2892  return task;
2893 }
2894 
2895 
2896 /// A task pool that allows running multiple instances of the same task in parallel.
2897 template<typename ResultType>
2898 class task_pool {
2899 public:
2900 
2901  /// Constructs a task pool
2903  std::size_t minimum_size,
2904  std::size_t maximum_size)
2905  : task_{std::move(task)},
2906  minimum_{minimum_size},
2907  maximum_{maximum_size},
2908  finished_{maximum_size}
2909  {
2910  if (minimum_ < 1) {
2911  throw transwarp::invalid_parameter{"minimum size"};
2912  }
2913  if (minimum_ > maximum_) {
2914  throw transwarp::invalid_parameter{"minimum or maximum size"};
2915  }
2916  task_->add_listener(transwarp::event_type::after_finished, listener_);
2917  for (std::size_t i=0; i<minimum_; ++i) {
2918  idle_.push(task_->clone());
2919  }
2920  }
2921 
2922  /// Constructs a task pool with reasonable defaults for minimum and maximum
2923  explicit
2925  : task_pool{std::move(task), 32, 65536}
2926  {}
2927 
2928  // delete copy/move semantics
2929  task_pool(const task_pool&) = delete;
2930  task_pool& operator=(const task_pool&) = delete;
2931  task_pool(task_pool&&) = delete;
2932  task_pool& operator=(task_pool&&) = delete;
2933 
2934  /// Returns the next idle task.
2935  /// If there are no idle tasks then it will attempt to double the
2936  /// pool size. If that fails then it will return a nullptr. On successful
2937  /// retrieval of an idle task the function will mark that task as busy.
2938  std::shared_ptr<transwarp::task<ResultType>> next_task(bool maybe_resize=true) {
2939  const transwarp::itask* finished_task{};
2940  {
2941  std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
2942  if (!finished_.empty()) {
2943  finished_task = finished_.front(); finished_.pop();
2944  }
2945  }
2946 
2947  std::shared_ptr<transwarp::task<ResultType>> task;
2948  if (finished_task) {
2949  task = busy_.find(finished_task)->second;
2950  } else {
2951  if (maybe_resize && idle_.empty()) {
2952  resize(size() * 2); // double pool size
2953  }
2954  if (idle_.empty()) {
2955  return nullptr;
2956  }
2957  task = idle_.front(); idle_.pop();
2958  busy_.emplace(task.get(), task);
2959  }
2960 
2961  const auto& future = task->future();
2962  if (future.valid()) {
2963  future.wait(); // will return immediately
2964  }
2965  return task;
2966  }
2967 
2968  /// Just like next_task() but waits for a task to become available.
2969  /// The returned graph will always be a valid pointer
2970  std::shared_ptr<transwarp::task<ResultType>> wait_for_next_task(bool maybe_resize=true) {
2971  for (;;) {
2972  std::shared_ptr<transwarp::task<ResultType>> g = next_task(maybe_resize);
2973  if (g) {
2974  return g;
2975  }
2976  }
2977  }
2978 
2979  /// Returns the current total size of the pool (sum of idle and busy tasks)
2980  std::size_t size() const {
2981  return idle_.size() + busy_.size();
2982  }
2983 
2984  /// Returns the minimum size of the pool
2985  std::size_t minimum_size() const {
2986  return minimum_;
2987  }
2988 
2989  /// Returns the maximum size of the pool
2990  std::size_t maximum_size() const {
2991  return maximum_;
2992  }
2993 
2994  /// Returns the number of idle tasks in the pool
2995  std::size_t idle_count() const {
2996  std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
2997  return idle_.size() + finished_.size();
2998  }
2999 
3000  /// Returns the number of busy tasks in the pool
3001  std::size_t busy_count() const {
3002  std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3003  return busy_.size() - finished_.size();
3004  }
3005 
3006  /// Resizes the task pool to the given new size if possible
3007  void resize(std::size_t new_size) {
3008  reclaim();
3009  if (new_size > size()) { // grow
3010  const std::size_t count = new_size - size();
3011  for (std::size_t i=0; i<count; ++i) {
3012  if (size() == maximum_) {
3013  break;
3014  }
3015  idle_.push(task_->clone());
3016  }
3017  } else if (new_size < size()) { // shrink
3018  const std::size_t count = size() - new_size;
3019  for (std::size_t i=0; i<count; ++i) {
3020  if (idle_.empty() || size() == minimum_) {
3021  break;
3022  }
3023  idle_.pop();
3024  }
3025  }
3026  }
3027 
3028  /// Reclaims finished tasks by marking them as idle again
3029  void reclaim() {
3030  decltype(finished_) finished{finished_.capacity()};
3031  {
3032  std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3033  finished_.swap(finished);
3034  }
3035  while (!finished.empty()) {
3036  const transwarp::itask* task = finished.front(); finished.pop();
3037  const auto it = busy_.find(task);
3038  idle_.push(it->second);
3039  busy_.erase(it);
3040  }
3041  }
3042 
3043 private:
3044 
3045  class finished_listener : public transwarp::listener {
3046  public:
3047 
3048  explicit
3049  finished_listener(task_pool<ResultType>& pool)
3050  : pool_{pool}
3051  {}
3052 
3053  // Called on a potentially high-priority thread
3054  void handle_event(transwarp::event_type, const transwarp::itask& task) override {
3055  std::lock_guard<transwarp::detail::spinlock> lock{pool_.spinlock_};
3056  pool_.finished_.push(&task);
3057  }
3058 
3059  private:
3060  task_pool<ResultType>& pool_;
3061  };
3062 
3063  std::shared_ptr<transwarp::task<ResultType>> task_;
3064  std::size_t minimum_;
3065  std::size_t maximum_;
3066  mutable transwarp::detail::spinlock spinlock_; // protecting finished_
3068  std::queue<std::shared_ptr<transwarp::task<ResultType>>> idle_;
3069  std::unordered_map<const transwarp::itask*, std::shared_ptr<transwarp::task<ResultType>>> busy_;
3070  std::shared_ptr<transwarp::listener> listener_{new finished_listener{*this}};
3071 };
3072 
3073 
3074 /// A timer that tracks the average idle, wait, and run time of each task it listens to.
3075 /// - idle = time between scheduling and starting the task (executor dependent)
3076 /// - wait = time between starting and invoking the task's functor, i.e. wait for parent tasks to finish
3077 /// - run = time between invoking and finishing the task's computations
3078 class timer : public transwarp::listener {
3079 public:
3080  timer() = default;
3081 
3082  // delete copy/move semantics
3083  timer(const timer&) = delete;
3084  timer& operator=(const timer&) = delete;
3085  timer(timer&&) = delete;
3086  timer& operator=(timer&&) = delete;
3087 
3088  /// Performs the actual timing and populates the task's timing members
3090  switch (event) {
3092  const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3093  std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3094  auto& track = tracks_[&task];
3095  track.startidle = now;
3096  }
3097  break;
3099  const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3100  track_idletime(task, now);
3101  std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3102  auto& track = tracks_[&task];
3103  track.startwait = now;
3104  }
3105  break;
3107  const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3108  track_waittime(task, now);
3109  }
3110  break;
3112  const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3113  track_waittime(task, now);
3114  std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3115  auto& track = tracks_[&task];
3116  track.running = true;
3117  track.startrun = now;
3118  }
3119  break;
3121  const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3122  track_runtime(task, now);
3123  }
3124  break;
3125  default: break;
3126  }
3127  }
3128 
3129  /// Resets all timing information
3130  void reset() {
3131  std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3132  tracks_.clear();
3133  }
3134 
3135 private:
3136 
3137  void track_idletime(const transwarp::itask& task, const std::chrono::time_point<std::chrono::steady_clock>& now) {
3138  std::int64_t avg_idletime_us;
3139  {
3140  std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3141  auto& track = tracks_[&task];
3142  track.idletime += std::chrono::duration_cast<std::chrono::microseconds>(now - track.startidle).count();
3143  ++track.idlecount;
3144  avg_idletime_us = static_cast<std::int64_t>(track.idletime / track.idlecount);
3145  }
3146  const_cast<transwarp::itask&>(task).set_avg_idletime_us(avg_idletime_us);
3147  };
3148 
3149  void track_waittime(const transwarp::itask& task, const std::chrono::time_point<std::chrono::steady_clock>& now) {
3150  std::int64_t avg_waittime_us;
3151  {
3152  std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3153  auto& track = tracks_[&task];
3154  track.waittime += std::chrono::duration_cast<std::chrono::microseconds>(now - track.startwait).count();
3155  ++track.waitcount;
3156  avg_waittime_us = static_cast<std::int64_t>(track.waittime / track.waitcount);
3157  }
3158  const_cast<transwarp::itask&>(task).set_avg_waittime_us(avg_waittime_us);
3159  };
3160 
3161  void track_runtime(const transwarp::itask& task, const std::chrono::time_point<std::chrono::steady_clock>& now) {
3162  std::int64_t avg_runtime_us;
3163  {
3164  std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3165  auto& track = tracks_[&task];
3166  if (!track.running) {
3167  return;
3168  }
3169  track.running = false;
3170  track.runtime += std::chrono::duration_cast<std::chrono::microseconds>(now - track.startrun).count();
3171  ++track.runcount;
3172  avg_runtime_us = static_cast<std::int64_t>(track.runtime / track.runcount);
3173  }
3174  const_cast<transwarp::itask&>(task).set_avg_runtime_us(avg_runtime_us);
3175  }
3176 
3177  struct track {
3178  bool running = false;
3179  std::chrono::time_point<std::chrono::steady_clock> startidle;
3180  std::chrono::time_point<std::chrono::steady_clock> startwait;
3181  std::chrono::time_point<std::chrono::steady_clock> startrun;
3182  std::chrono::microseconds::rep idletime = 0;
3183  std::chrono::microseconds::rep idlecount = 0;
3184  std::chrono::microseconds::rep waittime = 0;
3185  std::chrono::microseconds::rep waitcount = 0;
3186  std::chrono::microseconds::rep runtime = 0;
3187  std::chrono::microseconds::rep runcount = 0;
3188  };
3189 
3190  transwarp::detail::spinlock spinlock_; // protecting tracks_
3191  std::unordered_map<const transwarp::itask*, track> tracks_;
3192 };
3193 
3194 
3195 } // transwarp
void set_avg_idletime_us(std::int64_t idletime) noexcept override
Assigns the given idletime.
Definition: transwarp.h:2105
constexpr transwarp::wait_any_type wait_any
The wait_any task tag.
Definition: transwarp.h:118
void remove_custom_data_all() override
Removes custom data from all tasks.
Definition: transwarp.h:2606
The executor interface used to perform custom task execution.
Definition: transwarp.h:136
transwarp::task_type type() const noexcept override
The task's type.
Definition: transwarp.h:1655
std::remove_const_t< std::remove_reference_t< T >> decay_t
Removes reference and const from a type.
Definition: transwarp.h:357
Exception thrown when a task was destroyed prematurely.
Definition: transwarp.h:66
Adds a new listener to the given task.
Definition: transwarp.h:1072
A simple thread pool used to execute tasks in parallel.
Definition: transwarp.h:503
void check_listener(const std::shared_ptr< transwarp::listener > &listener) const
Check for non-null listener pointer.
Definition: transwarp.h:2195
std::size_t idle_count() const
Returns the number of idle tasks in the pool.
Definition: transwarp.h:2995
void cancel_all(bool) noexcept override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2723
void reclaim()
Reclaims finished tasks by marking them as idle again.
Definition: transwarp.h:3029
void remove_listener_all(const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for all event types and for all parents.
Definition: transwarp.h:1842
void operator()(const transwarp::itask &task) const
Definition: transwarp.h:928
std::int64_t priority() const noexcept override
Returns the task priority.
Definition: transwarp.h:2522
Removes a listener from the given task.
Definition: transwarp.h:1099
void set_value(const transwarp::decay_t< result_type > &value) override
Assigns a value to this task. Scheduling will have no effect after a value has been set...
Definition: transwarp.h:2234
void remove_listeners(transwarp::event_type) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2643
void set_type(transwarp::task_type type) noexcept override
Assigns the given type.
Definition: transwarp.h:2095
The consume type. Used for tag dispatch.
Definition: transwarp.h:105
Determines the result type of the Functor dispatching on the task type.
Definition: transwarp.h:1169
void set_priority_all(std::int64_t priority) override
Sets a priority to all tasks (defaults to 0). transwarp will not directly use this. This is only useful if something else is using the priority.
Definition: transwarp.h:2571
A task pool that allows running multiple instances of the same task in parallel.
Definition: transwarp.h:2898
std::size_t id() const noexcept override
The task's id.
Definition: transwarp.h:1645
void schedule_all(transwarp::executor &) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2667
void finalize() override
Can be called to explicitly finalize this task making this task the terminal task in the graph...
Definition: transwarp.h:1629
Result run_task(std::size_t task_id, const std::weak_ptr< Task > &task, Args &&...args)
Runs the task with the given arguments, hence, invoking the task's functor.
Definition: transwarp.h:630
Removes the executor from the given task.
Definition: transwarp.h:1012
void reset_priority_all() override
Resets the priority of all tasks to 0.
Definition: transwarp.h:1752
const std::any & custom_data() const noexcept override
The custom task data (may not hold a value)
Definition: transwarp.h:1675
auto transform(InputIt first1, InputIt last1, OutputIt d_first, UnaryOperation unary_op)
A function similar to std::transform but returning a transwarp task for deferred, possibly asynchrono...
Definition: transwarp.h:2869
virtual std::string name() const =0
Returns the name of the executor.
void raise_event(transwarp::event_type event) const
Raises the given event to all listeners.
Definition: transwarp.h:2188
void set_custom_data(std::any custom_data) override
Assigns custom data to this task. transwarp will not directly use this. This is only useful if someth...
Definition: transwarp.h:2587
virtual void execute(const std::function< void()> &functor, const transwarp::itask &task)=0
Runs a task which is wrapped by the given functor. The functor only captures one shared pointer and c...
The task has no parents.
Just after a task was canceled (handle_event called on thread that task is run on) ...
TaskType task_type
The task type.
Definition: transwarp.h:2319
auto for_each(InputIt first, InputIt last, UnaryOperation unary_op)
A function similar to std::for_each but returning a transwarp task for deferred, possibly asynchronou...
Definition: transwarp.h:2838
A callable to run a task given its parents.
Definition: transwarp.h:1379
void set_exception(std::exception_ptr exception) override
Assigns an exception to this task.
Definition: transwarp.h:2686
void schedule_all(bool reset_all) override
Schedules all tasks in the graph for execution on the caller thread. The task-specific executors get ...
Definition: transwarp.h:1937
Generates edges.
Definition: transwarp.h:953
const std::optional< std::string > & name() const noexcept
The optional task name.
Definition: transwarp.h:1660
Adds a new listener per event type to the given task.
Definition: transwarp.h:1085
void ensure_task_was_scheduled() const
Checks if the task was scheduled and throws transwarp::control_error if it's not. ...
Definition: transwarp.h:2072
Sets level of a task.
Definition: transwarp.h:924
void reset()
Resets all timing information.
Definition: transwarp.h:3130
std::size_t busy_count() const
Returns the number of busy tasks in the pool.
Definition: transwarp.h:3001
std::size_t size() const
Returns the current total size of the pool (sum of idle and busy tasks)
Definition: transwarp.h:2980
void visit_all(Visitor &visitor)
Visits all tasks.
Definition: transwarp.h:2171
Definition: transwarp.h:1349
void handle_event(transwarp::event_type event, const transwarp::itask &task) override
Performs the actual timing and populates the task's timing members.
Definition: transwarp.h:3089
void set_executor(std::shared_ptr< transwarp::executor >) override
No-op because a value task never runs.
Definition: transwarp.h:2552
constexpr no_op_functor no_op
An object to use in places where a no-op functor is required.
Definition: transwarp.h:1554
void set_executor_all(std::shared_ptr< transwarp::executor > executor) override
Assigns an executor to all tasks which takes precedence over the executor provided in schedule() or s...
Definition: transwarp.h:1711
bool was_scheduled() const noexcept override
Returns whether the task was scheduled and not reset afterwards. This means that the underlying futur...
Definition: transwarp.h:1961
void cancel(bool) noexcept override
No-op because a value task never runs.
Definition: transwarp.h:2720
Just before a task is scheduled (handle_event called on thread of caller to schedule()) ...
void schedule_all() override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2664
void schedule_all(transwarp::executor &executor, bool reset_all) override
Schedules all tasks in the graph for execution using the provided executor. The task-specific executo...
Definition: transwarp.h:1946
std::shared_ptr< transwarp::executor > executor() const noexcept override
The task's executor (may be null)
Definition: transwarp.h:1665
void add_listener_all(std::shared_ptr< transwarp::listener >) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2622
Assigns a priority to the given task.
Definition: transwarp.h:1020
Result call(std::size_t task_id, const Task &task, const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &parents)
Calls the functor of the given task with the results from the tuple of parents. Throws transwarp::tas...
Definition: transwarp.h:883
std::shared_ptr< transwarp::task< ResultType > > wait_for_next_task(bool maybe_resize=true)
Just like next_task() but waits for a task to become available. The returned graph will always be a v...
Definition: transwarp.h:2970
void push(T &&value)
Pushes a new value onto the end of the buffer. If that exceeds the capacity of the buffer then the ol...
Definition: transwarp.h:1450
void schedule(transwarp::executor &executor) override
Schedules this task for execution using the provided executor. The task-specific executor gets preced...
Definition: transwarp.h:1903
std::size_t id() const noexcept override
The task's id.
Definition: transwarp.h:2497
bool canceled() const noexcept override
Value tasks cannot be canceled.
Definition: transwarp.h:2532
Assigns an executor to the given task.
Definition: transwarp.h:1000
std::size_t capacity() const
Returns the capacity of the buffer.
Definition: transwarp.h:1470
A simple circular buffer (FIFO). ValueType must support default construction. The buffer lets you pus...
Definition: transwarp.h:1424
void finalize() override
Nothing to be done to finalize a value task.
Definition: transwarp.h:2494
void cancel_all(bool enabled) noexcept override
If enabled then all pending tasks in the graph are canceled which will throw transwarp::task_canceled...
Definition: transwarp.h:2009
void remove_custom_data() override
Removes custom data from this task.
Definition: transwarp.h:1777
std::tuple< std::shared_future< ParentResults >...> get_futures(const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &input)
Returns the futures from the given tuple of tasks.
Definition: transwarp.h:610
auto make_task(TaskType, Functor &&functor, std::shared_ptr< Parents >...parents)
A factory function to create a new task.
Definition: transwarp.h:2812
void execute(const std::function< void()> &functor, const transwarp::itask &) override
Pushes the functor into the thread pool for asynchronous execution.
Definition: transwarp.h:1601
typename transwarp::detail::functor_result< TaskType, Functor, ParentResults...>::type functor_result_t
Determines the result type of the Functor dispatching on the task type.
Definition: transwarp.h:1256
void remove_listeners_all(transwarp::event_type) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2649
transwarp::task_type type() const noexcept override
The task's type.
Definition: transwarp.h:2507
const std::vector< transwarp::itask * > & tasks() override
Returns all tasks in the graph in breadth order.
Definition: transwarp.h:2020
void schedule_all() override
Schedules all tasks in the graph for execution on the caller thread. The task-specific executors get ...
Definition: transwarp.h:1920
void set_executor(std::shared_ptr< transwarp::executor > executor) override
Assigns an executor to this task which takes precedence over the executor provided in schedule() or s...
Definition: transwarp.h:1701
void set_custom_data(std::any custom_data) override
Assigns custom data to this task. transwarp will not directly use this. This is only useful if someth...
Definition: transwarp.h:1760
void unvisit() noexcept override
Traverses through each task and marks them as not visited.
Definition: transwarp.h:2162
void remove_listener_all(transwarp::event_type event, const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for the given event type only and for all parents.
Definition: transwarp.h:1849
void cancel_all_but_one(const std::shared_ptr< transwarp::task< OneResult >> &one, const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &parents)
Cancels all tasks but one.
Definition: transwarp.h:701
The task class.
Definition: transwarp.h:387
Definition: transwarp.h:1529
A functor doing nothing.
Definition: transwarp.h:1549
void reset_priority_all() override
Resets the priority of all tasks to 0.
Definition: transwarp.h:2581
std::size_t level() const noexcept override
The task's level.
Definition: transwarp.h:1650
void reset() override
Resets this task.
Definition: transwarp.h:1985
const std::any & custom_data() const noexcept override
The custom task data (may not hold a value)
Definition: transwarp.h:2527
Just before a task's functor is invoked (handle_event called on thread that task is run on) ...
const transwarp::itask & transwarp_task() const noexcept
The associated task.
Definition: transwarp.h:478
The task's functor accepts the first parent future that becomes ready.
The consume_any type. Used for tag dispatch.
Definition: transwarp.h:109
std::size_t level() const noexcept override
The task's level.
Definition: transwarp.h:2502
bool was_scheduled() const noexcept override
Returns true because a value task is scheduled once on construction.
Definition: transwarp.h:2691
Unvisits the given task.
Definition: transwarp.h:1160
void remove_listeners_all() override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2646
The accept type. Used for tag dispatch.
Definition: transwarp.h:97
void add_listener_all(transwarp::event_type, std::shared_ptr< transwarp::listener >) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2625
std::int64_t avg_waittime_us() const noexcept override
Returns the average waittime in microseconds (-1 if never set)
Definition: transwarp.h:1690
std::vector< transwarp::itask * > parents() const override
Returns the task's parents (may be empty)
Definition: transwarp.h:2015
constexpr transwarp::accept_type accept
The accept task tag.
Definition: transwarp.h:98
std::size_t maximum_size() const
Returns the maximum size of the pool.
Definition: transwarp.h:2990
virtual void handle_event(transwarp::event_type event, const transwarp::itask &task)=0
This may be called from arbitrary threads depending on the event type (see transwarp::event_type). The implementer needs to ensure that this never throws exceptions. The lifetime of the task reference is not guaranteed beyond the duration of handle_event, and listeners must not retain a copy of the task.
void remove_listener_all(transwarp::event_type, const std::shared_ptr< transwarp::listener > &) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2637
void remove_listeners() override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2640
bool has_result() const noexcept override
Returns whether this task contains a result.
Definition: transwarp.h:1980
Removes all listeners from the given task.
Definition: transwarp.h:1126
void set_priority_all(std::int64_t priority) override
Sets a priority to all tasks (defaults to 0). transwarp will not directly use this. This is only useful if something else is using the priority (e.g. a custom executor)
Definition: transwarp.h:1739
The listener interface to listen to events raised by tasks.
Definition: transwarp.h:164
void schedule_impl(bool reset, transwarp::executor *executor=nullptr) override
Schedules this task for execution using the provided executor. The task-specific executor gets preced...
Definition: transwarp.h:2123
Exception thrown when a task is canceled.
Definition: transwarp.h:57
TaskType task_type
The task type.
Definition: transwarp.h:2366
void wait() const override
Waits for the task to complete. Should only be called if was_scheduled() is true, throws transwarp::c...
Definition: transwarp.h:1967
void set_avg_runtime_us(std::int64_t runtime) noexcept override
Assigns the given runtime.
Definition: transwarp.h:2115
void add_listener(transwarp::event_type event, std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for the given event type only.
Definition: transwarp.h:1804
std::shared_future< ResultType > make_future_with_value(Value &&value)
Returns a ready future with the given value as its state.
Definition: transwarp.h:1270
void schedule() override
Schedules this task for execution on the caller thread. The task-specific executor gets precedence if...
Definition: transwarp.h:1886
void add_listener(std::shared_ptr< transwarp::listener >) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2616
void add_listener(std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for all event types.
Definition: transwarp.h:1795
void ensure_task_not_running() const
Checks if the task is currently running and throws transwarp::control_error if it is...
Definition: transwarp.h:2065
Just before a task starts running (handle_event called on thread that task is run on) ...
Removes custom data from the given task.
Definition: transwarp.h:1052
void schedule_all(transwarp::executor &, bool) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2673
const std::optional< std::string > & name() const noexcept
The optional task name.
Definition: transwarp.h:2512
Resets the given task.
Definition: transwarp.h:980
void execute(const std::function< void()> &functor, const transwarp::itask &) override
Runs the functor on the current thread.
Definition: transwarp.h:1575
auto then(TaskType_, Functor_ &&functor)
Creates a continuation to this task.
Definition: transwarp.h:2483
void set_custom_data_all(std::any custom_data) override
Assigns custom data to all tasks. transwarp will not directly use this. This is only useful if someth...
Definition: transwarp.h:1770
const transwarp::itask & child() const noexcept
Returns the child task.
Definition: transwarp.h:195
void remove_listeners_all(transwarp::event_type event) override
Removes all listeners for the given event type and for all parents.
Definition: transwarp.h:1877
bool full() const
Returns whether the buffer is full.
Definition: transwarp.h:1486
A base class for a user-defined functor that needs access to the associated task or a cancel point to...
Definition: transwarp.h:470
constexpr transwarp::consume_any_type consume_any
The consume_any task tag.
Definition: transwarp.h:110
Removes a listener per event type from the given task.
Definition: transwarp.h:1112
std::int64_t avg_runtime_us() const noexcept override
Returns -1 as value tasks don't run.
Definition: transwarp.h:2547
std::int64_t priority() const noexcept override
The task priority (defaults to 0)
Definition: transwarp.h:1670
std::int64_t avg_waittime_us() const noexcept override
Returns -1 as value tasks don't run.
Definition: transwarp.h:2542
Exception thrown when a task is used in unintended ways.
Definition: transwarp.h:84
Resets the priority of the given task.
Definition: transwarp.h:1032
void assign_task_if(Functor &functor, const transwarp::itask &task) noexcept
Assigns the task to the given functor if the functor is a subclass of transwarp::functor.
Definition: transwarp.h:1261
bool canceled() const noexcept override
Returns whether the associated task is canceled.
Definition: transwarp.h:1680
void schedule() override
No-op because a value task never runs.
Definition: transwarp.h:2652
std::size_t size() const
Returns the number of populated values of the buffer. Its maximum value equals the capacity of the bu...
Definition: transwarp.h:1476
void set_value(transwarp::decay_t< result_type > &&value) override
Assigns a value to this task.
Definition: transwarp.h:2681
std::vector< transwarp::edge > edges() override
Returns all edges in the graph. This is mainly for visualizing the tasks and their interdependencies...
Definition: transwarp.h:2028
void add_listener_all(std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for all event types and for all parents.
Definition: transwarp.h:1811
void add_listener(transwarp::event_type, std::shared_ptr< transwarp::listener >) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2619
void wait() const override
No-op because a value task never runs.
Definition: transwarp.h:2696
void set_value(transwarp::decay_t< result_type > &value) override
Assigns a value to this task. Scheduling will have no effect after a value has been set...
Definition: transwarp.h:2284
A task proxy.
Definition: transwarp.h:2224
void transwarp_cancel_point() const
If the associated task is canceled then this will throw transwarp::task_canceled which will stop the ...
Definition: transwarp.h:484
The base task class that contains the functionality that can be used with all result types (void and ...
Definition: transwarp.h:1616
Definition: transwarp.h:596
std::shared_ptr< task_impl > named(std::string name)
Gives this task a name and returns a ptr to itself.
Definition: transwarp.h:2392
The task's functor consumes all parent results.
Visits the given task using the visitor given in the constructor.
Definition: transwarp.h:1148
void remove_listeners_all() override
Removes all listeners from this task and from all parents.
Definition: transwarp.h:1870
Just after a task has finished running (handle_event called on thread that task is run on) ...
void remove_listener(const std::shared_ptr< transwarp::listener > &) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2628
Base class for exceptions.
Definition: transwarp.h:48
typename transwarp::detail::parents< ParentResults...>::type parents_t
Determines the type of the parents.
Definition: transwarp.h:1345
constexpr transwarp::consume_type consume
The consume task tag.
Definition: transwarp.h:106
void schedule(transwarp::executor &, bool) override
No-op because a value task never runs.
Definition: transwarp.h:2661
A value task that stores a single value and doesn't require scheduling. Value tasks should be created...
Definition: transwarp.h:2453
void reset_priority() override
Resets the task priority to 0.
Definition: transwarp.h:2576
void add_listener_all(transwarp::event_type event, std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for the given event type only and for all parents.
Definition: transwarp.h:1818
void remove_listeners(transwarp::event_type event) override
Removes all listeners for the given event type.
Definition: transwarp.h:1864
void remove_listener(const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for all event types.
Definition: transwarp.h:1825
void set_value(transwarp::decay_t< result_type > &&value) override
Assigns a value to this task. Scheduling will have no effect after a value has been set...
Definition: transwarp.h:2242
Determines the type of the parents.
Definition: transwarp.h:1297
Parent wait_for_any(const std::shared_ptr< transwarp::task< ParentResults >> &...parents)
Waits for the first parent to finish.
Definition: transwarp.h:675
task_impl(F &&functor, std::shared_ptr< transwarp::task< ParentResults >>...parents)
A task is defined by a functor and parent tasks. Note: Don't use this constructor directly, use transwarp::make_task.
Definition: transwarp.h:2374
void reset_all() override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2717
void remove_listener(transwarp::event_type event, const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for the given event type only.
Definition: transwarp.h:1834
Assigns custom data to the given task.
Definition: transwarp.h:1040
The task's functor takes no arguments but waits for all parents to finish.
void set_exception(std::exception_ptr exception) override
Assigns an exception to this task. Scheduling will have no effect after an exception has been set...
Definition: transwarp.h:1953
Definition: transwarp.h:732
void remove_listeners() override
Removes all listeners.
Definition: transwarp.h:1856
std::shared_ptr< transwarp::task< ResultType > > next_task(bool maybe_resize=true)
Returns the next idle task. If there are no idle tasks then it will attempt to double the pool size...
Definition: transwarp.h:2938
void call_with_each(const Functor &f, const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &t)
Calls the functor with every element in the tuple.
Definition: transwarp.h:901
void reset_priority() override
Resets the task priority to 0.
Definition: transwarp.h:1746
auto make_value_task(Value &&value)
A factory function to create a new value task.
Definition: transwarp.h:2828
transwarp::detail::functor_result_t< TaskType, Functor, ParentResults...> result_type
The result type of this task.
Definition: transwarp.h:2369
An edge between two tasks.
Definition: transwarp.h:177
A timer that tracks the average idle, wait, and run time of each task it listens to.
Definition: transwarp.h:3078
void schedule(transwarp::executor &) override
No-op because a value task never runs.
Definition: transwarp.h:2655
void reset_all() override
Resets all tasks in the graph.
Definition: transwarp.h:1993
circular_buffer(std::size_t capacity)
Constructs a circular buffer with a given fixed capacity.
Definition: transwarp.h:1433
Definition: transwarp.h:723
ResultType result_type
The result type of this task.
Definition: transwarp.h:2460
Executor for parallel execution. Uses a simple thread pool.
Definition: transwarp.h:1582
The wait_any type. Used for tag dispatch.
Definition: transwarp.h:117
void set_custom_data_all(std::any custom_data) override
Assigns custom data to all tasks. transwarp will not directly use this. This is only useful if someth...
Definition: transwarp.h:2596
task_pool(std::shared_ptr< transwarp::task< ResultType >> task)
Constructs a task pool with reasonable defaults for minimum and maximum.
Definition: transwarp.h:2924
void schedule(bool reset) override
Schedules this task for execution on the caller thread. The task-specific executor gets precedence if...
Definition: transwarp.h:1895
std::string to_string(const transwarp::task_type &type)
String conversion for the task_type enumeration.
Definition: transwarp.h:293
void set_value(const transwarp::decay_t< result_type > &value) override
Assigns a value to this task.
Definition: transwarp.h:2676
void remove_executor() override
Removes the executor from this task.
Definition: transwarp.h:1718
The accept_any type. Used for tag dispatch.
Definition: transwarp.h:101
void set_id(std::size_t id) noexcept override
Assigns the given id.
Definition: transwarp.h:2085
void set_name(std::optional< std::string > name) noexcept override
Assigns the given name.
Definition: transwarp.h:2100
bool is_ready() const override
Returns true because a value task is always ready.
Definition: transwarp.h:2699
std::string name() const override
Returns the name of the executor.
Definition: transwarp.h:1596
constexpr transwarp::wait_type wait
The wait task tag.
Definition: transwarp.h:114
The root type. Used for tag dispatch.
Definition: transwarp.h:93
The task's functor consumes the first parent result that becomes ready.
std::shared_ptr< TaskType > clone_task(std::unordered_map< std::shared_ptr< transwarp::itask >, std::shared_ptr< transwarp::itask >> &task_cache, const std::shared_ptr< TaskType > &t)
Clones a task.
Definition: transwarp.h:370
A task representing a piece of work given by functor and parent tasks. By connecting tasks a directed...
Definition: transwarp.h:2363
void remove_listener(transwarp::event_type, const std::shared_ptr< transwarp::listener > &) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2631
std::shared_ptr< task_impl > clone_cast() const
Clones this task and casts the result to a ptr to task_impl.
Definition: transwarp.h:2405
The wait type. Used for tag dispatch.
Definition: transwarp.h:113
task_pool(std::shared_ptr< transwarp::task< ResultType >> task, std::size_t minimum_size, std::size_t maximum_size)
Constructs a task pool.
Definition: transwarp.h:2902
void cancel(bool enabled) noexcept override
If enabled then this task is canceled which will throw transwarp::task_canceled when retrieving the t...
Definition: transwarp.h:2002
Schedules using the given executor.
Definition: transwarp.h:967
void swap(circular_buffer &buffer)
Swaps this buffer with the given buffer.
Definition: transwarp.h:1491
constexpr transwarp::root_type root
The root task tag.
Definition: transwarp.h:94
std::vector< transwarp::itask * > parents() const override
Empty because a value task doesn't have parents.
Definition: transwarp.h:2726
void remove_custom_data() override
Removes custom data from this task.
Definition: transwarp.h:2601
void schedule_all(transwarp::executor &executor) override
Schedules all tasks in the graph for execution using the provided executor. The task-specific executo...
Definition: transwarp.h:1928
void set_level(std::size_t level) noexcept override
Assigns the given level.
Definition: transwarp.h:2090
const std::shared_future< result_type > & future() const noexcept override
Returns the future associated to the underlying execution.
Definition: transwarp.h:1790
const value_type & front() const
Returns the value at the front of the buffer (the oldest value). This is undefined if the buffer is e...
Definition: transwarp.h:1457
std::int64_t avg_idletime_us() const noexcept override
Returns the average idletime in microseconds (-1 if never set)
Definition: transwarp.h:1685
void set_priority(std::int64_t priority) override
Sets a task priority (defaults to 0). transwarp will not directly use this. This is only useful if so...
Definition: transwarp.h:2565
The task's functor accepts all parent futures.
Applies final bookkeeping to the task and collects the task.
Definition: transwarp.h:939
void remove_executor_all() override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2561
void schedule(transwarp::executor &executor, bool reset) override
Schedules this task for execution using the provided executor. The task-specific executor gets preced...
Definition: transwarp.h:1912
std::size_t minimum_size() const
Returns the minimum size of the pool.
Definition: transwarp.h:2985
std::result_of_t< decltype(&std::shared_future< T >::get)(std::shared_future< T >)> result_t
Returns the result type of a std::shared_future<T>
Definition: transwarp.h:362
void wait_for_all(const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &parents)
Waits for all parents to finish.
Definition: transwarp.h:645
task_type
The possible task types.
Definition: transwarp.h:36
std::size_t event_index(transwarp::event_type event) const
Returns the index for a given event type.
Definition: transwarp.h:2179
void set_value() override
Assigns a value to this task. Scheduling will have no effect after a call to this. Calling reset() will reset this and re-enable scheduling.
Definition: transwarp.h:2326
std::vector< transwarp::edge > edges() override
Returns empty edges because a value task doesn't have parents.
Definition: transwarp.h:2736
std::int64_t avg_runtime_us() const noexcept override
Returns the average runtime in microseconds (-1 if never set)
Definition: transwarp.h:1695
void pop()
Removes the value at the front of the buffer (the oldest value)
Definition: transwarp.h:1462
void resize(std::size_t new_size)
Resizes the task pool to the given new size if possible.
Definition: transwarp.h:3007
event_type
The task events that can be subscribed to using the listener interface.
Definition: transwarp.h:153
Removes all listeners per event type from the given task.
Definition: transwarp.h:1135
void schedule_all_impl(bool reset_all, transwarp::executor *executor=nullptr)
Schedules all tasks in the graph for execution using the provided executor. The task-specific executo...
Definition: transwarp.h:2147
std::shared_ptr< value_task > clone_cast() const
Clones this task and casts the result to a ptr to value_task.
Definition: transwarp.h:2489
The task's functor takes no arguments but waits for the first parent to finish.
Exception thrown when an invalid parameter was passed to a function.
Definition: transwarp.h:75
void remove_custom_data_all() override
Removes custom data from all tasks.
Definition: transwarp.h:1783
std::int64_t avg_idletime_us() const noexcept override
Returns -1 as value tasks don't run.
Definition: transwarp.h:2537
void reset() override
No-op because a value task never runs.
Definition: transwarp.h:2714
value_task(T &&value)
A value task is defined by a given value. Note: Don't use this constructor directly, use transwarp::make_value_task.
Definition: transwarp.h:2465
std::shared_future< void > make_ready_future()
Returns a ready future.
Definition: transwarp.h:1277
void set_executor_all(std::shared_ptr< transwarp::executor >) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2555
std::shared_ptr< transwarp::executor > executor() const noexcept override
Value tasks don't have executors as they don't run.
Definition: transwarp.h:2517
constexpr transwarp::accept_any_type accept_any
The accept_any task tag.
Definition: transwarp.h:102
bool empty() const
Returns whether the buffer is empty.
Definition: transwarp.h:1481
const transwarp::itask & parent() const noexcept
Returns the parent task.
Definition: transwarp.h:190
Cancels or resumes the given task.
Definition: transwarp.h:988
void remove_listener_all(const std::shared_ptr< transwarp::listener > &) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2634
const std::vector< transwarp::itask * > & tasks() override
Returns all tasks in the graph in breadth order.
Definition: transwarp.h:2731
void set_priority(std::int64_t priority) override
Sets a task priority (defaults to 0). transwarp will not directly use this. This is only useful if so...
Definition: transwarp.h:1732
static Result work(std::size_t task_id, const Task &task, const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &parents)
Definition: transwarp.h:816
void remove_executor() override
No-op because a value task never runs.
Definition: transwarp.h:2558
void apply_to_each(Functor &&f, Tuple &&t)
Applies the functor to each element in the tuple.
Definition: transwarp.h:590
void schedule(bool) override
No-op because a value task never runs.
Definition: transwarp.h:2658
void set_avg_waittime_us(std::int64_t waittime) noexcept override
Assigns the given waittime.
Definition: transwarp.h:2110
void schedule_all(bool) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2670
std::string name() const override
Returns the name of the executor.
Definition: transwarp.h:1570
Pushes the given task into the vector of tasks.
Definition: transwarp.h:1060
auto then(TaskType_, Functor_ &&functor)
Creates a continuation to this task.
Definition: transwarp.h:2399
Executor for sequential execution. Runs functors sequentially on the same thread. ...
Definition: transwarp.h:1558
const std::shared_future< result_type > & future() const noexcept override
Returns the future associated to the underlying execution.
Definition: transwarp.h:2611
bool has_result() const noexcept override
Returns true because a value task always contains a result.
Definition: transwarp.h:2704
std::shared_future< ResultType > make_future_with_exception(std::exception_ptr exception)
Returns a ready future with the given exception as its state.
Definition: transwarp.h:1285
void remove_executor_all() override
Removes the executor from all tasks.
Definition: transwarp.h:1724
bool is_ready() const override
Returns whether the task has finished processing. Should only be called if was_scheduled() is true...
Definition: transwarp.h:1974
An interface for the task class.
Definition: transwarp.h:208
std::shared_ptr< value_task > named(std::string name)
Gives this task a name and returns a ptr to itself.
Definition: transwarp.h:2476
void visit(const std::function< void(transwarp::itask &)> &visitor) override
Visits each task in a depth-first traversal.
Definition: transwarp.h:2153