Cpp-Taskflow  2.0.0
proactive_threadpool.hpp
1 // 2018/11/28 - modified by Chun-Xun Lin
2 //
3 // Added the method batch to insert a vector of tasks.
4 //
5 // 2018/10/04 - modified by Tsung-Wei Huang
6 //
7 // Removed shutdown, spawn, and wait_for_all to simplify the design
8 // of the threadpool. The threadpool can now operate on a fixed-memory
9 // closure to improve performance.
10 //
11 // 2018/09/11 - modified by Tsung-Wei Huang & Guannan
12 // - bug fix: the shutdown method might hang due to dynamic tasking;
13 // the task queue can be non-empty while all threads are gone;
14 // workers need to be cleared under lock as well, since *_async
15 // will access the worker data structure;
16 // - renamed _worker to _idler
17 //
18 // 2018/09/03 - modified by Guannan Guo
19 //
20 // BasicProactiveThreadpool schedules independent jobs in a greedy manner.
21 // Whenever a job is inserted into the threadpool, the threadpool will check if there
22 // are any spare threads available. The spare thread will be woken through its local
23 // condition variable. The new job will be directly moved into
24 // this thread instead of pushed at the back of the pending queue.
25 
26 #pragma once
27 
#include <algorithm>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <iostream>
#include <iterator>
#include <memory>
#include <mutex>
#include <optional>
#include <stdexcept>
#include <thread>
#include <unordered_set>
#include <utility>
#include <vector>
40 
41 namespace tf {
42 
/**
@class ProactiveThreadpool

@brief Executor that implements a centralized task queue with a proactive
       execution strategy.

A closure handed to the executor is given directly to an idle worker when one
is parked; otherwise it is appended to the shared pending queue.

@tparam Closure callable type stored by the executor and invoked with no
                arguments by the worker threads
*/
template <typename Closure>
class ProactiveThreadpool {

  // Struct: Worker
  // Per-thread slot through which a task is handed directly to an idle worker.
  // All fields are read and written while holding the pool mutex.
  struct Worker {
    std::condition_variable cv;    // signalled when `ready` becomes true
    std::optional<Closure> task;   // task handed over; empty on shutdown wake-up
    bool ready {false};            // wake-up flag, guards against spurious wake-ups
  };

  public:

    /**
    @brief constructs the executor with a given number of worker threads

    @param N the number of worker threads
    */
    ProactiveThreadpool(unsigned N);

    /**
    @brief destructs the executor

    Wakes every idle worker and joins all worker threads.
    */
    ~ProactiveThreadpool();

    /**
    @brief queries the number of worker threads
    */
    size_t num_workers() const;

    /**
    @brief queries if the caller is the owner of the executor
    */
    bool is_owner() const;

    /**
    @brief constructs the closure in place in the executor

    @tparam ArgsT argument parameter pack

    @param args arguments to forward to the constructor of the closure
    */
    template <typename... ArgsT>
    void emplace(ArgsT&&... args);

    /**
    @brief moves a batch of closures to the executor

    @param closures a vector of closures
    */
    void batch(std::vector<Closure> && closures);

    /**
    @brief queries the number of closures waiting in the pending queue
    */
    size_t num_tasks() const;

  private:

    // id of the thread that constructed the executor
    std::thread::id _owner {std::this_thread::get_id()};

    // guards _tasks, _idlers, _exiting and every Worker slot
    mutable std::mutex _mutex;

    std::vector<Closure> _tasks;        // pending closures (taken from the back)
    std::vector<std::thread> _threads;  // worker threads
    std::vector<Worker*> _idlers;       // workers currently parked waiting for a task

    bool _exiting {false};              // set under lock to ask workers to leave

    void _shutdown();
    void _spawn(unsigned);
};
122 
123 // Constructor
124 template <typename Closure>
126  _spawn(N);
127 }
128 
129 // Destructor
130 template <typename Closure>
132  _shutdown();
133 }
134 
135 // Ftion: is_owner
136 template <typename Closure>
138  return std::this_thread::get_id() == _owner;
139 }
140 
141 // Ftion: num_tasks
142 template <typename Closure>
144  return _tasks.size();
145 }
146 
147 // Ftion: num_workers
148 template <typename Closure>
150  return _threads.size();
151 }
152 
153 // Procedure: shutdown
154 template <typename Closure>
156 
157  assert(is_owner());
158 
159  {
160  std::unique_lock lock(_mutex);
161 
162  _exiting = true;
163 
164  // we need to clear the workers under lock
165  for(auto w : _idlers){
166  w->ready = true;
167  w->task = std::nullopt;
168  w->cv.notify_one();
169  }
170  _idlers.clear();
171  }
172 
173  for(auto& t : _threads){
174  t.join();
175  }
176  _threads.clear();
177 
178  _exiting = false;
179 }
180 
181 // Procedure: spawn
182 template <typename Closure>
183 void ProactiveThreadpool<Closure>::_spawn(unsigned N) {
184 
185  assert(is_owner());
186 
187  for(size_t i=0; i<N; ++i){
188 
189  _threads.emplace_back([this] () -> void {
190 
191  Worker w;
192 
193  std::unique_lock lock(_mutex);
194 
195  while(!_exiting) {
196 
197  if(_tasks.empty()){
198 
199  w.ready = false;
200  _idlers.push_back(&w);
201 
202  while(!w.ready) {
203  w.cv.wait(lock);
204  }
205 
206  // shutdown cannot have task
207  if(w.task) {
208  lock.unlock();
209  (*w.task)();
210  w.task = std::nullopt;
211  lock.lock();
212  }
213  }
214  else{
215  Closure t{std::move(_tasks.back())};
216  _tasks.pop_back();
217  lock.unlock();
218  t();
219  lock.lock();
220  }
221  }
222  });
223 
224  }
225 }
226 
227 // Procedure: silent_async
228 template <typename Closure>
229 template <typename... ArgsT>
231 
232  //no worker thread available
233  if(num_workers() == 0){
234  Closure{std::forward<ArgsT>(args)...}();
235  }
236  // ask one worker to run the task
237  else {
238  std::scoped_lock lock(_mutex);
239  if(_idlers.empty()){
240  _tasks.emplace_back(std::forward<ArgsT>(args)...);
241  }
242  else{
243  Worker* w = _idlers.back();
244  _idlers.pop_back();
245  w->ready = true;
246  w->task.emplace(std::forward<ArgsT>(args)...);
247  w->cv.notify_one();
248  }
249  }
250 }
251 
252 
253 // Procedure: batch
254 template <typename Closure>
256 
257  //no worker thread available
258  if(num_workers() == 0){
259  for(auto& t: tasks){
260  t();
261  }
262  }
263  // ask one worker to run the task
264  else {
265  size_t consumed {0};
266  std::scoped_lock lock(_mutex);
267  if(_idlers.empty()){
268  std::move(tasks.begin(), tasks.end(), std::back_inserter(_tasks));
269  return ;
270  }
271  else{
272  while(consumed != tasks.size() && !_idlers.empty()) {
273  Worker* w = _idlers.back();
274  _idlers.pop_back();
275  w->ready = true;
276  w->task.emplace(std::move(tasks[consumed++]));
277  w->cv.notify_one();
278  }
279  }
280  if(consumed == tasks.size()) return ;
281  _tasks.reserve(_tasks.size() + tasks.size() - consumed);
282  std::move(tasks.begin()+consumed, tasks.end(), std::back_inserter(_tasks));
283  }
284 }
285 
286 }; // namespace tf -----------------------------------------------------------
287 
288 
289 
size_t num_workers() const
queries the number of worker threads
Definition: proactive_threadpool.hpp:149
~ProactiveThreadpool()
destructs the executor
Definition: proactive_threadpool.hpp:131
ProactiveThreadpool(unsigned N)
constructs the executor with a given number of worker threads
Definition: proactive_threadpool.hpp:125
Executor that implements a centralized task queue with a proactive execution strategy.
Definition: proactive_threadpool.hpp:52
Definition: taskflow.hpp:6
void emplace(ArgsT &&... args)
constructs the closure in place in the executor
Definition: proactive_threadpool.hpp:230
T get_id(T... args)
void batch(std::vector< Closure > &&closures)
moves a batch of closures to the executor
Definition: proactive_threadpool.hpp:255
bool is_owner() const
queries if the caller is the owner of the executor
Definition: proactive_threadpool.hpp:137