1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.jetty.client;
16
17 import java.io.IOException;
18 import java.io.InputStream;
19 import java.io.InterruptedIOException;
20 import java.util.concurrent.atomic.AtomicBoolean;
21
22 import org.mortbay.io.Buffer;
23 import org.mortbay.io.Buffers;
24 import org.mortbay.io.ByteArrayBuffer;
25 import org.mortbay.io.Connection;
26 import org.mortbay.io.EndPoint;
27 import org.mortbay.io.nio.SelectChannelEndPoint;
28 import org.mortbay.jetty.HttpGenerator;
29 import org.mortbay.jetty.HttpHeaderValues;
30 import org.mortbay.jetty.HttpHeaders;
31 import org.mortbay.jetty.HttpParser;
32 import org.mortbay.jetty.HttpSchemes;
33 import org.mortbay.jetty.HttpVersions;
34 import org.mortbay.jetty.client.security.Authorization;
35 import org.mortbay.jetty.security.SslHttpChannelEndPoint;
36 import org.mortbay.log.Log;
37 import org.mortbay.thread.Timeout;
38
39
40
41
42
43
44 public class HttpConnection implements Connection
45 {
46 HttpDestination _destination;
47 EndPoint _endp;
48 HttpGenerator _generator;
49 HttpParser _parser;
50 boolean _http11 = true;
51 Buffer _connectionHeader;
52 Buffer _requestContentChunk;
53 boolean _requestComplete;
54 public String _message;
55 public boolean _reserved;
56
57 volatile HttpExchange _exchange;
58 HttpExchange _pipeline;
59 private final Timeout.Task _timeout = new TimeoutTask();
60 private AtomicBoolean _idle = new AtomicBoolean(false);
61
62 public void dump() throws IOException
63 {
64 System.err.println("endp=" + _endp + " " + _endp.isBufferingInput() + " " + _endp.isBufferingOutput());
65 System.err.println("generator=" + _generator);
66 System.err.println("parser=" + _parser.getState() + " " + _parser.isMoreInBuffer());
67 System.err.println("exchange=" + _exchange);
68 if (_endp instanceof SslHttpChannelEndPoint)
69 ((SslHttpChannelEndPoint)_endp).dump();
70 }
71
72
73 HttpConnection(Buffers buffers, EndPoint endp, int hbs, int cbs)
74 {
75 _endp = endp;
76 _generator = new HttpGenerator(buffers,endp,hbs,cbs);
77 _parser = new HttpParser(buffers,endp,new Handler(),hbs,cbs);
78 }
79
80 public void setReserved (boolean reserved)
81 {
82 _reserved = reserved;
83 }
84
85 public boolean isReserved()
86 {
87 return _reserved;
88 }
89
90
91 public HttpDestination getDestination()
92 {
93 return _destination;
94 }
95
96
97 public void setDestination(HttpDestination destination)
98 {
99 _destination = destination;
100 }
101
102
103 public boolean send(HttpExchange ex) throws IOException
104 {
105
106
107
108 synchronized (this)
109 {
110 if (_exchange != null)
111 {
112 if (_pipeline != null)
113 throw new IllegalStateException(this + " PIPELINED!!! _exchange=" + _exchange);
114 _pipeline = ex;
115 return true;
116 }
117
118 if (!_endp.isOpen())
119 return false;
120
121 _exchange = ex;
122 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_COMMIT);
123
124 if (_endp.isBlocking())
125 {
126 this.notify();
127 }
128 else
129 {
130 SelectChannelEndPoint scep = (SelectChannelEndPoint)_endp;
131 scep.scheduleWrite();
132 }
133 _destination.getHttpClient().schedule(_timeout);
134
135 return true;
136 }
137 }
138
139
140 public void handle() throws IOException
141 {
142 int no_progress = 0;
143 long flushed = 0;
144
145 boolean failed = false;
146 while (_endp.isBufferingInput() || _endp.isOpen())
147 {
148 synchronized (this)
149 {
150 while (_exchange == null)
151 {
152 if (_endp.isBlocking())
153 {
154 try
155 {
156 this.wait();
157 }
158 catch (InterruptedException e)
159 {
160 throw new InterruptedIOException();
161 }
162 }
163 else
164 {
165
166 _parser.fill();
167 _parser.skipCRLF();
168 if (_parser.isMoreInBuffer())
169 {
170 Log.warn("Unexpected data received but no request sent");
171 close();
172 }
173 return;
174 }
175 }
176 }
177 if (_exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
178 {
179 no_progress = 0;
180 commitRequest();
181 }
182
183 try
184 {
185 long io = 0;
186 _endp.flush();
187
188 if (_generator.isComplete())
189 {
190 if (!_requestComplete)
191 {
192 _requestComplete = true;
193 _exchange.getEventListener().onRequestComplete();
194 }
195 }
196 else
197 {
198
199 synchronized (this)
200 {
201 if (_exchange == null)
202 continue;
203 flushed = _generator.flush();
204 io += flushed;
205 }
206
207 if (!_generator.isComplete())
208 {
209 InputStream in = _exchange.getRequestContentSource();
210 if (in != null)
211 {
212 if (_requestContentChunk == null || _requestContentChunk.length() == 0)
213 {
214 _requestContentChunk = _exchange.getRequestContentChunk();
215 if (_requestContentChunk != null)
216 _generator.addContent(_requestContentChunk,false);
217 else
218 _generator.complete();
219 io += _generator.flush();
220 }
221 }
222 else
223 _generator.complete();
224 }
225 }
226
227 if (_generator.isComplete() && !_requestComplete)
228 {
229 _requestComplete = true;
230 _exchange.getEventListener().onRequestComplete();
231 }
232
233
234 if (!_parser.isComplete() && _generator.isCommitted())
235 {
236 long filled = _parser.parseAvailable();
237 io += filled;
238 }
239
240 if (io > 0)
241 no_progress = 0;
242 else if (no_progress++ >= 2 && !_endp.isBlocking())
243 {
244
245 if (_endp instanceof SslHttpChannelEndPoint && !_generator.isComplete() && !_generator.isEmpty())
246 {
247 if (_generator.flush()>0)
248 continue;
249 }
250 return;
251 }
252 }
253 catch (Throwable e)
254 {
255 Log.debug("Failure on " + _exchange, e);
256
257 if (e instanceof ThreadDeath)
258 throw (ThreadDeath)e;
259
260 synchronized (this)
261 {
262 if (_exchange != null)
263 {
264 _exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
265 _exchange.getEventListener().onException(e);
266 }
267 }
268
269 failed = true;
270 if (e instanceof IOException)
271 throw (IOException)e;
272
273 if (e instanceof Error)
274 throw (Error)e;
275
276 if (e instanceof RuntimeException)
277 throw (RuntimeException)e;
278
279 throw new RuntimeException(e);
280 }
281 finally
282 {
283 boolean complete = false;
284 boolean close = failed;
285 if (!failed)
286 {
287
288 if (_generator.isComplete())
289 {
290 if (!_requestComplete)
291 {
292 _requestComplete = true;
293 _exchange.getEventListener().onRequestComplete();
294 }
295
296
297
298 if (_parser.isComplete())
299 {
300 _destination.getHttpClient().cancel(_timeout);
301 complete = true;
302 }
303 }
304 }
305
306 if (complete || failed)
307 {
308 synchronized (this)
309 {
310 if (!close)
311 close = shouldClose();
312
313 reset(true);
314
315 no_progress = 0;
316 if (_exchange != null)
317 {
318 _exchange = null;
319
320 if (_pipeline == null)
321 {
322 if (!isReserved())
323 _destination.returnConnection(this,close);
324 }
325 else
326 {
327 if (close)
328 {
329 if (!isReserved())
330 _destination.returnConnection(this,close);
331
332 HttpExchange exchange = _pipeline;
333 _pipeline = null;
334 _destination.send(exchange);
335 }
336 else
337 {
338 HttpExchange exchange = _pipeline;
339 _pipeline = null;
340 send(exchange);
341 }
342 }
343 }
344 }
345 }
346 }
347 }
348 }
349
350
351 public boolean isIdle()
352 {
353 synchronized (this)
354 {
355 return _exchange == null;
356 }
357 }
358
359
360 public EndPoint getEndPoint()
361 {
362 return _endp;
363 }
364
365
366 private void commitRequest() throws IOException
367 {
368 synchronized (this)
369 {
370 if (_exchange.getStatus() != HttpExchange.STATUS_WAITING_FOR_COMMIT)
371 throw new IllegalStateException();
372
373 _exchange.setStatus(HttpExchange.STATUS_SENDING_REQUEST);
374 _generator.setVersion(_exchange._version);
375
376 String uri = _exchange._uri;
377 if (_destination.isProxied() && uri.startsWith("/"))
378 {
379
380 uri = (_destination.isSecure()?HttpSchemes.HTTPS:HttpSchemes.HTTP) + "://" + _destination.getAddress().getHost() + ":"
381 + _destination.getAddress().getPort() + uri;
382 Authorization auth = _destination.getProxyAuthentication();
383 if (auth != null)
384 auth.setCredentials(_exchange);
385 }
386
387 _generator.setRequest(_exchange._method,uri);
388
389 if (_exchange._version >= HttpVersions.HTTP_1_1_ORDINAL)
390 {
391 if (!_exchange._requestFields.containsKey(HttpHeaders.HOST_BUFFER))
392 _exchange._requestFields.add(HttpHeaders.HOST_BUFFER,_destination.getHostHeader());
393 }
394
395 if (_exchange._requestContent != null)
396 {
397 _exchange._requestFields.putLongField(HttpHeaders.CONTENT_LENGTH,_exchange._requestContent.length());
398 _generator.completeHeader(_exchange._requestFields,false);
399 _generator.addContent(_exchange._requestContent,true);
400 }
401 else if (_exchange._requestContentSource != null)
402 {
403 _generator.completeHeader(_exchange._requestFields,false);
404 int available = _exchange._requestContentSource.available();
405 if (available > 0)
406 {
407
408
409
410 byte[] buf = new byte[available];
411 int length = _exchange._requestContentSource.read(buf);
412 _generator.addContent(new ByteArrayBuffer(buf,0,length),false);
413 }
414 }
415 else
416 {
417 _exchange._requestFields.remove(HttpHeaders.CONTENT_LENGTH);
418 _generator.completeHeader(_exchange._requestFields,true);
419 }
420
421 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
422 }
423 }
424
425
426 protected void reset(boolean returnBuffers) throws IOException
427 {
428 _requestComplete = false;
429 _connectionHeader = null;
430 _parser.reset(returnBuffers);
431 _generator.reset(returnBuffers);
432 _http11 = true;
433 }
434
435
436 private boolean shouldClose()
437 {
438 if (_connectionHeader!=null)
439 {
440 if (HttpHeaderValues.CLOSE_BUFFER.equals(_connectionHeader))
441 return true;
442 if (HttpHeaderValues.KEEP_ALIVE_BUFFER.equals(_connectionHeader))
443 return false;
444 }
445 return !_http11;
446 }
447
448
449 private class Handler extends HttpParser.EventHandler
450 {
451 @Override
452 public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
453 {
454
455
456
457
458
459 }
460
461 @Override
462 public void startResponse(Buffer version, int status, Buffer reason) throws IOException
463 {
464 HttpExchange exchange = _exchange;
465 if (exchange!=null)
466 {
467 _http11 = HttpVersions.HTTP_1_1_BUFFER.equals(version);
468 exchange.getEventListener().onResponseStatus(version,status,reason);
469 exchange.setStatus(HttpExchange.STATUS_PARSING_HEADERS);
470 }
471 }
472
473 @Override
474 public void parsedHeader(Buffer name, Buffer value) throws IOException
475 {
476 HttpExchange exchange = _exchange;
477 if (exchange!=null)
478 {
479 if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.CONNECTION_ORDINAL)
480 {
481 _connectionHeader = HttpHeaderValues.CACHE.lookup(value);
482 }
483 exchange.getEventListener().onResponseHeader(name,value);
484 }
485 }
486
487 @Override
488 public void headerComplete() throws IOException
489 {
490 HttpExchange exchange = _exchange;
491 if (exchange!=null)
492 exchange.setStatus(HttpExchange.STATUS_PARSING_CONTENT);
493 }
494
495 @Override
496 public void content(Buffer ref) throws IOException
497 {
498 HttpExchange exchange = _exchange;
499 if (exchange!=null)
500 exchange.getEventListener().onResponseContent(ref);
501 }
502
503 @Override
504 public void messageComplete(long contextLength) throws IOException
505 {
506 HttpExchange exchange = _exchange;
507 if (exchange!=null)
508 exchange.setStatus(HttpExchange.STATUS_COMPLETED);
509 }
510 }
511
512
513 public String toString()
514 {
515 return "HttpConnection@" + hashCode() + "//" + _destination.getAddress().getHost() + ":" + _destination.getAddress().getPort();
516 }
517
518
519 public String toDetailString()
520 {
521 return toString() + " ex=" + _exchange + " " + _timeout.getAge();
522 }
523
524
525 public void close() throws IOException
526 {
527 _endp.close();
528 }
529
530
531
532 public void setIdleTimeout(long expire)
533 {
534 synchronized (this)
535 {
536 if (_idle.compareAndSet(false,true))
537 _destination.getHttpClient().scheduleIdle(_timeout);
538 else
539 throw new IllegalStateException();
540 }
541 }
542
543
544 public boolean cancelIdleTimeout()
545 {
546 synchronized (this)
547 {
548 if (_idle.compareAndSet(true,false))
549 {
550 _destination.getHttpClient().cancel(_timeout);
551 return true;
552 }
553 }
554
555 return false;
556 }
557
558
559
560
561 private class TimeoutTask extends Timeout.Task
562 {
563 public void expired()
564 {
565 HttpExchange ex=null;
566 try
567 {
568 synchronized (HttpConnection.this)
569 {
570 ex = _exchange;
571 _exchange = null;
572 if (ex != null)
573 {
574 _destination.returnConnection(HttpConnection.this,true);
575 }
576 else if (_idle.compareAndSet(true,false))
577 {
578 _destination.returnIdleConnection(HttpConnection.this);
579 }
580 }
581 }
582 catch (Exception e)
583 {
584 Log.debug(e);
585 }
586 finally
587 {
588 try
589 {
590 close();
591 }
592 catch (IOException e)
593 {
594 Log.ignore(e);
595 }
596
597 if (ex!=null && ex.getStatus() < HttpExchange.STATUS_COMPLETED)
598 {
599 ex.setStatus(HttpExchange.STATUS_EXPIRED);
600 }
601 }
602 }
603 }
604
605 }