Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/http_parser.py: 81%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

610 statements  

1import abc 

2import asyncio 

3import re 

4import string 

5import sys 

6from contextlib import suppress 

7from enum import IntEnum 

8from re import Pattern 

9from typing import ( 

10 TYPE_CHECKING, 

11 Any, 

12 ClassVar, 

13 Final, 

14 Generic, 

15 Literal, 

16 NamedTuple, 

17 TypeVar, 

18) 

19 

20from multidict import CIMultiDict, istr 

21from yarl import URL 

22 

23from . import hdrs 

24from .base_protocol import BaseProtocol 

25from .compression_utils import ( 

26 HAS_BROTLI, 

27 HAS_ZSTD, 

28 BrotliDecompressor, 

29 ZLibDecompressor, 

30 ZSTDDecompressor, 

31) 

32from .helpers import ( 

33 _EXC_SENTINEL, 

34 DEBUG, 

35 DEFAULT_CHUNK_SIZE, 

36 EMPTY_BODY_METHODS, 

37 EMPTY_BODY_STATUS_CODES, 

38 NO_EXTENSIONS, 

39 BaseTimerContext, 

40 HeadersDictProxy, 

41 set_exception, 

42) 

43from .http_exceptions import ( 

44 BadHttpMessage, 

45 BadHttpMethod, 

46 BadStatusLine, 

47 ContentEncodingError, 

48 ContentLengthError, 

49 InvalidHeader, 

50 InvalidURLError, 

51 LineTooLong, 

52 TransferEncodingError, 

53) 

54from .http_writer import HttpVersion, HttpVersion10, HttpVersion11 

55from .streams import EMPTY_PAYLOAD, StreamReader 

56from .typedefs import RawHeaders 

57 

58if TYPE_CHECKING: 

59 from .client_proto import ResponseHandler 

60 

# Public names exported by this module.
__all__ = (
    "HeadersParser",
    "HttpParser",
    "HttpRequestParser",
    "HttpResponseParser",
    "RawRequestMessage",
    "RawResponseMessage",
)

_T = TypeVar("_T")

# Line separators accepted by the parsers: CRLF, or bare LF which
# HttpResponseParser.feed_data selects when DEBUG is off.
_SEP = Literal[b"\r\n", b"\n"]

# The characters of string.printable, as a set.
ASCIISET: Final[set[str]] = set(string.printable)

# See https://www.rfc-editor.org/rfc/rfc9110.html#name-overview
# and https://www.rfc-editor.org/rfc/rfc9110.html#name-tokens
#
#     method = token
#     tchar = "!" / "#" / "$" / "%" / "&" / "'" / "*" / "+" / "-" / "." /
#             "^" / "_" / "`" / "|" / "~" / DIGIT / ALPHA
#     token = 1*tchar
_TCHAR_SPECIALS: Final[str] = re.escape("!#$%&'*+-.^_`|~")
# Matches one or more tchar; used with fullmatch() to validate header names
# and request methods.
TOKENRE: Final[Pattern[str]] = re.compile(f"[0-9A-Za-z{_TCHAR_SPECIALS}]+")
# https://www.rfc-editor.org/rfc/rfc9110#section-5.5-5
# Control characters rejected in header values in strict mode: everything
# in \x00-\x1f except HTAB (\x09), plus DEL (\x7f).
_FIELD_VALUE_FORBIDDEN_CTL_RE: Final[Pattern[str]] = re.compile(
    r"[\x00-\x08\x0a-\x1f\x7f]"
)
# "HTTP/<digit>.<digit>"; re.ASCII keeps \d from matching non-ASCII digits.
VERSRE: Final[Pattern[str]] = re.compile(r"HTTP/(\d)\.(\d)", re.ASCII)
# One or more ASCII decimal digits (Content-Length, status code).
DIGITS: Final[Pattern[str]] = re.compile(r"\d+", re.ASCII)
# One or more hexadecimal digits, as bytes (chunk sizes).
HEXDIGITS: Final[Pattern[bytes]] = re.compile(rb"[0-9a-fA-F]+")

# RFC 9110 singleton headers — duplicates are rejected in strict mode.
# In lax mode (response parser default), the check is skipped entirely
# since real-world servers (e.g. Google APIs, Werkzeug) commonly send
# duplicate headers like Content-Type or Server.
# Lowercased for case-insensitive matching against wire names.
SINGLETON_HEADERS: Final[frozenset[str]] = frozenset(
    {
        "content-length",
        "content-location",
        "content-range",
        "content-type",
        "etag",
        "host",
        "max-forwards",
        "server",
        "transfer-encoding",
        "user-agent",
    }
)

112 

113 

class RawRequestMessage(NamedTuple):
    """Request line and headers of one parsed HTTP request.

    Instances are built by ``HttpRequestParser.parse_message``.
    """

    # Method token from the request line.
    method: str
    # Request target exactly as it appeared on the request line.
    path: str
    # HTTP version parsed from the request line.
    version: HttpVersion
    # Parsed headers as returned by ``HeadersParser.parse_headers``.
    headers: HeadersDictProxy
    # (name, value) byte pairs in the order the header lines were parsed.
    raw_headers: RawHeaders
    # Whether the connection should be closed after this message.
    should_close: bool
    # Content-Encoding value if it is gzip/deflate/br/zstd, otherwise None.
    compression: str | None
    # True when Connection lists "upgrade" and an Upgrade header is present.
    upgrade: bool
    # True when Transfer-Encoding marks the body as chunked.
    chunked: bool
    # URL object built from the request target.
    url: URL

125 

126 

class RawResponseMessage(NamedTuple):
    """Status line and headers of one parsed HTTP response.

    Instances are built by ``HttpResponseParser.parse_message``.
    """

    # HTTP version parsed from the status line.
    version: HttpVersion
    # Three-digit status code as an integer.
    code: int
    # Reason phrase with surrounding whitespace stripped ("" if absent).
    reason: str
    # Parsed headers as returned by ``HeadersParser.parse_headers``.
    headers: HeadersDictProxy
    # (name, value) byte pairs in the order the header lines were parsed.
    raw_headers: RawHeaders
    # Whether the connection should be closed after this message.
    should_close: bool
    # Content-Encoding value if it is gzip/deflate/br/zstd, otherwise None.
    compression: str | None
    # True when Connection lists "upgrade" and an Upgrade header is present.
    upgrade: bool
    # True when Transfer-Encoding marks the body as chunked.
    chunked: bool

137 

138 

