View Javadoc

1   // ========================================================================
2   // Copyright 2006 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // Licensed under the Apache License, Version 2.0 (the "License");
5   // you may not use this file except in compliance with the License.
6   // You may obtain a copy of the License at
7   // http://www.apache.org/licenses/LICENSE-2.0
8   // Unless required by applicable law or agreed to in writing, software
9   // distributed under the License is distributed on an "AS IS" BASIS,
10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  // See the License for the specific language governing permissions and
12  // limitations under the License.
13  //========================================================================
14  
15  package org.mortbay.cometd;
16  
17  import java.io.IOException;
18  import java.security.SecureRandom;
19  import java.util.ArrayList;
20  import java.util.Arrays;
21  import java.util.Collection;
22  import java.util.Collections;
23  import java.util.HashMap;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Random;
27  import java.util.Set;
28  import java.util.concurrent.ConcurrentHashMap;
29  import java.util.concurrent.CopyOnWriteArrayList;
30  import javax.servlet.ServletContext;
31  import javax.servlet.http.HttpServletRequest;
32  
33  import org.cometd.Bayeux;
34  import org.cometd.BayeuxListener;
35  import org.cometd.Channel;
36  import org.cometd.ChannelBayeuxListener;
37  import org.cometd.Client;
38  import org.cometd.ClientBayeuxListener;
39  import org.cometd.Extension;
40  import org.cometd.Message;
41  import org.cometd.SecurityPolicy;
42  import org.mortbay.util.LazyList;
43  import org.mortbay.util.ajax.JSON;
44  
45  /* ------------------------------------------------------------ */
46  /**
47   * @author gregw
48   * @author aabeling: added JSONP transport
49   *
50   */
51  public abstract class AbstractBayeux extends MessagePool implements Bayeux
52  {
53      public static final ChannelId META_ID=new ChannelId(META);
54      public static final ChannelId META_CONNECT_ID=new ChannelId(META_CONNECT);
55      public static final ChannelId META_CLIENT_ID=new ChannelId(META_CLIENT);
56      public static final ChannelId META_DISCONNECT_ID=new ChannelId(META_DISCONNECT);
57      public static final ChannelId META_HANDSHAKE_ID=new ChannelId(META_HANDSHAKE);
58      public static final ChannelId META_PING_ID=new ChannelId(META_PING);
59      public static final ChannelId META_STATUS_ID=new ChannelId(META_STATUS);
60      public static final ChannelId META_SUBSCRIBE_ID=new ChannelId(META_SUBSCRIBE);
61      public static final ChannelId META_UNSUBSCRIBE_ID=new ChannelId(META_UNSUBSCRIBE);
62  
63      private final HashMap<String,Handler> _handlers=new HashMap<String,Handler>();
64      private final ChannelImpl _root=new ChannelImpl("/",this);
65      private final ConcurrentHashMap<String,ClientImpl> _clients=new ConcurrentHashMap<String,ClientImpl>();
66      protected final ConcurrentHashMap<String,ChannelId> _channelIdCache=new ConcurrentHashMap<String,ChannelId>();
67      protected final ConcurrentHashMap<String,List<String>> _browser2client=new ConcurrentHashMap<String,List<String>>();
68      protected final ThreadLocal<HttpServletRequest> _request=new ThreadLocal<HttpServletRequest>();
69      protected final List<ClientBayeuxListener> _clientListeners=new CopyOnWriteArrayList<ClientBayeuxListener>();
70      protected final List<ChannelBayeuxListener> _channelListeners=new CopyOnWriteArrayList<ChannelBayeuxListener>();
71      protected final Handler _publishHandler;
72      protected final Handler _metaPublishHandler;
73      
74      protected SecurityPolicy _securityPolicy=new DefaultPolicy();
75      protected JSON.Literal _advice;
76      protected JSON.Literal _multiFrameAdvice;
77      protected int _adviceVersion=0;
78      protected Object _handshakeAdvice=new JSON.Literal("{\"reconnect\":\"handshake\",\"interval\":500}");
79      protected int _logLevel;
80      protected long _timeout=30000;
81      protected long _interval=0;
82      protected long _maxInterval=10000;
83      protected boolean _initialized;
84      protected int _multiFrameInterval=-1;
85  
86      protected boolean _requestAvailable;
87  
88      transient ServletContext _context;
89      transient Random _random;
90      protected int _maxClientQueue=-1;
91  
92      protected Extension[] _extensions;
93      protected JSON.Literal _transports=new JSON.Literal("[\"" + Bayeux.TRANSPORT_LONG_POLL + "\",\"" + Bayeux.TRANSPORT_CALLBACK_POLL + "\"]");
94      protected JSON.Literal _replyExt=new JSON.Literal("{\"ack\":\"true\"}");
95      
96      protected int _maxLazyLatency=5000;
97  
98      /* ------------------------------------------------------------ */
99      /**
100      * @param context The logLevel init parameter is used to set the logging to: 0=none, 1=info, 2=debug
101      */
102     protected AbstractBayeux()
103     {
104         _publishHandler=new PublishHandler();
105         _metaPublishHandler=new MetaPublishHandler();
106         _handlers.put(META_HANDSHAKE,new HandshakeHandler());
107         _handlers.put(META_CONNECT,new ConnectHandler());
108         _handlers.put(META_DISCONNECT,new DisconnectHandler());
109         _handlers.put(META_SUBSCRIBE,new SubscribeHandler());
110         _handlers.put(META_UNSUBSCRIBE,new UnsubscribeHandler());
111         _handlers.put(META_PING,new PingHandler());
112 
113         setTimeout(getTimeout());
114     }
115 
116     /* ------------------------------------------------------------ */
117     public void addExtension(Extension ext)
118     {
119         _extensions=(Extension[])LazyList.addToArray(_extensions,ext,Extension.class);
120     }
121 
122     /* ------------------------------------------------------------ */
123     /**
124      * @param id
125      * @return
126      */
127     public ChannelImpl getChannel(ChannelId id)
128     {
129         return _root.getChild(id);
130     }
131 
132     /* ------------------------------------------------------------ */
133     public ChannelImpl getChannel(String id)
134     {
135         ChannelId cid=getChannelId(id);
136         if (cid.depth() == 0)
137             return null;
138         return _root.getChild(cid);
139     }
140 
141     /* ------------------------------------------------------------ */
142     public Channel getChannel(String id, boolean create)
143     {
144         ChannelImpl channel=getChannel(id);
145 
146         if (channel == null && create)
147         {
148             channel=new ChannelImpl(id,this);
149             Channel added =_root.addChild(channel);
150             if (added!=channel) 
151                 return added;
152             if (isLogInfo())
153                 logInfo("newChannel: " + channel);
154         }
155         return channel;
156     }
157 
158     /* ------------------------------------------------------------ */
159     public ChannelId getChannelId(String id)
160     {
161         ChannelId cid=_channelIdCache.get(id);
162         if (cid == null)
163         {
164             // TODO shrink cache!
165             cid=new ChannelId(id);
166             ChannelId other=_channelIdCache.putIfAbsent(id,cid);
167             if (other!=null)
168                 return other;
169         }
170         return cid;
171     }
172 
173     /* ------------------------------------------------------------ */
174     /*
175      * (non-Javadoc)
176      * 
177      * @see org.mortbay.cometd.Bx#getClient(java.lang.String)
178      */
179     public Client getClient(String client_id)
180     {
181         if (client_id == null)
182             return null;
183         Client client=_clients.get(client_id);
184         return client;
185     }
186 
187     /* ------------------------------------------------------------ */
188     public Set<String> getClientIDs()
189     {
190         return _clients.keySet();
191     }
192 
193     /* ------------------------------------------------------------ */
194     /**
195      * @return The maximum time in ms to wait between polls before timing out a
196      *         client
197      */
198     public long getMaxInterval()
199     {
200         return _maxInterval;
201     }
202 
203     /* ------------------------------------------------------------ */
204     /**
205      * @return the logLevel. 0=none, 1=info, 2=debug
206      */
207     public int getLogLevel()
208     {
209         return _logLevel;
210     }
211 
212     /* ------------------------------------------------------------ */
213     /*
214      * (non-Javadoc)
215      * 
216      * @see org.mortbay.cometd.Bx#getSecurityPolicy()
217      */
218     public SecurityPolicy getSecurityPolicy()
219     {
220         return _securityPolicy;
221     }
222 
223     /* ------------------------------------------------------------ */
224     public long getTimeout()
225     {
226         return _timeout;
227     }
228 
229     /* ------------------------------------------------------------ */
230     public long getInterval()
231     {
232         return _interval;
233     }
234 
235     /* ------------------------------------------------------------ */
236     /**
237      * @return true if published messages are directly delivered to subscribers.
238      *         False if a new message is to be created that holds only supported
239      *         fields.
240      */
241     public boolean isDirectDeliver()
242     {
243         return false;
244     }
245 
246     /* ------------------------------------------------------------ */
247     /**
248      * @deprecated
249      * @param directDeliver
250      *            true if published messages are directly delivered to
251      *            subscribers. False if a new message is to be created that
252      *            holds only supported fields.
253      */
254     public void setDirectDeliver(boolean directDeliver)
255     {
256         _context.log("directDeliver is deprecated");
257     }
258 
259     /* ------------------------------------------------------------ */
260     /**
261      * Handle a Bayeux message. This is normally only called by the bayeux
262      * servlet or a test harness.
263      *
264      * @param client
265      *            The client if known
266      * @param transport
267      *            The transport to use for the message
268      * @param message
269      *            The bayeux message.
270      */
271     public String handle(ClientImpl client, Transport transport, Message message) throws IOException
272     {
273         String channel_id=message.getChannel();
274 
275         Handler handler=(Handler)_handlers.get(channel_id);
276         if (handler != null)
277         {
278             message=extendRcvMeta(client,message);
279             handler.handle(client,transport,message);
280             _metaPublishHandler.handle(client,transport,message);
281         }
282         else if (channel_id.startsWith(META_SLASH))
283         {
284             message=extendRcvMeta(client,message);
285             _metaPublishHandler.handle(client,transport,message);
286         }
287         else
288         {
289             // non meta channel
290             handler=_publishHandler;
291             message=extendRcv(client,message);
292             handler.handle(client,transport,message);
293         }
294 
295         return channel_id;
296     }
297 
298     /* ------------------------------------------------------------ */
299     public boolean hasChannel(String id)
300     {
301         ChannelId cid=getChannelId(id);
302         return _root.getChild(cid) != null;
303     }
304 
305     /* ------------------------------------------------------------ */
306     public boolean isInitialized()
307     {
308         return _initialized;
309     }
310 
311     /* ------------------------------------------------------------ */
312     /**
313      * @return the commented
314      * @deprecated
315      */
316     public boolean isJSONCommented()
317     {
318         return false;
319     }
320 
321     /* ------------------------------------------------------------ */
322     public boolean isLogDebug()
323     {
324         return _logLevel > 1;
325     }
326 
327     /* ------------------------------------------------------------ */
328     public boolean isLogInfo()
329     {
330         return _logLevel > 0;
331     }
332 
333     /* ------------------------------------------------------------ */
334     public void logDebug(String message)
335     {
336         if (_logLevel > 1)
337             _context.log(message);
338     }
339 
340     /* ------------------------------------------------------------ */
341     public void logDebug(String message, Throwable th)
342     {
343         if (_logLevel > 1)
344             _context.log(message,th);
345     }
346 
347     /* ------------------------------------------------------------ */
348     public void logWarn(String message, Throwable th)
349     {
350         _context.log(message + ": " + th.toString());
351     }
352 
353     /* ------------------------------------------------------------ */
354     public void logWarn(String message)
355     {
356         _context.log(message);
357     }
358 
359     /* ------------------------------------------------------------ */
360     public void logInfo(String message)
361     {
362         if (_logLevel > 0)
363             _context.log(message);
364     }
365 
366     /* ------------------------------------------------------------ */
367     public Client newClient(String idPrefix)
368     {
369         ClientImpl client=new ClientImpl(this,idPrefix);
370         return client;
371     }
372 
373     /* ------------------------------------------------------------ */
374     public abstract ClientImpl newRemoteClient();
375 
376     /* ------------------------------------------------------------ */
377     /**
378      * Create new transport object for a bayeux message
379      *
380      * @param client
381      *            The client
382      * @param message
383      *            the bayeux message
384      * @return the negotiated transport.
385      */
386     public Transport newTransport(ClientImpl client, Map<?,?> message)
387     {
388         if (isLogDebug())
389             logDebug("newTransport: client=" + client + ",message=" + message);
390 
391         Transport result;
392 
393         String type = client == null ? null : client.getConnectionType();
394         if (type == null)
395         {
396             // Check if it is a connect message and we can extract the connection type
397             type = (String)message.get(Bayeux.CONNECTION_TYPE_FIELD);
398         }
399         if (type == null)
400         {
401             // Check if it is an handshake message and we can negotiate the connection type
402             Object types = message.get(Bayeux.SUPPORTED_CONNECTION_TYPES_FIELD);
403             if (types != null)
404             {
405                 List supportedTypes;
406                 if (types instanceof Object[]) supportedTypes = Arrays.asList((Object[])types);
407                 else if (types instanceof List) supportedTypes = (List)types;
408                 else if (types instanceof Map) supportedTypes = new ArrayList(((Map)types).values());
409                 else supportedTypes = Collections.emptyList();
410 
411                 if (supportedTypes.contains(Bayeux.TRANSPORT_LONG_POLL)) type = Bayeux.TRANSPORT_LONG_POLL;
412                 else if (supportedTypes.contains(Bayeux.TRANSPORT_CALLBACK_POLL)) type = Bayeux.TRANSPORT_CALLBACK_POLL;
413             }
414         }
415         if (type == null)
416         {
417             // A normal message, check if it has the jsonp parameter
418             String jsonp = (String) message.get(Bayeux.JSONP_PARAMETER);
419             type = jsonp != null ? Bayeux.TRANSPORT_CALLBACK_POLL : Bayeux.TRANSPORT_LONG_POLL;
420         }
421 
422         if (Bayeux.TRANSPORT_CALLBACK_POLL.equals(type))
423         {
424             String jsonp = (String)message.get(Bayeux.JSONP_PARAMETER);
425             if (jsonp == null) throw new IllegalArgumentException("Missing 'jsonp' field in message " + message + " for transport " + type);
426             result = new JSONPTransport(jsonp);
427         }
428         else if (Bayeux.TRANSPORT_LONG_POLL.equals(type))
429         {
430             result = new JSONTransport();
431         }
432         else
433         {
434             throw new IllegalArgumentException("Unsupported transport type " + type);
435         }
436 
437         if (isLogDebug())
438             logDebug("newTransport: result="+result);
439 
440         return result;
441     }
442 
443     /* ------------------------------------------------------------ */
444     /**
445      * Publish data to a channel. Creates a message and delivers it to the root
446      * channel.
447      *
448      * @param to
449      * @param from
450      * @param data
451      * @param msgId
452      */
453     protected void doPublish(ChannelId to, Client from, Object data, String msgId, boolean lazy)
454     {
455         final MessageImpl message=newMessage();
456         message.put(CHANNEL_FIELD,to.toString());
457 
458         if (msgId == null)
459         {
460             long id=message.hashCode() ^ (to == null?0:to.hashCode()) ^ (from == null?0:from.hashCode());
461             id=id < 0?-id:id;
462             message.put(ID_FIELD,Long.toString(id,36));
463         }
464         else
465             message.put(ID_FIELD,msgId);
466         message.put(DATA_FIELD,data);
467 
468         message.setLazy(lazy);
469 
470         final Message m=extendSendBayeux(from,message);
471 
472         if (m != null)
473             _root.doDelivery(to,from,m);
474         if (m instanceof MessageImpl)
475             ((MessageImpl)m).decRef();
476     }
477 
478     /* ------------------------------------------------------------ */
479     public boolean removeChannel(ChannelImpl channel)
480     {
481         return _root.doRemove(channel,_channelListeners);
482     }
483 
484     /* ------------------------------------------------------------ */
485     protected void addChannel(ChannelImpl channel)
486     {
487         for (ChannelBayeuxListener l : _channelListeners)
488             l.channelAdded(channel);
489     }
490 
491     /* ------------------------------------------------------------ */
492     protected String newClientId(long variation, String idPrefix)
493     {
494         if (idPrefix == null)
495             return Long.toString(getRandom(),36) + Long.toString(variation,36);
496         else
497             return idPrefix + "_" + Long.toString(getRandom(),36);
498     }
499 
500     /* ------------------------------------------------------------ */
501     protected void addClient(ClientImpl client, String idPrefix)
502     {
503         while(true)
504         {
505             String id=newClientId(client.hashCode(),idPrefix);
506             client.setId(id);
507 
508             ClientImpl other=_clients.putIfAbsent(id,client);
509             if (other == null)
510             {
511                 for (ClientBayeuxListener l : _clientListeners)
512                     l.clientAdded((Client)client);
513 
514                 return;
515             }
516         }
517     }
518 
519     /* ------------------------------------------------------------ */
520     /*
521      * (non-Javadoc)
522      * 
523      * @see org.mortbay.cometd.Bx#removeClient(java.lang.String)
524      */
525     public Client removeClient(String client_id)
526     {
527         ClientImpl client;
528         if (client_id == null)
529             return null;
530         client=_clients.remove(client_id);
531         if (client != null)
532         {
533             for (ClientBayeuxListener l : _clientListeners)
534                 l.clientRemoved((Client)client);
535             client.unsubscribeAll();
536         }
537         return client;
538     }
539 
540     /* ------------------------------------------------------------ */
541     /**
542      * @param ms
543      *            The maximum time in ms to wait between polls before timing out
544      *            a client
545      */
546     public void setMaxInterval(long ms)
547     {
548         _maxInterval=ms;
549     }
550 
551     /* ------------------------------------------------------------ */
552     /**
553      * @param commented the commented to set
554      */
555     public void setJSONCommented(boolean commented)
556     {
557         if (commented)
558             _context.log("JSONCommented is deprecated");
559     }
560 
561     /* ------------------------------------------------------------ */
562     /**
563      * @param logLevel
564      *            the logLevel: 0=none, 1=info, 2=debug
565      */
566     public void setLogLevel(int logLevel)
567     {
568         _logLevel=logLevel;
569     }
570 
571     /* ------------------------------------------------------------ */
572     /*
573      * (non-Javadoc)
574      *
575      * @see
576      * org.mortbay.cometd.Bx#setSecurityPolicy(org.mortbay.cometd.SecurityPolicy
577      * )
578      */
579     public void setSecurityPolicy(SecurityPolicy securityPolicy)
580     {
581         _securityPolicy=securityPolicy;
582     }
583 
584     /* ------------------------------------------------------------ */
585     public void setTimeout(long ms)
586     {
587         _timeout=ms;
588         generateAdvice();
589     }
590 
591     /* ------------------------------------------------------------ */
592     public void setInterval(long ms)
593     {
594         _interval=ms;
595         generateAdvice();
596     }
597 
598     /* ------------------------------------------------------------ */
599     /**
600      * The time a client should delay between reconnects when multiple
601      * connections from the same browser are detected. This effectively produces
602      * traditional polling.
603      *
604      * @param multiFrameInterval
605      *            the multiFrameInterval to set
606      */
607     public void setMultiFrameInterval(int multiFrameInterval)
608     {
609         _multiFrameInterval=multiFrameInterval;
610         generateAdvice();
611     }
612 
613     /* ------------------------------------------------------------ */
614     /**
615      * @return the multiFrameInterval in milliseconds
616      */
617     public int getMultiFrameInterval()
618     {
619         return _multiFrameInterval;
620     }
621 
622     /* ------------------------------------------------------------ */
623     void generateAdvice()
624     {
625         setAdvice(new JSON.Literal("{\"reconnect\":\"retry\",\"interval\":" + getInterval() + ",\"timeout\":" + getTimeout() + "}"));
626     }
627 
628     /* ------------------------------------------------------------ */
629     public void setAdvice(JSON.Literal advice)
630     {
631         synchronized(this)
632         {
633             _adviceVersion++;
634             _advice=advice;
635             _multiFrameAdvice=new JSON.Literal(JSON.toString(multiFrameAdvice(advice)));
636         }
637     }
638 
639     /* ------------------------------------------------------------ */
640     private Map<String,Object> multiFrameAdvice(JSON.Literal advice)
641     {
642         Map<String,Object> a=(Map<String,Object>)JSON.parse(_advice.toString());
643         a.put("multiple-clients",Boolean.TRUE);
644         if (_multiFrameInterval > 0)
645         {
646             a.put("reconnect","retry");
647             a.put("interval",_multiFrameInterval);
648         }
649         else
650             a.put("reconnect","none");
651         return a;
652     }
653 
654     /* ------------------------------------------------------------ */
655     public JSON.Literal getAdvice()
656     {
657         return _advice;
658     }
659 
660     /* ------------------------------------------------------------ */
661     /**
662      * @return TRUE if {@link #getCurrentRequest()} will return the current
663      *         request
664      */
665     public boolean isRequestAvailable()
666     {
667         return _requestAvailable;
668     }
669 
670     /* ------------------------------------------------------------ */
671     /**
672      * @param requestAvailable
673      *            TRUE if {@link #getCurrentRequest()} will return the current
674      *            request
675      */
676     public void setRequestAvailable(boolean requestAvailable)
677     {
678         _requestAvailable=requestAvailable;
679     }
680 
681     /* ------------------------------------------------------------ */
682     /**
683      * @return the current request if {@link #isRequestAvailable()} is true,
684      *         else null
685      */
686     public HttpServletRequest getCurrentRequest()
687     {
688         return _request.get();
689     }
690 
691     /* ------------------------------------------------------------ */
692     /**
693      * @return the current request if {@link #isRequestAvailable()} is true,
694      *         else null
695      */
696     void setCurrentRequest(HttpServletRequest request)
697     {
698         _request.set(request);
699     }
700 
701     /* ------------------------------------------------------------ */
702     public Collection<Channel> getChannels()
703     {
704         List<Channel> channels=new ArrayList<Channel>();
705         _root.getChannels(channels);
706         return channels;
707     }
708 
709     /* ------------------------------------------------------------ */
710     /**
711      * @return
712      */
713     public int getChannelCount()
714     {
715         return _root.getChannelCount();
716     }
717 
718     /* ------------------------------------------------------------ */
719     public Collection<Client> getClients()
720     {
721         return new ArrayList<Client>(_clients.values());
722     }
723 
724     /* ------------------------------------------------------------ */
725     /**
726      * @return
727      */
728     public int getClientCount()
729     {
730         return _clients.size();
731     }
732 
733     /* ------------------------------------------------------------ */
734     public boolean hasClient(String clientId)
735     {
736         if (clientId == null)
737             return false;
738         return _clients.containsKey(clientId);
739     }
740 
741     /* ------------------------------------------------------------ */
742     public Channel removeChannel(String channelId)
743     {
744         Channel channel=getChannel(channelId);
745 
746         boolean removed=false;
747         if (channel != null)
748             removed=channel.remove();
749 
750         if (removed)
751             return channel;
752         else
753             return null;
754     }
755 
756     /* ------------------------------------------------------------ */
757     protected void initialize(ServletContext context)
758     {
759         synchronized(this)
760         {
761             _initialized=true;
762             _context=context;
763             try
764             {
765                 _random=SecureRandom.getInstance("SHA1PRNG");
766             }
767             catch(Exception e)
768             {
769                 context.log("Could not get secure random for ID generation",e);
770                 _random=new Random();
771             }
772             _random.setSeed(_random.nextLong() ^ hashCode() ^ System.nanoTime() ^ Runtime.getRuntime().freeMemory());
773 
774             _root.addChild(new ServiceChannel(Bayeux.SERVICE));
775 
776         }
777     }
778 
779     /* ------------------------------------------------------------ */
780     long getRandom()
781     {
782         long l=_random.nextLong();
783         return l < 0?-l:l;
784     }
785 
786     /* ------------------------------------------------------------ */
787     void clientOnBrowser(String browserId, String clientId)
788     {
789         List<String> clients=_browser2client.get(browserId);
790         if (clients == null)
791         {
792             List<String> new_clients=new CopyOnWriteArrayList<String>();
793             clients=_browser2client.putIfAbsent(browserId,new_clients);
794             if (clients == null)
795                 clients=new_clients;
796         }
797         clients.add(clientId);
798     }
799 
800     /* ------------------------------------------------------------ */
801     void clientOffBrowser(String browserId, String clientId)
802     {
803         List<String> clients=_browser2client.get(browserId);
804 
805         if (clients != null)
806             clients.remove(clientId);
807     }
808 
809     /* ------------------------------------------------------------ */
810     List<String> clientsOnBrowser(String browserId)
811     {
812         List<String> clients=_browser2client.get(browserId);
813 
814         if (clients == null)
815             return Collections.emptyList();
816         return clients;
817     }
818 
819     /* ------------------------------------------------------------ */
820     public void addListener(BayeuxListener listener)
821     {
822         if (listener instanceof ClientBayeuxListener)
823             _clientListeners.add((ClientBayeuxListener)listener);
824         if (listener instanceof ChannelBayeuxListener)
825             _channelListeners.add((ChannelBayeuxListener)listener);
826     }
827 
828     /* ------------------------------------------------------------ */
829     public int getMaxClientQueue()
830     {
831         return _maxClientQueue;
832     }
833 
834     /* ------------------------------------------------------------ */
835     public void setMaxClientQueue(int size)
836     {
837         _maxClientQueue=size;
838     }
839 
840     /* ------------------------------------------------------------ */
841     protected Message extendRcv(ClientImpl from, Message message)
842     {
843         if (_extensions != null)
844         {
845             for (int i=_extensions.length; message != null && i-- > 0;)
846                 message=_extensions[i].rcv(from,message);
847         }
848 
849         if (from != null)
850         {
851             Extension[] client_exs=from.getExtensions();
852             if (client_exs != null)
853             {
854                 for (int i=client_exs.length; message != null && i-- > 0;)
855                     message=client_exs[i].rcv(from,message);
856             }
857         }
858 
859         return message;
860     }
861 
862     /* ------------------------------------------------------------ */
863     protected Message extendRcvMeta(ClientImpl from, Message message)
864     {
865         if (_extensions != null)
866         {
867             for (int i=_extensions.length; message != null && i-- > 0;)
868                 message=_extensions[i].rcvMeta(from,message);
869         }
870 
871         if (from != null)
872         {
873             Extension[] client_exs=from.getExtensions();
874             if (client_exs != null)
875             {
876                 for (int i=client_exs.length; message != null && i-- > 0;)
877                     message=client_exs[i].rcvMeta(from,message);
878             }
879         }
880         return message;
881     }
882 
883     /* ------------------------------------------------------------ */
884     protected Message extendSendBayeux(Client from, Message message)
885     {
886         if (_extensions != null)
887         {
888             for (int i=0; message != null && i < _extensions.length; i++)
889             {
890                 message=_extensions[i].send(from,message);
891             }
892         }
893 
894         return message;
895     }
896 
897     /* ------------------------------------------------------------ */
898     public Message extendSendClient(Client from, ClientImpl to, Message message)
899     {
900         if (to != null)
901         {
902             Extension[] client_exs=to.getExtensions();
903             if (client_exs != null)
904             {
905                 for (int i=0; message != null && i < client_exs.length; i++)
906                     message=client_exs[i].send(from,message);
907             }
908         }
909 
910         return message;
911     }
912 
913     /* ------------------------------------------------------------ */
914     public Message extendSendMeta(ClientImpl from, Message message)
915     {
916         if (_extensions != null)
917         {
918             for (int i=0; message != null && i < _extensions.length; i++)
919                 message=_extensions[i].sendMeta(from,message);
920         }
921 
922         if (from != null)
923         {
924             Extension[] client_exs=from.getExtensions();
925             if (client_exs != null)
926             {
927                 for (int i=0; message != null && i < client_exs.length; i++)
928                     message=client_exs[i].sendMeta(from,message);
929             }
930         }
931 
932         return message;
933     }
934 
935     /* ------------------------------------------------------------ */
936     /**
937      * @return the maximum ms that a lazy message will wait before 
938      * resuming waiting client
939      */
940     public int getMaxLazyLatency()
941     {
942         return _maxLazyLatency;
943     }
944 
945     /* ------------------------------------------------------------ */
946     /**
947      * @param ms the maximum ms that a lazy message will wait before 
948      * resuming waiting client
949      */
950     public void setMaxLazyLatency(int ms)
951     {
952         _maxLazyLatency = ms;
953     }
954 
955     /* ------------------------------------------------------------ */
956     /* ------------------------------------------------------------ */
957     public static class DefaultPolicy implements SecurityPolicy
958     {
959         public boolean canHandshake(Message message)
960         {
961             return true;
962         }
963 
964         public boolean canCreate(Client client, String channel, Message message)
965         {
966             return client != null && !channel.startsWith(Bayeux.META_SLASH);
967         }
968 
969         public boolean canSubscribe(Client client, String channel, Message message)
970         {
971             if (client != null && ("/**".equals(channel) || "/*".equals(channel)))
972                 return false;
973             return client != null && !channel.startsWith(Bayeux.META_SLASH);
974         }
975 
976         public boolean canPublish(Client client, String channel, Message message)
977         {
978             return client != null || client == null && Bayeux.META_HANDSHAKE.equals(channel);
979         }
980 
981     }
982 
983     /* ------------------------------------------------------------ */
984     /* ------------------------------------------------------------ */
985     protected abstract class Handler
986     {
987         abstract void handle(ClientImpl client, Transport transport, Message message) throws IOException;
988 
989         abstract ChannelId getMetaChannelId();
990 
991         void unknownClient(Transport transport, String channel) throws IOException
992         {
993             MessageImpl reply=newMessage();
994 
995             reply.put(CHANNEL_FIELD,channel);
996             reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
997             reply.put(ERROR_FIELD,"402::Unknown client");
998             reply.put("advice",_handshakeAdvice);
999             transport.send(reply);
1000             reply.decRef();
1001         }
1002 
1003         void sendMetaReply(final ClientImpl client, Message reply, final Transport transport) throws IOException
1004         {
1005             reply=extendSendMeta(client,reply);
1006             if (reply != null)
1007             {
1008                 transport.send(reply);
1009                 if (reply instanceof MessageImpl)
1010                     ((MessageImpl)reply).decRef();
1011             }
1012         }
1013     }
1014 
1015     /* ------------------------------------------------------------ */
1016     /* ------------------------------------------------------------ */
1017     protected class ConnectHandler extends Handler
1018     {
1019         protected String _metaChannel=META_CONNECT;
1020 
1021         @Override
1022         ChannelId getMetaChannelId()
1023         {
1024             return META_CONNECT_ID;
1025         }
1026 
1027         @Override
1028         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1029         {
1030             if (client == null)
1031             {
1032                 unknownClient(transport,_metaChannel);
1033                 return;
1034             }
1035 
1036             // is this the first connect message?
1037             String type=client.getConnectionType();
1038             boolean polling=true;
1039             if (type == null)
1040             {
1041                 type=(String)message.get(Bayeux.CONNECTION_TYPE_FIELD);
1042                 client.setConnectionType(type);
1043                 polling=false;
1044             }
1045 
1046             Object advice=message.get(ADVICE_FIELD);
1047             if (advice != null)
1048             {
1049                 Long timeout=(Long)((Map)advice).get("timeout");
1050                 if (timeout != null && timeout.longValue() > 0)
1051                     client.setTimeout(timeout.longValue());
1052                 else
1053                     client.setTimeout(0);
1054 
1055                 Long interval=(Long)((Map)advice).get("interval");
1056                 if (interval != null && interval.longValue() > 0)
1057                     client.setInterval(interval.longValue());
1058                 else
1059                     client.setInterval(0);
1060             }
1061             else
1062             {
1063                 client.setTimeout(0);
1064                 client.setInterval(0);
1065             }
1066 
1067             advice=null;
1068 
1069             // Work out if multiple clients from some browser?
1070             if (polling && _multiFrameInterval > 0 && client.getBrowserId() != null)
1071             {
1072                 List<String> clients=clientsOnBrowser(client.getBrowserId());
1073                 int count=clients.size();
1074                 if (count > 1)
1075                 {
1076                     polling=clients.get(0).equals(client.getId());
1077                     advice=client.getAdvice();
1078                     if (advice == null)
1079                         advice=_multiFrameAdvice;
1080                     else
1081                         // could probably cache this
1082                         advice=multiFrameAdvice((JSON.Literal)advice);
1083                 }
1084             }
1085 
1086             synchronized(this)
1087             {
1088                 if (advice == null)
1089                 {
1090                     if (_adviceVersion != client._adviseVersion)
1091                     {
1092                         advice=_advice;
1093                         client._adviseVersion=_adviceVersion;
1094                     }
1095                 }
1096                 else
1097                     client._adviseVersion=-1; // clear so it is reset after multi state clears
1098             }
1099 
1100             // reply to connect message
1101             String id=message.getId();
1102 
1103             Message reply=newMessage(message);
1104 
1105             reply.put(CHANNEL_FIELD,META_CONNECT);
1106             reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1107             if (advice != null)
1108                 reply.put(ADVICE_FIELD,advice);
1109             if (id != null)
1110                 reply.put(ID_FIELD,id);
1111 
1112             if (polling)
1113                 transport.setMetaConnectReply(reply);
1114             else
1115                 sendMetaReply(client,reply,transport);
1116         }
1117     }
1118 
1119     /* ------------------------------------------------------------ */
1120     /* ------------------------------------------------------------ */
1121     protected class DisconnectHandler extends Handler
1122     {
1123         @Override
1124         ChannelId getMetaChannelId()
1125         {
1126             return META_DISCONNECT_ID;
1127         }
1128 
1129         @Override
1130         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1131         {
1132             if (client == null)
1133             {
1134                 unknownClient(transport,META_DISCONNECT);
1135                 return;
1136             }
1137             if (isLogInfo())
1138                 logInfo("Disconnect " + client.getId());
1139 
1140             client.remove(false);
1141 
1142             Message reply=newMessage(message);
1143             reply.put(CHANNEL_FIELD,META_DISCONNECT);
1144             reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1145             String id=message.getId();
1146             if (id != null)
1147                 reply.put(ID_FIELD,id);
1148 
1149             reply=extendSendMeta(client,reply);
1150 
1151             Message pollReply=transport.getMetaConnectReply();
1152             if (pollReply != null)
1153             {
1154                 transport.setMetaConnectReply(null);
1155                 sendMetaReply(client,pollReply,transport);
1156             }
1157             sendMetaReply(client,reply,transport);
1158         }
1159     }
1160 
1161     /* ------------------------------------------------------------ */
1162     /* ------------------------------------------------------------ */
1163     protected class HandshakeHandler extends Handler
1164     {
1165         @Override
1166         ChannelId getMetaChannelId()
1167         {
1168             return META_HANDSHAKE_ID;
1169         }
1170 
1171         @Override
1172         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1173         {
1174             if (client != null)
1175                 throw new IllegalStateException();
1176 
1177             if (_securityPolicy != null && !_securityPolicy.canHandshake(message))
1178             {
1179                 Message reply=newMessage(message);
1180                 reply.put(CHANNEL_FIELD,META_HANDSHAKE);
1181                 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1182                 reply.put(ERROR_FIELD,"403::Handshake denied");
1183 
1184                 sendMetaReply(client,reply,transport);
1185                 return;
1186             }
1187 
1188             client=newRemoteClient();
1189 
1190             Message reply=newMessage(message);
1191             reply.put(CHANNEL_FIELD,META_HANDSHAKE);
1192             reply.put(VERSION_FIELD,"1.0");
1193             reply.put(MIN_VERSION_FIELD,"0.9");
1194 
1195             if (client != null)
1196             {
1197                 reply.put(SUPPORTED_CONNECTION_TYPES_FIELD,_transports);
1198                 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1199                 reply.put(CLIENT_FIELD,client.getId());
1200                 if (_advice != null)
1201                     reply.put(ADVICE_FIELD,_advice);
1202             }
1203             else
1204             {
1205                 reply.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
1206                 if (_advice != null)
1207                     reply.put(ADVICE_FIELD,_advice);
1208             }
1209 
1210             if (isLogDebug())
1211                 logDebug("handshake.handle: reply=" + reply);
1212 
1213             String id=message.getId();
1214             if (id != null)
1215                 reply.put(ID_FIELD,id);
1216 
1217             sendMetaReply(client,reply,transport);
1218         }
1219     }
1220 
1221     /* ------------------------------------------------------------ */
1222     /* ------------------------------------------------------------ */
1223     protected class PublishHandler extends Handler
1224     {
1225         @Override
1226         ChannelId getMetaChannelId()
1227         {
1228             return null;
1229         }
1230 
1231         @Override
1232         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1233         {
1234             String channel_id=message.getChannel();
1235 
1236             if (client == null && message.containsKey(CLIENT_FIELD))
1237             {
1238                 unknownClient(transport,channel_id);
1239                 return;
1240             }
1241 
1242             String id=message.getId();
1243 
1244             ChannelId cid=getChannelId(channel_id);
1245             Object data=message.get(Bayeux.DATA_FIELD);
1246 
1247             Message reply=newMessage(message);
1248             reply.put(CHANNEL_FIELD,channel_id);
1249             if (id != null)
1250                 reply.put(ID_FIELD,id);
1251 
1252             if (data == null)
1253             {
1254                 message=null;
1255                 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1256                 reply.put(ERROR_FIELD,"403::No data");
1257             }
1258             else if (!_securityPolicy.canPublish(client,channel_id,message))
1259             {
1260                 message=null;
1261                 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1262                 reply.put(ERROR_FIELD,"403::Publish denied");
1263             }
1264             else
1265             {
1266                 message.remove(CLIENT_FIELD);
1267                 message=extendSendBayeux(client,message);
1268 
1269                 if (message != null)
1270                 {
1271                     reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1272                 }
1273                 else
1274                 {
1275                     reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1276                     reply.put(ERROR_FIELD,"404::Message deleted");
1277                 }
1278             }
1279 
1280             sendMetaReply(client,reply,transport);
1281 
1282             if (message != null)
1283                 _root.doDelivery(cid,client,message);
1284         }
1285     }
1286 
1287     /* ------------------------------------------------------------ */
1288     /* ------------------------------------------------------------ */
1289     protected class MetaPublishHandler extends Handler
1290     {
1291         @Override
1292         ChannelId getMetaChannelId()
1293         {
1294             return null;
1295         }
1296 
1297         @Override
1298         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1299         {
1300             String channel_id=message.getChannel();
1301 
1302             if (client == null && !META_HANDSHAKE.equals(channel_id))
1303             {
1304                 // unknown client
1305                 return;
1306             }
1307 
1308             if (_securityPolicy.canPublish(client,channel_id,message))
1309             {
1310                 _root.doDelivery(getChannelId(channel_id),client,message);
1311             }
1312         }
1313     }
1314 
1315     /* ------------------------------------------------------------ */
1316     /* ------------------------------------------------------------ */
1317     protected class SubscribeHandler extends Handler
1318     {
1319         @Override
1320         ChannelId getMetaChannelId()
1321         {
1322             return META_SUBSCRIBE_ID;
1323         }
1324 
1325         @Override
1326         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1327         {
1328             if (client == null)
1329             {
1330                 unknownClient(transport,META_SUBSCRIBE);
1331                 return;
1332             }
1333 
1334             String subscribe_id=(String)message.get(SUBSCRIPTION_FIELD);
1335 
1336             // select a random channel ID if none specifified
1337             if (subscribe_id == null)
1338             {
1339                 subscribe_id=Long.toString(getRandom(),36);
1340                 while(getChannel(subscribe_id) != null)
1341                     subscribe_id=Long.toString(getRandom(),36);
1342             }
1343 
1344             ChannelId cid=null;
1345             boolean can_subscribe=false;
1346 
1347             if (subscribe_id.startsWith(Bayeux.SERVICE_SLASH))
1348             {
1349                 can_subscribe=true;
1350             }
1351             else if (subscribe_id.startsWith(Bayeux.META_SLASH))
1352             {
1353                 can_subscribe=false;
1354             }
1355             else
1356             {
1357                 cid=getChannelId(subscribe_id);
1358                 can_subscribe=_securityPolicy.canSubscribe(client,subscribe_id,message);
1359             }
1360 
1361             Message reply=newMessage(message);
1362             reply.put(CHANNEL_FIELD,META_SUBSCRIBE);
1363             reply.put(SUBSCRIPTION_FIELD,subscribe_id);
1364 
1365             if (can_subscribe)
1366             {
1367                 if (cid != null)
1368                 {
1369                     ChannelImpl channel=getChannel(cid);
1370                     if (channel == null && _securityPolicy.canCreate(client,subscribe_id,message))
1371                         channel=(ChannelImpl)getChannel(subscribe_id,true);
1372 
1373                     if (channel != null)
1374                         channel.subscribe(client);
1375                     else
1376                         can_subscribe=false;
1377                 }
1378 
1379                 if (can_subscribe)
1380                 {
1381                     reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1382                 }
1383                 else
1384                 {
1385                     reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1386                     reply.put(ERROR_FIELD,"403::cannot create");
1387                 }
1388             }
1389             else
1390             {
1391                 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1392                 reply.put(ERROR_FIELD,"403::cannot subscribe");
1393 
1394             }
1395 
1396             String id=message.getId();
1397             if (id != null)
1398                 reply.put(ID_FIELD,id);
1399 
1400             sendMetaReply(client,reply,transport);
1401         }
1402     }
1403 
1404     /* ------------------------------------------------------------ */
1405     /* ------------------------------------------------------------ */
1406     protected class UnsubscribeHandler extends Handler
1407     {
1408         @Override
1409         ChannelId getMetaChannelId()
1410         {
1411             return META_UNSUBSCRIBE_ID;
1412         }
1413 
1414         @Override
1415         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1416         {
1417             if (client == null)
1418             {
1419                 unknownClient(transport,META_UNSUBSCRIBE);
1420                 return;
1421             }
1422 
1423             String channel_id=(String)message.get(SUBSCRIPTION_FIELD);
1424             ChannelImpl channel=getChannel(channel_id);
1425             if (channel != null)
1426                 channel.unsubscribe(client);
1427 
1428             Message reply=newMessage(message);
1429             reply.put(CHANNEL_FIELD,META_UNSUBSCRIBE);
1430             reply.put(SUBSCRIPTION_FIELD,channel_id);
1431             reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1432 
1433             String id=message.getId();
1434             if (id != null)
1435                 reply.put(ID_FIELD,id);
1436 
1437             sendMetaReply(client,reply,transport);
1438         }
1439     }
1440 
1441     /* ------------------------------------------------------------ */
1442     /* ------------------------------------------------------------ */
1443     protected class PingHandler extends Handler
1444     {
1445         @Override
1446         ChannelId getMetaChannelId()
1447         {
1448             return META_PING_ID;
1449         }
1450 
1451         @Override
1452         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1453         {
1454             Message reply=newMessage(message);
1455             reply.put(CHANNEL_FIELD,META_PING);
1456             reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1457 
1458             String id=message.getId();
1459             if (id != null)
1460                 reply.put(ID_FIELD,id);
1461 
1462             sendMetaReply(client,reply,transport);
1463         }
1464     }
1465 
1466     /* ------------------------------------------------------------ */
1467     /* ------------------------------------------------------------ */
1468     protected class ServiceChannel extends ChannelImpl
1469     {
1470         ServiceChannel(String id)
1471         {
1472             super(id,AbstractBayeux.this);
1473             setPersistent(true);
1474         }
1475 
1476         /* ------------------------------------------------------------ */
1477         /*
1478          * (non-Javadoc)
1479          *
1480          * @see
1481          * org.mortbay.cometd.ChannelImpl#addChild(org.mortbay.cometd.ChannelImpl
1482          * )
1483          */
1484         @Override
1485         public ChannelImpl addChild(ChannelImpl channel)
1486         {
1487             channel.setPersistent(true);
1488             return super.addChild(channel);
1489         }
1490 
1491         /* ------------------------------------------------------------ */
1492         @Override
1493         public void subscribe(Client client)
1494         {
1495             if (client.isLocal())
1496                 super.subscribe(client);
1497         }
1498 
1499     }
1500 }