Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/http_parser.py: 85%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

543 statements  

1import abc 

2import asyncio 

3import re 

4import string 

5from contextlib import suppress 

6from enum import IntEnum 

7from re import Pattern 

8from typing import Any, ClassVar, Final, Generic, Literal, NamedTuple, TypeVar 

9 

10from multidict import CIMultiDict, CIMultiDictProxy, istr 

11from yarl import URL 

12 

13from . import hdrs 

14from .base_protocol import BaseProtocol 

15from .compression_utils import ( 

16 DEFAULT_MAX_DECOMPRESS_SIZE, 

17 HAS_BROTLI, 

18 HAS_ZSTD, 

19 BrotliDecompressor, 

20 ZLibDecompressor, 

21 ZSTDDecompressor, 

22) 

23from .helpers import ( 

24 _EXC_SENTINEL, 

25 DEBUG, 

26 EMPTY_BODY_METHODS, 

27 EMPTY_BODY_STATUS_CODES, 

28 NO_EXTENSIONS, 

29 BaseTimerContext, 

30 set_exception, 

31) 

32from .http_exceptions import ( 

33 BadHttpMessage, 

34 BadHttpMethod, 

35 BadStatusLine, 

36 ContentEncodingError, 

37 ContentLengthError, 

38 DecompressSizeError, 

39 InvalidHeader, 

40 InvalidURLError, 

41 LineTooLong, 

42 TransferEncodingError, 

43) 

44from .http_writer import HttpVersion, HttpVersion10 

45from .streams import EMPTY_PAYLOAD, StreamReader 

46from .typedefs import RawHeaders 

47 

__all__ = (
    "HeadersParser",
    "HttpParser",
    "HttpRequestParser",
    "HttpResponseParser",
    "RawRequestMessage",
    "RawResponseMessage",
)

# Line separators accepted by the parsers: CRLF (strict) or bare LF
# (used by the lax response parser).
_SEP = Literal[b"\r\n", b"\n"]

# All printable ASCII characters; used by callers to sanity-check text.
ASCIISET: Final[set[str]] = set(string.printable)

# See https://www.rfc-editor.org/rfc/rfc9110.html#name-overview
# and https://www.rfc-editor.org/rfc/rfc9110.html#name-tokens
#
#     method = token
#     tchar = "!" / "#" / "$" / "%" / "&" / "'" / "*" / "+" / "-" / "." /
#             "^" / "_" / "`" / "|" / "~" / DIGIT / ALPHA
#     token = 1*tchar
_TCHAR_SPECIALS: Final[str] = re.escape("!#$%&'*+-.^_`|~")
TOKENRE: Final[Pattern[str]] = re.compile(f"[0-9A-Za-z{_TCHAR_SPECIALS}]+")
# Control characters forbidden in a header field value.
# https://www.rfc-editor.org/rfc/rfc9110#section-5.5-5
_FIELD_VALUE_FORBIDDEN_CTL_RE: Final[Pattern[str]] = re.compile(
    r"[\x00-\x08\x0a-\x1f\x7f]"
)
# HTTP-version production, e.g. "HTTP/1.1" (single digits only).
VERSRE: Final[Pattern[str]] = re.compile(r"HTTP/(\d)\.(\d)", re.ASCII)
DIGITS: Final[Pattern[str]] = re.compile(r"\d+", re.ASCII)
HEXDIGITS: Final[Pattern[bytes]] = re.compile(rb"[0-9a-fA-F]+")

# RFC 9110 singleton headers — duplicates are rejected in strict mode.
# In lax mode (response parser default), the check is skipped entirely
# since real-world servers (e.g. Google APIs, Werkzeug) commonly send
# duplicate headers like Content-Type or Server.
# Lowercased for case-insensitive matching against wire names.
SINGLETON_HEADERS: Final[frozenset[str]] = frozenset(
    {
        "content-length",
        "content-location",
        "content-range",
        "content-type",
        "etag",
        "host",
        "max-forwards",
        "server",
        "transfer-encoding",
        "user-agent",
    }
)

97 

98 

class RawRequestMessage(NamedTuple):
    """Parsed HTTP request line and headers, prior to any processing."""

    method: str  # request method token, e.g. "GET"
    path: str  # request-target exactly as received on the wire
    version: HttpVersion
    headers: CIMultiDictProxy[str]
    raw_headers: RawHeaders  # undecoded (name, value) byte pairs
    should_close: bool  # whether the connection must close after this message
    compression: str | None  # recognized Content-Encoding token, if any
    upgrade: bool  # Connection: upgrade with a supported Upgrade header
    chunked: bool  # Transfer-Encoding ends with "chunked"
    url: URL  # request-target parsed into a yarl URL

110 

111 

