1import abc
2import asyncio
3import re
4import string
5from contextlib import suppress
6from enum import IntEnum
7from typing import (
8 Any,
9 ClassVar,
10 Final,
11 Generic,
12 List,
13 Literal,
14 NamedTuple,
15 Optional,
16 Pattern,
17 Set,
18 Tuple,
19 Type,
20 TypeVar,
21 Union,
22)
23
24from multidict import CIMultiDict, CIMultiDictProxy, istr
25from yarl import URL
26
27from . import hdrs
28from .base_protocol import BaseProtocol
29from .compression_utils import (
30 HAS_BROTLI,
31 HAS_ZSTD,
32 BrotliDecompressor,
33 ZLibDecompressor,
34 ZSTDDecompressor,
35)
36from .helpers import (
37 _EXC_SENTINEL,
38 DEBUG,
39 EMPTY_BODY_METHODS,
40 EMPTY_BODY_STATUS_CODES,
41 NO_EXTENSIONS,
42 BaseTimerContext,
43 set_exception,
44)
45from .http_exceptions import (
46 BadHttpMessage,
47 BadHttpMethod,
48 BadStatusLine,
49 ContentEncodingError,
50 ContentLengthError,
51 InvalidHeader,
52 InvalidURLError,
53 LineTooLong,
54 TransferEncodingError,
55)
56from .http_writer import HttpVersion, HttpVersion10
57from .streams import EMPTY_PAYLOAD, StreamReader
58from .typedefs import RawHeaders
59
__all__ = (
    "HeadersParser",
    "HttpParser",
    "HttpRequestParser",
    "HttpResponseParser",
    "RawRequestMessage",
    "RawResponseMessage",
)

# Line separator handed to the parsers: strict CRLF, or bare LF which the
# response parser uses by default when DEBUG is off.
_SEP = Literal[b"\r\n", b"\n"]

# Printable ASCII characters (string.printable).
ASCIISET: Final[Set[str]] = set(string.printable)

# See https://www.rfc-editor.org/rfc/rfc9110.html#name-overview
# and https://www.rfc-editor.org/rfc/rfc9110.html#name-tokens
#
# method = token
# tchar = "!" / "#" / "$" / "%" / "&" / "'" / "*" / "+" / "-" / "." /
# "^" / "_" / "`" / "|" / "~" / DIGIT / ALPHA
# token = 1*tchar
_TCHAR_SPECIALS: Final[str] = re.escape("!#$%&'*+-.^_`|~")
# Used for request methods and header field names.
TOKENRE: Final[Pattern[str]] = re.compile(f"[0-9A-Za-z{_TCHAR_SPECIALS}]+")
# HTTP-version, e.g. "HTTP/1.1"; groups 1 and 2 are the major/minor digits.
VERSRE: Final[Pattern[str]] = re.compile(r"HTTP/(\d)\.(\d)", re.ASCII)
# ASCII decimal digits only (Content-Length values and status codes).
DIGITS: Final[Pattern[str]] = re.compile(r"\d+", re.ASCII)
# chunk-size of the chunked transfer coding (matched against bytes).
HEXDIGITS: Final[Pattern[bytes]] = re.compile(rb"[0-9a-fA-F]+")
85
86
class RawRequestMessage(NamedTuple):
    """Request line and headers as produced by HttpRequestParser.parse_message."""

    method: str
    # request-target exactly as it appeared on the request line
    path: str
    version: HttpVersion
    headers: "CIMultiDictProxy[str]"
    # (name, value) byte pairs, without surrounding OWS on the value
    raw_headers: RawHeaders
    should_close: bool
    # lower-cased Content-Encoding if it is gzip/deflate/br/zstd, else None
    compression: Optional[str]
    # "Connection: upgrade" together with a non-empty Upgrade header
    upgrade: bool
    # the final Transfer-Encoding coding is "chunked"
    chunked: bool
    # URL built from `path` according to the request-target form
    url: URL
98
99
class RawResponseMessage(NamedTuple):
    """Status line and headers as produced by HttpResponseParser.parse_message."""

    version: HttpVersion
    # three-digit status code
    code: int
    # reason phrase with surrounding whitespace stripped
    reason: str
    headers: CIMultiDictProxy[str]
    # (name, value) byte pairs, without surrounding OWS on the value
    raw_headers: RawHeaders
    should_close: bool
    # lower-cased Content-Encoding if it is gzip/deflate/br/zstd, else None
    compression: Optional[str]
    # "Connection: upgrade" together with a non-empty Upgrade header
    upgrade: bool
    # the final Transfer-Encoding coding is "chunked"
    chunked: bool
110
111
# Message type produced by a concrete HttpParser subclass.
_MsgT = TypeVar("_MsgT", RawRequestMessage, RawResponseMessage)
113
114
class ParseState(IntEnum):
    """How HttpPayloadParser determines the end of a message body."""

    # No body is expected; the payload is closed immediately.
    PARSE_NONE = 0
    # Body length given by Content-Length.
    PARSE_LENGTH = 1
    # Body uses the chunked transfer coding.
    PARSE_CHUNKED = 2
    # Body extends until the connection reaches EOF.
    PARSE_UNTIL_EOF = 3
