Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/http_parser.py: 81%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import abc
2import asyncio
3import re
4import string
5import sys
6from contextlib import suppress
7from enum import IntEnum
8from re import Pattern
9from typing import (
10 TYPE_CHECKING,
11 Any,
12 ClassVar,
13 Final,
14 Generic,
15 Literal,
16 NamedTuple,
17 TypeVar,
18)
20from multidict import CIMultiDict, istr
21from yarl import URL
23from . import hdrs
24from .base_protocol import BaseProtocol
25from .compression_utils import (
26 HAS_BROTLI,
27 HAS_ZSTD,
28 BrotliDecompressor,
29 ZLibDecompressor,
30 ZSTDDecompressor,
31)
32from .helpers import (
33 _EXC_SENTINEL,
34 DEBUG,
35 DEFAULT_CHUNK_SIZE,
36 EMPTY_BODY_METHODS,
37 EMPTY_BODY_STATUS_CODES,
38 NO_EXTENSIONS,
39 BaseTimerContext,
40 HeadersDictProxy,
41 set_exception,
42)
43from .http_exceptions import (
44 BadHttpMessage,
45 BadHttpMethod,
46 BadStatusLine,
47 ContentEncodingError,
48 ContentLengthError,
49 InvalidHeader,
50 InvalidURLError,
51 LineTooLong,
52 TransferEncodingError,
53)
54from .http_writer import HttpVersion, HttpVersion10, HttpVersion11
55from .streams import EMPTY_PAYLOAD, StreamReader
56from .typedefs import RawHeaders
58if TYPE_CHECKING:
59 from .client_proto import ResponseHandler
# Public names exported by ``from <this module> import *``.
__all__ = (
    "HeadersParser", "HttpParser", "HttpRequestParser",
    "HttpResponseParser", "RawRequestMessage", "RawResponseMessage",
)
_T = TypeVar("_T")

# Line separators understood by the parsers: CRLF, or a bare LF for lax
# response parsing.
_SEP = Literal[b"\r\n", b"\n"]

ASCIISET: Final[set[str]] = {*string.printable}

# Token grammar, see https://www.rfc-editor.org/rfc/rfc9110.html#name-overview
# and https://www.rfc-editor.org/rfc/rfc9110.html#name-tokens
#
#   method = token
#   tchar  = "!" / "#" / "$" / "%" / "&" / "'" / "*" / "+" / "-" / "." /
#            "^" / "_" / "`" / "|" / "~" / DIGIT / ALPHA
#   token  = 1*tchar
_TCHAR_SPECIALS: Final[str] = re.escape("!#$%&'*+-.^_`|~")
TOKENRE: Final[Pattern[str]] = re.compile(f"[0-9A-Za-z{_TCHAR_SPECIALS}]+")

# Control characters forbidden in field values (HTAB is allowed), see
# https://www.rfc-editor.org/rfc/rfc9110#section-5.5-5
_FIELD_VALUE_FORBIDDEN_CTL_RE: Final[Pattern[str]] = re.compile(
    r"[\x00-\x08\x0a-\x1f\x7f]"
)

# re.ASCII keeps \d from matching non-ASCII (e.g. Arabic-Indic) digits.
VERSRE: Final[Pattern[str]] = re.compile(r"HTTP/(\d)\.(\d)", re.ASCII)
DIGITS: Final[Pattern[str]] = re.compile(r"\d+", re.ASCII)
HEXDIGITS: Final[Pattern[bytes]] = re.compile(rb"[0-9a-fA-F]+")

# RFC 9110 singleton headers: duplicates are rejected in strict mode.
# Lax mode (the response parser default) skips this check entirely, because
# real-world servers (e.g. Google APIs, Werkzeug) commonly send duplicate
# headers such as Content-Type or Server.
# Names are lowercase so that they match wire names case-insensitively.
SINGLETON_HEADERS: Final[frozenset[str]] = frozenset(
    (
        "content-length",
        "content-location",
        "content-range",
        "content-type",
        "etag",
        "host",
        "max-forwards",
        "server",
        "transfer-encoding",
        "user-agent",
    )
)
class RawRequestMessage(NamedTuple):
    """Parsed request line and headers, as built by HttpRequestParser."""

    method: str
    # Request-target string exactly as it appeared on the request line.
    path: str
    version: HttpVersion
    # Wraps the parsed case-insensitive CIMultiDict of headers.
    headers: HeadersDictProxy
    # (name, value) byte pairs; leading/trailing OWS of values removed.
    raw_headers: RawHeaders
    # Whether the connection should be closed after this message.
    should_close: bool
    # Content-Encoding value when it is gzip/deflate/br/zstd, else None.
    compression: str | None
    # Connection lists "upgrade" and an Upgrade header is present.
    upgrade: bool
    # Transfer-Encoding selects chunked framing.
    chunked: bool
    url: URL
class RawResponseMessage(NamedTuple):
    """Parsed status line and headers, as built by HttpResponseParser."""

    version: HttpVersion
    # Three-digit status code.
    code: int
    # Reason phrase with surrounding whitespace stripped (may be "").
    reason: str
    # Wraps the parsed case-insensitive CIMultiDict of headers.
    headers: HeadersDictProxy
    # (name, value) byte pairs; leading/trailing OWS of values removed.
    raw_headers: RawHeaders
    # Whether the connection should be closed after this message.
    should_close: bool
    # Content-Encoding value when it is gzip/deflate/br/zstd, else None.
    compression: str | None
    # Connection lists "upgrade" and an Upgrade header is present.
    upgrade: bool
    # Transfer-Encoding selects chunked framing.
    chunked: bool
# Message type produced by a concrete HttpParser subclass.
_MsgT = TypeVar("_MsgT", RawRequestMessage, RawResponseMessage)
class PayloadState(IntEnum):
    """Status returned by HttpPayloadParser.feed_data()."""

    # The body is finished; returned bytes belong to the next message.
    PAYLOAD_COMPLETE = 0
    # All supplied input was consumed; more data is required.
    PAYLOAD_NEEDS_INPUT = 1
    # Paused with input still buffered; call feed_data(b"") to continue.
    PAYLOAD_HAS_PENDING_INPUT = 2
