Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/http_parser.py: 19%
Shortcuts on this page:
r m x — toggle line displays
j k — next/previous highlighted chunk
0 (zero) — top of page
1 (one) — first highlighted chunk
1import abc
2import asyncio
3import re
4import string
5from contextlib import suppress
6from enum import IntEnum
7from typing import (
8 Any,
9 ClassVar,
10 Final,
11 Generic,
12 List,
13 Literal,
14 NamedTuple,
15 Optional,
16 Pattern,
17 Set,
18 Tuple,
19 Type,
20 TypeVar,
21 Union,
22)
24from multidict import CIMultiDict, CIMultiDictProxy, istr
25from yarl import URL
27from . import hdrs
28from .base_protocol import BaseProtocol
29from .compression_utils import (
30 DEFAULT_MAX_DECOMPRESS_SIZE,
31 HAS_BROTLI,
32 HAS_ZSTD,
33 BrotliDecompressor,
34 ZLibDecompressor,
35 ZSTDDecompressor,
36)
37from .helpers import (
38 _EXC_SENTINEL,
39 DEBUG,
40 EMPTY_BODY_METHODS,
41 EMPTY_BODY_STATUS_CODES,
42 NO_EXTENSIONS,
43 BaseTimerContext,
44 set_exception,
45)
46from .http_exceptions import (
47 BadHttpMessage,
48 BadHttpMethod,
49 BadStatusLine,
50 ContentEncodingError,
51 ContentLengthError,
52 DecompressSizeError,
53 InvalidHeader,
54 InvalidURLError,
55 LineTooLong,
56 TransferEncodingError,
57)
58from .http_writer import HttpVersion, HttpVersion10
59from .streams import EMPTY_PAYLOAD, StreamReader
60from .typedefs import RawHeaders
__all__ = (
    "HeadersParser",
    "HttpParser",
    "HttpRequestParser",
    "HttpResponseParser",
    "RawRequestMessage",
    "RawResponseMessage",
)

# Line separator accepted by the parsers; lax (response) mode splits on
# bare LF and strips trailing CRs afterwards.
_SEP = Literal[b"\r\n", b"\n"]

# All printable ASCII characters (used as a quick sanity set).
ASCIISET: Final[Set[str]] = set(string.printable)

# See https://www.rfc-editor.org/rfc/rfc9110.html#name-overview
# and https://www.rfc-editor.org/rfc/rfc9110.html#name-tokens
#
# method = token
# tchar = "!" / "#" / "$" / "%" / "&" / "'" / "*" / "+" / "-" / "." /
#         "^" / "_" / "`" / "|" / "~" / DIGIT / ALPHA
# token = 1*tchar
_TCHAR_SPECIALS: Final[str] = re.escape("!#$%&'*+-.^_`|~")
# Matches one or more RFC 9110 "tchar"s (a token: method or header name).
TOKENRE: Final[Pattern[str]] = re.compile(f"[0-9A-Za-z{_TCHAR_SPECIALS}]+")
# https://www.rfc-editor.org/rfc/rfc9110#section-5.5-5
# Control characters forbidden in header field values (strict mode):
# all CTLs except HTAB (\x09); \x0d (CR) is excluded here because lines
# are already split on CRLF before validation.
_FIELD_VALUE_FORBIDDEN_CTL_RE: Final[Pattern[str]] = re.compile(
    r"[\x00-\x08\x0a-\x1f\x7f]"
)
# "HTTP/<major>.<minor>" with single ASCII digits.
VERSRE: Final[Pattern[str]] = re.compile(r"HTTP/(\d)\.(\d)", re.ASCII)
# Plain ASCII digit run (Content-Length, status code validation).
DIGITS: Final[Pattern[str]] = re.compile(r"\d+", re.ASCII)
# Hexadecimal chunk-size digits for chunked transfer encoding.
HEXDIGITS: Final[Pattern[bytes]] = re.compile(rb"[0-9a-fA-F]+")
class RawRequestMessage(NamedTuple):
    """Immutable result of parsing an HTTP request head (request line + headers)."""

    method: str  # request method token, e.g. "GET"
    path: str  # request target exactly as received (still encoded)
    version: HttpVersion  # parsed HTTP version, e.g. HttpVersion(1, 1)
    headers: "CIMultiDictProxy[str]"  # case-insensitive, decoded headers
    raw_headers: RawHeaders  # ((bname, bvalue), ...) as received on the wire
    should_close: bool  # connection must be closed after this message
    compression: Optional[str]  # Content-Encoding if supported, else None
    upgrade: bool  # Connection: upgrade with a supported Upgrade header
    chunked: bool  # Transfer-Encoding: chunked payload follows
    url: URL  # parsed request target
class RawResponseMessage(NamedTuple):
    """Immutable result of parsing an HTTP response head (status line + headers)."""

    version: HttpVersion  # parsed HTTP version from the status line
    code: int  # three-digit status code
    reason: str  # reason phrase (may be empty)
    headers: CIMultiDictProxy[str]  # case-insensitive, decoded headers
    raw_headers: RawHeaders  # ((bname, bvalue), ...) as received on the wire
    should_close: bool  # connection must be closed after this message
    compression: Optional[str]  # Content-Encoding if supported, else None
    upgrade: bool  # Connection: upgrade with a supported Upgrade header
    chunked: bool  # Transfer-Encoding: chunked payload follows
