Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/http_parser.py: 83%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import abc
2import asyncio
3import re
4import string
5import sys
6from contextlib import suppress
7from enum import IntEnum
8from re import Pattern
9from typing import (
10 TYPE_CHECKING,
11 Any,
12 ClassVar,
13 Final,
14 Generic,
15 Literal,
16 NamedTuple,
17 TypeVar,
18)
20from multidict import CIMultiDict, istr
21from yarl import URL
23from . import hdrs
24from .base_protocol import BaseProtocol
25from .compression_utils import (
26 HAS_BROTLI,
27 HAS_ZSTD,
28 BrotliDecompressor,
29 ZLibDecompressor,
30 ZSTDDecompressor,
31)
32from .helpers import (
33 _EXC_SENTINEL,
34 DEBUG,
35 DEFAULT_CHUNK_SIZE,
36 EMPTY_BODY_METHODS,
37 EMPTY_BODY_STATUS_CODES,
38 NO_EXTENSIONS,
39 BaseTimerContext,
40 HeadersDictProxy,
41 set_exception,
42)
43from .http_exceptions import (
44 BadHttpMessage,
45 BadHttpMethod,
46 BadStatusLine,
47 ContentEncodingError,
48 ContentLengthError,
49 InvalidHeader,
50 InvalidURLError,
51 LineTooLong,
52 TransferEncodingError,
53)
54from .http_writer import HttpVersion, HttpVersion10, HttpVersion11
55from .streams import EMPTY_PAYLOAD, StreamReader
56from .typedefs import RawHeaders
58if TYPE_CHECKING:
59 from .client_proto import ResponseHandler
# Public names of this module; these are what ``from ... import *`` exposes.
__all__ = (
    "HeadersParser",
    "HttpParser",
    "HttpRequestParser",
    "HttpResponseParser",
    "RawRequestMessage",
    "RawResponseMessage",
)
# Unconstrained type variable.
_T = TypeVar("_T")

# The two line separators accepted by the ``SEP`` parameters below:
# strict CRLF, or bare LF (used for lax response parsing).
_SEP = Literal[b"\r\n", b"\n"]

# The characters of ``string.printable`` as a set.
ASCIISET: Final[set[str]] = set(string.printable)

# See https://www.rfc-editor.org/rfc/rfc9110.html#name-overview
# and https://www.rfc-editor.org/rfc/rfc9110.html#name-tokens
#
#     method = token
#      tchar = "!" / "#" / "$" / "%" / "&" / "'" / "*" / "+" / "-" / "." /
#              "^" / "_" / "`" / "|" / "~" / DIGIT / ALPHA
#      token = 1*tchar
_TCHAR_SPECIALS: Final[str] = re.escape("!#$%&'*+-.^_`|~")
# One or more tchar; used with ``fullmatch`` for methods and header names.
TOKENRE: Final[Pattern[str]] = re.compile(f"[0-9A-Za-z{_TCHAR_SPECIALS}]+")
# https://www.rfc-editor.org/rfc/rfc9110#section-5.5-5
# Control characters rejected in field values in strict mode: every C0
# control except HTAB (\x09), plus DEL (\x7f).
_FIELD_VALUE_FORBIDDEN_CTL_RE: Final[Pattern[str]] = re.compile(
    r"[\x00-\x08\x0a-\x1f\x7f]"
)
# "HTTP/<digit>.<digit>"; re.ASCII restricts \d to 0-9.
VERSRE: Final[Pattern[str]] = re.compile(r"HTTP/(\d)\.(\d)", re.ASCII)
# One or more ASCII digits (Content-Length values, status codes).
DIGITS: Final[Pattern[str]] = re.compile(r"\d+", re.ASCII)
# One or more hexadecimal digits (chunk sizes), matched against bytes.
HEXDIGITS: Final[Pattern[bytes]] = re.compile(rb"[0-9a-fA-F]+")
# RFC 9110 singleton headers — duplicates are rejected in strict mode.
# In lax mode (response parser default), the check is skipped entirely
# since real-world servers (e.g. Google APIs, Werkzeug) commonly send
# duplicate headers like Content-Type or Server.
# Lowercased for case-insensitive matching against wire names:
# HeadersParser.parse_headers looks up ``name.lower()`` in this set.
SINGLETON_HEADERS: Final[frozenset[str]] = frozenset(
    {
        "content-length",
        "content-location",
        "content-range",
        "content-type",
        "etag",
        "host",
        "max-forwards",
        "server",
        "transfer-encoding",
        "user-agent",
    }
)
class RawRequestMessage(NamedTuple):
    """Request line and headers of one parsed HTTP request.

    Built by ``HttpRequestParser.parse_message``.
    """

    # Method token from the request line.
    method: str
    # Request target exactly as it appeared on the request line.
    path: str
    # Protocol version taken from the "HTTP/x.y" part of the request line.
    version: HttpVersion
    # Parsed headers (case-insensitive multidict proxy).
    headers: HeadersDictProxy
    # (name, value) byte pairs; values have surrounding OWS stripped.
    raw_headers: RawHeaders
    # True when the connection should be closed after this message.
    should_close: bool
    # Recognised Content-Encoding value, or None.
    compression: str | None
    # True when Connection contains "upgrade" and an Upgrade header is set.
    upgrade: bool
    # True when the message body uses chunked transfer coding.
    chunked: bool
    # URL object built from the request target.
    url: URL
class RawResponseMessage(NamedTuple):
    """Status line and headers of one parsed HTTP response.

    Built by ``HttpResponseParser.parse_message``.
    """

    # Protocol version taken from the "HTTP/x.y" part of the status line.
    version: HttpVersion
    # Three-digit status code as an integer.
    code: int
    # Reason phrase with surrounding whitespace stripped; may be "".
    reason: str
    # Parsed headers (case-insensitive multidict proxy).
    headers: HeadersDictProxy
    # (name, value) byte pairs; values have surrounding OWS stripped.
    raw_headers: RawHeaders
    # True when the connection should be closed after this message.
    should_close: bool
    # Recognised Content-Encoding value, or None.
    compression: str | None
    # True when Connection contains "upgrade" and an Upgrade header is set.
    upgrade: bool
    # True when the message body uses chunked transfer coding.
    chunked: bool