class RawResponseMessage(NamedTuple):
    """Parsed HTTP status line and headers, prior to any processing."""

    version: HttpVersion
    code: int  # three-digit status code
    reason: str  # reason phrase (may be empty)
    headers: CIMultiDictProxy[str]
    raw_headers: RawHeaders  # undecoded (name, value) byte pairs
    should_close: bool  # whether the connection must close after this message
    compression: str | None  # recognized Content-Encoding token, if any
    upgrade: bool  # Connection: upgrade with a supported Upgrade header
    chunked: bool  # Transfer-Encoding ends with "chunked"

122 

123 

# Message type produced by a concrete parser: request or response message.
_MsgT = TypeVar("_MsgT", RawRequestMessage, RawResponseMessage)

125 

126 

class ParseState(IntEnum):
    """How the payload length is determined for the current message."""

    PARSE_NONE = 0  # no body expected
    PARSE_LENGTH = 1  # fixed Content-Length body
    PARSE_CHUNKED = 2  # chunked transfer coding
    PARSE_UNTIL_EOF = 3  # body runs until connection close

133 

class ChunkState(IntEnum):
    """Sub-state of the chunked transfer-coding parser."""

    PARSE_CHUNKED_SIZE = 0  # expecting a chunk-size line
    PARSE_CHUNKED_CHUNK = 1  # inside chunk data
    PARSE_CHUNKED_CHUNK_EOF = 2  # expecting CRLF after chunk data
    # NOTE: value 3 is intentionally unused (kept for historical reasons).
    PARSE_TRAILERS = 4  # after the zero-size chunk, reading trailers

139 

140 

class HeadersParser:
    """Parse a list of raw header lines into a CIMultiDict.

    ``lax`` relaxes validation for response parsing: obsolete line
    folding is accepted and the RFC 9110 singleton-duplicate check is
    skipped.
    """

    def __init__(self, max_field_size: int = 8190, lax: bool = False) -> None:
        self.max_field_size = max_field_size
        self._lax = lax

    def parse_headers(
        self, lines: list[bytes]
    ) -> tuple["CIMultiDictProxy[str]", RawHeaders]:
        """Parse ``lines`` (terminated by an empty line) into headers.

        Returns an immutable case-insensitive multidict plus the raw
        (name, value) byte pairs.  Raises InvalidHeader / LineTooLong /
        BadHttpMessage on malformed input.
        """
        headers: CIMultiDict[str] = CIMultiDict()
        # note: "raw" does not mean inclusion of OWS before/after the field value
        raw_headers = []

        lines_idx = 0
        line = lines[lines_idx]
        line_count = len(lines)

        # Loop until the empty terminator line.
        while line:
            # Parse initial header name : value pair.
            try:
                bname, bvalue = line.split(b":", 1)
            except ValueError:
                raise InvalidHeader(line) from None

            if len(bname) == 0:
                raise InvalidHeader(bname)

            # No whitespace may surround the field name.
            # https://www.rfc-editor.org/rfc/rfc9112.html#section-5.1-2
            if {bname[0], bname[-1]} & {32, 9}:  # {" ", "\t"}
                raise InvalidHeader(line)

            bvalue = bvalue.lstrip(b" \t")
            name = bname.decode("utf-8", "surrogateescape")
            if not TOKENRE.fullmatch(name):
                raise InvalidHeader(bname)

            # next line
            lines_idx += 1
            line = lines[lines_idx]

            # consume continuation lines (only allowed in lax mode)
            continuation = self._lax and line and line[0] in (32, 9)  # (' ', '\t')

            # Deprecated: https://www.rfc-editor.org/rfc/rfc9112.html#name-obsolete-line-folding
            if continuation:
                header_length = len(bvalue)
                bvalue_lst = [bvalue]
                while continuation:
                    header_length += len(line)
                    if header_length > self.max_field_size:
                        header_line = bname + b": " + b"".join(bvalue_lst)
                        raise LineTooLong(
                            header_line[:100] + b"...", self.max_field_size
                        )
                    bvalue_lst.append(line)

                    # next line
                    lines_idx += 1
                    if lines_idx < line_count:
                        line = lines[lines_idx]
                        if line:
                            continuation = line[0] in (32, 9)  # (' ', '\t')
                    else:
                        line = b""
                        break
                bvalue = b"".join(bvalue_lst)

            bvalue = bvalue.strip(b" \t")
            value = bvalue.decode("utf-8", "surrogateescape")

            # Reject forbidden control characters in the field value.
            # https://www.rfc-editor.org/rfc/rfc9110.html#section-5.5-5
            if self._lax:
                if "\n" in value or "\r" in value or "\x00" in value:
                    raise InvalidHeader(bvalue)
            elif _FIELD_VALUE_FORBIDDEN_CTL_RE.search(value):
                raise InvalidHeader(bvalue)

            # Strict mode: reject duplicated singleton headers.
            # (``name in headers`` is case-insensitive via CIMultiDict.)
            if not self._lax and name in headers and name.lower() in SINGLETON_HEADERS:
                raise BadHttpMessage(f"Duplicate '{name}' header found.")
            headers.add(name, value)
            raw_headers.append((bname, bvalue))

        return (CIMultiDictProxy(headers), tuple(raw_headers))

