Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/http_parser.py: 19%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import abc
2import asyncio
3import re
4import string
5from contextlib import suppress
6from enum import IntEnum
7from typing import (
8 Any,
9 ClassVar,
10 Final,
11 Generic,
12 List,
13 Literal,
14 NamedTuple,
15 Optional,
16 Pattern,
17 Set,
18 Tuple,
19 Type,
20 TypeVar,
21 Union,
22)
24from multidict import CIMultiDict, CIMultiDictProxy, istr
25from yarl import URL
27from . import hdrs
28from .base_protocol import BaseProtocol
29from .compression_utils import (
30 DEFAULT_MAX_DECOMPRESS_SIZE,
31 HAS_BROTLI,
32 HAS_ZSTD,
33 BrotliDecompressor,
34 ZLibDecompressor,
35 ZSTDDecompressor,
36)
37from .helpers import (
38 _EXC_SENTINEL,
39 DEBUG,
40 EMPTY_BODY_METHODS,
41 EMPTY_BODY_STATUS_CODES,
42 NO_EXTENSIONS,
43 BaseTimerContext,
44 set_exception,
45)
46from .http_exceptions import (
47 BadHttpMessage,
48 BadHttpMethod,
49 BadStatusLine,
50 ContentEncodingError,
51 ContentLengthError,
52 DecompressSizeError,
53 InvalidHeader,
54 InvalidURLError,
55 LineTooLong,
56 TransferEncodingError,
57)
58from .http_writer import HttpVersion, HttpVersion10
59from .streams import EMPTY_PAYLOAD, StreamReader
60from .typedefs import RawHeaders
# Names exported by ``from <this module> import *``.
__all__ = (
    "HeadersParser",
    "HttpParser",
    "HttpRequestParser",
    "HttpResponseParser",
    "RawRequestMessage",
    "RawResponseMessage",
)

# Type of the line separator argument accepted by the ``feed_data`` methods.
_SEP = Literal[b"\r\n", b"\n"]

# The characters of ``string.printable`` as a set.
ASCIISET: Final[Set[str]] = set(string.printable)

# See https://www.rfc-editor.org/rfc/rfc9110.html#name-overview
# and https://www.rfc-editor.org/rfc/rfc9110.html#name-tokens
#
#     method = token
#     tchar = "!" / "#" / "$" / "%" / "&" / "'" / "*" / "+" / "-" / "." /
#             "^" / "_" / "`" / "|" / "~" / DIGIT / ALPHA
#     token = 1*tchar
_TCHAR_SPECIALS: Final[str] = re.escape("!#$%&'*+-.^_`|~")
# Matches one or more tchar; used with ``fullmatch`` to validate header names
# and request methods.
TOKENRE: Final[Pattern[str]] = re.compile(f"[0-9A-Za-z{_TCHAR_SPECIALS}]+")
# https://www.rfc-editor.org/rfc/rfc9110#section-5.5-5
# Control characters rejected in field values in strict mode: every C0 control
# except HTAB (0x09), plus DEL (0x7f).
_FIELD_VALUE_FORBIDDEN_CTL_RE: Final[Pattern[str]] = re.compile(
    r"[\x00-\x08\x0a-\x1f\x7f]"
)
# HTTP version token with single-digit major and minor numbers (ASCII digits only).
VERSRE: Final[Pattern[str]] = re.compile(r"HTTP/(\d)\.(\d)", re.ASCII)
# One or more ASCII decimal digits (Content-Length and status code validation).
DIGITS: Final[Pattern[str]] = re.compile(r"\d+", re.ASCII)
# One or more hexadecimal digits, matched against bytes (chunk sizes).
HEXDIGITS: Final[Pattern[bytes]] = re.compile(rb"[0-9a-fA-F]+")

# RFC 9110 singleton headers — duplicates are rejected in strict mode.
# In lax mode (response parser default), the check is skipped entirely
# since real-world servers (e.g. Google APIs, Werkzeug) commonly send
# duplicate headers like Content-Type or Server.
# Lowercased for case-insensitive matching against wire names.
SINGLETON_HEADERS: Final[frozenset[str]] = frozenset(
    {
        "content-length",
        "content-location",
        "content-range",
        "content-type",
        "etag",
        "host",
        "max-forwards",
        "server",
        "transfer-encoding",
        "user-agent",
    }
)
class RawRequestMessage(NamedTuple):
    """Request line and headers of one parsed HTTP request."""

    # Method token taken from the request line.
    method: str
    # Request target exactly as it appeared on the request line.
    path: str
    version: HttpVersion
    headers: "CIMultiDictProxy[str]"
    # (name, value) byte pairs; values have surrounding whitespace stripped.
    raw_headers: RawHeaders
    # Derived from the Connection header, or from the HTTP version when the
    # header carries neither "close" nor "keep-alive".
    should_close: bool
    # Content-Encoding value when it is gzip/deflate/br/zstd, otherwise None.
    compression: Optional[str]
    # True when Connection lists "upgrade" and an Upgrade header is present.
    upgrade: bool
    # True when the Transfer-Encoding header was accepted as chunked.
    chunked: bool
    # URL object built from ``path``.
    url: URL
