1#
2# Copyright 2014 Facebook
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15
16"""Client and server implementations of HTTP/1.x.
17
18.. versionadded:: 4.0
19"""
20
21import asyncio
22import logging
23import re
24import types
25
26from tornado.concurrent import (
27 Future,
28 future_add_done_callback,
29 future_set_result_unless_cancelled,
30)
31from tornado.escape import native_str, utf8
32from tornado import gen
33from tornado import httputil
34from tornado import iostream
35from tornado.log import gen_log, app_log
36from tornado.util import GzipDecompressor
37
38
39from typing import cast, Optional, Type, Awaitable, Callable, Union, Tuple
40
# Matches a bare CR or LF anywhere in an outgoing header line.  Such bytes
# would terminate the header prematurely and permit header-injection /
# response-splitting, so write_headers rejects any line containing them.
CR_OR_LF_RE = re.compile(b"\r|\n")
42
43
class _QuietException(Exception):
    """Marker exception for an error that has already been logged.

    `_ExceptionLoggingContext` raises this in place of the original
    exception so that callers can suppress it without reporting the
    error a second time.
    """

    def __init__(self) -> None:
        # Takes no arguments: all the detail lives in the log entry
        # written at the raise site.
        super().__init__()
47
48
class _ExceptionLoggingContext(object):
    """Context manager that logs exceptions raised by delegate callbacks.

    Any exception escaping the ``with`` body is written to the supplied
    logger and replaced with a `_QuietException`, so upper layers know
    the error has already been reported.
    """

    def __init__(self, logger: logging.Logger) -> None:
        self.logger = logger

    def __enter__(self) -> None:
        return None

    def __exit__(
        self,
        typ: "Optional[Type[BaseException]]",
        value: Optional[BaseException],
        tb: types.TracebackType,
    ) -> None:
        # Nothing to do on a clean exit.
        if value is None:
            return
        assert typ is not None
        self.logger.error("Uncaught exception", exc_info=(typ, value, tb))
        raise _QuietException
71
72
class HTTP1ConnectionParameters(object):
    """Parameters for `.HTTP1Connection` and `.HTTP1ServerConnection`."""

    def __init__(
        self,
        no_keep_alive: bool = False,
        chunk_size: Optional[int] = None,
        max_header_size: Optional[int] = None,
        header_timeout: Optional[float] = None,
        max_body_size: Optional[int] = None,
        body_timeout: Optional[float] = None,
        decompress: bool = False,
    ) -> None:
        """
        :arg bool no_keep_alive: If true, always close the connection after
            one request.
        :arg int chunk_size: how much data to read into memory at once
        :arg int max_header_size: maximum amount of data for HTTP headers
        :arg float header_timeout: how long to wait for all headers (seconds)
        :arg int max_body_size: maximum amount of data for body
        :arg float body_timeout: how long to wait while reading body (seconds)
        :arg bool decompress: if true, decode incoming
            ``Content-Encoding: gzip``
        """
        # 64 KiB is the shared default for both the read chunk size and
        # the header-size cap; zero/None arguments mean "use the default".
        default_size = 65536
        self.no_keep_alive = no_keep_alive
        self.chunk_size = chunk_size if chunk_size else default_size
        self.max_header_size = max_header_size if max_header_size else default_size
        # Timeouts and the body-size cap are optional and stay None when
        # not supplied (callers treat None as "no limit configured").
        self.header_timeout = header_timeout
        self.body_timeout = body_timeout
        self.max_body_size = max_body_size
        self.decompress = decompress