223 

224 

def _is_supported_upgrade(headers: CIMultiDictProxy[str]) -> bool:
    """Return True when the Upgrade header names a protocol we support."""
    upgrade_value = headers.get(hdrs.UPGRADE, "")
    # .lower() can transform non-ascii characters, so reject those first.
    if not upgrade_value.isascii():
        return False
    return upgrade_value.lower() in {"tcp", "websocket"}

230 

231 

class HttpParser(abc.ABC, Generic[_MsgT]):
    """Common machinery for request/response parsing.

    Feeds raw bytes, splits them into start-line + header lines, delegates
    message interpretation to the concrete subclass via ``parse_message``,
    and constructs the payload reader for each message.
    """

    # Lax mode relaxes separators/validation; only enabled on responses.
    lax: ClassVar[bool] = False

    def __init__(
        self,
        protocol: BaseProtocol,
        loop: asyncio.AbstractEventLoop,
        limit: int,
        max_line_size: int = 8190,
        max_headers: int = 128,
        max_field_size: int = 8190,
        timer: BaseTimerContext | None = None,
        code: int | None = None,
        method: str | None = None,
        payload_exception: type[BaseException] | None = None,
        response_with_body: bool = True,
        read_until_eof: bool = False,
        auto_decompress: bool = True,
    ) -> None:
        self.protocol = protocol
        self.loop = loop
        self.max_line_size = max_line_size
        self.max_field_size = max_field_size
        self.max_headers = max_headers
        self.timer = timer
        self.code = code
        self.method = method
        self.payload_exception = payload_exception
        self.response_with_body = response_with_body
        self.read_until_eof = read_until_eof

        self._lines: list[bytes] = []  # accumulated start-line + header lines
        self._tail = b""  # bytes carried over between feed_data() calls
        self._upgraded = False
        self._payload = None
        self._payload_parser: HttpPayloadParser | None = None
        self._auto_decompress = auto_decompress
        self._limit = limit
        self._headers_parser = HeadersParser(max_field_size, self.lax)

    @abc.abstractmethod
    def parse_message(self, lines: list[bytes]) -> _MsgT:
        """Interpret the accumulated start-line + header lines."""
        ...

    @abc.abstractmethod
    def _is_chunked_te(self, te: str) -> bool:
        """Return True if the Transfer-Encoding value means chunked."""
        ...

    def feed_eof(self) -> _MsgT | None:
        """Signal end of stream; may return a partially-received message."""
        if self._payload_parser is not None:
            self._payload_parser.feed_eof()
            self._payload_parser = None
        else:
            # try to extract partial message
            if self._tail:
                self._lines.append(self._tail)

            if self._lines:
                # BUGFIX: the comparison was against the str "\r\n", which a
                # bytes element can never equal, making the guard a no-op.
                # Compare against bytes so the terminator check is type-correct.
                if self._lines[-1] != b"\r\n":
                    self._lines.append(b"")
                with suppress(Exception):
                    return self.parse_message(self._lines)
        return None

    def feed_data(
        self,
        data: bytes,
        SEP: _SEP = b"\r\n",
        EMPTY: bytes = b"",
        CONTENT_LENGTH: istr = hdrs.CONTENT_LENGTH,
        METH_CONNECT: str = hdrs.METH_CONNECT,
        SEC_WEBSOCKET_KEY1: istr = hdrs.SEC_WEBSOCKET_KEY1,
    ) -> tuple[list[tuple[_MsgT, StreamReader]], bool, bytes]:
        """Feed raw bytes; return (messages, upgraded, leftover bytes).

        ``messages`` is a list of (message, payload StreamReader) pairs.
        Raises LineTooLong / BadHttpMessage / InvalidHeader on protocol
        violations.
        """
        messages = []

        if self._tail:
            data, self._tail = self._tail + data, b""

        data_len = len(data)
        start_pos = 0
        loop = self.loop
        max_line_length = self.max_line_size

        should_close = False
        while start_pos < data_len:
            # read HTTP message (request/response line + headers), \r\n\r\n
            # and split by lines
            if self._payload_parser is None and not self._upgraded:
                pos = data.find(SEP, start_pos)
                # consume \r\n
                if pos == start_pos and not self._lines:
                    start_pos = pos + len(SEP)
                    continue

                if pos >= start_pos:
                    if should_close:
                        raise BadHttpMessage("Data after `Connection: close`")

                    # line found
                    line = data[start_pos:pos]
                    if SEP == b"\n":  # For lax response parsing
                        line = line.rstrip(b"\r")
                    if len(line) > max_line_length:
                        raise LineTooLong(line[:100] + b"...", max_line_length)

                    self._lines.append(line)
                    # After processing the status/request line, everything is a header.
                    max_line_length = self.max_field_size

                    if len(self._lines) > self.max_headers:
                        raise BadHttpMessage("Too many headers received")

                    start_pos = pos + len(SEP)

                    # \r\n\r\n found
                    if self._lines[-1] == EMPTY:
                        max_trailers = self.max_headers - len(self._lines)
                        try:
                            msg: _MsgT = self.parse_message(self._lines)
                        finally:
                            self._lines.clear()

                        def get_content_length() -> int | None:
                            # payload length
                            length_hdr = msg.headers.get(CONTENT_LENGTH)
                            if length_hdr is None:
                                return None

                            # Shouldn't allow +/- or other number formats.
                            # https://www.rfc-editor.org/rfc/rfc9110#section-8.6-2
                            # msg.headers is already stripped of leading/trailing wsp
                            if not DIGITS.fullmatch(length_hdr):
                                raise InvalidHeader(CONTENT_LENGTH)

                            return int(length_hdr)

                        length = get_content_length()
                        # do not support old websocket spec
                        if SEC_WEBSOCKET_KEY1 in msg.headers:
                            raise InvalidHeader(SEC_WEBSOCKET_KEY1)

                        self._upgraded = msg.upgrade and _is_supported_upgrade(
                            msg.headers
                        )

                        method = getattr(msg, "method", self.method)
                        # code is only present on responses
                        code = getattr(msg, "code", 0)

                        assert self.protocol is not None
                        # calculate payload
                        empty_body = code in EMPTY_BODY_STATUS_CODES or bool(
                            method and method in EMPTY_BODY_METHODS
                        )
                        if not empty_body and (
                            ((length is not None and length > 0) or msg.chunked)
                            and not self._upgraded
                        ):
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            payload_parser = HttpPayloadParser(
                                payload,
                                length=length,
                                chunked=msg.chunked,
                                method=method,
                                compression=msg.compression,
                                code=self.code,
                                response_with_body=self.response_with_body,
                                auto_decompress=self._auto_decompress,
                                lax=self.lax,
                                headers_parser=self._headers_parser,
                                max_line_size=self.max_line_size,
                                max_field_size=self.max_field_size,
                                max_trailers=max_trailers,
                            )
                            if not payload_parser.done:
                                self._payload_parser = payload_parser
                        elif method == METH_CONNECT:
                            assert isinstance(msg, RawRequestMessage)
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            self._upgraded = True
                            self._payload_parser = HttpPayloadParser(
                                payload,
                                method=msg.method,
                                compression=msg.compression,
                                auto_decompress=self._auto_decompress,
                                lax=self.lax,
                                headers_parser=self._headers_parser,
                                max_line_size=self.max_line_size,
                                max_field_size=self.max_field_size,
                                max_trailers=max_trailers,
                            )
                        elif not empty_body and length is None and self.read_until_eof:
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            payload_parser = HttpPayloadParser(
                                payload,
                                length=length,
                                chunked=msg.chunked,
                                method=method,
                                compression=msg.compression,
                                code=self.code,
                                response_with_body=self.response_with_body,
                                auto_decompress=self._auto_decompress,
                                lax=self.lax,
                                headers_parser=self._headers_parser,
                                max_line_size=self.max_line_size,
                                max_field_size=self.max_field_size,
                                max_trailers=max_trailers,
                            )
                            if not payload_parser.done:
                                self._payload_parser = payload_parser
                        else:
                            payload = EMPTY_PAYLOAD

                        messages.append((msg, payload))
                        should_close = msg.should_close
                else:
                    self._tail = data[start_pos:]
                    if len(self._tail) > self.max_line_size:
                        raise LineTooLong(self._tail[:100] + b"...", self.max_line_size)
                    data = EMPTY
                    break

            # no parser, just store
            elif self._payload_parser is None and self._upgraded:
                assert not self._lines
                break

            # feed payload
            elif data and start_pos < data_len:
                assert not self._lines
                assert self._payload_parser is not None
                try:
                    eof, data = self._payload_parser.feed_data(data[start_pos:], SEP)
                except Exception as underlying_exc:
                    reraised_exc: BaseException = underlying_exc
                    if self.payload_exception is not None:
                        reraised_exc = self.payload_exception(str(underlying_exc))

                    set_exception(
                        self._payload_parser.payload,
                        reraised_exc,
                        underlying_exc,
                    )

                    eof = True
                    data = b""
                    if isinstance(
                        underlying_exc, (InvalidHeader, TransferEncodingError)
                    ):
                        raise

                if eof:
                    start_pos = 0
                    data_len = len(data)
                    self._payload_parser = None
                    continue
            else:
                break

        if data and start_pos < data_len:
            data = data[start_pos:]
        else:
            data = EMPTY

        return messages, self._upgraded, data

    def parse_headers(
        self, lines: list[bytes]
    ) -> tuple[
        "CIMultiDictProxy[str]", RawHeaders, bool | None, str | None, bool, bool
    ]:
        """Parses RFC 5322 headers from a stream.

        Line continuations are supported. Returns list of header name
        and value pairs. Header name is in upper case.
        """
        headers, raw_headers = self._headers_parser.parse_headers(lines)
        close_conn = None
        encoding = None
        upgrade = False
        chunked = False

        # keep-alive and protocol switching
        # RFC 9110 section 7.6.1 defines Connection as a comma-separated list.
        conn_values = headers.getall(hdrs.CONNECTION, ())
        if conn_values:
            conn_tokens = {
                token.lower()
                for conn_value in conn_values
                for token in (part.strip(" \t") for part in conn_value.split(","))
                if token and token.isascii()
            }

            if "close" in conn_tokens:
                close_conn = True
            elif "keep-alive" in conn_tokens:
                close_conn = False

            # https://www.rfc-editor.org/rfc/rfc9110.html#name-101-switching-protocols
            if "upgrade" in conn_tokens and headers.get(hdrs.UPGRADE):
                upgrade = True

        # encoding
        enc = headers.get(hdrs.CONTENT_ENCODING, "")
        if enc.isascii() and enc.lower() in {"gzip", "deflate", "br", "zstd"}:
            encoding = enc

        # chunking
        te = headers.get(hdrs.TRANSFER_ENCODING)
        if te is not None:
            if self._is_chunked_te(te):
                chunked = True

            if hdrs.CONTENT_LENGTH in headers:
                raise BadHttpMessage(
                    "Transfer-Encoding can't be present with Content-Length",
                )

        return (headers, raw_headers, close_conn, encoding, upgrade, chunked)

    def set_upgraded(self, val: bool) -> None:
        """Set connection upgraded (to websocket) mode.

        :param bool val: new state.
        """
        self._upgraded = val

