27 #include <condition_variable> 30 #include <unordered_set> 31 #include <unordered_map> 45 template <
typename Closure>
50 std::optional<Closure> task;
88 template <
typename... ArgsT>
98 size_t num_tasks()
const;
112 bool _exiting {
false};
114 auto _this_worker()
const;
117 void _spawn(
unsigned);
122 template <
typename Closure>
129 template <
typename Closure>
135 template <
typename Closure>
141 template <
typename Closure>
143 return _tasks.size();
147 template <
typename Closure>
149 return _threads.size();
153 template <
typename Closure>
156 return _worker_maps.find(
id);
160 template <
typename Closure>
170 for(
auto w : _idlers){
172 w->task = std::nullopt;
178 for(
auto& t : _threads){
184 _worker_maps.clear();
190 template <
typename Closure>
193 assert(
is_owner() && _workers.size() == N);
196 std::scoped_lock lock(_mutex);
198 for(
size_t i=0; i<N; ++i){
200 _threads.emplace_back([
this, &w=_workers[i]]() ->
void {
202 std::optional<Closure> t;
209 _idlers.push_back(&w);
215 t = std::move(w.task);
216 w.task = std::nullopt;
219 t = std::move(_tasks.back());
229 t = std::move(w.task);
230 w.task = std::nullopt;
241 _worker_maps[_threads[i].get_id()] = &_workers[i];
245 template <
typename Closure>
246 template <
typename... ArgsT>
251 Closure{std::forward<ArgsT>(args)...}();
259 auto iter = _worker_maps.find(tid);
260 if(iter != _worker_maps.end() && !(iter->second->task)){
261 iter->second->task.emplace(std::forward<ArgsT>(args)...);
266 std::scoped_lock lock(_mutex);
268 _tasks.emplace_back(std::forward<ArgsT>(args)...);
271 Worker* w = _idlers.back();
274 w->task.emplace(std::forward<ArgsT>(args)...);
280 template <
typename Closure>
299 auto iter = _this_worker();
300 if(iter != _worker_maps.end() && !(iter->second->task)){
301 iter->second->task.emplace(std::move(tasks[consumed++]));
302 if(tasks.size() == consumed) {
308 std::scoped_lock lock(_mutex);
309 while(!_idlers.empty() && tasks.size() != consumed) {
310 Worker* w = _idlers.back();
313 w->task.emplace(std::move(tasks[consumed ++]));
317 if(tasks.size() == consumed)
return ;
318 _tasks.reserve(_tasks.size() + tasks.size() - consumed);
319 std::move(tasks.begin()+consumed, tasks.end(), std::back_inserter(_tasks));
Definition: taskflow.hpp:6
bool is_owner() const
queries if the caller is the owner of the executor
Definition: speculative_threadpool.hpp:136
size_t num_workers() const
queries the number of worker threads
Definition: speculative_threadpool.hpp:148
~SpeculativeThreadpool()
destructs the executor
Definition: speculative_threadpool.hpp:130
void emplace(ArgsT &&... args)
constructs the closure in place in the executor
Definition: speculative_threadpool.hpp:247
Executor that implements a centralized task queue with a speculative execution strategy.
Definition: speculative_threadpool.hpp:46
SpeculativeThreadpool(unsigned N)
constructs the executor with a given number of worker threads
Definition: speculative_threadpool.hpp:123
void batch(std::vector< Closure > &&closures)
moves a batch of closures to the executor
Definition: speculative_threadpool.hpp:281