Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/client_reqrep.py: 26%

651 statements  

« prev     ^ index     » next       coverage.py v7.4.0, created at 2024-01-26 06:16 +0000

1import asyncio 

2import codecs 

3import contextlib 

4import dataclasses 

5import functools 

6import io 

7import re 

8import sys 

9import traceback 

10import warnings 

11from hashlib import md5, sha1, sha256 

12from http.cookies import CookieError, Morsel, SimpleCookie 

13from types import MappingProxyType, TracebackType 

14from typing import ( 

15 TYPE_CHECKING, 

16 Any, 

17 Callable, 

18 Dict, 

19 Iterable, 

20 List, 

21 Mapping, 

22 Optional, 

23 Tuple, 

24 Type, 

25 Union, 

26 cast, 

27) 

28 

29from multidict import CIMultiDict, CIMultiDictProxy, MultiDict, MultiDictProxy 

30from yarl import URL 

31 

32from . import hdrs, helpers, http, multipart, payload 

33from .abc import AbstractStreamWriter 

34from .client_exceptions import ( 

35 ClientConnectionError, 

36 ClientOSError, 

37 ClientResponseError, 

38 ContentTypeError, 

39 InvalidURL, 

40 ServerFingerprintMismatch, 

41) 

42from .compression_utils import HAS_BROTLI 

43from .formdata import FormData 

44from .hdrs import CONTENT_TYPE 

45from .helpers import ( 

46 BaseTimerContext, 

47 BasicAuth, 

48 HeadersMixin, 

49 TimerNoop, 

50 basicauth_from_netrc, 

51 is_expected_content_type, 

52 netrc_from_env, 

53 noop, 

54 parse_mimetype, 

55 reify, 

56 set_result, 

57) 

58from .http import ( 

59 SERVER_SOFTWARE, 

60 HttpVersion, 

61 HttpVersion10, 

62 HttpVersion11, 

63 StreamWriter, 

64) 

65from .log import client_logger 

66from .streams import StreamReader 

67from .typedefs import ( 

68 DEFAULT_JSON_DECODER, 

69 JSONDecoder, 

70 LooseCookies, 

71 LooseHeaders, 

72 RawHeaders, 

73) 

74 

75try: 

76 import ssl 

77 from ssl import SSLContext 

78except ImportError: # pragma: no cover 

79 ssl = None # type: ignore[assignment] 

80 SSLContext = object # type: ignore[misc,assignment] 

81 

82 

83__all__ = ("ClientRequest", "ClientResponse", "RequestInfo", "Fingerprint") 

84 

85 

86if TYPE_CHECKING: 

87 from .client import ClientSession 

88 from .connector import Connection 

89 from .tracing import Trace 

90 

91 

# Matches any single character that is NOT in the HTTP "token" alphabet
# (letters, digits and "-!#$%&'*+.^_`|~").  Despite the name this is broader
# than control characters: spaces, separators and non-ASCII text match too.
# ClientRequest.__init__ uses it to reject invalid method names.
_CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]")

93 

94 

def _gen_default_accept_encoding() -> str:
    """Return the default ``Accept-Encoding`` value.

    ``br`` is appended to the baseline ``gzip, deflate`` only when
    ``HAS_BROTLI`` is truthy.
    """
    encodings = "gzip, deflate"
    if HAS_BROTLI:
        encodings += ", br"
    return encodings

97 

98 

@dataclasses.dataclass(frozen=True)
class ContentDisposition:
    """Immutable, parsed view of a ``Content-Disposition`` header."""

    # Disposition type as returned by the header parser (may be None).
    type: Optional[str]
    # Read-only mapping of the header's parameters.
    parameters: "MappingProxyType[str, str]"
    # File name derived from the parameters, or None when there is none.
    filename: Optional[str]

104 

105 

@dataclasses.dataclass(frozen=True)
class RequestInfo:
    """Immutable summary of a request, attached to responses and errors."""

    # Request URL; ClientRequest fills this with the fragment-less URL.
    url: URL
    # HTTP method of the request.
    method: str
    # Read-only view of the request headers.
    headers: "CIMultiDictProxy[str]"
    # URL as originally requested; ClientRequest passes its ``original_url``.
    real_url: URL

112 

113 

