23 #include "MessageUtil.h"
/**
 * Returns a pointer to the address field embedded in a generic socket address.
 *
 * @param sa Socket address; its sa_family selects which concrete layout is read.
 * @return Pointer to sin_addr when sa_family is AF_INET. Any other family is
 *         treated as sockaddr_in6 and a pointer to sin6_addr is returned.
 */
void *get_in_addr(struct sockaddr *sa)
{
	if (sa->sa_family == AF_INET) {
		return &(reinterpret_cast<struct sockaddr_in *>(sa)->sin_addr);
	}

	// Fallback: everything that is not AF_INET is read through the IPv6 layout.
	return &(reinterpret_cast<struct sockaddr_in6 *>(sa)->sin6_addr);
}
35 string MessageUtil::getSubscriber()
const
37 return this->subscriber;
40 void MessageUtil::setSubscriber(
string subscriber)
42 this->subscriber = subscriber;
// Constructor: obtains the "MessageUtil" logger, loads the document that
// parser.getDocument(file) returns, reads the root element's "destination" and
// "url" children, splits the url text on ":" into host and port, and copies the
// destination element's "name" and "type" attributes into this->destination.
// NOTE(review): the embedded listing numbers skip, so some original lines are
// missing from this view; in particular the declarations of `parser` and `vec`
// are not visible here.
45 MessageUtil::MessageUtil(
string file)
47 logger = Logger::getLogger(
"MessageUtil");
49 Document doc = parser.getDocument(file);
50 Element msgng = doc.getRootElement();
51 Element dest = msgng.getElementByName(
"destination");
52 Element url = msgng.getElementByName(
"url");
54 string h = url.getText();
// The url text is split on ":"; element 0 becomes the host and element 1 the port.
// NOTE(review): no size check on vec is visible before at(1); presumably a url
// without ":" makes at(1) fail — TODO confirm vec's type and the desired handling.
55 StringUtil::split(vec, h , (
":"));
56 this->host = vec.at(0);
57 this->port = vec.at(1);
58 this->destination.setName(dest.getAttribute(
"name"));
59 this->destination.setType(dest.getAttribute(
"type"));
// Destructor. NOTE(review): its body is not visible in this listing.
62 MessageUtil::~MessageUtil()
// Connects to this->host / this->port over a stream socket, sends msg.toXml(),
// then reads a short reply and prints it.
// NOTE(review): the embedded listing numbers skip, so some original lines are
// missing from this view (local declarations such as rv, sockfd, numbytes and buf,
// the bodies of the error branches, closing braces and the return statement).
// The comments below describe only what is shown.
67 bool MessageUtil::sendMessage(
Message msg)
71 struct addrinfo hints, *servinfo, *p;
// Receives the printable peer address written by inet_ntop further down.
73 char s[INET6_ADDRSTRLEN];
// Zeroed hints asking for a stream socket of either address family.
75 memset(&hints, 0,
sizeof hints);
76 hints.ai_family = AF_UNSPEC;
77 hints.ai_socktype = SOCK_STREAM;
// Resolve host and port; a failure is reported on stderr through gai_strerror.
// NOTE(review): what happens after the report is not visible here.
79 if ((rv = getaddrinfo(this->host.c_str(), this->port.c_str(), &hints, &servinfo)) != 0) {
80 fprintf(stderr,
"getaddrinfo: %s\n", gai_strerror(rv));
// Walk the resolved addresses: create a socket for each, then try to connect.
85 for(p = servinfo; p != NULL; p = p->ai_next) {
86 if ((sockfd = socket(p->ai_family, p->ai_socktype,
87 p->ai_protocol)) == -1) {
88 perror(
"client: socket");
92 if (connect(sockfd, p->ai_addr, p->ai_addrlen) == -1) {
94 perror(
"client: connect");
// NOTE(review): the condition guarding this message is not visible. The
// inet_ntop call below dereferences p, so p must be non-null there — TODO confirm
// that the failure path does not fall through.
102 fprintf(stderr,
"client: failed to connect\n");
// Convert the chosen address to text (the remaining arguments are on a line
// missing from this listing) and print it.
106 inet_ntop(p->ai_family, get_in_addr((
struct sockaddr *)p->ai_addr),
108 printf(
"client: connecting to %s\n", s);
// The address list is released once the connection attempt is over.
110 freeaddrinfo(servinfo);
// Payload is the XML form of the message.
112 string h = msg.toXml();
// send() is called once; NOTE(review): no retry for a partial send is visible.
114 if (send(sockfd, h.c_str(), h.length(), 0) == -1)
116 logger <<
"send failed" << flush;
119 logger << h << flush;
// Read at most 99 bytes of reply.
// NOTE(review): the error branch body is not visible; if control reaches the
// terminator write below after recv returned -1, it would write buf[-1] — TODO confirm.
121 if ((numbytes = recv(sockfd, buf, 99, 0)) == -1) {
126 buf[numbytes] =
'\0';
127 printf(
"client: received '%s'\n",buf);
// Clear the receive buffer before leaving.
// NOTE(review): no close(sockfd) appears among the visible lines — TODO confirm
// the socket is closed on every path.
129 memset(&buf[0], 0,
sizeof(buf));
// Connects to this->host / this->port over a stream socket, sends a
// "GET FROM <name>:<type>" request for this->destination (with "-<subscriber>"
// appended when the type is "Topic"), reads the reply and logs it.
// NOTE(review): the embedded listing numbers skip, so some original lines are
// missing from this view (declarations of rv, buf, ss, temp and results, the
// bodies of the error branches, closing braces). The comments below describe only
// what is shown.
133 Message MessageUtil::receiveMessage()
135 int sockfd, numbytes;
137 struct addrinfo hints, *servinfo, *p;
// Receives the printable peer address written by inet_ntop further down.
139 char s[INET6_ADDRSTRLEN];
// Zeroed hints asking for a stream socket of either address family.
141 memset(&hints, 0,
sizeof hints);
142 hints.ai_family = AF_UNSPEC;
143 hints.ai_socktype = SOCK_STREAM;
// Resolve host and port; a failure is reported on stderr through gai_strerror.
// NOTE(review): what happens after the report is not visible here.
145 if ((rv = getaddrinfo(this->host.c_str(), this->port.c_str(), &hints, &servinfo)) != 0) {
146 fprintf(stderr,
"getaddrinfo: %s\n", gai_strerror(rv));
// Walk the resolved addresses: create a socket for each, then try to connect.
150 for(p = servinfo; p != NULL; p = p->ai_next) {
151 if ((sockfd = socket(p->ai_family, p->ai_socktype,
152 p->ai_protocol)) == -1) {
153 perror(
"client: socket");
157 if (connect(sockfd, p->ai_addr, p->ai_addrlen) == -1) {
159 perror(
"client: connect");
// NOTE(review): the condition guarding this message is not visible. The
// inet_ntop call below dereferences p, so p must be non-null there — TODO confirm
// that the failure path does not fall through.
167 fprintf(stderr,
"client: failed to connect\n");
// Convert the chosen address to text (the remaining arguments are on a line
// missing from this listing) and print it.
170 inet_ntop(p->ai_family, get_in_addr((
struct sockaddr *)p->ai_addr),
172 printf(
"client: connecting to %s\n", s);
// The address list is released once the connection attempt is over.
174 freeaddrinfo(servinfo);
// Build the request line from the destination's name and type.
176 string h =
"GET FROM "+this->destination.getName()+
":"+this->destination.getType();
// For a "Topic" destination the subscriber name is appended after a "-".
177 if(this->destination.getType()==
"Topic")
178 h += (
"-" + this->getSubscriber());
// send() is called once; NOTE(review): no retry for a partial send is visible.
180 if (send(sockfd, h.c_str(), h.length(), 0) == -1)
182 logger <<
"send failed" << flush;
185 logger << h << flush;
// Read at most 1024 bytes of reply.
// NOTE(review): buf's declaration is not visible; if buf holds exactly 1024
// bytes, a full read would put the terminator written below one past the end,
// and a -1 result reaching that write would index buf[-1] — TODO confirm.
187 if ((numbytes = recv(sockfd, buf, 1024, 0)) == -1) {
// Log the reply one line at a time (how ss is filled is not visible here).
194 while(getline(ss,temp))
196 logger << temp << flush;
199 logger <<
"\n\nclient: received" << flush;
200 logger << results << flush;
// The terminator is written only here, after the logging above.
202 buf[numbytes] =
'\0';
// Clear the receive buffer before leaving.
205 memset(&buf[0], 0,
sizeof(buf));
// NOTE(review): the declared return type is Message but the value returned is
// this->destination; whether a conversion exists is not visible here — TODO confirm.
211 return this->destination;
// Sends "SUBSCRIBE <subs> TO <name>:<type>" for this->destination to
// this->host / this->port over a stream socket, then reads and logs the reply.
// NOTE(review): the embedded listing numbers skip, so some original lines are
// missing from this view (declarations of rv, buf, ss, temp and results, the
// bodies of the error branches, closing braces). The comments below describe only
// what is shown.
215 void MessageUtil::subscribe(
string subs)
// Only "Topic" destinations are handled; presumably everything below sits inside
// this if (its braces are not visible) — TODO confirm.
217 if(this->destination.getType()==
"Topic")
219 int sockfd, numbytes;
221 struct addrinfo hints, *servinfo, *p;
// Receives the printable peer address written by inet_ntop further down.
223 char s[INET6_ADDRSTRLEN];
// Zeroed hints asking for a stream socket of either address family.
225 memset(&hints, 0,
sizeof hints);
226 hints.ai_family = AF_UNSPEC;
227 hints.ai_socktype = SOCK_STREAM;
// Resolve host and port; a failure is reported on stderr through gai_strerror.
// NOTE(review): what happens after the report is not visible here.
229 if ((rv = getaddrinfo(this->host.c_str(), this->port.c_str(), &hints, &servinfo)) != 0) {
230 fprintf(stderr,
"getaddrinfo: %s\n", gai_strerror(rv));
// Walk the resolved addresses: create a socket for each, then try to connect.
234 for(p = servinfo; p != NULL; p = p->ai_next) {
235 if ((sockfd = socket(p->ai_family, p->ai_socktype,
236 p->ai_protocol)) == -1) {
237 perror(
"client: socket");
241 if (connect(sockfd, p->ai_addr, p->ai_addrlen) == -1) {
243 perror(
"client: connect");
// NOTE(review): the condition guarding this message is not visible. The
// inet_ntop call below dereferences p, so p must be non-null there — TODO confirm
// that the failure path does not fall through.
251 fprintf(stderr,
"client: failed to connect\n");
// Convert the chosen address to text (the remaining arguments are on a line
// missing from this listing) and print it.
254 inet_ntop(p->ai_family, get_in_addr((
struct sockaddr *)p->ai_addr),
256 printf(
"client: connecting to %s\n", s);
// The address list is released once the connection attempt is over.
258 freeaddrinfo(servinfo);
// Build the request line from the subscriber name and the destination's name and type.
260 string h =
"SUBSCRIBE "+subs+
" TO "+this->destination.getName()+
":"+this->destination.getType();
// send() is called once; NOTE(review): no retry for a partial send is visible.
262 if (send(sockfd, h.c_str(), h.length(), 0) == -1)
264 logger <<
"send failed" << flush;
267 logger << h << flush;
// Read at most 1024 bytes of reply.
// NOTE(review): buf's declaration is not visible; if buf holds exactly 1024
// bytes, a full read would put the terminator written below one past the end,
// and a -1 result reaching that write would index buf[-1] — TODO confirm.
269 if ((numbytes = recv(sockfd, buf, 1024, 0)) == -1) {
// Log the reply one line at a time (how ss is filled is not visible here).
276 while(getline(ss,temp))
278 logger << temp << flush;
281 logger <<
"\n\nclient: received" << flush;
282 logger << results << flush;
// The terminator is written only here, after the logging above.
283 buf[numbytes] =
'\0';
// Clear the receive buffer before leaving.
// NOTE(review): no close(sockfd) appears among the visible lines — TODO confirm
// the socket is closed on every path.
285 memset(&buf[0], 0,
sizeof(buf));
// Sends "UNSUBSCRIBE <subs> TO <name>:<type>" for this->destination to
// this->host / this->port over a stream socket, then reads and logs the reply.
// NOTE(review): the embedded listing numbers skip, so some original lines are
// missing from this view (declarations of rv, buf, ss, temp and results, the
// bodies of the error branches, closing braces). The comments below describe only
// what is shown.
289 void MessageUtil::unSubscribe(
string subs)
// Only "Topic" destinations are handled; presumably everything below sits inside
// this if (its braces are not visible) — TODO confirm.
291 if(this->destination.getType()==
"Topic")
293 int sockfd, numbytes;
295 struct addrinfo hints, *servinfo, *p;
// Receives the printable peer address written by inet_ntop further down.
297 char s[INET6_ADDRSTRLEN];
// Zeroed hints asking for a stream socket of either address family.
299 memset(&hints, 0,
sizeof hints);
300 hints.ai_family = AF_UNSPEC;
301 hints.ai_socktype = SOCK_STREAM;
// Resolve host and port; a failure is reported on stderr through gai_strerror.
// NOTE(review): what happens after the report is not visible here.
303 if ((rv = getaddrinfo(this->host.c_str(), this->port.c_str(), &hints, &servinfo)) != 0) {
304 fprintf(stderr,
"getaddrinfo: %s\n", gai_strerror(rv));
// Walk the resolved addresses: create a socket for each, then try to connect.
308 for(p = servinfo; p != NULL; p = p->ai_next) {
309 if ((sockfd = socket(p->ai_family, p->ai_socktype,
310 p->ai_protocol)) == -1) {
311 perror(
"client: socket");
315 if (connect(sockfd, p->ai_addr, p->ai_addrlen) == -1) {
317 perror(
"client: connect");
// NOTE(review): the condition guarding this message is not visible. The
// inet_ntop call below dereferences p, so p must be non-null there — TODO confirm
// that the failure path does not fall through.
325 fprintf(stderr,
"client: failed to connect\n");
// Convert the chosen address to text (the remaining arguments are on a line
// missing from this listing) and print it.
328 inet_ntop(p->ai_family, get_in_addr((
struct sockaddr *)p->ai_addr),
330 printf(
"client: connecting to %s\n", s);
// The address list is released once the connection attempt is over.
332 freeaddrinfo(servinfo);
// Build the request line from the subscriber name and the destination's name and type.
334 string h =
"UNSUBSCRIBE "+subs+
" TO "+this->destination.getName()+
":"+this->destination.getType();
// send() is called once; NOTE(review): no retry for a partial send is visible.
336 if (send(sockfd, h.c_str(), h.length(), 0) == -1)
338 logger <<
"send failed" << flush;
341 logger << h << flush;
// Read at most 1024 bytes of reply.
// NOTE(review): buf's declaration is not visible; if buf holds exactly 1024
// bytes, a full read would put the terminator written below one past the end,
// and a -1 result reaching that write would index buf[-1] — TODO confirm.
343 if ((numbytes = recv(sockfd, buf, 1024, 0)) == -1) {
// Log the reply one line at a time (how ss is filled is not visible here).
350 while(getline(ss,temp))
352 logger << temp << flush;
355 logger <<
"\n\nclient: received" << flush;
356 logger << results << flush;
// The terminator is written only here, after the logging above.
357 buf[numbytes] =
'\0';
// Clear the receive buffer before leaving.
// NOTE(review): no close(sockfd) appears among the visible lines — TODO confirm
// the socket is closed on every path.
360 memset(&buf[0], 0,
sizeof(buf));