Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohttp/client_reqrep.py: 41%

642 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:56 +0000

1import asyncio 

2import codecs 

3import functools 

4import io 

5import re 

6import sys 

7import traceback 

8import warnings 

9from hashlib import md5, sha1, sha256 

10from http.cookies import CookieError, Morsel, SimpleCookie 

11from types import MappingProxyType, TracebackType 

12from typing import ( 

13 TYPE_CHECKING, 

14 Any, 

15 Dict, 

16 Iterable, 

17 List, 

18 Mapping, 

19 Optional, 

20 Tuple, 

21 Type, 

22 Union, 

23 cast, 

24) 

25 

26import attr 

27from multidict import CIMultiDict, CIMultiDictProxy, MultiDict, MultiDictProxy 

28from yarl import URL 

29 

30from . import hdrs, helpers, http, multipart, payload 

31from .abc import AbstractStreamWriter 

32from .client_exceptions import ( 

33 ClientConnectionError, 

34 ClientOSError, 

35 ClientResponseError, 

36 ContentTypeError, 

37 InvalidURL, 

38 ServerFingerprintMismatch, 

39) 

40from .formdata import FormData 

41from .helpers import ( 

42 PY_36, 

43 BaseTimerContext, 

44 BasicAuth, 

45 HeadersMixin, 

46 TimerNoop, 

47 noop, 

48 reify, 

49 set_result, 

50) 

51from .http import SERVER_SOFTWARE, HttpVersion10, HttpVersion11, StreamWriter 

52from .log import client_logger 

53from .streams import StreamReader 

54from .typedefs import ( 

55 DEFAULT_JSON_DECODER, 

56 JSONDecoder, 

57 LooseCookies, 

58 LooseHeaders, 

59 RawHeaders, 

60) 

61 

62try: 

63 import ssl 

64 from ssl import SSLContext 

65except ImportError: # pragma: no cover 

66 ssl = None # type: ignore[assignment] 

67 SSLContext = object # type: ignore[misc,assignment] 

68 

69try: 

70 import cchardet as chardet 

71except ImportError: # pragma: no cover 

72 import charset_normalizer as chardet # type: ignore[no-redef] 

73 

74 

75__all__ = ("ClientRequest", "ClientResponse", "RequestInfo", "Fingerprint") 

76 

77 

78if TYPE_CHECKING: # pragma: no cover 

79 from .client import ClientSession 

80 from .connector import Connection 

81 from .tracing import Trace 

82 

83 

# Matches a content type of "application/json" or "application/<prefix>+json"
# (anchored at the start only, so trailing parameters are accepted).
# Used by _is_expected_content_type() when JSON is the expected type.
json_re = re.compile(r"^application/(?:[\w.+-]+?\+)?json")

85 

86 

@attr.s(auto_attribs=True, frozen=True, slots=True)
class ContentDisposition:
    """Parsed form of a response's Content-Disposition header.

    Built by ``ClientResponse.content_disposition`` from the values returned
    by ``multipart.parse_content_disposition``.
    """

    # Disposition type as returned by the header parser.
    type: Optional[str]
    # Read-only view of the header parameters.
    parameters: "MappingProxyType[str, str]"
    # Result of ``multipart.content_disposition_filename`` for the parameters.
    filename: Optional[str]

92 

93 

@attr.s(auto_attribs=True, frozen=True, slots=True)
class RequestInfo:
    """Immutable snapshot of a request's URL, method and headers.

    ``ClientRequest.request_info`` passes the fragment-less URL as ``url`` and
    the original URL as ``real_url``.
    """

    url: URL
    method: str
    headers: "CIMultiDictProxy[str]"
    # Defaults to ``url`` when the caller does not supply it.
    real_url: URL = attr.ib()

    @real_url.default
    def real_url_default(self) -> URL:
        """Return ``url`` as the default value for ``real_url``."""
        return self.url

104 

105 

class Fingerprint:
    """Expected digest of a server certificate.

    The hash function is chosen from the digest length; only SHA-256
    (32 bytes) is accepted. MD5 and SHA-1 lengths are recognized solely so
    that a specific error can be reported for them.
    """

    HASHFUNC_BY_DIGESTLEN = {16: md5, 20: sha1, 32: sha256}

    def __init__(self, fingerprint: bytes) -> None:
        """Validate the digest length and remember the matching hash function.

        Raises:
            ValueError: if the length maps to no known hash, or maps to
                md5/sha1.
        """
        hash_factory = self.HASHFUNC_BY_DIGESTLEN.get(len(fingerprint))
        if not hash_factory:
            raise ValueError("fingerprint has invalid length")
        if hash_factory is md5 or hash_factory is sha1:
            raise ValueError("md5 and sha1 are insecure and not supported. Use sha256.")
        self._hashfunc = hash_factory
        self._fingerprint = fingerprint

    @property
    def fingerprint(self) -> bytes:
        """The expected digest passed to the constructor."""
        return self._fingerprint

    def check(self, transport: asyncio.Transport) -> None:
        """Compare the peer certificate digest with the expected one.

        Does nothing when the transport reports no ``sslcontext``.

        Raises:
            ServerFingerprintMismatch: if the digests differ.
        """
        if not transport.get_extra_info("sslcontext"):
            return

        ssl_object = transport.get_extra_info("ssl_object")
        peer_cert = ssl_object.getpeercert(binary_form=True)
        actual = self._hashfunc(peer_cert).digest()
        if actual == self._fingerprint:
            return

        host, port, *_ = transport.get_extra_info("peername")
        raise ServerFingerprintMismatch(self._fingerprint, actual, host, port)

