Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/http_parser.py: 20%
Keyboard shortcuts on this page:
r / m / x — toggle line displays
j / k — next / previous highlighted chunk
0 (zero) — top of page
1 (one) — first highlighted chunk
1import abc
2import asyncio
3import re
4import string
5from contextlib import suppress
6from enum import IntEnum
7from typing import (
8 Any,
9 ClassVar,
10 Final,
11 Generic,
12 List,
13 Literal,
14 NamedTuple,
15 Optional,
16 Pattern,
17 Set,
18 Tuple,
19 Type,
20 TypeVar,
21 Union,
22)
24from multidict import CIMultiDict, CIMultiDictProxy, istr
25from yarl import URL
27from . import hdrs
28from .base_protocol import BaseProtocol
29from .compression_utils import (
30 DEFAULT_MAX_DECOMPRESS_SIZE,
31 HAS_BROTLI,
32 HAS_ZSTD,
33 BrotliDecompressor,
34 ZLibDecompressor,
35 ZSTDDecompressor,
36)
37from .helpers import (
38 _EXC_SENTINEL,
39 DEBUG,
40 EMPTY_BODY_METHODS,
41 EMPTY_BODY_STATUS_CODES,
42 NO_EXTENSIONS,
43 BaseTimerContext,
44 set_exception,
45)
46from .http_exceptions import (
47 BadHttpMessage,
48 BadHttpMethod,
49 BadStatusLine,
50 ContentEncodingError,
51 ContentLengthError,
52 DecompressSizeError,
53 InvalidHeader,
54 InvalidURLError,
55 LineTooLong,
56 TransferEncodingError,
57)
58from .http_writer import HttpVersion, HttpVersion10
59from .streams import EMPTY_PAYLOAD, StreamReader
60from .typedefs import RawHeaders
__all__ = (
    "HeadersParser",
    "HttpParser",
    "HttpRequestParser",
    "HttpResponseParser",
    "RawRequestMessage",
    "RawResponseMessage",
)

# Allowed line separators: strict CRLF, or bare LF for lax response parsing.
_SEP = Literal[b"\r\n", b"\n"]

ASCIISET: Final[Set[str]] = set(string.printable)

# See https://www.rfc-editor.org/rfc/rfc9110.html#name-overview
# and https://www.rfc-editor.org/rfc/rfc9110.html#name-tokens
#
# method = token
# tchar = "!" / "#" / "$" / "%" / "&" / "'" / "*" / "+" / "-" / "." /
#         "^" / "_" / "`" / "|" / "~" / DIGIT / ALPHA
# token = 1*tchar
_TCHAR_SPECIALS: Final[str] = re.escape("!#$%&'*+-.^_`|~")
# One-or-more RFC 9110 tchar: used to validate methods and header names.
TOKENRE: Final[Pattern[str]] = re.compile(f"[0-9A-Za-z{_TCHAR_SPECIALS}]+")
# HTTP-version production, e.g. "HTTP/1.1" (single ASCII digits only).
VERSRE: Final[Pattern[str]] = re.compile(r"HTTP/(\d)\.(\d)", re.ASCII)
# Unsigned ASCII decimal integer (Content-Length values, status codes).
DIGITS: Final[Pattern[str]] = re.compile(r"\d+", re.ASCII)
# chunk-size field of a chunked body: hexadecimal digits only.
HEXDIGITS: Final[Pattern[bytes]] = re.compile(rb"[0-9a-fA-F]+")
class RawRequestMessage(NamedTuple):
    """Parsed HTTP request line and headers (no body)."""

    method: str  # request method token, e.g. "GET"
    path: str  # request-target exactly as received
    version: HttpVersion  # (major, minor) parsed from the request line
    headers: "CIMultiDictProxy[str]"  # decoded, case-insensitive headers
    raw_headers: RawHeaders  # ((bname, bvalue), ...) original bytes pairs
    should_close: bool  # connection must be closed after this message
    compression: Optional[str]  # Content-Encoding if supported, else None
    upgrade: bool  # Connection: upgrade with a supported Upgrade header
    chunked: bool  # Transfer-Encoding ends with "chunked"
    url: URL  # request-target parsed into a yarl.URL
class RawResponseMessage(NamedTuple):
    """Parsed HTTP response status line and headers (no body)."""

    version: HttpVersion  # (major, minor) parsed from the status line
    code: int  # three-digit status code
    reason: str  # reason phrase (may be empty)
    headers: CIMultiDictProxy[str]  # decoded, case-insensitive headers
    raw_headers: RawHeaders  # ((bname, bvalue), ...) original bytes pairs
    should_close: bool  # connection must be closed after this message
    compression: Optional[str]  # Content-Encoding if supported, else None
    upgrade: bool  # Connection: upgrade with a supported Upgrade header
    chunked: bool  # Transfer-Encoding ends with "chunked"