# Message type produced by a parser. Constrained to the two raw message
# tuples so HttpParser can be parameterised by what parse_message() returns.
_MsgT = TypeVar("_MsgT", RawRequestMessage, RawResponseMessage)

140 

141 

class PayloadState(IntEnum):
    """Result reported by ``HttpPayloadParser.feed_data``."""

    # The payload is finished; feed_data also returns the unconsumed bytes
    # that belong to the next message.
    PAYLOAD_COMPLETE = 0
    # All supplied bytes were consumed and the payload is not finished yet.
    PAYLOAD_NEEDS_INPUT = 1
    # Parsing paused with input still buffered; feed_data may be called
    # again with b"" to continue.
    PAYLOAD_HAS_PENDING_INPUT = 2

146 

147 

class ParseState(IntEnum):
    """How ``HttpPayloadParser`` determines where the body ends."""

    # No body is parsed (chosen when response_with_body is false).
    PARSE_NONE = 0
    # Body length is given by Content-Length.
    PARSE_LENGTH = 1
    # Body uses chunked transfer coding.
    PARSE_CHUNKED = 2
    # Body runs until EOF; the initial mode when neither applies.
    PARSE_UNTIL_EOF = 3

153 

154 

class ChunkState(IntEnum):
    """Position of ``HttpPayloadParser`` inside a chunked body."""

    # Expecting a chunk-size line (optionally with chunk extensions).
    PARSE_CHUNKED_SIZE = 0
    # Reading the data bytes of the current chunk.
    PARSE_CHUNKED_CHUNK = 1
    # Expecting the line separator that follows the chunk data.
    PARSE_CHUNKED_CHUNK_EOF = 2
    # Reading trailer lines after the zero-size chunk.
    # NOTE(review): value 3 is not used by any member of this enum.
    PARSE_TRAILERS = 4

160 

161 

class HeadersParser:
    """Parse a block of header (or trailer) lines into a multidict.

    :param max_field_size: upper bound on the combined length of a folded
        (multi-line) header value.
    :param lax: when true, obsolete line folding is accepted, only CR, LF
        and NUL are rejected inside values, and duplicate singleton headers
        are allowed.
    """

    def __init__(self, max_field_size: int = 8190, lax: bool = False) -> None:
        self.max_field_size = max_field_size
        self._lax = lax

    @staticmethod
    def _is_folded(line: bytes) -> bool:
        """Return True if *line* is a continuation line (starts with SP/HTAB)."""
        return bool(line) and line[0] in (32, 9)  # (' ', '\t')

    def parse_headers(self, lines: list[bytes]) -> tuple[HeadersDictProxy, RawHeaders]:
        """Parse *lines* up to the first empty line.

        :param lines: header lines without their line separators. Parsing
            stops at the first empty line; if there is none, the end of the
            list is treated as the end of the header block.
        :return: ``(headers, raw_headers)`` where ``raw_headers`` is a tuple
            of ``(name, value)`` byte pairs in wire order.
        :raises InvalidHeader: a line has no colon, an empty or non-token
            name, whitespace around the name, or a forbidden character in
            the value.
        :raises LineTooLong: a folded value exceeds ``max_field_size``.
        :raises BadHttpMessage: a singleton header is repeated in strict mode.
        """
        headers: CIMultiDict[str] = CIMultiDict()
        # note: "raw" does not mean inclusion of OWS before/after the field value
        raw_headers = []

        lines_idx = 0
        line_count = len(lines)
        line = lines[lines_idx] if line_count else b""

        while line:
            # Parse initial header name : value pair.
            try:
                bname, bvalue = line.split(b":", 1)
            except ValueError:
                raise InvalidHeader(line) from None

            if not bname:
                raise InvalidHeader(bname)

            # https://www.rfc-editor.org/rfc/rfc9112.html#section-5.1-2
            if {bname[0], bname[-1]} & {32, 9}:  # {" ", "\t"}
                raise InvalidHeader(line)

            bvalue = bvalue.lstrip(b" \t")
            name = bname.decode("utf-8", "surrogateescape")
            if not TOKENRE.fullmatch(name):
                raise InvalidHeader(bname)

            # next line; running off the end behaves like the empty terminator
            lines_idx += 1
            line = lines[lines_idx] if lines_idx < line_count else b""

            # consume continuation lines (lax mode only)
            # Deprecated: https://www.rfc-editor.org/rfc/rfc9112.html#name-obsolete-line-folding
            if self._lax and self._is_folded(line):
                header_length = len(bvalue)
                bvalue_lst = [bvalue]
                while self._is_folded(line):
                    header_length += len(line)
                    if header_length > self.max_field_size:
                        header_line = bname + b": " + b"".join(bvalue_lst)
                        raise LineTooLong(
                            header_line[:100] + b"...", self.max_field_size
                        )
                    bvalue_lst.append(line)

                    # next line; an empty line ends the fold, so nothing
                    # after the terminator can be merged into this value.
                    lines_idx += 1
                    line = lines[lines_idx] if lines_idx < line_count else b""
                bvalue = b"".join(bvalue_lst)

            bvalue = bvalue.strip(b" \t")
            value = bvalue.decode("utf-8", "surrogateescape")

            # https://www.rfc-editor.org/rfc/rfc9110.html#section-5.5-5
            if self._lax:
                if "\n" in value or "\r" in value or "\x00" in value:
                    raise InvalidHeader(bvalue)
            elif _FIELD_VALUE_FORBIDDEN_CTL_RE.search(value):
                raise InvalidHeader(bvalue)

            if not self._lax and name in headers and name.lower() in SINGLETON_HEADERS:
                raise BadHttpMessage(f"Duplicate '{name}' header found.")
            headers.add(name, value)
            raw_headers.append((bname, bvalue))

        return (HeadersDictProxy(headers), tuple(raw_headers))

242 

243 

def _is_supported_upgrade(headers: HeadersDictProxy) -> bool:
    """Return True if the Upgrade header names a protocol handled here.

    Only ``tcp`` and ``websocket`` (compared case-insensitively) qualify.
    """
    requested = headers.get(hdrs.UPGRADE, "")
    # Reject non-ASCII first: .lower() can map some non-ASCII characters
    # onto ASCII letters.
    if not requested.isascii():
        return False
    return requested.lower() in {"tcp", "websocket"}

249 

250 

