21 #include <type_traits>
22 #include <unordered_map>
46 : std::runtime_error(message)
/// Tag type for root tasks: a std::integral_constant whose ::value is
/// transwarp::task_type::root.
103 struct root_type : std::integral_constant<transwarp::task_type, transwarp::task_type::root> {};
/// Tag type for accept tasks: a std::integral_constant whose ::value is
/// transwarp::task_type::accept.
107 struct accept_type : std::integral_constant<transwarp::task_type, transwarp::task_type::accept> {};
/// Tag type for accept_any tasks: a std::integral_constant whose ::value is
/// transwarp::task_type::accept_any.
111 struct accept_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::accept_any> {};
/// Tag type for consume tasks: a std::integral_constant whose ::value is
/// transwarp::task_type::consume.
115 struct consume_type : std::integral_constant<transwarp::task_type, transwarp::task_type::consume> {};
/// Tag type for consume_any tasks: a std::integral_constant whose ::value is
/// transwarp::task_type::consume_any.
119 struct consume_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::consume_any> {};
/// Tag type for wait tasks: a std::integral_constant whose ::value is
/// transwarp::task_type::wait.
123 struct wait_type : std::integral_constant<transwarp::task_type, transwarp::task_type::wait> {};
/// Tag type for wait_any tasks: a std::integral_constant whose ::value is
/// transwarp::task_type::wait_any.
127 struct wait_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::wait_any> {};
/// Copy assignment is deleted: a node cannot be copy-assigned.
153 node& operator=(
const node&) =
delete;
173 const std::shared_ptr<std::string>&
get_name() const noexcept {
183 const std::vector<std::shared_ptr<node>>&
get_parents() const noexcept {
199 return canceled_.load();
// Per-node state; the initializers below are the defaults of a new node.
// Level of the node; starts at 0.
206 std::size_t level_ = 0;
// Optional name, held through a shared_ptr (null when no name is set).
208 std::shared_ptr<std::string> name_;
// String describing the executor, or null.
// NOTE(review): presumably the executor's get_name() result — confirm.
209 std::shared_ptr<std::string> executor_;
// Parent nodes of this node.
210 std::vector<std::shared_ptr<node>> parents_;
// Priority of the node; starts at 0.
211 std::size_t priority_ = 0;
// Type-erased custom payload attached to the node, or null.
212 std::shared_ptr<void> custom_data_;
// Atomic cancellation flag; starts out false.
213 std::atomic_bool canceled_{
false};
220 const std::shared_ptr<std::string>& name = node.
get_name();
222 s +=
"<" + *name +
">" + seperator;
225 s +=
" id=" + std::to_string(node.
get_id());
226 s +=
" lev=" + std::to_string(node.
get_level());
227 const std::shared_ptr<std::string>& exec = node.
get_executor();
229 s += seperator +
"<" + *exec +
">";
/// Builds an edge from a parent node and a child node. Both shared pointers
/// are taken by value and moved into the corresponding members.
240 edge(std::shared_ptr<transwarp::node> parent, std::shared_ptr<transwarp::node> child) noexcept
241 : parent_(std::move(parent)), child_(std::move(child))
/// Copy assignment is compiler-generated (defaulted).
246 edge& operator=(
const edge&) =
default;
251 const std::shared_ptr<transwarp::node>&
get_parent() const noexcept {
256 const std::shared_ptr<transwarp::node>&
get_child() const noexcept {
// Node at the parent end of the edge.
261 std::shared_ptr<transwarp::node> parent_;
// Node at the child end of the edge.
262 std::shared_ptr<transwarp::node> child_;
/// Renders a list of edges as a string.
/// @param graph      the edges to render
/// @param separator  text inserted after the opening line; defaults to "\n"
/// The output begins with "digraph {" followed by `separator`.
/// NOTE(review): the rest of the body is not visible here; presumably it
/// emits Graphviz DOT edges — confirm.
272 inline std::string
to_string(
const std::vector<transwarp::edge>&
graph,
const std::string& separator=
"\n") {
273 std::string dot =
"digraph {" + separator;
/// Pure virtual: returns a std::string by value.
/// NOTE(review): presumably the name identifying the executor — confirm.
288 virtual std::string
get_name()
const = 0;
/// Pure virtual: receives a nullary functor and a node.
/// @param functor  callable taking no arguments and returning void
/// @param node     shared pointer to a node, passed alongside the functor
/// NOTE(review): presumably implementers run `functor` and `node` describes
/// the task being run — confirm against the concrete executors.
295 virtual void execute(
const std::function<
void()>&
functor,
const std::shared_ptr<transwarp::node>&
node) = 0;
/// Virtual destructor, defaulted.
331 virtual ~
itask() =
default;
/// Executor management (all pure virtual). The setters take the executor as
/// a shared_ptr by value; the removers take no arguments.
/// NOTE(review): the *_all variants presumably apply the operation to every
/// task reachable from this one — confirm against the implementation.
333 virtual void set_executor(std::shared_ptr<transwarp::executor>
executor) = 0;
334 virtual void set_executor_all(std::shared_ptr<transwarp::executor> executor) = 0;
335 virtual void remove_executor() = 0;
336 virtual void remove_executor_all() = 0;
/// Priority management (all pure virtual). The setters take the priority as
/// a std::size_t; the reset functions take no arguments.
/// NOTE(review): reset presumably restores priority 0, and the *_all
/// variants presumably cover the whole graph — confirm.
337 virtual void set_priority(std::size_t priority) = 0;
338 virtual void set_priority_all(std::size_t priority) = 0;
339 virtual void reset_priority() = 0;
340 virtual void reset_priority_all() = 0;
/// Custom-data management (all pure virtual). The setters take a
/// type-erased std::shared_ptr<void>; the removers take no arguments.
/// NOTE(review): the *_all variants presumably cover the whole graph —
/// confirm.
341 virtual void set_custom_data(std::shared_ptr<void> custom_data) = 0;
342 virtual void set_custom_data_all(std::shared_ptr<void> custom_data) = 0;
343 virtual void remove_custom_data() = 0;
344 virtual void remove_custom_data_all() = 0;
/// Pure virtual, noexcept: returns a const reference to the shared pointer
/// of the node associated with this task.
345 virtual const std::shared_ptr<transwarp::node>& get_node()
const noexcept = 0;
/// Listener management (all pure virtual). add_listener and remove_listener
/// each come in two overloads: one taking only the listener and one taking
/// an event_type plus the listener.
/// NOTE(review): the overloads without an event presumably act on all event
/// types — confirm.
346 virtual void add_listener(std::shared_ptr<transwarp::listener>
listener) = 0;
347 virtual void add_listener(
transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) = 0;
348 virtual void remove_listener(
const std::shared_ptr<transwarp::listener>& listener) = 0;
349 virtual void remove_listener(
transwarp::event_type event,
const std::shared_ptr<transwarp::listener>& listener) = 0;
// Takes no arguments.
351 virtual void remove_listeners() = 0;
/// Scheduling entry points (all pure virtual). schedule and schedule_all
/// each have a no-argument overload and one taking a bool (`reset` or
/// `reset_all`).
/// NOTE(review): presumably the bool controls whether an existing result is
/// discarded before scheduling, and *_all schedules the whole graph —
/// confirm.
352 virtual void schedule() = 0;
354 virtual void schedule(
bool reset) = 0;
356 virtual void schedule_all() = 0;
358 virtual void schedule_all(
bool reset_all) = 0;
/// Pure virtual: takes a std::exception_ptr by value.
/// NOTE(review): presumably stores the exception as the task's result —
/// confirm.
364 virtual void set_exception(std::exception_ptr exception) = 0;
/// State queries and waiting (all pure virtual, all const). Only
/// was_scheduled is declared noexcept; wait, is_ready and has_result are
/// not, so callers should assume they may throw.
365 virtual bool was_scheduled()
const noexcept = 0;
366 virtual void wait()
const = 0;
367 virtual bool is_ready()
const = 0;
368 virtual bool has_result()
const = 0;
/// Reset operations (pure virtual, no arguments).
/// NOTE(review): reset_all presumably resets every task in the graph —
/// confirm.
369 virtual void reset() = 0;
370 virtual void reset_all() = 0;
/// Cancellation switches (pure virtual, noexcept). `enabled` is the new
/// state of the cancel flag.
/// NOTE(review): cancel_all presumably applies to the whole graph — confirm.
371 virtual void cancel(
bool enabled) noexcept = 0;
372 virtual void cancel_all(
bool enabled) noexcept = 0;
/// Pure virtual, const: returns the graph as a vector of edges, by value.
373 virtual std::vector<transwarp::edge> get_graph()
const = 0;
/// Traversal support (all pure virtual).
/// visit_depth takes a visitor callable that receives an itask&.
/// NOTE(review): presumably a depth-first walk that marks tasks visited,
/// with unvisit clearing the mark — confirm.
384 virtual void visit_depth(
const std::function<
void(
itask&)>& visitor) = 0;
385 virtual void unvisit() noexcept = 0;
// Assigns the given id to this task's node.
386 virtual void set_node_id(std::size_t
id) noexcept = 0;
393 using type =
typename std::remove_const<typename std::remove_reference<T>::type>::type;
400 using type =
typename std::result_of<decltype(&std::shared_future<T>::get)(std::shared_future<T>)>::type;
405 template<
typename ResultType>
// Interface members for a task whose result type is ResultType itself.
408 using result_type = ResultType;
// Virtual destructor, defaulted.
410 virtual ~
task() =
default;
// Two pure virtual set_value overloads: one taking the value by const
// lvalue reference, one by rvalue reference.
412 virtual void set_value(
const typename transwarp::decay<result_type>::type& value) = 0;
413 virtual void set_value(
typename transwarp::decay<result_type>::type&& value) = 0;
// Pure virtual accessor for the task's shared_future (const reference).
414 virtual const std::shared_future<result_type>& get_future()
const noexcept = 0;
// Pure virtual getter for the result; return type is transwarp::result<...>::type.
415 virtual typename transwarp::result<result_type>::type
get()
const = 0;
419 template<
typename ResultType>
// Interface members for a task whose result is a reference (ResultType&).
422 using result_type = ResultType&;
// Virtual destructor, defaulted.
424 virtual ~
task() =
default;
// Single pure virtual set_value overload taking a non-const lvalue reference.
426 virtual void set_value(
typename transwarp::decay<result_type>::type& value) = 0;
// Pure virtual accessor for the task's shared_future (const reference).
427 virtual const std::shared_future<result_type>& get_future()
const noexcept = 0;
// Pure virtual getter for the result; return type is transwarp::result<...>::type.
428 virtual typename transwarp::result<result_type>::type
get()
const = 0;
// Interface members for a task with a void result.
435 using result_type = void;
// Virtual destructor, defaulted.
437 virtual ~
task() =
default;
// Pure virtual set_value with no argument, since there is no value to store.
439 virtual void set_value() = 0;
// Pure virtual accessor for the task's shared_future<void> (const reference).
440 virtual const std::shared_future<result_type>& get_future()
const noexcept = 0;
// Pure virtual get(); returns void.
441 virtual result_type
get()
const = 0;
456 return transwarp_node_;
462 if (transwarp_node_->is_canceled()) {
471 std::shared_ptr<transwarp::node> transwarp_node_;
486 static void set_level(
transwarp::node& node, std::size_t level) noexcept {
494 static void set_name(
transwarp::node& node, std::shared_ptr<std::string> name) noexcept {
500 node.executor_ = std::move(executor);
502 node.executor_.reset();
/// Appends `parent` to the node's list of parents. The shared pointer is
/// taken by value and moved into the vector.
506 static void add_parent(
transwarp::node& node, std::shared_ptr<transwarp::node> parent) {
507 node.parents_.push_back(std::move(parent));
/// Overwrites the node's priority with `priority`.
510 static void set_priority(
transwarp::node& node, std::size_t priority) noexcept {
511 node.priority_ = priority;
514 static void set_custom_data(
transwarp::node& node, std::shared_ptr<void> custom_data) {
516 node.custom_data_ = std::move(custom_data);
518 node.custom_data_.reset();
/// Stores `enabled` into the node's canceled flag.
522 static void set_canceled(
transwarp::node& node,
bool enabled) noexcept {
523 node.canceled_ = enabled;
536 if (n_threads == 0) {
539 const std::size_t n_target = threads_.size() + n_threads;
540 while (threads_.size() < n_target) {
543 thread = std::thread(&thread_pool::worker,
this);
549 threads_.push_back(std::move(thread));
568 void push(
const std::function<
void()>&
functor) {
570 std::lock_guard<std::mutex> lock(mutex_);
573 cond_var_.notify_one();
582 std::unique_lock<std::mutex> lock(mutex_);
583 cond_var_.wait(lock, [
this]{
584 return done_ || !functors_.empty();
586 if (done_ && functors_.empty()) {
589 functor = functors_.front();
598 std::lock_guard<std::mutex> lock(mutex_);
601 cond_var_.notify_all();
602 for (std::thread& thread : threads_) {
609 std::vector<std::thread> threads_;
610 std::queue<std::function<void()>> functors_;
611 std::condition_variable cond_var_;
616 template<
int offset,
typename... ParentResults>
618 static void work(
const std::tuple<std::shared_ptr<
transwarp::task<ParentResults>>...>& source, std::tuple<std::shared_future<ParentResults>...>& target) {
619 std::get<offset>(target) = std::get<offset>(source)->get_future();
624 template<
typename... ParentResults>
630 template<
typename... ParentResults>
632 std::tuple<std::shared_future<ParentResults>...>
result;
638 template<
typename ParentResultType>
640 std::vector<std::shared_future<ParentResultType>>
result;
641 result.reserve(input.size());
643 result.emplace_back(
task->get_future());
650 template<
typename Result,
typename Task,
typename... Args>
651 Result
run_task(std::size_t node_id,
const std::weak_ptr<Task>&
task, Args&&... args) {
652 const std::shared_ptr<Task> t = task.lock();
656 if (t->node_->is_canceled()) {
660 return t->functor_(std::forward<Args>(args)...);
/// Zero-argument overload that does nothing.
/// NOTE(review): presumably the terminating case for a variadic
/// wait_for_all overload — confirm.
664 inline void wait_for_all() {}
667 template<
typename ParentResult,
typename... ParentResults>
669 parent->get_future().wait();
675 template<
typename ParentResultType>
678 parent->get_future().wait();
683 template<
typename Parent>
684 Parent wait_for_any_impl() {
688 template<
typename Parent,
typename ParentResult,
typename... ParentResults>
690 const std::future_status status = parent->get_future().wait_for(std::chrono::microseconds(1));
691 if (status == std::future_status::ready) {
694 return transwarp::detail::wait_for_any_impl<Parent>(parents...);
698 template<
typename Parent,
typename... ParentResults>
701 Parent parent = transwarp::detail::wait_for_any_impl<Parent>(
parents...);
710 template<
typename ParentResultType>
714 const std::future_status status = parent->get_future().wait_for(std::chrono::microseconds(1));
715 if (status == std::future_status::ready) {
723 template<
typename OneResult>
727 template<
typename OneResult,
typename ParentResult,
typename... ParentResults>
730 parent->cancel(
true);
737 template<
typename OneResult,
typename ParentResultType>
741 parent->cancel(
true);
747 template<
typename TaskType,
bool done,
int total,
int... n>
749 template<
typename Result,
typename Task,
typename... ParentResults>
752 work<Result>(node_id, task,
parents);
756 template<
typename TaskType>
759 template<
int total,
int... n>
761 template<
typename Result,
typename Task,
typename... ParentResults>
763 return transwarp::detail::run_task<Result>(node_id, task);
769 template<
typename Result,
typename Task,
typename ParentResultType>
771 return transwarp::detail::run_task<Result>(node_id, task);
775 template<
int total,
int... n>
777 template<
typename Result,
typename Task,
typename... ParentResults>
781 return transwarp::detail::run_task<Result>(node_id, task, std::get<n>(futures)...);
787 template<
typename Result,
typename Task,
typename ParentResultType>
794 template<
int total,
int... n>
796 template<
typename Result,
typename Task,
typename... ParentResults>
798 using parent_t =
typename std::remove_reference<decltype(std::get<0>(
parents))>::type;
799 parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(
parents)...);
801 return transwarp::detail::run_task<Result>(node_id, task, parent->get_future());
807 template<
typename Result,
typename Task,
typename ParentResultType>
811 return transwarp::detail::run_task<Result>(node_id, task, parent->get_future());
815 template<
int total,
int... n>
817 template<
typename Result,
typename Task,
typename... ParentResults>
820 return transwarp::detail::run_task<Result>(node_id, task, std::get<n>(
parents)->get_future().get()...);
826 template<
typename Result,
typename Task,
typename ParentResultType>
829 std::vector<ParentResultType> results;
830 results.reserve(
parents.size());
832 results.emplace_back(parent->get_future().get());
834 return transwarp::detail::run_task<Result>(node_id, task, std::move(results));
838 template<
int total,
int... n>
840 template<
typename Result,
typename Task,
typename... ParentResults>
842 using parent_t =
typename std::remove_reference<decltype(std::get<0>(
parents))>::type;
843 parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(
parents)...);
845 return transwarp::detail::run_task<Result>(node_id, task, parent->get_future().get());
851 template<
typename Result,
typename Task,
typename ParentResultType>
855 return transwarp::detail::run_task<Result>(node_id, task, parent->get_future().get());
859 template<
int total,
int... n>
861 template<
typename Result,
typename Task,
typename... ParentResults>
864 get_all(std::get<n>(
parents)...);
865 return transwarp::detail::run_task<Result>(node_id, task);
867 template<
typename T,
typename... Args>
868 static void get_all(
const T& arg,
const Args& ...args) {
869 arg->get_future().get();
/// Zero-argument overload that does nothing.
/// NOTE(review): presumably the terminating case for the variadic get_all
/// overload — confirm.
872 static void get_all() {}
877 template<
typename Result,
typename Task,
typename ParentResultType>
881 parent->get_future().get();
883 return transwarp::detail::run_task<Result>(node_id, task);
887 template<
int total,
int... n>
889 template<
typename Result,
typename Task,
typename... ParentResults>
891 using parent_t =
typename std::remove_reference<decltype(std::get<0>(
parents))>::type;
892 parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(
parents)...);
894 parent->get_future().get();
895 return transwarp::detail::run_task<Result>(node_id, task);
901 template<
typename Result,
typename Task,
typename ParentResultType>
905 parent->get_future().get();
906 return transwarp::detail::run_task<Result>(node_id, task);
913 template<
typename TaskType,
typename Result,
typename Task,
typename... ParentResults>
915 constexpr std::size_t n = std::tuple_size<std::tuple<std::shared_future<ParentResults>...>>::value;
917 work<Result>(node_id, task,
parents);
923 template<
typename TaskType,
typename Result,
typename Task,
typename ParentResultType>
926 work<Result>(node_id, task,
parents);
933 template<std::size_t end, std::size_t idx, std::size_t... i>
936 template<std::size_t end, std::size_t... i>
941 template<std::
size_t b, std::
size_t e>
946 template<
typename Functor,
typename... ParentResults>
949 template<std::size_t i, std::size_t... j,
typename Functor,
typename... ParentResults>
951 auto ptr = std::get<i>(t);
960 template<
typename Functor,
typename... ParentResults>
962 constexpr std::size_t n = std::tuple_size<std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>>::value;
963 using index_t =
typename transwarp::detail::index_range<0, n>::type;
964 transwarp::detail::call_with_each_index(index_t(), f, t);
968 template<
typename Functor,
typename ParentResultType>
985 transwarp::detail::node_manip::add_parent(node_, task.get_node());
986 if (node_.
get_level() <= task.get_node()->get_level()) {
988 transwarp::detail::node_manip::set_level(node_, task.get_node()->get_level() + 1);
1001 task.set_node_id(id_++);
1013 const std::shared_ptr<transwarp::node>&
node = task.get_node();
1014 for (
const std::shared_ptr<transwarp::node>& parent : node->get_parents()) {
1015 graph_.emplace_back(parent, node);
1019 std::vector<transwarp::edge>& graph_;
1025 : reset_(reset), executor_(executor) {}
1028 task.schedule_impl(reset_, executor_);
1046 : enabled_(enabled) {}
1049 task.cancel(enabled_);
1058 : executor_(std::move(executor)) {}
1061 task.set_executor(executor_);
1064 std::shared_ptr<transwarp::executor> executor_;
1071 task.remove_executor();
1078 : priority_(priority) {}
1081 task.set_priority(priority_);
1084 std::size_t priority_;
1091 task.reset_priority();
1098 : custom_data_(std::move(custom_data)) {}
1101 task.set_custom_data(custom_data_);
1104 std::shared_ptr<void> custom_data_;
1111 task.remove_custom_data();
1121 tasks_.push_back(&task);
1124 std::vector<transwarp::itask*>& tasks_;
1130 : visitor_(visitor) {}
1133 task.visit_depth(visitor_);
1136 const std::function<void(transwarp::itask&)>& visitor_;
1148 template<
typename TaskType,
typename Functor,
typename... ParentResults>
1150 static_assert(std::is_same<TaskType, transwarp::root_type>::value ||
1151 std::is_same<TaskType, transwarp::accept_type>::value ||
1152 std::is_same<TaskType, transwarp::accept_any_type>::value ||
1153 std::is_same<TaskType, transwarp::consume_type>::value ||
1154 std::is_same<TaskType, transwarp::consume_any_type>::value ||
1155 std::is_same<TaskType, transwarp::wait_type>::value ||
1156 std::is_same<TaskType, transwarp::wait_any_type>::value,
1157 "Invalid task type, must be one of: root, accept, accept_any, consume, consume_any, wait, wait_any");
1160 template<
typename Functor,
typename... ParentResults>
1162 static_assert(
sizeof...(ParentResults) == 0,
"A root task cannot have parent tasks");
1163 using type = decltype(std::declval<Functor>()());
1166 template<
typename Functor,
typename... ParentResults>
1168 static_assert(
sizeof...(ParentResults) > 0,
"An accept task must have at least one parent");
1169 using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<ParentResults>>()...));
1172 template<
typename Functor,
typename ParentResultType>
1174 using type = decltype(std::declval<Functor>()(std::declval<std::vector<std::shared_future<ParentResultType>>>()));
1177 template<
typename Functor,
typename... ParentResults>
1179 static_assert(
sizeof...(ParentResults) > 0,
"An accept_any task must have at least one parent");
1180 using arg_t =
typename std::tuple_element<0, std::tuple<ParentResults...>>::type;
1181 using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<arg_t>>()));
1184 template<
typename Functor,
typename ParentResultType>
1186 using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<ParentResultType>>()));
1189 template<
typename Functor,
typename... ParentResults>
1191 static_assert(
sizeof...(ParentResults) > 0,
"A consume task must have at least one parent");
1192 using type = decltype(std::declval<Functor>()(std::declval<ParentResults>()...));
1195 template<
typename Functor,
typename ParentResultType>
1197 using type = decltype(std::declval<Functor>()(std::declval<std::vector<ParentResultType>>()));
1200 template<
typename Functor,
typename... ParentResults>
1202 static_assert(
sizeof...(ParentResults) > 0,
"A consume_any task must have at least one parent");
1203 using arg_t =
typename std::tuple_element<0, std::tuple<ParentResults...>>::type;
1204 using type = decltype(std::declval<Functor>()(std::declval<arg_t>()));
1207 template<
typename Functor,
typename ParentResultType>
1209 using type = decltype(std::declval<Functor>()(std::declval<ParentResultType>()));
1212 template<
typename Functor,
typename... ParentResults>
1214 static_assert(
sizeof...(ParentResults) > 0,
"A wait task must have at least one parent");
1215 using type = decltype(std::declval<Functor>()());
1218 template<
typename Functor,
typename ParentResultType>
1220 using type = decltype(std::declval<Functor>()());
1223 template<
typename Functor,
typename... ParentResults>
1225 static_assert(
sizeof...(ParentResults) > 0,
"A wait_any task must have at least one parent");
1226 using type = decltype(std::declval<Functor>()());
1229 template<
typename Functor,
typename ParentResultType>
1231 using type = decltype(std::declval<Functor>()());
1234 template<
bool is_transwarp_functor>
/// Stores `node` into the functor's transwarp_node_ member. The shared
/// pointer is taken by value and moved in. Requires Functor to expose a
/// transwarp_node_ member that this code can access.
1239 template<
typename Functor>
1240 void operator()(Functor&
functor, std::shared_ptr<transwarp::node>
node)
const noexcept {
1241 functor.transwarp_node_ = std::move(node);
/// No-op overload: accepts a functor and a node and ignores both.
1247 template<
typename Functor>
1248 void operator()(Functor&, std::shared_ptr<transwarp::node>)
const noexcept {}
1252 template<
typename Functor>
1258 template<
typename ResultType,
typename Value>
1260 std::promise<ResultType> promise;
1261 promise.set_value(std::forward<Value>(value));
1262 return promise.get_future();
1267 std::promise<void> promise;
1268 promise.set_value();
1269 return promise.get_future();
1273 template<
typename ResultType>
1278 std::promise<ResultType> promise;
1279 promise.set_exception(exception);
1280 return promise.get_future();
/// For a pack of parent result types, `type` is a tuple holding one
/// shared_ptr<transwarp::task<R>> per parent result type R.
1285 template<
typename... ParentResults>
1287 using type = std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>;
/// Specialization for parents supplied as a vector of task pointers:
/// `type` is that same vector type.
1291 template<
typename ParentResultType>
1292 struct parents<std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1293 using type = std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>;
1297 template<
typename ResultType,
typename TaskType>
1301 template<
typename Task,
typename Parents>
1302 void call(std::size_t node_id,
1303 const std::weak_ptr<Task>&
task,
1305 promise_.set_value(transwarp::detail::call<TaskType, ResultType>(node_id, task, parents));
1308 std::promise<ResultType> promise_;
1311 template<
typename TaskType>
1315 template<
typename Task,
typename Parents>
1316 void call(std::size_t node_id,
1317 const std::weak_ptr<Task>&
task,
1319 transwarp::detail::call<TaskType, void>(node_id, task, parents);
1320 promise_.set_value();
1323 std::promise<void> promise_;
1327 template<
typename ResultType,
typename TaskType,
typename Task,
typename Parents>
1331 runner(std::size_t node_id,
1332 const std::weak_ptr<Task>&
task,
1333 const typename transwarp::decay<Parents>::type&
parents)
1334 : node_id_(node_id),
1339 std::future<ResultType> get_future() {
1340 return this->promise_.get_future();
1344 if (
const std::shared_ptr<Task> t = task_.lock()) {
1348 this->call(node_id_, task_, parents_);
1350 this->promise_.set_exception(std::current_exception());
1351 if (
const std::shared_ptr<Task> t = task_.lock()) {
1355 this->promise_.set_exception(std::current_exception());
1357 if (
const std::shared_ptr<Task> t = task_.lock()) {
// Id of the node this runner works for.
1363 const std::size_t node_id_;
// Weak reference to the task; locked before use, so the runner does not
// keep the task alive.
1364 const std::weak_ptr<Task> task_;
// The parents, stored by value in this runner.
1365 const typename transwarp::decay<Parents>::type parents_;
1372 template<
typename ValueType>
1376 static_assert(std::is_default_constructible<ValueType>::value,
"ValueType must be default constructible");
1378 using value_type = ValueType;
1398 template<typename T, typename = typename std::enable_if<std::is_same<typename std::decay<T>::type, value_type>::value>::type>
1400 data_[end_] = std::forward<T>(value);
1407 return data_[front_];
1413 data_[front_] = ValueType{};
1420 return data_.size();
1436 return size_ == data_.size();
1441 std::swap(end_, buffer.end_);
1442 std::swap(front_, buffer.front_);
1443 std::swap(size_, buffer.size_);
1444 std::swap(data_, buffer.data_);
1449 void increment_or_wrap(std::size_t& value)
const {
1450 if (value == data_.size() - 1) {
1458 increment_or_wrap(end_);
1460 increment_or_wrap(front_);
1467 increment_or_wrap(front_);
1472 std::size_t front_{};
1473 std::size_t size_{};
1474 std::vector<value_type> data_;
1495 return "transwarp::sequential";
1499 void execute(
const std::function<
void()>&
functor,
const std::shared_ptr<transwarp::node>&)
override {
1509 explicit parallel(std::size_t n_threads)
1521 return "transwarp::parallel";
1525 void execute(
const std::function<
void()>&
functor,
const std::shared_ptr<transwarp::node>&)
override {
1539 template<
typename ResultType,
typename TaskType,
typename Functor,
typename... ParentResults>
1541 public std::enable_shared_from_this<task_impl_base<ResultType, TaskType, Functor, ParentResults...>> {
1547 using result_type = ResultType;
1556 executor_ = std::move(executor);
1557 transwarp::detail::node_manip::set_executor(*node_, std::shared_ptr<std::string>(
new std::string(executor_->get_name())));
1565 visit_depth_all(visitor);
1572 transwarp::detail::node_manip::set_executor(*node_,
nullptr);
1579 visit_depth_all(visitor);
1586 transwarp::detail::node_manip::set_priority(*node_, priority);
1594 visit_depth_all(visitor);
1600 transwarp::detail::node_manip::set_priority(*node_, 0);
1607 visit_depth_all(visitor);
1617 transwarp::detail::node_manip::set_custom_data(*node_, std::move(custom_data));
1625 visit_depth_all(visitor);
1631 transwarp::detail::node_manip::set_custom_data(*node_,
nullptr);
1638 visit_depth_all(visitor);
1642 const std::shared_future<result_type>&
get_future() const noexcept
override {
1647 const std::shared_ptr<transwarp::node>&
get_node() const noexcept
override {
1654 check_listener(listener);
1655 for (std::vector<std::shared_ptr<transwarp::listener>>& l : listeners_) {
1656 l.push_back(listener);
1663 check_listener(listener);
1664 listeners_[get_event_index(event)].push_back(std::move(listener));
1670 check_listener(listener);
1671 for (std::vector<std::shared_ptr<transwarp::listener>>& l : listeners_) {
1672 l.erase(std::remove(l.begin(), l.end(), listener), l.end());
1679 check_listener(listener);
1680 std::vector<std::shared_ptr<transwarp::listener>>& l = listeners_[get_event_index(event)];
1681 l.erase(std::remove(l.begin(), l.end(), listener), l.end());
1687 listeners_[get_event_index(event)].clear();
1693 for (std::vector<std::shared_ptr<transwarp::listener>>& l : listeners_) {
1703 this->schedule_impl(
true);
1712 this->schedule_impl(reset);
1720 this->schedule_impl(
true, &executor);
1729 this->schedule_impl(reset, &executor);
1771 schedule_all_impl(
true, type);
1779 schedule_all_impl(
true, type, &executor);
1788 schedule_all_impl(reset_all, type);
1797 schedule_all_impl(reset_all, type, &executor);
1804 future_ = transwarp::detail::make_future_with_exception<result_type>(exception);
1805 schedule_mode_ =
false;
1811 return future_.valid();
1825 return future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
1836 future_ = std::shared_future<result_type>();
1837 transwarp::detail::node_manip::set_canceled(*node_,
false);
1838 schedule_mode_ =
true;
1845 visit_depth_all(visitor);
1851 void cancel(
bool enabled) noexcept
override {
1852 transwarp::detail::node_manip::set_canceled(*node_, enabled);
1860 visit_depth_all(visitor);
1867 std::vector<transwarp::edge>
graph;
1875 template<
typename F>
1878 : node_(new transwarp::
node),
1879 functor_(std::forward<F>(
functor)),
1880 parents_(std::move(
parents)...)
1882 init(has_name, std::move(name));
1885 template<
typename F,
typename P>
1888 : node_(new transwarp::
node),
1889 functor_(std::forward<F>(
functor)),
1890 parents_(std::move(parents))
1892 if (parents_.empty()) {
1895 init(has_name, std::move(name));
1898 void init(
bool has_name, std::string name) {
1899 transwarp::detail::node_manip::set_type(*node_, task_type::value);
1900 transwarp::detail::node_manip::set_name(*node_, (has_name ? std::shared_ptr<std::string>(
new std::string(std::move(name))) :
nullptr));
1904 visit_depth(visitor);
1910 if (future_.valid() && future_.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
1917 if (!future_.valid()) {
1922 bool schedule_mode_ =
true;
1923 std::shared_future<result_type> future_;
1927 template<
typename R,
typename Y,
typename T,
typename P>
1930 template<
typename R,
typename T,
typename... A>
/// Assigns `id` to this task's node by calling node_manip::set_id.
1934 void set_node_id(std::size_t
id) noexcept
override {
1935 transwarp::detail::node_manip::set_id(*node_,
id);
1943 if (schedule_mode_ && (reset || !future_.valid())) {
1945 transwarp::detail::node_manip::set_canceled(*node_,
false);
1947 std::weak_ptr<task_impl_base>
self = this->shared_from_this();
1949 std::shared_ptr<runner_t> runner = std::shared_ptr<runner_t>(
new runner_t(node_->get_id(),
self, parents_));
1951 future_ = runner->get_future();
1953 executor_->execute([runner]{ (*runner)(); }, node_);
1954 }
else if (executor) {
1955 executor->execute([runner]{ (*runner)(); }, node_);
1970 visit_breadth_all(visitor);
1973 visit_depth_all(visitor);
1981 template<
typename Visitor>
1982 void visit_breadth_all(Visitor& visitor) {
1983 if (breadth_tasks_.empty()) {
1984 breadth_tasks_.reserve(node_->get_id() + 1);
1988 const std::size_t l_level = l->get_node()->get_level();
1989 const std::size_t l_id = l->get_node()->get_id();
1990 const std::size_t r_level = r->get_node()->get_level();
1991 const std::size_t r_id = r->get_node()->get_id();
1992 return std::tie(l_level, l_id) < std::tie(r_level, r_id);
1994 std::sort(breadth_tasks_.begin(), breadth_tasks_.end(), compare);
2002 template<
typename Visitor>
2003 void visit_depth_all(Visitor& visitor) {
2004 if (depth_tasks_.empty()) {
2005 depth_tasks_.reserve(node_->get_id() + 1);
2015 void visit_depth(
const std::function<
void(
transwarp::itask&)>& visitor)
override {
2024 void unvisit() noexcept
override {
2033 const std::size_t index =
static_cast<std::size_t
>(event);
2034 if (index >= static_cast<std::size_t>(transwarp::event_type::count)) {
2042 for (
const std::shared_ptr<transwarp::listener>& listener : listeners_[static_cast<std::size_t>(event)]) {
2043 listener->handle_event(event, node_);
2048 void check_listener(
const std::shared_ptr<transwarp::listener>& listener)
const {
// The node describing this task.
2054 std::shared_ptr<transwarp::node> node_;
// Visited flag; starts out false.
// NOTE(review): presumably set during visit_depth and cleared by unvisit — confirm.
2057 bool visited_ =
false;
// Executor assigned to this task, if any.
2058 std::shared_ptr<transwarp::executor> executor_;
// One listener list per event type, indexed by the event's integer value;
// the array length is event_type::count.
2059 std::vector<std::shared_ptr<transwarp::listener>> listeners_[
static_cast<std::size_t
>(transwarp::event_type::count)];
// Lists of raw task pointers used by visit_depth_all and visit_breadth_all;
// both functions test them for emptiness and reserve before use.
2060 std::vector<transwarp::itask*> depth_tasks_;
2061 std::vector<transwarp::itask*> breadth_tasks_;
2066 template<
typename ResultType,
typename TaskType,
typename Functor,
typename... ParentResults>
2073 using result_type = ResultType;
2077 void set_value(
const typename transwarp::decay<result_type>::type& value)
override {
2079 this->future_ = transwarp::detail::make_future_with_value<result_type>(value);
2080 this->schedule_mode_ =
false;
2085 void set_value(
typename transwarp::decay<result_type>::type&& value)
override {
2087 this->future_ = transwarp::detail::make_future_with_value<result_type>(std::move(value));
2088 this->schedule_mode_ =
false;
2094 typename transwarp::result<result_type>::type
get()
const override {
2096 return this->future_.get();
2101 template<
typename F>
2108 template<
typename F,
typename P>
2112 : transwarp::detail::task_impl_base<
result_type,
task_type, Functor, ParentResults...>(has_name, std::move(name), std::forward<F>(
functor), std::move(parents))
2118 template<
typename ResultType,
typename TaskType,
typename Functor,
typename... ParentResults>
2125 using result_type = ResultType&;
/// Assigns a result directly to this reference-result task.
/// Calls ensure_task_not_running() first, then replaces the future with one
/// produced by make_future_with_value from `value`.
2129 void set_value(
typename transwarp::decay<result_type>::type& value)
override {
2130 this->ensure_task_not_running();
2131 this->future_ = transwarp::detail::make_future_with_value<result_type>(value);
// NOTE(review): clearing schedule_mode_ presumably stops later schedule
// calls from overwriting this value until the task is reset — confirm.
2132 this->schedule_mode_ =
false;
/// Returns the task's result by calling get() on the stored future, after
/// calling ensure_task_was_scheduled().
2138 typename transwarp::result<result_type>::type
get()
const override {
2139 this->ensure_task_was_scheduled();
2140 return this->future_.get();
2145 template<
typename F>
2152 template<
typename F,
typename P>
2156 : transwarp::detail::task_impl_base<result_type,
task_type, Functor, ParentResults...>(has_name, std::move(name), std::forward<F>(
functor), std::move(parents))
2162 template<
typename TaskType,
typename Functor,
typename... ParentResults>
2169 using result_type = void;
2176 this->schedule_mode_ =
false;
2182 void get()
const override {
2184 this->future_.get();
2189 template<
typename F>
2196 template<
typename F,
typename P>
2200 : transwarp::detail::task_impl_base<
result_type,
task_type, Functor, ParentResults...>(has_name, std::move(name), std::forward<F>(
functor), std::move(parents))
2211 template<
typename TaskType,
typename Functor,
typename... ParentResults>
2222 template<
typename F>
2226 : transwarp::detail::task_impl_proxy<
result_type,
task_type, Functor, ParentResults...>(has_name, std::move(name), std::forward<F>(
functor), std::move(parents)...)
2231 template<
typename F,
typename P>
2234 : transwarp::detail::task_impl_proxy<
result_type,
task_type, Functor, ParentResults...>(has_name, std::move(name), std::forward<F>(
functor), std::move(parents))
2244 template<
typename TaskType_,
typename Functor_>
2245 std::shared_ptr<transwarp::task_impl<TaskType_, typename std::decay<Functor_>::type,
result_type>>
2248 return std::shared_ptr<task_t>(
new task_t(
true, std::move(name), std::forward<Functor_>(
functor), std::dynamic_pointer_cast<
transwarp::task<result_type>>(const_cast<task_impl*>(
this)->shared_from_this())));
2252 template<
typename TaskType_,
typename Functor_>
2253 std::shared_ptr<transwarp::task_impl<TaskType_, typename std::decay<Functor_>::type,
result_type>>
2256 return std::shared_ptr<task_t>(
new task_t(
false,
"", std::forward<Functor_>(
functor), std::dynamic_pointer_cast<
transwarp::task<result_type>>(const_cast<task_impl*>(
this)->shared_from_this())));
2264 template<
typename ResultType>
2266 public std::enable_shared_from_this<value_task<ResultType>> {
2276 template<
typename T>
2280 : node_(new transwarp::
node),
2281 future_(transwarp::detail::make_future_with_value<
result_type>(std::forward<T>(value)))
2283 transwarp::detail::node_manip::set_type(*node_, task_type::value);
2284 transwarp::detail::node_manip::set_name(*node_, (has_name ? std::shared_ptr<std::string>(
new std::string(std::move(name))) :
nullptr));
2294 template<
typename TaskType_,
typename Functor_>
2295 std::shared_ptr<transwarp::task_impl<TaskType_, typename std::decay<Functor_>::type,
result_type>>
2298 return std::shared_ptr<task_t>(
new task_t(
true, std::move(name), std::forward<Functor_>(
functor), std::dynamic_pointer_cast<
transwarp::task<result_type>>(const_cast<value_task*>(
this)->shared_from_this())));
2302 template<
typename TaskType_,
typename Functor_>
2303 std::shared_ptr<transwarp::task_impl<TaskType_, typename std::decay<Functor_>::type,
result_type>>
2306 return std::shared_ptr<task_t>(
new task_t(
false,
"", std::forward<Functor_>(
functor), std::dynamic_pointer_cast<
transwarp::task<result_type>>(const_cast<value_task*>(
this)->shared_from_this())));
2324 transwarp::detail::node_manip::set_priority(*node_, priority);
2335 transwarp::detail::node_manip::set_priority(*node_, 0);
2349 transwarp::detail::node_manip::set_custom_data(*node_, std::move(custom_data));
2360 transwarp::detail::node_manip::set_custom_data(*node_,
nullptr);
2369 const std::shared_future<result_type>&
get_future() const noexcept
override {
2374 const std::shared_ptr<transwarp::node>&
get_node() const noexcept
override {
/// Replaces the stored future with one produced by make_future_with_value
/// from `value`, passed as a const lvalue reference.
2433 void set_value(
const typename transwarp::decay<result_type>::type& value)
override {
2434 future_ = transwarp::detail::make_future_with_value<result_type>(value);
2438 void set_value(
typename transwarp::decay<result_type>::type&& value)
override {
2439 future_ = transwarp::detail::make_future_with_value<result_type>(std::move(value));
2444 future_ = transwarp::detail::make_future_with_exception<result_type>(exception);
2466 typename transwarp::result<result_type>::type
get()
const override {
2467 return future_.get();
2490 void set_node_id(std::size_t
id) noexcept
override {
2491 transwarp::detail::node_manip::set_id(*node_,
id);
2498 void visit_depth(
const std::function<
void(
transwarp::itask&)>& visitor)
override {
2506 void unvisit() noexcept
override {
2510 std::shared_ptr<transwarp::node> node_;
2511 std::shared_future<result_type> future_;
2512 bool visited_ =
false;
2517 template<
typename TaskType,
typename Functor,
typename... Parents>
2518 std::shared_ptr<transwarp::task_impl<TaskType, typename std::decay<Functor>::type,
typename Parents::result_type...>>
2519 make_task(TaskType, std::string name, Functor&&
functor, std::shared_ptr<Parents>... parents) {
2521 return std::shared_ptr<task_t>(
new task_t(
true, std::move(name), std::forward<Functor>(
functor), std::move(parents)...));
2525 template<
typename TaskType,
typename Functor,
typename... Parents>
2526 std::shared_ptr<transwarp::task_impl<TaskType, typename std::decay<Functor>::type,
typename Parents::result_type...>>
2529 return std::shared_ptr<task_t>(
new task_t(
false,
"", std::forward<Functor>(
functor), std::move(parents)...));
2534 template<
typename TaskType,
typename Functor,
typename ParentType>
2535 std::shared_ptr<transwarp::task_impl<TaskType, typename std::decay<Functor>::type, std::vector<ParentType>>>
2538 return std::shared_ptr<task_t>(
new task_t(
true, std::move(name), std::forward<Functor>(
functor), std::move(parents)));
2542 template<
typename TaskType,
typename Functor,
typename ParentType>
2543 std::shared_ptr<transwarp::task_impl<TaskType, typename std::decay<Functor>::type, std::vector<ParentType>>>
2546 return std::shared_ptr<task_t>(
new task_t(
false,
"", std::forward<Functor>(
functor), std::move(parents)));
2551 template<
typename Value>
2552 std::shared_ptr<transwarp::value_task<typename transwarp::decay<Value>::type>>
2555 return std::shared_ptr<task_t>(
new task_t(
true, std::move(name), std::forward<Value>(value)));
2559 template<
typename Value>
2560 std::shared_ptr<transwarp::value_task<typename transwarp::decay<Value>::type>>
2563 return std::shared_ptr<task_t>(
new task_t(
false,
"", std::forward<Value>(value)));
2568 template<
typename FinalResultType>
2572 virtual ~
graph() =
default;
2575 virtual const std::shared_ptr<transwarp::task<FinalResultType>>&
final_task()
const = 0;
2580 template<
typename Graph>
2590 : generator_(std::move(generator)),
2591 minimum_(minimum_size),
2592 maximum_(maximum_size),
2593 finished_(maximum_size)
2598 if (minimum_ > maximum_) {
2601 for (std::size_t i=0; i<minimum_; ++i) {
2602 idle_.push(generate());
2617 std::shared_ptr<transwarp::node> finished_node;
2619 std::lock_guard<spinlock> lock(spinlock_);
2620 if (!finished_.empty()) {
2621 finished_node = finished_.front(); finished_.pop();
2625 std::shared_ptr<Graph> g;
2626 if (finished_node) {
2627 g = busy_.find(finished_node)->second;
2629 if (maybe_resize && idle_.empty()) {
2632 if (idle_.empty()) {
2635 g = idle_.front(); idle_.pop();
2636 busy_.emplace(g->final_task()->get_node(), g);
2639 const auto& future = g->final_task()->get_future();
2640 if (future.valid()) {
2659 return idle_.size() + busy_.size();
2674 std::lock_guard<spinlock> lock(spinlock_);
2675 return idle_.size() + finished_.size();
2680 std::lock_guard<spinlock> lock(spinlock_);
2681 return busy_.size() - finished_.size();
2687 if (new_size >
size()) {
2688 const std::size_t count = new_size -
size();
2689 for (std::size_t i=0; i<count; ++i) {
2690 if (
size() == maximum_) {
2693 idle_.push(generate());
2695 }
else if (new_size <
size()) {
2696 const std::size_t count =
size() - new_size;
2697 for (std::size_t i=0; i<count; ++i) {
2698 if (idle_.empty() ||
size() == minimum_) {
2708 decltype(finished_) finished{finished_.capacity()};
2710 std::lock_guard<spinlock> lock(spinlock_);
2711 finished_.swap(finished);
2713 while (!finished.empty()) {
2714 const std::shared_ptr<transwarp::node>
node = finished.front(); finished.pop();
2715 const auto it = busy_.find(node);
2716 idle_.push(it->second);
2726 void lock() noexcept {
2727 while (locked_.test_and_set(std::memory_order_acquire));
2730 void unlock() noexcept {
2731 locked_.clear(std::memory_order_release);
2735 std::atomic_flag locked_ = ATOMIC_FLAG_INIT;
2742 finished_listener(graph_pool<Graph>& pool)
2748 std::lock_guard<spinlock> lock(pool_.spinlock_);
2749 pool_.finished_.push(node);
2753 graph_pool<Graph>& pool_;
2756 std::shared_ptr<Graph> generate() {
2757 std::shared_ptr<Graph> graph = generator_();
2762 std::function<std::shared_ptr<Graph>()> generator_;
2763 std::size_t minimum_;
2764 std::size_t maximum_;
2765 mutable spinlock spinlock_;
2767 std::queue<std::shared_ptr<Graph>> idle_;
2768 std::unordered_map<std::shared_ptr<transwarp::node>, std::shared_ptr<Graph>> busy_;
2769 std::shared_ptr<transwarp::listener> listener_{
new finished_listener(*
this)};
void set_custom_data_all(std::shared_ptr< void > custom_data) override
Assigns custom data to all tasks. transwarp will not directly use this. This is only useful if someth...
Definition: transwarp.h:1622
constexpr transwarp::wait_any_type wait_any
The wait_any task tag.
Definition: transwarp.h:128
void remove_custom_data_all() override
Removes custom data from all tasks.
Definition: transwarp.h:2364
The executor interface used to perform custom task execution.
Definition: transwarp.h:283
Exception thrown when a task was destroyed prematurely.
Definition: transwarp.h:61
A simple thread pool used to execute tasks in parallel.
Definition: transwarp.h:530
void cancel_all(bool) noexcept override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2480
void operator()(const transwarp::itask &task) const
Definition: transwarp.h:984
Node manipulation.
Definition: transwarp.h:480
TaskType task_type
The task type.
Definition: transwarp.h:2122
void remove_listeners(transwarp::event_type) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2391
Generates a graph.
Definition: transwarp.h:1008
The consume type. Used for tag dispatch.
Definition: transwarp.h:115
Determines the result type of the Functor dispatching on the task type.
Definition: transwarp.h:1149
void schedule_all(transwarp::executor &) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2412
Removes the executor from the given task.
Definition: transwarp.h:1068
Definition: transwarp.h:931
void reset_priority_all() override
Resets the priority of all tasks to 0.
Definition: transwarp.h:1604
Definition: transwarp.h:140
Just after a task was canceled (handle_event called on thread that task is run on) ...
TaskType task_type
The task type.
Definition: transwarp.h:2166
A callable to run a task given its parents.
Definition: transwarp.h:1328
void set_exception(std::exception_ptr exception) override
Assigns an exception to this task.
Definition: transwarp.h:2443
void schedule_all(bool reset_all) override
Schedules all tasks in the graph for execution on the caller thread. The task-specific executors get ...
Definition: transwarp.h:1752
void ensure_task_was_scheduled() const
Checks if the task was scheduled and throws transwarp::control_error if it's not. ...
Definition: transwarp.h:1916
Sets parents and level of the node.
Definition: transwarp.h:980
std::shared_ptr< transwarp::task_impl< TaskType_, typename std::decay< Functor_ >::type, result_type > > then(TaskType_, std::string name, Functor_ &&functor) const
Creates a continuation to this task.
Definition: transwarp.h:2246
std::shared_ptr< transwarp::value_task< typename transwarp::decay< Value >::type > > make_value_task(std::string name, Value &&value)
A factory function to create a new value task.
Definition: transwarp.h:2553
Definition: transwarp.h:1298
void set_executor(std::shared_ptr< transwarp::executor >) override
No-op because a value task never runs.
Definition: transwarp.h:2310
void set_executor_all(std::shared_ptr< transwarp::executor > executor) override
Assigns an executor to all tasks which takes precedence over the executor provided in schedule() or s...
Definition: transwarp.h:1562
bool was_scheduled() const noexcept override
Returns whether the task was scheduled and not reset afterwards. This means that the underlying futur...
Definition: transwarp.h:1810
void cancel(bool) noexcept override
No-op because a value task never runs.
Definition: transwarp.h:2477
Just before a task is scheduled (handle_event called on thread of caller to schedule()) ...
void schedule_all() override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2409
void schedule_all(transwarp::executor &executor, bool reset_all) override
Schedules all tasks in the graph for execution using the provided executor. The task-specific executo...
Definition: transwarp.h:1761
std::size_t minimum_size() const
Returns the minimum size of the pool.
Definition: transwarp.h:2663
std::string get_name() const override
Returns the name of the executor.
Definition: transwarp.h:1520
Assigns a priority to the given task.
Definition: transwarp.h:1076
void set_value(const typename transwarp::decay< result_type >::type &value) override
Assigns a value to this task.
Definition: transwarp.h:2433
void push(T &&value)
Pushes a new value onto the end of the buffer. If that exceeds the capacity of the buffer then the ol...
Definition: transwarp.h:1399
void schedule(transwarp::executor &executor) override
Schedules this task for execution using the provided executor. The task-specific executor gets preced...
Definition: transwarp.h:1718
Assigns an executor to the given task.
Definition: transwarp.h:1056
std::size_t capacity() const
Returns the capacity of the buffer.
Definition: transwarp.h:1419
A simple circular buffer (FIFO). ValueType must support default construction. The buffer lets you pus...
Definition: transwarp.h:1373
Unvisits the given task.
Definition: transwarp.h:1140
void set_priority(std::size_t priority) override
Sets a task priority (defaults to 0). transwarp will not directly use this. This is only useful if so...
Definition: transwarp.h:2323
void cancel_all(bool enabled) noexcept override
If enabled then all pending tasks in the graph are canceled which will throw transwarp::task_canceled...
Definition: transwarp.h:1858
void remove_custom_data() override
Removes custom data from this task.
Definition: transwarp.h:1629
void set_custom_data_all(std::shared_ptr< void > custom_data) override
Assigns custom data to all tasks. transwarp will not directly use this. This is only useful if someth...
Definition: transwarp.h:2354
std::tuple< std::shared_future< ParentResults >...> get_futures(const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &input)
Returns the futures from the given tuple of tasks.
Definition: transwarp.h:631
void set_value(typename transwarp::decay< result_type >::type &&value) override
Assigns a value to this task.
Definition: transwarp.h:2438
void schedule_all(transwarp::schedule_type type, bool reset_all) override
Schedules all tasks in the graph for execution on the caller thread. The task-specific executors get ...
Definition: transwarp.h:1786
Scheduling according to a breadth-first search (default)
void schedule_all() override
Schedules all tasks in the graph for execution on the caller thread. The task-specific executors get ...
Definition: transwarp.h:1735
void set_executor(std::shared_ptr< transwarp::executor > executor) override
Assigns an executor to this task which takes precedence over the executor provided in schedule() or s...
Definition: transwarp.h:1551
value_task(bool has_name, std::string name, T &&value)
A value task is defined by name and value. Note: Don't use this constructor directly, use transwarp::make_value_task.
Definition: transwarp.h:2279
void set_custom_data(std::shared_ptr< void > custom_data) override
Assigns custom data to this task. transwarp will not directly use this. This is only useful if someth...
Definition: transwarp.h:2345
The task class.
Definition: transwarp.h:406
Definition: transwarp.h:942
void schedule_all(transwarp::executor &executor, transwarp::schedule_type type, bool reset_all) override
Schedules all tasks in the graph for execution using the provided executor. The task-specific executo...
Definition: transwarp.h:1795
void reset_priority_all() override
Resets the priority of all tasks to 0.
Definition: transwarp.h:2339
void set_value(const typename transwarp::decay< result_type >::type &value) override
Assigns a value to this task. Scheduling will have no effect after a value has been set...
Definition: transwarp.h:2077
void set_priority(std::size_t priority) override
Sets a task priority (defaults to 0). transwarp will not directly use this. This is only useful if so...
Definition: transwarp.h:1584
const std::shared_ptr< std::string > & get_executor() const noexcept
The optional, task-specific executor (may be null)
Definition: transwarp.h:178
void assign_node_if(Functor &functor, std::shared_ptr< transwarp::node > node) noexcept
Assigns the node to the given functor if the functor is a subclass of transwarp::functor.
Definition: transwarp.h:1253
void reset() override
Resets this task.
Definition: transwarp.h:1834
const std::vector< std::shared_ptr< node > > & get_parents() const noexcept
The task's parents (may be empty)
Definition: transwarp.h:183
Just before a task's functor is invoked (handle_event called on thread that task is run on) ...
Definition: transwarp.h:929
The task's functor accepts the first parent future that becomes ready.
std::shared_ptr< transwarp::task_impl< TaskType_, typename std::decay< Functor_ >::type, result_type > > then(TaskType_, Functor_ &&functor) const
Creates a continuation to this task. Overload for omitting the task name.
Definition: transwarp.h:2254
The consume_any type. Used for tag dispatch.
Definition: transwarp.h:119
ResultType result_type
The result type of this task.
Definition: transwarp.h:2073
void set_custom_data(std::shared_ptr< void > custom_data) override
Assigns custom data to this task. transwarp will not directly use this. This is only useful if someth...
Definition: transwarp.h:1612
bool was_scheduled() const noexcept override
Returns true because a value task is scheduled once on construction.
Definition: transwarp.h:2448
void execute(const std::function< void()> &functor, const std::shared_ptr< transwarp::node > &) override
Pushes the functor into the thread pool for asynchronous execution.
Definition: transwarp.h:1525
void wait_for_all(const std::vector< std::shared_ptr< transwarp::task< ParentResultType >>> &parents)
Waits for all parents to finish.
Definition: transwarp.h:676
The accept type. Used for tag dispatch.
Definition: transwarp.h:107
virtual const std::shared_ptr< transwarp::task< FinalResultType > > & final_task() const =0
Returns the final task of the graph.
const std::shared_future< result_type > & get_future() const noexcept override
Returns the future associated to the underlying execution.
Definition: transwarp.h:1642
constexpr transwarp::accept_type accept
The accept task tag.
Definition: transwarp.h:108
TaskType task_type
The task type.
Definition: transwarp.h:1544
void schedule_all(transwarp::executor &, transwarp::schedule_type, bool) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2430
void remove_listeners() override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2394
bool has_result() const noexcept override
Returns whether this task contains a result.
Definition: transwarp.h:1829
std::size_t maximum_size() const
Returns the maximum size of the pool.
Definition: transwarp.h:2668
The listener interface to listen to events raised by tasks.
Definition: transwarp.h:311
Exception thrown when a task is canceled.
Definition: transwarp.h:52
TaskType task_type
The task type.
Definition: transwarp.h:2215
void wait() const override
Waits for the task to complete. Should only be called if was_scheduled() is true, throws transwarp::c...
Definition: transwarp.h:1816
Scheduling according to a depth-first search.
void add_listener(transwarp::event_type event, std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for the given event type only.
Definition: transwarp.h:1661
std::size_t get_level() const noexcept
The task level.
Definition: transwarp.h:163
std::shared_future< ResultType > make_future_with_value(Value &&value)
Returns a ready future with the given value as its state.
Definition: transwarp.h:1259
std::string get_name() const override
Returns the name of the executor.
Definition: transwarp.h:1494
void schedule() override
Schedules this task for execution on the caller thread. The task-specific executor gets precedence if...
Definition: transwarp.h:1701
void add_listener(std::shared_ptr< transwarp::listener >) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2379
void add_listener(std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for all event types.
Definition: transwarp.h:1652
std::shared_ptr< Graph > next_idle_graph(bool maybe_resize=true)
Returns the next idle graph. If there are no idle graphs then it will attempt to double the pool size...
Definition: transwarp.h:2616
void ensure_task_not_running() const
Checks if the task is currently running and throws transwarp::control_error if it is...
Definition: transwarp.h:1909
Just before a task starts running (handle_event called on thread that task is run on) ...
Removes custom data from the given task.
Definition: transwarp.h:1108
void schedule_all(transwarp::executor &, bool) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2418
Resets the given task.
Definition: transwarp.h:1036
const std::shared_ptr< std::string > & get_name() const noexcept
The optional task name (may be null)
Definition: transwarp.h:173
bool full() const
Returns whether the buffer is full.
Definition: transwarp.h:1435
A base class for a user-defined functor that needs access to the node associated to the task or a can...
Definition: transwarp.h:447
constexpr transwarp::consume_any_type consume_any
The consume_any task tag.
Definition: transwarp.h:120
bool is_canceled() const noexcept
Returns whether the associated task is canceled.
Definition: transwarp.h:198
A node carrying meta-data of a task.
Definition: transwarp.h:146
Exception thrown when a task is used in unintended ways.
Definition: transwarp.h:79
void resize(std::size_t new_size)
Resizes the graph pool to the given new size if possible.
Definition: transwarp.h:2685
Resets the priority of the given task.
Definition: transwarp.h:1088
void schedule() override
No-op because a value task never runs.
Definition: transwarp.h:2397
std::size_t size() const
Returns the number of populated values of the buffer. Its maximum value equals the capacity of the bu...
Definition: transwarp.h:1425
Returns the result type of a std::shared_future<T>
Definition: transwarp.h:399
void add_listener(transwarp::event_type, std::shared_ptr< transwarp::listener >) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2382
void wait() const override
No-op because a value task never runs.
Definition: transwarp.h:2453
void schedule_all(transwarp::executor &executor, transwarp::schedule_type type) override
Schedules all tasks in the graph for execution using the provided executor. The task-specific executo...
Definition: transwarp.h:1777
std::size_t get_priority() const noexcept
The task priority (defaults to 0)
Definition: transwarp.h:188
const std::shared_ptr< transwarp::node > & get_parent() const noexcept
Returns the parent node.
Definition: transwarp.h:251
A task proxy.
Definition: transwarp.h:2067
void transwarp_cancel_point() const
If the associated task is canceled then this will throw transwarp::task_canceled which will stop the ...
Definition: transwarp.h:461
The base task class that contains the functionality that can be used with all result types (void and ...
Definition: transwarp.h:1540
Definition: transwarp.h:617
The task's functor consumes all parent results.
std::shared_ptr< Graph > wait_for_next_idle_graph(bool maybe_resize=true)
Just like next_idle_graph() but waits for a graph to become available. The returned graph will always...
Definition: transwarp.h:2648
Just after a task has finished running (handle_event called on thread that task is run on) ...
void remove_listener(const std::shared_ptr< transwarp::listener > &) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2385
void set_priority_all(std::size_t priority) override
Sets a priority to all tasks (defaults to 0). transwarp will not directly use this. This is only useful if something else is using the priority.
Definition: transwarp.h:2329
Base class for exceptions.
Definition: transwarp.h:43
void schedule_all(transwarp::schedule_type) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2421
std::size_t size() const
Returns the current total size of the pool (sum of idle and busy graphs)
Definition: transwarp.h:2658
void reclaim()
Reclaims finished graphs by marking them as idle again.
Definition: transwarp.h:2707
constexpr transwarp::consume_type consume
The consume task tag.
Definition: transwarp.h:116
std::size_t get_id() const noexcept
The task ID.
Definition: transwarp.h:158
void schedule(transwarp::executor &, bool) override
No-op because a value task never runs.
Definition: transwarp.h:2406
A value task that stores a single value and doesn't require scheduling. Value tasks should be created...
Definition: transwarp.h:2265
void reset_priority() override
Resets the task priority to 0.
Definition: transwarp.h:2334
void remove_listeners(transwarp::event_type event) override
Removes all listeners for the given event type.
Definition: transwarp.h:1685
void remove_listener(const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for all event types.
Definition: transwarp.h:1668
Determines the type of the parents.
Definition: transwarp.h:1286
const std::shared_ptr< transwarp::node > & transwarp_node() const noexcept
The node associated to the task.
Definition: transwarp.h:455
Parent wait_for_any(const std::shared_ptr< transwarp::task< ParentResults >> &...parents)
Waits for the first parent to finish.
Definition: transwarp.h:699
void reset_all() override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2474
void remove_listener(transwarp::event_type event, const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for the given event type only.
Definition: transwarp.h:1677
Assigns custom data to the given task.
Definition: transwarp.h:1096
The task's functor takes no arguments but waits for all parents to finish.
void set_exception(std::exception_ptr exception) override
Assigns an exception to this task. Scheduling will have no effect after an exception has been set...
Definition: transwarp.h:1802
void cancel_all_but_one(const std::shared_ptr< transwarp::task< OneResult >> &one, const std::vector< std::shared_ptr< transwarp::task< ParentResultType >>> &parents)
Cancels all tasks but one.
Definition: transwarp.h:738
Definition: transwarp.h:757
Removes reference and const from a type.
Definition: transwarp.h:392
void remove_listeners() override
Removes all listeners.
Definition: transwarp.h:1691
const std::shared_ptr< void > & get_custom_data() const noexcept
The custom task data (may be null)
Definition: transwarp.h:193
std::shared_ptr< transwarp::task_impl< TaskType_, typename std::decay< Functor_ >::type, result_type > > then(TaskType_, std::string name, Functor_ &&functor) const
Creates a continuation to this task.
Definition: transwarp.h:2296
void call_with_each(const Functor &f, const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &t)
Calls the functor with every element in the tuple.
Definition: transwarp.h:961
void reset_priority() override
Resets the task priority to 0.
Definition: transwarp.h:1598
void schedule_all(transwarp::executor &, transwarp::schedule_type) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2424
void schedule_all(transwarp::schedule_type, bool) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2427
An edge between two nodes.
Definition: transwarp.h:237
const std::shared_ptr< transwarp::node > & get_child() const noexcept
Returns the child node.
Definition: transwarp.h:256
void schedule_all(transwarp::schedule_type type) override
Schedules all tasks in the graph for execution on the caller thread. The task-specific executors get ...
Definition: transwarp.h:1769
std::vector< transwarp::edge > get_graph() const override
Returns the graph of the task structure. This is mainly for visualizing the tasks and their interdepe...
Definition: transwarp.h:1866
graph_pool(std::function< std::shared_ptr< Graph >()> generator, std::size_t minimum_size, std::size_t maximum_size)
Constructs a graph pool by passing a generator to create a new graph and a minimum and maximum size o...
Definition: transwarp.h:2587
void set_value(typename transwarp::decay< result_type >::type &value) override
Assigns a value to this task. Scheduling will have no effect after a value has been set...
Definition: transwarp.h:2129
void schedule(transwarp::executor &) override
No-op because a value task never runs.
Definition: transwarp.h:2400
void reset_all() override
Resets all tasks in the graph.
Definition: transwarp.h:1842
A graph interface giving access to the final task as required by transwarp::graph_pool.
Definition: transwarp.h:2569
std::size_t busy_count() const
Returns the number of busy graphs in the pool.
Definition: transwarp.h:2679
circular_buffer(std::size_t capacity)
Constructs a circular buffer with a given fixed capacity.
Definition: transwarp.h:1382
Definition: transwarp.h:748
ResultType result_type
The result type of this task.
Definition: transwarp.h:2272
Executor for parallel execution. Uses a simple thread pool.
Definition: transwarp.h:1506
The wait_any type. Used for tag dispatch.
Definition: transwarp.h:127
void set_priority_all(std::size_t priority) override
Sets a priority to all tasks (defaults to 0). transwarp will not directly use this. This is only useful if something else is using the priority (e.g. a custom executor)
Definition: transwarp.h:1591
void schedule(bool reset) override
Schedules this task for execution on the caller thread. The task-specific executor gets precedence if...
Definition: transwarp.h:1710
virtual std::string get_name() const =0
Returns the name of the executor.
std::string to_string(const transwarp::task_type &type)
String conversion for the task_type enumeration.
Definition: transwarp.h:88
virtual void execute(const std::function< void()> &functor, const std::shared_ptr< transwarp::node > &node)=0
Runs a task which is wrapped by the given functor. The functor only captures one shared pointer and c...
void execute(const std::function< void()> &functor, const std::shared_ptr< transwarp::node > &) override
Runs the functor on the current thread.
Definition: transwarp.h:1499
void remove_executor() override
Removes the executor from this task.
Definition: transwarp.h:1569
The accept_any type. Used for tag dispatch.
Definition: transwarp.h:111
transwarp::task_type get_type() const noexcept
The task type.
Definition: transwarp.h:168
std::shared_ptr< transwarp::task_impl< TaskType, typename std::decay< Functor >::type, typename Parents::result_type...> > make_task(TaskType, std::string name, Functor &&functor, std::shared_ptr< Parents >...parents)
A factory function to create a new task.
Definition: transwarp.h:2519
virtual void handle_event(transwarp::event_type event, const std::shared_ptr< transwarp::node > &node)=0
This may be called from arbitrary threads depending on the event type (see transwarp::event_type). The implementer needs to ensure that this never throws exceptions.
bool is_ready() const override
Returns true because a value task is always ready.
Definition: transwarp.h:2456
constexpr transwarp::wait_type wait
The wait task tag.
Definition: transwarp.h:124
The root type. Used for tag dispatch.
Definition: transwarp.h:103
The task's functor consumes the first parent result that becomes ready.
A task representing a piece of work given by functor and parent tasks. By connecting tasks a directed...
Definition: transwarp.h:2212
void remove_listener(transwarp::event_type, const std::shared_ptr< transwarp::listener > &) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2388
The wait type. Used for tag dispatch.
Definition: transwarp.h:123
void cancel(bool enabled) noexcept override
If enabled then this task is canceled which will throw transwarp::task_canceled when retrieving the t...
Definition: transwarp.h:1851
Schedules using the given executor.
Definition: transwarp.h:1023
void swap(circular_buffer &buffer)
Swaps this buffer with the given buffer.
Definition: transwarp.h:1440
Result call(std::size_t node_id, const Task &task, const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &parents)
Calls the functor of the given task with the results from the tuple of parents. Throws transwarp::tas...
Definition: transwarp.h:914
constexpr transwarp::root_type root
The root task tag.
Definition: transwarp.h:104
void remove_custom_data() override
Removes custom data from this task.
Definition: transwarp.h:2359
void set_value(typename transwarp::decay< result_type >::type &&value) override
Assigns a value to this task. Scheduling will have no effect after a value has been set...
Definition: transwarp.h:2085
void schedule_all(transwarp::executor &executor) override
Schedules all tasks in the graph for execution using the provided executor. The task-specific executo...
Definition: transwarp.h:1743
const value_type & front() const
Returns the value at the front of the buffer (the oldest value). This is undefined if the buffer is e...
Definition: transwarp.h:1406
static Result work(std::size_t node_id, const Task &task, const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &parents)
Definition: transwarp.h:841
schedule_type
Determines in which order tasks are scheduled in the graph.
Definition: transwarp.h:322
The task's functor accepts all parent futures.
A graph pool that allows running multiple instances of the same graph in parallel.
Definition: transwarp.h:2581
Applies final bookkeeping to the task.
Definition: transwarp.h:996
void remove_executor_all() override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2319
typename transwarp::detail::functor_result< TaskType, Functor, ParentResults...>::type result_type
The result type of this task.
Definition: transwarp.h:2218
void schedule(transwarp::executor &executor, bool reset) override
Schedules this task for execution using the provided executor. The task-specific executor gets preced...
Definition: transwarp.h:1727
std::vector< transwarp::edge > get_graph() const override
Returns an empty graph because a value task doesn't have parents.
Definition: transwarp.h:2483
const std::shared_ptr< transwarp::node > & get_node() const noexcept override
Returns the associated node.
Definition: transwarp.h:1647
task_impl(bool has_name, std::string name, F &&functor, std::vector< std::shared_ptr< transwarp::task< P >>> parents)
A task is defined by name, functor, and parent tasks. Note: Don't use this constructor directly...
Definition: transwarp.h:2233
task_type
The possible task types.
Definition: transwarp.h:31
void set_value() override
Assigns a value to this task. Scheduling will have no effect after a call to this. Calling reset() will reset this and re-enable scheduling.
Definition: transwarp.h:2173
std::shared_ptr< transwarp::task_impl< TaskType_, typename std::decay< Functor_ >::type, result_type > > then(TaskType_, Functor_ &&functor) const
Creates a continuation to this task. Overload for omitting the task name.
Definition: transwarp.h:2304
void pop()
Removes the value at the front of the buffer (the oldest value).
Definition: transwarp.h:1411
event_type
The task events that can be subscribed to using the listener interface.
Definition: transwarp.h:300
The task's functor takes no arguments but waits for the first parent to finish.
Exception thrown when an invalid parameter was passed to a function.
Definition: transwarp.h:70
void remove_custom_data_all() override
Removes custom data from all tasks.
Definition: transwarp.h:1635
void reset() override
No-op because a value task never runs.
Definition: transwarp.h:2471
std::shared_future< void > make_ready_future()
Returns a ready future.
Definition: transwarp.h:1266
void set_executor_all(std::shared_ptr< transwarp::executor >) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2313
constexpr transwarp::accept_any_type accept_any
The accept_any task tag.
Definition: transwarp.h:112
bool empty() const
Returns whether the buffer is empty.
Definition: transwarp.h:1430
const std::shared_future< result_type > & get_future() const noexcept override
Returns the future associated to the underlying execution.
Definition: transwarp.h:2369
Cancels or resumes the given task.
Definition: transwarp.h:1044
std::size_t idle_count() const
Returns the number of idle graphs in the pool.
Definition: transwarp.h:2673
void remove_executor() override
No-op because a value task never runs.
Definition: transwarp.h:2316
void schedule(bool) override
No-op because a value task never runs.
Definition: transwarp.h:2403
void schedule_all(bool) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2415
Visits the given task using the visitor given in the constructor.
Definition: transwarp.h:1128
Pushes the given task into the vector of tasks.
Definition: transwarp.h:1116
Executor for sequential execution. Runs functors sequentially on the same thread. ...
Definition: transwarp.h:1482
bool has_result() const noexcept override
Returns true because a value task always contains a result.
Definition: transwarp.h:2461
task_impl(bool has_name, std::string name, F &&functor, std::shared_ptr< transwarp::task< ParentResults >>...parents)
A task is defined by name, functor, and parent tasks. Note: Don't use this constructor directly...
Definition: transwarp.h:2225
std::shared_future< ResultType > make_future_with_exception(std::exception_ptr exception)
Returns a ready future with the given exception as its state.
Definition: transwarp.h:1274
void remove_executor_all() override
Removes the executor from all tasks.
Definition: transwarp.h:1576
const std::shared_ptr< transwarp::node > & get_node() const noexcept override
Returns the associated node.
Definition: transwarp.h:2374
bool is_ready() const override
Returns whether the task has finished processing. Should only be called if was_scheduled() is true...
Definition: transwarp.h:1823
An interface for the task class.
Definition: transwarp.h:329
Result run_task(std::size_t node_id, const std::weak_ptr< Task > &task, Args &&...args)
Runs the task with the given arguments, hence, invoking the task's functor.
Definition: transwarp.h:651