transwarp
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Pages
transwarp.h
1 /// @mainpage transwarp is a header-only C++ library for task concurrency
2 /// @details https://github.com/bloomen/transwarp
3 /// @version 2.2.0
4 /// @author Christian Blume, Guan Wang
5 /// @date 2019
6 /// @copyright MIT http://www.opensource.org/licenses/mit-license.php
7 #pragma once
8 #include <algorithm>
9 #include <any>
10 #include <array>
11 #include <atomic>
12 #include <chrono>
13 #include <cstddef>
14 #include <cstdint>
15 #include <functional>
16 #include <future>
17 #include <map>
18 #include <memory>
19 #include <mutex>
20 #include <optional>
21 #include <queue>
22 #include <stdexcept>
23 #include <string>
24 #include <thread>
25 #include <tuple>
26 #include <type_traits>
27 #include <unordered_map>
28 #include <utility>
29 #include <vector>
30 
31 
32 #ifdef TRANSWARP_MINIMUM_TASK_SIZE
33 
34 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
35 #define TRANSWARP_DISABLE_TASK_CUSTOM_DATA
36 #endif
37 
38 #ifndef TRANSWARP_DISABLE_TASK_NAME
39 #define TRANSWARP_DISABLE_TASK_NAME
40 #endif
41 
42 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
43 #define TRANSWARP_DISABLE_TASK_PRIORITY
44 #endif
45 
46 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
47 #define TRANSWARP_DISABLE_TASK_REFCOUNT
48 #endif
49 
50 #ifndef TRANSWARP_DISABLE_TASK_TIME
51 #define TRANSWARP_DISABLE_TASK_TIME
52 #endif
53 
54 #endif
55 
56 
57 /// The transwarp namespace
58 namespace transwarp {
59 
60 
61 /// The possible task types
62 enum class task_type {
63  root, ///< The task has no parents
64  accept, ///< The task's functor accepts all parent futures
65  accept_any, ///< The task's functor accepts the first parent future that becomes ready
66  consume, ///< The task's functor consumes all parent results
67  consume_any, ///< The task's functor consumes the first parent result that becomes ready
68  wait, ///< The task's functor takes no arguments but waits for all parents to finish
69  wait_any, ///< The task's functor takes no arguments but waits for the first parent to finish
70 };
71 
72 
73 /// Base class for exceptions
74 class transwarp_error : public std::runtime_error {
75 public:
76  explicit transwarp_error(const std::string& message)
77  : std::runtime_error{message}
78  {}
79 };
80 
81 
82 /// Exception thrown when a task is canceled
84 public:
85  explicit task_canceled(const std::string& task_repr)
86  : transwarp::transwarp_error{"Task canceled: " + task_repr}
87  {}
88 };
89 
90 
91 /// Exception thrown when a task was destroyed prematurely
93 public:
94  explicit task_destroyed(const std::string& task_repr)
95  : transwarp::transwarp_error{"Task destroyed: " + task_repr}
96  {}
97 };
98 
99 
100 /// Exception thrown when an invalid parameter was passed to a function
102 public:
103  explicit invalid_parameter(const std::string& parameter)
104  : transwarp::transwarp_error{"Invalid parameter: " + parameter}
105  {}
106 };
107 
108 
109 /// Exception thrown when a task is used in unintended ways
111 public:
112  explicit control_error(const std::string& message)
113  : transwarp::transwarp_error{"Control error: " + message}
114  {}
115 };
116 
117 
118 /// The root type. Used for tag dispatch
119 struct root_type : std::integral_constant<transwarp::task_type, transwarp::task_type::root> {};
120 constexpr transwarp::root_type root{}; ///< The root task tag
121 
122 /// The accept type. Used for tag dispatch
123 struct accept_type : std::integral_constant<transwarp::task_type, transwarp::task_type::accept> {};
124 constexpr transwarp::accept_type accept{}; ///< The accept task tag
125 
126 /// The accept_any type. Used for tag dispatch
127 struct accept_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::accept_any> {};
128 constexpr transwarp::accept_any_type accept_any{}; ///< The accept_any task tag
129 
130 /// The consume type. Used for tag dispatch
131 struct consume_type : std::integral_constant<transwarp::task_type, transwarp::task_type::consume> {};
132 constexpr transwarp::consume_type consume{}; ///< The consume task tag
133 
134 /// The consume_any type. Used for tag dispatch
135 struct consume_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::consume_any> {};
136 constexpr transwarp::consume_any_type consume_any{}; ///< The consume_any task tag
137 
138 /// The wait type. Used for tag dispatch
139 struct wait_type : std::integral_constant<transwarp::task_type, transwarp::task_type::wait> {};
140 constexpr transwarp::wait_type wait{}; ///< The wait task tag
141 
142 /// The wait_any type. Used for tag dispatch
143 struct wait_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::wait_any> {};
144 constexpr transwarp::wait_any_type wait_any{}; ///< The wait_any task tag
145 
146 
147 class itask;
148 
149 
150 /// Detail namespace for internal functionality only
151 namespace detail {
152 
153 struct visit_visitor;
154 struct unvisit_visitor;
155 struct final_visitor;
156 struct schedule_visitor;
157 struct parent_visitor;
159 
160 } // detail
161 
162 
163 /// The executor interface used to perform custom task execution
164 class executor {
165 public:
166  virtual ~executor() = default;
167 
168  /// Returns the name of the executor
169  virtual std::string name() const = 0;
170 
171  /// Runs a task which is wrapped by the given functor. The functor only
172  /// captures one shared pointer and can hence be copied at low cost.
173  /// task represents the task that the functor belongs to.
174  /// This function is only ever called on the thread of the caller to schedule().
175  /// The implementer needs to ensure that this never throws exceptions
176  virtual void execute(const std::function<void()>& functor, transwarp::itask& task) = 0;
177 };
178 
179 
180 /// The task events that can be subscribed to using the listener interface
181 enum class event_type {
182  before_scheduled, ///< Just before a task is scheduled (handle_event called on thread of caller to schedule())
183  after_future_changed, ///< Just after the task's future was changed (handle_event called on thread that changed the task's future)
184  before_started, ///< Just before a task starts running (handle_event called on thread that task is run on)
185  before_invoked, ///< Just before a task's functor is invoked (handle_event called on thread that task is run on)
186  after_finished, ///< Just after a task has finished running (handle_event called on thread that task is run on)
187  after_canceled, ///< Just after a task was canceled (handle_event called on thread that task is run on)
188  after_satisfied, ///< Just after a task has satisfied all its children with results (handle_event called on thread where the last child is satisfied)
189  after_custom_data_set, ///< Just after custom data was assigned (handle_event called on thread that custom data was set on)
190  count,
191 };
192 
193 
194 /// The listener interface to listen to events raised by tasks
195 class listener {
196 public:
197  virtual ~listener() = default;
198 
199  /// This may be called from arbitrary threads depending on the event type (see transwarp::event_type).
200  /// The implementer needs to ensure that this never throws exceptions.
201  virtual void handle_event(transwarp::event_type event, transwarp::itask& task) = 0;
202 };
203 
204 
205 /// An edge between two tasks
206 class edge {
207 public:
209  : parent_{&parent}, child_{&child}
210  {}
211 
212  // default copy/move semantics
213  edge(const edge&) = default;
214  edge& operator=(const edge&) = default;
215  edge(edge&&) = default;
216  edge& operator=(edge&&) = default;
217 
218  /// Returns the parent task
219  const transwarp::itask& parent() const noexcept {
220  return *parent_;
221  }
222 
223  /// Returns the parent task
224  transwarp::itask& parent() noexcept {
225  return *parent_;
226  }
227 
228  /// Returns the child task
229  const transwarp::itask& child() const noexcept {
230  return *child_;
231  }
232 
233  /// Returns the child task
234  transwarp::itask& child() noexcept {
235  return *child_;
236  }
237 
238 private:
239  transwarp::itask* parent_;
240  transwarp::itask* child_;
241 };
242 
243 
244 class timer;
245 class releaser;
246 
247 /// An interface for the task class
248 class itask : public std::enable_shared_from_this<itask> {
249 public:
250  virtual ~itask() = default;
251 
252  virtual void finalize() = 0;
253  virtual std::size_t id() const noexcept = 0;
254  virtual std::size_t level() const noexcept = 0;
255  virtual transwarp::task_type type() const noexcept = 0;
256  virtual const std::optional<std::string>& name() const noexcept = 0;
257  virtual std::shared_ptr<transwarp::executor> executor() const noexcept = 0;
258  virtual std::int64_t priority() const noexcept = 0;
259  virtual const std::any& custom_data() const noexcept = 0;
260  virtual bool canceled() const noexcept = 0;
261  virtual std::int64_t avg_idletime_us() const noexcept = 0;
262  virtual std::int64_t avg_waittime_us() const noexcept = 0;
263  virtual std::int64_t avg_runtime_us() const noexcept = 0;
264  virtual void set_executor(std::shared_ptr<transwarp::executor> executor) = 0;
265  virtual void set_executor_all(std::shared_ptr<transwarp::executor> executor) = 0;
266  virtual void remove_executor() = 0;
267  virtual void remove_executor_all() = 0;
268  virtual void set_priority(std::int64_t priority) = 0;
269  virtual void set_priority_all(std::int64_t priority) = 0;
270  virtual void reset_priority() = 0;
271  virtual void reset_priority_all() = 0;
272  virtual void set_custom_data(std::any custom_data) = 0;
273  virtual void set_custom_data_all(std::any custom_data) = 0;
274  virtual void remove_custom_data() = 0;
275  virtual void remove_custom_data_all() = 0;
276  virtual void add_listener(std::shared_ptr<transwarp::listener> listener) = 0;
277  virtual void add_listener(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) = 0;
278  virtual void add_listener_all(std::shared_ptr<transwarp::listener> listener) = 0;
279  virtual void add_listener_all(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) = 0;
280  virtual void remove_listener(const std::shared_ptr<transwarp::listener>& listener) = 0;
281  virtual void remove_listener(transwarp::event_type event, const std::shared_ptr<transwarp::listener>& listener) = 0;
282  virtual void remove_listener_all(const std::shared_ptr<transwarp::listener>& listener) = 0;
283  virtual void remove_listener_all(transwarp::event_type event, const std::shared_ptr<transwarp::listener>& listener) = 0;
284  virtual void remove_listeners() = 0;
285  virtual void remove_listeners(transwarp::event_type event) = 0;
286  virtual void remove_listeners_all() = 0;
287  virtual void remove_listeners_all(transwarp::event_type event) = 0;
288  virtual void schedule() = 0;
289  virtual void schedule(transwarp::executor& executor) = 0;
290  virtual void schedule(bool reset) = 0;
291  virtual void schedule(transwarp::executor& executor, bool reset) = 0;
292  virtual void schedule_all() = 0;
293  virtual void schedule_all(transwarp::executor& executor) = 0;
294  virtual void schedule_all(bool reset_all) = 0;
295  virtual void schedule_all(transwarp::executor& executor, bool reset_all) = 0;
296  virtual void set_exception(std::exception_ptr exception) = 0;
297  virtual bool was_scheduled() const noexcept = 0;
298  virtual void wait() const = 0;
299  virtual bool is_ready() const = 0;
300  virtual bool has_result() const = 0;
301  virtual void reset() = 0;
302  virtual void reset_all() = 0;
303  virtual void cancel(bool enabled) noexcept = 0;
304  virtual void cancel_all(bool enabled) noexcept = 0;
305  virtual std::vector<itask*> parents() const = 0;
306  virtual const std::vector<itask*>& tasks() = 0;
307  virtual std::vector<transwarp::edge> edges() = 0;
308 
309 protected:
310  virtual void schedule_impl(bool reset, transwarp::executor* executor=nullptr) = 0;
311 
312 private:
313  friend struct transwarp::detail::visit_visitor;
315  friend struct transwarp::detail::final_visitor;
317  friend struct transwarp::detail::parent_visitor;
318  friend class transwarp::timer;
319  friend class transwarp::releaser;
321 
322  virtual void visit(const std::function<void(itask&)>& visitor) = 0;
323  virtual void unvisit() noexcept = 0;
324  virtual void set_id(std::size_t id) noexcept = 0;
325  virtual void set_level(std::size_t level) noexcept = 0;
326  virtual void set_type(transwarp::task_type type) noexcept = 0;
327  virtual void set_name(std::optional<std::string> name) noexcept = 0;
328  virtual void set_avg_idletime_us(std::int64_t idletime) noexcept = 0;
329  virtual void set_avg_waittime_us(std::int64_t waittime) noexcept = 0;
330  virtual void set_avg_runtime_us(std::int64_t runtime) noexcept = 0;
331  virtual void increment_childcount() noexcept = 0;
332  virtual void decrement_refcount() = 0;
333  virtual void reset_future() = 0;
334 };
335 
336 
337 /// String conversion for the task_type enumeration
338 inline
339 std::string to_string(const transwarp::task_type& type) {
340  switch (type) {
341  case transwarp::task_type::root: return "root";
342  case transwarp::task_type::accept: return "accept";
343  case transwarp::task_type::accept_any: return "accept_any";
344  case transwarp::task_type::consume: return "consume";
345  case transwarp::task_type::consume_any: return "consume_any";
346  case transwarp::task_type::wait: return "wait";
347  case transwarp::task_type::wait_any: return "wait_any";
348  }
349  throw transwarp::invalid_parameter{"task type"};
350 }
351 
352 
353 /// String conversion for the itask class
354 inline
355 std::string to_string(const transwarp::itask& task, std::string_view separator="\n") {
356  std::string s;
357  s += '"';
358  const std::optional<std::string>& name = task.name();
359  if (name) {
360  s += std::string{"<"} + *name + std::string{">"} + separator.data();
361  }
362  s += transwarp::to_string(task.type());
363  s += std::string{" id="} + std::to_string(task.id());
364  s += std::string{" lev="} + std::to_string(task.level());
365  const std::shared_ptr<transwarp::executor> exec = task.executor();
366  if (exec) {
367  s += separator.data() + std::string{"<"} + exec->name() + std::string{">"};
368  }
369  const std::int64_t avg_idletime_us = task.avg_idletime_us();
370  if (avg_idletime_us >= 0) {
371  s += separator.data() + std::string{"avg-idle-us="} + std::to_string(avg_idletime_us);
372  }
373  const std::int64_t avg_waittime_us = task.avg_waittime_us();
374  if (avg_waittime_us >= 0) {
375  s += separator.data() + std::string{"avg-wait-us="} + std::to_string(avg_waittime_us);
376  }
377  const std::int64_t avg_runtime_us = task.avg_runtime_us();
378  if (avg_runtime_us >= 0) {
379  s += separator.data() + std::string{"avg-run-us="} + std::to_string(avg_runtime_us);
380  }
381  return s + '"';
382 }
383 
384 
385 /// String conversion for the edge class
386 inline
387 std::string to_string(const transwarp::edge& edge, std::string_view separator="\n") {
388  return transwarp::to_string(edge.parent(), separator) + std::string{" -> "} + transwarp::to_string(edge.child(), separator);
389 }
390 
391 
392 /// Creates a dot-style string from the given edges
393 inline
394 std::string to_string(const std::vector<transwarp::edge>& edges, std::string_view separator="\n") {
395  std::string dot = std::string{"digraph {"} + separator.data();
396  for (const transwarp::edge& edge : edges) {
397  dot += transwarp::to_string(edge, separator) + separator.data();
398  }
399  dot += std::string{"}"};
400  return dot;
401 }
402 
403 
404 /// Removes reference and const from a type
405 template<typename T>
406 using decay_t = std::remove_const_t<std::remove_reference_t<T>>;
407 
408 
409 /// Returns the result type of a std::shared_future<T>
410 template<typename T>
411 using result_t = std::result_of_t<decltype(&std::shared_future<T>::get)(std::shared_future<T>)>;
412 
413 
414 /// Detail namespace for internal functionality only
415 namespace detail {
416 
417 /// Clones a task
418 template<typename TaskType>
419 std::shared_ptr<TaskType> clone_task(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache, const std::shared_ptr<TaskType>& t) {
420  const auto original_task = std::static_pointer_cast<transwarp::itask>(t);
421  const auto task_cache_it = task_cache.find(original_task);
422  if (task_cache_it != task_cache.cend()) {
423  return std::static_pointer_cast<TaskType>(task_cache_it->second);
424  } else {
425  auto cloned_task = t->clone_impl(task_cache);
426  task_cache[original_task] = cloned_task;
427  return cloned_task;
428  }
429 }
430 
431 } // detail
432 
433 
434 /// The task class
435 template<typename ResultType>
436 class task : public transwarp::itask {
437 public:
438  using result_type = ResultType;
439 
440  virtual ~task() = default;
441 
442  std::shared_ptr<task> clone() const {
443  std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>> task_cache;
444  return clone_impl(task_cache);
445  }
446 
447  virtual void set_value(const transwarp::decay_t<result_type>& value) = 0;
448  virtual void set_value(transwarp::decay_t<result_type>&& value) = 0;
449  virtual std::shared_future<result_type> future() const noexcept = 0;
450  virtual transwarp::result_t<result_type> get() const = 0;
451 
452 private:
453  template<typename T>
454  friend std::shared_ptr<T> transwarp::detail::clone_task(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache, const std::shared_ptr<T>& t);
455 
456  virtual std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache) const = 0;
457 };
458 
459 /// The task class (reference result type)
460 template<typename ResultType>
461 class task<ResultType&> : public transwarp::itask {
462 public:
463  using result_type = ResultType&;
464 
465  virtual ~task() = default;
466 
467  std::shared_ptr<task> clone() const {
468  std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>> task_cache;
469  return clone_impl(task_cache);
470  }
471 
472  virtual void set_value(transwarp::decay_t<result_type>& value) = 0;
473  virtual std::shared_future<result_type> future() const noexcept = 0;
474  virtual transwarp::result_t<result_type> get() const = 0;
475 
476 private:
477  template<typename T>
478  friend std::shared_ptr<T> transwarp::detail::clone_task(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache, const std::shared_ptr<T>& t);
479 
480  virtual std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache) const = 0;
481 };
482 
483 /// The task class (void result type)
484 template<>
485 class task<void> : public transwarp::itask {
486 public:
487  using result_type = void;
488 
489  virtual ~task() = default;
490 
491  std::shared_ptr<task> clone() const {
492  std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>> task_cache;
493  return clone_impl(task_cache);
494  }
495 
496  virtual void set_value() = 0;
497  virtual std::shared_future<result_type> future() const noexcept = 0;
498  virtual result_type get() const = 0;
499 
500 private:
501  template<typename T>
502  friend std::shared_ptr<T> transwarp::detail::clone_task(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache, const std::shared_ptr<T>& t);
503 
504  virtual std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache) const = 0;
505 };
506 
507 
508 /// Detail namespace for internal functionality only
509 namespace detail {
510 
511 template<typename F>
512 void assign_task_if(F&, transwarp::itask&) noexcept;
513 
514 } // detail
515 
516 
517 /// A base class for a user-defined functor that needs access to the associated
518 /// task or a cancel point to stop a task while it's running
519 class functor {
520 public:
521 
522  virtual ~functor() = default;
523 
524 protected:
525 
526  /// The associated task (only to be called after the task was constructed)
527  const transwarp::itask& transwarp_task() const noexcept {
528  return *transwarp_task_;
529  }
530 
531  /// The associated task (only to be called after the task was constructed)
533  return *transwarp_task_;
534  }
535 
536  /// If the associated task is canceled then this will throw transwarp::task_canceled
537  /// which will stop the task while it's running (only to be called after the task was constructed)
538  void transwarp_cancel_point() const {
539  if (transwarp_task_->canceled()) {
540  throw transwarp::task_canceled{std::to_string(transwarp_task_->id())};
541  }
542  }
543 
544 private:
545  template<typename F>
546  friend void transwarp::detail::assign_task_if(F&, transwarp::itask&) noexcept;
547 
548  transwarp::itask* transwarp_task_{};
549 };
550 
551 
552 /// Detail namespace for internal functionality only
553 namespace detail {
554 
555 
556 /// A simple thread pool used to execute tasks in parallel
557 class thread_pool {
558 public:
559 
560  explicit thread_pool(const std::size_t n_threads,
561  std::function<void(std::size_t thread_index)> on_thread_started = nullptr)
562  : on_thread_started_{std::move(on_thread_started)}
563  {
564  if (n_threads == 0) {
565  throw transwarp::invalid_parameter{"number of threads"};
566  }
567  for (std::size_t i = 0; i < n_threads; ++i) {
568  {
569  std::lock_guard<std::mutex> lock{mutex_};
570  ups_.push_back(false);
571  }
572  std::thread thread;
573  try {
574  thread = std::thread(&thread_pool::worker, this, i);
575  } catch (...) {
576  shutdown();
577  throw;
578  }
579  try {
580  threads_.push_back(std::move(thread));
581  } catch (...) {
582  shutdown();
583  thread.join();
584  throw;
585  }
586  }
587  for (;;) {
588  {
589  std::lock_guard<std::mutex> lock{mutex_};
590  if (std::all_of(ups_.begin(), ups_.end(), [](const bool x){ return x; })) {
591  break;
592  }
593  }
594  std::this_thread::yield();
595  }
596  }
597 
598  // delete copy/move semantics
599  thread_pool(const thread_pool&) = delete;
600  thread_pool& operator=(const thread_pool&) = delete;
601  thread_pool(thread_pool&&) = delete;
602  thread_pool& operator=(thread_pool&&) = delete;
603 
604  ~thread_pool() {
605  shutdown();
606  }
607 
608  void push(const std::function<void()>& functor) {
609  {
610  std::lock_guard<std::mutex> lock{mutex_};
611  functors_.push(functor);
612  }
613  cond_var_.notify_one();
614  }
615 
616 private:
617 
618  void worker(const std::size_t index) {
619  if (on_thread_started_) {
620  on_thread_started_(index);
621  }
622  for (;;) {
623  std::function<void()> functor;
624  {
625  std::unique_lock<std::mutex> lock{mutex_};
626  if (!ups_[index]) {
627  ups_[index] = true;
628  }
629  cond_var_.wait(lock, [this]{
630  return done_ || !functors_.empty();
631  });
632  if (done_ && functors_.empty()) {
633  break;
634  }
635  functor = functors_.front();
636  functors_.pop();
637  }
638  functor();
639  }
640  }
641 
642  void shutdown() {
643  {
644  std::lock_guard<std::mutex> lock{mutex_};
645  done_ = true;
646  }
647  cond_var_.notify_all();
648  for (std::thread& thread : threads_) {
649  thread.join();
650  }
651  ups_.clear();
652  threads_.clear();
653  }
654 
655  bool done_ = false;
656  std::function<void(std::size_t)> on_thread_started_;
657  std::vector<std::thread> threads_;
658  std::vector<bool> ups_;
659  std::queue<std::function<void()>> functors_;
660  std::condition_variable cond_var_;
661  std::mutex mutex_;
662 };
663 
664 
665 /// Applies the functor to each element in the tuple
666 template<typename Functor, typename Tuple>
667 void apply_to_each(Functor&& f, Tuple&& t) {
668  std::apply([&f](auto&&... arg){(..., f(std::forward<decltype(arg)>(arg)));}, std::forward<Tuple>(t));
669 }
670 
671 
672 template<int offset, typename... ParentResults>
674  static void work(const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& source, std::tuple<std::shared_future<ParentResults>...>& target) {
675  std::get<offset>(target) = std::get<offset>(source)->future();
677  }
678 };
679 
680 template<typename... ParentResults>
681 struct assign_futures_impl<-1, ParentResults...> {
682  static void work(const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>&, std::tuple<std::shared_future<ParentResults>...>&) {}
683 };
684 
685 /// Returns the futures from the given tuple of tasks
686 template<typename... ParentResults>
687 std::tuple<std::shared_future<ParentResults>...> get_futures(const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& input) {
688  std::tuple<std::shared_future<ParentResults>...> result;
689  assign_futures_impl<static_cast<int>(sizeof...(ParentResults)) - 1, ParentResults...>::work(input, result);
690  return result;
691 }
692 
693 /// Returns the futures from the given vector of tasks
694 template<typename ParentResultType>
695 std::vector<std::shared_future<ParentResultType>> get_futures(const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& input) {
696  std::vector<std::shared_future<ParentResultType>> result;
697  result.reserve(input.size());
698  for (const std::shared_ptr<transwarp::task<ParentResultType>>& task : input) {
699  result.emplace_back(task->future());
700  }
701  return result;
702 }
703 
704 
705 /// Runs the task with the given arguments, hence, invoking the task's functor
706 template<typename Result, typename Task, typename... Args>
707 Result run_task(std::size_t task_id, const std::weak_ptr<Task>& task, Args&&... args) {
708  const std::shared_ptr<Task> t = task.lock();
709  if (!t) {
710  throw transwarp::task_destroyed{std::to_string(task_id)};
711  }
712  if (t->canceled()) {
713  throw transwarp::task_canceled{std::to_string(task_id)};
714  }
716  return (*t->functor_)(std::forward<Args>(args)...);
717 }
718 
719 
720 /// Waits for all parents to finish
721 template<typename... ParentResults>
722 void wait_for_all(const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
723  transwarp::detail::apply_to_each([](const auto& p){ p->future().wait(); }, parents);
724 }
725 
726 
727 /// Waits for all parents to finish
728 template<typename ParentResultType>
729 void wait_for_all(const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
730  for (const std::shared_ptr<transwarp::task<ParentResultType>>& parent : parents) {
731  parent->future().wait();
732  }
733 }
734 
735 
736 template<typename Parent>
737 Parent wait_for_any_impl() {
738  return {};
739 }
740 
741 template<typename Parent, typename ParentResult, typename... ParentResults>
742 Parent wait_for_any_impl(const std::shared_ptr<transwarp::task<ParentResult>>& parent, const std::shared_ptr<transwarp::task<ParentResults>>& ...parents) {
743  const std::future_status status = parent->future().wait_for(std::chrono::microseconds(1));
744  if (status == std::future_status::ready) {
745  return parent;
746  }
747  return transwarp::detail::wait_for_any_impl<Parent>(parents...);
748 }
749 
750 /// Waits for the first parent to finish
751 template<typename Parent, typename... ParentResults>
752 Parent wait_for_any(const std::shared_ptr<transwarp::task<ParentResults>>& ...parents) {
753  for (;;) {
754  Parent parent = transwarp::detail::wait_for_any_impl<Parent>(parents...);
755  if (parent) {
756  return parent;
757  }
758  }
759 }
760 
761 
762 /// Waits for the first parent to finish
763 template<typename ParentResultType>
764 std::shared_ptr<transwarp::task<ParentResultType>> wait_for_any(const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
765  for (;;) {
766  for (const std::shared_ptr<transwarp::task<ParentResultType>>& parent : parents) {
767  const std::future_status status = parent->future().wait_for(std::chrono::microseconds(1));
768  if (status == std::future_status::ready) {
769  return parent;
770  }
771  }
772  }
773 }
774 
775 
776 /// Cancels all tasks but one
777 template<typename OneResult, typename... ParentResults>
778 void cancel_all_but_one(const std::shared_ptr<transwarp::task<OneResult>>& one, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
779  auto callable = [&one](const auto& parent) {
780  if (one != parent) {
781  parent->cancel(true);
782  }
783  };
785 }
786 
787 
788 /// Cancels all tasks but one
789 template<typename OneResult, typename ParentResultType>
790 void cancel_all_but_one(const std::shared_ptr<transwarp::task<OneResult>>& one, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
791  for (const std::shared_ptr<transwarp::task<ParentResultType>>& parent : parents) {
792  if (one != parent) {
793  parent->cancel(true);
794  }
795  }
796 }
797 
798 
799 /// Decrements refcount
800 inline
802  task.decrement_refcount();
803 }
804 
805 
806 /// Decrements the refcount of all parents
807 template<typename... ParentResults>
808 void decrement_refcount(const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
809  transwarp::detail::apply_to_each([](const auto& parent) {
810  decrement_refcount(*parent);
811  }, parents);
812 }
813 
814 
815 /// Decrements the refcount of all parents
816 template<typename ParentResultType>
817 void decrement_refcount(const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
818  for (const std::shared_ptr<transwarp::task<ParentResultType>>& parent : parents) {
819  decrement_refcount(*parent);
820  }
821 }
822 
823 
824 template<typename TaskType, bool done, int total, int... n>
825 struct call_impl {
826  template<typename Result, typename Task, typename... ParentResults>
827  static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
828  return call_impl<TaskType, total == 1 + static_cast<int>(sizeof...(n)), total, n..., static_cast<int>(sizeof...(n))>::template
829  work<Result>(task_id, task, parents);
830  }
831 };
832 
833 template<typename TaskType>
835 
836 template<int total, int... n>
837 struct call_impl<transwarp::root_type, true, total, n...> {
838  template<typename Result, typename Task, typename... ParentResults>
839  static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>&) {
840  return transwarp::detail::run_task<Result>(task_id, task);
841  }
842 };
843 
844 template<>
845 struct call_impl_vector<transwarp::root_type> {
846  template<typename Result, typename Task, typename ParentResultType>
847  static Result work(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>&) {
848  return transwarp::detail::run_task<Result>(task_id, task);
849  }
850 };
851 
852 template<int total, int... n>
853 struct call_impl<transwarp::accept_type, true, total, n...> {
854  template<typename Result, typename Task, typename... ParentResults>
855  static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
857  const std::tuple<std::shared_future<ParentResults>...> futures = transwarp::detail::get_futures(parents);
859  return transwarp::detail::run_task<Result>(task_id, task, std::get<n>(futures)...);
860  }
861 };
862 
863 template<>
864 struct call_impl_vector<transwarp::accept_type> {
865  template<typename Result, typename Task, typename ParentResultType>
866  static Result work(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
868  std::vector<std::shared_future<ParentResultType>> futures = transwarp::detail::get_futures(parents);
870  return transwarp::detail::run_task<Result>(task_id, task, std::move(futures));
871  }
872 };
873 
874 template<int total, int... n>
875 struct call_impl<transwarp::accept_any_type, true, total, n...> {
876  template<typename Result, typename Task, typename... ParentResults>
877  static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
878  using parent_t = std::remove_reference_t<decltype(std::get<0>(parents))>; // Use first type as reference
879  parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(parents)...);
881  auto future = parent->future();
883  return transwarp::detail::run_task<Result>(task_id, task, std::move(future));
884  }
885 };
886 
887 template<>
888 struct call_impl_vector<transwarp::accept_any_type> {
889  template<typename Result, typename Task, typename ParentResultType>
890  static Result work(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
891  std::shared_ptr<transwarp::task<ParentResultType>> parent = transwarp::detail::wait_for_any(parents);
893  auto future = parent->future();
895  return transwarp::detail::run_task<Result>(task_id, task, std::move(future));
896  }
897 };
898 
899 template<int total, int... n>
900 struct call_impl<transwarp::consume_type, true, total, n...> {
901  template<typename Result, typename Task, typename... ParentResults>
902  static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
904  const std::tuple<std::shared_future<ParentResults>...> futures = transwarp::detail::get_futures(parents);
906  return transwarp::detail::run_task<Result>(task_id, task, std::get<n>(futures).get()...);
907  }
908 };
909 
910 template<>
911 struct call_impl_vector<transwarp::consume_type> {
912  template<typename Result, typename Task, typename ParentResultType>
913  static Result work(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
915  const std::vector<std::shared_future<ParentResultType>> futures = transwarp::detail::get_futures(parents);
917  std::vector<ParentResultType> results;
918  results.reserve(futures.size());
919  for (const std::shared_future<ParentResultType>& future : futures) {
920  results.emplace_back(future.get());
921  }
922  return transwarp::detail::run_task<Result>(task_id, task, std::move(results));
923  }
924 };
925 
926 template<int total, int... n>
927 struct call_impl<transwarp::consume_any_type, true, total, n...> {
928  template<typename Result, typename Task, typename... ParentResults>
929  static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
930  using parent_t = std::remove_reference_t<decltype(std::get<0>(parents))>; /// Use first type as reference
931  parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(parents)...);
933  const auto future = parent->future();
935  return transwarp::detail::run_task<Result>(task_id, task, future.get());
936  }
937 };
938 
939 template<>
940 struct call_impl_vector<transwarp::consume_any_type> {
941  template<typename Result, typename Task, typename ParentResultType>
942  static Result work(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
943  std::shared_ptr<transwarp::task<ParentResultType>> parent = transwarp::detail::wait_for_any(parents);
945  const auto future = parent->future();
947  return transwarp::detail::run_task<Result>(task_id, task, future.get());
948  }
949 };
950 
951 template<int total, int... n>
952 struct call_impl<transwarp::wait_type, true, total, n...> {
953  template<typename Result, typename Task, typename... ParentResults>
954  static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
956  const std::tuple<std::shared_future<ParentResults>...> futures = transwarp::detail::get_futures(parents);
958  transwarp::detail::apply_to_each([](const auto& f){ f.get(); }, futures); // Ensures that exceptions are propagated
959  return transwarp::detail::run_task<Result>(task_id, task);
960  }
961 };
962 
963 template<>
964 struct call_impl_vector<transwarp::wait_type> {
965  template<typename Result, typename Task, typename ParentResultType>
966  static Result work(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
968  const std::vector<std::shared_future<ParentResultType>> futures = transwarp::detail::get_futures(parents);
970  for (const std::shared_future<ParentResultType>& future : futures) {
971  future.get(); // Ensures that exceptions are propagated
972  }
973  return transwarp::detail::run_task<Result>(task_id, task);
974  }
975 };
976 
977 template<int total, int... n>
978 struct call_impl<transwarp::wait_any_type, true, total, n...> {
979  template<typename Result, typename Task, typename... ParentResults>
980  static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
981  using parent_t = std::remove_reference_t<decltype(std::get<0>(parents))>; // Use first type as reference
982  parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(parents)...);
984  const auto future = parent->future();
986  future.get(); // Ensures that exceptions are propagated
987  return transwarp::detail::run_task<Result>(task_id, task);
988  }
989 };
990 
991 template<>
992 struct call_impl_vector<transwarp::wait_any_type> {
993  template<typename Result, typename Task, typename ParentResultType>
994  static Result work(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
995  std::shared_ptr<transwarp::task<ParentResultType>> parent = transwarp::detail::wait_for_any(parents);
997  const auto future = parent->future();
999  future.get(); // Ensures that exceptions are propagated
1000  return transwarp::detail::run_task<Result>(task_id, task);
1001  }
1002 };
1003 
1004 /// Calls the functor of the given task with the results from the tuple of parents.
1005 /// Throws transwarp::task_canceled if the task is canceled.
1006 /// Throws transwarp::task_destroyed in case the task was destroyed prematurely.
1007 template<typename TaskType, typename Result, typename Task, typename... ParentResults>
1008 Result call(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
1009  constexpr std::size_t n = std::tuple_size_v<std::tuple<std::shared_future<ParentResults>...>>;
1011  work<Result>(task_id, task, parents);
1012 }
1013 
1014 /// Calls the functor of the given task with the results from the vector of parents.
1015 /// Throws transwarp::task_canceled if the task is canceled.
1016 /// Throws transwarp::task_destroyed in case the task was destroyed prematurely.
1017 template<typename TaskType, typename Result, typename Task, typename ParentResultType>
1018 Result call(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
1020  work<Result>(task_id, task, parents);
1021 }
1022 
1023 
1024 /// Calls the functor with every element in the tuple
1025 template<typename Functor, typename... ParentResults>
1026 void call_with_each(const Functor& f, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& t) {
1027  auto callable = [&f](const auto& task) {
1028  if (!task) {
1029  throw transwarp::invalid_parameter{"task pointer"};
1030  }
1031  f(*task);
1032  };
1033  transwarp::detail::apply_to_each(callable, t);
1034 }
1035 
1036 /// Calls the functor with every element in the vector
1037 template<typename Functor, typename ParentResultType>
1038 void call_with_each(const Functor& f, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& v) {
1039  for (const std::shared_ptr<transwarp::task<ParentResultType>>& ptr : v) {
1040  if (!ptr) {
1041  throw transwarp::invalid_parameter{"task pointer"};
1042  }
1043  f(*ptr);
1044  }
1045 }
1046 
1047 
1048 /// Sets level of a task and increments the child count
1050  explicit parent_visitor(transwarp::itask& task) noexcept
1051  : task_(task) {}
1052 
1053  void operator()(transwarp::itask& task) const {
1054  if (task_.level() <= task.level()) {
1055  // A child's level is always larger than any of its parents' levels
1056  task_.set_level(task.level() + 1);
1057  }
1058  task.increment_childcount();
1059  }
1060 
1061  transwarp::itask& task_;
1062 };
1063 
1064 /// Applies final bookkeeping to the task and collects the task
1066  explicit final_visitor(std::vector<transwarp::itask*>& tasks) noexcept
1067  : tasks_{tasks} {}
1068 
1069  void operator()(transwarp::itask& task) noexcept {
1070  tasks_.push_back(&task);
1071  task.set_id(id_++);
1072  }
1073 
1074  std::vector<transwarp::itask*>& tasks_;
1075  std::size_t id_ = 0;
1076 };
1077 
1078 /// Generates edges
1080  explicit edges_visitor(std::vector<transwarp::edge>& edges) noexcept
1081  : edges_{edges} {}
1082 
1083  void operator()(transwarp::itask& task) {
1084  for (transwarp::itask* parent : task.parents()) {
1085  edges_.emplace_back(*parent, task);
1086  }
1087  }
1088 
1089  std::vector<transwarp::edge>& edges_;
1090 };
1091 
1092 /// Schedules using the given executor
1094  schedule_visitor(bool reset, transwarp::executor* executor) noexcept
1095  : reset_{reset}, executor_{executor} {}
1096 
1097  void operator()(transwarp::itask& task) {
1098  task.schedule_impl(reset_, executor_);
1099  }
1100 
1101  bool reset_;
1102  transwarp::executor* executor_;
1103 };
1104 
1105 /// Resets the given task
1107 
1108  void operator()(transwarp::itask& task) const {
1109  task.reset();
1110  }
1111 };
1112 
1113 /// Cancels or resumes the given task
1115  explicit cancel_visitor(bool enabled) noexcept
1116  : enabled_{enabled} {}
1117 
1118  void operator()(transwarp::itask& task) const noexcept {
1119  task.cancel(enabled_);
1120  }
1121 
1122  bool enabled_;
1123 };
1124 
1125 /// Assigns an executor to the given task
1127  explicit set_executor_visitor(std::shared_ptr<transwarp::executor> executor) noexcept
1128  : executor_{std::move(executor)} {}
1129 
1130  void operator()(transwarp::itask& task) const noexcept {
1131  task.set_executor(executor_);
1132  }
1133 
1134  std::shared_ptr<transwarp::executor> executor_;
1135 };
1136 
1137 /// Removes the executor from the given task
1139 
1140  void operator()(transwarp::itask& task) const noexcept {
1141  task.remove_executor();
1142  }
1143 };
1144 
1145 /// Assigns a priority to the given task
1147  explicit set_priority_visitor(std::int64_t priority) noexcept
1148  : priority_{priority} {}
1149 
1150  void operator()(transwarp::itask& task) const noexcept {
1151  task.set_priority(priority_);
1152  }
1153 
1154  std::int64_t priority_;
1155 };
1156 
1157 /// Resets the priority of the given task
1159 
1160  void operator()(transwarp::itask& task) const noexcept {
1161  task.reset_priority();
1162  }
1163 };
1164 
1165 /// Assigns custom data to the given task
1167  explicit set_custom_data_visitor(std::any custom_data) noexcept
1168  : custom_data_{std::move(custom_data)} {}
1169 
1170  void operator()(transwarp::itask& task) const noexcept {
1171  task.set_custom_data(custom_data_);
1172  }
1173 
1174  std::any custom_data_;
1175 };
1176 
1177 /// Removes custom data from the given task
1179 
1180  void operator()(transwarp::itask& task) const noexcept {
1181  task.remove_custom_data();
1182  }
1183 };
1184 
1185 /// Pushes the given task into the vector of tasks
1187  explicit push_task_visitor(std::vector<transwarp::itask*>& tasks)
1188  : tasks_{tasks} {}
1189 
1190  void operator()(transwarp::itask& task) {
1191  tasks_.push_back(&task);
1192  }
1193 
1194  std::vector<transwarp::itask*>& tasks_;
1195 };
1196 
1197 /// Adds a new listener to the given task
1199  explicit add_listener_visitor(std::shared_ptr<transwarp::listener> listener)
1200  : listener_{std::move(listener)}
1201  {}
1202 
1203  void operator()(transwarp::itask& task) {
1204  task.add_listener(listener_);
1205  }
1206 
1207  std::shared_ptr<transwarp::listener> listener_;
1208 };
1209 
1210 /// Adds a new listener per event type to the given task
1212  add_listener_per_event_visitor(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener)
1213  : event_{event}, listener_{std::move(listener)}
1214  {}
1215 
1216  void operator()(transwarp::itask& task) {
1217  task.add_listener(event_, listener_);
1218  }
1219 
1220  transwarp::event_type event_;
1221  std::shared_ptr<transwarp::listener> listener_;
1222 };
1223 
1224 /// Removes a listener from the given task
1226  explicit remove_listener_visitor(std::shared_ptr<transwarp::listener> listener)
1227  : listener_{std::move(listener)}
1228  {}
1229 
1230  void operator()(transwarp::itask& task) {
1231  task.remove_listener(listener_);
1232  }
1233 
1234  std::shared_ptr<transwarp::listener> listener_;
1235 };
1236 
1237 /// Removes a listener per event type from the given task
1239  remove_listener_per_event_visitor(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener)
1240  : event_{event}, listener_{std::move(listener)}
1241  {}
1242 
1243  void operator()(transwarp::itask& task) {
1244  task.remove_listener(event_, listener_);
1245  }
1246 
1247  transwarp::event_type event_;
1248  std::shared_ptr<transwarp::listener> listener_;
1249 };
1250 
1251 /// Removes all listeners from the given task
1253 
1254  void operator()(transwarp::itask& task) {
1255  task.remove_listeners();
1256  }
1257 
1258 };
1259 
1260 /// Removes all listeners per event type from the given task
1263  : event_{event}
1264  {}
1265 
1266  void operator()(transwarp::itask& task) {
1267  task.remove_listeners(event_);
1268  }
1269 
1270  transwarp::event_type event_;
1271 };
1272 
1273 /// Visits the given task using the visitor given in the constructor
1275  explicit visit_visitor(const std::function<void(transwarp::itask&)>& visitor) noexcept
1276  : visitor_{visitor} {}
1277 
1278  void operator()(transwarp::itask& task) const {
1279  task.visit(visitor_);
1280  }
1281 
1282  const std::function<void(transwarp::itask&)>& visitor_;
1283 };
1284 
1285 /// Unvisits the given task
1287 
1288  void operator()(transwarp::itask& task) const noexcept {
1289  task.unvisit();
1290  }
1291 };
1292 
1293 /// Determines the result type of the Functor dispatching on the task type
1294 template<typename TaskType, typename Functor, typename... ParentResults>
1296  static_assert(std::is_same_v<TaskType, transwarp::root_type> ||
1297  std::is_same_v<TaskType, transwarp::accept_type> ||
1298  std::is_same_v<TaskType, transwarp::accept_any_type> ||
1299  std::is_same_v<TaskType, transwarp::consume_type> ||
1300  std::is_same_v<TaskType, transwarp::consume_any_type> ||
1301  std::is_same_v<TaskType, transwarp::wait_type> ||
1302  std::is_same_v<TaskType, transwarp::wait_any_type>,
1303  "Invalid task type, must be one of: root, accept, accept_any, consume, consume_any, wait, wait_any");
1304 };
1305 
1306 template<typename Functor, typename... ParentResults>
1307 struct functor_result<transwarp::root_type, Functor, ParentResults...> {
1308  static_assert(sizeof...(ParentResults) == 0, "A root task cannot have parent tasks");
1309  using type = decltype(std::declval<Functor>()());
1310 };
1311 
1312 template<typename Functor, typename... ParentResults>
1313 struct functor_result<transwarp::accept_type, Functor, ParentResults...> {
1314  static_assert(sizeof...(ParentResults) > 0, "An accept task must have at least one parent");
1315  using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<ParentResults>>()...));
1316 };
1317 
1318 template<typename Functor, typename ParentResultType>
1319 struct functor_result<transwarp::accept_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1320  using type = decltype(std::declval<Functor>()(std::declval<std::vector<std::shared_future<ParentResultType>>>()));
1321 };
1322 
1323 template<typename Functor, typename... ParentResults>
1324 struct functor_result<transwarp::accept_any_type, Functor, ParentResults...> {
1325  static_assert(sizeof...(ParentResults) > 0, "An accept_any task must have at least one parent");
1326  using arg_t = std::tuple_element_t<0, std::tuple<ParentResults...>>; // Using first type as reference
1327  using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<arg_t>>()));
1328 };
1329 
1330 template<typename Functor, typename ParentResultType>
1331 struct functor_result<transwarp::accept_any_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1332  using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<ParentResultType>>()));
1333 };
1334 
1335 template<typename Functor, typename... ParentResults>
1336 struct functor_result<transwarp::consume_type, Functor, ParentResults...> {
1337  static_assert(sizeof...(ParentResults) > 0, "A consume task must have at least one parent");
1338  using type = decltype(std::declval<Functor>()(std::declval<ParentResults>()...));
1339 };
1340 
1341 template<typename Functor, typename ParentResultType>
1342 struct functor_result<transwarp::consume_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1343  using type = decltype(std::declval<Functor>()(std::declval<std::vector<ParentResultType>>()));
1344 };
1345 
1346 template<typename Functor, typename... ParentResults>
1347 struct functor_result<transwarp::consume_any_type, Functor, ParentResults...> {
1348  static_assert(sizeof...(ParentResults) > 0, "A consume_any task must have at least one parent");
1349  using arg_t = std::tuple_element_t<0, std::tuple<ParentResults...>>; // Using first type as reference
1350  using type = decltype(std::declval<Functor>()(std::declval<arg_t>()));
1351 };
1352 
1353 template<typename Functor, typename ParentResultType>
1354 struct functor_result<transwarp::consume_any_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1355  using type = decltype(std::declval<Functor>()(std::declval<ParentResultType>()));
1356 };
1357 
1358 template<typename Functor, typename... ParentResults>
1359 struct functor_result<transwarp::wait_type, Functor, ParentResults...> {
1360  static_assert(sizeof...(ParentResults) > 0, "A wait task must have at least one parent");
1361  using type = decltype(std::declval<Functor>()());
1362 };
1363 
1364 template<typename Functor, typename ParentResultType>
1365 struct functor_result<transwarp::wait_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1366  using type = decltype(std::declval<Functor>()());
1367 };
1368 
1369 template<typename Functor, typename... ParentResults>
1370 struct functor_result<transwarp::wait_any_type, Functor, ParentResults...> {
1371  static_assert(sizeof...(ParentResults) > 0, "A wait_any task must have at least one parent");
1372  using type = decltype(std::declval<Functor>()());
1373 };
1374 
1375 template<typename Functor, typename ParentResultType>
1376 struct functor_result<transwarp::wait_any_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1377  using type = decltype(std::declval<Functor>()());
1378 };
1379 
1380 /// Determines the result type of the Functor dispatching on the task type
1381 template<typename TaskType, typename Functor, typename... ParentResults>
1382 using functor_result_t = typename transwarp::detail::functor_result<TaskType, Functor, ParentResults...>::type;
1383 
1384 
1385 /// Assigns the task to the given functor if the functor is a subclass of transwarp::functor
1386 template<typename Functor>
1387 void assign_task_if(Functor& functor, transwarp::itask& task) noexcept {
1388  if constexpr (std::is_base_of_v<transwarp::functor, Functor>) {
1389  functor.transwarp_task_ = &task;
1390  }
1391 }
1392 
1393 
1394 /// Returns a ready future with the given value as its state
1395 template<typename ResultType, typename Value>
1396 std::shared_future<ResultType> make_future_with_value(Value&& value) {
1397  std::promise<ResultType> promise;
1398  promise.set_value(std::forward<Value>(value));
1399  return promise.get_future();
1400 }
1401 
1402 /// Returns a ready future
1403 inline
1404 std::shared_future<void> make_ready_future() {
1405  std::promise<void> promise;
1406  promise.set_value();
1407  return promise.get_future();
1408 }
1409 
1410 /// Returns a ready future with the given exception as its state
1411 template<typename ResultType>
1412 std::shared_future<ResultType> make_future_with_exception(std::exception_ptr exception) {
1413  if (!exception) {
1414  throw transwarp::invalid_parameter{"exception pointer"};
1415  }
1416  std::promise<ResultType> promise;
1417  promise.set_exception(exception);
1418  return promise.get_future();
1419 }
1420 
1421 
1422 /// Determines the type of the parents
1423 template<typename... ParentResults>
1424 struct parents {
1425  using type = std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>;
1426  static std::size_t size(const type&) {
1427  return std::tuple_size_v<type>;
1428  }
1429  static type clone(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache, const type& obj) {
1430  type cloned = obj;
1432  [&task_cache](auto& t) {
1433  t = detail::clone_task(task_cache, t);
1434  }, cloned);
1435  return cloned;
1436  }
1437  static std::vector<transwarp::itask*> tasks(const type& parents) {
1438  std::vector<transwarp::itask*> tasks;
1440  [&tasks](auto& t) {
1441  tasks.push_back(t.get());
1442  }, parents);
1443  return tasks;
1444  }
1445 };
1446 
1447 /// Determines the type of the parents. Specialization for vector parents
1448 template<typename ParentResultType>
1449 struct parents<std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1450  using type = std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>;
1451  static std::size_t size(const type& obj) {
1452  return obj.size();
1453  }
1454  static type clone(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache, const type& obj) {
1455  type cloned = obj;
1456  for (auto& t : cloned) {
1457  t = detail::clone_task(task_cache, t);
1458  }
1459  return cloned;
1460  }
1461  static std::vector<transwarp::itask*> tasks(const type& parents) {
1462  std::vector<transwarp::itask*> tasks;
1463  for (auto& t : parents) {
1464  tasks.push_back(t.get());
1465  }
1466  return tasks;
1467  }
1468 };
1469 
1470 /// Determines the type of the parents
1471 template<typename... ParentResults>
1472 using parents_t = typename transwarp::detail::parents<ParentResults...>::type;
1473 
1474 
1475 template<typename ResultType, typename TaskType>
1477 protected:
1478 
1479  template<typename Task, typename Parents>
1480  void call(std::size_t task_id,
1481  const std::weak_ptr<Task>& task,
1482  const Parents& parents) {
1483  promise_.set_value(transwarp::detail::call<TaskType, ResultType>(task_id, task, parents));
1484  }
1485 
1486  std::promise<ResultType> promise_;
1487 };
1488 
1489 template<typename TaskType>
1490 class base_runner<void, TaskType> {
1491 protected:
1492 
1493  template<typename Task, typename Parents>
1494  void call(std::size_t task_id,
1495  const std::weak_ptr<Task>& task,
1496  const Parents& parents) {
1497  transwarp::detail::call<TaskType, void>(task_id, task, parents);
1498  promise_.set_value();
1499  }
1500 
1501  std::promise<void> promise_;
1502 };
1503 
1504 /// A callable to run a task given its parents
1505 template<typename ResultType, typename TaskType, typename Task, typename Parents>
1506 class runner : public transwarp::detail::base_runner<ResultType, TaskType> {
1507 public:
1508 
1509  runner(std::size_t task_id,
1510  const std::weak_ptr<Task>& task,
1512  : task_id_{task_id},
1513  task_{task},
1514  parents_{parents}
1515  {}
1516 
1517  std::future<ResultType> future() {
1518  return this->promise_.get_future();
1519  }
1520 
1521  void operator()() {
1522  if (const std::shared_ptr<Task> t = task_.lock()) {
1523  t->raise_event(transwarp::event_type::before_started);
1524  }
1525  try {
1526  this->call(task_id_, task_, parents_);
1527  } catch (const transwarp::task_canceled&) {
1528  this->promise_.set_exception(std::current_exception());
1529  if (const std::shared_ptr<Task> t = task_.lock()) {
1530  t->raise_event(transwarp::event_type::after_canceled);
1531  }
1532  } catch (...) {
1533  this->promise_.set_exception(std::current_exception());
1534  }
1535  if (const std::shared_ptr<Task> t = task_.lock()) {
1536  t->raise_event(transwarp::event_type::after_finished);
1537  }
1538  }
1539 
1540 private:
1541  const std::size_t task_id_;
1542  const std::weak_ptr<Task> task_;
1543  const transwarp::decay_t<Parents> parents_;
1544 };
1545 
1546 
1547 /// A simple circular buffer (FIFO).
1548 /// ValueType must support default construction. The buffer lets you push
1549 /// new values onto the back and pop old values off the front.
1550 template<typename ValueType>
1552 public:
1553 
1554  static_assert(std::is_default_constructible_v<ValueType>, "ValueType must be default constructible");
1555 
1556  using value_type = ValueType;
1557 
1558  /// Constructs a circular buffer with a given fixed capacity
1559  explicit
1560  circular_buffer(std::size_t capacity)
1561  : data_(capacity)
1562  {
1563  if (capacity < 1) {
1564  throw transwarp::invalid_parameter{"capacity"};
1565  }
1566  }
1567 
1568  // delete copy/move semantics
1569  circular_buffer(const circular_buffer&) = delete;
1570  circular_buffer& operator=(const circular_buffer&) = delete;
1571  circular_buffer(circular_buffer&& other) = delete;
1572  circular_buffer& operator=(circular_buffer&&) = delete;
1573 
1574  /// Pushes a new value onto the end of the buffer. If that exceeds the capacity
1575  /// of the buffer then the oldest value gets dropped (the one at the front).
1576  template<typename T, typename = std::enable_if_t<std::is_same_v<std::decay_t<T>, value_type>>>
1577  void push(T&& value) {
1578  data_[end_] = std::forward<T>(value);
1579  increment();
1580  }
1581 
1582  /// Returns the value at the front of the buffer (the oldest value).
1583  /// This is undefined if the buffer is empty
1584  const value_type& front() const {
1585  return data_[front_];
1586  }
1587 
1588  /// Removes the value at the front of the buffer (the oldest value)
1589  void pop() {
1590  if (!empty()) {
1591  data_[front_] = ValueType{};
1592  decrement();
1593  }
1594  }
1595 
1596  /// Returns the capacity of the buffer
1597  std::size_t capacity() const {
1598  return data_.size();
1599  }
1600 
1601  /// Returns the number of populated values of the buffer. Its maximum value
1602  /// equals the capacity of the buffer
1603  std::size_t size() const {
1604  return size_;
1605  }
1606 
1607  /// Returns whether the buffer is empty
1608  bool empty() const {
1609  return size_ == 0;
1610  }
1611 
1612  /// Returns whether the buffer is full
1613  bool full() const {
1614  return size_ == data_.size();
1615  }
1616 
1617  /// Swaps this buffer with the given buffer
1618  void swap(circular_buffer& buffer) {
1619  std::swap(end_, buffer.end_);
1620  std::swap(front_, buffer.front_);
1621  std::swap(size_, buffer.size_);
1622  std::swap(data_, buffer.data_);
1623  }
1624 
1625 private:
1626 
1627  void increment_or_wrap(std::size_t& value) const {
1628  if (value == data_.size() - 1) {
1629  value = 0;
1630  } else {
1631  ++value;
1632  }
1633  }
1634 
1635  void increment() {
1636  increment_or_wrap(end_);
1637  if (full()) {
1638  increment_or_wrap(front_);
1639  } else {
1640  ++size_;
1641  }
1642  }
1643 
1644  void decrement() {
1645  increment_or_wrap(front_);
1646  --size_;
1647  }
1648 
1649  std::size_t end_{};
1650  std::size_t front_{};
1651  std::size_t size_{};
1652  std::vector<value_type> data_;
1653 };
1654 
1655 
1656 class spinlock {
1657 public:
1658 
1659  void lock() noexcept {
1660  while (locked_.test_and_set(std::memory_order_acquire));
1661  }
1662 
1663  void unlock() noexcept {
1664  locked_.clear(std::memory_order_release);
1665  }
1666 
1667 private:
1668  std::atomic_flag locked_ = ATOMIC_FLAG_INIT;
1669 };
1670 
1671 
1672 } // detail
1673 
1674 
1675 /// A functor not doing nothing
1677  void operator()() const noexcept {}
1678 };
1679 
1680 /// An object to use in places where a no-op functor is required
1681 constexpr no_op_functor no_op{};
1682 
1683 
1684 /// Executor for sequential execution. Runs functors sequentially on the same thread
1686 public:
1687 
1688  sequential() = default;
1689 
1690  // delete copy/move semantics
1691  sequential(const sequential&) = delete;
1692  sequential& operator=(const sequential&) = delete;
1693  sequential(sequential&&) = delete;
1694  sequential& operator=(sequential&&) = delete;
1695 
1696  /// Returns the name of the executor
1697  std::string name() const override {
1698  return "transwarp::sequential";
1699  }
1700 
1701  /// Runs the functor on the current thread
1702  void execute(const std::function<void()>& functor, transwarp::itask&) override {
1703  functor();
1704  }
1705 };
1706 
1707 
1708 /// Executor for parallel execution. Uses a simple thread pool
1710 public:
1711 
1712  explicit parallel(const std::size_t n_threads,
1713  std::function<void(std::size_t thread_index)> on_thread_started = nullptr)
1714  : pool_{n_threads, std::move(on_thread_started)}
1715  {}
1716 
1717  // delete copy/move semantics
1718  parallel(const parallel&) = delete;
1719  parallel& operator=(const parallel&) = delete;
1720  parallel(parallel&&) = delete;
1721  parallel& operator=(parallel&&) = delete;
1722 
1723  /// Returns the name of the executor
1724  std::string name() const override {
1725  return "transwarp::parallel";
1726  }
1727 
1728  /// Pushes the functor into the thread pool for asynchronous execution
1729  void execute(const std::function<void()>& functor, transwarp::itask&) override {
1730  pool_.push(functor);
1731  }
1732 
1733 private:
1735 };
1736 
1737 
1738 /// Detail namespace for internal functionality only
1739 namespace detail {
1740 
1741 const std::optional<std::string> nullopt_string;
1742 const std::any any_empty;
1743 
1744 /// Common task functionality shared across `task_impl` and `value_task`
1745 template<typename ResultType>
1746 class task_common : public transwarp::task<ResultType> {
1747 public:
1748  /// The result type of this task
1749  using result_type = ResultType;
1750 
1751  /// The task's id
1752  std::size_t id() const noexcept override {
1753  return id_;
1754  }
1755 
1756  /// The optional task name
1757  const std::optional<std::string>& name() const noexcept override {
1758 #ifndef TRANSWARP_DISABLE_TASK_NAME
1759  return name_;
1760 #else
1761  return transwarp::detail::nullopt_string;
1762 #endif
1763  }
1764 
1765  /// The task priority (defaults to 0)
1766  std::int64_t priority() const noexcept override {
1767 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
1768  return priority_;
1769 #else
1770  return 0;
1771 #endif
1772  }
1773 
1774  /// The custom task data (may not hold a value)
1775  const std::any& custom_data() const noexcept override {
1776 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
1777  return custom_data_;
1778 #else
1779  return transwarp::detail::any_empty;
1780 #endif
1781  }
1782 
1783  /// Sets a task priority (defaults to 0). transwarp will not directly use this.
1784  /// This is only useful if something else is using the priority (e.g. a custom executor)
1785  void set_priority(std::int64_t priority) override {
1786 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
1787  ensure_task_not_running();
1788  priority_ = priority;
1789 #else
1790  (void)priority;
1791 #endif
1792  }
1793 
1794  /// Resets the task priority to 0
1795  void reset_priority() override {
1796 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
1797  ensure_task_not_running();
1798  priority_ = 0;
1799 #endif
1800  }
1801 
1802  /// Assigns custom data to this task. transwarp will not directly use this.
1803  /// This is only useful if something else is using this custom data (e.g. a custom executor)
1804  void set_custom_data(std::any custom_data) override {
1805 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
1806  ensure_task_not_running();
1807  if (!custom_data.has_value()) {
1808  throw transwarp::invalid_parameter{"custom data"};
1809  }
1810  custom_data_ = std::move(custom_data);
1812 #else
1813  (void)custom_data;
1814 #endif
1815  }
1816 
1817  /// Removes custom data from this task
1818  void remove_custom_data() override {
1819 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
1820  ensure_task_not_running();
1821  custom_data_ = {};
1823 #endif
1824  }
1825 
1826  /// Returns the future associated to the underlying execution
1827  std::shared_future<result_type> future() const noexcept override {
1828  return future_;
1829  }
1830 
1831  /// Adds a new listener for all event types
1832  void add_listener(std::shared_ptr<transwarp::listener> listener) override {
1833  ensure_task_not_running();
1834  check_listener(listener);
1835  for (int i=0; i<static_cast<int>(transwarp::event_type::count); ++i) {
1836  listeners_[static_cast<transwarp::event_type>(i)].push_back(listener);
1837  }
1838  }
1839 
1840  /// Adds a new listener for the given event type only
1841  void add_listener(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) override {
1842  ensure_task_not_running();
1843  check_listener(listener);
1844  listeners_[event].push_back(std::move(listener));
1845  }
1846 
1847  /// Removes the listener for all event types
1848  void remove_listener(const std::shared_ptr<transwarp::listener>& listener) override {
1849  ensure_task_not_running();
1850  check_listener(listener);
1851  for (int i=0; i<static_cast<int>(transwarp::event_type::count); ++i) {
1852  auto listeners_pair = listeners_.find(static_cast<transwarp::event_type>(i));
1853  if (listeners_pair != listeners_.end()) {
1854  std::vector<std::shared_ptr<transwarp::listener>>& l = listeners_pair->second;
1855  l.erase(std::remove(l.begin(), l.end(), listener), l.end());
1856  }
1857  }
1858  }
1859 
1860  /// Removes the listener for the given event type only
1861  void remove_listener(transwarp::event_type event, const std::shared_ptr<transwarp::listener>& listener) override {
1862  ensure_task_not_running();
1863  check_listener(listener);
1864  auto listeners_pair = listeners_.find(event);
1865  if (listeners_pair != listeners_.end()) {
1866  std::vector<std::shared_ptr<transwarp::listener>>& l = listeners_pair->second;
1867  l.erase(std::remove(l.begin(), l.end(), listener), l.end());
1868  }
1869  }
1870 
1871  /// Removes all listeners
1872  void remove_listeners() override {
1873  ensure_task_not_running();
1874  listeners_.clear();
1875  }
1876 
1877  /// Removes all listeners for the given event type
1879  ensure_task_not_running();
1880  auto listeners_pair = listeners_.find(event);
1881  if (listeners_pair != listeners_.end()) {
1882  listeners_pair->second.clear();
1883  }
1884  }
1885 
1886 protected:
1887 
1888  /// Checks if the task is currently running and throws transwarp::control_error if it is
1890  if (future_.valid() && future_.wait_for(std::chrono::seconds{0}) != std::future_status::ready) {
1891  throw transwarp::control_error{"task currently running: " + transwarp::to_string(*this, " ")};
1892  }
1893  }
1894 
1895  /// Raises the given event to all listeners
1897  auto listeners_pair = listeners_.find(event);
1898  if (listeners_pair != listeners_.end()) {
1899  for (const std::shared_ptr<transwarp::listener>& listener : listeners_pair->second) {
1900  listener->handle_event(event, *this);
1901  }
1902  }
1903  }
1904 
1905  /// Check for non-null listener pointer
1906  void check_listener(const std::shared_ptr<transwarp::listener>& listener) const {
1907  if (!listener) {
1908  throw transwarp::invalid_parameter{"listener pointer"};
1909  }
1910  }
1911 
1912  /// Assigns the given id
1913  void set_id(std::size_t id) noexcept override {
1914  id_ = id;
1915  }
1916 
1917  /// Assigns the given name
1918  void set_name(std::optional<std::string> name) noexcept override {
1919 #ifndef TRANSWARP_DISABLE_TASK_NAME
1920  name_ = std::move(name);
1921 #else
1922  (void)name;
1923 #endif
1924  }
1925 
1926  void copy_from(const task_common& task) {
1927  id_ = task.id_;
1928 #ifndef TRANSWARP_DISABLE_TASK_NAME
1929  name_ = task.name_;
1930 #endif
1931 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
1932  priority_ = task.priority_;
1933 #endif
1934 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
1935  custom_data_ = task.custom_data_;
1936 #endif
1937  if (task.has_result()) {
1938  try {
1939  if constexpr (std::is_void_v<result_type>) {
1940  task.future_.get();
1942  } else {
1943  future_ = transwarp::detail::make_future_with_value<result_type>(task.future_.get());
1944  }
1945  } catch (...) {
1946  future_ = transwarp::detail::make_future_with_exception<result_type>(std::current_exception());
1947  }
1948  }
1949  visited_ = task.visited_;
1950  listeners_ = task.listeners_;
1951  }
1952 
1953  std::size_t id_ = 0;
1954 #ifndef TRANSWARP_DISABLE_TASK_NAME
1955  std::optional<std::string> name_;
1956 #endif
1957 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
1958  std::int64_t priority_ = 0;
1959 #endif
1960 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
1961  std::any custom_data_;
1962 #endif
1963  std::shared_future<result_type> future_;
1964  bool visited_ = false;
1965  std::map<transwarp::event_type, std::vector<std::shared_ptr<transwarp::listener>>> listeners_;
1966  std::vector<transwarp::itask*> tasks_;
1967 };
1968 
1969 
1970 /// The base task class that contains the functionality that can be used
1971 /// with all result types (void and non-void).
1972 template<typename ResultType, typename TaskType, typename Functor, typename... ParentResults>
1974 public:
1975  /// The task type
1976  using task_type = TaskType;
1977 
1978  /// The result type of this task
1979  using result_type = ResultType;
1980 
1981  /// Can be called to explicitly finalize this task making this task
1982  /// the terminal task in the graph. This is also done implicitly when
1983  /// calling, e.g., any of the *_all methods. It should normally not be
1984  /// necessary to call this method directly
1985  void finalize() override {
1986  if (this->tasks_.empty()) {
1987  visit(transwarp::detail::final_visitor{this->tasks_});
1988  unvisit();
1989  auto compare = [](const transwarp::itask* const l, const transwarp::itask* const r) {
1990  const std::size_t l_level = l->level();
1991  const std::size_t l_id = l->id();
1992  const std::size_t r_level = r->level();
1993  const std::size_t r_id = r->id();
1994  return std::tie(l_level, l_id) < std::tie(r_level, r_id);
1995  };
1996  std::sort(this->tasks_.begin(), this->tasks_.end(), compare);
1997  }
1998  }
1999 
2000  /// The task's level
2001  std::size_t level() const noexcept override {
2002  return level_;
2003  }
2004 
2005  /// The task's type
2006  transwarp::task_type type() const noexcept override {
2007  return type_;
2008  }
2009 
2010  /// The task's executor (may be null)
2011  std::shared_ptr<transwarp::executor> executor() const noexcept override {
2012  return executor_;
2013  }
2014 
2015  /// Returns whether the associated task is canceled
2016  bool canceled() const noexcept override {
2017  return canceled_.load();
2018  }
2019 
2020  /// Returns the average idletime in microseconds (-1 if never set)
2021  std::int64_t avg_idletime_us() const noexcept override {
2022 #ifndef TRANSWARP_DISABLE_TASK_TIME
2023  return avg_idletime_us_.load();
2024 #else
2025  return -1;
2026 #endif
2027  }
2028 
2029  /// Returns the average waittime in microseconds (-1 if never set)
2030  std::int64_t avg_waittime_us() const noexcept override {
2031 #ifndef TRANSWARP_DISABLE_TASK_TIME
2032  return avg_waittime_us_.load();
2033 #else
2034  return -1;
2035 #endif
2036  }
2037 
2038  /// Returns the average runtime in microseconds (-1 if never set)
2039  std::int64_t avg_runtime_us() const noexcept override {
2040 #ifndef TRANSWARP_DISABLE_TASK_TIME
2041  return avg_runtime_us_.load();
2042 #else
2043  return -1;
2044 #endif
2045  }
2046 
2047  /// Assigns an executor to this task which takes precedence over
2048  /// the executor provided in schedule() or schedule_all()
2049  void set_executor(std::shared_ptr<transwarp::executor> executor) override {
2050  this->ensure_task_not_running();
2051  if (!executor) {
2052  throw transwarp::invalid_parameter{"executor pointer"};
2053  }
2054  executor_ = std::move(executor);
2055  }
2056 
2057  /// Assigns an executor to all tasks which takes precedence over
2058  /// the executor provided in schedule() or schedule_all()
2059  void set_executor_all(std::shared_ptr<transwarp::executor> executor) override {
2060  this->ensure_task_not_running();
2061  transwarp::detail::set_executor_visitor visitor{std::move(executor)};
2062  visit_all(visitor);
2063  }
2064 
2065  /// Removes the executor from this task
2066  void remove_executor() override {
2067  this->ensure_task_not_running();
2068  executor_.reset();
2069  }
2070 
2071  /// Removes the executor from all tasks
2072  void remove_executor_all() override {
2073  this->ensure_task_not_running();
2075  visit_all(visitor);
2076  }
2077 
2078  /// Schedules this task for execution on the caller thread.
2079  /// The task-specific executor gets precedence if it exists.
2080  /// This overload will reset the underlying future.
2081  void schedule() override {
2082  this->ensure_task_not_running();
2083  this->schedule_impl(true);
2084  }
2085 
2086  /// Schedules this task for execution on the caller thread.
2087  /// The task-specific executor gets precedence if it exists.
2088  /// reset denotes whether schedule should reset the underlying
2089  /// future and schedule even if the future is already valid.
2090  void schedule(bool reset) override {
2091  this->ensure_task_not_running();
2092  this->schedule_impl(reset);
2093  }
2094 
2095  /// Schedules this task for execution using the provided executor.
2096  /// The task-specific executor gets precedence if it exists.
2097  /// This overload will reset the underlying future.
2099  this->ensure_task_not_running();
2100  this->schedule_impl(true, &executor);
2101  }
2102 
2103  /// Schedules this task for execution using the provided executor.
2104  /// The task-specific executor gets precedence if it exists.
2105  /// reset denotes whether schedule should reset the underlying
2106  /// future and schedule even if the future is already valid.
2107  void schedule(transwarp::executor& executor, bool reset) override {
2108  this->ensure_task_not_running();
2109  this->schedule_impl(reset, &executor);
2110  }
2111 
2112  /// Schedules all tasks in the graph for execution on the caller thread.
2113  /// The task-specific executors get precedence if they exist.
2114  /// This overload will reset the underlying futures.
2115  void schedule_all() override {
2116  this->ensure_task_not_running();
2117  schedule_all_impl(true);
2118  }
2119 
2120  /// Schedules all tasks in the graph for execution using the provided executor.
2121  /// The task-specific executors get precedence if they exist.
2122  /// This overload will reset the underlying futures.
2124  this->ensure_task_not_running();
2125  schedule_all_impl(true, &executor);
2126  }
2127 
2128  /// Schedules all tasks in the graph for execution on the caller thread.
2129  /// The task-specific executors get precedence if they exist.
2130  /// reset_all denotes whether schedule_all should reset the underlying
2131  /// futures and schedule even if the futures are already present.
2132  void schedule_all(bool reset_all) override {
2133  this->ensure_task_not_running();
2134  schedule_all_impl(reset_all);
2135  }
2136 
2137  /// Schedules all tasks in the graph for execution using the provided executor.
2138  /// The task-specific executors get precedence if they exist.
2139  /// reset_all denotes whether schedule_all should reset the underlying
2140  /// futures and schedule even if the futures are already present.
2141  void schedule_all(transwarp::executor& executor, bool reset_all) override {
2142  this->ensure_task_not_running();
2143  schedule_all_impl(reset_all, &executor);
2144  }
2145 
2146  /// Assigns an exception to this task. Scheduling will have no effect after an exception
2147  /// has been set. Calling reset() will remove the exception and re-enable scheduling.
2148  void set_exception(std::exception_ptr exception) override {
2149  this->ensure_task_not_running();
2150  this->future_ = transwarp::detail::make_future_with_exception<result_type>(exception);
2151  schedule_mode_ = false;
2153  }
2154 
2155  /// Returns whether the task was scheduled and not reset afterwards.
2156  /// This means that the underlying future is valid
2157  bool was_scheduled() const noexcept override {
2158  return this->future_.valid();
2159  }
2160 
2161  /// Waits for the task to complete. Should only be called if was_scheduled()
2162  /// is true, throws transwarp::control_error otherwise
2163  void wait() const override {
2164  ensure_task_was_scheduled();
2165  this->future_.wait();
2166  }
2167 
2168  /// Returns whether the task has finished processing. Should only be called
2169  /// if was_scheduled() is true, throws transwarp::control_error otherwise
2170  bool is_ready() const override {
2171  ensure_task_was_scheduled();
2172  return this->future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
2173  }
2174 
2175  /// Returns whether this task contains a result
2176  bool has_result() const noexcept override {
2177  return was_scheduled() && this->future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
2178  }
2179 
2180  /// Resets this task
2181  void reset() override {
2182  this->ensure_task_not_running();
2183  this->future_ = std::shared_future<result_type>{};
2184  cancel(false);
2185  schedule_mode_ = true;
2186 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
2187  refcount_ = childcount_;
2188 #endif
2190  }
2191 
2192  /// Resets all tasks in the graph
2193  void reset_all() override {
2194  this->ensure_task_not_running();
2196  visit_all(visitor);
2197  }
2198 
2199  /// If enabled then this task is canceled which will
2200  /// throw transwarp::task_canceled when retrieving the task result.
2201  /// Passing false is equivalent to resume.
2202  void cancel(bool enabled) noexcept override {
2203  canceled_ = enabled;
2204  }
2205 
2206  /// If enabled then all pending tasks in the graph are canceled which will
2207  /// throw transwarp::task_canceled when retrieving the task result.
2208  /// Passing false is equivalent to resume.
2209  void cancel_all(bool enabled) noexcept override {
2210  transwarp::detail::cancel_visitor visitor{enabled};
2211  visit_all(visitor);
2212  }
2213 
2214  /// Sets a priority to all tasks (defaults to 0). transwarp will not directly use this.
2215  /// This is only useful if something else is using the priority (e.g. a custom executor)
2216  void set_priority_all(std::int64_t priority) override {
2217 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
2218  this->ensure_task_not_running();
2219  transwarp::detail::set_priority_visitor visitor{priority};
2220  visit_all(visitor);
2221 #else
2222  (void)priority;
2223 #endif
2224  }
2225 
2226  /// Resets the priority of all tasks to 0
2227  void reset_priority_all() override {
2228 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
2229  this->ensure_task_not_running();
2231  visit_all(visitor);
2232 #endif
2233  }
2234 
2235  /// Assigns custom data to all tasks. transwarp will not directly use this.
2236  /// This is only useful if something else is using this custom data (e.g. a custom executor)
2237  void set_custom_data_all(std::any custom_data) override {
2238 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
2239  this->ensure_task_not_running();
2240  transwarp::detail::set_custom_data_visitor visitor{std::move(custom_data)};
2241  visit_all(visitor);
2242 #else
2243  (void)custom_data;
2244 #endif
2245  }
2246 
2247  /// Removes custom data from all tasks
2248  void remove_custom_data_all() override {
2249 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
2250  this->ensure_task_not_running();
2252  visit_all(visitor);
2253 #endif
2254  }
2255 
2256  /// Adds a new listener for all event types and for all parents
2257  void add_listener_all(std::shared_ptr<transwarp::listener> listener) override {
2258  this->ensure_task_not_running();
2259  transwarp::detail::add_listener_visitor visitor{std::move(listener)};
2260  visit_all(visitor);
2261  }
2262 
2263  /// Adds a new listener for the given event type only and for all parents
2264  void add_listener_all(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) override {
2265  this->ensure_task_not_running();
2266  transwarp::detail::add_listener_per_event_visitor visitor{event, std::move(listener)};
2267  visit_all(visitor);
2268  }
2269 
2270  /// Removes the listener for all event types and for all parents
2271  void remove_listener_all(const std::shared_ptr<transwarp::listener>& listener) override {
2272  this->ensure_task_not_running();
2273  transwarp::detail::remove_listener_visitor visitor{std::move(listener)};
2274  visit_all(visitor);
2275  }
2276 
2277  /// Removes the listener for the given event type only and for all parents
2278  void remove_listener_all(transwarp::event_type event, const std::shared_ptr<transwarp::listener>& listener) override {
2279  this->ensure_task_not_running();
2280  transwarp::detail::remove_listener_per_event_visitor visitor{event, std::move(listener)};
2281  visit_all(visitor);
2282  }
2283 
2284  /// Removes all listeners and for all parents
2285  void remove_listeners_all() override {
2286  this->ensure_task_not_running();
2288  visit_all(visitor);
2289  }
2290 
2291  /// Removes all listeners for the given event type and for all parents
2293  this->ensure_task_not_running();
2295  visit_all(visitor);
2296  }
2297 
2298  /// Returns the task's parents (may be empty)
2299  std::vector<transwarp::itask*> parents() const override {
2301  }
2302 
2303  /// Returns all tasks in the graph in breadth order
2304  const std::vector<transwarp::itask*>& tasks() override {
2305  finalize();
2306  return this->tasks_;
2307  }
2308 
2309  /// Returns all edges in the graph. This is mainly for visualizing
2310  /// the tasks and their interdependencies. Pass the result into transwarp::to_string
2311  /// to retrieve a dot-style graph representation for easy viewing.
2312  std::vector<transwarp::edge> edges() override {
2313  std::vector<transwarp::edge> edges;
2314  transwarp::detail::edges_visitor visitor{edges};
2315  visit_all(visitor);
2316  return edges;
2317  }
2318 
2319 protected:
2320 
2321  task_impl_base() = default;
2322 
2323  template<typename F>
2324  task_impl_base(F&& functor, std::shared_ptr<transwarp::task<ParentResults>>... parents)
2325  : functor_{new Functor{std::forward<F>(functor)}},
2326  parents_{std::move(parents)...}
2327  {
2328  init();
2329  }
2330 
2331  template<typename F, typename P>
2332  task_impl_base(F&& functor, std::vector<std::shared_ptr<transwarp::task<P>>> parents)
2333  : functor_{new Functor{std::forward<F>(functor)}},
2334  parents_{std::move(parents)}
2335  {
2336  init();
2337  if (parents_.empty()) {
2338  set_type(transwarp::task_type::root);
2339  }
2340  }
2341 
2342  void init() {
2343  set_type(task_type::value);
2344  transwarp::detail::assign_task_if(*functor_, *this);
2346  }
2347 
2348  template<typename R, typename Y, typename T, typename P>
2349  friend class transwarp::detail::runner;
2350 
2351  template<typename R, typename T, typename... A>
2352  friend R transwarp::detail::run_task(std::size_t, const std::weak_ptr<T>&, A&&...);
2353 
2354  /// Assigns the given level
2355  void set_level(std::size_t level) noexcept override {
2356  level_ = level;
2357  }
2358 
2359  /// Assigns the given type
2360  void set_type(transwarp::task_type type) noexcept override {
2361  type_ = type;
2362  }
2363 
2364  /// Assigns the given idletime
2365  void set_avg_idletime_us(std::int64_t idletime) noexcept override {
2366 #ifndef TRANSWARP_DISABLE_TASK_TIME
2367  avg_idletime_us_ = idletime;
2368 #else
2369  (void)idletime;
2370 #endif
2371  }
2372 
2373  /// Assigns the given waittime
2374  void set_avg_waittime_us(std::int64_t waittime) noexcept override {
2375 #ifndef TRANSWARP_DISABLE_TASK_TIME
2376  avg_waittime_us_ = waittime;
2377 #else
2378  (void)waittime;
2379 #endif
2380  }
2381 
2382  /// Assigns the given runtime
2383  void set_avg_runtime_us(std::int64_t runtime) noexcept override {
2384 #ifndef TRANSWARP_DISABLE_TASK_TIME
2385  avg_runtime_us_ = runtime;
2386 #else
2387  (void)runtime;
2388 #endif
2389  }
2390 
2391  /// Checks if the task was scheduled and throws transwarp::control_error if it's not
2393  if (!this->future_.valid()) {
2394  throw transwarp::control_error{"task was not scheduled: " + transwarp::to_string(*this, " ")};
2395  }
2396  }
2397 
2398  /// Schedules this task for execution using the provided executor.
2399  /// The task-specific executor gets precedence if it exists.
2400  /// Runs the task on the same thread as the caller if neither the global
2401  /// nor the task-specific executor is found.
2402  void schedule_impl(bool reset, transwarp::executor* executor=nullptr) override {
2403  if (schedule_mode_ && (reset || !this->future_.valid())) {
2404  if (reset) {
2405  cancel(false);
2406  }
2407 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
2408  refcount_ = childcount_;
2409 #endif
2410  std::weak_ptr<task_impl_base> self = std::static_pointer_cast<task_impl_base>(this->shared_from_this());
2412  std::shared_ptr<runner_t> runner = std::shared_ptr<runner_t>{new runner_t{this->id(), self, parents_}};
2413  this->raise_event(transwarp::event_type::before_scheduled);
2414  this->future_ = runner->future();
2416  if (this->executor_) {
2417  this->executor_->execute([runner]{ (*runner)(); }, *this);
2418  } else if (executor) {
2419  executor->execute([runner]{ (*runner)(); }, *this);
2420  } else {
2421  (*runner)();
2422  }
2423  }
2424  }
2425 
2426  /// Schedules all tasks in the graph for execution using the provided executor.
2427  /// The task-specific executors get precedence if they exist.
2428  /// Runs tasks on the same thread as the caller if neither the global
2429  /// nor a task-specific executor is found.
2430  void schedule_all_impl(bool reset_all, transwarp::executor* executor=nullptr) {
2431  transwarp::detail::schedule_visitor visitor{reset_all, executor};
2432  visit_all(visitor);
2433  }
2434 
2435  /// Visits each task in a depth-first traversal
2436  void visit(const std::function<void(transwarp::itask&)>& visitor) override {
2437  if (!this->visited_) {
2439  visitor(*this);
2440  this->visited_ = true;
2441  }
2442  }
2443 
2444  /// Traverses through each task and marks them as not visited.
2445  void unvisit() noexcept override {
2446  if (this->visited_) {
2447  this->visited_ = false;
2449  }
2450  }
2451 
2452  /// Visits all tasks
2453  template<typename Visitor>
2454  void visit_all(Visitor& visitor) {
2455  finalize();
2456  for (transwarp::itask* t : this->tasks_) {
2457  visitor(*t);
2458  }
2459  }
2460 
2461  void increment_childcount() noexcept override {
2462 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
2463  ++childcount_;
2464 #endif
2465  }
2466 
2467  void decrement_refcount() override {
2468 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
2469  if (--refcount_ == 0) {
2470  this->raise_event(transwarp::event_type::after_satisfied);
2471  }
2472 #endif
2473  }
2474 
2475  void reset_future() override {
2476  this->future_ = std::shared_future<result_type>{};
2478  }
2479 
2480  std::size_t level_ = 0;
2482  std::shared_ptr<transwarp::executor> executor_;
2483  std::atomic<bool> canceled_{false};
2484  bool schedule_mode_ = true;
2485 #ifndef TRANSWARP_DISABLE_TASK_TIME
2486  std::atomic<std::int64_t> avg_idletime_us_{-1};
2487  std::atomic<std::int64_t> avg_waittime_us_{-1};
2488  std::atomic<std::int64_t> avg_runtime_us_{-1};
2489 #endif
2490  std::unique_ptr<Functor> functor_;
2491  transwarp::detail::parents_t<ParentResults...> parents_;
2492 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
2493  std::size_t childcount_ = 0;
2494  std::atomic<std::size_t> refcount_{0};
2495 #endif
2496 };
2497 
2498 
2499 /// A task proxy
2500 template<typename ResultType, typename TaskType, typename Functor, typename... ParentResults>
2501 class task_impl_proxy : public transwarp::detail::task_impl_base<ResultType, TaskType, Functor, ParentResults...> {
2502 public:
2503  /// The task type
2504  using task_type = TaskType;
2505 
2506  /// The result type of this task
2507  using result_type = ResultType;
2508 
2509  /// Assigns a value to this task. Scheduling will have no effect after a value
2510  /// has been set. Calling reset() will remove the value and re-enable scheduling.
2511  void set_value(const transwarp::decay_t<result_type>& value) override {
2512  this->ensure_task_not_running();
2513  this->future_ = transwarp::detail::make_future_with_value<result_type>(value);
2514  this->schedule_mode_ = false;
2516  }
2517 
2518  /// Assigns a value to this task. Scheduling will have no effect after a value
2519  /// has been set. Calling reset() will remove the value and re-enable scheduling.
2521  this->ensure_task_not_running();
2522  this->future_ = transwarp::detail::make_future_with_value<result_type>(std::move(value));
2523  this->schedule_mode_ = false;
2525  }
2526 
2527  /// Returns the result of this task. Throws any exceptions that the underlying
2528  /// functor throws. Should only be called if was_scheduled() is true,
2529  /// throws transwarp::control_error otherwise
2530  transwarp::result_t<result_type> get() const override {
2531  this->ensure_task_was_scheduled();
2532  return this->future_.get();
2533  }
2534 
2535 protected:
2536 
2537  task_impl_proxy() = default;
2538 
2539  template<typename F>
2540  task_impl_proxy(F&& functor, std::shared_ptr<transwarp::task<ParentResults>>... parents)
2541  : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>{std::forward<F>(functor), std::move(parents)...}
2542  {}
2543 
2544  template<typename F, typename P>
2545  task_impl_proxy(F&& functor, std::vector<std::shared_ptr<transwarp::task<P>>> parents)
2546  : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>{std::forward<F>(functor), std::move(parents)}
2547  {}
2548 
2549 };
2550 
2551 /// A task proxy for reference result type.
2552 template<typename ResultType, typename TaskType, typename Functor, typename... ParentResults>
2553 class task_impl_proxy<ResultType&, TaskType, Functor, ParentResults...> : public transwarp::detail::task_impl_base<ResultType&, TaskType, Functor, ParentResults...> {
2554 public:
2555  /// The task type
2556  using task_type = TaskType;
2557 
2558  /// The result type of this task
2559  using result_type = ResultType&;
2560 
2561  /// Assigns a value to this task. Scheduling will have no effect after a value
2562  /// has been set. Calling reset() will remove the value and re-enable scheduling.
2564  this->ensure_task_not_running();
2565  this->future_ = transwarp::detail::make_future_with_value<result_type>(value);
2566  this->schedule_mode_ = false;
2568  }
2569 
2570  /// Returns the result of this task. Throws any exceptions that the underlying
2571  /// functor throws. Should only be called if was_scheduled() is true,
2572  /// throws transwarp::control_error otherwise
2573  transwarp::result_t<result_type> get() const override {
2574  this->ensure_task_was_scheduled();
2575  return this->future_.get();
2576  }
2577 
2578 protected:
2579 
2580  task_impl_proxy() = default;
2581 
2582  template<typename F>
2583  task_impl_proxy(F&& functor, std::shared_ptr<transwarp::task<ParentResults>>... parents)
2584  : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>{std::forward<F>(functor), std::move(parents)...}
2585  {}
2586 
2587  template<typename F, typename P>
2588  task_impl_proxy(F&& functor, std::vector<std::shared_ptr<transwarp::task<P>>> parents)
2589  : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>{std::forward<F>(functor), std::move(parents)}
2590  {}
2591 
2592 };
2593 
2594 /// A task proxy for void result type.
2595 template<typename TaskType, typename Functor, typename... ParentResults>
2596 class task_impl_proxy<void, TaskType, Functor, ParentResults...> : public transwarp::detail::task_impl_base<void, TaskType, Functor, ParentResults...> {
2597 public:
2598  /// The task type
2599  using task_type = TaskType;
2600 
2601  /// The result type of this task
2602  using result_type = void;
2603 
2604  /// Assigns a value to this task. Scheduling will have no effect after a call
2605  /// to this. Calling reset() will reset this and re-enable scheduling.
2606  void set_value() override {
2607  this->ensure_task_not_running();
2608  this->future_ = transwarp::detail::make_ready_future();
2609  this->schedule_mode_ = false;
2611  }
2612 
2613  /// Blocks until the task finishes. Throws any exceptions that the underlying
2614  /// functor throws. Should only be called if was_scheduled() is true,
2615  /// throws transwarp::control_error otherwise
2616  void get() const override {
2617  this->ensure_task_was_scheduled();
2618  this->future_.get();
2619  }
2620 
2621 protected:
2622 
2623  task_impl_proxy() = default;
2624 
2625  template<typename F>
2626  task_impl_proxy(F&& functor, std::shared_ptr<transwarp::task<ParentResults>>... parents)
2627  : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>{std::forward<F>(functor), std::move(parents)...}
2628  {}
2629 
2630  template<typename F, typename P>
2631  task_impl_proxy(F&& functor, std::vector<std::shared_ptr<transwarp::task<P>>> parents)
2632  : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>{std::forward<F>(functor), std::move(parents)}
2633  {}
2634 
2635 };
2636 
2637 } // detail
2638 
2639 
2640 /// A task representing a piece of work given by functor and parent tasks.
2641 /// By connecting tasks a directed acyclic graph is built.
2642 /// Tasks should be created using the make_task factory functions.
2643 template<typename TaskType, typename Functor, typename... ParentResults>
2644 class task_impl : public transwarp::detail::task_impl_proxy<transwarp::detail::functor_result_t<TaskType, Functor, ParentResults...>, TaskType, Functor, ParentResults...> {
2645 public:
2646  /// The task type
2647  using task_type = TaskType;
2648 
2649  /// The result type of this task
2650  using result_type = transwarp::detail::functor_result_t<TaskType, Functor, ParentResults...>;
2651 
2652  /// A task is defined by functor and parent tasks.
2653  /// Note: Don't use this constructor directly, use transwarp::make_task
2654  template<typename F>
2655  task_impl(F&& functor, std::shared_ptr<transwarp::task<ParentResults>>... parents)
2656  : transwarp::detail::task_impl_proxy<result_type, task_type, Functor, ParentResults...>{std::forward<F>(functor), std::move(parents)...}
2657  {}
2658 
2659  /// A task is defined by functor and parent tasks.
2660  /// Note: Don't use this constructor directly, use transwarp::make_task
2661  template<typename F, typename P>
2662  task_impl(F&& functor, std::vector<std::shared_ptr<transwarp::task<P>>> parents)
2663  : transwarp::detail::task_impl_proxy<result_type, task_type, Functor, ParentResults...>{std::forward<F>(functor), std::move(parents)}
2664  {}
2665 
2666  // delete copy/move semantics
2667  task_impl(const task_impl&) = delete;
2668  task_impl& operator=(const task_impl&) = delete;
2669  task_impl(task_impl&&) = delete;
2670  task_impl& operator=(task_impl&&) = delete;
2671 
2672  /// Gives this task a name and returns a ptr to itself
2673  std::shared_ptr<task_impl> named(std::string name) {
2674 #ifndef TRANSWARP_DISABLE_TASK_NAME
2675  this->set_name(std::make_optional(std::move(name)));
2676 #else
2677  (void)name;
2678 #endif
2679  return std::static_pointer_cast<task_impl>(this->shared_from_this());
2680  }
2681 
2682  /// Creates a continuation to this task
2683  template<typename TaskType_, typename Functor_>
2684  auto then(TaskType_, Functor_&& functor) {
2686  return std::shared_ptr<task_t>{new task_t{std::forward<Functor_>(functor), std::static_pointer_cast<task_impl>(this->shared_from_this())}};
2687  }
2688 
2689  /// Clones this task and casts the result to a ptr to task_impl
2690  std::shared_ptr<task_impl> clone_cast() const {
2691  return std::static_pointer_cast<task_impl>(this->clone());
2692  }
2693 
2694 private:
2695 
2696  task_impl() = default;
2697 
2698  std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache) const override {
2699  auto t = std::shared_ptr<task_impl>{new task_impl};
2700  t->copy_from(*this);
2701  t->level_ = this->level_;
2702  t->type_ = this->type_;
2703  t->executor_ = this->executor_;
2704  t->canceled_ = this->canceled_.load();
2705  t->schedule_mode_ = this->schedule_mode_;
2706 #ifndef TRANSWARP_DISABLE_TASK_TIME
2707  t->avg_idletime_us_ = this->avg_idletime_us_.load();
2708  t->avg_waittime_us_ = this->avg_waittime_us_.load();
2709  t->avg_runtime_us_ = this->avg_runtime_us_.load();
2710 #endif
2711  t->functor_ = std::unique_ptr<Functor>{new Functor{*this->functor_}};
2712  t->parents_ = transwarp::detail::parents<ParentResults...>::clone(task_cache, this->parents_);
2713  t->executor_ = this->executor_;
2714 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
2715  t->childcount_ = this->childcount_;
2716 #endif
2717  return t;
2718  }
2719 
2720 };
2721 
2722 
2723 /// A value task that stores a single value and doesn't require scheduling.
2724 /// Value tasks should be created using the make_value_task factory functions.
2725 template<typename ResultType>
2726 class value_task : public transwarp::detail::task_common<ResultType> {
2727 public:
2728  /// The task type
2730 
2731  /// The result type of this task
2732  using result_type = ResultType;
2733 
2734  /// A value task is defined by a given value.
2735  /// Note: Don't use this constructor directly, use transwarp::make_value_task
2736  template<typename T>
2737  value_task(T&& value)
2738  {
2739  this->future_ = transwarp::detail::make_future_with_value<result_type>(std::forward<T>(value));
2740  this->tasks_ = {this};
2741  }
2742 
2743  // delete copy/move semantics
2744  value_task(const value_task&) = delete;
2745  value_task& operator=(const value_task&) = delete;
2746  value_task(value_task&&) = delete;
2747  value_task& operator=(value_task&&) = delete;
2748 
2749  /// Gives this task a name and returns a ptr to itself
2750  std::shared_ptr<value_task> named(std::string name) {
2751 #ifndef TRANSWARP_DISABLE_TASK_NAME
2752  this->set_name(std::make_optional(std::move(name)));
2753 #else
2754  (void)name;
2755 #endif
2756  return std::static_pointer_cast<value_task>(this->shared_from_this());
2757  }
2758 
2759  /// Creates a continuation to this task
2760  template<typename TaskType_, typename Functor_>
2761  auto then(TaskType_, Functor_&& functor) {
2762  using task_t = transwarp::task_impl<TaskType_, std::decay_t<Functor_>, result_type>;
2763  return std::shared_ptr<task_t>{new task_t{std::forward<Functor_>(functor), std::static_pointer_cast<value_task>(this->shared_from_this())}};
2764  }
2765 
2766  /// Clones this task and casts the result to a ptr to value_task
2767  std::shared_ptr<value_task> clone_cast() const {
2768  return std::static_pointer_cast<value_task>(this->clone());
2769  }
2770 
2771  /// Nothing to be done to finalize a value task
2772  void finalize() override {}
2773 
2774  /// The task's level
2775  std::size_t level() const noexcept override {
2776  return 0;
2777  }
2778 
2779  /// The task's type
2780  transwarp::task_type type() const noexcept override {
2782  }
2783 
2784  /// Value tasks don't have executors as they don't run
2785  std::shared_ptr<transwarp::executor> executor() const noexcept override {
2786  return nullptr;
2787  }
2788 
2789  /// Value tasks cannot be canceled
2790  bool canceled() const noexcept override {
2791  return false;
2792  }
2793 
2794  /// Returns -1 as value tasks don't run
2795  std::int64_t avg_idletime_us() const noexcept override {
2796  return -1;
2797  }
2798 
2799  /// Returns -1 as value tasks don't run
2800  std::int64_t avg_waittime_us() const noexcept override {
2801  return -1;
2802  }
2803 
2804  /// Returns -1 as value tasks don't run
2805  std::int64_t avg_runtime_us() const noexcept override {
2806  return -1;
2807  }
2808 
2809  /// No-op because a value task never runs
2810  void set_executor(std::shared_ptr<transwarp::executor>) override {}
2811 
2812  /// No-op because a value task never runs and doesn't have parents
2813  void set_executor_all(std::shared_ptr<transwarp::executor>) override {}
2814 
2815  /// No-op because a value task never runs
2816  void remove_executor() override {}
2817 
2818  /// No-op because a value task never runs and doesn't have parents
2819  void remove_executor_all() override {}
2820 
2821  /// Sets a priority to all tasks (defaults to 0). transwarp will not directly use this.
2822  /// This is only useful if something else is using the priority
2823  void set_priority_all(std::int64_t priority) override {
2824 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
2825  this->set_priority(priority);
2826 #else
2827  (void)priority;
2828 #endif
2829  }
2830 
2831  /// Resets the priority of all tasks to 0
2832  void reset_priority_all() override {
2833 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
2834  this->reset_priority();
2835 #endif
2836  }
2837 
2838  /// Assigns custom data to all tasks. transwarp will not directly use this.
2839  /// This is only useful if something else is using this custom data
2840  void set_custom_data_all(std::any custom_data) override {
2841 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
2842  this->set_custom_data(std::move(custom_data));
2843 #else
2844  (void)custom_data;
2845 #endif
2846  }
2847 
2848  /// Removes custom data from all tasks
2849  void remove_custom_data_all() override {
2850 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
2851  this->remove_custom_data();
2852 #endif
2853  }
2854 
2855  /// No-op because a value task never runs
2856  void schedule() override {}
2857 
2858  /// No-op because a value task never runs
2859  void schedule(transwarp::executor&) override {}
2860 
2861  /// No-op because a value task never runs
2862  void schedule(bool) override {}
2863 
2864  /// No-op because a value task never runs
2865  void schedule(transwarp::executor&, bool) override {}
2866 
2867  /// No-op because a value task never runs and doesn't have parents
2868  void schedule_all() override {}
2869 
2870  /// No-op because a value task never runs and doesn't have parents
2872 
2873  /// No-op because a value task never runs and doesn't have parents
2874  void schedule_all(bool) override {}
2875 
2876  /// No-op because a value task never runs and doesn't have parents
2877  void schedule_all(transwarp::executor&, bool) override {}
2878 
2879  /// Assigns a value to this task
2880  void set_value(const transwarp::decay_t<result_type>& value) override {
2881  this->future_ = transwarp::detail::make_future_with_value<result_type>(value);
2883  }
2884 
2885  /// Assigns a value to this task
2887  this->future_ = transwarp::detail::make_future_with_value<result_type>(std::move(value));
2889  }
2890 
2891  /// Assigns an exception to this task
2892  void set_exception(std::exception_ptr exception) override {
2893  this->future_ = transwarp::detail::make_future_with_exception<result_type>(exception);
2895  }
2896 
2897  /// Returns the value of this task. Throws an exception if this task has an exception assigned to it
2898  transwarp::result_t<result_type> get() const override {
2899  return this->future_.get();
2900  }
2901 
2902  /// Returns true because a value task is scheduled once on construction
2903  bool was_scheduled() const noexcept override {
2904  return true;
2905  }
2906 
2907  /// No-op because a value task never runs
2908  void wait() const override {}
2909 
2910  /// Returns true because a value task is always ready
2911  bool is_ready() const override {
2912  return true;
2913  }
2914 
2915  /// Returns true because a value task always contains a result
2916  bool has_result() const noexcept override {
2917  return true;
2918  }
2919 
2920  /// No-op because a value task never runs
2921  void reset() override {}
2922 
2923  /// No-op because a value task never runs and doesn't have parents
2924  void reset_all() override {}
2925 
2926  /// No-op because a value task never runs
2927  void cancel(bool) noexcept override {}
2928 
2929  /// No-op because a value task never runs and doesn't have parents
2930  void cancel_all(bool) noexcept override {}
2931 
2932  /// Adds a new listener for all event types and for all parents
2933  void add_listener_all(std::shared_ptr<transwarp::listener> listener) override {
2934  this->add_listener(listener);
2935  }
2936 
2937  /// Adds a new listener for the given event type only and for all parents
2938  void add_listener_all(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) override {
2939  this->add_listener(event, listener);
2940  }
2941 
2942  /// Removes the listener for all event types and for all parents
2943  void remove_listener_all(const std::shared_ptr<transwarp::listener>& listener) override {
2944  this->remove_listener(listener);
2945  }
2946 
2947  /// Removes the listener for the given event type only and for all parents
2948  void remove_listener_all(transwarp::event_type event, const std::shared_ptr<transwarp::listener>& listener) override {
2949  this->remove_listener(event, listener);
2950  }
2951 
2952  /// Removes all listeners and for all parents
2953  void remove_listeners_all() override {
2954  this->remove_listeners();
2955  }
2956 
2957  /// Removes all listeners for the given event type and for all parents
2959  this->remove_listeners(event);
2960  }
2961 
2962  /// Empty because a value task doesn't have parents
2963  std::vector<transwarp::itask*> parents() const override {
2964  return {};
2965  }
2966 
2967  /// Returns all tasks in the graph in breadth order
2968  const std::vector<transwarp::itask*>& tasks() override {
2969  return this->tasks_;
2970  }
2971 
2972  /// Returns empty edges because a value task doesn't have parents
2973  std::vector<transwarp::edge> edges() override {
2974  return {};
2975  }
2976 
2977 private:
2978 
2979  value_task()
2980  {
2981  this->tasks_ = {this};
2982  }
2983 
2984  std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>&) const override {
2985  auto t = std::shared_ptr<value_task>{new value_task{}};
2986  t->copy_from(*this);
2987  return t;
2988  }
2989 
2990  /// No-op as value tasks are always at level 0
2991  void set_level(std::size_t) noexcept override {}
2992 
2993  /// No-op as value tasks are always root tasks
2994  void set_type(transwarp::task_type) noexcept override {}
2995 
2996  /// No-op as value tasks don't run
2997  void set_avg_idletime_us(std::int64_t) noexcept override {}
2998 
2999  /// No-op as value tasks don't run
3000  void set_avg_waittime_us(std::int64_t) noexcept override {}
3001 
3002  /// No-op as value tasks don't run
3003  void set_avg_runtime_us(std::int64_t) noexcept override {}
3004 
3005  /// No-op because a value task never runs
3006  void schedule_impl(bool, transwarp::executor*) override {}
3007 
3008  /// Visits this task
3009  void visit(const std::function<void(transwarp::itask&)>& visitor) override {
3010  if (!this->visited_) {
3011  visitor(*this);
3012  this->visited_ = true;
3013  }
3014  }
3015 
3016  /// Marks this task as not visited
3017  void unvisit() noexcept override {
3018  this->visited_ = false;
3019  }
3020 
3021  void increment_childcount() noexcept override {}
3022 
3023  void decrement_refcount() override {}
3024 
3025  void reset_future() override {}
3026 
3027 };
3028 
3029 
3030 /// A factory function to create a new task
3031 template<typename TaskType, typename Functor, typename... Parents>
3032 auto make_task(TaskType, Functor&& functor, std::shared_ptr<Parents>... parents) {
3033  using task_t = transwarp::task_impl<TaskType, std::decay_t<Functor>, typename Parents::result_type...>;
3034  return std::shared_ptr<task_t>{new task_t{std::forward<Functor>(functor), std::move(parents)...}};
3035 }
3036 
3037 
3038 /// A factory function to create a new task with vector parents
3039 template<typename TaskType, typename Functor, typename ParentType>
3040 auto make_task(TaskType, Functor&& functor, std::vector<ParentType> parents) {
3041  using task_t = transwarp::task_impl<TaskType, std::decay_t<Functor>, std::vector<ParentType>>;
3042  return std::shared_ptr<task_t>{new task_t{std::forward<Functor>(functor), std::move(parents)}};
3043 }
3044 
3045 
3046 /// A factory function to create a new value task
3047 template<typename Value>
3048 auto make_value_task(Value&& value) {
3050  return std::shared_ptr<task_t>{new task_t{std::forward<Value>(value)}};
3051 }
3052 
3053 
3054 /// A function similar to std::for_each but returning a transwarp task for
3055 /// deferred, possibly asynchronous execution. This function creates a graph
3056 /// with std::distance(first, last) root tasks
3057 template<typename InputIt, typename UnaryOperation>
3058 auto for_each(InputIt first, InputIt last, UnaryOperation unary_op) {
3059  const auto distance = std::distance(first, last);
3060  if (distance <= 0) {
3061  throw transwarp::invalid_parameter{"first or last"};
3062  }
3063  std::vector<std::shared_ptr<transwarp::task<void>>> tasks;
3064  tasks.reserve(static_cast<std::size_t>(distance));
3065  for (; first != last; ++first) {
3066  tasks.push_back(transwarp::make_task(transwarp::root, [unary_op,first]{ unary_op(*first); }));
3067  }
3069  final->finalize();
3070  return final;
3071 }
3072 
3073 /// A function similar to std::for_each but returning a transwarp task for
3074 /// deferred, possibly asynchronous execution. This function creates a graph
3075 /// with std::distance(first, last) root tasks.
3076 /// Overload for automatic scheduling by passing an executor.
3077 template<typename InputIt, typename UnaryOperation>
3078 auto for_each(transwarp::executor& executor, InputIt first, InputIt last, UnaryOperation unary_op) {
3079  auto task = transwarp::for_each(first, last, unary_op);
3080  task->schedule_all(executor);
3081  return task;
3082 }
3083 
3084 
3085 /// A function similar to std::transform but returning a transwarp task for
3086 /// deferred, possibly asynchronous execution. This function creates a graph
3087 /// with std::distance(first1, last1) root tasks
3088 template<typename InputIt, typename OutputIt, typename UnaryOperation>
3089 auto transform(InputIt first1, InputIt last1, OutputIt d_first, UnaryOperation unary_op) {
3090  const auto distance = std::distance(first1, last1);
3091  if (distance <= 0) {
3092  throw transwarp::invalid_parameter{"first1 or last1"};
3093  }
3094  std::vector<std::shared_ptr<transwarp::task<void>>> tasks;
3095  tasks.reserve(static_cast<std::size_t>(distance));
3096  for (; first1 != last1; ++first1, ++d_first) {
3097  tasks.push_back(transwarp::make_task(transwarp::root, [unary_op,first1,d_first]{ *d_first = unary_op(*first1); }));
3098  }
3100  final->finalize();
3101  return final;
3102 }
3103 
3104 /// A function similar to std::transform but returning a transwarp task for
3105 /// deferred, possibly asynchronous execution. This function creates a graph
3106 /// with std::distance(first1, last1) root tasks.
3107 /// Overload for automatic scheduling by passing an executor.
3108 template<typename InputIt, typename OutputIt, typename UnaryOperation>
3109 auto transform(transwarp::executor& executor, InputIt first1, InputIt last1, OutputIt d_first, UnaryOperation unary_op) {
3110  auto task = transwarp::transform(first1, last1, d_first, unary_op);
3111  task->schedule_all(executor);
3112  return task;
3113 }
3114 
3115 
3116 /// A task pool that allows running multiple instances of the same task in parallel.
3117 template<typename ResultType>
3118 class task_pool {
3119 public:
3120 
3121  /// Constructs a task pool
3123  std::size_t minimum_size,
3124  std::size_t maximum_size)
3125  : task_{std::move(task)},
3126  minimum_{minimum_size},
3127  maximum_{maximum_size},
3128  finished_{maximum_size}
3129  {
3130  if (minimum_ < 1) {
3131  throw transwarp::invalid_parameter{"minimum size"};
3132  }
3133  if (minimum_ > maximum_) {
3134  throw transwarp::invalid_parameter{"minimum or maximum size"};
3135  }
3136  task_->add_listener(transwarp::event_type::after_finished, listener_);
3137  for (std::size_t i=0; i<minimum_; ++i) {
3138  idle_.push(task_->clone());
3139  }
3140  }
3141 
3142  /// Constructs a task pool with reasonable defaults for minimum and maximum
3143  explicit
3145  : task_pool{std::move(task), 32, 65536}
3146  {}
3147 
3148  // delete copy/move semantics
3149  task_pool(const task_pool&) = delete;
3150  task_pool& operator=(const task_pool&) = delete;
3151  task_pool(task_pool&&) = delete;
3152  task_pool& operator=(task_pool&&) = delete;
3153 
3154  /// Returns the next idle task.
3155  /// If there are no idle tasks then it will attempt to double the
3156  /// pool size. If that fails then it will return a nullptr. On successful
3157  /// retrieval of an idle task the function will mark that task as busy.
3158  std::shared_ptr<transwarp::task<ResultType>> next_task(bool maybe_resize=true) {
3159  const transwarp::itask* finished_task{};
3160  {
3161  std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3162  if (!finished_.empty()) {
3163  finished_task = finished_.front(); finished_.pop();
3164  }
3165  }
3166 
3167  std::shared_ptr<transwarp::task<ResultType>> task;
3168  if (finished_task) {
3169  task = busy_.find(finished_task)->second;
3170  } else {
3171  if (maybe_resize && idle_.empty()) {
3172  resize(size() * 2); // double pool size
3173  }
3174  if (idle_.empty()) {
3175  return nullptr;
3176  }
3177  task = idle_.front(); idle_.pop();
3178  busy_.emplace(task.get(), task);
3179  }
3180 
3181  auto future = task->future();
3182  if (future.valid()) {
3183  future.wait(); // will return immediately
3184  }
3185  return task;
3186  }
3187 
3188  /// Just like next_task() but waits for a task to become available.
3189  /// The returned task will always be a valid pointer
3190  std::shared_ptr<transwarp::task<ResultType>> wait_for_next_task(bool maybe_resize=true) {
3191  for (;;) {
3192  std::shared_ptr<transwarp::task<ResultType>> g = next_task(maybe_resize);
3193  if (g) {
3194  return g;
3195  }
3196  }
3197  }
3198 
3199  /// Returns the current total size of the pool (sum of idle and busy tasks)
3200  std::size_t size() const {
3201  return idle_.size() + busy_.size();
3202  }
3203 
3204  /// Returns the minimum size of the pool
3205  std::size_t minimum_size() const {
3206  return minimum_;
3207  }
3208 
3209  /// Returns the maximum size of the pool
3210  std::size_t maximum_size() const {
3211  return maximum_;
3212  }
3213 
3214  /// Returns the number of idle tasks in the pool
3215  std::size_t idle_count() const {
3216  std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3217  return idle_.size() + finished_.size();
3218  }
3219 
3220  /// Returns the number of busy tasks in the pool
3221  std::size_t busy_count() const {
3222  std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3223  return busy_.size() - finished_.size();
3224  }
3225 
3226  /// Resizes the task pool to the given new size if possible
3227  void resize(std::size_t new_size) {
3228  reclaim();
3229  if (new_size > size()) { // grow
3230  const std::size_t count = new_size - size();
3231  for (std::size_t i=0; i<count; ++i) {
3232  if (size() == maximum_) {
3233  break;
3234  }
3235  idle_.push(task_->clone());
3236  }
3237  } else if (new_size < size()) { // shrink
3238  const std::size_t count = size() - new_size;
3239  for (std::size_t i=0; i<count; ++i) {
3240  if (idle_.empty() || size() == minimum_) {
3241  break;
3242  }
3243  idle_.pop();
3244  }
3245  }
3246  }
3247 
3248  /// Reclaims finished tasks by marking them as idle again
3249  void reclaim() {
3250  decltype(finished_) finished{finished_.capacity()};
3251  {
3252  std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3253  finished_.swap(finished);
3254  }
3255  while (!finished.empty()) {
3256  const transwarp::itask* task = finished.front(); finished.pop();
3257  const auto it = busy_.find(task);
3258  idle_.push(it->second);
3259  busy_.erase(it);
3260  }
3261  }
3262 
3263 private:
3264 
3265  class finished_listener : public transwarp::listener {
3266  public:
3267 
3268  explicit
3269  finished_listener(task_pool<ResultType>& pool)
3270  : pool_{pool}
3271  {}
3272 
3273  // Called on a potentially high-priority thread
3274  void handle_event(transwarp::event_type, transwarp::itask& task) override {
3275  std::lock_guard<transwarp::detail::spinlock> lock{pool_.spinlock_};
3276  pool_.finished_.push(static_cast<const transwarp::itask*>(&task));
3277  }
3278 
3279  private:
3280  task_pool<ResultType>& pool_;
3281  };
3282 
3283  std::shared_ptr<transwarp::task<ResultType>> task_;
3284  std::size_t minimum_;
3285  std::size_t maximum_;
3286  mutable transwarp::detail::spinlock spinlock_; // protecting finished_
3288  std::queue<std::shared_ptr<transwarp::task<ResultType>>> idle_;
3289  std::unordered_map<const transwarp::itask*, std::shared_ptr<transwarp::task<ResultType>>> busy_;
3290  std::shared_ptr<transwarp::listener> listener_{new finished_listener{*this}};
3291 };
3292 
3293 
3294 /// A timer that tracks the average idle, wait, and run time of each task it listens to.
3295 /// - idle = time between scheduling and starting the task (executor dependent)
3296 /// - wait = time between starting and invoking the task's functor, i.e. wait for parent tasks to finish
3297 /// - run = time between invoking and finishing the task's computations
3298 class timer : public transwarp::listener {
3299 public:
3300  timer() = default;
3301 
3302  // delete copy/move semantics
3303  timer(const timer&) = delete;
3304  timer& operator=(const timer&) = delete;
3305  timer(timer&&) = delete;
3306  timer& operator=(timer&&) = delete;
3307 
3308  /// Performs the actual timing and populates the task's timing members
3309  void handle_event(const transwarp::event_type event, transwarp::itask& task) override {
3310  switch (event) {
3312  const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3313  std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3314  auto& track = tracks_[&task];
3315  track.startidle = now;
3316  }
3317  break;
3319  const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3320  track_idletime(task, now);
3321  std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3322  auto& track = tracks_[&task];
3323  track.startwait = now;
3324  }
3325  break;
3327  const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3328  track_waittime(task, now);
3329  }
3330  break;
3332  const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3333  track_waittime(task, now);
3334  std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3335  auto& track = tracks_[&task];
3336  track.running = true;
3337  track.startrun = now;
3338  }
3339  break;
3341  const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3342  track_runtime(task, now);
3343  }
3344  break;
3345  default: break;
3346  }
3347  }
3348 
3349  /// Resets all timing information
3350  void reset() {
3351  std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3352  tracks_.clear();
3353  }
3354 
3355 private:
3356 
3357  void track_idletime(transwarp::itask& task, const std::chrono::time_point<std::chrono::steady_clock>& now) {
3358  std::int64_t avg_idletime_us;
3359  {
3360  std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3361  auto& track = tracks_[&task];
3362  track.idletime += std::chrono::duration_cast<std::chrono::microseconds>(now - track.startidle).count();
3363  ++track.idlecount;
3364  avg_idletime_us = static_cast<std::int64_t>(track.idletime / track.idlecount);
3365  }
3366  task.set_avg_idletime_us(avg_idletime_us);
3367  }
3368 
3369  void track_waittime(transwarp::itask& task, const std::chrono::time_point<std::chrono::steady_clock>& now) {
3370  std::int64_t avg_waittime_us;
3371  {
3372  std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3373  auto& track = tracks_[&task];
3374  track.waittime += std::chrono::duration_cast<std::chrono::microseconds>(now - track.startwait).count();
3375  ++track.waitcount;
3376  avg_waittime_us = static_cast<std::int64_t>(track.waittime / track.waitcount);
3377  }
3378  task.set_avg_waittime_us(avg_waittime_us);
3379  }
3380 
3381  void track_runtime(transwarp::itask& task, const std::chrono::time_point<std::chrono::steady_clock>& now) {
3382  std::int64_t avg_runtime_us;
3383  {
3384  std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3385  auto& track = tracks_[&task];
3386  if (!track.running) {
3387  return;
3388  }
3389  track.running = false;
3390  track.runtime += std::chrono::duration_cast<std::chrono::microseconds>(now - track.startrun).count();
3391  ++track.runcount;
3392  avg_runtime_us = static_cast<std::int64_t>(track.runtime / track.runcount);
3393  }
3394  task.set_avg_runtime_us(avg_runtime_us);
3395  }
3396 
3397  struct track {
3398  bool running = false;
3399  std::chrono::time_point<std::chrono::steady_clock> startidle;
3400  std::chrono::time_point<std::chrono::steady_clock> startwait;
3401  std::chrono::time_point<std::chrono::steady_clock> startrun;
3402  std::chrono::microseconds::rep idletime = 0;
3403  std::chrono::microseconds::rep idlecount = 0;
3404  std::chrono::microseconds::rep waittime = 0;
3405  std::chrono::microseconds::rep waitcount = 0;
3406  std::chrono::microseconds::rep runtime = 0;
3407  std::chrono::microseconds::rep runcount = 0;
3408  };
3409 
3410  transwarp::detail::spinlock spinlock_; // protecting tracks_
3411  std::unordered_map<const transwarp::itask*, track> tracks_;
3412 };
3413 
3414 
3415 /// The releaser will release a task's future when the task's `after_satisfied`
3416 /// event was received which happens when all children received the task's result.
3417 /// The releaser should be used in cases where the task's result is only needed
3418 /// for consumption by its children and can then be discarded.
3420 public:
3421  releaser() = default;
3422 
3423  /// The executor gives control over where a task's future is released
3424  explicit releaser(std::shared_ptr<transwarp::executor> executor)
3425  : executor_{std::move(executor)}
3426  {}
3427 
3428  // delete copy/move semantics
3429  releaser(const releaser&) = delete;
3430  releaser& operator=(const releaser&) = delete;
3431  releaser(releaser&&) = delete;
3432  releaser& operator=(releaser&&) = delete;
3433 
3434  void handle_event(const transwarp::event_type event, transwarp::itask& task) override {
3436  if (executor_) {
3437  executor_->execute([&task]{ task.reset_future(); }, task);
3438  } else {
3439  task.reset_future();
3440  }
3441  }
3442  }
3443 
3444 private:
3445  std::shared_ptr<transwarp::executor> executor_;
3446 };
3447 
3448 
3449 } // transwarp
void set_avg_idletime_us(std::int64_t idletime) noexceptoverride
Assigns the given idletime.
Definition: transwarp.h:2365
constexpr transwarp::wait_any_type wait_any
The wait_any task tag.
Definition: transwarp.h:144
void assign_task_if(Functor &functor, transwarp::itask &task) noexcept
Assigns the task to the given functor if the functor is a subclass of transwarp::functor.
Definition: transwarp.h:1387
void remove_custom_data_all() override
Removes custom data from all tasks.
Definition: transwarp.h:2849
void remove_listener_all(const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for all event types and for all parents.
Definition: transwarp.h:2943
The executor interface used to perform custom task execution.
Definition: transwarp.h:164
transwarp::task_type type() const noexceptoverride
The task&#39;s type.
Definition: transwarp.h:2006
std::remove_const_t< std::remove_reference_t< T >> decay_t
Removes reference and const from a type.
Definition: transwarp.h:406
Exception thrown when a task was destroyed prematurely.
Definition: transwarp.h:92
Adds a new listener to the given task.
Definition: transwarp.h:1198
A simple thread pool used to execute tasks in parallel.
Definition: transwarp.h:557
void add_listener_all(transwarp::event_type event, std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for the given event type only and for all parents.
Definition: transwarp.h:2938
std::size_t idle_count() const
Returns the number of idle tasks in the pool.
Definition: transwarp.h:3215
void cancel_all(bool) noexceptoverride
No-op because a value task never runs and doesn&#39;t have parents.
Definition: transwarp.h:2930
void reclaim()
Reclaims finished tasks by marking them as idle again.
Definition: transwarp.h:3249
void remove_listener_all(const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for all event types and for all parents.
Definition: transwarp.h:2271
Removes a listener from the given task.
Definition: transwarp.h:1225
void set_value(const transwarp::decay_t< result_type > &value) override
Assigns a value to this task. Scheduling will have no effect after a value has been set...
Definition: transwarp.h:2511
void set_type(transwarp::task_type type) noexceptoverride
Assigns the given type.
Definition: transwarp.h:2360
The consume type. Used for tag dispatch.
Definition: transwarp.h:131
void decrement_refcount(transwarp::itask &)
Decrements refcount.
Definition: transwarp.h:801
Determines the result type of the Functor dispatching on the task type.
Definition: transwarp.h:1295
void set_priority_all(std::int64_t priority) override
Sets a priority to all tasks (defaults to 0). transwarp will not directly use this. This is only useful if something else is using the priority.
Definition: transwarp.h:2823
A task pool that allows running multiple instances of the same task in parallel.
Definition: transwarp.h:3118
void schedule_all(transwarp::executor &) override
No-op because a value task never runs and doesn&#39;t have parents.
Definition: transwarp.h:2871
void finalize() override
Can be called to explicitly finalize this task making this task the terminal task in the graph...
Definition: transwarp.h:1985
Result run_task(std::size_t task_id, const std::weak_ptr< Task > &task, Args &&...args)
Runs the task with the given arguments, hence, invoking the task&#39;s functor.
Definition: transwarp.h:707
Removes the executor from the given task.
Definition: transwarp.h:1138
void reset_priority_all() override
Resets the priority of all tasks to 0.
Definition: transwarp.h:2227
auto transform(InputIt first1, InputIt last1, OutputIt d_first, UnaryOperation unary_op)
A function similar to std::transform but returning a transwarp task for deferred, possibly asynchrono...
Definition: transwarp.h:3089
virtual std::string name() const =0
Returns the name of the executor.
The task has no parents.
Just after a task was canceled (handle_event called on thread that task is run on) ...
TaskType task_type
The task type.
Definition: transwarp.h:2599
auto for_each(InputIt first, InputIt last, UnaryOperation unary_op)
A function similar to std::for_each but returning a transwarp task for deferred, possibly asynchronou...
Definition: transwarp.h:3058
A callable to run a task given its parents.
Definition: transwarp.h:1506
void set_exception(std::exception_ptr exception) override
Assigns an exception to this task.
Definition: transwarp.h:2892
void schedule_all(bool reset_all) override
Schedules all tasks in the graph for execution on the caller thread. The task-specific executors get ...
Definition: transwarp.h:2132
Generates edges.
Definition: transwarp.h:1079
Adds a new listener per event type to the given task.
Definition: transwarp.h:1211
void ensure_task_was_scheduled() const
Checks if the task was scheduled and throws transwarp::control_error if it&#39;s not. ...
Definition: transwarp.h:2392
Sets level of a task and increments the child count.
Definition: transwarp.h:1049
void remove_listener(const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for all event types.
Definition: transwarp.h:1848
void reset()
Resets all timing information.
Definition: transwarp.h:3350
std::size_t busy_count() const
Returns the number of busy tasks in the pool.
Definition: transwarp.h:3221
std::size_t size() const
Returns the current total size of the pool (sum of idle and busy tasks)
Definition: transwarp.h:3200
void visit_all(Visitor &visitor)
Visits all tasks.
Definition: transwarp.h:2454
Definition: transwarp.h:1476
void set_executor(std::shared_ptr< transwarp::executor >) override
No-op because a value task never runs.
Definition: transwarp.h:2810
const std::optional< std::string > & name() const noexceptoverride
The optional task name.
Definition: transwarp.h:1757
void remove_listeners() override
Removes all listeners.
Definition: transwarp.h:1872
constexpr no_op_functor no_op
An object to use in places where a no-op functor is required.
Definition: transwarp.h:1681
void set_executor_all(std::shared_ptr< transwarp::executor > executor) override
Assigns an executor to all tasks which takes precedence over the executor provided in schedule() or s...
Definition: transwarp.h:2059
bool was_scheduled() const noexceptoverride
Returns whether the task was scheduled and not reset afterwards. This means that the underlying futur...
Definition: transwarp.h:2157
void cancel(bool) noexceptoverride
No-op because a value task never runs.
Definition: transwarp.h:2927
Just before a task is scheduled (handle_event called on thread of caller to schedule()) ...
void schedule_all() override
No-op because a value task never runs and doesn&#39;t have parents.
Definition: transwarp.h:2868
void schedule_all(transwarp::executor &executor, bool reset_all) override
Schedules all tasks in the graph for execution using the provided executor. The task-specific executo...
Definition: transwarp.h:2141
std::shared_ptr< transwarp::executor > executor() const noexceptoverride
The task&#39;s executor (may be null)
Definition: transwarp.h:2011
Assigns a priority to the given task.
Definition: transwarp.h:1146
Result call(std::size_t task_id, const Task &task, const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &parents)
Calls the functor of the given task with the results from the tuple of parents. Throws transwarp::tas...
Definition: transwarp.h:1008
std::shared_ptr< transwarp::task< ResultType > > wait_for_next_task(bool maybe_resize=true)
Just like next_task() but waits for a task to become available. The returned task will always be a va...
Definition: transwarp.h:3190
void push(T &&value)
Pushes a new value onto the end of the buffer. If that exceeds the capacity of the buffer then the ol...
Definition: transwarp.h:1577
void schedule(transwarp::executor &executor) override
Schedules this task for execution using the provided executor. The task-specific executor gets preced...
Definition: transwarp.h:2098
bool canceled() const noexceptoverride
Value tasks cannot be canceled.
Definition: transwarp.h:2790
void remove_listener_all(transwarp::event_type event, const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for the given event type only and for all parents.
Definition: transwarp.h:2948
Assigns an executor to the given task.
Definition: transwarp.h:1126
std::size_t capacity() const
Returns the capacity of the buffer.
Definition: transwarp.h:1597
void raise_event(transwarp::event_type event)
Raises the given event to all listeners.
Definition: transwarp.h:1896
A simple circular buffer (FIFO). ValueType must support default construction. The buffer lets you pus...
Definition: transwarp.h:1551
void finalize() override
Nothing to be done to finalize a value task.
Definition: transwarp.h:2772
void cancel_all(bool enabled) noexceptoverride
If enabled then all pending tasks in the graph are canceled which will throw transwarp::task_canceled...
Definition: transwarp.h:2209
std::tuple< std::shared_future< ParentResults >...> get_futures(const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &input)
Returns the futures from the given tuple of tasks.
Definition: transwarp.h:687
auto make_task(TaskType, Functor &&functor, std::shared_ptr< Parents >...parents)
A factory function to create a new task.
Definition: transwarp.h:3032
const std::any & custom_data() const noexceptoverride
The custom task data (may not hold a value)
Definition: transwarp.h:1775
typename transwarp::detail::functor_result< TaskType, Functor, ParentResults...>::type functor_result_t
Determines the result type of the Functor dispatching on the task type.
Definition: transwarp.h:1382
std::shared_future< result_type > future() const noexceptoverride
Returns the future associated to the underlying execution.
Definition: transwarp.h:1827
transwarp::task_type type() const noexceptoverride
The task&#39;s type.
Definition: transwarp.h:2780
void add_listener_all(std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for all event types and for all parents.
Definition: transwarp.h:2933
const std::vector< transwarp::itask * > & tasks() override
Returns all tasks in the graph in breadth order.
Definition: transwarp.h:2304
void schedule_all() override
Schedules all tasks in the graph for execution on the caller thread. The task-specific executors get ...
Definition: transwarp.h:2115
void set_executor(std::shared_ptr< transwarp::executor > executor) override
Assigns an executor to this task which takes precedence over the executor provided in schedule() or s...
Definition: transwarp.h:2049
void unvisit() noexceptoverride
Traverses through each task and marks them as not visited.
Definition: transwarp.h:2445
void remove_listener_all(transwarp::event_type event, const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for the given event type only and for all parents.
Definition: transwarp.h:2278
void cancel_all_but_one(const std::shared_ptr< transwarp::task< OneResult >> &one, const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &parents)
Cancels all tasks but one.
Definition: transwarp.h:778
The task class.
Definition: transwarp.h:436
Definition: transwarp.h:1656
A functor not doing nothing.
Definition: transwarp.h:1676
void reset_priority_all() override
Resets the priority of all tasks to 0.
Definition: transwarp.h:2832
std::size_t level() const noexceptoverride
The task&#39;s level.
Definition: transwarp.h:2001
void reset() override
Resets this task.
Definition: transwarp.h:2181
void set_name(std::optional< std::string > name) noexceptoverride
Assigns the given name.
Definition: transwarp.h:1918
Just before a task&#39;s functor is invoked (handle_event called on thread that task is run on) ...
const transwarp::itask & transwarp_task() const noexcept
The associated task (only to be called after the task was constructed)
Definition: transwarp.h:527
void handle_event(const transwarp::event_type event, transwarp::itask &task) override
Performs the actual timing and populates the task&#39;s timing members.
Definition: transwarp.h:3309
The task&#39;s functor accepts the first parent future that becomes ready.
The consume_any type. Used for tag dispatch.
Definition: transwarp.h:135
void set_priority(std::int64_t priority) override
Sets a task priority (defaults to 0). transwarp will not directly use this. This is only useful if so...
Definition: transwarp.h:1785
std::size_t level() const noexceptoverride
The task&#39;s level.
Definition: transwarp.h:2775
bool was_scheduled() const noexceptoverride
Returns true because a value task is scheduled once on construction.
Definition: transwarp.h:2903
Unvisits the given task.
Definition: transwarp.h:1286
void remove_listeners_all() override
Removes all listeners and for all parents.
Definition: transwarp.h:2953
The accept type. Used for tag dispatch.
Definition: transwarp.h:123
std::int64_t avg_waittime_us() const noexceptoverride
Returns the average waittime in microseconds (-1 if never set)
Definition: transwarp.h:2030
std::vector< transwarp::itask * > parents() const override
Returns the task&#39;s parents (may be empty)
Definition: transwarp.h:2299
constexpr transwarp::accept_type accept
The accept task tag.
Definition: transwarp.h:124
std::size_t maximum_size() const
Returns the maximum size of the pool.
Definition: transwarp.h:3210
std::int64_t priority() const noexceptoverride
The task priority (defaults to 0)
Definition: transwarp.h:1766
Common task functionality shared across task_impl and value_task
Definition: transwarp.h:1746
bool has_result() const noexceptoverride
Returns whether this task contains a result.
Definition: transwarp.h:2176
void set_custom_data(std::any custom_data) override
Assigns custom data to this task. transwarp will not directly use this. This is only useful if someth...
Definition: transwarp.h:1804
Removes all listeners from the given task.
Definition: transwarp.h:1252
void set_priority_all(std::int64_t priority) override
Sets a priority to all tasks (defaults to 0). transwarp will not directly use this. This is only useful if something else is using the priority (e.g. a custom executor)
Definition: transwarp.h:2216
The listener interface to listen to events raised by tasks.
Definition: transwarp.h:195
void add_listener(transwarp::event_type event, std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for the given event type only.
Definition: transwarp.h:1841
void schedule_impl(bool reset, transwarp::executor *executor=nullptr) override
Schedules this task for execution using the provided executor. The task-specific executor gets preced...
Definition: transwarp.h:2402
void add_listener(std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for all event types.
Definition: transwarp.h:1832
Exception thrown when a task is canceled.
Definition: transwarp.h:83
TaskType task_type
The task type.
Definition: transwarp.h:2647
void wait() const override
Waits for the task to complete. Should only be called if was_scheduled() is true, throws transwarp::c...
Definition: transwarp.h:2163
void set_avg_runtime_us(std::int64_t runtime) noexceptoverride
Assigns the given runtime.
Definition: transwarp.h:2383
std::shared_future< ResultType > make_future_with_value(Value &&value)
Returns a ready future with the given value as its state.
Definition: transwarp.h:1396
void schedule() override
Schedules this task for execution on the caller thread. The task-specific executor gets precedence if...
Definition: transwarp.h:2081
Just before a task starts running (handle_event called on thread that task is run on) ...
transwarp::itask & child() noexcept
Returns the child task.
Definition: transwarp.h:234
Removes custom data from the given task.
Definition: transwarp.h:1178
void schedule_all(transwarp::executor &, bool) override
No-op because a value task never runs and doesn&#39;t have parents.
Definition: transwarp.h:2877
Resets the given task.
Definition: transwarp.h:1106
auto then(TaskType_, Functor_ &&functor)
Creates a continuation to this task.
Definition: transwarp.h:2761
void set_custom_data_all(std::any custom_data) override
Assigns custom data to all tasks. transwarp will not directly use this. This is only useful if someth...
Definition: transwarp.h:2237
const transwarp::itask & child() const noexcept
Returns the child task.
Definition: transwarp.h:229
void remove_listeners_all(transwarp::event_type event) override
Removes all listeners for the given event type and for all parents.
Definition: transwarp.h:2292
bool full() const
Returns whether the buffer is full.
Definition: transwarp.h:1613
A base class for a user-defined functor that needs access to the associated task or a cancel point to...
Definition: transwarp.h:519
The releaser will release a task&#39;s future when the task&#39;s after_satisfied event was received which ha...
Definition: transwarp.h:3419
constexpr transwarp::consume_any_type consume_any
The consume_any task tag.
Definition: transwarp.h:136
Removes a listener per event type from the given task.
Definition: transwarp.h:1238
void reset_priority() override
Resets the task priority to 0.
Definition: transwarp.h:1795
std::int64_t avg_runtime_us() const noexceptoverride
Returns -1 as value tasks don&#39;t run.
Definition: transwarp.h:2805
std::int64_t avg_waittime_us() const noexceptoverride
Returns -1 as value tasks don&#39;t run.
Definition: transwarp.h:2800
Exception thrown when a task is used in unintended ways.
Definition: transwarp.h:110
Resets the priority of the given task.
Definition: transwarp.h:1158
bool canceled() const noexceptoverride
Returns whether the associated task is canceled.
Definition: transwarp.h:2016
void schedule() override
No-op because a value task never runs.
Definition: transwarp.h:2856
std::size_t size() const
Returns the number of populated values of the buffer. Its maximum value equals the capacity of the bu...
Definition: transwarp.h:1603
void set_value(transwarp::decay_t< result_type > &&value) override
Assigns a value to this task.
Definition: transwarp.h:2886
std::vector< transwarp::edge > edges() override
Returns all edges in the graph. This is mainly for visualizing the tasks and their interdependencies...
Definition: transwarp.h:2312
void add_listener_all(std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for all event types and for all parents.
Definition: transwarp.h:2257
void wait() const override
No-op because a value task never runs.
Definition: transwarp.h:2908
releaser(std::shared_ptr< transwarp::executor > executor)
The executor gives control over where a task&#39;s future is released.
Definition: transwarp.h:3424
void set_value(transwarp::decay_t< result_type > &value) override
Assigns a value to this task. Scheduling will have no effect after a value has been set...
Definition: transwarp.h:2563
A task proxy.
Definition: transwarp.h:2501
void transwarp_cancel_point() const
If the associated task is canceled then this will throw transwarp::task_canceled which will stop the ...
Definition: transwarp.h:538
The base task class that contains the functionality that can be used with all result types (void and ...
Definition: transwarp.h:1973
Definition: transwarp.h:673
std::shared_ptr< task_impl > named(std::string name)
Gives this task a name and returns a ptr to itself.
Definition: transwarp.h:2673
The task&#39;s functor consumes all parent results.
Visits the given task using the visitor given in the constructor.
Definition: transwarp.h:1274
void remove_listeners_all() override
Removes all listeners and for all parents.
Definition: transwarp.h:2285
Just after a task has finished running (handle_event called on thread that task is run on) ...
transwarp::itask & transwarp_task() noexcept
The associated task (only to be called after the task was constructed)
Definition: transwarp.h:532
Base class for exceptions.
Definition: transwarp.h:74
typename transwarp::detail::parents< ParentResults...>::type parents_t
Determines the type of the parents.
Definition: transwarp.h:1472
constexpr transwarp::consume_type consume
The consume task tag.
Definition: transwarp.h:132
void schedule(transwarp::executor &, bool) override
No-op because a value task never runs.
Definition: transwarp.h:2865
A value task that stores a single value and doesn&#39;t require scheduling. Value tasks should be created...
Definition: transwarp.h:2726
void remove_listeners(transwarp::event_type event) override
Removes all listeners for the given event type.
Definition: transwarp.h:1878
void add_listener_all(transwarp::event_type event, std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for the given event type only and for all parents.
Definition: transwarp.h:2264
void set_value(transwarp::decay_t< result_type > &&value) override
Assigns a value to this task. Scheduling will have no effect after a value has been set...
Definition: transwarp.h:2520
Determines the type of the parents.
Definition: transwarp.h:1424
void execute(const std::function< void()> &functor, transwarp::itask &) override
Runs the functor on the current thread.
Definition: transwarp.h:1702
Parent wait_for_any(const std::shared_ptr< transwarp::task< ParentResults >> &...parents)
Waits for the first parent to finish.
Definition: transwarp.h:752
task_impl(F &&functor, std::shared_ptr< transwarp::task< ParentResults >>...parents)
A task is defined by functor and parent tasks. Note: Don&#39;t use this constructor directly, use transwarp::make_task.
Definition: transwarp.h:2655
void reset_all() override
No-op because a value task never runs and doesn&#39;t have parents.
Definition: transwarp.h:2924
Assigns custom data to the given task.
Definition: transwarp.h:1166
The task&#39;s functor takes no arguments but waits for all parents to finish.
void set_exception(std::exception_ptr exception) override
Assigns an exception to this task. Scheduling will have no effect after an exception has been set...
Definition: transwarp.h:2148
Definition: transwarp.h:834
std::shared_ptr< transwarp::task< ResultType > > next_task(bool maybe_resize=true)
Returns the next idle task. If there are no idle tasks then it will attempt to double the pool size...
Definition: transwarp.h:3158
virtual void handle_event(transwarp::event_type event, transwarp::itask &task)=0
This may be called from arbitrary threads depending on the event type (see transwarp::event_type). The implementer needs to ensure that this never throws exceptions.
void call_with_each(const Functor &f, const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &t)
Calls the functor with every element in the tuple.
Definition: transwarp.h:1026
auto make_value_task(Value &&value)
A factory function to create a new value task.
Definition: transwarp.h:3048
std::size_t id() const noexceptoverride
The task&#39;s id.
Definition: transwarp.h:1752
transwarp::detail::functor_result_t< TaskType, Functor, ParentResults...> result_type
The result type of this task.
Definition: transwarp.h:2650
An edge between two tasks.
Definition: transwarp.h:206
void remove_listener(transwarp::event_type event, const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for the given event type only.
Definition: transwarp.h:1861
A timer that tracks the average idle, wait, and run time of each task it listens to.
Definition: transwarp.h:3298
void schedule(transwarp::executor &) override
No-op because a value task never runs.
Definition: transwarp.h:2859
void reset_all() override
Resets all tasks in the graph.
Definition: transwarp.h:2193
Just after custom data was assigned (handle_event called on thread that custom data was set on) ...
void handle_event(const transwarp::event_type event, transwarp::itask &task) override
This may be called from arbitrary threads depending on the event type (see transwarp::event_type). The implementer needs to ensure that this never throws exceptions.
Definition: transwarp.h:3434
circular_buffer(std::size_t capacity)
Constructs a circular buffer with a given fixed capacity.
Definition: transwarp.h:1560
Definition: transwarp.h:825
void execute(const std::function< void()> &functor, transwarp::itask &) override
Pushes the functor into the thread pool for asynchronous execution.
Definition: transwarp.h:1729
Executor for parallel execution. Uses a simple thread pool.
Definition: transwarp.h:1709
The wait_any type. Used for tag dispatch.
Definition: transwarp.h:143
void set_custom_data_all(std::any custom_data) override
Assigns custom data to all tasks. transwarp will not directly use this. This is only useful if someth...
Definition: transwarp.h:2840
task_pool(std::shared_ptr< transwarp::task< ResultType >> task)
Constructs a task pool with reasonable defaults for minimum and maximum.
Definition: transwarp.h:3144
void schedule(bool reset) override
Schedules this task for execution on the caller thread. The task-specific executor gets precedence if...
Definition: transwarp.h:2090
std::string to_string(const transwarp::task_type &type)
String conversion for the task_type enumeration.
Definition: transwarp.h:339
void set_value(const transwarp::decay_t< result_type > &value) override
Assigns a value to this task.
Definition: transwarp.h:2880
void remove_executor() override
Removes the executor from this task.
Definition: transwarp.h:2066
Just after the task&#39;s future was changed (handle_event called on thread that changed the task&#39;s futur...
The accept_any type. Used for tag dispatch.
Definition: transwarp.h:127
void remove_custom_data() override
Removes custom data from this task.
Definition: transwarp.h:1818
bool is_ready() const override
Returns true because a value task is always ready.
Definition: transwarp.h:2911
std::string name() const override
Returns the name of the executor.
Definition: transwarp.h:1724
constexpr transwarp::wait_type wait
The wait task tag.
Definition: transwarp.h:140
The root type. Used for tag dispatch.
Definition: transwarp.h:119
The task&#39;s functor consumes the first parent result that becomes ready.
std::shared_ptr< TaskType > clone_task(std::unordered_map< std::shared_ptr< transwarp::itask >, std::shared_ptr< transwarp::itask >> &task_cache, const std::shared_ptr< TaskType > &t)
Clones a task.
Definition: transwarp.h:419
A task representing a piece of work given by functor and parent tasks. By connecting tasks a directed...
Definition: transwarp.h:2644
std::shared_ptr< task_impl > clone_cast() const
Clones this task and casts the result to a ptr to task_impl.
Definition: transwarp.h:2690
The wait type. Used for tag dispatch.
Definition: transwarp.h:139
task_pool(std::shared_ptr< transwarp::task< ResultType >> task, std::size_t minimum_size, std::size_t maximum_size)
Constructs a task pool.
Definition: transwarp.h:3122
Just after a task has satisfied all its children with results (handle_event called on thread where th...
transwarp::itask & parent() noexcept
Returns the parent task.
Definition: transwarp.h:224
void cancel(bool enabled) noexceptoverride
If enabled then this task is canceled which will throw transwarp::task_canceled when retrieving the t...
Definition: transwarp.h:2202
Schedules using the given executor.
Definition: transwarp.h:1093
void swap(circular_buffer &buffer)
Swaps this buffer with the given buffer.
Definition: transwarp.h:1618
constexpr transwarp::root_type root
The root task tag.
Definition: transwarp.h:120
virtual void execute(const std::function< void()> &functor, transwarp::itask &task)=0
Runs a task which is wrapped by the given functor. The functor only captures one shared pointer and c...
std::vector< transwarp::itask * > parents() const override
Empty because a value task doesn&#39;t have parents.
Definition: transwarp.h:2963
void schedule_all(transwarp::executor &executor) override
Schedules all tasks in the graph for execution using the provided executor. The task-specific executo...
Definition: transwarp.h:2123
void set_level(std::size_t level) noexceptoverride
Assigns the given level.
Definition: transwarp.h:2355
const value_type & front() const
Returns the value at the front of the buffer (the oldest value). This is undefined if the buffer is e...
Definition: transwarp.h:1584
std::int64_t avg_idletime_us() const noexceptoverride
Returns the average idletime in microseconds (-1 if never set)
Definition: transwarp.h:2021
The task&#39;s functor accepts all parent futures.
Applies final bookkeeping to the task and collects the task.
Definition: transwarp.h:1065
void remove_executor_all() override
No-op because a value task never runs and doesn&#39;t have parents.
Definition: transwarp.h:2819
void schedule(transwarp::executor &executor, bool reset) override
Schedules this task for execution using the provided executor. The task-specific executor gets preced...
Definition: transwarp.h:2107
std::size_t minimum_size() const
Returns the minimum size of the pool.
Definition: transwarp.h:3205
std::result_of_t< decltype(&std::shared_future< T >::get)(std::shared_future< T >)> result_t
Returns the result type of a std::shared_future&lt;T&gt;
Definition: transwarp.h:411
void wait_for_all(const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &parents)
Waits for all parents to finish.
Definition: transwarp.h:722
task_type
The possible task types.
Definition: transwarp.h:62
void set_value() override
Assigns a value to this task. Scheduling will have no effect after a call to this. Calling reset() will reset this and re-enable scheduling.
Definition: transwarp.h:2606
std::vector< transwarp::edge > edges() override
Returns empty edges because a value task doesn&#39;t have parents.
Definition: transwarp.h:2973
std::int64_t avg_runtime_us() const noexceptoverride
Returns the average runtime in microseconds (-1 if never set)
Definition: transwarp.h:2039
void pop()
Removes the value at the front of the buffer (the oldest value)
Definition: transwarp.h:1589
void resize(std::size_t new_size)
Resizes the task pool to the given new size if possible.
Definition: transwarp.h:3227
event_type
The task events that can be subscribed to using the listener interface.
Definition: transwarp.h:181
Removes all listeners per event type from the given task.
Definition: transwarp.h:1261
void schedule_all_impl(bool reset_all, transwarp::executor *executor=nullptr)
Schedules all tasks in the graph for execution using the provided executor. The task-specific executo...
Definition: transwarp.h:2430
std::shared_ptr< value_task > clone_cast() const
Clones this task and casts the result to a ptr to value_task.
Definition: transwarp.h:2767
The task&#39;s functor takes no arguments but waits for the first parent to finish.
Exception thrown when an invalid parameter was passed to a function.
Definition: transwarp.h:101
void check_listener(const std::shared_ptr< transwarp::listener > &listener) const
Check for non-null listener pointer.
Definition: transwarp.h:1906
void remove_custom_data_all() override
Removes custom data from all tasks.
Definition: transwarp.h:2248
std::int64_t avg_idletime_us() const noexceptoverride
Returns -1 as value tasks don&#39;t run.
Definition: transwarp.h:2795
void reset() override
No-op because a value task never runs.
Definition: transwarp.h:2921
value_task(T &&value)
A value task is defined by a given value. Note: Don&#39;t use this constructor directly, use transwarp::make_value_task.
Definition: transwarp.h:2737
std::shared_future< void > make_ready_future()
Returns a ready future.
Definition: transwarp.h:1404
void set_executor_all(std::shared_ptr< transwarp::executor >) override
No-op because a value task never runs and doesn&#39;t have parents.
Definition: transwarp.h:2813
std::shared_ptr< transwarp::executor > executor() const noexceptoverride
Value tasks don&#39;t have executors as they don&#39;t run.
Definition: transwarp.h:2785
constexpr transwarp::accept_any_type accept_any
The accept_any task tag.
Definition: transwarp.h:128
bool empty() const
Returns whether the buffer is empty.
Definition: transwarp.h:1608
void ensure_task_not_running() const
Checks if the task is currently running and throws transwarp::control_error if it is...
Definition: transwarp.h:1889
const transwarp::itask & parent() const noexcept
Returns the parent task.
Definition: transwarp.h:219
Cancels or resumes the given task.
Definition: transwarp.h:1114
const std::vector< transwarp::itask * > & tasks() override
Returns all tasks in the graph in breadth order.
Definition: transwarp.h:2968
static Result work(std::size_t task_id, const Task &task, const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &parents)
Definition: transwarp.h:929
void remove_executor() override
No-op because a value task never runs.
Definition: transwarp.h:2816
void apply_to_each(Functor &&f, Tuple &&t)
Applies the functor to each element in the tuple.
Definition: transwarp.h:667
void schedule(bool) override
No-op because a value task never runs.
Definition: transwarp.h:2862
void set_avg_waittime_us(std::int64_t waittime) noexceptoverride
Assigns the given waittime.
Definition: transwarp.h:2374
void schedule_all(bool) override
No-op because a value task never runs and doesn&#39;t have parents.
Definition: transwarp.h:2874
std::string name() const override
Returns the name of the executor.
Definition: transwarp.h:1697
Pushes the given task into the vector of tasks.
Definition: transwarp.h:1186
auto then(TaskType_, Functor_ &&functor)
Creates a continuation to this task.
Definition: transwarp.h:2684
Executor for sequential execution. Runs functors sequentially on the same thread. ...
Definition: transwarp.h:1685
bool has_result() const noexceptoverride
Returns true because a value task always contains a result.
Definition: transwarp.h:2916
std::shared_future< ResultType > make_future_with_exception(std::exception_ptr exception)
Returns a ready future with the given exception as its state.
Definition: transwarp.h:1412
void remove_executor_all() override
Removes the executor from all tasks.
Definition: transwarp.h:2072
void set_id(std::size_t id) noexceptoverride
Assigns the given id.
Definition: transwarp.h:1913
bool is_ready() const override
Returns whether the task has finished processing. Should only be called if was_scheduled() is true...
Definition: transwarp.h:2170
An interface for the task class.
Definition: transwarp.h:248
std::shared_ptr< value_task > named(std::string name)
Gives this task a name and returns a ptr to itself.
Definition: transwarp.h:2750
void visit(const std::function< void(transwarp::itask &)> &visitor) override
Visits each task in a depth-first traversal.
Definition: transwarp.h:2436
void remove_listeners_all(transwarp::event_type event) override
Removes all listeners for the given event type and for all parents.
Definition: transwarp.h:2958