class HttpParser(abc.ABC, Generic[_MsgT]):
    """Incremental parser for HTTP/1.x message heads.

    ``feed_data`` splits incoming bytes into lines, hands complete heads to
    ``parse_message`` (implemented by subclasses) and delegates message
    bodies to ``HttpPayloadParser``.
    """

    # Passed to HeadersParser and HttpPayloadParser; subclasses may override.
    lax: ClassVar[bool] = False

    def __init__(
        self,
        protocol: BaseProtocol,
        loop: asyncio.AbstractEventLoop,
        limit: int,
        max_line_size: int = 8190,
        max_headers: int = 128,
        max_field_size: int = 8190,
        timer: BaseTimerContext | None = None,
        code: int | None = None,
        method: str | None = None,
        payload_exception: type[BaseException] | None = None,
        response_with_body: bool = True,
        read_until_eof: bool = False,
        auto_decompress: bool = True,
        max_msg_queue_size: int = 0,
    ) -> None:
        """Store configuration and reset the parsing state.

        :param protocol: passed to every ``StreamReader`` created for a body.
        :param loop: passed to every ``StreamReader`` created for a body.
        :param limit: forwarded as ``limit`` to ``StreamReader`` and
            ``HttpPayloadParser``.
        :param max_line_size: maximum length of a start line; also forwarded
            to ``HttpPayloadParser``.
        :param max_headers: maximum number of lines buffered for one head.
        :param max_field_size: maximum length of a header line.
        :param timer: forwarded to ``StreamReader``.
        :param code: forwarded as ``code`` to ``HttpPayloadParser``.
        :param method: method used when the parsed message has no ``method``
            attribute.
        :param payload_exception: if set, payload parser errors are wrapped
            in this type before being set on the payload.
        :param response_with_body: forwarded to ``HttpPayloadParser``.
        :param read_until_eof: read a body until EOF when no Content-Length
            is given and no other body rule applies.
        :param auto_decompress: forwarded to ``HttpPayloadParser``.
        :param max_msg_queue_size: stop emitting messages once this many are
            unconsumed (0 disables the limit).
        """
        self.protocol = protocol
        self.loop = loop
        self.max_line_size = max_line_size
        self.max_field_size = max_field_size
        self.max_headers = max_headers
        self.timer = timer
        self.code = code
        self.method = method
        self.payload_exception = payload_exception
        self.response_with_body = response_with_body
        self.read_until_eof = read_until_eof

        self._lines: list[bytes] = []
        self._tail = b""
        self._upgraded = False
        self._payload = None
        self._payload_parser: HttpPayloadParser | None = None
        self._payload_has_more_data = False
        self._auto_decompress = auto_decompress
        self._limit = limit
        self._headers_parser = HeadersParser(max_field_size, self.lax)
        # Stop emitting messages once this many are queued unconsumed (0 = off).
        self._max_msg_queue_size = max_msg_queue_size
        self._msg_in_flight = 0

    @abc.abstractmethod
    def parse_message(self, lines: list[bytes]) -> _MsgT: ...

    @abc.abstractmethod
    def _is_chunked_te(self, te: str) -> bool: ...

    def _line_limit(self) -> int:
        """Return the length limit for the line currently being read.

        The first line of a message is the start line; every line buffered
        after it is a header field.
        """
        return self.max_field_size if self._lines else self.max_line_size

    def pause_reading(self) -> None:
        """Ask the active payload parser to pause."""
        assert self._payload_parser is not None
        self._payload_parser.pause_reading()

    def message_consumed(self) -> None:
        """Protocol drained a queued message; free a slot for parsing."""
        if self._msg_in_flight > 0:
            self._msg_in_flight -= 1

    def feed_eof(self) -> _MsgT | None:
        """Signal end of input.

        With an active payload parser the EOF is forwarded to it. Otherwise
        any buffered partial head is parsed on a best-effort basis.

        :return: the message parsed from buffered lines, or ``None`` when
            there is nothing buffered or the buffered data does not parse.
        """
        if self._payload_parser is not None:
            self._payload_parser.feed_eof()
            if self._payload_parser.done:
                self._payload_parser = None
        else:
            # try to extract partial message
            if self._tail:
                self._lines.append(self._tail)

            if self._lines:
                # Lines are stored without separators, so the terminator is
                # the empty bytes object (comparing with a str never matches).
                if self._lines[-1] != b"":
                    self._lines.append(b"")
                # Deliberately best-effort: a malformed partial head yields None.
                with suppress(Exception):
                    return self.parse_message(self._lines)
        return None

    def feed_data(
        self,
        data: bytes,
        SEP: _SEP = b"\r\n",
        EMPTY: bytes = b"",
        CONTENT_LENGTH: istr = hdrs.CONTENT_LENGTH,
        METH_CONNECT: str = hdrs.METH_CONNECT,
        SEC_WEBSOCKET_KEY1: istr = hdrs.SEC_WEBSOCKET_KEY1,
    ) -> tuple[list[tuple[_MsgT, StreamReader]], bool, bytes]:
        """Parse as much of *data* as possible.

        :param data: newly received bytes; may be ``b""`` to resume after a
            pause.
        :param SEP: line separator to split on.
        :return: ``(messages, upgraded, tail)`` where ``messages`` is a list
            of ``(message, payload)`` pairs completed by this call,
            ``upgraded`` is the current upgrade flag and ``tail`` is the
            data left unparsed.
        :raises BadHttpMessage: too many header lines, or data follows a
            message that asked to close the connection.
        :raises LineTooLong: a start line or header line exceeds its limit.
        :raises InvalidHeader: invalid Content-Length, or the obsolete
            Sec-WebSocket-Key1 header is present.

        The remaining keyword parameters are constants bound as defaults.
        """
        messages = []

        if self._tail:
            data, self._tail = self._tail + data, b""

        data_len = len(data)
        start_pos = 0
        loop = self.loop

        should_close = False
        while start_pos < data_len or self._payload_has_more_data:
            # read HTTP message (request/response line + headers), \r\n\r\n
            # and split by lines
            if self._payload_parser is None and not self._upgraded:
                if (
                    self._max_msg_queue_size
                    and self._msg_in_flight >= self._max_msg_queue_size
                ):
                    # Queue full: buffer the rest and stop. Safe pause point;
                    # any preceding body is consumed before the next request
                    # line. Resumes via feed_data(b"") when the queue drains.
                    self._tail = data[start_pos:]
                    break
                pos = data.find(SEP, start_pos)
                # consume \r\n
                if pos == start_pos and not self._lines:
                    start_pos = pos + len(SEP)
                    continue

                if pos >= start_pos:
                    if should_close:
                        raise BadHttpMessage("Data after `Connection: close`")

                    # line found
                    line = data[start_pos:pos]
                    if SEP == b"\n":  # For lax response parsing
                        line = line.rstrip(b"\r")
                    # Pick the limit from the parser state so it does not
                    # depend on how the input was split across calls.
                    max_line_length = self._line_limit()
                    if len(line) > max_line_length:
                        raise LineTooLong(line[:100] + b"...", max_line_length)

                    self._lines.append(line)

                    if len(self._lines) > self.max_headers:
                        raise BadHttpMessage("Too many headers received")

                    start_pos = pos + len(SEP)

                    # \r\n\r\n found
                    if self._lines[-1] == EMPTY:
                        # Whatever is left of the header budget is granted
                        # to trailers.
                        max_trailers = self.max_headers - len(self._lines)
                        try:
                            msg: _MsgT = self.parse_message(self._lines)
                        finally:
                            self._lines.clear()

                        def get_content_length() -> int | None:
                            # payload length
                            length_hdr = msg.headers.get(CONTENT_LENGTH)
                            if length_hdr is None:
                                return None

                            # Shouldn't allow +/- or other number formats.
                            # https://www.rfc-editor.org/rfc/rfc9110#section-8.6-2
                            # msg.headers is already stripped of leading/trailing wsp
                            if not DIGITS.fullmatch(length_hdr):
                                raise InvalidHeader(CONTENT_LENGTH)

                            return int(length_hdr)

                        length = get_content_length()
                        # do not support old websocket spec
                        if SEC_WEBSOCKET_KEY1 in msg.headers:
                            raise InvalidHeader(SEC_WEBSOCKET_KEY1)

                        self._upgraded = msg.upgrade and _is_supported_upgrade(
                            msg.headers
                        )

                        method = getattr(msg, "method", self.method)
                        # code is only present on responses
                        code = getattr(msg, "code", 0)

                        assert self.protocol is not None
                        # calculate payload
                        empty_body = code in EMPTY_BODY_STATUS_CODES or bool(
                            method and method in EMPTY_BODY_METHODS
                        )
                        if not empty_body and (
                            ((length is not None and length > 0) or msg.chunked)
                            and not self._upgraded
                        ):
                            # Body delimited by Content-Length or chunking.
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            payload_parser = HttpPayloadParser(
                                payload,
                                length=length,
                                chunked=msg.chunked,
                                method=method,
                                compression=msg.compression,
                                code=self.code,
                                response_with_body=self.response_with_body,
                                auto_decompress=self._auto_decompress,
                                lax=self.lax,
                                headers_parser=self._headers_parser,
                                max_line_size=self.max_line_size,
                                max_field_size=self.max_field_size,
                                max_trailers=max_trailers,
                                limit=self._limit,
                            )
                            if not payload_parser.done:
                                self._payload_parser = payload_parser
                        elif method == METH_CONNECT:
                            # CONNECT: the connection is marked upgraded and
                            # a payload parser is installed for what follows.
                            assert isinstance(msg, RawRequestMessage)
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            self._upgraded = True
                            self._payload_parser = HttpPayloadParser(
                                payload,
                                method=msg.method,
                                compression=msg.compression,
                                auto_decompress=self._auto_decompress,
                                lax=self.lax,
                                headers_parser=self._headers_parser,
                                max_line_size=self.max_line_size,
                                max_field_size=self.max_field_size,
                                max_trailers=max_trailers,
                                limit=self._limit,
                            )
                        elif not empty_body and length is None and self.read_until_eof:
                            # No declared length: read the body until EOF.
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            payload_parser = HttpPayloadParser(
                                payload,
                                length=length,
                                chunked=msg.chunked,
                                method=method,
                                compression=msg.compression,
                                code=self.code,
                                response_with_body=self.response_with_body,
                                auto_decompress=self._auto_decompress,
                                lax=self.lax,
                                headers_parser=self._headers_parser,
                                max_line_size=self.max_line_size,
                                max_field_size=self.max_field_size,
                                max_trailers=max_trailers,
                                limit=self._limit,
                            )
                            if not payload_parser.done:
                                self._payload_parser = payload_parser
                        else:
                            payload = EMPTY_PAYLOAD

                        messages.append((msg, payload))
                        if self._max_msg_queue_size:
                            self._msg_in_flight += 1
                        should_close = msg.should_close
                else:
                    # No separator yet: keep the partial line for the next
                    # call, bounded by the limit that applies to that line.
                    self._tail = data[start_pos:]
                    max_line_length = self._line_limit()
                    if len(self._tail) > max_line_length:
                        raise LineTooLong(self._tail[:100] + b"...", max_line_length)
                    data = EMPTY
                    break

            # no parser, just store
            elif self._payload_parser is None and self._upgraded:
                assert not self._lines
                break

            # feed payload
            else:
                assert not self._lines
                assert self._payload_parser is not None
                try:
                    payload_state, data = self._payload_parser.feed_data(
                        data[start_pos:], SEP
                    )
                except Exception as underlying_exc:
                    reraised_exc: BaseException = underlying_exc
                    if self.payload_exception is not None:
                        reraised_exc = self.payload_exception(str(underlying_exc))

                    set_exception(
                        self._payload_parser.payload,
                        reraised_exc,
                        underlying_exc,
                    )

                    payload_state = PayloadState.PAYLOAD_COMPLETE
                    data = b""
                    # These two error types are also propagated to the
                    # caller; other errors are only recorded on the payload.
                    if isinstance(
                        underlying_exc, (InvalidHeader, TransferEncodingError)
                    ):
                        raise

                self._payload_has_more_data = (
                    payload_state == PayloadState.PAYLOAD_HAS_PENDING_INPUT
                )

                if payload_state is not PayloadState.PAYLOAD_COMPLETE:
                    # We've either consumed all available data, or we're pausing
                    # until the reader buffer is freed up.
                    break

                # Payload finished: continue with the bytes it left over.
                start_pos = 0
                data_len = len(data)
                self._payload_parser = None

        if data and start_pos < data_len:
            data = data[start_pos:]
        else:
            data = EMPTY

        return messages, self._upgraded, data

    def parse_headers(
        self, lines: list[bytes]
    ) -> tuple[HeadersDictProxy, RawHeaders, bool | None, str | None, bool, bool]:
        """Parse header lines and derive connection-level flags.

        :param lines: header lines without separators, ending with an empty
            line.
        :return: ``(headers, raw_headers, close_conn, encoding, upgrade,
            chunked)``. ``close_conn`` is ``True``/``False`` when the
            Connection header says close/keep-alive and ``None`` otherwise;
            ``encoding`` is the Content-Encoding value when it is one of
            gzip, deflate, br or zstd, else ``None``.
        :raises BadHttpMessage: Transfer-Encoding and Content-Length are
            both present.
        """
        headers, raw_headers = self._headers_parser.parse_headers(lines)
        close_conn = None
        encoding = None
        upgrade = False
        chunked = False

        # keep-alive and protocol switching
        # RFC 9110 section 7.6.1 defines Connection as a comma-separated list.
        # We use a simple comma split here rather than getall() for performance,
        # as the target tokens (close, keep-alive, upgrade) are simple ASCII
        # values that never contain commas.
        conn_values = headers.get(hdrs.CONNECTION)
        if conn_values:
            conn_tokens = {
                token.lower()
                for token in (part.strip(" \t") for part in conn_values.split(","))
                if token and token.isascii()
            }

            if "close" in conn_tokens:
                close_conn = True
            elif "keep-alive" in conn_tokens:
                close_conn = False

            # https://www.rfc-editor.org/rfc/rfc9110.html#name-101-switching-protocols
            if "upgrade" in conn_tokens and headers.get(hdrs.UPGRADE):
                upgrade = True

        # encoding
        enc = headers.get(hdrs.CONTENT_ENCODING, "")
        if enc.isascii() and enc.lower() in {"gzip", "deflate", "br", "zstd"}:
            encoding = enc

        # chunking
        te = headers.get(hdrs.TRANSFER_ENCODING)
        if te is not None:
            if self._is_chunked_te(te):
                chunked = True

            if hdrs.CONTENT_LENGTH in headers:
                raise BadHttpMessage(
                    "Transfer-Encoding can't be present with Content-Length",
                )

        return (headers, raw_headers, close_conn, encoding, upgrade, chunked)

    def set_upgraded(self, val: bool) -> None:
        """Set connection upgraded (to websocket) mode.

        :param bool val: new state.
        """
        self._upgraded = val

