Cpp-Taskflow  2.0.0
simple_threadpool.hpp
1 // 2018/11/28 - modified by Chun-Xun Lin
2 //
3 // Added the method batch to insert a vector of tasks.
4 //
5 // 2018/10/04 - modified by Tsung-Wei Huang
6 //
7 // Removed shutdown, spawn, and wait_for_all to simplify the design
8 // of the threadpool. The threadpool now can operates on fixed memory
9 // closure to improve the performance.
10 //
11 // 2018/09/14 - modified by Guannan
12 // - added wait_for_all method
13 //
14 // 2018/04/01 - contributed by Tsung-Wei Huang and Chun-Xun Lin
15 //
16 // The basic threadpool implementation based on C++17.
17 
18 #pragma once
19 
20 #include <iostream>
21 #include <functional>
22 #include <vector>
23 #include <mutex>
24 #include <deque>
25 #include <thread>
26 #include <stdexcept>
27 #include <condition_variable>
28 #include <memory>
29 #include <future>
30 #include <unordered_set>
31 
32 namespace tf {
33 
42 template <typename Closure>
44 
45  public:
46 
52  explicit SimpleThreadpool(unsigned N);
53 
61 
69  template <typename... ArgsT>
70  void emplace(ArgsT&&... args);
71 
77  void batch(std::vector<Closure>&& closures);
78 
82  size_t num_workers() const;
83 
87  bool is_owner() const;
88 
89  size_t num_tasks() const;
90 
91  private:
92 
94 
95  mutable std::mutex _mutex;
96 
97  std::condition_variable _worker_signal;
98  std::vector<Closure> _tasks;
99  std::vector<std::thread> _threads;
100 
101  bool _stop {false};
102 
103  void _spawn(unsigned);
104  void _shutdown();
105 };
106 
107 // Constructor
108 template <typename Closure>
110  _spawn(N);
111 }
112 
113 // Destructor
114 template <typename Closure>
116  _shutdown();
117 }
118 
119 // Function: num_tasks
120 // Return the number of "unfinished" tasks.
121 // Notice that this value is not necessary equal to the size of the task_queue
122 // since the task can be popped out from the task queue while
123 // not yet finished.
124 template <typename Closure>
126  return _tasks.size();
127 }
128 
129 template <typename Closure>
131  return _threads.size();
132 }
133 
134 // Function: is_owner
135 template <typename Closure>
137  return std::this_thread::get_id() == _owner;
138 }
139 
140 // Procedure: spawn
141 // The procedure spawns "n" threads monitoring the task queue and executing each task.
142 // After the task is finished, the thread reacts to the returned signal.
143 template <typename Closure>
144 void SimpleThreadpool<Closure>::_spawn(unsigned N) {
145 
146  assert(is_owner());
147 
148  for(size_t i=0; i<N; ++i) {
149 
150  _threads.emplace_back([this] () -> void {
151 
152  Closure task;
153 
154  std::unique_lock lock(_mutex);
155 
156  while(!_stop) {
157 
158  if(!_tasks.empty()) {
159  task = std::move(_tasks.back());
160  _tasks.pop_back();
161 
162  // execute the task
163  lock.unlock();
164  task();
165  lock.lock();
166  }
167  else {
168  while(_tasks.empty() && !_stop) {
169  _worker_signal.wait(lock);
170  }
171  }
172 
173  } // End of worker loop.
174 
175  });
176  }
177 }
178 
179 // Function: emplace
180 template <typename Closure>
181 template <typename... ArgsT>
182 void SimpleThreadpool<Closure>::emplace(ArgsT&&... args) {
183 
184  // No worker, do this right away.
185  if(num_workers() == 0) {
186  Closure{std::forward<ArgsT>(args)...}();
187  }
188  // Dispatch this to a thread.
189  else {
190  std::scoped_lock lock(_mutex);
191  _tasks.emplace_back(std::forward<ArgsT>(args)...);
192  _worker_signal.notify_one();
193  }
194 }
195 
196 
197 // Function: emplace
198 template <typename Closure>
200 
201  // No worker, do this right away.
202  if(num_workers() == 0) {
203  for(auto& t: tasks){
204  t();
205  }
206  return ;
207  }
208  // Dispatch this to a thread.
209  else {
210  bool notify_all = tasks.size() > 1;
211  {
212  std::scoped_lock lock(_mutex);
213  _tasks.reserve(_tasks.size() + tasks.size());
214  std::move(tasks.begin(), tasks.end(), std::back_inserter(_tasks));
215  }
216  if(notify_all) {
217  _worker_signal.notify_all();
218  }
219  else {
220  _worker_signal.notify_one();
221  }
222  }
223 }
224 
225 
226 // Procedure: shutdown
227 // Shut down the threadpool - only the owner can do this.
228 template <typename Closure>
230 
231  assert(is_owner());
232 
233  {
234  std::scoped_lock lock(_mutex);
235  _stop = true;
236  _worker_signal.notify_all();
237  }
238 
239  for(auto& t : _threads) {
240  t.join();
241  }
242 
243  _threads.clear();
244  _stop = false;
245 }
246 
247 }; // end of namespace tf. ---------------------------------------------------
248 
void batch(std::vector< Closure > &&closures)
moves a batch of closures to the executor
Definition: simple_threadpool.hpp:199
Definition: taskflow.hpp:6
size_t num_workers() const
queries the number of worker threads
Definition: simple_threadpool.hpp:130
Executor that implements a centralized task queue with a simple execution strategy.
Definition: simple_threadpool.hpp:43
bool is_owner() const
queries if the caller is the owner of the executor
Definition: simple_threadpool.hpp:136
void emplace(ArgsT &&... args)
constructs the closure in place in the executor
Definition: simple_threadpool.hpp:182
T get_id(T... args)
SimpleThreadpool(unsigned N)
constructs the executor with a given number of worker threads
Definition: simple_threadpool.hpp:109
~SimpleThreadpool()
destructs the executor
Definition: simple_threadpool.hpp:115