Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/http_parser.py: 85%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import abc
2import asyncio
3import re
4import string
5from contextlib import suppress
6from enum import IntEnum
7from re import Pattern
8from typing import Any, ClassVar, Final, Generic, Literal, NamedTuple, TypeVar
10from multidict import CIMultiDict, CIMultiDictProxy, istr
11from yarl import URL
13from . import hdrs
14from .base_protocol import BaseProtocol
15from .compression_utils import (
16 DEFAULT_MAX_DECOMPRESS_SIZE,
17 HAS_BROTLI,
18 HAS_ZSTD,
19 BrotliDecompressor,
20 ZLibDecompressor,
21 ZSTDDecompressor,
22)
23from .helpers import (
24 _EXC_SENTINEL,
25 DEBUG,
26 EMPTY_BODY_METHODS,
27 EMPTY_BODY_STATUS_CODES,
28 NO_EXTENSIONS,
29 BaseTimerContext,
30 set_exception,
31)
32from .http_exceptions import (
33 BadHttpMessage,
34 BadHttpMethod,
35 BadStatusLine,
36 ContentEncodingError,
37 ContentLengthError,
38 DecompressSizeError,
39 InvalidHeader,
40 InvalidURLError,
41 LineTooLong,
42 TransferEncodingError,
43)
44from .http_writer import HttpVersion, HttpVersion10
45from .streams import EMPTY_PAYLOAD, StreamReader
46from .typedefs import RawHeaders
__all__ = (
    "HeadersParser",
    "HttpParser",
    "HttpRequestParser",
    "HttpResponseParser",
    "RawRequestMessage",
    "RawResponseMessage",
)

# Allowed line separators: strict CRLF, or bare LF for lax response parsing.
_SEP = Literal[b"\r\n", b"\n"]

ASCIISET: Final[set[str]] = set(string.printable)

# See https://www.rfc-editor.org/rfc/rfc9110.html#name-overview
# and https://www.rfc-editor.org/rfc/rfc9110.html#name-tokens
#
#     method = token
#     tchar = "!" / "#" / "$" / "%" / "&" / "'" / "*" / "+" / "-" / "." /
#             "^" / "_" / "`" / "|" / "~" / DIGIT / ALPHA
#     token = 1*tchar
_TCHAR_SPECIALS: Final[str] = re.escape("!#$%&'*+-.^_`|~")
# Matches an RFC 9110 token (used for methods and header field names).
TOKENRE: Final[Pattern[str]] = re.compile(f"[0-9A-Za-z{_TCHAR_SPECIALS}]+")
# https://www.rfc-editor.org/rfc/rfc9110#section-5.5-5
# Control characters forbidden in a field value (all CTLs except HTAB).
_FIELD_VALUE_FORBIDDEN_CTL_RE: Final[Pattern[str]] = re.compile(
    r"[\x00-\x08\x0a-\x1f\x7f]"
)
VERSRE: Final[Pattern[str]] = re.compile(r"HTTP/(\d)\.(\d)", re.ASCII)
# ASCII-only digits: rejects Unicode digits that int() would accept.
DIGITS: Final[Pattern[str]] = re.compile(r"\d+", re.ASCII)
HEXDIGITS: Final[Pattern[bytes]] = re.compile(rb"[0-9a-fA-F]+")

# RFC 9110 singleton headers — duplicates are rejected in strict mode.
# In lax mode (response parser default), the check is skipped entirely
# since real-world servers (e.g. Google APIs, Werkzeug) commonly send
# duplicate headers like Content-Type or Server.
# Lowercased for case-insensitive matching against wire names.
SINGLETON_HEADERS: Final[frozenset[str]] = frozenset(
    {
        "content-length",
        "content-location",
        "content-range",
        "content-type",
        "etag",
        "host",
        "max-forwards",
        "server",
        "transfer-encoding",
        "user-agent",
    }
)
class RawRequestMessage(NamedTuple):
    """Parsed HTTP request line and header section (no payload)."""

    method: str  # validated against the RFC 9110 token grammar
    path: str  # request-target exactly as received, undecoded
    version: HttpVersion
    headers: CIMultiDictProxy[str]  # case-insensitive, decoded header view
    raw_headers: RawHeaders  # (name, value) byte pairs as read off the wire
    should_close: bool  # connection must close after this message
    compression: str | None  # recognized Content-Encoding, else None
    upgrade: bool  # Connection: upgrade with an Upgrade header present
    chunked: bool  # chunked Transfer-Encoding in effect
    url: URL  # request-target parsed into a yarl.URL
class RawResponseMessage(NamedTuple):
    """Parsed HTTP response status line and header section (no payload)."""

    version: HttpVersion
    code: int  # three-digit status code
    reason: str  # reason phrase, possibly empty
    headers: CIMultiDictProxy[str]  # case-insensitive, decoded header view
    raw_headers: RawHeaders  # (name, value) byte pairs as read off the wire
    should_close: bool  # connection must close after this message
    compression: str | None  # recognized Content-Encoding, else None
    upgrade: bool  # Connection: upgrade with an Upgrade header present
    chunked: bool  # chunked Transfer-Encoding in effect


