Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/http_parser.py: 19%
Shortcuts on this page
r, m, x: toggle line displays
j, k: next/previous highlighted chunk
0 (zero): top of page
1 (one): first highlighted chunk
1import abc
2import asyncio
3import re
4import string
5import sys
6from contextlib import suppress
7from enum import IntEnum
8from re import Pattern
9from typing import (
10 TYPE_CHECKING,
11 Any,
12 ClassVar,
13 Final,
14 Generic,
15 Literal,
16 NamedTuple,
17 TypeVar,
18)
20from multidict import CIMultiDict, CIMultiDictProxy, istr
21from yarl import URL
23from . import hdrs
24from .base_protocol import BaseProtocol
25from .compression_utils import (
26 HAS_BROTLI,
27 HAS_ZSTD,
28 BrotliDecompressor,
29 ZLibDecompressor,
30 ZSTDDecompressor,
31)
32from .helpers import (
33 _EXC_SENTINEL,
34 DEBUG,
35 DEFAULT_CHUNK_SIZE,
36 EMPTY_BODY_METHODS,
37 EMPTY_BODY_STATUS_CODES,
38 NO_EXTENSIONS,
39 BaseTimerContext,
40 set_exception,
41)
42from .http_exceptions import (
43 BadHttpMessage,
44 BadHttpMethod,
45 BadStatusLine,
46 ContentEncodingError,
47 ContentLengthError,
48 InvalidHeader,
49 InvalidURLError,
50 LineTooLong,
51 TransferEncodingError,
52)
53from .http_writer import HttpVersion, HttpVersion10, HttpVersion11
54from .streams import EMPTY_PAYLOAD, StreamReader
55from .typedefs import RawHeaders
57if TYPE_CHECKING:
58 from .client_proto import ResponseHandler
__all__ = (
    "HeadersParser",
    "HttpParser",
    "HttpRequestParser",
    "HttpResponseParser",
    "RawRequestMessage",
    "RawResponseMessage",
)

# Line separators accepted by the parsers: CRLF, or bare LF (selected by
# HttpResponseParser.feed_data when DEBUG is off, for lax response parsing).
_SEP = Literal[b"\r\n", b"\n"]

# Every character in ``string.printable``.
ASCIISET: Final[set[str]] = set(string.printable)

# See https://www.rfc-editor.org/rfc/rfc9110.html#name-overview
# and https://www.rfc-editor.org/rfc/rfc9110.html#name-tokens
#
# method = token
# tchar = "!" / "#" / "$" / "%" / "&" / "'" / "*" / "+" / "-" / "." /
#         "^" / "_" / "`" / "|" / "~" / DIGIT / ALPHA
# token = 1*tchar
_TCHAR_SPECIALS: Final[str] = re.escape("!#$%&'*+-.^_`|~")
# Used with fullmatch() to validate request methods and header field names.
TOKENRE: Final[Pattern[str]] = re.compile(f"[0-9A-Za-z{_TCHAR_SPECIALS}]+")
# "HTTP/<major>.<minor>" with exactly one ASCII digit on each side.
VERSRE: Final[Pattern[str]] = re.compile(r"HTTP/(\d)\.(\d)", re.ASCII)
# ASCII decimal digits only (re.ASCII keeps \d from matching other Unicode digits).
DIGITS: Final[Pattern[str]] = re.compile(r"\d+", re.ASCII)
# Hex digits of a chunk-size line; this pattern operates on bytes.
HEXDIGITS: Final[Pattern[bytes]] = re.compile(rb"[0-9a-fA-F]+")
# https://www.rfc-editor.org/rfc/rfc9110#section-5.5-5
# Control characters rejected in field values in strict mode: 0x00-0x1F
# except HTAB (0x09), plus DEL (0x7F).
_FIELD_VALUE_FORBIDDEN_CTL_RE: Final[Pattern[str]] = re.compile(
    r"[\x00-\x08\x0a-\x1f\x7f]"
)

# RFC 9110 singleton headers — duplicates are rejected in strict mode.
# In lax mode (response parser default), the check is skipped entirely
# since real-world servers (e.g. Google APIs, Werkzeug) commonly send
# duplicate headers like Content-Type or Server.
# Lowercased for case-insensitive matching against wire names.
SINGLETON_HEADERS: Final[frozenset[str]] = frozenset(
    {
        "content-length",
        "content-location",
        "content-range",
        "content-type",
        "etag",
        "host",
        "max-forwards",
        "server",
        "transfer-encoding",
        "user-agent",
    }
)
class RawRequestMessage(NamedTuple):
    """Request line plus headers, as built by HttpRequestParser.parse_message."""

    method: str
    path: str  # request-target exactly as it appeared on the request line
    version: HttpVersion
    headers: "CIMultiDictProxy[str]"
    raw_headers: RawHeaders  # (name, value) byte pairs in wire order
    should_close: bool
    compression: str | None  # recognised Content-Encoding value, else None
    upgrade: bool  # "Connection: upgrade" together with a non-empty Upgrade header
    chunked: bool
    url: URL
class RawResponseMessage(NamedTuple):
    """Status line plus headers, as built by HttpResponseParser.parse_message."""

    version: HttpVersion
    code: int  # three-digit status code
    reason: str  # reason phrase with surrounding whitespace stripped
    headers: CIMultiDictProxy[str]
    raw_headers: RawHeaders  # (name, value) byte pairs in wire order
    should_close: bool
    compression: str | None  # recognised Content-Encoding value, else None
    upgrade: bool
    chunked: bool
# Message type produced by a concrete HttpParser subclass.
_MsgT = TypeVar("_MsgT", RawRequestMessage, RawResponseMessage)
class PayloadState(IntEnum):
    """Status returned by HttpPayloadParser.feed_data."""

    # Body finished; the returned bytes belong to the next message.
    PAYLOAD_COMPLETE = 0
    # All input consumed; more input is required.
    PAYLOAD_NEEDS_INPUT = 1
    # Paused with input still buffered; resume with feed_data(b"").
    PAYLOAD_HAS_PENDING_INPUT = 2
