ffead.server.doc
ThreadPool.cpp
1 /*
2  Copyright 2009-2012, Sumeet Chhetri
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 /*
17  * ThreadPool.cpp
18  *
19  * Created on: Mar 23, 2010
20  * Author: sumeet
21  */
22 #include "ThreadPool.h"
23 
24 ThreadPool::ThreadPool()
25 {
26  this->runFlag = false;
27  joinComplete = false;
28  logger = Logger::getLogger("ThreadPool");
29 }
30 
31 void ThreadPool::init(int initThreads, int maxThreads,bool console)
32 {
33  if(wpool!=NULL)return;
34  this->console = console;
35  this->lowp = -1;
36  this->highp = -1;
37  this->initThreads = initThreads;
38  this->maxThreads = maxThreads;
39  joinComplete = false;
40  prioritypooling = false;
41  initializeThreads();
42  start();
43 }
44 
45 ThreadPool::ThreadPool(int initThreads, int maxThreads, int lowp, int highp) {
46  if (lowp > highp)
47  throw "Low Priority should be less than Highest Priority";
48  logger = Logger::getLogger("ThreadPool");
49  this->console = false;
50  this->initThreads = initThreads;
51  this->maxThreads = maxThreads;
52  this->lowp = lowp;
53  this->highp = highp;
54  this->runFlag = false;
55  joinComplete = false;
56  prioritypooling = true;
57  initializeThreads();
58 }
59 ThreadPool::ThreadPool(int initThreads, int maxThreads, int lowp, int highp,bool console) {
60  this->console = console;
61  if (lowp > highp)
62  throw "Low Priority should be less than Highest Priority";
63  logger = Logger::getLogger("ThreadPool");
64  this->initThreads = initThreads;
65  this->maxThreads = maxThreads;
66  this->lowp = lowp;
67  this->highp = highp;
68  this->runFlag = false;
69  joinComplete = false;
70  prioritypooling = true;
71  initializeThreads();
72 }
73 
74 ThreadPool::ThreadPool(int initThreads, int maxThreads) {
75  logger = Logger::getLogger("ThreadPool");
76  this->lowp = -1;
77  this->highp = -1;
78  this->console = false;
79  this->initThreads = initThreads;
80  this->maxThreads = maxThreads;
81  this->runFlag = false;
82  joinComplete = false;
83  prioritypooling = false;
84  initializeThreads();
85 }
86 
87 ThreadPool::ThreadPool(int initThreads, int maxThreads,bool console) {
88  logger = Logger::getLogger("ThreadPool");
89  this->console = console;
90  this->lowp = -1;
91  this->highp = -1;
92  this->initThreads = initThreads;
93  this->maxThreads = maxThreads;
94  this->runFlag = false;
95  joinComplete = false;
96  prioritypooling = false;
97  initializeThreads();
98 }
99 
100 void ThreadPool::initializeThreads()
101 {
102  if(runFlag)return;
103  wpool = new TaskPool;
104  wpool->console = console;
105  tpool = new vector<PoolThread*>;
106  for (int i = 0; i < initThreads; i++) {
107  PoolThread *thread = new PoolThread(console);
108  thread->execute();
109  tpool->push_back(thread);
110  }
111  runFlag = true;
112  poller = new Thread(&ThreadPool::poll, this);
113  wpool->start();
114  pollerStarted = false;
115  complete = false;
116  m_mutex = new Mutex;
117 }
118 
119 void ThreadPool::start()
120 {
121  if(pollerStarted)return;
122  poller->execute();
123  pollerStarted = true;
124 }
125 
126 void* ThreadPool::poll(void *arg) {
127  ThreadPool* ths = (ThreadPool*)arg;
128  ths->m_mutex->lock();
129  bool fl = ths->runFlag;
130  ths->m_mutex->unlock();
131  while (fl) {
132  if (!ths->prioritypooling && ths->wpool->tasksPending()) {
133  Task *task = ths->wpool->getTask();
134  if(task!=NULL)
135  {
136  ths->submit(task);
137  }
138  } else if (ths->prioritypooling && ths->wpool->tasksPPending()) {
139  Task *task = ths->wpool->getPTask();
140  if(task!=NULL)
141  {
142  ths->submit(task);
143  }
144  }
145  Thread::mSleep(1);
146  ths->m_mutex->lock();
147  fl = ths->runFlag;
148  ths->m_mutex->unlock();
149  }
150  ths->m_mutex->lock();
151  ths->complete = true;
152  ths->m_mutex->unlock();
153  return NULL;
154 }
155 
156 void ThreadPool::submit(Task *task) {
157  bool flag = true;
158  while (flag) {
159  for (unsigned int var = 0; var < tpool->size(); var++) {
160  if (tpool->at(var)->isIdle()) {
161  tpool->at(var)->checkout(task);
162  flag = false;
163  break;
164  }
165  }
166  Thread::mSleep(1);
167  }
168 }
169 
170 void ThreadPool::joinAll() {
171  while (!joinComplete) {
172  if (!prioritypooling) {
173  while (wpool->tasksPending()) {
174  Thread::sSleep(1);
175  }
176  } else {
177  while (wpool->tasksPPending()) {
178  Thread::sSleep(1);
179  }
180  }
181  int i = 0;
182  for (unsigned int var = 0; var < tpool->size(); var++) {
183  if (tpool->at(var)->isIdle()) {
184  i++;
185  }
186  }
187  if (i == initThreads) {
188  joinComplete = true;
189  break;
190  } else {
191  Thread::sSleep(1);
192  }
193  }
194 }
195 void ThreadPool::execute(Task &task, int priority) {
196 
197  if(console)
198  {
199  logger << "Adding task to wpool\n" << flush;
200  }
201  task.tunit = -1;
202  task.type = -1;
203  task.priority = priority;
204  task.console = console;
205  if (!prioritypooling) {
206  wpool->addTask(task);
207  } else {
208  wpool->addPTask(task);
209  }
210 }
211 void ThreadPool::execute(Task &task) {
212 
213  if(console)
214  {
215  logger << "Adding task to wpool\n" << flush;
216  }
217  task.tunit = -1;
218  task.type = -1;
219  task.priority = -1;
220  task.console = console;
221  if (!prioritypooling) {
222  wpool->addTask(task);
223  } else {
224  wpool->addPTask(task);
225  }
226 }
227 void ThreadPool::schedule(Task &task, long long tunit, int type) {
228 
229  if(console)
230  {
231  logger << "Added task to wpool\n" << flush;
232  }
233  task.tunit = tunit;
234  task.type = type;
235  task.priority = -1;
236  task.console = console;
237  if (!prioritypooling) {
238  wpool->addTask(task);
239  } else {
240  wpool->addPTask(task);
241  }
242 }
243 
244 ThreadPool::~ThreadPool() {
245  while(!joinComplete) {
246  joinAll();
247  Thread::mSleep(1);
248  }
249  this->m_mutex->lock();
250  this->runFlag = false;
251  this->m_mutex->unlock();
252 
253  m_mutex->lock();
254  bool fl = this->complete;
255  m_mutex->unlock();
256  while(!fl)
257  {
258  m_mutex->lock();
259  fl = this->complete;
260  m_mutex->unlock();
261  Thread::mSleep(1);
262  }
263  delete poller;
264  delete wpool;
265  for (int i = 0; i <(int)tpool->size(); i++) {
266  delete tpool->at(i);
267  }
268  delete m_mutex;
269  if(console)
270  {
271  logger << "Destroyed PoolThread Pool\n" << flush;
272  }
273 }