Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/http_parser.py: 90%

466 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:52 +0000

1import abc 

2import asyncio 

3import collections 

4import re 

5import string 

6from contextlib import suppress 

7from enum import IntEnum 

8from typing import ( 

9 Generic, 

10 List, 

11 NamedTuple, 

12 Optional, 

13 Pattern, 

14 Set, 

15 Tuple, 

16 Type, 

17 TypeVar, 

18 Union, 

19) 

20 

21from multidict import CIMultiDict, CIMultiDictProxy, istr 

22from typing_extensions import Final 

23from yarl import URL 

24 

25from . import hdrs 

26from .base_protocol import BaseProtocol 

27from .compression_utils import HAS_BROTLI, BrotliDecompressor, ZLibDecompressor 

28from .helpers import NO_EXTENSIONS, BaseTimerContext 

29from .http_exceptions import ( 

30 BadHttpMessage, 

31 BadStatusLine, 

32 ContentEncodingError, 

33 ContentLengthError, 

34 InvalidHeader, 

35 LineTooLong, 

36 TransferEncodingError, 

37) 

38from .http_writer import HttpVersion, HttpVersion10 

39from .log import internal_logger 

40from .streams import EMPTY_PAYLOAD, StreamReader 

41from .typedefs import RawHeaders 

42 

43__all__ = ( 

44 "HeadersParser", 

45 "HttpParser", 

46 "HttpRequestParser", 

47 "HttpResponseParser", 

48 "RawRequestMessage", 

49 "RawResponseMessage", 

50) 

51 

# All printable ASCII characters; used to validate header/status text.
ASCIISET: Final[Set[str]] = set(string.printable)

# See https://tools.ietf.org/html/rfc7230#section-3.1.1
# and https://tools.ietf.org/html/rfc7230#appendix-B
#
# method = token
# tchar = "!" / "#" / "$" / "%" / "&" / "'" / "*" / "+" / "-" / "." /
# "^" / "_" / "`" / "|" / "~" / DIGIT / ALPHA
# token = 1*tchar
METHRE: Final[Pattern[str]] = re.compile(r"[!#$%&'*+\-.^_`|~0-9A-Za-z]+")
# HTTP-version = "HTTP/" DIGIT "." DIGIT (RFC 7230 section 2.6).
# BUGFIX: the "." is now escaped; previously the unescaped dot matched any
# character, so a malformed version such as "HTTP/1x1" was accepted.
VERSRE: Final[Pattern[str]] = re.compile(r"HTTP/(\d+)\.(\d+)")
# Bytes forbidden inside a header field name (controls, separators, DQUOTE).
HDRRE: Final[Pattern[bytes]] = re.compile(rb"[\x00-\x1F\x7F()<>@,;:\[\]={} \t\\\\\"]")

64 

65 

class RawRequestMessage(NamedTuple):
    """Result of parsing an HTTP request line plus its headers.

    The message body is delivered separately via a StreamReader.
    """

    method: str  # request method token, e.g. "GET"
    path: str  # request-target exactly as received (still encoded)
    version: HttpVersion
    headers: CIMultiDictProxy[str]  # decoded, case-insensitive view
    raw_headers: RawHeaders  # original ((name, value), ...) byte pairs
    should_close: bool  # True when the connection must not be reused
    compression: Optional[str]  # "gzip" / "deflate" / "br" or None
    upgrade: bool  # Connection: upgrade was requested
    chunked: bool  # Transfer-Encoding: chunked
    url: URL  # parsed form of `path`

77 

78 

class RawResponseMessage(NamedTuple):
    """Result of parsing an HTTP response status line plus its headers.

    Converted from ``collections.namedtuple`` to ``typing.NamedTuple`` for
    consistency with :class:`RawRequestMessage` and to carry field types.
    The constructor signature (field names and order) is unchanged, and the
    class is still a tuple subclass, so existing callers are unaffected.
    """

    version: "HttpVersion"
    code: int  # three-digit status code
    reason: str  # reason phrase ("" when absent)
    headers: "CIMultiDictProxy[str]"  # decoded, case-insensitive view
    raw_headers: "RawHeaders"  # original ((name, value), ...) byte pairs
    should_close: bool  # True when the connection must not be reused
    compression: Optional[str]  # "gzip" / "deflate" / "br" or None
    upgrade: bool  # Connection: upgrade was present
    chunked: bool  # Transfer-Encoding: chunked