class ParseState(IntEnum):
    """Body framing selected by HttpPayloadParser at construction."""

    # No body is parsed (response_with_body is False).
    PARSE_NONE = 0
    # Fixed-size body given by Content-Length.
    PARSE_LENGTH = 1
    # Chunked transfer coding.
    PARSE_CHUNKED = 2
    # Body runs until feed_eof() is called; the default framing.
    PARSE_UNTIL_EOF = 3
class ChunkState(IntEnum):
    """Sub-state of the chunked-body parser in HttpPayloadParser."""

    # Expecting a chunk-size line (optionally with chunk extensions).
    PARSE_CHUNKED_SIZE = 0
    # Reading chunk data.
    PARSE_CHUNKED_CHUNK = 1
    # Expecting the line separator that ends a chunk's data.
    PARSE_CHUNKED_CHUNK_EOF = 2
    # Reading trailer lines after the zero-size chunk.
    # NOTE(review): value 3 is skipped, presumably a retired state; confirm
    # before renumbering.
    PARSE_TRAILERS = 4
class HeadersParser:
    """Parse raw header (or trailer) lines into a case-insensitive multidict.

    One instance is shared by HttpParser for message headers and by
    HttpPayloadParser for the trailers of chunked bodies.
    """

    def __init__(self, max_field_size: int = 8190, lax: bool = False) -> None:
        # Upper bound for the combined length of an obs-folded value.
        self.max_field_size = max_field_size
        # Lax mode accepts obs-fold continuation lines, only rejects CR/LF/NUL
        # in values, and skips the duplicate singleton-header check.
        self._lax = lax

    def parse_headers(self, lines: list[bytes]) -> tuple[HeadersDictProxy, RawHeaders]:
        """Parse ``lines`` up to the first empty line.

        :param lines: header lines without separators. An empty ``b""``
            entry ends the block; running out of lines is treated the same.
        :returns: a ``HeadersDictProxy`` over the decoded headers and a tuple
            of raw ``(name, value)`` byte pairs with surrounding OWS removed.
        :raises InvalidHeader: missing colon, empty or malformed name,
            whitespace around the name, or forbidden control characters in
            a value.
        :raises LineTooLong: an obs-folded value exceeds ``max_field_size``.
        :raises BadHttpMessage: duplicate singleton header (strict mode only).
        """
        headers: CIMultiDict[str] = CIMultiDict()
        # note: "raw" does not mean inclusion of OWS before/after the field value
        raw_headers = []

        line_count = len(lines)
        lines_idx = 0
        line = lines[0] if line_count else b""

        while line:
            # Parse initial header name : value pair.
            try:
                bname, bvalue = line.split(b":", 1)
            except ValueError:
                raise InvalidHeader(line) from None

            if len(bname) == 0:
                raise InvalidHeader(bname)

            # https://www.rfc-editor.org/rfc/rfc9112.html#section-5.1-2
            if {bname[0], bname[-1]} & {32, 9}:  # {" ", "\t"}
                raise InvalidHeader(line)

            bvalue = bvalue.lstrip(b" \t")
            name = bname.decode("utf-8", "surrogateescape")
            if not TOKENRE.fullmatch(name):
                raise InvalidHeader(bname)

            # next line
            lines_idx += 1
            line = lines[lines_idx] if lines_idx < line_count else b""

            # consume continuation lines
            continuation = self._lax and line and line[0] in (32, 9)  # (' ', '\t')

            # Deprecated: https://www.rfc-editor.org/rfc/rfc9112.html#name-obsolete-line-folding
            if continuation:
                header_length = len(bvalue)
                bvalue_lst = [bvalue]
                while continuation:
                    header_length += len(line)
                    if header_length > self.max_field_size:
                        header_line = bname + b": " + b"".join(bvalue_lst)
                        raise LineTooLong(
                            header_line[:100] + b"...", self.max_field_size
                        )
                    bvalue_lst.append(line)

                    # next line
                    lines_idx += 1
                    line = lines[lines_idx] if lines_idx < line_count else b""
                    # The empty end-of-headers line is never a continuation.
                    # Stopping here keeps any later lines from being parsed
                    # as headers of this block.
                    continuation = bool(line) and line[0] in (32, 9)
                bvalue = b"".join(bvalue_lst)

            bvalue = bvalue.strip(b" \t")
            value = bvalue.decode("utf-8", "surrogateescape")

            # https://www.rfc-editor.org/rfc/rfc9110.html#section-5.5-5
            if self._lax:
                if "\n" in value or "\r" in value or "\x00" in value:
                    raise InvalidHeader(bvalue)
            elif _FIELD_VALUE_FORBIDDEN_CTL_RE.search(value):
                raise InvalidHeader(bvalue)

            if not self._lax and name in headers and name.lower() in SINGLETON_HEADERS:
                raise BadHttpMessage(f"Duplicate '{name}' header found.")
            headers.add(name, value)
            raw_headers.append((bname, bvalue))

        return (HeadersDictProxy(headers), tuple(raw_headers))
def _is_supported_upgrade(headers: HeadersDictProxy) -> bool:
    """Return True when the Upgrade header names "tcp" or "websocket".

    The comparison is case-insensitive but restricted to ASCII values.
    """
    requested = headers.get(hdrs.UPGRADE, "")
    # .lower() can transform non-ascii characters, so reject those up front.
    if not requested.isascii():
        return False
    return requested.lower() in {"tcp", "websocket"}