# Message type produced by a concrete parser: constrained to exactly one of
# the two raw message tuples.
_MsgT = TypeVar("_MsgT", RawRequestMessage, RawResponseMessage)
class PayloadState(IntEnum):
    """Result code returned by ``HttpPayloadParser.feed_data``."""

    # The payload has been fully received.
    PAYLOAD_COMPLETE = 0
    # The supplied bytes were consumed; more input is required.
    PAYLOAD_NEEDS_INPUT = 1
    # Parsing was paused with work left over; feed_data() may be called
    # again with empty bytes to continue.
    PAYLOAD_HAS_PENDING_INPUT = 2
class ParseState(IntEnum):
    """How ``HttpPayloadParser`` delimits the message body."""

    # No body is parsed (response_with_body is False).
    PARSE_NONE = 0
    # Body length is given by Content-Length.
    PARSE_LENGTH = 1
    # Body uses chunked transfer coding.
    PARSE_CHUNKED = 2
    # Body runs until the connection is closed.
    PARSE_UNTIL_EOF = 3
class ChunkState(IntEnum):
    """Sub-state of the chunked transfer-coding parser."""

    # Expecting a chunk-size line.
    PARSE_CHUNKED_SIZE = 0
    # Reading chunk data.
    PARSE_CHUNKED_CHUNK = 1
    # Expecting the line terminator that follows chunk data.
    PARSE_CHUNKED_CHUNK_EOF = 2
    # Reading trailer lines after the zero-size chunk.
    # NOTE: the value 3 is not used by this enum.
    PARSE_TRAILERS = 4
class HeadersParser:
    """Parse a list of raw header lines into a multidict plus raw pairs."""

    def __init__(self, max_field_size: int = 8190, lax: bool = False) -> None:
        # Upper bound on the accumulated length of a folded (continued) value.
        self.max_field_size = max_field_size
        # Lax mode: allow obs-fold continuation lines, apply a smaller set of
        # forbidden characters and skip the duplicate singleton-header check.
        self._lax = lax

    def parse_headers(self, lines: list[bytes]) -> tuple[HeadersDictProxy, RawHeaders]:
        """Parse header lines up to the first empty line.

        :param lines: header lines without their line terminators. Parsing
            stops at the first empty line; the list is indexed directly, so a
            list lacking a terminating empty line raises ``IndexError``.
        :return: ``(headers, raw_headers)`` where ``raw_headers`` is a tuple
            of ``(name, value)`` byte pairs.
        :raises InvalidHeader: missing colon, empty name, whitespace at either
            end of the name, non-token name, or forbidden characters in the
            value.
        :raises LineTooLong: a folded value exceeds ``max_field_size``.
        :raises BadHttpMessage: a singleton header is repeated (strict mode).
        """
        headers: CIMultiDict[str] = CIMultiDict()
        # note: "raw" does not mean inclusion of OWS before/after the field value
        raw_headers = []

        lines_idx = 0
        line = lines[lines_idx]
        line_count = len(lines)

        while line:
            # Parse initial header name : value pair.
            try:
                bname, bvalue = line.split(b":", 1)
            except ValueError:
                raise InvalidHeader(line) from None

            if len(bname) == 0:
                raise InvalidHeader(bname)

            # https://www.rfc-editor.org/rfc/rfc9112.html#section-5.1-2
            if {bname[0], bname[-1]} & {32, 9}:  # {" ", "\t"}
                raise InvalidHeader(line)

            bvalue = bvalue.lstrip(b" \t")
            name = bname.decode("utf-8", "surrogateescape")
            if not TOKENRE.fullmatch(name):
                raise InvalidHeader(bname)

            # next line
            lines_idx += 1
            line = lines[lines_idx]

            # consume continuation lines
            continuation = self._lax and line and line[0] in (32, 9)  # (' ', '\t')

            # Deprecated: https://www.rfc-editor.org/rfc/rfc9112.html#name-obsolete-line-folding
            if continuation:
                # Track the total folded length so it can be bounded.
                header_length = len(bvalue)
                bvalue_lst = [bvalue]
                while continuation:
                    header_length += len(line)
                    if header_length > self.max_field_size:
                        header_line = bname + b": " + b"".join(bvalue_lst)
                        raise LineTooLong(
                            header_line[:100] + b"...", self.max_field_size
                        )
                    bvalue_lst.append(line)

                    # next line
                    lines_idx += 1
                    if lines_idx < line_count:
                        line = lines[lines_idx]
                        # An empty line leaves ``continuation`` unchanged; the
                        # loop then ends on the next pass once the list is
                        # exhausted.
                        if line:
                            continuation = line[0] in (32, 9)  # (' ', '\t')
                    else:
                        line = b""
                        break
                bvalue = b"".join(bvalue_lst)

            bvalue = bvalue.strip(b" \t")
            value = bvalue.decode("utf-8", "surrogateescape")

            # https://www.rfc-editor.org/rfc/rfc9110.html#section-5.5-5
            if self._lax:
                # Lax mode rejects only CR, LF and NUL.
                if "\n" in value or "\r" in value or "\x00" in value:
                    raise InvalidHeader(bvalue)
            elif _FIELD_VALUE_FORBIDDEN_CTL_RE.search(value):
                raise InvalidHeader(bvalue)

            # Strict mode only: refuse a second occurrence of a singleton header.
            if not self._lax and name in headers and name.lower() in SINGLETON_HEADERS:
                raise BadHttpMessage(f"Duplicate '{name}' header found.")
            headers.add(name, value)
            raw_headers.append((bname, bvalue))

        return (HeadersDictProxy(headers), tuple(raw_headers))
def _is_supported_upgrade(headers: HeadersDictProxy) -> bool:
    """Return True when the Upgrade header names a supported protocol."""
    upgrade_value = headers.get(hdrs.UPGRADE, "")
    if not upgrade_value.isascii():
        # .lower() can transform non-ascii characters, so such values are
        # rejected before any case folding takes place.
        return False
    return upgrade_value.lower() in {"tcp", "websocket"}
