Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/client_reqrep.py: 25%

620 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:52 +0000

1import asyncio 

2import codecs 

3import contextlib 

4import dataclasses 

5import functools 

6import io 

7import re 

8import sys 

9import traceback 

10import warnings 

11from hashlib import md5, sha1, sha256 

12from http.cookies import CookieError, Morsel, SimpleCookie 

13from types import MappingProxyType, TracebackType 

14from typing import ( 

15 TYPE_CHECKING, 

16 Any, 

17 Dict, 

18 Iterable, 

19 List, 

20 Mapping, 

21 Optional, 

22 Tuple, 

23 Type, 

24 Union, 

25 cast, 

26) 

27 

28from multidict import CIMultiDict, CIMultiDictProxy, MultiDict, MultiDictProxy 

29from yarl import URL 

30 

31from . import hdrs, helpers, http, multipart, payload 

32from .abc import AbstractStreamWriter 

33from .client_exceptions import ( 

34 ClientConnectionError, 

35 ClientOSError, 

36 ClientResponseError, 

37 ContentTypeError, 

38 InvalidURL, 

39 ServerFingerprintMismatch, 

40) 

41from .compression_utils import HAS_BROTLI 

42from .formdata import FormData 

43from .hdrs import CONTENT_TYPE 

44from .helpers import ( 

45 BaseTimerContext, 

46 BasicAuth, 

47 HeadersMixin, 

48 TimerNoop, 

49 basicauth_from_netrc, 

50 is_expected_content_type, 

51 netrc_from_env, 

52 noop, 

53 parse_mimetype, 

54 reify, 

55 set_result, 

56) 

57from .http import SERVER_SOFTWARE, HttpVersion10, HttpVersion11, StreamWriter 

58from .log import client_logger 

59from .streams import StreamReader 

60from .typedefs import ( 

61 DEFAULT_JSON_DECODER, 

62 JSONDecoder, 

63 LooseCookies, 

64 LooseHeaders, 

65 RawHeaders, 

66) 

67 

68try: 

69 import ssl 

70 from ssl import SSLContext 

71except ImportError: # pragma: no cover 

72 ssl = None # type: ignore[assignment] 

73 SSLContext = object # type: ignore[misc,assignment] 

74 

75try: 

76 import cchardet as chardet 

77except ImportError: # pragma: no cover 

78 import charset_normalizer as chardet # type: ignore[no-redef] 

79 

80 

81__all__ = ("ClientRequest", "ClientResponse", "RequestInfo", "Fingerprint") 

82 

83 

84if TYPE_CHECKING: # pragma: no cover 

85 from .client import ClientSession 

86 from .connector import Connection 

87 from .tracing import Trace 

88 

89 

90_CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]") 

91 

92 

93def _gen_default_accept_encoding() -> str: 

94 return "gzip, deflate, br" if HAS_BROTLI else "gzip, deflate" 

95 

96 

97@dataclasses.dataclass(frozen=True) 

98class ContentDisposition: 

99 type: Optional[str] 

100 parameters: "MappingProxyType[str, str]" 

101 filename: Optional[str] 

102 

103 

104@dataclasses.dataclass(frozen=True) 

105class RequestInfo: 

106 url: URL 

107 method: str 

108 headers: "CIMultiDictProxy[str]" 

109 real_url: URL 

110 

111 

112class Fingerprint: 

113 HASHFUNC_BY_DIGESTLEN = { 

114 16: md5, 

115 20: sha1, 

116 32: sha256, 

117 } 

118 

119 def __init__(self, fingerprint: bytes) -> None: 

120 digestlen = len(fingerprint) 

121 hashfunc = self.HASHFUNC_BY_DIGESTLEN.get(digestlen) 

122 if not hashfunc: 

123 raise ValueError("fingerprint has invalid length") 

124 elif hashfunc is md5 or hashfunc is sha1: 

125 raise ValueError( 

126 "md5 and sha1 are insecure and " "not supported. Use sha256." 

127 ) 

128 self._hashfunc = hashfunc 

129 self._fingerprint = fingerprint 

130 

131 @property 

132 def fingerprint(self) -> bytes: 

133 return self._fingerprint 

134 

135 def check(self, transport: asyncio.Transport) -> None: 

136 if not transport.get_extra_info("sslcontext"): 

137 return 

138 sslobj = transport.get_extra_info("ssl_object") 

139 cert = sslobj.getpeercert(binary_form=True) 

140 got = self._hashfunc(cert).digest() 

141 if got != self._fingerprint: 

142 host, port, *_ = transport.get_extra_info("peername") 

143 raise ServerFingerprintMismatch(self._fingerprint, got, host, port) 

144 

145 

146if ssl is not None: 