class HttpParser(abc.ABC, Generic[_MsgT]):
    """Incremental HTTP/1.x parser shared by the request and response parsers.

    Raw bytes are pushed in with :meth:`feed_data`. Each complete start line
    plus header block is turned into a message by the subclass's
    :meth:`parse_message`, and any body is routed through an
    :class:`HttpPayloadParser` into a :class:`StreamReader`.
    """

    # Subclasses set this to enable lenient header/chunk parsing.
    lax: ClassVar[bool] = False

    def __init__(
        self,
        protocol: BaseProtocol,
        loop: asyncio.AbstractEventLoop,
        limit: int,
        max_line_size: int = 8190,
        max_headers: int = 128,
        max_field_size: int = 8190,
        timer: BaseTimerContext | None = None,
        code: int | None = None,
        method: str | None = None,
        payload_exception: type[BaseException] | None = None,
        response_with_body: bool = True,
        read_until_eof: bool = False,
        auto_decompress: bool = True,
        max_msg_queue_size: int = 0,
    ) -> None:
        """Store parser limits and options.

        :param limit: passed to each body ``StreamReader`` and used as the
            payload parser's decompression limit.
        :param max_line_size: maximum start-line length, also used for
            chunk-size lines.
        :param max_headers: maximum number of header lines (including the
            start line and terminator); the remainder bounds trailers.
        :param max_field_size: maximum length of a header/trailer line.
        :param payload_exception: if set, body errors are wrapped in this
            type before being set on the payload stream.
        :param read_until_eof: read a body without Content-Length or chunked
            framing until EOF.
        :param max_msg_queue_size: stop emitting messages once this many are
            queued unconsumed (0 disables the limit).
        """
        self.protocol = protocol
        self.loop = loop
        self.max_line_size = max_line_size
        self.max_field_size = max_field_size
        self.max_headers = max_headers
        self.timer = timer
        self.code = code
        self.method = method
        self.payload_exception = payload_exception
        self.response_with_body = response_with_body
        self.read_until_eof = read_until_eof

        # Header lines collected so far for the message being parsed.
        self._lines: list[bytes] = []
        # Unprocessed bytes carried over to the next feed_data() call.
        self._tail = b""
        self._upgraded = False
        self._payload = None
        self._payload_parser: HttpPayloadParser | None = None
        # True when the payload parser paused with input still buffered.
        self._payload_has_more_data = False
        self._auto_decompress = auto_decompress
        self._limit = limit
        self._headers_parser = HeadersParser(max_field_size, self.lax)
        # Stop emitting messages once this many are queued unconsumed (0 = off).
        self._max_msg_queue_size = max_msg_queue_size
        self._msg_in_flight = 0

    @abc.abstractmethod
    def parse_message(self, lines: list[bytes]) -> _MsgT:
        """Build a message from the start line and header lines.

        ``lines`` ends with an empty ``b""`` entry marking the end of headers.
        """

    @abc.abstractmethod
    def _is_chunked_te(self, te: str) -> bool:
        """Return True if the Transfer-Encoding value selects chunked framing."""

    def pause_reading(self) -> None:
        """Forward a pause request to the active payload parser (must exist)."""
        assert self._payload_parser is not None
        self._payload_parser.pause_reading()

    def message_consumed(self) -> None:
        """Protocol drained a queued message; free a slot for parsing."""
        if self._msg_in_flight > 0:
            self._msg_in_flight -= 1

    def feed_eof(self) -> _MsgT | None:
        """Signal end of input from the transport.

        While a body is being read, EOF is forwarded to the payload parser,
        which raises if a length-delimited or chunked body is incomplete.
        Otherwise any buffered, unterminated header block is parsed on a
        best-effort basis: the message is returned, or ``None`` if nothing
        is buffered or parsing fails.
        """
        if self._payload_parser is not None:
            self._payload_parser.feed_eof()
            if self._payload_parser.done:
                self._payload_parser = None
        else:
            # try to extract partial message
            if self._tail:
                self._lines.append(self._tail)

            if self._lines:
                # Stored lines never include the separator, so terminate the
                # header block with an empty bytes line unless one is already
                # present. Compare with b"", not the str "\r\n", which can
                # never equal a bytes entry.
                if self._lines[-1] != b"":
                    self._lines.append(b"")
                with suppress(Exception):
                    return self.parse_message(self._lines)
        return None

    def feed_data(
        self,
        data: bytes,
        SEP: _SEP = b"\r\n",
        EMPTY: bytes = b"",
        CONTENT_LENGTH: istr = hdrs.CONTENT_LENGTH,
        METH_CONNECT: str = hdrs.METH_CONNECT,
        SEC_WEBSOCKET_KEY1: istr = hdrs.SEC_WEBSOCKET_KEY1,
    ) -> tuple[list[tuple[_MsgT, StreamReader]], bool, bytes]:
        """Feed bytes received from the transport.

        :returns: ``(messages, upgraded, tail)``.

            - ``messages`` holds the messages whose headers were completed
              by this call, each paired with its payload stream.
            - ``upgraded`` reports whether the connection is in upgraded mode.
            - ``tail`` is the bytes left unconsumed. Bytes buffered
              internally for a later call are not returned.
        :raises BadHttpMessage, LineTooLong, InvalidHeader: on malformed
            framing. Other body errors are set on the payload stream.
        """
        messages = []

        if self._tail:
            data, self._tail = self._tail + data, b""

        data_len = len(data)
        start_pos = 0
        loop = self.loop
        max_line_length = self.max_line_size

        should_close = False
        while start_pos < data_len or self._payload_has_more_data:
            # read HTTP message (request/response line + headers), \r\n\r\n
            # and split by lines
            if self._payload_parser is None and not self._upgraded:
                if (
                    self._max_msg_queue_size
                    and self._msg_in_flight >= self._max_msg_queue_size
                ):
                    # Queue full: buffer the rest and stop. Safe pause point;
                    # any preceding body is consumed before the next request
                    # line. Resumes via feed_data(b"") when the queue drains.
                    self._tail = data[start_pos:]
                    # The bytes now live in self._tail and are re-fed on the
                    # next call. Don't also return them as unconsumed tail
                    # (mirrors the incomplete-line branch below).
                    data = EMPTY
                    break
                pos = data.find(SEP, start_pos)
                # consume \r\n
                if pos == start_pos and not self._lines:
                    start_pos = pos + len(SEP)
                    continue

                if pos >= start_pos:
                    if should_close:
                        raise BadHttpMessage("Data after `Connection: close`")

                    # line found
                    line = data[start_pos:pos]
                    if SEP == b"\n":  # For lax response parsing
                        line = line.rstrip(b"\r")
                    if len(line) > max_line_length:
                        raise LineTooLong(line[:100] + b"...", max_line_length)

                    self._lines.append(line)
                    # After processing the status/request line, everything is a header.
                    max_line_length = self.max_field_size

                    if len(self._lines) > self.max_headers:
                        raise BadHttpMessage("Too many headers received")

                    start_pos = pos + len(SEP)

                    # \r\n\r\n found
                    if self._lines[-1] == EMPTY:
                        max_trailers = self.max_headers - len(self._lines)
                        try:
                            msg: _MsgT = self.parse_message(self._lines)
                        finally:
                            self._lines.clear()

                        def get_content_length() -> int | None:
                            # payload length
                            length_hdr = msg.headers.get(CONTENT_LENGTH)
                            if length_hdr is None:
                                return None

                            # Shouldn't allow +/- or other number formats.
                            # https://www.rfc-editor.org/rfc/rfc9110#section-8.6-2
                            # msg.headers is already stripped of leading/trailing wsp
                            if not DIGITS.fullmatch(length_hdr):
                                raise InvalidHeader(CONTENT_LENGTH)

                            return int(length_hdr)

                        length = get_content_length()
                        # do not support old websocket spec
                        if SEC_WEBSOCKET_KEY1 in msg.headers:
                            raise InvalidHeader(SEC_WEBSOCKET_KEY1)

                        self._upgraded = msg.upgrade and _is_supported_upgrade(
                            msg.headers
                        )

                        method = getattr(msg, "method", self.method)
                        # code is only present on responses
                        code = getattr(msg, "code", 0)

                        assert self.protocol is not None
                        # calculate payload
                        empty_body = code in EMPTY_BODY_STATUS_CODES or bool(
                            method and method in EMPTY_BODY_METHODS
                        )
                        if not empty_body and (
                            ((length is not None and length > 0) or msg.chunked)
                            and not self._upgraded
                        ):
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            payload_parser = HttpPayloadParser(
                                payload,
                                length=length,
                                chunked=msg.chunked,
                                method=method,
                                compression=msg.compression,
                                code=self.code,
                                response_with_body=self.response_with_body,
                                auto_decompress=self._auto_decompress,
                                lax=self.lax,
                                headers_parser=self._headers_parser,
                                max_line_size=self.max_line_size,
                                max_field_size=self.max_field_size,
                                max_trailers=max_trailers,
                                limit=self._limit,
                            )
                            if not payload_parser.done:
                                self._payload_parser = payload_parser
                        elif method == METH_CONNECT:
                            assert isinstance(msg, RawRequestMessage)
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            self._upgraded = True
                            self._payload_parser = HttpPayloadParser(
                                payload,
                                method=msg.method,
                                compression=msg.compression,
                                auto_decompress=self._auto_decompress,
                                lax=self.lax,
                                headers_parser=self._headers_parser,
                                max_line_size=self.max_line_size,
                                max_field_size=self.max_field_size,
                                max_trailers=max_trailers,
                                limit=self._limit,
                            )
                        elif not empty_body and length is None and self.read_until_eof:
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            payload_parser = HttpPayloadParser(
                                payload,
                                length=length,
                                chunked=msg.chunked,
                                method=method,
                                compression=msg.compression,
                                code=self.code,
                                response_with_body=self.response_with_body,
                                auto_decompress=self._auto_decompress,
                                lax=self.lax,
                                headers_parser=self._headers_parser,
                                max_line_size=self.max_line_size,
                                max_field_size=self.max_field_size,
                                max_trailers=max_trailers,
                                limit=self._limit,
                            )
                            if not payload_parser.done:
                                self._payload_parser = payload_parser
                        else:
                            payload = EMPTY_PAYLOAD

                        messages.append((msg, payload))
                        if self._max_msg_queue_size:
                            self._msg_in_flight += 1
                        should_close = msg.should_close
                else:
                    # No complete line yet: keep the partial line for later.
                    self._tail = data[start_pos:]
                    if len(self._tail) > self.max_line_size:
                        raise LineTooLong(self._tail[:100] + b"...", self.max_line_size)
                    data = EMPTY
                    break

            # no parser, just store
            elif self._payload_parser is None and self._upgraded:
                assert not self._lines
                break

            # feed payload
            else:
                assert not self._lines
                assert self._payload_parser is not None
                try:
                    payload_state, data = self._payload_parser.feed_data(
                        data[start_pos:], SEP
                    )
                except Exception as underlying_exc:
                    reraised_exc: BaseException = underlying_exc
                    if self.payload_exception is not None:
                        reraised_exc = self.payload_exception(str(underlying_exc))

                    set_exception(
                        self._payload_parser.payload,
                        reraised_exc,
                        underlying_exc,
                    )

                    payload_state = PayloadState.PAYLOAD_COMPLETE
                    data = b""
                    # Framing errors also abort parsing of the connection.
                    if isinstance(
                        underlying_exc, (InvalidHeader, TransferEncodingError)
                    ):
                        raise

                self._payload_has_more_data = (
                    payload_state == PayloadState.PAYLOAD_HAS_PENDING_INPUT
                )

                if payload_state is not PayloadState.PAYLOAD_COMPLETE:
                    # We've either consumed all available data, or we're pausing
                    # until the reader buffer is freed up.
                    break

                # The payload parser returned the bytes after the body; keep
                # parsing them from the start.
                start_pos = 0
                data_len = len(data)
                self._payload_parser = None

        if data and start_pos < data_len:
            data = data[start_pos:]
        else:
            data = EMPTY

        return messages, self._upgraded, data

    def parse_headers(
        self, lines: list[bytes]
    ) -> tuple[HeadersDictProxy, RawHeaders, bool | None, str | None, bool, bool]:
        """Parse header lines and derive connection-level metadata.

        Obsolete line folding is accepted only in lax mode.

        :returns: ``(headers, raw_headers, close_conn, encoding, upgrade,
            chunked)``.

            - ``headers`` preserves the original header-name case and is
              case-insensitive for lookups.
            - ``close_conn`` is True/False when Connection lists
              close/keep-alive, and None otherwise.
            - ``encoding`` is the Content-Encoding value when it is gzip,
              deflate, br or zstd.
            - ``upgrade`` is True when Connection lists "upgrade" and an
              Upgrade header is present.
            - ``chunked`` is True when Transfer-Encoding selects chunked
              framing.
        :raises BadHttpMessage: when Transfer-Encoding and Content-Length are
            both present. Errors from HeadersParser and ``_is_chunked_te``
            propagate.
        """
        headers, raw_headers = self._headers_parser.parse_headers(lines)
        close_conn = None
        encoding = None
        upgrade = False
        chunked = False

        # keep-alive and protocol switching
        # RFC 9110 section 7.6.1 defines Connection as a comma-separated list.
        # We use a simple comma split here rather than getall() for performance,
        # as the target tokens (close, keep-alive, upgrade) are simple ASCII
        # values that never contain commas.
        conn_values = headers.get(hdrs.CONNECTION)
        if conn_values:
            conn_tokens = {
                token.lower()
                for token in (part.strip(" \t") for part in conn_values.split(","))
                if token and token.isascii()
            }

            if "close" in conn_tokens:
                close_conn = True
            elif "keep-alive" in conn_tokens:
                close_conn = False

            # https://www.rfc-editor.org/rfc/rfc9110.html#name-101-switching-protocols
            if "upgrade" in conn_tokens and headers.get(hdrs.UPGRADE):
                upgrade = True

        # encoding
        enc = headers.get(hdrs.CONTENT_ENCODING, "")
        if enc.isascii() and enc.lower() in {"gzip", "deflate", "br", "zstd"}:
            encoding = enc

        # chunking
        te = headers.get(hdrs.TRANSFER_ENCODING)
        if te is not None:
            if self._is_chunked_te(te):
                chunked = True

            if hdrs.CONTENT_LENGTH in headers:
                raise BadHttpMessage(
                    "Transfer-Encoding can't be present with Content-Length",
                )

        return (headers, raw_headers, close_conn, encoding, upgrade, chunked)

    def set_upgraded(self, val: bool) -> None:
        """Set connection upgraded (to websocket) mode.

        :param bool val: new state.
        """
        self._upgraded = val
