25 #include <type_traits>
26 #include <unordered_map>
51 : std::runtime_error{message}
93 struct root_type : std::integral_constant<transwarp::task_type, transwarp::task_type::root> {};
97 struct accept_type : std::integral_constant<transwarp::task_type, transwarp::task_type::accept> {};
101 struct accept_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::accept_any> {};
105 struct consume_type : std::integral_constant<transwarp::task_type, transwarp::task_type::consume> {};
109 struct consume_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::consume_any> {};
113 struct wait_type : std::integral_constant<transwarp::task_type, transwarp::task_type::wait> {};
117 struct wait_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::wait_any> {};
141 virtual std::string
name()
const = 0;
180 : parent_{parent}, child_{child}
185 edge& operator=(
const edge&) =
default;
210 virtual ~
itask() =
default;
212 virtual void finalize() = 0;
213 virtual std::size_t id()
const noexcept = 0;
214 virtual std::size_t level()
const noexcept = 0;
216 virtual const std::optional<std::string>& name()
const noexcept = 0;
217 virtual std::shared_ptr<transwarp::executor>
executor()
const noexcept = 0;
218 virtual std::int64_t priority()
const noexcept = 0;
219 virtual const std::any& custom_data()
const noexcept = 0;
220 virtual bool canceled()
const noexcept = 0;
221 virtual std::int64_t avg_idletime_us()
const noexcept = 0;
222 virtual std::int64_t avg_waittime_us()
const noexcept = 0;
223 virtual std::int64_t avg_runtime_us()
const noexcept = 0;
224 virtual void set_executor(std::shared_ptr<transwarp::executor>
executor) = 0;
225 virtual void set_executor_all(std::shared_ptr<transwarp::executor> executor) = 0;
226 virtual void remove_executor() = 0;
227 virtual void remove_executor_all() = 0;
228 virtual void set_priority(std::int64_t priority) = 0;
229 virtual void set_priority_all(std::int64_t priority) = 0;
230 virtual void reset_priority() = 0;
231 virtual void reset_priority_all() = 0;
232 virtual void set_custom_data(std::any custom_data) = 0;
233 virtual void set_custom_data_all(std::any custom_data) = 0;
234 virtual void remove_custom_data() = 0;
235 virtual void remove_custom_data_all() = 0;
236 virtual void add_listener(std::shared_ptr<transwarp::listener>
listener) = 0;
237 virtual void add_listener(
transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) = 0;
238 virtual void add_listener_all(std::shared_ptr<transwarp::listener> listener) = 0;
239 virtual void add_listener_all(
transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) = 0;
240 virtual void remove_listener(
const std::shared_ptr<transwarp::listener>& listener) = 0;
241 virtual void remove_listener(
transwarp::event_type event,
const std::shared_ptr<transwarp::listener>& listener) = 0;
242 virtual void remove_listener_all(
const std::shared_ptr<transwarp::listener>& listener) = 0;
243 virtual void remove_listener_all(
transwarp::event_type event,
const std::shared_ptr<transwarp::listener>& listener) = 0;
244 virtual void remove_listeners() = 0;
246 virtual void remove_listeners_all() = 0;
248 virtual void schedule() = 0;
250 virtual void schedule(
bool reset) = 0;
252 virtual void schedule_all() = 0;
254 virtual void schedule_all(
bool reset_all) = 0;
256 virtual void set_exception(std::exception_ptr exception) = 0;
257 virtual bool was_scheduled()
const noexcept = 0;
258 virtual void wait()
const = 0;
259 virtual bool is_ready()
const = 0;
260 virtual bool has_result()
const = 0;
261 virtual void reset() = 0;
262 virtual void reset_all() = 0;
263 virtual void cancel(
bool enabled) noexcept = 0;
264 virtual void cancel_all(
bool enabled) noexcept = 0;
265 virtual std::vector<itask*> parents()
const = 0;
266 virtual const std::vector<itask*>& tasks() = 0;
267 virtual std::vector<transwarp::edge> edges() = 0;
280 virtual void visit(
const std::function<
void(
itask&)>& visitor) = 0;
281 virtual void unvisit() noexcept = 0;
282 virtual void set_id(std::size_t
id) noexcept = 0;
283 virtual void set_level(std::size_t level) noexcept = 0;
285 virtual void set_name(std::optional<std::string> name) noexcept = 0;
286 virtual void set_avg_idletime_us(std::int64_t idletime) noexcept = 0;
287 virtual void set_avg_waittime_us(std::int64_t waittime) noexcept = 0;
288 virtual void set_avg_runtime_us(std::int64_t runtime) noexcept = 0;
311 const std::optional<std::string>& name = task.name();
313 s += std::string{
"<"} + *name + std::string{
">"} + separator.data();
316 s += std::string{
" id="} + std::to_string(task.id());
317 s += std::string{
" lev="} + std::to_string(task.level());
318 const std::shared_ptr<transwarp::executor> exec = task.executor();
320 s += separator.data() + std::string{
"<"} + exec->name() + std::string{
">"};
322 const std::int64_t avg_idletime_us = task.avg_idletime_us();
323 if (avg_idletime_us >= 0) {
324 s += separator.data() + std::string{
"avg-idle-us="} + std::to_string(avg_idletime_us);
326 const std::int64_t avg_waittime_us = task.avg_waittime_us();
327 if (avg_waittime_us >= 0) {
328 s += separator.data() + std::string{
"avg-wait-us="} + std::to_string(avg_waittime_us);
330 const std::int64_t avg_runtime_us = task.avg_runtime_us();
331 if (avg_runtime_us >= 0) {
332 s += separator.data() + std::string{
"avg-run-us="} + std::to_string(avg_runtime_us);
345 inline std::string
to_string(
const std::vector<transwarp::edge>& edges, std::string_view separator=
"\n") {
346 std::string dot = std::string{
"digraph {"} + separator.data();
350 dot += std::string{
"}"};
357 using decay_t = std::remove_const_t<std::remove_reference_t<T>>;
362 using result_t = std::result_of_t<decltype(&std::shared_future<T>::get)(std::shared_future<T>)>;
369 template<
typename TaskType>
370 std::shared_ptr<TaskType>
clone_task(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache,
const std::shared_ptr<TaskType>& t) {
372 const auto task_cache_it = task_cache.find(original_task);
373 if (task_cache_it != task_cache.cend()) {
374 return std::dynamic_pointer_cast<TaskType>(task_cache_it->second);
376 auto cloned_task = t->clone_impl(task_cache);
377 task_cache[original_task] = cloned_task;
378 return std::move(cloned_task);
386 template<
typename ResultType>
389 using result_type = ResultType;
391 virtual ~
task() =
default;
393 std::shared_ptr<task> clone()
const {
394 std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>> task_cache;
395 return clone_impl(task_cache);
400 virtual const std::shared_future<result_type>& future()
const noexcept = 0;
405 friend std::shared_ptr<T>
detail::clone_task(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache,
const std::shared_ptr<T>& t);
407 virtual std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache)
const = 0;
411 template<
typename ResultType>
414 using result_type = ResultType&;
416 virtual ~
task() =
default;
418 std::shared_ptr<task> clone()
const {
419 std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>> task_cache;
420 return clone_impl(task_cache);
424 virtual const std::shared_future<result_type>& future()
const noexcept = 0;
429 friend std::shared_ptr<T>
detail::clone_task(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache,
const std::shared_ptr<T>& t);
431 virtual std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache)
const = 0;
438 using result_type = void;
440 virtual ~
task() =
default;
442 std::shared_ptr<task> clone()
const {
443 std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>> task_cache;
444 return clone_impl(task_cache);
447 virtual void set_value() = 0;
448 virtual const std::shared_future<result_type>& future()
const noexcept = 0;
449 virtual result_type
get()
const = 0;
453 friend std::shared_ptr<T>
detail::clone_task(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache,
const std::shared_ptr<T>& t);
455 virtual std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache)
const = 0;
473 virtual ~functor() =
default;
479 return *transwarp_task_;
485 if (transwarp_task_->canceled()) {
492 friend void transwarp::detail::assign_task_if(F&,
const transwarp::itask&) noexcept;
494 const transwarp::
itask* transwarp_task_{};
508 if (n_threads == 0) {
511 const std::size_t n_target = threads_.size() + n_threads;
512 while (threads_.size() < n_target) {
515 thread = std::thread(&thread_pool::worker,
this);
521 threads_.push_back(std::move(thread));
540 void push(
const std::function<
void()>&
functor) {
542 std::lock_guard<std::mutex> lock{mutex_};
545 cond_var_.notify_one();
554 std::unique_lock<std::mutex> lock{mutex_};
555 cond_var_.wait(lock, [
this]{
556 return done_ || !functors_.empty();
558 if (done_ && functors_.empty()) {
561 functor = functors_.front();
570 std::lock_guard<std::mutex> lock{mutex_};
573 cond_var_.notify_all();
574 for (std::thread& thread : threads_) {
581 std::vector<std::thread> threads_;
582 std::queue<std::function<void()>> functors_;
583 std::condition_variable cond_var_;
589 template<
typename Functor,
typename Tuple>
591 std::apply([&f](
auto&&... arg){(..., f(std::forward<decltype(arg)>(arg)));}, std::forward<Tuple>(t));
595 template<
int offset,
typename... ParentResults>
597 static void work(
const std::tuple<std::shared_ptr<
transwarp::task<ParentResults>>...>& source, std::tuple<std::shared_future<ParentResults>...>& target) {
598 std::get<offset>(target) = std::get<offset>(source)->future();
603 template<
typename... ParentResults>
609 template<
typename... ParentResults>
611 std::tuple<std::shared_future<ParentResults>...> result;
617 template<
typename ParentResultType>
619 std::vector<std::shared_future<ParentResultType>> result;
620 result.reserve(input.size());
622 result.emplace_back(
task->future());
629 template<
typename Result,
typename Task,
typename... Args>
630 Result
run_task(std::size_t task_id,
const std::weak_ptr<Task>&
task, Args&&... args) {
631 const std::shared_ptr<Task> t = task.lock();
639 return (*t->functor_)(std::forward<Args>(args)...);
644 template<
typename... ParentResults>
651 template<
typename ParentResultType>
654 parent->future().wait();
659 template<
typename Parent>
660 Parent wait_for_any_impl() {
664 template<
typename Parent,
typename ParentResult,
typename... ParentResults>
666 const std::future_status status = parent->future().wait_for(std::chrono::microseconds(1));
667 if (status == std::future_status::ready) {
670 return transwarp::detail::wait_for_any_impl<Parent>(parents...);
674 template<
typename Parent,
typename... ParentResults>
677 Parent parent = transwarp::detail::wait_for_any_impl<Parent>(
parents...);
686 template<
typename ParentResultType>
690 const std::future_status status = parent->future().wait_for(std::chrono::microseconds(1));
691 if (status == std::future_status::ready) {
700 template<
typename OneResult,
typename... ParentResults>
702 auto callable = [&one](
auto& parent) {
704 parent->cancel(
true);
712 template<
typename OneResult,
typename ParentResultType>
716 parent->cancel(
true);
722 template<
typename TaskType,
bool done,
int total,
int... n>
724 template<
typename Result,
typename Task,
typename... ParentResults>
727 work<Result>(task_id, task,
parents);
731 template<
typename TaskType>
734 template<
int total,
int... n>
736 template<
typename Result,
typename Task,
typename... ParentResults>
738 return transwarp::detail::run_task<Result>(task_id, task);
744 template<
typename Result,
typename Task,
typename ParentResultType>
746 return transwarp::detail::run_task<Result>(task_id, task);
750 template<
int total,
int... n>
752 template<
typename Result,
typename Task,
typename... ParentResults>
756 return transwarp::detail::run_task<Result>(task_id, task, std::get<n>(futures)...);
762 template<
typename Result,
typename Task,
typename ParentResultType>
769 template<
int total,
int... n>
771 template<
typename Result,
typename Task,
typename... ParentResults>
773 using parent_t = std::remove_reference_t<decltype(std::get<0>(
parents))>;
774 parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(
parents)...);
776 return transwarp::detail::run_task<Result>(task_id, task, parent->future());
782 template<
typename Result,
typename Task,
typename ParentResultType>
786 return transwarp::detail::run_task<Result>(task_id, task, parent->future());
790 template<
int total,
int... n>
792 template<
typename Result,
typename Task,
typename... ParentResults>
795 return transwarp::detail::run_task<Result>(task_id, task, std::get<n>(
parents)->future().get()...);
801 template<
typename Result,
typename Task,
typename ParentResultType>
804 std::vector<ParentResultType> results;
805 results.reserve(
parents.size());
807 results.emplace_back(parent->future().get());
809 return transwarp::detail::run_task<Result>(task_id, task, std::move(results));
813 template<
int total,
int... n>
815 template<
typename Result,
typename Task,
typename... ParentResults>
817 using parent_t = std::remove_reference_t<decltype(std::get<0>(
parents))>;
818 parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(
parents)...);
820 return transwarp::detail::run_task<Result>(task_id, task, parent->future().get());
826 template<
typename Result,
typename Task,
typename ParentResultType>
830 return transwarp::detail::run_task<Result>(task_id, task, parent->future().get());
834 template<
int total,
int... n>
836 template<
typename Result,
typename Task,
typename... ParentResults>
840 return transwarp::detail::run_task<Result>(task_id, task);
846 template<
typename Result,
typename Task,
typename ParentResultType>
850 parent->future().get();
852 return transwarp::detail::run_task<Result>(task_id, task);
856 template<
int total,
int... n>
858 template<
typename Result,
typename Task,
typename... ParentResults>
860 using parent_t = std::remove_reference_t<decltype(std::get<0>(
parents))>;
861 parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(
parents)...);
863 parent->future().get();
864 return transwarp::detail::run_task<Result>(task_id, task);
870 template<
typename Result,
typename Task,
typename ParentResultType>
874 parent->future().get();
875 return transwarp::detail::run_task<Result>(task_id, task);
882 template<
typename TaskType,
typename Result,
typename Task,
typename... ParentResults>
884 constexpr std::size_t n = std::tuple_size_v<std::tuple<std::shared_future<ParentResults>...>>;
886 work<Result>(task_id, task,
parents);
892 template<
typename TaskType,
typename Result,
typename Task,
typename ParentResultType>
895 work<Result>(task_id, task,
parents);
900 template<
typename Functor,
typename... ParentResults>
902 auto callable = [&f](
const auto&
task) {
912 template<
typename Functor,
typename ParentResultType>
929 if (task_.level() <= task.level()) {
931 task_.set_level(task.level() + 1);
940 explicit final_visitor(std::vector<transwarp::itask*>& tasks) noexcept
944 tasks_.push_back(&task);
948 std::vector<transwarp::itask*>& tasks_;
954 explicit edges_visitor(std::vector<transwarp::edge>& edges) noexcept
959 edges_.emplace_back(static_cast<const transwarp::itask&>(*parent), task);
963 std::vector<transwarp::edge>& edges_;
969 : reset_{reset}, executor_{executor} {}
972 task.schedule_impl(reset_, executor_);
990 : enabled_{enabled} {}
993 task.cancel(enabled_);
1002 : executor_{std::move(executor)} {}
1005 task.set_executor(executor_);
1008 std::shared_ptr<transwarp::executor> executor_;
1015 task.remove_executor();
1022 : priority_{priority} {}
1025 task.set_priority(priority_);
1028 std::int64_t priority_;
1035 task.reset_priority();
1042 : custom_data_{std::move(custom_data)} {}
1045 task.set_custom_data(custom_data_);
1048 std::any custom_data_;
1055 task.remove_custom_data();
1065 tasks_.push_back(&task);
1068 std::vector<transwarp::itask*>& tasks_;
1074 : listener_{std::move(listener)}
1078 task.add_listener(listener_);
1081 std::shared_ptr<transwarp::listener> listener_;
1087 : event_{
event}, listener_{std::move(listener)}
1091 task.add_listener(event_, listener_);
1095 std::shared_ptr<transwarp::listener> listener_;
1101 : listener_{std::move(listener)}
1105 task.remove_listener(listener_);
1108 std::shared_ptr<transwarp::listener> listener_;
1114 : event_{
event}, listener_{std::move(listener)}
1118 task.remove_listener(event_, listener_);
1122 std::shared_ptr<transwarp::listener> listener_;
1129 task.remove_listeners();
1141 task.remove_listeners(event_);
1150 : visitor_{visitor} {}
1153 task.visit(visitor_);
1156 const std::function<void(transwarp::itask&)>& visitor_;
1168 template<
typename TaskType,
typename Functor,
typename... ParentResults>
1170 static_assert(std::is_same_v<TaskType, transwarp::root_type> ||
1171 std::is_same_v<TaskType, transwarp::accept_type> ||
1172 std::is_same_v<TaskType, transwarp::accept_any_type> ||
1173 std::is_same_v<TaskType, transwarp::consume_type> ||
1174 std::is_same_v<TaskType, transwarp::consume_any_type> ||
1175 std::is_same_v<TaskType, transwarp::wait_type> ||
1176 std::is_same_v<TaskType, transwarp::wait_any_type>,
1177 "Invalid task type, must be one of: root, accept, accept_any, consume, consume_any, wait, wait_any");
1180 template<
typename Functor,
typename... ParentResults>
1182 static_assert(
sizeof...(ParentResults) == 0,
"A root task cannot have parent tasks");
1183 using type = decltype(std::declval<Functor>()());
1186 template<
typename Functor,
typename... ParentResults>
1188 static_assert(
sizeof...(ParentResults) > 0,
"An accept task must have at least one parent");
1189 using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<ParentResults>>()...));
1192 template<
typename Functor,
typename ParentResultType>
1194 using type = decltype(std::declval<Functor>()(std::declval<std::vector<std::shared_future<ParentResultType>>>()));
1197 template<
typename Functor,
typename... ParentResults>
1199 static_assert(
sizeof...(ParentResults) > 0,
"An accept_any task must have at least one parent");
1200 using arg_t = std::tuple_element_t<0, std::tuple<ParentResults...>>;
1201 using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<arg_t>>()));
1204 template<
typename Functor,
typename ParentResultType>
1206 using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<ParentResultType>>()));
1209 template<
typename Functor,
typename... ParentResults>
1211 static_assert(
sizeof...(ParentResults) > 0,
"A consume task must have at least one parent");
1212 using type = decltype(std::declval<Functor>()(std::declval<ParentResults>()...));
1215 template<
typename Functor,
typename ParentResultType>
1217 using type = decltype(std::declval<Functor>()(std::declval<std::vector<ParentResultType>>()));
1220 template<
typename Functor,
typename... ParentResults>
1222 static_assert(
sizeof...(ParentResults) > 0,
"A consume_any task must have at least one parent");
1223 using arg_t = std::tuple_element_t<0, std::tuple<ParentResults...>>;
1224 using type = decltype(std::declval<Functor>()(std::declval<arg_t>()));
1227 template<
typename Functor,
typename ParentResultType>
1229 using type = decltype(std::declval<Functor>()(std::declval<ParentResultType>()));
1232 template<
typename Functor,
typename... ParentResults>
1234 static_assert(
sizeof...(ParentResults) > 0,
"A wait task must have at least one parent");
1235 using type = decltype(std::declval<Functor>()());
1238 template<
typename Functor,
typename ParentResultType>
1240 using type = decltype(std::declval<Functor>()());
1243 template<
typename Functor,
typename... ParentResults>
1245 static_assert(
sizeof...(ParentResults) > 0,
"A wait_any task must have at least one parent");
1246 using type = decltype(std::declval<Functor>()());
1249 template<
typename Functor,
typename ParentResultType>
1251 using type = decltype(std::declval<Functor>()());
1255 template<
typename TaskType,
typename Functor,
typename... ParentResults>
1260 template<
typename Functor>
1262 if constexpr (std::is_base_of_v<transwarp::functor, Functor>) {
1269 template<
typename ResultType,
typename Value>
1271 std::promise<ResultType> promise;
1272 promise.set_value(std::forward<Value>(value));
1273 return promise.get_future();
1278 std::promise<void> promise;
1279 promise.set_value();
1280 return promise.get_future();
1284 template<
typename ResultType>
1289 std::promise<ResultType> promise;
1290 promise.set_exception(exception);
1291 return promise.get_future();
1296 template<
typename... ParentResults>
1298 using type = std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>;
1299 static std::size_t size(
const type&) {
1300 return std::tuple_size_v<type>;
1302 static type clone(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache,
const type& obj) {
1305 [&task_cache](
auto& t) {
1310 static std::vector<transwarp::itask*> tasks(
const type&
parents) {
1311 std::vector<transwarp::itask*> tasks;
1314 tasks.push_back(t.get());
1321 template<
typename ParentResultType>
1322 struct parents<std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1323 using type = std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>;
1324 static std::size_t size(
const type& obj) {
1327 static type clone(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache,
const type& obj) {
1329 for (
auto& t : cloned) {
1334 static std::vector<transwarp::itask*> tasks(
const type&
parents) {
1335 std::vector<transwarp::itask*> tasks;
1336 for (
auto& t : parents) {
1337 tasks.push_back(t.get());
1344 template<
typename... ParentResults>
1348 template<
typename ResultType,
typename TaskType>
1352 template<
typename Task,
typename Parents>
1353 void call(std::size_t task_id,
1354 const std::weak_ptr<Task>&
task,
1356 promise_.set_value(transwarp::detail::call<TaskType, ResultType>(task_id, task, parents));
1359 std::promise<ResultType> promise_;
1362 template<
typename TaskType>
1366 template<
typename Task,
typename Parents>
1367 void call(std::size_t task_id,
1368 const std::weak_ptr<Task>&
task,
1370 transwarp::detail::call<TaskType, void>(task_id, task, parents);
1371 promise_.set_value();
1374 std::promise<void> promise_;
1378 template<
typename ResultType,
typename TaskType,
typename Task,
typename Parents>
1382 runner(std::size_t task_id,
1383 const std::weak_ptr<Task>&
task,
1385 : task_id_{task_id},
1390 std::future<ResultType> future() {
1391 return this->promise_.get_future();
1395 if (
const std::shared_ptr<Task> t = task_.lock()) {
1399 this->
call(task_id_, task_, parents_);
1401 this->promise_.set_exception(std::current_exception());
1402 if (
const std::shared_ptr<Task> t = task_.lock()) {
1406 this->promise_.set_exception(std::current_exception());
1408 if (
const std::shared_ptr<Task> t = task_.lock()) {
1414 const std::size_t task_id_;
1415 const std::weak_ptr<Task> task_;
1423 template<
typename ValueType>
1427 static_assert(std::is_default_constructible_v<ValueType>,
"ValueType must be default constructible");
1429 using value_type = ValueType;
1449 template<
typename T,
typename = std::enable_if_t<std::is_same_v<std::decay_t<T>, value_type>>>
1451 data_[end_] = std::forward<T>(value);
1458 return data_[front_];
1464 data_[front_] = ValueType{};
1471 return data_.size();
1487 return size_ == data_.size();
1492 std::swap(end_, buffer.end_);
1493 std::swap(front_, buffer.front_);
1494 std::swap(size_, buffer.size_);
1495 std::swap(data_, buffer.data_);
1500 void increment_or_wrap(std::size_t& value)
const {
1501 if (value == data_.size() - 1) {
1509 increment_or_wrap(end_);
1511 increment_or_wrap(front_);
1518 increment_or_wrap(front_);
1523 std::size_t front_{};
1524 std::size_t size_{};
1525 std::vector<value_type> data_;
1532 void lock() noexcept {
1533 while (locked_.test_and_set(std::memory_order_acquire));
1536 void unlock() noexcept {
1537 locked_.clear(std::memory_order_release);
1541 std::atomic_flag locked_ = ATOMIC_FLAG_INIT;
1550 void operator()()
const noexcept {}
1570 std::string
name()
const override {
1571 return "transwarp::sequential";
1585 explicit parallel(std::size_t n_threads)
1596 std::string
name()
const override {
1597 return "transwarp::parallel";
1615 template<
typename ResultType,
typename TaskType,
typename Functor,
typename... ParentResults>
1617 public std::enable_shared_from_this<task_impl_base<ResultType, TaskType, Functor, ParentResults...>> {
1623 using result_type = ResultType;
1630 if (tasks_.empty()) {
1634 const std::size_t l_level = l->level();
1635 const std::size_t l_id = l->id();
1636 const std::size_t r_level = r->level();
1637 const std::size_t r_id = r->id();
1638 return std::tie(l_level, l_id) < std::tie(r_level, r_id);
1640 std::sort(tasks_.begin(), tasks_.end(), compare);
1645 std::size_t
id() const noexcept
override {
1650 std::size_t
level() const noexcept
override {
1660 const std::optional<std::string>&
name() const noexcept {
1665 std::shared_ptr<transwarp::executor>
executor() const noexcept
override {
1676 return custom_data_;
1681 return canceled_.load();
1686 return avg_idletime_us_.load();
1691 return avg_waittime_us_.load();
1696 return avg_runtime_us_.load();
1702 ensure_task_not_running();
1706 executor_ = std::move(executor);
1712 ensure_task_not_running();
1719 ensure_task_not_running();
1725 ensure_task_not_running();
1733 ensure_task_not_running();
1734 priority_ = priority;
1740 ensure_task_not_running();
1747 ensure_task_not_running();
1753 ensure_task_not_running();
1761 ensure_task_not_running();
1762 if (!custom_data.has_value()) {
1765 custom_data_ = std::move(custom_data);
1771 ensure_task_not_running();
1778 ensure_task_not_running();
1784 ensure_task_not_running();
1790 const std::shared_future<result_type>&
future() const noexcept
override {
1796 ensure_task_not_running();
1797 check_listener(listener);
1798 for (std::vector<std::shared_ptr<transwarp::listener>>& l : listeners_) {
1799 l.push_back(listener);
1805 ensure_task_not_running();
1806 check_listener(listener);
1807 listeners_[event_index(event)].push_back(std::move(listener));
1812 ensure_task_not_running();
1819 ensure_task_not_running();
1826 ensure_task_not_running();
1827 check_listener(listener);
1828 for (std::vector<std::shared_ptr<transwarp::listener>>& l : listeners_) {
1829 l.erase(std::remove(l.begin(), l.end(), listener), l.end());
1835 ensure_task_not_running();
1836 check_listener(listener);
1837 std::vector<std::shared_ptr<transwarp::listener>>& l = listeners_[event_index(event)];
1838 l.erase(std::remove(l.begin(), l.end(), listener), l.end());
1843 ensure_task_not_running();
1850 ensure_task_not_running();
1857 ensure_task_not_running();
1858 for (std::vector<std::shared_ptr<transwarp::listener>>& l : listeners_) {
1865 ensure_task_not_running();
1866 listeners_[event_index(event)].clear();
1871 ensure_task_not_running();
1878 ensure_task_not_running();
1887 ensure_task_not_running();
1888 this->schedule_impl(
true);
1896 ensure_task_not_running();
1897 this->schedule_impl(reset);
1904 ensure_task_not_running();
1905 this->schedule_impl(
true, &executor);
1913 ensure_task_not_running();
1914 this->schedule_impl(reset, &executor);
1921 ensure_task_not_running();
1922 schedule_all_impl(
true);
1929 ensure_task_not_running();
1930 schedule_all_impl(
true, &executor);
1938 ensure_task_not_running();
1939 schedule_all_impl(reset_all);
1947 ensure_task_not_running();
1948 schedule_all_impl(reset_all, &executor);
1954 ensure_task_not_running();
1955 future_ = transwarp::detail::make_future_with_exception<result_type>(exception);
1956 schedule_mode_ =
false;
1962 return future_.valid();
1968 ensure_task_was_scheduled();
1975 ensure_task_was_scheduled();
1976 return future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
1981 return was_scheduled() && future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
1986 ensure_task_not_running();
1987 future_ = std::shared_future<result_type>{};
1989 schedule_mode_ =
true;
1994 ensure_task_not_running();
2002 void cancel(
bool enabled) noexcept
override {
2003 canceled_ = enabled;
2015 std::vector<transwarp::itask*>
parents()
const override {
2020 const std::vector<transwarp::itask*>&
tasks()
override {
2028 std::vector<transwarp::edge>
edges()
override {
2029 std::vector<transwarp::edge> edges;
2039 template<
typename F>
2041 : functor_{
new Functor{std::forward<F>(
functor)}},
2042 parents_{std::move(
parents)...}
2047 template<
typename F,
typename P>
2048 task_impl_base(F&& functor, std::vector<std::shared_ptr<
transwarp::task<P>>> parents)
2049 : functor_{
new Functor{std::forward<F>(functor)}},
2050 parents_{std::move(parents)}
2052 if (parents_.empty()) {
2059 set_type(task_type::value);
2066 if (future_.valid() && future_.wait_for(std::chrono::seconds{0}) != std::future_status::ready) {
2073 if (!future_.valid()) {
2078 template<
typename R,
typename Y,
typename T,
typename P>
2081 template<
typename R,
typename T,
typename... A>
2085 void set_id(std::size_t
id) noexcept
override {
2100 void set_name(std::optional<std::string> name) noexcept
override {
2101 name_ = std::move(name);
2106 avg_idletime_us_ = idletime;
2111 avg_waittime_us_ = waittime;
2116 avg_runtime_us_ = runtime;
2124 if (schedule_mode_ && (reset || !future_.valid())) {
2128 std::weak_ptr<task_impl_base>
self = this->shared_from_this();
2130 std::shared_ptr<runner_t>
runner = std::shared_ptr<runner_t>{
new runner_t{this->id(),
self, parents_}};
2132 future_ = runner->future();
2133 if (this->executor_) {
2134 this->executor_->execute([runner]{ (*runner)(); }, *
this);
2170 template<
typename Visitor>
2180 const std::size_t index =
static_cast<std::size_t
>(event);
2181 if (index >= static_cast<std::size_t>(transwarp::event_type::count)) {
2189 for (
const std::shared_ptr<transwarp::listener>&
listener : listeners_[static_cast<std::size_t>(event)]) {
2201 std::size_t id_ = 0;
2202 std::size_t level_ = 0;
2204 std::optional<std::string> name_;
2205 std::shared_ptr<transwarp::executor> executor_;
2206 std::int64_t priority_ = 0;
2207 std::any custom_data_;
2208 std::atomic<bool> canceled_{
false};
2209 std::atomic<std::int64_t> avg_idletime_us_{-1};
2210 std::atomic<std::int64_t> avg_waittime_us_{-1};
2211 std::atomic<std::int64_t> avg_runtime_us_{-1};
2212 bool schedule_mode_ =
true;
2213 std::shared_future<result_type> future_;
2214 std::unique_ptr<Functor> functor_;
2216 bool visited_ =
false;
2217 std::array<std::vector<std::shared_ptr<transwarp::listener>>,
static_cast<std::size_t
>(transwarp::event_type::count)> listeners_;
2218 std::vector<transwarp::itask*> tasks_;
2223 template<
typename ResultType,
typename TaskType,
typename Functor,
typename... ParentResults>
2230 using result_type = ResultType;
2235 this->ensure_task_not_running();
2236 this->future_ = transwarp::detail::make_future_with_value<result_type>(value);
2237 this->schedule_mode_ =
false;
2243 this->ensure_task_not_running();
2244 this->future_ = transwarp::detail::make_future_with_value<result_type>(std::move(value));
2245 this->schedule_mode_ =
false;
2252 this->ensure_task_was_scheduled();
2253 return this->future_.get();
2260 template<
typename F>
2265 template<
typename F,
typename P>
2266 task_impl_proxy(F&& functor, std::vector<std::shared_ptr<
transwarp::task<P>>> parents)
2267 : transwarp::detail::task_impl_base<result_type,
task_type, Functor, ParentResults...>{std::forward<F>(functor), std::move(parents)}
2273 template<
typename ResultType,
typename TaskType,
typename Functor,
typename... ParentResults>
2280 using result_type = ResultType&;
2285 this->ensure_task_not_running();
2286 this->future_ = transwarp::detail::make_future_with_value<result_type>(value);
2287 this->schedule_mode_ =
false;
2294 this->ensure_task_was_scheduled();
2295 return this->future_.get();
2302 template<
typename F>
2307 template<
typename F,
typename P>
2308 task_impl_proxy(F&& functor, std::vector<std::shared_ptr<
transwarp::task<P>>> parents)
2309 : transwarp::detail::task_impl_base<result_type,
task_type, Functor, ParentResults...>{std::forward<F>(functor), std::move(parents)}
2315 template<
typename TaskType,
typename Functor,
typename... ParentResults>
2322 using result_type = void;
2327 this->ensure_task_not_running();
2329 this->schedule_mode_ =
false;
2335 void get()
const override {
2336 this->ensure_task_was_scheduled();
2337 this->future_.get();
2344 template<
typename F>
2349 template<
typename F,
typename P>
2350 task_impl_proxy(F&& functor, std::vector<std::shared_ptr<
transwarp::task<P>>> parents)
2351 : transwarp::detail::task_impl_base<result_type,
task_type, Functor, ParentResults...>{std::forward<F>(functor), std::move(parents)}
2362 template<
typename TaskType,
typename Functor,
typename... ParentResults>
2373 template<
typename F>
2375 : transwarp::detail::task_impl_proxy<
result_type,
task_type, Functor, ParentResults...>{std::forward<F>(
functor), std::move(parents)...}
2380 template<
typename F,
typename P>
2382 : transwarp::detail::task_impl_proxy<result_type,
task_type, Functor, ParentResults...>{std::forward<F>(functor), std::move(parents)}
2386 task_impl(
const task_impl&) =
delete;
2387 task_impl& operator=(
const task_impl&) =
delete;
2388 task_impl(task_impl&&) =
delete;
2389 task_impl& operator=(task_impl&&) =
delete;
2392 std::shared_ptr<task_impl>
named(std::string name) {
2393 this->set_name(std::make_optional(std::move(name)));
2394 return std::dynamic_pointer_cast<
task_impl>(this->shared_from_this());
2398 template<
typename TaskType_,
typename Functor_>
2401 return std::shared_ptr<task_t>{
new task_t{std::forward<Functor_>(
functor), this->shared_from_this()}};
2406 return std::dynamic_pointer_cast<
task_impl>(this->clone());
2413 std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache)
const override {
2414 auto t = std::shared_ptr<task_impl>{
new task_impl};
2416 t->level_ = this->level_;
2417 t->type_ = this->type_;
2418 t->name_ = this->name_;
2419 t->executor_ = this->executor_;
2420 t->priority_ = this->priority_;
2421 t->custom_data_ = this->custom_data_;
2422 t->canceled_ = this->canceled_.load();
2423 t->avg_idletime_us_ = this->avg_idletime_us_.load();
2424 t->avg_waittime_us_ = this->avg_waittime_us_.load();
2425 t->avg_runtime_us_ = this->avg_runtime_us_.load();
2426 t->schedule_mode_ = this->schedule_mode_;
2427 if (this->has_result()) {
2429 if constexpr (std::is_void_v<result_type>) {
2430 this->future_.get();
2433 t->future_ = transwarp::detail::make_future_with_value<result_type>(this->future_.get());
2436 t->future_ = transwarp::detail::make_future_with_exception<result_type>(std::current_exception());
2439 t->functor_ = std::unique_ptr<Functor>{
new Functor{*this->functor_}};
2441 t->visited_ = this->visited_;
2442 t->executor_ = this->executor_;
2443 t->listeners_ = this->listeners_;
2452 template<
typename ResultType>
2454 public std::enable_shared_from_this<value_task<ResultType>> {
2464 template<
typename T>
2466 : future_{transwarp::detail::make_future_with_value<result_type>(std::forward<T>(value))}
2470 value_task(
const value_task&) =
delete;
2471 value_task& operator=(
const value_task&) =
delete;
2472 value_task(value_task&&) =
delete;
2473 value_task& operator=(value_task&&) =
delete;
2476 std::shared_ptr<value_task>
named(std::string name) {
2477 set_name(std::make_optional(std::move(name)));
2478 return this->shared_from_this();
2482 template<
typename TaskType_,
typename Functor_>
2485 return std::shared_ptr<task_t>{
new task_t{std::forward<Functor_>(
functor), this->shared_from_this()}};
2490 return std::dynamic_pointer_cast<
value_task>(this->clone());
2497 std::size_t
id() const noexcept
override {
2502 std::size_t
level() const noexcept
override {
2512 const std::optional<std::string>&
name() const noexcept {
2517 std::shared_ptr<transwarp::executor>
executor() const noexcept
override {
2528 return custom_data_;
2566 priority_ = priority;
2572 set_priority(priority);
2588 if (!custom_data.has_value()) {
2591 custom_data_ = std::move(custom_data);
2597 set_custom_data(std::move(custom_data));
2607 remove_custom_data();
2611 const std::shared_future<result_type>&
future() const noexcept
override {
2677 future_ = transwarp::detail::make_future_with_value<result_type>(value);
2682 future_ = transwarp::detail::make_future_with_value<result_type>(std::move(value));
2687 future_ = transwarp::detail::make_future_with_exception<result_type>(exception);
2710 return future_.get();
2726 std::vector<transwarp::itask*>
parents()
const override {
2731 const std::vector<transwarp::itask*>&
tasks()
override {
2736 std::vector<transwarp::edge>
edges()
override {
2744 std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>&)
const override {
2745 auto t = std::shared_ptr<value_task>{
new value_task{}};
2748 t->priority_ = priority_;
2749 t->custom_data_ = custom_data_;
2751 t->set_value(future_.get());
2753 t->set_exception(std::current_exception());
2755 t->visited_ = visited_;
2760 void set_id(std::size_t
id) noexcept
override {
2765 void set_level(std::size_t) noexcept
override {}
2771 void set_name(std::optional<std::string> name) noexcept
override {
2772 name_ = std::move(name);
2776 void set_avg_idletime_us(std::int64_t) noexcept
override {}
2779 void set_avg_waittime_us(std::int64_t) noexcept
override {}
2782 void set_avg_runtime_us(std::int64_t) noexcept
override {}
2788 void visit(
const std::function<
void(
transwarp::itask&)>& visitor)
override {
2796 void unvisit() noexcept
override {
2800 std::size_t id_ = 0;
2801 std::optional<std::string> name_;
2802 std::size_t priority_ = 0;
2803 std::any custom_data_;
2804 std::shared_future<result_type> future_;
2805 bool visited_ =
false;
2806 std::vector<transwarp::itask*> tasks_{
this};
2811 template<
typename TaskType,
typename Functor,
typename... Parents>
2814 return std::shared_ptr<task_t>{
new task_t{std::forward<Functor>(
functor), std::move(parents)...}};
2819 template<
typename TaskType,
typename Functor,
typename ParentType>
2822 return std::shared_ptr<task_t>{
new task_t{std::forward<Functor>(
functor), std::move(parents)}};
2827 template<
typename Value>
2830 return std::shared_ptr<task_t>{
new task_t{std::forward<Value>(value)}};
2837 template<
typename InputIt,
typename UnaryOperation>
2838 auto for_each(InputIt first, InputIt last, UnaryOperation unary_op) {
2839 const auto distance = std::distance(first, last);
2840 if (distance <= 0) {
2843 std::vector<std::shared_ptr<transwarp::task<void>>> tasks;
2844 tasks.reserve(static_cast<std::size_t>(distance));
2845 for (; first != last; ++first) {
2857 template<
typename InputIt,
typename UnaryOperation>
2860 task->schedule_all(executor);
2868 template<
typename InputIt,
typename OutputIt,
typename UnaryOperation>
2869 auto transform(InputIt first1, InputIt last1, OutputIt d_first, UnaryOperation unary_op) {
2870 const auto distance = std::distance(first1, last1);
2871 if (distance <= 0) {
2874 std::vector<std::shared_ptr<transwarp::task<void>>> tasks;
2875 tasks.reserve(static_cast<std::size_t>(distance));
2876 for (; first1 != last1; ++first1, ++d_first) {
2888 template<
typename InputIt,
typename OutputIt,
typename UnaryOperation>
2891 task->schedule_all(executor);
2897 template<
typename ResultType>
2903 std::size_t minimum_size,
2904 std::size_t maximum_size)
2905 : task_{std::move(
task)},
2906 minimum_{minimum_size},
2907 maximum_{maximum_size},
2908 finished_{maximum_size}
2913 if (minimum_ > maximum_) {
2917 for (std::size_t i=0; i<minimum_; ++i) {
2918 idle_.push(task_->clone());
2929 task_pool(
const task_pool&) =
delete;
2930 task_pool& operator=(
const task_pool&) =
delete;
2931 task_pool(task_pool&&) =
delete;
2932 task_pool& operator=(task_pool&&) =
delete;
2938 std::shared_ptr<transwarp::task<ResultType>>
next_task(
bool maybe_resize=
true) {
2941 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
2942 if (!finished_.empty()) {
2943 finished_task = finished_.front(); finished_.pop();
2947 std::shared_ptr<transwarp::task<ResultType>>
task;
2948 if (finished_task) {
2949 task = busy_.find(finished_task)->second;
2951 if (maybe_resize && idle_.empty()) {
2954 if (idle_.empty()) {
2957 task = idle_.front(); idle_.pop();
2958 busy_.emplace(task.get(), task);
2961 const auto& future = task->future();
2962 if (future.valid()) {
2972 std::shared_ptr<transwarp::task<ResultType>> g = next_task(maybe_resize);
2981 return idle_.size() + busy_.size();
2996 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
2997 return idle_.size() + finished_.size();
3002 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3003 return busy_.size() - finished_.size();
3009 if (new_size > size()) {
3010 const std::size_t count = new_size - size();
3011 for (std::size_t i=0; i<count; ++i) {
3012 if (size() == maximum_) {
3015 idle_.push(task_->clone());
3017 }
else if (new_size < size()) {
3018 const std::size_t count = size() - new_size;
3019 for (std::size_t i=0; i<count; ++i) {
3020 if (idle_.empty() || size() == minimum_) {
3030 decltype(finished_) finished{finished_.capacity()};
3032 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3033 finished_.swap(finished);
3035 while (!finished.empty()) {
3037 const auto it = busy_.find(task);
3038 idle_.push(it->second);
3055 std::lock_guard<transwarp::detail::spinlock> lock{pool_.spinlock_};
3056 pool_.finished_.push(&task);
3060 task_pool<ResultType>& pool_;
3063 std::shared_ptr<transwarp::task<ResultType>> task_;
3064 std::size_t minimum_;
3065 std::size_t maximum_;
3068 std::queue<std::shared_ptr<transwarp::task<ResultType>>> idle_;
3069 std::unordered_map<const transwarp::itask*, std::shared_ptr<transwarp::task<ResultType>>> busy_;
3070 std::shared_ptr<transwarp::listener> listener_{
new finished_listener{*
this}};
3092 const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3093 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3094 auto& track = tracks_[&task];
3095 track.startidle = now;
3099 const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3100 track_idletime(task, now);
3101 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3102 auto& track = tracks_[&task];
3103 track.startwait = now;
3107 const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3108 track_waittime(task, now);
3112 const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3113 track_waittime(task, now);
3114 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3115 auto& track = tracks_[&task];
3116 track.running =
true;
3117 track.startrun = now;
3121 const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3122 track_runtime(task, now);
3131 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3137 void track_idletime(
const transwarp::itask&
task,
const std::chrono::time_point<std::chrono::steady_clock>& now) {
3138 std::int64_t avg_idletime_us;
3140 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3141 auto& track = tracks_[&task];
3142 track.idletime += std::chrono::duration_cast<std::chrono::microseconds>(now - track.startidle).count();
3144 avg_idletime_us =
static_cast<std::int64_t
>(track.idletime / track.idlecount);
3149 void track_waittime(
const transwarp::itask& task,
const std::chrono::time_point<std::chrono::steady_clock>& now) {
3150 std::int64_t avg_waittime_us;
3152 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3153 auto& track = tracks_[&task];
3154 track.waittime += std::chrono::duration_cast<std::chrono::microseconds>(now - track.startwait).count();
3156 avg_waittime_us =
static_cast<std::int64_t
>(track.waittime / track.waitcount);
3161 void track_runtime(
const transwarp::itask& task,
const std::chrono::time_point<std::chrono::steady_clock>& now) {
3162 std::int64_t avg_runtime_us;
3164 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3165 auto& track = tracks_[&task];
3166 if (!track.running) {
3169 track.running =
false;
3170 track.runtime += std::chrono::duration_cast<std::chrono::microseconds>(now - track.startrun).count();
3172 avg_runtime_us =
static_cast<std::int64_t
>(track.runtime / track.runcount);
3178 bool running =
false;
3179 std::chrono::time_point<std::chrono::steady_clock> startidle;
3180 std::chrono::time_point<std::chrono::steady_clock> startwait;
3181 std::chrono::time_point<std::chrono::steady_clock> startrun;
3182 std::chrono::microseconds::rep idletime = 0;
3183 std::chrono::microseconds::rep idlecount = 0;
3184 std::chrono::microseconds::rep waittime = 0;
3185 std::chrono::microseconds::rep waitcount = 0;
3186 std::chrono::microseconds::rep runtime = 0;
3187 std::chrono::microseconds::rep runcount = 0;
3191 std::unordered_map<const transwarp::itask*, track> tracks_;
void set_avg_idletime_us(std::int64_t idletime) noexceptoverride
Assigns the given idletime.
Definition: transwarp.h:2105
constexpr transwarp::wait_any_type wait_any
The wait_any task tag.
Definition: transwarp.h:118
void remove_custom_data_all() override
Removes custom data from all tasks.
Definition: transwarp.h:2606
The executor interface used to perform custom task execution.
Definition: transwarp.h:136
transwarp::task_type type() const noexceptoverride
The task's type.
Definition: transwarp.h:1655
std::remove_const_t< std::remove_reference_t< T >> decay_t
Removes reference and const from a type.
Definition: transwarp.h:357
Exception thrown when a task was destroyed prematurely.
Definition: transwarp.h:66
Adds a new listener to the given task.
Definition: transwarp.h:1072
A simple thread pool used to execute tasks in parallel.
Definition: transwarp.h:503
void check_listener(const std::shared_ptr< transwarp::listener > &listener) const
Check for non-null listener pointer.
Definition: transwarp.h:2195
std::size_t idle_count() const
Returns the number of idle tasks in the pool.
Definition: transwarp.h:2995
void cancel_all(bool) noexceptoverride
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2723
void reclaim()
Reclaims finished tasks by marking them as idle again.
Definition: transwarp.h:3029
void remove_listener_all(const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for all event types and for all parents.
Definition: transwarp.h:1842
void operator()(const transwarp::itask &task) const
Definition: transwarp.h:928
std::int64_t priority() const noexceptoverride
Returns the task priority.
Definition: transwarp.h:2522
Removes a listener from the given task.
Definition: transwarp.h:1099
void set_value(const transwarp::decay_t< result_type > &value) override
Assigns a value to this task. Scheduling will have no effect after a value has been set...
Definition: transwarp.h:2234
TaskType task_type
The task type.
Definition: transwarp.h:2277
void remove_listeners(transwarp::event_type) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2643
void set_type(transwarp::task_type type) noexceptoverride
Assigns the given type.
Definition: transwarp.h:2095
The consume type. Used for tag dispatch.
Definition: transwarp.h:105
Determines the result type of the Functor dispatching on the task type.
Definition: transwarp.h:1169
void set_priority_all(std::int64_t priority) override
Sets a priority to all tasks (defaults to 0). transwarp will not directly use this. This is only useful if something else is using the priority.
Definition: transwarp.h:2571
A task pool that allows running multiple instances of the same task in parallel.
Definition: transwarp.h:2898
std::size_t id() const noexceptoverride
The task's id.
Definition: transwarp.h:1645
void schedule_all(transwarp::executor &) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2667
void finalize() override
Can be called to explicitly finalize this task making this task the terminal task in the graph...
Definition: transwarp.h:1629
Result run_task(std::size_t task_id, const std::weak_ptr< Task > &task, Args &&...args)
Runs the task with the given arguments, hence, invoking the task's functor.
Definition: transwarp.h:630
Removes the executor from the given task.
Definition: transwarp.h:1012
void reset_priority_all() override
Resets the priority of all tasks to 0.
Definition: transwarp.h:1752
const std::any & custom_data() const noexceptoverride
The custom task data (may not hold a value)
Definition: transwarp.h:1675
auto transform(InputIt first1, InputIt last1, OutputIt d_first, UnaryOperation unary_op)
A function similar to std::transform but returning a transwarp task for deferred, possibly asynchrono...
Definition: transwarp.h:2869
virtual std::string name() const =0
Returns the name of the executor.
void raise_event(transwarp::event_type event) const
Raises the given event to all listeners.
Definition: transwarp.h:2188
void set_custom_data(std::any custom_data) override
Assigns custom data to this task. transwarp will not directly use this. This is only useful if someth...
Definition: transwarp.h:2587
virtual void execute(const std::function< void()> &functor, const transwarp::itask &task)=0
Runs a task which is wrapped by the given functor. The functor only captures one shared pointer and c...
Just after a task was canceled (handle_event called on thread that task is run on) ...
TaskType task_type
The task type.
Definition: transwarp.h:2319
auto for_each(InputIt first, InputIt last, UnaryOperation unary_op)
A function similar to std::for_each but returning a transwarp task for deferred, possibly asynchronou...
Definition: transwarp.h:2838
A callable to run a task given its parents.
Definition: transwarp.h:1379
void set_exception(std::exception_ptr exception) override
Assigns an exception to this task.
Definition: transwarp.h:2686
void schedule_all(bool reset_all) override
Schedules all tasks in the graph for execution on the caller thread. The task-specific executors get ...
Definition: transwarp.h:1937
Generates edges.
Definition: transwarp.h:953
const std::optional< std::string > & name() const noexcept
The optional task name.
Definition: transwarp.h:1660
Adds a new listener per event type to the given task.
Definition: transwarp.h:1085
void ensure_task_was_scheduled() const
Checks if the task was scheduled and throws transwarp::control_error if it's not. ...
Definition: transwarp.h:2072
Sets level of a task.
Definition: transwarp.h:924
void reset()
Resets all timing information.
Definition: transwarp.h:3130
std::size_t busy_count() const
Returns the number of busy tasks in the pool.
Definition: transwarp.h:3001
std::size_t size() const
Returns the current total size of the pool (sum of idle and busy tasks)
Definition: transwarp.h:2980
void visit_all(Visitor &visitor)
Visits all tasks.
Definition: transwarp.h:2171
Definition: transwarp.h:1349
void handle_event(transwarp::event_type event, const transwarp::itask &task) override
Performs the actual timing and populates the task's timing members.
Definition: transwarp.h:3089
void set_executor(std::shared_ptr< transwarp::executor >) override
No-op because a value task never runs.
Definition: transwarp.h:2552
constexpr no_op_functor no_op
An object to use in places where a no-op functor is required.
Definition: transwarp.h:1554
void set_executor_all(std::shared_ptr< transwarp::executor > executor) override
Assigns an executor to all tasks which takes precedence over the executor provided in schedule() or s...
Definition: transwarp.h:1711
bool was_scheduled() const noexceptoverride
Returns whether the task was scheduled and not reset afterwards. This means that the underlying futur...
Definition: transwarp.h:1961
void cancel(bool) noexceptoverride
No-op because a value task never runs.
Definition: transwarp.h:2720
Just before a task is scheduled (handle_event called on thread of caller to schedule()) ...
void schedule_all() override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2664
void schedule_all(transwarp::executor &executor, bool reset_all) override
Schedules all tasks in the graph for execution using the provided executor. The task-specific executo...
Definition: transwarp.h:1946
std::shared_ptr< transwarp::executor > executor() const noexceptoverride
The task's executor (may be null)
Definition: transwarp.h:1665
void add_listener_all(std::shared_ptr< transwarp::listener >) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2622
Assigns a priority to the given task.
Definition: transwarp.h:1020
Result call(std::size_t task_id, const Task &task, const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &parents)
Calls the functor of the given task with the results from the tuple of parents. Throws transwarp::tas...
Definition: transwarp.h:883
std::shared_ptr< transwarp::task< ResultType > > wait_for_next_task(bool maybe_resize=true)
Just like next_task() but waits for a task to become available. The returned graph will always be a v...
Definition: transwarp.h:2970
void push(T &&value)
Pushes a new value onto the end of the buffer. If that exceeds the capacity of the buffer then the ol...
Definition: transwarp.h:1450
void schedule(transwarp::executor &executor) override
Schedules this task for execution using the provided executor. The task-specific executor gets preced...
Definition: transwarp.h:1903
std::size_t id() const noexceptoverride
The task's id.
Definition: transwarp.h:2497
bool canceled() const noexceptoverride
Value tasks cannot be canceled.
Definition: transwarp.h:2532
Assigns an executor to the given task.
Definition: transwarp.h:1000
std::size_t capacity() const
Returns the capacity of the buffer.
Definition: transwarp.h:1470
A simple circular buffer (FIFO). ValueType must support default construction. The buffer lets you pus...
Definition: transwarp.h:1424
void finalize() override
Nothing to be done to finalize a value task.
Definition: transwarp.h:2494
void cancel_all(bool enabled) noexceptoverride
If enabled then all pending tasks in the graph are canceled which will throw transwarp::task_canceled...
Definition: transwarp.h:2009
void remove_custom_data() override
Removes custom data from this task.
Definition: transwarp.h:1777
std::tuple< std::shared_future< ParentResults >...> get_futures(const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &input)
Returns the futures from the given tuple of tasks.
Definition: transwarp.h:610
auto make_task(TaskType, Functor &&functor, std::shared_ptr< Parents >...parents)
A factory function to create a new task.
Definition: transwarp.h:2812
void execute(const std::function< void()> &functor, const transwarp::itask &) override
Pushes the functor into the thread pool for asynchronous execution.
Definition: transwarp.h:1601
typename transwarp::detail::functor_result< TaskType, Functor, ParentResults...>::type functor_result_t
Determines the result type of the Functor dispatching on the task type.
Definition: transwarp.h:1256
void remove_listeners_all(transwarp::event_type) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2649
transwarp::task_type type() const noexceptoverride
The task's type.
Definition: transwarp.h:2507
const std::vector< transwarp::itask * > & tasks() override
Returns all tasks in the graph in breadth order.
Definition: transwarp.h:2020
void schedule_all() override
Schedules all tasks in the graph for execution on the caller thread. The task-specific executors get ...
Definition: transwarp.h:1920
void set_executor(std::shared_ptr< transwarp::executor > executor) override
Assigns an executor to this task which takes precedence over the executor provided in schedule() or s...
Definition: transwarp.h:1701
void set_custom_data(std::any custom_data) override
Assigns custom data to this task. transwarp will not directly use this. This is only useful if someth...
Definition: transwarp.h:1760
void unvisit() noexceptoverride
Traverses through each task and marks them as not visited.
Definition: transwarp.h:2162
void remove_listener_all(transwarp::event_type event, const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for the given event type only and for all parents.
Definition: transwarp.h:1849
void cancel_all_but_one(const std::shared_ptr< transwarp::task< OneResult >> &one, const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &parents)
Cancels all tasks but one.
Definition: transwarp.h:701
The task class.
Definition: transwarp.h:387
Definition: transwarp.h:1529
A functor not doing nothing.
Definition: transwarp.h:1549
void reset_priority_all() override
Resets the priority of all tasks to 0.
Definition: transwarp.h:2581
std::size_t level() const noexceptoverride
The task's level.
Definition: transwarp.h:1650
void reset() override
Resets this task.
Definition: transwarp.h:1985
const std::any & custom_data() const noexceptoverride
The custom task data (may not hold a value)
Definition: transwarp.h:2527
Just before a task's functor is invoked (handle_event called on thread that task is run on) ...
const transwarp::itask & transwarp_task() const noexcept
The associated task.
Definition: transwarp.h:478
The task's functor accepts the first parent future that becomes ready.
The consume_any type. Used for tag dispatch.
Definition: transwarp.h:109
std::size_t level() const noexceptoverride
The task's level.
Definition: transwarp.h:2502
bool was_scheduled() const noexceptoverride
Returns true because a value task is scheduled once on construction.
Definition: transwarp.h:2691
Unvisits the given task.
Definition: transwarp.h:1160
void remove_listeners_all() override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2646
The accept type. Used for tag dispatch.
Definition: transwarp.h:97
void add_listener_all(transwarp::event_type, std::shared_ptr< transwarp::listener >) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2625
std::int64_t avg_waittime_us() const noexceptoverride
Returns the average waittime in microseconds (-1 if never set)
Definition: transwarp.h:1690
std::vector< transwarp::itask * > parents() const override
Returns the task's parents (may be empty)
Definition: transwarp.h:2015
constexpr transwarp::accept_type accept
The accept task tag.
Definition: transwarp.h:98
std::size_t maximum_size() const
Returns the maximum size of the pool.
Definition: transwarp.h:2990
TaskType task_type
The task type.
Definition: transwarp.h:1620
virtual void handle_event(transwarp::event_type event, const transwarp::itask &task)=0
This may be called from arbitrary threads depending on the event type (see transwarp::event_type). The implementer needs to ensure that this never throws exceptions. The lifetime of the task reference is not guaranteed beyond the duration of handle_event, and listeners must not retain a copy of the task.
void remove_listener_all(transwarp::event_type, const std::shared_ptr< transwarp::listener > &) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2637
void remove_listeners() override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2640
bool has_result() const noexceptoverride
Returns whether this task contains a result.
Definition: transwarp.h:1980
Removes all listeners from the given task.
Definition: transwarp.h:1126
void set_priority_all(std::int64_t priority) override
Sets a priority to all tasks (defaults to 0). transwarp will not directly use this. This is only useful if something else is using the priority (e.g. a custom executor)
Definition: transwarp.h:1739
The listener interface to listen to events raised by tasks.
Definition: transwarp.h:164
void schedule_impl(bool reset, transwarp::executor *executor=nullptr) override
Schedules this task for execution using the provided executor. The task-specific executor gets preced...
Definition: transwarp.h:2123
Exception thrown when a task is canceled.
Definition: transwarp.h:57
TaskType task_type
The task type.
Definition: transwarp.h:2366
void wait() const override
Waits for the task to complete. Should only be called if was_scheduled() is true, throws transwarp::c...
Definition: transwarp.h:1967
void set_avg_runtime_us(std::int64_t runtime) noexceptoverride
Assigns the given runtime.
Definition: transwarp.h:2115
void add_listener(transwarp::event_type event, std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for the given event type only.
Definition: transwarp.h:1804
std::shared_future< ResultType > make_future_with_value(Value &&value)
Returns a ready future with the given value as its state.
Definition: transwarp.h:1270
void schedule() override
Schedules this task for execution on the caller thread. The task-specific executor gets precedence if...
Definition: transwarp.h:1886
void add_listener(std::shared_ptr< transwarp::listener >) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2616
void add_listener(std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for all event types.
Definition: transwarp.h:1795
void ensure_task_not_running() const
Checks if the task is currently running and throws transwarp::control_error if it is...
Definition: transwarp.h:2065
Just before a task starts running (handle_event called on thread that task is run on) ...
Removes custom data from the given task.
Definition: transwarp.h:1052
void schedule_all(transwarp::executor &, bool) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2673
const std::optional< std::string > & name() const noexcept
The optional task name.
Definition: transwarp.h:2512
Resets the given task.
Definition: transwarp.h:980
void execute(const std::function< void()> &functor, const transwarp::itask &) override
Runs the functor on the current thread.
Definition: transwarp.h:1575
auto then(TaskType_, Functor_ &&functor)
Creates a continuation to this task.
Definition: transwarp.h:2483
void set_custom_data_all(std::any custom_data) override
Assigns custom data to all tasks. transwarp will not directly use this. This is only useful if someth...
Definition: transwarp.h:1770
const transwarp::itask & child() const noexcept
Returns the child task.
Definition: transwarp.h:195
void remove_listeners_all(transwarp::event_type event) override
Removes all listeners for the given event type and for all parents.
Definition: transwarp.h:1877
bool full() const
Returns whether the buffer is full.
Definition: transwarp.h:1486
A base class for a user-defined functor that needs access to the associated task or a cancel point to...
Definition: transwarp.h:470
constexpr transwarp::consume_any_type consume_any
The consume_any task tag.
Definition: transwarp.h:110
Removes a listener per event type from the given task.
Definition: transwarp.h:1112
std::int64_t avg_runtime_us() const noexceptoverride
Returns -1 as value tasks don't run.
Definition: transwarp.h:2547
std::int64_t priority() const noexceptoverride
The task priority (defaults to 0)
Definition: transwarp.h:1670
std::int64_t avg_waittime_us() const noexceptoverride
Returns -1 as value tasks don't run.
Definition: transwarp.h:2542
Exception thrown when a task is used in unintended ways.
Definition: transwarp.h:84
Resets the priority of the given task.
Definition: transwarp.h:1032
void assign_task_if(Functor &functor, const transwarp::itask &task) noexcept
Assigns the task to the given functor if the functor is a subclass of transwarp::functor.
Definition: transwarp.h:1261
bool canceled() const noexceptoverride
Returns whether the associated task is canceled.
Definition: transwarp.h:1680
void schedule() override
No-op because a value task never runs.
Definition: transwarp.h:2652
std::size_t size() const
Returns the number of populated values of the buffer. Its maximum value equals the capacity of the bu...
Definition: transwarp.h:1476
void set_value(transwarp::decay_t< result_type > &&value) override
Assigns a value to this task.
Definition: transwarp.h:2681
std::vector< transwarp::edge > edges() override
Returns all edges in the graph. This is mainly for visualizing the tasks and their interdependencies...
Definition: transwarp.h:2028
void add_listener_all(std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for all event types and for all parents.
Definition: transwarp.h:1811
void add_listener(transwarp::event_type, std::shared_ptr< transwarp::listener >) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2619
void wait() const override
No-op because a value task never runs.
Definition: transwarp.h:2696
void set_value(transwarp::decay_t< result_type > &value) override
Assigns a value to this task. Scheduling will have no effect after a value has been set...
Definition: transwarp.h:2284
A task proxy.
Definition: transwarp.h:2224
void transwarp_cancel_point() const
If the associated task is canceled then this will throw transwarp::task_canceled which will stop the ...
Definition: transwarp.h:484
The base task class that contains the functionality that can be used with all result types (void and ...
Definition: transwarp.h:1616
Definition: transwarp.h:596
std::shared_ptr< task_impl > named(std::string name)
Gives this task a name and returns a ptr to itself.
Definition: transwarp.h:2392
The task's functor consumes all parent results.
Visits the given task using the visitor given in the constructor.
Definition: transwarp.h:1148
void remove_listeners_all() override
Removes all listeners and for all parents.
Definition: transwarp.h:1870
Just after a task has finished running (handle_event called on thread that task is run on) ...
void remove_listener(const std::shared_ptr< transwarp::listener > &) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2628
Base class for exceptions.
Definition: transwarp.h:48
typename transwarp::detail::parents< ParentResults...>::type parents_t
Determines the type of the parents.
Definition: transwarp.h:1345
constexpr transwarp::consume_type consume
The consume task tag.
Definition: transwarp.h:106
void schedule(transwarp::executor &, bool) override
No-op because a value task never runs.
Definition: transwarp.h:2661
A value task that stores a single value and doesn't require scheduling. Value tasks should be created...
Definition: transwarp.h:2453
void reset_priority() override
Resets the task priority to 0.
Definition: transwarp.h:2576
void add_listener_all(transwarp::event_type event, std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for the given event type only and for all parents.
Definition: transwarp.h:1818
void remove_listeners(transwarp::event_type event) override
Removes all listeners for the given event type.
Definition: transwarp.h:1864
void remove_listener(const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for all event types.
Definition: transwarp.h:1825
void set_value(transwarp::decay_t< result_type > &&value) override
Assigns a value to this task. Scheduling will have no effect after a value has been set...
Definition: transwarp.h:2242
Determines the type of the parents.
Definition: transwarp.h:1297
Parent wait_for_any(const std::shared_ptr< transwarp::task< ParentResults >> &...parents)
Waits for the first parent to finish.
Definition: transwarp.h:675
task_impl(F &&functor, std::shared_ptr< transwarp::task< ParentResults >>...parents)
A task is defined functor and parent tasks. Note: Don't use this constructor directly, use transwarp::make_task.
Definition: transwarp.h:2374
void reset_all() override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2717
void remove_listener(transwarp::event_type event, const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for the given event type only.
Definition: transwarp.h:1834
Assigns custom data to the given task.
Definition: transwarp.h:1040
The task's functor takes no arguments but waits for all parents to finish.
void set_exception(std::exception_ptr exception) override
Assigns an exception to this task. Scheduling will have no effect after an exception has been set...
Definition: transwarp.h:1953
Definition: transwarp.h:732
void remove_listeners() override
Removes all listeners.
Definition: transwarp.h:1856
std::shared_ptr< transwarp::task< ResultType > > next_task(bool maybe_resize=true)
Returns the next idle task. If there are no idle tasks then it will attempt to double the pool size...
Definition: transwarp.h:2938
void call_with_each(const Functor &f, const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &t)
Calls the functor with every element in the tuple.
Definition: transwarp.h:901
void reset_priority() override
Resets the task priority to 0.
Definition: transwarp.h:1746
auto make_value_task(Value &&value)
A factory function to create a new value task.
Definition: transwarp.h:2828
transwarp::detail::functor_result_t< TaskType, Functor, ParentResults...> result_type
The result type of this task.
Definition: transwarp.h:2369
An edge between two tasks.
Definition: transwarp.h:177
A timer that tracks the average idle, wait, and run time of each task it listens to.
Definition: transwarp.h:3078
void schedule(transwarp::executor &) override
No-op because a value task never runs.
Definition: transwarp.h:2655
void reset_all() override
Resets all tasks in the graph.
Definition: transwarp.h:1993
circular_buffer(std::size_t capacity)
Constructs a circular buffer with a given fixed capacity.
Definition: transwarp.h:1433
Definition: transwarp.h:723
ResultType result_type
The result type of this task.
Definition: transwarp.h:2460
Executor for parallel execution. Uses a simple thread pool.
Definition: transwarp.h:1582
The wait_any type. Used for tag dispatch.
Definition: transwarp.h:117
void set_custom_data_all(std::any custom_data) override
Assigns custom data to all tasks. transwarp will not directly use this. This is only useful if someth...
Definition: transwarp.h:2596
task_pool(std::shared_ptr< transwarp::task< ResultType >> task)
Constructs a task pool with reasonable defaults for minimum and maximum.
Definition: transwarp.h:2924
void schedule(bool reset) override
Schedules this task for execution on the caller thread. The task-specific executor gets precedence if...
Definition: transwarp.h:1895
std::string to_string(const transwarp::task_type &type)
String conversion for the task_type enumeration.
Definition: transwarp.h:293
void set_value(const transwarp::decay_t< result_type > &value) override
Assigns a value to this task.
Definition: transwarp.h:2676
void remove_executor() override
Removes the executor from this task.
Definition: transwarp.h:1718
The accept_any type. Used for tag dispatch.
Definition: transwarp.h:101
void set_id(std::size_t id) noexceptoverride
Assigns the given id.
Definition: transwarp.h:2085
void set_name(std::optional< std::string > name) noexceptoverride
Assigns the given name.
Definition: transwarp.h:2100
bool is_ready() const override
Returns true because a value task is always ready.
Definition: transwarp.h:2699
std::string name() const override
Returns the name of the executor.
Definition: transwarp.h:1596
constexpr transwarp::wait_type wait
The wait task tag.
Definition: transwarp.h:114
The root type. Used for tag dispatch.
Definition: transwarp.h:93
The task's functor consumes the first parent result that becomes ready.
std::shared_ptr< TaskType > clone_task(std::unordered_map< std::shared_ptr< transwarp::itask >, std::shared_ptr< transwarp::itask >> &task_cache, const std::shared_ptr< TaskType > &t)
Clones a task.
Definition: transwarp.h:370
A task representing a piece of work given by functor and parent tasks. By connecting tasks a directed...
Definition: transwarp.h:2363
void remove_listener(transwarp::event_type, const std::shared_ptr< transwarp::listener > &) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2631
std::shared_ptr< task_impl > clone_cast() const
Clones this task and casts the result to a ptr to task_impl.
Definition: transwarp.h:2405
The wait type. Used for tag dispatch.
Definition: transwarp.h:113
task_pool(std::shared_ptr< transwarp::task< ResultType >> task, std::size_t minimum_size, std::size_t maximum_size)
Constructs a task pool.
Definition: transwarp.h:2902
void cancel(bool enabled) noexceptoverride
If enabled then this task is canceled which will throw transwarp::task_canceled when retrieving the t...
Definition: transwarp.h:2002
Schedules using the given executor.
Definition: transwarp.h:967
void swap(circular_buffer &buffer)
Swaps this buffer with the given buffer.
Definition: transwarp.h:1491
constexpr transwarp::root_type root
The root task tag.
Definition: transwarp.h:94
std::vector< transwarp::itask * > parents() const override
Empty because a value task doesn't have parents.
Definition: transwarp.h:2726
void remove_custom_data() override
Removes custom data from this task.
Definition: transwarp.h:2601
void schedule_all(transwarp::executor &executor) override
Schedules all tasks in the graph for execution using the provided executor. The task-specific executo...
Definition: transwarp.h:1928
void set_level(std::size_t level) noexceptoverride
Assigns the given level.
Definition: transwarp.h:2090
const std::shared_future< result_type > & future() const noexceptoverride
Returns the future associated to the underlying execution.
Definition: transwarp.h:1790
const value_type & front() const
Returns the value at the front of the buffer (the oldest value). This is undefined if the buffer is e...
Definition: transwarp.h:1457
std::int64_t avg_idletime_us() const noexceptoverride
Returns the average idletime in microseconds (-1 if never set)
Definition: transwarp.h:1685
void set_priority(std::int64_t priority) override
Sets a task priority (defaults to 0). transwarp will not directly use this. This is only useful if so...
Definition: transwarp.h:2565
The task's functor accepts all parent futures.
Applies final bookkeeping to the task and collects the task.
Definition: transwarp.h:939
void remove_executor_all() override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2561
void schedule(transwarp::executor &executor, bool reset) override
Schedules this task for execution using the provided executor. The task-specific executor gets preced...
Definition: transwarp.h:1912
std::size_t minimum_size() const
Returns the minimum size of the pool.
Definition: transwarp.h:2985
std::result_of_t< decltype(&std::shared_future< T >::get)(std::shared_future< T >)> result_t
Returns the result type of a std::shared_future<T>
Definition: transwarp.h:362
void wait_for_all(const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &parents)
Waits for all parents to finish.
Definition: transwarp.h:645
task_type
The possible task types.
Definition: transwarp.h:36
std::size_t event_index(transwarp::event_type event) const
Returns the index for a given event type.
Definition: transwarp.h:2179
void set_value() override
Assigns a value to this task. Scheduling will have no effect after a call to this. Calling reset() will reset this and re-enable scheduling.
Definition: transwarp.h:2326
std::vector< transwarp::edge > edges() override
Returns empty edges because a value task doesn't have parents.
Definition: transwarp.h:2736
std::int64_t avg_runtime_us() const noexceptoverride
Returns the average runtime in microseconds (-1 if never set)
Definition: transwarp.h:1695
void pop()
Removes the value at the front of the buffer (the oldest value)
Definition: transwarp.h:1462
void resize(std::size_t new_size)
Resizes the task pool to the given new size if possible.
Definition: transwarp.h:3007
event_type
The task events that can be subscribed to using the listener interface.
Definition: transwarp.h:153
Removes all listeners per event type from the given task.
Definition: transwarp.h:1135
void schedule_all_impl(bool reset_all, transwarp::executor *executor=nullptr)
Schedules all tasks in the graph for execution using the provided executor. The task-specific executo...
Definition: transwarp.h:2147
std::shared_ptr< value_task > clone_cast() const
Clones this task and casts the result to a ptr to value_task.
Definition: transwarp.h:2489
The task's functor takes no arguments but waits for the first parent to finish.
Exception thrown when an invalid parameter was passed to a function.
Definition: transwarp.h:75
void remove_custom_data_all() override
Removes custom data from all tasks.
Definition: transwarp.h:1783
std::int64_t avg_idletime_us() const noexceptoverride
Returns -1 as value tasks don't run.
Definition: transwarp.h:2537
void reset() override
No-op because a value task never runs.
Definition: transwarp.h:2714
value_task(T &&value)
A value task is defined by a given value. Note: Don't use this constructor directly, use transwarp::make_value_task.
Definition: transwarp.h:2465
std::shared_future< void > make_ready_future()
Returns a ready future.
Definition: transwarp.h:1277
void set_executor_all(std::shared_ptr< transwarp::executor >) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2555
std::shared_ptr< transwarp::executor > executor() const noexceptoverride
Value tasks don't have executors as they don't run.
Definition: transwarp.h:2517
constexpr transwarp::accept_any_type accept_any
The accept_any task tag.
Definition: transwarp.h:102
bool empty() const
Returns whether the buffer is empty.
Definition: transwarp.h:1481
const transwarp::itask & parent() const noexcept
Returns the parent task.
Definition: transwarp.h:190
Cancels or resumes the given task.
Definition: transwarp.h:988
void remove_listener_all(const std::shared_ptr< transwarp::listener > &) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2634
const std::vector< transwarp::itask * > & tasks() override
Returns all tasks in the graph in breadth order.
Definition: transwarp.h:2731
void set_priority(std::int64_t priority) override
Sets a task priority (defaults to 0). transwarp will not directly use this. This is only useful if so...
Definition: transwarp.h:1732
static Result work(std::size_t task_id, const Task &task, const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &parents)
Definition: transwarp.h:816
void remove_executor() override
No-op because a value task never runs.
Definition: transwarp.h:2558
void apply_to_each(Functor &&f, Tuple &&t)
Applies the functor to each element in the tuple.
Definition: transwarp.h:590
void schedule(bool) override
No-op because a value task never runs.
Definition: transwarp.h:2658
void set_avg_waittime_us(std::int64_t waittime) noexceptoverride
Assigns the given waittime.
Definition: transwarp.h:2110
void schedule_all(bool) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2670
std::string name() const override
Returns the name of the executor.
Definition: transwarp.h:1570
Pushes the given task into the vector of tasks.
Definition: transwarp.h:1060
auto then(TaskType_, Functor_ &&functor)
Creates a continuation to this task.
Definition: transwarp.h:2399
Executor for sequential execution. Runs functors sequentially on the same thread. ...
Definition: transwarp.h:1558
const std::shared_future< result_type > & future() const noexceptoverride
Returns the future associated to the underlying execution.
Definition: transwarp.h:2611
bool has_result() const noexceptoverride
Returns true because a value task always contains a result.
Definition: transwarp.h:2704
std::shared_future< ResultType > make_future_with_exception(std::exception_ptr exception)
Returns a ready future with the given exception as its state.
Definition: transwarp.h:1285
void remove_executor_all() override
Removes the executor from all tasks.
Definition: transwarp.h:1724
bool is_ready() const override
Returns whether the task has finished processing. Should only be called if was_scheduled() is true...
Definition: transwarp.h:1974
An interface for the task class.
Definition: transwarp.h:208
std::shared_ptr< value_task > named(std::string name)
Gives this task a name and returns a ptr to itself.
Definition: transwarp.h:2476
void visit(const std::function< void(transwarp::itask &)> &visitor) override
Visits each task in a depth-first traversal.
Definition: transwarp.h:2153