# Message type produced by a concrete parser: request or response message.
_MsgT = TypeVar("_MsgT", RawRequestMessage, RawResponseMessage)
class ParseState(IntEnum):
    """How the length of the message body is determined."""

    PARSE_NONE = 0  # no body expected at all
    PARSE_LENGTH = 1  # fixed length from Content-Length
    PARSE_CHUNKED = 2  # chunked transfer encoding
    PARSE_UNTIL_EOF = 3  # body ends when the connection closes
class ChunkState(IntEnum):
    """Sub-states of the chunked transfer-encoding parser."""

    PARSE_CHUNKED_SIZE = 0  # expecting "<hex-size>[;ext]CRLF"
    PARSE_CHUNKED_CHUNK = 1  # reading chunk-data bytes
    PARSE_CHUNKED_CHUNK_EOF = 2  # expecting CRLF that ends chunk-data
    PARSE_MAYBE_TRAILERS = 3  # after the zero-size chunk, trailers may follow
    PARSE_TRAILERS = 4  # reading trailer header lines
class HeadersParser:
    """Parse a block of header lines into a CIMultiDict plus raw bytes pairs.

    Validates header-name tokens and forbidden control characters, enforces
    size limits, and (in lax mode only) supports the deprecated obs-fold
    line continuations.
    """

    def __init__(
        self,
        max_line_size: int = 8190,
        max_headers: int = 32768,
        max_field_size: int = 8190,
        lax: bool = False,
    ) -> None:
        self.max_line_size = max_line_size
        self.max_headers = max_headers
        self.max_field_size = max_field_size
        # lax mode enables obs-fold continuation handling below
        self._lax = lax

    def parse_headers(
        self, lines: List[bytes]
    ) -> Tuple["CIMultiDictProxy[str]", RawHeaders]:
        """Parse ``lines`` (terminated by an empty line) into headers.

        Returns an immutable case-insensitive mapping and the tuple of raw
        ``(name, value)`` bytes pairs.  Raises InvalidHeader / LineTooLong
        on malformed or oversized fields.
        """
        headers: CIMultiDict[str] = CIMultiDict()
        # note: "raw" does not mean inclusion of OWS before/after the field value
        raw_headers = []

        lines_idx = 0
        line = lines[lines_idx]
        line_count = len(lines)

        # An empty line terminates the header block.
        while line:
            # Parse initial header name : value pair.
            try:
                bname, bvalue = line.split(b":", 1)
            except ValueError:
                raise InvalidHeader(line) from None

            if len(bname) == 0:
                raise InvalidHeader(bname)

            # No whitespace allowed around the field name:
            # https://www.rfc-editor.org/rfc/rfc9112.html#section-5.1-2
            if {bname[0], bname[-1]} & {32, 9}:  # {" ", "\t"}
                raise InvalidHeader(line)

            bvalue = bvalue.lstrip(b" \t")
            if len(bname) > self.max_field_size:
                raise LineTooLong(
                    "request header name {}".format(
                        bname.decode("utf8", "backslashreplace")
                    ),
                    str(self.max_field_size),
                    str(len(bname)),
                )
            name = bname.decode("utf-8", "surrogateescape")
            if not TOKENRE.fullmatch(name):
                raise InvalidHeader(bname)

            header_length = len(bvalue)

            # next line
            lines_idx += 1
            line = lines[lines_idx]

            # consume continuation lines (lax mode only)
            continuation = self._lax and line and line[0] in (32, 9)  # (' ', '\t')

            # Deprecated: https://www.rfc-editor.org/rfc/rfc9112.html#name-obsolete-line-folding
            if continuation:
                bvalue_lst = [bvalue]
                while continuation:
                    header_length += len(line)
                    if header_length > self.max_field_size:
                        raise LineTooLong(
                            "request header field {}".format(
                                bname.decode("utf8", "backslashreplace")
                            ),
                            str(self.max_field_size),
                            str(header_length),
                        )
                    bvalue_lst.append(line)

                    # next line
                    lines_idx += 1
                    if lines_idx < line_count:
                        line = lines[lines_idx]
                        if line:
                            continuation = line[0] in (32, 9)  # (' ', '\t')
                    else:
                        # ran out of lines mid-continuation
                        line = b""
                        break
                bvalue = b"".join(bvalue_lst)
            else:
                if header_length > self.max_field_size:
                    raise LineTooLong(
                        "request header field {}".format(
                            bname.decode("utf8", "backslashreplace")
                        ),
                        str(self.max_field_size),
                        str(header_length),
                    )

            bvalue = bvalue.strip(b" \t")
            value = bvalue.decode("utf-8", "surrogateescape")

            # Reject embedded CR/LF/NUL in values:
            # https://www.rfc-editor.org/rfc/rfc9110.html#section-5.5-5
            if "\n" in value or "\r" in value or "\x00" in value:
                raise InvalidHeader(bvalue)

            headers.add(name, value)
            raw_headers.append((bname, bvalue))

        return (CIMultiDictProxy(headers), tuple(raw_headers))