93 

94 

# Message type produced by a concrete parser: the request or response variant.
_MsgT = TypeVar("_MsgT", RawRequestMessage, RawResponseMessage)

96 

97 

class ParseState(IntEnum):
    """Payload-framing strategy, selected from the message headers."""

    PARSE_NONE = 0  # no payload expected
    PARSE_LENGTH = 1  # read exactly Content-Length bytes
    PARSE_CHUNKED = 2  # Transfer-Encoding: chunked
    PARSE_UNTIL_EOF = 3  # read until the connection closes

103 

104 

class ChunkState(IntEnum):
    """Sub-states of the chunked transfer-encoding decoder."""

    PARSE_CHUNKED_SIZE = 0  # expecting "<hex-size>[;ext]\r\n"
    PARSE_CHUNKED_CHUNK = 1  # reading chunk payload bytes
    PARSE_CHUNKED_CHUNK_EOF = 2  # expecting the CRLF that ends a chunk
    PARSE_MAYBE_TRAILERS = 3  # after the 0-size chunk: CRLF or trailers
    PARSE_TRAILERS = 4  # skipping a trailer line up to CRLF

111 

112 

class HeadersParser:
    """Parses raw header lines into a CIMultiDict plus raw byte pairs."""

    def __init__(
        self,
        max_line_size: int = 8190,
        max_field_size: int = 8190,
    ) -> None:
        # Size limits guard against maliciously large headers.
        self.max_line_size = max_line_size
        self.max_field_size = max_field_size

    def parse_headers(
        self, lines: List[bytes]
    ) -> Tuple["CIMultiDictProxy[str]", RawHeaders]:
        """Parse header lines; ``lines[0]`` is the start line and is skipped.

        Supports obs-fold continuation lines (lines beginning with SP/HTAB).
        Returns an immutable case-insensitive mapping plus the original
        ``(name, value)`` byte pairs.  Raises InvalidHeader for a line
        without a colon or a name containing forbidden characters, and
        LineTooLong when a name or accumulated value exceeds
        ``max_field_size``.
        """
        headers: CIMultiDict[str] = CIMultiDict()
        raw_headers = []

        lines_idx = 1
        line = lines[1]  # first header line; lines[0] is the start line
        line_count = len(lines)

        # An empty line (the blank terminator) ends the loop.
        while line:
            # Parse initial header name : value pair.
            try:
                bname, bvalue = line.split(b":", 1)
            except ValueError:
                raise InvalidHeader(line) from None

            bname = bname.strip(b" \t")
            bvalue = bvalue.lstrip()
            if HDRRE.search(bname):
                # Name contains a control/separator byte.
                raise InvalidHeader(bname)
            if len(bname) > self.max_field_size:
                raise LineTooLong(
                    "request header name {}".format(
                        bname.decode("utf8", "backslashreplace")
                    ),
                    str(self.max_field_size),
                    str(len(bname)),
                )

            header_length = len(bvalue)

            # next line
            lines_idx += 1
            line = lines[lines_idx]

            # consume continuation lines
            continuation = line and line[0] in (32, 9)  # (' ', '\t')

            if continuation:
                bvalue_lst = [bvalue]
                while continuation:
                    # The continuation line joins the current field value;
                    # the total is bounded by max_field_size.
                    header_length += len(line)
                    if header_length > self.max_field_size:
                        raise LineTooLong(
                            "request header field {}".format(
                                bname.decode("utf8", "backslashreplace")
                            ),
                            str(self.max_field_size),
                            str(header_length),
                        )
                    bvalue_lst.append(line)

                    # next line
                    lines_idx += 1
                    if lines_idx < line_count:
                        line = lines[lines_idx]
                        if line:
                            continuation = line[0] in (32, 9)  # (' ', '\t')
                    else:
                        # Ran out of lines mid-continuation.
                        line = b""
                        break
                bvalue = b"".join(bvalue_lst)
            else:
                if header_length > self.max_field_size:
                    raise LineTooLong(
                        "request header field {}".format(
                            bname.decode("utf8", "backslashreplace")
                        ),
                        str(self.max_field_size),
                        str(header_length),
                    )

            bvalue = bvalue.strip()
            # surrogateescape keeps arbitrary bytes round-trippable.
            name = bname.decode("utf-8", "surrogateescape")
            value = bvalue.decode("utf-8", "surrogateescape")

            headers.add(name, value)
            raw_headers.append((bname, bvalue))

        return (CIMultiDictProxy(headers), tuple(raw_headers))

