1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.cometd;
16
17 import java.io.IOException;
18 import java.security.SecureRandom;
19 import java.util.ArrayList;
20 import java.util.Arrays;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Random;
27 import java.util.Set;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.CopyOnWriteArrayList;
30 import javax.servlet.ServletContext;
31 import javax.servlet.http.HttpServletRequest;
32
33 import org.cometd.Bayeux;
34 import org.cometd.BayeuxListener;
35 import org.cometd.Channel;
36 import org.cometd.ChannelBayeuxListener;
37 import org.cometd.Client;
38 import org.cometd.ClientBayeuxListener;
39 import org.cometd.Extension;
40 import org.cometd.Message;
41 import org.cometd.SecurityPolicy;
42 import org.mortbay.util.LazyList;
43 import org.mortbay.util.ajax.JSON;
44
45
46
47
48
49
50
51 public abstract class AbstractBayeux extends MessagePool implements Bayeux
52 {
53 public static final ChannelId META_ID=new ChannelId(META);
54 public static final ChannelId META_CONNECT_ID=new ChannelId(META_CONNECT);
55 public static final ChannelId META_CLIENT_ID=new ChannelId(META_CLIENT);
56 public static final ChannelId META_DISCONNECT_ID=new ChannelId(META_DISCONNECT);
57 public static final ChannelId META_HANDSHAKE_ID=new ChannelId(META_HANDSHAKE);
58 public static final ChannelId META_PING_ID=new ChannelId(META_PING);
59 public static final ChannelId META_STATUS_ID=new ChannelId(META_STATUS);
60 public static final ChannelId META_SUBSCRIBE_ID=new ChannelId(META_SUBSCRIBE);
61 public static final ChannelId META_UNSUBSCRIBE_ID=new ChannelId(META_UNSUBSCRIBE);
62
63 private final HashMap<String,Handler> _handlers=new HashMap<String,Handler>();
64 private final ChannelImpl _root=new ChannelImpl("/",this);
65 private final ConcurrentHashMap<String,ClientImpl> _clients=new ConcurrentHashMap<String,ClientImpl>();
66 protected final ConcurrentHashMap<String,ChannelId> _channelIdCache=new ConcurrentHashMap<String,ChannelId>();
67 protected final ConcurrentHashMap<String,List<String>> _browser2client=new ConcurrentHashMap<String,List<String>>();
68 protected final ThreadLocal<HttpServletRequest> _request=new ThreadLocal<HttpServletRequest>();
69 protected final List<ClientBayeuxListener> _clientListeners=new CopyOnWriteArrayList<ClientBayeuxListener>();
70 protected final List<ChannelBayeuxListener> _channelListeners=new CopyOnWriteArrayList<ChannelBayeuxListener>();
71 protected final Handler _publishHandler;
72 protected final Handler _metaPublishHandler;
73
74 protected SecurityPolicy _securityPolicy=new DefaultPolicy();
75 protected JSON.Literal _advice;
76 protected JSON.Literal _multiFrameAdvice;
77 protected int _adviceVersion=0;
78 protected Object _handshakeAdvice=new JSON.Literal("{\"reconnect\":\"handshake\",\"interval\":500}");
79 protected int _logLevel;
80 protected long _timeout=30000;
81 protected long _interval=0;
82 protected long _maxInterval=10000;
83 protected boolean _initialized;
84 protected int _multiFrameInterval=-1;
85
86 protected boolean _requestAvailable;
87
88 transient ServletContext _context;
89 transient Random _random;
90 protected int _maxClientQueue=-1;
91
92 protected Extension[] _extensions;
93 protected JSON.Literal _transports=new JSON.Literal("[\"" + Bayeux.TRANSPORT_LONG_POLL + "\",\"" + Bayeux.TRANSPORT_CALLBACK_POLL + "\"]");
94 protected JSON.Literal _replyExt=new JSON.Literal("{\"ack\":\"true\"}");
95
96 protected int _maxLazyLatency=5000;
97
98
99
100
101
102 protected AbstractBayeux()
103 {
104 _publishHandler=new PublishHandler();
105 _metaPublishHandler=new MetaPublishHandler();
106 _handlers.put(META_HANDSHAKE,new HandshakeHandler());
107 _handlers.put(META_CONNECT,new ConnectHandler());
108 _handlers.put(META_DISCONNECT,new DisconnectHandler());
109 _handlers.put(META_SUBSCRIBE,new SubscribeHandler());
110 _handlers.put(META_UNSUBSCRIBE,new UnsubscribeHandler());
111 _handlers.put(META_PING,new PingHandler());
112
113 setTimeout(getTimeout());
114 }
115
116
117 public void addExtension(Extension ext)
118 {
119 _extensions=(Extension[])LazyList.addToArray(_extensions,ext,Extension.class);
120 }
121
122
123
124
125
126
127 public ChannelImpl getChannel(ChannelId id)
128 {
129 return _root.getChild(id);
130 }
131
132
133 public ChannelImpl getChannel(String id)
134 {
135 ChannelId cid=getChannelId(id);
136 if (cid.depth() == 0)
137 return null;
138 return _root.getChild(cid);
139 }
140
141
142 public Channel getChannel(String id, boolean create)
143 {
144 ChannelImpl channel=getChannel(id);
145
146 if (channel == null && create)
147 {
148 channel=new ChannelImpl(id,this);
149 Channel added =_root.addChild(channel);
150 if (added!=channel)
151 return added;
152 if (isLogInfo())
153 logInfo("newChannel: " + channel);
154 }
155 return channel;
156 }
157
158
159 public ChannelId getChannelId(String id)
160 {
161 ChannelId cid=_channelIdCache.get(id);
162 if (cid == null)
163 {
164
165 cid=new ChannelId(id);
166 ChannelId other=_channelIdCache.putIfAbsent(id,cid);
167 if (other!=null)
168 return other;
169 }
170 return cid;
171 }
172
173
174
175
176
177
178
179 public Client getClient(String client_id)
180 {
181 if (client_id == null)
182 return null;
183 Client client=_clients.get(client_id);
184 return client;
185 }
186
187
188 public Set<String> getClientIDs()
189 {
190 return _clients.keySet();
191 }
192
193
194
195
196
197
198 public long getMaxInterval()
199 {
200 return _maxInterval;
201 }
202
203
204
205
206
207 public int getLogLevel()
208 {
209 return _logLevel;
210 }
211
212
213
214
215
216
217
218 public SecurityPolicy getSecurityPolicy()
219 {
220 return _securityPolicy;
221 }
222
223
224 public long getTimeout()
225 {
226 return _timeout;
227 }
228
229
230 public long getInterval()
231 {
232 return _interval;
233 }
234
235
236
237
238
239
240
241 public boolean isDirectDeliver()
242 {
243 return false;
244 }
245
246
247
248
249
250
251
252
253
254 public void setDirectDeliver(boolean directDeliver)
255 {
256 _context.log("directDeliver is deprecated");
257 }
258
259
260
261
262
263
264
265
266
267
268
269
270
271 public String handle(ClientImpl client, Transport transport, Message message) throws IOException
272 {
273 String channel_id=message.getChannel();
274
275 Handler handler=(Handler)_handlers.get(channel_id);
276 if (handler != null)
277 {
278 message=extendRcvMeta(client,message);
279 handler.handle(client,transport,message);
280 _metaPublishHandler.handle(client,transport,message);
281 }
282 else if (channel_id.startsWith(META_SLASH))
283 {
284 message=extendRcvMeta(client,message);
285 _metaPublishHandler.handle(client,transport,message);
286 }
287 else
288 {
289
290 handler=_publishHandler;
291 message=extendRcv(client,message);
292 handler.handle(client,transport,message);
293 }
294
295 return channel_id;
296 }
297
298
299 public boolean hasChannel(String id)
300 {
301 ChannelId cid=getChannelId(id);
302 return _root.getChild(cid) != null;
303 }
304
305
306 public boolean isInitialized()
307 {
308 return _initialized;
309 }
310
311
312
313
314
315
316 public boolean isJSONCommented()
317 {
318 return false;
319 }
320
321
322 public boolean isLogDebug()
323 {
324 return _logLevel > 1;
325 }
326
327
328 public boolean isLogInfo()
329 {
330 return _logLevel > 0;
331 }
332
333
334 public void logDebug(String message)
335 {
336 if (_logLevel > 1)
337 _context.log(message);
338 }
339
340
341 public void logDebug(String message, Throwable th)
342 {
343 if (_logLevel > 1)
344 _context.log(message,th);
345 }
346
347
348 public void logWarn(String message, Throwable th)
349 {
350 _context.log(message + ": " + th.toString());
351 }
352
353
354 public void logWarn(String message)
355 {
356 _context.log(message);
357 }
358
359
360 public void logInfo(String message)
361 {
362 if (_logLevel > 0)
363 _context.log(message);
364 }
365
366
367 public Client newClient(String idPrefix)
368 {
369 ClientImpl client=new ClientImpl(this,idPrefix);
370 return client;
371 }
372
373
374 public abstract ClientImpl newRemoteClient();
375
376
377
378
379
380
381
382
383
384
385
386 public Transport newTransport(ClientImpl client, Map<?,?> message)
387 {
388 if (isLogDebug())
389 logDebug("newTransport: client=" + client + ",message=" + message);
390
391 Transport result;
392
393 String type = client == null ? null : client.getConnectionType();
394 if (type == null)
395 {
396
397 type = (String)message.get(Bayeux.CONNECTION_TYPE_FIELD);
398 }
399 if (type == null)
400 {
401
402 Object types = message.get(Bayeux.SUPPORTED_CONNECTION_TYPES_FIELD);
403 if (types != null)
404 {
405 List supportedTypes;
406 if (types instanceof Object[]) supportedTypes = Arrays.asList((Object[])types);
407 else if (types instanceof List) supportedTypes = (List)types;
408 else if (types instanceof Map) supportedTypes = new ArrayList(((Map)types).values());
409 else supportedTypes = Collections.emptyList();
410
411 if (supportedTypes.contains(Bayeux.TRANSPORT_LONG_POLL)) type = Bayeux.TRANSPORT_LONG_POLL;
412 else if (supportedTypes.contains(Bayeux.TRANSPORT_CALLBACK_POLL)) type = Bayeux.TRANSPORT_CALLBACK_POLL;
413 }
414 }
415 if (type == null)
416 {
417
418 String jsonp = (String) message.get(Bayeux.JSONP_PARAMETER);
419 type = jsonp != null ? Bayeux.TRANSPORT_CALLBACK_POLL : Bayeux.TRANSPORT_LONG_POLL;
420 }
421
422 if (Bayeux.TRANSPORT_CALLBACK_POLL.equals(type))
423 {
424 String jsonp = (String)message.get(Bayeux.JSONP_PARAMETER);
425 if (jsonp == null) throw new IllegalArgumentException("Missing 'jsonp' field in message " + message + " for transport " + type);
426 result = new JSONPTransport(jsonp);
427 }
428 else if (Bayeux.TRANSPORT_LONG_POLL.equals(type))
429 {
430 result = new JSONTransport();
431 }
432 else
433 {
434 throw new IllegalArgumentException("Unsupported transport type " + type);
435 }
436
437 if (isLogDebug())
438 logDebug("newTransport: result="+result);
439
440 return result;
441 }
442
443
444
445
446
447
448
449
450
451
452
453 protected void doPublish(ChannelId to, Client from, Object data, String msgId, boolean lazy)
454 {
455 final MessageImpl message=newMessage();
456 message.put(CHANNEL_FIELD,to.toString());
457
458 if (msgId == null)
459 {
460 long id=message.hashCode() ^ (to == null?0:to.hashCode()) ^ (from == null?0:from.hashCode());
461 id=id < 0?-id:id;
462 message.put(ID_FIELD,Long.toString(id,36));
463 }
464 else
465 message.put(ID_FIELD,msgId);
466 message.put(DATA_FIELD,data);
467
468 message.setLazy(lazy);
469
470 final Message m=extendSendBayeux(from,message);
471
472 if (m != null)
473 _root.doDelivery(to,from,m);
474 if (m instanceof MessageImpl)
475 ((MessageImpl)m).decRef();
476 }
477
478
479 public boolean removeChannel(ChannelImpl channel)
480 {
481 return _root.doRemove(channel,_channelListeners);
482 }
483
484
485 protected void addChannel(ChannelImpl channel)
486 {
487 for (ChannelBayeuxListener l : _channelListeners)
488 l.channelAdded(channel);
489 }
490
491
492 protected String newClientId(long variation, String idPrefix)
493 {
494 if (idPrefix == null)
495 return Long.toString(getRandom(),36) + Long.toString(variation,36);
496 else
497 return idPrefix + "_" + Long.toString(getRandom(),36);
498 }
499
500
501 protected void addClient(ClientImpl client, String idPrefix)
502 {
503 while(true)
504 {
505 String id=newClientId(client.hashCode(),idPrefix);
506 client.setId(id);
507
508 ClientImpl other=_clients.putIfAbsent(id,client);
509 if (other == null)
510 {
511 for (ClientBayeuxListener l : _clientListeners)
512 l.clientAdded((Client)client);
513
514 return;
515 }
516 }
517 }
518
519
520
521
522
523
524
525 public Client removeClient(String client_id)
526 {
527 ClientImpl client;
528 if (client_id == null)
529 return null;
530 client=_clients.remove(client_id);
531 if (client != null)
532 {
533 for (ClientBayeuxListener l : _clientListeners)
534 l.clientRemoved((Client)client);
535 client.unsubscribeAll();
536 }
537 return client;
538 }
539
540
541
542
543
544
545
546 public void setMaxInterval(long ms)
547 {
548 _maxInterval=ms;
549 }
550
551
552
553
554
555 public void setJSONCommented(boolean commented)
556 {
557 if (commented)
558 _context.log("JSONCommented is deprecated");
559 }
560
561
562
563
564
565
566 public void setLogLevel(int logLevel)
567 {
568 _logLevel=logLevel;
569 }
570
571
572
573
574
575
576
577
578
579 public void setSecurityPolicy(SecurityPolicy securityPolicy)
580 {
581 _securityPolicy=securityPolicy;
582 }
583
584
585 public void setTimeout(long ms)
586 {
587 _timeout=ms;
588 generateAdvice();
589 }
590
591
592 public void setInterval(long ms)
593 {
594 _interval=ms;
595 generateAdvice();
596 }
597
598
599
600
601
602
603
604
605
606
607 public void setMultiFrameInterval(int multiFrameInterval)
608 {
609 _multiFrameInterval=multiFrameInterval;
610 generateAdvice();
611 }
612
613
614
615
616
617 public int getMultiFrameInterval()
618 {
619 return _multiFrameInterval;
620 }
621
622
623 void generateAdvice()
624 {
625 setAdvice(new JSON.Literal("{\"reconnect\":\"retry\",\"interval\":" + getInterval() + ",\"timeout\":" + getTimeout() + "}"));
626 }
627
628
629 public void setAdvice(JSON.Literal advice)
630 {
631 synchronized(this)
632 {
633 _adviceVersion++;
634 _advice=advice;
635 _multiFrameAdvice=new JSON.Literal(JSON.toString(multiFrameAdvice(advice)));
636 }
637 }
638
639
640 private Map<String,Object> multiFrameAdvice(JSON.Literal advice)
641 {
642 Map<String,Object> a=(Map<String,Object>)JSON.parse(_advice.toString());
643 a.put("multiple-clients",Boolean.TRUE);
644 if (_multiFrameInterval > 0)
645 {
646 a.put("reconnect","retry");
647 a.put("interval",_multiFrameInterval);
648 }
649 else
650 a.put("reconnect","none");
651 return a;
652 }
653
654
655 public JSON.Literal getAdvice()
656 {
657 return _advice;
658 }
659
660
661
662
663
664
665 public boolean isRequestAvailable()
666 {
667 return _requestAvailable;
668 }
669
670
671
672
673
674
675
676 public void setRequestAvailable(boolean requestAvailable)
677 {
678 _requestAvailable=requestAvailable;
679 }
680
681
682
683
684
685
686 public HttpServletRequest getCurrentRequest()
687 {
688 return _request.get();
689 }
690
691
692
693
694
695
696 void setCurrentRequest(HttpServletRequest request)
697 {
698 _request.set(request);
699 }
700
701
702 public Collection<Channel> getChannels()
703 {
704 List<Channel> channels=new ArrayList<Channel>();
705 _root.getChannels(channels);
706 return channels;
707 }
708
709
710
711
712
713 public int getChannelCount()
714 {
715 return _root.getChannelCount();
716 }
717
718
719 public Collection<Client> getClients()
720 {
721 return new ArrayList<Client>(_clients.values());
722 }
723
724
725
726
727
728 public int getClientCount()
729 {
730 return _clients.size();
731 }
732
733
734 public boolean hasClient(String clientId)
735 {
736 if (clientId == null)
737 return false;
738 return _clients.containsKey(clientId);
739 }
740
741
742 public Channel removeChannel(String channelId)
743 {
744 Channel channel=getChannel(channelId);
745
746 boolean removed=false;
747 if (channel != null)
748 removed=channel.remove();
749
750 if (removed)
751 return channel;
752 else
753 return null;
754 }
755
756
757 protected void initialize(ServletContext context)
758 {
759 synchronized(this)
760 {
761 _initialized=true;
762 _context=context;
763 try
764 {
765 _random=SecureRandom.getInstance("SHA1PRNG");
766 }
767 catch(Exception e)
768 {
769 context.log("Could not get secure random for ID generation",e);
770 _random=new Random();
771 }
772 _random.setSeed(_random.nextLong() ^ hashCode() ^ System.nanoTime() ^ Runtime.getRuntime().freeMemory());
773
774 _root.addChild(new ServiceChannel(Bayeux.SERVICE));
775
776 }
777 }
778
779
780 long getRandom()
781 {
782 long l=_random.nextLong();
783 return l < 0?-l:l;
784 }
785
786
787 void clientOnBrowser(String browserId, String clientId)
788 {
789 List<String> clients=_browser2client.get(browserId);
790 if (clients == null)
791 {
792 List<String> new_clients=new CopyOnWriteArrayList<String>();
793 clients=_browser2client.putIfAbsent(browserId,new_clients);
794 if (clients == null)
795 clients=new_clients;
796 }
797 clients.add(clientId);
798 }
799
800
801 void clientOffBrowser(String browserId, String clientId)
802 {
803 List<String> clients=_browser2client.get(browserId);
804
805 if (clients != null)
806 clients.remove(clientId);
807 }
808
809
810 List<String> clientsOnBrowser(String browserId)
811 {
812 List<String> clients=_browser2client.get(browserId);
813
814 if (clients == null)
815 return Collections.emptyList();
816 return clients;
817 }
818
819
820 public void addListener(BayeuxListener listener)
821 {
822 if (listener instanceof ClientBayeuxListener)
823 _clientListeners.add((ClientBayeuxListener)listener);
824 if (listener instanceof ChannelBayeuxListener)
825 _channelListeners.add((ChannelBayeuxListener)listener);
826 }
827
828
829 public int getMaxClientQueue()
830 {
831 return _maxClientQueue;
832 }
833
834
835 public void setMaxClientQueue(int size)
836 {
837 _maxClientQueue=size;
838 }
839
840
841 protected Message extendRcv(ClientImpl from, Message message)
842 {
843 if (_extensions != null)
844 {
845 for (int i=_extensions.length; message != null && i-- > 0;)
846 message=_extensions[i].rcv(from,message);
847 }
848
849 if (from != null)
850 {
851 Extension[] client_exs=from.getExtensions();
852 if (client_exs != null)
853 {
854 for (int i=client_exs.length; message != null && i-- > 0;)
855 message=client_exs[i].rcv(from,message);
856 }
857 }
858
859 return message;
860 }
861
862
863 protected Message extendRcvMeta(ClientImpl from, Message message)
864 {
865 if (_extensions != null)
866 {
867 for (int i=_extensions.length; message != null && i-- > 0;)
868 message=_extensions[i].rcvMeta(from,message);
869 }
870
871 if (from != null)
872 {
873 Extension[] client_exs=from.getExtensions();
874 if (client_exs != null)
875 {
876 for (int i=client_exs.length; message != null && i-- > 0;)
877 message=client_exs[i].rcvMeta(from,message);
878 }
879 }
880 return message;
881 }
882
883
884 protected Message extendSendBayeux(Client from, Message message)
885 {
886 if (_extensions != null)
887 {
888 for (int i=0; message != null && i < _extensions.length; i++)
889 {
890 message=_extensions[i].send(from,message);
891 }
892 }
893
894 return message;
895 }
896
897
898 public Message extendSendClient(Client from, ClientImpl to, Message message)
899 {
900 if (to != null)
901 {
902 Extension[] client_exs=to.getExtensions();
903 if (client_exs != null)
904 {
905 for (int i=0; message != null && i < client_exs.length; i++)
906 message=client_exs[i].send(from,message);
907 }
908 }
909
910 return message;
911 }
912
913
914 public Message extendSendMeta(ClientImpl from, Message message)
915 {
916 if (_extensions != null)
917 {
918 for (int i=0; message != null && i < _extensions.length; i++)
919 message=_extensions[i].sendMeta(from,message);
920 }
921
922 if (from != null)
923 {
924 Extension[] client_exs=from.getExtensions();
925 if (client_exs != null)
926 {
927 for (int i=0; message != null && i < client_exs.length; i++)
928 message=client_exs[i].sendMeta(from,message);
929 }
930 }
931
932 return message;
933 }
934
935
936
937
938
939
940 public int getMaxLazyLatency()
941 {
942 return _maxLazyLatency;
943 }
944
945
946
947
948
949
950 public void setMaxLazyLatency(int ms)
951 {
952 _maxLazyLatency = ms;
953 }
954
955
956
957 public static class DefaultPolicy implements SecurityPolicy
958 {
959 public boolean canHandshake(Message message)
960 {
961 return true;
962 }
963
964 public boolean canCreate(Client client, String channel, Message message)
965 {
966 return client != null && !channel.startsWith(Bayeux.META_SLASH);
967 }
968
969 public boolean canSubscribe(Client client, String channel, Message message)
970 {
971 if (client != null && ("/**".equals(channel) || "/*".equals(channel)))
972 return false;
973 return client != null && !channel.startsWith(Bayeux.META_SLASH);
974 }
975
976 public boolean canPublish(Client client, String channel, Message message)
977 {
978 return client != null || client == null && Bayeux.META_HANDSHAKE.equals(channel);
979 }
980
981 }
982
983
984
985 protected abstract class Handler
986 {
987 abstract void handle(ClientImpl client, Transport transport, Message message) throws IOException;
988
989 abstract ChannelId getMetaChannelId();
990
991 void unknownClient(Transport transport, String channel) throws IOException
992 {
993 MessageImpl reply=newMessage();
994
995 reply.put(CHANNEL_FIELD,channel);
996 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
997 reply.put(ERROR_FIELD,"402::Unknown client");
998 reply.put("advice",_handshakeAdvice);
999 transport.send(reply);
1000 reply.decRef();
1001 }
1002
1003 void sendMetaReply(final ClientImpl client, Message reply, final Transport transport) throws IOException
1004 {
1005 reply=extendSendMeta(client,reply);
1006 if (reply != null)
1007 {
1008 transport.send(reply);
1009 if (reply instanceof MessageImpl)
1010 ((MessageImpl)reply).decRef();
1011 }
1012 }
1013 }
1014
1015
1016
1017 protected class ConnectHandler extends Handler
1018 {
1019 protected String _metaChannel=META_CONNECT;
1020
1021 @Override
1022 ChannelId getMetaChannelId()
1023 {
1024 return META_CONNECT_ID;
1025 }
1026
1027 @Override
1028 public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1029 {
1030 if (client == null)
1031 {
1032 unknownClient(transport,_metaChannel);
1033 return;
1034 }
1035
1036
1037 String type=client.getConnectionType();
1038 boolean polling=true;
1039 if (type == null)
1040 {
1041 type=(String)message.get(Bayeux.CONNECTION_TYPE_FIELD);
1042 client.setConnectionType(type);
1043 polling=false;
1044 }
1045
1046 Object advice=message.get(ADVICE_FIELD);
1047 if (advice != null)
1048 {
1049 Long timeout=(Long)((Map)advice).get("timeout");
1050 if (timeout != null && timeout.longValue() > 0)
1051 client.setTimeout(timeout.longValue());
1052 else
1053 client.setTimeout(0);
1054
1055 Long interval=(Long)((Map)advice).get("interval");
1056 if (interval != null && interval.longValue() > 0)
1057 client.setInterval(interval.longValue());
1058 else
1059 client.setInterval(0);
1060 }
1061 else
1062 {
1063 client.setTimeout(0);
1064 client.setInterval(0);
1065 }
1066
1067 advice=null;
1068
1069
1070 if (polling && _multiFrameInterval > 0 && client.getBrowserId() != null)
1071 {
1072 List<String> clients=clientsOnBrowser(client.getBrowserId());
1073 int count=clients.size();
1074 if (count > 1)
1075 {
1076 polling=clients.get(0).equals(client.getId());
1077 advice=client.getAdvice();
1078 if (advice == null)
1079 advice=_multiFrameAdvice;
1080 else
1081
1082 advice=multiFrameAdvice((JSON.Literal)advice);
1083 }
1084 }
1085
1086 synchronized(this)
1087 {
1088 if (advice == null)
1089 {
1090 if (_adviceVersion != client._adviseVersion)
1091 {
1092 advice=_advice;
1093 client._adviseVersion=_adviceVersion;
1094 }
1095 }
1096 else
1097 client._adviseVersion=-1;
1098 }
1099
1100
1101 String id=message.getId();
1102
1103 Message reply=newMessage(message);
1104
1105 reply.put(CHANNEL_FIELD,META_CONNECT);
1106 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1107 if (advice != null)
1108 reply.put(ADVICE_FIELD,advice);
1109 if (id != null)
1110 reply.put(ID_FIELD,id);
1111
1112 if (polling)
1113 transport.setMetaConnectReply(reply);
1114 else
1115 sendMetaReply(client,reply,transport);
1116 }
1117 }
1118
1119
1120
1121 protected class DisconnectHandler extends Handler
1122 {
1123 @Override
1124 ChannelId getMetaChannelId()
1125 {
1126 return META_DISCONNECT_ID;
1127 }
1128
1129 @Override
1130 public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1131 {
1132 if (client == null)
1133 {
1134 unknownClient(transport,META_DISCONNECT);
1135 return;
1136 }
1137 if (isLogInfo())
1138 logInfo("Disconnect " + client.getId());
1139
1140 client.remove(false);
1141
1142 Message reply=newMessage(message);
1143 reply.put(CHANNEL_FIELD,META_DISCONNECT);
1144 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1145 String id=message.getId();
1146 if (id != null)
1147 reply.put(ID_FIELD,id);
1148
1149 reply=extendSendMeta(client,reply);
1150
1151 Message pollReply=transport.getMetaConnectReply();
1152 if (pollReply != null)
1153 {
1154 transport.setMetaConnectReply(null);
1155 sendMetaReply(client,pollReply,transport);
1156 }
1157 sendMetaReply(client,reply,transport);
1158 }
1159 }
1160
1161
1162
1163 protected class HandshakeHandler extends Handler
1164 {
1165 @Override
1166 ChannelId getMetaChannelId()
1167 {
1168 return META_HANDSHAKE_ID;
1169 }
1170
1171 @Override
1172 public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1173 {
1174 if (client != null)
1175 throw new IllegalStateException();
1176
1177 if (_securityPolicy != null && !_securityPolicy.canHandshake(message))
1178 {
1179 Message reply=newMessage(message);
1180 reply.put(CHANNEL_FIELD,META_HANDSHAKE);
1181 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1182 reply.put(ERROR_FIELD,"403::Handshake denied");
1183
1184 sendMetaReply(client,reply,transport);
1185 return;
1186 }
1187
1188 client=newRemoteClient();
1189
1190 Message reply=newMessage(message);
1191 reply.put(CHANNEL_FIELD,META_HANDSHAKE);
1192 reply.put(VERSION_FIELD,"1.0");
1193 reply.put(MIN_VERSION_FIELD,"0.9");
1194
1195 if (client != null)
1196 {
1197 reply.put(SUPPORTED_CONNECTION_TYPES_FIELD,_transports);
1198 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1199 reply.put(CLIENT_FIELD,client.getId());
1200 if (_advice != null)
1201 reply.put(ADVICE_FIELD,_advice);
1202 }
1203 else
1204 {
1205 reply.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
1206 if (_advice != null)
1207 reply.put(ADVICE_FIELD,_advice);
1208 }
1209
1210 if (isLogDebug())
1211 logDebug("handshake.handle: reply=" + reply);
1212
1213 String id=message.getId();
1214 if (id != null)
1215 reply.put(ID_FIELD,id);
1216
1217 sendMetaReply(client,reply,transport);
1218 }
1219 }
1220
1221
1222
1223 protected class PublishHandler extends Handler
1224 {
1225 @Override
1226 ChannelId getMetaChannelId()
1227 {
1228 return null;
1229 }
1230
1231 @Override
1232 public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1233 {
1234 String channel_id=message.getChannel();
1235
1236 if (client == null && message.containsKey(CLIENT_FIELD))
1237 {
1238 unknownClient(transport,channel_id);
1239 return;
1240 }
1241
1242 String id=message.getId();
1243
1244 ChannelId cid=getChannelId(channel_id);
1245 Object data=message.get(Bayeux.DATA_FIELD);
1246
1247 Message reply=newMessage(message);
1248 reply.put(CHANNEL_FIELD,channel_id);
1249 if (id != null)
1250 reply.put(ID_FIELD,id);
1251
1252 if (data == null)
1253 {
1254 message=null;
1255 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1256 reply.put(ERROR_FIELD,"403::No data");
1257 }
1258 else if (!_securityPolicy.canPublish(client,channel_id,message))
1259 {
1260 message=null;
1261 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1262 reply.put(ERROR_FIELD,"403::Publish denied");
1263 }
1264 else
1265 {
1266 message.remove(CLIENT_FIELD);
1267 message=extendSendBayeux(client,message);
1268
1269 if (message != null)
1270 {
1271 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1272 }
1273 else
1274 {
1275 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1276 reply.put(ERROR_FIELD,"404::Message deleted");
1277 }
1278 }
1279
1280 sendMetaReply(client,reply,transport);
1281
1282 if (message != null)
1283 _root.doDelivery(cid,client,message);
1284 }
1285 }
1286
1287
1288
1289 protected class MetaPublishHandler extends Handler
1290 {
1291 @Override
1292 ChannelId getMetaChannelId()
1293 {
1294 return null;
1295 }
1296
1297 @Override
1298 public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1299 {
1300 String channel_id=message.getChannel();
1301
1302 if (client == null && !META_HANDSHAKE.equals(channel_id))
1303 {
1304
1305 return;
1306 }
1307
1308 if (_securityPolicy.canPublish(client,channel_id,message))
1309 {
1310 _root.doDelivery(getChannelId(channel_id),client,message);
1311 }
1312 }
1313 }
1314
1315
1316
1317 protected class SubscribeHandler extends Handler
1318 {
1319 @Override
1320 ChannelId getMetaChannelId()
1321 {
1322 return META_SUBSCRIBE_ID;
1323 }
1324
1325 @Override
1326 public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1327 {
1328 if (client == null)
1329 {
1330 unknownClient(transport,META_SUBSCRIBE);
1331 return;
1332 }
1333
1334 String subscribe_id=(String)message.get(SUBSCRIPTION_FIELD);
1335
1336
1337 if (subscribe_id == null)
1338 {
1339 subscribe_id=Long.toString(getRandom(),36);
1340 while(getChannel(subscribe_id) != null)
1341 subscribe_id=Long.toString(getRandom(),36);
1342 }
1343
1344 ChannelId cid=null;
1345 boolean can_subscribe=false;
1346
1347 if (subscribe_id.startsWith(Bayeux.SERVICE_SLASH))
1348 {
1349 can_subscribe=true;
1350 }
1351 else if (subscribe_id.startsWith(Bayeux.META_SLASH))
1352 {
1353 can_subscribe=false;
1354 }
1355 else
1356 {
1357 cid=getChannelId(subscribe_id);
1358 can_subscribe=_securityPolicy.canSubscribe(client,subscribe_id,message);
1359 }
1360
1361 Message reply=newMessage(message);
1362 reply.put(CHANNEL_FIELD,META_SUBSCRIBE);
1363 reply.put(SUBSCRIPTION_FIELD,subscribe_id);
1364
1365 if (can_subscribe)
1366 {
1367 if (cid != null)
1368 {
1369 ChannelImpl channel=getChannel(cid);
1370 if (channel == null && _securityPolicy.canCreate(client,subscribe_id,message))
1371 channel=(ChannelImpl)getChannel(subscribe_id,true);
1372
1373 if (channel != null)
1374 channel.subscribe(client);
1375 else
1376 can_subscribe=false;
1377 }
1378
1379 if (can_subscribe)
1380 {
1381 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1382 }
1383 else
1384 {
1385 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1386 reply.put(ERROR_FIELD,"403::cannot create");
1387 }
1388 }
1389 else
1390 {
1391 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1392 reply.put(ERROR_FIELD,"403::cannot subscribe");
1393
1394 }
1395
1396 String id=message.getId();
1397 if (id != null)
1398 reply.put(ID_FIELD,id);
1399
1400 sendMetaReply(client,reply,transport);
1401 }
1402 }
1403
1404
1405
1406 protected class UnsubscribeHandler extends Handler
1407 {
1408 @Override
1409 ChannelId getMetaChannelId()
1410 {
1411 return META_UNSUBSCRIBE_ID;
1412 }
1413
1414 @Override
1415 public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1416 {
1417 if (client == null)
1418 {
1419 unknownClient(transport,META_UNSUBSCRIBE);
1420 return;
1421 }
1422
1423 String channel_id=(String)message.get(SUBSCRIPTION_FIELD);
1424 ChannelImpl channel=getChannel(channel_id);
1425 if (channel != null)
1426 channel.unsubscribe(client);
1427
1428 Message reply=newMessage(message);
1429 reply.put(CHANNEL_FIELD,META_UNSUBSCRIBE);
1430 reply.put(SUBSCRIPTION_FIELD,channel_id);
1431 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1432
1433 String id=message.getId();
1434 if (id != null)
1435 reply.put(ID_FIELD,id);
1436
1437 sendMetaReply(client,reply,transport);
1438 }
1439 }
1440
1441
1442
1443 protected class PingHandler extends Handler
1444 {
1445 @Override
1446 ChannelId getMetaChannelId()
1447 {
1448 return META_PING_ID;
1449 }
1450
1451 @Override
1452 public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1453 {
1454 Message reply=newMessage(message);
1455 reply.put(CHANNEL_FIELD,META_PING);
1456 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1457
1458 String id=message.getId();
1459 if (id != null)
1460 reply.put(ID_FIELD,id);
1461
1462 sendMetaReply(client,reply,transport);
1463 }
1464 }
1465
1466
1467
1468 protected class ServiceChannel extends ChannelImpl
1469 {
1470 ServiceChannel(String id)
1471 {
1472 super(id,AbstractBayeux.this);
1473 setPersistent(true);
1474 }
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484 @Override
1485 public ChannelImpl addChild(ChannelImpl channel)
1486 {
1487 channel.setPersistent(true);
1488 return super.addChild(channel);
1489 }
1490
1491
1492 @Override
1493 public void subscribe(Client client)
1494 {
1495 if (client.isLocal())
1496 super.subscribe(client);
1497 }
1498
1499 }
1500 }