121
122
class ChunkState(IntEnum):
    """Position of HttpPayloadParser inside a chunked body."""

    # Expecting a chunk-size line (optionally with chunk-extensions).
    PARSE_CHUNKED_SIZE = 0
    # Reading chunk data.
    PARSE_CHUNKED_CHUNK = 1
    # Expecting the line separator that follows chunk data.
    PARSE_CHUNKED_CHUNK_EOF = 2
    # Not assigned by HttpPayloadParser in this module.
    PARSE_MAYBE_TRAILERS = 3
    # After the last (zero-size) chunk: reading trailer lines up to an empty line.
    PARSE_TRAILERS = 4
129
130
class HeadersParser:
    """Parse a block of header (or trailer) field lines.

    Lines are passed without their line separator. The block ends at the
    first empty line, or at the end of the list if no empty line is present.
    """

    def __init__(
        self,
        max_line_size: int = 8190,
        max_headers: int = 32768,
        max_field_size: int = 8190,
        lax: bool = False,
    ) -> None:
        # NOTE(review): max_line_size and max_headers are stored but not
        # enforced by parse_headers; only max_field_size is checked here.
        self.max_line_size = max_line_size
        self.max_headers = max_headers
        self.max_field_size = max_field_size
        # When True, obsolete line folding (obs-fold) is accepted.
        self._lax = lax

    def parse_headers(
        self, lines: List[bytes]
    ) -> Tuple["CIMultiDictProxy[str]", RawHeaders]:
        """Parse header lines into ``(headers, raw_headers)``.

        :param lines: field lines without separators. Parsing stops at the
            first empty line or at the end of the list.
        :returns: a read-only case-insensitive multidict of decoded values and
            a tuple of the raw ``(name, value)`` byte pairs.
        :raises InvalidHeader: missing colon, empty or whitespace-padded name,
            a name that is not a token, or CR/LF/NUL inside a value.
        :raises LineTooLong: a name or (folded) value exceeds max_field_size.
        """
        headers: CIMultiDict[str] = CIMultiDict()
        # note: "raw" does not mean inclusion of OWS before/after the field value
        raw_headers = []

        line_count = len(lines)
        lines_idx = 0
        # An empty list, like a missing terminator, simply ends parsing.
        line = lines[0] if line_count else b""

        while line:
            # Parse initial header name : value pair.
            try:
                bname, bvalue = line.split(b":", 1)
            except ValueError:
                raise InvalidHeader(line) from None

            if len(bname) == 0:
                raise InvalidHeader(bname)

            # https://www.rfc-editor.org/rfc/rfc9112.html#section-5.1-2
            if {bname[0], bname[-1]} & {32, 9}:  # {" ", "\t"}
                raise InvalidHeader(line)

            bvalue = bvalue.lstrip(b" \t")
            if len(bname) > self.max_field_size:
                raise LineTooLong(
                    "request header name {}".format(
                        bname.decode("utf8", "backslashreplace")
                    ),
                    str(self.max_field_size),
                    str(len(bname)),
                )
            name = bname.decode("utf-8", "surrogateescape")
            if not TOKENRE.fullmatch(name):
                raise InvalidHeader(bname)

            header_length = len(bvalue)

            # next line; running off the end behaves like the empty terminator
            lines_idx += 1
            line = lines[lines_idx] if lines_idx < line_count else b""

            # consume continuation lines
            continuation = self._lax and line and line[0] in (32, 9)  # (' ', '\t')

            # Deprecated: https://www.rfc-editor.org/rfc/rfc9112.html#name-obsolete-line-folding
            if continuation:
                bvalue_lst = [bvalue]
                while continuation:
                    header_length += len(line)
                    if header_length > self.max_field_size:
                        raise LineTooLong(
                            "request header field {}".format(
                                bname.decode("utf8", "backslashreplace")
                            ),
                            str(self.max_field_size),
                            str(header_length),
                        )
                    bvalue_lst.append(line)

                    # next line
                    lines_idx += 1
                    if lines_idx < line_count:
                        line = lines[lines_idx]
                        # An empty line terminates the header block, so it must
                        # end the folding loop instead of being consumed as
                        # a continuation.
                        continuation = bool(line) and line[0] in (32, 9)
                    else:
                        line = b""
                        break
                bvalue = b"".join(bvalue_lst)
            else:
                if header_length > self.max_field_size:
                    raise LineTooLong(
                        "request header field {}".format(
                            bname.decode("utf8", "backslashreplace")
                        ),
                        str(self.max_field_size),
                        str(header_length),
                    )

            bvalue = bvalue.strip(b" \t")
            value = bvalue.decode("utf-8", "surrogateescape")

            # https://www.rfc-editor.org/rfc/rfc9110.html#section-5.5-5
            if "\n" in value or "\r" in value or "\x00" in value:
                raise InvalidHeader(bvalue)

            headers.add(name, value)
            raw_headers.append((bname, bvalue))

        return (CIMultiDictProxy(headers), tuple(raw_headers))