571 

572 

class HttpRequestParser(HttpParser[RawRequestMessage]):
    """Read request status line.

    Exception .http_exceptions.BadStatusLine
    could be raised in case of any errors in status line.
    Returns RawRequestMessage.
    """

    def parse_message(self, lines: list[bytes]) -> RawRequestMessage:
        """Parse the request line and headers into a RawRequestMessage."""
        # request line
        line = lines[0].decode("utf-8", "surrogateescape")
        try:
            method, path, version = line.split(" ", maxsplit=2)
        except ValueError:
            raise BadHttpMethod(line) from None

        # method must be an RFC 9110 token
        if not TOKENRE.fullmatch(method):
            raise BadHttpMethod(method)

        # version
        match = VERSRE.fullmatch(version)
        if match is None:
            raise BadStatusLine(line)
        version_o = HttpVersion(int(match.group(1)), int(match.group(2)))

        if method == "CONNECT":
            # authority-form,
            # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.3
            url = URL.build(authority=path, encoded=True)
        elif path.startswith("/"):
            # origin-form,
            # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.1
            path_part, _hash_separator, url_fragment = path.partition("#")
            path_part, _question_mark_separator, qs_part = path_part.partition("?")

            # NOTE: `yarl.URL.build()` is used to mimic what the Cython-based
            # NOTE: parser does, otherwise it results into the same
            # NOTE: HTTP Request-Line input producing different
            # NOTE: `yarl.URL()` objects
            url = URL.build(
                path=path_part,
                query_string=qs_part,
                fragment=url_fragment,
                encoded=True,
            )
        elif path == "*" and method == "OPTIONS":
            # asterisk-form,
            url = URL(path, encoded=True)
        else:
            # absolute-form for proxy maybe,
            # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.2
            url = URL(path, encoded=True)
            if url.scheme == "":
                # not absolute-form
                raise InvalidURLError(
                    path.encode(errors="surrogateescape").decode("latin1")
                )

        # read headers
        (
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
        ) = self.parse_headers(lines[1:])

        if close is None:  # then the headers weren't set in the request
            if version_o <= HttpVersion10:  # HTTP 1.0 must asks to not close
                close = True
            else:  # HTTP 1.1 must ask to close.
                close = False

        return RawRequestMessage(
            method,
            path,
            version_o,
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
            url,
        )

    def _is_chunked_te(self, te: str) -> bool:
        """Strict check: Transfer-Encoding must end with exactly one "chunked"."""
        # https://www.rfc-editor.org/rfc/rfc9112#section-7.1-3
        # "A sender MUST NOT apply the chunked transfer coding more
        # than once to a message body"
        parts = [p.strip(" \t") for p in te.split(",")]
        chunked_count = sum(1 for p in parts if p.isascii() and p.lower() == "chunked")
        if chunked_count > 1:
            raise BadHttpMessage("Request has duplicate `chunked` Transfer-Encoding")
        last = parts[-1]
        # .lower() transforms some non-ascii chars, so must check first.
        if last.isascii() and last.lower() == "chunked":
            return True
        # https://www.rfc-editor.org/rfc/rfc9112#section-6.3-2.4.3
        raise BadHttpMessage("Request has invalid `Transfer-Encoding`")