class HttpParser(abc.ABC, Generic[_MsgT]):
    """Incremental parser for a stream of HTTP/1.x messages.

    Subclasses supply ``parse_message`` (start line handling) and
    ``_is_chunked_te`` (Transfer-Encoding policy). Bytes are pushed in with
    ``feed_data``; header lines are accumulated until an empty line is seen,
    then the message is parsed and, when it carries a body, an
    ``HttpPayloadParser`` takes over until the body is complete.
    """

    # Passed to HeadersParser and HttpPayloadParser; overridden by subclasses.
    lax: ClassVar[bool] = False

    def __init__(
        self,
        protocol: BaseProtocol,
        loop: asyncio.AbstractEventLoop,
        limit: int,
        max_line_size: int = 8190,
        max_headers: int = 128,
        max_field_size: int = 8190,
        timer: BaseTimerContext | None = None,
        code: int | None = None,
        method: str | None = None,
        payload_exception: type[BaseException] | None = None,
        response_with_body: bool = True,
        read_until_eof: bool = False,
        auto_decompress: bool = True,
    ) -> None:
        """Store configuration and reset the parsing state.

        :param protocol: passed to every ``StreamReader`` created for a body.
        :param loop: passed to every ``StreamReader`` created for a body.
        :param limit: passed to ``StreamReader`` and ``HttpPayloadParser``.
        :param max_line_size: maximum length of the start line and of an
            unterminated tail kept between calls.
        :param max_headers: maximum number of lines in one message head; what
            is left over becomes the trailer limit of the payload parser.
        :param max_field_size: maximum length of a header line.
        :param timer: passed to every ``StreamReader`` created for a body.
        :param code: forwarded to ``HttpPayloadParser``.
        :param method: method used when the parsed message has no ``method``
            attribute (i.e. for responses).
        :param payload_exception: when set, payload parsing errors are
            re-wrapped in this type before being stored on the payload.
        :param response_with_body: forwarded to ``HttpPayloadParser``.
        :param read_until_eof: read a body until EOF when neither a length
            nor chunked coding is present.
        :param auto_decompress: forwarded to ``HttpPayloadParser``.
        """
        self.protocol = protocol
        self.loop = loop
        self.max_line_size = max_line_size
        self.max_field_size = max_field_size
        self.max_headers = max_headers
        self.timer = timer
        self.code = code
        self.method = method
        self.payload_exception = payload_exception
        self.response_with_body = response_with_body
        self.read_until_eof = read_until_eof

        # Lines of the message head collected so far (terminators removed).
        self._lines: list[bytes] = []
        # Bytes of an incomplete line carried over to the next feed_data().
        self._tail = b""
        self._upgraded = False
        self._payload = None
        self._payload_parser: HttpPayloadParser | None = None
        # True while the payload parser is paused with undelivered data.
        self._payload_has_more_data = False
        self._auto_decompress = auto_decompress
        self._limit = limit
        self._headers_parser = HeadersParser(max_field_size, self.lax)

    @abc.abstractmethod
    def parse_message(self, lines: list[bytes]) -> _MsgT: ...

    @abc.abstractmethod
    def _is_chunked_te(self, te: str) -> bool: ...

    def pause_reading(self) -> None:
        """Ask the active payload parser to pause; one must be active."""
        assert self._payload_parser is not None
        self._payload_parser.pause_reading()

    def feed_eof(self) -> _MsgT | None:
        """Signal end of input.

        With an active payload parser the EOF is forwarded to it. Otherwise
        a best-effort attempt is made to parse whatever partial message head
        has been buffered; any error from that attempt is swallowed.

        :return: the partially received message, or ``None``.
        """
        if self._payload_parser is not None:
            self._payload_parser.feed_eof()
            if self._payload_parser.done:
                self._payload_parser = None
        else:
            # try to extract partial message
            if self._tail:
                self._lines.append(self._tail)

            if self._lines:
                # Lines are stored as bytes without their terminator, so the
                # end-of-headers marker is an empty bytes line. (The previous
                # comparison against the str "\r\n" could never be equal.)
                if self._lines[-1] != b"":
                    self._lines.append(b"")
                with suppress(Exception):
                    return self.parse_message(self._lines)
        return None

    def feed_data(
        self,
        data: bytes,
        SEP: _SEP = b"\r\n",
        EMPTY: bytes = b"",
        CONTENT_LENGTH: istr = hdrs.CONTENT_LENGTH,
        METH_CONNECT: str = hdrs.METH_CONNECT,
        SEC_WEBSOCKET_KEY1: istr = hdrs.SEC_WEBSOCKET_KEY1,
    ) -> tuple[list[tuple[_MsgT, StreamReader]], bool, bytes]:
        """Consume ``data`` and return the messages completed by it.

        :param data: newly received bytes.
        :param SEP: line separator to split on.
        :param EMPTY: module constants bound as default arguments, as are
            ``CONTENT_LENGTH``, ``METH_CONNECT`` and ``SEC_WEBSOCKET_KEY1``.
        :return: ``(messages, upgraded, tail)`` where ``messages`` is a list
            of ``(message, payload)`` pairs, ``upgraded`` tells whether the
            connection is now in upgraded mode and ``tail`` holds the bytes
            that were not consumed.
        :raises BadHttpMessage: data after a closing message, or too many
            header lines.
        :raises LineTooLong: a line or the unterminated tail is too long.
        :raises InvalidHeader: bad Content-Length, or the obsolete
            Sec-WebSocket-Key1 header is present.
        """
        messages = []

        if self._tail:
            data, self._tail = self._tail + data, b""

        data_len = len(data)
        start_pos = 0
        loop = self.loop
        max_line_length = self.max_line_size

        should_close = False
        while start_pos < data_len or self._payload_has_more_data:
            # read HTTP message (request/response line + headers), \r\n\r\n
            # and split by lines
            if self._payload_parser is None and not self._upgraded:
                pos = data.find(SEP, start_pos)
                # consume \r\n
                if pos == start_pos and not self._lines:
                    start_pos = pos + len(SEP)
                    continue

                if pos >= start_pos:
                    if should_close:
                        raise BadHttpMessage("Data after `Connection: close`")

                    # line found
                    line = data[start_pos:pos]
                    if SEP == b"\n":  # For lax response parsing
                        line = line.rstrip(b"\r")
                    if len(line) > max_line_length:
                        raise LineTooLong(line[:100] + b"...", max_line_length)

                    self._lines.append(line)
                    # After processing the status/request line, everything is a header.
                    max_line_length = self.max_field_size

                    if len(self._lines) > self.max_headers:
                        raise BadHttpMessage("Too many headers received")

                    start_pos = pos + len(SEP)

                    # \r\n\r\n found
                    if self._lines[-1] == EMPTY:
                        # Whatever is left of the header budget is granted
                        # to the trailers of this message.
                        max_trailers = self.max_headers - len(self._lines)
                        try:
                            msg: _MsgT = self.parse_message(self._lines)
                        finally:
                            self._lines.clear()

                        def get_content_length() -> int | None:
                            # payload length
                            length_hdr = msg.headers.get(CONTENT_LENGTH)
                            if length_hdr is None:
                                return None

                            # Shouldn't allow +/- or other number formats.
                            # https://www.rfc-editor.org/rfc/rfc9110#section-8.6-2
                            # msg.headers is already stripped of leading/trailing wsp
                            if not DIGITS.fullmatch(length_hdr):
                                raise InvalidHeader(CONTENT_LENGTH)

                            return int(length_hdr)

                        length = get_content_length()
                        # do not support old websocket spec
                        if SEC_WEBSOCKET_KEY1 in msg.headers:
                            raise InvalidHeader(SEC_WEBSOCKET_KEY1)

                        self._upgraded = msg.upgrade and _is_supported_upgrade(
                            msg.headers
                        )

                        method = getattr(msg, "method", self.method)
                        # code is only present on responses
                        code = getattr(msg, "code", 0)

                        assert self.protocol is not None
                        # calculate payload
                        empty_body = code in EMPTY_BODY_STATUS_CODES or bool(
                            method and method in EMPTY_BODY_METHODS
                        )
                        if not empty_body and (
                            ((length is not None and length > 0) or msg.chunked)
                            and not self._upgraded
                        ):
                            # Body delimited by Content-Length or chunking.
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            payload_parser = HttpPayloadParser(
                                payload,
                                length=length,
                                chunked=msg.chunked,
                                method=method,
                                compression=msg.compression,
                                code=self.code,
                                response_with_body=self.response_with_body,
                                auto_decompress=self._auto_decompress,
                                lax=self.lax,
                                headers_parser=self._headers_parser,
                                max_line_size=self.max_line_size,
                                max_field_size=self.max_field_size,
                                max_trailers=max_trailers,
                                limit=self._limit,
                            )
                            if not payload_parser.done:
                                self._payload_parser = payload_parser
                        elif method == METH_CONNECT:
                            # CONNECT switches the connection to upgraded
                            # mode; everything that follows is payload.
                            assert isinstance(msg, RawRequestMessage)
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            self._upgraded = True
                            self._payload_parser = HttpPayloadParser(
                                payload,
                                method=msg.method,
                                compression=msg.compression,
                                auto_decompress=self._auto_decompress,
                                lax=self.lax,
                                headers_parser=self._headers_parser,
                                max_line_size=self.max_line_size,
                                max_field_size=self.max_field_size,
                                max_trailers=max_trailers,
                                limit=self._limit,
                            )
                        elif not empty_body and length is None and self.read_until_eof:
                            # No framing information: read until EOF.
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            payload_parser = HttpPayloadParser(
                                payload,
                                length=length,
                                chunked=msg.chunked,
                                method=method,
                                compression=msg.compression,
                                code=self.code,
                                response_with_body=self.response_with_body,
                                auto_decompress=self._auto_decompress,
                                lax=self.lax,
                                headers_parser=self._headers_parser,
                                max_line_size=self.max_line_size,
                                max_field_size=self.max_field_size,
                                max_trailers=max_trailers,
                                limit=self._limit,
                            )
                            if not payload_parser.done:
                                self._payload_parser = payload_parser
                        else:
                            payload = EMPTY_PAYLOAD

                        messages.append((msg, payload))
                        should_close = msg.should_close
                else:
                    # No complete line available: keep the remainder for the
                    # next call, bounded so it cannot grow without limit.
                    self._tail = data[start_pos:]
                    if len(self._tail) > self.max_line_size:
                        raise LineTooLong(self._tail[:100] + b"...", self.max_line_size)
                    data = EMPTY
                    break

            # no parser, just store
            elif self._payload_parser is None and self._upgraded:
                assert not self._lines
                break

            # feed payload
            else:
                assert not self._lines
                assert self._payload_parser is not None
                try:
                    payload_state, data = self._payload_parser.feed_data(
                        data[start_pos:], SEP
                    )
                except Exception as underlying_exc:
                    reraised_exc: BaseException = underlying_exc
                    if self.payload_exception is not None:
                        reraised_exc = self.payload_exception(str(underlying_exc))

                    # Record the failure on the payload stream.
                    set_exception(
                        self._payload_parser.payload,
                        reraised_exc,
                        underlying_exc,
                    )

                    payload_state = PayloadState.PAYLOAD_COMPLETE
                    data = b""
                    # Header and transfer-encoding errors are also re-raised
                    # to the caller; other errors live on the payload only.
                    if isinstance(
                        underlying_exc, (InvalidHeader, TransferEncodingError)
                    ):
                        raise

                self._payload_has_more_data = (
                    payload_state == PayloadState.PAYLOAD_HAS_PENDING_INPUT
                )

                if payload_state is not PayloadState.PAYLOAD_COMPLETE:
                    # We've either consumed all available data, or we're pausing
                    # until the reader buffer is freed up.
                    break

                # Payload finished: continue with the bytes it left over.
                start_pos = 0
                data_len = len(data)
                self._payload_parser = None

        if data and start_pos < data_len:
            data = data[start_pos:]
        else:
            data = EMPTY

        return messages, self._upgraded, data

    def parse_headers(
        self, lines: list[bytes]
    ) -> tuple[HeadersDictProxy, RawHeaders, bool | None, str | None, bool, bool]:
        """Parse header lines and derive connection-level flags.

        Obsolete line folding is handled by the underlying ``HeadersParser``
        (lax mode only).

        :param lines: header lines, ending with an empty line.
        :return: ``(headers, raw_headers, close_conn, encoding, upgrade,
            chunked)``. ``close_conn`` is ``True`` for "close", ``False`` for
            "keep-alive" and ``None`` when the Connection header decides
            neither. ``encoding`` is the lowercased Content-Encoding when it
            is one of gzip/deflate/br/zstd, else ``None``.
        :raises BadHttpMessage: Transfer-Encoding and Content-Length are both
            present.
        """
        headers, raw_headers = self._headers_parser.parse_headers(lines)
        close_conn = None
        encoding = None
        upgrade = False
        chunked = False

        # keep-alive and protocol switching
        # RFC 9110 section 7.6.1 defines Connection as a comma-separated list.
        # We use a simple comma split here rather than getall() for performance,
        # as the target tokens (close, keep-alive, upgrade) are simple ASCII
        # values that never contain commas.
        conn_values = headers.get(hdrs.CONNECTION)
        if conn_values:
            conn_tokens = {
                token.lower()
                for token in (part.strip(" \t") for part in conn_values.split(","))
                if token and token.isascii()
            }

            if "close" in conn_tokens:
                close_conn = True
            elif "keep-alive" in conn_tokens:
                close_conn = False

            # https://www.rfc-editor.org/rfc/rfc9110.html#name-101-switching-protocols
            if "upgrade" in conn_tokens and headers.get(hdrs.UPGRADE):
                upgrade = True

        # encoding
        enc = headers.get(hdrs.CONTENT_ENCODING, "")
        # .lower() can transform non-ascii characters, so check first.
        if enc.isascii():
            # Return the normalised value: DeflateBuffer compares the coding
            # name case-sensitively against the lowercase names, so handing
            # over e.g. "GZIP" or "Br" would select the wrong decompressor.
            enc = enc.lower()
            if enc in {"gzip", "deflate", "br", "zstd"}:
                encoding = enc

        # chunking
        te = headers.get(hdrs.TRANSFER_ENCODING)
        if te is not None:
            if self._is_chunked_te(te):
                chunked = True

            if hdrs.CONTENT_LENGTH in headers:
                raise BadHttpMessage(
                    "Transfer-Encoding can't be present with Content-Length",
                )

        return (headers, raw_headers, close_conn, encoding, upgrade, chunked)

    def set_upgraded(self, val: bool) -> None:
        """Set connection upgraded (to websocket) mode.

        :param bool val: new state.
        """
        self._upgraded = val