626 

627 

class HttpRequestParser(HttpParser[RawRequestMessage]):
    """Parser for the request line and headers of an HTTP request.

    ``parse_message`` returns a ``RawRequestMessage``. It raises
    ``BadHttpMethod``, ``BadStatusLine``, ``InvalidURLError`` or
    ``BadHttpMessage`` when the request line or headers are malformed.
    """

    def parse_message(self, lines: list[bytes]) -> RawRequestMessage:
        """Build a ``RawRequestMessage`` from the lines of one request head."""
        request_line = lines[0].decode("utf-8", "surrogateescape")
        try:
            method, path, version = request_line.split(" ", maxsplit=2)
        except ValueError:
            raise BadHttpMethod(request_line) from None

        # The method must be an RFC 9110 token.
        if not TOKENRE.fullmatch(method):
            raise BadHttpMethod(method)

        version_match = VERSRE.fullmatch(version)
        if version_match is None:
            raise BadStatusLine(request_line)
        major, minor = version_match.groups()
        http_version = HttpVersion(int(major), int(minor))

        if method == "CONNECT":
            # authority-form,
            # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.3
            url = URL.build(authority=path, encoded=True)
        elif path.startswith("/"):
            # origin-form,
            # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.1
            before_fragment, _, fragment = path.partition("#")
            path_only, _, query = before_fragment.partition("?")

            # NOTE: `yarl.URL.build()` is used to mimic what the Cython-based
            # NOTE: parser does, otherwise it results into the same
            # NOTE: HTTP Request-Line input producing different
            # NOTE: `yarl.URL()` objects
            url = URL.build(
                path=path_only,
                query_string=query,
                fragment=fragment,
                encoded=True,
            )
        else:
            # asterisk-form ("OPTIONS *") or absolute-form (proxy requests),
            # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.2
            is_asterisk_form = path == "*" and method == "OPTIONS"
            url = URL(path, encoded=True)
            if not is_asterisk_form and url.scheme == "":
                # not absolute-form
                raise InvalidURLError(
                    path.encode(errors="surrogateescape").decode("latin1")
                )

        parsed = self.parse_headers(lines[1:])
        headers, raw_headers, close, compression, upgrade, chunked = parsed

        if http_version == HttpVersion11 and hdrs.HOST not in headers:
            raise BadHttpMessage("Missing 'Host' header in request.")

        if close is None:
            # No Connection directive: HTTP/1.0 and older close by default,
            # newer versions keep the connection open.
            close = http_version <= HttpVersion10

        return RawRequestMessage(
            method=method,
            path=path,
            version=http_version,
            headers=headers,
            raw_headers=raw_headers,
            should_close=close,
            compression=compression,
            upgrade=upgrade,
            chunked=chunked,
            url=url,
        )

    def _is_chunked_te(self, te: str) -> bool:
        """Return True if *te* ends with a single ``chunked`` coding.

        Raises ``BadHttpMessage`` when ``chunked`` is repeated or is not the
        final coding.
        """

        def names_chunked(coding: str) -> bool:
            # .lower() transforms some non-ascii chars, so must check first.
            return coding.isascii() and coding.lower() == "chunked"

        codings = [piece.strip(" \t") for piece in te.split(",")]

        # https://www.rfc-editor.org/rfc/rfc9112#section-7.1-3
        # "A sender MUST NOT apply the chunked transfer coding more
        # than once to a message body"
        if len([c for c in codings if names_chunked(c)]) > 1:
            raise BadHttpMessage("Request has duplicate `chunked` Transfer-Encoding")

        if not names_chunked(codings[-1]):
            # https://www.rfc-editor.org/rfc/rfc9112#section-6.3-2.4.3
            raise BadHttpMessage("Request has invalid `Transfer-Encoding`")
        return True