# Message type produced by a concrete parser subclass.
_MsgT = TypeVar("_MsgT", RawRequestMessage, RawResponseMessage)
class ParseState(IntEnum):
    """How the length of the current message payload is determined."""

    PARSE_NONE = 0  # no payload expected
    PARSE_LENGTH = 1  # fixed Content-Length
    PARSE_CHUNKED = 2  # chunked Transfer-Encoding
    PARSE_UNTIL_EOF = 3  # read until the connection closes
class ChunkState(IntEnum):
    """Sub-states of the chunked Transfer-Encoding decoder."""

    PARSE_CHUNKED_SIZE = 0  # expecting "<hex-size>[;extensions]" + separator
    PARSE_CHUNKED_CHUNK = 1  # reading chunk-size bytes of chunk data
    PARSE_CHUNKED_CHUNK_EOF = 2  # expecting the CRLF that terminates chunk data
    PARSE_TRAILERS = 4  # after the zero-size chunk; reading trailer section
    # NOTE(review): value 3 is skipped — presumably a removed state; confirm
    # before renumbering, as these values may be mirrored by the C parser.
class HeadersParser:
    """Parse a header (or trailer) section into a CIMultiDictProxy.

    In lax mode, deprecated obs-fold line continuations are accepted and
    the RFC 9110 singleton-duplicate check is skipped.
    """

    def __init__(self, max_field_size: int = 8190, lax: bool = False) -> None:
        self.max_field_size = max_field_size
        self._lax = lax

    def parse_headers(
        self, lines: list[bytes]
    ) -> tuple["CIMultiDictProxy[str]", RawHeaders]:
        """Parse ``lines`` (terminated by an empty line) into headers.

        Raises InvalidHeader/BadHttpMessage/LineTooLong on malformed input.
        """
        headers: CIMultiDict[str] = CIMultiDict()
        # note: "raw" does not mean inclusion of OWS before/after the field value
        raw_headers = []

        lines_idx = 0
        line = lines[lines_idx]
        line_count = len(lines)

        # The empty line terminating the section ends this loop.
        while line:
            # Parse initial header name : value pair.
            try:
                bname, bvalue = line.split(b":", 1)
            except ValueError:
                raise InvalidHeader(line) from None

            if len(bname) == 0:
                raise InvalidHeader(bname)

            # https://www.rfc-editor.org/rfc/rfc9112.html#section-5.1-2
            # No whitespace allowed around the field name.
            if {bname[0], bname[-1]} & {32, 9}:  # {" ", "\t"}
                raise InvalidHeader(line)

            bvalue = bvalue.lstrip(b" \t")
            name = bname.decode("utf-8", "surrogateescape")
            if not TOKENRE.fullmatch(name):
                raise InvalidHeader(bname)

            # next line
            lines_idx += 1
            line = lines[lines_idx]

            # consume continuation lines
            continuation = self._lax and line and line[0] in (32, 9)  # (' ', '\t')

            # Deprecated: https://www.rfc-editor.org/rfc/rfc9112.html#name-obsolete-line-folding
            if continuation:
                header_length = len(bvalue)
                bvalue_lst = [bvalue]
                while continuation:
                    header_length += len(line)
                    if header_length > self.max_field_size:
                        header_line = bname + b": " + b"".join(bvalue_lst)
                        raise LineTooLong(
                            header_line[:100] + b"...", self.max_field_size
                        )
                    bvalue_lst.append(line)

                    # next line
                    lines_idx += 1
                    if lines_idx < line_count:
                        line = lines[lines_idx]
                        if line:
                            continuation = line[0] in (32, 9)  # (' ', '\t')
                    else:
                        # Ran off the end of the section mid-fold.
                        line = b""
                        break
                bvalue = b"".join(bvalue_lst)

            bvalue = bvalue.strip(b" \t")
            value = bvalue.decode("utf-8", "surrogateescape")

            # https://www.rfc-editor.org/rfc/rfc9110.html#section-5.5-5
            # Lax mode rejects only CR/LF/NUL (smuggling vectors); strict
            # mode rejects every forbidden control character.
            if self._lax:
                if "\n" in value or "\r" in value or "\x00" in value:
                    raise InvalidHeader(bvalue)
            elif _FIELD_VALUE_FORBIDDEN_CTL_RE.search(value):
                raise InvalidHeader(bvalue)

            # `name in headers` is a case-insensitive lookup on CIMultiDict.
            if not self._lax and name in headers and name.lower() in SINGLETON_HEADERS:
                raise BadHttpMessage(f"Duplicate '{name}' header found.")
            headers.add(name, value)
            raw_headers.append((bname, bvalue))

        return (CIMultiDictProxy(headers), tuple(raw_headers))