# Message type produced by a concrete parser: request or response head.
_MsgT = TypeVar("_MsgT", RawRequestMessage, RawResponseMessage)
class ParseState(IntEnum):
    """How the payload length of the current message is determined."""

    PARSE_NONE = 0  # no payload expected
    PARSE_LENGTH = 1  # fixed-size payload (Content-Length)
    PARSE_CHUNKED = 2  # chunked transfer encoding
    PARSE_UNTIL_EOF = 3  # payload runs until the connection closes
class ChunkState(IntEnum):
    """Sub-state of the chunked transfer-encoding parser."""

    PARSE_CHUNKED_SIZE = 0  # expecting "<hex-size>[;ext]CRLF"
    PARSE_CHUNKED_CHUNK = 1  # reading chunk data bytes
    PARSE_CHUNKED_CHUNK_EOF = 2  # expecting CRLF that terminates chunk data
    PARSE_MAYBE_TRAILERS = 3  # after the zero-size chunk; trailers may follow
    PARSE_TRAILERS = 4  # reading trailer lines until a blank line
class HeadersParser:
    """Parse a list of raw header lines into a CIMultiDict.

    Also used for chunked-trailer parsing (trailers share the header
    grammar). In lax mode, obsolete line folding (RFC 9112 obs-fold)
    is accepted and CTL validation is relaxed to CR/LF/NUL only.
    """

    def __init__(
        self,
        max_line_size: int = 8190,
        max_headers: int = 32768,
        max_field_size: int = 8190,
        lax: bool = False,
    ) -> None:
        # Limits guard against maliciously large header sections.
        self.max_line_size = max_line_size
        self.max_headers = max_headers
        self.max_field_size = max_field_size
        self._lax = lax

    def parse_headers(
        self, lines: List[bytes]
    ) -> Tuple["CIMultiDictProxy[str]", RawHeaders]:
        """Parse ``lines`` (terminated by an empty line) into headers.

        Returns a read-only case-insensitive multidict plus the raw
        (name, value) byte pairs. Raises InvalidHeader / LineTooLong
        on malformed input.
        """
        headers: CIMultiDict[str] = CIMultiDict()
        # note: "raw" does not mean inclusion of OWS before/after the field value
        raw_headers = []

        lines_idx = 0
        line = lines[lines_idx]
        line_count = len(lines)

        # The terminating empty line makes `while line` stop cleanly.
        while line:
            # Parse initial header name : value pair.
            try:
                bname, bvalue = line.split(b":", 1)
            except ValueError:
                raise InvalidHeader(line) from None

            if len(bname) == 0:
                raise InvalidHeader(bname)

            # https://www.rfc-editor.org/rfc/rfc9112.html#section-5.1-2
            # Reject leading/trailing whitespace around the field name.
            if {bname[0], bname[-1]} & {32, 9}:  # {" ", "\t"}
                raise InvalidHeader(line)

            bvalue = bvalue.lstrip(b" \t")
            name = bname.decode("utf-8", "surrogateescape")
            # Field names must be RFC 9110 tokens.
            if not TOKENRE.fullmatch(name):
                raise InvalidHeader(bname)

            # next line
            lines_idx += 1
            line = lines[lines_idx]

            # consume continuation lines (only honoured in lax mode)
            continuation = self._lax and line and line[0] in (32, 9)  # (' ', '\t')

            # Deprecated: https://www.rfc-editor.org/rfc/rfc9112.html#name-obsolete-line-folding
            if continuation:
                header_length = len(bvalue)
                bvalue_lst = [bvalue]
                while continuation:
                    header_length += len(line)
                    if header_length > self.max_field_size:
                        header_line = bname + b": " + b"".join(bvalue_lst)
                        raise LineTooLong(
                            header_line[:100] + b"...", self.max_field_size
                        )
                    bvalue_lst.append(line)

                    # next line
                    lines_idx += 1
                    if lines_idx < line_count:
                        line = lines[lines_idx]
                        if line:
                            continuation = line[0] in (32, 9)  # (' ', '\t')
                    else:
                        # Ran out of lines mid-fold; treat as end of headers.
                        line = b""
                        break
                bvalue = b"".join(bvalue_lst)

            bvalue = bvalue.strip(b" \t")
            value = bvalue.decode("utf-8", "surrogateescape")

            # https://www.rfc-editor.org/rfc/rfc9110.html#section-5.5-5
            # Lax mode only bans CR/LF/NUL; strict mode bans all CTLs
            # except HTAB via the precompiled regex.
            if self._lax:
                if "\n" in value or "\r" in value or "\x00" in value:
                    raise InvalidHeader(bvalue)
            elif _FIELD_VALUE_FORBIDDEN_CTL_RE.search(value):
                raise InvalidHeader(bvalue)

            headers.add(name, value)
            raw_headers.append((bname, bvalue))

        return (CIMultiDictProxy(headers), tuple(raw_headers))
