1import abc
2import asyncio
3import re
4import string
5from contextlib import suppress
6from enum import IntEnum
7from typing import (
8 Any,
9 ClassVar,
10 Final,
11 Generic,
12 List,
13 Literal,
14 NamedTuple,
15 Optional,
16 Pattern,
17 Set,
18 Tuple,
19 Type,
20 TypeVar,
21 Union,
22)
23
24from multidict import CIMultiDict, CIMultiDictProxy, istr
25from yarl import URL
26
27from . import hdrs
28from .base_protocol import BaseProtocol
29from .compression_utils import HAS_BROTLI, BrotliDecompressor, ZLibDecompressor
30from .helpers import (
31 _EXC_SENTINEL,
32 DEBUG,
33 NO_EXTENSIONS,
34 BaseTimerContext,
35 method_must_be_empty_body,
36 set_exception,
37 status_code_must_be_empty_body,
38)
39from .http_exceptions import (
40 BadHttpMessage,
41 BadStatusLine,
42 ContentEncodingError,
43 ContentLengthError,
44 InvalidHeader,
45 InvalidURLError,
46 LineTooLong,
47 TransferEncodingError,
48)
49from .http_writer import HttpVersion, HttpVersion10
50from .log import internal_logger
51from .streams import EMPTY_PAYLOAD, StreamReader
52from .typedefs import RawHeaders
53
__all__ = (
    "HeadersParser",
    "HttpParser",
    "HttpRequestParser",
    "HttpResponseParser",
    "RawRequestMessage",
    "RawResponseMessage",
)

# Line separator accepted by the parser: strict CRLF, or bare LF in lax mode.
_SEP = Literal[b"\r\n", b"\n"]

ASCIISET: Final[Set[str]] = set(string.printable)

# See https://www.rfc-editor.org/rfc/rfc9110.html#name-overview
# and https://www.rfc-editor.org/rfc/rfc9110.html#name-tokens
#
# method = token
# tchar = "!" / "#" / "$" / "%" / "&" / "'" / "*" / "+" / "-" / "." /
#         "^" / "_" / "`" / "|" / "~" / DIGIT / ALPHA
# token = 1*tchar
_TCHAR_SPECIALS: Final[str] = re.escape("!#$%&'*+-.^_`|~")
TOKENRE: Final[Pattern[str]] = re.compile(f"[0-9A-Za-z{_TCHAR_SPECIALS}]+")
# HTTP-version line: exactly one ASCII digit for major and minor.
VERSRE: Final[Pattern[str]] = re.compile(r"HTTP/(\d)\.(\d)", re.ASCII)
# Unsigned decimal integer (Content-Length values, status codes).
DIGITS: Final[Pattern[str]] = re.compile(r"\d+", re.ASCII)
# Hexadecimal digits of a chunk-size field in chunked transfer coding.
HEXDIGITS: Final[Pattern[bytes]] = re.compile(rb"[0-9a-fA-F]+")
79
80
class RawRequestMessage(NamedTuple):
    """Immutable result of parsing a request start line plus headers."""

    # Request method token, e.g. "GET".
    method: str
    # Request-target exactly as received (origin/absolute/authority/asterisk form).
    path: str
    version: HttpVersion
    headers: "CIMultiDictProxy[str]"
    # Headers as (name, value) byte pairs, values stripped of surrounding OWS.
    raw_headers: RawHeaders
    # True when the connection must be closed after this message.
    should_close: bool
    # Content-Encoding, if one of "gzip"/"deflate"/"br"; otherwise None.
    compression: Optional[str]
    upgrade: bool
    chunked: bool
    # URL object built from `path` by the request parser.
    url: URL
92
93
class RawResponseMessage(NamedTuple):
    """Immutable result of parsing a response status line plus headers."""

    version: HttpVersion
    # Three-digit status code as an int.
    code: int
    # Reason phrase; may be empty.
    reason: str
    headers: CIMultiDictProxy[str]
    # Headers as (name, value) byte pairs, values stripped of surrounding OWS.
    raw_headers: RawHeaders
    # True when the connection must be closed after this message.
    should_close: bool
    # Content-Encoding, if one of "gzip"/"deflate"/"br"; otherwise None.
    compression: Optional[str]
    upgrade: bool
    chunked: bool
104
105
# Message type produced by a concrete parser subclass (request or response).
_MsgT = TypeVar("_MsgT", RawRequestMessage, RawResponseMessage)
107
108
class ParseState(IntEnum):
    """How the payload body length is determined for the current message."""

    PARSE_NONE = 0
    PARSE_LENGTH = 1
    PARSE_CHUNKED = 2
    PARSE_UNTIL_EOF = 3