104
105
class HTTP1Connection(httputil.HTTPConnection):
    """Implements the HTTP/1.x protocol.

    This class can be used on its own for clients, or via
    `HTTP1ServerConnection` for servers.
    """

    def __init__(
        self,
        stream: iostream.IOStream,
        is_client: bool,
        params: Optional[HTTP1ConnectionParameters] = None,
        context: Optional[object] = None,
    ) -> None:
        """
        :arg stream: an `.IOStream`
        :arg bool is_client: client or server
        :arg params: a `.HTTP1ConnectionParameters` instance or ``None``
        :arg context: an opaque application-defined object that can be accessed
            as ``connection.context``.
        """
        self.is_client = is_client
        self.stream = stream
        if params is None:
            params = HTTP1ConnectionParameters()
        self.params = params
        self.context = context
        self.no_keep_alive = params.no_keep_alive
        # The body limits can be altered by the delegate, so save them
        # here instead of just referencing self.params later.
        self._max_body_size = (
            self.params.max_body_size
            if self.params.max_body_size is not None
            else self.stream.max_buffer_size
        )
        self._body_timeout = self.params.body_timeout
        # _write_finished is set to True when finish() has been called,
        # i.e. there will be no more data sent. Data may still be in the
        # stream's write buffer.
        self._write_finished = False
        # True when we have read the entire incoming body.
        self._read_finished = False
        # _finish_future resolves when all data has been written and flushed
        # to the IOStream.
        self._finish_future = Future()  # type: Future[None]
        # If true, the connection should be closed after this request
        # (after the response has been written in the server side,
        # and after it has been read in the client)
        self._disconnect_on_finish = False
        self._clear_callbacks()
        # Save the start lines after we read or write them; they
        # affect later processing (e.g. 304 responses and HEAD methods
        # have content-length but no bodies)
        self._request_start_line = None  # type: Optional[httputil.RequestStartLine]
        self._response_start_line = None  # type: Optional[httputil.ResponseStartLine]
        self._request_headers = None  # type: Optional[httputil.HTTPHeaders]
        # True if we are writing output with chunked encoding.
        self._chunking_output = False
        # While reading a body with a content-length, this is the
        # amount left to read.
        self._expected_content_remaining = None  # type: Optional[int]
        # A Future for our outgoing writes, returned by IOStream.write.
        self._pending_write = None  # type: Optional[Future[None]]

    def read_response(self, delegate: httputil.HTTPMessageDelegate) -> Awaitable[bool]:
        """Read a single HTTP response.

        Typical client-mode usage is to write a request using `write_headers`,
        `write`, and `finish`, and then call ``read_response``.

        :arg delegate: a `.HTTPMessageDelegate`

        Returns a `.Future` that resolves to a bool after the full response has
        been read. The result is true if the stream is still open.
        """
        if self.params.decompress:
            delegate = _GzipMessageDelegate(delegate, self.params.chunk_size)
        return self._read_message(delegate)

    async def _read_message(self, delegate: httputil.HTTPMessageDelegate) -> bool:
        """Reads one HTTP message (request or response) into ``delegate``.

        Handles header/body timeouts, the server-side ``Expect:
        100-continue`` handshake, and malformed-message reporting.
        Returns true if the stream remains usable for further messages.
        """
        need_delegate_close = False
        try:
            header_future = self.stream.read_until_regex(
                b"\r?\n\r?\n", max_bytes=self.params.max_header_size
            )
            if self.params.header_timeout is None:
                header_data = await header_future
            else:
                try:
                    header_data = await gen.with_timeout(
                        self.stream.io_loop.time() + self.params.header_timeout,
                        header_future,
                        quiet_exceptions=iostream.StreamClosedError,
                    )
                except gen.TimeoutError:
                    self.close()
                    return False
            start_line_str, headers = self._parse_headers(header_data)
            if self.is_client:
                resp_start_line = httputil.parse_response_start_line(start_line_str)
                self._response_start_line = resp_start_line
                start_line = (
                    resp_start_line
                )  # type: Union[httputil.RequestStartLine, httputil.ResponseStartLine]
                # TODO: this will need to change to support client-side keepalive
                self._disconnect_on_finish = False
            else:
                req_start_line = httputil.parse_request_start_line(start_line_str)
                self._request_start_line = req_start_line
                self._request_headers = headers
                start_line = req_start_line
                self._disconnect_on_finish = not self._can_keep_alive(
                    req_start_line, headers
                )
            need_delegate_close = True
            with _ExceptionLoggingContext(app_log):
                header_recv_future = delegate.headers_received(start_line, headers)
                if header_recv_future is not None:
                    await header_recv_future
            if self.stream is None:
                # We've been detached.
                need_delegate_close = False
                return False
            skip_body = False
            if self.is_client:
                assert isinstance(start_line, httputil.ResponseStartLine)
                if (
                    self._request_start_line is not None
                    and self._request_start_line.method == "HEAD"
                ):
                    # HEAD responses carry Content-Length but no body.
                    skip_body = True
                code = start_line.code
                if code == 304:
                    # 304 responses may include the content-length header
                    # but do not actually have a body.
                    # http://tools.ietf.org/html/rfc7230#section-3.3
                    skip_body = True
                if 100 <= code < 200:
                    # 1xx responses should never indicate the presence of
                    # a body.
                    if "Content-Length" in headers or "Transfer-Encoding" in headers:
                        raise httputil.HTTPInputError(
                            "Response code %d cannot have body" % code
                        )
                    # TODO: client delegates will get headers_received twice
                    # in the case of a 100-continue. Document or change?
                    await self._read_message(delegate)
            else:
                # Server side: acknowledge Expect: 100-continue before the
                # client will send the body (unless we've already responded).
                if headers.get("Expect") == "100-continue" and not self._write_finished:
                    self.stream.write(b"HTTP/1.1 100 (Continue)\r\n\r\n")
            if not skip_body:
                body_future = self._read_body(
                    resp_start_line.code if self.is_client else 0, headers, delegate
                )
                if body_future is not None:
                    if self._body_timeout is None:
                        await body_future
                    else:
                        try:
                            await gen.with_timeout(
                                self.stream.io_loop.time() + self._body_timeout,
                                body_future,
                                quiet_exceptions=iostream.StreamClosedError,
                            )
                        except gen.TimeoutError:
                            gen_log.info("Timeout reading body from %s", self.context)
                            self.stream.close()
                            return False
            self._read_finished = True
            if not self._write_finished or self.is_client:
                need_delegate_close = False
                with _ExceptionLoggingContext(app_log):
                    delegate.finish()
            # If we're waiting for the application to produce an asynchronous
            # response, and we're not detached, register a close callback
            # on the stream (we didn't need one while we were reading)
            if (
                not self._finish_future.done()
                and self.stream is not None
                and not self.stream.closed()
            ):
                self.stream.set_close_callback(self._on_connection_close)
            await self._finish_future
            if self.is_client and self._disconnect_on_finish:
                self.close()
            if self.stream is None:
                return False
        except httputil.HTTPInputError as e:
            gen_log.info("Malformed HTTP message from %s: %s", self.context, e)
            if not self.is_client:
                await self.stream.write(b"HTTP/1.1 400 Bad Request\r\n\r\n")
            self.close()
            return False
        finally:
            if need_delegate_close:
                with _ExceptionLoggingContext(app_log):
                    delegate.on_connection_close()
            # Break the reference cycle through the pending read future.
            header_future = None  # type: ignore
            self._clear_callbacks()
        return True

    def _clear_callbacks(self) -> None:
        """Clears the callback attributes.

        This allows the request handler to be garbage collected more
        quickly in CPython by breaking up reference cycles.
        """
        self._write_callback = None  # type: Optional[Callable[[], None]]
        self._write_future = None  # type: Optional[Future[None]]
        self._close_callback = None  # type: Optional[Callable[[], None]]
        if self.stream is not None:
            self.stream.set_close_callback(None)

    def set_close_callback(self, callback: Optional[Callable[[], None]]) -> None:
        """Sets a callback that will be run when the connection is closed.

        Note that this callback is slightly different from
        `.HTTPMessageDelegate.on_connection_close`: The
        `.HTTPMessageDelegate` method is called when the connection is
        closed while receiving a message. This callback is used when
        there is not an active delegate (for example, on the server
        side this callback is used if the client closes the connection
        after sending its request but before receiving all the
        response).
        """
        self._close_callback = callback

    def _on_connection_close(self) -> None:
        """Handles the stream closing while waiting on the application.

        Runs the registered close callback (once) and unblocks anything
        awaiting ``_finish_future``.
        """
        # Note that this callback is only registered on the IOStream
        # when we have finished reading the request and are waiting for
        # the application to produce its response.
        if self._close_callback is not None:
            callback = self._close_callback
            self._close_callback = None
            callback()
        if not self._finish_future.done():
            future_set_result_unless_cancelled(self._finish_future, None)
        self._clear_callbacks()

    def close(self) -> None:
        """Closes the stream and unblocks anything awaiting ``_finish_future``."""
        if self.stream is not None:
            self.stream.close()
        self._clear_callbacks()
        if not self._finish_future.done():
            future_set_result_unless_cancelled(self._finish_future, None)

    def detach(self) -> iostream.IOStream:
        """Take control of the underlying stream.

        Returns the underlying `.IOStream` object and stops all further
        HTTP processing. May only be called during
        `.HTTPMessageDelegate.headers_received`. Intended for implementing
        protocols like websockets that tunnel over an HTTP handshake.
        """
        self._clear_callbacks()
        stream = self.stream
        self.stream = None  # type: ignore
        if not self._finish_future.done():
            future_set_result_unless_cancelled(self._finish_future, None)
        return stream

    def set_body_timeout(self, timeout: float) -> None:
        """Sets the body timeout for a single request.

        Overrides the value from `.HTTP1ConnectionParameters`.
        """
        self._body_timeout = timeout

    def set_max_body_size(self, max_body_size: int) -> None:
        """Sets the body size limit for a single request.

        Overrides the value from `.HTTP1ConnectionParameters`.
        """
        self._max_body_size = max_body_size

    def write_headers(
        self,
        start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine],
        headers: httputil.HTTPHeaders,
        chunk: Optional[bytes] = None,
    ) -> "Future[None]":
        """Implements `.HTTPConnection.write_headers`."""
        lines = []
        if self.is_client:
            assert isinstance(start_line, httputil.RequestStartLine)
            self._request_start_line = start_line
            lines.append(utf8("%s %s HTTP/1.1" % (start_line[0], start_line[1])))
            # Client requests with a non-empty body must have either a
            # Content-Length or a Transfer-Encoding. If Content-Length is not
            # present we'll add our Transfer-Encoding below.
            self._chunking_output = (
                start_line.method in ("POST", "PUT", "PATCH")
                and "Content-Length" not in headers
            )
        else:
            assert isinstance(start_line, httputil.ResponseStartLine)
            assert self._request_start_line is not None
            assert self._request_headers is not None
            self._response_start_line = start_line
            lines.append(utf8("HTTP/1.1 %d %s" % (start_line[1], start_line[2])))
            self._chunking_output = (
                # TODO: should this use
                # self._request_start_line.version or
                # start_line.version?
                self._request_start_line.version == "HTTP/1.1"
                # Omit payload header field for HEAD request.
                and self._request_start_line.method != "HEAD"
                # 1xx, 204 and 304 responses have no body (not even a zero-length
                # body), and so should not have either Content-Length or
                # Transfer-Encoding headers.
                and start_line.code not in (204, 304)
                and (start_line.code < 100 or start_line.code >= 200)
                # No need to chunk the output if a Content-Length is specified.
                and "Content-Length" not in headers
            )
            # If connection to a 1.1 client will be closed, inform client
            if (
                self._request_start_line.version == "HTTP/1.1"
                and self._disconnect_on_finish
            ):
                headers["Connection"] = "close"
            # If a 1.0 client asked for keep-alive, add the header.
            if (
                self._request_start_line.version == "HTTP/1.0"
                and self._request_headers.get("Connection", "").lower() == "keep-alive"
            ):
                headers["Connection"] = "Keep-Alive"
        if self._chunking_output:
            headers["Transfer-Encoding"] = "chunked"
        if not self.is_client and (
            self._request_start_line.method == "HEAD"
            or cast(httputil.ResponseStartLine, start_line).code == 304
        ):
            # These messages must have an empty body, so track that we
            # expect to write exactly zero content bytes.
            self._expected_content_remaining = 0
        elif "Content-Length" in headers:
            self._expected_content_remaining = parse_int(headers["Content-Length"])
        else:
            self._expected_content_remaining = None
        # TODO: headers are supposed to be of type str, but we still have some
        # cases that let bytes slip through. Remove these native_str calls when those
        # are fixed.
        header_lines = (
            native_str(n) + ": " + native_str(v) for n, v in headers.get_all()
        )
        lines.extend(line.encode("latin1") for line in header_lines)
        # Reject header-injection attempts: a CR or LF inside a header
        # would terminate it early and smuggle extra lines onto the wire.
        for line in lines:
            if CR_OR_LF_RE.search(line):
                raise ValueError("Illegal characters (CR or LF) in header: %r" % line)
        future = None
        if self.stream.closed():
            future = self._write_future = Future()
            future.set_exception(iostream.StreamClosedError())
            # Consume the exception so it isn't logged as unhandled if the
            # caller never awaits the returned future.
            future.exception()
        else:
            future = self._write_future = Future()
            data = b"\r\n".join(lines) + b"\r\n\r\n"
            if chunk:
                data += self._format_chunk(chunk)
            self._pending_write = self.stream.write(data)
            future_add_done_callback(self._pending_write, self._on_write_complete)
        return future

    def _format_chunk(self, chunk: bytes) -> bytes:
        """Frames ``chunk`` for the wire.

        Enforces the declared Content-Length (closing the stream and
        raising `.HTTPOutputError` on overflow) and applies
        chunked-transfer framing when chunked output is in effect.
        """
        if self._expected_content_remaining is not None:
            self._expected_content_remaining -= len(chunk)
            if self._expected_content_remaining < 0:
                # Close the stream now to stop further framing errors.
                self.stream.close()
                raise httputil.HTTPOutputError(
                    "Tried to write more data than Content-Length"
                )
        if self._chunking_output and chunk:
            # Don't write out empty chunks because that means END-OF-STREAM
            # with chunked encoding
            return utf8("%x" % len(chunk)) + b"\r\n" + chunk + b"\r\n"
        else:
            return chunk

    def write(self, chunk: bytes) -> "Future[None]":
        """Implements `.HTTPConnection.write`.

        For backwards compatibility it is allowed but deprecated to
        skip `write_headers` and instead call `write()` with a
        pre-encoded header block.
        """
        future = None
        if self.stream.closed():
            future = self._write_future = Future()
            self._write_future.set_exception(iostream.StreamClosedError())
            # Consume the exception so it isn't logged as unhandled if the
            # caller never awaits the returned future.
            self._write_future.exception()
        else:
            future = self._write_future = Future()
            self._pending_write = self.stream.write(self._format_chunk(chunk))
            future_add_done_callback(self._pending_write, self._on_write_complete)
        return future

    def finish(self) -> None:
        """Implements `.HTTPConnection.finish`."""
        if (
            self._expected_content_remaining is not None
            and self._expected_content_remaining != 0
            and not self.stream.closed()
        ):
            self.stream.close()
            raise httputil.HTTPOutputError(
                "Tried to write %d bytes less than Content-Length"
                % self._expected_content_remaining
            )
        if self._chunking_output:
            if not self.stream.closed():
                # Zero-length chunk marks end-of-stream in chunked encoding.
                self._pending_write = self.stream.write(b"0\r\n\r\n")
                self._pending_write.add_done_callback(self._on_write_complete)
        self._write_finished = True
        # If the app finished the request while we're still reading,
        # divert any remaining data away from the delegate and
        # close the connection when we're done sending our response.
        # Closing the connection is the only way to avoid reading the
        # whole input body.
        if not self._read_finished:
            self._disconnect_on_finish = True
        # No more data is coming, so instruct TCP to send any remaining
        # data immediately instead of waiting for a full packet or ack.
        self.stream.set_nodelay(True)
        if self._pending_write is None:
            self._finish_request(None)
        else:
            future_add_done_callback(self._pending_write, self._finish_request)

    def _on_write_complete(self, future: "Future[None]") -> None:
        """Callback run when an ``IOStream.write`` future resolves.

        Re-raises any write error other than `.StreamClosedError`, then
        wakes up whoever was waiting on the write (legacy callback or
        the future returned from `write`/`write_headers`).
        """
        exc = future.exception()
        if exc is not None and not isinstance(exc, iostream.StreamClosedError):
            # future.result() re-raises the stored exception.
            future.result()
        if self._write_callback is not None:
            callback = self._write_callback
            self._write_callback = None
            self.stream.io_loop.add_callback(callback)
        if self._write_future is not None:
            future = self._write_future
            self._write_future = None
            future_set_result_unless_cancelled(future, None)

    def _can_keep_alive(
        self, start_line: httputil.RequestStartLine, headers: httputil.HTTPHeaders
    ) -> bool:
        """Returns true if the connection may be reused after this message.

        HTTP/1.1 defaults to keep-alive unless ``Connection: close`` was
        sent; older versions require an explicit ``Connection:
        keep-alive`` plus a self-delimiting message.
        """
        if self.params.no_keep_alive:
            return False
        connection_header = headers.get("Connection")
        if connection_header is not None:
            connection_header = connection_header.lower()
        if start_line.version == "HTTP/1.1":
            return connection_header != "close"
        elif (
            "Content-Length" in headers
            or is_transfer_encoding_chunked(headers)
            or getattr(start_line, "method", None) in ("HEAD", "GET")
        ):
            # start_line may be a request or response start line; only
            # the former has a method attribute.
            return connection_header == "keep-alive"
        return False

    def _finish_request(self, future: "Optional[Future[None]]") -> None:
        """Runs after the outgoing message is fully written.

        Either closes the connection (server side, when keep-alive is
        off) or resets stream state ready for the next request.
        """
        self._clear_callbacks()
        if not self.is_client and self._disconnect_on_finish:
            self.close()
            return
        # Turn Nagle's algorithm back on, leaving the stream in its
        # default state for the next request.
        self.stream.set_nodelay(False)
        if not self._finish_future.done():
            future_set_result_unless_cancelled(self._finish_future, None)

    def _parse_headers(self, data: bytes) -> Tuple[str, httputil.HTTPHeaders]:
        """Splits a raw header block into ``(start_line, headers)``.

        :raises httputil.HTTPInputError: via `HTTPHeaders.parse` on
            malformed header lines.
        """
        # The lstrip removes newlines that some implementations sometimes
        # insert between messages of a reused connection. Per RFC 7230,
        # we SHOULD ignore at least one empty line before the request.
        # http://tools.ietf.org/html/rfc7230#section-3.5
        data_str = native_str(data.decode("latin1")).lstrip("\r\n")
        # RFC 7230 section 3.5 allows for both CRLF and bare LF.
        eol = data_str.find("\n")
        start_line = data_str[:eol].rstrip("\r")
        headers = httputil.HTTPHeaders.parse(data_str[eol:])
        return start_line, headers

    def _read_body(
        self,
        code: int,
        headers: httputil.HTTPHeaders,
        delegate: httputil.HTTPMessageDelegate,
    ) -> Optional[Awaitable[None]]:
        """Returns an awaitable that reads the message body, if any.

        ``code`` is the response status code on the client side and 0
        when reading a request.  Returns None when the message has no
        body to read.  Raises `httputil.HTTPInputError` on framing
        violations (conflicting or invalid Content-Length, bodies on
        204 responses, body too large).
        """
        if "Content-Length" in headers:
            if "," in headers["Content-Length"]:
                # Proxies sometimes cause Content-Length headers to get
                # duplicated. If all the values are identical then we can
                # use them but if they differ it's an error.
                pieces = re.split(r",\s*", headers["Content-Length"])
                if any(i != pieces[0] for i in pieces):
                    raise httputil.HTTPInputError(
                        "Multiple unequal Content-Lengths: %r"
                        % headers["Content-Length"]
                    )
                headers["Content-Length"] = pieces[0]

            try:
                content_length: Optional[int] = parse_int(headers["Content-Length"])
            except ValueError:
                # Handles non-integer Content-Length value.
                raise httputil.HTTPInputError(
                    "Only integer Content-Length is allowed: %s"
                    % headers["Content-Length"]
                )

            if cast(int, content_length) > self._max_body_size:
                raise httputil.HTTPInputError("Content-Length too long")
        else:
            content_length = None

        is_chunked = is_transfer_encoding_chunked(headers)

        if code == 204:
            # This response code is not allowed to have a non-empty body,
            # and has an implicit length of zero instead of read-until-close.
            # http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.3
            if is_chunked or content_length not in (None, 0):
                raise httputil.HTTPInputError(
                    "Response with code %d should not have body" % code
                )
            content_length = 0

        if is_chunked:
            return self._read_chunked_body(delegate)
        if content_length is not None:
            return self._read_fixed_body(content_length, delegate)
        if self.is_client:
            # Responses with no framing info are delimited by connection close.
            return self._read_body_until_close(delegate)
        return None

    async def _read_fixed_body(
        self, content_length: int, delegate: httputil.HTTPMessageDelegate
    ) -> None:
        """Reads exactly ``content_length`` bytes, feeding the delegate in chunks."""
        while content_length > 0:
            body = await self.stream.read_bytes(
                min(self.params.chunk_size, content_length), partial=True
            )
            content_length -= len(body)
            # Once the server response is finished, remaining input is
            # drained but no longer delivered to the delegate.
            if not self._write_finished or self.is_client:
                with _ExceptionLoggingContext(app_log):
                    ret = delegate.data_received(body)
                    if ret is not None:
                        await ret

    async def _read_chunked_body(self, delegate: httputil.HTTPMessageDelegate) -> None:
        """Reads a chunk-encoded body, enforcing ``_max_body_size`` on the total."""
        # TODO: "chunk extensions" http://tools.ietf.org/html/rfc2616#section-3.6.1
        total_size = 0
        while True:
            chunk_len_str = await self.stream.read_until(b"\r\n", max_bytes=64)
            try:
                chunk_len = parse_hex_int(native_str(chunk_len_str[:-2]))
            except ValueError:
                raise httputil.HTTPInputError("invalid chunk size")
            if chunk_len == 0:
                # The zero-length chunk terminates the body; it must be
                # followed immediately by CRLF (trailers are not supported).
                crlf = await self.stream.read_bytes(2)
                if crlf != b"\r\n":
                    raise httputil.HTTPInputError(
                        "improperly terminated chunked request"
                    )
                return
            total_size += chunk_len
            if total_size > self._max_body_size:
                raise httputil.HTTPInputError("chunked body too large")
            bytes_to_read = chunk_len
            while bytes_to_read:
                chunk = await self.stream.read_bytes(
                    min(bytes_to_read, self.params.chunk_size), partial=True
                )
                bytes_to_read -= len(chunk)
                # Once the server response is finished, remaining input is
                # drained but no longer delivered to the delegate.
                if not self._write_finished or self.is_client:
                    with _ExceptionLoggingContext(app_log):
                        ret = delegate.data_received(chunk)
                        if ret is not None:
                            await ret
            # chunk ends with \r\n
            crlf = await self.stream.read_bytes(2)
            assert crlf == b"\r\n"

    async def _read_body_until_close(
        self, delegate: httputil.HTTPMessageDelegate
    ) -> None:
        """Client-side fallback: the body is everything up to stream close."""
        body = await self.stream.read_until_close()
        if not self._write_finished or self.is_client:
            with _ExceptionLoggingContext(app_log):
                ret = delegate.data_received(body)
                if ret is not None:
                    await ret
