ffead.server.doc
MessageUtil.cpp
1 /*
2  Copyright 2009-2012, Sumeet Chhetri
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 /*
17  * MessageUtil.cpp
18  *
19  * Created on: Oct 1, 2009
20  * Author: sumeet
21  */
22 
23 #include "MessageUtil.h"
24 
25 
26 void *get_in_addr(struct sockaddr *sa)
27 {
28  if (sa->sa_family == AF_INET) {
29  return &(((struct sockaddr_in*)sa)->sin_addr);
30  }
31 
32  return &(((struct sockaddr_in6*)sa)->sin6_addr);
33 }
34 
35 string MessageUtil::getSubscriber() const
36 {
37  return this->subscriber;
38 }
39 
40 void MessageUtil::setSubscriber(string subscriber)
41 {
42  this->subscriber = subscriber;
43 }
44 
45 MessageUtil::MessageUtil(string file)
46 {
47  logger = Logger::getLogger("MessageUtil");
48  XmlParser parser("Parser");
49  Document doc = parser.getDocument(file);
50  Element msgng = doc.getRootElement();
51  Element dest = msgng.getElementByName("destination");
52  Element url = msgng.getElementByName("url");
53  vector<string> vec;
54  string h = url.getText();
55  StringUtil::split(vec, h , (":"));
56  this->host = vec.at(0);
57  this->port = vec.at(1);
58  this->destination.setName(dest.getAttribute("name"));
59  this->destination.setType(dest.getAttribute("type"));
60 }
61 
62 MessageUtil::~MessageUtil()
63 {
64 
65 }
66 
67 bool MessageUtil::sendMessage(Message msg)
68 {
69  int sockfd, numbytes;
70  char buf[100];
71  struct addrinfo hints, *servinfo, *p;
72  int rv;
73  char s[INET6_ADDRSTRLEN];
74 
75  memset(&hints, 0, sizeof hints);
76  hints.ai_family = AF_UNSPEC;
77  hints.ai_socktype = SOCK_STREAM;
78 
79  if ((rv = getaddrinfo(this->host.c_str(), this->port.c_str(), &hints, &servinfo)) != 0) {
80  fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv));
81  return 1;
82  }
83 
84  // loop through all the results and connect to the first we can
85  for(p = servinfo; p != NULL; p = p->ai_next) {
86  if ((sockfd = socket(p->ai_family, p->ai_socktype,
87  p->ai_protocol)) == -1) {
88  perror("client: socket");
89  continue;
90  }
91 
92  if (connect(sockfd, p->ai_addr, p->ai_addrlen) == -1) {
93  close(sockfd);
94  perror("client: connect");
95  continue;
96  }
97 
98  break;
99  }
100 
101  if (p == NULL) {
102  fprintf(stderr, "client: failed to connect\n");
103  return 2;
104  }
105 
106  inet_ntop(p->ai_family, get_in_addr((struct sockaddr *)p->ai_addr),
107  s, sizeof s);
108  printf("client: connecting to %s\n", s);
109 
110  freeaddrinfo(servinfo); // all done with this structure
111 
112  string h = msg.toXml();
113  bool flag = true;
114  if (send(sockfd, h.c_str(), h.length(), 0) == -1)
115  {
116  logger << "send failed" << flush;
117  flag = false;
118  }
119  logger << h << flush;
120 
121  if ((numbytes = recv(sockfd, buf, 99, 0)) == -1) {
122  perror("recv");
123  exit(1);
124  }
125 
126  buf[numbytes] = '\0';
127  printf("client: received '%s'\n",buf);
128  close(sockfd);
129  memset(&buf[0], 0, sizeof(buf));
130  return flag;
131 }
132 
133 Message MessageUtil::receiveMessage()
134 {
135  int sockfd, numbytes;
136  char buf[1024];
137  struct addrinfo hints, *servinfo, *p;
138  int rv;
139  char s[INET6_ADDRSTRLEN];
140 
141  memset(&hints, 0, sizeof hints);
142  hints.ai_family = AF_UNSPEC;
143  hints.ai_socktype = SOCK_STREAM;
144 
145  if ((rv = getaddrinfo(this->host.c_str(), this->port.c_str(), &hints, &servinfo)) != 0) {
146  fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv));
147  }
148 
149  // loop through all the results and connect to the first we can
150  for(p = servinfo; p != NULL; p = p->ai_next) {
151  if ((sockfd = socket(p->ai_family, p->ai_socktype,
152  p->ai_protocol)) == -1) {
153  perror("client: socket");
154  continue;
155  }
156 
157  if (connect(sockfd, p->ai_addr, p->ai_addrlen) == -1) {
158  close(sockfd);
159  perror("client: connect");
160  continue;
161  }
162 
163  break;
164  }
165 
166  if (p == NULL) {
167  fprintf(stderr, "client: failed to connect\n");
168  }
169 
170  inet_ntop(p->ai_family, get_in_addr((struct sockaddr *)p->ai_addr),
171  s, sizeof s);
172  printf("client: connecting to %s\n", s);
173 
174  freeaddrinfo(servinfo); // all done with this structure
175 
176  string h = "GET FROM "+this->destination.getName()+":"+this->destination.getType();
177  if(this->destination.getType()=="Topic")
178  h += ("-" + this->getSubscriber());
179  //bool flag = true;
180  if (send(sockfd, h.c_str(), h.length(), 0) == -1)
181  {
182  logger << "send failed" << flush;
183  //flag = false;
184  }
185  logger << h << flush;
186 
187  if ((numbytes = recv(sockfd, buf, 1024, 0)) == -1) {
188  perror("recv");
189  exit(1);
190  }
191  string temp,results;
192  stringstream ss;
193  ss << buf;
194  while(getline(ss,temp))
195  {
196  logger << temp << flush;
197  results += temp;
198  }
199  logger << "\n\nclient: received" << flush;
200  logger << results << flush;
201  Message msg(results);
202  buf[numbytes] = '\0';
203 
204  close(sockfd);
205  memset(&buf[0], 0, sizeof(buf));
206  return msg;
207 }
208 
209 Destination MessageUtil::getDestination()
210 {
211  return this->destination;
212 }
213 
214 
215 void MessageUtil::subscribe(string subs)
216 {
217  if(this->destination.getType()=="Topic")
218  {
219  int sockfd, numbytes;
220  char buf[1024];
221  struct addrinfo hints, *servinfo, *p;
222  int rv;
223  char s[INET6_ADDRSTRLEN];
224 
225  memset(&hints, 0, sizeof hints);
226  hints.ai_family = AF_UNSPEC;
227  hints.ai_socktype = SOCK_STREAM;
228 
229  if ((rv = getaddrinfo(this->host.c_str(), this->port.c_str(), &hints, &servinfo)) != 0) {
230  fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv));
231  }
232 
233  // loop through all the results and connect to the first we can
234  for(p = servinfo; p != NULL; p = p->ai_next) {
235  if ((sockfd = socket(p->ai_family, p->ai_socktype,
236  p->ai_protocol)) == -1) {
237  perror("client: socket");
238  continue;
239  }
240 
241  if (connect(sockfd, p->ai_addr, p->ai_addrlen) == -1) {
242  close(sockfd);
243  perror("client: connect");
244  continue;
245  }
246 
247  break;
248  }
249 
250  if (p == NULL) {
251  fprintf(stderr, "client: failed to connect\n");
252  }
253 
254  inet_ntop(p->ai_family, get_in_addr((struct sockaddr *)p->ai_addr),
255  s, sizeof s);
256  printf("client: connecting to %s\n", s);
257 
258  freeaddrinfo(servinfo); // all done with this structure
259 
260  string h = "SUBSCRIBE "+subs+" TO "+this->destination.getName()+":"+this->destination.getType();
261  //bool flag = true;
262  if (send(sockfd, h.c_str(), h.length(), 0) == -1)
263  {
264  logger << "send failed" << flush;
265  //flag = false;
266  }
267  logger << h << flush;
268 
269  if ((numbytes = recv(sockfd, buf, 1024, 0)) == -1) {
270  perror("recv");
271  exit(1);
272  }
273  string temp,results;
274  stringstream ss;
275  ss << buf;
276  while(getline(ss,temp))
277  {
278  logger << temp << flush;
279  results += temp;
280  }
281  logger << "\n\nclient: received" << flush;
282  logger << results << flush;
283  buf[numbytes] = '\0';
284  close(sockfd);
285  memset(&buf[0], 0, sizeof(buf));
286  }
287 }
288 
289 void MessageUtil::unSubscribe(string subs)
290 {
291  if(this->destination.getType()=="Topic")
292  {
293  int sockfd, numbytes;
294  char buf[1024];
295  struct addrinfo hints, *servinfo, *p;
296  int rv;
297  char s[INET6_ADDRSTRLEN];
298 
299  memset(&hints, 0, sizeof hints);
300  hints.ai_family = AF_UNSPEC;
301  hints.ai_socktype = SOCK_STREAM;
302 
303  if ((rv = getaddrinfo(this->host.c_str(), this->port.c_str(), &hints, &servinfo)) != 0) {
304  fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv));
305  }
306 
307  // loop through all the results and connect to the first we can
308  for(p = servinfo; p != NULL; p = p->ai_next) {
309  if ((sockfd = socket(p->ai_family, p->ai_socktype,
310  p->ai_protocol)) == -1) {
311  perror("client: socket");
312  continue;
313  }
314 
315  if (connect(sockfd, p->ai_addr, p->ai_addrlen) == -1) {
316  close(sockfd);
317  perror("client: connect");
318  continue;
319  }
320 
321  break;
322  }
323 
324  if (p == NULL) {
325  fprintf(stderr, "client: failed to connect\n");
326  }
327 
328  inet_ntop(p->ai_family, get_in_addr((struct sockaddr *)p->ai_addr),
329  s, sizeof s);
330  printf("client: connecting to %s\n", s);
331 
332  freeaddrinfo(servinfo); // all done with this structure
333 
334  string h = "UNSUBSCRIBE "+subs+" TO "+this->destination.getName()+":"+this->destination.getType();
335  //bool flag = true;
336  if (send(sockfd, h.c_str(), h.length(), 0) == -1)
337  {
338  logger << "send failed" << flush;
339  //flag = false;
340  }
341  logger << h << flush;
342 
343  if ((numbytes = recv(sockfd, buf, 1024, 0)) == -1) {
344  perror("recv");
345  exit(1);
346  }
347  string temp,results;
348  stringstream ss;
349  ss << buf;
350  while(getline(ss,temp))
351  {
352  logger << temp << flush;
353  results += temp;
354  }
355  logger << "\n\nclient: received" << flush;
356  logger << results << flush;
357  buf[numbytes] = '\0';
358 
359  close(sockfd);
360  memset(&buf[0], 0, sizeof(buf));
361  }
362 }