237
238
def _is_supported_upgrade(headers: CIMultiDictProxy[str]) -> bool:
    """Return True if the Upgrade header names "tcp" or "websocket".

    The comparison is case-insensitive; a missing header counts as unsupported.
    """
    requested_protocol = headers.get(hdrs.UPGRADE, "")
    return requested_protocol.lower() in ("tcp", "websocket")
242
243
class HttpParser(abc.ABC, Generic[_MsgT]):
    """Incremental HTTP/1.x parser shared by the request and response parsers.

    ``feed_data`` splits incoming bytes into lines until an empty line. It then
    builds a message via :meth:`parse_message` and, when a body is expected,
    hands the following bytes to an :class:`HttpPayloadParser`. Subclasses
    implement :meth:`parse_message` and :meth:`_is_chunked_te`.
    """

    # Lenient parsing switch; forwarded to HeadersParser and HttpPayloadParser.
    lax: ClassVar[bool] = False

    def __init__(
        self,
        protocol: Optional[BaseProtocol] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        limit: int = 2**16,
        max_line_size: int = 8190,
        max_headers: int = 32768,
        max_field_size: int = 8190,
        timer: Optional[BaseTimerContext] = None,
        code: Optional[int] = None,
        method: Optional[str] = None,
        payload_exception: Optional[Type[BaseException]] = None,
        response_with_body: bool = True,
        read_until_eof: bool = False,
        auto_decompress: bool = True,
    ) -> None:
        self.protocol = protocol
        self.loop = loop
        self.max_line_size = max_line_size
        self.max_headers = max_headers
        self.max_field_size = max_field_size
        self.timer = timer
        self.code = code
        self.method = method
        # If set, payload errors are re-wrapped in this exception type.
        self.payload_exception = payload_exception
        self.response_with_body = response_with_body
        self.read_until_eof = read_until_eof

        # Start-line/header lines collected so far (separators removed).
        self._lines: List[bytes] = []
        # Incomplete trailing line carried over to the next feed_data call.
        self._tail = b""
        self._upgraded = False
        self._payload = None
        self._payload_parser: Optional[HttpPayloadParser] = None
        self._auto_decompress = auto_decompress
        self._limit = limit
        self._headers_parser = HeadersParser(
            max_line_size, max_headers, max_field_size, self.lax
        )

    @abc.abstractmethod
    def parse_message(self, lines: List[bytes]) -> _MsgT: ...

    @abc.abstractmethod
    def _is_chunked_te(self, te: str) -> bool: ...

    def feed_eof(self) -> Optional[_MsgT]:
        """Signal end of input.

        If a body is being parsed, EOF is forwarded to its payload parser.
        That call raises if the body framing was not satisfied. Otherwise the
        buffered start line and headers are parsed on a best-effort basis.
        The message is returned, or None if nothing could be parsed.
        """
        if self._payload_parser is not None:
            self._payload_parser.feed_eof()
            self._payload_parser = None
        else:
            # try to extract partial message
            if self._tail:
                self._lines.append(self._tail)

            if self._lines:
                # Lines are stored without separators, so the header block
                # terminator the header parser expects is the empty line b"".
                if self._lines[-1] != b"":
                    self._lines.append(b"")
                with suppress(Exception):
                    return self.parse_message(self._lines)
        return None

    def feed_data(
        self,
        data: bytes,
        SEP: _SEP = b"\r\n",
        EMPTY: bytes = b"",
        CONTENT_LENGTH: istr = hdrs.CONTENT_LENGTH,
        METH_CONNECT: str = hdrs.METH_CONNECT,
        SEC_WEBSOCKET_KEY1: istr = hdrs.SEC_WEBSOCKET_KEY1,
    ) -> Tuple[List[Tuple[_MsgT, StreamReader]], bool, bytes]:
        """Feed raw bytes into the parser.

        Returns ``(messages, upgraded, tail)``:

        - *messages* lists the (message, payload) pairs whose headers were
          completed during this call.
        - *upgraded* reports whether the connection is now in upgraded mode.
        - *tail* holds bytes left unprocessed once upgraded mode stopped the
          parsing loop.

        *SEP* selects the line separator. The remaining keyword parameters
        only supply constants through their defaults.
        """
        messages = []

        if self._tail:
            data, self._tail = self._tail + data, b""

        data_len = len(data)
        start_pos = 0
        loop = self.loop

        should_close = False
        while start_pos < data_len:

            # read HTTP message (request/response line + headers), \r\n\r\n
            # and split by lines
            if self._payload_parser is None and not self._upgraded:
                pos = data.find(SEP, start_pos)
                # consume \r\n
                if pos == start_pos and not self._lines:
                    start_pos = pos + len(SEP)
                    continue

                if pos >= start_pos:
                    if should_close:
                        raise BadHttpMessage("Data after `Connection: close`")

                    # line found
                    line = data[start_pos:pos]
                    if SEP == b"\n":  # For lax response parsing
                        line = line.rstrip(b"\r")
                    self._lines.append(line)
                    start_pos = pos + len(SEP)

                    # \r\n\r\n found
                    if self._lines[-1] == EMPTY:
                        try:
                            msg: _MsgT = self.parse_message(self._lines)
                        finally:
                            self._lines.clear()

                        def get_content_length() -> Optional[int]:
                            # payload length
                            length_hdr = msg.headers.get(CONTENT_LENGTH)
                            if length_hdr is None:
                                return None

                            # Shouldn't allow +/- or other number formats.
                            # https://www.rfc-editor.org/rfc/rfc9110#section-8.6-2
                            # msg.headers is already stripped of leading/trailing wsp
                            if not DIGITS.fullmatch(length_hdr):
                                raise InvalidHeader(CONTENT_LENGTH)

                            return int(length_hdr)

                        length = get_content_length()
                        # do not support old websocket spec
                        if SEC_WEBSOCKET_KEY1 in msg.headers:
                            raise InvalidHeader(SEC_WEBSOCKET_KEY1)

                        self._upgraded = msg.upgrade and _is_supported_upgrade(
                            msg.headers
                        )

                        method = getattr(msg, "method", self.method)
                        # code is only present on responses
                        code = getattr(msg, "code", 0)

                        assert self.protocol is not None
                        # calculate payload
                        empty_body = code in EMPTY_BODY_STATUS_CODES or bool(
                            method and method in EMPTY_BODY_METHODS
                        )
                        if not empty_body and (
                            ((length is not None and length > 0) or msg.chunked)
                            and not self._upgraded
                        ):
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            payload_parser = HttpPayloadParser(
                                payload,
                                length=length,
                                chunked=msg.chunked,
                                method=method,
                                compression=msg.compression,
                                code=self.code,
                                response_with_body=self.response_with_body,
                                auto_decompress=self._auto_decompress,
                                lax=self.lax,
                                headers_parser=self._headers_parser,
                            )
                            if not payload_parser.done:
                                self._payload_parser = payload_parser
                        elif method == METH_CONNECT:
                            assert isinstance(msg, RawRequestMessage)
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            self._upgraded = True
                            self._payload_parser = HttpPayloadParser(
                                payload,
                                method=msg.method,
                                compression=msg.compression,
                                auto_decompress=self._auto_decompress,
                                lax=self.lax,
                                headers_parser=self._headers_parser,
                            )
                        elif not empty_body and length is None and self.read_until_eof:
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            payload_parser = HttpPayloadParser(
                                payload,
                                length=length,
                                chunked=msg.chunked,
                                method=method,
                                compression=msg.compression,
                                code=self.code,
                                response_with_body=self.response_with_body,
                                auto_decompress=self._auto_decompress,
                                lax=self.lax,
                                headers_parser=self._headers_parser,
                            )
                            if not payload_parser.done:
                                self._payload_parser = payload_parser
                        else:
                            payload = EMPTY_PAYLOAD

                        messages.append((msg, payload))
                        should_close = msg.should_close
                else:
                    self._tail = data[start_pos:]
                    data = EMPTY
                    break

            # no parser, just store
            elif self._payload_parser is None and self._upgraded:
                assert not self._lines
                break

            # feed payload
            elif data and start_pos < data_len:
                assert not self._lines
                assert self._payload_parser is not None
                try:
                    eof, data = self._payload_parser.feed_data(data[start_pos:], SEP)
                except BaseException as underlying_exc:
                    reraised_exc = underlying_exc
                    if self.payload_exception is not None:
                        reraised_exc = self.payload_exception(str(underlying_exc))

                    set_exception(
                        self._payload_parser.payload,
                        reraised_exc,
                        underlying_exc,
                    )

                    eof = True
                    data = b""
                    # Framing errors are fatal for the connection, so they
                    # propagate to the caller as well.
                    if isinstance(
                        underlying_exc, (InvalidHeader, TransferEncodingError)
                    ):
                        raise

                if eof:
                    # The body is complete; whatever the payload parser handed
                    # back starts the next message.
                    start_pos = 0
                    data_len = len(data)
                    self._payload_parser = None
                    continue
            else:
                break

        if data and start_pos < data_len:
            data = data[start_pos:]
        else:
            data = EMPTY

        return messages, self._upgraded, data

    def parse_headers(
        self, lines: List[bytes]
    ) -> Tuple[
        "CIMultiDictProxy[str]", RawHeaders, Optional[bool], Optional[str], bool, bool
    ]:
        """Parse header lines and derive connection-level properties.

        Returns ``(headers, raw_headers, close_conn, encoding, upgrade, chunked)``:

        - *close_conn* is True for ``Connection: close``, False for
          ``Connection: keep-alive``, and None otherwise.
        - *encoding* is the lower-cased Content-Encoding when it is gzip,
          deflate, br or zstd, and None otherwise.
        - *upgrade* is True for ``Connection: upgrade`` with a non-empty
          Upgrade header.
        - *chunked* is decided by :meth:`_is_chunked_te`.

        Obsolete line folding is accepted only in lax mode.

        :raises BadHttpMessage: a singleton header appears more than once, or
            Transfer-Encoding is combined with Content-Length.
        """
        headers, raw_headers = self._headers_parser.parse_headers(lines)
        close_conn = None
        encoding = None
        upgrade = False
        chunked = False

        # https://www.rfc-editor.org/rfc/rfc9110.html#section-5.5-6
        # https://www.rfc-editor.org/rfc/rfc9110.html#name-collected-abnf
        singletons = (
            hdrs.CONTENT_LENGTH,
            hdrs.CONTENT_LOCATION,
            hdrs.CONTENT_RANGE,
            hdrs.CONTENT_TYPE,
            hdrs.ETAG,
            hdrs.HOST,
            hdrs.MAX_FORWARDS,
            hdrs.SERVER,
            hdrs.TRANSFER_ENCODING,
            hdrs.USER_AGENT,
        )
        bad_hdr = next((h for h in singletons if len(headers.getall(h, ())) > 1), None)
        if bad_hdr is not None:
            raise BadHttpMessage(f"Duplicate '{bad_hdr}' header found.")

        # keep-alive
        conn = headers.get(hdrs.CONNECTION)
        if conn:
            v = conn.lower()
            if v == "close":
                close_conn = True
            elif v == "keep-alive":
                close_conn = False
            # https://www.rfc-editor.org/rfc/rfc9110.html#name-101-switching-protocols
            elif v == "upgrade" and headers.get(hdrs.UPGRADE):
                upgrade = True

        # encoding
        enc = headers.get(hdrs.CONTENT_ENCODING)
        if enc:
            enc = enc.lower()
            if enc in ("gzip", "deflate", "br", "zstd"):
                encoding = enc

        # chunking
        te = headers.get(hdrs.TRANSFER_ENCODING)
        if te is not None:
            if self._is_chunked_te(te):
                chunked = True

            if hdrs.CONTENT_LENGTH in headers:
                raise BadHttpMessage(
                    "Transfer-Encoding can't be present with Content-Length",
                )

        return (headers, raw_headers, close_conn, encoding, upgrade, chunked)

    def set_upgraded(self, val: bool) -> None:
        """Set connection upgraded (to websocket) mode.

        :param bool val: new state.
        """
        self._upgraded = val