675 

676 

class HttpResponseParser(HttpParser[RawResponseMessage]):
    """Read response status line and headers.

    BadStatusLine could be raised in case of any errors in status line.
    Returns RawResponseMessage.
    """

    # Lax mode should only be enabled on response parser.
    lax = not DEBUG

    def feed_data(
        self,
        data: bytes,
        SEP: _SEP | None = None,
        *args: Any,
        **kwargs: Any,
    ) -> tuple[list[tuple[RawResponseMessage, StreamReader]], bool, bytes]:
        """Feed bytes, defaulting the separator to bare LF in lax mode."""
        if SEP is None:
            SEP = b"\r\n" if DEBUG else b"\n"
        return super().feed_data(data, SEP, *args, **kwargs)

    def parse_message(self, lines: list[bytes]) -> RawResponseMessage:
        """Parse the status line and headers into a RawResponseMessage."""
        line = lines[0].decode("utf-8", "surrogateescape")
        try:
            version, status = line.split(maxsplit=1)
        except ValueError:
            raise BadStatusLine(line) from None

        try:
            status, reason = status.split(maxsplit=1)
        except ValueError:
            # reason phrase is optional
            status = status.strip()
            reason = ""

        # version
        match = VERSRE.fullmatch(version)
        if match is None:
            raise BadStatusLine(line)
        version_o = HttpVersion(int(match.group(1)), int(match.group(2)))

        # The status code is a three-digit ASCII number, no padding
        if len(status) != 3 or not DIGITS.fullmatch(status):
            raise BadStatusLine(line)
        status_i = int(status)

        # read headers
        (
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
        ) = self.parse_headers(lines[1:])

        if close is None:
            if version_o <= HttpVersion10:
                close = True
            # https://www.rfc-editor.org/rfc/rfc9112.html#name-message-body-length
            elif 100 <= status_i < 200 or status_i in {204, 304}:
                close = False
            elif hdrs.CONTENT_LENGTH in headers or hdrs.TRANSFER_ENCODING in headers:
                close = False
            else:
                # https://www.rfc-editor.org/rfc/rfc9112.html#section-6.3-2.8
                close = True

        return RawResponseMessage(
            version_o,
            status_i,
            reason.strip(),
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
        )

    def _is_chunked_te(self, te: str) -> bool:
        """Lax check: only the final Transfer-Encoding token matters."""
        # https://www.rfc-editor.org/rfc/rfc9112#section-6.3-2.4.2
        return te.rsplit(",", maxsplit=1)[-1].strip(" \t").lower() == "chunked"

