30 #include <condition_variable> 33 #include <unordered_set> 34 #include <unordered_map> 48 template <
typename Closure>
53 std::optional<Closure> task;
91 template <
typename... ArgsT>
113 bool _exiting {
false};
115 auto _this_worker()
const;
118 void _spawn(
unsigned);
123 template <
typename Closure>
130 template <
typename Closure>
136 template <
typename Closure>
142 template <
typename Closure>
144 return _threads.size();
148 template <
typename Closure>
151 return _worker_maps.find(
id);
155 template <
typename Closure>
156 void SpeculativeThreadpool<Closure>::_shutdown(){
165 for(
auto w : _idlers){
167 w->task = std::nullopt;
173 for(
auto& t : _threads){
179 _worker_maps.clear();
185 template <
typename Closure>
186 void SpeculativeThreadpool<Closure>::_spawn(
unsigned N) {
188 assert(is_owner() && _workers.size() == N);
191 std::scoped_lock
lock(_mutex);
193 for(
size_t i=0; i<N; ++i){
195 _threads.emplace_back([
this, &w=_workers[i]]() ->
void {
197 std::optional<Closure> t;
204 _idlers.push_back(&w);
211 w.task = std::nullopt;
225 w.task = std::nullopt;
236 _worker_maps[_threads[i].get_id()] = &_workers[i];
240 template <
typename Closure>
241 template <
typename... ArgsT>
245 if(num_workers() == 0){
246 Closure{std::forward<ArgsT>(args)...}();
254 auto iter = _worker_maps.find(tid);
255 if(iter != _worker_maps.end() && !(iter->second->task)){
256 iter->second->task.emplace(std::forward<ArgsT>(args)...);
261 std::scoped_lock lock(_mutex);
263 _tasks.emplace_back(std::forward<ArgsT>(args)...);
266 Worker* w = _idlers.back();
269 w->task.emplace(std::forward<ArgsT>(args)...);
275 template <
typename Closure>
283 if(num_workers() == 0){
294 auto iter = _this_worker();
295 if(iter != _worker_maps.end() && !(iter->second->task)){
296 iter->second->task.emplace(std::move(tasks[consumed++]));
297 if(tasks.size() == consumed) {
303 std::scoped_lock lock(_mutex);
304 while(!_idlers.empty() && tasks.size() != consumed) {
305 Worker* w = _idlers.back();
308 w->task.emplace(std::move(tasks[consumed ++]));
312 if(tasks.size() == consumed)
return ;
313 _tasks.reserve(_tasks.size() + tasks.size() - consumed);
314 std::move(tasks.begin()+consumed, tasks.end(), std::back_inserter(_tasks));
Definition: taskflow.hpp:6
bool is_owner() const
queries if the caller is the owner of the executor
Definition: speculative_threadpool.hpp:137
size_t num_workers() const
queries the number of worker threads
Definition: speculative_threadpool.hpp:143
~SpeculativeThreadpool()
destructs the executor
Definition: speculative_threadpool.hpp:131
void emplace(ArgsT &&... args)
constructs the closure in place in the executor
Definition: speculative_threadpool.hpp:242
Executor that implements a centralized task queue with a speculative execution strategy.
Definition: speculative_threadpool.hpp:49
SpeculativeThreadpool(unsigned N)
constructs the executor with a given number of worker threads
Definition: speculative_threadpool.hpp:124
void batch(std::vector< Closure > &&closures)
moves a batch of closures to the executor
Definition: speculative_threadpool.hpp:276