26 #include <type_traits>
27 #include <unordered_map>
// Convenience switch: when TRANSWARP_MINIMUM_TASK_SIZE is defined, each of the
// TRANSWARP_DISABLE_TASK_* options below (custom data, name, priority,
// refcount, time) is defined as well, unless it was already defined.
32 #ifdef TRANSWARP_MINIMUM_TASK_SIZE
34 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
35 #define TRANSWARP_DISABLE_TASK_CUSTOM_DATA
38 #ifndef TRANSWARP_DISABLE_TASK_NAME
39 #define TRANSWARP_DISABLE_TASK_NAME
42 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
43 #define TRANSWARP_DISABLE_TASK_PRIORITY
46 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
47 #define TRANSWARP_DISABLE_TASK_REFCOUNT
50 #ifndef TRANSWARP_DISABLE_TASK_TIME
51 #define TRANSWARP_DISABLE_TASK_TIME
77 : std::runtime_error{message}
// Compile-time tags, one per transwarp::task_type enumerator. Each tag is an
// empty struct deriving from std::integral_constant, so the matching
// enumerator is available as the tag's static `value` member.
119 struct root_type : std::integral_constant<transwarp::task_type, transwarp::task_type::root> {};
123 struct accept_type : std::integral_constant<transwarp::task_type, transwarp::task_type::accept> {};
127 struct accept_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::accept_any> {};
131 struct consume_type : std::integral_constant<transwarp::task_type, transwarp::task_type::consume> {};
135 struct consume_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::consume_any> {};
139 struct wait_type : std::integral_constant<transwarp::task_type, transwarp::task_type::wait> {};
143 struct wait_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::wait_any> {};
// Returns a name as a std::string. Pure virtual: overrides visible in this
// file return "transwarp::sequential" and "transwarp::parallel".
// NOTE(review): the enclosing class is not visible here; presumably this is
// the executor interface -- confirm.
169 virtual std::string
name()
const = 0;
209 : parent_{&parent}, child_{&child}
214 edge& operator=(
const edge&) =
default;
// Abstract task interface that does not depend on a result type. It derives
// from std::enable_shared_from_this so implementations can call
// shared_from_this() on themselves.
248 class itask :
public std::enable_shared_from_this<itask> {
// Virtual destructor so derived tasks are destroyed correctly through an
// itask pointer.
250 virtual ~
itask() =
default;
// Finalizes the task. NOTE(review): exact semantics are defined by the
// implementation, which is not fully visible here.
252 virtual void finalize() = 0;
// Read-only accessors. All are noexcept.
// id() and level() return unsigned values identifying/positioning the task.
253 virtual std::size_t id()
const noexcept = 0;
254 virtual std::size_t level()
const noexcept = 0;
// Optional task name, returned by reference (empty optional if none).
256 virtual const std::optional<std::string>& name()
const noexcept = 0;
// Executor associated with this task.
// NOTE(review): presumably null when no task-specific executor is set -- confirm.
257 virtual std::shared_ptr<transwarp::executor>
executor()
const noexcept = 0;
258 virtual std::int64_t priority()
const noexcept = 0;
// Custom data attached to the task, returned by reference.
259 virtual const std::any& custom_data()
const noexcept = 0;
// Whether the task is currently flagged as canceled.
260 virtual bool canceled()
const noexcept = 0;
// Average idle/wait/run times in microseconds. The to_string code in this
// file only prints each of them when the value is >= 0.
261 virtual std::int64_t avg_idletime_us()
const noexcept = 0;
262 virtual std::int64_t avg_waittime_us()
const noexcept = 0;
263 virtual std::int64_t avg_runtime_us()
const noexcept = 0;
// Mutators for the per-task executor, priority and custom data.
// NOTE(review): each `*_all` variant presumably applies the same operation to
// every task in the graph (the visitor helpers later in this file forward to
// the single-task call) -- confirm against the implementation.

// Executor: set or remove the executor associated with the task.
264 virtual void set_executor(std::shared_ptr<transwarp::executor>
executor) = 0;
265 virtual void set_executor_all(std::shared_ptr<transwarp::executor> executor) = 0;
266 virtual void remove_executor() = 0;
267 virtual void remove_executor_all() = 0;
// Priority: set a value or reset it.
268 virtual void set_priority(std::int64_t priority) = 0;
269 virtual void set_priority_all(std::int64_t priority) = 0;
270 virtual void reset_priority() = 0;
271 virtual void reset_priority_all() = 0;
// Custom data: attach an arbitrary std::any value or remove it.
272 virtual void set_custom_data(std::any custom_data) = 0;
273 virtual void set_custom_data_all(std::any custom_data) = 0;
274 virtual void remove_custom_data() = 0;
275 virtual void remove_custom_data_all() = 0;
// Listener management. Every add/remove operation comes in two overloads:
// one taking only the listener and one additionally taking a specific
// transwarp::event_type.
// NOTE(review): the overload without an event presumably applies to all event
// types, and the `*_all` variants presumably apply to every task in the
// graph -- confirm against the implementation.
276 virtual void add_listener(std::shared_ptr<transwarp::listener>
listener) = 0;
277 virtual void add_listener(
transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) = 0;
278 virtual void add_listener_all(std::shared_ptr<transwarp::listener> listener) = 0;
279 virtual void add_listener_all(
transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) = 0;
// Removal takes the listener by const reference (no ownership transfer).
280 virtual void remove_listener(
const std::shared_ptr<transwarp::listener>& listener) = 0;
281 virtual void remove_listener(
transwarp::event_type event,
const std::shared_ptr<transwarp::listener>& listener) = 0;
282 virtual void remove_listener_all(
const std::shared_ptr<transwarp::listener>& listener) = 0;
283 virtual void remove_listener_all(
transwarp::event_type event,
const std::shared_ptr<transwarp::listener>& listener) = 0;
// Remove listeners without naming a particular one.
284 virtual void remove_listeners() = 0;
286 virtual void remove_listeners_all() = 0;
// Scheduling. The overloads with a bool let the caller choose whether the
// task (or, for schedule_all, the tasks) is reset before being scheduled.
// NOTE(review): the argument-free overloads appear to behave like reset=true
// in the implementation fragments later in this file -- confirm.
288 virtual void schedule() = 0;
290 virtual void schedule(
bool reset) = 0;
292 virtual void schedule_all() = 0;
294 virtual void schedule_all(
bool reset_all) = 0;
// Puts the given exception into the task instead of a computed result.
296 virtual void set_exception(std::exception_ptr exception) = 0;
// State queries and waiting.
297 virtual bool was_scheduled()
const noexcept = 0;
// wait(), is_ready() and has_result() are not noexcept.
// NOTE(review): wait() and is_ready() presumably throw if the task was never
// scheduled (the implementation calls ensure_task_was_scheduled) -- confirm.
298 virtual void wait()
const = 0;
299 virtual bool is_ready()
const = 0;
300 virtual bool has_result()
const = 0;
// Resets this task / all tasks.
301 virtual void reset() = 0;
302 virtual void reset_all() = 0;
// Sets or clears the cancel flag (enabled == true requests cancellation).
303 virtual void cancel(
bool enabled) noexcept = 0;
304 virtual void cancel_all(
bool enabled) noexcept = 0;
// Graph access. The returned pointers are raw, non-owning itask pointers.
// parents() returns a vector by value; tasks() returns a reference to a
// vector held by the task and is non-const.
305 virtual std::vector<itask*> parents()
const = 0;
306 virtual const std::vector<itask*>& tasks() = 0;
// Edges of the graph, returned by value.
307 virtual std::vector<transwarp::edge> edges() = 0;
// NOTE(review): the original line numbers jump here (307 -> 322), so the
// declarations below presumably sit in a non-public section -- confirm.
// Calls `visitor` with tasks of the graph; unvisit() clears the visited state.
322 virtual void visit(
const std::function<
void(
itask&)>& visitor) = 0;
323 virtual void unvisit() noexcept = 0;
// Internal setters used while building and running the graph.
324 virtual void set_id(std::size_t
id) noexcept = 0;
325 virtual void set_level(std::size_t level) noexcept = 0;
327 virtual void set_name(std::optional<std::string> name) noexcept = 0;
328 virtual void set_avg_idletime_us(std::int64_t idletime) noexcept = 0;
329 virtual void set_avg_waittime_us(std::int64_t waittime) noexcept = 0;
330 virtual void set_avg_runtime_us(std::int64_t runtime) noexcept = 0;
// Child/reference counting hooks.
331 virtual void increment_childcount() noexcept = 0;
332 virtual void decrement_refcount() = 0;
// Resets the task's future.
333 virtual void reset_future() = 0;
// Builds a textual description of `task` in `s`: optional "<name>", then
// " id=" and " lev=", then the executor name in angle brackets, then the
// average idle/wait/run times (each only when >= 0).
// NOTE(review): `separator` is appended via separator.data(), i.e. as a
// NUL-terminated C string. If `separator` is a std::string_view (as in the
// edges overload of to_string), it is not guaranteed to be NUL-terminated and
// a non-terminated view would be read past its end. Appending the view itself
// (e.g. s.append(separator)) would avoid that.
358 const std::optional<std::string>& name = task.name();
// NOTE(review): *name requires the optional to hold a value; the guarding
// check is presumably on a line not visible here -- confirm.
360 s += std::string{
"<"} + *name + std::string{
">"} + separator.data();
363 s += std::string{
" id="} + std::to_string(task.id());
364 s += std::string{
" lev="} + std::to_string(task.level());
365 const std::shared_ptr<transwarp::executor> exec = task.executor();
// NOTE(review): exec is dereferenced below; the null check is presumably on a
// line not visible here -- confirm.
367 s += separator.data() + std::string{
"<"} + exec->name() + std::string{
">"};
// Timing values are only printed when they are non-negative.
369 const std::int64_t avg_idletime_us = task.avg_idletime_us();
370 if (avg_idletime_us >= 0) {
371 s += separator.data() + std::string{
"avg-idle-us="} + std::to_string(avg_idletime_us);
373 const std::int64_t avg_waittime_us = task.avg_waittime_us();
374 if (avg_waittime_us >= 0) {
375 s += separator.data() + std::string{
"avg-wait-us="} + std::to_string(avg_waittime_us);
377 const std::int64_t avg_runtime_us = task.avg_runtime_us();
378 if (avg_runtime_us >= 0) {
379 s += separator.data() + std::string{
"avg-run-us="} + std::to_string(avg_runtime_us);
// Converts a list of edges into a string that starts with "digraph {"
// followed by `separator` and ends with "}".
// @param edges      edges to render
// @param separator  text placed after the opening line; defaults to "\n"
// NOTE(review): separator.data() is appended as a NUL-terminated C string,
// but a std::string_view is not guaranteed to be NUL-terminated; a
// non-terminated view would be read past its end. Appending the view itself
// would be safe.
394 std::string
to_string(
const std::vector<transwarp::edge>& edges, std::string_view separator=
"\n") {
395 std::string dot = std::string{
"digraph {"} + separator.data();
399 dot += std::string{
"}"};
406 using decay_t = std::remove_const_t<std::remove_reference_t<T>>;
411 using result_t = std::result_of_t<decltype(&std::shared_future<T>::get)(std::shared_future<T>)>;
// Clones task `t`, using `task_cache` to memoize clones (original -> clone).
// @param task_cache  map from original task to its already-created clone
// @param t           task to clone
// @return the cached clone if one exists, otherwise a fresh clone
418 template<
typename TaskType>
419 std::shared_ptr<TaskType>
clone_task(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache,
const std::shared_ptr<TaskType>& t) {
// Cache hit: reuse the existing clone, cast back to the concrete TaskType.
// NOTE(review): `original_task` is defined on a line not visible here;
// presumably it is `t` converted to shared_ptr<itask> -- confirm.
421 const auto task_cache_it = task_cache.find(original_task);
422 if (task_cache_it != task_cache.cend()) {
423 return std::static_pointer_cast<TaskType>(task_cache_it->second);
// Cache miss: clone via clone_impl (passing the cache along) and record it.
425 auto cloned_task = t->clone_impl(task_cache);
426 task_cache[original_task] = cloned_task;
// Members of the task class template for a value result type.
435 template<
typename ResultType>
// The result type is ResultType itself.
438 using result_type = ResultType;
440 virtual ~
task() =
default;
// Clones this task by calling clone_impl with a fresh, empty clone cache.
442 std::shared_ptr<task> clone()
const {
443 std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>> task_cache;
444 return clone_impl(task_cache);
// Returns the shared future associated with this task's result.
449 virtual std::shared_future<result_type> future()
const noexcept = 0;
// clone_task is a friend so it can call clone_impl.
454 friend std::shared_ptr<T>
transwarp::detail::clone_task(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache,
const std::shared_ptr<T>& t);
// Implementation hook for cloning; receives the original -> clone cache.
456 virtual std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache)
const = 0;
// Members of the task class template variant whose result is a reference.
460 template<
typename ResultType>
// The result type is an lvalue reference to ResultType.
463 using result_type = ResultType&;
465 virtual ~
task() =
default;
// Clones this task by calling clone_impl with a fresh, empty clone cache.
467 std::shared_ptr<task> clone()
const {
468 std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>> task_cache;
469 return clone_impl(task_cache);
// Returns the shared future associated with this task's result.
473 virtual std::shared_future<result_type> future()
const noexcept = 0;
// clone_task is a friend so it can call clone_impl.
478 friend std::shared_ptr<T>
transwarp::detail::clone_task(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache,
const std::shared_ptr<T>& t);
// Implementation hook for cloning; receives the original -> clone cache.
480 virtual std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache)
const = 0;
// Members of the task class variant that produces no value (void result).
487 using result_type = void;
489 virtual ~
task() =
default;
// Clones this task by calling clone_impl with a fresh, empty clone cache.
491 std::shared_ptr<task> clone()
const {
492 std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>> task_cache;
493 return clone_impl(task_cache);
// Marks the task as having a (void) value; takes no argument because there
// is nothing to store.
496 virtual void set_value() = 0;
// Returns the shared future associated with this task.
497 virtual std::shared_future<result_type> future()
const noexcept = 0;
// Returns result_type, i.e. void. Not noexcept.
498 virtual result_type
get()
const = 0;
// clone_task is a friend so it can call clone_impl.
502 friend std::shared_ptr<T>
transwarp::detail::clone_task(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache,
const std::shared_ptr<T>& t);
// Implementation hook for cloning; receives the original -> clone cache.
504 virtual std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache)
const = 0;
522 virtual ~functor() =
default;
528 return *transwarp_task_;
533 return *transwarp_task_;
539 if (transwarp_task_->canceled()) {
546 friend void transwarp::detail::assign_task_if(F&,
transwarp::itask&) noexcept;
561 std::function<
void(std::size_t thread_index)> on_thread_started =
nullptr)
562 : on_thread_started_{std::move(on_thread_started)}
564 if (n_threads == 0) {
567 for (std::size_t i = 0; i < n_threads; ++i) {
569 std::lock_guard<std::mutex> lock{mutex_};
570 ups_.push_back(
false);
574 thread = std::thread(&thread_pool::worker,
this, i);
580 threads_.push_back(std::move(thread));
589 std::lock_guard<std::mutex> lock{mutex_};
590 if (std::all_of(ups_.begin(), ups_.end(), [](
const bool x){
return x; })) {
594 std::this_thread::yield();
// Adds `functor` to the queue of pending work and wakes one waiting worker.
// @param functor  callable to enqueue; it is copied into the queue
608 void push(
const std::function<
void()>& functor) {
// The queue is modified only while holding mutex_.
610 std::lock_guard<std::mutex> lock{mutex_};
611 functors_.push(functor);
613 cond_var_.notify_one();
// Body of a pool thread.
// @param index  index of this thread, forwarded to the start callback
618 void worker(
const std::size_t index) {
// Invoke the optional user callback first, if one was supplied.
619 if (on_thread_started_) {
620 on_thread_started_(index);
623 std::function<void()> functor;
625 std::unique_lock<std::mutex> lock{mutex_};
// Sleep until there is work or shutdown has been requested; the predicate
// form of wait() guards against spurious wake-ups.
629 cond_var_.wait(lock, [
this]{
630 return done_ || !functors_.empty();
// Shutdown with nothing left to run.
632 if (done_ && functors_.empty()) {
// NOTE(review): this copies the std::function out of the queue; if the
// element is popped right after (not visible here), moving from
// functors_.front() would avoid the copy.
635 functor = functors_.front();
644 std::lock_guard<std::mutex> lock{mutex_};
647 cond_var_.notify_all();
648 for (std::thread& thread : threads_) {
// Optional callback invoked by each worker with its thread index.
656 std::function<void(std::size_t)> on_thread_started_;
// The pool's threads.
657 std::vector<std::thread> threads_;
// One flag per thread; the constructor pushes `false` for each thread and
// later checks that all flags are true.
// NOTE(review): std::vector<bool> is a packed proxy container; accesses here
// appear to happen under mutex_ -- confirm.
658 std::vector<bool> ups_;
// Pending work items.
659 std::queue<std::function<void()>> functors_;
// Signals workers when work arrives or shutdown is requested.
660 std::condition_variable cond_var_;
666 template<
typename Functor,
typename Tuple>
668 std::apply([&f](
auto&&... arg){(..., f(std::forward<decltype(arg)>(arg)));}, std::forward<Tuple>(t));
672 template<
int offset,
typename... ParentResults>
674 static void work(
const std::tuple<std::shared_ptr<
transwarp::task<ParentResults>>...>& source, std::tuple<std::shared_future<ParentResults>...>& target) {
675 std::get<offset>(target) = std::get<offset>(source)->future();
680 template<
typename... ParentResults>
686 template<
typename... ParentResults>
688 std::tuple<std::shared_future<ParentResults>...> result;
694 template<
typename ParentResultType>
696 std::vector<std::shared_future<ParentResultType>> result;
697 result.reserve(input.size());
699 result.emplace_back(
task->future());
706 template<
typename Result,
typename Task,
typename... Args>
707 Result
run_task(std::size_t task_id,
const std::weak_ptr<Task>&
task, Args&&... args) {
708 const std::shared_ptr<Task> t = task.lock();
716 return (*t->functor_)(std::forward<Args>(args)...);
721 template<
typename... ParentResults>
728 template<
typename ParentResultType>
731 parent->future().wait();
736 template<
typename Parent>
737 Parent wait_for_any_impl() {
741 template<
typename Parent,
typename ParentResult,
typename... ParentResults>
743 const std::future_status status = parent->future().wait_for(std::chrono::microseconds(1));
744 if (status == std::future_status::ready) {
747 return transwarp::detail::wait_for_any_impl<Parent>(parents...);
751 template<
typename Parent,
typename... ParentResults>
754 Parent parent = transwarp::detail::wait_for_any_impl<Parent>(
parents...);
763 template<
typename ParentResultType>
767 const std::future_status status = parent->future().wait_for(std::chrono::microseconds(1));
768 if (status == std::future_status::ready) {
777 template<
typename OneResult,
typename... ParentResults>
779 auto callable = [&one](
const auto& parent) {
781 parent->cancel(
true);
789 template<
typename OneResult,
typename ParentResultType>
793 parent->cancel(
true);
802 task.decrement_refcount();
807 template<
typename... ParentResults>
816 template<
typename ParentResultType>
824 template<
typename TaskType,
bool done,
int total,
int... n>
826 template<
typename Result,
typename Task,
typename... ParentResults>
829 work<Result>(task_id, task,
parents);
833 template<
typename TaskType>
836 template<
int total,
int... n>
838 template<
typename Result,
typename Task,
typename... ParentResults>
840 return transwarp::detail::run_task<Result>(task_id, task);
846 template<
typename Result,
typename Task,
typename ParentResultType>
848 return transwarp::detail::run_task<Result>(task_id, task);
852 template<
int total,
int... n>
854 template<
typename Result,
typename Task,
typename... ParentResults>
859 return transwarp::detail::run_task<Result>(task_id, task, std::get<n>(futures)...);
865 template<
typename Result,
typename Task,
typename ParentResultType>
870 return transwarp::detail::run_task<Result>(task_id, task, std::move(futures));
874 template<
int total,
int... n>
876 template<
typename Result,
typename Task,
typename... ParentResults>
878 using parent_t = std::remove_reference_t<decltype(std::get<0>(
parents))>;
879 parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(
parents)...);
881 auto future = parent->future();
883 return transwarp::detail::run_task<Result>(task_id, task, std::move(future));
889 template<
typename Result,
typename Task,
typename ParentResultType>
893 auto future = parent->future();
895 return transwarp::detail::run_task<Result>(task_id, task, std::move(future));
899 template<
int total,
int... n>
901 template<
typename Result,
typename Task,
typename... ParentResults>
906 return transwarp::detail::run_task<Result>(task_id, task, std::get<n>(futures).
get()...);
912 template<
typename Result,
typename Task,
typename ParentResultType>
917 std::vector<ParentResultType> results;
918 results.reserve(futures.size());
919 for (
const std::shared_future<ParentResultType>& future : futures) {
920 results.emplace_back(future.get());
922 return transwarp::detail::run_task<Result>(task_id, task, std::move(results));
926 template<
int total,
int... n>
928 template<
typename Result,
typename Task,
typename... ParentResults>
930 using parent_t = std::remove_reference_t<decltype(std::get<0>(
parents))>;
931 parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(
parents)...);
933 const auto future = parent->future();
935 return transwarp::detail::run_task<Result>(task_id, task, future.get());
941 template<
typename Result,
typename Task,
typename ParentResultType>
945 const auto future = parent->future();
947 return transwarp::detail::run_task<Result>(task_id, task, future.get());
951 template<
int total,
int... n>
953 template<
typename Result,
typename Task,
typename... ParentResults>
959 return transwarp::detail::run_task<Result>(task_id, task);
965 template<
typename Result,
typename Task,
typename ParentResultType>
970 for (
const std::shared_future<ParentResultType>& future : futures) {
973 return transwarp::detail::run_task<Result>(task_id, task);
977 template<
int total,
int... n>
979 template<
typename Result,
typename Task,
typename... ParentResults>
981 using parent_t = std::remove_reference_t<decltype(std::get<0>(
parents))>;
982 parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(
parents)...);
984 const auto future = parent->future();
987 return transwarp::detail::run_task<Result>(task_id, task);
993 template<
typename Result,
typename Task,
typename ParentResultType>
997 const auto future = parent->future();
1000 return transwarp::detail::run_task<Result>(task_id, task);
1007 template<
typename TaskType,
typename Result,
typename Task,
typename... ParentResults>
1009 constexpr std::size_t n = std::tuple_size_v<std::tuple<std::shared_future<ParentResults>...>>;
1011 work<Result>(task_id, task,
parents);
1017 template<
typename TaskType,
typename Result,
typename Task,
typename ParentResultType>
1020 work<Result>(task_id, task,
parents);
1025 template<
typename Functor,
typename... ParentResults>
1027 auto callable = [&f](
const auto&
task) {
1037 template<
typename Functor,
typename ParentResultType>
1054 if (task_.level() <= task.level()) {
1056 task_.set_level(task.level() + 1);
1058 task.increment_childcount();
1066 explicit final_visitor(std::vector<transwarp::itask*>& tasks) noexcept
1070 tasks_.push_back(&task);
1074 std::vector<transwarp::itask*>& tasks_;
1075 std::size_t id_ = 0;
1080 explicit edges_visitor(std::vector<transwarp::edge>& edges) noexcept
1085 edges_.emplace_back(*parent, task);
1089 std::vector<transwarp::edge>& edges_;
1095 : reset_{reset}, executor_{executor} {}
1098 task.schedule_impl(reset_, executor_);
1116 : enabled_{enabled} {}
1119 task.cancel(enabled_);
1128 : executor_{std::move(executor)} {}
1131 task.set_executor(executor_);
1134 std::shared_ptr<transwarp::executor> executor_;
1141 task.remove_executor();
1148 : priority_{priority} {}
1151 task.set_priority(priority_);
1154 std::int64_t priority_;
1161 task.reset_priority();
1168 : custom_data_{std::move(custom_data)} {}
1171 task.set_custom_data(custom_data_);
1174 std::any custom_data_;
1181 task.remove_custom_data();
1191 tasks_.push_back(&task);
1194 std::vector<transwarp::itask*>& tasks_;
1200 : listener_{std::move(listener)}
1204 task.add_listener(listener_);
1207 std::shared_ptr<transwarp::listener> listener_;
1213 : event_{
event}, listener_{std::move(listener)}
1217 task.add_listener(event_, listener_);
1221 std::shared_ptr<transwarp::listener> listener_;
1227 : listener_{std::move(listener)}
1231 task.remove_listener(listener_);
1234 std::shared_ptr<transwarp::listener> listener_;
1240 : event_{
event}, listener_{std::move(listener)}
1244 task.remove_listener(event_, listener_);
1248 std::shared_ptr<transwarp::listener> listener_;
1255 task.remove_listeners();
1267 task.remove_listeners(event_);
1276 : visitor_{visitor} {}
1279 task.visit(visitor_);
1282 const std::function<void(transwarp::itask&)>& visitor_;
1294 template<
typename TaskType,
typename Functor,
typename... ParentResults>
1296 static_assert(std::is_same_v<TaskType, transwarp::root_type> ||
1297 std::is_same_v<TaskType, transwarp::accept_type> ||
1298 std::is_same_v<TaskType, transwarp::accept_any_type> ||
1299 std::is_same_v<TaskType, transwarp::consume_type> ||
1300 std::is_same_v<TaskType, transwarp::consume_any_type> ||
1301 std::is_same_v<TaskType, transwarp::wait_type> ||
1302 std::is_same_v<TaskType, transwarp::wait_any_type>,
1303 "Invalid task type, must be one of: root, accept, accept_any, consume, consume_any, wait, wait_any");
1306 template<
typename Functor,
typename... ParentResults>
1308 static_assert(
sizeof...(ParentResults) == 0,
"A root task cannot have parent tasks");
1309 using type = decltype(std::declval<Functor>()());
1312 template<
typename Functor,
typename... ParentResults>
1314 static_assert(
sizeof...(ParentResults) > 0,
"An accept task must have at least one parent");
1315 using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<ParentResults>>()...));
1318 template<
typename Functor,
typename ParentResultType>
1320 using type = decltype(std::declval<Functor>()(std::declval<std::vector<std::shared_future<ParentResultType>>>()));
1323 template<
typename Functor,
typename... ParentResults>
1325 static_assert(
sizeof...(ParentResults) > 0,
"An accept_any task must have at least one parent");
1326 using arg_t = std::tuple_element_t<0, std::tuple<ParentResults...>>;
1327 using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<arg_t>>()));
1330 template<
typename Functor,
typename ParentResultType>
1332 using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<ParentResultType>>()));
1335 template<
typename Functor,
typename... ParentResults>
1337 static_assert(
sizeof...(ParentResults) > 0,
"A consume task must have at least one parent");
1338 using type = decltype(std::declval<Functor>()(std::declval<ParentResults>()...));
1341 template<
typename Functor,
typename ParentResultType>
1343 using type = decltype(std::declval<Functor>()(std::declval<std::vector<ParentResultType>>()));
1346 template<
typename Functor,
typename... ParentResults>
1348 static_assert(
sizeof...(ParentResults) > 0,
"A consume_any task must have at least one parent");
1349 using arg_t = std::tuple_element_t<0, std::tuple<ParentResults...>>;
1350 using type = decltype(std::declval<Functor>()(std::declval<arg_t>()));
1353 template<
typename Functor,
typename ParentResultType>
1355 using type = decltype(std::declval<Functor>()(std::declval<ParentResultType>()));
1358 template<
typename Functor,
typename... ParentResults>
1360 static_assert(
sizeof...(ParentResults) > 0,
"A wait task must have at least one parent");
1361 using type = decltype(std::declval<Functor>()());
1364 template<
typename Functor,
typename ParentResultType>
1366 using type = decltype(std::declval<Functor>()());
1369 template<
typename Functor,
typename... ParentResults>
1371 static_assert(
sizeof...(ParentResults) > 0,
"A wait_any task must have at least one parent");
1372 using type = decltype(std::declval<Functor>()());
1375 template<
typename Functor,
typename ParentResultType>
1377 using type = decltype(std::declval<Functor>()());
1381 template<
typename TaskType,
typename Functor,
typename... ParentResults>
1386 template<
typename Functor>
1388 if constexpr (std::is_base_of_v<transwarp::functor, Functor>) {
1389 functor.transwarp_task_ = &
task;
1395 template<
typename ResultType,
typename Value>
1397 std::promise<ResultType> promise;
1398 promise.set_value(std::forward<Value>(value));
1399 return promise.get_future();
1405 std::promise<void> promise;
1406 promise.set_value();
1407 return promise.get_future();
1411 template<
typename ResultType>
1416 std::promise<ResultType> promise;
1417 promise.set_exception(exception);
1418 return promise.get_future();
1423 template<
typename... ParentResults>
1425 using type = std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>;
1426 static std::size_t size(
const type&) {
1427 return std::tuple_size_v<type>;
1429 static type clone(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache,
const type& obj) {
1432 [&task_cache](
auto& t) {
1437 static std::vector<transwarp::itask*> tasks(
const type&
parents) {
1438 std::vector<transwarp::itask*> tasks;
1441 tasks.push_back(t.get());
1448 template<
typename ParentResultType>
1449 struct parents<std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1450 using type = std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>;
1451 static std::size_t size(
const type& obj) {
1454 static type clone(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache,
const type& obj) {
1456 for (
auto& t : cloned) {
1461 static std::vector<transwarp::itask*> tasks(
const type&
parents) {
1462 std::vector<transwarp::itask*> tasks;
1463 for (
auto& t : parents) {
1464 tasks.push_back(t.get());
1471 template<
typename... ParentResults>
1475 template<
typename ResultType,
typename TaskType>
1479 template<
typename Task,
typename Parents>
1480 void call(std::size_t task_id,
1481 const std::weak_ptr<Task>&
task,
1483 promise_.set_value(transwarp::detail::call<TaskType, ResultType>(task_id, task, parents));
1486 std::promise<ResultType> promise_;
1489 template<
typename TaskType>
1493 template<
typename Task,
typename Parents>
1494 void call(std::size_t task_id,
1495 const std::weak_ptr<Task>&
task,
1497 transwarp::detail::call<TaskType, void>(task_id, task, parents);
1498 promise_.set_value();
1501 std::promise<void> promise_;
1505 template<
typename ResultType,
typename TaskType,
typename Task,
typename Parents>
1509 runner(std::size_t task_id,
1510 const std::weak_ptr<Task>&
task,
1512 : task_id_{task_id},
1517 std::future<ResultType> future() {
1518 return this->promise_.get_future();
1522 if (
const std::shared_ptr<Task> t = task_.lock()) {
1526 this->
call(task_id_, task_, parents_);
1528 this->promise_.set_exception(std::current_exception());
1529 if (
const std::shared_ptr<Task> t = task_.lock()) {
1533 this->promise_.set_exception(std::current_exception());
1535 if (
const std::shared_ptr<Task> t = task_.lock()) {
1541 const std::size_t task_id_;
1542 const std::weak_ptr<Task> task_;
1550 template<
typename ValueType>
1554 static_assert(std::is_default_constructible_v<ValueType>,
"ValueType must be default constructible");
1556 using value_type = ValueType;
1576 template<
typename T,
typename = std::enable_if_t<std::is_same_v<std::decay_t<T>, value_type>>>
1578 data_[end_] = std::forward<T>(value);
1585 return data_[front_];
1591 data_[front_] = ValueType{};
1598 return data_.size();
1614 return size_ == data_.size();
1619 std::swap(end_, buffer.end_);
1620 std::swap(front_, buffer.front_);
1621 std::swap(size_, buffer.size_);
1622 std::swap(data_, buffer.data_);
1627 void increment_or_wrap(std::size_t& value)
const {
1628 if (value == data_.size() - 1) {
1636 increment_or_wrap(end_);
1638 increment_or_wrap(front_);
1645 increment_or_wrap(front_);
1650 std::size_t front_{};
1651 std::size_t size_{};
1652 std::vector<value_type> data_;
// Acquires the lock by busy-waiting: loops until test_and_set observes the
// flag as previously clear. Uses acquire ordering.
1659 void lock() noexcept {
1660 while (locked_.test_and_set(std::memory_order_acquire));
// Releases the lock by clearing the flag with release ordering.
1663 void unlock() noexcept {
1664 locked_.clear(std::memory_order_release);
// The flag backing the lock; starts in the clear (unlocked) state.
1668 std::atomic_flag locked_ = ATOMIC_FLAG_INIT;
1677 void operator()()
const noexcept {}
1697 std::string
name()
const override {
1698 return "transwarp::sequential";
1712 explicit parallel(
const std::size_t n_threads,
1713 std::function<
void(std::size_t thread_index)> on_thread_started =
nullptr)
1714 : pool_{n_threads, std::move(on_thread_started)}
1724 std::string
name()
const override {
1725 return "transwarp::parallel";
1730 pool_.push(functor);
// Shared empty instances. The name() and custom_data() accessors later in this
// file return these by reference when the corresponding feature is compiled
// out (TRANSWARP_DISABLE_TASK_NAME / TRANSWARP_DISABLE_TASK_CUSTOM_DATA).
1741 const std::optional<std::string> nullopt_string;
1742 const std::any any_empty;
1745 template<
typename ResultType>
1749 using result_type = ResultType;
1752 std::size_t
id() const noexcept
override {
1757 const std::optional<std::string>&
name() const noexcept
override {
1758 #ifndef TRANSWARP_DISABLE_TASK_NAME
1761 return transwarp::detail::nullopt_string;
1767 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
1776 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
1777 return custom_data_;
1779 return transwarp::detail::any_empty;
1786 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
1787 ensure_task_not_running();
1788 priority_ = priority;
1796 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
1797 ensure_task_not_running();
1805 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
1806 ensure_task_not_running();
1807 if (!custom_data.has_value()) {
1810 custom_data_ = std::move(custom_data);
1819 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
1820 ensure_task_not_running();
1827 std::shared_future<result_type>
future() const noexcept
override {
1833 ensure_task_not_running();
1834 check_listener(listener);
1835 for (
int i=0; i<static_cast<int>(transwarp::event_type::count); ++i) {
1842 ensure_task_not_running();
1843 check_listener(listener);
1844 listeners_[event].push_back(std::move(listener));
1849 ensure_task_not_running();
1850 check_listener(listener);
1851 for (
int i=0; i<static_cast<int>(transwarp::event_type::count); ++i) {
1852 auto listeners_pair = listeners_.find(static_cast<transwarp::event_type>(i));
1853 if (listeners_pair != listeners_.end()) {
1854 std::vector<std::shared_ptr<transwarp::listener>>& l = listeners_pair->second;
1855 l.erase(std::remove(l.begin(), l.end(), listener), l.end());
1862 ensure_task_not_running();
1863 check_listener(listener);
1864 auto listeners_pair = listeners_.find(event);
1865 if (listeners_pair != listeners_.end()) {
1866 std::vector<std::shared_ptr<transwarp::listener>>& l = listeners_pair->second;
1867 l.erase(std::remove(l.begin(), l.end(), listener), l.end());
1873 ensure_task_not_running();
1879 ensure_task_not_running();
1880 auto listeners_pair = listeners_.find(event);
1881 if (listeners_pair != listeners_.end()) {
1882 listeners_pair->second.clear();
1890 if (future_.valid() && future_.wait_for(std::chrono::seconds{0}) != std::future_status::ready) {
1897 auto listeners_pair = listeners_.find(event);
1898 if (listeners_pair != listeners_.end()) {
1899 for (
const std::shared_ptr<transwarp::listener>&
listener : listeners_pair->second) {
1900 listener->handle_event(event, *
this);
1913 void set_id(std::size_t
id) noexcept
override {
1918 void set_name(std::optional<std::string> name) noexcept
override {
1919 #ifndef TRANSWARP_DISABLE_TASK_NAME
1920 name_ = std::move(name);
1928 #ifndef TRANSWARP_DISABLE_TASK_NAME
1931 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
1932 priority_ = task.priority_;
1934 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
1935 custom_data_ = task.custom_data_;
1937 if (task.has_result()) {
1939 if constexpr (std::is_void_v<result_type>) {
1943 future_ = transwarp::detail::make_future_with_value<result_type>(task.future_.get());
1946 future_ = transwarp::detail::make_future_with_exception<result_type>(std::current_exception());
1949 visited_ = task.visited_;
1950 listeners_ = task.listeners_;
1953 std::size_t id_ = 0;
1954 #ifndef TRANSWARP_DISABLE_TASK_NAME
1955 std::optional<std::string> name_;
1957 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
1958 std::int64_t priority_ = 0;
1960 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
1961 std::any custom_data_;
1963 std::shared_future<result_type> future_;
1964 bool visited_ =
false;
1965 std::map<transwarp::event_type, std::vector<std::shared_ptr<transwarp::listener>>> listeners_;
1966 std::vector<transwarp::itask*> tasks_;
1972 template<
typename ResultType,
typename TaskType,
typename Functor,
typename... ParentResults>
1979 using result_type = ResultType;
1986 if (this->tasks_.empty()) {
1990 const std::size_t l_level = l->level();
1991 const std::size_t l_id = l->id();
1992 const std::size_t r_level = r->level();
1993 const std::size_t r_id = r->id();
1994 return std::tie(l_level, l_id) < std::tie(r_level, r_id);
1996 std::sort(this->tasks_.begin(), this->tasks_.end(), compare);
2001 std::size_t
level() const noexcept
override {
2011 std::shared_ptr<transwarp::executor>
executor() const noexcept
override {
2017 return canceled_.load();
2022 #ifndef TRANSWARP_DISABLE_TASK_TIME
2023 return avg_idletime_us_.load();
2031 #ifndef TRANSWARP_DISABLE_TASK_TIME
2032 return avg_waittime_us_.load();
2040 #ifndef TRANSWARP_DISABLE_TASK_TIME
2041 return avg_runtime_us_.load();
2050 this->ensure_task_not_running();
2054 executor_ = std::move(executor);
2060 this->ensure_task_not_running();
2067 this->ensure_task_not_running();
2073 this->ensure_task_not_running();
2082 this->ensure_task_not_running();
2083 this->schedule_impl(
true);
2091 this->ensure_task_not_running();
2092 this->schedule_impl(reset);
2099 this->ensure_task_not_running();
2100 this->schedule_impl(
true, &executor);
2108 this->ensure_task_not_running();
2109 this->schedule_impl(reset, &executor);
2116 this->ensure_task_not_running();
2117 schedule_all_impl(
true);
2124 this->ensure_task_not_running();
2125 schedule_all_impl(
true, &executor);
2133 this->ensure_task_not_running();
2134 schedule_all_impl(reset_all);
2142 this->ensure_task_not_running();
2143 schedule_all_impl(reset_all, &executor);
2149 this->ensure_task_not_running();
2150 this->future_ = transwarp::detail::make_future_with_exception<result_type>(exception);
2151 schedule_mode_ =
false;
2158 return this->future_.valid();
2164 ensure_task_was_scheduled();
2165 this->future_.wait();
2171 ensure_task_was_scheduled();
2172 return this->future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
2177 return was_scheduled() && this->future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
2182 this->ensure_task_not_running();
2183 this->future_ = std::shared_future<result_type>{};
2185 schedule_mode_ =
true;
2186 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
2187 refcount_ = childcount_;
2194 this->ensure_task_not_running();
2202 void cancel(
bool enabled) noexcept
override {
2203 canceled_ = enabled;
2217 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
2218 this->ensure_task_not_running();
2228 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
2229 this->ensure_task_not_running();
2238 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
2239 this->ensure_task_not_running();
2249 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
2250 this->ensure_task_not_running();
2258 this->ensure_task_not_running();
2265 this->ensure_task_not_running();
2272 this->ensure_task_not_running();
2279 this->ensure_task_not_running();
2286 this->ensure_task_not_running();
2293 this->ensure_task_not_running();
2299 std::vector<transwarp::itask*>
parents()
const override {
2304 const std::vector<transwarp::itask*>&
tasks()
override {
2306 return this->tasks_;
2312 std::vector<transwarp::edge>
edges()
override {
2313 std::vector<transwarp::edge> edges;
2323 template<
typename F>
2325 : functor_{
new Functor{std::forward<F>(functor)}},
2326 parents_{std::move(
parents)...}
2331 template<
typename F,
typename P>
2332 task_impl_base(F&& functor, std::vector<std::shared_ptr<
transwarp::task<P>>> parents)
2333 : functor_{
new Functor{std::forward<F>(functor)}},
2334 parents_{std::move(parents)}
2337 if (parents_.empty()) {
2343 set_type(task_type::value);
2348 template<
typename R,
typename Y,
typename T,
typename P>
2351 template<
typename R,
typename T,
typename... A>
2366 #ifndef TRANSWARP_DISABLE_TASK_TIME
2367 avg_idletime_us_ = idletime;
2375 #ifndef TRANSWARP_DISABLE_TASK_TIME
2376 avg_waittime_us_ = waittime;
2384 #ifndef TRANSWARP_DISABLE_TASK_TIME
2385 avg_runtime_us_ = runtime;
2393 if (!this->future_.valid()) {
2403 if (schedule_mode_ && (reset || !this->future_.valid())) {
2407 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
2408 refcount_ = childcount_;
2410 std::weak_ptr<task_impl_base>
self = std::static_pointer_cast<
task_impl_base>(this->shared_from_this());
2412 std::shared_ptr<runner_t>
runner = std::shared_ptr<runner_t>{
new runner_t{this->id(),
self, parents_}};
2414 this->future_ = runner->future();
2416 if (this->executor_) {
2417 this->executor_->execute([runner]{ (*runner)(); }, *
this);
2437 if (!this->visited_) {
2440 this->visited_ =
true;
2446 if (this->visited_) {
2447 this->visited_ =
false;
2453 template<
typename Visitor>
2461 void increment_childcount() noexcept
override {
2462 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
2468 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
2469 if (--refcount_ == 0) {
2475 void reset_future()
override {
2476 this->future_ = std::shared_future<result_type>{};
2480 std::size_t level_ = 0;
2482 std::shared_ptr<transwarp::executor> executor_;
2483 std::atomic<bool> canceled_{
false};
2484 bool schedule_mode_ =
true;
2485 #ifndef TRANSWARP_DISABLE_TASK_TIME
2486 std::atomic<std::int64_t> avg_idletime_us_{-1};
2487 std::atomic<std::int64_t> avg_waittime_us_{-1};
2488 std::atomic<std::int64_t> avg_runtime_us_{-1};
2490 std::unique_ptr<Functor> functor_;
2492 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
2493 std::size_t childcount_ = 0;
2494 std::atomic<std::size_t> refcount_{0};
2500 template<
typename ResultType,
typename TaskType,
typename Functor,
typename... ParentResults>
2507 using result_type = ResultType;
2512 this->ensure_task_not_running();
2513 this->future_ = transwarp::detail::make_future_with_value<result_type>(value);
2514 this->schedule_mode_ =
false;
2521 this->ensure_task_not_running();
2522 this->future_ = transwarp::detail::make_future_with_value<result_type>(std::move(value));
2523 this->schedule_mode_ =
false;
2531 this->ensure_task_was_scheduled();
2532 return this->future_.get();
2539 template<
typename F>
2544 template<
typename F,
typename P>
2545 task_impl_proxy(F&& functor, std::vector<std::shared_ptr<
transwarp::task<P>>> parents)
2546 : transwarp::detail::task_impl_base<result_type,
task_type, Functor, ParentResults...>{std::forward<F>(functor), std::move(parents)}
2552 template<
typename ResultType,
typename TaskType,
typename Functor,
typename... ParentResults>
2559 using result_type = ResultType&;
2564 this->ensure_task_not_running();
2565 this->future_ = transwarp::detail::make_future_with_value<result_type>(value);
2566 this->schedule_mode_ =
false;
2574 this->ensure_task_was_scheduled();
2575 return this->future_.get();
2582 template<
typename F>
2587 template<
typename F,
typename P>
2588 task_impl_proxy(F&& functor, std::vector<std::shared_ptr<
transwarp::task<P>>> parents)
2589 : transwarp::detail::task_impl_base<result_type,
task_type, Functor, ParentResults...>{std::forward<F>(functor), std::move(parents)}
2595 template<
typename TaskType,
typename Functor,
typename... ParentResults>
2602 using result_type = void;
2607 this->ensure_task_not_running();
2609 this->schedule_mode_ =
false;
2616 void get()
const override {
2617 this->ensure_task_was_scheduled();
2618 this->future_.get();
2625 template<
typename F>
2630 template<
typename F,
typename P>
2631 task_impl_proxy(F&& functor, std::vector<std::shared_ptr<
transwarp::task<P>>> parents)
2632 : transwarp::detail::task_impl_base<result_type,
task_type, Functor, ParentResults...>{std::forward<F>(functor), std::move(parents)}
2643 template<
typename TaskType,
typename Functor,
typename... ParentResults>
2654 template<
typename F>
2656 : transwarp::detail::task_impl_proxy<
result_type,
task_type, Functor, ParentResults...>{std::forward<F>(functor), std::move(parents)...}
2661 template<
typename F,
typename P>
2663 : transwarp::detail::task_impl_proxy<result_type,
task_type, Functor, ParentResults...>{std::forward<F>(functor), std::move(parents)}
// Special member functions of task_impl: copy construction, copy assignment,
// move construction and move assignment are all explicitly deleted, so a
// task_impl object can be neither copied nor moved.
2667 task_impl(
const task_impl&) =
delete;
2668 task_impl& operator=(
const task_impl&) =
delete;
2669 task_impl(task_impl&&) =
delete;
2670 task_impl& operator=(task_impl&&) =
delete;
2673 std::shared_ptr<task_impl>
named(std::string name) {
2674 #ifndef TRANSWARP_DISABLE_TASK_NAME
2675 this->set_name(std::make_optional(std::move(name)));
2679 return std::static_pointer_cast<
task_impl>(this->shared_from_this());
2683 template<
typename TaskType_,
typename Functor_>
2684 auto then(TaskType_, Functor_&& functor) {
2686 return std::shared_ptr<task_t>{
new task_t{std::forward<Functor_>(functor), std::static_pointer_cast<task_impl>(this->shared_from_this())}};
2691 return std::static_pointer_cast<
task_impl>(this->clone());
2698 std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache)
const override {
2699 auto t = std::shared_ptr<task_impl>{
new task_impl};
2700 t->copy_from(*
this);
2701 t->level_ = this->level_;
2702 t->type_ = this->type_;
2703 t->executor_ = this->executor_;
2704 t->canceled_ = this->canceled_.load();
2705 t->schedule_mode_ = this->schedule_mode_;
2706 #ifndef TRANSWARP_DISABLE_TASK_TIME
2707 t->avg_idletime_us_ = this->avg_idletime_us_.load();
2708 t->avg_waittime_us_ = this->avg_waittime_us_.load();
2709 t->avg_runtime_us_ = this->avg_runtime_us_.load();
2711 t->functor_ = std::unique_ptr<Functor>{
new Functor{*this->functor_}};
2713 t->executor_ = this->executor_;
2714 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
2715 t->childcount_ = this->childcount_;
2725 template<
typename ResultType>
2732 using result_type = ResultType;
2736 template<
typename T>
2739 this->future_ = transwarp::detail::make_future_with_value<result_type>(std::forward<T>(value));
2740 this->tasks_ = {
this};
2750 std::shared_ptr<value_task>
named(std::string name) {
2751 #ifndef TRANSWARP_DISABLE_TASK_NAME
2752 this->set_name(std::make_optional(std::move(name)));
2756 return std::static_pointer_cast<
value_task>(this->shared_from_this());
2760 template<
typename TaskType_,
typename Functor_>
2761 auto then(TaskType_, Functor_&& functor) {
2763 return std::shared_ptr<task_t>{
new task_t{std::forward<Functor_>(functor), std::static_pointer_cast<value_task>(this->shared_from_this())}};
2768 return std::static_pointer_cast<
value_task>(this->clone());
2775 std::size_t
level() const noexcept
override {
2785 std::shared_ptr<transwarp::executor>
executor() const noexcept
override {
2824 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
2825 this->set_priority(priority);
2833 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
2834 this->reset_priority();
2841 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
2842 this->set_custom_data(std::move(custom_data));
2850 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
2851 this->remove_custom_data();
2881 this->future_ = transwarp::detail::make_future_with_value<result_type>(value);
2887 this->future_ = transwarp::detail::make_future_with_value<result_type>(std::move(value));
2893 this->future_ = transwarp::detail::make_future_with_exception<result_type>(exception);
2899 return this->future_.get();
2934 this->add_listener(listener);
2939 this->add_listener(event, listener);
2944 this->remove_listener(listener);
2949 this->remove_listener(event, listener);
2954 this->remove_listeners();
2959 this->remove_listeners(event);
2963 std::vector<transwarp::itask*>
parents()
const override {
2968 const std::vector<transwarp::itask*>&
tasks()
override {
2969 return this->tasks_;
2973 std::vector<transwarp::edge>
edges()
override {
2981 this->tasks_ = {
this};
2984 std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>&)
const override {
2985 auto t = std::shared_ptr<value_task>{
new value_task{}};
2986 t->copy_from(*
this);
// Empty override: the std::size_t argument is unnamed and unused, so calling
// this function has no effect.
2991 void set_level(std::size_t) noexcept
override {}
// Empty override: the idle-time argument is unnamed and unused, so nothing is
// stored by this call.
2997 void set_avg_idletime_us(std::int64_t) noexcept
override {}
// Empty override: the wait-time argument is unnamed and unused, so nothing is
// stored by this call.
3000 void set_avg_waittime_us(std::int64_t) noexcept
override {}
// Empty override: the run-time argument is unnamed and unused, so nothing is
// stored by this call.
3003 void set_avg_runtime_us(std::int64_t) noexcept
override {}
3009 void visit(
const std::function<
void(
transwarp::itask&)>& visitor)
override {
3010 if (!this->visited_) {
3012 this->visited_ =
true;
3017 void unvisit() noexcept
override {
3018 this->visited_ =
false;
// Empty override: no child count is modified by this implementation.
3021 void increment_childcount() noexcept
override {}
// Empty override: the future is left untouched by this implementation.
3025 void reset_future()
override {}
3031 template<
typename TaskType,
typename Functor,
typename... Parents>
3032 auto make_task(TaskType, Functor&& functor, std::shared_ptr<Parents>... parents) {
3034 return std::shared_ptr<task_t>{
new task_t{std::forward<Functor>(functor), std::move(parents)...}};
3039 template<
typename TaskType,
typename Functor,
typename ParentType>
3040 auto make_task(TaskType, Functor&& functor, std::vector<ParentType> parents) {
3042 return std::shared_ptr<task_t>{
new task_t{std::forward<Functor>(functor), std::move(parents)}};
3047 template<
typename Value>
3050 return std::shared_ptr<task_t>{
new task_t{std::forward<Value>(value)}};
3057 template<
typename InputIt,
typename UnaryOperation>
3058 auto for_each(InputIt first, InputIt last, UnaryOperation unary_op) {
3059 const auto distance = std::distance(first, last);
3060 if (distance <= 0) {
3063 std::vector<std::shared_ptr<transwarp::task<void>>> tasks;
3064 tasks.reserve(static_cast<std::size_t>(distance));
3065 for (; first != last; ++first) {
3077 template<
typename InputIt,
typename UnaryOperation>
3080 task->schedule_all(executor);
3088 template<
typename InputIt,
typename OutputIt,
typename UnaryOperation>
3089 auto transform(InputIt first1, InputIt last1, OutputIt d_first, UnaryOperation unary_op) {
3090 const auto distance = std::distance(first1, last1);
3091 if (distance <= 0) {
3094 std::vector<std::shared_ptr<transwarp::task<void>>> tasks;
3095 tasks.reserve(static_cast<std::size_t>(distance));
3096 for (; first1 != last1; ++first1, ++d_first) {
3108 template<
typename InputIt,
typename OutputIt,
typename UnaryOperation>
3111 task->schedule_all(executor);
3117 template<
typename ResultType>
3123 std::size_t minimum_size,
3124 std::size_t maximum_size)
3125 : task_{std::move(task)},
3126 minimum_{minimum_size},
3127 maximum_{maximum_size},
3128 finished_{maximum_size}
3133 if (minimum_ > maximum_) {
3137 for (std::size_t i=0; i<minimum_; ++i) {
3138 idle_.push(task_->clone());
// Special member functions of task_pool: copy construction, copy assignment,
// move construction and move assignment are all explicitly deleted, so a
// task_pool object can be neither copied nor moved.
3149 task_pool(
const task_pool&) =
delete;
3150 task_pool& operator=(
const task_pool&) =
delete;
3151 task_pool(task_pool&&) =
delete;
3152 task_pool& operator=(task_pool&&) =
delete;
3158 std::shared_ptr<transwarp::task<ResultType>>
next_task(
bool maybe_resize=
true) {
3161 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3162 if (!finished_.empty()) {
3163 finished_task = finished_.front(); finished_.pop();
3167 std::shared_ptr<transwarp::task<ResultType>> task;
3168 if (finished_task) {
3169 task = busy_.find(finished_task)->second;
3171 if (maybe_resize && idle_.empty()) {
3174 if (idle_.empty()) {
3177 task = idle_.front(); idle_.pop();
3178 busy_.emplace(task.get(), task);
3181 auto future = task->future();
3182 if (future.valid()) {
3192 std::shared_ptr<transwarp::task<ResultType>> g = next_task(maybe_resize);
3201 return idle_.size() + busy_.size();
3216 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3217 return idle_.size() + finished_.size();
3222 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3223 return busy_.size() - finished_.size();
3229 if (new_size > size()) {
3230 const std::size_t count = new_size - size();
3231 for (std::size_t i=0; i<count; ++i) {
3232 if (size() == maximum_) {
3235 idle_.push(task_->clone());
3237 }
else if (new_size < size()) {
3238 const std::size_t count = size() - new_size;
3239 for (std::size_t i=0; i<count; ++i) {
3240 if (idle_.empty() || size() == minimum_) {
3250 decltype(finished_) finished{finished_.capacity()};
3252 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3253 finished_.swap(finished);
3255 while (!finished.empty()) {
3257 const auto it = busy_.find(task);
3258 idle_.push(it->second);
3275 std::lock_guard<transwarp::detail::spinlock> lock{pool_.spinlock_};
3276 pool_.finished_.push(static_cast<const transwarp::itask*>(&task));
3280 task_pool<ResultType>& pool_;
3283 std::shared_ptr<transwarp::task<ResultType>> task_;
3284 std::size_t minimum_;
3285 std::size_t maximum_;
3288 std::queue<std::shared_ptr<transwarp::task<ResultType>>> idle_;
3289 std::unordered_map<const transwarp::itask*, std::shared_ptr<transwarp::task<ResultType>>> busy_;
3290 std::shared_ptr<transwarp::listener> listener_{
new finished_listener{*
this}};
3312 const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3313 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3314 auto& track = tracks_[&task];
3315 track.startidle = now;
3319 const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3320 track_idletime(task, now);
3321 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3322 auto& track = tracks_[&task];
3323 track.startwait = now;
3327 const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3328 track_waittime(task, now);
3332 const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3333 track_waittime(task, now);
3334 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3335 auto& track = tracks_[&task];
3336 track.running =
true;
3337 track.startrun = now;
3341 const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3342 track_runtime(task, now);
3351 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3357 void track_idletime(
transwarp::itask& task,
const std::chrono::time_point<std::chrono::steady_clock>& now) {
3358 std::int64_t avg_idletime_us;
3360 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3361 auto& track = tracks_[&task];
3362 track.idletime += std::chrono::duration_cast<std::chrono::microseconds>(now - track.startidle).count();
3364 avg_idletime_us =
static_cast<std::int64_t
>(track.idletime / track.idlecount);
3366 task.set_avg_idletime_us(avg_idletime_us);
3369 void track_waittime(
transwarp::itask& task,
const std::chrono::time_point<std::chrono::steady_clock>& now) {
3370 std::int64_t avg_waittime_us;
3372 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3373 auto& track = tracks_[&task];
3374 track.waittime += std::chrono::duration_cast<std::chrono::microseconds>(now - track.startwait).count();
3376 avg_waittime_us =
static_cast<std::int64_t
>(track.waittime / track.waitcount);
3378 task.set_avg_waittime_us(avg_waittime_us);
3381 void track_runtime(
transwarp::itask& task,
const std::chrono::time_point<std::chrono::steady_clock>& now) {
3382 std::int64_t avg_runtime_us;
3384 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3385 auto& track = tracks_[&task];
3386 if (!track.running) {
3389 track.running =
false;
3390 track.runtime += std::chrono::duration_cast<std::chrono::microseconds>(now - track.startrun).count();
3392 avg_runtime_us =
static_cast<std::int64_t
>(track.runtime / track.runcount);
3394 task.set_avg_runtime_us(avg_runtime_us);
3398 bool running =
false;
3399 std::chrono::time_point<std::chrono::steady_clock> startidle;
3400 std::chrono::time_point<std::chrono::steady_clock> startwait;
3401 std::chrono::time_point<std::chrono::steady_clock> startrun;
3402 std::chrono::microseconds::rep idletime = 0;
3403 std::chrono::microseconds::rep idlecount = 0;
3404 std::chrono::microseconds::rep waittime = 0;
3405 std::chrono::microseconds::rep waitcount = 0;
3406 std::chrono::microseconds::rep runtime = 0;
3407 std::chrono::microseconds::rep runcount = 0;
3411 std::unordered_map<const transwarp::itask*, track> tracks_;
3425 : executor_{std::move(executor)}
// Special member functions of releaser: copy construction, copy assignment,
// move construction and move assignment are all explicitly deleted, so a
// releaser object can be neither copied nor moved.
3429 releaser(
const releaser&) =
delete;
3430 releaser& operator=(
const releaser&) =
delete;
3431 releaser(releaser&&) =
delete;
3432 releaser& operator=(releaser&&) =
delete;
3437 executor_->execute([&task]{ task.reset_future(); }, task);
3439 task.reset_future();
3445 std::shared_ptr<transwarp::executor> executor_;
void set_avg_idletime_us(std::int64_t idletime) noexcept override
Assigns the given idletime.
Definition: transwarp.h:2365
constexpr transwarp::wait_any_type wait_any
The wait_any task tag.
Definition: transwarp.h:144
void assign_task_if(Functor &functor, transwarp::itask &task) noexcept
Assigns the task to the given functor if the functor is a subclass of transwarp::functor.
Definition: transwarp.h:1387
void remove_custom_data_all() override
Removes custom data from all tasks.
Definition: transwarp.h:2849
void remove_listener_all(const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for all event types and for all parents.
Definition: transwarp.h:2943
The executor interface used to perform custom task execution.
Definition: transwarp.h:164
transwarp::task_type type() const noexcept override
The task's type.
Definition: transwarp.h:2006
std::remove_const_t< std::remove_reference_t< T >> decay_t
Removes reference and const from a type.
Definition: transwarp.h:406
Exception thrown when a task was destroyed prematurely.
Definition: transwarp.h:92
Adds a new listener to the given task.
Definition: transwarp.h:1198
A simple thread pool used to execute tasks in parallel.
Definition: transwarp.h:557
void add_listener_all(transwarp::event_type event, std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for the given event type only and for all parents.
Definition: transwarp.h:2938
std::size_t idle_count() const
Returns the number of idle tasks in the pool.
Definition: transwarp.h:3215
void cancel_all(bool) noexcept override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2930
void reclaim()
Reclaims finished tasks by marking them as idle again.
Definition: transwarp.h:3249
void remove_listener_all(const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for all event types and for all parents.
Definition: transwarp.h:2271
Removes a listener from the given task.
Definition: transwarp.h:1225
void set_value(const transwarp::decay_t< result_type > &value) override
Assigns a value to this task. Scheduling will have no effect after a value has been set...
Definition: transwarp.h:2511
TaskType task_type
The task type.
Definition: transwarp.h:2556
void set_type(transwarp::task_type type) noexcept override
Assigns the given type.
Definition: transwarp.h:2360
The consume type. Used for tag dispatch.
Definition: transwarp.h:131
void decrement_refcount(transwarp::itask &)
Decrements refcount.
Definition: transwarp.h:801
Determines the result type of the Functor dispatching on the task type.
Definition: transwarp.h:1295
void set_priority_all(std::int64_t priority) override
Sets a priority to all tasks (defaults to 0). transwarp will not directly use this. This is only useful if something else is using the priority.
Definition: transwarp.h:2823
A task pool that allows running multiple instances of the same task in parallel.
Definition: transwarp.h:3118
void schedule_all(transwarp::executor &) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2871
void finalize() override
Can be called to explicitly finalize this task making this task the terminal task in the graph...
Definition: transwarp.h:1985
Result run_task(std::size_t task_id, const std::weak_ptr< Task > &task, Args &&...args)
Runs the task with the given arguments, hence, invoking the task's functor.
Definition: transwarp.h:707
Removes the executor from the given task.
Definition: transwarp.h:1138
void reset_priority_all() override
Resets the priority of all tasks to 0.
Definition: transwarp.h:2227
auto transform(InputIt first1, InputIt last1, OutputIt d_first, UnaryOperation unary_op)
A function similar to std::transform but returning a transwarp task for deferred, possibly asynchronous execution.
Definition: transwarp.h:3089
virtual std::string name() const =0
Returns the name of the executor.
Just after a task was canceled (handle_event called on thread that task is run on) ...
TaskType task_type
The task type.
Definition: transwarp.h:2599
auto for_each(InputIt first, InputIt last, UnaryOperation unary_op)
A function similar to std::for_each but returning a transwarp task for deferred, possibly asynchronous execution.
Definition: transwarp.h:3058
A callable to run a task given its parents.
Definition: transwarp.h:1506
void set_exception(std::exception_ptr exception) override
Assigns an exception to this task.
Definition: transwarp.h:2892
void schedule_all(bool reset_all) override
Schedules all tasks in the graph for execution on the caller thread. The task-specific executors get ...
Definition: transwarp.h:2132
Generates edges.
Definition: transwarp.h:1079
Adds a new listener per event type to the given task.
Definition: transwarp.h:1211
void ensure_task_was_scheduled() const
Checks if the task was scheduled and throws transwarp::control_error if it's not. ...
Definition: transwarp.h:2392
Sets the level of a task and increments the child count.
Definition: transwarp.h:1049
void remove_listener(const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for all event types.
Definition: transwarp.h:1848
void reset()
Resets all timing information.
Definition: transwarp.h:3350
std::size_t busy_count() const
Returns the number of busy tasks in the pool.
Definition: transwarp.h:3221
std::size_t size() const
Returns the current total size of the pool (sum of idle and busy tasks).
Definition: transwarp.h:3200
void visit_all(Visitor &visitor)
Visits all tasks.
Definition: transwarp.h:2454
Definition: transwarp.h:1476
void set_executor(std::shared_ptr< transwarp::executor >) override
No-op because a value task never runs.
Definition: transwarp.h:2810
const std::optional< std::string > & name() const noexcept override
The optional task name.
Definition: transwarp.h:1757
void remove_listeners() override
Removes all listeners.
Definition: transwarp.h:1872
constexpr no_op_functor no_op
An object to use in places where a no-op functor is required.
Definition: transwarp.h:1681
void set_executor_all(std::shared_ptr< transwarp::executor > executor) override
Assigns an executor to all tasks which takes precedence over the executor provided in schedule() or s...
Definition: transwarp.h:2059
bool was_scheduled() const noexcept override
Returns whether the task was scheduled and not reset afterwards. This means that the underlying future is valid.
Definition: transwarp.h:2157
void cancel(bool) noexcept override
No-op because a value task never runs.
Definition: transwarp.h:2927
Just before a task is scheduled (handle_event called on thread of caller to schedule()) ...
void schedule_all() override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2868
void schedule_all(transwarp::executor &executor, bool reset_all) override
Schedules all tasks in the graph for execution using the provided executor. The task-specific executo...
Definition: transwarp.h:2141
std::shared_ptr< transwarp::executor > executor() const noexcept override
The task's executor (may be null)
Definition: transwarp.h:2011
Assigns a priority to the given task.
Definition: transwarp.h:1146
Result call(std::size_t task_id, const Task &task, const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &parents)
Calls the functor of the given task with the results from the tuple of parents. Throws transwarp::tas...
Definition: transwarp.h:1008
std::shared_ptr< transwarp::task< ResultType > > wait_for_next_task(bool maybe_resize=true)
Just like next_task() but waits for a task to become available. The returned task will always be a va...
Definition: transwarp.h:3190
void push(T &&value)
Pushes a new value onto the end of the buffer. If that exceeds the capacity of the buffer then the ol...
Definition: transwarp.h:1577
void schedule(transwarp::executor &executor) override
Schedules this task for execution using the provided executor. The task-specific executor gets preced...
Definition: transwarp.h:2098
bool canceled() const noexcept override
Value tasks cannot be canceled.
Definition: transwarp.h:2790
void remove_listener_all(transwarp::event_type event, const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for the given event type only and for all parents.
Definition: transwarp.h:2948
Assigns an executor to the given task.
Definition: transwarp.h:1126
std::size_t capacity() const
Returns the capacity of the buffer.
Definition: transwarp.h:1597
void raise_event(transwarp::event_type event)
Raises the given event to all listeners.
Definition: transwarp.h:1896
A simple circular buffer (FIFO). ValueType must support default construction. The buffer lets you pus...
Definition: transwarp.h:1551
void finalize() override
Nothing to be done to finalize a value task.
Definition: transwarp.h:2772
void cancel_all(bool enabled) noexcept override
If enabled then all pending tasks in the graph are canceled which will throw transwarp::task_canceled...
Definition: transwarp.h:2209
std::tuple< std::shared_future< ParentResults >...> get_futures(const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &input)
Returns the futures from the given tuple of tasks.
Definition: transwarp.h:687
auto make_task(TaskType, Functor &&functor, std::shared_ptr< Parents >...parents)
A factory function to create a new task.
Definition: transwarp.h:3032
const std::any & custom_data() const noexcept override
The custom task data (may not hold a value)
Definition: transwarp.h:1775
typename transwarp::detail::functor_result< TaskType, Functor, ParentResults...>::type functor_result_t
Determines the result type of the Functor dispatching on the task type.
Definition: transwarp.h:1382
std::shared_future< result_type > future() const noexcept override
Returns the future associated to the underlying execution.
Definition: transwarp.h:1827
transwarp::task_type type() const noexcept override
The task's type.
Definition: transwarp.h:2780
void add_listener_all(std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for all event types and for all parents.
Definition: transwarp.h:2933
const std::vector< transwarp::itask * > & tasks() override
Returns all tasks in the graph in breadth order.
Definition: transwarp.h:2304
void schedule_all() override
Schedules all tasks in the graph for execution on the caller thread. The task-specific executors get ...
Definition: transwarp.h:2115
void set_executor(std::shared_ptr< transwarp::executor > executor) override
Assigns an executor to this task which takes precedence over the executor provided in schedule() or s...
Definition: transwarp.h:2049
void unvisit() noexcept override
Traverses through each task and marks them as not visited.
Definition: transwarp.h:2445
void remove_listener_all(transwarp::event_type event, const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for the given event type only and for all parents.
Definition: transwarp.h:2278
void cancel_all_but_one(const std::shared_ptr< transwarp::task< OneResult >> &one, const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &parents)
Cancels all tasks but one.
Definition: transwarp.h:778
The task class.
Definition: transwarp.h:436
Definition: transwarp.h:1656
A functor that does nothing.
Definition: transwarp.h:1676
void reset_priority_all() override
Resets the priority of all tasks to 0.
Definition: transwarp.h:2832
std::size_t level() const noexcept override
The task's level.
Definition: transwarp.h:2001
void reset() override
Resets this task.
Definition: transwarp.h:2181
void set_name(std::optional< std::string > name) noexcept override
Assigns the given name.
Definition: transwarp.h:1918
Just before a task's functor is invoked (handle_event called on thread that task is run on) ...
const transwarp::itask & transwarp_task() const noexcept
The associated task (only to be called after the task was constructed)
Definition: transwarp.h:527
void handle_event(const transwarp::event_type event, transwarp::itask &task) override
Performs the actual timing and populates the task's timing members.
Definition: transwarp.h:3309
The task's functor accepts the first parent future that becomes ready.
The consume_any type. Used for tag dispatch.
Definition: transwarp.h:135
void set_priority(std::int64_t priority) override
Sets a task priority (defaults to 0). transwarp will not directly use this. This is only useful if so...
Definition: transwarp.h:1785
std::size_t level() const noexcept override
The task's level.
Definition: transwarp.h:2775
bool was_scheduled() const noexcept override
Returns true because a value task is scheduled once on construction.
Definition: transwarp.h:2903
Unvisits the given task.
Definition: transwarp.h:1286
void remove_listeners_all() override
Removes all listeners from this task and from all parents.
Definition: transwarp.h:2953
The accept type. Used for tag dispatch.
Definition: transwarp.h:123
std::int64_t avg_waittime_us() const noexcept override
Returns the average waittime in microseconds (-1 if never set)
Definition: transwarp.h:2030
std::vector< transwarp::itask * > parents() const override
Returns the task's parents (may be empty)
Definition: transwarp.h:2299
constexpr transwarp::accept_type accept
The accept task tag.
Definition: transwarp.h:124
std::size_t maximum_size() const
Returns the maximum size of the pool.
Definition: transwarp.h:3210
std::int64_t priority() const noexcept override
The task priority (defaults to 0)
Definition: transwarp.h:1766
TaskType task_type
The task type.
Definition: transwarp.h:1976
Common task functionality shared across task_impl and value_task
Definition: transwarp.h:1746
bool has_result() const noexcept override
Returns whether this task contains a result.
Definition: transwarp.h:2176
void set_custom_data(std::any custom_data) override
Assigns custom data to this task. transwarp will not directly use this. This is only useful if someth...
Definition: transwarp.h:1804
Removes all listeners from the given task.
Definition: transwarp.h:1252
void set_priority_all(std::int64_t priority) override
Sets a priority to all tasks (defaults to 0). transwarp will not directly use this. This is only useful if something else is using the priority (e.g. a custom executor)
Definition: transwarp.h:2216
The listener interface to listen to events raised by tasks.
Definition: transwarp.h:195
void add_listener(transwarp::event_type event, std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for the given event type only.
Definition: transwarp.h:1841
void schedule_impl(bool reset, transwarp::executor *executor=nullptr) override
Schedules this task for execution using the provided executor. The task-specific executor gets preced...
Definition: transwarp.h:2402
void add_listener(std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for all event types.
Definition: transwarp.h:1832
Exception thrown when a task is canceled.
Definition: transwarp.h:83
TaskType task_type
The task type.
Definition: transwarp.h:2647
void wait() const override
Waits for the task to complete. Should only be called if was_scheduled() is true, throws transwarp::c...
Definition: transwarp.h:2163
void set_avg_runtime_us(std::int64_t runtime) noexcept override
Assigns the given runtime.
Definition: transwarp.h:2383
std::shared_future< ResultType > make_future_with_value(Value &&value)
Returns a ready future with the given value as its state.
Definition: transwarp.h:1396
void schedule() override
Schedules this task for execution on the caller thread. The task-specific executor gets precedence if...
Definition: transwarp.h:2081
Just before a task starts running (handle_event called on thread that task is run on) ...
transwarp::itask & child() noexcept
Returns the child task.
Definition: transwarp.h:234
Removes custom data from the given task.
Definition: transwarp.h:1178
void schedule_all(transwarp::executor &, bool) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2877
Resets the given task.
Definition: transwarp.h:1106
auto then(TaskType_, Functor_ &&functor)
Creates a continuation to this task.
Definition: transwarp.h:2761
void set_custom_data_all(std::any custom_data) override
Assigns custom data to all tasks. transwarp will not directly use this. This is only useful if someth...
Definition: transwarp.h:2237
const transwarp::itask & child() const noexcept
Returns the child task.
Definition: transwarp.h:229
void remove_listeners_all(transwarp::event_type event) override
Removes all listeners for the given event type and for all parents.
Definition: transwarp.h:2292
bool full() const
Returns whether the buffer is full.
Definition: transwarp.h:1613
A base class for a user-defined functor that needs access to the associated task or a cancel point to...
Definition: transwarp.h:519
The releaser will release a task's future when the task's after_satisfied event was received which ha...
Definition: transwarp.h:3419
constexpr transwarp::consume_any_type consume_any
The consume_any task tag.
Definition: transwarp.h:136
Removes a listener per event type from the given task.
Definition: transwarp.h:1238
void reset_priority() override
Resets the task priority to 0.
Definition: transwarp.h:1795
std::int64_t avg_runtime_us() const noexcept override
Returns -1 as value tasks don't run.
Definition: transwarp.h:2805
std::int64_t avg_waittime_us() const noexcept override
Returns -1 as value tasks don't run.
Definition: transwarp.h:2800
Exception thrown when a task is used in unintended ways.
Definition: transwarp.h:110
Resets the priority of the given task.
Definition: transwarp.h:1158
bool canceled() const noexcept override
Returns whether the associated task is canceled.
Definition: transwarp.h:2016
void schedule() override
No-op because a value task never runs.
Definition: transwarp.h:2856
std::size_t size() const
Returns the number of populated values of the buffer. Its maximum value equals the capacity of the bu...
Definition: transwarp.h:1603
void set_value(transwarp::decay_t< result_type > &&value) override
Assigns a value to this task.
Definition: transwarp.h:2886
std::vector< transwarp::edge > edges() override
Returns all edges in the graph. This is mainly for visualizing the tasks and their interdependencies...
Definition: transwarp.h:2312
void add_listener_all(std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for all event types and for all parents.
Definition: transwarp.h:2257
void wait() const override
No-op because a value task never runs.
Definition: transwarp.h:2908
releaser(std::shared_ptr< transwarp::executor > executor)
The executor gives control over where a task's future is released.
Definition: transwarp.h:3424
void set_value(transwarp::decay_t< result_type > &value) override
Assigns a value to this task. Scheduling will have no effect after a value has been set...
Definition: transwarp.h:2563
A task proxy.
Definition: transwarp.h:2501
void transwarp_cancel_point() const
If the associated task is canceled then this will throw transwarp::task_canceled which will stop the ...
Definition: transwarp.h:538
The base task class that contains the functionality that can be used with all result types (void and ...
Definition: transwarp.h:1973
Definition: transwarp.h:673
std::shared_ptr< task_impl > named(std::string name)
Gives this task a name and returns a ptr to itself.
Definition: transwarp.h:2673
The task's functor consumes all parent results.
Visits the given task using the visitor given in the constructor.
Definition: transwarp.h:1274
void remove_listeners_all() override
Removes all listeners, both for this task and for all parents.
Definition: transwarp.h:2285
Just after a task has finished running (handle_event called on thread that task is run on) ...
transwarp::itask & transwarp_task() noexcept
The associated task (only to be called after the task was constructed)
Definition: transwarp.h:532
Base class for exceptions.
Definition: transwarp.h:74
typename transwarp::detail::parents< ParentResults...>::type parents_t
Determines the type of the parents.
Definition: transwarp.h:1472
constexpr transwarp::consume_type consume
The consume task tag.
Definition: transwarp.h:132
void schedule(transwarp::executor &, bool) override
No-op because a value task never runs.
Definition: transwarp.h:2865
A value task that stores a single value and doesn't require scheduling. Value tasks should be created...
Definition: transwarp.h:2726
void remove_listeners(transwarp::event_type event) override
Removes all listeners for the given event type.
Definition: transwarp.h:1878
void add_listener_all(transwarp::event_type event, std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for the given event type only and for all parents.
Definition: transwarp.h:2264
void set_value(transwarp::decay_t< result_type > &&value) override
Assigns a value to this task. Scheduling will have no effect after a value has been set...
Definition: transwarp.h:2520
Determines the type of the parents.
Definition: transwarp.h:1424
void execute(const std::function< void()> &functor, transwarp::itask &) override
Runs the functor on the current thread.
Definition: transwarp.h:1702
Parent wait_for_any(const std::shared_ptr< transwarp::task< ParentResults >> &...parents)
Waits for the first parent to finish.
Definition: transwarp.h:752
task_impl(F &&functor, std::shared_ptr< transwarp::task< ParentResults >>...parents)
A task is defined by functor and parent tasks. Note: Don't use this constructor directly, use transwarp::make_task.
Definition: transwarp.h:2655
void reset_all() override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2924
Assigns custom data to the given task.
Definition: transwarp.h:1166
The task's functor takes no arguments but waits for all parents to finish.
void set_exception(std::exception_ptr exception) override
Assigns an exception to this task. Scheduling will have no effect after an exception has been set...
Definition: transwarp.h:2148
Definition: transwarp.h:834
std::shared_ptr< transwarp::task< ResultType > > next_task(bool maybe_resize=true)
Returns the next idle task. If there are no idle tasks then it will attempt to double the pool size...
Definition: transwarp.h:3158
virtual void handle_event(transwarp::event_type event, transwarp::itask &task)=0
This may be called from arbitrary threads depending on the event type (see transwarp::event_type). The implementer needs to ensure that this never throws exceptions.
void call_with_each(const Functor &f, const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &t)
Calls the functor with every element in the tuple.
Definition: transwarp.h:1026
auto make_value_task(Value &&value)
A factory function to create a new value task.
Definition: transwarp.h:3048
std::size_t id() const noexcept override
The task's id.
Definition: transwarp.h:1752
transwarp::detail::functor_result_t< TaskType, Functor, ParentResults...> result_type
The result type of this task.
Definition: transwarp.h:2650
An edge between two tasks.
Definition: transwarp.h:206
void remove_listener(transwarp::event_type event, const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for the given event type only.
Definition: transwarp.h:1861
A timer that tracks the average idle, wait, and run time of each task it listens to.
Definition: transwarp.h:3298
void schedule(transwarp::executor &) override
No-op because a value task never runs.
Definition: transwarp.h:2859
void reset_all() override
Resets all tasks in the graph.
Definition: transwarp.h:2193
Just after custom data was assigned (handle_event called on thread that custom data was set on) ...
void handle_event(const transwarp::event_type event, transwarp::itask &task) override
This may be called from arbitrary threads depending on the event type (see transwarp::event_type). The implementer needs to ensure that this never throws exceptions.
Definition: transwarp.h:3434
circular_buffer(std::size_t capacity)
Constructs a circular buffer with a given fixed capacity.
Definition: transwarp.h:1560
Definition: transwarp.h:825
void execute(const std::function< void()> &functor, transwarp::itask &) override
Pushes the functor into the thread pool for asynchronous execution.
Definition: transwarp.h:1729
Executor for parallel execution. Uses a simple thread pool.
Definition: transwarp.h:1709
The wait_any type. Used for tag dispatch.
Definition: transwarp.h:143
void set_custom_data_all(std::any custom_data) override
Assigns custom data to all tasks. transwarp will not directly use this. This is only useful if someth...
Definition: transwarp.h:2840
task_pool(std::shared_ptr< transwarp::task< ResultType >> task)
Constructs a task pool with reasonable defaults for minimum and maximum.
Definition: transwarp.h:3144
void schedule(bool reset) override
Schedules this task for execution on the caller thread. The task-specific executor gets precedence if...
Definition: transwarp.h:2090
std::string to_string(const transwarp::task_type &type)
String conversion for the task_type enumeration.
Definition: transwarp.h:339
void set_value(const transwarp::decay_t< result_type > &value) override
Assigns a value to this task.
Definition: transwarp.h:2880
void remove_executor() override
Removes the executor from this task.
Definition: transwarp.h:2066
Just after the task's future was changed (handle_event called on thread that changed the task's futur...
The accept_any type. Used for tag dispatch.
Definition: transwarp.h:127
void remove_custom_data() override
Removes custom data from this task.
Definition: transwarp.h:1818
bool is_ready() const override
Returns true because a value task is always ready.
Definition: transwarp.h:2911
std::string name() const override
Returns the name of the executor.
Definition: transwarp.h:1724
constexpr transwarp::wait_type wait
The wait task tag.
Definition: transwarp.h:140
The root type. Used for tag dispatch.
Definition: transwarp.h:119
The task's functor consumes the first parent result that becomes ready.
std::shared_ptr< TaskType > clone_task(std::unordered_map< std::shared_ptr< transwarp::itask >, std::shared_ptr< transwarp::itask >> &task_cache, const std::shared_ptr< TaskType > &t)
Clones a task.
Definition: transwarp.h:419
A task representing a piece of work given by functor and parent tasks. By connecting tasks a directed...
Definition: transwarp.h:2644
std::shared_ptr< task_impl > clone_cast() const
Clones this task and casts the result to a ptr to task_impl.
Definition: transwarp.h:2690
The wait type. Used for tag dispatch.
Definition: transwarp.h:139
task_pool(std::shared_ptr< transwarp::task< ResultType >> task, std::size_t minimum_size, std::size_t maximum_size)
Constructs a task pool.
Definition: transwarp.h:3122
Just after a task has satisfied all its children with results (handle_event called on thread where th...
transwarp::itask & parent() noexcept
Returns the parent task.
Definition: transwarp.h:224
void cancel(bool enabled) noexcept override
If enabled then this task is canceled which will throw transwarp::task_canceled when retrieving the t...
Definition: transwarp.h:2202
Schedules using the given executor.
Definition: transwarp.h:1093
void swap(circular_buffer &buffer)
Swaps this buffer with the given buffer.
Definition: transwarp.h:1618
constexpr transwarp::root_type root
The root task tag.
Definition: transwarp.h:120
virtual void execute(const std::function< void()> &functor, transwarp::itask &task)=0
Runs a task which is wrapped by the given functor. The functor only captures one shared pointer and c...
std::vector< transwarp::itask * > parents() const override
Empty because a value task doesn't have parents.
Definition: transwarp.h:2963
void schedule_all(transwarp::executor &executor) override
Schedules all tasks in the graph for execution using the provided executor. The task-specific executo...
Definition: transwarp.h:2123
void set_level(std::size_t level) noexcept override
Assigns the given level.
Definition: transwarp.h:2355
const value_type & front() const
Returns the value at the front of the buffer (the oldest value). This is undefined if the buffer is e...
Definition: transwarp.h:1584
std::int64_t avg_idletime_us() const noexcept override
Returns the average idletime in microseconds (-1 if never set)
Definition: transwarp.h:2021
The task's functor accepts all parent futures.
Applies final bookkeeping to the task and collects the task.
Definition: transwarp.h:1065
void remove_executor_all() override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2819
void schedule(transwarp::executor &executor, bool reset) override
Schedules this task for execution using the provided executor. The task-specific executor gets preced...
Definition: transwarp.h:2107
std::size_t minimum_size() const
Returns the minimum size of the pool.
Definition: transwarp.h:3205
std::result_of_t< decltype(&std::shared_future< T >::get)(std::shared_future< T >)> result_t
Returns the result type of a std::shared_future<T>
Definition: transwarp.h:411
void wait_for_all(const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &parents)
Waits for all parents to finish.
Definition: transwarp.h:722
task_type
The possible task types.
Definition: transwarp.h:62
void set_value() override
Assigns a value to this task. Scheduling will have no effect after a call to this. Calling reset() will reset this and re-enable scheduling.
Definition: transwarp.h:2606
std::vector< transwarp::edge > edges() override
Returns empty edges because a value task doesn't have parents.
Definition: transwarp.h:2973
std::int64_t avg_runtime_us() const noexcept override
Returns the average runtime in microseconds (-1 if never set)
Definition: transwarp.h:2039
void pop()
Removes the value at the front of the buffer (the oldest value)
Definition: transwarp.h:1589
void resize(std::size_t new_size)
Resizes the task pool to the given new size if possible.
Definition: transwarp.h:3227
event_type
The task events that can be subscribed to using the listener interface.
Definition: transwarp.h:181
Removes all listeners per event type from the given task.
Definition: transwarp.h:1261
void schedule_all_impl(bool reset_all, transwarp::executor *executor=nullptr)
Schedules all tasks in the graph for execution using the provided executor. The task-specific executo...
Definition: transwarp.h:2430
std::shared_ptr< value_task > clone_cast() const
Clones this task and casts the result to a ptr to value_task.
Definition: transwarp.h:2767
The task's functor takes no arguments but waits for the first parent to finish.
Exception thrown when an invalid parameter was passed to a function.
Definition: transwarp.h:101
void check_listener(const std::shared_ptr< transwarp::listener > &listener) const
Check for non-null listener pointer.
Definition: transwarp.h:1906
void remove_custom_data_all() override
Removes custom data from all tasks.
Definition: transwarp.h:2248
std::int64_t avg_idletime_us() const noexcept override
Returns -1 as value tasks don't run.
Definition: transwarp.h:2795
void reset() override
No-op because a value task never runs.
Definition: transwarp.h:2921
value_task(T &&value)
A value task is defined by a given value. Note: Don't use this constructor directly, use transwarp::make_value_task.
Definition: transwarp.h:2737
std::shared_future< void > make_ready_future()
Returns a ready future.
Definition: transwarp.h:1404
void set_executor_all(std::shared_ptr< transwarp::executor >) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2813
std::shared_ptr< transwarp::executor > executor() const noexcept override
Value tasks don't have executors as they don't run.
Definition: transwarp.h:2785
constexpr transwarp::accept_any_type accept_any
The accept_any task tag.
Definition: transwarp.h:128
bool empty() const
Returns whether the buffer is empty.
Definition: transwarp.h:1608
void ensure_task_not_running() const
Checks if the task is currently running and throws transwarp::control_error if it is...
Definition: transwarp.h:1889
const transwarp::itask & parent() const noexcept
Returns the parent task.
Definition: transwarp.h:219
Cancels or resumes the given task.
Definition: transwarp.h:1114
const std::vector< transwarp::itask * > & tasks() override
Returns all tasks in the graph in breadth order.
Definition: transwarp.h:2968
static Result work(std::size_t task_id, const Task &task, const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &parents)
Definition: transwarp.h:929
void remove_executor() override
No-op because a value task never runs.
Definition: transwarp.h:2816
void apply_to_each(Functor &&f, Tuple &&t)
Applies the functor to each element in the tuple.
Definition: transwarp.h:667
void schedule(bool) override
No-op because a value task never runs.
Definition: transwarp.h:2862
void set_avg_waittime_us(std::int64_t waittime) noexcept override
Assigns the given waittime.
Definition: transwarp.h:2374
void schedule_all(bool) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2874
std::string name() const override
Returns the name of the executor.
Definition: transwarp.h:1697
Pushes the given task into the vector of tasks.
Definition: transwarp.h:1186
auto then(TaskType_, Functor_ &&functor)
Creates a continuation to this task.
Definition: transwarp.h:2684
Executor for sequential execution. Runs functors sequentially on the same thread. ...
Definition: transwarp.h:1685
bool has_result() const noexcept override
Returns true because a value task always contains a result.
Definition: transwarp.h:2916
std::shared_future< ResultType > make_future_with_exception(std::exception_ptr exception)
Returns a ready future with the given exception as its state.
Definition: transwarp.h:1412
void remove_executor_all() override
Removes the executor from all tasks.
Definition: transwarp.h:2072
void set_id(std::size_t id) noexcept override
Assigns the given id.
Definition: transwarp.h:1913
bool is_ready() const override
Returns whether the task has finished processing. Should only be called if was_scheduled() is true...
Definition: transwarp.h:2170
An interface for the task class.
Definition: transwarp.h:248
std::shared_ptr< value_task > named(std::string name)
Gives this task a name and returns a ptr to itself.
Definition: transwarp.h:2750
void visit(const std::function< void(transwarp::itask &)> &visitor) override
Visits each task in a depth-first traversal.
Definition: transwarp.h:2436
void remove_listeners_all(transwarp::event_type event) override
Removes all listeners for the given event type and for all parents.
Definition: transwarp.h:2958