class ParseState(IntEnum):
    """How HttpPayloadParser determines where a message body ends."""

    PARSE_NONE = 0  # no body is parsed (response_with_body=False)
    PARSE_LENGTH = 1  # fixed number of bytes (Content-Length)
    PARSE_CHUNKED = 2  # chunked transfer coding
    PARSE_UNTIL_EOF = 3  # body runs until feed_eof()
class ChunkState(IntEnum):
    """Sub-state of the chunked transfer-coding parser."""

    PARSE_CHUNKED_SIZE = 0  # reading a chunk-size line (optionally with extensions)
    PARSE_CHUNKED_CHUNK = 1  # reading chunk data
    PARSE_CHUNKED_CHUNK_EOF = 2  # expecting the line separator after chunk data
    PARSE_MAYBE_TRAILERS = 3  # NOTE(review): not referenced in this module section
    PARSE_TRAILERS = 4  # reading trailer lines after the zero-size chunk
class HeadersParser:
    """Parse header (or trailer) field lines into a case-insensitive multidict.

    The same parser handles message headers and chunked-body trailers.

    Lax mode:
      - Obsolete line folding is accepted (continuation lines starting with
        SP or HTAB).
      - Only CR, LF and NUL are rejected in field values.

    Strict mode:
      - Folding is rejected.
      - All forbidden control characters are rejected in field values.
      - A duplicated singleton header raises BadHttpMessage.
    """

    def __init__(
        self,
        max_line_size: int = 8190,
        max_headers: int = 32768,
        max_field_size: int = 8190,
        lax: bool = False,
    ) -> None:
        self.max_line_size = max_line_size
        self.max_headers = max_headers
        # Upper bound for a folded (multi-line) field value.
        self.max_field_size = max_field_size
        self._lax = lax

    def parse_headers(
        self, lines: list[bytes]
    ) -> tuple["CIMultiDictProxy[str]", RawHeaders]:
        """Parse field lines up to the first empty line.

        :param lines: raw field lines without line terminators. Normally the
            list ends with ``b""``, which marks the end of the header section.
            An empty list, or a missing terminator, is treated as end of input.
        :returns: ``(headers, raw_headers)``. ``headers`` is a read-only
            case-insensitive multidict of decoded values. ``raw_headers`` is a
            tuple of ``(name, value)`` byte pairs in wire order.
        :raises InvalidHeader: malformed field line/name or forbidden characters.
        :raises LineTooLong: a folded value exceeds ``max_field_size``.
        :raises BadHttpMessage: duplicate singleton header in strict mode.
        """
        headers: CIMultiDict[str] = CIMultiDict()
        # note: "raw" does not mean inclusion of OWS before/after the field value
        raw_headers = []

        line_count = len(lines)
        lines_idx = 0
        # Bounds-checked: an empty list means "no headers", not IndexError.
        line = lines[0] if line_count else b""

        while line:
            # Parse initial header name : value pair.
            try:
                bname, bvalue = line.split(b":", 1)
            except ValueError:
                raise InvalidHeader(line) from None

            if len(bname) == 0:
                raise InvalidHeader(bname)

            # https://www.rfc-editor.org/rfc/rfc9112.html#section-5.1-2
            # No whitespace is allowed around the field name.
            if {bname[0], bname[-1]} & {32, 9}:  # {" ", "\t"}
                raise InvalidHeader(line)

            bvalue = bvalue.lstrip(b" \t")
            name = bname.decode("utf-8", "surrogateescape")
            if not TOKENRE.fullmatch(name):
                raise InvalidHeader(bname)

            # next line (a missing terminator is treated as end of headers)
            lines_idx += 1
            line = lines[lines_idx] if lines_idx < line_count else b""

            # consume continuation lines
            continuation = self._lax and line and line[0] in (32, 9)  # (' ', '\t')

            # Deprecated: https://www.rfc-editor.org/rfc/rfc9112.html#name-obsolete-line-folding
            if continuation:
                header_length = len(bvalue)
                bvalue_lst = [bvalue]
                while continuation:
                    header_length += len(line)
                    if header_length > self.max_field_size:
                        header_line = bname + b": " + b"".join(bvalue_lst)
                        raise LineTooLong(
                            header_line[:100] + b"...", self.max_field_size
                        )
                    bvalue_lst.append(line)

                    # next line
                    lines_idx += 1
                    line = lines[lines_idx] if lines_idx < line_count else b""
                    # An empty line ends the header section, so it must also
                    # end the folded value instead of being absorbed into it.
                    continuation = bool(line) and line[0] in (32, 9)
                bvalue = b"".join(bvalue_lst)

            bvalue = bvalue.strip(b" \t")
            value = bvalue.decode("utf-8", "surrogateescape")

            # https://www.rfc-editor.org/rfc/rfc9110.html#section-5.5-5
            if self._lax:
                if "\n" in value or "\r" in value or "\x00" in value:
                    raise InvalidHeader(bvalue)
            elif _FIELD_VALUE_FORBIDDEN_CTL_RE.search(value):
                raise InvalidHeader(bvalue)

            if not self._lax and name in headers and name.lower() in SINGLETON_HEADERS:
                raise BadHttpMessage(f"Duplicate '{name}' header found.")
            headers.add(name, value)
            raw_headers.append((bname, bvalue))

        return (CIMultiDictProxy(headers), tuple(raw_headers))
def _is_supported_upgrade(headers: CIMultiDictProxy[str]) -> bool:
    """Tell whether the Upgrade header asks for a protocol we can switch to.

    Only ``tcp`` and ``websocket`` (ASCII, case-insensitive) are accepted.
    """
    requested = headers.get(hdrs.UPGRADE, "")
    # Reject non-ASCII before lowercasing: str.lower() can turn some
    # non-ASCII characters into ASCII letters.
    if not requested.isascii():
        return False
    return requested.lower() in {"tcp", "websocket"}