class Fingerprint:
    """Expected digest of a peer certificate, used for certificate pinning.

    The hash algorithm is inferred from the length of the digest supplied to
    the constructor; only SHA-256 sized digests are accepted.
    """

    # Hash constructors keyed by the size (in bytes) of the digest they produce.
    HASHFUNC_BY_DIGESTLEN = {
        16: md5,
        20: sha1,
        32: sha256,
    }

    def __init__(self, fingerprint: bytes) -> None:
        """Store ``fingerprint`` after validating its length.

        Raises:
            ValueError: if the length matches no known digest size, or if it
                matches the md5 / sha1 sizes, which are rejected as insecure.
        """
        hashfunc = self.HASHFUNC_BY_DIGESTLEN.get(len(fingerprint))
        if not hashfunc:
            raise ValueError("fingerprint has invalid length")
        if any(hashfunc is weak for weak in (md5, sha1)):
            raise ValueError("md5 and sha1 are insecure and not supported. Use sha256.")
        self._hashfunc = hashfunc
        self._fingerprint = fingerprint

    @property
    def fingerprint(self) -> bytes:
        """The expected digest, exactly as passed to the constructor."""
        return self._fingerprint

    def check(self, transport: asyncio.Transport) -> None:
        """Compare the peer certificate of ``transport`` with the fingerprint.

        Transports that report no ``sslcontext`` are skipped silently.

        Raises:
            ServerFingerprintMismatch: if the digest of the DER-encoded peer
                certificate differs from the expected one.
        """
        if not transport.get_extra_info("sslcontext"):
            return
        peer_cert = transport.get_extra_info("ssl_object").getpeercert(
            binary_form=True
        )
        actual = self._hashfunc(peer_cert).digest()
        if actual == self._fingerprint:
            return
        # peername may carry more than two items; only host and port are used.
        host, port, *_ = transport.get_extra_info("peername")
        raise ServerFingerprintMismatch(self._fingerprint, actual, host, port)

146 

147 

# Tuple of types permitted for an ``ssl`` argument; without the ``ssl`` module
# only booleans are allowed.
# NOTE(review): presumably consumed by isinstance() checks elsewhere - confirm.
if ssl is None:  # pragma: no cover
    SSL_ALLOWED_TYPES = (bool,)
else:
    SSL_ALLOWED_TYPES = (ssl.SSLContext, bool, Fingerprint)

152 

153 

@dataclasses.dataclass(frozen=True)
class ConnectionKey:
    """Hashable identity of a connection target.

    Proxy and TLS settings are part of the key so that a pooled connection
    opened with different proxy / TLS parameters is never reused by mistake.
    """

    host: str
    port: Optional[int]
    is_ssl: bool
    ssl: Union[SSLContext, bool, Fingerprint]
    proxy: Optional[URL]
    proxy_auth: Optional[BasicAuth]
    # hash() of the proxy headers' (key, value) pairs, or None without headers.
    proxy_headers_hash: Optional[int]

165 

166 