def _is_supported_upgrade(headers: CIMultiDictProxy[str]) -> bool:
    """Check if the upgrade header is supported."""
    upgrade_hdr = headers.get(hdrs.UPGRADE, "")
    # .lower() can transform non-ascii characters, so reject those first.
    if not upgrade_hdr.isascii():
        return False
    return upgrade_hdr.lower() in {"tcp", "websocket"}
class HttpParser(abc.ABC, Generic[_MsgT]):
    """Base incremental HTTP/1.x message parser.

    Splits incoming bytes into start-line + header lines, delegates
    message construction to :meth:`parse_message` (implemented by the
    request/response subclasses), and hands any body bytes to an
    :class:`HttpPayloadParser`.
    """

    # Lax parsing (bare-LF separators, obs-fold) — enabled only on responses.
    lax: ClassVar[bool] = False

    def __init__(
        self,
        protocol: Optional[BaseProtocol] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        limit: int = 2**16,
        max_line_size: int = 8190,
        max_headers: int = 32768,
        max_field_size: int = 8190,
        timer: Optional[BaseTimerContext] = None,
        code: Optional[int] = None,
        method: Optional[str] = None,
        payload_exception: Optional[Type[BaseException]] = None,
        response_with_body: bool = True,
        read_until_eof: bool = False,
        auto_decompress: bool = True,
    ) -> None:
        self.protocol = protocol
        self.loop = loop
        self.max_line_size = max_line_size
        self.max_headers = max_headers
        self.max_field_size = max_field_size
        self.timer = timer
        self.code = code
        self.method = method
        self.payload_exception = payload_exception
        self.response_with_body = response_with_body
        self.read_until_eof = read_until_eof

        self._lines: List[bytes] = []  # accumulated start-line + header lines
        self._tail = b""  # bytes carried over between feed_data() calls
        self._upgraded = False
        self._payload = None
        self._payload_parser: Optional[HttpPayloadParser] = None
        self._auto_decompress = auto_decompress
        self._limit = limit
        self._headers_parser = HeadersParser(
            max_line_size, max_headers, max_field_size, self.lax
        )

    @abc.abstractmethod
    def parse_message(self, lines: List[bytes]) -> _MsgT: ...

    @abc.abstractmethod
    def _is_chunked_te(self, te: str) -> bool: ...

    def feed_eof(self) -> Optional[_MsgT]:
        """Signal end of connection; may return a final partial message."""
        if self._payload_parser is not None:
            self._payload_parser.feed_eof()
            self._payload_parser = None
        else:
            # try to extract partial message
            if self._tail:
                self._lines.append(self._tail)

            if self._lines:
                # Terminate the header block if it isn't already terminated.
                # (Was `!= "\r\n"`, a bytes-vs-str comparison that is always
                # True in Python 3, so b"" was appended unconditionally.)
                if self._lines[-1] != b"":
                    self._lines.append(b"")
                with suppress(Exception):
                    return self.parse_message(self._lines)
        return None

    def feed_data(
        self,
        data: bytes,
        SEP: _SEP = b"\r\n",
        EMPTY: bytes = b"",
        CONTENT_LENGTH: istr = hdrs.CONTENT_LENGTH,
        METH_CONNECT: str = hdrs.METH_CONNECT,
        SEC_WEBSOCKET_KEY1: istr = hdrs.SEC_WEBSOCKET_KEY1,
    ) -> Tuple[List[Tuple[_MsgT, StreamReader]], bool, bytes]:
        """Feed raw bytes to the parser.

        Returns ``(messages, upgraded, tail)`` where ``messages`` is a list
        of ``(message, payload_stream)`` pairs completed by this call,
        ``upgraded`` tells whether the connection switched protocols, and
        ``tail`` is any unconsumed bytes (e.g. after an upgrade).
        """
        messages = []

        if self._tail:
            data, self._tail = self._tail + data, b""

        data_len = len(data)
        start_pos = 0
        loop = self.loop

        should_close = False
        while start_pos < data_len:
            # read HTTP message (request/response line + headers), \r\n\r\n
            # and split by lines
            if self._payload_parser is None and not self._upgraded:
                pos = data.find(SEP, start_pos)
                # consume \r\n
                if pos == start_pos and not self._lines:
                    start_pos = pos + len(SEP)
                    continue

                if pos >= start_pos:
                    if should_close:
                        raise BadHttpMessage("Data after `Connection: close`")

                    # line found
                    line = data[start_pos:pos]
                    if SEP == b"\n":  # For lax response parsing
                        line = line.rstrip(b"\r")
                    self._lines.append(line)
                    start_pos = pos + len(SEP)

                    # \r\n\r\n found
                    if self._lines[-1] == EMPTY:
                        try:
                            msg: _MsgT = self.parse_message(self._lines)
                        finally:
                            self._lines.clear()

                        def get_content_length() -> Optional[int]:
                            # payload length
                            length_hdr = msg.headers.get(CONTENT_LENGTH)
                            if length_hdr is None:
                                return None

                            # Shouldn't allow +/- or other number formats.
                            # https://www.rfc-editor.org/rfc/rfc9110#section-8.6-2
                            # msg.headers is already stripped of leading/trailing wsp
                            if not DIGITS.fullmatch(length_hdr):
                                raise InvalidHeader(CONTENT_LENGTH)

                            return int(length_hdr)

                        length = get_content_length()
                        # do not support old websocket spec
                        if SEC_WEBSOCKET_KEY1 in msg.headers:
                            raise InvalidHeader(SEC_WEBSOCKET_KEY1)

                        self._upgraded = msg.upgrade and _is_supported_upgrade(
                            msg.headers
                        )

                        method = getattr(msg, "method", self.method)
                        # code is only present on responses
                        code = getattr(msg, "code", 0)

                        assert self.protocol is not None
                        # calculate payload
                        empty_body = code in EMPTY_BODY_STATUS_CODES or bool(
                            method and method in EMPTY_BODY_METHODS
                        )
                        if not empty_body and (
                            ((length is not None and length > 0) or msg.chunked)
                            and not self._upgraded
                        ):
                            # Body with known framing (length or chunked).
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            payload_parser = HttpPayloadParser(
                                payload,
                                length=length,
                                chunked=msg.chunked,
                                method=method,
                                compression=msg.compression,
                                code=self.code,
                                response_with_body=self.response_with_body,
                                auto_decompress=self._auto_decompress,
                                lax=self.lax,
                                headers_parser=self._headers_parser,
                            )
                            if not payload_parser.done:
                                self._payload_parser = payload_parser
                        elif method == METH_CONNECT:
                            # CONNECT tunnels the rest of the stream.
                            assert isinstance(msg, RawRequestMessage)
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            self._upgraded = True
                            self._payload_parser = HttpPayloadParser(
                                payload,
                                method=msg.method,
                                compression=msg.compression,
                                auto_decompress=self._auto_decompress,
                                lax=self.lax,
                                headers_parser=self._headers_parser,
                            )
                        elif not empty_body and length is None and self.read_until_eof:
                            # Unframed body: read until the connection closes.
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            payload_parser = HttpPayloadParser(
                                payload,
                                length=length,
                                chunked=msg.chunked,
                                method=method,
                                compression=msg.compression,
                                code=self.code,
                                response_with_body=self.response_with_body,
                                auto_decompress=self._auto_decompress,
                                lax=self.lax,
                                headers_parser=self._headers_parser,
                            )
                            if not payload_parser.done:
                                self._payload_parser = payload_parser
                        else:
                            payload = EMPTY_PAYLOAD

                        messages.append((msg, payload))
                        should_close = msg.should_close
                else:
                    # Incomplete line: stash it for the next feed_data() call.
                    self._tail = data[start_pos:]
                    data = EMPTY
                    break

            # no parser, just store
            elif self._payload_parser is None and self._upgraded:
                assert not self._lines
                break

            # feed payload
            elif data and start_pos < data_len:
                assert not self._lines
                assert self._payload_parser is not None
                try:
                    eof, data = self._payload_parser.feed_data(data[start_pos:], SEP)
                except BaseException as underlying_exc:
                    reraised_exc = underlying_exc
                    if self.payload_exception is not None:
                        reraised_exc = self.payload_exception(str(underlying_exc))

                    set_exception(
                        self._payload_parser.payload,
                        reraised_exc,
                        underlying_exc,
                    )

                    eof = True
                    data = b""
                    if isinstance(
                        underlying_exc, (InvalidHeader, TransferEncodingError)
                    ):
                        raise

                if eof:
                    start_pos = 0
                    data_len = len(data)
                    self._payload_parser = None
                    continue
                else:
                    break

        if data and start_pos < data_len:
            data = data[start_pos:]
        else:
            data = EMPTY

        return messages, self._upgraded, data

    def parse_headers(
        self, lines: List[bytes]
    ) -> Tuple[
        "CIMultiDictProxy[str]", RawHeaders, Optional[bool], Optional[str], bool, bool
    ]:
        """Parses RFC 5322 headers from a stream.

        Line continuations are supported. Returns list of header name
        and value pairs. Header name is in upper case.
        """
        headers, raw_headers = self._headers_parser.parse_headers(lines)
        close_conn = None
        encoding = None
        upgrade = False
        chunked = False

        # Headers that must not appear more than once:
        # https://www.rfc-editor.org/rfc/rfc9110.html#section-5.5-6
        # https://www.rfc-editor.org/rfc/rfc9110.html#name-collected-abnf
        singletons = (
            hdrs.CONTENT_LENGTH,
            hdrs.CONTENT_LOCATION,
            hdrs.CONTENT_RANGE,
            hdrs.CONTENT_TYPE,
            hdrs.ETAG,
            hdrs.HOST,
            hdrs.MAX_FORWARDS,
            hdrs.SERVER,
            hdrs.TRANSFER_ENCODING,
            hdrs.USER_AGENT,
        )
        bad_hdr = next((h for h in singletons if len(headers.getall(h, ())) > 1), None)
        if bad_hdr is not None:
            raise BadHttpMessage(f"Duplicate '{bad_hdr}' header found.")

        # keep-alive
        conn = headers.get(hdrs.CONNECTION)
        if conn:
            v = conn.lower()
            if v == "close":
                close_conn = True
            elif v == "keep-alive":
                close_conn = False
            # https://www.rfc-editor.org/rfc/rfc9110.html#name-101-switching-protocols
            elif v == "upgrade" and headers.get(hdrs.UPGRADE):
                upgrade = True

        # encoding
        enc = headers.get(hdrs.CONTENT_ENCODING, "")
        if enc.isascii() and enc.lower() in {"gzip", "deflate", "br", "zstd"}:
            encoding = enc

        # chunking
        te = headers.get(hdrs.TRANSFER_ENCODING)
        if te is not None:
            if self._is_chunked_te(te):
                chunked = True

            if hdrs.CONTENT_LENGTH in headers:
                raise BadHttpMessage(
                    "Transfer-Encoding can't be present with Content-Length",
                )

        return (headers, raw_headers, close_conn, encoding, upgrade, chunked)

    def set_upgraded(self, val: bool) -> None:
        """Set connection upgraded (to websocket) mode.

        :param bool val: new state.
        """
        self._upgraded = val