147 SSL_ALLOWED_TYPES = (ssl.SSLContext, bool, Fingerprint, type(None)) 

148else: # pragma: no cover 

149 SSL_ALLOWED_TYPES = type(None) 

150 

151 

152@dataclasses.dataclass(frozen=True) 

153class ConnectionKey: 

154 # the key should contain an information about used proxy / TLS 

155 # to prevent reusing wrong connections from a pool 

156 host: str 

157 port: Optional[int] 

158 is_ssl: bool 

159 ssl: Union[SSLContext, None, bool, Fingerprint] 

160 proxy: Optional[URL] 

161 proxy_auth: Optional[BasicAuth] 

162 proxy_headers_hash: Optional[int] # hash(CIMultiDict) 

163 

164 

165class ClientRequest: 

166 GET_METHODS = { 

167 hdrs.METH_GET, 

168 hdrs.METH_HEAD, 

169 hdrs.METH_OPTIONS, 

170 hdrs.METH_TRACE, 

171 } 

172 POST_METHODS = {hdrs.METH_PATCH, hdrs.METH_POST, hdrs.METH_PUT} 

173 ALL_METHODS = GET_METHODS.union(POST_METHODS).union({hdrs.METH_DELETE}) 

174 

175 DEFAULT_HEADERS = { 

176 hdrs.ACCEPT: "*/*", 

177 hdrs.ACCEPT_ENCODING: _gen_default_accept_encoding(), 

178 } 

179 

180 body = b"" 

181 auth = None 

182 response = None 

183 

184 _writer = None # async task for streaming data 

185 _continue = None # waiter future for '100 Continue' response 

186 

187 # N.B. 

188 # Adding __del__ method with self._writer closing doesn't make sense 

189 # because _writer is instance method, thus it keeps a reference to self. 

190 # Until writer has finished finalizer will not be called. 

191 

192 def __init__( 

193 self, 

194 method: str, 

195 url: URL, 

196 *, 

197 params: Optional[Mapping[str, str]] = None, 

198 headers: Optional[LooseHeaders] = None, 

199 skip_auto_headers: Iterable[str] = frozenset(), 

200 data: Any = None, 

201 cookies: Optional[LooseCookies] = None, 

202 auth: Optional[BasicAuth] = None, 

203 version: http.HttpVersion = http.HttpVersion11, 

204 compress: Optional[str] = None, 

205 chunked: Optional[bool] = None, 

206 expect100: bool = False, 

207 loop: asyncio.AbstractEventLoop, 

208 response_class: Optional[Type["ClientResponse"]] = None, 

209 proxy: Optional[URL] = None, 

210 proxy_auth: Optional[BasicAuth] = None, 

211 timer: Optional[BaseTimerContext] = None, 

212 session: Optional["ClientSession"] = None, 

213 ssl: Union[SSLContext, bool, Fingerprint, None] = None, 

214 proxy_headers: Optional[LooseHeaders] = None, 

215 traces: Optional[List["Trace"]] = None, 

216 trust_env: bool = False, 

217 ): 

218 match = _CONTAINS_CONTROL_CHAR_RE.search(method) 

219 if match: 

220 raise ValueError( 

221 f"Method cannot contain non-token characters {method!r} " 

222 f"(found at least {match.group()!r})" 

223 ) 

224 assert isinstance(url, URL), url 

225 assert isinstance(proxy, (URL, type(None))), proxy 

226 # FIXME: session is None in tests only, need to fix tests 

227 # assert session is not None 

228 self._session = cast("ClientSession", session) 

229 if params: 

230 q = MultiDict(url.query) 

231 url2 = url.with_query(params) 

232 q.extend(url2.query) 

233 url = url.with_query(q) 

234 self.original_url = url 

235 self.url = url.with_fragment(None) 

236 self.method = method.upper() 

237 self.chunked = chunked 

238 self.compress = compress 

239 self.loop = loop 

240 self.length = None 

241 if response_class is None: 

242 real_response_class = ClientResponse 

243 else: 

244 real_response_class = response_class 

245 self.response_class: Type[ClientResponse] = real_response_class 

246 self._timer = timer if timer is not None else TimerNoop() 

247 self._ssl = ssl 

248 

249 if loop.get_debug(): 

250 self._source_traceback = traceback.extract_stack(sys._getframe(1)) 

251 

252 self.update_version(version) 

253 self.update_host(url) 

254 self.update_headers(headers) 

255 self.update_auto_headers(skip_auto_headers) 

256 self.update_cookies(cookies) 

257 self.update_content_encoding(data) 

258 self.update_auth(auth, trust_env) 

259 self.update_proxy(proxy, proxy_auth, proxy_headers) 

260 

261 self.update_body_from_data(data) 