def _is_supported_upgrade(headers: CIMultiDictProxy[str]) -> bool:
    """Return True when the Upgrade header names a protocol we support."""
    upgrade_value = headers.get(hdrs.UPGRADE, "")
    # Guard before .lower(): case folding can map some non-ASCII
    # characters onto ASCII ones, which could spoof a match.
    if not upgrade_value.isascii():
        return False
    return upgrade_value.lower() in {"tcp", "websocket"}
class HttpParser(abc.ABC, Generic[_MsgT]):
    """Incremental HTTP/1.x message parser.

    Feed raw bytes with :meth:`feed_data`; complete messages are returned
    together with a StreamReader carrying their payload.  Subclasses supply
    :meth:`parse_message` (request vs. response start line).
    """

    # Lax mode relaxes line endings and header validation; only the
    # response parser subclass enables it.
    lax: ClassVar[bool] = False

    def __init__(
        self,
        protocol: BaseProtocol,
        loop: asyncio.AbstractEventLoop,
        limit: int,
        max_line_size: int = 8190,
        max_headers: int = 128,
        max_field_size: int = 8190,
        timer: BaseTimerContext | None = None,
        code: int | None = None,
        method: str | None = None,
        payload_exception: type[BaseException] | None = None,
        response_with_body: bool = True,
        read_until_eof: bool = False,
        auto_decompress: bool = True,
    ) -> None:
        self.protocol = protocol
        self.loop = loop
        self.max_line_size = max_line_size
        self.max_field_size = max_field_size
        self.max_headers = max_headers
        self.timer = timer
        self.code = code
        self.method = method
        self.payload_exception = payload_exception
        self.response_with_body = response_with_body
        self.read_until_eof = read_until_eof

        self._lines: list[bytes] = []  # accumulated start-line + header lines
        self._tail = b""  # bytes held over until the next feed_data call
        self._upgraded = False
        self._payload = None
        self._payload_parser: HttpPayloadParser | None = None
        self._auto_decompress = auto_decompress
        self._limit = limit
        self._headers_parser = HeadersParser(max_field_size, self.lax)

    @abc.abstractmethod
    def parse_message(self, lines: list[bytes]) -> _MsgT: ...

    @abc.abstractmethod
    def _is_chunked_te(self, te: str) -> bool: ...

    def feed_eof(self) -> _MsgT | None:
        """Signal end of stream; may salvage a partially received message."""
        if self._payload_parser is not None:
            self._payload_parser.feed_eof()
            self._payload_parser = None
        else:
            # try to extract partial message
            if self._tail:
                self._lines.append(self._tail)

            if self._lines:
                # NOTE(review): bytes-vs-str comparison is always True, so an
                # empty terminator line is unconditionally appended — appears
                # harmless but confirm whether b"\r\n" was intended.
                if self._lines[-1] != "\r\n":
                    self._lines.append(b"")
                with suppress(Exception):
                    return self.parse_message(self._lines)
        return None

    def feed_data(
        self,
        data: bytes,
        SEP: _SEP = b"\r\n",
        EMPTY: bytes = b"",
        CONTENT_LENGTH: istr = hdrs.CONTENT_LENGTH,
        METH_CONNECT: str = hdrs.METH_CONNECT,
        SEC_WEBSOCKET_KEY1: istr = hdrs.SEC_WEBSOCKET_KEY1,
    ) -> tuple[list[tuple[_MsgT, StreamReader]], bool, bytes]:
        """Feed raw bytes into the parser.

        Returns ``(messages, upgraded, tail)`` where ``messages`` is a list
        of ``(message, payload_stream)`` pairs completed by this call,
        ``upgraded`` reports protocol-upgrade mode, and ``tail`` is any
        unconsumed bytes (non-empty only after an upgrade).
        """
        messages = []

        if self._tail:
            data, self._tail = self._tail + data, b""

        data_len = len(data)
        start_pos = 0
        loop = self.loop
        max_line_length = self.max_line_size

        should_close = False
        while start_pos < data_len:
            # read HTTP message (request/response line + headers), \r\n\r\n
            # and split by lines
            if self._payload_parser is None and not self._upgraded:
                pos = data.find(SEP, start_pos)
                # consume \r\n
                if pos == start_pos and not self._lines:
                    start_pos = pos + len(SEP)
                    continue

                if pos >= start_pos:
                    if should_close:
                        raise BadHttpMessage("Data after `Connection: close`")

                    # line found
                    line = data[start_pos:pos]
                    if SEP == b"\n":  # For lax response parsing
                        line = line.rstrip(b"\r")
                    if len(line) > max_line_length:
                        raise LineTooLong(line[:100] + b"...", max_line_length)
                    self._lines.append(line)
                    # After processing the status/request line, everything is a header.
                    max_line_length = self.max_field_size

                    if len(self._lines) > self.max_headers:
                        raise BadHttpMessage("Too many headers received")
                    start_pos = pos + len(SEP)

                    # \r\n\r\n found
                    if self._lines[-1] == EMPTY:
                        # Trailers share the headers budget with the headers
                        # already received.
                        max_trailers = self.max_headers - len(self._lines)
                        try:
                            msg: _MsgT = self.parse_message(self._lines)
                        finally:
                            self._lines.clear()

                        def get_content_length() -> int | None:
                            # payload length
                            length_hdr = msg.headers.get(CONTENT_LENGTH)
                            if length_hdr is None:
                                return None

                            # Shouldn't allow +/- or other number formats.
                            # https://www.rfc-editor.org/rfc/rfc9110#section-8.6-2
                            # msg.headers is already stripped of leading/trailing wsp
                            if not DIGITS.fullmatch(length_hdr):
                                raise InvalidHeader(CONTENT_LENGTH)

                            return int(length_hdr)

                        length = get_content_length()
                        # do not support old websocket spec
                        if SEC_WEBSOCKET_KEY1 in msg.headers:
                            raise InvalidHeader(SEC_WEBSOCKET_KEY1)

                        self._upgraded = msg.upgrade and _is_supported_upgrade(
                            msg.headers
                        )

                        method = getattr(msg, "method", self.method)
                        # code is only present on responses
                        code = getattr(msg, "code", 0)

                        assert self.protocol is not None
                        # calculate payload
                        empty_body = code in EMPTY_BODY_STATUS_CODES or bool(
                            method and method in EMPTY_BODY_METHODS
                        )
                        if not empty_body and (
                            ((length is not None and length > 0) or msg.chunked)
                            and not self._upgraded
                        ):
                            # Known-length or chunked payload follows.
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            payload_parser = HttpPayloadParser(
                                payload,
                                length=length,
                                chunked=msg.chunked,
                                method=method,
                                compression=msg.compression,
                                code=self.code,
                                response_with_body=self.response_with_body,
                                auto_decompress=self._auto_decompress,
                                lax=self.lax,
                                headers_parser=self._headers_parser,
                                max_line_size=self.max_line_size,
                                max_field_size=self.max_field_size,
                                max_trailers=max_trailers,
                            )
                            if not payload_parser.done:
                                self._payload_parser = payload_parser
                        elif method == METH_CONNECT:
                            # CONNECT tunnels become an opaque byte stream.
                            assert isinstance(msg, RawRequestMessage)
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            self._upgraded = True
                            self._payload_parser = HttpPayloadParser(
                                payload,
                                method=msg.method,
                                compression=msg.compression,
                                auto_decompress=self._auto_decompress,
                                lax=self.lax,
                                headers_parser=self._headers_parser,
                                max_line_size=self.max_line_size,
                                max_field_size=self.max_field_size,
                                max_trailers=max_trailers,
                            )
                        elif not empty_body and length is None and self.read_until_eof:
                            # No framing information: read until connection EOF.
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            payload_parser = HttpPayloadParser(
                                payload,
                                length=length,
                                chunked=msg.chunked,
                                method=method,
                                compression=msg.compression,
                                code=self.code,
                                response_with_body=self.response_with_body,
                                auto_decompress=self._auto_decompress,
                                lax=self.lax,
                                headers_parser=self._headers_parser,
                                max_line_size=self.max_line_size,
                                max_field_size=self.max_field_size,
                                max_trailers=max_trailers,
                            )
                            if not payload_parser.done:
                                self._payload_parser = payload_parser
                        else:
                            payload = EMPTY_PAYLOAD

                        messages.append((msg, payload))
                        should_close = msg.should_close
                else:
                    # Incomplete line: stash it for the next feed_data call.
                    self._tail = data[start_pos:]
                    if len(self._tail) > self.max_line_size:
                        raise LineTooLong(self._tail[:100] + b"...", self.max_line_size)
                    data = EMPTY
                    break

            # no parser, just store
            elif self._payload_parser is None and self._upgraded:
                assert not self._lines
                break

            # feed payload
            elif data and start_pos < data_len:
                assert not self._lines
                assert self._payload_parser is not None
                try:
                    eof, data = self._payload_parser.feed_data(data[start_pos:], SEP)
                except Exception as underlying_exc:
                    reraised_exc: BaseException = underlying_exc
                    if self.payload_exception is not None:
                        reraised_exc = self.payload_exception(str(underlying_exc))

                    set_exception(
                        self._payload_parser.payload,
                        reraised_exc,
                        underlying_exc,
                    )

                    eof = True
                    data = b""
                    # Framing errors are unrecoverable for the connection.
                    if isinstance(
                        underlying_exc, (InvalidHeader, TransferEncodingError)
                    ):
                        raise

                if eof:
                    start_pos = 0
                    data_len = len(data)
                    self._payload_parser = None
                    continue
                else:
                    break

        if data and start_pos < data_len:
            data = data[start_pos:]
        else:
            data = EMPTY

        return messages, self._upgraded, data

    def parse_headers(
        self, lines: list[bytes]
    ) -> tuple[
        "CIMultiDictProxy[str]", RawHeaders, bool | None, str | None, bool, bool
    ]:
        """Parse header lines and derive framing/connection flags.

        Returns ``(headers, raw_headers, close_conn, encoding, upgrade,
        chunked)``; ``close_conn`` is None when the Connection header gave
        no explicit preference.
        """
        headers, raw_headers = self._headers_parser.parse_headers(lines)
        close_conn = None
        encoding = None
        upgrade = False
        chunked = False

        # keep-alive and protocol switching
        # RFC 9110 section 7.6.1 defines Connection as a comma-separated list.
        conn_values = headers.getall(hdrs.CONNECTION, ())
        if conn_values:
            conn_tokens = {
                token.lower()
                for conn_value in conn_values
                for token in (part.strip(" \t") for part in conn_value.split(","))
                if token and token.isascii()
            }

            # "close" wins over "keep-alive" when both are present.
            if "close" in conn_tokens:
                close_conn = True
            elif "keep-alive" in conn_tokens:
                close_conn = False

            # https://www.rfc-editor.org/rfc/rfc9110.html#name-101-switching-protocols
            if "upgrade" in conn_tokens and headers.get(hdrs.UPGRADE):
                upgrade = True

        # encoding
        enc = headers.get(hdrs.CONTENT_ENCODING, "")
        if enc.isascii() and enc.lower() in {"gzip", "deflate", "br", "zstd"}:
            encoding = enc

        # chunking
        te = headers.get(hdrs.TRANSFER_ENCODING)
        if te is not None:
            if self._is_chunked_te(te):
                chunked = True

            # https://www.rfc-editor.org/rfc/rfc9112#section-6.3 — both
            # framing mechanisms together is a request-smuggling vector.
            if hdrs.CONTENT_LENGTH in headers:
                raise BadHttpMessage(
                    "Transfer-Encoding can't be present with Content-Length",
                )

        return (headers, raw_headers, close_conn, encoding, upgrade, chunked)

    def set_upgraded(self, val: bool) -> None:
        """Set connection upgraded (to websocket) mode.

        :param bool val: new state.
        """
        self._upgraded = val