577
578
class HttpRequestParser(HttpParser[RawRequestMessage]):
    """Parse an HTTP request line plus headers into a RawRequestMessage.

    A malformed request line raises BadHttpMethod or BadStatusLine. An
    overlong request-target raises LineTooLong. A request-target in none of
    the supported forms raises InvalidURLError.
    """

    def parse_message(self, lines: List[bytes]) -> RawRequestMessage:
        request_line = lines[0].decode("utf-8", "surrogateescape")
        try:
            method, target, raw_version = request_line.split(" ", maxsplit=2)
        except ValueError:
            raise BadHttpMethod(request_line) from None

        target_length = len(target)
        if target_length > self.max_line_size:
            raise LineTooLong(
                "Status line is too long",
                str(self.max_line_size),
                str(target_length),
            )

        # The method must be a token.
        if TOKENRE.fullmatch(method) is None:
            raise BadHttpMethod(method)

        version_match = VERSRE.fullmatch(raw_version)
        if version_match is None:
            raise BadStatusLine(request_line)
        major, minor = version_match.groups()
        version_o = HttpVersion(int(major), int(minor))

        if method == "CONNECT":
            # authority-form,
            # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.3
            url = URL.build(authority=target, encoded=True)
        elif target.startswith("/"):
            # origin-form,
            # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.1
            without_fragment, _, fragment = target.partition("#")
            path_only, _, query = without_fragment.partition("?")
            # URL.build() mirrors what the Cython-based parser does, so both
            # parsers produce equal URL objects for the same request line.
            url = URL.build(
                path=path_only,
                query_string=query,
                fragment=fragment,
                encoded=True,
            )
        else:
            # asterisk-form ("*" for OPTIONS) or absolute-form for proxies,
            # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.2
            url = URL(target, encoded=True)
            is_asterisk_form = target == "*" and method == "OPTIONS"
            if not is_asterisk_form and url.scheme == "":
                # not absolute-form
                raise InvalidURLError(
                    target.encode(errors="surrogateescape").decode("latin1")
                )

        parsed = self.parse_headers(lines[1:])
        headers, raw_headers, close, compression, upgrade, chunked = parsed

        if close is None:
            # No explicit Connection header: HTTP/1.0 defaults to closing the
            # connection, HTTP/1.1 and later default to keeping it open.
            close = version_o <= HttpVersion10

        return RawRequestMessage(
            method,
            target,
            version_o,
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
            url,
        )

    def _is_chunked_te(self, te: str) -> bool:
        # Only the final coding matters; anything other than "chunked" is
        # rejected for requests.
        # https://www.rfc-editor.org/rfc/rfc9112#section-6.3-2.4.3
        final_coding = te.rpartition(",")[2].strip(" \t").lower()
        if final_coding != "chunked":
            raise BadHttpMessage("Request has invalid `Transfer-Encoding`")
        return True