class ClientRequest:
    """A single outgoing HTTP request.

    The constructor normalises method, URL, headers, cookies, auth, proxy
    settings and body.  :meth:`send` writes the request line and headers to a
    connection, schedules the body writer task and returns the response object.
    """

    GET_METHODS = {
        hdrs.METH_GET,
        hdrs.METH_HEAD,
        hdrs.METH_OPTIONS,
        hdrs.METH_TRACE,
    }
    POST_METHODS = {hdrs.METH_PATCH, hdrs.METH_POST, hdrs.METH_PUT}
    ALL_METHODS = GET_METHODS.union(POST_METHODS).union({hdrs.METH_DELETE})

    # Headers added automatically unless supplied or explicitly skipped.
    DEFAULT_HEADERS = {
        hdrs.ACCEPT: "*/*",
        hdrs.ACCEPT_ENCODING: _gen_default_accept_encoding(),
    }

    body = b""
    auth = None
    response = None

    __writer = None  # async task for streaming data
    _continue = None  # waiter future for '100 Continue' response

    # N.B.
    # Adding __del__ method with self._writer closing doesn't make sense
    # because _writer is instance method, thus it keeps a reference to self.
    # Until writer has finished finalizer will not be called.

    def __init__(
        self,
        method: str,
        url: URL,
        *,
        params: Optional[Mapping[str, str]] = None,
        headers: Optional[LooseHeaders] = None,
        skip_auto_headers: Iterable[str] = frozenset(),
        data: Any = None,
        cookies: Optional[LooseCookies] = None,
        auth: Optional[BasicAuth] = None,
        version: http.HttpVersion = http.HttpVersion11,
        compress: Optional[str] = None,
        chunked: Optional[bool] = None,
        expect100: bool = False,
        loop: asyncio.AbstractEventLoop,
        response_class: Optional[Type["ClientResponse"]] = None,
        proxy: Optional[URL] = None,
        proxy_auth: Optional[BasicAuth] = None,
        timer: Optional[BaseTimerContext] = None,
        session: Optional["ClientSession"] = None,
        ssl: Union[SSLContext, bool, Fingerprint] = True,
        proxy_headers: Optional[LooseHeaders] = None,
        traces: Optional[List["Trace"]] = None,
        trust_env: bool = False,
        server_hostname: Optional[str] = None,
    ):
        """Validate the arguments and precompute headers and body.

        Raises:
            ValueError: if ``method`` contains a non-token character; the
                ``update_*`` helpers called here may raise as well.
        """
        match = _CONTAINS_CONTROL_CHAR_RE.search(method)
        if match:
            raise ValueError(
                f"Method cannot contain non-token characters {method!r} "
                f"(found at least {match.group()!r})"
            )
        assert isinstance(url, URL), url
        assert isinstance(proxy, (URL, type(None))), proxy
        # FIXME: session is None in tests only, need to fix tests
        # assert session is not None
        self._session = cast("ClientSession", session)
        if params:
            # Keep the query already present in the URL and append ``params``.
            q = MultiDict(url.query)
            url2 = url.with_query(params)
            q.extend(url2.query)
            url = url.with_query(q)
        self.original_url = url
        self.url = url.with_fragment(None)
        self.method = method.upper()
        self.chunked = chunked
        self.compress = compress
        self.loop = loop
        self.length = None
        if response_class is None:
            real_response_class = ClientResponse
        else:
            real_response_class = response_class
        self.response_class: Type[ClientResponse] = real_response_class
        self._timer = timer if timer is not None else TimerNoop()
        self._ssl = ssl
        self.server_hostname = server_hostname

        if loop.get_debug():
            # Must stay inline: the frame depth is relative to __init__.
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

        # Order matters: later steps read headers set by earlier ones.
        self.update_version(version)
        self.update_host(url)
        self.update_headers(headers)
        self.update_auto_headers(skip_auto_headers)
        self.update_cookies(cookies)
        self.update_content_encoding(data)
        self.update_auth(auth, trust_env)
        self.update_proxy(proxy, proxy_auth, proxy_headers)

        self.update_body_from_data(data)
        if data is not None or self.method not in self.GET_METHODS:
            self.update_transfer_encoding()
        self.update_expect_continue(expect100)
        if traces is None:
            traces = []
        self._traces = traces

    def __reset_writer(self, _: object = None) -> None:
        """Done-callback that forgets the writer task once it has finished."""
        self.__writer = None

    @property
    def _writer(self) -> Optional["asyncio.Task[None]"]:
        """The body-writer task, or None when no write is pending."""
        return self.__writer

    @_writer.setter
    def _writer(self, writer: Optional["asyncio.Task[None]"]) -> None:
        # Move the reset callback from the previous task to the new one.
        if self.__writer is not None:
            self.__writer.remove_done_callback(self.__reset_writer)
        self.__writer = writer
        if writer is not None:
            writer.add_done_callback(self.__reset_writer)

    def is_ssl(self) -> bool:
        """Return True when the URL scheme is ``https`` or ``wss``."""
        return self.url.scheme in ("https", "wss")

    @property
    def ssl(self) -> Union["SSLContext", bool, Fingerprint]:
        """The ``ssl`` value supplied to the constructor."""
        return self._ssl

    @property
    def connection_key(self) -> ConnectionKey:
        """Build the :class:`ConnectionKey` describing this request's target."""
        proxy_headers = self.proxy_headers
        if proxy_headers:
            h: Optional[int] = hash(tuple((k, v) for k, v in proxy_headers.items()))
        else:
            h = None
        return ConnectionKey(
            self.host,
            self.port,
            self.is_ssl(),
            self.ssl,
            self.proxy,
            self.proxy_auth,
            h,
        )

    @property
    def host(self) -> str:
        """Raw host of the request URL."""
        ret = self.url.raw_host
        assert ret is not None
        return ret

    @property
    def port(self) -> Optional[int]:
        """Port of the request URL as reported by the URL object."""
        return self.url.port

    @property
    def request_info(self) -> RequestInfo:
        """Snapshot of URL, method and headers as a :class:`RequestInfo`."""
        headers: CIMultiDictProxy[str] = CIMultiDictProxy(self.headers)
        return RequestInfo(self.url, self.method, headers, self.original_url)

    def update_host(self, url: URL) -> None:
        """Validate that ``url`` has a host and pick up URL-embedded credentials.

        Raises:
            InvalidURL: if ``url`` has no host.
        """
        if not url.raw_host:
            raise InvalidURL(url)

        # basic auth info
        username, password = url.user, url.password
        if username:
            self.auth = helpers.BasicAuth(username, password or "")

    def update_version(self, version: Union[http.HttpVersion, str]) -> None:
        """Store the HTTP version, parsing strings such as ``'1.1'`` to (1, 1).

        Raises:
            ValueError: if a string version is not of the form
                ``"<int>.<int>"`` (including when the dot is missing).
        """
        if isinstance(version, str):
            v = [part.strip() for part in version.split(".", 1)]
            try:
                version = http.HttpVersion(int(v[0]), int(v[1]))
            except (ValueError, IndexError):
                # IndexError: no "." in the string, so there is no minor part.
                raise ValueError(
                    f"Can not parse http version number: {version}"
                ) from None
        self.version = version

    def update_headers(self, headers: Optional[LooseHeaders]) -> None:
        """Reset the request headers to ``Host`` plus the supplied ``headers``."""
        self.headers: CIMultiDict[str] = CIMultiDict()

        # add host
        netloc = cast(str, self.url.raw_host)
        if helpers.is_ipv6_address(netloc):
            netloc = f"[{netloc}]"
        # See https://github.com/aio-libs/aiohttp/issues/3636.
        netloc = netloc.rstrip(".")
        if self.url.port is not None and not self.url.is_default_port():
            netloc += ":" + str(self.url.port)
        self.headers[hdrs.HOST] = netloc

        if headers:
            if isinstance(headers, (dict, MultiDictProxy, MultiDict)):
                headers = headers.items()  # type: ignore[assignment]

            for key, value in headers:  # type: ignore[misc]
                # A user-supplied Host replaces the computed one; every other
                # header is appended so repeated names are preserved.
                if key.lower() == "host":
                    self.headers[key] = value
                else:
                    self.headers.add(key, value)

    def update_auto_headers(self, skip_auto_headers: Iterable[str]) -> None:
        """Add default headers and ``User-Agent`` unless present or skipped."""
        self.skip_auto_headers = CIMultiDict(
            (hdr, None) for hdr in sorted(skip_auto_headers)
        )
        used_headers = self.headers.copy()
        used_headers.extend(self.skip_auto_headers)  # type: ignore[arg-type]

        for hdr, val in self.DEFAULT_HEADERS.items():
            if hdr not in used_headers:
                self.headers.add(hdr, val)

        if hdrs.USER_AGENT not in used_headers:
            self.headers[hdrs.USER_AGENT] = SERVER_SOFTWARE

    def update_cookies(self, cookies: Optional[LooseCookies]) -> None:
        """Merge ``cookies`` with any existing ``Cookie`` header and rewrite it."""
        if not cookies:
            return

        c = SimpleCookie()
        if hdrs.COOKIE in self.headers:
            c.load(self.headers.get(hdrs.COOKIE, ""))
            del self.headers[hdrs.COOKIE]

        if isinstance(cookies, Mapping):
            iter_cookies = cookies.items()
        else:
            iter_cookies = cookies  # type: ignore[assignment]
        for name, value in iter_cookies:
            if isinstance(value, Morsel):
                # Preserve coded_value
                mrsl_val = value.get(value.key, Morsel())
                mrsl_val.set(value.key, value.value, value.coded_value)
                c[name] = mrsl_val
            else:
                c[name] = value  # type: ignore[assignment]

        self.headers[hdrs.COOKIE] = c.output(header="", sep=";").strip()

    def update_content_encoding(self, data: Any) -> None:
        """Set ``Content-Encoding`` from ``compress`` when a body is present.

        Raises:
            ValueError: if both ``compress`` and a ``Content-Encoding`` header
                are set.
        """
        if data is None:
            return

        enc = self.headers.get(hdrs.CONTENT_ENCODING, "").lower()
        if enc:
            if self.compress:
                raise ValueError(
                    "compress can not be set if Content-Encoding header is set"
                )
        elif self.compress:
            if not isinstance(self.compress, str):
                self.compress = "deflate"
            self.headers[hdrs.CONTENT_ENCODING] = self.compress
            self.chunked = True  # enable chunked, no need to deal with length

    def update_transfer_encoding(self) -> None:
        """Reconcile ``chunked`` with ``Transfer-Encoding`` / ``Content-Length``.

        Raises:
            ValueError: if ``chunked`` is set together with a chunked
                ``Transfer-Encoding`` header or a ``Content-Length`` header.
        """
        te = self.headers.get(hdrs.TRANSFER_ENCODING, "").lower()

        if "chunked" in te:
            if self.chunked:
                raise ValueError(
                    "chunked can not be set "
                    'if "Transfer-Encoding: chunked" header is set'
                )

        elif self.chunked:
            if hdrs.CONTENT_LENGTH in self.headers:
                raise ValueError(
                    "chunked can not be set if Content-Length header is set"
                )

            self.headers[hdrs.TRANSFER_ENCODING] = "chunked"
        else:
            if hdrs.CONTENT_LENGTH not in self.headers:
                self.headers[hdrs.CONTENT_LENGTH] = str(len(self.body))

    def update_auth(self, auth: Optional[BasicAuth], trust_env: bool = False) -> None:
        """Set the ``Authorization`` header from basic-auth credentials.

        Lookup order: the ``auth`` argument, credentials taken from the URL,
        then (only with ``trust_env``) the netrc entry for the URL host.

        Raises:
            TypeError: if the resolved credentials are not a ``BasicAuth``.
        """
        if auth is None:
            auth = self.auth
        if auth is None and trust_env and self.url.host is not None:
            netrc_obj = netrc_from_env()
            # A missing netrc entry simply leaves the request unauthenticated.
            with contextlib.suppress(LookupError):
                auth = basicauth_from_netrc(netrc_obj, self.url.host)
        if auth is None:
            return

        if not isinstance(auth, helpers.BasicAuth):
            raise TypeError("BasicAuth() tuple is required instead")

        self.headers[hdrs.AUTHORIZATION] = auth.encode()

    def update_body_from_data(self, body: Any) -> None:
        """Wrap ``body`` in a payload object and derive the framing headers."""
        if body is None:
            return

        # FormData
        if isinstance(body, FormData):
            body = body()

        try:
            body = payload.PAYLOAD_REGISTRY.get(body, disposition=None)
        except payload.LookupError:
            # No registered payload type: treat the data as form fields,
            # reusing a boundary from an explicit Content-Type if present.
            boundary = None
            if CONTENT_TYPE in self.headers:
                boundary = parse_mimetype(self.headers[CONTENT_TYPE]).parameters.get(
                    "boundary"
                )
            body = FormData(body, boundary=boundary)()

        self.body = body

        # enable chunked encoding if needed
        if not self.chunked and hdrs.CONTENT_LENGTH not in self.headers:
            size = body.size
            if size is None:
                self.chunked = True
            else:
                self.headers[hdrs.CONTENT_LENGTH] = str(size)

        # copy payload headers
        assert body.headers
        for key, value in body.headers.items():
            if key in self.headers:
                continue
            if key in self.skip_auto_headers:
                continue
            self.headers[key] = value

    def update_expect_continue(self, expect: bool = False) -> None:
        """Create the '100 Continue' waiter when the request expects it."""
        if expect:
            self.headers[hdrs.EXPECT] = "100-continue"
        elif self.headers.get(hdrs.EXPECT, "").lower() == "100-continue":
            expect = True

        if expect:
            self._continue = self.loop.create_future()

    def update_proxy(
        self,
        proxy: Optional[URL],
        proxy_auth: Optional[BasicAuth],
        proxy_headers: Optional[LooseHeaders],
    ) -> None:
        """Store the proxy settings.

        Raises:
            ValueError: if ``proxy_auth`` is truthy but not a ``BasicAuth``.
        """
        if proxy_auth and not isinstance(proxy_auth, helpers.BasicAuth):
            raise ValueError("proxy_auth must be None or BasicAuth() tuple")
        self.proxy = proxy
        self.proxy_auth = proxy_auth
        self.proxy_headers = proxy_headers

    def keep_alive(self) -> bool:
        """Return whether the connection may be kept alive after this request."""
        if self.version < HttpVersion10:
            # keep alive not supported at all
            return False
        if self.version == HttpVersion10:
            if self.headers.get(hdrs.CONNECTION) == "keep-alive":
                return True
            else:  # no headers means we close for Http 1.0
                return False
        elif self.headers.get(hdrs.CONNECTION) == "close":
            return False

        return True

    async def write_bytes(
        self, writer: AbstractStreamWriter, conn: "Connection"
    ) -> None:
        """Write the request body to ``writer``.

        When a '100 Continue' waiter exists, the body is sent only after it
        completes.  Write failures are reported on the connection's protocol
        via ``set_exception`` rather than raised.
        """
        # 100 response
        if self._continue is not None:
            try:
                await writer.drain()
                await self._continue
            except asyncio.CancelledError:
                return

        protocol = conn.protocol
        assert protocol is not None
        try:
            if isinstance(self.body, payload.Payload):
                await self.body.write(writer)
            else:
                if isinstance(self.body, (bytes, bytearray)):
                    self.body = (self.body,)  # type: ignore[assignment]

                for chunk in self.body:
                    await writer.write(chunk)  # type: ignore[arg-type]
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                protocol.set_exception(exc)
            else:
                new_exc = ClientOSError(
                    exc.errno, "Can not write request body for %s" % self.url
                )
                new_exc.__context__ = exc
                new_exc.__cause__ = exc
                protocol.set_exception(new_exc)
        except asyncio.CancelledError:
            await writer.write_eof()
        except Exception as exc:
            protocol.set_exception(exc)
        else:
            await writer.write_eof()
            protocol.start_timeout()

    async def send(self, conn: "Connection") -> "ClientResponse":
        """Send the request line and headers, then start writing the body.

        Returns:
            A new ``response_class`` instance bound to the writer task; it is
            also stored on ``self.response``.
        """
        # Specify request target:
        # - CONNECT request must send authority form URI
        # - not CONNECT proxy must send absolute form URI
        # - most common is origin form URI
        if self.method == hdrs.METH_CONNECT:
            connect_host = self.url.raw_host
            assert connect_host is not None
            if helpers.is_ipv6_address(connect_host):
                connect_host = f"[{connect_host}]"
            path = f"{connect_host}:{self.url.port}"
        elif self.proxy and not self.is_ssl():
            path = str(self.url)
        else:
            path = self.url.raw_path
            if self.url.raw_query_string:
                path += "?" + self.url.raw_query_string

        protocol = conn.protocol
        assert protocol is not None
        writer = StreamWriter(
            protocol,
            self.loop,
            on_chunk_sent=functools.partial(
                self._on_chunk_request_sent, self.method, self.url
            ),
            on_headers_sent=functools.partial(
                self._on_headers_request_sent, self.method, self.url
            ),
        )

        if self.compress:
            writer.enable_compression(self.compress)

        if self.chunked is not None:
            writer.enable_chunking()

        # set default content-type
        if (
            self.method in self.POST_METHODS
            and hdrs.CONTENT_TYPE not in self.skip_auto_headers
            and hdrs.CONTENT_TYPE not in self.headers
        ):
            self.headers[hdrs.CONTENT_TYPE] = "application/octet-stream"

        # set the connection header
        connection = self.headers.get(hdrs.CONNECTION)
        if not connection:
            # Only spell out the non-default behaviour for each HTTP version.
            if self.keep_alive():
                if self.version == HttpVersion10:
                    connection = "keep-alive"
            else:
                if self.version == HttpVersion11:
                    connection = "close"

        if connection is not None:
            self.headers[hdrs.CONNECTION] = connection

        # status + headers
        status_line = "{0} {1} HTTP/{v.major}.{v.minor}".format(
            self.method, path, v=self.version
        )
        await writer.write_headers(status_line, self.headers)

        self._writer = self.loop.create_task(self.write_bytes(writer, conn))

        response_class = self.response_class
        assert response_class is not None
        self.response = response_class(
            self.method,
            self.original_url,
            writer=self._writer,
            continue100=self._continue,
            timer=self._timer,
            request_info=self.request_info,
            traces=self._traces,
            loop=self.loop,
            session=self._session,
        )
        return self.response

    async def close(self) -> None:
        """Wait for the body writer to finish, ignoring its cancellation."""
        if self._writer is not None:
            with contextlib.suppress(asyncio.CancelledError):
                await self._writer

    def terminate(self) -> None:
        """Cancel the body writer (if the loop is still open) and drop it."""
        if self._writer is not None:
            if not self.loop.is_closed():
                self._writer.cancel()
            self._writer.remove_done_callback(self.__reset_writer)
            self._writer = None

    async def _on_chunk_request_sent(self, method: str, url: URL, chunk: bytes) -> None:
        """Notify every trace that a body chunk has been sent."""
        for trace in self._traces:
            await trace.send_request_chunk_sent(method, url, chunk)

    async def _on_headers_request_sent(
        self, method: str, url: URL, headers: "CIMultiDict[str]"
    ) -> None:
        """Notify every trace that the request headers have been sent."""
        for trace in self._traces:
            await trace.send_request_headers(method, url, headers)