138 

139 

# Types accepted for the ``ssl`` argument; checked with isinstance() in
# _merge_ssl_params(). Without the ssl module only ``None`` is acceptable.
if ssl is None:  # pragma: no cover
    SSL_ALLOWED_TYPES = type(None)
else:
    SSL_ALLOWED_TYPES = (ssl.SSLContext, bool, Fingerprint, type(None))

144 

145 

def _merge_ssl_params(
    ssl: Union["SSLContext", bool, Fingerprint, None],
    verify_ssl: Optional[bool],
    ssl_context: Optional["SSLContext"],
    fingerprint: Optional[bytes],
) -> Union["SSLContext", bool, Fingerprint, None]:
    """Fold the deprecated TLS arguments into a single ``ssl`` value.

    Each deprecated argument that is in use emits a ``DeprecationWarning``
    and then supplies the ``ssl`` value, unless ``ssl`` already has one.

    Raises:
        ValueError: if more than one of the four arguments is in use.
        TypeError: if the resulting value is not one of SSL_ALLOWED_TYPES.
    """
    conflict = (
        "verify_ssl, ssl_context, fingerprint and ssl "
        "parameters are mutually exclusive"
    )

    # The warnings are issued directly from this frame so that stacklevel=3
    # keeps pointing at the same caller.
    if not (verify_ssl is None or verify_ssl):
        warnings.warn(
            "verify_ssl is deprecated, use ssl=False instead",
            DeprecationWarning,
            stacklevel=3,
        )
        if ssl is not None:
            raise ValueError(conflict)
        ssl = False

    if ssl_context is not None:
        warnings.warn(
            "ssl_context is deprecated, use ssl=context instead",
            DeprecationWarning,
            stacklevel=3,
        )
        if ssl is not None:
            raise ValueError(conflict)
        ssl = ssl_context

    if fingerprint is not None:
        warnings.warn(
            "fingerprint is deprecated, use ssl=Fingerprint(fingerprint) instead",
            DeprecationWarning,
            stacklevel=3,
        )
        if ssl is not None:
            raise ValueError(conflict)
        ssl = Fingerprint(fingerprint)

    if isinstance(ssl, SSL_ALLOWED_TYPES):
        return ssl
    raise TypeError(
        f"ssl should be SSLContext, bool, Fingerprint or None, got {ssl!r} instead."
    )

197 

198 

@attr.s(auto_attribs=True, slots=True, frozen=True)
class ConnectionKey:
    """Frozen key describing the destination and transport of a request.

    Built by ``ClientRequest.connection_key`` from the request's host, port,
    TLS settings and proxy settings.
    """

    # the key should contain an information about used proxy / TLS
    # to prevent reusing wrong connections from a pool
    host: str
    port: Optional[int]
    is_ssl: bool
    ssl: Union[SSLContext, None, bool, Fingerprint]
    proxy: Optional[URL]
    proxy_auth: Optional[BasicAuth]
    # Hash of the proxy header (key, value) pairs, or None when there are none.
    proxy_headers_hash: Optional[int]  # hash(CIMultiDict)

210 

211 

def _is_expected_content_type(
    response_content_type: str, expected_content_type: str
) -> bool:
    """Tell whether a response content type satisfies the expected one.

    When ``application/json`` is expected, the response type is matched
    against ``json_re``; for any other expectation a plain substring test is
    used.
    """
    if expected_content_type != "application/json":
        return expected_content_type in response_content_type
    return json_re.match(response_content_type) is not None

218 

219 