class HttpRequestParser(HttpParser[RawRequestMessage]):
    """Parse the request line and headers of an HTTP request.

    ``BadHttpMethod``, ``BadStatusLine``, ``InvalidURLError`` or
    ``BadHttpMessage`` is raised for a malformed message head; otherwise a
    ``RawRequestMessage`` is returned.
    """

    def parse_message(self, lines: list[bytes]) -> RawRequestMessage:
        # The first line is the request line: "<method> <target> <version>".
        request_line = lines[0].decode("utf-8", "surrogateescape")
        try:
            method, path, version = request_line.split(" ", maxsplit=2)
        except ValueError:
            raise BadHttpMethod(request_line) from None

        # The method has to be an RFC 9110 token.
        if TOKENRE.fullmatch(method) is None:
            raise BadHttpMethod(method)

        # The version has to look like HTTP/<digit>.<digit>.
        version_match = VERSRE.fullmatch(version)
        if version_match is None:
            raise BadStatusLine(request_line)
        major, minor = version_match.group(1), version_match.group(2)
        version_o = HttpVersion(int(major), int(minor))

        if method == "CONNECT":
            # authority-form,
            # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.3
            url = URL.build(authority=path, encoded=True)
        elif path.startswith("/"):
            # origin-form,
            # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.1
            without_fragment, _, url_fragment = path.partition("#")
            path_part, _, qs_part = without_fragment.partition("?")

            # NOTE: `yarl.URL.build()` is used to mimic what the Cython-based
            # NOTE: parser does, otherwise it results into the same
            # NOTE: HTTP Request-Line input producing different
            # NOTE: `yarl.URL()` objects
            url = URL.build(
                path=path_part,
                query_string=qs_part,
                fragment=url_fragment,
                encoded=True,
            )
        else:
            # Either asterisk-form ("OPTIONS *") or absolute-form (proxy
            # requests, https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.2).
            # Only the latter is required to carry a scheme.
            url = URL(path, encoded=True)
            is_asterisk_form = path == "*" and method == "OPTIONS"
            if not is_asterisk_form and url.scheme == "":
                # not absolute-form
                raise InvalidURLError(
                    path.encode(errors="surrogateescape").decode("latin1")
                )

        # Everything after the request line is a header.
        parsed = self.parse_headers(lines[1:])
        headers, raw_headers, close, compression, upgrade, chunked = parsed

        if version_o == HttpVersion11 and hdrs.HOST not in headers:
            raise BadHttpMessage("Missing 'Host' header in request.")

        if close is None:
            # The Connection header decided nothing: HTTP/1.0 and older
            # close by default, newer versions keep the connection alive.
            close = bool(version_o <= HttpVersion10)

        return RawRequestMessage(
            method,
            path,
            version_o,
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
            url,
        )

    def _is_chunked_te(self, te: str) -> bool:
        # https://www.rfc-editor.org/rfc/rfc9112#section-7.1-3
        # "A sender MUST NOT apply the chunked transfer coding more
        # than once to a message body"
        codings = [coding.strip(" \t") for coding in te.split(",")]

        def names_chunked(coding: str) -> bool:
            # .lower() transforms some non-ascii chars, so must check first.
            return coding.isascii() and coding.lower() == "chunked"

        if len([coding for coding in codings if names_chunked(coding)]) > 1:
            raise BadHttpMessage("Request has duplicate `chunked` Transfer-Encoding")

        if not names_chunked(codings[-1]):
            # https://www.rfc-editor.org/rfc/rfc9112#section-6.3-2.4.3
            raise BadHttpMessage("Request has invalid `Transfer-Encoding`")
        return True