def _is_supported_upgrade(headers: CIMultiDictProxy[str]) -> bool:
    """Check if the upgrade header is supported."""
    upgrade_hdr = headers.get(hdrs.UPGRADE, "")
    # .lower() can transform non-ascii characters, so check ascii first.
    if not upgrade_hdr.isascii():
        return False
    return upgrade_hdr.lower() in {"tcp", "websocket"}
class HttpParser(abc.ABC, Generic[_MsgT]):
    """Base incremental HTTP/1.x head parser.

    Subclasses provide :meth:`parse_message` (request vs. response line)
    and :meth:`_is_chunked_te`. :meth:`feed_data` accumulates raw bytes,
    splits the message head into lines, and wires up an
    :class:`HttpPayloadParser` for the message body when one is expected.
    """

    # Lax parsing is enabled only by the response subclass.
    lax: ClassVar[bool] = False

    def __init__(
        self,
        protocol: Optional[BaseProtocol] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        limit: int = 2**16,
        max_line_size: int = 8190,
        max_headers: int = 128,
        max_field_size: int = 8190,
        timer: Optional[BaseTimerContext] = None,
        code: Optional[int] = None,
        method: Optional[str] = None,
        payload_exception: Optional[Type[BaseException]] = None,
        response_with_body: bool = True,
        read_until_eof: bool = False,
        auto_decompress: bool = True,
    ) -> None:
        self.protocol = protocol
        self.loop = loop
        self.max_line_size = max_line_size
        self.max_headers = max_headers
        self.max_field_size = max_field_size
        # NOTE(review): duplicate assignment — max_headers was already set
        # above; harmless but should be removed.
        self.max_headers = max_headers
        self.timer = timer
        self.code = code
        self.method = method
        self.payload_exception = payload_exception
        self.response_with_body = response_with_body
        self.read_until_eof = read_until_eof

        self._lines: List[bytes] = []  # accumulated head lines
        self._tail = b""  # leftover bytes without a full line yet
        self._upgraded = False
        self._payload = None
        self._payload_parser: Optional[HttpPayloadParser] = None
        self._auto_decompress = auto_decompress
        self._limit = limit
        self._headers_parser = HeadersParser(
            max_line_size, max_headers, max_field_size, self.lax
        )

    @abc.abstractmethod
    def parse_message(self, lines: List[bytes]) -> _MsgT: ...

    @abc.abstractmethod
    def _is_chunked_te(self, te: str) -> bool: ...

    def feed_eof(self) -> Optional[_MsgT]:
        """Signal end of stream; may return a partially received message."""
        if self._payload_parser is not None:
            self._payload_parser.feed_eof()
            self._payload_parser = None
        else:
            # try to extract partial message
            if self._tail:
                self._lines.append(self._tail)

            if self._lines:
                # NOTE(review): this compares a bytes element against the
                # str "\r\n", which is always unequal — presumably meant
                # to be b"\r\n"; verify intended behavior.
                if self._lines[-1] != "\r\n":
                    self._lines.append(b"")
                with suppress(Exception):
                    return self.parse_message(self._lines)
        return None

    def feed_data(
        self,
        data: bytes,
        SEP: _SEP = b"\r\n",
        EMPTY: bytes = b"",
        CONTENT_LENGTH: istr = hdrs.CONTENT_LENGTH,
        METH_CONNECT: str = hdrs.METH_CONNECT,
        SEC_WEBSOCKET_KEY1: istr = hdrs.SEC_WEBSOCKET_KEY1,
    ) -> Tuple[List[Tuple[_MsgT, StreamReader]], bool, bytes]:
        """Feed raw bytes into the parser.

        Returns ``(messages, upgraded, tail)`` where ``messages`` is a
        list of ``(message, payload_stream)`` pairs completed by this
        call, ``upgraded`` reports protocol-upgrade mode, and ``tail``
        is any unconsumed data (e.g. after an upgrade).
        """
        messages = []

        # Prepend bytes left over from the previous call.
        if self._tail:
            data, self._tail = self._tail + data, b""

        data_len = len(data)
        start_pos = 0
        loop = self.loop
        max_line_length = self.max_line_size

        should_close = False
        while start_pos < data_len:
            # read HTTP message (request/response line + headers), \r\n\r\n
            # and split by lines
            if self._payload_parser is None and not self._upgraded:
                pos = data.find(SEP, start_pos)
                # consume \r\n
                if pos == start_pos and not self._lines:
                    start_pos = pos + len(SEP)
                    continue

                if pos >= start_pos:
                    if should_close:
                        raise BadHttpMessage("Data after `Connection: close`")

                    # line found
                    line = data[start_pos:pos]
                    if SEP == b"\n":  # For lax response parsing
                        line = line.rstrip(b"\r")
                    if len(line) > max_line_length:
                        raise LineTooLong(line[:100] + b"...", max_line_length)
                    self._lines.append(line)
                    # After processing the status/request line, everything is a header.
                    max_line_length = self.max_field_size

                    if len(self._lines) > self.max_headers:
                        raise BadHttpMessage("Too many headers received")

                    start_pos = pos + len(SEP)

                    # \r\n\r\n found
                    if self._lines[-1] == EMPTY:
                        # Trailers share the header budget.
                        max_trailers = self.max_headers - len(self._lines)
                        try:
                            msg: _MsgT = self.parse_message(self._lines)
                        finally:
                            self._lines.clear()

                        def get_content_length() -> Optional[int]:
                            # payload length
                            length_hdr = msg.headers.get(CONTENT_LENGTH)
                            if length_hdr is None:
                                return None

                            # Shouldn't allow +/- or other number formats.
                            # https://www.rfc-editor.org/rfc/rfc9110#section-8.6-2
                            # msg.headers is already stripped of leading/trailing wsp
                            if not DIGITS.fullmatch(length_hdr):
                                raise InvalidHeader(CONTENT_LENGTH)

                            return int(length_hdr)

                        length = get_content_length()
                        # do not support old websocket spec
                        if SEC_WEBSOCKET_KEY1 in msg.headers:
                            raise InvalidHeader(SEC_WEBSOCKET_KEY1)

                        self._upgraded = msg.upgrade and _is_supported_upgrade(
                            msg.headers
                        )

                        method = getattr(msg, "method", self.method)
                        # code is only present on responses
                        code = getattr(msg, "code", 0)

                        assert self.protocol is not None
                        # calculate payload
                        empty_body = code in EMPTY_BODY_STATUS_CODES or bool(
                            method and method in EMPTY_BODY_METHODS
                        )
                        if not empty_body and (
                            ((length is not None and length > 0) or msg.chunked)
                            and not self._upgraded
                        ):
                            # Body with known framing: length or chunked.
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            payload_parser = HttpPayloadParser(
                                payload,
                                length=length,
                                chunked=msg.chunked,
                                method=method,
                                compression=msg.compression,
                                code=self.code,
                                response_with_body=self.response_with_body,
                                auto_decompress=self._auto_decompress,
                                lax=self.lax,
                                headers_parser=self._headers_parser,
                                max_line_size=self.max_line_size,
                                max_field_size=self.max_field_size,
                                max_trailers=max_trailers,
                            )
                            if not payload_parser.done:
                                self._payload_parser = payload_parser
                        elif method == METH_CONNECT:
                            # CONNECT tunnels become an opaque byte stream.
                            assert isinstance(msg, RawRequestMessage)
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            self._upgraded = True
                            self._payload_parser = HttpPayloadParser(
                                payload,
                                method=msg.method,
                                compression=msg.compression,
                                auto_decompress=self._auto_decompress,
                                lax=self.lax,
                                headers_parser=self._headers_parser,
                                max_line_size=self.max_line_size,
                                max_field_size=self.max_field_size,
                                max_trailers=max_trailers,
                            )
                        elif not empty_body and length is None and self.read_until_eof:
                            # No framing info: read until connection close.
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            payload_parser = HttpPayloadParser(
                                payload,
                                length=length,
                                chunked=msg.chunked,
                                method=method,
                                compression=msg.compression,
                                code=self.code,
                                response_with_body=self.response_with_body,
                                auto_decompress=self._auto_decompress,
                                lax=self.lax,
                                headers_parser=self._headers_parser,
                                max_line_size=self.max_line_size,
                                max_field_size=self.max_field_size,
                                max_trailers=max_trailers,
                            )
                            if not payload_parser.done:
                                self._payload_parser = payload_parser
                        else:
                            payload = EMPTY_PAYLOAD

                        messages.append((msg, payload))
                        should_close = msg.should_close
                else:
                    # No separator yet: stash the remainder as tail.
                    self._tail = data[start_pos:]
                    if len(self._tail) > self.max_line_size:
                        raise LineTooLong(self._tail[:100] + b"...", self.max_line_size)
                    data = EMPTY
                    break

            # no parser, just store
            elif self._payload_parser is None and self._upgraded:
                assert not self._lines
                break

            # feed payload
            elif data and start_pos < data_len:
                assert not self._lines
                assert self._payload_parser is not None
                try:
                    eof, data = self._payload_parser.feed_data(data[start_pos:], SEP)
                except BaseException as underlying_exc:
                    reraised_exc = underlying_exc
                    if self.payload_exception is not None:
                        reraised_exc = self.payload_exception(str(underlying_exc))

                    set_exception(
                        self._payload_parser.payload,
                        reraised_exc,
                        underlying_exc,
                    )

                    # Treat the payload as terminated; framing errors still
                    # propagate to the caller below.
                    eof = True
                    data = b""
                    if isinstance(
                        underlying_exc, (InvalidHeader, TransferEncodingError)
                    ):
                        raise

                if eof:
                    start_pos = 0
                    data_len = len(data)
                    self._payload_parser = None
                    continue
            else:
                break

        if data and start_pos < data_len:
            data = data[start_pos:]
        else:
            data = EMPTY

        return messages, self._upgraded, data

    def parse_headers(
        self, lines: List[bytes]
    ) -> Tuple[
        "CIMultiDictProxy[str]", RawHeaders, Optional[bool], Optional[str], bool, bool
    ]:
        """Parses RFC 5322 headers from a stream.

        Line continuations are supported. Returns list of header name
        and value pairs. Header name is in upper case.
        """
        headers, raw_headers = self._headers_parser.parse_headers(lines)
        close_conn = None
        encoding = None
        upgrade = False
        chunked = False

        # https://www.rfc-editor.org/rfc/rfc9110.html#section-5.5-6
        # https://www.rfc-editor.org/rfc/rfc9110.html#name-collected-abnf
        # Headers that must not appear more than once.
        singletons = (
            hdrs.CONTENT_LENGTH,
            hdrs.CONTENT_LOCATION,
            hdrs.CONTENT_RANGE,
            hdrs.CONTENT_TYPE,
            hdrs.ETAG,
            hdrs.HOST,
            hdrs.MAX_FORWARDS,
            hdrs.SERVER,
            hdrs.TRANSFER_ENCODING,
            hdrs.USER_AGENT,
        )
        bad_hdr = next((h for h in singletons if len(headers.getall(h, ())) > 1), None)
        if bad_hdr is not None:
            raise BadHttpMessage(f"Duplicate '{bad_hdr}' header found.")

        # keep-alive and protocol switching
        # RFC 9110 section 7.6.1 defines Connection as a comma-separated list.
        conn_values = headers.getall(hdrs.CONNECTION, ())
        if conn_values:
            conn_tokens = {
                token.lower()
                for conn_value in conn_values
                for token in (part.strip(" \t") for part in conn_value.split(","))
                if token and token.isascii()
            }

            if "close" in conn_tokens:
                close_conn = True
            elif "keep-alive" in conn_tokens:
                close_conn = False

            # https://www.rfc-editor.org/rfc/rfc9110.html#name-101-switching-protocols
            if "upgrade" in conn_tokens and headers.get(hdrs.UPGRADE):
                upgrade = True

        # encoding
        enc = headers.get(hdrs.CONTENT_ENCODING, "")
        if enc.isascii() and enc.lower() in {"gzip", "deflate", "br", "zstd"}:
            encoding = enc

        # chunking
        te = headers.get(hdrs.TRANSFER_ENCODING)
        if te is not None:
            if self._is_chunked_te(te):
                chunked = True

            # Request smuggling defense: the two framing mechanisms must
            # not coexist.
            if hdrs.CONTENT_LENGTH in headers:
                raise BadHttpMessage(
                    "Transfer-Encoding can't be present with Content-Length",
                )

        return (headers, raw_headers, close_conn, encoding, upgrade, chunked)

    def set_upgraded(self, val: bool) -> None:
        """Set connection upgraded (to websocket) mode.

        :param bool val: new state.
        """
        self._upgraded = val