class HttpRequestParser(HttpParser[RawRequestMessage]):
    """Parser for HTTP/1.x requests.

    ``parse_message`` turns a request line plus header lines into a
    RawRequestMessage. A malformed request line raises BadHttpMethod,
    BadStatusLine or InvalidURLError from ``.http_exceptions``.
    """

    def parse_message(self, lines: list[bytes]) -> RawRequestMessage:
        """Parse ``lines[0]`` as the request line and the rest as headers."""
        request_line = lines[0].decode("utf-8", "surrogateescape")
        try:
            method, target, raw_version = request_line.split(" ", maxsplit=2)
        except ValueError:
            raise BadHttpMethod(request_line) from None

        # The method must be an RFC 9110 token.
        if not TOKENRE.fullmatch(method):
            raise BadHttpMethod(method)

        version_match = VERSRE.fullmatch(raw_version)
        if version_match is None:
            raise BadStatusLine(request_line)
        major, minor = version_match.groups()
        version_o = HttpVersion(int(major), int(minor))

        if method == "CONNECT":
            # authority-form,
            # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.3
            url = URL.build(authority=target, encoded=True)
        elif target.startswith("/"):
            # origin-form,
            # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.1
            before_fragment, _, fragment = target.partition("#")
            url_path, _, query = before_fragment.partition("?")
            # NOTE: `yarl.URL.build()` mimics what the Cython-based parser
            # NOTE: does; otherwise the same Request-Line input would produce
            # NOTE: different `yarl.URL()` objects.
            url = URL.build(
                path=url_path,
                query_string=query,
                fragment=fragment,
                encoded=True,
            )
        else:
            url = URL(target, encoded=True)
            # Asterisk-form ("OPTIONS *") is accepted as-is. Anything else
            # must be absolute-form (proxy requests),
            # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.2
            is_asterisk_form = target == "*" and method == "OPTIONS"
            if not is_asterisk_form and url.scheme == "":
                raise InvalidURLError(
                    target.encode(errors="surrogateescape").decode("latin1")
                )

        headers, raw_headers, close, compression, upgrade, chunked = (
            self.parse_headers(lines[1:])
        )

        if version_o == HttpVersion11 and hdrs.HOST not in headers:
            raise BadHttpMessage("Missing 'Host' header in request.")

        if close is None:
            # Connection named neither close nor keep-alive: HTTP/1.0 and
            # older default to closing, HTTP/1.1 to a persistent connection.
            close = bool(version_o <= HttpVersion10)

        return RawRequestMessage(
            method,
            target,
            version_o,
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
            url,
        )

    def _is_chunked_te(self, te: str) -> bool:
        """Validate a request Transfer-Encoding and report chunked framing.

        Returns True when the final coding is ``chunked``. Raises
        BadHttpMessage if ``chunked`` appears more than once or is not last.
        """
        codings = [coding.strip(" \t") for coding in te.split(",")]

        def is_chunked(coding: str) -> bool:
            # .lower() transforms some non-ascii chars, so must check first.
            return coding.isascii() and coding.lower() == "chunked"

        # https://www.rfc-editor.org/rfc/rfc9112#section-7.1-3
        # "A sender MUST NOT apply the chunked transfer coding more
        # than once to a message body"
        if sum(map(is_chunked, codings)) > 1:
            raise BadHttpMessage("Request has duplicate `chunked` Transfer-Encoding")
        # https://www.rfc-editor.org/rfc/rfc9112#section-6.3-2.4.3
        if not is_chunked(codings[-1]):
            raise BadHttpMessage("Request has invalid `Transfer-Encoding`")
        return True