677
678
class HttpResponseParser(HttpParser[RawResponseMessage]):
    """Parse an HTTP status line plus headers into a RawResponseMessage.

    A malformed status line raises BadStatusLine. An overlong reason phrase
    raises LineTooLong.
    """

    # Lax mode should only be enabled on response parser.
    lax = not DEBUG

    def feed_data(
        self,
        data: bytes,
        SEP: Optional[_SEP] = None,
        *args: Any,
        **kwargs: Any,
    ) -> Tuple[List[Tuple[RawResponseMessage, StreamReader]], bool, bytes]:
        """Feed bytes, choosing a default line separator when none is given.

        Without DEBUG the separator defaults to a bare LF, so responses using
        either LF or CRLF line endings are accepted. With DEBUG it is CRLF.
        """
        separator = SEP if SEP is not None else (b"\r\n" if DEBUG else b"\n")
        return super().feed_data(data, separator, *args, **kwargs)

    def parse_message(self, lines: List[bytes]) -> RawResponseMessage:
        status_line = lines[0].decode("utf-8", "surrogateescape")
        try:
            raw_version, remainder = status_line.split(maxsplit=1)
        except ValueError:
            raise BadStatusLine(status_line) from None

        try:
            status, reason = remainder.split(maxsplit=1)
        except ValueError:
            # Status code without a reason phrase.
            status, reason = remainder.strip(), ""

        reason_length = len(reason)
        if reason_length > self.max_line_size:
            raise LineTooLong(
                "Status line is too long",
                str(self.max_line_size),
                str(reason_length),
            )

        version_match = VERSRE.fullmatch(raw_version)
        if version_match is None:
            raise BadStatusLine(status_line)
        major, minor = version_match.groups()
        version_o = HttpVersion(int(major), int(minor))

        # The status code is a three-digit ASCII number, no padding
        if len(status) != 3 or DIGITS.fullmatch(status) is None:
            raise BadStatusLine(status_line)
        status_i = int(status)

        parsed = self.parse_headers(lines[1:])
        headers, raw_headers, close, compression, upgrade, chunked = parsed

        if close is None:
            # https://www.rfc-editor.org/rfc/rfc9112.html#name-message-body-length
            bodyless_status = 100 <= status_i < 200 or status_i in {204, 304}
            delimited_body = (
                hdrs.CONTENT_LENGTH in headers or hdrs.TRANSFER_ENCODING in headers
            )
            # HTTP/1.0 always closes. Otherwise the connection must close
            # only when the body is delimited by EOF:
            # https://www.rfc-editor.org/rfc/rfc9112.html#section-6.3-2.8
            close = version_o <= HttpVersion10 or not (
                bodyless_status or delimited_body
            )

        return RawResponseMessage(
            version_o,
            status_i,
            reason.strip(),
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
        )

    def _is_chunked_te(self, te: str) -> bool:
        # The body is chunked only if "chunked" is the final coding.
        # https://www.rfc-editor.org/rfc/rfc9112#section-6.3-2.4.2
        final_coding = te.rpartition(",")[2]
        return final_coding.strip(" \t").lower() == "chunked"
