Cpp-Taskflow  2.1.0
simple_threadpool.hpp
// 2019/02/13 - modified by Tsung-Wei Huang
//   - removed the num_tasks method
//
// 2018/11/28 - modified by Chun-Xun Lin
//
// Added the method batch to insert a vector of tasks.
//
// 2018/10/04 - modified by Tsung-Wei Huang
//
// Removed shutdown, spawn, and wait_for_all to simplify the design
// of the threadpool. The threadpool can now operate on a fixed-memory
// closure to improve performance.
//
// 2018/09/14 - modified by Guannan
//   - added the wait_for_all method
//
// 2018/04/01 - contributed by Tsung-Wei Huang and Chun-Xun Lin
//
// A basic threadpool implementation based on C++17.
20 
#pragma once

#include <algorithm>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <future>
#include <iostream>
#include <iterator>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <unordered_set>
#include <utility>
#include <vector>
35 
namespace tf {

/**
@class SimpleThreadpool

@brief Executor that implements a centralized task queue
       with a simple execution strategy.

All workers share one task container guarded by a single mutex. Workers
take tasks from the back of that container, so queued tasks are served
in last-in-first-out order.

@tparam Closure closure type; it must be default-constructible,
        move-assignable and callable with no arguments
*/
template <typename Closure>
class SimpleThreadpool {

  public:

    /**
    @brief constructs the executor with a given number of worker threads

    The constructing thread is recorded as the owner of the executor.

    @param N the number of worker threads
    */
    explicit SimpleThreadpool(unsigned N);

    /**
    @brief destructs the executor

    Signals every worker to stop and joins it. Workers leave their loop as
    soon as they observe the stop flag, so closures that are still queued
    at that point are not executed.
    */
    ~SimpleThreadpool();

    /**
    @brief constructs the closure in place in the executor

    With zero workers the closure is constructed and invoked immediately
    on the calling thread; otherwise it is queued and one worker is woken.

    @tparam ArgsT argument parameter pack

    @param args arguments to forward to the constructor of the closure
    */
    template <typename... ArgsT>
    void emplace(ArgsT&&... args);

    /**
    @brief moves a batch of closures to the executor

    With zero workers the closures are invoked in order on the calling
    thread; otherwise they are moved into the queue under one lock.

    @param closures a vector of closures
    */
    void batch(std::vector<Closure>&& closures);

    /**
    @brief queries the number of worker threads
    */
    size_t num_workers() const;

    /**
    @brief queries if the caller is the owner of the executor
    */
    bool is_owner() const;

  private:

    // Id of the thread that constructed the executor.
    const std::thread::id _owner {std::this_thread::get_id()};

    // Guards _tasks and _stop.
    mutable std::mutex _mutex;

    // Signalled when tasks are queued or when the pool is stopping.
    std::condition_variable _worker_signal;
    std::vector<Closure> _tasks;
    std::vector<std::thread> _threads;

    bool _stop {false};

    void _spawn(unsigned);
    void _shutdown();
};

// Constructor
template <typename Closure>
SimpleThreadpool<Closure>::SimpleThreadpool(unsigned N) {
  _spawn(N);
}

// Destructor
template <typename Closure>
SimpleThreadpool<Closure>::~SimpleThreadpool() {
  _shutdown();
}

// Function: num_workers
template <typename Closure>
size_t SimpleThreadpool<Closure>::num_workers() const {
  return _threads.size();
}

// Function: is_owner
template <typename Closure>
bool SimpleThreadpool<Closure>::is_owner() const {
  return std::this_thread::get_id() == _owner;
}

// Procedure: _spawn
// Spawns N worker threads. Each worker sleeps until a task is queued or the
// pool is stopping, then runs one task at a time with the mutex released.
// Only the owner may spawn workers (checked by assertion).
template <typename Closure>
void SimpleThreadpool<Closure>::_spawn(unsigned N) {

  assert(is_owner());

  _threads.reserve(_threads.size() + N);

  for(unsigned i=0; i<N; ++i) {

    _threads.emplace_back([this] () -> void {

      // Reused across iterations; each task is move-assigned into it.
      Closure task;

      std::unique_lock lock(_mutex);

      while(true) {

        // Sleep until there is work or the pool is shutting down.
        _worker_signal.wait(lock, [this] () {
          return _stop || !_tasks.empty();
        });

        // Stop takes priority over queued work: leftover tasks are dropped.
        if(_stop) {
          break;
        }

        task = std::move(_tasks.back());
        _tasks.pop_back();

        // Execute the task without holding the lock so other workers and
        // producers can make progress.
        lock.unlock();
        task();
        lock.lock();

      } // End of worker loop.

    });
  }
}

// Function: emplace
template <typename Closure>
template <typename... ArgsT>
void SimpleThreadpool<Closure>::emplace(ArgsT&&... args) {

  // No worker, do this right away.
  if(num_workers() == 0) {
    Closure{std::forward<ArgsT>(args)...}();
    return;
  }

  // Dispatch this to a thread.
  {
    std::scoped_lock lock(_mutex);
    _tasks.emplace_back(std::forward<ArgsT>(args)...);
  }

  // Notify after releasing the mutex so the woken worker does not
  // immediately block on it.
  _worker_signal.notify_one();
}

// Function: batch
template <typename Closure>
void SimpleThreadpool<Closure>::batch(std::vector<Closure>&& closures) {

  // Nothing to run and nobody to wake up.
  if(closures.empty()) {
    return;
  }

  // No worker, do this right away.
  if(num_workers() == 0) {
    for(auto& c : closures) {
      c();
    }
    return;
  }

  // Dispatch the whole batch under a single lock acquisition.
  {
    std::scoped_lock lock(_mutex);
    _tasks.reserve(_tasks.size() + closures.size());
    std::move(closures.begin(), closures.end(), std::back_inserter(_tasks));
  }

  // One task needs one worker; more than one may need all of them.
  if(closures.size() > 1) {
    _worker_signal.notify_all();
  }
  else {
    _worker_signal.notify_one();
  }
}

// Procedure: _shutdown
// Shut down the threadpool - only the owner can do this.
template <typename Closure>
void SimpleThreadpool<Closure>::_shutdown() {

  assert(is_owner());

  {
    std::scoped_lock lock(_mutex);
    _stop = true;
  }

  // Workers re-check _stop under the mutex, so notifying after the unlock
  // cannot lose the wake-up.
  _worker_signal.notify_all();

  for(auto& t : _threads) {
    t.join();
  }

  _threads.clear();

  // Every worker has been joined, so no other thread reads _stop here.
  _stop = false;
}

}  // end of namespace tf. ---------------------------------------------------
240 
void batch(std::vector< Closure > &&closures)
moves a batch of closures to the executor
Definition: simple_threadpool.hpp:191
Definition: taskflow.hpp:6
size_t num_workers() const
queries the number of worker threads
Definition: simple_threadpool.hpp:122
Executor that implements a centralized task queue with a simple execution strategy.
Definition: simple_threadpool.hpp:47
bool is_owner() const
queries if the caller is the owner of the executor
Definition: simple_threadpool.hpp:128
void emplace(ArgsT &&... args)
constructs the closure in place in the executor
Definition: simple_threadpool.hpp:174
T get_id(T... args)
SimpleThreadpool(unsigned N)
constructs the executor with a given number of worker threads
Definition: simple_threadpool.hpp:111
~SimpleThreadpool()
destructs the executor
Definition: simple_threadpool.hpp:117