class HttpResponseParser(HttpParser[RawResponseMessage]):
    """Read response status line and headers.

    BadStatusLine could be raised in case of any errors in status line.
    Returns RawResponseMessage.
    """

    protocol: "ResponseHandler"

    # Lax mode should only be enabled on response parser.
    lax = not DEBUG

    def feed_data(
        self,
        data: bytes,
        SEP: _SEP | None = None,
        *args: Any,
        **kwargs: Any,
    ) -> tuple[list[tuple[RawResponseMessage, StreamReader]], bool, bytes]:
        """Feed response bytes; see HttpParser.feed_data.

        When ``SEP`` is not given, lines are split on a bare LF unless DEBUG
        is set, in which case CRLF is required.
        """
        if SEP is None:
            SEP = b"\r\n" if DEBUG else b"\n"
        return super().feed_data(data, SEP, *args, **kwargs)

    def parse_message(self, lines: list[bytes]) -> RawResponseMessage:
        """Parse ``lines[0]`` as the status line and the rest as headers."""
        line = lines[0].decode("utf-8", "surrogateescape")
        try:
            version, status = line.split(maxsplit=1)
        except ValueError:
            raise BadStatusLine(line) from None

        try:
            status, reason = status.split(maxsplit=1)
        except ValueError:
            # No reason phrase.
            status = status.strip()
            reason = ""

        # version
        match = VERSRE.fullmatch(version)
        if match is None:
            raise BadStatusLine(line)
        version_o = HttpVersion(int(match.group(1)), int(match.group(2)))

        # The status code is a three-digit ASCII number, no padding
        if len(status) != 3 or not DIGITS.fullmatch(status):
            raise BadStatusLine(line)
        status_i = int(status)

        # read headers
        (
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
        ) = self.parse_headers(lines[1:])

        if close is None:
            if version_o <= HttpVersion10:
                close = True
            # https://www.rfc-editor.org/rfc/rfc9112.html#name-message-body-length
            elif 100 <= status_i < 200 or status_i in {204, 304}:
                close = False
            elif hdrs.CONTENT_LENGTH in headers or hdrs.TRANSFER_ENCODING in headers:
                close = False
            else:
                # https://www.rfc-editor.org/rfc/rfc9112.html#section-6.3-2.8
                close = True

        return RawResponseMessage(
            version_o,
            status_i,
            reason.strip(),
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
        )

    def _is_chunked_te(self, te: str) -> bool:
        """Return True if the last transfer coding in ``te`` is ``chunked``.

        Unlike the request parser, other or repeated codings are not rejected.
        """
        # https://www.rfc-editor.org/rfc/rfc9112#section-6.3-2.4.2
        last = te.rsplit(",", maxsplit=1)[-1].strip(" \t")
        # .lower() maps some non-ASCII characters (e.g. U+212A KELVIN SIGN)
        # onto ASCII letters, so look-alike codings must not count as chunked.
        return last.isascii() and last.lower() == "chunked"