115
116
class ChunkState(IntEnum):
    """Sub-states of the chunked transfer-coding state machine."""

    PARSE_CHUNKED_SIZE = 0
    PARSE_CHUNKED_CHUNK = 1
    PARSE_CHUNKED_CHUNK_EOF = 2
    PARSE_MAYBE_TRAILERS = 3
    PARSE_TRAILERS = 4
123
124
class HeadersParser:
    """Parse a pre-split list of header lines into a case-insensitive multidict.

    Input is the list produced by the message parser: element 0 is the start
    line (skipped here), followed by one header field per element and a final
    empty element terminating the block.
    """

    def __init__(
        self,
        max_line_size: int = 8190,
        max_headers: int = 32768,
        max_field_size: int = 8190,
        lax: bool = False,
    ) -> None:
        self.max_line_size = max_line_size
        # NOTE(review): max_headers is stored but not enforced anywhere in this
        # class — presumably the C parser enforces it; confirm before relying on it.
        self.max_headers = max_headers
        self.max_field_size = max_field_size
        # Lax mode additionally accepts deprecated obs-fold continuation lines.
        self._lax = lax

    def parse_headers(
        self, lines: List[bytes]
    ) -> Tuple["CIMultiDictProxy[str]", RawHeaders]:
        """Return (headers proxy, raw (name, value) byte tuples) for *lines*.

        Raises InvalidHeader on malformed fields and LineTooLong when a name
        or value exceeds ``max_field_size``.
        """
        headers: CIMultiDict[str] = CIMultiDict()
        # note: "raw" does not mean inclusion of OWS before/after the field value
        raw_headers = []

        # Index 0 is the request/status line; headers start at index 1.
        lines_idx = 1
        line = lines[1]
        line_count = len(lines)

        # The terminating empty line ends the loop.
        while line:
            # Parse initial header name : value pair.
            try:
                bname, bvalue = line.split(b":", 1)
            except ValueError:
                raise InvalidHeader(line) from None

            if len(bname) == 0:
                raise InvalidHeader(bname)

            # https://www.rfc-editor.org/rfc/rfc9112.html#section-5.1-2
            # Leading/trailing whitespace around the field name is forbidden.
            if {bname[0], bname[-1]} & {32, 9}:  # {" ", "\t"}
                raise InvalidHeader(line)

            bvalue = bvalue.lstrip(b" \t")
            if len(bname) > self.max_field_size:
                raise LineTooLong(
                    "request header name {}".format(
                        bname.decode("utf8", "backslashreplace")
                    ),
                    str(self.max_field_size),
                    str(len(bname)),
                )
            name = bname.decode("utf-8", "surrogateescape")
            # Field names must be RFC 9110 tokens.
            if not TOKENRE.fullmatch(name):
                raise InvalidHeader(bname)

            header_length = len(bvalue)

            # next line
            lines_idx += 1
            line = lines[lines_idx]

            # consume continuation lines
            continuation = self._lax and line and line[0] in (32, 9)  # (' ', '\t')

            # Deprecated: https://www.rfc-editor.org/rfc/rfc9112.html#name-obsolete-line-folding
            if continuation:
                bvalue_lst = [bvalue]
                while continuation:
                    header_length += len(line)
                    if header_length > self.max_field_size:
                        raise LineTooLong(
                            "request header field {}".format(
                                bname.decode("utf8", "backslashreplace")
                            ),
                            str(self.max_field_size),
                            str(header_length),
                        )
                    bvalue_lst.append(line)

                    # next line
                    lines_idx += 1
                    if lines_idx < line_count:
                        line = lines[lines_idx]
                        if line:
                            continuation = line[0] in (32, 9)  # (' ', '\t')
                    else:
                        # Ran off the end of the supplied lines; stop folding.
                        line = b""
                        break
                bvalue = b"".join(bvalue_lst)
            else:
                if header_length > self.max_field_size:
                    raise LineTooLong(
                        "request header field {}".format(
                            bname.decode("utf8", "backslashreplace")
                        ),
                        str(self.max_field_size),
                        str(header_length),
                    )

            bvalue = bvalue.strip(b" \t")
            value = bvalue.decode("utf-8", "surrogateescape")

            # https://www.rfc-editor.org/rfc/rfc9110.html#section-5.5-5
            # Reject CR/LF/NUL embedded in a (possibly folded) field value.
            if "\n" in value or "\r" in value or "\x00" in value:
                raise InvalidHeader(bvalue)

            headers.add(name, value)
            raw_headers.append((bname, bvalue))

        return (CIMultiDictProxy(headers), tuple(raw_headers))
