21 #include <type_traits>
22 #include <unordered_map>
46 : std::runtime_error(message)
103 struct root_type : std::integral_constant<transwarp::task_type, transwarp::task_type::root> {};
107 struct accept_type : std::integral_constant<transwarp::task_type, transwarp::task_type::accept> {};
111 struct accept_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::accept_any> {};
115 struct consume_type : std::integral_constant<transwarp::task_type, transwarp::task_type::consume> {};
119 struct consume_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::consume_any> {};
123 struct wait_type : std::integral_constant<transwarp::task_type, transwarp::task_type::wait> {};
127 struct wait_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::wait_any> {};
153 node& operator=(
const node&) =
delete;
173 const std::shared_ptr<std::string>&
get_name() const noexcept {
183 const std::vector<std::shared_ptr<node>>&
get_parents() const noexcept {
199 return canceled_.load();
204 return avg_idletime_us_.load();
209 return avg_waittime_us_.load();
214 return avg_runtime_us_.load();
221 std::size_t level_ = 0;
223 std::shared_ptr<std::string> name_;
224 std::shared_ptr<std::string> executor_;
225 std::vector<std::shared_ptr<node>> parents_;
226 std::size_t priority_ = 0;
227 std::shared_ptr<void> custom_data_;
228 std::atomic<bool> canceled_{
false};
229 std::atomic<std::int64_t> avg_idletime_us_{-1};
230 std::atomic<std::int64_t> avg_waittime_us_{-1};
231 std::atomic<std::int64_t> avg_runtime_us_{-1};
238 const std::shared_ptr<std::string>& name = node.
get_name();
240 s +=
"<" + *name +
">" + separator;
243 s +=
" id=" + std::to_string(node.
get_id());
244 s +=
" lev=" + std::to_string(node.
get_level());
245 const std::shared_ptr<std::string>& exec = node.
get_executor();
247 s += separator +
"<" + *exec +
">";
250 if (avg_idletime_us >= 0) {
251 s += separator +
"avg-idle-us=" + std::to_string(avg_idletime_us);
254 if (avg_waittime_us >= 0) {
255 s += separator +
"avg-wait-us=" + std::to_string(avg_waittime_us);
258 if (avg_runtime_us >= 0) {
259 s += separator +
"avg-run-us=" + std::to_string(avg_runtime_us);
269 edge(std::shared_ptr<transwarp::node> parent, std::shared_ptr<transwarp::node> child) noexcept
270 : parent_(std::move(parent)), child_(std::move(child))
275 edge& operator=(
const edge&) =
default;
280 const std::shared_ptr<transwarp::node>&
get_parent() const noexcept {
285 const std::shared_ptr<transwarp::node>&
get_child() const noexcept {
290 std::shared_ptr<transwarp::node> parent_;
291 std::shared_ptr<transwarp::node> child_;
301 inline std::string
to_string(
const std::vector<transwarp::edge>&
graph,
const std::string& separator=
"\n") {
302 std::string dot =
"digraph {" + separator;
317 virtual std::string
get_name()
const = 0;
324 virtual void execute(
const std::function<
void()>&
functor,
const std::shared_ptr<transwarp::node>&
node) = 0;
360 virtual ~
itask() =
default;
362 virtual void set_executor(std::shared_ptr<transwarp::executor>
executor) = 0;
363 virtual void set_executor_all(std::shared_ptr<transwarp::executor> executor) = 0;
364 virtual void remove_executor() = 0;
365 virtual void remove_executor_all() = 0;
366 virtual void set_priority(std::size_t priority) = 0;
367 virtual void set_priority_all(std::size_t priority) = 0;
368 virtual void reset_priority() = 0;
369 virtual void reset_priority_all() = 0;
370 virtual void set_custom_data(std::shared_ptr<void> custom_data) = 0;
371 virtual void set_custom_data_all(std::shared_ptr<void> custom_data) = 0;
372 virtual void remove_custom_data() = 0;
373 virtual void remove_custom_data_all() = 0;
374 virtual const std::shared_ptr<transwarp::node>& get_node()
const noexcept = 0;
375 virtual void add_listener(std::shared_ptr<transwarp::listener>
listener) = 0;
376 virtual void add_listener(
transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) = 0;
377 virtual void add_listener_all(std::shared_ptr<transwarp::listener> listener) = 0;
378 virtual void add_listener_all(
transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) = 0;
379 virtual void remove_listener(
const std::shared_ptr<transwarp::listener>& listener) = 0;
380 virtual void remove_listener(
transwarp::event_type event,
const std::shared_ptr<transwarp::listener>& listener) = 0;
381 virtual void remove_listener_all(
const std::shared_ptr<transwarp::listener>& listener) = 0;
382 virtual void remove_listener_all(
transwarp::event_type event,
const std::shared_ptr<transwarp::listener>& listener) = 0;
383 virtual void remove_listeners() = 0;
385 virtual void remove_listeners_all() = 0;
387 virtual void schedule() = 0;
389 virtual void schedule(
bool reset) = 0;
391 virtual void schedule_all() = 0;
393 virtual void schedule_all(
bool reset_all) = 0;
399 virtual void set_exception(std::exception_ptr exception) = 0;
400 virtual bool was_scheduled()
const noexcept = 0;
401 virtual void wait()
const = 0;
402 virtual bool is_ready()
const = 0;
403 virtual bool has_result()
const = 0;
404 virtual void reset() = 0;
405 virtual void reset_all() = 0;
406 virtual void cancel(
bool enabled) noexcept = 0;
407 virtual void cancel_all(
bool enabled) noexcept = 0;
408 virtual std::size_t get_parent_count()
const noexcept = 0;
409 virtual std::size_t get_task_count()
const noexcept = 0;
410 virtual std::vector<transwarp::edge> get_graph()
const = 0;
421 virtual void visit_depth(
const std::function<
void(
itask&)>& visitor) = 0;
422 virtual void unvisit() noexcept = 0;
423 virtual void set_node_id(std::size_t
id) noexcept = 0;
430 using type =
typename std::remove_const<typename std::remove_reference<T>::type>::type;
437 using type =
typename std::result_of<decltype(&std::shared_future<T>::get)(std::shared_future<T>)>::type;
442 template<
typename ResultType>
445 using result_type = ResultType;
447 virtual ~
task() =
default;
449 virtual void set_value(
const typename transwarp::decay<result_type>::type& value) = 0;
450 virtual void set_value(
typename transwarp::decay<result_type>::type&& value) = 0;
451 virtual const std::shared_future<result_type>& get_future()
const noexcept = 0;
452 virtual typename transwarp::result<result_type>::type
get()
const = 0;
456 template<
typename ResultType>
459 using result_type = ResultType&;
461 virtual ~
task() =
default;
463 virtual void set_value(
typename transwarp::decay<result_type>::type& value) = 0;
464 virtual const std::shared_future<result_type>& get_future()
const noexcept = 0;
465 virtual typename transwarp::result<result_type>::type
get()
const = 0;
472 using result_type = void;
474 virtual ~
task() =
default;
476 virtual void set_value() = 0;
477 virtual const std::shared_future<result_type>& get_future()
const noexcept = 0;
478 virtual result_type
get()
const = 0;
493 return transwarp_node_;
499 if (transwarp_node_->is_canceled()) {
508 std::shared_ptr<transwarp::node> transwarp_node_;
523 static void set_level(
transwarp::node& node, std::size_t level) noexcept {
531 static void set_name(
transwarp::node& node, std::shared_ptr<std::string> name) noexcept {
532 node.name_ = std::move(name);
537 node.executor_ = std::move(executor);
539 node.executor_.reset();
543 static void add_parent(
transwarp::node& node, std::shared_ptr<transwarp::node> parent) {
544 node.parents_.push_back(std::move(parent));
547 static void set_priority(
transwarp::node& node, std::size_t priority) noexcept {
548 node.priority_ = priority;
551 static void set_custom_data(
transwarp::node& node, std::shared_ptr<void> custom_data) {
553 node.custom_data_ = std::move(custom_data);
555 node.custom_data_.reset();
559 static void set_canceled(
transwarp::node& node,
bool enabled) noexcept {
560 node.canceled_ = enabled;
563 static void set_avg_idletime_us(
transwarp::node& node, std::int64_t idletime) noexcept {
564 node.avg_idletime_us_ = idletime;
567 static void set_avg_waittime_us(
transwarp::node& node, std::int64_t waittime) noexcept {
568 node.avg_waittime_us_ = waittime;
571 static void set_avg_runtime_us(
transwarp::node& node, std::int64_t runtime) noexcept {
572 node.avg_runtime_us_ = runtime;
585 if (n_threads == 0) {
588 const std::size_t n_target = threads_.size() + n_threads;
589 while (threads_.size() < n_target) {
592 thread = std::thread(&thread_pool::worker,
this);
598 threads_.push_back(std::move(thread));
617 void push(
const std::function<
void()>&
functor) {
619 std::lock_guard<std::mutex> lock(mutex_);
622 cond_var_.notify_one();
631 std::unique_lock<std::mutex> lock(mutex_);
632 cond_var_.wait(lock, [
this]{
633 return done_ || !functors_.empty();
635 if (done_ && functors_.empty()) {
638 functor = functors_.front();
647 std::lock_guard<std::mutex> lock(mutex_);
650 cond_var_.notify_all();
651 for (std::thread& thread : threads_) {
658 std::vector<std::thread> threads_;
659 std::queue<std::function<void()>> functors_;
660 std::condition_variable cond_var_;
665 template<
int offset,
typename... ParentResults>
667 static void work(
const std::tuple<std::shared_ptr<
transwarp::task<ParentResults>>...>& source, std::tuple<std::shared_future<ParentResults>...>& target) {
668 std::get<offset>(target) = std::get<offset>(source)->get_future();
673 template<
typename... ParentResults>
679 template<
typename... ParentResults>
681 std::tuple<std::shared_future<ParentResults>...>
result;
687 template<
typename ParentResultType>
689 std::vector<std::shared_future<ParentResultType>>
result;
690 result.reserve(input.size());
692 result.emplace_back(
task->get_future());
699 template<
typename Result,
typename Task,
typename... Args>
700 Result
run_task(std::size_t node_id,
const std::weak_ptr<Task>&
task, Args&&... args) {
701 const std::shared_ptr<Task> t = task.lock();
705 if (t->node_->is_canceled()) {
709 return t->functor_(std::forward<Args>(args)...);
713 inline void wait_for_all() {}
716 template<
typename ParentResult,
typename... ParentResults>
718 parent->get_future().wait();
724 template<
typename ParentResultType>
727 parent->get_future().wait();
732 template<
typename Parent>
733 Parent wait_for_any_impl() {
737 template<
typename Parent,
typename ParentResult,
typename... ParentResults>
739 const std::future_status status = parent->get_future().wait_for(std::chrono::microseconds(1));
740 if (status == std::future_status::ready) {
743 return transwarp::detail::wait_for_any_impl<Parent>(parents...);
747 template<
typename Parent,
typename... ParentResults>
750 Parent parent = transwarp::detail::wait_for_any_impl<Parent>(
parents...);
759 template<
typename ParentResultType>
763 const std::future_status status = parent->get_future().wait_for(std::chrono::microseconds(1));
764 if (status == std::future_status::ready) {
772 template<
typename OneResult>
776 template<
typename OneResult,
typename ParentResult,
typename... ParentResults>
779 parent->cancel(
true);
786 template<
typename OneResult,
typename ParentResultType>
790 parent->cancel(
true);
796 template<
typename TaskType,
bool done,
int total,
int... n>
798 template<
typename Result,
typename Task,
typename... ParentResults>
801 work<Result>(node_id, task,
parents);
805 template<
typename TaskType>
808 template<
int total,
int... n>
810 template<
typename Result,
typename Task,
typename... ParentResults>
812 return transwarp::detail::run_task<Result>(node_id, task);
818 template<
typename Result,
typename Task,
typename ParentResultType>
820 return transwarp::detail::run_task<Result>(node_id, task);
824 template<
int total,
int... n>
826 template<
typename Result,
typename Task,
typename... ParentResults>
830 return transwarp::detail::run_task<Result>(node_id, task, std::get<n>(futures)...);
836 template<
typename Result,
typename Task,
typename ParentResultType>
843 template<
int total,
int... n>
845 template<
typename Result,
typename Task,
typename... ParentResults>
847 using parent_t =
typename std::remove_reference<decltype(std::get<0>(
parents))>::type;
848 parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(
parents)...);
850 return transwarp::detail::run_task<Result>(node_id, task, parent->get_future());
856 template<
typename Result,
typename Task,
typename ParentResultType>
860 return transwarp::detail::run_task<Result>(node_id, task, parent->get_future());
864 template<
int total,
int... n>
866 template<
typename Result,
typename Task,
typename... ParentResults>
869 return transwarp::detail::run_task<Result>(node_id, task, std::get<n>(
parents)->get_future().get()...);
875 template<
typename Result,
typename Task,
typename ParentResultType>
878 std::vector<ParentResultType> results;
879 results.reserve(
parents.size());
881 results.emplace_back(parent->get_future().get());
883 return transwarp::detail::run_task<Result>(node_id, task, std::move(results));
887 template<
int total,
int... n>
889 template<
typename Result,
typename Task,
typename... ParentResults>
891 using parent_t =
typename std::remove_reference<decltype(std::get<0>(
parents))>::type;
892 parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(
parents)...);
894 return transwarp::detail::run_task<Result>(node_id, task, parent->get_future().get());
900 template<
typename Result,
typename Task,
typename ParentResultType>
904 return transwarp::detail::run_task<Result>(node_id, task, parent->get_future().get());
908 template<
int total,
int... n>
910 template<
typename Result,
typename Task,
typename... ParentResults>
913 get_all(std::get<n>(
parents)...);
914 return transwarp::detail::run_task<Result>(node_id, task);
916 template<
typename T,
typename... Args>
917 static void get_all(
const T& arg,
const Args& ...args) {
918 arg->get_future().get();
921 static void get_all() {}
926 template<
typename Result,
typename Task,
typename ParentResultType>
930 parent->get_future().get();
932 return transwarp::detail::run_task<Result>(node_id, task);
936 template<
int total,
int... n>
938 template<
typename Result,
typename Task,
typename... ParentResults>
940 using parent_t =
typename std::remove_reference<decltype(std::get<0>(
parents))>::type;
941 parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(
parents)...);
943 parent->get_future().get();
944 return transwarp::detail::run_task<Result>(node_id, task);
950 template<
typename Result,
typename Task,
typename ParentResultType>
954 parent->get_future().get();
955 return transwarp::detail::run_task<Result>(node_id, task);
962 template<
typename TaskType,
typename Result,
typename Task,
typename... ParentResults>
964 constexpr std::size_t n = std::tuple_size<std::tuple<std::shared_future<ParentResults>...>>::value;
966 work<Result>(node_id, task,
parents);
972 template<
typename TaskType,
typename Result,
typename Task,
typename ParentResultType>
975 work<Result>(node_id, task,
parents);
982 template<std::size_t end, std::size_t idx, std::size_t... i>
985 template<std::size_t end, std::size_t... i>
990 template<std::
size_t b, std::
size_t e>
995 template<
typename Functor,
typename... ParentResults>
998 template<std::size_t i, std::size_t... j,
typename Functor,
typename... ParentResults>
1000 auto ptr = std::get<i>(t);
1009 template<
typename Functor,
typename... ParentResults>
1011 constexpr std::size_t n = std::tuple_size<std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>>::value;
1012 using index_t =
typename transwarp::detail::index_range<0, n>::type;
1013 transwarp::detail::call_with_each_index(index_t(), f, t);
1017 template<
typename Functor,
typename ParentResultType>
1034 transwarp::detail::node_manip::add_parent(node_, task.get_node());
1035 if (node_.
get_level() <= task.get_node()->get_level()) {
1037 transwarp::detail::node_manip::set_level(node_, task.get_node()->get_level() + 1);
1047 : count_(count), id_(0) {}
1051 task.set_node_id(id_++);
1054 std::size_t& count_;
1064 const std::shared_ptr<transwarp::node>&
node = task.get_node();
1065 for (
const std::shared_ptr<transwarp::node>& parent : node->get_parents()) {
1066 graph_.emplace_back(parent, node);
1070 std::vector<transwarp::edge>& graph_;
1076 : reset_(reset), executor_(executor) {}
1079 task.schedule_impl(reset_, executor_);
1097 : enabled_(enabled) {}
1100 task.cancel(enabled_);
1109 : executor_(std::move(executor)) {}
1112 task.set_executor(executor_);
1115 std::shared_ptr<transwarp::executor> executor_;
1122 task.remove_executor();
1129 : priority_(priority) {}
1132 task.set_priority(priority_);
1135 std::size_t priority_;
1142 task.reset_priority();
1149 : custom_data_(std::move(custom_data)) {}
1152 task.set_custom_data(custom_data_);
1155 std::shared_ptr<void> custom_data_;
1162 task.remove_custom_data();
1172 tasks_.push_back(&task);
1175 std::vector<transwarp::itask*>& tasks_;
1181 : listener_(std::move(listener))
1185 task.add_listener(listener_);
1188 std::shared_ptr<transwarp::listener> listener_;
1194 : event_(event), listener_(std::move(listener))
1198 task.add_listener(event_, listener_);
1202 std::shared_ptr<transwarp::listener> listener_;
1208 : listener_(std::move(listener))
1212 task.remove_listener(listener_);
1215 std::shared_ptr<transwarp::listener> listener_;
1221 : event_(event), listener_(std::move(listener))
1225 task.remove_listener(event_, listener_);
1229 std::shared_ptr<transwarp::listener> listener_;
1236 task.remove_listeners();
1248 task.remove_listeners(event_);
1257 : visitor_(visitor) {}
1260 task.visit_depth(visitor_);
1263 const std::function<void(transwarp::itask&)>& visitor_;
1275 template<
typename TaskType,
typename Functor,
typename... ParentResults>
1277 static_assert(std::is_same<TaskType, transwarp::root_type>::value ||
1278 std::is_same<TaskType, transwarp::accept_type>::value ||
1279 std::is_same<TaskType, transwarp::accept_any_type>::value ||
1280 std::is_same<TaskType, transwarp::consume_type>::value ||
1281 std::is_same<TaskType, transwarp::consume_any_type>::value ||
1282 std::is_same<TaskType, transwarp::wait_type>::value ||
1283 std::is_same<TaskType, transwarp::wait_any_type>::value,
1284 "Invalid task type, must be one of: root, accept, accept_any, consume, consume_any, wait, wait_any");
1287 template<
typename Functor,
typename... ParentResults>
1289 static_assert(
sizeof...(ParentResults) == 0,
"A root task cannot have parent tasks");
1290 using type = decltype(std::declval<Functor>()());
1293 template<
typename Functor,
typename... ParentResults>
1295 static_assert(
sizeof...(ParentResults) > 0,
"An accept task must have at least one parent");
1296 using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<ParentResults>>()...));
1299 template<
typename Functor,
typename ParentResultType>
1301 using type = decltype(std::declval<Functor>()(std::declval<std::vector<std::shared_future<ParentResultType>>>()));
1304 template<
typename Functor,
typename... ParentResults>
1306 static_assert(
sizeof...(ParentResults) > 0,
"An accept_any task must have at least one parent");
1307 using arg_t =
typename std::tuple_element<0, std::tuple<ParentResults...>>::type;
1308 using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<arg_t>>()));
1311 template<
typename Functor,
typename ParentResultType>
1313 using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<ParentResultType>>()));
1316 template<
typename Functor,
typename... ParentResults>
1318 static_assert(
sizeof...(ParentResults) > 0,
"A consume task must have at least one parent");
1319 using type = decltype(std::declval<Functor>()(std::declval<ParentResults>()...));
1322 template<
typename Functor,
typename ParentResultType>
1324 using type = decltype(std::declval<Functor>()(std::declval<std::vector<ParentResultType>>()));
1327 template<
typename Functor,
typename... ParentResults>
1329 static_assert(
sizeof...(ParentResults) > 0,
"A consume_any task must have at least one parent");
1330 using arg_t =
typename std::tuple_element<0, std::tuple<ParentResults...>>::type;
1331 using type = decltype(std::declval<Functor>()(std::declval<arg_t>()));
1334 template<
typename Functor,
typename ParentResultType>
1336 using type = decltype(std::declval<Functor>()(std::declval<ParentResultType>()));
1339 template<
typename Functor,
typename... ParentResults>
1341 static_assert(
sizeof...(ParentResults) > 0,
"A wait task must have at least one parent");
1342 using type = decltype(std::declval<Functor>()());
1345 template<
typename Functor,
typename ParentResultType>
1347 using type = decltype(std::declval<Functor>()());
1350 template<
typename Functor,
typename... ParentResults>
1352 static_assert(
sizeof...(ParentResults) > 0,
"A wait_any task must have at least one parent");
1353 using type = decltype(std::declval<Functor>()());
1356 template<
typename Functor,
typename ParentResultType>
1358 using type = decltype(std::declval<Functor>()());
1361 template<
bool is_transwarp_functor>
1366 template<
typename Functor>
1367 void operator()(Functor&
functor, std::shared_ptr<transwarp::node>
node)
const noexcept {
1368 functor.transwarp_node_ = std::move(node);
1374 template<
typename Functor>
1375 void operator()(Functor&, std::shared_ptr<transwarp::node>)
const noexcept {}
1379 template<
typename Functor>
1385 template<
typename ResultType,
typename Value>
1387 std::promise<ResultType> promise;
1388 promise.set_value(std::forward<Value>(value));
1389 return promise.get_future();
1394 std::promise<void> promise;
1395 promise.set_value();
1396 return promise.get_future();
1400 template<
typename ResultType>
1405 std::promise<ResultType> promise;
1406 promise.set_exception(exception);
1407 return promise.get_future();
1412 template<
typename... ParentResults>
1414 using type = std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>;
1415 static std::size_t size(
const type&) {
1416 return std::tuple_size<type>::value;
1421 template<
typename ParentResultType>
1422 struct parents<std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1423 using type = std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>;
1424 static std::size_t size(
const type& obj) {
1430 template<
typename ResultType,
typename TaskType>
1434 template<
typename Task,
typename Parents>
1435 void call(std::size_t node_id,
1436 const std::weak_ptr<Task>&
task,
1438 promise_.set_value(transwarp::detail::call<TaskType, ResultType>(node_id, task, parents));
1441 std::promise<ResultType> promise_;
1444 template<
typename TaskType>
1448 template<
typename Task,
typename Parents>
1449 void call(std::size_t node_id,
1450 const std::weak_ptr<Task>&
task,
1452 transwarp::detail::call<TaskType, void>(node_id, task, parents);
1453 promise_.set_value();
1456 std::promise<void> promise_;
1460 template<
typename ResultType,
typename TaskType,
typename Task,
typename Parents>
1464 runner(std::size_t node_id,
1465 const std::weak_ptr<Task>&
task,
1466 const typename transwarp::decay<Parents>::type&
parents)
1467 : node_id_(node_id),
1472 std::future<ResultType> get_future() {
1473 return this->promise_.get_future();
1477 if (
const std::shared_ptr<Task> t = task_.lock()) {
1481 this->call(node_id_, task_, parents_);
1483 this->promise_.set_exception(std::current_exception());
1484 if (
const std::shared_ptr<Task> t = task_.lock()) {
1488 this->promise_.set_exception(std::current_exception());
1490 if (
const std::shared_ptr<Task> t = task_.lock()) {
1496 const std::size_t node_id_;
1497 const std::weak_ptr<Task> task_;
1498 const typename transwarp::decay<Parents>::type parents_;
1505 template<
typename ValueType>
1509 static_assert(std::is_default_constructible<ValueType>::value,
"ValueType must be default constructible");
1511 using value_type = ValueType;
1531 template<typename T, typename = typename std::enable_if<std::is_same<typename std::decay<T>::type, value_type>::value>::type>
1533 data_[end_] = std::forward<T>(value);
1540 return data_[front_];
1546 data_[front_] = ValueType{};
1553 return data_.size();
1569 return size_ == data_.size();
1574 std::swap(end_, buffer.end_);
1575 std::swap(front_, buffer.front_);
1576 std::swap(size_, buffer.size_);
1577 std::swap(data_, buffer.data_);
1582 void increment_or_wrap(std::size_t& value)
const {
1583 if (value == data_.size() - 1) {
1591 increment_or_wrap(end_);
1593 increment_or_wrap(front_);
1600 increment_or_wrap(front_);
1605 std::size_t front_{};
1606 std::size_t size_{};
1607 std::vector<value_type> data_;
1614 void lock() noexcept {
1615 while (locked_.test_and_set(std::memory_order_acquire));
1618 void unlock() noexcept {
1619 locked_.clear(std::memory_order_release);
1623 std::atomic_flag locked_ = ATOMIC_FLAG_INIT;
1632 void operator()()
const noexcept {}
1653 return "transwarp::sequential";
1657 void execute(
const std::function<
void()>&
functor,
const std::shared_ptr<transwarp::node>&)
override {
1667 explicit parallel(std::size_t n_threads)
1679 return "transwarp::parallel";
1683 void execute(
const std::function<
void()>&
functor,
const std::shared_ptr<transwarp::node>&)
override {
1697 template<
typename ResultType,
typename TaskType,
typename Functor,
typename... ParentResults>
1699 public std::enable_shared_from_this<task_impl_base<ResultType, TaskType, Functor, ParentResults...>> {
1705 using result_type = ResultType;
1714 executor_ = std::move(executor);
1715 transwarp::detail::node_manip::set_executor(*node_, std::shared_ptr<std::string>(
new std::string(executor_->get_name())));
1730 transwarp::detail::node_manip::set_executor(*node_,
nullptr);
1744 transwarp::detail::node_manip::set_priority(*node_, priority);
1758 transwarp::detail::node_manip::set_priority(*node_, 0);
1775 transwarp::detail::node_manip::set_custom_data(*node_, std::move(custom_data));
1789 transwarp::detail::node_manip::set_custom_data(*node_,
nullptr);
1800 const std::shared_future<result_type>&
get_future() const noexcept
override {
1805 const std::shared_ptr<transwarp::node>&
get_node() const noexcept
override {
1812 check_listener(listener);
1813 for (std::vector<std::shared_ptr<transwarp::listener>>& l : listeners_) {
1814 l.push_back(listener);
1821 check_listener(listener);
1822 listeners_[get_event_index(event)].push_back(std::move(listener));
1842 check_listener(listener);
1843 for (std::vector<std::shared_ptr<transwarp::listener>>& l : listeners_) {
1844 l.erase(std::remove(l.begin(), l.end(), listener), l.end());
1851 check_listener(listener);
1852 std::vector<std::shared_ptr<transwarp::listener>>& l = listeners_[get_event_index(event)];
1853 l.erase(std::remove(l.begin(), l.end(), listener), l.end());
1873 for (std::vector<std::shared_ptr<transwarp::listener>>& l : listeners_) {
1881 listeners_[get_event_index(event)].clear();
1903 this->schedule_impl(
true);
1912 this->schedule_impl(reset);
1920 this->schedule_impl(
true, &executor);
1929 this->schedule_impl(reset, &executor);
1971 schedule_all_impl(
true, type);
1979 schedule_all_impl(
true, type, &executor);
1988 schedule_all_impl(reset_all, type);
1997 schedule_all_impl(reset_all, type, &executor);
2004 future_ = transwarp::detail::make_future_with_exception<result_type>(exception);
2005 schedule_mode_ =
false;
2011 return future_.valid();
2025 return future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
2030 return was_scheduled() && future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
2036 future_ = std::shared_future<result_type>();
2037 transwarp::detail::node_manip::set_canceled(*node_,
false);
2038 schedule_mode_ =
true;
2051 void cancel(
bool enabled) noexcept
override {
2052 transwarp::detail::node_manip::set_canceled(*node_, enabled);
2077 std::vector<transwarp::edge>
graph;
2086 template<
typename F>
2088 : node_(new transwarp::
node),
2089 functor_(std::forward<F>(
functor)),
2090 parents_(std::move(
parents)...)
2092 init(has_name, std::move(name));
2095 template<
typename F,
typename P>
2097 : node_(new transwarp::
node),
2098 functor_(std::forward<F>(
functor)),
2099 parents_(std::move(parents))
2101 if (parents_.empty()) {
2104 init(has_name, std::move(name));
2107 void init(
bool has_name, std::string name) {
2108 transwarp::detail::node_manip::set_type(*node_, task_type::value);
2109 transwarp::detail::node_manip::set_name(*node_, (has_name ? std::shared_ptr<std::string>(
new std::string(std::move(name))) :
nullptr));
2113 visit_depth(visitor);
2119 if (future_.valid() && future_.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
2126 if (!future_.valid()) {
2131 bool schedule_mode_ =
true;
2132 std::shared_future<result_type> future_;
2136 template<
typename R,
typename Y,
typename T,
typename P>
2139 template<
typename R,
typename T,
typename... A>
2143 void set_node_id(std::size_t
id) noexcept
override {
2144 transwarp::detail::node_manip::set_id(*node_,
id);
2152 if (schedule_mode_ && (reset || !future_.valid())) {
2154 transwarp::detail::node_manip::set_canceled(*node_,
false);
2156 std::weak_ptr<task_impl_base>
self = this->shared_from_this();
2158 std::shared_ptr<runner_t> runner = std::shared_ptr<runner_t>(
new runner_t(node_->get_id(),
self, parents_));
2160 future_ = runner->get_future();
2162 executor_->execute([runner]{ (*runner)(); }, node_);
2163 }
else if (executor) {
2164 executor->execute([runner]{ (*runner)(); }, node_);
2179 visit_breadth_all(visitor);
2182 visit_depth_all(visitor);
2190 std::vector<transwarp::itask*> tasks_in_depth_order()
const {
2191 std::vector<transwarp::itask*> tasks;
2193 const_cast<task_impl_base*
>(
this)->unvisit();
2198 template<
typename Visitor>
2199 void visit_all(Visitor& visitor) {
2200 if (!breadth_tasks_.empty()) {
2201 visit_breadth_all(visitor);
2202 }
else if (!depth_tasks_.empty()) {
2203 visit_depth_all(visitor);
2205 visit_breadth_all(visitor);
2210 template<
typename Visitor>
2211 void visit_breadth_all(Visitor& visitor) {
2212 if (breadth_tasks_.empty()) {
2213 breadth_tasks_ = tasks_in_depth_order();
2215 const std::size_t l_level = l->get_node()->get_level();
2216 const std::size_t l_id = l->get_node()->get_id();
2217 const std::size_t r_level = r->get_node()->get_level();
2218 const std::size_t r_id = r->get_node()->get_id();
2219 return std::tie(l_level, l_id) < std::tie(r_level, r_id);
2221 std::sort(breadth_tasks_.begin(), breadth_tasks_.end(), compare);
2229 template<
typename Visitor>
2230 void visit_depth_all(Visitor& visitor) {
2231 if (depth_tasks_.empty()) {
2232 depth_tasks_ = tasks_in_depth_order();
2240 void visit_depth(
const std::function<
void(
transwarp::itask&)>& visitor)
override {
2249 void unvisit() noexcept
override {
2258 const std::size_t index =
static_cast<std::size_t
>(event);
2259 if (index >= static_cast<std::size_t>(transwarp::event_type::count)) {
2267 for (
const std::shared_ptr<transwarp::listener>& listener : listeners_[static_cast<std::size_t>(event)]) {
2268 listener->handle_event(event, node_);
2273 void check_listener(
const std::shared_ptr<transwarp::listener>& listener)
const {
2279 std::size_t task_count_ = 0;
2280 std::shared_ptr<transwarp::node> node_;
2283 bool visited_ =
false;
2284 std::shared_ptr<transwarp::executor> executor_;
2285 std::vector<std::shared_ptr<transwarp::listener>> listeners_[
static_cast<std::size_t
>(transwarp::event_type::count)];
2286 std::vector<transwarp::itask*> depth_tasks_;
2287 std::vector<transwarp::itask*> breadth_tasks_;
2292 template<
typename ResultType,
typename TaskType,
typename Functor,
typename... ParentResults>
2299 using result_type = ResultType;
2303 void set_value(
const typename transwarp::decay<result_type>::type& value)
override {
2305 this->future_ = transwarp::detail::make_future_with_value<result_type>(value);
2306 this->schedule_mode_ =
false;
2311 void set_value(
typename transwarp::decay<result_type>::type&& value)
override {
2313 this->future_ = transwarp::detail::make_future_with_value<result_type>(std::move(value));
2314 this->schedule_mode_ =
false;
2320 typename transwarp::result<result_type>::type
get()
const override {
2322 return this->future_.get();
2327 template<
typename F>
2332 template<
typename F,
typename P>
2334 : transwarp::detail::task_impl_base<
result_type,
task_type, Functor, ParentResults...>(has_name, std::move(name), std::forward<F>(
functor), std::move(parents))
2340 template<
typename ResultType,
typename TaskType,
typename Functor,
typename... ParentResults>
2347 using result_type = ResultType&;
2351 void set_value(
typename transwarp::decay<result_type>::type& value)
override {
2352 this->ensure_task_not_running();
2353 this->future_ = transwarp::detail::make_future_with_value<result_type>(value);
2354 this->schedule_mode_ =
false;
2360 typename transwarp::result<result_type>::type
get()
const override {
2361 this->ensure_task_was_scheduled();
2362 return this->future_.get();
2367 template<
typename F>
2372 template<
typename F,
typename P>
2374 : transwarp::detail::task_impl_base<result_type,
task_type, Functor, ParentResults...>(has_name, std::move(name), std::forward<F>(
functor), std::move(parents))
2380 template<
typename TaskType,
typename Functor,
typename... ParentResults>
2387 using result_type = void;
2394 this->schedule_mode_ =
false;
2400 void get()
const override {
2402 this->future_.get();
2407 template<
typename F>
2412 template<
typename F,
typename P>
2414 : transwarp::detail::task_impl_base<
result_type,
task_type, Functor, ParentResults...>(has_name, std::move(name), std::forward<F>(
functor), std::move(parents))
2425 template<
typename TaskType,
typename Functor,
typename... ParentResults>
2436 template<
typename F>
2438 : transwarp::detail::task_impl_proxy<
result_type,
task_type, Functor, ParentResults...>(has_name, std::move(name), std::forward<F>(
functor), std::move(parents)...)
2443 template<
typename F,
typename P>
2445 : transwarp::detail::task_impl_proxy<
result_type,
task_type, Functor, ParentResults...>(has_name, std::move(name), std::forward<F>(
functor), std::move(parents))
2455 template<
typename TaskType_,
typename Functor_>
2456 std::shared_ptr<transwarp::task_impl<TaskType_, typename std::decay<Functor_>::type,
result_type>>
2459 return std::shared_ptr<task_t>(
new task_t(
true, std::move(name), std::forward<Functor_>(
functor), std::dynamic_pointer_cast<
transwarp::task<result_type>>(const_cast<task_impl*>(
this)->shared_from_this())));
2463 template<
typename TaskType_,
typename Functor_>
2464 std::shared_ptr<transwarp::task_impl<TaskType_, typename std::decay<Functor_>::type,
result_type>>
2467 return std::shared_ptr<task_t>(
new task_t(
false,
"", std::forward<Functor_>(
functor), std::dynamic_pointer_cast<
transwarp::task<result_type>>(const_cast<task_impl*>(
this)->shared_from_this())));
2475 template<
typename ResultType>
2477 public std::enable_shared_from_this<value_task<ResultType>> {
2487 template<
typename T>
2489 : node_(new transwarp::
node),
2490 future_(transwarp::detail::make_future_with_value<
result_type>(std::forward<T>(value)))
2492 transwarp::detail::node_manip::set_type(*node_, task_type::value);
2493 transwarp::detail::node_manip::set_name(*node_, (has_name ? std::shared_ptr<std::string>(
new std::string(std::move(name))) :
nullptr));
2503 template<
typename TaskType_,
typename Functor_>
2504 std::shared_ptr<transwarp::task_impl<TaskType_, typename std::decay<Functor_>::type,
result_type>>
2507 return std::shared_ptr<task_t>(
new task_t(
true, std::move(name), std::forward<Functor_>(
functor), std::dynamic_pointer_cast<
transwarp::task<result_type>>(const_cast<value_task*>(
this)->shared_from_this())));
2511 template<
typename TaskType_,
typename Functor_>
2512 std::shared_ptr<transwarp::task_impl<TaskType_, typename std::decay<Functor_>::type,
result_type>>
2515 return std::shared_ptr<task_t>(
new task_t(
false,
"", std::forward<Functor_>(
functor), std::dynamic_pointer_cast<
transwarp::task<result_type>>(const_cast<value_task*>(
this)->shared_from_this())));
2533 transwarp::detail::node_manip::set_priority(*node_, priority);
2544 transwarp::detail::node_manip::set_priority(*node_, 0);
2558 transwarp::detail::node_manip::set_custom_data(*node_, std::move(custom_data));
2569 transwarp::detail::node_manip::set_custom_data(*node_,
nullptr);
2578 const std::shared_future<result_type>&
get_future() const noexcept
override {
2583 const std::shared_ptr<transwarp::node>&
get_node() const noexcept
override {
2660 void set_value(
const typename transwarp::decay<result_type>::type& value)
override {
2661 future_ = transwarp::detail::make_future_with_value<result_type>(value);
2665 void set_value(
typename transwarp::decay<result_type>::type&& value)
override {
2666 future_ = transwarp::detail::make_future_with_value<result_type>(std::move(value));
2671 future_ = transwarp::detail::make_future_with_exception<result_type>(exception);
2693 typename transwarp::result<result_type>::type
get()
const override {
2694 return future_.get();
2727 void set_node_id(std::size_t
id) noexcept
override {
2728 transwarp::detail::node_manip::set_id(*node_,
id);
2735 void visit_depth(
const std::function<
void(
transwarp::itask&)>& visitor)
override {
2743 void unvisit() noexcept
override {
2747 std::shared_ptr<transwarp::node> node_;
2748 std::shared_future<result_type> future_;
2749 bool visited_ =
false;
2754 template<
typename TaskType,
typename Functor,
typename... Parents>
2755 std::shared_ptr<transwarp::task_impl<TaskType, typename std::decay<Functor>::type,
typename Parents::result_type...>>
2756 make_task(TaskType, std::string name, Functor&&
functor, std::shared_ptr<Parents>... parents) {
2758 return std::shared_ptr<task_t>(
new task_t(
true, std::move(name), std::forward<Functor>(
functor), std::move(parents)...));
2762 template<
typename TaskType,
typename Functor,
typename... Parents>
2763 std::shared_ptr<transwarp::task_impl<TaskType, typename std::decay<Functor>::type,
typename Parents::result_type...>>
2766 return std::shared_ptr<task_t>(
new task_t(
false,
"", std::forward<Functor>(
functor), std::move(parents)...));
2771 template<
typename TaskType,
typename Functor,
typename ParentType>
2772 std::shared_ptr<transwarp::task_impl<TaskType, typename std::decay<Functor>::type, std::vector<ParentType>>>
2775 return std::shared_ptr<task_t>(
new task_t(
true, std::move(name), std::forward<Functor>(
functor), std::move(parents)));
2779 template<
typename TaskType,
typename Functor,
typename ParentType>
2780 std::shared_ptr<transwarp::task_impl<TaskType, typename std::decay<Functor>::type, std::vector<ParentType>>>
2783 return std::shared_ptr<task_t>(
new task_t(
false,
"", std::forward<Functor>(
functor), std::move(parents)));
2788 template<
typename Value>
2789 std::shared_ptr<transwarp::value_task<typename transwarp::decay<Value>::type>>
2792 return std::shared_ptr<task_t>(
new task_t(
true, std::move(name), std::forward<Value>(value)));
2796 template<
typename Value>
2797 std::shared_ptr<transwarp::value_task<typename transwarp::decay<Value>::type>>
2800 return std::shared_ptr<task_t>(
new task_t(
false,
"", std::forward<Value>(value)));
2807 template<
typename InputIt,
typename UnaryOperation>
2808 std::shared_ptr<transwarp::task_impl<transwarp::wait_type, transwarp::no_op_functor, std::vector<std::shared_ptr<transwarp::task<void>>>>>
2809 for_each(InputIt first, InputIt last, UnaryOperation unary_op) {
2810 const auto distance = std::distance(first, last);
2811 if (distance <= 0) {
2814 std::vector<std::shared_ptr<transwarp::task<void>>> tasks;
2815 tasks.reserve(static_cast<std::size_t>(distance));
2816 for (; first != last; ++first) {
2826 template<
typename InputIt,
typename UnaryOperation>
2827 std::shared_ptr<transwarp::task_impl<transwarp::wait_type, transwarp::no_op_functor, std::vector<std::shared_ptr<transwarp::task<void>>>>>
2830 task->schedule_all(executor);
2838 template<
typename InputIt,
typename OutputIt,
typename UnaryOperation>
2839 std::shared_ptr<transwarp::task_impl<transwarp::wait_type, transwarp::no_op_functor, std::vector<std::shared_ptr<transwarp::task<void>>>>>
2840 transform(InputIt first1, InputIt last1, OutputIt d_first, UnaryOperation unary_op) {
2841 const auto distance = std::distance(first1, last1);
2842 if (distance <= 0) {
2845 std::vector<std::shared_ptr<transwarp::task<void>>> tasks;
2846 tasks.reserve(static_cast<std::size_t>(distance));
2847 for (; first1 != last1; ++first1, ++d_first) {
2857 template<
typename InputIt,
typename OutputIt,
typename UnaryOperation>
2858 std::shared_ptr<transwarp::task_impl<transwarp::wait_type, transwarp::no_op_functor, std::vector<std::shared_ptr<transwarp::task<void>>>>>
2861 task->schedule_all(executor);
2867 template<
typename ResultType>
2870 using result_type = ResultType;
2872 virtual ~
graph() =
default;
2875 virtual const std::shared_ptr<transwarp::task<result_type>>&
final_task()
const = 0;
2881 template<
typename Graph>
2886 "Graph must be a sub-class of transwarp::graph");
2894 : generator_(std::move(generator)),
2895 minimum_(minimum_size),
2896 maximum_(maximum_size),
2897 finished_(maximum_size)
2902 if (minimum_ > maximum_) {
2905 for (std::size_t i=0; i<minimum_; ++i) {
2906 idle_.push(generate());
2921 std::shared_ptr<transwarp::node> finished_node;
2923 std::lock_guard<transwarp::detail::spinlock> lock(spinlock_);
2924 if (!finished_.empty()) {
2925 finished_node = finished_.front(); finished_.pop();
2929 std::shared_ptr<Graph> g;
2930 if (finished_node) {
2931 g = busy_.find(finished_node)->second;
2933 if (maybe_resize && idle_.empty()) {
2936 if (idle_.empty()) {
2939 g = idle_.front(); idle_.pop();
2940 busy_.emplace(g->final_task()->get_node(), g);
2943 const auto& future = g->final_task()->get_future();
2944 if (future.valid()) {
2963 return idle_.size() + busy_.size();
2978 std::lock_guard<transwarp::detail::spinlock> lock(spinlock_);
2979 return idle_.size() + finished_.size();
2984 std::lock_guard<transwarp::detail::spinlock> lock(spinlock_);
2985 return busy_.size() - finished_.size();
2991 if (new_size >
size()) {
2992 const std::size_t count = new_size -
size();
2993 for (std::size_t i=0; i<count; ++i) {
2994 if (
size() == maximum_) {
2997 idle_.push(generate());
2999 }
else if (new_size <
size()) {
3000 const std::size_t count =
size() - new_size;
3001 for (std::size_t i=0; i<count; ++i) {
3002 if (idle_.empty() ||
size() == minimum_) {
3012 decltype(finished_) finished{finished_.capacity()};
3014 std::lock_guard<transwarp::detail::spinlock> lock(spinlock_);
3015 finished_.swap(finished);
3017 while (!finished.empty()) {
3018 const std::shared_ptr<transwarp::node>
node = finished.front(); finished.pop();
3019 const auto it = busy_.find(node);
3020 idle_.push(it->second);
3037 std::lock_guard<transwarp::detail::spinlock> lock(pool_.spinlock_);
3038 pool_.finished_.push(node);
3042 graph_pool<Graph>& pool_;
3045 std::shared_ptr<Graph> generate() {
3046 std::shared_ptr<Graph> graph = generator_();
3051 std::function<std::shared_ptr<Graph>()> generator_;
3052 std::size_t minimum_;
3053 std::size_t maximum_;
3056 std::queue<std::shared_ptr<Graph>> idle_;
3057 std::unordered_map<std::shared_ptr<transwarp::node>, std::shared_ptr<Graph>> busy_;
3058 std::shared_ptr<transwarp::listener> listener_{
new finished_listener(*
this)};
3080 const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3081 std::lock_guard<transwarp::detail::spinlock> lock(spinlock_);
3082 auto& track = tracks_[node];
3083 track.startidle = now;
3087 const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3088 track_idletime(node, now);
3089 std::lock_guard<transwarp::detail::spinlock> lock(spinlock_);
3090 auto& track = tracks_[node];
3091 track.startwait = now;
3095 const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3096 track_waittime(node, now);
3100 const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3101 track_waittime(node, now);
3102 std::lock_guard<transwarp::detail::spinlock> lock(spinlock_);
3103 auto& track = tracks_[node];
3104 track.running =
true;
3105 track.startrun = now;
3109 const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3110 track_runtime(node, now);
3119 std::lock_guard<transwarp::detail::spinlock> lock(spinlock_);
3125 void track_idletime(
const std::shared_ptr<transwarp::node>&
node,
const std::chrono::time_point<std::chrono::steady_clock>& now) {
3126 std::int64_t avg_idletime_us;
3128 std::lock_guard<transwarp::detail::spinlock> lock(spinlock_);
3129 auto& track = tracks_[node];
3130 track.idletime += std::chrono::duration_cast<std::chrono::microseconds>(now - track.startidle).count();
3132 avg_idletime_us =
static_cast<std::int64_t
>(track.idletime / track.idlecount);
3134 transwarp::detail::node_manip::set_avg_idletime_us(*node, avg_idletime_us);
3137 void track_waittime(
const std::shared_ptr<transwarp::node>& node,
const std::chrono::time_point<std::chrono::steady_clock>& now) {
3138 std::int64_t avg_waittime_us;
3140 std::lock_guard<transwarp::detail::spinlock> lock(spinlock_);
3141 auto& track = tracks_[node];
3142 track.waittime += std::chrono::duration_cast<std::chrono::microseconds>(now - track.startwait).count();
3144 avg_waittime_us =
static_cast<std::int64_t
>(track.waittime / track.waitcount);
3146 transwarp::detail::node_manip::set_avg_waittime_us(*node, avg_waittime_us);
3149 void track_runtime(
const std::shared_ptr<transwarp::node>& node,
const std::chrono::time_point<std::chrono::steady_clock>& now) {
3150 std::int64_t avg_runtime_us;
3152 std::lock_guard<transwarp::detail::spinlock> lock(spinlock_);
3153 auto& track = tracks_[node];
3154 if (!track.running) {
3157 track.running =
false;
3158 track.runtime += std::chrono::duration_cast<std::chrono::microseconds>(now - track.startrun).count();
3160 avg_runtime_us =
static_cast<std::int64_t
>(track.runtime / track.runcount);
3162 transwarp::detail::node_manip::set_avg_runtime_us(*node, avg_runtime_us);
3166 bool running =
false;
3167 std::chrono::time_point<std::chrono::steady_clock> startidle;
3168 std::chrono::time_point<std::chrono::steady_clock> startwait;
3169 std::chrono::time_point<std::chrono::steady_clock> startrun;
3170 std::chrono::microseconds::rep idletime = 0;
3171 std::chrono::microseconds::rep idlecount = 0;
3172 std::chrono::microseconds::rep waittime = 0;
3173 std::chrono::microseconds::rep waitcount = 0;
3174 std::chrono::microseconds::rep runtime = 0;
3175 std::chrono::microseconds::rep runcount = 0;
3179 std::unordered_map<std::shared_ptr<transwarp::node>, track> tracks_;
void set_custom_data_all(std::shared_ptr< void > custom_data) override
Assigns custom data to all tasks. transwarp will not directly use this. This is only useful if someth...
Definition: transwarp.h:1780
constexpr transwarp::wait_any_type wait_any
The wait_any task tag.
Definition: transwarp.h:128
void remove_custom_data_all() override
Removes custom data from all tasks.
Definition: transwarp.h:2573
The executor interface used to perform custom task execution.
Definition: transwarp.h:312
Exception thrown when a task was destroyed prematurely.
Definition: transwarp.h:61
Adds a new listener to the given task.
Definition: transwarp.h:1179
A simple thread pool used to execute tasks in parallel.
Definition: transwarp.h:579
void cancel_all(bool) noexceptoverride
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2707
void remove_listener_all(const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for all event types and for all parents.
Definition: transwarp.h:1857
void operator()(const transwarp::itask &task) const
Definition: transwarp.h:1033
Node manipulation.
Definition: transwarp.h:517
Removes a listener from the given task.
Definition: transwarp.h:1206
TaskType task_type
The task type.
Definition: transwarp.h:2344
void remove_listeners(transwarp::event_type) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2615
Generates a graph.
Definition: transwarp.h:1059
The consume type. Used for tag dispatch.
Definition: transwarp.h:115
Determines the result type of the Functor dispatching on the task type.
Definition: transwarp.h:1276
void schedule_all(transwarp::executor &) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2639
Removes the executor from the given task.
Definition: transwarp.h:1119
Definition: transwarp.h:980
void reset_priority_all() override
Resets the priority of all tasks to 0.
Definition: transwarp.h:1762
Definition: transwarp.h:140
Just after a task was canceled (handle_event called on thread that task is run on) ...
TaskType task_type
The task type.
Definition: transwarp.h:2384
A callable to run a task given its parents.
Definition: transwarp.h:1461
void set_exception(std::exception_ptr exception) override
Assigns an exception to this task.
Definition: transwarp.h:2670
void schedule_all(bool reset_all) override
Schedules all tasks in the graph for execution on the caller thread. The task-specific executors get ...
Definition: transwarp.h:1952
Adds a new listener per event type to the given task.
Definition: transwarp.h:1192
void ensure_task_was_scheduled() const
Checks if the task was scheduled and throws transwarp::control_error if it's not. ...
Definition: transwarp.h:2125
Sets parents and level of the node.
Definition: transwarp.h:1029
std::shared_ptr< transwarp::task_impl< TaskType_, typename std::decay< Functor_ >::type, result_type > > then(TaskType_, std::string name, Functor_ &&functor) const
Creates a continuation to this task.
Definition: transwarp.h:2457
std::shared_ptr< transwarp::value_task< typename transwarp::decay< Value >::type > > make_value_task(std::string name, Value &&value)
A factory function to create a new value task.
Definition: transwarp.h:2790
std::size_t get_parent_count() const noexceptoverride
Returns the number of direct parents of this task.
Definition: transwarp.h:2710
void reset()
Resets all timing information.
Definition: transwarp.h:3118
Definition: transwarp.h:1431
void set_executor(std::shared_ptr< transwarp::executor >) override
No-op because a value task never runs.
Definition: transwarp.h:2519
constexpr no_op_functor no_op
An object to use in places where a no-op functor is required.
Definition: transwarp.h:1636
void set_executor_all(std::shared_ptr< transwarp::executor > executor) override
Assigns an executor to all tasks which takes precedence over the executor provided in schedule() or s...
Definition: transwarp.h:1720
bool was_scheduled() const noexceptoverride
Returns whether the task was scheduled and not reset afterwards. This means that the underlying futur...
Definition: transwarp.h:2010
void cancel(bool) noexceptoverride
No-op because a value task never runs.
Definition: transwarp.h:2704
Just before a task is scheduled (handle_event called on thread of caller to schedule()) ...
void schedule_all() override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2636
void schedule_all(transwarp::executor &executor, bool reset_all) override
Schedules all tasks in the graph for execution using the provided executor. The task-specific executo...
Definition: transwarp.h:1961
std::size_t minimum_size() const
Returns the minimum size of the pool.
Definition: transwarp.h:2967
std::string get_name() const override
Returns the name of the executor.
Definition: transwarp.h:1678
void add_listener_all(std::shared_ptr< transwarp::listener >) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2594
Assigns a priority to the given task.
Definition: transwarp.h:1127
void set_value(const typename transwarp::decay< result_type >::type &value) override
Assigns a value to this task.
Definition: transwarp.h:2660
void push(T &&value)
Pushes a new value onto the end of the buffer. If that exceeds the capacity of the buffer then the ol...
Definition: transwarp.h:1532
void schedule(transwarp::executor &executor) override
Schedules this task for execution using the provided executor. The task-specific executor gets preced...
Definition: transwarp.h:1918
Assigns an executor to the given task.
Definition: transwarp.h:1107
std::size_t capacity() const
Returns the capacity of the buffer.
Definition: transwarp.h:1552
A simple circular buffer (FIFO). ValueType must support default construction. The buffer lets you pus...
Definition: transwarp.h:1506
void set_priority(std::size_t priority) override
Sets a task priority (defaults to 0). transwarp will not directly use this. This is only useful if so...
Definition: transwarp.h:2532
void cancel_all(bool enabled) noexceptoverride
If enabled then all pending tasks in the graph are canceled which will throw transwarp::task_canceled...
Definition: transwarp.h:2058
std::size_t get_parent_count() const noexceptoverride
Returns the number of direct parents of this task.
Definition: transwarp.h:2064
void remove_custom_data() override
Removes custom data from this task.
Definition: transwarp.h:1787
void set_custom_data_all(std::shared_ptr< void > custom_data) override
Assigns custom data to all tasks. transwarp will not directly use this. This is only useful if someth...
Definition: transwarp.h:2563
std::tuple< std::shared_future< ParentResults >...> get_futures(const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &input)
Returns the futures from the given tuple of tasks.
Definition: transwarp.h:680
void set_value(typename transwarp::decay< result_type >::type &&value) override
Assigns a value to this task.
Definition: transwarp.h:2665
void schedule_all(transwarp::schedule_type type, bool reset_all) override
Schedules all tasks in the graph for execution on the caller thread. The task-specific executors get ...
Definition: transwarp.h:1986
Scheduling according to a breadth-first search (default)
void remove_listeners_all(transwarp::event_type) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2621
void schedule_all() override
Schedules all tasks in the graph for execution on the caller thread. The task-specific executors get ...
Definition: transwarp.h:1935
void set_executor(std::shared_ptr< transwarp::executor > executor) override
Assigns an executor to this task which takes precedence over the executor provided in schedule() or s...
Definition: transwarp.h:1709
value_task(bool has_name, std::string name, T &&value)
A value task is defined by name and value. Note: Don't use this constructor directly, use transwarp::make_value_task.
Definition: transwarp.h:2488
void set_custom_data(std::shared_ptr< void > custom_data) override
Assigns custom data to this task. transwarp will not directly use this. This is only useful if someth...
Definition: transwarp.h:2554
void remove_listener_all(transwarp::event_type event, const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for the given event type only and for all parents.
Definition: transwarp.h:1864
The task class.
Definition: transwarp.h:443
Definition: transwarp.h:1611
A functor not doing nothing.
Definition: transwarp.h:1631
Definition: transwarp.h:991
void schedule_all(transwarp::executor &executor, transwarp::schedule_type type, bool reset_all) override
Schedules all tasks in the graph for execution using the provided executor. The task-specific executo...
Definition: transwarp.h:1995
void reset_priority_all() override
Resets the priority of all tasks to 0.
Definition: transwarp.h:2548
void set_value(const typename transwarp::decay< result_type >::type &value) override
Assigns a value to this task. Scheduling will have no effect after a value has been set...
Definition: transwarp.h:2303
void set_priority(std::size_t priority) override
Sets a task priority (defaults to 0). transwarp will not directly use this. This is only useful if so...
Definition: transwarp.h:1742
const std::shared_ptr< std::string > & get_executor() const noexcept
The optional, task-specific executor (may be null)
Definition: transwarp.h:178
void assign_node_if(Functor &functor, std::shared_ptr< transwarp::node > node) noexcept
Assigns the node to the given functor if the functor is a subclass of transwarp::functor.
Definition: transwarp.h:1380
void reset() override
Resets this task.
Definition: transwarp.h:2034
const std::vector< std::shared_ptr< node > > & get_parents() const noexcept
The task's parents (may be empty)
Definition: transwarp.h:183
Just before a task's functor is invoked (handle_event called on thread that task is run on) ...
Definition: transwarp.h:978
The task's functor accepts the first parent future that becomes ready.
std::shared_ptr< transwarp::task_impl< TaskType_, typename std::decay< Functor_ >::type, result_type > > then(TaskType_, Functor_ &&functor) const
Creates a continuation to this task. Overload for omitting for task name.
Definition: transwarp.h:2465
The consume_any type. Used for tag dispatch.
Definition: transwarp.h:119
ResultType result_type
The result type of this task.
Definition: transwarp.h:2299
void set_custom_data(std::shared_ptr< void > custom_data) override
Assigns custom data to this task. transwarp will not directly use this. This is only useful if someth...
Definition: transwarp.h:1770
bool was_scheduled() const noexceptoverride
Returns true because a value task is scheduled once on construction.
Definition: transwarp.h:2675
std::size_t get_task_count() const noexceptoverride
Returns the number of tasks in the graph.
Definition: transwarp.h:2069
Unvisits the given task.
Definition: transwarp.h:1267
void execute(const std::function< void()> &functor, const std::shared_ptr< transwarp::node > &) override
Pushes the functor into the thread pool for asynchronous execution.
Definition: transwarp.h:1683
void wait_for_all(const std::vector< std::shared_ptr< transwarp::task< ParentResultType >>> &parents)
Waits for all parents to finish.
Definition: transwarp.h:725
void remove_listeners_all() override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2618
The accept type. Used for tag dispatch.
Definition: transwarp.h:107
void add_listener_all(transwarp::event_type, std::shared_ptr< transwarp::listener >) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2597
const std::shared_future< result_type > & get_future() const noexceptoverride
Returns the future associated to the underlying execution.
Definition: transwarp.h:1800
constexpr transwarp::accept_type accept
The accept task tag.
Definition: transwarp.h:108
TaskType task_type
The task type.
Definition: transwarp.h:1702
void remove_listener_all(transwarp::event_type, const std::shared_ptr< transwarp::listener > &) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2609
void schedule_all(transwarp::executor &, transwarp::schedule_type, bool) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2657
void remove_listeners() override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2612
bool has_result() const noexceptoverride
Returns whether this task contains a result.
Definition: transwarp.h:2029
std::size_t maximum_size() const
Returns the maximum size of the pool.
Definition: transwarp.h:2972
Removes all listeners from the given task.
Definition: transwarp.h:1233
The listener interface to listen to events raised by tasks.
Definition: transwarp.h:340
std::size_t get_task_count() const noexceptoverride
Returns the number of tasks in the graph.
Definition: transwarp.h:2715
Exception thrown when a task is canceled.
Definition: transwarp.h:52
TaskType task_type
The task type.
Definition: transwarp.h:2429
void wait() const override
Waits for the task to complete. Should only be called if was_scheduled() is true, throws transwarp::c...
Definition: transwarp.h:2016
Scheduling according to a depth-first search.
void add_listener(transwarp::event_type event, std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for the given event type only.
Definition: transwarp.h:1819
std::size_t get_level() const noexcept
The task level.
Definition: transwarp.h:163
std::shared_future< ResultType > make_future_with_value(Value &&value)
Returns a ready future with the given value as its state.
Definition: transwarp.h:1386
std::string get_name() const override
Returns the name of the executor.
Definition: transwarp.h:1652
void schedule() override
Schedules this task for execution on the caller thread. The task-specific executor gets precedence if...
Definition: transwarp.h:1901
void add_listener(std::shared_ptr< transwarp::listener >) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2588
void add_listener(std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for all event types.
Definition: transwarp.h:1810
std::shared_ptr< Graph > next_idle_graph(bool maybe_resize=true)
Returns the next idle graph. If there are no idle graphs then it will attempt to double the pool size...
Definition: transwarp.h:2920
void ensure_task_not_running() const
Checks if the task is currently running and throws transwarp::control_error if it is...
Definition: transwarp.h:2118
Just before a task starts running (handle_event called on thread that task is run on) ...
Removes custom data from the given task.
Definition: transwarp.h:1159
void schedule_all(transwarp::executor &, bool) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2645
Resets the given task.
Definition: transwarp.h:1087
const std::shared_ptr< std::string > & get_name() const noexcept
The optional task name (may be null)
Definition: transwarp.h:173
void remove_listeners_all(transwarp::event_type event) override
Removes all listeners for the given event type and for all parents.
Definition: transwarp.h:1892
bool full() const
Returns whether the buffer is full.
Definition: transwarp.h:1568
A base class for a user-defined functor that needs access to the node associated to the task or a can...
Definition: transwarp.h:484
constexpr transwarp::consume_any_type consume_any
The consume_any task tag.
Definition: transwarp.h:120
Removes a listener per event type from the given task.
Definition: transwarp.h:1219
bool is_canceled() const noexcept
Returns whether the associated task is canceled.
Definition: transwarp.h:198
A node carrying meta-data of a task.
Definition: transwarp.h:146
std::shared_ptr< transwarp::task_impl< transwarp::wait_type, transwarp::no_op_functor, std::vector< std::shared_ptr< transwarp::task< void > > > > > for_each(InputIt first, InputIt last, UnaryOperation unary_op)
A function similar to std::for_each but returning a transwarp task for deferred, possibly asynchronou...
Definition: transwarp.h:2809
Exception thrown when a task is used in unintended ways.
Definition: transwarp.h:79
void resize(std::size_t new_size)
Resizes the graph pool to the given new size if possible.
Definition: transwarp.h:2989
Resets the priority of the given task.
Definition: transwarp.h:1139
void schedule() override
No-op because a value task never runs.
Definition: transwarp.h:2624
std::size_t size() const
Returns the number of populated values of the buffer. Its maximum value equals the capacity of the bu...
Definition: transwarp.h:1558
Returns the result type of a std::shared_future<T>
Definition: transwarp.h:436
void add_listener_all(std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for all event types and for all parents.
Definition: transwarp.h:1826
void add_listener(transwarp::event_type, std::shared_ptr< transwarp::listener >) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2591
void wait() const override
No-op because a value task never runs.
Definition: transwarp.h:2680
void schedule_all(transwarp::executor &executor, transwarp::schedule_type type) override
Schedules all tasks in the graph for execution using the provided executor. The task-specific executo...
Definition: transwarp.h:1977
std::size_t get_priority() const noexcept
The task priority (defaults to 0)
Definition: transwarp.h:188
const std::shared_ptr< transwarp::node > & get_parent() const noexcept
Returns the parent node.
Definition: transwarp.h:280
A task proxy.
Definition: transwarp.h:2293
void transwarp_cancel_point() const
If the associated task is canceled then this will throw transwarp::task_canceled which will stop the ...
Definition: transwarp.h:498
std::shared_ptr< transwarp::task_impl< transwarp::wait_type, transwarp::no_op_functor, std::vector< std::shared_ptr< transwarp::task< void > > > > > transform(InputIt first1, InputIt last1, OutputIt d_first, UnaryOperation unary_op)
A function similar to std::transform but returning a transwarp task for deferred, possibly asynchrono...
Definition: transwarp.h:2840
The base task class that contains the functionality that can be used with all result types (void and ...
Definition: transwarp.h:1698
Definition: transwarp.h:666
The task's functor consumes all parent results.
std::shared_ptr< Graph > wait_for_next_idle_graph(bool maybe_resize=true)
Just like next_idle_graph() but waits for a graph to become available. The returned graph will always...
Definition: transwarp.h:2952
void remove_listeners_all() override
Removes all listeners and for all parents.
Definition: transwarp.h:1885
Just after a task has finished running (handle_event called on thread that task is run on) ...
void remove_listener(const std::shared_ptr< transwarp::listener > &) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2600
void set_priority_all(std::size_t priority) override
Sets a priority to all tasks (defaults to 0). transwarp will not directly use this. This is only useful if something else is using the priority.
Definition: transwarp.h:2538
Base class for exceptions.
Definition: transwarp.h:43
void schedule_all(transwarp::schedule_type) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2648
std::size_t size() const
Returns the current total size of the pool (sum of idle and busy graphs)
Definition: transwarp.h:2962
void reclaim()
Reclaims finished graphs by marking them as idle again.
Definition: transwarp.h:3011
constexpr transwarp::consume_type consume
The consume task tag.
Definition: transwarp.h:116
std::size_t get_id() const noexcept
The task ID.
Definition: transwarp.h:158
void schedule(transwarp::executor &, bool) override
No-op because a value task never runs.
Definition: transwarp.h:2633
A value task that stores a single value and doesn't require scheduling. Value tasks should be created...
Definition: transwarp.h:2476
void reset_priority() override
Resets the task priority to 0.
Definition: transwarp.h:2543
void add_listener_all(transwarp::event_type event, std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for the given event type only and for all parents.
Definition: transwarp.h:1833
void remove_listeners(transwarp::event_type event) override
Removes all listeners for the given event type.
Definition: transwarp.h:1879
void remove_listener(const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for all event types.
Definition: transwarp.h:1840
Determines the type of the parents.
Definition: transwarp.h:1413
const std::shared_ptr< transwarp::node > & transwarp_node() const noexcept
The node associated to the task.
Definition: transwarp.h:492
Parent wait_for_any(const std::shared_ptr< transwarp::task< ParentResults >> &...parents)
Waits for the first parent to finish.
Definition: transwarp.h:748
void reset_all() override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2701
void remove_listener(transwarp::event_type event, const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for the given event type only.
Definition: transwarp.h:1849
Assigns custom data to the given task.
Definition: transwarp.h:1147
The task's functor takes no arguments but waits for all parents to finish.
void set_exception(std::exception_ptr exception) override
Assigns an exception to this task. Scheduling will have no effect after an exception has been set...
Definition: transwarp.h:2002
void cancel_all_but_one(const std::shared_ptr< transwarp::task< OneResult >> &one, const std::vector< std::shared_ptr< transwarp::task< ParentResultType >>> &parents)
Cancels all tasks but one.
Definition: transwarp.h:787
Definition: transwarp.h:806
void handle_event(transwarp::event_type event, const std::shared_ptr< transwarp::node > &node) override
Performs the actual timing and populates the node's timing members.
Definition: transwarp.h:3077
Removes reference and const from a type.
Definition: transwarp.h:429
void remove_listeners() override
Removes all listeners.
Definition: transwarp.h:1871
const std::shared_ptr< void > & get_custom_data() const noexcept
The custom task data (may be null)
Definition: transwarp.h:193
std::shared_ptr< transwarp::task_impl< TaskType_, typename std::decay< Functor_ >::type, result_type > > then(TaskType_, std::string name, Functor_ &&functor) const
Creates a continuation to this task.
Definition: transwarp.h:2505
void call_with_each(const Functor &f, const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &t)
Calls the functor with every element in the tuple.
Definition: transwarp.h:1010
void reset_priority() override
Resets the task priority to 0.
Definition: transwarp.h:1756
void schedule_all(transwarp::executor &, transwarp::schedule_type) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2651
void schedule_all(transwarp::schedule_type, bool) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2654
An edge between two nodes.
Definition: transwarp.h:267
const std::shared_ptr< transwarp::node > & get_child() const noexcept
Returns the child node.
Definition: transwarp.h:285
std::int64_t get_avg_runtime_us() const noexcept
Returns the average runtime in microseconds (-1 if never set)
Definition: transwarp.h:213
A timer that tracks the average idle, wait, and run time of each task it listens to.
Definition: transwarp.h:3066
void schedule_all(transwarp::schedule_type type) override
Schedules all tasks in the graph for execution on the caller thread. The task-specific executors get ...
Definition: transwarp.h:1969
std::vector< transwarp::edge > get_graph() const override
Returns the graph of the task structure. This is mainly for visualizing the tasks and their interdepe...
Definition: transwarp.h:2076
graph_pool(std::function< std::shared_ptr< Graph >()> generator, std::size_t minimum_size, std::size_t maximum_size)
Constructs a graph pool by passing a generator to create a new graph and a minimum and maximum size o...
Definition: transwarp.h:2891
void set_value(typename transwarp::decay< result_type >::type &value) override
Assigns a value to this task. Scheduling will have no effect after a value has been set...
Definition: transwarp.h:2351
void schedule(transwarp::executor &) override
No-op because a value task never runs.
Definition: transwarp.h:2627
void reset_all() override
Resets all tasks in the graph.
Definition: transwarp.h:2042
std::int64_t get_avg_waittime_us() const noexcept
Returns the average waittime in microseconds (-1 if never set)
Definition: transwarp.h:208
A graph interface giving access to the final task as required by transwarp::graph_pool.
Definition: transwarp.h:2868
std::size_t busy_count() const
Returns the number of busy graphs in the pool.
Definition: transwarp.h:2983
circular_buffer(std::size_t capacity)
Constructs a circular buffer with a given fixed capacity.
Definition: transwarp.h:1515
Definition: transwarp.h:797
ResultType result_type
The result type of this task.
Definition: transwarp.h:2483
Executor for parallel execution. Uses a simple thread pool.
Definition: transwarp.h:1664
The wait_any type. Used for tag dispatch.
Definition: transwarp.h:127
void set_priority_all(std::size_t priority) override
Sets a priority to all tasks (defaults to 0). transwarp will not directly use this. This is only useful if something else is using the priority (e.g. a custom executor)
Definition: transwarp.h:1749
void schedule(bool reset) override
Schedules this task for execution on the caller thread. The task-specific executor gets precedence if...
Definition: transwarp.h:1910
virtual std::string get_name() const =0
Returns the name of the executor.
std::string to_string(const transwarp::task_type &type)
String conversion for the task_type enumeration.
Definition: transwarp.h:88
virtual void execute(const std::function< void()> &functor, const std::shared_ptr< transwarp::node > &node)=0
Runs a task which is wrapped by the given functor. The functor only captures one shared pointer and c...
void execute(const std::function< void()> &functor, const std::shared_ptr< transwarp::node > &) override
Runs the functor on the current thread.
Definition: transwarp.h:1657
void remove_executor() override
Removes the executor from this task.
Definition: transwarp.h:1727
The accept_any type. Used for tag dispatch.
Definition: transwarp.h:111
transwarp::task_type get_type() const noexcept
The task type.
Definition: transwarp.h:168
std::shared_ptr< transwarp::task_impl< TaskType, typename std::decay< Functor >::type, typename Parents::result_type...> > make_task(TaskType, std::string name, Functor &&functor, std::shared_ptr< Parents >...parents)
A factory function to create a new task.
Definition: transwarp.h:2756
virtual void handle_event(transwarp::event_type event, const std::shared_ptr< transwarp::node > &node)=0
This may be called from arbitrary threads depending on the event type (see transwarp::event_type). The implementer needs to ensure that this never throws exceptions.
bool is_ready() const override
Returns true because a value task is always ready.
Definition: transwarp.h:2683
constexpr transwarp::wait_type wait
The wait task tag.
Definition: transwarp.h:124
The root type. Used for tag dispatch.
Definition: transwarp.h:103
The task's functor consumes the first parent result that becomes ready.
A task representing a piece of work given by functor and parent tasks. By connecting tasks a directed...
Definition: transwarp.h:2426
void remove_listener(transwarp::event_type, const std::shared_ptr< transwarp::listener > &) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2603
The wait type. Used for tag dispatch.
Definition: transwarp.h:123
void cancel(bool enabled) noexceptoverride
If enabled then this task is canceled which will throw transwarp::task_canceled when retrieving the t...
Definition: transwarp.h:2051
Schedules using the given executor.
Definition: transwarp.h:1074
void swap(circular_buffer &buffer)
Swaps this buffer with the given buffer.
Definition: transwarp.h:1573
Result call(std::size_t node_id, const Task &task, const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &parents)
Calls the functor of the given task with the results from the tuple of parents. Throws transwarp::tas...
Definition: transwarp.h:963
Visits the given task using the visitor given in the constructor.
Definition: transwarp.h:1255
constexpr transwarp::root_type root
The root task tag.
Definition: transwarp.h:104
void remove_custom_data() override
Removes custom data from this task.
Definition: transwarp.h:2568
void set_value(typename transwarp::decay< result_type >::type &&value) override
Assigns a value to this task. Scheduling will have no effect after a value has been set...
Definition: transwarp.h:2311
void schedule_all(transwarp::executor &executor) override
Schedules all tasks in the graph for execution using the provided executor. The task-specific executo...
Definition: transwarp.h:1943
const value_type & front() const
Returns the value at the front of the buffer (the oldest value). This is undefined if the buffer is e...
Definition: transwarp.h:1539
static Result work(std::size_t node_id, const Task &task, const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>...> &parents)
Definition: transwarp.h:890
schedule_type
Determines in which order tasks are scheduled in the graph.
Definition: transwarp.h:351
The task's functor accepts all parent futures.
A graph pool that allows running multiple instances of the same graph in parallel. Graph must be a sub-class of transwarp::graph.
Definition: transwarp.h:2882
Applies final bookkeeping to the task.
Definition: transwarp.h:1045
void remove_executor_all() override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2528
typename transwarp::detail::functor_result< TaskType, Functor, ParentResults...>::type result_type
The result type of this task.
Definition: transwarp.h:2432
void schedule(transwarp::executor &executor, bool reset) override
Schedules this task for execution using the provided executor. The task-specific executor gets preced...
Definition: transwarp.h:1927
std::vector< transwarp::edge > get_graph() const override
Returns an empty graph because a value task doesn't have parents.
Definition: transwarp.h:2720
const std::shared_ptr< transwarp::node > & get_node() const noexceptoverride
Returns the associated node.
Definition: transwarp.h:1805
task_impl(bool has_name, std::string name, F &&functor, std::vector< std::shared_ptr< transwarp::task< P >>> parents)
A task is defined by name, functor, and parent tasks. Note: Don't use this constructor directly...
Definition: transwarp.h:2444
task_type
The possible task types.
Definition: transwarp.h:31
void set_value() override
Assigns a value to this task. Scheduling will have no effect after a call to this. Calling reset() will reset this and re-enable scheduling.
Definition: transwarp.h:2391
std::shared_ptr< transwarp::task_impl< TaskType_, typename std::decay< Functor_ >::type, result_type > > then(TaskType_, Functor_ &&functor) const
Creates a continuation to this task. Overload for omitting the task name.
Definition: transwarp.h:2513
void pop()
Removes the value at the front of the buffer (the oldest value)
Definition: transwarp.h:1544
event_type
The task events that can be subscribed to using the listener interface.
Definition: transwarp.h:329
Removes all listeners per event type from the given task.
Definition: transwarp.h:1242
std::int64_t get_avg_idletime_us() const noexcept
Returns the average idletime in microseconds (-1 if never set)
Definition: transwarp.h:203
The task's functor takes no arguments but waits for the first parent to finish.
Exception thrown when an invalid parameter was passed to a function.
Definition: transwarp.h:70
void remove_custom_data_all() override
Removes custom data from all tasks.
Definition: transwarp.h:1793
void reset() override
No-op because a value task never runs.
Definition: transwarp.h:2698
std::shared_future< void > make_ready_future()
Returns a ready future.
Definition: transwarp.h:1393
void set_executor_all(std::shared_ptr< transwarp::executor >) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2522
constexpr transwarp::accept_any_type accept_any
The accept_any task tag.
Definition: transwarp.h:112
bool empty() const
Returns whether the buffer is empty.
Definition: transwarp.h:1563
const std::shared_future< result_type > & get_future() const noexceptoverride
Returns the future associated to the underlying execution.
Definition: transwarp.h:2578
Cancels or resumes the given task.
Definition: transwarp.h:1095
void remove_listener_all(const std::shared_ptr< transwarp::listener > &) override
No-op because a value task doesn't raise events.
Definition: transwarp.h:2606
std::size_t idle_count() const
Returns the number of idle graphs in the pool.
Definition: transwarp.h:2977
void remove_executor() override
No-op because a value task never runs.
Definition: transwarp.h:2525
void schedule(bool) override
No-op because a value task never runs.
Definition: transwarp.h:2630
virtual const std::shared_ptr< transwarp::task< result_type > > & final_task() const =0
Returns the final task of the graph.
void schedule_all(bool) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:2642
Pushes the given task into the vector of tasks.
Definition: transwarp.h:1167
Executor for sequential execution. Runs functors sequentially on the same thread. ...
Definition: transwarp.h:1640
bool has_result() const noexceptoverride
Returns true because a value task always contains a result.
Definition: transwarp.h:2688
task_impl(bool has_name, std::string name, F &&functor, std::shared_ptr< transwarp::task< ParentResults >>...parents)
A task is defined by name, functor, and parent tasks. Note: Don't use this constructor directly...
Definition: transwarp.h:2437
std::shared_future< ResultType > make_future_with_exception(std::exception_ptr exception)
Returns a ready future with the given exception as its state.
Definition: transwarp.h:1401
void remove_executor_all() override
Removes the executor from all tasks.
Definition: transwarp.h:1734
const std::shared_ptr< transwarp::node > & get_node() const noexceptoverride
Returns the associated node.
Definition: transwarp.h:2583
bool is_ready() const override
Returns whether the task has finished processing. Should only be called if was_scheduled() is true...
Definition: transwarp.h:2023
An interface for the task class.
Definition: transwarp.h:358
Result run_task(std::size_t node_id, const std::weak_ptr< Task > &task, Args &&...args)
Runs the task with the given arguments, hence, invoking the task's functor.
Definition: transwarp.h:700