class HttpParser(abc.ABC, Generic[_MsgT]):
    """Incremental HTTP/1.x message parser shared by requests and responses.

    Subclasses implement :meth:`parse_message` (start line plus headers) and
    :meth:`_is_chunked_te`.

    :meth:`feed_data` works as follows:
      - splits incoming bytes into lines;
      - builds a message once the empty line ending the header section is
        seen;
      - hands any message body to an :class:`HttpPayloadParser`.
    """

    # Forwarded to HeadersParser and HttpPayloadParser as their ``lax`` flag.
    lax: ClassVar[bool] = False

    def __init__(
        self,
        protocol: BaseProtocol | None = None,
        loop: asyncio.AbstractEventLoop | None = None,
        limit: int = 2**16,
        max_line_size: int = 8190,
        max_headers: int = 128,
        max_field_size: int = 8190,
        timer: BaseTimerContext | None = None,
        code: int | None = None,
        method: str | None = None,
        payload_exception: type[BaseException] | None = None,
        response_with_body: bool = True,
        read_until_eof: bool = False,
        auto_decompress: bool = True,
        max_msg_queue_size: int = 0,
    ) -> None:
        self.protocol = protocol
        self.loop = loop
        self.max_line_size = max_line_size
        self.max_headers = max_headers
        self.max_field_size = max_field_size
        self.timer = timer
        self.code = code
        self.method = method
        # If set, payload errors are re-wrapped in this type before being
        # delivered to the payload stream.
        self.payload_exception = payload_exception
        self.response_with_body = response_with_body
        self.read_until_eof = read_until_eof

        # Lines of the message currently being assembled (no separators).
        self._lines: list[bytes] = []
        # Unconsumed bytes carried over to the next feed_data() call.
        self._tail = b""
        self._upgraded = False
        self._payload = None
        self._payload_parser: HttpPayloadParser | None = None
        self._payload_has_more_data = False
        self._auto_decompress = auto_decompress
        self._limit = limit
        self._headers_parser = HeadersParser(
            max_line_size, max_headers, max_field_size, self.lax
        )
        # Stop emitting messages once this many are queued unconsumed (0 = off).
        self._max_msg_queue_size = max_msg_queue_size
        self._msg_in_flight = 0

    @abc.abstractmethod
    def parse_message(self, lines: list[bytes]) -> _MsgT: ...

    @abc.abstractmethod
    def _is_chunked_te(self, te: str) -> bool: ...

    def pause_reading(self) -> None:
        """Ask the active payload parser to pause at its next safe point."""
        assert self._payload_parser is not None
        self._payload_parser.pause_reading()

    def message_consumed(self) -> None:
        """Protocol drained a queued message; free a slot for parsing."""
        if self._msg_in_flight > 0:
            self._msg_in_flight -= 1

    def feed_eof(self) -> _MsgT | None:
        """Signal end of input.

        If a body is being read, the payload parser is told about EOF. For an
        incomplete length-delimited or chunked body, that parser raises.

        Otherwise, any buffered but unterminated start line/headers are parsed
        on a best-effort basis and the message is returned. Parse errors are
        swallowed and ``None`` is returned.
        """
        if self._payload_parser is not None:
            self._payload_parser.feed_eof()
            if self._payload_parser.done:
                self._payload_parser = None
        else:
            # try to extract partial message
            if self._tail:
                self._lines.append(self._tail)
                # The tail now lives in ``_lines``; don't keep a second copy.
                self._tail = b""

            if self._lines:
                # Terminate the header section so parse_message() stops at the
                # end of the buffered lines. Lines are bytes without
                # separators, so compare against b"" (not the str "\r\n").
                if self._lines[-1] != b"":
                    self._lines.append(b"")
                try:
                    with suppress(Exception):
                        return self.parse_message(self._lines)
                finally:
                    # The buffered lines are consumed whether or not parsing
                    # succeeded, mirroring feed_data().
                    self._lines.clear()
        return None

    def feed_data(
        self,
        data: bytes,
        SEP: _SEP = b"\r\n",
        EMPTY: bytes = b"",
        CONTENT_LENGTH: istr = hdrs.CONTENT_LENGTH,
        METH_CONNECT: str = hdrs.METH_CONNECT,
        SEC_WEBSOCKET_KEY1: istr = hdrs.SEC_WEBSOCKET_KEY1,
    ) -> tuple[list[tuple[_MsgT, StreamReader]], bool, bytes]:
        """Feed raw bytes received from the transport.

        The parameters after ``SEP`` are constants bound as keyword defaults
        (presumably to avoid global lookups); callers are not expected to
        pass them.

        :returns: ``(messages, upgraded, tail)``:
            - ``messages``: ``(message, payload_stream)`` pairs completed by
              this call.
            - ``upgraded``: whether the connection has switched protocols.
            - ``tail``: bytes left unconsumed, e.g. data following an upgrade.
              It is ``b""`` when everything was consumed or buffered
              internally.
        """

        messages = []

        if self._tail:
            data, self._tail = self._tail + data, b""

        data_len = len(data)
        start_pos = 0
        loop = self.loop
        # The start line is limited by max_line_size; header lines that
        # follow it within this call are limited by max_field_size.
        max_line_length = self.max_line_size

        should_close = False
        while start_pos < data_len or self._payload_has_more_data:

            # read HTTP message (request/response line + headers), \r\n\r\n
            # and split by lines
            if self._payload_parser is None and not self._upgraded:
                if (
                    self._max_msg_queue_size
                    and self._msg_in_flight >= self._max_msg_queue_size
                ):
                    # Queue full: buffer the rest and stop. Safe pause point;
                    # any preceding body is consumed before the next request
                    # line. Resumes via feed_data(b"") when the queue drains.
                    self._tail = data[start_pos:]
                    # The bytes are now owned by ``_tail``; don't also report
                    # them as unconsumed (mirrors the partial-line branch).
                    data = EMPTY
                    break
                pos = data.find(SEP, start_pos)
                # consume \r\n
                if pos == start_pos and not self._lines:
                    start_pos = pos + len(SEP)
                    continue

                if pos >= start_pos:
                    if should_close:
                        raise BadHttpMessage("Data after `Connection: close`")

                    # line found
                    line = data[start_pos:pos]
                    if SEP == b"\n":  # For lax response parsing
                        line = line.rstrip(b"\r")
                    if len(line) > max_line_length:
                        raise LineTooLong(line[:100] + b"...", max_line_length)

                    self._lines.append(line)
                    # After processing the status/request line, everything is a header.
                    max_line_length = self.max_field_size

                    if len(self._lines) > self.max_headers:
                        raise BadHttpMessage("Too many headers received")

                    start_pos = pos + len(SEP)

                    # \r\n\r\n found
                    if self._lines[-1] == EMPTY:
                        # Trailers share the header budget with the headers.
                        max_trailers = self.max_headers - len(self._lines)
                        try:
                            msg: _MsgT = self.parse_message(self._lines)
                        finally:
                            self._lines.clear()

                        def get_content_length() -> int | None:
                            # payload length
                            length_hdr = msg.headers.get(CONTENT_LENGTH)
                            if length_hdr is None:
                                return None

                            # Shouldn't allow +/- or other number formats.
                            # https://www.rfc-editor.org/rfc/rfc9110#section-8.6-2
                            # msg.headers is already stripped of leading/trailing wsp
                            if not DIGITS.fullmatch(length_hdr):
                                raise InvalidHeader(CONTENT_LENGTH)

                            return int(length_hdr)

                        length = get_content_length()
                        # do not support old websocket spec
                        if SEC_WEBSOCKET_KEY1 in msg.headers:
                            raise InvalidHeader(SEC_WEBSOCKET_KEY1)

                        self._upgraded = msg.upgrade and _is_supported_upgrade(
                            msg.headers
                        )

                        method = getattr(msg, "method", self.method)
                        # code is only present on responses
                        code = getattr(msg, "code", 0)

                        assert self.protocol is not None
                        # calculate payload
                        empty_body = code in EMPTY_BODY_STATUS_CODES or bool(
                            method and method in EMPTY_BODY_METHODS
                        )
                        if not empty_body and (
                            ((length is not None and length > 0) or msg.chunked)
                            and not self._upgraded
                        ):
                            # Length-delimited or chunked body.
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            payload_parser = HttpPayloadParser(
                                payload,
                                length=length,
                                chunked=msg.chunked,
                                method=method,
                                compression=msg.compression,
                                code=self.code,
                                response_with_body=self.response_with_body,
                                auto_decompress=self._auto_decompress,
                                lax=self.lax,
                                headers_parser=self._headers_parser,
                                max_line_size=self.max_line_size,
                                max_field_size=self.max_field_size,
                                max_trailers=max_trailers,
                                limit=self._limit,
                            )
                            if not payload_parser.done:
                                self._payload_parser = payload_parser
                        elif method == METH_CONNECT:
                            # CONNECT: everything after the headers is tunnel
                            # data, read until EOF.
                            assert isinstance(msg, RawRequestMessage)
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            self._upgraded = True
                            self._payload_parser = HttpPayloadParser(
                                payload,
                                method=msg.method,
                                compression=msg.compression,
                                auto_decompress=self._auto_decompress,
                                lax=self.lax,
                                headers_parser=self._headers_parser,
                                max_line_size=self.max_line_size,
                                max_field_size=self.max_field_size,
                                max_trailers=max_trailers,
                                limit=self._limit,
                            )
                        elif not empty_body and length is None and self.read_until_eof:
                            # No framing information: body runs until EOF.
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            payload_parser = HttpPayloadParser(
                                payload,
                                length=length,
                                chunked=msg.chunked,
                                method=method,
                                compression=msg.compression,
                                code=self.code,
                                response_with_body=self.response_with_body,
                                auto_decompress=self._auto_decompress,
                                lax=self.lax,
                                headers_parser=self._headers_parser,
                                max_line_size=self.max_line_size,
                                max_field_size=self.max_field_size,
                                max_trailers=max_trailers,
                                limit=self._limit,
                            )
                            if not payload_parser.done:
                                self._payload_parser = payload_parser
                        else:
                            payload = EMPTY_PAYLOAD

                        messages.append((msg, payload))
                        if self._max_msg_queue_size:
                            self._msg_in_flight += 1
                        should_close = msg.should_close
                else:
                    # Incomplete line: keep it for the next call.
                    self._tail = data[start_pos:]
                    if len(self._tail) > self.max_line_size:
                        raise LineTooLong(self._tail[:100] + b"...", self.max_line_size)
                    data = EMPTY
                    break

            # no parser, just store
            elif self._payload_parser is None and self._upgraded:
                assert not self._lines
                break

            # feed payload
            else:
                assert not self._lines
                assert self._payload_parser is not None
                try:
                    payload_state, data = self._payload_parser.feed_data(
                        data[start_pos:], SEP
                    )
                except Exception as underlying_exc:
                    reraised_exc: BaseException = underlying_exc
                    if self.payload_exception is not None:
                        reraised_exc = self.payload_exception(str(underlying_exc))

                    set_exception(
                        self._payload_parser.payload,
                        reraised_exc,
                        underlying_exc,
                    )

                    payload_state = PayloadState.PAYLOAD_COMPLETE
                    data = b""
                    # Framing errors leave the stream in an unknown state, so
                    # they also propagate to the caller.
                    if isinstance(
                        underlying_exc, (InvalidHeader, TransferEncodingError)
                    ):
                        raise

                self._payload_has_more_data = (
                    payload_state == PayloadState.PAYLOAD_HAS_PENDING_INPUT
                )

                if payload_state is not PayloadState.PAYLOAD_COMPLETE:
                    # We've either consumed all available data, or we're pausing
                    # until the reader buffer is freed up.
                    break

                # Body complete: ``data`` now holds bytes of the next message.
                start_pos = 0
                data_len = len(data)
                self._payload_parser = None

        if data and start_pos < data_len:
            data = data[start_pos:]
        else:
            data = EMPTY

        return messages, self._upgraded, data

    def parse_headers(
        self, lines: list[bytes]
    ) -> tuple[
        "CIMultiDictProxy[str]", RawHeaders, bool | None, str | None, bool, bool
    ]:
        """Parse header lines and derive connection-level flags.

        Line continuations are accepted only in lax mode (see HeadersParser).

        :returns: ``(headers, raw_headers, close_conn, encoding, upgrade,
            chunked)``:
            - ``close_conn``: ``True``/``False`` from the Connection header,
              or ``None`` if it says neither "close" nor "keep-alive".
            - ``encoding``: the Content-Encoding value if it is gzip,
              deflate, br or zstd.
        :raises BadHttpMessage: Transfer-Encoding is chunked and
            Content-Length is also present.
        """
        headers, raw_headers = self._headers_parser.parse_headers(lines)
        close_conn = None
        encoding = None
        upgrade = False
        chunked = False

        # keep-alive and protocol switching
        # RFC 9110 section 7.6.1 defines Connection as a comma-separated list.
        conn_values = headers.getall(hdrs.CONNECTION, ())
        if conn_values:
            conn_tokens = {
                token.lower()
                for conn_value in conn_values
                for token in (part.strip(" \t") for part in conn_value.split(","))
                if token and token.isascii()
            }

            if "close" in conn_tokens:
                close_conn = True
            elif "keep-alive" in conn_tokens:
                close_conn = False

            # https://www.rfc-editor.org/rfc/rfc9110.html#name-101-switching-protocols
            if "upgrade" in conn_tokens and headers.get(hdrs.UPGRADE):
                upgrade = True

        # encoding
        enc = headers.get(hdrs.CONTENT_ENCODING, "")
        if enc.isascii() and enc.lower() in {"gzip", "deflate", "br", "zstd"}:
            encoding = enc

        # chunking
        te = headers.get(hdrs.TRANSFER_ENCODING)
        if te is not None:
            if self._is_chunked_te(te):
                chunked = True

            if hdrs.CONTENT_LENGTH in headers:
                raise BadHttpMessage(
                    "Transfer-Encoding can't be present with Content-Length",
                )

        return (headers, raw_headers, close_conn, encoding, upgrade, chunked)

    def set_upgraded(self, val: bool) -> None:
        """Set connection upgraded (to websocket) mode.

        :param bool val: new state.
        """
        self._upgraded = val