700
701
class _GzipMessageDelegate(httputil.HTTPMessageDelegate):
    """Wraps an `HTTPMessageDelegate` to decode ``Content-Encoding: gzip``."""

    def __init__(self, delegate: httputil.HTTPMessageDelegate, chunk_size: int) -> None:
        # Downstream delegate; it only ever sees decompressed data.
        self._delegate = delegate
        # Upper bound on the size of each decompressed chunk passed along.
        self._chunk_size = chunk_size
        # Created in headers_received only when the message is actually
        # gzip-encoded; None means data is passed through unchanged.
        self._decompressor = None  # type: Optional[GzipDecompressor]

    def headers_received(
        self,
        start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine],
        headers: httputil.HTTPHeaders,
    ) -> Optional[Awaitable[None]]:
        """Installs a decompressor if the message is gzip-encoded."""
        if headers.get("Content-Encoding", "").lower() == "gzip":
            self._decompressor = GzipDecompressor()
            # Downstream delegates will only see uncompressed data,
            # so rename the content-encoding header.
            # (but note that curl_httpclient doesn't do this).
            headers.add("X-Consumed-Content-Encoding", headers["Content-Encoding"])
            del headers["Content-Encoding"]
        return self._delegate.headers_received(start_line, headers)

    async def data_received(self, chunk: bytes) -> None:
        """Decompresses ``chunk`` if needed and forwards it downstream."""
        if self._decompressor:
            compressed_data = chunk
            while compressed_data:
                # Bound each decompress call by _chunk_size so a small
                # compressed input cannot expand into an arbitrarily
                # large in-memory buffer at once.
                decompressed = self._decompressor.decompress(
                    compressed_data, self._chunk_size
                )
                if decompressed:
                    ret = self._delegate.data_received(decompressed)
                    if ret is not None:
                        await ret
                compressed_data = self._decompressor.unconsumed_tail
                if compressed_data and not decompressed:
                    # Input remains but no output was produced: abort
                    # rather than spin forever on undecodable data.
                    raise httputil.HTTPInputError(
                        "encountered unconsumed gzip data without making progress"
                    )
        else:
            ret = self._delegate.data_received(chunk)
            if ret is not None:
                await ret

    def finish(self) -> None:
        """Flushes the decompressor (error detection only) and finishes downstream."""
        if self._decompressor is not None:
            tail = self._decompressor.flush()
            if tail:
                # The tail should always be empty: decompress returned
                # all that it can in data_received and the only
                # purpose of the flush call is to detect errors such
                # as truncated input. If we did legitimately get a new
                # chunk at this point we'd need to change the
                # interface to make finish() a coroutine.
                raise ValueError(
                    "decompressor.flush returned data; possible truncated input"
                )
        return self._delegate.finish()

    def on_connection_close(self) -> None:
        return self._delegate.on_connection_close()