262 if data is not None or self.method not in self.GET_METHODS: 

263 self.update_transfer_encoding() 

264 self.update_expect_continue(expect100) 

265 if traces is None: 

266 traces = [] 

267 self._traces = traces 

268 

269 def is_ssl(self) -> bool: 

270 return self.url.scheme in ("https", "wss") 

271 

272 @property 

273 def ssl(self) -> Union["SSLContext", None, bool, Fingerprint]: 

274 return self._ssl 

275 

276 @property 

277 def connection_key(self) -> ConnectionKey: 

278 proxy_headers = self.proxy_headers 

279 if proxy_headers: 

280 h: Optional[int] = hash(tuple((k, v) for k, v in proxy_headers.items())) 

281 else: 

282 h = None 

283 return ConnectionKey( 

284 self.host, 

285 self.port, 

286 self.is_ssl(), 

287 self.ssl, 

288 self.proxy, 

289 self.proxy_auth, 

290 h, 

291 ) 

292 

293 @property 

294 def host(self) -> str: 

295 ret = self.url.raw_host 

296 assert ret is not None 

297 return ret 

298 

299 @property 

300 def port(self) -> Optional[int]: 

301 return self.url.port 

302 

303 @property 

304 def request_info(self) -> RequestInfo: 

305 headers: CIMultiDictProxy[str] = CIMultiDictProxy(self.headers) 

306 return RequestInfo(self.url, self.method, headers, self.original_url) 

307 

308 def update_host(self, url: URL) -> None: 

309 """Update destination host, port and connection type (ssl).""" 

310 # get host/port 

311 if not url.raw_host: 

312 raise InvalidURL(url) 

313 

314 # basic auth info 

315 username, password = url.user, url.password 

316 if username: 

317 self.auth = helpers.BasicAuth(username, password or "") 

318 

319 def update_version(self, version: Union[http.HttpVersion, str]) -> None: 

320 """Convert request version to two elements tuple. 

321 

322 parser HTTP version '1.1' => (1, 1) 

323 """ 

324 if isinstance(version, str): 

325 v = [part.strip() for part in version.split(".", 1)] 

326 try: 

327 version = http.HttpVersion(int(v[0]), int(v[1])) 

328 except ValueError: 

329 raise ValueError( 

330 f"Can not parse http version number: {version}" 

331 ) from None 

332 self.version = version 

333 

334 def update_headers(self, headers: Optional[LooseHeaders]) -> None: 

335 """Update request headers.""" 

336 self.headers: CIMultiDict[str] = CIMultiDict() 

337 

338 # add host 

339 netloc = cast(str, self.url.raw_host) 

340 if helpers.is_ipv6_address(netloc): 

341 netloc = f"[{netloc}]" 

342 if self.url.port is not None and not self.url.is_default_port(): 

343 netloc += ":" + str(self.url.port) 

344 self.headers[hdrs.HOST] = netloc 

345 

346 if headers: 

347 if isinstance(headers, (dict, MultiDictProxy, MultiDict)): 

348 headers = headers.items() # type: ignore[assignment] 

349 

350 for key, value in headers: # type: ignore[misc] 

351 # A special case for Host header 

352 if key.lower() == "host": 

353 self.headers[key] = value 

354 else: 

355 self.headers.add(key, value) 

356 

357 def update_auto_headers(self, skip_auto_headers: Iterable[str]) -> None: 

358 self.skip_auto_headers = CIMultiDict( 

359 (hdr, None) for hdr in sorted(skip_auto_headers) 

360 ) 

361 used_headers = self.headers.copy() 

362 used_headers.extend(self.skip_auto_headers) # type: ignore[arg-type] 

363 

364 for hdr, val in self.DEFAULT_HEADERS.items(): 

365 if hdr not in used_headers: 

366 self.headers.add(hdr, val) 

367 

368 if hdrs.USER_AGENT not in used_headers: 

369 self.headers[hdrs.USER_AGENT] = SERVER_SOFTWARE 

370 

371 def update_cookies(self, cookies: Optional[LooseCookies]) -> None: 

372 """Update request cookies header.""" 

373 if not cookies: 

374 return 

375 

376 c: SimpleCookie[str] = SimpleCookie() 

377 if hdrs.COOKIE in self.headers: 

378 c.load(self.headers.get(hdrs.COOKIE, "")) 

379 del self.headers[hdrs.COOKIE] 

380 

381 if isinstance(cookies, Mapping): 

382 iter_cookies = cookies.items() 

383 else: 

384 iter_cookies = cookies # type: ignore[assignment] 

385 for name, value in iter_cookies: 

386 if isinstance(value, Morsel): 

387 # Preserve coded_value 

388 mrsl_val = value.get(value.key, Morsel()) 