class HttpRequestParser(HttpParser[RawRequestMessage]):
    """Read request status line.

    Exception .http_exceptions.BadStatusLine
    could be raised in case of any errors in status line.
    Returns RawRequestMessage.
    """

    def parse_message(self, lines: list[bytes]) -> RawRequestMessage:
        """Parse the request line plus header lines into a RawRequestMessage."""
        # request line
        line = lines[0].decode("utf-8", "surrogateescape")
        try:
            method, path, version = line.split(" ", maxsplit=2)
        except ValueError:
            raise BadHttpMethod(line) from None

        # method
        if not TOKENRE.fullmatch(method):
            raise BadHttpMethod(method)

        # version
        match = VERSRE.fullmatch(version)
        if match is None:
            raise BadStatusLine(line)
        version_o = HttpVersion(int(match.group(1)), int(match.group(2)))

        if method == "CONNECT":
            # authority-form,
            # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.3
            url = URL.build(authority=path, encoded=True)
        elif path.startswith("/"):
            # origin-form,
            # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.1
            path_part, _hash_separator, url_fragment = path.partition("#")
            path_part, _question_mark_separator, qs_part = path_part.partition("?")

            # NOTE: `yarl.URL.build()` is used to mimic what the Cython-based
            # NOTE: parser does, otherwise it results into the same
            # NOTE: HTTP Request-Line input producing different
            # NOTE: `yarl.URL()` objects
            url = URL.build(
                path=path_part,
                query_string=qs_part,
                fragment=url_fragment,
                encoded=True,
            )
        elif path == "*" and method == "OPTIONS":
            # asterisk-form,
            url = URL(path, encoded=True)
        else:
            # absolute-form for proxy maybe,
            # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.2
            url = URL(path, encoded=True)
            if url.scheme == "":
                # not absolute-form
                raise InvalidURLError(
                    path.encode(errors="surrogateescape").decode("latin1")
                )

        # read headers
        (
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
        ) = self.parse_headers(lines[1:])

        if close is None:  # then the headers weren't set in the request
            if version_o <= HttpVersion10:  # HTTP/1.0 defaults to closing
                close = True
            else:  # HTTP/1.1 defaults to keep-alive
                close = False

        return RawRequestMessage(
            method,
            path,
            version_o,
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
            url,
        )

    def _is_chunked_te(self, te: str) -> bool:
        """Strictly validate Transfer-Encoding; True iff the final coding is chunked."""
        # https://www.rfc-editor.org/rfc/rfc9112#section-7.1-3
        # "A sender MUST NOT apply the chunked transfer coding more
        # than once to a message body"
        parts = [p.strip(" \t") for p in te.split(",")]
        chunked_count = sum(1 for p in parts if p.isascii() and p.lower() == "chunked")
        if chunked_count > 1:
            raise BadHttpMessage("Request has duplicate `chunked` Transfer-Encoding")
        last = parts[-1]
        # .lower() transforms some non-ascii chars, so must check first.
        if last.isascii() and last.lower() == "chunked":
            return True
        # https://www.rfc-editor.org/rfc/rfc9112#section-6.3-2.4.3
        raise BadHttpMessage("Request has invalid `Transfer-Encoding`")