759 

760 

class HttpPayloadParser:
    """Parse a message body (fixed length, chunked, or until-EOF).

    Feeds decoded body bytes into ``payload`` (optionally through a
    DeflateBuffer when the body is compressed and auto-decompress is on).
    """

    def __init__(
        self,
        payload: StreamReader,
        length: int | None = None,
        chunked: bool = False,
        compression: str | None = None,
        code: int | None = None,
        method: str | None = None,
        response_with_body: bool = True,
        auto_decompress: bool = True,
        lax: bool = False,
        *,
        headers_parser: HeadersParser,
        max_line_size: int = 8190,
        max_field_size: int = 8190,
        max_trailers: int = 128,
    ) -> None:
        self._length = 0  # remaining bytes for PARSE_LENGTH mode
        self._type = ParseState.PARSE_UNTIL_EOF
        self._chunk = ChunkState.PARSE_CHUNKED_SIZE
        self._chunk_size = 0  # remaining bytes in the current chunk
        self._chunk_tail = b""  # partial line carried between feed_data calls
        self._auto_decompress = auto_decompress
        self._lax = lax
        self._headers_parser = headers_parser  # reused for trailer parsing
        self._max_line_size = max_line_size
        self._max_field_size = max_field_size
        self._max_trailers = max_trailers
        self._trailer_lines: list[bytes] = []
        self.done = False  # True when no body bytes are expected at all

        # payload decompression wrapper
        if response_with_body and compression and self._auto_decompress:
            real_payload: StreamReader | DeflateBuffer = DeflateBuffer(
                payload, compression
            )
        else:
            real_payload = payload

        # payload parser
        if not response_with_body:
            # don't parse payload if it's not expected to be received
            self._type = ParseState.PARSE_NONE
            real_payload.feed_eof()
            self.done = True
        elif chunked:
            self._type = ParseState.PARSE_CHUNKED
        elif length is not None:
            self._type = ParseState.PARSE_LENGTH
            self._length = length
            if self._length == 0:
                real_payload.feed_eof()
                self.done = True

        self.payload = real_payload

    def feed_eof(self) -> None:
        """Handle connection EOF; raise if the body was not complete."""
        if self._type == ParseState.PARSE_UNTIL_EOF:
            self.payload.feed_eof()
        elif self._type == ParseState.PARSE_LENGTH:
            raise ContentLengthError(
                "Not enough data to satisfy content length header."
            )
        elif self._type == ParseState.PARSE_CHUNKED:
            raise TransferEncodingError(
                "Not enough data to satisfy transfer length header."
            )

    def feed_data(
        self, chunk: bytes, SEP: _SEP = b"\r\n", CHUNK_EXT: bytes = b";"
    ) -> tuple[bool, bytes]:
        """Feed body bytes; return (body complete, leftover bytes)."""
        # Read specified amount of bytes
        if self._type == ParseState.PARSE_LENGTH:
            required = self._length
            self._length = max(required - len(chunk), 0)
            self.payload.feed_data(chunk[:required])
            if self._length == 0:
                self.payload.feed_eof()
            return True, chunk[required:]

        # Chunked transfer encoding parser
        elif self._type == ParseState.PARSE_CHUNKED:
            if self._chunk_tail:
                # We should never have a tail if we're inside the payload body.
                assert self._chunk != ChunkState.PARSE_CHUNKED_CHUNK
                # We should check the length is sane.
                max_line_length = self._max_line_size
                if self._chunk == ChunkState.PARSE_TRAILERS:
                    max_line_length = self._max_field_size
                if len(self._chunk_tail) > max_line_length:
                    raise LineTooLong(self._chunk_tail[:100] + b"...", max_line_length)

                chunk = self._chunk_tail + chunk
                self._chunk_tail = b""

            while chunk:
                # read next chunk size
                if self._chunk == ChunkState.PARSE_CHUNKED_SIZE:
                    pos = chunk.find(SEP)
                    if pos >= 0:
                        i = chunk.find(CHUNK_EXT, 0, pos)
                        if i >= 0:
                            size_b = chunk[:i]  # strip chunk-extensions
                            # Verify no LF in the chunk-extension
                            if b"\n" in (ext := chunk[i:pos]):
                                exc = TransferEncodingError(
                                    f"Unexpected LF in chunk-extension: {ext!r}"
                                )
                                set_exception(self.payload, exc)
                                raise exc
                        else:
                            size_b = chunk[:pos]

                        if self._lax:  # Allow whitespace in lax mode.
                            size_b = size_b.strip()

                        if not re.fullmatch(HEXDIGITS, size_b):
                            exc = TransferEncodingError(
                                chunk[:pos].decode("ascii", "surrogateescape")
                            )
                            set_exception(self.payload, exc)
                            raise exc
                        size = int(bytes(size_b), 16)

                        chunk = chunk[pos + len(SEP) :]
                        if size == 0:  # eof marker
                            self._chunk = ChunkState.PARSE_TRAILERS
                            if self._lax and chunk.startswith(b"\r"):
                                chunk = chunk[1:]
                        else:
                            self._chunk = ChunkState.PARSE_CHUNKED_CHUNK
                            self._chunk_size = size
                            self.payload.begin_http_chunk_receiving()
                    else:
                        self._chunk_tail = chunk
                        return False, b""

                # read chunk and feed buffer
                if self._chunk == ChunkState.PARSE_CHUNKED_CHUNK:
                    required = self._chunk_size
                    self._chunk_size = max(required - len(chunk), 0)
                    self.payload.feed_data(chunk[:required])

                    if self._chunk_size:
                        return False, b""
                    chunk = chunk[required:]
                    self._chunk = ChunkState.PARSE_CHUNKED_CHUNK_EOF
                    self.payload.end_http_chunk_receiving()

                # toss the CRLF at the end of the chunk
                if self._chunk == ChunkState.PARSE_CHUNKED_CHUNK_EOF:
                    if self._lax and chunk.startswith(b"\r"):
                        chunk = chunk[1:]
                    if chunk[: len(SEP)] == SEP:
                        chunk = chunk[len(SEP) :]
                        self._chunk = ChunkState.PARSE_CHUNKED_SIZE
                    elif len(chunk) >= len(SEP) or chunk != SEP[: len(chunk)]:
                        exc = TransferEncodingError(
                            "Chunk size mismatch: expected CRLF after chunk data"
                        )
                        set_exception(self.payload, exc)
                        raise exc
                    else:
                        self._chunk_tail = chunk
                        return False, b""

                if self._chunk == ChunkState.PARSE_TRAILERS:
                    pos = chunk.find(SEP)
                    if pos < 0:  # No line found
                        self._chunk_tail = chunk
                        return False, b""

                    line = chunk[:pos]
                    chunk = chunk[pos + len(SEP) :]
                    if SEP == b"\n":  # For lax response parsing
                        line = line.rstrip(b"\r")

                    if len(line) > self._max_field_size:
                        raise LineTooLong(line[:100] + b"...", self._max_field_size)

                    self._trailer_lines.append(line)

                    if len(self._trailer_lines) > self._max_trailers:
                        raise BadHttpMessage("Too many trailers received")

                    # \r\n\r\n found, end of stream
                    if self._trailer_lines[-1] == b"":
                        # Headers and trailers are defined the same way,
                        # so we reuse the HeadersParser here.
                        try:
                            trailers, raw_trailers = self._headers_parser.parse_headers(
                                self._trailer_lines
                            )
                        finally:
                            self._trailer_lines.clear()
                        self.payload.feed_eof()
                        return True, chunk

        # Read all bytes until eof
        elif self._type == ParseState.PARSE_UNTIL_EOF:
            self.payload.feed_data(chunk)

        return False, b""