203 

204 

class HttpParser(abc.ABC, Generic[_MsgT]):
    """Base incremental HTTP message parser.

    Feed raw bytes with ``feed_data``; completed messages come back paired
    with a payload StreamReader.  Subclasses implement ``parse_message``
    for the request/response start line.
    """

    def __init__(
        self,
        protocol: BaseProtocol,
        loop: asyncio.AbstractEventLoop,
        limit: int,
        max_line_size: int = 8190,
        max_field_size: int = 8190,
        timer: Optional[BaseTimerContext] = None,
        code: Optional[int] = None,
        method: Optional[str] = None,
        readall: bool = False,
        payload_exception: Optional[Type[BaseException]] = None,
        response_with_body: bool = True,
        read_until_eof: bool = False,
        auto_decompress: bool = True,
    ) -> None:
        self.protocol = protocol
        self.loop = loop
        self.max_line_size = max_line_size
        self.max_field_size = max_field_size
        self.timer = timer
        self.code = code
        self.method = method
        self.readall = readall
        # Exception type used to wrap payload errors (see feed_data).
        self.payload_exception = payload_exception
        self.response_with_body = response_with_body
        self.read_until_eof = read_until_eof

        self._lines: List[bytes] = []  # buffered start-line/header lines
        self._tail = b""  # partial data carried over between feed_data calls
        self._upgraded = False  # True once the connection left HTTP mode
        self._payload = None
        self._payload_parser: Optional[HttpPayloadParser] = None
        self._auto_decompress = auto_decompress
        self._limit = limit
        self._headers_parser = HeadersParser(max_line_size, max_field_size)

242 

    @abc.abstractmethod
    def parse_message(self, lines: List[bytes]) -> _MsgT:
        """Parse a start line plus header lines into a Raw*Message."""
        pass

246 

247 def feed_eof(self) -> Optional[_MsgT]: 

248 if self._payload_parser is not None: 

249 self._payload_parser.feed_eof() 

250 self._payload_parser = None 

251 else: 

252 # try to extract partial message 

253 if self._tail: 

254 self._lines.append(self._tail) 

255 

256 if self._lines: 

257 if self._lines[-1] != "\r\n": 

258 self._lines.append(b"") 

259 with suppress(Exception): 

260 return self.parse_message(self._lines) 

261 return None 

