ffead.server.doc
MessageHandler.cpp
1 /*
2  Copyright 2009-2012, Sumeet Chhetri
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 /*
17  * MessageHandler.cpp
18  *
19  * Created on: Sep 27, 2009
20  * Author: sumeet
21  */
22 
23 #include "MessageHandler.h"
24 
25 using namespace std;
26 MessageHandler::MessageHandler(string path)
27 {
28  logger = Logger::getLogger("MessageHandler");
29  this->path = path;
30 }
31 MessageHandler* _mess_instance = NULL;
32 
33 Message MessageHandler::readMessageFromQ(string fileName, bool erase)
34 {
35  ifstream file;
36  ifstream::pos_type fileSize;
37  char *fileContents, *remcontents;
38  file.open(fileName.c_str(), ios::in | ios::binary | ios::ate);
39  if (file.is_open())
40  {
41  fileContents = new char[4];
42  file.seekg(0, ios::beg);
43  if(!file.read(fileContents, 4))
44  {
45  _mess_instance->logger << "Failed to readMessageFromQ" << endl;
46  }
47  int len = AMEFResources::charArrayToInt(fileContents);
48  fileContents = new char[len];
49  file.seekg(4, ios::beg);
50  if(!file.read(fileContents, len))
51  {
52  _mess_instance->logger << "Failed to readMessageFromQ" << endl;
53  }
54  if(erase)
55  {
56  fileSize = (int)file.tellg() - len - 4;
57  remcontents = new char[fileSize];
58  file.seekg(4+len, ios::beg);
59  if(!file.read(remcontents, fileSize))
60  {
61  _mess_instance->logger << "Failed to readMessageFromQ" << endl;
62  }
63  }
64  file.close();
65  }
66  string f(fileContents);
67  AMEFDecoder dec;
68  AMEFObject* obj = dec.decodeB(f, false, true);
69  Message m;
70  m.setTimestamp(obj->getPackets().at(0)->getValue());
71  m.setType(obj->getPackets().at(1)->getValue());
72  m.setPriority(obj->getPackets().at(2)->getValue());
73  m.setUserId(obj->getPackets().at(3)->getValue());
74  m.setEncoding(obj->getPackets().at(4)->getValue());
75  m.setBody(obj->getPackets().at(5)->getValue());
76  m.getDestination().setName(obj->getPackets().at(5)->getValue());
77  m.getDestination().setType(obj->getPackets().at(5)->getValue());
78  delete[] fileContents;
79  if(erase)
80  {
81  ofstream myfile;
82  myfile.open(fileName.c_str(), ios::binary | ios::trunc);
83  myfile << remcontents;
84  myfile.close();
85  delete[] remcontents;
86  }
87  return m;
88 }
89 
90 void MessageHandler::writeMessageToQ(Message msg,string fileName)
91 {
92  AMEFEncoder enc;
93  AMEFObject ob;
94  ob.addPacket(msg.getTimestamp());
95  ob.addPacket(msg.getType());
96  ob.addPacket(msg.getPriority());
97  ob.addPacket(msg.getUserId());
98  ob.addPacket(msg.getEncoding());
99  ob.addPacket(msg.getBody());
100  ob.addPacket(msg.getDestination().getName());
101  ob.addPacket(msg.getDestination().getType());
102  ofstream myfile;
103  myfile.open(fileName.c_str(), ios::binary | ios::app);
104  myfile << enc.encodeB(&ob, true);
105  myfile.close();
106 }
107 
108 bool MessageHandler::tempUnSubscribe(string subs,string fileName)
109 {
110  string subscribers;
111  ifstream myfile1;
112  myfile1.open(fileName.c_str());
113  if (myfile1.is_open())
114  {
115  while(getline(myfile1,subscribers))
116  {
117  //_mess_instance->logger << subscribers << flush;
118  break;
119  }
120  }
121  myfile1.close();
122  ofstream myfile;
123  myfile.open(fileName.c_str());
124  string rep = subs + ":";
125  StringUtil::replaceFirst(subscribers,rep,"");
126  myfile.write(subscribers.c_str(),subscribers.length());
127  myfile.close();
128  if(subscribers.find(":")!=string::npos)
129  return false;
130  else
131  return true;
132 }
133 
134 Message MessageHandler::readMessageFromT(string fileName,string subs)
135 {
136  bool done = tempUnSubscribe(subs,fileName+":SubslistTemp");
137  Message msg = readMessageFromQ(fileName, done);
138  return msg;
139 }
140 
141 void MessageHandler::writeMessageToT(Message msg,string fileName)
142 {
143  writeMessageToQ(msg ,fileName);
144 }
145 
146 void MessageHandler::subscribe(string subs,string fileName)
147 {
148  ifstream myfile1;
149  myfile1.open(fileName.c_str());
150  string subscribers;
151  if (myfile1.is_open())
152  {
153  while(getline(myfile1,subscribers))
154  {
155  //_mess_instance->logger << subscribers << flush;
156  break;
157  }
158  }
159  myfile1.close();
160  ofstream myfile;
161  myfile.open(fileName.c_str());
162  subscribers += (subs + ":");
163  myfile.write(subscribers.c_str(),subscribers.length());
164  myfile.close();
165  fileName += "Temp";
166  myfile.open(fileName.c_str());
167  myfile.write(subscribers.c_str(),subscribers.length());
168  myfile.close();
169 }
170 
171 void MessageHandler::unSubscribe(string subs,string fileName)
172 {
173  string subscribers;
174  ifstream myfile1;
175  myfile1.open(fileName.c_str());
176  if (myfile1.is_open())
177  {
178  while(getline(myfile1,subscribers))
179  {
180  //_mess_instance->logger << subscribers << flush;
181  break;
182  }
183  }
184  myfile1.close();
185  ofstream myfile;
186  myfile.open(fileName.c_str());
187  string rep = subs + ":";
188  StringUtil::replaceFirst(subscribers,rep,"");
189  myfile.write(subscribers.c_str(),subscribers.length());
190  myfile.close();
191  fileName += "Temp";
192  myfile.open(fileName.c_str());
193  myfile.write(subscribers.c_str(),subscribers.length());
194  myfile.close();
195 }
196 
197 void* MessageHandler::service(void* arg)
198 {
199  int fd = *(int*)arg;
200  char buf[MAXBUFLEN];
201  string results;
202  //int bytes = recv(fd, buf, sizeof buf, 0);
203  _mess_instance->getServer().Receive(fd,results,1024);
204  //string temp,results;
205  /*stringstream ss;
206  ss << buf;
207  while(getline(ss,temp))
208  {
209  _mess_instance->logger << temp << flush;
210  results.append(temp);
211  }*/
212  results = results.substr(0,results.find_last_of(">")+1);
213  _mess_instance->logger << results << flush;
214 
215  if(results.find("<")!=string::npos && results.find(">")!=string::npos)
216  {
217  string h = "Received Message----";
218  Cont test;
219  try
220  {
221  Message msg(results);
222  string fileName = _mess_instance->path+msg.getDestination().getName()+":"+msg.getDestination().getType();
223  if(msg.getDestination().getType()=="Queue")
224  _mess_instance->writeMessageToQ(msg,fileName);
225  else if(msg.getDestination().getType()=="Topic")
226  _mess_instance->writeMessageToT(msg,fileName);
227  }
228  catch(Exception *e)
229  {
230  _mess_instance->logger << e->what() << flush;
231  }
232  _mess_instance->getServer().Send(fd,h);
233  //if (send(fd,&h[0] , h.length(), 0) == -1)
234  // _mess_instance->logger << "send failed" << flush;
235  _mess_instance->logger << h << flush;
236  }
237  else if(results.find("GET FROM ")!=string::npos)
238  {
239  Message msg;
240  if(results.find("Queue")!=string::npos)
241  {
242  StringUtil::replaceFirst(results,"GET FROM ",_mess_instance->path);
243  msg = _mess_instance->readMessageFromQ(results, true);
244  }
245  else if(results.find("Topic")!=string::npos)
246  {
247  string subs = results.substr(results.find("-")+1);
248  string te = "-" + subs;
249  StringUtil::replaceFirst(results,te,"");
250  StringUtil::replaceFirst(results,"GET FROM ",_mess_instance->path);
251  msg = _mess_instance->readMessageFromT(results,subs);
252  }
253  string h;
254  if(results.find("Queue")!=string::npos || results.find("Topic")!=string::npos)
255  {
256  h = msg.toXml();
257  _mess_instance->logger << h << flush;
258  }
259  else
260  h = "Improper Destination";
261  _mess_instance->getServer().Send(fd,h);
262  //if (send(fd,&h[0] , h.length(), 0) == -1)
263  // _mess_instance->logger << "send failed" << flush;
264  }
265  else if(results.find("SUBSCRIBE ")!=string::npos)
266  {
267  int len = results.find("TO") - results.find("SUBSCRIBE ") - 11;
268  string subs = results.substr(results.find("SUBSCRIBE ")+10,len);
269  results = results.substr(results.find("TO ")+3);
270  results = (_mess_instance->path+results+":Subslist");
271  _mess_instance->subscribe(subs,results);
272  string h = "Subscribed";
273  _mess_instance->getServer().Send(fd,h);
274  //if (send(fd,&h[0] , h.length(), 0) == -1)
275  // _mess_instance->logger << "send failed" << flush;
276  }
277  else if(results.find("UNSUBSCRIBE ")!=string::npos)
278  {
279  int len = results.find("TO") - results.find("UNSUBSCRIBE ") - 12;
280  string subs = results.substr(results.find("UNSUBSCRIBE ")+11,len);
281  results = results.substr(results.find("TO ")+3);
282  results = (_mess_instance->path+results+":Subslist");
283  _mess_instance->subscribe(subs,results);
284  string h = "Unsubscribed";
285  _mess_instance->getServer().Send(fd,h);
286  //if (send(fd,&h[0] , h.length(), 0) == -1)
287  // _mess_instance->logger << "send failed" << flush;
288  }
289  memset(&buf[0], 0, sizeof(buf));
290  close(fd);
291  return NULL;
292 }
293 
294 
295 void MessageHandler::init(string path)
296 {
297  if(_mess_instance==NULL)
298  {
299  _mess_instance = new MessageHandler(path);
300  _mess_instance->running = false;
301  }
302 }
303 
304 void MessageHandler::trigger(string port,string path)
305 {
306  init(path);
307  if(_mess_instance->running)
308  return;
309  Server serv(port,false,500,&service,2);
310  _mess_instance->server = serv;
311  _mess_instance->running = true;
312  return;
313 }
314 
315 void MessageHandler::stop()
316 {
317  _mess_instance->server.stop();
318 }
319