231
232
def _is_supported_upgrade(headers: CIMultiDictProxy[str]) -> bool:
    """Return True when the Upgrade header names a protocol we can handle."""
    upgrade_to = headers.get(hdrs.UPGRADE, "")
    return upgrade_to.lower() in {"tcp", "websocket"}
236
237
class HttpParser(abc.ABC, Generic[_MsgT]):
    """Incremental HTTP/1.x message parser.

    ``feed_data()`` consumes raw bytes and yields complete
    ``(message, payload stream)`` pairs.  Concrete subclasses implement
    :meth:`parse_message` for the request or response start line; header
    parsing is delegated to a shared :class:`HeadersParser`.
    """

    # Lax parsing (bare-LF separators) is only enabled by the response parser.
    lax: ClassVar[bool] = False

    def __init__(
        self,
        protocol: Optional[BaseProtocol] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        limit: int = 2**16,
        max_line_size: int = 8190,
        max_headers: int = 32768,
        max_field_size: int = 8190,
        timer: Optional[BaseTimerContext] = None,
        code: Optional[int] = None,
        method: Optional[str] = None,
        readall: bool = False,
        payload_exception: Optional[Type[BaseException]] = None,
        response_with_body: bool = True,
        read_until_eof: bool = False,
        auto_decompress: bool = True,
    ) -> None:
        self.protocol = protocol
        self.loop = loop
        self.max_line_size = max_line_size
        self.max_headers = max_headers
        self.max_field_size = max_field_size
        self.timer = timer
        self.code = code
        self.method = method
        self.readall = readall
        self.payload_exception = payload_exception
        self.response_with_body = response_with_body
        self.read_until_eof = read_until_eof

        # Start line + header lines accumulated for the message in progress.
        self._lines: List[bytes] = []
        # Unconsumed bytes carried over to the next feed_data() call.
        self._tail = b""
        self._upgraded = False
        self._payload = None
        # Active body parser, or None while parsing start line/headers.
        self._payload_parser: Optional[HttpPayloadParser] = None
        self._auto_decompress = auto_decompress
        self._limit = limit
        self._headers_parser = HeadersParser(
            max_line_size, max_headers, max_field_size, self.lax
        )

    @abc.abstractmethod
    def parse_message(self, lines: List[bytes]) -> _MsgT:
        """Parse the buffered start line + header lines into a message."""

    def feed_eof(self) -> Optional[_MsgT]:
        """Signal end of stream; may return a partially-received message."""
        if self._payload_parser is not None:
            self._payload_parser.feed_eof()
            self._payload_parser = None
        else:
            # try to extract partial message
            if self._tail:
                self._lines.append(self._tail)

            if self._lines:
                # BUGFIX: this previously compared bytes against the str
                # "\r\n", which is never equal, so a blank line was appended
                # unconditionally.  Compare against b"" so the header block
                # is terminated exactly once.
                if self._lines[-1] != b"":
                    self._lines.append(b"")
                with suppress(Exception):
                    return self.parse_message(self._lines)
        return None

    def feed_data(
        self,
        data: bytes,
        SEP: _SEP = b"\r\n",
        EMPTY: bytes = b"",
        CONTENT_LENGTH: istr = hdrs.CONTENT_LENGTH,
        METH_CONNECT: str = hdrs.METH_CONNECT,
        SEC_WEBSOCKET_KEY1: istr = hdrs.SEC_WEBSOCKET_KEY1,
    ) -> Tuple[List[Tuple[_MsgT, StreamReader]], bool, bytes]:
        """Consume *data* and return (messages, upgraded flag, unconsumed tail).

        Each message is paired with the StreamReader its body will be fed
        into.  After a connection upgrade, remaining bytes are returned
        unparsed in the tail.
        """

        messages = []

        if self._tail:
            data, self._tail = self._tail + data, b""

        data_len = len(data)
        start_pos = 0
        loop = self.loop

        while start_pos < data_len:

            # read HTTP message (request/response line + headers), \r\n\r\n
            # and split by lines
            if self._payload_parser is None and not self._upgraded:
                pos = data.find(SEP, start_pos)
                # consume \r\n
                if pos == start_pos and not self._lines:
                    start_pos = pos + len(SEP)
                    continue

                if pos >= start_pos:
                    # line found
                    line = data[start_pos:pos]
                    if SEP == b"\n":  # For lax response parsing
                        line = line.rstrip(b"\r")
                    self._lines.append(line)
                    start_pos = pos + len(SEP)

                    # \r\n\r\n found
                    if self._lines[-1] == EMPTY:
                        try:
                            msg: _MsgT = self.parse_message(self._lines)
                        finally:
                            self._lines.clear()

                        def get_content_length() -> Optional[int]:
                            # payload length
                            length_hdr = msg.headers.get(CONTENT_LENGTH)
                            if length_hdr is None:
                                return None

                            # Shouldn't allow +/- or other number formats.
                            # https://www.rfc-editor.org/rfc/rfc9110#section-8.6-2
                            # msg.headers is already stripped of leading/trailing wsp
                            if not DIGITS.fullmatch(length_hdr):
                                raise InvalidHeader(CONTENT_LENGTH)

                            return int(length_hdr)

                        length = get_content_length()
                        # do not support old websocket spec
                        if SEC_WEBSOCKET_KEY1 in msg.headers:
                            raise InvalidHeader(SEC_WEBSOCKET_KEY1)

                        self._upgraded = msg.upgrade and _is_supported_upgrade(
                            msg.headers
                        )

                        method = getattr(msg, "method", self.method)
                        # code is only present on responses
                        code = getattr(msg, "code", 0)

                        assert self.protocol is not None
                        # calculate payload
                        empty_body = status_code_must_be_empty_body(code) or bool(
                            method and method_must_be_empty_body(method)
                        )
                        if not empty_body and (
                            ((length is not None and length > 0) or msg.chunked)
                            and not self._upgraded
                        ):
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            payload_parser = HttpPayloadParser(
                                payload,
                                length=length,
                                chunked=msg.chunked,
                                method=method,
                                compression=msg.compression,
                                code=self.code,
                                readall=self.readall,
                                response_with_body=self.response_with_body,
                                auto_decompress=self._auto_decompress,
                                lax=self.lax,
                            )
                            if not payload_parser.done:
                                self._payload_parser = payload_parser
                        elif method == METH_CONNECT:
                            # CONNECT tunnels the connection: treat the rest
                            # of the stream as an opaque payload.
                            assert isinstance(msg, RawRequestMessage)
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            self._upgraded = True
                            self._payload_parser = HttpPayloadParser(
                                payload,
                                method=msg.method,
                                compression=msg.compression,
                                readall=True,
                                auto_decompress=self._auto_decompress,
                                lax=self.lax,
                            )
                        elif not empty_body and length is None and self.read_until_eof:
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            payload_parser = HttpPayloadParser(
                                payload,
                                length=length,
                                chunked=msg.chunked,
                                method=method,
                                compression=msg.compression,
                                code=self.code,
                                readall=True,
                                response_with_body=self.response_with_body,
                                auto_decompress=self._auto_decompress,
                                lax=self.lax,
                            )
                            if not payload_parser.done:
                                self._payload_parser = payload_parser
                        else:
                            payload = EMPTY_PAYLOAD

                        messages.append((msg, payload))
                else:
                    self._tail = data[start_pos:]
                    data = EMPTY
                    break

            # no parser, just store
            elif self._payload_parser is None and self._upgraded:
                assert not self._lines
                break

            # feed payload
            elif data and start_pos < data_len:
                assert not self._lines
                assert self._payload_parser is not None
                try:
                    eof, data = self._payload_parser.feed_data(data[start_pos:], SEP)
                except BaseException as underlying_exc:
                    reraised_exc = underlying_exc
                    if self.payload_exception is not None:
                        reraised_exc = self.payload_exception(str(underlying_exc))

                    set_exception(
                        self._payload_parser.payload,
                        reraised_exc,
                        underlying_exc,
                    )

                    eof = True
                    data = b""

                if eof:
                    start_pos = 0
                    data_len = len(data)
                    self._payload_parser = None
                    continue
                else:
                    break

        if data and start_pos < data_len:
            data = data[start_pos:]
        else:
            data = EMPTY

        return messages, self._upgraded, data

    def parse_headers(
        self, lines: List[bytes]
    ) -> Tuple[
        "CIMultiDictProxy[str]", RawHeaders, Optional[bool], Optional[str], bool, bool
    ]:
        """Parses RFC 5322 headers from a stream.

        Line continuations are supported. Returns list of header name
        and value pairs. Header name is in upper case.
        """
        headers, raw_headers = self._headers_parser.parse_headers(lines)
        close_conn = None
        encoding = None
        upgrade = False
        chunked = False

        # https://www.rfc-editor.org/rfc/rfc9110.html#section-5.5-6
        # https://www.rfc-editor.org/rfc/rfc9110.html#name-collected-abnf
        # These fields may appear at most once in a valid message.
        singletons = (
            hdrs.CONTENT_LENGTH,
            hdrs.CONTENT_LOCATION,
            hdrs.CONTENT_RANGE,
            hdrs.CONTENT_TYPE,
            hdrs.ETAG,
            hdrs.HOST,
            hdrs.MAX_FORWARDS,
            hdrs.SERVER,
            hdrs.TRANSFER_ENCODING,
            hdrs.USER_AGENT,
        )
        bad_hdr = next((h for h in singletons if len(headers.getall(h, ())) > 1), None)
        if bad_hdr is not None:
            raise BadHttpMessage(f"Duplicate '{bad_hdr}' header found.")

        # keep-alive
        conn = headers.get(hdrs.CONNECTION)
        if conn:
            v = conn.lower()
            if v == "close":
                close_conn = True
            elif v == "keep-alive":
                close_conn = False
            # https://www.rfc-editor.org/rfc/rfc9110.html#name-101-switching-protocols
            elif v == "upgrade" and headers.get(hdrs.UPGRADE):
                upgrade = True

        # encoding
        enc = headers.get(hdrs.CONTENT_ENCODING)
        if enc:
            enc = enc.lower()
            if enc in ("gzip", "deflate", "br"):
                encoding = enc

        # chunking
        te = headers.get(hdrs.TRANSFER_ENCODING)
        if te is not None:
            if "chunked" == te.lower():
                chunked = True
            else:
                raise BadHttpMessage("Request has invalid `Transfer-Encoding`")

            # Reject smuggling-prone combination of TE + Content-Length.
            if hdrs.CONTENT_LENGTH in headers:
                raise BadHttpMessage(
                    "Transfer-Encoding can't be present with Content-Length",
                )

        return (headers, raw_headers, close_conn, encoding, upgrade, chunked)

    def set_upgraded(self, val: bool) -> None:
        """Set connection upgraded (to websocket) mode.

        :param bool val: new state.
        """
        self._upgraded = val