965 

966 

class DeflateBuffer:
    """DeflateStream decompress stream and feed data into specified stream."""

    def __init__(
        self,
        out: StreamReader,
        encoding: str | None,
        max_decompress_size: int = DEFAULT_MAX_DECOMPRESS_SIZE,
    ) -> None:
        self.out = out
        self.size = 0  # total compressed bytes seen so far
        out.total_compressed_bytes = self.size
        self.encoding = encoding
        self._started_decoding = False

        # Pick the decompressor matching the Content-Encoding.
        self.decompressor: BrotliDecompressor | ZLibDecompressor | ZSTDDecompressor
        if encoding == "br":
            if not HAS_BROTLI:
                raise ContentEncodingError(
                    "Can not decode content-encoding: brotli (br). "
                    "Please install `Brotli`"
                )
            self.decompressor = BrotliDecompressor()
        elif encoding == "zstd":
            if not HAS_ZSTD:
                raise ContentEncodingError(
                    "Can not decode content-encoding: zstandard (zstd). "
                    "Please install `backports.zstd`"
                )
            self.decompressor = ZSTDDecompressor()
        else:
            # gzip/deflate both go through zlib.
            self.decompressor = ZLibDecompressor(encoding=encoding)

        self._max_decompress_size = max_decompress_size

    def set_exception(
        self,
        exc: type[BaseException] | BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Propagate an exception to the wrapped output stream."""
        set_exception(self.out, exc, exc_cause)

    def feed_data(self, chunk: bytes) -> None:
        """Decompress ``chunk`` and feed the result to the output stream."""
        if not chunk:
            return

        self.size += len(chunk)
        self.out.total_compressed_bytes = self.size

        # RFC1950
        # bits 0..3 = CM = 0b1000 = 8 = "deflate"
        # bits 4..7 = CINFO = 1..7 = windows size.
        if (
            not self._started_decoding
            and self.encoding == "deflate"
            and chunk[0] & 0xF != 8
        ):
            # Change the decoder to decompress incorrectly compressed data
            # Actually we should issue a warning about non-RFC-compliant data.
            self.decompressor = ZLibDecompressor(
                encoding=self.encoding, suppress_deflate_header=True
            )

        try:
            # Decompress with limit + 1 so we can detect if output exceeds limit
            chunk = self.decompressor.decompress_sync(
                chunk, max_length=self._max_decompress_size + 1
            )
        except Exception:
            raise ContentEncodingError(
                "Can not decode content-encoding: %s" % self.encoding
            )

        self._started_decoding = True

        # Check if decompression limit was exceeded
        if len(chunk) > self._max_decompress_size:
            raise DecompressSizeError(
                "Decompressed data exceeds the configured limit of %d bytes"
                % self._max_decompress_size
            )

        if chunk:
            self.out.feed_data(chunk)

    def feed_eof(self) -> None:
        """Flush the decompressor and close the output stream."""
        chunk = self.decompressor.flush()

        if chunk or self.size > 0:
            self.out.feed_data(chunk)
            # decompressor is not brotli unless encoding is "br"
            if self.encoding == "deflate" and not self.decompressor.eof:  # type: ignore[union-attr]
                raise ContentEncodingError("deflate")

        self.out.feed_eof()

    def begin_http_chunk_receiving(self) -> None:
        """Forward chunk-start notification to the output stream."""
        self.out.begin_http_chunk_receiving()

    def end_http_chunk_receiving(self) -> None:
        """Forward chunk-end notification to the output stream."""
        self.out.end_http_chunk_receiving()

1068 

1069 

# Pure-Python implementations are always available under the *Py aliases.
HttpRequestParserPy = HttpRequestParser
HttpResponseParserPy = HttpResponseParser
RawRequestMessagePy = RawRequestMessage
RawResponseMessagePy = RawResponseMessage

# Prefer the C-accelerated parser when the extension is built and
# extensions are not explicitly disabled; fall back silently otherwise.
with suppress(ImportError):
    if not NO_EXTENSIONS:
        from ._http_parser import (  # type: ignore[import-not-found,no-redef]
            HttpRequestParser,
            HttpResponseParser,
            RawRequestMessage,
            RawResponseMessage,
        )

        HttpRequestParserC = HttpRequestParser
        HttpResponseParserC = HttpResponseParser
        RawRequestMessageC = RawRequestMessage
        RawResponseMessageC = RawResponseMessage