762
763
class HTTP1ServerConnection(object):
    """An HTTP/1.x server."""

    def __init__(
        self,
        stream: iostream.IOStream,
        params: Optional[HTTP1ConnectionParameters] = None,
        context: Optional[object] = None,
    ) -> None:
        """
        :arg stream: an `.IOStream`
        :arg params: a `.HTTP1ConnectionParameters` or None
        :arg context: an opaque application-defined object that is accessible
            as ``connection.context``
        """
        self.stream = stream
        if params is None:
            params = HTTP1ConnectionParameters()
        self.params = params
        self.context = context
        # Set by start_serving; tracks the running request loop.
        self._serving_future = None  # type: Optional[Future[None]]

    async def close(self) -> None:
        """Closes the connection.

        Returns a `.Future` that resolves after the serving loop has exited.
        """
        self.stream.close()
        # Block until the serving loop is done, but ignore any exceptions
        # (start_serving is already responsible for logging them).
        assert self._serving_future is not None
        try:
            await self._serving_future
        except Exception:
            pass

    def start_serving(self, delegate: httputil.HTTPServerConnectionDelegate) -> None:
        """Starts serving requests on this connection.

        :arg delegate: a `.HTTPServerConnectionDelegate`
        """
        assert isinstance(delegate, httputil.HTTPServerConnectionDelegate)
        fut = gen.convert_yielded(self._server_request_loop(delegate))
        self._serving_future = fut
        # Register the future on the IOLoop so its errors get logged.
        self.stream.io_loop.add_future(fut, lambda f: f.result())

    async def _server_request_loop(
        self, delegate: httputil.HTTPServerConnectionDelegate
    ) -> None:
        """Reads and dispatches requests until the connection ends.

        One `HTTP1Connection` is created per request; the loop exits on
        stream closure, cancellation, an error, or when keep-alive is
        not available.  ``delegate.on_close`` always runs on exit.
        """
        try:
            while True:
                conn = HTTP1Connection(self.stream, False, self.params, self.context)
                request_delegate = delegate.start_request(self, conn)
                try:
                    ret = await conn.read_response(request_delegate)
                except (
                    iostream.StreamClosedError,
                    iostream.UnsatisfiableReadError,
                    asyncio.CancelledError,
                ):
                    # Normal termination; the stream is already gone.
                    return
                except _QuietException:
                    # This exception was already logged.
                    conn.close()
                    return
                except Exception:
                    gen_log.error("Uncaught exception", exc_info=True)
                    conn.close()
                    return
                if not ret:
                    return
                # Yield to the event loop between requests so one busy
                # keep-alive connection cannot starve other handlers.
                await asyncio.sleep(0)
        finally:
            delegate.on_close(self)