class RawResponseMessage(NamedTuple):
    """Status line and headers of one parsed HTTP response."""

    version: HttpVersion
    # Three-digit status code converted to int.
    code: int
    # Reason phrase with surrounding whitespace stripped; may be empty.
    reason: str
    headers: CIMultiDictProxy[str]
    # (name, value) byte pairs; values have surrounding whitespace stripped.
    raw_headers: RawHeaders
    # Derived from the Connection header, or from version/status/framing
    # headers when the header carries neither "close" nor "keep-alive".
    should_close: bool
    # Content-Encoding value when it is gzip/deflate/br/zstd, otherwise None.
    compression: Optional[str]
    # True when Connection lists "upgrade" and an Upgrade header is present.
    upgrade: bool
    # True when the last Transfer-Encoding coding is "chunked".
    chunked: bool
# Message type produced by a concrete parser: either a request or a response.
_MsgT = TypeVar("_MsgT", RawRequestMessage, RawResponseMessage)
class ParseState(IntEnum):
    """How ``HttpPayloadParser`` determines where the message body ends."""

    # No body is parsed (selected when response_with_body is false).
    PARSE_NONE = 0
    # Body is exactly the number of bytes given by ``length``.
    PARSE_LENGTH = 1
    # Body uses chunked transfer coding.
    PARSE_CHUNKED = 2
    # Body runs until EOF is fed (the initial/default state).
    PARSE_UNTIL_EOF = 3
class ChunkState(IntEnum):
    """Position of ``HttpPayloadParser`` inside a chunked body."""

    # Expecting a chunk-size line (optionally with chunk extensions).
    PARSE_CHUNKED_SIZE = 0
    # Inside chunk data; ``_chunk_size`` bytes are still outstanding.
    PARSE_CHUNKED_CHUNK = 1
    # Chunk data consumed; expecting the line separator that follows it.
    PARSE_CHUNKED_CHUNK_EOF = 2
    # NOTE(review): not referenced by the code visible in this file.
    PARSE_MAYBE_TRAILERS = 3
    # Zero-size chunk seen; reading trailer lines up to the blank line.
    PARSE_TRAILERS = 4
class HeadersParser:
    """Parse raw header (or trailer) lines into a case-insensitive multidict.

    In strict mode (``lax=False``) obsolete line folding is not accepted,
    control characters other than HTAB are rejected in field values, and a
    repeated header listed in ``SINGLETON_HEADERS`` is an error. In lax mode
    folded lines are joined, only CR, LF and NUL are rejected in values, and
    the duplicate check is skipped.
    """

    def __init__(
        self,
        max_line_size: int = 8190,
        max_headers: int = 32768,
        max_field_size: int = 8190,
        lax: bool = False,
    ) -> None:
        # Only ``max_field_size`` and ``lax`` are consulted by ``parse_headers``;
        # the other limits are stored as plain attributes.
        self.max_line_size = max_line_size
        self.max_headers = max_headers
        self.max_field_size = max_field_size
        self._lax = lax

    def parse_headers(
        self, lines: List[bytes]
    ) -> Tuple["CIMultiDictProxy[str]", RawHeaders]:
        """Parse header lines up to the first empty line.

        :param lines: header lines without their line terminators. Parsing
            stops at the first empty line or at the end of the list.
        :returns: ``(headers, raw_headers)`` where ``raw_headers`` is a tuple
            of ``(name, value)`` byte pairs.
        :raises InvalidHeader: for a line without ``:``, an empty or
            whitespace-delimited name, a name that is not a token, or a value
            containing forbidden control characters.
        :raises LineTooLong: when a folded value exceeds ``max_field_size``.
        :raises BadHttpMessage: for a duplicated singleton header in strict
            mode.
        """
        headers: CIMultiDict[str] = CIMultiDict()
        # note: "raw" does not mean inclusion of OWS before/after the field value
        raw_headers = []

        line_count = len(lines)
        lines_idx = 0
        # An empty list is treated like an immediately terminated header block.
        line = lines[0] if line_count else b""

        while line:
            # Parse initial header name : value pair.
            try:
                bname, bvalue = line.split(b":", 1)
            except ValueError:
                raise InvalidHeader(line) from None

            if not bname:
                raise InvalidHeader(bname)

            # https://www.rfc-editor.org/rfc/rfc9112.html#section-5.1-2
            if {bname[0], bname[-1]} & {32, 9}:  # {" ", "\t"}
                raise InvalidHeader(line)

            bvalue = bvalue.lstrip(b" \t")
            name = bname.decode("utf-8", "surrogateescape")
            if not TOKENRE.fullmatch(name):
                raise InvalidHeader(bname)

            # next line; a missing terminating blank line ends the block
            lines_idx += 1
            line = lines[lines_idx] if lines_idx < line_count else b""

            # consume continuation lines
            continuation = self._lax and line and line[0] in (32, 9)  # (' ', '\t')

            # Deprecated: https://www.rfc-editor.org/rfc/rfc9112.html#name-obsolete-line-folding
            if continuation:
                header_length = len(bvalue)
                bvalue_lst = [bvalue]
                while continuation:
                    header_length += len(line)
                    if header_length > self.max_field_size:
                        header_line = bname + b": " + b"".join(bvalue_lst)
                        raise LineTooLong(
                            header_line[:100] + b"...", self.max_field_size
                        )
                    bvalue_lst.append(line)

                    # next line
                    lines_idx += 1
                    line = lines[lines_idx] if lines_idx < line_count else b""
                    # A blank line always ends folding, so the parser can
                    # never read past the end-of-headers marker.
                    continuation = bool(line) and line[0] in (32, 9)  # (' ', '\t')
                bvalue = b"".join(bvalue_lst)

            bvalue = bvalue.strip(b" \t")
            value = bvalue.decode("utf-8", "surrogateescape")

            # https://www.rfc-editor.org/rfc/rfc9110.html#section-5.5-5
            if self._lax:
                if "\n" in value or "\r" in value or "\x00" in value:
                    raise InvalidHeader(bvalue)
            elif _FIELD_VALUE_FORBIDDEN_CTL_RE.search(value):
                raise InvalidHeader(bvalue)

            if not self._lax and name in headers and name.lower() in SINGLETON_HEADERS:
                raise BadHttpMessage(f"Duplicate '{name}' header found.")
            headers.add(name, value)
            raw_headers.append((bname, bvalue))

        return (CIMultiDictProxy(headers), tuple(raw_headers))