class HttpRequestParser(HttpParser[RawRequestMessage]):
    """Parse an HTTP request head (request line plus headers).

    Raises .http_exceptions.BadStatusLine (or a subclass such as
    BadHttpMethod) for a malformed request line, and returns a
    RawRequestMessage on success.
    """

    def parse_message(self, lines: List[bytes]) -> RawRequestMessage:
        # Decompose the request line: METHOD SP TARGET SP VERSION.
        request_line = lines[0].decode("utf-8", "surrogateescape")
        try:
            method, path, version = request_line.split(" ", maxsplit=2)
        except ValueError:
            raise BadHttpMethod(request_line) from None

        # The method must be an RFC 9110 token.
        if TOKENRE.fullmatch(method) is None:
            raise BadHttpMethod(method)

        # Validate and decode "HTTP/x.y".
        version_match = VERSRE.fullmatch(version)
        if version_match is None:
            raise BadStatusLine(request_line)
        version_o = HttpVersion(int(version_match.group(1)), int(version_match.group(2)))

        if method == "CONNECT":
            # authority-form,
            # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.3
            url = URL.build(authority=path, encoded=True)
        elif path.startswith("/"):
            # origin-form,
            # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.1
            before_hash, _, fragment_part = path.partition("#")
            plain_path, _, query_part = before_hash.partition("?")

            # NOTE: `yarl.URL.build()` is used to mimic what the Cython-based
            # NOTE: parser does, otherwise it results into the same
            # NOTE: HTTP Request-Line input producing different
            # NOTE: `yarl.URL()` objects
            url = URL.build(
                path=plain_path,
                query_string=query_part,
                fragment=fragment_part,
                encoded=True,
            )
        elif method == "OPTIONS" and path == "*":
            # asterisk-form,
            url = URL(path, encoded=True)
        else:
            # absolute-form for proxy maybe,
            # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.2
            url = URL(path, encoded=True)
            if not url.scheme:
                # not absolute-form
                raise InvalidURLError(
                    path.encode(errors="surrogateescape").decode("latin1")
                )

        # Parse the header section that follows the request line.
        (
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
        ) = self.parse_headers(lines[1:])

        if close is None:
            # No Connection header: HTTP/1.0 defaults to close,
            # HTTP/1.1 defaults to keep-alive.
            close = version_o <= HttpVersion10

        return RawRequestMessage(
            method,
            path,
            version_o,
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
            url,
        )

    def _is_chunked_te(self, te: str) -> bool:
        # Only the final transfer-coding token may be "chunked" in a request.
        last_coding = te.rsplit(",", maxsplit=1)[-1].strip(" \t")
        # .lower() transforms some non-ascii chars, so must check first.
        if not (last_coding.isascii() and last_coding.lower() == "chunked"):
            # https://www.rfc-editor.org/rfc/rfc9112#section-6.3-2.4.3
            raise BadHttpMessage("Request has invalid `Transfer-Encoding`")
        return True