766
767
class HttpPayloadParser:
    """Parse a message body according to its framing and feed it to a payload.

    The framing is chosen in ``__init__``:

    - no body when *response_with_body* is False;
    - the chunked transfer coding when *chunked* is set;
    - Content-Length when *length* is given;
    - otherwise, read until EOF.
    """

    def __init__(
        self,
        payload: StreamReader,
        length: Optional[int] = None,
        chunked: bool = False,
        compression: Optional[str] = None,
        code: Optional[int] = None,
        method: Optional[str] = None,
        response_with_body: bool = True,
        auto_decompress: bool = True,
        lax: bool = False,
        *,
        headers_parser: HeadersParser,
    ) -> None:
        """Set up the body parser.

        :param payload: stream receiving the body bytes.
        :param length: Content-Length; selects length-delimited parsing.
        :param chunked: use chunked decoding (takes precedence over *length*).
        :param compression: content coding. With *auto_decompress*, the payload
            is wrapped in a DeflateBuffer.
        :param code: accepted for interface compatibility; not used here.
        :param method: accepted for interface compatibility; not used here.
        :param response_with_body: False means no body is expected, so the
            payload is closed immediately.
        :param lax: tolerate whitespace around chunk sizes and a stray CR
            before the line separator.
        :param headers_parser: parser used for chunked trailers.
        """
        self._length = 0
        self._type = ParseState.PARSE_UNTIL_EOF
        self._chunk = ChunkState.PARSE_CHUNKED_SIZE
        self._chunk_size = 0
        # Unconsumed bytes carried over to the next feed_data call.
        self._chunk_tail = b""
        self._auto_decompress = auto_decompress
        self._lax = lax
        self._headers_parser = headers_parser
        self._trailer_lines: list[bytes] = []
        # True once no more body data is expected.
        self.done = False

        # payload decompression wrapper
        if response_with_body and compression and self._auto_decompress:
            real_payload: Union[StreamReader, DeflateBuffer] = DeflateBuffer(
                payload, compression
            )
        else:
            real_payload = payload

        # payload parser
        if not response_with_body:
            # don't parse payload if it's not expected to be received
            self._type = ParseState.PARSE_NONE
            real_payload.feed_eof()
            self.done = True
        elif chunked:
            self._type = ParseState.PARSE_CHUNKED
        elif length is not None:
            self._type = ParseState.PARSE_LENGTH
            self._length = length
            if self._length == 0:
                real_payload.feed_eof()
                self.done = True

        self.payload = real_payload

    def feed_eof(self) -> None:
        """Handle connection EOF.

        The payload is completed when reading until EOF. A length-delimited or
        chunked body that is still incomplete raises instead.
        """
        if self._type == ParseState.PARSE_UNTIL_EOF:
            self.payload.feed_eof()
        elif self._type == ParseState.PARSE_LENGTH:
            raise ContentLengthError(
                "Not enough data to satisfy content length header."
            )
        elif self._type == ParseState.PARSE_CHUNKED:
            raise TransferEncodingError(
                "Not enough data to satisfy transfer length header."
            )

    def feed_data(
        self, chunk: bytes, SEP: _SEP = b"\r\n", CHUNK_EXT: bytes = b";"
    ) -> Tuple[bool, bytes]:
        """Consume body bytes.

        Returns ``(eof, tail)``. *eof* is True once the body is complete, and
        *tail* holds any bytes that follow the body.

        :raises TransferEncodingError: malformed chunk size or extension, or
            chunk data that is not followed by the line separator.
        """
        # Read specified amount of bytes
        if self._type == ParseState.PARSE_LENGTH:
            required = self._length
            chunk_len = len(chunk)

            if required >= chunk_len:
                self._length = required - chunk_len
                self.payload.feed_data(chunk, chunk_len)
                if self._length == 0:
                    self.payload.feed_eof()
                    return True, b""
            else:
                self._length = 0
                self.payload.feed_data(chunk[:required], required)
                self.payload.feed_eof()
                return True, chunk[required:]

        # Chunked transfer encoding parser
        elif self._type == ParseState.PARSE_CHUNKED:
            if self._chunk_tail:
                chunk = self._chunk_tail + chunk
                self._chunk_tail = b""

            while chunk:

                # read next chunk size
                if self._chunk == ChunkState.PARSE_CHUNKED_SIZE:
                    pos = chunk.find(SEP)
                    if pos >= 0:
                        i = chunk.find(CHUNK_EXT, 0, pos)
                        if i >= 0:
                            size_b = chunk[:i]  # strip chunk-extensions
                            # Verify no LF in the chunk-extension
                            if b"\n" in (ext := chunk[i:pos]):
                                exc = TransferEncodingError(
                                    f"Unexpected LF in chunk-extension: {ext!r}"
                                )
                                set_exception(self.payload, exc)
                                raise exc
                        else:
                            size_b = chunk[:pos]

                        if self._lax:  # Allow whitespace in lax mode.
                            size_b = size_b.strip()

                        if not HEXDIGITS.fullmatch(size_b):
                            exc = TransferEncodingError(
                                chunk[:pos].decode("ascii", "surrogateescape")
                            )
                            set_exception(self.payload, exc)
                            raise exc
                        size = int(bytes(size_b), 16)

                        chunk = chunk[pos + len(SEP) :]
                        if size == 0:  # eof marker
                            self._chunk = ChunkState.PARSE_TRAILERS
                            if self._lax and chunk.startswith(b"\r"):
                                chunk = chunk[1:]
                        else:
                            self._chunk = ChunkState.PARSE_CHUNKED_CHUNK
                            self._chunk_size = size
                            self.payload.begin_http_chunk_receiving()
                    else:
                        self._chunk_tail = chunk
                        return False, b""

                # read chunk and feed buffer
                if self._chunk == ChunkState.PARSE_CHUNKED_CHUNK:
                    required = self._chunk_size
                    chunk_len = len(chunk)

                    if required > chunk_len:
                        self._chunk_size = required - chunk_len
                        self.payload.feed_data(chunk, chunk_len)
                        return False, b""
                    else:
                        self._chunk_size = 0
                        self.payload.feed_data(chunk[:required], required)
                        chunk = chunk[required:]
                        self._chunk = ChunkState.PARSE_CHUNKED_CHUNK_EOF
                        self.payload.end_http_chunk_receiving()

                # toss the CRLF at the end of the chunk
                if self._chunk == ChunkState.PARSE_CHUNKED_CHUNK_EOF:
                    if self._lax and chunk.startswith(b"\r"):
                        chunk = chunk[1:]
                    if chunk[: len(SEP)] == SEP:
                        chunk = chunk[len(SEP) :]
                        self._chunk = ChunkState.PARSE_CHUNKED_SIZE
                    elif SEP.startswith(chunk):
                        # Nothing, or only part of the separator, has arrived
                        # yet: wait for more data.
                        self._chunk_tail = chunk
                        return False, b""
                    else:
                        # Chunk data is followed by something other than the
                        # separator. Buffering it would grow _chunk_tail
                        # forever, since it can never match.
                        exc = TransferEncodingError(
                            f"Expected {SEP!r} after chunk data, "
                            f"got {chunk[: len(SEP)]!r}"
                        )
                        set_exception(self.payload, exc)
                        raise exc

                if self._chunk == ChunkState.PARSE_TRAILERS:
                    pos = chunk.find(SEP)
                    if pos < 0:  # No line found
                        self._chunk_tail = chunk
                        return False, b""

                    line = chunk[:pos]
                    chunk = chunk[pos + len(SEP) :]
                    if SEP == b"\n":  # For lax response parsing
                        line = line.rstrip(b"\r")
                    self._trailer_lines.append(line)

                    # \r\n\r\n found, end of stream
                    if self._trailer_lines[-1] == b"":
                        # Headers and trailers are defined the same way,
                        # so we reuse the HeadersParser here. The parsed
                        # trailers are currently discarded; parsing still
                        # rejects malformed trailer fields.
                        try:
                            self._headers_parser.parse_headers(self._trailer_lines)
                        finally:
                            self._trailer_lines.clear()
                        self.payload.feed_eof()
                        return True, chunk

        # Read all bytes until eof
        elif self._type == ParseState.PARSE_UNTIL_EOF:
            self.payload.feed_data(chunk, len(chunk))

        return False, b""