class HttpResponseParser(HttpParser[RawResponseMessage]):
    """Read response status line and headers.

    BadStatusLine could be raised in case of any errors in status line.
    Returns RawResponseMessage.
    """

    # Lax mode should only be enabled on response parser.
    lax = not DEBUG

    def feed_data(
        self,
        data: bytes,
        SEP: _SEP | None = None,
        *args: Any,
        **kwargs: Any,
    ) -> tuple[list[tuple[RawResponseMessage, StreamReader]], bool, bytes]:
        # Outside DEBUG, split on bare LF and strip any trailing CR, so
        # both CRLF and LF-only servers are tolerated.
        if SEP is None:
            SEP = b"\r\n" if DEBUG else b"\n"
        return super().feed_data(data, SEP, *args, **kwargs)

    def parse_message(self, lines: list[bytes]) -> RawResponseMessage:
        """Parse the status line plus header lines into a RawResponseMessage."""
        line = lines[0].decode("utf-8", "surrogateescape")
        try:
            version, status = line.split(maxsplit=1)
        except ValueError:
            raise BadStatusLine(line) from None

        try:
            status, reason = status.split(maxsplit=1)
        except ValueError:
            # Reason phrase is optional.
            status = status.strip()
            reason = ""

        # version
        match = VERSRE.fullmatch(version)
        if match is None:
            raise BadStatusLine(line)
        version_o = HttpVersion(int(match.group(1)), int(match.group(2)))

        # The status code is a three-digit ASCII number, no padding
        if len(status) != 3 or not DIGITS.fullmatch(status):
            raise BadStatusLine(line)
        status_i = int(status)

        # read headers
        (
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
        ) = self.parse_headers(lines[1:])

        if close is None:
            if version_o <= HttpVersion10:
                close = True
            # https://www.rfc-editor.org/rfc/rfc9112.html#name-message-body-length
            elif 100 <= status_i < 200 or status_i in {204, 304}:
                close = False
            elif hdrs.CONTENT_LENGTH in headers or hdrs.TRANSFER_ENCODING in headers:
                close = False
            else:
                # https://www.rfc-editor.org/rfc/rfc9112.html#section-6.3-2.8
                close = True

        return RawResponseMessage(
            version_o,
            status_i,
            reason.strip(),
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
        )

    def _is_chunked_te(self, te: str) -> bool:
        """Leniently report whether the final transfer coding is chunked."""
        # https://www.rfc-editor.org/rfc/rfc9112#section-6.3-2.4.2
        return te.rsplit(",", maxsplit=1)[-1].strip(" \t").lower() == "chunked"
