1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.io.nio;
16
17 import java.io.IOException;
18 import java.nio.channels.CancelledKeyException;
19 import java.nio.channels.SelectableChannel;
20 import java.nio.channels.SelectionKey;
21 import java.nio.channels.Selector;
22 import java.nio.channels.ServerSocketChannel;
23 import java.nio.channels.SocketChannel;
24 import java.util.ArrayList;
25 import java.util.Iterator;
26 import java.util.List;
27
28 import org.mortbay.component.AbstractLifeCycle;
29 import org.mortbay.io.Connection;
30 import org.mortbay.io.EndPoint;
31 import org.mortbay.log.Log;
32 import org.mortbay.thread.Timeout;
33
34
35
36
37
38
39
40
41
42
43 public abstract class SelectorManager extends AbstractLifeCycle
44 {
45
46 private static final int __JVMBUG_THRESHHOLD=Integer.getInteger("org.mortbay.io.nio.JVMBUG_THRESHHOLD",512).intValue();
47 private static final int __MONITOR_PERIOD=Integer.getInteger("org.mortbay.io.nio.MONITOR_PERIOD",1000).intValue();
48 private static final int __MAX_SELECTS=Integer.getInteger("org.mortbay.io.nio.MAX_SELECTS",15000).intValue();
49 private static final int __BUSY_PAUSE=Integer.getInteger("org.mortbay.io.nio.BUSY_PAUSE",50).intValue();
50 private static final int __BUSY_KEY=Integer.getInteger("org.mortbay.io.nio.BUSY_KEY",-1).intValue();
51
52 private boolean _delaySelectKeyUpdate=true;
53 private long _maxIdleTime;
54 private long _lowResourcesConnections;
55 private long _lowResourcesMaxIdleTime;
56 private transient SelectSet[] _selectSet;
57 private int _selectSets=1;
58 private volatile int _set;
59
60
61
62
63
64
65 public void setMaxIdleTime(long maxIdleTime)
66 {
67 _maxIdleTime=maxIdleTime;
68 }
69
70
71
72
73
74 public void setSelectSets(int selectSets)
75 {
76 long lrc = _lowResourcesConnections * _selectSets;
77 _selectSets=selectSets;
78 _lowResourcesConnections=lrc/_selectSets;
79 }
80
81
82
83
84
85 public long getMaxIdleTime()
86 {
87 return _maxIdleTime;
88 }
89
90
91
92
93
94 public int getSelectSets()
95 {
96 return _selectSets;
97 }
98
99
100
101
102
103 public boolean isDelaySelectKeyUpdate()
104 {
105 return _delaySelectKeyUpdate;
106 }
107
108
109
110
111
112
113
114 public void register(SocketChannel channel, Object att) throws IOException
115 {
116 int s=_set++;
117 s=s%_selectSets;
118 SelectSet[] sets=_selectSet;
119 if (sets!=null)
120 {
121 SelectSet set=sets[s];
122 set.addChange(channel,att);
123 set.wakeup();
124 }
125 }
126
127
128
129
130
131
132
133 public void register(ServerSocketChannel acceptChannel) throws IOException
134 {
135 int s=_set++;
136 s=s%_selectSets;
137 SelectSet set=_selectSet[s];
138 set.addChange(acceptChannel);
139 set.wakeup();
140 }
141
142
143
144
145
146 public long getLowResourcesConnections()
147 {
148 return _lowResourcesConnections*_selectSets;
149 }
150
151
152
153
154
155
156
157
158 public void setLowResourcesConnections(long lowResourcesConnections)
159 {
160 _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
161 }
162
163
164
165
166
167 public long getLowResourcesMaxIdleTime()
168 {
169 return _lowResourcesMaxIdleTime;
170 }
171
172
173
174
175
176
177 public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
178 {
179 _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
180 }
181
182
183
184
185
186
187 public void doSelect(int acceptorID) throws IOException
188 {
189 SelectSet[] sets= _selectSet;
190 if (sets!=null && sets.length>acceptorID && sets[acceptorID]!=null)
191 sets[acceptorID].doSelect();
192 }
193
194
195
196
197
198
199 public void setDelaySelectKeyUpdate(boolean delaySelectKeyUpdate)
200 {
201 _delaySelectKeyUpdate=delaySelectKeyUpdate;
202 }
203
204
205
206
207
208
209
210 protected abstract SocketChannel acceptChannel(SelectionKey key) throws IOException;
211
212
213 public abstract boolean dispatch(Runnable task) throws IOException;
214
215
216
217
218
219 protected void doStart() throws Exception
220 {
221 _selectSet = new SelectSet[_selectSets];
222 for (int i=0;i<_selectSet.length;i++)
223 _selectSet[i]= new SelectSet(i);
224
225 super.doStart();
226 }
227
228
229
230 protected void doStop() throws Exception
231 {
232 SelectSet[] sets= _selectSet;
233 _selectSet=null;
234 if (sets!=null)
235 for (int i=0;i<sets.length;i++)
236 {
237 SelectSet set = sets[i];
238 if (set!=null)
239 set.stop();
240 }
241 super.doStop();
242 }
243
244
245
246
247
248 protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
249
250
251
252
253
254 protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
255
256
257 protected abstract Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);
258
259
260
261
262
263
264
265
266
267 protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
268
269
270 protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
271 {
272 Log.warn(ex);
273 }
274
275
276
277
278 public class SelectSet
279 {
280 private transient int _change;
281 private transient List[] _changes;
282 private transient Timeout _idleTimeout;
283 private transient int _nextSet;
284 private transient Timeout _retryTimeout;
285 private transient Selector _selector;
286 private transient int _setID;
287 private volatile boolean _selecting;
288 private transient int _jvmBug;
289 private int _selects;
290 private long _monitorStart;
291 private long _monitorNext;
292 private boolean _pausing;
293 private SelectionKey _busyKey;
294 private int _busyKeyCount;
295 private long _log;
296 private int _paused;
297 private int _jvmFix0;
298 private int _jvmFix1;
299 private int _jvmFix2;
300
301
302 SelectSet(int acceptorID) throws Exception
303 {
304 _setID=acceptorID;
305
306 _idleTimeout = new Timeout(this);
307 _idleTimeout.setDuration(getMaxIdleTime());
308 _retryTimeout = new Timeout(this);
309 _retryTimeout.setDuration(0L);
310
311
312 _selector = Selector.open();
313 _changes = new ArrayList[] {new ArrayList(),new ArrayList()};
314 _change=0;
315 _monitorStart=System.currentTimeMillis();
316 _monitorNext=_monitorStart+__MONITOR_PERIOD;
317 _log=_monitorStart+60000;
318 }
319
320
321 public void addChange(Object point)
322 {
323 synchronized (_changes)
324 {
325 _changes[_change].add(point);
326 }
327 }
328
329
330 public void addChange(SelectableChannel channel, Object att)
331 {
332 if (att==null)
333 addChange(channel);
334 else if (att instanceof EndPoint)
335 addChange(att);
336 else
337 addChange(new ChangeSelectableChannel(channel,att));
338 }
339
340
341 public void cancelIdle(Timeout.Task task)
342 {
343 synchronized (this)
344 {
345 task.cancel();
346 }
347 }
348
349
350
351
352
353
354
355 public void doSelect() throws IOException
356 {
357 SelectionKey key=null;
358
359 try
360 {
361 List changes;
362 final Selector selector;
363 synchronized (_changes)
364 {
365 changes=_changes[_change];
366 _change=_change==0?1:0;
367 _selecting=true;
368 selector=_selector;
369 }
370
371
372 for (int i = 0; i < changes.size(); i++)
373 {
374 try
375 {
376 Object o = changes.get(i);
377
378 if (o instanceof EndPoint)
379 {
380
381 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)o;
382 endpoint.doUpdateKey();
383 }
384 else if (o instanceof Runnable)
385 {
386 dispatch((Runnable)o);
387 }
388 else if (o instanceof ChangeSelectableChannel)
389 {
390
391 final ChangeSelectableChannel asc = (ChangeSelectableChannel)o;
392 final SelectableChannel channel=asc._channel;
393 final Object att = asc._attachment;
394
395 if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
396 {
397 key = channel.register(selector,SelectionKey.OP_READ,att);
398 SelectChannelEndPoint endpoint = newEndPoint((SocketChannel)channel,this,key);
399 key.attach(endpoint);
400 endpoint.dispatch();
401 }
402 else if (channel.isOpen())
403 {
404 channel.register(selector,SelectionKey.OP_CONNECT,att);
405 }
406 }
407 else if (o instanceof SocketChannel)
408 {
409 final SocketChannel channel=(SocketChannel)o;
410
411 if (channel.isConnected())
412 {
413 key = channel.register(selector,SelectionKey.OP_READ,null);
414 SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
415 key.attach(endpoint);
416 endpoint.dispatch();
417 }
418 else if (channel.isOpen())
419 {
420 channel.register(selector,SelectionKey.OP_CONNECT,null);
421 }
422 }
423 else if (o instanceof ServerSocketChannel)
424 {
425 ServerSocketChannel channel = (ServerSocketChannel)o;
426 channel.register(getSelector(),SelectionKey.OP_ACCEPT);
427 }
428 else if (o instanceof ChangeTask)
429 {
430 ((ChangeTask)o).run();
431 }
432 else
433 throw new IllegalArgumentException(o.toString());
434 }
435 catch (Exception e)
436 {
437 if (isRunning())
438 Log.warn(e);
439 else
440 Log.debug(e);
441 }
442 }
443 changes.clear();
444
445 long idle_next = 0;
446 long retry_next = 0;
447 long now=System.currentTimeMillis();
448 synchronized (this)
449 {
450 _idleTimeout.setNow(now);
451 _retryTimeout.setNow(now);
452 if (_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections)
453 _idleTimeout.setDuration(_lowResourcesMaxIdleTime);
454 else
455 _idleTimeout.setDuration(_maxIdleTime);
456 idle_next=_idleTimeout.getTimeToNext();
457 retry_next=_retryTimeout.getTimeToNext();
458 }
459
460
461 long wait = 1000L;
462 if (idle_next >= 0 && wait > idle_next)
463 wait = idle_next;
464 if (wait > 0 && retry_next >= 0 && wait > retry_next)
465 wait = retry_next;
466
467
468 if (wait > 2)
469 {
470
471 if (_pausing)
472 {
473 try
474 {
475 Thread.sleep(__BUSY_PAUSE);
476 }
477 catch(InterruptedException e)
478 {
479 Log.ignore(e);
480 }
481 }
482
483 long before=now;
484 int selected=selector.select(wait);
485 now = System.currentTimeMillis();
486 _idleTimeout.setNow(now);
487 _retryTimeout.setNow(now);
488 _selects++;
489
490
491
492
493 if (now>_monitorNext)
494 {
495 _selects=(int)(_selects*__MONITOR_PERIOD/(now-_monitorStart));
496 _pausing=_selects>__MAX_SELECTS;
497 if (_pausing)
498 _paused++;
499
500 _selects=0;
501 _jvmBug=0;
502 _monitorStart=now;
503 _monitorNext=now+__MONITOR_PERIOD;
504 }
505
506 if (now>_log)
507 {
508 if (_paused>0)
509 Log.info(this+" Busy selector - injecting delay "+_paused+" times");
510
511 if (_jvmFix2>0)
512 Log.info(this+" JVM BUG(s) - injecting delay"+_jvmFix2+" times");
513
514 if (_jvmFix1>0)
515 Log.info(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, canceled keys "+_jvmFix0+" times");
516
517 else if(Log.isDebugEnabled() && _jvmFix0>0)
518 Log.info(this+" JVM BUG(s) - canceled keys "+_jvmFix0+" times");
519 _paused=0;
520 _jvmFix2=0;
521 _jvmFix1=0;
522 _jvmFix0=0;
523 _log=now+60000;
524 }
525
526
527 if (selected==0 && wait>10 && (now-before)<(wait/2))
528 {
529
530 _jvmBug++;
531 if (_jvmBug>(__JVMBUG_THRESHHOLD))
532 {
533 try
534 {
535 if (_jvmBug==__JVMBUG_THRESHHOLD+1)
536 _jvmFix2++;
537
538 Thread.sleep(__BUSY_PAUSE);
539 }
540 catch(InterruptedException e)
541 {
542 Log.ignore(e);
543 }
544 }
545 else if (_jvmBug==__JVMBUG_THRESHHOLD)
546 {
547 synchronized (this)
548 {
549
550 _jvmFix1++;
551
552 final Selector new_selector = Selector.open();
553 Iterator iterator = _selector.keys().iterator();
554 while (iterator.hasNext())
555 {
556 SelectionKey k = (SelectionKey)iterator.next();
557 if (!k.isValid() || k.interestOps()==0)
558 continue;
559
560 final SelectableChannel channel = k.channel();
561 final Object attachment = k.attachment();
562
563 if (attachment==null)
564 addChange(channel);
565 else
566 addChange(channel,attachment);
567 }
568 _selector.close();
569 _selector=new_selector;
570 return;
571 }
572 }
573 else if (_jvmBug%32==31)
574 {
575
576 int cancelled=0;
577 Iterator iter = selector.keys().iterator();
578 while(iter.hasNext())
579 {
580 SelectionKey k = (SelectionKey) iter.next();
581 if (k.isValid()&&k.interestOps()==0)
582 {
583 k.cancel();
584 cancelled++;
585 }
586 }
587 if (cancelled>0)
588 _jvmFix0++;
589
590 return;
591 }
592 }
593 else if (__BUSY_KEY>0 && selected==1 && _selects>__MAX_SELECTS)
594 {
595
596 SelectionKey busy = (SelectionKey)selector.selectedKeys().iterator().next();
597 if (busy==_busyKey)
598 {
599 if (++_busyKeyCount>__BUSY_KEY && !(busy.channel() instanceof ServerSocketChannel))
600 {
601 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)busy.attachment();
602 Log.warn("Busy Key "+busy.channel()+" "+endpoint);
603 busy.cancel();
604 if (endpoint!=null)
605 endpoint.close();
606 }
607 }
608 else
609 _busyKeyCount=0;
610 _busyKey=busy;
611 }
612 }
613 else
614 {
615 selector.selectNow();
616 _selects++;
617 }
618
619
620 if (_selector==null || !selector.isOpen())
621 return;
622
623
624 Iterator iter = selector.selectedKeys().iterator();
625 while (iter.hasNext())
626 {
627 key = (SelectionKey) iter.next();
628
629 try
630 {
631 if (!key.isValid())
632 {
633 key.cancel();
634 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
635 if (endpoint != null)
636 endpoint.doUpdateKey();
637 continue;
638 }
639
640 Object att = key.attachment();
641
642 if (att instanceof SelectChannelEndPoint)
643 {
644 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)att;
645 endpoint.dispatch();
646 }
647 else if (key.isAcceptable())
648 {
649 SocketChannel channel = acceptChannel(key);
650 if (channel==null)
651 continue;
652
653 channel.configureBlocking(false);
654
655
656 _nextSet=++_nextSet%_selectSet.length;
657
658
659 if (_nextSet==_setID)
660 {
661
662 SelectionKey cKey = channel.register(_selectSet[_nextSet].getSelector(), SelectionKey.OP_READ);
663 SelectChannelEndPoint endpoint=newEndPoint(channel,_selectSet[_nextSet],cKey);
664 cKey.attach(endpoint);
665 if (endpoint != null)
666 endpoint.dispatch();
667 }
668 else
669 {
670
671 _selectSet[_nextSet].addChange(channel);
672 _selectSet[_nextSet].wakeup();
673 }
674 }
675 else if (key.isConnectable())
676 {
677
678 SocketChannel channel = (SocketChannel)key.channel();
679 boolean connected=false;
680 try
681 {
682 connected=channel.finishConnect();
683 }
684 catch(Exception e)
685 {
686 connectionFailed(channel,e,att);
687 }
688 finally
689 {
690 if (connected)
691 {
692 key.interestOps(SelectionKey.OP_READ);
693 SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
694 key.attach(endpoint);
695 endpoint.dispatch();
696 }
697 else
698 {
699 key.cancel();
700 }
701 }
702 }
703 else
704 {
705
706 SocketChannel channel = (SocketChannel)key.channel();
707 SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
708 key.attach(endpoint);
709 if (key.isReadable())
710 endpoint.dispatch();
711 }
712 key = null;
713 }
714 catch (CancelledKeyException e)
715 {
716 Log.ignore(e);
717 }
718 catch (Exception e)
719 {
720 if (isRunning())
721 Log.warn(e);
722 else
723 Log.ignore(e);
724
725 if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
726 {
727 key.interestOps(0);
728
729 key.cancel();
730 }
731 }
732 }
733
734
735 selector.selectedKeys().clear();
736
737
738 _idleTimeout.tick(now);
739 _retryTimeout.tick(now);
740
741 }
742 catch (CancelledKeyException e)
743 {
744 Log.ignore(e);
745 }
746 finally
747 {
748 _selecting=false;
749 }
750 }
751
752
753 public SelectorManager getManager()
754 {
755 return SelectorManager.this;
756 }
757
758
759 public long getNow()
760 {
761 return _idleTimeout.getNow();
762 }
763
764
765 public void scheduleIdle(Timeout.Task task)
766 {
767 synchronized (this)
768 {
769 if (_idleTimeout.getDuration() <= 0)
770 return;
771
772 task.schedule(_idleTimeout);
773 }
774 }
775
776
777 public void scheduleTimeout(Timeout.Task task, long timeout)
778 {
779 synchronized (this)
780 {
781 _retryTimeout.schedule(task, timeout);
782 }
783 }
784
785
786 public void wakeup()
787 {
788 Selector selector = _selector;
789 if (selector!=null)
790 selector.wakeup();
791 }
792
793
794 Selector getSelector()
795 {
796 return _selector;
797 }
798
799
800 void stop() throws Exception
801 {
802 boolean selecting=true;
803 while(selecting)
804 {
805 wakeup();
806 selecting=_selecting;
807 }
808
809 ArrayList keys=new ArrayList(_selector.keys());
810 Iterator iter =keys.iterator();
811
812 while (iter.hasNext())
813 {
814 SelectionKey key = (SelectionKey)iter.next();
815 if (key==null)
816 continue;
817 Object att=key.attachment();
818 if (att instanceof EndPoint)
819 {
820 EndPoint endpoint = (EndPoint)att;
821 try
822 {
823 endpoint.close();
824 }
825 catch(IOException e)
826 {
827 Log.ignore(e);
828 }
829 }
830 }
831
832 synchronized (this)
833 {
834 selecting=_selecting;
835 while(selecting)
836 {
837 wakeup();
838 selecting=_selecting;
839 }
840
841 _idleTimeout.cancelAll();
842 _retryTimeout.cancelAll();
843 try
844 {
845 if (_selector != null)
846 _selector.close();
847 }
848 catch (IOException e)
849 {
850 Log.ignore(e);
851 }
852 _selector=null;
853 }
854 }
855 }
856
857
858 private static class ChangeSelectableChannel
859 {
860 final SelectableChannel _channel;
861 final Object _attachment;
862
863 public ChangeSelectableChannel(SelectableChannel channel, Object attachment)
864 {
865 super();
866 _channel = channel;
867 _attachment = attachment;
868 }
869 }
870
871
872 private interface ChangeTask
873 {
874 public void run();
875 }
876 }