class HttpResponseParser(HttpParser[RawResponseMessage]):
    """Read response status line and headers.

    BadStatusLine could be raised in case of any errors in status line.
    Returns RawResponseMessage.
    """

    protocol: "ResponseHandler"

    # Lax mode should only be enabled on response parser.
    lax = not DEBUG

    def feed_data(
        self,
        data: bytes,
        SEP: _SEP | None = None,
        *args: Any,
        **kwargs: Any,
    ) -> tuple[list[tuple[RawResponseMessage, StreamReader]], bool, bytes]:
        """Feed data to the base parser, choosing the default separator.

        When ``SEP`` is not given, strict CRLF is used if ``DEBUG`` is set
        and bare LF otherwise.
        """
        if SEP is None:
            SEP = b"\r\n" if DEBUG else b"\n"
        return super().feed_data(data, SEP, *args, **kwargs)

    def parse_message(self, lines: list[bytes]) -> RawResponseMessage:
        """Parse the status line and headers into a ``RawResponseMessage``.

        :raises BadStatusLine: the version or status code is malformed.
        """
        line = lines[0].decode("utf-8", "surrogateescape")
        try:
            version, status = line.split(maxsplit=1)
        except ValueError:
            raise BadStatusLine(line) from None

        try:
            status, reason = status.split(maxsplit=1)
        except ValueError:
            # No reason phrase after the status code.
            status = status.strip()
            reason = ""

        # version
        match = VERSRE.fullmatch(version)
        if match is None:
            raise BadStatusLine(line)
        version_o = HttpVersion(int(match.group(1)), int(match.group(2)))

        # The status code is a three-digit ASCII number, no padding
        if len(status) != 3 or not DIGITS.fullmatch(status):
            raise BadStatusLine(line)
        status_i = int(status)

        # read headers
        (
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
        ) = self.parse_headers(lines[1:])

        if close is None:
            if version_o <= HttpVersion10:
                close = True
            # https://www.rfc-editor.org/rfc/rfc9112.html#name-message-body-length
            elif 100 <= status_i < 200 or status_i in {204, 304}:
                close = False
            elif hdrs.CONTENT_LENGTH in headers or hdrs.TRANSFER_ENCODING in headers:
                close = False
            else:
                # https://www.rfc-editor.org/rfc/rfc9112.html#section-6.3-2.8
                close = True

        return RawResponseMessage(
            version_o,
            status_i,
            reason.strip(),
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
        )

    def _is_chunked_te(self, te: str) -> bool:
        """Return True when the final transfer coding is ``chunked``."""
        # https://www.rfc-editor.org/rfc/rfc9112#section-6.3-2.4.2
        last = te.rsplit(",", maxsplit=1)[-1].strip(" \t")
        # .lower() transforms some non-ascii chars (e.g. KELVIN SIGN U+212A
        # becomes "k"), so the value must be checked for ASCII first.
        return last.isascii() and last.lower() == "chunked"