389 mrsl_val.set(value.key, value.value, value.coded_value) 

390 c[name] = mrsl_val 

391 else: 

392 c[name] = value # type: ignore[assignment] 

393 

394 self.headers[hdrs.COOKIE] = c.output(header="", sep=";").strip() 

395 

396 def update_content_encoding(self, data: Any) -> None: 

397 """Set request content encoding.""" 

398 if data is None: 

399 return 

400 

401 enc = self.headers.get(hdrs.CONTENT_ENCODING, "").lower() 

402 if enc: 

403 if self.compress: 

404 raise ValueError( 

405 "compress can not be set " "if Content-Encoding header is set" 

406 ) 

407 elif self.compress: 

408 if not isinstance(self.compress, str): 

409 self.compress = "deflate" 

410 self.headers[hdrs.CONTENT_ENCODING] = self.compress 

411 self.chunked = True # enable chunked, no need to deal with length 

412 

413 def update_transfer_encoding(self) -> None: 

414 """Analyze transfer-encoding header.""" 

415 te = self.headers.get(hdrs.TRANSFER_ENCODING, "").lower() 

416 

417 if "chunked" in te: 

418 if self.chunked: 

419 raise ValueError( 

420 "chunked can not be set " 

421 'if "Transfer-Encoding: chunked" header is set' 

422 ) 

423 

424 elif self.chunked: 

425 if hdrs.CONTENT_LENGTH in self.headers: 

426 raise ValueError( 

427 "chunked can not be set " "if Content-Length header is set" 

428 ) 

429 

430 self.headers[hdrs.TRANSFER_ENCODING] = "chunked" 

431 else: 

432 if hdrs.CONTENT_LENGTH not in self.headers: 

433 self.headers[hdrs.CONTENT_LENGTH] = str(len(self.body)) 

434 

435 def update_auth(self, auth: Optional[BasicAuth], trust_env: bool = False) -> None: 

436 """Set basic auth.""" 

437 if auth is None: 

438 auth = self.auth 

439 if auth is None and trust_env and self.url.host is not None: 

440 netrc_obj = netrc_from_env() 

441 with contextlib.suppress(LookupError): 

442 auth = basicauth_from_netrc(netrc_obj, self.url.host) 

443 if auth is None: 

444 return 

445 

446 if not isinstance(auth, helpers.BasicAuth): 

447 raise TypeError("BasicAuth() tuple is required instead") 

448 

449 self.headers[hdrs.AUTHORIZATION] = auth.encode() 

450 

451 def update_body_from_data(self, body: Any) -> None: 

452 if body is None: 

453 return 

454 

455 # FormData 

456 if isinstance(body, FormData): 

457 body = body() 

458 

459 try: 

460 body = payload.PAYLOAD_REGISTRY.get(body, disposition=None) 

461 except payload.LookupError: 

462 boundary = None 

463 if CONTENT_TYPE in self.headers: 

464 boundary = parse_mimetype(self.headers[CONTENT_TYPE]).parameters.get( 

465 "boundary" 

466 ) 

467 body = FormData(body, boundary=boundary)() 

468 

469 self.body = body 

470 

471 # enable chunked encoding if needed 

472 if not self.chunked: 

473 if hdrs.CONTENT_LENGTH not in self.headers: 

474 size = body.size 

475 if size is None: 

476 self.chunked = True 

477 else: 

478 if hdrs.CONTENT_LENGTH not in self.headers: 

479 self.headers[hdrs.CONTENT_LENGTH] = str(size) 

480 

481 # copy payload headers 

482 assert body.headers 

483 for key, value in body.headers.items(): 

484 if key in self.headers: 

485 continue 

486 if key in self.skip_auto_headers: 

487 continue 

488 self.headers[key] = value 

489 

490 def update_expect_continue(self, expect: bool = False) -> None: 

491 if expect: 

492 self.headers[hdrs.EXPECT] = "100-continue" 

493 elif self.headers.get(hdrs.EXPECT, "").lower() == "100-continue": 

494 expect = True 

495 

496 if expect: 

497 self._continue = self.loop.create_future() 

498 

499 def update_proxy( 

500 self, 

501 proxy: Optional[URL], 

502 proxy_auth: Optional[BasicAuth], 

503 proxy_headers: Optional[LooseHeaders], 

504 ) -> None: 

505 if proxy_auth and not isinstance(proxy_auth, helpers.BasicAuth): 

506 raise ValueError("proxy_auth must be None or BasicAuth() tuple") 

507 self.proxy = proxy 

508 self.proxy_auth = proxy_auth 

509 self.proxy_headers = proxy_headers 

510 

511 def keep_alive(self) -> bool: 