class HttpPayloadParser:
    """Decode an HTTP message body and feed it into a StreamReader.

    Framing is chosen at construction:

    - no body when ``response_with_body`` is false;
    - chunked transfer coding when ``chunked`` is set;
    - a fixed Content-Length when ``length`` is given;
    - otherwise, read until :meth:`feed_eof`.

    With ``auto_decompress`` and a ``compression`` value (and a body
    expected), output goes through :class:`DeflateBuffer`.
    """

    def __init__(
        self,
        payload: StreamReader,
        length: int | None = None,
        chunked: bool = False,
        compression: str | None = None,
        code: int | None = None,
        method: str | None = None,
        response_with_body: bool = True,
        auto_decompress: bool = True,
        lax: bool = False,
        *,
        headers_parser: HeadersParser,
        max_line_size: int = 8190,
        max_field_size: int = 8190,
        max_trailers: int = 128,
        limit: int = DEFAULT_CHUNK_SIZE,
    ) -> None:
        # Bytes still expected for PARSE_LENGTH bodies.
        self._length = 0
        # Declared Content-Length, reported when EOF arrives too early.
        self._length_expected = 0
        # Set by pause_reading(); consumed while draining decompressed output.
        self._paused = False
        self._type = ParseState.PARSE_UNTIL_EOF
        self._chunk = ChunkState.PARSE_CHUNKED_SIZE
        # Bytes remaining in the current chunk.
        self._chunk_size = 0
        # Unconsumed input carried over to the next feed_data() call.
        self._chunk_tail = b""
        self._auto_decompress = auto_decompress
        self._lax = lax
        self._headers_parser = headers_parser
        self._max_line_size = max_line_size
        self._max_field_size = max_field_size
        self._max_trailers = max_trailers
        # Last return value of payload.feed_data(): more output pending.
        self._more_data_available = False
        self._trailer_lines: list[bytes] = []
        self.done = False
        # feed_eof() arrived while paused; finish on the next feed_data().
        self._eof_pending = False

        # payload decompression wrapper
        if response_with_body and compression and self._auto_decompress:
            real_payload: StreamReader | DeflateBuffer = DeflateBuffer(
                payload, compression, max_decompress_size=limit
            )
        else:
            real_payload = payload

        # payload parser
        if not response_with_body:
            # don't parse payload if it's not expected to be received
            self._type = ParseState.PARSE_NONE
            real_payload.feed_eof()
            self.done = True
        elif chunked:
            self._type = ParseState.PARSE_CHUNKED
        elif length is not None:
            self._type = ParseState.PARSE_LENGTH
            self._length = length
            self._length_expected = length
            if self._length == 0:
                real_payload.feed_eof()
                self.done = True

        self.payload = real_payload

    def pause_reading(self) -> None:
        """Ask the parser to stop draining output at the next check point."""
        self._paused = True

    def feed_eof(self) -> None:
        """Handle end of input.

        For read-until-EOF bodies, remaining decompressed output is drained
        and the payload is finished. If a pause is requested first, the EOF
        stays pending until feed_data() resumes.

        :raises ContentLengthError: a Content-Length body is incomplete.
        :raises TransferEncodingError: a chunked body is incomplete.
        """
        if self._type == ParseState.PARSE_UNTIL_EOF:
            self._eof_pending = True
            while self._more_data_available:
                if self._paused:
                    self._paused = False
                    return  # Will resume via feed_data(b"") later
                self._more_data_available = self.payload.feed_data(b"")
            self.payload.feed_eof()
            self.done = True
            self._eof_pending = False
        elif self._type == ParseState.PARSE_LENGTH:
            received = self._length_expected - self._length
            raise ContentLengthError(
                f"Not enough data to satisfy content length header "
                f"(received {received} of {self._length_expected} bytes)."
            )
        elif self._type == ParseState.PARSE_CHUNKED:
            raise TransferEncodingError(
                "Not enough data to satisfy transfer length header."
            )

    def feed_data(
        self, chunk: bytes, SEP: _SEP = b"\r\n", CHUNK_EXT: bytes = b";"
    ) -> tuple[PayloadState, bytes]:
        """Receive a chunk of data to process.

        Return:
            PayloadState - The current state of payload processing.
                This function may be called with empty bytes after returning
                PAYLOAD_HAS_PENDING_INPUT to continue processing after a pause.
            bytes - If payload is complete, this is the unconsumed bytes intended for the
                next message/payload, b"" otherwise.

        Raises:
            LineTooLong: an oversized chunk-size or trailer line.
            TransferEncodingError: invalid chunk size, LF inside a chunk
                extension, or missing CRLF after chunk data.
            BadHttpMessage: too many trailer lines.
            Errors from HeadersParser.parse_headers for malformed trailers.
        """
        # Read specified amount of bytes
        if self._type == ParseState.PARSE_LENGTH:
            if self._chunk_tail:
                chunk = self._chunk_tail + chunk
                self._chunk_tail = b""

            required = self._length
            self._length = max(required - len(chunk), 0)
            self._more_data_available = self.payload.feed_data(chunk[:required])
            while self._more_data_available:
                if self._paused:
                    self._paused = False
                    self._chunk_tail = chunk[required:]
                    return PayloadState.PAYLOAD_HAS_PENDING_INPUT, b""
                self._more_data_available = self.payload.feed_data(b"")

            if self._length == 0:
                self.payload.feed_eof()
                return PayloadState.PAYLOAD_COMPLETE, chunk[required:]
        # Chunked transfer encoding parser
        elif self._type == ParseState.PARSE_CHUNKED:
            if self._chunk_tail:
                # We should check the length is sane when not processing payload body.
                if self._chunk != ChunkState.PARSE_CHUNKED_CHUNK:
                    max_line_length = self._max_line_size
                    if self._chunk == ChunkState.PARSE_TRAILERS:
                        max_line_length = self._max_field_size
                    if len(self._chunk_tail) > max_line_length:
                        raise LineTooLong(
                            self._chunk_tail[:100] + b"...", max_line_length
                        )

                chunk = self._chunk_tail + chunk
                self._chunk_tail = b""

            while chunk or self._more_data_available:
                # read next chunk size
                if self._chunk == ChunkState.PARSE_CHUNKED_SIZE:
                    pos = chunk.find(SEP)
                    if pos >= 0:
                        # Only chunk-size lines reach here; trailers enforce
                        # _max_field_size separately in PARSE_TRAILERS below.
                        if pos > self._max_line_size:
                            raise LineTooLong(chunk[:100] + b"...", self._max_line_size)
                        i = chunk.find(CHUNK_EXT, 0, pos)
                        if i >= 0:
                            size_b = chunk[:i]  # strip chunk-extensions
                            # Verify no LF in the chunk-extension
                            if b"\n" in (ext := chunk[i:pos]):
                                exc = TransferEncodingError(
                                    f"Unexpected LF in chunk-extension: {ext!r}"
                                )
                                set_exception(self.payload, exc)
                                raise exc
                        else:
                            size_b = chunk[:pos]

                        if self._lax:  # Allow whitespace in lax mode.
                            size_b = size_b.strip()

                        # HEXDIGITS is pre-compiled; also rejects signs,
                        # underscores and "0x" prefixes that int() accepts.
                        if not HEXDIGITS.fullmatch(size_b):
                            exc = TransferEncodingError(
                                chunk[:pos].decode("ascii", "surrogateescape")
                            )
                            set_exception(self.payload, exc)
                            raise exc
                        size = int(size_b, 16)

                        chunk = chunk[pos + len(SEP) :]
                        if size == 0:  # eof marker
                            self._chunk = ChunkState.PARSE_TRAILERS
                            if self._lax and chunk.startswith(b"\r"):
                                chunk = chunk[1:]
                        else:
                            self._chunk = ChunkState.PARSE_CHUNKED_CHUNK
                            self._chunk_size = size
                            self.payload.begin_http_chunk_receiving()
                    else:
                        self._chunk_tail = chunk
                        return PayloadState.PAYLOAD_NEEDS_INPUT, b""

                # read chunk and feed buffer
                if self._chunk == ChunkState.PARSE_CHUNKED_CHUNK:
                    if self._paused:
                        self._paused = False
                        self._chunk_tail = chunk
                        return PayloadState.PAYLOAD_HAS_PENDING_INPUT, b""

                    required = self._chunk_size
                    self._chunk_size = max(required - len(chunk), 0)
                    self._more_data_available = self.payload.feed_data(chunk[:required])
                    chunk = chunk[required:]

                    if self._more_data_available:
                        continue

                    if self._chunk_size:
                        self._paused = False
                        return PayloadState.PAYLOAD_NEEDS_INPUT, b""
                    self._chunk = ChunkState.PARSE_CHUNKED_CHUNK_EOF
                    self.payload.end_http_chunk_receiving()

                # toss the CRLF at the end of the chunk
                if self._chunk == ChunkState.PARSE_CHUNKED_CHUNK_EOF:
                    if self._lax and chunk.startswith(b"\r"):
                        chunk = chunk[1:]
                    if chunk[: len(SEP)] == SEP:
                        chunk = chunk[len(SEP) :]
                        self._chunk = ChunkState.PARSE_CHUNKED_SIZE
                    elif len(chunk) >= len(SEP) or chunk != SEP[: len(chunk)]:
                        exc = TransferEncodingError(
                            "Chunk size mismatch: expected CRLF after chunk data"
                        )
                        set_exception(self.payload, exc)
                        raise exc
                    else:
                        # Only a prefix of the separator has arrived so far.
                        self._chunk_tail = chunk
                        return PayloadState.PAYLOAD_NEEDS_INPUT, b""

                if self._chunk == ChunkState.PARSE_TRAILERS:
                    pos = chunk.find(SEP)
                    if pos < 0:  # No line found
                        self._chunk_tail = chunk
                        return PayloadState.PAYLOAD_NEEDS_INPUT, b""

                    line = chunk[:pos]
                    chunk = chunk[pos + len(SEP) :]
                    if SEP == b"\n":  # For lax response parsing
                        line = line.rstrip(b"\r")

                    if len(line) > self._max_field_size:
                        raise LineTooLong(line[:100] + b"...", self._max_field_size)

                    self._trailer_lines.append(line)

                    if len(self._trailer_lines) > self._max_trailers:
                        raise BadHttpMessage("Too many trailers received")

                    # \r\n\r\n found, end of stream
                    if self._trailer_lines[-1] == b"":
                        # Headers and trailers are defined the same way,
                        # so we reuse the HeadersParser here. The parsed
                        # trailers are only validated, not kept.
                        try:
                            self._headers_parser.parse_headers(self._trailer_lines)
                        finally:
                            self._trailer_lines.clear()
                        self.payload.feed_eof()
                        return PayloadState.PAYLOAD_COMPLETE, chunk

        # Read all bytes until eof
        elif self._type == ParseState.PARSE_UNTIL_EOF:
            self._more_data_available = self.payload.feed_data(chunk)
            while self._more_data_available:
                if self._paused:
                    self._paused = False
                    return PayloadState.PAYLOAD_HAS_PENDING_INPUT, b""
                self._more_data_available = self.payload.feed_data(b"")

            if self._eof_pending:
                self.payload.feed_eof()
                self.done = True
                self._eof_pending = False
                return PayloadState.PAYLOAD_COMPLETE, b""

        return PayloadState.PAYLOAD_NEEDS_INPUT, b""