262 

    def feed_data(
        self,
        data: bytes,
        SEP: bytes = b"\r\n",
        EMPTY: bytes = b"",
        CONTENT_LENGTH: istr = hdrs.CONTENT_LENGTH,
        METH_CONNECT: str = hdrs.METH_CONNECT,
        SEC_WEBSOCKET_KEY1: istr = hdrs.SEC_WEBSOCKET_KEY1,
    ) -> Tuple[List[Tuple[_MsgT, StreamReader]], bool, bytes]:
        """Feed raw bytes into the parser.

        Returns ``(messages, upgraded, tail)``: *messages* is the list of
        ``(message, payload_stream)`` pairs completed by this call,
        *upgraded* reports whether the connection switched protocols, and
        *tail* is any unconsumed data (non-empty only once upgraded).
        """
        messages = []

        if self._tail:
            # Prepend the partial line left over from the previous call.
            data, self._tail = self._tail + data, b""

        data_len = len(data)
        start_pos = 0
        loop = self.loop

        while start_pos < data_len:
            # read HTTP message (request/response line + headers), \r\n\r\n
            # and split by lines
            if self._payload_parser is None and not self._upgraded:
                pos = data.find(SEP, start_pos)
                # consume \r\n
                if pos == start_pos and not self._lines:
                    start_pos = pos + 2
                    continue

                if pos >= start_pos:
                    # line found
                    self._lines.append(data[start_pos:pos])
                    start_pos = pos + 2

                    # \r\n\r\n found
                    if self._lines[-1] == EMPTY:
                        try:
                            msg: _MsgT = self.parse_message(self._lines)
                        finally:
                            # Always drop buffered lines, even on error.
                            self._lines.clear()

                        def get_content_length() -> Optional[int]:
                            # payload length
                            length_hdr = msg.headers.get(CONTENT_LENGTH)
                            if length_hdr is None:
                                return None

                            try:
                                length = int(length_hdr)
                            except ValueError:
                                raise InvalidHeader(CONTENT_LENGTH)

                            if length < 0:
                                raise InvalidHeader(CONTENT_LENGTH)

                            return length

                        length = get_content_length()
                        # do not support old websocket spec
                        if SEC_WEBSOCKET_KEY1 in msg.headers:
                            raise InvalidHeader(SEC_WEBSOCKET_KEY1)

                        self._upgraded = msg.upgrade

                        # Responses have no "method"; fall back to the one
                        # given to the parser (the request's method).
                        method = getattr(msg, "method", self.method)

                        assert self.protocol is not None
                        # calculate payload
                        # NOTE(review): this condition evaluates as
                        # A or (B and C) due to operator precedence, i.e. a
                        # positive Content-Length always creates a payload,
                        # while "chunked" does so only when not upgrading —
                        # presumed intentional, confirm before changing.
                        if (
                            (length is not None and length > 0)
                            or msg.chunked
                            and not msg.upgrade
                        ):
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            payload_parser = HttpPayloadParser(
                                payload,
                                length=length,
                                chunked=msg.chunked,
                                method=method,
                                compression=msg.compression,
                                code=self.code,
                                readall=self.readall,
                                response_with_body=self.response_with_body,
                                auto_decompress=self._auto_decompress,
                            )
                            if not payload_parser.done:
                                self._payload_parser = payload_parser
                        elif method == METH_CONNECT:
                            assert isinstance(msg, RawRequestMessage)
                            payload = StreamReader(
                                self.protocol,
                                timer=self.timer,
                                loop=loop,
                                limit=self._limit,
                            )
                            # CONNECT tunnels arbitrary bytes after the
                            # response; treat the connection as upgraded.
                            self._upgraded = True
                            self._payload_parser = HttpPayloadParser(
                                payload,
                                method=msg.method,
                                compression=msg.compression,
                                readall=True,
                                auto_decompress=self._auto_decompress,
                            )
                        else:
                            if (
                                getattr(msg, "code", 100) >= 199
                                and length is None
                                and self.read_until_eof
                            ):
                                # No framing info: read body until EOF.
                                payload = StreamReader(
                                    self.protocol,
                                    timer=self.timer,
                                    loop=loop,
                                    limit=self._limit,
                                )
                                payload_parser = HttpPayloadParser(
                                    payload,
                                    length=length,
                                    chunked=msg.chunked,
                                    method=method,
                                    compression=msg.compression,
                                    code=self.code,
                                    readall=True,
                                    response_with_body=self.response_with_body,
                                    auto_decompress=self._auto_decompress,
                                )
                                if not payload_parser.done:
                                    self._payload_parser = payload_parser
                            else:
                                payload = EMPTY_PAYLOAD

                        messages.append((msg, payload))
                else:
                    # No separator: stash the partial line for next call.
                    self._tail = data[start_pos:]
                    data = EMPTY
                    break

            # no parser, just store
            elif self._payload_parser is None and self._upgraded:
                assert not self._lines
                break

            # feed payload
            elif data and start_pos < data_len:
                assert not self._lines
                assert self._payload_parser is not None
                try:
                    eof, data = self._payload_parser.feed_data(data[start_pos:])
                except BaseException as exc:
                    if self.payload_exception is not None:
                        # Wrap in the configured exception type.
                        self._payload_parser.payload.set_exception(
                            self.payload_exception(str(exc))
                        )
                    else:
                        self._payload_parser.payload.set_exception(exc)

                    eof = True
                    data = b""

                if eof:
                    # Payload complete; remaining data may start a new
                    # message, so restart the scan from the top.
                    start_pos = 0
                    data_len = len(data)
                    self._payload_parser = None
                    continue
                else:
                    break

        if data and start_pos < data_len:
            data = data[start_pos:]
        else:
            data = EMPTY

        return messages, self._upgraded, data

440 