class HttpResponseParser(HttpParser[RawResponseMessage]):
    """Parse an HTTP response head (status line plus headers).

    BadStatusLine could be raised in case of any errors in status line.
    Returns RawResponseMessage.
    """

    # Lax mode should only be enabled on response parser.
    lax = not DEBUG

    def feed_data(
        self,
        data: bytes,
        SEP: Optional[_SEP] = None,
        *args: Any,
        **kwargs: Any,
    ) -> Tuple[List[Tuple[RawResponseMessage, StreamReader]], bool, bytes]:
        if SEP is None:
            # In lax (non-DEBUG) mode split on bare LF; CRs are stripped later.
            SEP = b"\r\n" if DEBUG else b"\n"
        return super().feed_data(data, SEP, *args, **kwargs)

    def parse_message(self, lines: List[bytes]) -> RawResponseMessage:
        status_line = lines[0].decode("utf-8", "surrogateescape")

        # Split off the HTTP version, then the status code from the reason.
        try:
            version, status = status_line.split(maxsplit=1)
        except ValueError:
            raise BadStatusLine(status_line) from None

        try:
            status, reason = status.split(maxsplit=1)
        except ValueError:
            # No reason phrase present.
            status = status.strip()
            reason = ""

        # version
        version_match = VERSRE.fullmatch(version)
        if version_match is None:
            raise BadStatusLine(status_line)
        version_o = HttpVersion(int(version_match.group(1)), int(version_match.group(2)))

        # The status code is a three-digit ASCII number, no padding
        if len(status) != 3 or not DIGITS.fullmatch(status):
            raise BadStatusLine(status_line)
        status_i = int(status)

        # Parse the header section that follows the status line.
        (
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
        ) = self.parse_headers(lines[1:])

        if close is None:
            # https://www.rfc-editor.org/rfc/rfc9112.html#name-message-body-length
            if version_o <= HttpVersion10:
                close = True
            else:
                bodyless = 100 <= status_i < 200 or status_i in {204, 304}
                framed = (
                    hdrs.CONTENT_LENGTH in headers
                    or hdrs.TRANSFER_ENCODING in headers
                )
                # https://www.rfc-editor.org/rfc/rfc9112.html#section-6.3-2.8
                # Without framing info the body runs to EOF, so close.
                close = not (bodyless or framed)

        return RawResponseMessage(
            version_o,
            status_i,
            reason.strip(),
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
        )

    def _is_chunked_te(self, te: str) -> bool:
        # https://www.rfc-editor.org/rfc/rfc9112#section-6.3-2.4.2
        last_coding = te.rsplit(",", maxsplit=1)[-1]
        return last_coding.strip(" \t").lower() == "chunked"