733 

734 

class HttpResponseParser(HttpParser[RawResponseMessage]):
    """Read response status line and headers.

    BadStatusLine could be raised in case of any errors in status line.
    Returns RawResponseMessage.
    """

    protocol: "ResponseHandler"

    # Lax mode should only be enabled on response parser.
    lax = not DEBUG

    def feed_data(
        self,
        data: bytes,
        SEP: _SEP | None = None,
        *args: Any,
        **kwargs: Any,
    ) -> tuple[list[tuple[RawResponseMessage, StreamReader]], bool, bytes]:
        """Feed *data* to the base parser.

        When *SEP* is not given, lines are split on CRLF if ``DEBUG`` is set
        and on bare LF otherwise.
        """
        if SEP is None:
            SEP = b"\r\n" if DEBUG else b"\n"
        return super().feed_data(data, SEP, *args, **kwargs)

    def parse_message(self, lines: list[bytes]) -> RawResponseMessage:
        """Build a ``RawResponseMessage`` from the lines of one response head.

        :raises BadStatusLine: the status line lacks a status, the version
            is not ``HTTP/<d>.<d>``, or the status is not exactly three
            ASCII digits.
        """
        line = lines[0].decode("utf-8", "surrogateescape")
        try:
            version, status = line.split(maxsplit=1)
        except ValueError:
            raise BadStatusLine(line) from None

        try:
            status, reason = status.split(maxsplit=1)
        except ValueError:
            # No reason phrase after the status code.
            status = status.strip()
            reason = ""

        # version
        match = VERSRE.fullmatch(version)
        if match is None:
            raise BadStatusLine(line)
        version_o = HttpVersion(int(match.group(1)), int(match.group(2)))

        # The status code is a three-digit ASCII number, no padding
        if len(status) != 3 or not DIGITS.fullmatch(status):
            raise BadStatusLine(line)
        status_i = int(status)

        # read headers
        (
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
        ) = self.parse_headers(lines[1:])

        if close is None:
            if version_o <= HttpVersion10:
                close = True
            # https://www.rfc-editor.org/rfc/rfc9112.html#name-message-body-length
            elif 100 <= status_i < 200 or status_i in {204, 304}:
                close = False
            elif hdrs.CONTENT_LENGTH in headers or hdrs.TRANSFER_ENCODING in headers:
                close = False
            else:
                # https://www.rfc-editor.org/rfc/rfc9112.html#section-6.3-2.8
                close = True

        return RawResponseMessage(
            version_o,
            status_i,
            reason.strip(),
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
        )

    def _is_chunked_te(self, te: str) -> bool:
        """Return True if the final transfer coding in *te* is ``chunked``."""
        # https://www.rfc-editor.org/rfc/rfc9112#section-6.3-2.4.2
        last = te.rsplit(",", maxsplit=1)[-1].strip(" \t")
        # .lower() transforms some non-ascii chars (e.g. U+212A KELVIN SIGN
        # becomes "k"), so must check first.
        return last.isascii() and last.lower() == "chunked"

