ffead.server.doc
TaskPool.cpp
1 /*
2  Copyright 2009-2012, Sumeet Chhetri
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 /*
17  * TaskPool.cpp
18  *
19  * Created on: Mar 23, 2010
20  * Author: sumeet
21  */
22 
23 #include "TaskPool.h"
24 
25 void* TaskPool::run(void *arg)
26 {
27  TaskPool* pool = (TaskPool*)arg;
28  vector<Task*>::iterator iter;
29  pool->m_mutex->lock();
30  bool fl = pool->runFlag;
31  pool->m_mutex->unlock();
32  while(fl)
33  {
34  pool->m_mutex->lock();
35  int total = pool->scheduledtasks->size();
36  pool->m_mutex->unlock();
37  std::queue<int> tobeRemoved;
38  for (int i=0; i<total;i++) {
39  pool->m_mutex->lock();
40  Task* task = pool->scheduledtasks->at(i);
41  Timer* timer = pool->scheduledTimers->at(i);
42  pool->m_mutex->unlock();
43  if(task!=NULL)
44  {
45  if(task->isWaitOver(timer))
46  {
47  tobeRemoved.push(i);
48  pool->m_mutex->lock();
49  pool->tasks->push(task);
50  pool->m_mutex->unlock();
51  }
52  }
53  }
54  int counter = 0;
55  while (!tobeRemoved.empty()) {
56  int index = tobeRemoved.front() - counter;
57  counter++;
58  tobeRemoved.pop();
59  pool->m_mutex->lock();
60  Timer* timer = pool->scheduledTimers->at(index);
61  pool->scheduledtasks->erase(pool->scheduledtasks->begin()+index);
62  pool->scheduledTimers->erase(pool->scheduledTimers->begin()+index);
63  delete timer;
64  pool->m_mutex->unlock();
65  }
66  Thread::nSleep(1);
67  pool->m_mutex->lock();
68  fl = pool->runFlag;
69  pool->m_mutex->unlock();
70  }
71  pool->m_mutex->lock();
72  pool->complete = true;
73  pool->m_mutex->unlock();
74  return NULL;
75 }
76 
77 TaskPool::TaskPool() {
78  m_mutex = new Mutex();
79  tasks = new std::queue<Task*>;
80  ptasks = new list<Task*>;
81  scheduledtasks = new vector<Task*>;
82  scheduledTimers = new vector<Timer*>;
83  runFlag = true;
84  complete = false;
85  mthread = new Thread(&run, this);
86  thrdStarted = false;
87 }
88 
89 void TaskPool::start() {
90  if(thrdStarted)return;
91  mthread->execute();
92  thrdStarted = true;
93 }
94 
95 void TaskPool::addTask(Task &task) {
96  m_mutex->lock();
97  if (task.type >= 0 && task.type <= 6 && task.tunit > 0)
98  {
99  Timer* t = new Timer;
100  t->start();
101  scheduledTimers->push_back(t);
102  scheduledtasks->push_back(&task);
103  }
104  else
105  {
106  tasks->push(&task);
107  }
108  m_mutex->unlock();
109 }
110 
111 void TaskPool::addTask(Task *task) {
112  m_mutex->lock();
113  if (task->type >= 0 && task->type <= 6 && task->tunit > 0)
114  {
115  Timer* t = new Timer;
116  t->start();
117  scheduledTimers->push_back(t);
118  scheduledtasks->push_back(task);
119  }
120  else
121  {
122  tasks->push(task);
123  }
124  m_mutex->unlock();
125 }
126 
127 void TaskPool::addPTask(Task &task) {
128  m_mutex->lock();
129  ptasks->push_back(&task);
130  m_mutex->unlock();
131 }
132 void TaskPool::addPTask(Task *task) {
133  m_mutex->lock();
134  ptasks->push_back(task);
135  m_mutex->unlock();
136 }
137 Task* TaskPool::getTask() {
138  Task *task = NULL;
139  m_mutex->lock();
140  if(!tasks->empty())
141  {
142  task = tasks->front();
143  tasks->pop();
144  }
145  m_mutex->unlock();
146  return task;
147 }
148 Task* TaskPool::getPTask() {
149  m_mutex->lock();
150  int currpri = 0;
151  Task *task = NULL;
152  list<Task*>::iterator iter, iter1;
153  for (iter = ptasks->begin(); iter != ptasks->end(); ++iter) {
154  if ((*iter)->priority > currpri) {
155  task = *iter;
156  iter1 = iter;
157  currpri = task->priority;
158  }
159  }
160  if(task!=NULL)ptasks->remove(task);
161  m_mutex->unlock();
162  return task;
163 }
164 bool TaskPool::tasksPending() {
165  m_mutex->lock();
166  bool tp = !tasks->empty();
167  tp |= !scheduledtasks->empty();
168  m_mutex->unlock();
169  return tp;
170 }
171 bool TaskPool::tasksPPending() {
172  m_mutex->lock();
173  bool tp = !ptasks->empty();
174  tp |= !scheduledtasks->empty();
175  m_mutex->unlock();
176  return tp;
177 }
178 TaskPool::~TaskPool() {
179  m_mutex->lock();
180  runFlag = false;
181  m_mutex->unlock();
182  m_mutex->lock();
183  bool fl = this->complete;
184  m_mutex->unlock();
185  while(!fl)
186  {
187  m_mutex->lock();
188  fl = this->complete;
189  m_mutex->unlock();
190  Thread::mSleep(1);
191  }
192  delete mthread;
193  delete tasks;
194  delete ptasks;
195  delete scheduledtasks;
196  delete scheduledTimers;
197  delete m_mutex;
198 }