689 

690 

class ClientResponse(HeadersMixin):
    """Client-side HTTP response: status line, headers and a body stream."""

    # Some of these attributes are None when created,
    # but will be set by the start() method.
    # As the end user will likely never see the None values, we cheat the types below.
    # from the Status-Line of the response
    version: Optional[HttpVersion] = None  # HTTP-Version
    status: int = None  # type: ignore[assignment] # Status-Code
    reason: Optional[str] = None  # Reason-Phrase

    content: StreamReader = None  # type: ignore[assignment] # Payload stream
    _headers: CIMultiDictProxy[str] = None  # type: ignore[assignment]
    _raw_headers: RawHeaders = None  # type: ignore[assignment]

    _connection = None  # current connection
    _source_traceback: Optional[traceback.StackSummary] = None
    # set up by ClientRequest after ClientResponse object creation
    # post-init stage allows to not change ctor signature
    _closed = True  # to allow __del__ for non-initialized properly response
    _released = False
    __writer = None

    def __init__(
        self,
        method: str,
        url: URL,
        *,
        writer: "asyncio.Task[None]",
        continue100: Optional["asyncio.Future[bool]"],
        timer: Optional[BaseTimerContext],
        request_info: RequestInfo,
        traces: List["Trace"],
        loop: asyncio.AbstractEventLoop,
        session: "ClientSession",
    ) -> None:
        """Record request context; status and headers are filled in by start()."""
        assert isinstance(url, URL)
        super().__init__()

        self.method = method
        self.cookies = SimpleCookie()

        self._real_url = url
        self._url = url.with_fragment(None)
        self._body: Optional[bytes] = None
        self._writer: Optional[asyncio.Task[None]] = writer
        self._continue = continue100  # None by default
        self._closed = True
        self._history: Tuple[ClientResponse, ...] = ()
        self._request_info = request_info
        self._timer = TimerNoop() if timer is None else timer
        self._cache: Dict[str, Any] = {}
        self._traces = traces
        self._loop = loop
        # store a reference to session #1985
        self._session: Optional[ClientSession] = session
        # Keep our own reference to the charset resolver so get_encoding()
        # still works once the session reference has been dropped.
        resolver: Callable[["ClientResponse", bytes], str]
        if session is not None:
            resolver = session._resolve_charset
        else:
            # TODO: Fix session=None in tests (see ClientRequest.__init__).
            resolver = lambda *_: "utf-8"  # noqa: E731
        self._resolve_charset = resolver
        if loop.get_debug():
            # Must stay inline: the frame depth is relative to __init__.
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

    def __reset_writer(self, _: object = None) -> None:
        """Done-callback that forgets the writer task once it has finished."""
        self.__writer = None

    @property
    def _writer(self) -> Optional["asyncio.Task[None]"]:
        """Task writing the request body, or None once it is done or dropped."""
        return self.__writer

    @_writer.setter
    def _writer(self, writer: Optional["asyncio.Task[None]"]) -> None:
        previous = self.__writer
        if previous is not None:
            previous.remove_done_callback(self.__reset_writer)
        self.__writer = writer
        if writer is not None:
            writer.add_done_callback(self.__reset_writer)

    @reify
    def url(self) -> URL:
        """Request URL with the fragment removed."""
        return self._url

    @reify
    def real_url(self) -> URL:
        """Request URL exactly as passed to the constructor."""
        return self._real_url

    @reify
    def host(self) -> str:
        """Host part of :attr:`url`."""
        assert self._url.host is not None
        return self._url.host

    @reify
    def headers(self) -> "CIMultiDictProxy[str]":
        """Response headers as set by start()."""
        return self._headers

    @reify
    def raw_headers(self) -> RawHeaders:
        """Raw response headers as set by start()."""
        return self._raw_headers

    @reify
    def request_info(self) -> RequestInfo:
        """The :class:`RequestInfo` supplied at construction."""
        return self._request_info

    @reify
    def content_disposition(self) -> Optional[ContentDisposition]:
        """Parsed ``Content-Disposition`` header, or None when it is absent."""
        header_value = self._headers.get(hdrs.CONTENT_DISPOSITION)
        if header_value is None:
            return None
        kind, raw_params = multipart.parse_content_disposition(header_value)
        read_only_params = MappingProxyType(raw_params)
        return ContentDisposition(
            kind,
            read_only_params,
            multipart.content_disposition_filename(read_only_params),
        )

    def __del__(self, _warnings: Any = warnings) -> None:
        """Release a still-attached connection; in debug mode warn about it."""
        if self._closed:
            return

        if self._connection is not None:
            self._connection.release()
            self._cleanup_writer()

            if self._loop.get_debug():
                _warnings.warn(
                    f"Unclosed response {self!r}", ResourceWarning, source=self
                )
                context = {"client_response": self, "message": "Unclosed response"}
                if self._source_traceback:
                    context["source_traceback"] = self._source_traceback
                self._loop.call_exception_handler(context)

    def __repr__(self) -> str:
        """Two-line summary: URL/status/reason, then the headers."""
        reason = self.reason
        if reason:
            # Reason phrases may be non-ASCII; escape instead of failing.
            shown_reason = reason.encode("ascii", "backslashreplace").decode("ascii")
        else:
            shown_reason = "None"
        first_line = "<ClientResponse({}) [{} {}]>".format(
            str(self.url), self.status, shown_reason
        )
        return first_line + "\n" + str(self.headers) + "\n"

    @property
    def connection(self) -> Optional["Connection"]:
        """The connection currently attached to this response, if any."""
        return self._connection

    @reify
    def history(self) -> Tuple["ClientResponse", ...]:
        """A sequence of responses, if redirects occurred."""
        return self._history

    @reify
    def links(self) -> "MultiDictProxy[MultiDictProxy[Union[str, URL]]]":
        """Parsed ``Link`` headers, keyed by ``rel`` (or the target when absent)."""
        joined = ", ".join(self.headers.getall("link", []))
        if not joined:
            return MultiDictProxy(MultiDict())

        parsed: MultiDict[MultiDictProxy[Union[str, URL]]] = MultiDict()

        # Split on commas that are followed by the "<" opening the next link.
        for raw_link in re.split(r",(?=\s*<)", joined):
            link_match = re.match(r"\s*<(.*)>(.*)", raw_link)
            if link_match is None:  # pragma: no cover
                # the check exists to suppress mypy error
                continue
            target, raw_params = link_match.groups()

            link: MultiDict[Union[str, URL]] = MultiDict()

            # Text before the first ";" is not a parameter, hence the [1:].
            for raw_param in raw_params.split(";")[1:]:
                param_match = re.match(
                    r"^\s*(\S*)\s*=\s*(['\"]?)(.*?)(\2)\s*$", raw_param, re.M
                )
                if param_match is None:  # pragma: no cover
                    # the check exists to suppress mypy error
                    continue
                name, _, value, _ = param_match.groups()
                link.add(name, value)

            rel = link.get("rel", target)
            # Resolve the target against the response URL.
            link.add("url", self.url.join(URL(target)))
            parsed.add(str(rel), MultiDictProxy(link))

        return MultiDictProxy(parsed)

    async def start(self, connection: "Connection") -> "ClientResponse":
        """Read the status line and headers from ``connection``.

        Interim 1xx messages other than 101 are skipped; a pending
        '100 Continue' waiter is resolved when one arrives.

        Raises:
            ClientResponseError: if the protocol reports an HTTP parsing error.
        """
        self._closed = False
        self._protocol = connection.protocol
        self._connection = connection

        with self._timer:
            while True:
                # read response
                try:
                    protocol = self._protocol
                    message, payload = await protocol.read()  # type: ignore[union-attr]
                except http.HttpProcessingError as exc:
                    raise ClientResponseError(
                        self.request_info,
                        self.history,
                        status=exc.code,
                        message=exc.message,
                        headers=exc.headers,
                    ) from exc

                interim = 100 <= message.code <= 199 and message.code != 101
                if not interim:
                    break

                if self._continue is not None:
                    set_result(self._continue, True)
                    self._continue = None

        # payload eof handler
        payload.on_eof(self._response_eof)

        # response status
        self.version = message.version
        self.status = message.code
        self.reason = message.reason

        # headers
        self._headers = message.headers  # type is CIMultiDictProxy
        self._raw_headers = message.raw_headers  # type is Tuple[bytes, bytes]

        # payload
        self.content = payload

        # cookies
        for set_cookie in self.headers.getall(hdrs.SET_COOKIE, ()):
            try:
                self.cookies.load(set_cookie)
            except CookieError as exc:
                client_logger.warning("Can not load response cookies: %s", exc)
        return self

    def _response_eof(self) -> None:
        """EOF callback of the payload: close and release unless upgraded."""
        if self._closed:
            return

        # protocol could be None because connection could be detached
        protocol = self._connection and self._connection.protocol
        if protocol is not None and protocol.upgraded:
            return

        self._closed = True
        self._cleanup_writer()
        self._release_connection()

    @property
    def closed(self) -> bool:
        """True when the response is closed (or was never started)."""
        return self._closed

    def close(self) -> None:
        """Close the response and its connection."""
        if not self._released:
            self._notify_content()

        self._closed = True
        if self._loop.is_closed():
            return

        self._cleanup_writer()
        conn = self._connection
        if conn is not None:
            conn.close()
            self._connection = None

    def release(self) -> Any:
        """Mark the response closed and release its connection.

        Returns the result of ``noop()``.
        """
        if not self._released:
            self._notify_content()

        self._closed = True

        self._cleanup_writer()
        self._release_connection()
        return noop()

    @property
    def ok(self) -> bool:
        """Returns ``True`` if ``status`` is less than ``400``, ``False`` if not.

        This is **not** a check for ``200 OK`` but a check that the response
        status is under 400.
        """
        return 400 > self.status

    def raise_for_status(self) -> None:
        """Release the response and raise ClientResponseError if not ``ok``."""
        if self.ok:
            return
        # reason should always be not None for a started response
        assert self.reason is not None
        self.release()
        raise ClientResponseError(
            self.request_info,
            self.history,
            status=self.status,
            message=self.reason,
            headers=self.headers,
        )

    def _release_connection(self) -> None:
        """Release the connection now, or once the writer task has finished."""
        conn = self._connection
        if conn is None:
            return
        writer = self._writer
        if writer is not None:
            # Body still being written: retry when the task completes.
            writer.add_done_callback(lambda f: self._release_connection())
            return
        conn.release()
        self._connection = None

    async def _wait_released(self) -> None:
        """Wait for the writer task, then release the connection."""
        writer = self._writer
        if writer is not None:
            await writer
        self._release_connection()

    def _cleanup_writer(self) -> None:
        """Cancel a pending writer task and drop the session reference."""
        writer = self._writer
        if writer is not None:
            writer.cancel()
        self._session = None

    def _notify_content(self) -> None:
        """Flag the payload stream as closed and mark the response released."""
        stream = self.content
        # content can be None here, but the types are cheated elsewhere.
        if stream and stream.exception() is None:  # type: ignore[truthy-bool]
            stream.set_exception(ClientConnectionError("Connection closed"))
        self._released = True

    async def wait_for_close(self) -> None:
        """Wait for the writer task (if any), then release the response."""
        writer = self._writer
        if writer is not None:
            await writer
        self.release()

    async def read(self) -> bytes:
        """Read response payload."""
        if self._body is None:
            try:
                self._body = await self.content.read()
                for trace in self._traces:
                    await trace.send_response_chunk_received(
                        self.method, self.url, self._body
                    )
            except BaseException:
                # Includes cancellation: the connection state is unknown.
                self.close()
                raise
        elif self._released:  # Response explicitly released
            raise ClientConnectionError("Connection closed")

        protocol = self._connection and self._connection.protocol
        if protocol is None or not protocol.upgraded:
            await self._wait_released()  # Underlying connection released
        return self._body

    def get_encoding(self) -> str:
        """Determine the text encoding of the body.

        Order: a known ``charset`` parameter, UTF-8 for ``application/json``
        and ``application/rdap``, then the charset resolver on the read body.

        Raises:
            RuntimeError: if the fallback is needed before the body was read.
        """
        content_type = self.headers.get(hdrs.CONTENT_TYPE, "").lower()
        mimetype = helpers.parse_mimetype(content_type)

        charset = mimetype.parameters.get("charset")
        if charset:
            # Unknown codec names fall through to the next strategy.
            with contextlib.suppress(LookupError):
                return codecs.lookup(charset).name

        if mimetype.type == "application" and mimetype.subtype in ("json", "rdap"):
            # RFC 7159 states that the default encoding is UTF-8.
            # RFC 7483 defines application/rdap+json
            return "utf-8"

        if self._body is None:
            raise RuntimeError(
                "Cannot compute fallback encoding of a not yet read body"
            )

        return self._resolve_charset(self, self._body)

    async def text(self, encoding: Optional[str] = None, errors: str = "strict") -> str:
        """Read response payload and decode."""
        if self._body is None:
            await self.read()

        if encoding is None:
            encoding = self.get_encoding()

        return self._body.decode(encoding, errors=errors)  # type: ignore[union-attr]

    async def json(
        self,
        *,
        encoding: Optional[str] = None,
        loads: JSONDecoder = DEFAULT_JSON_DECODER,
        content_type: Optional[str] = "application/json",
    ) -> Any:
        """Read the body and decode it as JSON.

        Raises:
            ContentTypeError: if ``content_type`` is truthy and the response
                content type does not match it.
        """
        if self._body is None:
            await self.read()

        if content_type and not is_expected_content_type(
            self.content_type, content_type
        ):
            raise ContentTypeError(
                self.request_info,
                self.history,
                message=(
                    "Attempt to decode JSON with "
                    "unexpected mimetype: %s" % self.content_type
                ),
                headers=self.headers,
            )

        if encoding is None:
            encoding = self.get_encoding()

        return loads(self._body.decode(encoding))  # type: ignore[union-attr]

    async def __aenter__(self) -> "ClientResponse":
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        # similar to _RequestContextManager, we do not need to check
        # for exceptions, response object can close connection
        # if state is broken
        self.release()
        await self.wait_for_close()