def _is_supported_upgrade(headers: CIMultiDictProxy[str]) -> bool:
    """Return True when the Upgrade header names a supported protocol.

    Supported values are ``tcp`` and ``websocket``, compared
    case-insensitively. A missing header counts as unsupported.
    """
    requested = headers.get(hdrs.UPGRADE, "")
    # .lower() can transform non-ascii characters, so reject those up front.
    if not requested.isascii():
        return False
    return requested.lower() in {"tcp", "websocket"}
class HttpParser(abc.ABC, Generic[_MsgT]):
    """Incremental parser for a stream of HTTP/1.x messages.

    Subclasses provide ``parse_message`` (start line plus headers) and
    ``_is_chunked_te`` (Transfer-Encoding interpretation). ``feed_data``
    splits incoming bytes into header lines, builds messages and hands body
    bytes to an ``HttpPayloadParser``.
    """

    # When true, header parsing and chunk parsing run in lax mode.
    lax: ClassVar[bool] = False

    def __init__(
        self,
        protocol: Optional[BaseProtocol] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        limit: int = 2**16,
        max_line_size: int = 8190,
        max_headers: int = 128,
        max_field_size: int = 8190,
        timer: Optional[BaseTimerContext] = None,
        code: Optional[int] = None,
        method: Optional[str] = None,
        payload_exception: Optional[Type[BaseException]] = None,
        response_with_body: bool = True,
        read_until_eof: bool = False,
        auto_decompress: bool = True,
    ) -> None:
        self.protocol = protocol
        self.loop = loop
        self.max_line_size = max_line_size
        self.max_headers = max_headers
        self.max_field_size = max_field_size
        self.timer = timer
        self.code = code
        self.method = method
        self.payload_exception = payload_exception
        self.response_with_body = response_with_body
        self.read_until_eof = read_until_eof

        # Header lines collected so far for the message being read.
        self._lines: List[bytes] = []
        # Bytes of an incomplete line carried over to the next feed_data call.
        self._tail = b""
        self._upgraded = False
        self._payload = None
        # Set while a message body is being received.
        self._payload_parser: Optional[HttpPayloadParser] = None
        self._auto_decompress = auto_decompress
        self._limit = limit
        self._headers_parser = HeadersParser(
            max_line_size, max_headers, max_field_size, self.lax
        )

    @abc.abstractmethod
    def parse_message(self, lines: List[bytes]) -> _MsgT: ...

    @abc.abstractmethod
    def _is_chunked_te(self, te: str) -> bool: ...

    def feed_eof(self) -> Optional[_MsgT]:
        """Signal end of input.

        While a body is being received, EOF is forwarded to the payload parser
        (which may raise) and ``None`` is returned. Otherwise a best-effort
        parse of any buffered, unterminated header block is attempted; parse
        errors are suppressed and yield ``None``.
        """
        if self._payload_parser is not None:
            self._payload_parser.feed_eof()
            self._payload_parser = None
        else:
            # try to extract partial message
            if self._tail:
                self._lines.append(self._tail)

            if self._lines:
                # Lines are stored as bytes without their separator, so the
                # end-of-headers marker is an empty bytes object.
                if self._lines[-1] != b"":
                    self._lines.append(b"")
                with suppress(Exception):
                    return self.parse_message(self._lines)
        return None

    def feed_data(
        self,
        data: bytes,
        SEP: _SEP = b"\r\n",
        EMPTY: bytes = b"",
        CONTENT_LENGTH: istr = hdrs.CONTENT_LENGTH,
        METH_CONNECT: str = hdrs.METH_CONNECT,
        SEC_WEBSOCKET_KEY1: istr = hdrs.SEC_WEBSOCKET_KEY1,
    ) -> Tuple[List[Tuple[_MsgT, StreamReader]], bool, bytes]:
        """Consume ``data`` and return the messages completed by it.

        :param data: newly received bytes.
        :param SEP: line separator to split header lines on.
        :returns: ``(messages, upgraded, tail)`` where ``messages`` is a list
            of ``(message, payload)`` pairs, ``upgraded`` is the current
            upgrade flag, and ``tail`` is the unconsumed remainder of the
            data (empty unless parsing stopped early, e.g. after an upgrade).
        :raises BadHttpMessage: for data after a message that requested
            connection close, or when more than ``max_headers`` lines arrive.
        :raises LineTooLong: when a line or a buffered partial line exceeds
            its limit.
        :raises InvalidHeader: for a malformed Content-Length or the presence
            of Sec-WebSocket-Key1.

        The remaining keyword parameters are constants bound as defaults.
        """

        messages = []

        if self._tail:
            data, self._tail = self._tail + data, b""

        data_len = len(data)
        start_pos = 0
        loop = self.loop
        max_line_length = self.max_line_size

        should_close = False
        while start_pos < data_len:

            # read HTTP message (request/response line + headers), \r\n\r\n
            # and split by lines
            if self._payload_parser is None and not self._upgraded:
                pos = data.find(SEP, start_pos)
                # consume \r\n
                if pos == start_pos and not self._lines:
                    start_pos = pos + len(SEP)
                    continue

                if pos >= start_pos:
                    if should_close:
                        raise BadHttpMessage("Data after `Connection: close`")

                    # line found
                    line = data[start_pos:pos]
                    if SEP == b"\n":  # For lax response parsing
                        line = line.rstrip(b"\r")
                    if len(line) > max_line_length:
                        raise LineTooLong(line[:100] + b"...", max_line_length)

                    self._lines.append(line)
                    # After processing the status/request line, everything is a header.
                    max_line_length = self.max_field_size

                    if len(self._lines) > self.max_headers:
                        raise BadHttpMessage("Too many headers received")

                    start_pos = pos + len(SEP)

                    # \r\n\r\n found
                    if self._lines[-1] == EMPTY:
                        # Whatever is left of the header budget may be used
                        # by trailers of a chunked body.
                        max_trailers = self.max_headers - len(self._lines)
                        try:
                            msg: _MsgT = self.parse_message(self._lines)
                        finally:
                            self._lines.clear()

                        def get_content_length() -> Optional[int]:
                            # payload length
                            length_hdr = msg.headers.get(CONTENT_LENGTH)
                            if length_hdr is None:
                                return None

                            # Shouldn't allow +/- or other number formats.
                            # https://www.rfc-editor.org/rfc/rfc9110#section-8.6-2
                            # msg.headers is already stripped of leading/trailing wsp
                            if not DIGITS.fullmatch(length_hdr):
                                raise InvalidHeader(CONTENT_LENGTH)

                            return int(length_hdr)

                        length = get_content_length()
                        # do not support old websocket spec
                        if SEC_WEBSOCKET_KEY1 in msg.headers:
                            raise InvalidHeader(SEC_WEBSOCKET_KEY1)

                        self._upgraded = msg.upgrade and _is_supported_upgrade(
                            msg.headers
                        )

                        method = getattr(msg, "method", self.method)
                        # code is only present on responses
                        code = getattr(msg, "code", 0)

                        assert self.protocol is not None
                        # calculate payload
                        empty_body = code in EMPTY_BODY_STATUS_CODES or bool(
                            method and method in EMPTY_BODY_METHODS
                        )
                        if not empty_body and (
                            ((length is not None and length > 0) or msg.chunked)
                            and not self._upgraded
                        ):
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            payload_parser = HttpPayloadParser(
                                payload,
                                length=length,
                                chunked=msg.chunked,
                                method=method,
                                compression=msg.compression,
                                code=self.code,
                                response_with_body=self.response_with_body,
                                auto_decompress=self._auto_decompress,
                                lax=self.lax,
                                headers_parser=self._headers_parser,
                                max_line_size=self.max_line_size,
                                max_field_size=self.max_field_size,
                                max_trailers=max_trailers,
                            )
                            if not payload_parser.done:
                                self._payload_parser = payload_parser
                        elif method == METH_CONNECT:
                            assert isinstance(msg, RawRequestMessage)
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            self._upgraded = True
                            self._payload_parser = HttpPayloadParser(
                                payload,
                                method=msg.method,
                                compression=msg.compression,
                                auto_decompress=self._auto_decompress,
                                lax=self.lax,
                                headers_parser=self._headers_parser,
                                max_line_size=self.max_line_size,
                                max_field_size=self.max_field_size,
                                max_trailers=max_trailers,
                            )
                        elif not empty_body and length is None and self.read_until_eof:
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            payload_parser = HttpPayloadParser(
                                payload,
                                length=length,
                                chunked=msg.chunked,
                                method=method,
                                compression=msg.compression,
                                code=self.code,
                                response_with_body=self.response_with_body,
                                auto_decompress=self._auto_decompress,
                                lax=self.lax,
                                headers_parser=self._headers_parser,
                                max_line_size=self.max_line_size,
                                max_field_size=self.max_field_size,
                                max_trailers=max_trailers,
                            )
                            if not payload_parser.done:
                                self._payload_parser = payload_parser
                        else:
                            payload = EMPTY_PAYLOAD

                        messages.append((msg, payload))
                        should_close = msg.should_close
                else:
                    # No complete line yet: keep the fragment for the next call.
                    self._tail = data[start_pos:]
                    if len(self._tail) > self.max_line_size:
                        raise LineTooLong(self._tail[:100] + b"...", self.max_line_size)
                    data = EMPTY
                    break

            # no parser, just store
            elif self._payload_parser is None and self._upgraded:
                assert not self._lines
                break

            # feed payload
            elif data and start_pos < data_len:
                assert not self._lines
                assert self._payload_parser is not None
                try:
                    eof, data = self._payload_parser.feed_data(data[start_pos:], SEP)
                except BaseException as underlying_exc:
                    reraised_exc = underlying_exc
                    if self.payload_exception is not None:
                        reraised_exc = self.payload_exception(str(underlying_exc))

                    # Report the failure through the payload stream.
                    set_exception(
                        self._payload_parser.payload,
                        reraised_exc,
                        underlying_exc,
                    )

                    eof = True
                    data = b""
                    # These two error types are additionally propagated to
                    # the caller of feed_data.
                    if isinstance(
                        underlying_exc, (InvalidHeader, TransferEncodingError)
                    ):
                        raise

                if eof:
                    # ``data`` now holds only what followed the body.
                    start_pos = 0
                    data_len = len(data)
                    self._payload_parser = None
                    continue
                else:
                    break

        if data and start_pos < data_len:
            data = data[start_pos:]
        else:
            data = EMPTY

        return messages, self._upgraded, data

    def parse_headers(
        self, lines: List[bytes]
    ) -> Tuple[
        "CIMultiDictProxy[str]", RawHeaders, Optional[bool], Optional[str], bool, bool
    ]:
        """Parse header lines and derive connection/framing information.

        :param lines: header lines (without the start line), ending with an
            empty line.
        :returns: ``(headers, raw_headers, close_conn, encoding, upgrade,
            chunked)``. ``close_conn`` is ``True``/``False`` when the
            Connection header lists ``close``/``keep-alive`` and ``None``
            otherwise. ``encoding`` is the Content-Encoding value when it is
            one of gzip, deflate, br or zstd, else ``None``.
        :raises BadHttpMessage: when Transfer-Encoding and Content-Length are
            both present.
        """
        headers, raw_headers = self._headers_parser.parse_headers(lines)
        close_conn = None
        encoding = None
        upgrade = False
        chunked = False

        # keep-alive and protocol switching
        # RFC 9110 section 7.6.1 defines Connection as a comma-separated list.
        conn_values = headers.getall(hdrs.CONNECTION, ())
        if conn_values:
            conn_tokens = {
                token.lower()
                for conn_value in conn_values
                for token in (part.strip(" \t") for part in conn_value.split(","))
                if token and token.isascii()
            }

            if "close" in conn_tokens:
                close_conn = True
            elif "keep-alive" in conn_tokens:
                close_conn = False

            # https://www.rfc-editor.org/rfc/rfc9110.html#name-101-switching-protocols
            if "upgrade" in conn_tokens and headers.get(hdrs.UPGRADE):
                upgrade = True

        # encoding
        enc = headers.get(hdrs.CONTENT_ENCODING, "")
        if enc.isascii() and enc.lower() in {"gzip", "deflate", "br", "zstd"}:
            encoding = enc

        # chunking
        te = headers.get(hdrs.TRANSFER_ENCODING)
        if te is not None:
            if self._is_chunked_te(te):
                chunked = True

            if hdrs.CONTENT_LENGTH in headers:
                raise BadHttpMessage(
                    "Transfer-Encoding can't be present with Content-Length",
                )

        return (headers, raw_headers, close_conn, encoding, upgrade, chunked)

    def set_upgraded(self, val: bool) -> None:
        """Set connection upgraded (to websocket) mode.

        :param bool val: new state.
        """
        self._upgraded = val