564
565
class HttpRequestParser(HttpParser[RawRequestMessage]):
    """Read request status line.

    Exception .http_exceptions.BadStatusLine
    could be raised in case of any errors in status line.
    Returns RawRequestMessage.
    """

    def parse_message(self, lines: List[bytes]) -> RawRequestMessage:
        """Parse the request line in ``lines[0]`` plus the following headers."""
        # request line
        line = lines[0].decode("utf-8", "surrogateescape")
        try:
            method, path, version = line.split(" ", maxsplit=2)
        except ValueError:
            raise BadStatusLine(line) from None

        if len(path) > self.max_line_size:
            raise LineTooLong(
                "Status line is too long", str(self.max_line_size), str(len(path))
            )

        # method
        # Methods must be RFC 9110 tokens.
        if not TOKENRE.fullmatch(method):
            raise BadStatusLine(method)

        # version
        match = VERSRE.fullmatch(version)
        if match is None:
            raise BadStatusLine(line)
        version_o = HttpVersion(int(match.group(1)), int(match.group(2)))

        if method == "CONNECT":
            # authority-form,
            # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.3
            url = URL.build(authority=path, encoded=True)
        elif path.startswith("/"):
            # origin-form,
            # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.1
            path_part, _hash_separator, url_fragment = path.partition("#")
            path_part, _question_mark_separator, qs_part = path_part.partition("?")

            # NOTE: `yarl.URL.build()` is used to mimic what the Cython-based
            # NOTE: parser does, otherwise it results into the same
            # NOTE: HTTP Request-Line input producing different
            # NOTE: `yarl.URL()` objects
            url = URL.build(
                path=path_part,
                query_string=qs_part,
                fragment=url_fragment,
                encoded=True,
            )
        elif path == "*" and method == "OPTIONS":
            # asterisk-form,
            url = URL(path, encoded=True)
        else:
            # absolute-form for proxy maybe,
            # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.2
            url = URL(path, encoded=True)
            if url.scheme == "":
                # not absolute-form
                raise InvalidURLError(
                    path.encode(errors="surrogateescape").decode("latin1")
                )

        # read headers
        (
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
        ) = self.parse_headers(lines)

        if close is None:  # then the headers weren't set in the request
            if version_o <= HttpVersion10:  # HTTP 1.0 must asks to not close
                close = True
            else:  # HTTP 1.1 must ask to close.
                close = False

        return RawRequestMessage(
            method,
            path,
            version_o,
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
            url,
        )