class HttpRequestParser(HttpParser[RawRequestMessage]):
    """Read request status line.

    Exception .http_exceptions.BadStatusLine
    could be raised in case of any errors in status line.
    Returns RawRequestMessage.
    """

    def parse_message(self, lines: List[bytes]) -> RawRequestMessage:
        """Build a RawRequestMessage from the request line and header lines."""
        # request line
        line = lines[0].decode("utf-8", "surrogateescape")
        try:
            method, path, version = line.split(" ", maxsplit=2)
        except ValueError:
            raise BadHttpMethod(line) from None

        if len(path) > self.max_line_size:
            raise LineTooLong(
                "Status line is too long", str(self.max_line_size), str(len(path))
            )

        # method must be an RFC 9110 token
        if not TOKENRE.fullmatch(method):
            raise BadHttpMethod(method)

        # version
        match = VERSRE.fullmatch(version)
        if match is None:
            raise BadStatusLine(line)
        version_o = HttpVersion(int(match.group(1)), int(match.group(2)))

        # The request-target form depends on the method / shape of the path.
        if method == "CONNECT":
            # authority-form,
            # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.3
            url = URL.build(authority=path, encoded=True)
        elif path.startswith("/"):
            # origin-form,
            # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.1
            path_part, _hash_separator, url_fragment = path.partition("#")
            path_part, _question_mark_separator, qs_part = path_part.partition("?")

            # NOTE: `yarl.URL.build()` is used to mimic what the Cython-based
            # NOTE: parser does, otherwise it results into the same
            # NOTE: HTTP Request-Line input producing different
            # NOTE: `yarl.URL()` objects
            url = URL.build(
                path=path_part,
                query_string=qs_part,
                fragment=url_fragment,
                encoded=True,
            )
        elif path == "*" and method == "OPTIONS":
            # asterisk-form,
            url = URL(path, encoded=True)
        else:
            # absolute-form for proxy maybe,
            # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.2
            url = URL(path, encoded=True)
            if url.scheme == "":
                # not absolute-form
                raise InvalidURLError(
                    path.encode(errors="surrogateescape").decode("latin1")
                )

        # read headers
        (
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
        ) = self.parse_headers(lines[1:])

        if close is None:  # then the headers weren't set in the request
            if version_o <= HttpVersion10:  # HTTP 1.0 must asks to not close
                close = True
            else:  # HTTP 1.1 must ask to close.
                close = False

        return RawRequestMessage(
            method,
            path,
            version_o,
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
            url,
        )

    def _is_chunked_te(self, te: str) -> bool:
        """Return True iff the final Transfer-Encoding coding is "chunked".

        Any other final coding is rejected outright for requests.
        """
        te = te.rsplit(",", maxsplit=1)[-1].strip(" \t")
        # .lower() transforms some non-ascii chars, so must check first.
        if te.isascii() and te.lower() == "chunked":
            return True
        # https://www.rfc-editor.org/rfc/rfc9112#section-6.3-2.4.3
        raise BadHttpMessage("Request has invalid `Transfer-Encoding`")