class HttpRequestParser(HttpParser[RawRequestMessage]):
    """Parse the request line and headers of an HTTP request.

    ``parse_message`` raises ``BadHttpMethod`` for a malformed request line
    or method, ``BadStatusLine`` for an unrecognised version token and
    ``InvalidURLError`` for a target that is not in a supported form.
    Returns RawRequestMessage.
    """

    def parse_message(self, lines: List[bytes]) -> RawRequestMessage:
        # --- request line -------------------------------------------------
        request_line = lines[0].decode("utf-8", "surrogateescape")
        try:
            method, path, raw_version = request_line.split(" ", maxsplit=2)
        except ValueError:
            raise BadHttpMethod(request_line) from None

        if TOKENRE.fullmatch(method) is None:
            raise BadHttpMethod(method)

        version_match = VERSRE.fullmatch(raw_version)
        if version_match is None:
            raise BadStatusLine(request_line)
        major, minor = version_match.group(1), version_match.group(2)
        version_o = HttpVersion(int(major), int(minor))

        # --- request target ----------------------------------------------
        if method == "CONNECT":
            # authority-form,
            # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.3
            url = URL.build(authority=path, encoded=True)
        elif path.startswith("/"):
            # origin-form,
            # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.1
            before_fragment, _, fragment = path.partition("#")
            path_only, _, query = before_fragment.partition("?")

            # NOTE: `yarl.URL.build()` is used to mimic what the Cython-based
            # NOTE: parser does, otherwise it results into the same
            # NOTE: HTTP Request-Line input producing different
            # NOTE: `yarl.URL()` objects
            url = URL.build(
                path=path_only,
                query_string=query,
                fragment=fragment,
                encoded=True,
            )
        else:
            # Either asterisk-form ("OPTIONS *") or absolute-form (proxy
            # requests), https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.2
            url = URL(path, encoded=True)
            asterisk_form = path == "*" and method == "OPTIONS"
            if not asterisk_form and url.scheme == "":
                # not absolute-form
                raise InvalidURLError(
                    path.encode(errors="surrogateescape").decode("latin1")
                )

        # --- headers -------------------------------------------------------
        parsed = self.parse_headers(lines[1:])
        headers, raw_headers, close, compression, upgrade, chunked = parsed

        if close is None:
            # No explicit Connection: close / keep-alive token was sent.
            # HTTP/1.0 and older close by default; newer versions keep alive.
            close = True if version_o <= HttpVersion10 else False

        return RawRequestMessage(
            method,
            path,
            version_o,
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
            url,
        )

    def _is_chunked_te(self, te: str) -> bool:
        final_coding = te.rsplit(",", maxsplit=1)[-1].strip(" \t")
        # .lower() transforms some non-ascii chars, so must check first.
        if not (final_coding.isascii() and final_coding.lower() == "chunked"):
            # https://www.rfc-editor.org/rfc/rfc9112#section-6.3-2.4.3
            raise BadHttpMessage("Request has invalid `Transfer-Encoding`")
        return True
