Cpp-Taskflow  2.0.0
speculative_threadpool.hpp
1 // 2018/11/28 - modified by Chun-Xun Lin
2 //
3 // Added the method batch to insert a vector of tasks.
4 //
5 // 2018/10/04 - modified by Tsung-Wei Huang
6 //
7 // Removed shutdown, spawn, and wait_for_all to simplify the design
8 // of the threadpool. The threadpool can now operate on a fixed-memory
9 // closure to improve performance.
10 //
11 // 2018/09/12 - created by Tsung-Wei Huang and Chun-Xun Lin
12 //
13 // Speculative threadpool is similar to proactive threadpool except
14 // each thread will speculatively move a new task to its local worker
15 // data structure to reduce extra hits to the task queue.
16 // This saves the time spent locking the mutex during dynamic tasking.
17 
18 #pragma once
19 
20 #include <iostream>
21 #include <functional>
22 #include <vector>
23 #include <mutex>
24 #include <deque>
25 #include <thread>
26 #include <stdexcept>
27 #include <condition_variable>
28 #include <memory>
29 #include <future>
30 #include <unordered_set>
31 #include <unordered_map>
32 #include <optional>
33 
34 
35 namespace tf {
36 
/**
@class SpeculativeThreadpool

@brief Executor that implements a centralized task queue
       with a speculative execution strategy.

@tparam Closure closure type; it must be invocable with no arguments
*/
template <typename Closure>
class SpeculativeThreadpool {

  // Per-thread slot. A parked worker waits on `cv` until `ready` is set;
  // `task` carries the closure handed directly to this worker.
  struct Worker {
    std::condition_variable cv;
    std::optional<Closure> task;
    bool ready {false};
  };

  public:

    /**
    @brief constructs the executor with a given number of worker threads

    @param N the number of worker threads
    */
    SpeculativeThreadpool(unsigned N);

    /**
    @brief destructs the executor
    */
    ~SpeculativeThreadpool();

    /**
    @brief queries the number of worker threads
    */
    size_t num_workers() const;

    /**
    @brief queries if the caller is the owner of the executor
    */
    bool is_owner() const;

    /**
    @brief constructs the closure in place in the executor

    @tparam ArgsT... argument parameter pack
    @param args arguments forwarded to the constructor of the closure
    */
    template <typename... ArgsT>
    void emplace(ArgsT&&... args);

    /**
    @brief moves a batch of closures to the executor

    @param closures a vector of closures
    */
    void batch(std::vector<Closure>&& closures);

    /**
    @brief queries the number of closures waiting in the shared task queue
    */
    size_t num_tasks() const;

  private:

    // Thread that constructed the pool.
    const std::thread::id _owner {std::this_thread::get_id()};

    // Guards _tasks, _idlers, _exiting and the ready/task hand-off of
    // parked workers.
    mutable std::mutex _mutex;

    std::vector<Closure> _tasks;          // shared queue, consumed from the back
    std::vector<std::thread> _threads;    // worker threads
    std::vector<Worker*> _idlers;         // workers currently parked on their cv
    std::vector<Worker> _workers;         // one slot per worker thread
    std::unordered_map<std::thread::id, Worker*> _worker_maps;  // thread id -> slot

    bool _exiting {false};

    auto _this_worker() const;

    void _shutdown();
    void _spawn(unsigned);

};  // class SpeculativeThreadpool. -----------------------------------------
120 
121 // Constructor
122 template <typename Closure>
124  _workers {N} {
125  _spawn(N);
126 }
127 
128 // Destructor
129 template <typename Closure>
131  _shutdown();
132 }
133 
134 // Function: is_owner
135 template <typename Closure>
137  return std::this_thread::get_id() == _owner;
138 }
139 
140 // Function: num_tasks
141 template <typename Closure>
143  return _tasks.size();
144 }
145 
146 // Function: num_workers
147 template <typename Closure>
149  return _threads.size();
150 }
151 
152 // Function: _this_worker
153 template <typename Closure>
155  auto id = std::this_thread::get_id();
156  return _worker_maps.find(id);
157 }
158 
159 // Function: shutdown
160 template <typename Closure>
162 
163  assert(is_owner());
164 
165  {
166  std::unique_lock lock(_mutex);
167 
168  _exiting = true;
169 
170  for(auto w : _idlers){
171  w->ready = true;
172  w->task = std::nullopt;
173  w->cv.notify_one();
174  }
175  _idlers.clear();
176  }
177 
178  for(auto& t : _threads){
179  t.join();
180  }
181  _threads.clear();
182 
183  _workers.clear();
184  _worker_maps.clear();
185 
186  _exiting = false;
187 }
188 
189 // Function: spawn
190 template <typename Closure>
192 
193  assert(is_owner() && _workers.size() == N);
194 
195  // Lock to synchronize all workers before creating _worker_mapss
196  std::scoped_lock lock(_mutex);
197 
198  for(size_t i=0; i<N; ++i){
199 
200  _threads.emplace_back([this, &w=_workers[i]]() -> void {
201 
202  std::optional<Closure> t;
203 
204  std::unique_lock lock(_mutex);
205 
206  while(!_exiting){
207  if(_tasks.empty()){
208  w.ready = false;
209  _idlers.push_back(&w);
210 
211  while(!w.ready) {
212  w.cv.wait(lock);
213  }
214 
215  t = std::move(w.task);
216  w.task = std::nullopt;
217  }
218  else{
219  t = std::move(_tasks.back());
220  _tasks.pop_back();
221  }
222 
223  if(t) {
224  lock.unlock();
225  // speculation loop
226  while(t) {
227  (*t)();
228  if(w.task) {
229  t = std::move(w.task);
230  w.task = std::nullopt;
231  }
232  else {
233  t = std::nullopt;
234  }
235  }
236  lock.lock();
237  }
238  }
239  });
240 
241  _worker_maps[_threads[i].get_id()] = &_workers[i];
242  } // End of For ---------------------------------------------------------------------------------
243 }
244 
245 template <typename Closure>
246 template <typename... ArgsT>
248 
249  //no worker thread available
250  if(num_workers() == 0){
251  Closure{std::forward<ArgsT>(args)...}();
252  return;
253  }
254 
255  // speculation
256  auto tid = std::this_thread::get_id();
257 
258  if(tid != _owner){
259  auto iter = _worker_maps.find(tid);
260  if(iter != _worker_maps.end() && !(iter->second->task)){
261  iter->second->task.emplace(std::forward<ArgsT>(args)...);
262  return ;
263  }
264  }
265 
266  std::scoped_lock lock(_mutex);
267  if(_idlers.empty()){
268  _tasks.emplace_back(std::forward<ArgsT>(args)...);
269  }
270  else{
271  Worker* w = _idlers.back();
272  _idlers.pop_back();
273  w->ready = true;
274  w->task.emplace(std::forward<ArgsT>(args)...);
275  w->cv.notify_one();
276  }
277 }
278 
279 
280 template <typename Closure>
282 
283  if(tasks.empty()) {
284  return;
285  }
286 
287  //no worker thread available
288  if(num_workers() == 0){
289  for(auto& c: tasks){
290  c();
291  }
292  return;
293  }
294 
295  size_t consumed {0};
296 
297  // speculation
298  if(std::this_thread::get_id() != _owner){
299  auto iter = _this_worker();
300  if(iter != _worker_maps.end() && !(iter->second->task)){
301  iter->second->task.emplace(std::move(tasks[consumed++]));
302  if(tasks.size() == consumed) {
303  return ;
304  }
305  }
306  }
307 
308  std::scoped_lock lock(_mutex);
309  while(!_idlers.empty() && tasks.size() != consumed) {
310  Worker* w = _idlers.back();
311  _idlers.pop_back();
312  w->ready = true;
313  w->task.emplace(std::move(tasks[consumed ++]));
314  w->cv.notify_one();
315  }
316 
317  if(tasks.size() == consumed) return ;
318  _tasks.reserve(_tasks.size() + tasks.size() - consumed);
319  std::move(tasks.begin()+consumed, tasks.end(), std::back_inserter(_tasks));
320 }
321 
322 
323 }; // end of namespace tf. ---------------------------------------------------
324 
325 
326 
327 
328 
Definition: taskflow.hpp:6
bool is_owner() const
queries if the caller is the owner of the executor
Definition: speculative_threadpool.hpp:136
size_t num_workers() const
queries the number of worker threads
Definition: speculative_threadpool.hpp:148
~SpeculativeThreadpool()
destructs the executor
Definition: speculative_threadpool.hpp:130
void emplace(ArgsT &&... args)
constructs the closure in place in the executor
Definition: speculative_threadpool.hpp:247
Executor that implements a centralized task queue with a speculative execution strategy.
Definition: speculative_threadpool.hpp:46
T get_id(T... args)
SpeculativeThreadpool(unsigned N)
constructs the executor with a given number of worker threads
Definition: speculative_threadpool.hpp:123
void batch(std::vector< Closure > &&closures)
moves a batch of closures to the executor
Definition: speculative_threadpool.hpp:281