class HttpPayloadParser:
    """Decode a single message payload (fixed-length, chunked, or until-EOF).

    Bytes are forwarded to ``payload`` (optionally through a DeflateBuffer
    for transparent decompression).  ``done`` is True when no payload bytes
    are expected at all.
    """

    def __init__(
        self,
        payload: StreamReader,
        length: int | None = None,
        chunked: bool = False,
        compression: str | None = None,
        code: int | None = None,
        method: str | None = None,
        response_with_body: bool = True,
        auto_decompress: bool = True,
        lax: bool = False,
        *,
        headers_parser: HeadersParser,
        max_line_size: int = 8190,
        max_field_size: int = 8190,
        max_trailers: int = 128,
    ) -> None:
        self._length = 0  # remaining bytes for PARSE_LENGTH mode
        self._type = ParseState.PARSE_UNTIL_EOF
        self._chunk = ChunkState.PARSE_CHUNKED_SIZE
        self._chunk_size = 0  # remaining bytes of the current chunk
        self._chunk_tail = b""  # partial line held over between feeds
        self._auto_decompress = auto_decompress
        self._lax = lax
        self._headers_parser = headers_parser
        self._max_line_size = max_line_size
        self._max_field_size = max_field_size
        self._max_trailers = max_trailers
        self._trailer_lines: list[bytes] = []
        self.done = False

        # payload decompression wrapper
        if response_with_body and compression and self._auto_decompress:
            real_payload: StreamReader | DeflateBuffer = DeflateBuffer(
                payload, compression
            )
        else:
            real_payload = payload

        # payload parser
        if not response_with_body:
            # don't parse payload if it's not expected to be received
            self._type = ParseState.PARSE_NONE
            real_payload.feed_eof()
            self.done = True
        elif chunked:
            self._type = ParseState.PARSE_CHUNKED
        elif length is not None:
            self._type = ParseState.PARSE_LENGTH
            self._length = length
            if self._length == 0:
                real_payload.feed_eof()
                self.done = True

        self.payload = real_payload

    def feed_eof(self) -> None:
        """Handle connection EOF; raises if the payload framing is unfinished."""
        if self._type == ParseState.PARSE_UNTIL_EOF:
            self.payload.feed_eof()
        elif self._type == ParseState.PARSE_LENGTH:
            raise ContentLengthError(
                "Not enough data to satisfy content length header."
            )
        elif self._type == ParseState.PARSE_CHUNKED:
            raise TransferEncodingError(
                "Not enough data to satisfy transfer length header."
            )

    def feed_data(
        self, chunk: bytes, SEP: _SEP = b"\r\n", CHUNK_EXT: bytes = b";"
    ) -> tuple[bool, bytes]:
        """Feed payload bytes; return ``(payload_complete, leftover_bytes)``."""
        # Read specified amount of bytes
        if self._type == ParseState.PARSE_LENGTH:
            required = self._length
            self._length = max(required - len(chunk), 0)
            self.payload.feed_data(chunk[:required])
            if self._length == 0:
                self.payload.feed_eof()
                return True, chunk[required:]

        # Chunked transfer encoding parser
        elif self._type == ParseState.PARSE_CHUNKED:
            if self._chunk_tail:
                # We should never have a tail if we're inside the payload body.
                assert self._chunk != ChunkState.PARSE_CHUNKED_CHUNK
                # We should check the length is sane.
                max_line_length = self._max_line_size
                if self._chunk == ChunkState.PARSE_TRAILERS:
                    max_line_length = self._max_field_size
                if len(self._chunk_tail) > max_line_length:
                    raise LineTooLong(self._chunk_tail[:100] + b"...", max_line_length)

                chunk = self._chunk_tail + chunk
                self._chunk_tail = b""

            while chunk:
                # read next chunk size
                if self._chunk == ChunkState.PARSE_CHUNKED_SIZE:
                    pos = chunk.find(SEP)
                    if pos >= 0:
                        i = chunk.find(CHUNK_EXT, 0, pos)
                        if i >= 0:
                            size_b = chunk[:i]  # strip chunk-extensions
                            # Verify no LF in the chunk-extension
                            if b"\n" in (ext := chunk[i:pos]):
                                exc = TransferEncodingError(
                                    f"Unexpected LF in chunk-extension: {ext!r}"
                                )
                                set_exception(self.payload, exc)
                                raise exc
                        else:
                            size_b = chunk[:pos]

                        if self._lax:  # Allow whitespace in lax mode.
                            size_b = size_b.strip()

                        if not re.fullmatch(HEXDIGITS, size_b):
                            exc = TransferEncodingError(
                                chunk[:pos].decode("ascii", "surrogateescape")
                            )
                            set_exception(self.payload, exc)
                            raise exc
                        size = int(bytes(size_b), 16)

                        chunk = chunk[pos + len(SEP) :]
                        if size == 0:  # eof marker
                            self._chunk = ChunkState.PARSE_TRAILERS
                            if self._lax and chunk.startswith(b"\r"):
                                chunk = chunk[1:]
                        else:
                            self._chunk = ChunkState.PARSE_CHUNKED_CHUNK
                            self._chunk_size = size
                            self.payload.begin_http_chunk_receiving()
                    else:
                        # Size line incomplete: wait for more data.
                        self._chunk_tail = chunk
                        return False, b""

                # read chunk and feed buffer
                if self._chunk == ChunkState.PARSE_CHUNKED_CHUNK:
                    required = self._chunk_size
                    self._chunk_size = max(required - len(chunk), 0)
                    self.payload.feed_data(chunk[:required])

                    if self._chunk_size:
                        return False, b""
                    chunk = chunk[required:]
                    self._chunk = ChunkState.PARSE_CHUNKED_CHUNK_EOF
                    self.payload.end_http_chunk_receiving()

                # toss the CRLF at the end of the chunk
                if self._chunk == ChunkState.PARSE_CHUNKED_CHUNK_EOF:
                    if self._lax and chunk.startswith(b"\r"):
                        chunk = chunk[1:]
                    if chunk[: len(SEP)] == SEP:
                        chunk = chunk[len(SEP) :]
                        self._chunk = ChunkState.PARSE_CHUNKED_SIZE
                    elif len(chunk) >= len(SEP) or chunk != SEP[: len(chunk)]:
                        # Bytes present that cannot be a prefix of SEP.
                        exc = TransferEncodingError(
                            "Chunk size mismatch: expected CRLF after chunk data"
                        )
                        set_exception(self.payload, exc)
                        raise exc
                    else:
                        self._chunk_tail = chunk
                        return False, b""

                if self._chunk == ChunkState.PARSE_TRAILERS:
                    pos = chunk.find(SEP)
                    if pos < 0:  # No line found
                        self._chunk_tail = chunk
                        return False, b""

                    line = chunk[:pos]
                    chunk = chunk[pos + len(SEP) :]
                    if SEP == b"\n":  # For lax response parsing
                        line = line.rstrip(b"\r")

                    if len(line) > self._max_field_size:
                        raise LineTooLong(line[:100] + b"...", self._max_field_size)
                    self._trailer_lines.append(line)

                    if len(self._trailer_lines) > self._max_trailers:
                        raise BadHttpMessage("Too many trailers received")

                    # \r\n\r\n found, end of stream
                    if self._trailer_lines[-1] == b"":
                        # Headers and trailers are defined the same way,
                        # so we reuse the HeadersParser here.
                        # NOTE(review): the parsed trailers appear to be used
                        # only for validation here — confirm they are not
                        # expected to be attached to the message.
                        try:
                            trailers, raw_trailers = self._headers_parser.parse_headers(
                                self._trailer_lines
                            )
                        finally:
                            self._trailer_lines.clear()
                        self.payload.feed_eof()
                        return True, chunk

        # Read all bytes until eof
        elif self._type == ParseState.PARSE_UNTIL_EOF:
            self.payload.feed_data(chunk)

        return False, b""