class HttpResponseParser(HttpParser[RawResponseMessage]):
    """Read response status line and headers.

    BadStatusLine could be raised in case of any errors in status line.
    Returns RawResponseMessage.
    """

    # Lax mode should only be enabled on response parser.
    lax = not DEBUG

    def feed_data(
        self,
        data: bytes,
        SEP: Optional[_SEP] = None,
        *args: Any,
        **kwargs: Any,
    ) -> Tuple[List[Tuple[RawResponseMessage, StreamReader]], bool, bytes]:
        # Default separator: bare LF in lax mode (stray CRs stripped later),
        # strict CRLF when DEBUG is on.
        if SEP is None:
            SEP = b"\r\n" if DEBUG else b"\n"
        return super().feed_data(data, SEP, *args, **kwargs)

    def parse_message(self, lines: List[bytes]) -> RawResponseMessage:
        """Build a RawResponseMessage from the status line and header lines."""
        line = lines[0].decode("utf-8", "surrogateescape")
        try:
            version, status = line.split(maxsplit=1)
        except ValueError:
            raise BadStatusLine(line) from None

        try:
            status, reason = status.split(maxsplit=1)
        except ValueError:
            # Reason phrase is optional.
            status = status.strip()
            reason = ""

        if len(reason) > self.max_line_size:
            raise LineTooLong(
                "Status line is too long", str(self.max_line_size), str(len(reason))
            )

        # version
        match = VERSRE.fullmatch(version)
        if match is None:
            raise BadStatusLine(line)
        version_o = HttpVersion(int(match.group(1)), int(match.group(2)))

        # The status code is a three-digit ASCII number, no padding
        if len(status) != 3 or not DIGITS.fullmatch(status):
            raise BadStatusLine(line)
        status_i = int(status)

        # read headers
        (
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
        ) = self.parse_headers(lines[1:])

        if close is None:
            # No explicit Connection header: infer keep-alive per spec.
            if version_o <= HttpVersion10:
                close = True
            # https://www.rfc-editor.org/rfc/rfc9112.html#name-message-body-length
            elif 100 <= status_i < 200 or status_i in {204, 304}:
                close = False
            elif hdrs.CONTENT_LENGTH in headers or hdrs.TRANSFER_ENCODING in headers:
                close = False
            else:
                # https://www.rfc-editor.org/rfc/rfc9112.html#section-6.3-2.8
                close = True

        return RawResponseMessage(
            version_o,
            status_i,
            reason.strip(),
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
        )

    def _is_chunked_te(self, te: str) -> bool:
        """Return True iff the final Transfer-Encoding coding is "chunked".

        Unlike the request parser, unknown codings are tolerated here.
        """
        # https://www.rfc-editor.org/rfc/rfc9112#section-6.3-2.4.2
        return te.rsplit(",", maxsplit=1)[-1].strip(" \t").lower() == "chunked"