class HttpResponseParser(HttpParser[RawResponseMessage]):
    """Read response status line and headers.

    BadStatusLine could be raised in case of any errors in status line.
    Returns RawResponseMessage.
    """

    # Lax mode should only be enabled on response parser.
    lax = not DEBUG

    def feed_data(
        self,
        data: bytes,
        SEP: Optional[_SEP] = None,
        *args: Any,
        **kwargs: Any,
    ) -> Tuple[List[Tuple[RawResponseMessage, StreamReader]], bool, bytes]:
        """Feed data, defaulting the separator according to ``DEBUG``.

        When ``SEP`` is not given, lines are split on CRLF if ``DEBUG`` is
        set and on bare LF otherwise.
        """
        if SEP is None:
            SEP = b"\r\n" if DEBUG else b"\n"
        return super().feed_data(data, SEP, *args, **kwargs)

    def parse_message(self, lines: List[bytes]) -> RawResponseMessage:
        """Parse the status line in ``lines[0]`` and the headers after it.

        :raises BadStatusLine: when the line has no status part, the version
            token is not ``HTTP/<digit>.<digit>``, or the status is not
            exactly three digits.
        """
        line = lines[0].decode("utf-8", "surrogateescape")
        try:
            version, status = line.split(maxsplit=1)
        except ValueError:
            raise BadStatusLine(line) from None

        try:
            status, reason = status.split(maxsplit=1)
        except ValueError:
            # No reason phrase present.
            status = status.strip()
            reason = ""

        # version
        match = VERSRE.fullmatch(version)
        if match is None:
            raise BadStatusLine(line)
        version_o = HttpVersion(int(match.group(1)), int(match.group(2)))

        # The status code is a three-digit ASCII number, no padding
        if len(status) != 3 or not DIGITS.fullmatch(status):
            raise BadStatusLine(line)
        status_i = int(status)

        # read headers
        (
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
        ) = self.parse_headers(lines[1:])

        if close is None:
            if version_o <= HttpVersion10:
                close = True
            # https://www.rfc-editor.org/rfc/rfc9112.html#name-message-body-length
            elif 100 <= status_i < 200 or status_i in {204, 304}:
                close = False
            elif hdrs.CONTENT_LENGTH in headers or hdrs.TRANSFER_ENCODING in headers:
                close = False
            else:
                # https://www.rfc-editor.org/rfc/rfc9112.html#section-6.3-2.8
                close = True

        return RawResponseMessage(
            version_o,
            status_i,
            reason.strip(),
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
        )

    def _is_chunked_te(self, te: str) -> bool:
        """Return True when the final transfer coding is ``chunked``."""
        # https://www.rfc-editor.org/rfc/rfc9112#section-6.3-2.4.2
        final_coding = te.rsplit(",", maxsplit=1)[-1].strip(" \t")
        # .lower() transforms some non-ascii chars (e.g. U+212A KELVIN SIGN
        # becomes "k"), so must check first.
        return final_coding.isascii() and final_coding.lower() == "chunked"