658
659
class HttpResponseParser(HttpParser[RawResponseMessage]):
    """Read response status line and headers.

    BadStatusLine could be raised in case of any errors in status line.
    Returns RawResponseMessage.
    """

    # Lax mode should only be enabled on response parser.
    lax = not DEBUG

    def feed_data(
        self,
        data: bytes,
        SEP: Optional[_SEP] = None,
        *args: Any,
        **kwargs: Any,
    ) -> Tuple[List[Tuple[RawResponseMessage, StreamReader]], bool, bytes]:
        # Default to the separator matching the lax/strict mode chosen above.
        if SEP is None:
            SEP = b"\r\n" if DEBUG else b"\n"
        return super().feed_data(data, SEP, *args, **kwargs)

    def parse_message(self, lines: List[bytes]) -> RawResponseMessage:
        """Parse the status line in ``lines[0]`` plus the following headers."""
        line = lines[0].decode("utf-8", "surrogateescape")
        try:
            version, status = line.split(maxsplit=1)
        except ValueError:
            raise BadStatusLine(line) from None

        try:
            status, reason = status.split(maxsplit=1)
        except ValueError:
            # Reason phrase is optional.
            status = status.strip()
            reason = ""

        if len(reason) > self.max_line_size:
            raise LineTooLong(
                "Status line is too long", str(self.max_line_size), str(len(reason))
            )

        # version
        match = VERSRE.fullmatch(version)
        if match is None:
            raise BadStatusLine(line)
        version_o = HttpVersion(int(match.group(1)), int(match.group(2)))

        # The status code is a three-digit ASCII number, no padding
        if len(status) != 3 or not DIGITS.fullmatch(status):
            raise BadStatusLine(line)
        status_i = int(status)

        # read headers
        (
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
        ) = self.parse_headers(lines)

        if close is None:
            if version_o <= HttpVersion10:
                close = True
            # https://www.rfc-editor.org/rfc/rfc9112.html#name-message-body-length
            elif 100 <= status_i < 200 or status_i in {204, 304}:
                close = False
            elif hdrs.CONTENT_LENGTH in headers or hdrs.TRANSFER_ENCODING in headers:
                close = False
            else:
                # https://www.rfc-editor.org/rfc/rfc9112.html#section-6.3-2.8
                close = True

        return RawResponseMessage(
            version_o,
            status_i,
            reason.strip(),
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
        )
