1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.mortbay.jetty.client;
15
16 import java.io.IOException;
17 import java.lang.reflect.Constructor;
18 import java.util.ArrayList;
19 import java.util.LinkedList;
20 import java.util.List;
21 import java.util.concurrent.ArrayBlockingQueue;
22 import javax.servlet.http.Cookie;
23
24 import org.mortbay.io.Buffer;
25 import org.mortbay.io.ByteArrayBuffer;
26 import org.mortbay.jetty.HttpHeaders;
27 import org.mortbay.jetty.client.security.Authorization;
28 import org.mortbay.jetty.client.security.SecurityListener;
29 import org.mortbay.jetty.servlet.PathMap;
30 import org.mortbay.log.Log;
31
32
33
34
35
36 public class HttpDestination
37 {
38 private ByteArrayBuffer _hostHeader;
39 private final Address _address;
40 private final LinkedList<HttpConnection> _connections = new LinkedList<HttpConnection>();
41 private final ArrayList<HttpConnection> _idle = new ArrayList<HttpConnection>();
42 private final HttpClient _client;
43 private final boolean _ssl;
44 private int _maxConnections;
45 private int _pendingConnections=0;
46 private ArrayBlockingQueue<Object> _newQueue = new ArrayBlockingQueue<Object>(10,true);
47 private int _newConnection=0;
48 private Address _proxy;
49 private Authorization _proxyAuthentication;
50 private PathMap _authorizations;
51 private List<Cookie> _cookies;
52
53 public void dump() throws IOException
54 {
55 synchronized (this)
56 {
57 System.err.println(this);
58 System.err.println("connections="+_connections.size());
59 System.err.println("idle="+_idle.size());
60 System.err.println("pending="+_pendingConnections);
61 for (HttpConnection c : _connections)
62 {
63 if (!c.isIdle())
64 c.dump();
65 }
66 }
67 }
68
69
70 private LinkedList<HttpExchange> _queue=new LinkedList<HttpExchange>();
71
72
73 HttpDestination(HttpClient pool, Address address, boolean ssl, int maxConnections)
74 {
75 _client=pool;
76 _address=address;
77 _ssl=ssl;
78 _maxConnections=maxConnections;
79 String addressString = address.getHost();
80 if (address.getPort() != (_ssl ? 443 : 80)) addressString += ":" + address.getPort();
81 _hostHeader = new ByteArrayBuffer(addressString);
82 }
83
84
85 public Address getAddress()
86 {
87 return _address;
88 }
89
90
91 public Buffer getHostHeader()
92 {
93 return _hostHeader;
94 }
95
96
97 public HttpClient getHttpClient()
98 {
99 return _client;
100 }
101
102
103 public boolean isSecure()
104 {
105 return _ssl;
106 }
107
108
109 public int getConnections()
110 {
111 synchronized (this)
112 {
113 return _connections.size();
114 }
115 }
116
117
118 public int getIdleConnections()
119 {
120 synchronized (this)
121 {
122 return _idle.size();
123 }
124 }
125
126
127 public void addAuthorization(String pathSpec,Authorization authorization)
128 {
129 synchronized (this)
130 {
131 if (_authorizations==null)
132 _authorizations=new PathMap();
133 _authorizations.put(pathSpec,authorization);
134 }
135
136
137 }
138
139
140 public void addCookie(Cookie cookie)
141 {
142 synchronized (this)
143 {
144 if (_cookies==null)
145 _cookies=new ArrayList<Cookie>();
146 _cookies.add(cookie);
147 }
148
149
150 }
151
152
153
154
155
156
157
158
159
160
161 private HttpConnection getConnection(long timeout) throws IOException
162 {
163 HttpConnection connection = null;
164
165 while ((connection == null) && (connection = getIdleConnection()) == null && timeout>0)
166 {
167 int totalConnections = 0;
168 boolean starting = false;
169 synchronized (this)
170 {
171 totalConnections = _connections.size() + _pendingConnections;
172 if (totalConnections < _maxConnections)
173 {
174 _newConnection++;
175 startNewConnection();
176 starting = true;
177 }
178 }
179
180 if (!starting)
181 {
182 try
183 {
184 Thread.currentThread().sleep(200);
185 timeout-=200;
186 }
187 catch (InterruptedException e)
188 {
189 Log.ignore(e);
190 }
191 }
192 else
193 {
194 try
195 {
196 Object o = _newQueue.take();
197 if (o instanceof HttpConnection)
198 {
199 connection = (HttpConnection)o;
200 }
201 else
202 throw (IOException)o;
203 }
204 catch (InterruptedException e)
205 {
206 Log.ignore(e);
207 }
208 }
209 }
210 return connection;
211 }
212
213
214 public HttpConnection reserveConnection(long timeout) throws IOException
215 {
216 HttpConnection connection = getConnection(timeout);
217 if (connection != null)
218 connection.setReserved(true);
219 return connection;
220 }
221
222
223 public HttpConnection getIdleConnection() throws IOException
224 {
225 long now = _client.getNow();
226 long idleTimeout=_client.getIdleTimeout();
227 HttpConnection connection = null;
228 while (true)
229 {
230 synchronized (this)
231 {
232 if (connection!=null)
233 {
234 _connections.remove(connection);
235 connection.close();
236 connection=null;
237 }
238 if (_idle.size() > 0)
239 connection = _idle.remove(_idle.size()-1);
240 }
241
242 if (connection==null)
243 return null;
244
245 if (connection.cancelIdleTimeout() )
246 return connection;
247
248 }
249 }
250
251
252 protected void startNewConnection()
253 {
254 try
255 {
256 synchronized (this)
257 {
258 _pendingConnections++;
259 }
260 _client._connector.startConnection(this);
261 }
262 catch(Exception e)
263 {
264 Log.debug(e);
265 onConnectionFailed(e);
266 }
267 }
268
269
270 public void onConnectionFailed(Throwable throwable)
271 {
272 Throwable connect_failure=null;
273
274 synchronized (this)
275 {
276 _pendingConnections--;
277 if (_newConnection>0)
278 {
279 connect_failure=throwable;
280 _newConnection--;
281 }
282 else if (_queue.size()>0)
283 {
284 HttpExchange ex=_queue.removeFirst();
285 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
286 ex.getEventListener().onConnectionFailed(throwable);
287 }
288 }
289
290 if(connect_failure!=null)
291 {
292 try
293 {
294 _newQueue.put(connect_failure);
295 }
296 catch (InterruptedException e)
297 {
298 Log.ignore(e);
299 }
300 }
301 }
302
303
304 public void onException(Throwable throwable)
305 {
306 synchronized (this)
307 {
308 _pendingConnections--;
309 if (_queue.size()>0)
310 {
311 HttpExchange ex=_queue.removeFirst();
312 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
313 ex.getEventListener().onException(throwable);
314 }
315 }
316 }
317
318
319 public void onNewConnection(HttpConnection connection) throws IOException
320 {
321 HttpConnection q_connection=null;
322
323 synchronized (this)
324 {
325 _pendingConnections--;
326 _connections.add(connection);
327
328 if (_newConnection>0)
329 {
330 q_connection=connection;
331 _newConnection--;
332 }
333 else if (_queue.size()==0)
334 {
335 _idle.add(connection);
336 }
337 else
338 {
339 HttpExchange ex=_queue.removeFirst();
340 connection.send(ex);
341 }
342 }
343
344 if (q_connection!=null)
345 {
346 try
347 {
348 _newQueue.put(q_connection);
349 }
350 catch (InterruptedException e)
351 {
352 Log.ignore(e);
353 }
354 }
355 }
356
357
358 public void returnConnection(HttpConnection connection, boolean close) throws IOException
359 {
360 if (connection.isReserved())
361 connection.setReserved(false);
362
363 if (close)
364 {
365 try
366 {
367 connection.close();
368 }
369 catch(IOException e)
370 {
371 Log.ignore(e);
372 }
373 }
374
375 if (!_client.isStarted())
376 return;
377
378 if (!close && connection.getEndPoint().isOpen())
379 {
380 synchronized (this)
381 {
382 if (_queue.size()==0)
383 {
384 connection.setIdleTimeout(_client.getNow()+_client.getIdleTimeout());
385 _idle.add(connection);
386 }
387 else
388 {
389 HttpExchange ex = _queue.removeFirst();
390 connection.send(ex);
391 }
392 this.notifyAll();
393 }
394 }
395 else
396 {
397 synchronized (this)
398 {
399 _connections.remove(connection);
400 if (!_queue.isEmpty())
401 startNewConnection();
402 }
403 }
404 }
405
406
407 public void returnIdleConnection(HttpConnection connection) throws IOException
408 {
409 try
410 {
411 connection.close();
412 }
413 catch (IOException e)
414 {
415 Log.ignore(e);
416 }
417
418 synchronized (this)
419 {
420 _idle.remove(connection);
421 _connections.remove(connection);
422 if (!_queue.isEmpty() && _client.isStarted())
423 startNewConnection();
424 }
425 }
426
427
428
429 public void send(HttpExchange ex) throws IOException
430 {
431 LinkedList<String> listeners = _client.getRegisteredListeners();
432
433 if (listeners != null)
434 {
435
436 for (int i = listeners.size(); i > 0; --i)
437 {
438 String listenerClass = listeners.get(i - 1);
439
440 try
441 {
442 Class listener = Class.forName(listenerClass);
443 Constructor constructor = listener.getDeclaredConstructor(HttpDestination.class, HttpExchange.class);
444 HttpEventListener elistener = (HttpEventListener) constructor.newInstance(this, ex);
445 ex.setEventListener(elistener);
446 }
447 catch (Exception e)
448 {
449 Log.debug(e);
450 throw new IOException("Unable to instantiate registered listener for destination: " + listenerClass );
451 }
452 }
453 }
454
455
456 if ( _client.hasRealms() )
457 {
458 ex.setEventListener( new SecurityListener( this, ex ) );
459 }
460
461 doSend(ex);
462 }
463
464
465 public void resend(HttpExchange ex) throws IOException
466 {
467 ex.getEventListener().onRetry();
468 ex.reset();
469 doSend(ex);
470 }
471
472
473 protected void doSend(HttpExchange ex) throws IOException
474 {
475
476
477 if (_cookies!=null)
478 {
479 StringBuilder buf=null;
480 for (Cookie cookie : _cookies)
481 {
482 if (buf==null)
483 buf=new StringBuilder();
484 else
485 buf.append("; ");
486 buf.append(cookie.getName());
487 buf.append("=");
488 buf.append(cookie.getValue());
489 }
490 if (buf!=null)
491 ex.addRequestHeader(HttpHeaders.COOKIE,buf.toString());
492 }
493
494
495 if (_authorizations!=null)
496 {
497 Authorization auth= (Authorization)_authorizations.match(ex.getURI());
498 if (auth !=null)
499 ((Authorization)auth).setCredentials(ex);
500 }
501
502 HttpConnection connection = getIdleConnection();
503 if (connection != null)
504 {
505 boolean sent = connection.send(ex);
506 if (!sent) connection = null;
507 }
508
509 if (connection == null)
510 {
511 synchronized (this)
512 {
513 _queue.add(ex);
514 if (_connections.size() + _pendingConnections < _maxConnections)
515 {
516 startNewConnection();
517 }
518 }
519 }
520 }
521
522
523 public synchronized String toString()
524 {
525 return "HttpDestination@" + hashCode() + "//" + _address.getHost() + ":" + _address.getPort() + "(" + _connections.size() + "," + _idle.size() + "," + _queue.size() + ")";
526 }
527
528
529 public synchronized String toDetailString()
530 {
531 StringBuilder b = new StringBuilder();
532 b.append(toString());
533 b.append('\n');
534 synchronized(this)
535 {
536 for (HttpConnection connection : _connections)
537 {
538 if (connection._exchange!=null)
539 {
540 b.append(connection.toDetailString());
541 if (_idle.contains(connection))
542 b.append(" IDLE");
543 b.append('\n');
544 }
545 }
546 }
547 b.append("--");
548 b.append('\n');
549
550 return b.toString();
551 }
552
553
554 public void setProxy(Address proxy)
555 {
556 _proxy=proxy;
557 }
558
559
560 public Address getProxy()
561 {
562 return _proxy;
563 }
564
565
566 public Authorization getProxyAuthentication()
567 {
568 return _proxyAuthentication;
569 }
570
571
572 public void setProxyAuthentication(Authorization authentication)
573 {
574 _proxyAuthentication = authentication;
575 }
576
577
578 public boolean isProxied()
579 {
580 return _proxy!=null;
581 }
582
583
584 public void close() throws IOException
585 {
586 synchronized (this)
587 {
588 for (HttpConnection connection : _connections)
589 {
590 connection.close();
591 }
592 }
593 }
594
595 }