class ClientRequest:
    """A single outgoing HTTP request.

    The constructor normalizes the URL, headers, cookies, authentication,
    proxy settings and body. ``send()`` writes the request line and headers
    to a connection and starts a task running ``write_bytes()`` to stream
    the body.
    """

    GET_METHODS = {
        hdrs.METH_GET,
        hdrs.METH_HEAD,
        hdrs.METH_OPTIONS,
        hdrs.METH_TRACE,
    }
    POST_METHODS = {hdrs.METH_PATCH, hdrs.METH_POST, hdrs.METH_PUT}
    ALL_METHODS = GET_METHODS.union(POST_METHODS).union({hdrs.METH_DELETE})

    # Headers added by update_auto_headers() unless already present or skipped.
    DEFAULT_HEADERS = {
        hdrs.ACCEPT: "*/*",
        hdrs.ACCEPT_ENCODING: "gzip, deflate",
    }

    body = b""
    auth = None
    response = None

    _writer = None  # async task for streaming data
    _continue = None  # waiter future for '100 Continue' response

    # N.B.
    # Adding __del__ method with self._writer closing doesn't make sense
    # because _writer is instance method, thus it keeps a reference to self.
    # Until writer has finished finalizer will not be called.

    def __init__(
        self,
        method: str,
        url: URL,
        *,
        params: Optional[Mapping[str, str]] = None,
        headers: Optional[LooseHeaders] = None,
        skip_auto_headers: Iterable[str] = frozenset(),
        data: Any = None,
        cookies: Optional[LooseCookies] = None,
        auth: Optional[BasicAuth] = None,
        version: http.HttpVersion = http.HttpVersion11,
        compress: Optional[str] = None,
        chunked: Optional[bool] = None,
        expect100: bool = False,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        response_class: Optional[Type["ClientResponse"]] = None,
        proxy: Optional[URL] = None,
        proxy_auth: Optional[BasicAuth] = None,
        timer: Optional[BaseTimerContext] = None,
        session: Optional["ClientSession"] = None,
        ssl: Union[SSLContext, bool, Fingerprint, None] = None,
        proxy_headers: Optional[LooseHeaders] = None,
        traces: Optional[List["Trace"]] = None,
    ):
        """Store the request settings and derive headers and body from them.

        ``params`` are appended to the query already present in ``url``.
        The ``update_*`` helpers run in a fixed order because later ones read
        state (``self.headers``, ``self.chunked``, ``self.body``) set by
        earlier ones.
        """
        if loop is None:
            loop = asyncio.get_event_loop()

        assert isinstance(url, URL), url
        assert isinstance(proxy, (URL, type(None))), proxy
        # FIXME: session is None in tests only, need to fix tests
        # assert session is not None
        self._session = cast("ClientSession", session)
        if params:
            # Keep the URL's own query and append the extra parameters.
            q = MultiDict(url.query)
            url2 = url.with_query(params)
            q.extend(url2.query)
            url = url.with_query(q)
        self.original_url = url
        self.url = url.with_fragment(None)
        self.method = method.upper()
        self.chunked = chunked
        self.compress = compress
        self.loop = loop
        self.length = None
        if response_class is None:
            real_response_class = ClientResponse
        else:
            real_response_class = response_class
        self.response_class: Type[ClientResponse] = real_response_class
        self._timer = timer if timer is not None else TimerNoop()
        self._ssl = ssl

        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

        self.update_version(version)
        self.update_host(url)
        self.update_headers(headers)
        self.update_auto_headers(skip_auto_headers)
        self.update_cookies(cookies)
        self.update_content_encoding(data)
        self.update_auth(auth)
        self.update_proxy(proxy, proxy_auth, proxy_headers)

        self.update_body_from_data(data)
        if data is not None or self.method not in self.GET_METHODS:
            self.update_transfer_encoding()
        self.update_expect_continue(expect100)
        if traces is None:
            traces = []
        self._traces = traces

    def is_ssl(self) -> bool:
        """Return True when the URL scheme is ``https`` or ``wss``."""
        return self.url.scheme in ("https", "wss")

    @property
    def ssl(self) -> Union["SSLContext", None, bool, Fingerprint]:
        """The ``ssl`` value given to the constructor."""
        return self._ssl

    @property
    def connection_key(self) -> ConnectionKey:
        """Build the ConnectionKey describing this request's destination."""
        proxy_headers = self.proxy_headers
        if proxy_headers:
            h: Optional[int] = hash(tuple((k, v) for k, v in proxy_headers.items()))
        else:
            h = None
        return ConnectionKey(
            self.host,
            self.port,
            self.is_ssl(),
            self.ssl,
            self.proxy,
            self.proxy_auth,
            h,
        )

    @property
    def host(self) -> str:
        """The URL's raw host."""
        ret = self.url.raw_host
        assert ret is not None
        return ret

    @property
    def port(self) -> Optional[int]:
        """The URL's port."""
        return self.url.port

    @property
    def request_info(self) -> RequestInfo:
        """Return a RequestInfo with a read-only view of the current headers."""
        headers: CIMultiDictProxy[str] = CIMultiDictProxy(self.headers)
        return RequestInfo(self.url, self.method, headers, self.original_url)

    def update_host(self, url: URL) -> None:
        """Update destination host, port and connection type (ssl).

        Raises:
            InvalidURL: if the URL has no host.
        """
        # get host/port
        if not url.raw_host:
            raise InvalidURL(url)

        # basic auth info
        username, password = url.user, url.password
        if username:
            self.auth = helpers.BasicAuth(username, password or "")

    def update_version(self, version: Union[http.HttpVersion, str]) -> None:
        """Convert request version to two elements tuple.

        parser HTTP version '1.1' => (1, 1)

        Raises:
            ValueError: if a string version is not of the form
                ``"<int>.<int>"``.
        """
        if isinstance(version, str):
            v = [part.strip() for part in version.split(".", 1)]
            try:
                version = http.HttpVersion(int(v[0]), int(v[1]))
            except (ValueError, IndexError):
                # IndexError: the string has no "." and so no minor part;
                # report it the same way as a non-numeric component.
                raise ValueError(
                    f"Can not parse http version number: {version}"
                ) from None
        self.version = version

    def update_headers(self, headers: Optional[LooseHeaders]) -> None:
        """Update request headers."""
        self.headers: CIMultiDict[str] = CIMultiDict()

        # add host
        netloc = cast(str, self.url.raw_host)
        if helpers.is_ipv6_address(netloc):
            netloc = f"[{netloc}]"
        if self.url.port is not None and not self.url.is_default_port():
            netloc += ":" + str(self.url.port)
        self.headers[hdrs.HOST] = netloc

        if headers:
            if isinstance(headers, (dict, MultiDictProxy, MultiDict)):
                headers = headers.items()  # type: ignore[assignment]

            for key, value in headers:  # type: ignore[misc]
                # A special case for Host header: replace the generated
                # value instead of adding a second one.
                if key.lower() == "host":
                    self.headers[key] = value
                else:
                    self.headers.add(key, value)

    def update_auto_headers(self, skip_auto_headers: Iterable[str]) -> None:
        """Add the default headers and User-Agent unless present or skipped."""
        self.skip_auto_headers = CIMultiDict(
            (hdr, None) for hdr in sorted(skip_auto_headers)
        )
        used_headers = self.headers.copy()
        used_headers.extend(self.skip_auto_headers)  # type: ignore[arg-type]

        for hdr, val in self.DEFAULT_HEADERS.items():
            if hdr not in used_headers:
                self.headers.add(hdr, val)

        if hdrs.USER_AGENT not in used_headers:
            self.headers[hdrs.USER_AGENT] = SERVER_SOFTWARE

    def update_cookies(self, cookies: Optional[LooseCookies]) -> None:
        """Update request cookies header."""
        if not cookies:
            return

        c: SimpleCookie[str] = SimpleCookie()
        if hdrs.COOKIE in self.headers:
            # Merge with cookies already present in the Cookie header.
            c.load(self.headers.get(hdrs.COOKIE, ""))
            del self.headers[hdrs.COOKIE]

        if isinstance(cookies, Mapping):
            iter_cookies = cookies.items()
        else:
            iter_cookies = cookies  # type: ignore[assignment]
        for name, value in iter_cookies:
            if isinstance(value, Morsel):
                # Preserve coded_value
                mrsl_val = value.get(value.key, Morsel())
                mrsl_val.set(value.key, value.value, value.coded_value)
                c[name] = mrsl_val
            else:
                c[name] = value  # type: ignore[assignment]

        self.headers[hdrs.COOKIE] = c.output(header="", sep=";").strip()

    def update_content_encoding(self, data: Any) -> None:
        """Set request content encoding.

        Raises:
            ValueError: if ``compress`` is set while a Content-Encoding header
                is already present.
        """
        if data is None:
            return

        enc = self.headers.get(hdrs.CONTENT_ENCODING, "").lower()
        if enc:
            if self.compress:
                raise ValueError(
                    "compress can not be set " "if Content-Encoding header is set"
                )
        elif self.compress:
            if not isinstance(self.compress, str):
                self.compress = "deflate"
            self.headers[hdrs.CONTENT_ENCODING] = self.compress
            self.chunked = True  # enable chunked, no need to deal with length

    def update_transfer_encoding(self) -> None:
        """Analyze transfer-encoding header.

        Raises:
            ValueError: if ``chunked`` is set together with a chunked
                Transfer-Encoding header or with a Content-Length header.
        """
        te = self.headers.get(hdrs.TRANSFER_ENCODING, "").lower()

        if "chunked" in te:
            if self.chunked:
                raise ValueError(
                    "chunked can not be set "
                    'if "Transfer-Encoding: chunked" header is set'
                )

        elif self.chunked:
            if hdrs.CONTENT_LENGTH in self.headers:
                raise ValueError(
                    "chunked can not be set " "if Content-Length header is set"
                )

            self.headers[hdrs.TRANSFER_ENCODING] = "chunked"
        else:
            if hdrs.CONTENT_LENGTH not in self.headers:
                self.headers[hdrs.CONTENT_LENGTH] = str(len(self.body))

    def update_auth(self, auth: Optional[BasicAuth]) -> None:
        """Set basic auth.

        Falls back to ``self.auth`` when ``auth`` is None.

        Raises:
            TypeError: if the credentials are not a ``BasicAuth``.
        """
        if auth is None:
            auth = self.auth
        if auth is None:
            return

        if not isinstance(auth, helpers.BasicAuth):
            raise TypeError("BasicAuth() tuple is required instead")

        self.headers[hdrs.AUTHORIZATION] = auth.encode()

    def update_body_from_data(self, body: Any) -> None:
        """Wrap ``body`` in a payload object and copy its headers."""
        if body is None:
            return

        # FormData
        if isinstance(body, FormData):
            body = body()

        try:
            body = payload.PAYLOAD_REGISTRY.get(body, disposition=None)
        except payload.LookupError:
            body = FormData(body)()

        self.body = body

        # enable chunked encoding if needed
        if not self.chunked:
            if hdrs.CONTENT_LENGTH not in self.headers:
                size = body.size
                if size is None:
                    self.chunked = True
                else:
                    self.headers[hdrs.CONTENT_LENGTH] = str(size)

        # copy payload headers
        assert body.headers
        for (key, value) in body.headers.items():
            if key in self.headers:
                continue
            if key in self.skip_auto_headers:
                continue
            self.headers[key] = value

    def update_expect_continue(self, expect: bool = False) -> None:
        """Set up the '100 Continue' waiter when the request expects one."""
        if expect:
            self.headers[hdrs.EXPECT] = "100-continue"
        elif self.headers.get(hdrs.EXPECT, "").lower() == "100-continue":
            expect = True

        if expect:
            self._continue = self.loop.create_future()

    def update_proxy(
        self,
        proxy: Optional[URL],
        proxy_auth: Optional[BasicAuth],
        proxy_headers: Optional[LooseHeaders],
    ) -> None:
        """Store the proxy settings.

        Raises:
            ValueError: if ``proxy_auth`` is given but is not a ``BasicAuth``.
        """
        if proxy_auth and not isinstance(proxy_auth, helpers.BasicAuth):
            raise ValueError("proxy_auth must be None or BasicAuth() tuple")
        self.proxy = proxy
        self.proxy_auth = proxy_auth
        self.proxy_headers = proxy_headers

    def keep_alive(self) -> bool:
        """Decide from the version and Connection header whether to keep alive."""
        if self.version < HttpVersion10:
            # keep alive not supported at all
            return False
        if self.version == HttpVersion10:
            if self.headers.get(hdrs.CONNECTION) == "keep-alive":
                return True
            else:  # no headers means we close for Http 1.0
                return False
        elif self.headers.get(hdrs.CONNECTION) == "close":
            return False

        return True

    async def write_bytes(
        self, writer: AbstractStreamWriter, conn: "Connection"
    ) -> None:
        """Support coroutines that yields bytes objects.

        Errors raised while writing are not propagated; they are handed to
        the connection's protocol via ``set_exception``.
        """
        # 100 response
        if self._continue is not None:
            await writer.drain()
            await self._continue

        protocol = conn.protocol
        assert protocol is not None
        try:
            if isinstance(self.body, payload.Payload):
                await self.body.write(writer)
            else:
                if isinstance(self.body, (bytes, bytearray)):
                    self.body = (self.body,)  # type: ignore[assignment]

                for chunk in self.body:
                    await writer.write(chunk)  # type: ignore[arg-type]

            await writer.write_eof()
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                protocol.set_exception(exc)
            else:
                new_exc = ClientOSError(
                    exc.errno, "Can not write request body for %s" % self.url
                )
                new_exc.__context__ = exc
                new_exc.__cause__ = exc
                protocol.set_exception(new_exc)
        except asyncio.CancelledError as exc:
            if not conn.closed:
                protocol.set_exception(exc)
        except Exception as exc:
            protocol.set_exception(exc)
        finally:
            self._writer = None

    async def send(self, conn: "Connection") -> "ClientResponse":
        """Write the request line and headers and start streaming the body.

        Returns the response object created from ``response_class``; the body
        is written by a task running ``write_bytes()``.
        """
        # Specify request target:
        # - CONNECT request must send authority form URI
        # - not CONNECT proxy must send absolute form URI
        # - most common is origin form URI
        if self.method == hdrs.METH_CONNECT:
            connect_host = self.url.raw_host
            assert connect_host is not None
            if helpers.is_ipv6_address(connect_host):
                connect_host = f"[{connect_host}]"
            path = f"{connect_host}:{self.url.port}"
        elif self.proxy and not self.is_ssl():
            path = str(self.url)
        else:
            path = self.url.raw_path
            if self.url.raw_query_string:
                path += "?" + self.url.raw_query_string

        protocol = conn.protocol
        assert protocol is not None
        writer = StreamWriter(
            protocol,
            self.loop,
            on_chunk_sent=functools.partial(
                self._on_chunk_request_sent, self.method, self.url
            ),
            on_headers_sent=functools.partial(
                self._on_headers_request_sent, self.method, self.url
            ),
        )

        if self.compress:
            writer.enable_compression(self.compress)

        # Only a truthy value enables chunked framing. An explicit
        # ``chunked=False`` must not, because the headers then carry
        # Content-Length instead of Transfer-Encoding.
        if self.chunked:
            writer.enable_chunking()

        # set default content-type
        if (
            self.method in self.POST_METHODS
            and hdrs.CONTENT_TYPE not in self.skip_auto_headers
            and hdrs.CONTENT_TYPE not in self.headers
        ):
            self.headers[hdrs.CONTENT_TYPE] = "application/octet-stream"

        # set the connection header
        connection = self.headers.get(hdrs.CONNECTION)
        if not connection:
            if self.keep_alive():
                if self.version == HttpVersion10:
                    connection = "keep-alive"
            else:
                if self.version == HttpVersion11:
                    connection = "close"

        if connection is not None:
            self.headers[hdrs.CONNECTION] = connection

        # status + headers
        status_line = "{0} {1} HTTP/{2[0]}.{2[1]}".format(
            self.method, path, self.version
        )
        await writer.write_headers(status_line, self.headers)

        self._writer = self.loop.create_task(self.write_bytes(writer, conn))

        response_class = self.response_class
        assert response_class is not None
        self.response = response_class(
            self.method,
            self.original_url,
            writer=self._writer,
            continue100=self._continue,
            timer=self._timer,
            request_info=self.request_info,
            traces=self._traces,
            loop=self.loop,
            session=self._session,
        )
        return self.response

    async def close(self) -> None:
        """Wait for the body-writing task, then forget it."""
        if self._writer is not None:
            try:
                await self._writer
            finally:
                self._writer = None

    def terminate(self) -> None:
        """Cancel the body-writing task (unless the loop is closed) and forget it."""
        if self._writer is not None:
            if not self.loop.is_closed():
                self._writer.cancel()
            self._writer = None

    async def _on_chunk_request_sent(self, method: str, url: URL, chunk: bytes) -> None:
        """Notify every trace that a body chunk was sent."""
        for trace in self._traces:
            await trace.send_request_chunk_sent(method, url, chunk)

    async def _on_headers_request_sent(
        self, method: str, url: URL, headers: "CIMultiDict[str]"
    ) -> None:
        """Notify every trace that the request headers were sent."""
        for trace in self._traces:
            await trace.send_request_headers(method, url, headers)