441 def parse_headers( 

442 self, lines: List[bytes] 

443 ) -> Tuple[ 

444 "CIMultiDictProxy[str]", RawHeaders, Optional[bool], Optional[str], bool, bool 

445 ]: 

446 """Parses RFC 5322 headers from a stream. 

447 

448 Line continuations are supported. Returns list of header name 

449 and value pairs. Header name is in upper case. 

450 """ 

451 headers, raw_headers = self._headers_parser.parse_headers(lines) 

452 close_conn = None 

453 encoding = None 

454 upgrade = False 

455 chunked = False 

456 

457 # keep-alive 

458 conn = headers.get(hdrs.CONNECTION) 

459 if conn: 

460 v = conn.lower() 

461 if v == "close": 

462 close_conn = True 

463 elif v == "keep-alive": 

464 close_conn = False 

465 elif v == "upgrade": 

466 upgrade = True 

467 

468 # encoding 

469 enc = headers.get(hdrs.CONTENT_ENCODING) 

470 if enc: 

471 enc = enc.lower() 

472 if enc in ("gzip", "deflate", "br"): 

473 encoding = enc 

474 

475 # chunking 

476 te = headers.get(hdrs.TRANSFER_ENCODING) 

477 if te is not None: 

478 if "chunked" == te.lower(): 

479 chunked = True 

480 else: 

481 raise BadHttpMessage("Request has invalid `Transfer-Encoding`") 

482 

483 if hdrs.CONTENT_LENGTH in headers: 

484 raise BadHttpMessage( 

485 "Content-Length can't be present with Transfer-Encoding", 

486 ) 

487 

488 return (headers, raw_headers, close_conn, encoding, upgrade, chunked) 

489 

    def set_upgraded(self, val: bool) -> None:
        """Set connection upgraded (to websocket) mode.

        Mirrors the flag returned as the second element of ``feed_data``.

        :param bool val: new state.
        """
        self._upgraded = val

496 

497 

class HttpRequestParser(HttpParser[RawRequestMessage]):
    """Read request status line.

    Exception .http_exceptions.BadStatusLine
    could be raised in case of any errors in status line.
    Returns RawRequestMessage.
    """

    def parse_message(self, lines: List[bytes]) -> RawRequestMessage:
        """Parse the request line and headers from *lines*.

        Raises BadStatusLine for a malformed start line, method or
        version, and LineTooLong when the request target exceeds
        ``max_line_size``.
        """
        # request line
        line = lines[0].decode("utf-8", "surrogateescape")
        try:
            method, path, version = line.split(None, 2)
        except ValueError:
            raise BadStatusLine(line) from None

        if len(path) > self.max_line_size:
            raise LineTooLong(
                "Status line is too long", str(self.max_line_size), str(len(path))
            )

        # method
        # BUGFIX: use fullmatch() instead of match().  match() only anchors
        # at the start of the string, so a method with a valid token prefix
        # followed by forbidden characters (e.g. "GET/") was accepted.
        # RFC 7230 requires the entire method to be a token.
        if not METHRE.fullmatch(method):
            raise BadStatusLine(method)

        # version
        try:
            if version.startswith("HTTP/"):
                n1, n2 = version[5:].split(".", 1)
                version_o = HttpVersion(int(n1), int(n2))
            else:
                raise BadStatusLine(version)
        except Exception:
            raise BadStatusLine(version)

        if method == "CONNECT":
            # authority-form,
            # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.3
            url = URL.build(authority=path, encoded=True)
        elif path.startswith("/"):
            # origin-form,
            # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.1
            path_part, _hash_separator, url_fragment = path.partition("#")
            path_part, _question_mark_separator, qs_part = path_part.partition("?")

            # NOTE: `yarl.URL.build()` is used to mimic what the Cython-based
            # NOTE: parser does, otherwise it results into the same
            # NOTE: HTTP Request-Line input producing different
            # NOTE: `yarl.URL()` objects
            url = URL.build(
                path=path_part,
                query_string=qs_part,
                fragment=url_fragment,
                encoded=True,
            )
        else:
            # absolute-form for proxy maybe,
            # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.2
            url = URL(path, encoded=True)

        # read headers
        (
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
        ) = self.parse_headers(lines)

        if close is None:  # then the headers weren't set in the request
            if version_o <= HttpVersion10:  # HTTP 1.0 must asks to not close
                close = True
            else:  # HTTP 1.1 must ask to close.
                close = False

        return RawRequestMessage(
            method,
            path,
            version_o,
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
            url,
        )