class HttpRequestParser(HttpParser[RawRequestMessage]):
    """Read the request line and headers of an HTTP request.

    BadHttpMethod, BadStatusLine or InvalidURLError (and header errors from
    parse_headers) are raised on malformed input.
    Returns RawRequestMessage.
    """

    def parse_message(self, lines: list[bytes]) -> RawRequestMessage:
        """Build a RawRequestMessage.

        ``lines[0]`` is the request line and ``lines[1:]`` are the header
        lines, terminated by ``b""``.
        """
        # request line
        line = lines[0].decode("utf-8", "surrogateescape")
        try:
            # "method SP request-target SP HTTP-version". maxsplit=2 leaves
            # any extra spaces inside ``version``, which VERSRE then rejects.
            method, path, version = line.split(" ", maxsplit=2)
        except ValueError:
            raise BadHttpMethod(line) from None

        # method
        if not TOKENRE.fullmatch(method):
            raise BadHttpMethod(method)

        # version
        match = VERSRE.fullmatch(version)
        if match is None:
            raise BadStatusLine(line)
        version_o = HttpVersion(int(match.group(1)), int(match.group(2)))

        if method == "CONNECT":
            # authority-form,
            # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.3
            url = URL.build(authority=path, encoded=True)
        elif path.startswith("/"):
            # origin-form,
            # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.1
            path_part, _hash_separator, url_fragment = path.partition("#")
            path_part, _question_mark_separator, qs_part = path_part.partition("?")

            # NOTE: `yarl.URL.build()` is used to mimic what the Cython-based
            # NOTE: parser does, otherwise it results into the same
            # NOTE: HTTP Request-Line input producing different
            # NOTE: `yarl.URL()` objects
            url = URL.build(
                path=path_part,
                query_string=qs_part,
                fragment=url_fragment,
                encoded=True,
            )
        elif path == "*" and method == "OPTIONS":
            # asterisk-form,
            url = URL(path, encoded=True)
        else:
            # absolute-form for proxy maybe,
            # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.2
            url = URL(path, encoded=True)
            if url.scheme == "":
                # not absolute-form
                raise InvalidURLError(
                    path.encode(errors="surrogateescape").decode("latin1")
                )

        # read headers
        (
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
        ) = self.parse_headers(lines[1:])

        # HTTP/1.1 requires a Host header.
        if version_o == HttpVersion11 and hdrs.HOST not in headers:
            raise BadHttpMessage("Missing 'Host' header in request.")

        if close is None:  # the Connection header gave no close/keep-alive hint
            if version_o <= HttpVersion10:  # HTTP/1.0 defaults to closing
                close = True
            else:  # HTTP/1.1 defaults to keep-alive
                close = False

        return RawRequestMessage(
            method,
            path,
            version_o,
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
            url,
        )

    def _is_chunked_te(self, te: str) -> bool:
        # Only the final transfer coding decides whether the body is chunked.
        te = te.rsplit(",", maxsplit=1)[-1].strip(" \t")
        # .lower() transforms some non-ascii chars, so must check first.
        if te.isascii() and te.lower() == "chunked":
            return True
        # https://www.rfc-editor.org/rfc/rfc9112#section-6.3-2.4.3
        raise BadHttpMessage("Request has invalid `Transfer-Encoding`")