819 

820 

821class HttpPayloadParser: 

822 def __init__( 

823 self, 

824 payload: StreamReader, 

825 length: int | None = None, 

826 chunked: bool = False, 

827 compression: str | None = None, 

828 code: int | None = None, 

829 method: str | None = None, 

830 response_with_body: bool = True, 

831 auto_decompress: bool = True, 

832 lax: bool = False, 

833 *, 

834 headers_parser: HeadersParser, 

835 max_line_size: int = 8190, 

836 max_field_size: int = 8190, 

837 max_trailers: int = 128, 

838 limit: int = DEFAULT_CHUNK_SIZE, 

839 ) -> None: 

840 self._length = 0 

841 self._paused = False 

842 self._type = ParseState.PARSE_UNTIL_EOF 

843 self._chunk = ChunkState.PARSE_CHUNKED_SIZE 

844 self._chunk_size = 0 

845 self._chunk_tail = b"" 

846 self._auto_decompress = auto_decompress 

847 self._lax = lax 

848 self._headers_parser = headers_parser 

849 self._max_line_size = max_line_size 

850 self._max_field_size = max_field_size 

851 self._max_trailers = max_trailers 

852 self._more_data_available = False 

853 self._trailer_lines: list[bytes] = [] 

854 self.done = False 

855 self._eof_pending = False 

856 

857 # payload decompression wrapper 

858 if response_with_body and compression and self._auto_decompress: 

859 real_payload: StreamReader | DeflateBuffer = DeflateBuffer( 

860 payload, compression, max_decompress_size=limit 

861 ) 

862 else: 

863 real_payload = payload 

864 

865 # payload parser 

866 if not response_with_body: 

867 # don't parse payload if it's not expected to be received 

868 self._type = ParseState.PARSE_NONE 

869 real_payload.feed_eof() 

870 self.done = True 

871 elif chunked: 

872 self._type = ParseState.PARSE_CHUNKED 

873 elif length is not None: 

874 self._type = ParseState.PARSE_LENGTH 

875 self._length = length 

876 self._length_expected = length 

877 if self._length == 0: 

878 real_payload.feed_eof() 

879 self.done = True 

880 

881 self.payload = real_payload 

882 

    def pause_reading(self) -> None:
        """Request a pause.

        Only sets the flag; ``feed_data`` and ``feed_eof`` check it, clear
        it and return early when they see it.
        """
        self._paused = True

885 

886 def feed_eof(self) -> None: 

887 if self._type == ParseState.PARSE_UNTIL_EOF: 

888 self._eof_pending = True 

889 while self._more_data_available: 

890 if self._paused: 

891 self._paused = False 

892 return # Will resume via feed_data(b"") later 

893 self._more_data_available = self.payload.feed_data(b"") 

894 self.payload.feed_eof() 

895 self.done = True 

896 self._eof_pending = False 

897 elif self._type == ParseState.PARSE_LENGTH: 

898 received = self._length_expected - self._length 

899 raise ContentLengthError( 

900 f"Not enough data to satisfy content length header " 

901 f"(received {received} of {self._length_expected} bytes)." 

902 ) 

903 elif self._type == ParseState.PARSE_CHUNKED: 

904 raise TransferEncodingError( 

905 "Not enough data to satisfy transfer length header." 

906 ) 

907 

908 def feed_data( 

909 self, chunk: bytes, SEP: _SEP = b"\r\n", CHUNK_EXT: bytes = b";" 

910 ) -> tuple[PayloadState, bytes]: 

911 """Receive a chunk of data to process. 

912 

913 Return: 

914 PayloadState - The current state of payload processing. 

915 This function may be called with empty bytes after returning 

916 PAYLOAD_HAS_PENDING_INPUT to continue processing after a pause. 

917 bytes - If payload is complete, this is the unconsumed bytes intended for the 

918 next message/payload, b"" otherwise. 

919 """ 

920 # Read specified amount of bytes 

921 if self._type == ParseState.PARSE_LENGTH: 

922 if self._chunk_tail: 

923 chunk = self._chunk_tail + chunk 

924 self._chunk_tail = b"" 

925 

926 required = self._length 