class DeflateBuffer:
    """DeflateStream decompress stream and feed data into specified stream."""

    def __init__(
        self,
        out: StreamReader,
        encoding: str | None,
        max_decompress_size: int = DEFAULT_MAX_DECOMPRESS_SIZE,
    ) -> None:
        self.out = out
        self.size = 0  # total compressed bytes received so far
        out.total_compressed_bytes = self.size
        self.encoding = encoding
        self._started_decoding = False

        self.decompressor: BrotliDecompressor | ZLibDecompressor | ZSTDDecompressor
        if encoding == "br":
            if not HAS_BROTLI:
                raise ContentEncodingError(
                    "Can not decode content-encoding: brotli (br). "
                    "Please install `Brotli`"
                )
            self.decompressor = BrotliDecompressor()
        elif encoding == "zstd":
            if not HAS_ZSTD:
                raise ContentEncodingError(
                    "Can not decode content-encoding: zstandard (zstd). "
                    "Please install `backports.zstd`"
                )
            self.decompressor = ZSTDDecompressor()
        else:
            # gzip/deflate (and anything else) go through zlib.
            self.decompressor = ZLibDecompressor(encoding=encoding)

        self._max_decompress_size = max_decompress_size

    def set_exception(
        self,
        exc: type[BaseException] | BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        set_exception(self.out, exc, exc_cause)

    def feed_data(self, chunk: bytes) -> None:
        """Decompress ``chunk`` and forward the result downstream."""
        if not chunk:
            return

        self.size += len(chunk)
        self.out.total_compressed_bytes = self.size

        # RFC1950
        # bits 0..3 = CM = 0b1000 = 8 = "deflate"
        # bits 4..7 = CINFO = 1..7 = windows size.
        if (
            not self._started_decoding
            and self.encoding == "deflate"
            and chunk[0] & 0xF != 8
        ):
            # Change the decoder to decompress incorrectly compressed data
            # Actually we should issue a warning about non-RFC-compliant data.
            self.decompressor = ZLibDecompressor(
                encoding=self.encoding, suppress_deflate_header=True
            )

        try:
            # Decompress with limit + 1 so we can detect if output exceeds limit
            chunk = self.decompressor.decompress_sync(
                chunk, max_length=self._max_decompress_size + 1
            )
        except Exception:
            raise ContentEncodingError(
                "Can not decode content-encoding: %s" % self.encoding
            )

        self._started_decoding = True

        # Check if decompression limit was exceeded
        if len(chunk) > self._max_decompress_size:
            raise DecompressSizeError(
                "Decompressed data exceeds the configured limit of %d bytes"
                % self._max_decompress_size
            )

        if chunk:
            self.out.feed_data(chunk)

    def feed_eof(self) -> None:
        """Flush the decompressor and close the downstream reader."""
        chunk = self.decompressor.flush()

        if chunk or self.size > 0:
            self.out.feed_data(chunk)
            # decompressor is not brotli unless encoding is "br"
            if self.encoding == "deflate" and not self.decompressor.eof:  # type: ignore[union-attr]
                raise ContentEncodingError("deflate")

        self.out.feed_eof()

    def begin_http_chunk_receiving(self) -> None:
        self.out.begin_http_chunk_receiving()

    def end_http_chunk_receiving(self) -> None:
        self.out.end_http_chunk_receiving()
# Keep the pure-Python implementations importable under explicit names.
HttpRequestParserPy = HttpRequestParser
HttpResponseParserPy = HttpResponseParser
RawRequestMessagePy = RawRequestMessage
RawResponseMessagePy = RawResponseMessage

# Prefer the C-accelerated parser when the extension is built and
# extensions are not explicitly disabled; silently fall back otherwise.
with suppress(ImportError):
    if not NO_EXTENSIONS:
        from ._http_parser import (  # type: ignore[import-not-found,no-redef]
            HttpRequestParser,
            HttpResponseParser,
            RawRequestMessage,
            RawResponseMessage,
        )

        HttpRequestParserC = HttpRequestParser
        HttpResponseParserC = HttpResponseParser
        RawRequestMessageC = RawRequestMessage
        RawResponseMessageC = RawResponseMessage