View Javadoc

// ========================================================================
// Copyright 2006-2007 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ========================================================================
14  
15  package org.mortbay.jetty.client;
16  
17  import java.io.IOException;
18  import java.io.InputStream;
19  import java.io.InterruptedIOException;
20  import java.util.concurrent.atomic.AtomicBoolean;
21  
22  import org.mortbay.io.Buffer;
23  import org.mortbay.io.Buffers;
24  import org.mortbay.io.ByteArrayBuffer;
25  import org.mortbay.io.Connection;
26  import org.mortbay.io.EndPoint;
27  import org.mortbay.io.nio.SelectChannelEndPoint;
28  import org.mortbay.jetty.HttpGenerator;
29  import org.mortbay.jetty.HttpHeaderValues;
30  import org.mortbay.jetty.HttpHeaders;
31  import org.mortbay.jetty.HttpParser;
32  import org.mortbay.jetty.HttpSchemes;
33  import org.mortbay.jetty.HttpVersions;
34  import org.mortbay.jetty.client.security.Authorization;
35  import org.mortbay.jetty.security.SslHttpChannelEndPoint;
36  import org.mortbay.log.Log;
37  import org.mortbay.thread.Timeout;
38  
39  /**
40   *
41   * @author Greg Wilkins
42   * @author Guillaume Nodet
43   */
44  public class HttpConnection implements Connection
45  {
46      HttpDestination _destination;
47      EndPoint _endp;
48      HttpGenerator _generator;
49      HttpParser _parser;
50      boolean _http11 = true;
51      Buffer _connectionHeader;
52      Buffer _requestContentChunk;
53      boolean _requestComplete;
54      public String _message;
55      public boolean _reserved;
56      // The current exchange waiting for a response
57      volatile HttpExchange _exchange;
58      HttpExchange _pipeline;
59      private final Timeout.Task _timeout = new TimeoutTask();
60      private AtomicBoolean _idle = new AtomicBoolean(false);
61  
62      public void dump() throws IOException
63      {
64          System.err.println("endp=" + _endp + " " + _endp.isBufferingInput() + " " + _endp.isBufferingOutput());
65          System.err.println("generator=" + _generator);
66          System.err.println("parser=" + _parser.getState() + " " + _parser.isMoreInBuffer());
67          System.err.println("exchange=" + _exchange);
68          if (_endp instanceof SslHttpChannelEndPoint)
69              ((SslHttpChannelEndPoint)_endp).dump();
70      }
71  
72      /* ------------------------------------------------------------ */
73      HttpConnection(Buffers buffers, EndPoint endp, int hbs, int cbs)
74      {
75          _endp = endp;
76          _generator = new HttpGenerator(buffers,endp,hbs,cbs);
77          _parser = new HttpParser(buffers,endp,new Handler(),hbs,cbs);
78      }
79  
80      public void setReserved (boolean reserved)
81      {
82          _reserved = reserved;
83      }
84  
85      public boolean isReserved()
86      {
87          return _reserved;
88      }
89  
90      /* ------------------------------------------------------------ */
91      public HttpDestination getDestination()
92      {
93          return _destination;
94      }
95  
96      /* ------------------------------------------------------------ */
97      public void setDestination(HttpDestination destination)
98      {
99          _destination = destination;
100     }
101 
102     /* ------------------------------------------------------------ */
103     public boolean send(HttpExchange ex) throws IOException
104     {
105         // _message =
106         // Thread.currentThread().getName()+": Generator instance="+_generator
107         // .hashCode()+" state= "+_generator.getState()+" _exchange="+_exchange;
108         synchronized (this)
109         {
110             if (_exchange != null)
111             {
112                 if (_pipeline != null)
113                     throw new IllegalStateException(this + " PIPELINED!!!  _exchange=" + _exchange);
114                 _pipeline = ex;
115                 return true;
116             }
117 
118             if (!_endp.isOpen())
119                 return false;
120 
121             _exchange = ex;
122             _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_COMMIT);
123 
124             if (_endp.isBlocking())
125             {
126                 this.notify();
127             }
128             else
129             {
130                 SelectChannelEndPoint scep = (SelectChannelEndPoint)_endp;
131                 scep.scheduleWrite();
132             }
133             _destination.getHttpClient().schedule(_timeout);
134 
135             return true;
136         }
137     }
138 
139     /* ------------------------------------------------------------ */
140     public void handle() throws IOException
141     {
142         int no_progress = 0;
143         long flushed = 0;
144 
145         boolean failed = false;
146         while (_endp.isBufferingInput() || _endp.isOpen())
147         {
148             synchronized (this)
149             {
150                 while (_exchange == null)
151                 {
152                     if (_endp.isBlocking())
153                     {
154                         try
155                         {
156                             this.wait();
157                         }
158                         catch (InterruptedException e)
159                         {
160                             throw new InterruptedIOException();
161                         }
162                     }
163                     else
164                     {
165                         // Hopefully just space?
166                         _parser.fill();
167                         _parser.skipCRLF();
168                         if (_parser.isMoreInBuffer())
169                         {
170                             Log.warn("Unexpected data received but no request sent");
171                             close();
172                         }
173                         return;
174                     }
175                 }
176             }
177             if (_exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
178             {
179                 no_progress = 0;
180                 commitRequest();
181             }
182 
183             try
184             {
185                 long io = 0;
186                 _endp.flush();
187 
188                 if (_generator.isComplete())
189                 {
190                     if (!_requestComplete)
191                     {
192                         _requestComplete = true;
193                         _exchange.getEventListener().onRequestComplete();
194                     }
195                 }
196                 else
197                 {
198                     // Write as much of the request as possible
199                     synchronized (this)
200                     {
201                         if (_exchange == null)
202                             continue;
203                         flushed = _generator.flush();
204                         io += flushed;
205                     }
206 
207                     if (!_generator.isComplete())
208                     {
209                         InputStream in = _exchange.getRequestContentSource();
210                         if (in != null)
211                         {
212                             if (_requestContentChunk == null || _requestContentChunk.length() == 0)
213                             {
214                                 _requestContentChunk = _exchange.getRequestContentChunk();
215                                 if (_requestContentChunk != null)
216                                     _generator.addContent(_requestContentChunk,false);
217                                 else
218                                     _generator.complete();
219                                 io += _generator.flush();
220                             }
221                         }
222                         else
223                             _generator.complete();
224                     }
225                 }
226 
227                 if (_generator.isComplete() && !_requestComplete)
228                 {
229                     _requestComplete = true;
230                     _exchange.getEventListener().onRequestComplete();
231                 }
232 
233                 // If we are not ended then parse available
234                 if (!_parser.isComplete() && _generator.isCommitted())
235                 {
236                     long filled = _parser.parseAvailable();
237                     io += filled;
238                 }
239 
240                 if (io > 0)
241                     no_progress = 0;
242                 else if (no_progress++ >= 2 && !_endp.isBlocking())
243                 {
244                     // SSL may need an extra flush as it may have made "no progress" while actually doing a handshake.
245                     if (_endp instanceof SslHttpChannelEndPoint && !_generator.isComplete() && !_generator.isEmpty())
246                     {
247                         if (_generator.flush()>0)
248                             continue;
249                     }
250                     return;
251                 }
252             }
253             catch (Throwable e)
254             {
255                 Log.debug("Failure on " + _exchange, e);
256 
257                 if (e instanceof ThreadDeath)
258                     throw (ThreadDeath)e;
259 
260                 synchronized (this)
261                 {
262                     if (_exchange != null)
263                     {
264                         _exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
265                         _exchange.getEventListener().onException(e);
266                     }
267                 }
268 
269                 failed = true;
270                 if (e instanceof IOException)
271                     throw (IOException)e;
272 
273                 if (e instanceof Error)
274                     throw (Error)e;
275 
276                 if (e instanceof RuntimeException)
277                     throw (RuntimeException)e;
278 
279                throw new RuntimeException(e);
280             }
281             finally
282             {
283                 boolean complete = false;
284                 boolean close = failed; // always close the connection on error
285                 if (!failed)
286                 {
287                     // are we complete?
288                     if (_generator.isComplete())
289                     {
290                         if (!_requestComplete)
291                         {
292                             _requestComplete = true;
293                             _exchange.getEventListener().onRequestComplete();
294                         }
295 
296                         // we need to return the HttpConnection to a state that
297                         // it can be reused or closed out
298                         if (_parser.isComplete())
299                         {
300                             _destination.getHttpClient().cancel(_timeout);
301                             complete = true;
302                         }
303                     }
304                 }
305 
306                 if (complete || failed)
307                 {
308                     synchronized (this)
309                     {
310                         if (!close)
311                             close = shouldClose();
312 
313                         reset(true);
314 
315                         no_progress = 0;
316                         if (_exchange != null)
317                         {
318                             _exchange = null;
319 
320                             if (_pipeline == null)
321                             {
322                                 if (!isReserved())
323                                     _destination.returnConnection(this,close);
324                             }
325                             else
326                             {
327                                 if (close)
328                                 {
329                                     if (!isReserved())
330                                         _destination.returnConnection(this,close);
331 
332                                     HttpExchange exchange = _pipeline;
333                                     _pipeline = null;
334                                     _destination.send(exchange);
335                                 }
336                                 else
337                                 {
338                                     HttpExchange exchange = _pipeline;
339                                     _pipeline = null;
340                                     send(exchange);
341                                 }
342                             }
343                         }
344                     }
345                 }
346             }
347         }
348     }
349 
350     /* ------------------------------------------------------------ */
351     public boolean isIdle()
352     {
353         synchronized (this)
354         {
355             return _exchange == null;
356         }
357     }
358 
359     /* ------------------------------------------------------------ */
360     public EndPoint getEndPoint()
361     {
362         return _endp;
363     }
364 
365     /* ------------------------------------------------------------ */
366     private void commitRequest() throws IOException
367     {
368         synchronized (this)
369         {
370             if (_exchange.getStatus() != HttpExchange.STATUS_WAITING_FOR_COMMIT)
371                 throw new IllegalStateException();
372 
373             _exchange.setStatus(HttpExchange.STATUS_SENDING_REQUEST);
374             _generator.setVersion(_exchange._version);
375 
376             String uri = _exchange._uri;
377             if (_destination.isProxied() && uri.startsWith("/"))
378             {
379                 // TODO suppress port 80 or 443
380                 uri = (_destination.isSecure()?HttpSchemes.HTTPS:HttpSchemes.HTTP) + "://" + _destination.getAddress().getHost() + ":"
381                         + _destination.getAddress().getPort() + uri;
382                 Authorization auth = _destination.getProxyAuthentication();
383                 if (auth != null)
384                     auth.setCredentials(_exchange);
385             }
386 
387             _generator.setRequest(_exchange._method,uri);
388 
389             if (_exchange._version >= HttpVersions.HTTP_1_1_ORDINAL)
390             {
391                 if (!_exchange._requestFields.containsKey(HttpHeaders.HOST_BUFFER))
392                     _exchange._requestFields.add(HttpHeaders.HOST_BUFFER,_destination.getHostHeader());
393             }
394 
395             if (_exchange._requestContent != null)
396             {
397                 _exchange._requestFields.putLongField(HttpHeaders.CONTENT_LENGTH,_exchange._requestContent.length());
398                 _generator.completeHeader(_exchange._requestFields,false);
399                 _generator.addContent(_exchange._requestContent,true);
400             }
401             else if (_exchange._requestContentSource != null)
402             {
403                 _generator.completeHeader(_exchange._requestFields,false);
404                 int available = _exchange._requestContentSource.available();
405                 if (available > 0)
406                 {
407                     // TODO deal with any known content length
408 
409                     // TODO reuse this buffer!
410                     byte[] buf = new byte[available];
411                     int length = _exchange._requestContentSource.read(buf);
412                     _generator.addContent(new ByteArrayBuffer(buf,0,length),false);
413                 }
414             }
415             else
416             {
417                 _exchange._requestFields.remove(HttpHeaders.CONTENT_LENGTH); // TODO
418                 _generator.completeHeader(_exchange._requestFields,true);
419             }
420 
421             _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
422         }
423     }
424 
425     /* ------------------------------------------------------------ */
426     protected void reset(boolean returnBuffers) throws IOException
427     {
428         _requestComplete = false;
429         _connectionHeader = null;
430         _parser.reset(returnBuffers);
431         _generator.reset(returnBuffers);
432         _http11 = true;
433     }
434 
435     /* ------------------------------------------------------------ */
436     private boolean shouldClose()
437     {
438         if (_connectionHeader!=null)
439         {
440             if (HttpHeaderValues.CLOSE_BUFFER.equals(_connectionHeader))
441                 return true;
442             if (HttpHeaderValues.KEEP_ALIVE_BUFFER.equals(_connectionHeader))
443                 return false;
444         }
445         return !_http11;
446     }
447 
448     /* ------------------------------------------------------------ */
449     private class Handler extends HttpParser.EventHandler
450     {
451         @Override
452         public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
453         {
454             // System.out.println( method.toString() + "///" + url.toString() +
455             // "///" + version.toString() );
456             // TODO validate this is acceptable, the <!DOCTYPE goop was coming
457             // out here
458             // throw new IllegalStateException();
459         }
460 
461         @Override
462         public void startResponse(Buffer version, int status, Buffer reason) throws IOException
463         {
464             HttpExchange exchange = _exchange;
465             if (exchange!=null)
466             {
467                 _http11 = HttpVersions.HTTP_1_1_BUFFER.equals(version);
468                 exchange.getEventListener().onResponseStatus(version,status,reason);
469                 exchange.setStatus(HttpExchange.STATUS_PARSING_HEADERS);
470             }
471         }
472 
473         @Override
474         public void parsedHeader(Buffer name, Buffer value) throws IOException
475         {
476             HttpExchange exchange = _exchange;
477             if (exchange!=null)
478             {
479                 if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.CONNECTION_ORDINAL)
480                 {
481                     _connectionHeader = HttpHeaderValues.CACHE.lookup(value);
482                 }
483                 exchange.getEventListener().onResponseHeader(name,value);
484             }
485         }
486 
487         @Override
488         public void headerComplete() throws IOException
489         {
490             HttpExchange exchange = _exchange;
491             if (exchange!=null)
492                 exchange.setStatus(HttpExchange.STATUS_PARSING_CONTENT);
493         }
494 
495         @Override
496         public void content(Buffer ref) throws IOException
497         {
498             HttpExchange exchange = _exchange;
499             if (exchange!=null)
500                 exchange.getEventListener().onResponseContent(ref);
501         }
502 
503         @Override
504         public void messageComplete(long contextLength) throws IOException
505         {
506             HttpExchange exchange = _exchange;
507             if (exchange!=null)
508                 exchange.setStatus(HttpExchange.STATUS_COMPLETED);
509         }
510     }
511 
512     /* ------------------------------------------------------------ */
513     public String toString()
514     {
515         return "HttpConnection@" + hashCode() + "//" + _destination.getAddress().getHost() + ":" + _destination.getAddress().getPort();
516     }
517 
518     /* ------------------------------------------------------------ */
519     public String toDetailString()
520     {
521         return toString() + " ex=" + _exchange + " " + _timeout.getAge();
522     }
523 
524     /* ------------------------------------------------------------ */
525     public void close() throws IOException
526     {
527         _endp.close();
528     }
529 
530 
531     /* ------------------------------------------------------------ */
532     public void setIdleTimeout(long expire)
533     {
534         synchronized (this)
535         {
536             if (_idle.compareAndSet(false,true))
537                 _destination.getHttpClient().scheduleIdle(_timeout);
538             else
539                 throw new IllegalStateException();
540         }
541     }
542     
543     /* ------------------------------------------------------------ */
544     public boolean cancelIdleTimeout()
545     {
546         synchronized (this)
547         {
548             if (_idle.compareAndSet(true,false))
549             {
550                 _destination.getHttpClient().cancel(_timeout);
551                 return true;
552             }
553         }
554         
555         return false;
556     }
557 
558     /* ------------------------------------------------------------ */
559     /* ------------------------------------------------------------ */
560     /* ------------------------------------------------------------ */
561     private class TimeoutTask extends Timeout.Task
562     {
563         public void expired()
564         {
565             HttpExchange ex=null;
566             try
567             {
568                 synchronized (HttpConnection.this)
569                 {
570                     ex = _exchange;
571                     _exchange = null;
572                     if (ex != null)
573                     {
574                         _destination.returnConnection(HttpConnection.this,true);
575                     }
576                     else if (_idle.compareAndSet(true,false))
577                     {
578                         _destination.returnIdleConnection(HttpConnection.this);
579                     }
580                 }
581             }
582             catch (Exception e)
583             {
584                 Log.debug(e);
585             }
586             finally
587             {
588                 try
589                 {
590                     close();
591                 }
592                 catch (IOException e)
593                 {
594                     Log.ignore(e);
595                 }
596 
597                 if (ex!=null && ex.getStatus() < HttpExchange.STATUS_COMPLETED)
598                 {
599                     ex.setStatus(HttpExchange.STATUS_EXPIRED);
600                 }
601             }
602         }
603     }
604 
605 }