class HttpPayloadParser:
    """Resumable parser for one HTTP message body.

    Depending on the constructor arguments the body is delimited by a
    fixed length, by chunked transfer coding, or by end of stream. Parsed
    bytes are pushed into ``payload`` (optionally through a
    ``DeflateBuffer``).
    """

    def __init__(
        self,
        payload: StreamReader,
        length: int | None = None,
        chunked: bool = False,
        compression: str | None = None,
        code: int | None = None,
        method: str | None = None,
        response_with_body: bool = True,
        auto_decompress: bool = True,
        lax: bool = False,
        *,
        headers_parser: HeadersParser,
        max_line_size: int = 8190,
        max_field_size: int = 8190,
        max_trailers: int = 128,
        limit: int = DEFAULT_CHUNK_SIZE,
    ) -> None:
        """Select the parsing mode and wrap the payload if needed.

        :param payload: stream that receives the body bytes.
        :param length: Content-Length value, if any.
        :param chunked: whether chunked transfer coding is in use; takes
            precedence over ``length``.
        :param compression: content coding; enables decompression together
            with ``auto_decompress`` and ``response_with_body``.
        :param code: accepted but not referenced in this constructor.
        :param method: accepted but not referenced in this constructor.
        :param response_with_body: when False no body is parsed and the
            payload is finished immediately.
        :param auto_decompress: see ``compression``.
        :param lax: tolerate whitespace around chunk sizes and a stray CR.
        :param headers_parser: parser reused for trailer fields.
        :param max_line_size: limit for a buffered chunk-size line.
        :param max_field_size: limit for a trailer line.
        :param max_trailers: maximum number of trailer lines.
        :param limit: passed to ``DeflateBuffer`` as ``max_decompress_size``.
        """
        self._length = 0
        self._paused = False
        self._type = ParseState.PARSE_UNTIL_EOF
        self._chunk = ChunkState.PARSE_CHUNKED_SIZE
        self._chunk_size = 0
        # Unprocessed bytes carried over to the next feed_data() call.
        self._chunk_tail = b""
        self._auto_decompress = auto_decompress
        self._lax = lax
        self._headers_parser = headers_parser
        self._max_line_size = max_line_size
        self._max_field_size = max_field_size
        self._max_trailers = max_trailers
        # Last value returned by payload.feed_data(); truthy means it
        # should be called again with b"".
        self._more_data_available = False
        self._trailer_lines: list[bytes] = []
        self.done = False
        # Set when feed_eof() had to return early because of a pause.
        self._eof_pending = False

        # payload decompression wrapper
        if response_with_body and compression and self._auto_decompress:
            real_payload: StreamReader | DeflateBuffer = DeflateBuffer(
                payload, compression, max_decompress_size=limit
            )
        else:
            real_payload = payload

        # payload parser
        if not response_with_body:
            # don't parse payload if it's not expected to be received
            self._type = ParseState.PARSE_NONE
            real_payload.feed_eof()
            self.done = True
        elif chunked:
            self._type = ParseState.PARSE_CHUNKED
        elif length is not None:
            self._type = ParseState.PARSE_LENGTH
            self._length = length
            if self._length == 0:
                real_payload.feed_eof()
                self.done = True

        self.payload = real_payload

    def pause_reading(self) -> None:
        """Request a pause; honoured at the next check inside feed_data()/feed_eof()."""
        self._paused = True

    def feed_eof(self) -> None:
        """Handle end of input.

        In read-until-EOF mode pending data is drained and the payload is
        finished, unless a pause was requested, in which case completion
        is deferred to a later ``feed_data(b"")``.

        :raises ContentLengthError: EOF in fixed-length mode.
        :raises TransferEncodingError: EOF in chunked mode.
        """
        if self._type == ParseState.PARSE_UNTIL_EOF:
            self._eof_pending = True
            while self._more_data_available:
                if self._paused:
                    self._paused = False
                    return  # Will resume via feed_data(b"") later
                self._more_data_available = self.payload.feed_data(b"")
            self.payload.feed_eof()
            self.done = True
            self._eof_pending = False
        elif self._type == ParseState.PARSE_LENGTH:
            raise ContentLengthError(
                "Not enough data to satisfy content length header."
            )
        elif self._type == ParseState.PARSE_CHUNKED:
            raise TransferEncodingError(
                "Not enough data to satisfy transfer length header."
            )

    def feed_data(
        self, chunk: bytes, SEP: _SEP = b"\r\n", CHUNK_EXT: bytes = b";"
    ) -> tuple[PayloadState, bytes]:
        """Receive a chunk of data to process.

        Return:
            PayloadState - The current state of payload processing.
                This function may be called with empty bytes after returning
                PAYLOAD_HAS_PENDING_INPUT to continue processing after a pause.
            bytes - If payload is complete, this is the unconsumed bytes intended for the
                next message/payload, b"" otherwise.
        """
        # Read specified amount of bytes
        if self._type == ParseState.PARSE_LENGTH:
            if self._chunk_tail:
                chunk = self._chunk_tail + chunk
                self._chunk_tail = b""

            required = self._length
            self._length = max(required - len(chunk), 0)
            self._more_data_available = self.payload.feed_data(chunk[:required])
            while self._more_data_available:
                if self._paused:
                    self._paused = False
                    # Keep the bytes beyond this body for the resumed call.
                    self._chunk_tail = chunk[required:]
                    return PayloadState.PAYLOAD_HAS_PENDING_INPUT, b""
                self._more_data_available = self.payload.feed_data(b"")

            if self._length == 0:
                self.payload.feed_eof()
                return PayloadState.PAYLOAD_COMPLETE, chunk[required:]
        # Chunked transfer encoding parser
        elif self._type == ParseState.PARSE_CHUNKED:
            if self._chunk_tail:
                # We should check the length is sane when not processing payload body.
                if self._chunk != ChunkState.PARSE_CHUNKED_CHUNK:
                    max_line_length = self._max_line_size
                    if self._chunk == ChunkState.PARSE_TRAILERS:
                        max_line_length = self._max_field_size
                    if len(self._chunk_tail) > max_line_length:
                        raise LineTooLong(
                            self._chunk_tail[:100] + b"...", max_line_length
                        )

                chunk = self._chunk_tail + chunk
                self._chunk_tail = b""

            while chunk or self._more_data_available:
                # read next chunk size
                if self._chunk == ChunkState.PARSE_CHUNKED_SIZE:
                    pos = chunk.find(SEP)
                    if pos >= 0:
                        i = chunk.find(CHUNK_EXT, 0, pos)
                        if i >= 0:
                            size_b = chunk[:i]  # strip chunk-extensions
                            # Verify no LF in the chunk-extension
                            if b"\n" in (ext := chunk[i:pos]):
                                exc = TransferEncodingError(
                                    f"Unexpected LF in chunk-extension: {ext!r}"
                                )
                                set_exception(self.payload, exc)
                                raise exc
                        else:
                            size_b = chunk[:pos]

                        if self._lax:  # Allow whitespace in lax mode.
                            size_b = size_b.strip()

                        if not re.fullmatch(HEXDIGITS, size_b):
                            exc = TransferEncodingError(
                                chunk[:pos].decode("ascii", "surrogateescape")
                            )
                            set_exception(self.payload, exc)
                            raise exc
                        size = int(bytes(size_b), 16)

                        chunk = chunk[pos + len(SEP) :]
                        if size == 0:  # eof marker
                            self._chunk = ChunkState.PARSE_TRAILERS
                            if self._lax and chunk.startswith(b"\r"):
                                chunk = chunk[1:]
                        else:
                            self._chunk = ChunkState.PARSE_CHUNKED_CHUNK
                            self._chunk_size = size
                            self.payload.begin_http_chunk_receiving()
                    else:
                        # Size line incomplete: wait for more input.
                        self._chunk_tail = chunk
                        return PayloadState.PAYLOAD_NEEDS_INPUT, b""

                # read chunk and feed buffer
                if self._chunk == ChunkState.PARSE_CHUNKED_CHUNK:
                    if self._paused:
                        self._paused = False
                        self._chunk_tail = chunk
                        return PayloadState.PAYLOAD_HAS_PENDING_INPUT, b""

                    required = self._chunk_size
                    self._chunk_size = max(required - len(chunk), 0)
                    self._more_data_available = self.payload.feed_data(chunk[:required])
                    chunk = chunk[required:]

                    # The payload still has data to hand over: loop again
                    # (which also re-checks the pause flag) before moving on.
                    if self._more_data_available:
                        continue

                    if self._chunk_size:
                        self._paused = False
                        return PayloadState.PAYLOAD_NEEDS_INPUT, b""
                    self._chunk = ChunkState.PARSE_CHUNKED_CHUNK_EOF
                    self.payload.end_http_chunk_receiving()

                # toss the CRLF at the end of the chunk
                if self._chunk == ChunkState.PARSE_CHUNKED_CHUNK_EOF:
                    if self._lax and chunk.startswith(b"\r"):
                        chunk = chunk[1:]
                    if chunk[: len(SEP)] == SEP:
                        chunk = chunk[len(SEP) :]
                        self._chunk = ChunkState.PARSE_CHUNKED_SIZE
                    elif len(chunk) >= len(SEP) or chunk != SEP[: len(chunk)]:
                        # Enough bytes to decide, or a prefix that cannot
                        # become the separator: the chunk is malformed.
                        exc = TransferEncodingError(
                            "Chunk size mismatch: expected CRLF after chunk data"
                        )
                        set_exception(self.payload, exc)
                        raise exc
                    else:
                        # Only a proper prefix of the separator so far.
                        self._chunk_tail = chunk
                        return PayloadState.PAYLOAD_NEEDS_INPUT, b""

                if self._chunk == ChunkState.PARSE_TRAILERS:
                    pos = chunk.find(SEP)
                    if pos < 0:  # No line found
                        self._chunk_tail = chunk
                        return PayloadState.PAYLOAD_NEEDS_INPUT, b""

                    line = chunk[:pos]
                    chunk = chunk[pos + len(SEP) :]
                    if SEP == b"\n":  # For lax response parsing
                        line = line.rstrip(b"\r")

                    if len(line) > self._max_field_size:
                        raise LineTooLong(line[:100] + b"...", self._max_field_size)

                    self._trailer_lines.append(line)

                    if len(self._trailer_lines) > self._max_trailers:
                        raise BadHttpMessage("Too many trailers received")

                    # \r\n\r\n found, end of stream
                    if self._trailer_lines[-1] == b"":
                        # Headers and trailers are defined the same way,
                        # so we reuse the HeadersParser here.
                        # The parsed result is not used; the call validates
                        # the trailer section.
                        try:
                            trailers, raw_trailers = self._headers_parser.parse_headers(
                                self._trailer_lines
                            )
                        finally:
                            self._trailer_lines.clear()
                        self.payload.feed_eof()
                        return PayloadState.PAYLOAD_COMPLETE, chunk

        # Read all bytes until eof
        elif self._type == ParseState.PARSE_UNTIL_EOF:
            self._more_data_available = self.payload.feed_data(chunk)
            while self._more_data_available:
                if self._paused:
                    self._paused = False
                    return PayloadState.PAYLOAD_HAS_PENDING_INPUT, b""
                self._more_data_available = self.payload.feed_data(b"")

            # feed_eof() arrived while paused: complete the payload now.
            if self._eof_pending:
                self.payload.feed_eof()
                self.done = True
                self._eof_pending = False
                return PayloadState.PAYLOAD_COMPLETE, b""

        return PayloadState.PAYLOAD_NEEDS_INPUT, b""