586 

587 

class HttpResponseParser(HttpParser[RawResponseMessage]):
    """Read response status line and headers.

    BadStatusLine could be raised in case of any errors in status line.
    Returns RawResponseMessage.
    """

    def parse_message(self, lines: List[bytes]) -> RawResponseMessage:
        """Parse the status line and headers from *lines*."""
        status_line = lines[0].decode("utf-8", "surrogateescape")
        try:
            version_s, rest = status_line.split(None, 1)
        except ValueError:
            raise BadStatusLine(status_line) from None

        # Split "<code> <reason>"; the reason phrase is optional.
        pieces = rest.split(None, 1)
        status_s = pieces[0] if len(pieces) > 1 else rest
        reason = pieces[1] if len(pieces) > 1 else ""

        if len(reason) > self.max_line_size:
            raise LineTooLong(
                "Status line is too long", str(self.max_line_size), str(len(reason))
            )

        # HTTP version, e.g. "HTTP/1.1"
        version_match = VERSRE.match(version_s)
        if version_match is None:
            raise BadStatusLine(status_line)
        version_o = HttpVersion(
            int(version_match.group(1)), int(version_match.group(2))
        )

        # The status code is a three-digit number
        try:
            status_i = int(status_s)
        except ValueError:
            raise BadStatusLine(status_line) from None

        if status_i > 999:
            raise BadStatusLine(status_line)

        # read headers
        (
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
        ) = self.parse_headers(lines)

        # Without an explicit Connection hint, HTTP/1.0 defaults to close.
        if close is None:
            close = version_o <= HttpVersion10

        return RawResponseMessage(
            version_o,
            status_i,
            reason.strip(),
            headers,
            raw_headers,
            close,
            compression,
            upgrade,
            chunked,
        )

651 

652 