839
840
# Strict validators for framing-sensitive numeric fields (Content-Length,
# chunk sizes).  Deliberately narrower than int()/int(s, 16): no sign, no
# surrounding whitespace, no underscores, ASCII digits only.
DIGITS = re.compile(r"[0-9]+")
HEXDIGITS = re.compile(r"[0-9a-fA-F]+")
843
844
def parse_int(s: str) -> int:
    """Parse a non-negative integer from a string.

    Deliberately stricter than the built-in `int`: signs, whitespace,
    underscores, and non-ASCII digits all raise `ValueError`, since
    these fields affect HTTP message framing.
    """
    if not re.fullmatch(r"[0-9]+", s):
        raise ValueError("not an integer: %r" % s)
    return int(s)
850
851
def parse_hex_int(s: str) -> int:
    """Parse a non-negative hexadecimal integer from a string.

    Deliberately stricter than ``int(s, 16)``: no ``0x`` prefix, sign,
    whitespace, or underscores are accepted — hex digits only.
    """
    if not re.fullmatch(r"[0-9a-fA-F]+", s):
        raise ValueError("not a hexadecimal integer: %r" % s)
    return int(s, 16)
857
858
def is_transfer_encoding_chunked(headers: httputil.HTTPHeaders) -> bool:
    """Returns true if the headers specify Transfer-Encoding: chunked.

    Raise httputil.HTTPInputError if any other transfer encoding is used.
    """
    # Postel's law is dangerous here: if a proxy and a backend both parse
    # Transfer-Encoding leniently but differently, they can disagree about
    # message framing, which is the root of request-smuggling attacks.  So
    # we are maximally strict — even values like ",chunked", which the RFCs
    # technically allow but legitimate traffic never uses, are rejected.
    try:
        encoding = headers["Transfer-Encoding"]
    except KeyError:
        return False
    if "Content-Length" in headers:
        # Message cannot contain both Content-Length and
        # Transfer-Encoding headers.
        # http://tools.ietf.org/html/rfc7230#section-3.3.3
        raise httputil.HTTPInputError(
            "Message with both Transfer-Encoding and Content-Length"
        )
    if encoding.lower() == "chunked":
        return True
    # chunked is the only transfer-encoding we support (or plan to: the
    # whole concept of transfer-encoding was dropped in HTTP/2).
    raise httputil.HTTPInputError(
        "Unsupported Transfer-Encoding %s" % headers["Transfer-Encoding"]
    )