Cpp-Taskflow  2.1.0
speculative_threadpool.hpp
1 // 2019/02/10 - modified by Tsung-Wei Huang
2 // - removed num_tasks method
3 //
4 // 2018/11/28 - modified by Chun-Xun Lin
5 //
6 // Added the method batch to insert a vector of tasks.
7 //
// 2018/10/04 - modified by Tsung-Wei Huang
//
// Removed shutdown, spawn, and wait_for_all to simplify the design
// of the threadpool. The threadpool can now operate on a fixed-memory
// closure to improve performance.
13 //
14 // 2018/09/12 - created by Tsung-Wei Huang and Chun-Xun Lin
15 //
// Speculative threadpool is similar to proactive threadpool, except that
// each thread speculatively moves a new task into its local worker
// data structure to reduce extra hits on the task queue.
// This saves the time otherwise spent locking the mutex during dynamic tasking.
20 
21 #pragma once
22 
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <iostream>
#include <iterator>
#include <memory>
#include <mutex>
#include <optional>
#include <stdexcept>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <vector>
37 
38 namespace tf {
39 
/**
@class SpeculativeThreadpool

@brief Executor that implements a centralized task queue
       with a speculative execution strategy.

@tparam Closure closure type stored by the executor; it is invoked with no
                arguments and is constructed from the arguments given to emplace
*/
template <typename Closure>
class SpeculativeThreadpool {

  // Per-thread hand-off slot. A parked worker waits on `cv` until `ready`
  // is set; `task` carries the closure handed to (or speculated by) the worker.
  struct Worker {
    std::condition_variable cv;
    std::optional<Closure> task;
    bool ready {false};
  };

  public:

    /**
    @brief constructs the executor with a given number of worker threads

    @param N the number of worker threads
    */
    SpeculativeThreadpool(unsigned N);

    /**
    @brief destructs the executor
    */
    ~SpeculativeThreadpool();

    /**
    @brief queries the number of worker threads
    */
    size_t num_workers() const;

    /**
    @brief queries if the caller is the owner of the executor
    */
    bool is_owner() const;

    /**
    @brief constructs the closure in place in the executor

    @tparam ArgsT... argument parameter pack

    @param args arguments forwarded to construct the closure
    */
    template <typename... ArgsT>
    void emplace(ArgsT&&... args);

    /**
    @brief moves a batch of closures to the executor

    @param closures a vector of closures; its elements are moved from
    */
    void batch(std::vector<Closure>&& closures);

  private:

    // Id of the thread that constructed the executor.
    const std::thread::id _owner {std::this_thread::get_id()};

    // Guards _tasks, _idlers and _exiting.
    mutable std::mutex _mutex;

    std::vector<Closure> _tasks;       // pending closures (used as a stack)
    std::vector<std::thread> _threads; // worker threads
    std::vector<Worker*> _idlers;      // workers currently parked on their cv
    std::vector<Worker> _workers;      // one slot per worker thread

    // Maps a worker thread's id to its slot in _workers.
    std::unordered_map<std::thread::id, Worker*> _worker_maps;

    bool _exiting {false};

    auto _this_worker() const;

    void _shutdown();
    void _spawn(unsigned);

};  // class SpeculativeThreadpool. -------------------------------------------
121 
122 // Constructor
123 template <typename Closure>
125  _workers {N} {
126  _spawn(N);
127 }
128 
129 // Destructor
130 template <typename Closure>
132  _shutdown();
133 }
134 
135 // Function: is_owner
136 template <typename Closure>
138  return std::this_thread::get_id() == _owner;
139 }
140 
141 // Function: num_workers
142 template <typename Closure>
144  return _threads.size();
145 }
146 
147 // Function: _this_worker
148 template <typename Closure>
150  auto id = std::this_thread::get_id();
151  return _worker_maps.find(id);
152 }
153 
154 // Function: shutdown
155 template <typename Closure>
156 void SpeculativeThreadpool<Closure>::_shutdown(){
157 
158  assert(is_owner());
159 
160  {
161  std::unique_lock lock(_mutex);
162 
163  _exiting = true;
164 
165  for(auto w : _idlers){
166  w->ready = true;
167  w->task = std::nullopt;
168  w->cv.notify_one();
169  }
170  _idlers.clear();
171  }
172 
173  for(auto& t : _threads){
174  t.join();
175  }
176  _threads.clear();
177 
178  _workers.clear();
179  _worker_maps.clear();
180 
181  _exiting = false;
182 }
183 
184 // Function: spawn
185 template <typename Closure>
186 void SpeculativeThreadpool<Closure>::_spawn(unsigned N) {
187 
188  assert(is_owner() && _workers.size() == N);
189 
190  // Lock to synchronize all workers before creating _worker_mapss
191  std::scoped_lock lock(_mutex);
192 
193  for(size_t i=0; i<N; ++i){
194 
195  _threads.emplace_back([this, &w=_workers[i]]() -> void {
196 
197  std::optional<Closure> t;
198 
199  std::unique_lock lock(_mutex);
200 
201  while(!_exiting){
202  if(_tasks.empty()){
203  w.ready = false;
204  _idlers.push_back(&w);
205 
206  while(!w.ready) {
207  w.cv.wait(lock);
208  }
209 
210  t = std::move(w.task);
211  w.task = std::nullopt;
212  }
213  else{
214  t = std::move(_tasks.back());
215  _tasks.pop_back();
216  }
217 
218  if(t) {
219  lock.unlock();
220  // speculation loop
221  while(t) {
222  (*t)();
223  if(w.task) {
224  t = std::move(w.task);
225  w.task = std::nullopt;
226  }
227  else {
228  t = std::nullopt;
229  }
230  }
231  lock.lock();
232  }
233  }
234  });
235 
236  _worker_maps[_threads[i].get_id()] = &_workers[i];
237  } // End of For ---------------------------------------------------------------------------------
238 }
239 
240 template <typename Closure>
241 template <typename... ArgsT>
243 
244  //no worker thread available
245  if(num_workers() == 0){
246  Closure{std::forward<ArgsT>(args)...}();
247  return;
248  }
249 
250  // speculation
251  auto tid = std::this_thread::get_id();
252 
253  if(tid != _owner){
254  auto iter = _worker_maps.find(tid);
255  if(iter != _worker_maps.end() && !(iter->second->task)){
256  iter->second->task.emplace(std::forward<ArgsT>(args)...);
257  return ;
258  }
259  }
260 
261  std::scoped_lock lock(_mutex);
262  if(_idlers.empty()){
263  _tasks.emplace_back(std::forward<ArgsT>(args)...);
264  }
265  else{
266  Worker* w = _idlers.back();
267  _idlers.pop_back();
268  w->ready = true;
269  w->task.emplace(std::forward<ArgsT>(args)...);
270  w->cv.notify_one();
271  }
272 }
273 
274 
275 template <typename Closure>
277 
278  if(tasks.empty()) {
279  return;
280  }
281 
282  //no worker thread available
283  if(num_workers() == 0){
284  for(auto& c: tasks){
285  c();
286  }
287  return;
288  }
289 
290  size_t consumed {0};
291 
292  // speculation
293  if(std::this_thread::get_id() != _owner){
294  auto iter = _this_worker();
295  if(iter != _worker_maps.end() && !(iter->second->task)){
296  iter->second->task.emplace(std::move(tasks[consumed++]));
297  if(tasks.size() == consumed) {
298  return ;
299  }
300  }
301  }
302 
303  std::scoped_lock lock(_mutex);
304  while(!_idlers.empty() && tasks.size() != consumed) {
305  Worker* w = _idlers.back();
306  _idlers.pop_back();
307  w->ready = true;
308  w->task.emplace(std::move(tasks[consumed ++]));
309  w->cv.notify_one();
310  }
311 
312  if(tasks.size() == consumed) return ;
313  _tasks.reserve(_tasks.size() + tasks.size() - consumed);
314  std::move(tasks.begin()+consumed, tasks.end(), std::back_inserter(_tasks));
315 }
316 
317 
318 } // end of namespace tf. ---------------------------------------------------
319 
320 
321 
322 
323 
Definition: taskflow.hpp:6
bool is_owner() const
queries if the caller is the owner of the executor
Definition: speculative_threadpool.hpp:137
size_t num_workers() const
queries the number of worker threads
Definition: speculative_threadpool.hpp:143
~SpeculativeThreadpool()
destructs the executor
Definition: speculative_threadpool.hpp:131
void emplace(ArgsT &&... args)
constructs the closure in place in the executor
Definition: speculative_threadpool.hpp:242
T lock(T... args)
Executor that implements a centralized task queue with a speculative execution strategy.
Definition: speculative_threadpool.hpp:49
T move(T... args)
T get_id(T... args)
SpeculativeThreadpool(unsigned N)
constructs the executor with a given number of worker threads
Definition: speculative_threadpool.hpp:124
void batch(std::vector< Closure > &&closures)
moves a batch of closures to the executor
Definition: speculative_threadpool.hpp:276