512 if self.version < HttpVersion10: 

513 # keep alive not supported at all 

514 return False 

515 if self.version == HttpVersion10: 

516 if self.headers.get(hdrs.CONNECTION) == "keep-alive": 

517 return True 

518 else: # no headers means we close for Http 1.0 

519 return False 

520 elif self.headers.get(hdrs.CONNECTION) == "close": 

521 return False 

522 

523 return True 

524 

525 async def write_bytes( 

526 self, writer: AbstractStreamWriter, conn: "Connection" 

527 ) -> None: 

528 """Support coroutines that yields bytes objects.""" 

529 # 100 response 

530 if self._continue is not None: 

531 await writer.drain() 

532 await self._continue 

533 

534 protocol = conn.protocol 

535 assert protocol is not None 

536 try: 

537 if isinstance(self.body, payload.Payload): 

538 await self.body.write(writer) 

539 else: 

540 if isinstance(self.body, (bytes, bytearray)): 

541 self.body = (self.body,) # type: ignore[assignment] 

542 

543 for chunk in self.body: 

544 await writer.write(chunk) # type: ignore[arg-type] 

545 

546 await writer.write_eof() 

547 except OSError as exc: 

548 if exc.errno is None and isinstance(exc, asyncio.TimeoutError): 

549 protocol.set_exception(exc) 

550 else: 

551 new_exc = ClientOSError( 

552 exc.errno, "Can not write request body for %s" % self.url 

553 ) 

554 new_exc.__context__ = exc 

555 new_exc.__cause__ = exc 

556 protocol.set_exception(new_exc) 

557 except asyncio.CancelledError as exc: 

558 if not conn.closed: 

559 protocol.set_exception(exc) 

560 except Exception as exc: 

561 protocol.set_exception(exc) 

562 else: 

563 protocol.start_timeout() 

564 finally: 

565 self._writer = None 

566 

567 async def send(self, conn: "Connection") -> "ClientResponse": 

568 # Specify request target: 

569 # - CONNECT request must send authority form URI 

570 # - not CONNECT proxy must send absolute form URI 

571 # - most common is origin form URI 

572 if self.method == hdrs.METH_CONNECT: 

573 connect_host = self.url.raw_host 

574 assert connect_host is not None 

575 if helpers.is_ipv6_address(connect_host): 

576 connect_host = f"[{connect_host}]" 

577 path = f"{connect_host}:{self.url.port}" 

578 elif self.proxy and not self.is_ssl(): 

579 path = str(self.url) 

580 else: 

581 path = self.url.raw_path 

582 if self.url.raw_query_string: 

583 path += "?" + self.url.raw_query_string 

584 

585 protocol = conn.protocol 

586 assert protocol is not None 

587 writer = StreamWriter( 

588 protocol, 

589 self.loop, 

590 on_chunk_sent=functools.partial( 

591 self._on_chunk_request_sent, self.method, self.url 

592 ), 

593 on_headers_sent=functools.partial( 

594 self._on_headers_request_sent, self.method, self.url 

595 ), 

596 ) 

597 

598 if self.compress: 

599 writer.enable_compression(self.compress) 

600 

601 if self.chunked is not None: 

602 writer.enable_chunking() 

603 

604 # set default content-type 

605 if ( 

606 self.method in self.POST_METHODS 

607 and hdrs.CONTENT_TYPE not in self.skip_auto_headers 

608 and hdrs.CONTENT_TYPE not in self.headers 

609 ): 

610 self.headers[hdrs.CONTENT_TYPE] = "application/octet-stream" 

611 

612 # set the connection header 

613 connection = self.headers.get(hdrs.CONNECTION) 

614 if not connection: 

615 if self.keep_alive(): 

616 if self.version == HttpVersion10: 

617 connection = "keep-alive" 

618 else: 

619 if self.version == HttpVersion11: 

620 connection = "close" 

621 

622 if connection is not None: 

623 self.headers[hdrs.CONNECTION] = connection 

624 

625 # status + headers 

626 status_line = "{0} {1} HTTP/{2[0]}.{2[1]}".format( 

627 self.method, path, self.version 

628 ) 

629 await writer.write_headers(status_line, self.headers) 

630 

631 self._writer = self.loop.create_task(self.write_bytes(writer, conn)) 

632 

633 response_class = self.response_class 

634 assert response_class is not None 

635 self.response = response_class( 

636 self.method, 

637 self.original_url, 

638 writer=self._writer, 

639 continue100=self._continue, 

640 timer=self._timer, 

641 request_info=self.request_info, 

642 traces=self._traces, 

643 loop=self.loop, 

644 session=self._session, 

645 ) 

646 return self.response 

647 

648 async def close(self) -> None: 

649 if self._writer is not None: 