class HttpPayloadParser:
    """Incremental parser for a single HTTP message body.

    Supports three framing modes (Content-Length, chunked, read-until-EOF)
    and optionally wraps the output stream in a DeflateBuffer for
    transparent decompression.
    """

    def __init__(
        self,
        payload: StreamReader,
        length: Optional[int] = None,
        chunked: bool = False,
        compression: Optional[str] = None,
        code: Optional[int] = None,
        method: Optional[str] = None,
        readall: bool = False,
        response_with_body: bool = True,
        auto_decompress: bool = True,
    ) -> None:
        self._length = 0  # remaining Content-Length bytes
        self._type = ParseState.PARSE_NONE
        self._chunk = ChunkState.PARSE_CHUNKED_SIZE
        self._chunk_size = 0  # remaining bytes of the current chunk
        self._chunk_tail = b""  # partial data kept between feed_data calls
        self._auto_decompress = auto_decompress
        self.done = False  # True when no (more) payload is expected

        # payload decompression wrapper
        if response_with_body and compression and self._auto_decompress:
            real_payload: Union[StreamReader, DeflateBuffer] = DeflateBuffer(
                payload, compression
            )
        else:
            real_payload = payload

        # payload parser
        if not response_with_body:
            # don't parse payload if it's not expected to be received
            self._type = ParseState.PARSE_NONE
            real_payload.feed_eof()
            self.done = True

        elif chunked:
            self._type = ParseState.PARSE_CHUNKED
        elif length is not None:
            self._type = ParseState.PARSE_LENGTH
            self._length = length
            if self._length == 0:
                # Zero-length body: complete immediately.
                real_payload.feed_eof()
                self.done = True
        else:
            if readall and code != 204:
                self._type = ParseState.PARSE_UNTIL_EOF
            elif method in ("PUT", "POST"):
                internal_logger.warning(  # pragma: no cover
                    "Content-Length or Transfer-Encoding header is required"
                )
                self._type = ParseState.PARSE_NONE
                real_payload.feed_eof()
                self.done = True

        self.payload = real_payload

    def feed_eof(self) -> None:
        """Handle connection EOF for this payload.

        Only read-until-EOF framing completes normally; a premature EOF
        while a Content-Length or chunked body is pending is an error.
        """
        if self._type == ParseState.PARSE_UNTIL_EOF:
            self.payload.feed_eof()
        elif self._type == ParseState.PARSE_LENGTH:
            raise ContentLengthError(
                "Not enough data for satisfy content length header."
            )
        elif self._type == ParseState.PARSE_CHUNKED:
            raise TransferEncodingError(
                "Not enough data for satisfy transfer length header."
            )

    def feed_data(
        self, chunk: bytes, SEP: bytes = b"\r\n", CHUNK_EXT: bytes = b";"
    ) -> Tuple[bool, bytes]:
        """Consume body bytes.

        Returns ``(eof, tail)`` where *eof* is True once the body is
        complete and *tail* holds any bytes beyond the body.
        """
        # Read specified amount of bytes
        if self._type == ParseState.PARSE_LENGTH:
            required = self._length
            chunk_len = len(chunk)

            if required >= chunk_len:
                self._length = required - chunk_len
                self.payload.feed_data(chunk, chunk_len)
                if self._length == 0:
                    self.payload.feed_eof()
                    return True, b""
            else:
                # Chunk overruns the body: keep the excess as tail.
                self._length = 0
                self.payload.feed_data(chunk[:required], required)
                self.payload.feed_eof()
                return True, chunk[required:]

        # Chunked transfer encoding parser
        elif self._type == ParseState.PARSE_CHUNKED:
            if self._chunk_tail:
                # Resume with the partial data left from the last call.
                chunk = self._chunk_tail + chunk
                self._chunk_tail = b""

            while chunk:
                # read next chunk size
                if self._chunk == ChunkState.PARSE_CHUNKED_SIZE:
                    pos = chunk.find(SEP)
                    if pos >= 0:
                        i = chunk.find(CHUNK_EXT, 0, pos)
                        if i >= 0:
                            size_b = chunk[:i]  # strip chunk-extensions
                        else:
                            size_b = chunk[:pos]

                        try:
                            size = int(bytes(size_b), 16)
                        except ValueError:
                            exc = TransferEncodingError(
                                chunk[:pos].decode("ascii", "surrogateescape")
                            )
                            self.payload.set_exception(exc)
                            raise exc from None

                        chunk = chunk[pos + 2 :]
                        if size == 0:  # eof marker
                            self._chunk = ChunkState.PARSE_MAYBE_TRAILERS
                        else:
                            self._chunk = ChunkState.PARSE_CHUNKED_CHUNK
                            self._chunk_size = size
                            self.payload.begin_http_chunk_receiving()
                    else:
                        # Size line incomplete; wait for more data.
                        self._chunk_tail = chunk
                        return False, b""

                # read chunk and feed buffer
                if self._chunk == ChunkState.PARSE_CHUNKED_CHUNK:
                    required = self._chunk_size
                    chunk_len = len(chunk)

                    if required > chunk_len:
                        self._chunk_size = required - chunk_len
                        self.payload.feed_data(chunk, chunk_len)
                        return False, b""
                    else:
                        self._chunk_size = 0
                        self.payload.feed_data(chunk[:required], required)
                        chunk = chunk[required:]
                        self._chunk = ChunkState.PARSE_CHUNKED_CHUNK_EOF
                        self.payload.end_http_chunk_receiving()

                # toss the CRLF at the end of the chunk
                if self._chunk == ChunkState.PARSE_CHUNKED_CHUNK_EOF:
                    if chunk[:2] == SEP:
                        chunk = chunk[2:]
                        self._chunk = ChunkState.PARSE_CHUNKED_SIZE
                    else:
                        self._chunk_tail = chunk
                        return False, b""

                # if stream does not contain trailer, after 0\r\n
                # we should get another \r\n otherwise
                # trailers needs to be skipped until \r\n\r\n
                if self._chunk == ChunkState.PARSE_MAYBE_TRAILERS:
                    head = chunk[:2]
                    if head == SEP:
                        # end of stream
                        self.payload.feed_eof()
                        return True, chunk[2:]
                    # Both CR and LF, or only LF may not be received yet. It is
                    # expected that CRLF or LF will be shown at the very first
                    # byte next time, otherwise trailers should come. The last
                    # CRLF which marks the end of response might not be
                    # contained in the same TCP segment which delivered the
                    # size indicator.
                    if not head:
                        return False, b""
                    if head == SEP[:1]:
                        self._chunk_tail = head
                        return False, b""
                    self._chunk = ChunkState.PARSE_TRAILERS

                # read and discard trailer up to the CRLF terminator
                if self._chunk == ChunkState.PARSE_TRAILERS:
                    pos = chunk.find(SEP)
                    if pos >= 0:
                        chunk = chunk[pos + 2 :]
                        self._chunk = ChunkState.PARSE_MAYBE_TRAILERS
                    else:
                        self._chunk_tail = chunk
                        return False, b""

        # Read all bytes until eof
        elif self._type == ParseState.PARSE_UNTIL_EOF:
            self.payload.feed_data(chunk, len(chunk))

        return False, b""