927 self._length = max(required - len(chunk), 0) 

928 self._more_data_available = self.payload.feed_data(chunk[:required]) 

929 while self._more_data_available: 

930 if self._paused: 

931 self._paused = False 

932 self._chunk_tail = chunk[required:] 

933 return PayloadState.PAYLOAD_HAS_PENDING_INPUT, b"" 

934 self._more_data_available = self.payload.feed_data(b"") 

935 

936 if self._length == 0: 

937 self.payload.feed_eof() 

938 return PayloadState.PAYLOAD_COMPLETE, chunk[required:] 

939 # Chunked transfer encoding parser 

940 elif self._type == ParseState.PARSE_CHUNKED: 

941 if self._chunk_tail: 

942 # We should check the length is sane when not processing payload body. 

943 if self._chunk != ChunkState.PARSE_CHUNKED_CHUNK: 

944 max_line_length = self._max_line_size 

945 if self._chunk == ChunkState.PARSE_TRAILERS: 

946 max_line_length = self._max_field_size 

947 if len(self._chunk_tail) > max_line_length: 

948 raise LineTooLong( 

949 self._chunk_tail[:100] + b"...", max_line_length 

950 ) 

951 

952 chunk = self._chunk_tail + chunk 

953 self._chunk_tail = b"" 

954 

955 while chunk or self._more_data_available: 

956 # read next chunk size 

957 if self._chunk == ChunkState.PARSE_CHUNKED_SIZE: 

958 pos = chunk.find(SEP) 

959 if pos >= 0: 

960 # Only chunk-size lines reach here; trailers enforce 

961 # _max_field_size separately in PARSE_TRAILERS below. 

962 if pos > self._max_line_size: 

963 raise LineTooLong(chunk[:100] + b"...", self._max_line_size) 

964 i = chunk.find(CHUNK_EXT, 0, pos) 

965 if i >= 0: 

966 size_b = chunk[:i] # strip chunk-extensions 

967 # Verify no LF in the chunk-extension 

968 if b"\n" in (ext := chunk[i:pos]): 

969 exc = TransferEncodingError( 

970 f"Unexpected LF in chunk-extension: {ext!r}" 

971 ) 

972 set_exception(self.payload, exc) 

973 raise exc 

974 else: 

975 size_b = chunk[:pos] 

976 

977 if self._lax: # Allow whitespace in lax mode. 

978 size_b = size_b.strip() 

979 

980 if not re.fullmatch(HEXDIGITS, size_b): 

981 exc = TransferEncodingError( 

982 chunk[:pos].decode("ascii", "surrogateescape") 

983 ) 

984 set_exception(self.payload, exc) 

985 raise exc 

986 size = int(bytes(size_b), 16) 

987 

988 chunk = chunk[pos + len(SEP) :] 

989 if size == 0: # eof marker 

990 self._chunk = ChunkState.PARSE_TRAILERS 

991 if self._lax and chunk.startswith(b"\r"): 

992 chunk = chunk[1:] 

993 else: 

994 self._chunk = ChunkState.PARSE_CHUNKED_CHUNK 

995 self._chunk_size = size 

996 self.payload.begin_http_chunk_receiving() 

997 else: 

998 self._chunk_tail = chunk 

999 return PayloadState.PAYLOAD_NEEDS_INPUT, b"" 

1000 

1001 # read chunk and feed buffer 

1002 if self._chunk == ChunkState.PARSE_CHUNKED_CHUNK: 

1003 if self._paused: 

1004 self._paused = False 

1005 self._chunk_tail = chunk 

1006 return PayloadState.PAYLOAD_HAS_PENDING_INPUT, b"" 

1007 

1008 required = self._chunk_size 

1009 self._chunk_size = max(required - len(chunk), 0) 

1010 self._more_data_available = self.payload.feed_data(chunk[:required]) 

1011 chunk = chunk[required:] 

1012 

1013 if self._more_data_available: 

1014 continue 

1015 

1016 if self._chunk_size: 

1017 self._paused = False 

1018 return PayloadState.PAYLOAD_NEEDS_INPUT, b"" 

1019 self._chunk = ChunkState.PARSE_CHUNKED_CHUNK_EOF 

1020 self.payload.end_http_chunk_receiving() 

1021 

1022 # toss the CRLF at the end of the chunk 

1023 if self._chunk == ChunkState.PARSE_CHUNKED_CHUNK_EOF: 

1024 if self._lax and chunk.startswith(b"\r"): 

1025 chunk = chunk[1:] 

1026 if chunk[: len(SEP)] == SEP: 

1027 chunk = chunk[len(SEP) :] 

1028 self._chunk = ChunkState.PARSE_CHUNKED_SIZE 

1029 elif len(chunk) >= len(SEP) or chunk != SEP[: len(chunk)]: 

1030 exc = TransferEncodingError( 

1031 "Chunk size mismatch: expected CRLF after chunk data" 

1032 ) 

1033 set_exception(self.payload, exc) 

1034 raise exc 

1035 else: 

1036 self._chunk_tail = chunk 

1037 return PayloadState.PAYLOAD_NEEDS_INPUT, b"" 

1038 

1039 if self._chunk == ChunkState.PARSE_TRAILERS: 

1040 pos = chunk.find(SEP) 

1041 if pos < 0: # No line found 

1042 self._chunk_tail = chunk 

1043 return PayloadState.PAYLOAD_NEEDS_INPUT, b"" 

1044 

1045 line = chunk[:pos] 

1046 chunk = chunk[pos + len(SEP) :] 

1047 if SEP == b"\n": # For lax response parsing 

1048 line = line.rstrip(b"\r") 

1049 

1050 if len(line) > self._max_field_size: 

1051 raise LineTooLong(line[:100] + b"...", self._max_field_size) 

1052 

1053 self._trailer_lines.append(line) 

1054 

1055 if len(self._trailer_lines) > self._max_trailers: 

1056 raise BadHttpMessage("Too many trailers received") 

1057 

1058 # \r\n\r\n found, end of stream 

1059 if self._trailer_lines[-1] == b"": 

1060 # Headers and trailers are defined the same way, 

1061 # so we reuse the HeadersParser here. 

1062 try: 

1063 trailers, raw_trailers = self._headers_parser.parse_headers( 

1064 self._trailer_lines 

1065 ) 

1066 finally: 

1067 self._trailer_lines.clear() 

1068 self.payload.feed_eof() 

1069 return PayloadState.PAYLOAD_COMPLETE, chunk 

1070 