650 try: 

651 await self._writer 

652 finally: 

653 self._writer = None 

654 

655 def terminate(self) -> None: 

656 if self._writer is not None: 

657 if not self.loop.is_closed(): 

658 self._writer.cancel() 

659 self._writer = None 

660 

661 async def _on_chunk_request_sent(self, method: str, url: URL, chunk: bytes) -> None: 

662 for trace in self._traces: 

663 await trace.send_request_chunk_sent(method, url, chunk) 

664 

665 async def _on_headers_request_sent( 

666 self, method: str, url: URL, headers: "CIMultiDict[str]" 

667 ) -> None: 

668 for trace in self._traces: 

669 await trace.send_request_headers(method, url, headers) 

670 

671 

672class ClientResponse(HeadersMixin): 

673 # Some of these attributes are None when created, 

674 # but will be set by the start() method. 

675 # As the end user will likely never see the None values, we cheat the types below. 

676 # from the Status-Line of the response 

677 version = None # HTTP-Version 

678 status: int = None # type: ignore[assignment] # Status-Code 

679 reason = None # Reason-Phrase 

680 

681 content: StreamReader = None # type: ignore[assignment] # Payload stream 

682 _headers: CIMultiDictProxy[str] = None # type: ignore[assignment] 

683 _raw_headers: RawHeaders = None # type: ignore[assignment] 

684 

685 _connection = None # current connection 

686 _source_traceback = None 

687 # set up by ClientRequest after ClientResponse object creation 

688 # post-init stage allows to not change ctor signature 

689 _closed = True # to allow __del__ for non-initialized properly response 

690 _released = False 

691 

692 def __init__( 

693 self, 

694 method: str, 

695 url: URL, 

696 *, 

697 writer: "asyncio.Task[None]", 

698 continue100: Optional["asyncio.Future[bool]"], 

699 timer: BaseTimerContext, 

700 request_info: RequestInfo, 

701 traces: List["Trace"], 

702 loop: asyncio.AbstractEventLoop, 

703 session: "ClientSession", 

704 ) -> None: 

705 assert isinstance(url, URL) 

706 super().__init__() 

707 

708 self.method = method 

709 self.cookies: SimpleCookie[str] = SimpleCookie() 

710 

711 self._real_url = url 

712 self._url = url.with_fragment(None) 

713 self._body: Optional[bytes] = None 

714 self._writer: Optional[asyncio.Task[None]] = writer 

715 self._continue = continue100 # None by default 

716 self._closed = True 

717 self._history: Tuple[ClientResponse, ...] = () 

718 self._request_info = request_info 

719 self._timer = timer if timer is not None else TimerNoop() 

720 self._cache: Dict[str, Any] = {} 

721 self._traces = traces 

722 self._loop = loop 

723 # store a reference to session #1985 

724 self._session: Optional[ClientSession] = session 

725 if loop.get_debug(): 

726 self._source_traceback = traceback.extract_stack(sys._getframe(1)) 

727 

728 @reify 

729 def url(self) -> URL: 

730 return self._url 

731 

732 @reify 

733 def real_url(self) -> URL: 

734 return self._real_url 

735 

736 @reify 

737 def host(self) -> str: 

738 assert self._url.host is not None 

739 return self._url.host 

740 

741 @reify 

742 def headers(self) -> "CIMultiDictProxy[str]": 

743 return self._headers 

744 

745 @reify 

746 def raw_headers(self) -> RawHeaders: 

747 return self._raw_headers 

748 

749 @reify 

750 def request_info(self) -> RequestInfo: 

751 return self._request_info 

752 

753 @reify 

754 def content_disposition(self) -> Optional[ContentDisposition]: 

755 raw = self._headers.get(hdrs.CONTENT_DISPOSITION) 

756 if raw is None: 

757 return None 

758 disposition_type, params_dct = multipart.parse_content_disposition(raw) 

759 params = MappingProxyType(params_dct) 

760 filename = multipart.content_disposition_filename(params) 

761 return ContentDisposition(disposition_type, params, filename) 

762 

763 def __del__(self, _warnings: Any = warnings) -> None: 

764 if self._closed: 

765 return 

766 

767 if self._connection is not None: 

768 self._connection.release() 

769 self._cleanup_writer() 

770 

771 if self._loop.get_debug(): 

772 _warnings.warn( 

773 f"Unclosed response {self!r}", ResourceWarning, source=self 

774 ) 

775 context = {"client_response": self, "message": "Unclosed response"} 

776 if self._source_traceback: 

777 context["source_traceback"] = self._source_traceback 

778 self._loop.call_exception_handler(context) 

779 

780 def __repr__(self) -> str: 

781 out = io.StringIO() 