957
958
class DeflateBuffer:
    """Decompress a content-encoded body and forward the result to *out*.

    "br" uses BrotliDecompressor and "zstd" uses ZSTDDecompressor. Any other
    encoding value is handed to ZLibDecompressor.
    """

    decompressor: Any

    def __init__(self, out: StreamReader, encoding: Optional[str]) -> None:
        self.out = out
        # Number of compressed bytes received so far (mirrored on *out*).
        self.size = 0
        out.total_compressed_bytes = self.size
        self.encoding = encoding
        self._started_decoding = False

        self.decompressor: Union[BrotliDecompressor, ZLibDecompressor, ZSTDDecompressor]

        # Fail early if the optional backend for this encoding is missing.
        if encoding == "br" and not HAS_BROTLI:  # pragma: no cover
            raise ContentEncodingError(
                "Can not decode content-encoding: brotli (br). "
                "Please install `Brotli`"
            )
        if encoding == "zstd" and not HAS_ZSTD:
            raise ContentEncodingError(
                "Can not decode content-encoding: zstandard (zstd). "
                "Please install `zstandard`"
            )

        if encoding == "br":
            self.decompressor = BrotliDecompressor()
        elif encoding == "zstd":
            self.decompressor = ZSTDDecompressor()
        else:
            self.decompressor = ZLibDecompressor(encoding=encoding)

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Propagate *exc* to the wrapped output stream."""
        set_exception(self.out, exc, exc_cause)

    def feed_data(self, chunk: bytes, size: int) -> None:
        """Decompress *chunk* and forward any produced bytes to *out*."""
        if not size:
            return

        self.size += size
        self.out.total_compressed_bytes = self.size

        # RFC1950: bits 0..3 of the first byte are CM, which is 8 ("deflate")
        # for a zlib stream, and bits 4..7 are CINFO (window size). A
        # "deflate" body whose first byte does not carry CM == 8 is
        # non-RFC-compliant. For that case the decompressor is replaced with
        # one created with suppress_deflate_header=True.
        lacks_zlib_header = (
            not self._started_decoding
            and self.encoding == "deflate"
            and chunk[0] & 0xF != 8
        )
        if lacks_zlib_header:
            self.decompressor = ZLibDecompressor(
                encoding=self.encoding, suppress_deflate_header=True
            )

        try:
            decoded = self.decompressor.decompress_sync(chunk)
        except Exception:
            raise ContentEncodingError(
                "Can not decode content-encoding: %s" % self.encoding
            )

        self._started_decoding = True

        if decoded:
            self.out.feed_data(decoded, len(decoded))

    def feed_eof(self) -> None:
        """Flush the decompressor, then close *out*.

        For "deflate", raises ContentEncodingError if the decompressor did not
        reach its end of stream after data was received.
        """
        remainder = self.decompressor.flush()

        if remainder or self.size > 0:
            self.out.feed_data(remainder, len(remainder))
            deflate_incomplete = (
                self.encoding == "deflate" and not self.decompressor.eof
            )
            if deflate_incomplete:
                raise ContentEncodingError("deflate")

        self.out.feed_eof()

    def begin_http_chunk_receiving(self) -> None:
        """Forward the chunk-start notification to *out*."""
        self.out.begin_http_chunk_receiving()

    def end_http_chunk_receiving(self) -> None:
        """Forward the chunk-end notification to *out*."""
        self.out.end_http_chunk_receiving()
1044
1045
# Explicit names for the pure-Python implementations, bound before the
# generic names may be rebound to the C extension below.
HttpRequestParserPy = HttpRequestParser
HttpResponseParserPy = HttpResponseParser
RawRequestMessagePy = RawRequestMessage
RawResponseMessagePy = RawResponseMessage

try:
    if not NO_EXTENSIONS:
        # Rebind the public names to the compiled implementations. Module-level
        # lookups of these names at call time (e.g. RawRequestMessage inside
        # the Python parsers) then resolve to the C versions.
        from ._http_parser import (  # type: ignore[import-not-found,no-redef]
            HttpRequestParser,
            HttpResponseParser,
            RawRequestMessage,
            RawResponseMessage,
        )

        HttpRequestParserC = HttpRequestParser
        HttpResponseParserC = HttpResponseParser
        RawRequestMessageC = RawRequestMessage
        RawResponseMessageC = RawResponseMessage
except ImportError:  # pragma: no cover
    # Extension unavailable: the pure-Python classes remain in place.
    pass