711 

712 

class ClientResponse(HeadersMixin):
    """Response to a request sent with ``ClientRequest.send()``.

    ``start()`` reads the status line and headers from the connection; the
    body is then available through ``content``, ``read()``, ``text()`` and
    ``json()``.
    """

    # from the Status-Line of the response
    version = None  # HTTP-Version
    status: int = None  # type: ignore[assignment]  # Status-Code
    reason = None  # Reason-Phrase

    content: StreamReader = None  # type: ignore[assignment]  # Payload stream
    _headers: "CIMultiDictProxy[str]" = None  # type: ignore[assignment]
    _raw_headers: RawHeaders = None  # type: ignore[assignment]  # Response raw headers

    _connection = None  # current connection
    _source_traceback = None
    # set up by ClientRequest after ClientResponse object creation
    # post-init stage allows to not change ctor signature
    _closed = True  # to allow __del__ for non-initialized properly response
    _released = False

    def __init__(
        self,
        method: str,
        url: URL,
        *,
        writer: "asyncio.Task[None]",
        continue100: Optional["asyncio.Future[bool]"],
        timer: BaseTimerContext,
        request_info: RequestInfo,
        traces: List["Trace"],
        loop: asyncio.AbstractEventLoop,
        session: "ClientSession",
    ) -> None:
        """Record request context; nothing is read until ``start()`` runs."""
        assert isinstance(url, URL)

        # Request identity.
        self.method = method
        self._real_url = url
        self._url = url.with_fragment(None)
        self._request_info = request_info

        # Collaborators handed over by the request.
        self._writer: Optional[asyncio.Task[None]] = writer
        self._continue = continue100  # None by default
        self._timer = TimerNoop() if timer is None else timer
        self._traces = traces
        self._loop = loop
        # store a reference to session #1985
        self._session: Optional[ClientSession] = session

        # Response state.
        self.cookies: SimpleCookie[str] = SimpleCookie()
        self._body: Any = None
        self._closed = True
        self._history: Tuple[ClientResponse, ...] = ()
        self._cache: Dict[str, Any] = {}

        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

    @reify
    def url(self) -> URL:
        """Request URL without its fragment."""
        return self._url

    @reify
    def url_obj(self) -> URL:
        """Deprecated alias of ``url``; emits a DeprecationWarning."""
        warnings.warn("Deprecated, use .url #1654", DeprecationWarning, stacklevel=2)
        return self._url

    @reify
    def real_url(self) -> URL:
        """Request URL exactly as passed to the constructor."""
        return self._real_url

    @reify
    def host(self) -> str:
        """Host part of ``url``."""
        assert self._url.host is not None
        return self._url.host

    @reify
    def headers(self) -> "CIMultiDictProxy[str]":
        """Response headers as set by ``start()``."""
        return self._headers

    @reify
    def raw_headers(self) -> RawHeaders:
        """Raw response headers as set by ``start()``."""
        return self._raw_headers

    @reify
    def request_info(self) -> RequestInfo:
        """The RequestInfo given to the constructor."""
        return self._request_info

    @reify
    def content_disposition(self) -> Optional[ContentDisposition]:
        """Parsed Content-Disposition header, or None when it is absent."""
        header_value = self._headers.get(hdrs.CONTENT_DISPOSITION)
        if header_value is None:
            return None
        kind, raw_params = multipart.parse_content_disposition(header_value)
        frozen_params = MappingProxyType(raw_params)
        file_name = multipart.content_disposition_filename(frozen_params)
        return ContentDisposition(kind, frozen_params, file_name)

    def __del__(self, _warnings: Any = warnings) -> None:
        """Release a connection that is still held by an unclosed response."""
        if self._closed:
            return

        # NOTE(review): the coverage report flattened indentation; the warning
        # logic is placed inside this branch — confirm against the real file.
        if self._connection is not None:
            self._connection.release()
            self._cleanup_writer()

            if self._loop.get_debug():
                kwargs = {"source": self} if PY_36 else {}
                _warnings.warn(f"Unclosed response {self!r}", ResourceWarning, **kwargs)
                context = {"client_response": self, "message": "Unclosed response"}
                if self._source_traceback:
                    context["source_traceback"] = self._source_traceback
                self._loop.call_exception_handler(context)

    def __repr__(self) -> str:
        """Return the URL/status/reason line followed by the headers."""
        reason = self.reason
        if reason:
            # Keep the representation ASCII-only.
            reason = reason.encode("ascii", "backslashreplace").decode("ascii")
        status_part = f"<ClientResponse({self.url!s}) [{self.status} {reason}]>"
        return f"{status_part}\n{self.headers!s}\n"

    @property
    def connection(self) -> Optional["Connection"]:
        """The connection currently attached to this response, if any."""
        return self._connection

    @reify
    def history(self) -> Tuple["ClientResponse", ...]:
        """A sequence of responses, if redirects occurred."""
        return self._history

    @reify
    def links(self) -> "MultiDictProxy[MultiDictProxy[Union[str, URL]]]":
        """Parse the Link headers into a multidict.

        Each entry is keyed by its ``rel`` parameter (or by the raw target
        when ``rel`` is missing) and gets a ``url`` item holding the target
        joined with ``self.url``.
        """
        joined = ", ".join(self.headers.getall("link", []))
        if not joined:
            return MultiDictProxy(MultiDict())

        result: MultiDict[MultiDictProxy[Union[str, URL]]] = MultiDict()

        for entry in re.split(r",(?=\s*<)", joined):
            head = re.match(r"\s*<(.*)>(.*)", entry)
            if head is None:  # pragma: no cover
                # the check exists to suppress mypy error
                continue
            target, tail = head.groups()

            link_params: MultiDict[Union[str, URL]] = MultiDict()
            # The text before the first ";" is not a parameter.
            for piece in tail.split(";")[1:]:
                parsed = re.match(r"^\s*(\S*)\s*=\s*(['\"]?)(.*?)(\2)\s*$", piece, re.M)
                if parsed is None:  # pragma: no cover
                    # the check exists to suppress mypy error
                    continue
                name, _, value, _ = parsed.groups()
                link_params.add(name, value)

            rel = link_params.get("rel", target)  # type: ignore[assignment]
            link_params.add("url", self.url.join(URL(target)))
            result.add(rel, MultiDictProxy(link_params))

        return MultiDictProxy(result)

    async def start(self, connection: "Connection") -> "ClientResponse":
        """Start response processing."""
        self._closed = False
        self._protocol = connection.protocol
        self._connection = connection

        with self._timer:
            while True:
                # read response
                try:
                    message, body_stream = await self._protocol.read()  # type: ignore[union-attr]
                except http.HttpProcessingError as exc:
                    raise ClientResponseError(
                        self.request_info,
                        self.history,
                        status=exc.code,
                        message=exc.message,
                        headers=exc.headers,
                    ) from exc

                # Keep reading past 1xx messages, except 101.
                interim = 100 <= message.code <= 199 and message.code != 101
                if not interim:
                    break

                if self._continue is not None:
                    set_result(self._continue, True)
                    self._continue = None

        # payload eof handler
        body_stream.on_eof(self._response_eof)

        # response status
        self.version = message.version
        self.status = message.code
        self.reason = message.reason

        # headers
        self._headers = message.headers  # type is CIMultiDictProxy
        self._raw_headers = message.raw_headers  # type is Tuple[bytes, bytes]

        # payload
        self.content = body_stream

        # cookies
        for set_cookie in self.headers.getall(hdrs.SET_COOKIE, ()):
            try:
                self.cookies.load(set_cookie)
            except CookieError as exc:
                client_logger.warning("Can not load response cookies: %s", exc)
        return self

    def _response_eof(self) -> None:
        """Release the connection once the payload has been fully received."""
        if self._closed:
            return

        conn = self._connection
        if conn is not None:
            # websocket, protocol could be None because
            # connection could be detached
            if conn.protocol is not None and conn.protocol.upgraded:
                return

            conn.release()
            self._connection = None

        self._closed = True
        self._cleanup_writer()

    @property
    def closed(self) -> bool:
        """Whether the response is closed."""
        return self._closed

    def close(self) -> None:
        """Close the response and its connection."""
        if not self._released:
            self._notify_content()
        if self._closed:
            return

        self._closed = True
        if self._loop is None or self._loop.is_closed():
            return

        if self._connection is not None:
            self._connection.close()
            self._connection = None
        self._cleanup_writer()

    def release(self) -> Any:
        """Release the connection and return the result of ``noop()``."""
        if not self._released:
            self._notify_content()

        if not self._closed:
            self._closed = True
            if self._connection is not None:
                self._connection.release()
                self._connection = None
            self._cleanup_writer()

        return noop()

    @property
    def ok(self) -> bool:
        """Returns ``True`` if ``status`` is less than ``400``, ``False`` if not.

        This is **not** a check for ``200 OK`` but a check that the response
        status is under 400.
        """
        return 400 > self.status

    def raise_for_status(self) -> None:
        """Release the response and raise ClientResponseError unless ``ok``."""
        if self.ok:
            return

        # reason should always be not None for a started response
        assert self.reason is not None
        self.release()
        raise ClientResponseError(
            self.request_info,
            self.history,
            status=self.status,
            message=self.reason,
            headers=self.headers,
        )

    def _cleanup_writer(self) -> None:
        """Cancel the request's writer task and drop writer and session references."""
        writer = self._writer
        if writer is not None:
            writer.cancel()
        self._writer = None
        self._session = None

    def _notify_content(self) -> None:
        """Mark the response released and fail the payload stream if it has no error."""
        stream = self.content
        if stream and stream.exception() is None:
            stream.set_exception(ClientConnectionError("Connection closed"))
        self._released = True

    async def wait_for_close(self) -> None:
        """Wait for the writer task to finish, then release the response."""
        if self._writer is not None:
            try:
                await self._writer
            finally:
                self._writer = None
        self.release()

    async def read(self) -> bytes:
        """Read response payload."""
        if self._body is None:
            try:
                self._body = await self.content.read()
                for trace in self._traces:
                    await trace.send_response_chunk_received(
                        self.method, self.url, self._body
                    )
            except BaseException:
                # Close on any failure, cancellation included, then re-raise.
                self.close()
                raise
        elif self._released:
            raise ClientConnectionError("Connection closed")

        return self._body  # type: ignore[no-any-return]

    def get_encoding(self) -> str:
        """Work out the text encoding of the body.

        Uses the Content-Type ``charset`` if it names a known codec, then
        UTF-8 for ``application/json`` and ``application/rdap``, then
        detection on the already-read body, and finally UTF-8.

        Raises:
            RuntimeError: if detection is needed but the body was not read.
        """
        content_type = self.headers.get(hdrs.CONTENT_TYPE, "").lower()
        mimetype = helpers.parse_mimetype(content_type)

        encoding = mimetype.parameters.get("charset")
        if encoding:
            try:
                codecs.lookup(encoding)
            except LookupError:
                encoding = None

        if not encoding:
            if mimetype.type == "application" and mimetype.subtype in ("json", "rdap"):
                # RFC 7159 states that the default encoding is UTF-8.
                # RFC 7483 defines application/rdap+json
                encoding = "utf-8"
            elif self._body is None:
                raise RuntimeError("Cannot guess the encoding of a not yet read body")
            else:
                encoding = chardet.detect(self._body)["encoding"]

        return encoding or "utf-8"

    async def text(self, encoding: Optional[str] = None, errors: str = "strict") -> str:
        """Read response payload and decode."""
        if self._body is None:
            await self.read()

        codec = self.get_encoding() if encoding is None else encoding
        return self._body.decode(  # type: ignore[no-any-return,union-attr]
            codec, errors=errors
        )

    async def json(
        self,
        *,
        encoding: Optional[str] = None,
        loads: JSONDecoder = DEFAULT_JSON_DECODER,
        content_type: Optional[str] = "application/json",
    ) -> Any:
        """Read and decodes JSON response.

        Returns None for a body that is empty after stripping whitespace.

        Raises:
            ContentTypeError: if ``content_type`` is truthy and the response
                Content-Type does not match it.
        """
        if self._body is None:
            await self.read()

        if content_type:
            actual_type = self.headers.get(hdrs.CONTENT_TYPE, "").lower()
            if not _is_expected_content_type(actual_type, content_type):
                raise ContentTypeError(
                    self.request_info,
                    self.history,
                    message="Attempt to decode JSON with unexpected mimetype: %s"
                    % actual_type,
                    headers=self.headers,
                )

        stripped = self._body.strip()  # type: ignore[union-attr]
        if not stripped:
            return None

        codec = self.get_encoding() if encoding is None else encoding
        return loads(stripped.decode(codec))

    async def __aenter__(self) -> "ClientResponse":
        """Enter the async context; returns the response itself."""
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Release the response on leaving the async context."""
        # similar to _RequestContextManager, we do not need to check
        # for exceptions, response object can close connection
        # if state is broken
        self.release()

1132 # for exceptions, response object can close connection 

1133 # if state is broken 

1134 self.release()