1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.cometd;
16
17 import java.util.Arrays;
18 import java.util.Collection;
19 import java.util.List;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ConcurrentMap;
22
23 import org.cometd.Bayeux;
24 import org.cometd.Channel;
25 import org.cometd.ChannelBayeuxListener;
26 import org.cometd.ChannelListener;
27 import org.cometd.Client;
28 import org.cometd.DataFilter;
29 import org.cometd.Message;
30 import org.cometd.SubscriptionListener;
31 import org.mortbay.log.Log;
32 import org.mortbay.util.LazyList;
33
34
35
36
37
38
39
40
41 public class ChannelImpl implements Channel
42 {
43 private final AbstractBayeux _bayeux;
44 private final ChannelId _id;
45 private final ConcurrentHashMap<String,ChannelImpl> _children=new ConcurrentHashMap<String,ChannelImpl>();
46 private volatile ClientImpl[] _subscribers=new ClientImpl[0];
47 private volatile DataFilter[] _dataFilters=new DataFilter[0];
48 private volatile SubscriptionListener[] _subscriptionListeners=new SubscriptionListener[0];
49 private volatile ChannelImpl _wild;
50 private volatile ChannelImpl _wildWild;
51 private volatile boolean _persistent;
52 private volatile int _split;
53 private volatile boolean _lazy;
54
55
56 protected ChannelImpl(String id, AbstractBayeux bayeux)
57 {
58 _id=new ChannelId(id);
59 _bayeux=bayeux;
60 }
61
62
63
64
65
66
67
68
69 public boolean isLazy()
70 {
71 return _lazy;
72 }
73
74
75
76
77
78
79
80
81
82 public void setLazy(boolean lazy)
83 {
84 _lazy=lazy;
85 }
86
87
88
89
90
91
92
93
94 public ChannelImpl addChild(ChannelImpl channel)
95 {
96 ChannelId child=channel.getChannelId();
97 if (!_id.isParentOf(child))
98 {
99 throw new IllegalArgumentException(_id + " not parent of " + child);
100 }
101
102 String next=child.getSegment(_id.depth());
103
104 if ((child.depth() - _id.depth()) == 1)
105 {
106
107 ChannelImpl old=_children.putIfAbsent(next,channel);
108 if (old != null)
109 return old;
110
111 if (ChannelId.WILD.equals(next))
112 _wild=channel;
113 else if (ChannelId.WILDWILD.equals(next))
114 _wildWild=channel;
115 _bayeux.addChannel(channel);
116 return channel;
117 }
118 else
119 {
120 ChannelImpl branch=(ChannelImpl)_bayeux.getChannel((_id.depth() == 0?"/":(_id.toString() + "/")) + next,true);
121 return branch.addChild(channel);
122 }
123 }
124
125
126
127
128
129 public void addDataFilter(DataFilter filter)
130 {
131 synchronized(this)
132 {
133 _dataFilters=(DataFilter[])LazyList.addToArray(_dataFilters,filter,null);
134 }
135 }
136
137
138
139
140
141 public ChannelId getChannelId()
142 {
143 return _id;
144 }
145
146
147 public ChannelImpl getChild(ChannelId id)
148 {
149 String next=id.getSegment(_id.depth());
150 if (next == null)
151 return null;
152
153 ChannelImpl channel=_children.get(next);
154
155 if (channel == null || channel.getChannelId().depth() == id.depth())
156 {
157 return channel;
158 }
159 return channel.getChild(id);
160 }
161
162
163 public void getChannels(List<Channel> list)
164 {
165 list.add(this);
166 for (ChannelImpl channel : _children.values())
167 channel.getChannels(list);
168 }
169
170
171 public int getChannelCount()
172 {
173 return _children.size();
174 }
175
176
177
178
179
180 public String getId()
181 {
182 return _id.toString();
183 }
184
185
186 public boolean isPersistent()
187 {
188 return _persistent;
189 }
190
191
192 public void deliver(Client from, Iterable<Client> to, Object data, String id)
193 {
194 MessageImpl message=_bayeux.newMessage();
195 message.put(Bayeux.CHANNEL_FIELD,getId());
196 message.put(Bayeux.DATA_FIELD,data);
197 if (id != null)
198 message.put(Bayeux.ID_FIELD,id);
199
200 Message m=_bayeux.extendSendBayeux(from,message);
201
202 if (m != null)
203 {
204 for (Client t : to)
205 ((ClientImpl)t).doDelivery(from,m);
206 }
207 if (m instanceof MessageImpl)
208 ((MessageImpl)m).decRef();
209 }
210
211
212 public void publish(Client fromClient, Object data, String msgId)
213 {
214 _bayeux.doPublish(getChannelId(),fromClient,data,msgId,false);
215 }
216
217
218 public void publishLazy(Client fromClient, Object data, String msgId)
219 {
220 _bayeux.doPublish(getChannelId(),fromClient,data,msgId,true);
221 }
222
223
224 public boolean remove()
225 {
226 return _bayeux.removeChannel(this);
227 }
228
229
230 public boolean doRemove(ChannelImpl channel, List<ChannelBayeuxListener> listeners)
231 {
232 ChannelId channelId=channel.getChannelId();
233 int diff=channel._id.depth() - _id.depth();
234
235 if (diff >= 1)
236 {
237 String key=channelId.getSegment(_id.depth());
238 ChannelImpl child=_children.get(key);
239
240 if (child != null)
241 {
242
243 if (diff == 1)
244 {
245 if (!child.isPersistent())
246 {
247
248 child=_children.remove(key);
249 if (child !=null)
250 {
251 if (_wild==channel)
252 _wild=null;
253 else if (_wildWild==channel)
254 _wildWild=null;
255 if ( child.getChannelCount() > 0)
256 {
257
258 for (ChannelImpl c : child._children.values())
259 child.doRemove(c,listeners);
260 }
261 for (ChannelBayeuxListener l : listeners)
262 l.channelRemoved(child);
263 }
264 return true;
265 }
266 return false;
267 }
268
269 boolean removed=child.doRemove(channel,listeners);
270
271
272 if (removed && !child.isPersistent() && child.getChannelCount() == 0 && child.getSubscriberCount() == 0)
273 {
274 child=_children.remove(key);
275 if (child!=null)
276 for (ChannelBayeuxListener l : listeners)
277 l.channelRemoved(child);
278 }
279
280 return removed;
281 }
282
283 }
284 return false;
285 }
286
287
288
289
290
291 public DataFilter removeDataFilter(DataFilter filter)
292 {
293 synchronized(this)
294 {
295 _dataFilters=(DataFilter[])LazyList.removeFromArray(_dataFilters,filter);
296 return filter;
297 }
298 }
299
300
301 public void setPersistent(boolean persistent)
302 {
303 _persistent=persistent;
304 }
305
306
307
308
309
310 public void subscribe(Client client)
311 {
312 if (!(client instanceof ClientImpl))
313 throw new IllegalArgumentException("Client instance not obtained from Bayeux.newClient()");
314
315 synchronized(this)
316 {
317 for (ClientImpl c : _subscribers)
318 {
319 if (client.equals(c))
320 return;
321 }
322 _subscribers=(ClientImpl[])LazyList.addToArray(_subscribers,client,null);
323 }
324 SubscriptionListener[] listeners=_subscriptionListeners;
325 for (SubscriptionListener l : listeners)
326 l.subscribed(client,this);
327
328 ((ClientImpl)client).addSubscription(this);
329 }
330
331
332 @Override
333 public String toString()
334 {
335 return _id.toString();
336 }
337
338
339
340
341
342 public void unsubscribe(Client client)
343 {
344 if (!(client instanceof ClientImpl))
345 throw new IllegalArgumentException("Client instance not obtained from Bayeux.newClient()");
346 ((ClientImpl)client).removeSubscription(this);
347 synchronized(this)
348 {
349 _subscribers=(ClientImpl[])LazyList.removeFromArray(_subscribers,client);
350 }
351
352 for (SubscriptionListener l : _subscriptionListeners)
353 l.unsubscribed(client,this);
354
355 if (!_persistent && _subscribers.length == 0 && _children.size() == 0)
356 remove();
357
358 }
359
360
361 protected void doDelivery(ChannelId to, Client from, Message msg)
362 {
363 int tail=to.depth() - _id.depth();
364
365 Object data=msg.getData();
366
367
368 if (data != null)
369 {
370 Object old=data;
371
372 try
373 {
374 switch(tail)
375 {
376 case 0:
377 {
378 final DataFilter[] filters=_dataFilters;
379 for (DataFilter filter : filters)
380 {
381 data=filter.filter(from,this,data);
382 if (data == null)
383 return;
384 }
385 }
386 break;
387
388 case 1:
389 final ChannelImpl wild = _wild;
390 if (wild != null)
391 {
392 final DataFilter[] filters=wild._dataFilters;
393 for (DataFilter filter : filters)
394 {
395 data=filter.filter(from,this,data);
396 if (data == null)
397 return;
398 }
399 }
400
401 default:
402 final ChannelImpl wildWild = _wildWild;
403 if (wildWild != null)
404 {
405 final DataFilter[] filters=wildWild._dataFilters;
406 for (DataFilter filter : filters)
407 {
408 data=filter.filter(from,this,data);
409 if (data == null)
410 return;
411 }
412 }
413 }
414 }
415 catch(IllegalStateException e)
416 {
417 Log.ignore(e);
418 return;
419 }
420
421
422
423 if (data != old)
424 msg.put(AbstractBayeux.DATA_FIELD,data);
425 }
426
427 switch(tail)
428 {
429 case 0:
430 {
431 if (_lazy && msg instanceof MessageImpl)
432 ((MessageImpl)msg).setLazy(true);
433
434 final ClientImpl[] subscribers=_subscribers;
435 if (subscribers.length > 0)
436 {
437
438 int split=_split++ % subscribers.length;
439 for (int i=split; i < subscribers.length; i++)
440 subscribers[i].doDelivery(from,msg);
441 for (int i=0; i < split; i++)
442 subscribers[i].doDelivery(from,msg);
443 }
444 break;
445 }
446
447 case 1:
448 final ChannelImpl wild = _wild;
449 if (wild != null)
450 {
451 if (wild._lazy && msg instanceof MessageImpl)
452 ((MessageImpl)msg).setLazy(true);
453 final ClientImpl[] subscribers=wild._subscribers;
454 for (ClientImpl client : subscribers)
455 client.doDelivery(from,msg);
456 }
457
458 default:
459 {
460 final ChannelImpl wildWild = _wildWild;
461 if (wildWild != null)
462 {
463 if (wildWild._lazy && msg instanceof MessageImpl)
464 ((MessageImpl)msg).setLazy(true);
465 final ClientImpl[] subscribers=wildWild._subscribers;
466 for (ClientImpl client : subscribers)
467 client.doDelivery(from,msg);
468 }
469 String next=to.getSegment(_id.depth());
470 ChannelImpl channel=_children.get(next);
471 if (channel != null)
472 channel.doDelivery(to,from,msg);
473 }
474 }
475 }
476
477
478 public Collection<Client> getSubscribers()
479 {
480 synchronized(this)
481 {
482 return Arrays.asList((Client[])_subscribers);
483 }
484 }
485
486
487 public int getSubscriberCount()
488 {
489 return _subscribers.length;
490 }
491
492
493
494
495
496
497
498 public Collection<DataFilter> getDataFilters()
499 {
500 synchronized(this)
501 {
502 return Arrays.asList(_dataFilters);
503 }
504 }
505
506
507 public void addListener(ChannelListener listener)
508 {
509 if (listener instanceof SubscriptionListener)
510 {
511 synchronized(this)
512 {
513 _subscriptionListeners=(SubscriptionListener[])LazyList.addToArray(_subscriptionListeners,listener,null);
514 }
515 }
516 }
517
518 public void removeListener(ChannelListener listener)
519 {
520 if (listener instanceof SubscriptionListener)
521 {
522 synchronized(this)
523 {
524 _subscriptionListeners=(SubscriptionListener[])LazyList.removeFromArray(_subscriptionListeners,listener);
525 }
526 }
527 }
528 }