class HttpResponseParser(HttpParser[RawResponseMessage]):
    """Read response status line and headers.

    BadStatusLine could be raised in case of any errors in status line.
    Returns RawResponseMessage.
    """

    protocol: "ResponseHandler"

    # Lax mode should only be enabled on response parser.
    lax = not DEBUG

    def feed_data(
        self,
        data: bytes,
        SEP: _SEP | None = None,
        *args: Any,
        **kwargs: Any,
    ) -> tuple[list[tuple[RawResponseMessage, StreamReader]], bool, bytes]:
        """Feed response bytes.

        When ``SEP`` is not given, bare LF line endings are accepted unless
        DEBUG is enabled, in which case CRLF is required.
        """
        if SEP is None:
            SEP = b"\r\n" if DEBUG else b"\n"
        return super().feed_data(data, SEP, *args, **kwargs)

    def parse_message(self, lines: list[bytes]) -> RawResponseMessage:
        """Build a RawResponseMessage.

        ``lines[0]`` is the status line and ``lines[1:]`` are the header lines.
        """
        line = lines[0].decode("utf-8", "surrogateescape")
        try:
            version, status = line.split(maxsplit=1)
        except ValueError:
            raise BadStatusLine(line) from None

        try:
            status, reason = status.split(maxsplit=1)
        except ValueError:
            # The reason phrase is optional.
            status = status.strip()
            reason = ""

        # version
        match = VERSRE.fullmatch(version)
        if match is None:
            raise BadStatusLine(line)
        version_o = HttpVersion(int(match.group(1)), int(match.group(2)))

        # The status code is a three-digit ASCII number, no padding
        if len(status) != 3 or not DIGITS.fullmatch(status):
            raise BadStatusLine(line)
        status_i = int(status)

        # read headers
        (
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
        ) = self.parse_headers(lines[1:])

        if close is None:
            if version_o <= HttpVersion10:
                close = True
            # https://www.rfc-editor.org/rfc/rfc9112.html#name-message-body-length
            elif 100 <= status_i < 200 or status_i in {204, 304}:
                close = False
            elif hdrs.CONTENT_LENGTH in headers or hdrs.TRANSFER_ENCODING in headers:
                close = False
            else:
                # https://www.rfc-editor.org/rfc/rfc9112.html#section-6.3-2.8
                close = True

        return RawResponseMessage(
            version_o,
            status_i,
            reason.strip(),
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
        )

    def _is_chunked_te(self, te: str) -> bool:
        """Return True if the final transfer coding in ``te`` is ``chunked``.

        Unlike the request parser, an unknown coding is not an error for
        responses; the body is then simply not treated as chunked.
        """
        # https://www.rfc-editor.org/rfc/rfc9112#section-6.3-2.4.2
        last_coding = te.rsplit(",", maxsplit=1)[-1].strip(" \t")
        # .lower() transforms some non-ascii chars (e.g. KELVIN SIGN -> "k"),
        # so require ASCII first, matching HttpRequestParser._is_chunked_te.
        return last_coding.isascii() and last_coding.lower() == "chunked"