class HttpPayloadParser:
    """Feed a message body into a stream according to its framing.

    The framing mode is chosen in ``__init__``: no body, fixed length,
    chunked, or read-until-EOF. ``done`` is true when the body is already
    complete after construction (no body expected, or zero length).
    """

    def __init__(
        self,
        payload: StreamReader,
        length: Optional[int] = None,
        chunked: bool = False,
        compression: Optional[str] = None,
        code: Optional[int] = None,
        method: Optional[str] = None,
        response_with_body: bool = True,
        auto_decompress: bool = True,
        lax: bool = False,
        *,
        headers_parser: HeadersParser,
        max_line_size: int = 8190,
        max_field_size: int = 8190,
        max_trailers: int = 128,
    ) -> None:
        # Bytes still expected in PARSE_LENGTH mode.
        self._length = 0
        self._type = ParseState.PARSE_UNTIL_EOF
        self._chunk = ChunkState.PARSE_CHUNKED_SIZE
        # Bytes still expected for the current chunk.
        self._chunk_size = 0
        # Incomplete line carried over between feed_data calls.
        self._chunk_tail = b""
        self._auto_decompress = auto_decompress
        self._lax = lax
        self._headers_parser = headers_parser
        self._max_line_size = max_line_size
        self._max_field_size = max_field_size
        self._max_trailers = max_trailers
        self._trailer_lines: list[bytes] = []
        self.done = False

        # payload decompression wrapper
        if response_with_body and compression and self._auto_decompress:
            real_payload: Union[StreamReader, DeflateBuffer] = DeflateBuffer(
                payload, compression
            )
        else:
            real_payload = payload

        # payload parser
        if not response_with_body:
            # don't parse payload if it's not expected to be received
            self._type = ParseState.PARSE_NONE
            real_payload.feed_eof()
            self.done = True
        elif chunked:
            self._type = ParseState.PARSE_CHUNKED
        elif length is not None:
            self._type = ParseState.PARSE_LENGTH
            self._length = length
            if self._length == 0:
                real_payload.feed_eof()
                self.done = True

        self.payload = real_payload

    def feed_eof(self) -> None:
        """Handle end of input.

        EOF completes a read-until-EOF body; for fixed-length and chunked
        bodies it means the body was truncated.

        :raises ContentLengthError: in fixed-length mode.
        :raises TransferEncodingError: in chunked mode.
        """
        if self._type == ParseState.PARSE_UNTIL_EOF:
            self.payload.feed_eof()
        elif self._type == ParseState.PARSE_LENGTH:
            raise ContentLengthError(
                "Not enough data to satisfy content length header."
            )
        elif self._type == ParseState.PARSE_CHUNKED:
            raise TransferEncodingError(
                "Not enough data to satisfy transfer length header."
            )

    def feed_data(
        self, chunk: bytes, SEP: _SEP = b"\r\n", CHUNK_EXT: bytes = b";"
    ) -> Tuple[bool, bytes]:
        """Consume body bytes.

        :param chunk: newly received bytes.
        :param SEP: line separator used for chunk-size and trailer lines.
        :param CHUNK_EXT: delimiter that starts chunk extensions.
        :returns: ``(eof, tail)``; ``eof`` is true once the body is complete
            and ``tail`` holds the bytes that followed it.
        :raises TransferEncodingError: for a malformed chunk size, an LF
            inside a chunk extension, or a missing separator after chunk
            data.
        :raises LineTooLong: when a buffered line or trailer line exceeds
            its limit.
        :raises BadHttpMessage: when more than ``max_trailers`` trailer lines
            are received.
        """
        # Read specified amount of bytes
        if self._type == ParseState.PARSE_LENGTH:
            required = self._length
            chunk_len = len(chunk)

            if required >= chunk_len:
                self._length = required - chunk_len
                self.payload.feed_data(chunk, chunk_len)
                if self._length == 0:
                    self.payload.feed_eof()
                    return True, b""
            else:
                self._length = 0
                self.payload.feed_data(chunk[:required], required)
                self.payload.feed_eof()
                return True, chunk[required:]

        # Chunked transfer encoding parser
        elif self._type == ParseState.PARSE_CHUNKED:
            if self._chunk_tail:
                # We should never have a tail if we're inside the payload body.
                assert self._chunk != ChunkState.PARSE_CHUNKED_CHUNK
                # We should check the length is sane.
                max_line_length = self._max_line_size
                if self._chunk == ChunkState.PARSE_TRAILERS:
                    max_line_length = self._max_field_size
                if len(self._chunk_tail) > max_line_length:
                    raise LineTooLong(self._chunk_tail[:100] + b"...", max_line_length)

                chunk = self._chunk_tail + chunk
                self._chunk_tail = b""

            while chunk:

                # read next chunk size
                if self._chunk == ChunkState.PARSE_CHUNKED_SIZE:
                    pos = chunk.find(SEP)
                    if pos >= 0:
                        i = chunk.find(CHUNK_EXT, 0, pos)
                        if i >= 0:
                            size_b = chunk[:i]  # strip chunk-extensions
                            # Verify no LF in the chunk-extension
                            if b"\n" in (ext := chunk[i:pos]):
                                exc = TransferEncodingError(
                                    f"Unexpected LF in chunk-extension: {ext!r}"
                                )
                                set_exception(self.payload, exc)
                                raise exc
                        else:
                            size_b = chunk[:pos]

                        if self._lax:  # Allow whitespace in lax mode.
                            size_b = size_b.strip()

                        if not HEXDIGITS.fullmatch(size_b):
                            exc = TransferEncodingError(
                                chunk[:pos].decode("ascii", "surrogateescape")
                            )
                            set_exception(self.payload, exc)
                            raise exc
                        size = int(bytes(size_b), 16)

                        chunk = chunk[pos + len(SEP) :]
                        if size == 0:  # eof marker
                            self._chunk = ChunkState.PARSE_TRAILERS
                            if self._lax and chunk.startswith(b"\r"):
                                chunk = chunk[1:]
                        else:
                            self._chunk = ChunkState.PARSE_CHUNKED_CHUNK
                            self._chunk_size = size
                            self.payload.begin_http_chunk_receiving()
                    else:
                        # Size line incomplete; wait for more data.
                        self._chunk_tail = chunk
                        return False, b""

                # read chunk and feed buffer
                if self._chunk == ChunkState.PARSE_CHUNKED_CHUNK:
                    required = self._chunk_size
                    chunk_len = len(chunk)

                    if required > chunk_len:
                        self._chunk_size = required - chunk_len
                        self.payload.feed_data(chunk, chunk_len)
                        return False, b""
                    else:
                        self._chunk_size = 0
                        self.payload.feed_data(chunk[:required], required)
                        chunk = chunk[required:]
                        self._chunk = ChunkState.PARSE_CHUNKED_CHUNK_EOF
                        self.payload.end_http_chunk_receiving()

                # toss the CRLF at the end of the chunk
                if self._chunk == ChunkState.PARSE_CHUNKED_CHUNK_EOF:
                    if self._lax and chunk.startswith(b"\r"):
                        chunk = chunk[1:]
                    if chunk[: len(SEP)] == SEP:
                        chunk = chunk[len(SEP) :]
                        self._chunk = ChunkState.PARSE_CHUNKED_SIZE
                    elif len(chunk) >= len(SEP) or chunk != SEP[: len(chunk)]:
                        # Enough bytes to decide, or a prefix that cannot
                        # become the separator: the framing is broken.
                        exc = TransferEncodingError(
                            "Chunk size mismatch: expected CRLF after chunk data"
                        )
                        set_exception(self.payload, exc)
                        raise exc
                    else:
                        # Only part of the separator has arrived so far.
                        self._chunk_tail = chunk
                        return False, b""

                if self._chunk == ChunkState.PARSE_TRAILERS:
                    pos = chunk.find(SEP)
                    if pos < 0:  # No line found
                        self._chunk_tail = chunk
                        return False, b""

                    line = chunk[:pos]
                    chunk = chunk[pos + len(SEP) :]
                    if SEP == b"\n":  # For lax response parsing
                        line = line.rstrip(b"\r")

                    if len(line) > self._max_field_size:
                        raise LineTooLong(line[:100] + b"...", self._max_field_size)

                    self._trailer_lines.append(line)

                    if len(self._trailer_lines) > self._max_trailers:
                        raise BadHttpMessage("Too many trailers received")

                    # \r\n\r\n found, end of stream
                    if self._trailer_lines[-1] == b"":
                        # Headers and trailers are defined the same way,
                        # so we reuse the HeadersParser here. The parsed
                        # trailers are validated but not kept.
                        try:
                            self._headers_parser.parse_headers(self._trailer_lines)
                        finally:
                            self._trailer_lines.clear()
                        self.payload.feed_eof()
                        return True, chunk

        # Read all bytes until eof
        elif self._type == ParseState.PARSE_UNTIL_EOF:
            self.payload.feed_data(chunk, len(chunk))

        return False, b""
