23 #include "MessageHandler.h"
// Constructor: obtains the logger registered under the name "MessageHandler".
// NOTE(review): what is done with `path` is not visible in this block; other
// code reads `_mess_instance->path`, so presumably it is stored there - confirm.
26 MessageHandler::MessageHandler(
string path)
28 logger = Logger::getLogger(
"MessageHandler");
// Reads one message from the file `fileName` and fills a Message from its
// decoded packets.
//
// Layout as read by this code: a 4-byte length header (converted with
// AMEFResources::charArrayToInt), then `len` payload bytes, then whatever
// follows, which is read into `remcontents`.
//
// @param fileName  path of the file to read (and later to rewrite).
// @param erase     NOTE(review): presumably guards the truncate-and-rewrite
//                  step at the end; the condition itself is not visible here.
// @return          a Message; presumably the `m` populated below - the return
//                  statement is not visible here, confirm.
//
// Read failures are reported on `_mess_instance->logger` (the shared instance's
// logger, not this object's own `logger` member).
33 Message MessageHandler::readMessageFromQ(
string fileName,
bool erase)
36 ifstream::pos_type fileSize;
37 char *fileContents, *remcontents;
// Binary mode, initially positioned at end of file because of ios::ate.
38 file.open(fileName.c_str(), ios::in | ios::binary | ios::ate);
// Scratch buffer for the 4-byte length header.
41 fileContents =
new char[4];
42 file.seekg(0, ios::beg);
43 if(!file.read(fileContents, 4))
45 _mess_instance->logger <<
"Failed to readMessageFromQ" << endl;
// NOTE(review): `len` comes straight from file data and is not range-checked
// before being used as an allocation size below.
47 int len = AMEFResources::charArrayToInt(fileContents);
// NOTE(review): `fileContents` is reassigned here and no delete[] of the 4-byte
// header buffer is visible before it - looks like a leak, confirm.
48 fileContents =
new char[len];
// Skip the header and read exactly `len` payload bytes.
49 file.seekg(4, ios::beg);
50 if(!file.read(fileContents, len))
52 _mess_instance->logger <<
"Failed to readMessageFromQ" << endl;
// NOTE(review): tellg() reports the current position; unless the stream is
// moved to the end before this line, the position right after the payload read
// is 4+len and this evaluates to 0 rather than the number of bytes remaining.
// Confirm against the statements surrounding this line.
56 fileSize = (int)file.tellg() - len - 4;
// Buffer for everything after the first message.
// NOTE(review): no delete[] of `remcontents` is visible in this block.
57 remcontents =
new char[fileSize];
58 file.seekg(4+len, ios::beg);
59 if(!file.read(remcontents, fileSize))
61 _mess_instance->logger <<
"Failed to readMessageFromQ" << endl;
// NOTE(review): this constructor treats `fileContents` as a NUL-terminated C
// string, but the buffer holds exactly `len` bytes with no terminator and the
// payload is read in binary mode. It can stop early at an embedded NUL or read
// past the buffer; `string f(fileContents, len)` would use the known length.
66 string f(fileContents);
// Packet index -> Message field: 0 timestamp, 1 type, 2 priority, 3 user id,
// 4 encoding, 5 body.
70 m.setTimestamp(obj->getPackets().at(0)->getValue());
71 m.setType(obj->getPackets().at(1)->getValue());
72 m.setPriority(obj->getPackets().at(2)->getValue());
73 m.setUserId(obj->getPackets().at(3)->getValue());
74 m.setEncoding(obj->getPackets().at(4)->getValue());
75 m.setBody(obj->getPackets().at(5)->getValue());
// NOTE(review): destination name and destination type are both taken from
// packet 5, the same packet used for the body above. writeMessageToQ adds the
// destination name and type as two separate packets, so these indices look
// like a copy/paste slip (6 and 7 would follow the pattern) - confirm against
// the writer's full packet order.
76 m.getDestination().setName(obj->getPackets().at(5)->getValue());
77 m.getDestination().setType(obj->getPackets().at(5)->getValue());
// Releases the payload buffer (the second allocation assigned to fileContents).
78 delete[] fileContents;
// Re-opens the same file with ios::trunc and writes the remaining contents back.
82 myfile.open(fileName.c_str(), ios::binary | ios::trunc);
// NOTE(review): operator<< on a char* writes up to the first NUL byte, not
// `fileSize` bytes, and `remcontents` is not NUL-terminated;
// `myfile.write(remcontents, fileSize)` would preserve binary data.
83 myfile << remcontents;
// Appends `msg` to the file `fileName`.
//
// Visible steps: the destination name and the destination type are added to
// `ob` as two packets (in that order), the file is opened in binary append
// mode, and the result of `enc.encodeB(&ob, true)` is streamed into it.
//
// @param msg       message to persist.
// @param fileName  path of the file to append to.
//
// NOTE(review): packets added to `ob` before the destination name are not
// visible here; readMessageFromQ expects six packets (timestamp .. body) at
// indices 0-5 - confirm the order matches.
90 void MessageHandler::writeMessageToQ(
Message msg,
string fileName)
100 ob.
addPacket(msg.getDestination().getName());
101 ob.
addPacket(msg.getDestination().getType());
// ios::app: every write goes to the end of the existing file content.
103 myfile.open(fileName.c_str(), ios::binary | ios::app);
// NOTE(review): written with operator<<; if encodeB returns a char* rather
// than a std::string, output would stop at the first NUL byte - confirm the
// return type.
104 myfile << enc.encodeB(&ob,
true);
// Removes the first occurrence of "<subs>:" from the subscriber list read from
// `fileName` and writes the resulting list back to the same file.
//
// @param subs      subscriber name to remove.
// @param fileName  path of the subscriber-list file.
// @return          bool; the return statements are not visible here.
//                  NOTE(review): the final check tests whether a ":" is still
//                  present in the list, so the result presumably reflects
//                  whether any subscribers remain - confirm which way round.
108 bool MessageHandler::tempUnSubscribe(
string subs,
string fileName)
112 myfile1.open(fileName.c_str());
113 if (myfile1.is_open())
// Reads the file line by line into `subscribers`; each getline call replaces
// the previous content of the string.
115 while(getline(myfile1,subscribers))
// NOTE(review): the same path is opened again for writing; closing `myfile1`
// first is not visible here - confirm.
123 myfile.open(fileName.c_str());
// Entries are stored as "<name>:", so the removal token includes the colon.
124 string rep = subs +
":";
125 StringUtil::replaceFirst(subscribers,rep,
"");
126 myfile.write(subscribers.c_str(),subscribers.length());
// Any remaining ":" means at least one "<name>:" entry is still in the list.
128 if(subscribers.find(
":")!=string::npos)
// Reads a message from the file `fileName` on behalf of subscriber `subs`.
//
// First calls tempUnSubscribe(subs, fileName + ":SubslistTemp"), then calls
// readMessageFromQ(fileName, done), passing tempUnSubscribe's result as the
// `erase` argument.
//
// @param fileName  path of the message file; ":SubslistTemp" is appended to
//                  form the temporary subscriber-list path.
// @param subs      name of the subscriber requesting the message.
// @return          presumably `msg` as produced by readMessageFromQ - the
//                  return statement is not visible here.
134 Message MessageHandler::readMessageFromT(
string fileName,
string subs)
136 bool done = tempUnSubscribe(subs,fileName+
":SubslistTemp");
137 Message msg = readMessageFromQ(fileName, done);
// Writes `msg` to the file `fileName` by delegating to writeMessageToQ with
// the same arguments.
//
// @param msg       message to persist.
// @param fileName  path of the file to append to.
141 void MessageHandler::writeMessageToT(
Message msg,
string fileName)
143 writeMessageToQ(msg ,fileName);
// Adds subscriber `subs` to the subscriber list stored in `fileName`.
// Entries are stored as "<name>:" concatenated in a single string.
//
// @param subs      subscriber name to add.
// @param fileName  path of the subscriber-list file.
146 void MessageHandler::subscribe(
string subs,
string fileName)
149 myfile1.open(fileName.c_str());
151 if (myfile1.is_open())
// Reads the existing list; each getline call replaces `subscribers`.
153 while(getline(myfile1,subscribers))
// First write path: append "<subs>:" to the list that was read and write it
// back to the same file.
// NOTE(review): there is no visible check for `subs` already being in the
// list, so repeated calls would presumably add duplicate entries - confirm.
161 myfile.open(fileName.c_str());
162 subscribers += (subs +
":");
163 myfile.write(subscribers.c_str(),subscribers.length());
// Second write path, presumably taken when the list file could not be opened
// for reading.
// NOTE(review): no append of `subs` is visible on this path, so a newly
// created list may be written without the first subscriber - confirm.
166 myfile.open(fileName.c_str());
167 myfile.write(subscribers.c_str(),subscribers.length());
// Removes the first occurrence of "<subs>:" from the subscriber list stored in
// `fileName` and writes the list back.
//
// @param subs      subscriber name to remove.
// @param fileName  path of the subscriber-list file.
171 void MessageHandler::unSubscribe(
string subs,
string fileName)
175 myfile1.open(fileName.c_str());
176 if (myfile1.is_open())
// Reads the existing list; each getline call replaces `subscribers`.
178 while(getline(myfile1,subscribers))
// First write path: strip "<subs>:" from the list and write it back.
186 myfile.open(fileName.c_str());
187 string rep = subs +
":";
188 StringUtil::replaceFirst(subscribers,rep,
"");
189 myfile.write(subscribers.c_str(),subscribers.length());
// Second write path, presumably taken when the list file could not be opened
// for reading; `subscribers` is written unchanged.
192 myfile.open(fileName.c_str());
193 myfile.write(subscribers.c_str(),subscribers.length());
// Request handler; its address is handed to the Server constructed in
// trigger(). Works through the shared `_mess_instance` instead of `this`.
//
// Receives a request from `fd` and dispatches on its text:
//   - contains both "<" and ">" : treated as a message and stored with
//     writeMessageToQ / writeMessageToT according to the destination type
//     ("Queue" / "Topic");
//   - contains "GET FROM "      : reads a message with readMessageFromQ /
//     readMessageFromT;
//   - contains "SUBSCRIBE "     : adds a subscriber;
//   - contains "UNSUBSCRIBE "   : see the review notes on that branch.
//
// @param arg  opaque argument supplied by the server.
//             NOTE(review): presumably carries the connection descriptor used
//             as `fd` below - the extraction is not visible here.
// @return     void*; the returned value is not visible here.
197 void* MessageHandler::service(
void* arg)
// NOTE(review): 1024 is presumably the maximum number of bytes to receive.
203 _mess_instance->getServer().Receive(fd,results,1024);
// Drops everything after the last ">".
// NOTE(review): when no ">" is present, find_last_of returns npos and
// npos + 1 wraps to 0, leaving `results` empty. If this line runs for every
// request, the command branches below could never match - confirm that it is
// guarded.
212 results = results.substr(0,results.find_last_of(
">")+1);
213 _mess_instance->logger << results << flush;
// Markup-looking payload (has both "<" and ">"): handled as an incoming message.
215 if(results.find(
"<")!=string::npos && results.find(
">")!=string::npos)
217 string h =
"Received Message----";
// Storage file is "<path><destination name>:<destination type>".
222 string fileName = _mess_instance->path+msg.getDestination().getName()+
":"+msg.getDestination().getType();
// Only the destination types "Queue" and "Topic" result in a write.
223 if(msg.getDestination().getType()==
"Queue")
224 _mess_instance->writeMessageToQ(msg,fileName);
225 else if(msg.getDestination().getType()==
"Topic")
226 _mess_instance->writeMessageToT(msg,fileName);
// NOTE(review): `e->what()` implies the exception is held through a pointer;
// the catch clause is not visible here.
230 _mess_instance->logger << e->what() << flush;
232 _mess_instance->getServer().Send(fd,h);
235 _mess_instance->logger << h << flush;
// Retrieval request.
237 else if(results.find(
"GET FROM ")!=string::npos)
// The destination kind is detected by a substring search over the whole request.
240 if(results.find(
"Queue")!=string::npos)
// Replacing the "GET FROM " prefix with the storage path turns the request
// text into the file name.
242 StringUtil::replaceFirst(results,
"GET FROM ",_mess_instance->path);
243 msg = _mess_instance->readMessageFromQ(results,
true);
245 else if(results.find(
"Topic")!=string::npos)
// The subscriber name is whatever follows the first "-" in the request.
// NOTE(review): if there is no "-", find returns npos, npos + 1 wraps to 0,
// and `subs` becomes the entire request string.
247 string subs = results.substr(results.find(
"-")+1);
// Strip "-<subs>" so only the destination part remains, then map it to a path.
248 string te =
"-" + subs;
249 StringUtil::replaceFirst(results,te,
"");
250 StringUtil::replaceFirst(results,
"GET FROM ",_mess_instance->path);
251 msg = _mess_instance->readMessageFromT(results,subs);
254 if(results.find(
"Queue")!=string::npos || results.find(
"Topic")!=string::npos)
257 _mess_instance->logger << h << flush;
260 h =
"Improper Destination";
261 _mess_instance->getServer().Send(fd,h);
// NOTE(review): "SUBSCRIBE " is a substring of "UNSUBSCRIBE ", so a request
// containing "UNSUBSCRIBE " also satisfies this test. If the UNSUBSCRIBE
// branch below is part of the same else-if chain it can never be reached.
265 else if(results.find(
"SUBSCRIBE ")!=string::npos)
// Expected form: "SUBSCRIBE <name> TO <destination>". "SUBSCRIBE " is 10
// characters; the 11 also removes the space before "TO".
// NOTE(review): find("TO") matches the first "TO" anywhere in the request, and
// a missing "TO" is not handled in the visible code.
267 int len = results.find(
"TO") - results.find(
"SUBSCRIBE ") - 11;
268 string subs = results.substr(results.find(
"SUBSCRIBE ")+10,len);
// Everything after "TO " is the destination; its list file is
// "<path><destination>:Subslist".
269 results = results.substr(results.find(
"TO ")+3);
270 results = (_mess_instance->path+results+
":Subslist");
271 _mess_instance->subscribe(subs,results);
272 string h =
"Subscribed";
273 _mess_instance->getServer().Send(fd,h);
277 else if(results.find(
"UNSUBSCRIBE ")!=string::npos)
// NOTE(review): "UNSUBSCRIBE " is 12 characters, but the offsets used here are
// 11 (start) and 12 (length adjustment). Compared with the SUBSCRIBE branch
// (10 / 11) that is one short, so `subs` would begin with the space that
// follows the keyword.
279 int len = results.find(
"TO") - results.find(
"UNSUBSCRIBE ") - 12;
280 string subs = results.substr(results.find(
"UNSUBSCRIBE ")+11,len);
281 results = results.substr(results.find(
"TO ")+3);
282 results = (_mess_instance->path+results+
":Subslist");
// NOTE(review): this calls subscribe(), not unSubscribe(), although the reply
// sent below is "Unsubscribed" - looks like a copy/paste slip, confirm.
283 _mess_instance->subscribe(subs,results);
284 string h =
"Unsubscribed";
285 _mess_instance->getServer().Send(fd,h);
// Zero-fills `buf`.
289 memset(&buf[0], 0,
sizeof(buf));
// Initialises the shared `_mess_instance` and marks it as not running.
//
// @param path  NOTE(review): presumably forwarded to the MessageHandler
//              constructor when the instance is created - the creation
//              statement is not visible here.
295 void MessageHandler::init(
string path)
// Only acts when no shared instance exists yet.
297 if(_mess_instance==NULL)
300 _mess_instance->running =
false;
// Starts the message server on `port`, using `service` as the request handler,
// and marks the shared instance as running.
//
// @param port  port passed to the Server constructor.
// @param path  NOTE(review): not used in the visible statements; presumably
//              passed to init() before the running check - confirm.
304 void MessageHandler::trigger(
string port,
string path)
// NOTE(review): presumably returns early when the server is already running;
// the guarded statement is not visible here.
307 if(_mess_instance->running)
// NOTE(review): the meaning of the `false`, 500 and 2 arguments is defined by
// Server's constructor, which is not visible here.
309 Server serv(port,
false,500,&service,2);
// The local Server object is copy-assigned into the shared instance.
310 _mess_instance->server = serv;
311 _mess_instance->running =
true;
// Stops the server held by the shared `_mess_instance`.
// NOTE(review): `running` is not reset to false in the visible code, so a
// later trigger() would presumably still see the handler as running - confirm.
315 void MessageHandler::stop()
317 _mess_instance->server.stop();