Cpp-Taskflow  2.1.0
proactive_threadpool.hpp
1 // 2019/02/10 - modified by Tsung-Wei Huang
2 // - removed num_tasks method
3 //
4 // 2018/11/28 - modified by Chun-Xun Lin
5 //
6 // Added the method batch to insert a vector of tasks.
7 //
8 // 2018/10/04 - modified by Tsung-Wei Huang
9 //
10 // Removed shutdown, spawn, and wait_for_all to simplify the design
11 // of the threadpool. The threadpool can now operate on a fixed-memory
12 // closure to improve performance.
13 //
14 // 2018/09/11 - modified by Tsung-Wei Huang & Guannan
15 // - bug fix: shutdown method might hang due to dynamic tasking;
16 // the task queue can be non-empty while all threads are gone;
17 // workers need to be cleared as well under lock, since *_async
18 // will access the worker data structure;
19 // - renamed _worker to _idler
20 //
21 // 2018/09/03 - modified by Guannan Guo
22 //
23 // BasicProactiveThreadpool schedules independent jobs in a greedy manner.
24 // Whenever a job is inserted into the threadpool, the threadpool will check if there
25 // are any spare threads available. The spare thread will be woken through its local
26 // condition variable. The new job will be directly moved into
27 // this thread instead of pushed at the back of the pending queue.
28 
29 #pragma once
30 
31 #include <iostream>
32 #include <functional>
33 #include <vector>
34 #include <mutex>
35 #include <deque>
36 #include <thread>
37 #include <stdexcept>
38 #include <condition_variable>
39 #include <memory>
40 #include <future>
41 #include <unordered_set>
42 #include <optional>
43 #include <cassert>
44 
45 namespace tf {
46 
// Class: ProactiveThreadpool
//
// Executor that implements a centralized task queue with a proactive
// execution strategy: when a closure is inserted and an idle worker exists,
// the closure is handed directly to that worker through the worker's own
// condition variable; otherwise it is appended to the shared pending queue.
//
// Closure must be callable with no arguments and move-constructible.
template <typename Closure>
class ProactiveThreadpool {

  // Struct: Worker
  // Per-thread mailbox. A worker lives on its thread's stack and is
  // published through _idlers only while that thread is waiting for work.
  struct Worker {
    std::condition_variable cv;    // signalled when `ready` flips to true
    std::optional<Closure> task;   // closure handed over directly; empty on shutdown wake-up
    bool ready {false};            // wake-up flag, read and written under _mutex
  };

  public:

    // constructs the executor with a given number of worker threads
    ProactiveThreadpool(unsigned N);

    // destructs the executor
    ~ProactiveThreadpool();

    // queries the number of worker threads
    size_t num_workers() const;

    // queries if the caller is the owner of the executor
    bool is_owner() const;

    // constructs the closure in place in the executor
    template <typename... ArgsT>
    void emplace(ArgsT&&... args);

    // moves a batch of closures to the executor
    void batch(std::vector<Closure> && closures);

  private:

    // id of the thread that constructed the pool; it must be initialized
    // before the constructor body runs because _spawn asserts is_owner()
    std::thread::id _owner {std::this_thread::get_id()};

    // guards _tasks, _idlers, _exiting and every published Worker
    mutable std::mutex _mutex;

    std::vector<Closure> _tasks;        // pending closures, consumed from the back
    std::vector<std::thread> _threads;  // worker threads
    std::vector<Worker*> _idlers;       // workers currently waiting for a closure

    bool _exiting {false};              // set while _shutdown is tearing workers down

    void _shutdown();
    void _spawn(unsigned);
};
124 
125 // Constructor
126 template <typename Closure>
128  _spawn(N);
129 }
130 
131 // Destructor
132 template <typename Closure>
134  _shutdown();
135 }
136 
137 // Ftion: is_owner
138 template <typename Closure>
140  return std::this_thread::get_id() == _owner;
141 }
142 
143 // Ftion: num_workers
144 template <typename Closure>
146  return _threads.size();
147 }
148 
149 // Procedure: shutdown
150 template <typename Closure>
152 
153  assert(is_owner());
154 
155  {
156  std::unique_lock lock(_mutex);
157 
158  _exiting = true;
159 
160  // we need to clear the workers under lock
161  for(auto w : _idlers){
162  w->ready = true;
163  w->task = std::nullopt;
164  w->cv.notify_one();
165  }
166  _idlers.clear();
167  }
168 
169  for(auto& t : _threads){
170  t.join();
171  }
172  _threads.clear();
173 
174  _exiting = false;
175 }
176 
177 // Procedure: spawn
178 template <typename Closure>
179 void ProactiveThreadpool<Closure>::_spawn(unsigned N) {
180 
181  assert(is_owner());
182 
183  for(size_t i=0; i<N; ++i){
184 
185  _threads.emplace_back([this] () -> void {
186 
187  Worker w;
188 
189  std::unique_lock lock(_mutex);
190 
191  while(!_exiting) {
192 
193  if(_tasks.empty()){
194 
195  w.ready = false;
196  _idlers.push_back(&w);
197 
198  while(!w.ready) {
199  w.cv.wait(lock);
200  }
201 
202  // shutdown cannot have task
203  if(w.task) {
204  lock.unlock();
205  (*w.task)();
206  w.task = std::nullopt;
207  lock.lock();
208  }
209  }
210  else{
211  Closure t{std::move(_tasks.back())};
212  _tasks.pop_back();
213  lock.unlock();
214  t();
215  lock.lock();
216  }
217  }
218  });
219 
220  }
221 }
222 
223 // Procedure: silent_async
224 template <typename Closure>
225 template <typename... ArgsT>
227 
228  //no worker thread available
229  if(num_workers() == 0){
230  Closure{std::forward<ArgsT>(args)...}();
231  }
232  // ask one worker to run the task
233  else {
234  std::scoped_lock lock(_mutex);
235  if(_idlers.empty()){
236  _tasks.emplace_back(std::forward<ArgsT>(args)...);
237  }
238  else{
239  Worker* w = _idlers.back();
240  _idlers.pop_back();
241  w->ready = true;
242  w->task.emplace(std::forward<ArgsT>(args)...);
243  w->cv.notify_one();
244  }
245  }
246 }
247 
248 
249 // Procedure: batch
250 template <typename Closure>
252 
253  //no worker thread available
254  if(num_workers() == 0){
255  for(auto& t: tasks){
256  t();
257  }
258  }
259  // ask one worker to run the task
260  else {
261  size_t consumed {0};
262  std::scoped_lock lock(_mutex);
263  if(_idlers.empty()){
264  std::move(tasks.begin(), tasks.end(), std::back_inserter(_tasks));
265  return ;
266  }
267  else{
268  while(consumed != tasks.size() && !_idlers.empty()) {
269  Worker* w = _idlers.back();
270  _idlers.pop_back();
271  w->ready = true;
272  w->task.emplace(std::move(tasks[consumed++]));
273  w->cv.notify_one();
274  }
275  }
276  if(consumed == tasks.size()) return ;
277  _tasks.reserve(_tasks.size() + tasks.size() - consumed);
278  std::move(tasks.begin()+consumed, tasks.end(), std::back_inserter(_tasks));
279  }
280 }
281 
282 } // namespace tf -----------------------------------------------------------
283 
284 
285 
size_t num_workers() const
queries the number of worker threads
Definition: proactive_threadpool.hpp:145
~ProactiveThreadpool()
destructs the executor
Definition: proactive_threadpool.hpp:133
ProactiveThreadpool(unsigned N)
constructs the executor with a given number of worker threads
Definition: proactive_threadpool.hpp:127
Executor that implements a centralized task queue with a proactive execution strategy.
Definition: proactive_threadpool.hpp:56
Definition: taskflow.hpp:6
void emplace(ArgsT &&... args)
constructs the closure in place in the executor
Definition: proactive_threadpool.hpp:226
T lock(T... args)
T move(T... args)
T get_id(T... args)
void batch(std::vector< Closure > &&closures)
moves a batch of closures to the executor
Definition: proactive_threadpool.hpp:251
bool is_owner() const
queries if the caller is the owner of the executor
Definition: proactive_threadpool.hpp:139