class HttpPayloadParser:
    """Incremental HTTP message-body parser.

    Supports three framing modes (see ParseState): fixed Content-Length,
    chunked transfer encoding (with trailers), and read-until-EOF.
    Decoded body bytes are fed into ``payload`` (optionally wrapped in a
    DeflateBuffer for transparent decompression).
    """

    def __init__(
        self,
        payload: StreamReader,
        length: Optional[int] = None,
        chunked: bool = False,
        compression: Optional[str] = None,
        code: Optional[int] = None,
        method: Optional[str] = None,
        response_with_body: bool = True,
        auto_decompress: bool = True,
        lax: bool = False,
        *,
        headers_parser: HeadersParser,
        max_line_size: int = 8190,
        max_field_size: int = 8190,
        max_trailers: int = 128,
    ) -> None:
        self._length = 0  # remaining bytes in PARSE_LENGTH mode
        self._type = ParseState.PARSE_UNTIL_EOF
        self._chunk = ChunkState.PARSE_CHUNKED_SIZE
        self._chunk_size = 0  # remaining bytes of the current chunk
        self._chunk_tail = b""  # partial chunk-size/trailer line awaiting more data
        self._auto_decompress = auto_decompress
        self._lax = lax
        self._headers_parser = headers_parser  # reused for trailer parsing
        self._max_line_size = max_line_size
        self._max_field_size = max_field_size
        self._max_trailers = max_trailers
        self._trailer_lines: list[bytes] = []
        self.done = False

        # payload decompression wrapper
        if response_with_body and compression and self._auto_decompress:
            real_payload: Union[StreamReader, DeflateBuffer] = DeflateBuffer(
                payload, compression
            )
        else:
            real_payload = payload

        # payload parser
        if not response_with_body:
            # don't parse payload if it's not expected to be received
            self._type = ParseState.PARSE_NONE
            real_payload.feed_eof()
            self.done = True
        elif chunked:
            self._type = ParseState.PARSE_CHUNKED
        elif length is not None:
            self._type = ParseState.PARSE_LENGTH
            self._length = length
            if self._length == 0:
                real_payload.feed_eof()
                self.done = True

        self.payload = real_payload

    def feed_eof(self) -> None:
        """Handle end of stream; raises if the body framing is incomplete."""
        if self._type == ParseState.PARSE_UNTIL_EOF:
            self.payload.feed_eof()
        elif self._type == ParseState.PARSE_LENGTH:
            raise ContentLengthError(
                "Not enough data to satisfy content length header."
            )
        elif self._type == ParseState.PARSE_CHUNKED:
            raise TransferEncodingError(
                "Not enough data to satisfy transfer length header."
            )

    def feed_data(
        self, chunk: bytes, SEP: _SEP = b"\r\n", CHUNK_EXT: bytes = b";"
    ) -> Tuple[bool, bytes]:
        """Feed body bytes; return ``(done, leftover_bytes)``."""
        # Read specified amount of bytes
        if self._type == ParseState.PARSE_LENGTH:
            required = self._length
            chunk_len = len(chunk)

            if required >= chunk_len:
                self._length = required - chunk_len
                self.payload.feed_data(chunk, chunk_len)
                if self._length == 0:
                    self.payload.feed_eof()
                    return True, b""
            else:
                # More bytes than needed: the surplus belongs to the next message.
                self._length = 0
                self.payload.feed_data(chunk[:required], required)
                self.payload.feed_eof()
                return True, chunk[required:]

        # Chunked transfer encoding parser
        elif self._type == ParseState.PARSE_CHUNKED:
            if self._chunk_tail:
                # We should never have a tail if we're inside the payload body.
                assert self._chunk != ChunkState.PARSE_CHUNKED_CHUNK
                # We should check the length is sane.
                max_line_length = self._max_line_size
                if self._chunk == ChunkState.PARSE_TRAILERS:
                    max_line_length = self._max_field_size
                if len(self._chunk_tail) > max_line_length:
                    raise LineTooLong(self._chunk_tail[:100] + b"...", max_line_length)

                chunk = self._chunk_tail + chunk
                self._chunk_tail = b""

            while chunk:

                # read next chunk size
                if self._chunk == ChunkState.PARSE_CHUNKED_SIZE:
                    pos = chunk.find(SEP)
                    if pos >= 0:
                        i = chunk.find(CHUNK_EXT, 0, pos)
                        if i >= 0:
                            size_b = chunk[:i]  # strip chunk-extensions
                            # Verify no LF in the chunk-extension
                            if b"\n" in (ext := chunk[i:pos]):
                                exc = TransferEncodingError(
                                    f"Unexpected LF in chunk-extension: {ext!r}"
                                )
                                set_exception(self.payload, exc)
                                raise exc
                        else:
                            size_b = chunk[:pos]

                        if self._lax:  # Allow whitespace in lax mode.
                            size_b = size_b.strip()

                        # Chunk size must be pure hex digits.
                        if not re.fullmatch(HEXDIGITS, size_b):
                            exc = TransferEncodingError(
                                chunk[:pos].decode("ascii", "surrogateescape")
                            )
                            set_exception(self.payload, exc)
                            raise exc
                        size = int(bytes(size_b), 16)

                        chunk = chunk[pos + len(SEP) :]
                        if size == 0:  # eof marker
                            self._chunk = ChunkState.PARSE_TRAILERS
                            if self._lax and chunk.startswith(b"\r"):
                                chunk = chunk[1:]
                        else:
                            self._chunk = ChunkState.PARSE_CHUNKED_CHUNK
                            self._chunk_size = size
                            self.payload.begin_http_chunk_receiving()
                    else:
                        # Incomplete size line: wait for more data.
                        self._chunk_tail = chunk
                        return False, b""

                # read chunk and feed buffer
                if self._chunk == ChunkState.PARSE_CHUNKED_CHUNK:
                    required = self._chunk_size
                    chunk_len = len(chunk)

                    if required > chunk_len:
                        self._chunk_size = required - chunk_len
                        self.payload.feed_data(chunk, chunk_len)
                        return False, b""
                    else:
                        self._chunk_size = 0
                        self.payload.feed_data(chunk[:required], required)
                        chunk = chunk[required:]
                        self._chunk = ChunkState.PARSE_CHUNKED_CHUNK_EOF
                        self.payload.end_http_chunk_receiving()

                # toss the CRLF at the end of the chunk
                if self._chunk == ChunkState.PARSE_CHUNKED_CHUNK_EOF:
                    if self._lax and chunk.startswith(b"\r"):
                        chunk = chunk[1:]
                    if chunk[: len(SEP)] == SEP:
                        chunk = chunk[len(SEP) :]
                        self._chunk = ChunkState.PARSE_CHUNKED_SIZE
                    elif len(chunk) >= len(SEP) or chunk != SEP[: len(chunk)]:
                        # Bytes present but not the expected separator.
                        exc = TransferEncodingError(
                            "Chunk size mismatch: expected CRLF after chunk data"
                        )
                        set_exception(self.payload, exc)
                        raise exc
                    else:
                        # Partial separator: wait for more data.
                        self._chunk_tail = chunk
                        return False, b""

                if self._chunk == ChunkState.PARSE_TRAILERS:
                    pos = chunk.find(SEP)
                    if pos < 0:  # No line found
                        self._chunk_tail = chunk
                        return False, b""

                    line = chunk[:pos]
                    chunk = chunk[pos + len(SEP) :]
                    if SEP == b"\n":  # For lax response parsing
                        line = line.rstrip(b"\r")

                    if len(line) > self._max_field_size:
                        raise LineTooLong(line[:100] + b"...", self._max_field_size)

                    self._trailer_lines.append(line)

                    if len(self._trailer_lines) > self._max_trailers:
                        raise BadHttpMessage("Too many trailers received")

                    # \r\n\r\n found, end of stream
                    if self._trailer_lines[-1] == b"":
                        # Headers and trailers are defined the same way,
                        # so we reuse the HeadersParser here.
                        try:
                            trailers, raw_trailers = self._headers_parser.parse_headers(
                                self._trailer_lines
                            )
                        finally:
                            self._trailer_lines.clear()
                        self.payload.feed_eof()
                        return True, chunk

        # Read all bytes until eof
        elif self._type == ParseState.PARSE_UNTIL_EOF:
            self.payload.feed_data(chunk, len(chunk))

        return False, b""