743
744
class HttpPayloadParser:
    """Feed a message body into a StreamReader.

    Supports three framing modes (see ParseState): fixed Content-Length,
    chunked transfer coding, and read-until-EOF.  Optionally wraps the
    output stream in a DeflateBuffer to decompress the body on the fly.
    """

    def __init__(
        self,
        payload: StreamReader,
        length: Optional[int] = None,
        chunked: bool = False,
        compression: Optional[str] = None,
        code: Optional[int] = None,
        method: Optional[str] = None,
        readall: bool = False,
        response_with_body: bool = True,
        auto_decompress: bool = True,
        lax: bool = False,
    ) -> None:
        # Remaining byte count in PARSE_LENGTH mode.
        self._length = 0
        self._type = ParseState.PARSE_NONE
        self._chunk = ChunkState.PARSE_CHUNKED_SIZE
        # Remaining bytes of the current chunk in PARSE_CHUNKED mode.
        self._chunk_size = 0
        # Partial line carried over between feed_data() calls.
        self._chunk_tail = b""
        self._auto_decompress = auto_decompress
        self._lax = lax
        # True once the whole body (possibly empty) has been delivered.
        self.done = False

        # payload decompression wrapper
        if response_with_body and compression and self._auto_decompress:
            real_payload: Union[StreamReader, DeflateBuffer] = DeflateBuffer(
                payload, compression
            )
        else:
            real_payload = payload

        # payload parser
        if not response_with_body:
            # don't parse payload if it's not expected to be received
            self._type = ParseState.PARSE_NONE
            real_payload.feed_eof()
            self.done = True

        elif chunked:
            self._type = ParseState.PARSE_CHUNKED
        elif length is not None:
            self._type = ParseState.PARSE_LENGTH
            self._length = length
            if self._length == 0:
                real_payload.feed_eof()
                self.done = True
        else:
            if readall and code != 204:
                self._type = ParseState.PARSE_UNTIL_EOF
            elif method in ("PUT", "POST"):
                internal_logger.warning(  # pragma: no cover
                    "Content-Length or Transfer-Encoding header is required"
                )
                self._type = ParseState.PARSE_NONE
                real_payload.feed_eof()
                self.done = True

        self.payload = real_payload

    def feed_eof(self) -> None:
        """Handle end of stream; raise if the body framing says more was due."""
        if self._type == ParseState.PARSE_UNTIL_EOF:
            self.payload.feed_eof()
        elif self._type == ParseState.PARSE_LENGTH:
            raise ContentLengthError(
                "Not enough data for satisfy content length header."
            )
        elif self._type == ParseState.PARSE_CHUNKED:
            raise TransferEncodingError(
                "Not enough data for satisfy transfer length header."
            )

    def feed_data(
        self, chunk: bytes, SEP: _SEP = b"\r\n", CHUNK_EXT: bytes = b";"
    ) -> Tuple[bool, bytes]:
        """Consume *chunk*; return (body complete?, unconsumed trailing bytes)."""
        # Read specified amount of bytes
        if self._type == ParseState.PARSE_LENGTH:
            required = self._length
            chunk_len = len(chunk)

            if required >= chunk_len:
                self._length = required - chunk_len
                self.payload.feed_data(chunk, chunk_len)
                if self._length == 0:
                    self.payload.feed_eof()
                    return True, b""
            else:
                # Chunk contains bytes past the end of this message's body.
                self._length = 0
                self.payload.feed_data(chunk[:required], required)
                self.payload.feed_eof()
                return True, chunk[required:]

        # Chunked transfer encoding parser
        elif self._type == ParseState.PARSE_CHUNKED:
            if self._chunk_tail:
                # Prepend the partial line saved from the previous call.
                chunk = self._chunk_tail + chunk
                self._chunk_tail = b""

            while chunk:

                # read next chunk size
                if self._chunk == ChunkState.PARSE_CHUNKED_SIZE:
                    pos = chunk.find(SEP)
                    if pos >= 0:
                        i = chunk.find(CHUNK_EXT, 0, pos)
                        if i >= 0:
                            size_b = chunk[:i]  # strip chunk-extensions
                        else:
                            size_b = chunk[:pos]

                        if self._lax:  # Allow whitespace in lax mode.
                            size_b = size_b.strip()

                        if not re.fullmatch(HEXDIGITS, size_b):
                            exc = TransferEncodingError(
                                chunk[:pos].decode("ascii", "surrogateescape")
                            )
                            set_exception(self.payload, exc)
                            raise exc
                        size = int(bytes(size_b), 16)

                        chunk = chunk[pos + len(SEP) :]
                        if size == 0:  # eof marker
                            self._chunk = ChunkState.PARSE_MAYBE_TRAILERS
                            if self._lax and chunk.startswith(b"\r"):
                                chunk = chunk[1:]
                        else:
                            self._chunk = ChunkState.PARSE_CHUNKED_CHUNK
                            self._chunk_size = size
                            self.payload.begin_http_chunk_receiving()
                    else:
                        # Size line incomplete; wait for more data.
                        self._chunk_tail = chunk
                        return False, b""

                # read chunk and feed buffer
                if self._chunk == ChunkState.PARSE_CHUNKED_CHUNK:
                    required = self._chunk_size
                    chunk_len = len(chunk)

                    if required > chunk_len:
                        self._chunk_size = required - chunk_len
                        self.payload.feed_data(chunk, chunk_len)
                        return False, b""
                    else:
                        self._chunk_size = 0
                        self.payload.feed_data(chunk[:required], required)
                        chunk = chunk[required:]
                        if self._lax and chunk.startswith(b"\r"):
                            chunk = chunk[1:]
                        self._chunk = ChunkState.PARSE_CHUNKED_CHUNK_EOF
                        self.payload.end_http_chunk_receiving()

                # toss the CRLF at the end of the chunk
                if self._chunk == ChunkState.PARSE_CHUNKED_CHUNK_EOF:
                    if chunk[: len(SEP)] == SEP:
                        chunk = chunk[len(SEP) :]
                        self._chunk = ChunkState.PARSE_CHUNKED_SIZE
                    else:
                        self._chunk_tail = chunk
                        return False, b""

                # if stream does not contain trailer, after 0\r\n
                # we should get another \r\n otherwise
                # trailers needs to be skipped until \r\n\r\n
                if self._chunk == ChunkState.PARSE_MAYBE_TRAILERS:
                    head = chunk[: len(SEP)]
                    if head == SEP:
                        # end of stream
                        self.payload.feed_eof()
                        return True, chunk[len(SEP) :]
                    # Both CR and LF, or only LF may not be received yet. It is
                    # expected that CRLF or LF will be shown at the very first
                    # byte next time, otherwise trailers should come. The last
                    # CRLF which marks the end of response might not be
                    # contained in the same TCP segment which delivered the
                    # size indicator.
                    if not head:
                        return False, b""
                    if head == SEP[:1]:
                        self._chunk_tail = head
                        return False, b""
                    self._chunk = ChunkState.PARSE_TRAILERS

                # read and discard trailer up to the CRLF terminator
                if self._chunk == ChunkState.PARSE_TRAILERS:
                    pos = chunk.find(SEP)
                    if pos >= 0:
                        chunk = chunk[pos + len(SEP) :]
                        self._chunk = ChunkState.PARSE_MAYBE_TRAILERS
                    else:
                        self._chunk_tail = chunk
                        return False, b""

        # Read all bytes until eof
        elif self._type == ParseState.PARSE_UNTIL_EOF:
            self.payload.feed_data(chunk, len(chunk))

        return False, b""