class DeflateBuffer:
    """Decompress incoming data and forward the result to another stream."""

    decompressor: Any

    def __init__(
        self,
        out: StreamReader,
        encoding: Optional[str],
        max_decompress_size: int = DEFAULT_MAX_DECOMPRESS_SIZE,
    ) -> None:
        self.out = out
        # Running total of compressed bytes received, mirrored onto ``out``.
        self.size = 0
        out.total_compressed_bytes = self.size
        self.encoding = encoding
        self._started_decoding = False

        self.decompressor: Union[BrotliDecompressor, ZLibDecompressor, ZSTDDecompressor]
        wants_brotli = encoding == "br"
        wants_zstd = encoding == "zstd"

        # Fail early when the required optional backend is missing.
        if wants_brotli and not HAS_BROTLI:  # pragma: no cover
            raise ContentEncodingError(
                "Can not decode content-encoding: brotli (br). "
                "Please install `Brotli`"
            )
        if wants_zstd and not HAS_ZSTD:
            raise ContentEncodingError(
                "Can not decode content-encoding: zstandard (zstd). "
                "Please install `backports.zstd`"
            )

        if wants_brotli:
            self.decompressor = BrotliDecompressor()
        elif wants_zstd:
            self.decompressor = ZSTDDecompressor()
        else:
            self.decompressor = ZLibDecompressor(encoding=encoding)

        self._max_decompress_size = max_decompress_size

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Forward an exception to the output stream."""
        set_exception(self.out, exc, exc_cause)

    def feed_data(self, chunk: bytes, size: int) -> None:
        """Decompress ``chunk`` and pass the output on.

        Raises ``ContentEncodingError`` when decompression fails and
        ``DecompressSizeError`` when one call yields more than the configured
        limit. A ``size`` of zero is ignored.
        """
        if not size:
            return

        self.size += size
        self.out.total_compressed_bytes = self.size

        # RFC1950
        # bits 0..3 = CM = 0b1000 = 8 = "deflate"
        # bits 4..7 = CINFO = 1..7 = windows size.
        headerless_deflate = (
            not self._started_decoding
            and self.encoding == "deflate"
            and chunk[0] & 0xF != 8
        )
        if headerless_deflate:
            # Change the decoder to decompress incorrectly compressed data
            # Actually we should issue a warning about non-RFC-compliant data.
            self.decompressor = ZLibDecompressor(
                encoding=self.encoding, suppress_deflate_header=True
            )

        limit = self._max_decompress_size
        try:
            # Decompress with limit + 1 so we can detect if output exceeds limit
            decoded = self.decompressor.decompress_sync(chunk, max_length=limit + 1)
        except Exception:
            raise ContentEncodingError(
                "Can not decode content-encoding: %s" % self.encoding
            )

        self._started_decoding = True

        # Check if decompression limit was exceeded
        if len(decoded) > self._max_decompress_size:
            raise DecompressSizeError(
                "Decompressed data exceeds the configured limit of %d bytes"
                % self._max_decompress_size
            )

        if decoded:
            self.out.feed_data(decoded, len(decoded))

    def feed_eof(self) -> None:
        """Flush the decompressor and signal EOF on the output stream.

        Raises ``ContentEncodingError`` when a deflate stream ends before the
        decompressor reports its end.
        """
        remainder = self.decompressor.flush()

        received_anything = bool(remainder) or self.size > 0
        if received_anything:
            self.out.feed_data(remainder, len(remainder))
            if self.encoding == "deflate" and not self.decompressor.eof:
                raise ContentEncodingError("deflate")

        self.out.feed_eof()

    def begin_http_chunk_receiving(self) -> None:
        """Forward the chunk-start notification to the output stream."""
        self.out.begin_http_chunk_receiving()

    def end_http_chunk_receiving(self) -> None:
        """Forward the chunk-end notification to the output stream."""
        self.out.end_http_chunk_receiving()
# Keep the pure-Python implementations reachable under explicit names before
# the public names are (optionally) rebound below.
HttpRequestParserPy = HttpRequestParser
HttpResponseParserPy = HttpResponseParser
RawRequestMessagePy = RawRequestMessage
RawResponseMessagePy = RawResponseMessage

# Unless NO_EXTENSIONS is set, rebind the public names to the implementations
# from ``._http_parser`` and expose them under ``*C`` aliases as well. If that
# module cannot be imported, the pure-Python classes stay in place.
try:
    if not NO_EXTENSIONS:
        from ._http_parser import (  # type: ignore[import-not-found,no-redef]
            HttpRequestParser,
            HttpResponseParser,
            RawRequestMessage,
            RawResponseMessage,
        )

        HttpRequestParserC = HttpRequestParser
        HttpResponseParserC = HttpResponseParser
        RawRequestMessageC = RawRequestMessage
        RawResponseMessageC = RawResponseMessage
except ImportError:  # pragma: no cover
    pass