class DeflateBuffer:
    """DeflateStream decompress stream and feed data into specified stream."""

    decompressor: Any

    def __init__(
        self,
        out: StreamReader,
        encoding: Optional[str],
        max_decompress_size: int = DEFAULT_MAX_DECOMPRESS_SIZE,
    ) -> None:
        self.out = out
        self.size = 0  # total compressed bytes consumed so far
        out.total_compressed_bytes = self.size
        self.encoding = encoding
        self._started_decoding = False

        # Pick the decompressor matching the Content-Encoding.
        self.decompressor: Union[BrotliDecompressor, ZLibDecompressor, ZSTDDecompressor]
        if encoding == "br":
            if not HAS_BROTLI:  # pragma: no cover
                raise ContentEncodingError(
                    "Can not decode content-encoding: brotli (br). "
                    "Please install `Brotli`"
                )
            self.decompressor = BrotliDecompressor()
        elif encoding == "zstd":
            if not HAS_ZSTD:
                raise ContentEncodingError(
                    "Can not decode content-encoding: zstandard (zstd). "
                    "Please install `backports.zstd`"
                )
            self.decompressor = ZSTDDecompressor()
        else:
            # gzip/deflate are handled by zlib.
            self.decompressor = ZLibDecompressor(encoding=encoding)

        # Cap on decompressed output to guard against zip bombs.
        self._max_decompress_size = max_decompress_size

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        set_exception(self.out, exc, exc_cause)

    def feed_data(self, chunk: bytes, size: int) -> None:
        """Decompress ``chunk`` and feed the result downstream."""
        if not size:
            return

        self.size += size
        self.out.total_compressed_bytes = self.size

        # RFC1950
        # bits 0..3 = CM = 0b1000 = 8 = "deflate"
        # bits 4..7 = CINFO = 1..7 = windows size.
        if (
            not self._started_decoding
            and self.encoding == "deflate"
            and chunk[0] & 0xF != 8
        ):
            # Change the decoder to decompress incorrectly compressed data
            # Actually we should issue a warning about non-RFC-compliant data.
            self.decompressor = ZLibDecompressor(
                encoding=self.encoding, suppress_deflate_header=True
            )

        try:
            # Decompress with limit + 1 so we can detect if output exceeds limit
            chunk = self.decompressor.decompress_sync(
                chunk, max_length=self._max_decompress_size + 1
            )
        except Exception:
            raise ContentEncodingError(
                "Can not decode content-encoding: %s" % self.encoding
            )

        self._started_decoding = True

        # Check if decompression limit was exceeded
        if len(chunk) > self._max_decompress_size:
            raise DecompressSizeError(
                "Decompressed data exceeds the configured limit of %d bytes"
                % self._max_decompress_size
            )

        if chunk:
            self.out.feed_data(chunk, len(chunk))

    def feed_eof(self) -> None:
        """Flush the decompressor and propagate EOF downstream."""
        chunk = self.decompressor.flush()

        if chunk or self.size > 0:
            self.out.feed_data(chunk, len(chunk))
            # A deflate stream that never reached its end marker is truncated.
            if self.encoding == "deflate" and not self.decompressor.eof:
                raise ContentEncodingError("deflate")

        self.out.feed_eof()

    def begin_http_chunk_receiving(self) -> None:
        self.out.begin_http_chunk_receiving()

    def end_http_chunk_receiving(self) -> None:
        self.out.end_http_chunk_receiving()
# Keep explicit references to the pure-Python implementations before the
# (optional) C-extension versions shadow the public names below.
HttpRequestParserPy = HttpRequestParser
HttpResponseParserPy = HttpResponseParser
RawRequestMessagePy = RawRequestMessage
RawResponseMessagePy = RawResponseMessage

try:
    if not NO_EXTENSIONS:
        # Prefer the accelerated C parser when extensions are enabled.
        from ._http_parser import (  # type: ignore[import-not-found,no-redef]
            HttpRequestParser,
            HttpResponseParser,
            RawRequestMessage,
            RawResponseMessage,
        )

        HttpRequestParserC = HttpRequestParser
        HttpResponseParserC = HttpResponseParser
        RawRequestMessageC = RawRequestMessage
        RawResponseMessageC = RawResponseMessage
except ImportError:  # pragma: no cover
    # Fall back silently to the pure-Python implementations.
    pass