class HttpPayloadParser:
    """Incrementally decode one message body into a StreamReader.

    The body may be:
      - length-delimited;
      - chunked, including chunk extensions and trailers;
      - read until EOF.

    When ``response_with_body`` and ``compression`` are set and
    ``auto_decompress`` is true, output goes through a DeflateBuffer that
    decompresses into ``payload``.

    NOTE(review): ``code`` and ``method`` are accepted but not used by this
    class.
    """

    def __init__(
        self,
        payload: StreamReader,
        length: int | None = None,
        chunked: bool = False,
        compression: str | None = None,
        code: int | None = None,
        method: str | None = None,
        response_with_body: bool = True,
        auto_decompress: bool = True,
        lax: bool = False,
        *,
        headers_parser: HeadersParser,
        max_line_size: int = 8190,
        max_field_size: int = 8190,
        max_trailers: int = 128,
        limit: int = DEFAULT_CHUNK_SIZE,
    ) -> None:
        # Remaining / total bytes for PARSE_LENGTH bodies.
        self._length = 0
        self._length_expected = 0
        # Set by pause_reading(); checked at safe points in feed_data/feed_eof.
        self._paused = False
        self._type = ParseState.PARSE_UNTIL_EOF
        self._chunk = ChunkState.PARSE_CHUNKED_SIZE
        self._chunk_size = 0
        # Input held back between calls (partial line or paused data).
        self._chunk_tail = b""
        self._auto_decompress = auto_decompress
        self._lax = lax
        self._headers_parser = headers_parser
        self._max_line_size = max_line_size
        self._max_field_size = max_field_size
        self._max_trailers = max_trailers
        # True while the (decompressing) payload can emit more output
        # without new input; drained via payload.feed_data(b"", 0).
        self._more_data_available = False
        self._trailer_lines: list[bytes] = []
        self.done = False
        self._eof_pending = False

        # payload decompression wrapper
        if response_with_body and compression and self._auto_decompress:
            real_payload: StreamReader | DeflateBuffer = DeflateBuffer(
                payload, compression, max_decompress_size=limit
            )
        else:
            real_payload = payload

        # payload parser
        if not response_with_body:
            # don't parse payload if it's not expected to be received
            self._type = ParseState.PARSE_NONE
            real_payload.feed_eof()
            self.done = True
        elif chunked:
            self._type = ParseState.PARSE_CHUNKED
        elif length is not None:
            self._type = ParseState.PARSE_LENGTH
            self._length = length
            self._length_expected = length
            if self._length == 0:
                real_payload.feed_eof()
                self.done = True

        self.payload = real_payload

    def pause_reading(self) -> None:
        """Request a pause at the next safe point in feed_data/feed_eof."""
        self._paused = True

    def feed_eof(self) -> None:
        """Handle end of input.

        :raises ContentLengthError: a length-delimited body is incomplete.
        :raises TransferEncodingError: a chunked body is incomplete.
        """
        if self._type == ParseState.PARSE_UNTIL_EOF:
            self._eof_pending = True
            while self._more_data_available:
                if self._paused:
                    self._paused = False
                    return  # Will resume via feed_data(b"") later
                self._more_data_available = self.payload.feed_data(b"", 0)
            self.payload.feed_eof()
            self.done = True
            self._eof_pending = False
        elif self._type == ParseState.PARSE_LENGTH:
            received = self._length_expected - self._length
            raise ContentLengthError(
                f"Not enough data to satisfy content length header "
                f"(received {received} of {self._length_expected} bytes)."
            )
        elif self._type == ParseState.PARSE_CHUNKED:
            raise TransferEncodingError(
                "Not enough data to satisfy transfer length header."
            )

    def feed_data(
        self, chunk: bytes, SEP: _SEP = b"\r\n", CHUNK_EXT: bytes = b";"
    ) -> tuple[PayloadState, bytes]:
        """Receive a chunk of data to process.

        Return:
            PayloadState - The current state of payload processing.
                This function may be called with empty bytes after returning
                PAYLOAD_HAS_PENDING_INPUT to continue processing after a pause.
            bytes - If payload is complete, this is the unconsumed bytes intended for the
                next message/payload, b"" otherwise.
        """
        # Read specified amount of bytes
        if self._type == ParseState.PARSE_LENGTH:
            if self._chunk_tail:
                chunk = self._chunk_tail + chunk
                self._chunk_tail = b""

            required = self._length
            self._length = max(required - len(chunk), 0)
            self._more_data_available = self.payload.feed_data(
                chunk[:required], required
            )
            while self._more_data_available:
                if self._paused:
                    self._paused = False
                    self._chunk_tail = chunk[required:]
                    return PayloadState.PAYLOAD_HAS_PENDING_INPUT, b""
                self._more_data_available = self.payload.feed_data(b"", 0)

            if self._length == 0:
                self.payload.feed_eof()
                return PayloadState.PAYLOAD_COMPLETE, chunk[required:]
        # Chunked transfer encoding parser
        elif self._type == ParseState.PARSE_CHUNKED:
            if self._chunk_tail:
                # We should check the length is sane when not processing payload body.
                if self._chunk != ChunkState.PARSE_CHUNKED_CHUNK:
                    max_line_length = self._max_line_size
                    if self._chunk == ChunkState.PARSE_TRAILERS:
                        max_line_length = self._max_field_size
                    if len(self._chunk_tail) > max_line_length:
                        raise LineTooLong(
                            self._chunk_tail[:100] + b"...", max_line_length
                        )

                chunk = self._chunk_tail + chunk
                self._chunk_tail = b""

            while chunk or self._more_data_available:
                # read next chunk size
                if self._chunk == ChunkState.PARSE_CHUNKED_SIZE:
                    pos = chunk.find(SEP)
                    if pos >= 0:
                        # Only chunk-size lines reach here; trailers enforce
                        # _max_field_size separately in PARSE_TRAILERS below.
                        if pos > self._max_line_size:
                            raise LineTooLong(chunk[:100] + b"...", self._max_line_size)
                        i = chunk.find(CHUNK_EXT, 0, pos)
                        if i >= 0:
                            size_b = chunk[:i]  # strip chunk-extensions
                            # Verify no LF in the chunk-extension
                            if b"\n" in (ext := chunk[i:pos]):
                                exc = TransferEncodingError(
                                    f"Unexpected LF in chunk-extension: {ext!r}"
                                )
                                set_exception(self.payload, exc)
                                raise exc
                        else:
                            size_b = chunk[:pos]

                        if self._lax:  # Allow whitespace in lax mode.
                            size_b = size_b.strip()

                        if not HEXDIGITS.fullmatch(size_b):
                            exc = TransferEncodingError(
                                chunk[:pos].decode("ascii", "surrogateescape")
                            )
                            set_exception(self.payload, exc)
                            raise exc
                        # size_b is already bytes of validated hex digits.
                        size = int(size_b, 16)

                        chunk = chunk[pos + len(SEP) :]
                        if size == 0:  # eof marker
                            self._chunk = ChunkState.PARSE_TRAILERS
                            if self._lax and chunk.startswith(b"\r"):
                                chunk = chunk[1:]
                        else:
                            self._chunk = ChunkState.PARSE_CHUNKED_CHUNK
                            self._chunk_size = size
                            self.payload.begin_http_chunk_receiving()
                    else:
                        self._chunk_tail = chunk
                        return PayloadState.PAYLOAD_NEEDS_INPUT, b""

                # read chunk and feed buffer
                if self._chunk == ChunkState.PARSE_CHUNKED_CHUNK:
                    if self._paused:
                        self._paused = False
                        self._chunk_tail = chunk
                        return PayloadState.PAYLOAD_HAS_PENDING_INPUT, b""

                    required = self._chunk_size
                    self._chunk_size = max(required - len(chunk), 0)
                    self._more_data_available = self.payload.feed_data(
                        chunk[:required], required
                    )
                    chunk = chunk[required:]

                    if self._more_data_available:
                        continue

                    if self._chunk_size:
                        self._paused = False
                        return PayloadState.PAYLOAD_NEEDS_INPUT, b""
                    self._chunk = ChunkState.PARSE_CHUNKED_CHUNK_EOF
                    self.payload.end_http_chunk_receiving()

                # toss the CRLF at the end of the chunk
                if self._chunk == ChunkState.PARSE_CHUNKED_CHUNK_EOF:
                    if self._lax and chunk.startswith(b"\r"):
                        chunk = chunk[1:]
                    if chunk[: len(SEP)] == SEP:
                        chunk = chunk[len(SEP) :]
                        self._chunk = ChunkState.PARSE_CHUNKED_SIZE
                    elif len(chunk) >= len(SEP) or chunk != SEP[: len(chunk)]:
                        exc = TransferEncodingError(
                            "Chunk size mismatch: expected CRLF after chunk data"
                        )
                        set_exception(self.payload, exc)
                        raise exc
                    else:
                        # Only a prefix of the separator arrived so far.
                        self._chunk_tail = chunk
                        return PayloadState.PAYLOAD_NEEDS_INPUT, b""

                if self._chunk == ChunkState.PARSE_TRAILERS:
                    pos = chunk.find(SEP)
                    if pos < 0:  # No line found
                        self._chunk_tail = chunk
                        return PayloadState.PAYLOAD_NEEDS_INPUT, b""

                    line = chunk[:pos]
                    chunk = chunk[pos + len(SEP) :]
                    if SEP == b"\n":  # For lax response parsing
                        line = line.rstrip(b"\r")

                    if len(line) > self._max_field_size:
                        raise LineTooLong(line[:100] + b"...", self._max_field_size)

                    self._trailer_lines.append(line)

                    if len(self._trailer_lines) > self._max_trailers:
                        raise BadHttpMessage("Too many trailers received")

                    # \r\n\r\n found, end of stream
                    if self._trailer_lines[-1] == b"":
                        # Headers and trailers are defined the same way,
                        # so we reuse the HeadersParser here. The parsed
                        # trailers are only validated, not exposed.
                        try:
                            self._headers_parser.parse_headers(self._trailer_lines)
                        finally:
                            self._trailer_lines.clear()
                        self.payload.feed_eof()
                        return PayloadState.PAYLOAD_COMPLETE, chunk

        # Read all bytes until eof
        elif self._type == ParseState.PARSE_UNTIL_EOF:
            self._more_data_available = self.payload.feed_data(chunk, len(chunk))
            while self._more_data_available:
                if self._paused:
                    self._paused = False
                    return PayloadState.PAYLOAD_HAS_PENDING_INPUT, b""
                self._more_data_available = self.payload.feed_data(b"", 0)

            # feed_eof() arrived while paused; finish it now.
            if self._eof_pending:
                self.payload.feed_eof()
                self.done = True
                self._eof_pending = False
                return PayloadState.PAYLOAD_COMPLETE, b""

        return PayloadState.PAYLOAD_NEEDS_INPUT, b""