class HttpPayloadParser:
    """Incrementally parse an HTTP message body into a StreamReader.

    Supports three framing modes (Content-Length, chunked transfer
    encoding, read-until-EOF) and wraps the output stream in a
    DeflateBuffer when the body is compressed and auto-decompression
    is enabled.
    """

    def __init__(
        self,
        payload: StreamReader,
        length: Optional[int] = None,
        chunked: bool = False,
        compression: Optional[str] = None,
        code: Optional[int] = None,
        method: Optional[str] = None,
        response_with_body: bool = True,
        auto_decompress: bool = True,
        lax: bool = False,
        *,
        headers_parser: HeadersParser,
    ) -> None:
        self._length = 0  # remaining body bytes in PARSE_LENGTH mode
        self._type = ParseState.PARSE_UNTIL_EOF
        self._chunk = ChunkState.PARSE_CHUNKED_SIZE
        self._chunk_size = 0  # remaining bytes of the current chunk
        self._chunk_tail = b""  # partial line carried across feed_data() calls
        self._auto_decompress = auto_decompress
        self._lax = lax
        # Trailers share the header grammar, so the HeadersParser is reused.
        self._headers_parser = headers_parser
        self._trailer_lines: list[bytes] = []
        self.done = False

        # payload decompression wrapper
        if response_with_body and compression and self._auto_decompress:
            real_payload: Union[StreamReader, DeflateBuffer] = DeflateBuffer(
                payload, compression
            )
        else:
            real_payload = payload

        # payload parser
        if not response_with_body:
            # don't parse payload if it's not expected to be received
            self._type = ParseState.PARSE_NONE
            real_payload.feed_eof()
            self.done = True
        elif chunked:
            self._type = ParseState.PARSE_CHUNKED
        elif length is not None:
            self._type = ParseState.PARSE_LENGTH
            self._length = length
            if self._length == 0:
                # Zero-length body is complete immediately.
                real_payload.feed_eof()
                self.done = True

        self.payload = real_payload

    def feed_eof(self) -> None:
        """Handle connection close; raise if the body is known to be truncated."""
        if self._type == ParseState.PARSE_UNTIL_EOF:
            self.payload.feed_eof()
        elif self._type == ParseState.PARSE_LENGTH:
            raise ContentLengthError(
                "Not enough data to satisfy content length header."
            )
        elif self._type == ParseState.PARSE_CHUNKED:
            raise TransferEncodingError(
                "Not enough data to satisfy transfer length header."
            )

    def feed_data(
        self, chunk: bytes, SEP: _SEP = b"\r\n", CHUNK_EXT: bytes = b";"
    ) -> Tuple[bool, bytes]:
        """Feed raw body bytes; return ``(eof, unconsumed_remainder)``."""
        # Read specified amount of bytes
        if self._type == ParseState.PARSE_LENGTH:
            required = self._length
            chunk_len = len(chunk)

            if required >= chunk_len:
                self._length = required - chunk_len
                self.payload.feed_data(chunk, chunk_len)
                if self._length == 0:
                    self.payload.feed_eof()
                    return True, b""
            else:
                # Chunk spans past the end of this body.
                self._length = 0
                self.payload.feed_data(chunk[:required], required)
                self.payload.feed_eof()
                return True, chunk[required:]

        # Chunked transfer encoding parser
        elif self._type == ParseState.PARSE_CHUNKED:
            if self._chunk_tail:
                chunk = self._chunk_tail + chunk
                self._chunk_tail = b""

            while chunk:

                # read next chunk size
                if self._chunk == ChunkState.PARSE_CHUNKED_SIZE:
                    pos = chunk.find(SEP)
                    if pos >= 0:
                        i = chunk.find(CHUNK_EXT, 0, pos)
                        if i >= 0:
                            size_b = chunk[:i]  # strip chunk-extensions
                            # Verify no LF in the chunk-extension
                            if b"\n" in (ext := chunk[i:pos]):
                                exc = TransferEncodingError(
                                    f"Unexpected LF in chunk-extension: {ext!r}"
                                )
                                set_exception(self.payload, exc)
                                raise exc
                        else:
                            size_b = chunk[:pos]

                        if self._lax:  # Allow whitespace in lax mode.
                            size_b = size_b.strip()

                        if not re.fullmatch(HEXDIGITS, size_b):
                            exc = TransferEncodingError(
                                chunk[:pos].decode("ascii", "surrogateescape")
                            )
                            set_exception(self.payload, exc)
                            raise exc
                        size = int(bytes(size_b), 16)

                        chunk = chunk[pos + len(SEP) :]
                        if size == 0:  # eof marker
                            self._chunk = ChunkState.PARSE_TRAILERS
                            if self._lax and chunk.startswith(b"\r"):
                                chunk = chunk[1:]
                        else:
                            self._chunk = ChunkState.PARSE_CHUNKED_CHUNK
                            self._chunk_size = size
                            self.payload.begin_http_chunk_receiving()
                    else:
                        # Size line incomplete: wait for more data.
                        self._chunk_tail = chunk
                        return False, b""

                # read chunk and feed buffer
                if self._chunk == ChunkState.PARSE_CHUNKED_CHUNK:
                    required = self._chunk_size
                    chunk_len = len(chunk)

                    if required > chunk_len:
                        self._chunk_size = required - chunk_len
                        self.payload.feed_data(chunk, chunk_len)
                        return False, b""
                    else:
                        self._chunk_size = 0
                        self.payload.feed_data(chunk[:required], required)
                        chunk = chunk[required:]
                        self._chunk = ChunkState.PARSE_CHUNKED_CHUNK_EOF
                        self.payload.end_http_chunk_receiving()

                # toss the CRLF at the end of the chunk
                if self._chunk == ChunkState.PARSE_CHUNKED_CHUNK_EOF:
                    if self._lax and chunk.startswith(b"\r"):
                        chunk = chunk[1:]
                    if chunk[: len(SEP)] == SEP:
                        chunk = chunk[len(SEP) :]
                        self._chunk = ChunkState.PARSE_CHUNKED_SIZE
                    else:
                        self._chunk_tail = chunk
                        return False, b""

                if self._chunk == ChunkState.PARSE_TRAILERS:
                    pos = chunk.find(SEP)
                    if pos < 0:  # No line found
                        self._chunk_tail = chunk
                        return False, b""

                    line = chunk[:pos]
                    chunk = chunk[pos + len(SEP) :]
                    if SEP == b"\n":  # For lax response parsing
                        line = line.rstrip(b"\r")
                    self._trailer_lines.append(line)

                    # \r\n\r\n found, end of stream
                    if self._trailer_lines[-1] == b"":
                        # Headers and trailers are defined the same way,
                        # so we reuse the HeadersParser here.
                        try:
                            trailers, raw_trailers = self._headers_parser.parse_headers(
                                self._trailer_lines
                            )
                        finally:
                            self._trailer_lines.clear()
                        self.payload.feed_eof()
                        return True, chunk

        # Read all bytes until eof
        elif self._type == ParseState.PARSE_UNTIL_EOF:
            self.payload.feed_data(chunk, len(chunk))

        return False, b""