class DeflateBuffer:
    """DeflateStream decompress stream and feed data into specified stream."""

    def __init__(
        self,
        out: StreamReader,
        encoding: str | None,
        max_decompress_size: int = DEFAULT_CHUNK_SIZE,
    ) -> None:
        """Create the decompressor matching ``encoding``.

        :param out: stream that receives the decompressed bytes.
        :param encoding: content coding; ``"br"`` and ``"zstd"`` select the
            Brotli and Zstandard decompressors, anything else goes to zlib.
        :param max_decompress_size: lower bound for the per-call output
            limit passed to the decompressor.
        :raises ContentEncodingError: Brotli or Zstandard support is needed
            but not installed.
        """
        self.out = out
        # Number of compressed bytes received so far.
        self.size = 0
        out.total_compressed_bytes = self.size
        self.encoding = encoding
        # True once a non-empty chunk has been passed to the decompressor.
        self._started_decoding = False

        self.decompressor: BrotliDecompressor | ZLibDecompressor | ZSTDDecompressor
        if encoding == "br":
            if not HAS_BROTLI:
                raise ContentEncodingError(
                    "Can not decode content-encoding: brotli (br). "
                    "Please install `Brotli`"
                )
            self.decompressor = BrotliDecompressor()
        elif encoding == "zstd":
            if not HAS_ZSTD:
                raise ContentEncodingError(
                    "Can not decode content-encoding: zstandard (zstd). "
                    "Please install `backports.zstd`"
                )
            self.decompressor = ZSTDDecompressor()
        else:
            self.decompressor = ZLibDecompressor(encoding=encoding)

        self._max_decompress_size = max_decompress_size

    def set_exception(
        self,
        exc: type[BaseException] | BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Forward an exception to the output stream."""
        set_exception(self.out, exc, exc_cause)

    def feed_data(self, chunk: bytes) -> bool:
        """Return True if more data is available and this method should be called again with b""."""
        self.size += len(chunk)
        self.out.total_compressed_bytes = self.size
        has_input = bool(chunk)

        # RFC1950
        # bits 0..3 = CM = 0b1000 = 8 = "deflate"
        # bits 4..7 = CINFO = 1..7 = windows size.
        # The header can only be inspected when there is a first byte: an
        # empty chunk may arrive before any body data (e.g. when a chunk-size
        # line ends a network segment) and must not be indexed.
        if (
            has_input
            and not self._started_decoding
            and self.encoding == "deflate"
            and chunk[0] & 0xF != 8
        ):
            # Change the decoder to decompress incorrectly compressed data
            # Actually we should issue a warning about non-RFC-compliant data.
            self.decompressor = ZLibDecompressor(
                encoding=self.encoding, suppress_deflate_header=True
            )

        low_water = self.out._low_water
        # 0 means "no limit" when the stream has no effective low-water mark.
        max_length = (
            0 if low_water >= sys.maxsize else max(self._max_decompress_size, low_water)
        )
        try:
            chunk = self.decompressor.decompress_sync(chunk, max_length=max_length)
        except Exception:
            raise ContentEncodingError(
                "Can not decode content-encoding: %s" % self.encoding
            )

        # Only real input counts as the start of decoding; otherwise the
        # header check above would be skipped for the first real byte.
        if has_input:
            self._started_decoding = True

        if chunk:
            self.out.feed_data(chunk)
        return self.decompressor.data_available

    def feed_eof(self) -> None:
        """Flush the decompressor and finish the output stream.

        :raises ContentEncodingError: a deflate stream was fed data but did
            not reach its end.
        """
        chunk = self.decompressor.flush()
        # This should never contain data as we defer the call until exhausting
        # the decompression. If .flush() is returning data, this may indicate a
        # zip bomb vulnerability as it will decompress all remaining data at once.
        assert not chunk

        if self.size > 0:
            # decompressor is not brotli unless encoding is "br"
            if self.encoding == "deflate" and not self.decompressor.eof:  # type: ignore[union-attr]
                raise ContentEncodingError("deflate")

        self.out.feed_eof()

    def begin_http_chunk_receiving(self) -> None:
        """Forward the start-of-chunk notification to the output stream."""
        self.out.begin_http_chunk_receiving()

    def end_http_chunk_receiving(self) -> None:
        """Forward the end-of-chunk notification to the output stream."""
        self.out.end_http_chunk_receiving()
# Keep the pure-Python implementations reachable under *Py names, because
# the plain names may be rebound to the C extension below.
HttpRequestParserPy = HttpRequestParser
HttpResponseParserPy = HttpResponseParser
RawRequestMessagePy = RawRequestMessage
RawResponseMessagePy = RawResponseMessage

# Prefer the compiled parser unless extensions are disabled; a missing
# extension module is silently ignored and the Python classes stay in place.
with suppress(ImportError):
    if not NO_EXTENSIONS:
        from ._http_parser import (  # type: ignore[import-not-found,no-redef]
            HttpRequestParser,
            HttpResponseParser,
            RawRequestMessage,
            RawResponseMessage,
        )

        # *C aliases exist only when the import above succeeded.
        HttpRequestParserC = HttpRequestParser
        HttpResponseParserC = HttpResponseParser
        RawRequestMessageC = RawRequestMessage
        RawResponseMessageC = RawResponseMessage