782 ascii_encodable_url = str(self.url) 

783 if self.reason: 

784 ascii_encodable_reason = self.reason.encode( 

785 "ascii", "backslashreplace" 

786 ).decode("ascii") 

787 else: 

788 ascii_encodable_reason = self.reason 

789 print( 

790 "<ClientResponse({}) [{} {}]>".format( 

791 ascii_encodable_url, self.status, ascii_encodable_reason 

792 ), 

793 file=out, 

794 ) 

795 print(self.headers, file=out) 

796 return out.getvalue() 

797 

798 @property 

799 def connection(self) -> Optional["Connection"]: 

800 return self._connection 

801 

802 @reify 

803 def history(self) -> Tuple["ClientResponse", ...]: 

804 """A sequence of responses, if redirects occurred.""" 

805 return self._history 

806 

807 @reify 

808 def links(self) -> "MultiDictProxy[MultiDictProxy[Union[str, URL]]]": 

809 links_str = ", ".join(self.headers.getall("link", [])) 

810 

811 if not links_str: 

812 return MultiDictProxy(MultiDict()) 

813 

814 links: MultiDict[MultiDictProxy[Union[str, URL]]] = MultiDict() 

815 

816 for val in re.split(r",(?=\s*<)", links_str): 

817 match = re.match(r"\s*<(.*)>(.*)", val) 

818 if match is None: # pragma: no cover 

819 # the check exists to suppress mypy error 

820 continue 

821 url, params_str = match.groups() 

822 params = params_str.split(";")[1:] 

823 

824 link: MultiDict[Union[str, URL]] = MultiDict() 

825 

826 for param in params: 

827 match = re.match(r"^\s*(\S*)\s*=\s*(['\"]?)(.*?)(\2)\s*$", param, re.M) 

828 if match is None: # pragma: no cover 

829 # the check exists to suppress mypy error 

830 continue 

831 key, _, value, _ = match.groups() 

832 

833 link.add(key, value) 

834 

835 key = link.get("rel", url) 

836 

837 link.add("url", self.url.join(URL(url))) 

838 

839 links.add(str(key), MultiDictProxy(link)) 

840 

841 return MultiDictProxy(links) 

842 

843 async def start(self, connection: "Connection") -> "ClientResponse": 

844 """Start response processing.""" 

845 self._closed = False 

846 self._protocol = connection.protocol 

847 self._connection = connection 

848 

849 with self._timer: 

850 while True: 

851 # read response 

852 try: 

853 protocol = self._protocol 

854 message, payload = await protocol.read() # type: ignore[union-attr] 

855 except http.HttpProcessingError as exc: 

856 raise ClientResponseError( 

857 self.request_info, 

858 self.history, 

859 status=exc.code, 

860 message=exc.message, 

861 headers=exc.headers, 

862 ) from exc 

863 

864 if message.code < 100 or message.code > 199 or message.code == 101: 

865 break 

866 

867 if self._continue is not None: 

868 set_result(self._continue, True) 

869 self._continue = None 

870 

871 # payload eof handler 

872 payload.on_eof(self._response_eof) 

873 

874 # response status 

875 self.version = message.version 

876 self.status = message.code 

877 self.reason = message.reason 

878 

879 # headers 

880 self._headers = message.headers # type is CIMultiDictProxy 

881 self._raw_headers = message.raw_headers # type is Tuple[bytes, bytes] 

882 

883 # payload 

884 self.content = payload 

885 

886 # cookies 

887 for hdr in self.headers.getall(hdrs.SET_COOKIE, ()): 

888 try: 

889 self.cookies.load(hdr) 

890 except CookieError as exc: 

891 client_logger.warning("Can not load response cookies: %s", exc) 

892 return self 

893 

894 def _response_eof(self) -> None: 

895 if self._closed: 

896 return 

897 

898 if self._connection is not None: 

899 # websocket, protocol could be None because 

900 # connection could be detached 

901 if ( 

902 self._connection.protocol is not None 

903 and self._connection.protocol.upgraded 

904 ): 

905 return 

906 

907 self._connection.release() 

908 self._connection = None 

909 

910 self._closed = True 

911 self._cleanup_writer() 

912 

913 @property 

914 def closed(self) -> bool: 

915 return self._closed 

916 

917 def close(self) -> None: 

918 if not self._released: 

919 self._notify_content() 

920 if self._closed: 

921 return 

922 

923 self._closed = True 

924 if self._loop is None or self._loop.is_closed(): 

925 return 

926 

927 if self._connection is not None: 

928 self._connection.close() 

929 self._connection = None 

930 self._cleanup_writer() 

931 

932 def release(self) -> Any: 

933 if not self._released: 

934 self._notify_content() 

935 if self._closed: 