class DeflateBuffer:
    """Decompress a Content-Encoded body and feed the output into ``out``.

    Supports "br" (needs Brotli), "zstd" (needs zstd support) and the
    zlib-based encodings (gzip/deflate).
    """

    def __init__(
        self,
        out: StreamReader,
        encoding: str | None,
        max_decompress_size: int = DEFAULT_CHUNK_SIZE,
    ) -> None:
        """Pick a decompressor for ``encoding``.

        :raises ContentEncodingError: the Brotli/zstd backend is not installed.
        """
        self.out = out
        # Compressed bytes received so far, mirrored onto the output stream.
        self.size = 0
        out.total_compressed_bytes = self.size
        self.encoding = encoding
        # True once real input has reached the decompressor; gates the
        # one-time deflate header check in feed_data().
        self._started_decoding = False

        self.decompressor: BrotliDecompressor | ZLibDecompressor | ZSTDDecompressor
        if encoding == "br":
            if not HAS_BROTLI:
                raise ContentEncodingError(
                    "Can not decode content-encoding: brotli (br). "
                    "Please install `Brotli`"
                )
            self.decompressor = BrotliDecompressor()
        elif encoding == "zstd":
            if not HAS_ZSTD:
                raise ContentEncodingError(
                    "Can not decode content-encoding: zstandard (zstd). "
                    "Please install `backports.zstd`"
                )
            self.decompressor = ZSTDDecompressor()
        else:
            self.decompressor = ZLibDecompressor(encoding=encoding)

        self._max_decompress_size = max_decompress_size

    def set_exception(
        self,
        exc: type[BaseException] | BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Propagate an exception to the output stream."""
        set_exception(self.out, exc, exc_cause)

    def feed_data(self, chunk: bytes) -> bool:
        """Return True if more data is available and this method should be called again with b"".

        :raises ContentEncodingError: the decompressor rejected the data.
        """
        self.size += len(chunk)
        self.out.total_compressed_bytes = self.size

        # RFC1950
        # bits 0..3 = CM = 0b1000 = 8 = "deflate"
        # bits 4..7 = CINFO = 1..7 = windows size.
        # The header can only be inspected on real bytes. Callers may pass
        # b"" before any data arrives (e.g. a chunk-size line with no chunk
        # data after it), and indexing an empty chunk would raise IndexError.
        if (
            chunk
            and not self._started_decoding
            and self.encoding == "deflate"
            and chunk[0] & 0xF != 8
        ):
            # Change the decoder to decompress incorrectly compressed data
            # Actually we should issue a warning about non-RFC-compliant data.
            self.decompressor = ZLibDecompressor(
                encoding=self.encoding, suppress_deflate_header=True
            )

        low_water = self.out._low_water
        max_length = (
            0 if low_water >= sys.maxsize else max(self._max_decompress_size, low_water)
        )
        try:
            decompressed = self.decompressor.decompress_sync(chunk, max_length=max_length)
        except Exception:
            raise ContentEncodingError(
                "Can not decode content-encoding: %s" % self.encoding
            )

        if chunk:
            # Only real input marks the stream as started, so an early empty
            # call cannot skip the header check for the first actual bytes.
            self._started_decoding = True

        if decompressed:
            self.out.feed_data(decompressed)
        return self.decompressor.data_available

    def feed_eof(self) -> None:
        """Finish decompression and signal EOF on the output stream.

        :raises ContentEncodingError: a deflate stream ended before its
            end-of-stream marker.
        """
        chunk = self.decompressor.flush()
        # This should never contain data as we defer the call until exhausting
        # the decompression. If .flush() is returning data, this may indicate a
        # zip bomb vulnerability as it will decompress all remaining data at once.
        assert not chunk

        if self.size > 0:
            # decompressor is not brotli unless encoding is "br"
            if self.encoding == "deflate" and not self.decompressor.eof:  # type: ignore[union-attr]
                raise ContentEncodingError("deflate")

        self.out.feed_eof()

    def begin_http_chunk_receiving(self) -> None:
        """Forward the chunk-start notification to the output stream."""
        self.out.begin_http_chunk_receiving()

    def end_http_chunk_receiving(self) -> None:
        """Forward the chunk-end notification to the output stream."""
        self.out.end_http_chunk_receiving()
# Capture the pure-Python implementations before the optional C extension
# below rebinds the public names.
HttpRequestParserPy = HttpRequestParser
HttpResponseParserPy = HttpResponseParser
RawRequestMessagePy = RawRequestMessage
RawResponseMessagePy = RawResponseMessage

# Prefer the compiled parser unless NO_EXTENSIONS is set. If the import fails
# (ImportError), the pure-Python versions stay bound to the public names.
# NOTE(review): parse_message looks up RawRequestMessage/RawResponseMessage
# as module globals at call time, so after this rebinding the *Py parsers
# also construct the C message types. Confirm that this is intended.
with suppress(ImportError):
    if not NO_EXTENSIONS:
        from ._http_parser import (  # type: ignore[import-not-found,no-redef]
            HttpRequestParser,
            HttpResponseParser,
            RawRequestMessage,
            RawResponseMessage,
        )

        HttpRequestParserC = HttpRequestParser
        HttpResponseParserC = HttpResponseParser
        RawRequestMessageC = RawRequestMessage
        RawResponseMessageC = RawResponseMessage