1071 # Read all bytes until eof 

1072 elif self._type == ParseState.PARSE_UNTIL_EOF: 

1073 self._more_data_available = self.payload.feed_data(chunk) 

1074 while self._more_data_available: 

1075 if self._paused: 

1076 self._paused = False 

1077 return PayloadState.PAYLOAD_HAS_PENDING_INPUT, b"" 

1078 self._more_data_available = self.payload.feed_data(b"") 

1079 

1080 if self._eof_pending: 

1081 self.payload.feed_eof() 

1082 self.done = True 

1083 self._eof_pending = False 

1084 return PayloadState.PAYLOAD_COMPLETE, b"" 

1085 

1086 return PayloadState.PAYLOAD_NEEDS_INPUT, b"" 

1087 

1088 

class DeflateBuffer:
    """Decompress an encoded byte stream and forward the output to ``out``.

    The decompressor is chosen from ``encoding``: ``"br"`` selects
    ``BrotliDecompressor``, ``"zstd"`` selects ``ZSTDDecompressor`` and any
    other value is handed to ``ZLibDecompressor``.  The running count of
    compressed bytes received is mirrored onto ``out.total_compressed_bytes``.
    """

    def __init__(
        self,
        out: StreamReader,
        encoding: str | None,
        max_decompress_size: int = DEFAULT_CHUNK_SIZE,
    ) -> None:
        """Select a decompressor for ``encoding`` and bind it to ``out``.

        Raises:
            ContentEncodingError: ``encoding`` is ``"br"`` or ``"zstd"`` and
                the matching optional dependency is not installed.
        """
        self.out = out
        # Number of compressed bytes fed so far.
        self.size = 0
        out.total_compressed_bytes = self.size
        self.encoding = encoding
        # Becomes True once at least one non-empty chunk has been decoded.
        self._started_decoding = False

        self.decompressor: BrotliDecompressor | ZLibDecompressor | ZSTDDecompressor
        if encoding == "br":
            if not HAS_BROTLI:
                raise ContentEncodingError(
                    "Can not decode content-encoding: brotli (br). "
                    "Please install `Brotli`"
                )
            self.decompressor = BrotliDecompressor()
        elif encoding == "zstd":
            if not HAS_ZSTD:
                raise ContentEncodingError(
                    "Can not decode content-encoding: zstandard (zstd). "
                    "Please install `backports.zstd`"
                )
            self.decompressor = ZSTDDecompressor()
        else:
            self.decompressor = ZLibDecompressor(encoding=encoding)

        self._max_decompress_size = max_decompress_size

    def set_exception(
        self,
        exc: type[BaseException] | BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Forward ``exc`` (and its optional cause) to the output stream."""
        set_exception(self.out, exc, exc_cause)

    def feed_data(self, chunk: bytes) -> bool:
        """Decompress ``chunk`` and feed any produced bytes to ``out``.

        Return True if more data is available and this method should be
        called again with b"".

        Raises:
            ContentEncodingError: the decompressor failed on ``chunk``.
        """
        self.size += len(chunk)
        self.out.total_compressed_bytes = self.size

        # RFC1950
        # bits 0..3 = CM = 0b1000 = 8 = "deflate"
        # bits 4..7 = CINFO = 1..7 = windows size.
        #
        # The header can only be inspected when there is a first byte: this
        # method is also called with b"", which must not raise IndexError.
        if (
            chunk
            and not self._started_decoding
            and self.encoding == "deflate"
            and chunk[0] & 0xF != 8
        ):
            # Change the decoder to decompress incorrectly compressed data
            # Actually we should issue a warning about non-RFC-compliant data.
            self.decompressor = ZLibDecompressor(
                encoding=self.encoding, suppress_deflate_header=True
            )

        low_water = self.out._low_water
        # NOTE(review): presumably max_length=0 means "no limit" for
        # decompress_sync -- confirm against compression_utils.
        max_length = (
            0 if low_water >= sys.maxsize else max(self._max_decompress_size, low_water)
        )
        try:
            decoded = self.decompressor.decompress_sync(chunk, max_length=max_length)
        except Exception as exc:
            raise ContentEncodingError(
                "Can not decode content-encoding: %s" % self.encoding
            ) from exc

        # Only count decoding as started once real input has been seen, so an
        # empty first feed does not disable the header check above.
        if chunk:
            self._started_decoding = True

        if decoded:
            self.out.feed_data(decoded)
        return self.decompressor.data_available

    def feed_eof(self) -> None:
        """Flush the decompressor and signal end-of-stream to ``out``.

        Raises:
            ContentEncodingError: input was received for a ``deflate`` stream
                but the decompressor has not reached its end-of-stream marker.
        """
        chunk = self.decompressor.flush()
        # This should never contain data as we defer the call until exhausting
        # the decompression. If .flush() is returning data, this may indicate a
        # zip bomb vulnerability as it will decompress all remaining data at once.
        assert not chunk

        if self.size > 0:
            # decompressor is not brotli unless encoding is "br"
            if self.encoding == "deflate" and not self.decompressor.eof:  # type: ignore[union-attr]
                raise ContentEncodingError("deflate")

        self.out.feed_eof()

    def begin_http_chunk_receiving(self) -> None:
        """Delegate the start-of-chunk notification to ``out``."""
        self.out.begin_http_chunk_receiving()

    def end_http_chunk_receiving(self) -> None:
        """Delegate the end-of-chunk notification to ``out``."""
        self.out.end_http_chunk_receiving()

1186 

1187 

# Bind the parser and message classes currently in scope to explicit ``*Py``
# names so they stay reachable if the unsuffixed names are rebound later.
HttpRequestParserPy, HttpResponseParserPy = HttpRequestParser, HttpResponseParser
RawRequestMessagePy, RawResponseMessagePy = RawRequestMessage, RawResponseMessage

1192 

# When extensions are not disabled, try to rebind the public names to the
# implementations from ``._http_parser`` and expose them under ``*C`` aliases.
# An ImportError leaves every existing binding untouched.
if not NO_EXTENSIONS:
    with suppress(ImportError):
        from ._http_parser import (  # type: ignore[import-not-found,no-redef]
            HttpRequestParser,
            HttpResponseParser,
            RawRequestMessage,
            RawResponseMessage,
        )

        HttpRequestParserC, HttpResponseParserC = (
            HttpRequestParser,
            HttpResponseParser,
        )
        RawRequestMessageC, RawResponseMessageC = (
            RawRequestMessage,
            RawResponseMessage,
        )