View Javadoc

1   // ========================================================================
2   // Copyright 2006 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // Licensed under the Apache License, Version 2.0 (the "License");
5   // you may not use this file except in compliance with the License.
6   // You may obtain a copy of the License at 
7   // http://www.apache.org/licenses/LICENSE-2.0
8   // Unless required by applicable law or agreed to in writing, software
9   // distributed under the License is distributed on an "AS IS" BASIS,
10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  // See the License for the specific language governing permissions and
12  // limitations under the License.
13  // ========================================================================
14  
15  package org.mortbay.cometd;
16  
17  import java.util.Arrays;
18  import java.util.Collection;
19  import java.util.List;
20  import java.util.concurrent.ConcurrentHashMap;
21  import java.util.concurrent.ConcurrentMap;
22  
23  import org.cometd.Bayeux;
24  import org.cometd.Channel;
25  import org.cometd.ChannelBayeuxListener;
26  import org.cometd.ChannelListener;
27  import org.cometd.Client;
28  import org.cometd.DataFilter;
29  import org.cometd.Message;
30  import org.cometd.SubscriptionListener;
31  import org.mortbay.log.Log;
32  import org.mortbay.util.LazyList;
33  
34  /* ------------------------------------------------------------ */
35  /**
36   * A Bayuex Channel
37   * 
38   * @author gregw
39   * 
40   */
41  public class ChannelImpl implements Channel
42  {
43      private final AbstractBayeux _bayeux;
44      private final ChannelId _id;
45      private final ConcurrentHashMap<String,ChannelImpl> _children=new ConcurrentHashMap<String,ChannelImpl>();
46      private volatile ClientImpl[] _subscribers=new ClientImpl[0]; // copy on write
47      private volatile DataFilter[] _dataFilters=new DataFilter[0]; // copy on write
48      private volatile SubscriptionListener[] _subscriptionListeners=new SubscriptionListener[0]; // copy on write
49      private volatile ChannelImpl _wild;
50      private volatile ChannelImpl _wildWild;
51      private volatile boolean _persistent;
52      private volatile int _split;
53      private volatile boolean _lazy;
54  
55      /* ------------------------------------------------------------ */
56      protected ChannelImpl(String id, AbstractBayeux bayeux)
57      {
58          _id=new ChannelId(id);
59          _bayeux=bayeux;
60      }
61  
62      /* ------------------------------------------------------------ */
63      /**
64       * A Lazy channel marks published messages as lazy. Lazy messages are queued
65       * but do not wake up waiting clients.
66       * 
67       * @return true if message is lazy
68       */
69      public boolean isLazy()
70      {
71          return _lazy;
72      }
73  
74      /* ------------------------------------------------------------ */
75      /**
76       * A Lazy channel marks published messages as lazy. Lazy messages are queued
77       * but do not wake up waiting clients.
78       * 
79       * @param lazy
80       *            true if message is lazy
81       */
82      public void setLazy(boolean lazy)
83      {
84          _lazy=lazy;
85      }
86  
87      /* ------------------------------------------------------------ */
88      /**
89       * Add a channel
90       * @param channel
91       * @return The added channel, or the existing channel if another thread
92       * already added the channel
93       */
94      public ChannelImpl addChild(ChannelImpl channel)
95      {
96          ChannelId child=channel.getChannelId();
97          if (!_id.isParentOf(child))
98          {
99              throw new IllegalArgumentException(_id + " not parent of " + child);
100         }
101 
102         String next=child.getSegment(_id.depth());
103 
104         if ((child.depth() - _id.depth()) == 1)
105         {
106             // add the channel to this channels
107             ChannelImpl old=_children.putIfAbsent(next,channel);
108             if (old != null)
109                 return old;
110 
111             if (ChannelId.WILD.equals(next))
112                 _wild=channel;
113             else if (ChannelId.WILDWILD.equals(next))
114                 _wildWild=channel;
115             _bayeux.addChannel(channel);
116             return channel;
117         }
118         else
119         {
120             ChannelImpl branch=(ChannelImpl)_bayeux.getChannel((_id.depth() == 0?"/":(_id.toString() + "/")) + next,true);
121             return branch.addChild(channel);
122         }
123     }
124 
125     /* ------------------------------------------------------------ */
126     /**
127      * @param filter
128      */
129     public void addDataFilter(DataFilter filter)
130     {
131         synchronized(this)
132         {
133             _dataFilters=(DataFilter[])LazyList.addToArray(_dataFilters,filter,null);
134         }
135     }
136 
137     /* ------------------------------------------------------------ */
138     /**
139      * @return
140      */
141     public ChannelId getChannelId()
142     {
143         return _id;
144     }
145 
146     /* ------------------------------------------------------------ */
147     public ChannelImpl getChild(ChannelId id)
148     {
149         String next=id.getSegment(_id.depth());
150         if (next == null)
151             return null;
152 
153         ChannelImpl channel=_children.get(next);
154 
155         if (channel == null || channel.getChannelId().depth() == id.depth())
156         {
157             return channel;
158         }
159         return channel.getChild(id);
160     }
161 
162     /* ------------------------------------------------------------ */
163     public void getChannels(List<Channel> list)
164     {
165         list.add(this);
166         for (ChannelImpl channel : _children.values())
167             channel.getChannels(list);
168     }
169 
170     /* ------------------------------------------------------------ */
171     public int getChannelCount()
172     {
173         return _children.size();
174     }
175 
176     /* ------------------------------------------------------------ */
177     /**
178      * @return
179      */
180     public String getId()
181     {
182         return _id.toString();
183     }
184 
185     /* ------------------------------------------------------------ */
186     public boolean isPersistent()
187     {
188         return _persistent;
189     }
190 
191     /* ------------------------------------------------------------ */
192     public void deliver(Client from, Iterable<Client> to, Object data, String id)
193     {
194         MessageImpl message=_bayeux.newMessage();
195         message.put(Bayeux.CHANNEL_FIELD,getId());
196         message.put(Bayeux.DATA_FIELD,data);
197         if (id != null)
198             message.put(Bayeux.ID_FIELD,id);
199 
200         Message m=_bayeux.extendSendBayeux(from,message);
201 
202         if (m != null)
203         {
204             for (Client t : to)
205                 ((ClientImpl)t).doDelivery(from,m);
206         }
207         if (m instanceof MessageImpl)
208             ((MessageImpl)m).decRef();
209     }
210 
211     /* ------------------------------------------------------------ */
212     public void publish(Client fromClient, Object data, String msgId)
213     {
214         _bayeux.doPublish(getChannelId(),fromClient,data,msgId,false);
215     }
216 
217     /* ------------------------------------------------------------ */
218     public void publishLazy(Client fromClient, Object data, String msgId)
219     {
220         _bayeux.doPublish(getChannelId(),fromClient,data,msgId,true);
221     }
222 
223     /* ------------------------------------------------------------ */
224     public boolean remove()
225     {
226         return _bayeux.removeChannel(this);
227     }
228 
229     /* ------------------------------------------------------------ */
230     public boolean doRemove(ChannelImpl channel, List<ChannelBayeuxListener> listeners)
231     {
232         ChannelId channelId=channel.getChannelId();
233         int diff=channel._id.depth() - _id.depth();
234 
235         if (diff >= 1)
236         {
237             String key=channelId.getSegment(_id.depth());
238             ChannelImpl child=_children.get(key);
239 
240             if (child != null)
241             {
242                 // is it this child we are removing?
243                 if (diff == 1)
244                 {
245                     if (!child.isPersistent())
246                     {
247                         // remove the child
248                         child=_children.remove(key);
249                         if (child !=null)
250                         {
251                             if (_wild==channel)
252                                 _wild=null;
253                             else if (_wildWild==channel)
254                                 _wildWild=null;
255                             if ( child.getChannelCount() > 0)
256                             {
257                                 // remove the children of the child
258                                 for (ChannelImpl c : child._children.values())
259                                     child.doRemove(c,listeners);
260                             }
261                             for (ChannelBayeuxListener l : listeners)
262                                 l.channelRemoved(child);
263                         }
264                         return true;
265                     }
266                     return false;
267                 }
268 
269                 boolean removed=child.doRemove(channel,listeners);
270                 
271                 // Do we remove a non persistent child?
272                 if (removed && !child.isPersistent() && child.getChannelCount() == 0 && child.getSubscriberCount() == 0)
273                 {
274                     child=_children.remove(key);
275                     if (child!=null)
276                         for (ChannelBayeuxListener l : listeners)
277                             l.channelRemoved(child);
278                 }
279 
280                 return removed;
281             }
282 
283         }
284         return false;
285     }
286 
287     /* ------------------------------------------------------------ */
288     /**
289      * @param filter
290      */
291     public DataFilter removeDataFilter(DataFilter filter)
292     {
293         synchronized(this)
294         {
295             _dataFilters=(DataFilter[])LazyList.removeFromArray(_dataFilters,filter);
296             return filter;
297         }
298     }
299 
300     /* ------------------------------------------------------------ */
301     public void setPersistent(boolean persistent)
302     {
303         _persistent=persistent;
304     }
305 
306     /* ------------------------------------------------------------ */
307     /**
308      * @param client
309      */
310     public void subscribe(Client client)
311     {
312         if (!(client instanceof ClientImpl))
313             throw new IllegalArgumentException("Client instance not obtained from Bayeux.newClient()");
314 
315         synchronized(this)
316         {
317             for (ClientImpl c : _subscribers)
318             {
319                 if (client.equals(c))
320                     return;
321             }
322             _subscribers=(ClientImpl[])LazyList.addToArray(_subscribers,client,null);
323         }
324         SubscriptionListener[] listeners=_subscriptionListeners;
325         for (SubscriptionListener l : listeners)
326             l.subscribed(client,this);
327 
328         ((ClientImpl)client).addSubscription(this);
329     }
330 
331     /* ------------------------------------------------------------ */
332     @Override
333     public String toString()
334     {
335         return _id.toString();
336     }
337 
338     /* ------------------------------------------------------------ */
339     /**
340      * @param client
341      */
342     public void unsubscribe(Client client)
343     {
344         if (!(client instanceof ClientImpl))
345             throw new IllegalArgumentException("Client instance not obtained from Bayeux.newClient()");
346         ((ClientImpl)client).removeSubscription(this);
347         synchronized(this)
348         {
349             _subscribers=(ClientImpl[])LazyList.removeFromArray(_subscribers,client);
350         }
351 
352         for (SubscriptionListener l : _subscriptionListeners)
353             l.unsubscribed(client,this);
354         
355         if (!_persistent && _subscribers.length == 0 && _children.size() == 0)
356             remove();
357         
358     }
359 
360     /* ------------------------------------------------------------ */
361     protected void doDelivery(ChannelId to, Client from, Message msg)
362     {
363         int tail=to.depth() - _id.depth();
364 
365         Object data=msg.getData();
366 
367         // if we have data, filter it
368         if (data != null)
369         {
370             Object old=data;
371 
372             try
373             {
374                 switch(tail)
375                 {
376                     case 0:
377                     {
378                         final DataFilter[] filters=_dataFilters;
379                         for (DataFilter filter : filters)
380                         {
381                             data=filter.filter(from,this,data);
382                             if (data == null)
383                                 return;
384                         }
385                     }
386                         break;
387 
388                     case 1:
389                         final ChannelImpl wild = _wild;
390                         if (wild != null)
391                         {
392                             final DataFilter[] filters=wild._dataFilters;
393                             for (DataFilter filter : filters)
394                             {
395                                 data=filter.filter(from,this,data);
396                                 if (data == null)
397                                     return;
398                             }
399                         }
400 
401                     default:
402                         final ChannelImpl wildWild = _wildWild;
403                         if (wildWild != null)
404                         {
405                             final DataFilter[] filters=wildWild._dataFilters;
406                             for (DataFilter filter : filters)
407                             {
408                                 data=filter.filter(from,this,data);
409                                 if (data == null)
410                                     return;
411                             }
412                         }
413                 }
414             }
415             catch(IllegalStateException e)
416             {
417                 Log.ignore(e);
418                 return;
419             }
420 
421             // TODO this may not be correct if the message is reused.
422             // probably should close message ?
423             if (data != old)
424                 msg.put(AbstractBayeux.DATA_FIELD,data);
425         }
426 
427         switch(tail)
428         {
429             case 0:
430             {
431                 if (_lazy && msg instanceof MessageImpl)
432                     ((MessageImpl)msg).setLazy(true);
433 
434                 final ClientImpl[] subscribers=_subscribers;
435                 if (subscribers.length > 0)
436                 {
437                     // fair delivery
438                     int split=_split++ % subscribers.length;
439                     for (int i=split; i < subscribers.length; i++)
440                         subscribers[i].doDelivery(from,msg);
441                     for (int i=0; i < split; i++)
442                         subscribers[i].doDelivery(from,msg);
443                 }
444                 break;
445             }
446 
447             case 1:
448                 final ChannelImpl wild = _wild;
449                 if (wild != null)
450                 {
451                     if (wild._lazy && msg instanceof MessageImpl)
452                         ((MessageImpl)msg).setLazy(true);
453                     final ClientImpl[] subscribers=wild._subscribers;
454                     for (ClientImpl client : subscribers)
455                         client.doDelivery(from,msg);
456                 }
457 
458             default:
459             {
460                 final ChannelImpl wildWild = _wildWild;
461                 if (wildWild != null)
462                 {
463                     if (wildWild._lazy && msg instanceof MessageImpl)
464                         ((MessageImpl)msg).setLazy(true);
465                     final ClientImpl[] subscribers=wildWild._subscribers;
466                     for (ClientImpl client : subscribers)
467                         client.doDelivery(from,msg);
468                 }
469                 String next=to.getSegment(_id.depth());
470                 ChannelImpl channel=_children.get(next);
471                 if (channel != null)
472                     channel.doDelivery(to,from,msg);
473             }
474         }
475     }
476 
477     /* ------------------------------------------------------------ */
478     public Collection<Client> getSubscribers()
479     {
480         synchronized(this)
481         {
482             return Arrays.asList((Client[])_subscribers);
483         }
484     }
485 
486     /* ------------------------------------------------------------ */
487     public int getSubscriberCount()
488     {
489         return _subscribers.length;
490     }
491 
492     /* ------------------------------------------------------------ */
493     /*
494      * (non-Javadoc)
495      * 
496      * @see dojox.cometd.Channel#getFilters()
497      */
498     public Collection<DataFilter> getDataFilters()
499     {
500         synchronized(this)
501         {
502             return Arrays.asList(_dataFilters);
503         }
504     }
505 
506     /* ------------------------------------------------------------ */
507     public void addListener(ChannelListener listener)
508     {
509         if (listener instanceof SubscriptionListener)
510         {
511             synchronized(this)
512             {
513                 _subscriptionListeners=(SubscriptionListener[])LazyList.addToArray(_subscriptionListeners,listener,null);
514             }
515         }
516     }
517 
518     public void removeListener(ChannelListener listener)
519     {
520         if (listener instanceof SubscriptionListener)
521         {
522             synchronized(this)
523             {
524                 _subscriptionListeners=(SubscriptionListener[])LazyList.removeFromArray(_subscriptionListeners,listener);
525             }
526         }
527     }
528 }