class DeflateBuffer:
    """DeflateStream decompress stream and feed data into specified stream."""

    decompressor: Any

    def __init__(
        self,
        out: StreamReader,
        encoding: Optional[str],
        max_decompress_size: int = DEFAULT_MAX_DECOMPRESS_SIZE,
    ) -> None:
        self.out = out
        self.size = 0  # total compressed bytes received so far
        out.total_compressed_bytes = self.size
        self.encoding = encoding
        self._started_decoding = False

        # Select the decompressor for the declared Content-Encoding.
        self.decompressor: Union[BrotliDecompressor, ZLibDecompressor, ZSTDDecompressor]
        if encoding == "br":
            if not HAS_BROTLI:  # pragma: no cover
                raise ContentEncodingError(
                    "Can not decode content-encoding: brotli (br). "
                    "Please install `Brotli`"
                )
            self.decompressor = BrotliDecompressor()
        elif encoding == "zstd":
            if not HAS_ZSTD:
                raise ContentEncodingError(
                    "Can not decode content-encoding: zstandard (zstd). "
                    "Please install `backports.zstd`"
                )
            self.decompressor = ZSTDDecompressor()
        else:
            # "gzip" and "deflate" are both handled by zlib.
            self.decompressor = ZLibDecompressor(encoding=encoding)

        self._max_decompress_size = max_decompress_size

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Propagate an error to the wrapped output stream."""
        set_exception(self.out, exc, exc_cause)

    def feed_data(self, chunk: bytes, size: int) -> None:
        """Decompress ``chunk`` and feed the result to the output stream."""
        if not size:
            return

        self.size += size
        self.out.total_compressed_bytes = self.size

        # RFC1950
        # bits 0..3 = CM = 0b1000 = 8 = "deflate"
        # bits 4..7 = CINFO = 1..7 = windows size.
        if (
            not self._started_decoding
            and self.encoding == "deflate"
            and chunk[0] & 0xF != 8
        ):
            # Change the decoder to decompress incorrectly compressed data
            # Actually we should issue a warning about non-RFC-compliant data.
            self.decompressor = ZLibDecompressor(
                encoding=self.encoding, suppress_deflate_header=True
            )

        try:
            # Decompress with limit + 1 so we can detect if output exceeds limit
            chunk = self.decompressor.decompress_sync(
                chunk, max_length=self._max_decompress_size + 1
            )
        except Exception:
            raise ContentEncodingError(
                "Can not decode content-encoding: %s" % self.encoding
            )

        self._started_decoding = True

        # Check if decompression limit was exceeded
        if len(chunk) > self._max_decompress_size:
            raise DecompressSizeError(
                "Decompressed data exceeds the configured limit of %d bytes"
                % self._max_decompress_size
            )

        if chunk:
            self.out.feed_data(chunk, len(chunk))

    def feed_eof(self) -> None:
        """Flush the decompressor and close the output stream."""
        chunk = self.decompressor.flush()

        if chunk or self.size > 0:
            self.out.feed_data(chunk, len(chunk))
            # A deflate stream must have reached its end marker by EOF.
            if self.encoding == "deflate" and not self.decompressor.eof:
                raise ContentEncodingError("deflate")

        self.out.feed_eof()

    def begin_http_chunk_receiving(self) -> None:
        self.out.begin_http_chunk_receiving()

    def end_http_chunk_receiving(self) -> None:
        self.out.end_http_chunk_receiving()
# Pure-Python implementations, always reachable under the *Py aliases.
HttpRequestParserPy = HttpRequestParser
HttpResponseParserPy = HttpResponseParser
RawRequestMessagePy = RawRequestMessage
RawResponseMessagePy = RawResponseMessage

try:
    if not NO_EXTENSIONS:
        # Prefer the C-accelerated parser when extensions are enabled;
        # the public names are rebound to the C implementations.
        from ._http_parser import (  # type: ignore[import-not-found,no-redef]
            HttpRequestParser,
            HttpResponseParser,
            RawRequestMessage,
            RawResponseMessage,
        )

        HttpRequestParserC = HttpRequestParser
        HttpResponseParserC = HttpResponseParser
        RawRequestMessageC = RawRequestMessage
        RawResponseMessageC = RawResponseMessage
except ImportError:  # pragma: no cover
    # Fall back silently to the pure-Python implementations.
    pass