942
943
class DeflateBuffer:
    """DeflateStream decompress stream and feed data into specified stream."""

    decompressor: Any

    def __init__(self, out: StreamReader, encoding: Optional[str]) -> None:
        self.out = out
        # Total compressed bytes seen so far.
        self.size = 0
        self.encoding = encoding
        self._started_decoding = False

        self.decompressor: Union[BrotliDecompressor, ZLibDecompressor]
        if encoding == "br":
            if not HAS_BROTLI:  # pragma: no cover
                raise ContentEncodingError(
                    "Can not decode content-encoding: brotli (br). "
                    "Please install `Brotli`"
                )
            self.decompressor = BrotliDecompressor()
        else:
            self.decompressor = ZLibDecompressor(encoding=encoding)

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        # Propagate the error to the wrapped output stream.
        set_exception(self.out, exc, exc_cause)

    def feed_data(self, chunk: bytes, size: int) -> None:
        """Decompress *chunk* and feed the result to the output stream."""
        if not size:
            return

        self.size += size

        # RFC1950
        # bits 0..3 = CM = 0b1000 = 8 = "deflate"
        # bits 4..7 = CINFO = 1..7 = windows size.
        if (
            not self._started_decoding
            and self.encoding == "deflate"
            and chunk[0] & 0xF != 8
        ):
            # Change the decoder to decompress incorrectly compressed data
            # Actually we should issue a warning about non-RFC-compliant data.
            self.decompressor = ZLibDecompressor(
                encoding=self.encoding, suppress_deflate_header=True
            )

        try:
            chunk = self.decompressor.decompress_sync(chunk)
        except Exception:
            raise ContentEncodingError(
                "Can not decode content-encoding: %s" % self.encoding
            )

        self._started_decoding = True

        if chunk:
            self.out.feed_data(chunk, len(chunk))

    def feed_eof(self) -> None:
        """Flush the decompressor and close the output stream."""
        chunk = self.decompressor.flush()

        if chunk or self.size > 0:
            self.out.feed_data(chunk, len(chunk))
            # A deflate stream that never reached its end marker is truncated.
            if self.encoding == "deflate" and not self.decompressor.eof:
                raise ContentEncodingError("deflate")

        self.out.feed_eof()

    def begin_http_chunk_receiving(self) -> None:
        self.out.begin_http_chunk_receiving()

    def end_http_chunk_receiving(self) -> None:
        self.out.end_http_chunk_receiving()
1020
1021
# Keep the pure-Python implementations reachable under explicit *Py names,
# even when the C-accelerated versions below shadow the public names.
HttpRequestParserPy = HttpRequestParser
HttpResponseParserPy = HttpResponseParser
RawRequestMessagePy = RawRequestMessage
RawResponseMessagePy = RawResponseMessage

try:
    if not NO_EXTENSIONS:
        # Prefer the Cython-based parser when the extension is available.
        from ._http_parser import (  # type: ignore[import-not-found,no-redef]
            HttpRequestParser,
            HttpResponseParser,
            RawRequestMessage,
            RawResponseMessage,
        )

        HttpRequestParserC = HttpRequestParser
        HttpResponseParserC = HttpResponseParser
        RawRequestMessageC = RawRequestMessage
        RawResponseMessageC = RawResponseMessage
except ImportError:  # pragma: no cover
    # Extension not built; the pure-Python implementations remain in use.
    pass