936 return noop() 

937 

938 self._closed = True 

939 if self._connection is not None: 

940 self._connection.release() 

941 self._connection = None 

942 

943 self._cleanup_writer() 

944 return noop() 

945 

946 @property 

947 def ok(self) -> bool: 

948 """Returns ``True`` if ``status`` is less than ``400``, ``False`` if not. 

949 

950 This is **not** a check for ``200 OK`` but a check that the response 

951 status is under 400. 

952 """ 

953 return 400 > self.status 

954 

955 def raise_for_status(self) -> None: 

956 if not self.ok: 

957 # reason should always be not None for a started response 

958 assert self.reason is not None 

959 self.release() 

960 raise ClientResponseError( 

961 self.request_info, 

962 self.history, 

963 status=self.status, 

964 message=self.reason, 

965 headers=self.headers, 

966 ) 

967 

968 def _cleanup_writer(self) -> None: 

969 if self._writer is not None: 

970 self._writer.cancel() 

971 self._writer = None 

972 self._session = None 

973 

974 def _notify_content(self) -> None: 

975 content = self.content 

976 if content and content.exception() is None: 

977 content.set_exception(ClientConnectionError("Connection closed")) 

978 self._released = True 

979 

980 async def wait_for_close(self) -> None: 

981 if self._writer is not None: 

982 try: 

983 await self._writer 

984 finally: 

985 self._writer = None 

986 self.release() 

987 

988 async def read(self) -> bytes: 

989 """Read response payload.""" 

990 if self._body is None: 

991 try: 

992 self._body = await self.content.read() 

993 for trace in self._traces: 

994 await trace.send_response_chunk_received( 

995 self.method, self.url, self._body 

996 ) 

997 except BaseException: 

998 self.close() 

999 raise 

1000 elif self._released: 

1001 raise ClientConnectionError("Connection closed") 

1002 

1003 return self._body 

1004 

1005 def get_encoding(self) -> str: 

1006 ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower() 

1007 mimetype = helpers.parse_mimetype(ctype) 

1008 

1009 encoding = mimetype.parameters.get("charset") 

1010 if encoding: 

1011 try: 

1012 codecs.lookup(encoding) 

1013 except LookupError: 

1014 encoding = None 

1015 if not encoding: 

1016 if mimetype.type == "application" and ( 

1017 mimetype.subtype == "json" or mimetype.subtype == "rdap" 

1018 ): 

1019 # RFC 7159 states that the default encoding is UTF-8. 

1020 # RFC 7483 defines application/rdap+json 

1021 encoding = "utf-8" 

1022 elif self._body is None: 

1023 raise RuntimeError( 

1024 "Cannot guess the encoding of " "a not yet read body" 

1025 ) 

1026 else: 

1027 encoding = chardet.detect(self._body)["encoding"] 

1028 if not encoding: 

1029 encoding = "utf-8" 

1030 

1031 return encoding 

1032 

1033 async def text(self, encoding: Optional[str] = None, errors: str = "strict") -> str: 

1034 """Read response payload and decode.""" 

1035 if self._body is None: 

1036 await self.read() 

1037 

1038 if encoding is None: 

1039 encoding = self.get_encoding() 

1040 

1041 return self._body.decode(encoding, errors=errors) # type: ignore[union-attr] 

1042 

1043 async def json( 

1044 self, 

1045 *, 

1046 encoding: Optional[str] = None, 

1047 loads: JSONDecoder = DEFAULT_JSON_DECODER, 

1048 content_type: Optional[str] = "application/json", 

1049 ) -> Any: 

1050 """Read and decodes JSON response.""" 

1051 if self._body is None: 

1052 await self.read() 

1053 

1054 if content_type: 

1055 if not is_expected_content_type(self.content_type, content_type): 

1056 raise ContentTypeError( 

1057 self.request_info, 

1058 self.history, 

1059 message=( 

1060 "Attempt to decode JSON with " 

1061 "unexpected mimetype: %s" % self.content_type 

1062 ), 

1063 headers=self.headers, 

1064 ) 

1065 

1066 if encoding is None: 

1067 encoding = self.get_encoding() 

1068 

1069 return loads(self._body.decode(encoding)) # type: ignore[union-attr] 

1070 

1071 async def __aenter__(self) -> "ClientResponse": 

1072 return self 

1073 

1074 async def __aexit__( 

1075 self, 

1076 exc_type: Optional[Type[BaseException]], 

1077 exc_val: Optional[BaseException], 

1078 exc_tb: Optional[TracebackType], 

1079 ) -> None: 

1080 # similar to _RequestContextManager, we do not need to check 

1081 # for exceptions, response object can close connection 

1082 # if state is broken 

1083 self.release()