841 

842 

class DeflateBuffer:
    """DeflateStream decompress stream and feed data into specified stream."""

    def __init__(self, out: StreamReader, encoding: Optional[str]) -> None:
        self.out = out  # downstream consumer of the decompressed bytes
        self.size = 0  # total compressed bytes received so far
        self.encoding = encoding
        self._started_decoding = False

        self.decompressor: Union[BrotliDecompressor, ZLibDecompressor]
        if encoding == "br":
            if not HAS_BROTLI:  # pragma: no cover
                raise ContentEncodingError(
                    "Can not decode content-encoding: brotli (br). "
                    "Please install `Brotli`"
                )
            self.decompressor = BrotliDecompressor()
        else:
            self.decompressor = ZLibDecompressor(encoding=encoding)

    def set_exception(self, exc: BaseException) -> None:
        # Propagate a decoding/parsing failure to the consumer stream.
        self.out.set_exception(exc)

    def feed_data(self, chunk: bytes, size: int) -> None:
        """Decompress *chunk* (of byte length *size*) and forward it."""
        if not size:
            return

        self.size += size

        # RFC1950
        # bits 0..3 = CM = 0b1000 = 8 = "deflate"
        # bits 4..7 = CINFO = 1..7 = windows size.
        if (
            not self._started_decoding
            and self.encoding == "deflate"
            and chunk[0] & 0xF != 8
        ):
            # Change the decoder to decompress incorrectly compressed data
            # Actually we should issue a warning about non-RFC-compliant data.
            self.decompressor = ZLibDecompressor(
                encoding=self.encoding, suppress_deflate_header=True
            )

        try:
            chunk = self.decompressor.decompress_sync(chunk)
        except Exception:
            raise ContentEncodingError(
                "Can not decode content-encoding: %s" % self.encoding
            )

        self._started_decoding = True

        if chunk:
            self.out.feed_data(chunk, len(chunk))

    def feed_eof(self) -> None:
        """Flush the decompressor and finish the downstream stream.

        Raises ContentEncodingError when a deflate stream ends before the
        decompressor reached its end-of-stream marker.
        """
        chunk = self.decompressor.flush()

        if chunk or self.size > 0:
            self.out.feed_data(chunk, len(chunk))
            if self.encoding == "deflate" and not self.decompressor.eof:  # type: ignore
                raise ContentEncodingError("deflate")

        self.out.feed_eof()

    def begin_http_chunk_receiving(self) -> None:
        # Chunk boundaries pass straight through to the consumer.
        self.out.begin_http_chunk_receiving()

    def end_http_chunk_receiving(self) -> None:
        self.out.end_http_chunk_receiving()

913 

914 

# Pure-Python implementations, always reachable under the *Py aliases even
# when the C extension shadows the canonical names below.
HttpRequestParserPy = HttpRequestParser
HttpResponseParserPy = HttpResponseParser
RawRequestMessagePy = RawRequestMessage
RawResponseMessagePy = RawResponseMessage

919 

try:
    if not NO_EXTENSIONS:
        # Prefer the C-accelerated parser when extensions are enabled; the
        # import rebinds the canonical names defined above.
        from ._http_parser import (  # type: ignore[import,no-redef]
            HttpRequestParser,
            HttpResponseParser,
            RawRequestMessage,
            RawResponseMessage,
        )

        HttpRequestParserC = HttpRequestParser
        HttpResponseParserC = HttpResponseParser
        RawRequestMessageC = RawRequestMessage
        RawResponseMessageC = RawResponseMessage
except ImportError:  # pragma: no cover
    # Extension not built; the pure-Python implementations stay in place.
    pass