class DeflateBuffer:
    """Decompress a compressed body and feed the decoded bytes into ``out``.

    Supports ``br`` (Brotli), ``zstd`` and zlib-based encodings (any other
    value, e.g. ``gzip``/``deflate``). The amount of output per call is
    bounded via ``max_length`` (see :meth:`feed_data`).
    """

    decompressor: Any

    def __init__(
        self,
        out: StreamReader,
        encoding: str | None,
        max_decompress_size: int = DEFAULT_CHUNK_SIZE,
    ) -> None:
        self.out = out
        # Total compressed bytes received so far; mirrored onto ``out``.
        self.size = 0
        out.total_compressed_bytes = self.size
        self.encoding = encoding
        # True once real (non-empty) input has been decompressed; the
        # deflate header sniffing in feed_data only runs before that.
        self._started_decoding = False

        self.decompressor: BrotliDecompressor | ZLibDecompressor | ZSTDDecompressor
        if encoding == "br":
            if not HAS_BROTLI:  # pragma: no cover
                raise ContentEncodingError(
                    "Can not decode content-encoding: brotli (br). "
                    "Please install `Brotli`"
                )
            self.decompressor = BrotliDecompressor()
        elif encoding == "zstd":
            if not HAS_ZSTD:
                raise ContentEncodingError(
                    "Can not decode content-encoding: zstandard (zstd). "
                    "Please install `backports.zstd`"
                )
            self.decompressor = ZSTDDecompressor()
        else:
            self.decompressor = ZLibDecompressor(encoding=encoding)

        self._max_decompress_size = max_decompress_size

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Forward an exception to the underlying stream."""
        set_exception(self.out, exc, exc_cause)

    def feed_data(self, chunk: bytes, size: int) -> bool:
        """Decompress ``chunk`` and forward any output to ``out``.

        :param chunk: compressed input. It may be ``b""`` to drain output the
            decompressor still holds from earlier input.
        :param size: number of compressed bytes to add to ``size``.
        :returns: the decompressor's ``data_available`` flag. Callers keep
            calling ``feed_data(b"", 0)`` while it is true (presumably: more
            output is ready without new input).
        :raises ContentEncodingError: the data cannot be decompressed.
        """
        self.size += size
        self.out.total_compressed_bytes = self.size

        # RFC1950
        # bits 0..3 = CM = 0b1000 = 8 = "deflate"
        # bits 4..7 = CINFO = 1..7 = windows size.
        # Sniff only real input: an empty chunk has no header byte, and
        # indexing it would raise IndexError.
        if (
            not self._started_decoding
            and self.encoding == "deflate"
            and chunk
            and chunk[0] & 0xF != 8
        ):
            # Change the decoder to decompress incorrectly compressed data
            # Actually we should issue a warning about non-RFC-compliant data.
            self.decompressor = ZLibDecompressor(
                encoding=self.encoding, suppress_deflate_header=True
            )

        low_water = self.out._low_water
        max_length = (
            0 if low_water >= sys.maxsize else max(self._max_decompress_size, low_water)
        )
        try:
            decompressed = self.decompressor.decompress_sync(
                chunk, max_length=max_length
            )
        except Exception:
            raise ContentEncodingError(
                "Can not decode content-encoding: %s" % self.encoding
            )

        if chunk:
            # An empty call must not count as the start of decoding, or the
            # header sniffing above would be skipped for the real first chunk.
            self._started_decoding = True

        if decompressed:
            self.out.feed_data(decompressed, len(decompressed))
        return self.decompressor.data_available  # type: ignore[no-any-return]

    def feed_eof(self) -> None:
        """Finish decompression and signal EOF on ``out``.

        :raises ContentEncodingError: a non-empty deflate stream did not
            reach its end marker.
        """
        chunk = self.decompressor.flush()
        # This should never contain data as we defer the call until exhausting
        # the decompression. If .flush() is returning data, this may indicate a
        # zip bomb vulnerability as it will decompress all remaining data at once.
        assert not chunk

        if self.size > 0:
            if self.encoding == "deflate" and not self.decompressor.eof:
                raise ContentEncodingError("deflate")

        self.out.feed_eof()

    def begin_http_chunk_receiving(self) -> None:
        self.out.begin_http_chunk_receiving()

    def end_http_chunk_receiving(self) -> None:
        self.out.end_http_chunk_receiving()
# Keep the pure-Python implementations reachable under explicit *Py names,
# even if the public names are rebound to the C extension below.
HttpRequestParserPy = HttpRequestParser
HttpResponseParserPy = HttpResponseParser
RawRequestMessagePy = RawRequestMessage
RawResponseMessagePy = RawResponseMessage

# Unless NO_EXTENSIONS is set, prefer the compiled ``_http_parser`` module:
# rebind the public names to it and expose them under *C names as well.
# If the extension is not importable, the pure-Python classes stay in place.
try:
    if not NO_EXTENSIONS:
        from ._http_parser import (  # type: ignore[import-not-found,no-redef]
            HttpRequestParser,
            HttpResponseParser,
            RawRequestMessage,
            RawResponseMessage,
        )

        HttpRequestParserC = HttpRequestParser
        HttpResponseParserC = HttpResponseParser
        RawRequestMessageC = RawRequestMessage
        RawResponseMessageC = RawResponseMessage
except ImportError:  # pragma: no cover
    pass