Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohttp/client_reqrep.py: 41%

680 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:40 +0000

1import asyncio 

2import codecs 

3import contextlib 

4import functools 

5import io 

6import re 

7import sys 

8import traceback 

9import warnings 

10from hashlib import md5, sha1, sha256 

11from http.cookies import CookieError, Morsel, SimpleCookie 

12from types import MappingProxyType, TracebackType 

13from typing import ( 

14 TYPE_CHECKING, 

15 Any, 

16 Callable, 

17 Dict, 

18 Iterable, 

19 List, 

20 Literal, 

21 Mapping, 

22 Optional, 

23 Tuple, 

24 Type, 

25 Union, 

26 cast, 

27) 

28 

29import attr 

30from multidict import CIMultiDict, CIMultiDictProxy, MultiDict, MultiDictProxy 

31from yarl import URL 

32 

33from . import hdrs, helpers, http, multipart, payload 

34from .abc import AbstractStreamWriter 

35from .client_exceptions import ( 

36 ClientConnectionError, 

37 ClientOSError, 

38 ClientResponseError, 

39 ContentTypeError, 

40 InvalidURL, 

41 ServerFingerprintMismatch, 

42) 

43from .compression_utils import HAS_BROTLI 

44from .formdata import FormData 

45from .helpers import ( 

46 BaseTimerContext, 

47 BasicAuth, 

48 HeadersMixin, 

49 TimerNoop, 

50 basicauth_from_netrc, 

51 netrc_from_env, 

52 noop, 

53 reify, 

54 set_result, 

55) 

56from .http import ( 

57 SERVER_SOFTWARE, 

58 HttpVersion, 

59 HttpVersion10, 

60 HttpVersion11, 

61 StreamWriter, 

62) 

63from .log import client_logger 

64from .streams import StreamReader 

65from .typedefs import ( 

66 DEFAULT_JSON_DECODER, 

67 JSONDecoder, 

68 LooseCookies, 

69 LooseHeaders, 

70 RawHeaders, 

71) 

72 

73try: 

74 import ssl 

75 from ssl import SSLContext 

76except ImportError: # pragma: no cover 

77 ssl = None # type: ignore[assignment] 

78 SSLContext = object # type: ignore[misc,assignment] 

79 

80 

81__all__ = ("ClientRequest", "ClientResponse", "RequestInfo", "Fingerprint") 

82 

83 

84if TYPE_CHECKING: # pragma: no cover 

85 from .client import ClientSession 

86 from .connector import Connection 

87 from .tracing import Trace 

88 

89 

# Despite its name, this pattern matches any character that is NOT one of the
# token characters listed in the class below; ClientRequest.__init__ uses it
# to reject request methods containing such characters.
_CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]")
# Matches content types starting with "application/json" or
# "application/<something>+json" (used by _is_expected_content_type).
json_re = re.compile(r"^application/(?:[\w.+-]+?\+)?json")

92 

93 

def _gen_default_accept_encoding() -> str:
    """Build the default ``Accept-Encoding`` header value.

    ``br`` is appended only when ``HAS_BROTLI`` is true.
    """
    encodings = ["gzip", "deflate"]
    if HAS_BROTLI:
        encodings.append("br")
    return ", ".join(encodings)

96 

97 

@attr.s(auto_attribs=True, frozen=True, slots=True)
class ContentDisposition:
    """Immutable record describing a parsed ``Content-Disposition`` header.

    Instances are created by ``ClientResponse.content_disposition``.
    """

    # Disposition type as returned by multipart.parse_content_disposition.
    type: Optional[str]
    # Read-only view of the header parameters.
    parameters: "MappingProxyType[str, str]"
    # Result of multipart.content_disposition_filename for the parameters.
    filename: Optional[str]

103 

104 

@attr.s(auto_attribs=True, frozen=True, slots=True)
class RequestInfo:
    """Immutable snapshot of the request that produced a response.

    Built by ``ClientRequest.request_info``.
    """

    # Request URL (ClientRequest passes its fragment-less ``url``).
    url: URL
    # HTTP method.
    method: str
    # Read-only view of the request headers.
    headers: "CIMultiDictProxy[str]"
    # URL as originally requested (ClientRequest passes ``original_url``).
    real_url: URL = attr.ib()

    @real_url.default
    def real_url_default(self) -> URL:
        """Default ``real_url`` to ``url`` when it is not given explicitly."""
        return self.url

115 

116 

class Fingerprint:
    """Expected digest of a server certificate, used for certificate pinning.

    The hash function is chosen from the digest length.  Lengths that map to
    md5 or sha1 are recognised only so that a specific error can be raised;
    only 32-byte (sha256) digests are accepted.
    """

    HASHFUNC_BY_DIGESTLEN = {
        16: md5,
        20: sha1,
        32: sha256,
    }

    def __init__(self, fingerprint: bytes) -> None:
        """Store *fingerprint* after validating its length.

        Raises:
            ValueError: if the length matches no known digest, or matches
                md5/sha1, which are rejected as insecure.
        """
        chosen = self.HASHFUNC_BY_DIGESTLEN.get(len(fingerprint))
        if not chosen:
            raise ValueError("fingerprint has invalid length")
        if chosen in (md5, sha1):
            raise ValueError("md5 and sha1 are insecure and not supported. Use sha256.")
        self._hashfunc = chosen
        self._fingerprint = fingerprint

    @property
    def fingerprint(self) -> bytes:
        """The expected digest bytes passed to the constructor."""
        return self._fingerprint

    def check(self, transport: asyncio.Transport) -> None:
        """Compare the peer certificate digest with the expected fingerprint.

        Does nothing when the transport reports no ``sslcontext``.

        Raises:
            ServerFingerprintMismatch: if the digests differ.
        """
        if not transport.get_extra_info("sslcontext"):
            return
        ssl_object = transport.get_extra_info("ssl_object")
        der_cert = ssl_object.getpeercert(binary_form=True)
        actual = self._hashfunc(der_cert).digest()
        if actual == self._fingerprint:
            return
        host, port, *_ = transport.get_extra_info("peername")
        raise ServerFingerprintMismatch(self._fingerprint, actual, host, port)

149 

150 

# Types accepted for the ``ssl`` argument; _merge_ssl_params checks against
# this with isinstance().  When the ssl module could not be imported only
# ``None`` is accepted.
if ssl is not None:
    SSL_ALLOWED_TYPES = (ssl.SSLContext, bool, Fingerprint, type(None))
else:  # pragma: no cover
    SSL_ALLOWED_TYPES = type(None)

155 

156 

def _merge_ssl_params(
    ssl: Union["SSLContext", Literal[False], Fingerprint, None],
    verify_ssl: Optional[bool],
    ssl_context: Optional["SSLContext"],
    fingerprint: Optional[bytes],
) -> Union["SSLContext", Literal[False], Fingerprint, None]:
    """Fold the deprecated TLS arguments into a single ``ssl`` value.

    Each deprecated argument that is supplied emits a ``DeprecationWarning``
    and then supplies the value for ``ssl``.

    Raises:
        ValueError: if a deprecated argument is given while ``ssl`` is
            already set (including by an earlier deprecated argument).
        TypeError: if the resulting value is not one of SSL_ALLOWED_TYPES.
    """
    exclusive_msg = (
        "verify_ssl, ssl_context, fingerprint and ssl "
        "parameters are mutually exclusive"
    )

    # Only an explicit false value triggers the deprecated path.
    if verify_ssl is not None and not verify_ssl:
        warnings.warn(
            "verify_ssl is deprecated, use ssl=False instead",
            DeprecationWarning,
            stacklevel=3,
        )
        if ssl is not None:
            raise ValueError(exclusive_msg)
        ssl = False

    if ssl_context is not None:
        warnings.warn(
            "ssl_context is deprecated, use ssl=context instead",
            DeprecationWarning,
            stacklevel=3,
        )
        if ssl is not None:
            raise ValueError(exclusive_msg)
        ssl = ssl_context

    if fingerprint is not None:
        warnings.warn(
            "fingerprint is deprecated, " "use ssl=Fingerprint(fingerprint) instead",
            DeprecationWarning,
            stacklevel=3,
        )
        if ssl is not None:
            raise ValueError(exclusive_msg)
        ssl = Fingerprint(fingerprint)

    if isinstance(ssl, SSL_ALLOWED_TYPES):
        return ssl
    raise TypeError(
        "ssl should be SSLContext, bool, Fingerprint or None, "
        "got {!r} instead.".format(ssl)
    )

208 

209 

@attr.s(auto_attribs=True, slots=True, frozen=True)
class ConnectionKey:
    """Immutable key built by ``ClientRequest.connection_key``."""

    # the key should contain an information about used proxy / TLS
    # to prevent reusing wrong connections from a pool
    host: str
    port: Optional[int]
    # True when the request URL scheme is https or wss.
    is_ssl: bool
    # The request's ``ssl`` setting.
    ssl: Union[SSLContext, None, Literal[False], Fingerprint]
    proxy: Optional[URL]
    proxy_auth: Optional[BasicAuth]
    # Hash of the proxy header items, or None when there are no proxy headers.
    proxy_headers_hash: Optional[int]  # hash(CIMultiDict)

221 

222 

223def _is_expected_content_type( 

224 response_content_type: str, expected_content_type: str 

225) -> bool: 

226 if expected_content_type == "application/json": 

227 return json_re.match(response_content_type) is not None 

228 return expected_content_type in response_content_type 

229 

230 

class ClientRequest:
    """A single outgoing HTTP request.

    The constructor normalises the URL, headers, cookies, auth, proxy
    settings and body; ``send()`` writes the request line and headers to a
    connection and starts a task that streams the body.
    """

    GET_METHODS = {
        hdrs.METH_GET,
        hdrs.METH_HEAD,
        hdrs.METH_OPTIONS,
        hdrs.METH_TRACE,
    }
    POST_METHODS = {hdrs.METH_PATCH, hdrs.METH_POST, hdrs.METH_PUT}
    ALL_METHODS = GET_METHODS.union(POST_METHODS).union({hdrs.METH_DELETE})

    DEFAULT_HEADERS = {
        hdrs.ACCEPT: "*/*",
        hdrs.ACCEPT_ENCODING: _gen_default_accept_encoding(),
    }

    body = b""
    auth = None
    response = None

    __writer = None  # async task for streaming data
    _continue = None  # waiter future for '100 Continue' response

    # N.B.
    # Adding __del__ method with self._writer closing doesn't make sense
    # because _writer is instance method, thus it keeps a reference to self.
    # Until writer has finished finalizer will not be called.

    def __init__(
        self,
        method: str,
        url: URL,
        *,
        params: Optional[Mapping[str, str]] = None,
        headers: Optional[LooseHeaders] = None,
        skip_auto_headers: Iterable[str] = frozenset(),
        data: Any = None,
        cookies: Optional[LooseCookies] = None,
        auth: Optional[BasicAuth] = None,
        version: http.HttpVersion = http.HttpVersion11,
        compress: Optional[str] = None,
        chunked: Optional[bool] = None,
        expect100: bool = False,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        response_class: Optional[Type["ClientResponse"]] = None,
        proxy: Optional[URL] = None,
        proxy_auth: Optional[BasicAuth] = None,
        timer: Optional[BaseTimerContext] = None,
        session: Optional["ClientSession"] = None,
        ssl: Union[SSLContext, Literal[False], Fingerprint, None] = None,
        proxy_headers: Optional[LooseHeaders] = None,
        traces: Optional[List["Trace"]] = None,
        trust_env: bool = False,
        server_hostname: Optional[str] = None,
    ):
        """Validate the arguments and prepare headers and body.

        Raises:
            ValueError: if *method* contains a character outside the allowed
                token set, or if the header/encoding options conflict.
            InvalidURL: if *url* has no host.
        """
        if loop is None:
            loop = asyncio.get_event_loop()

        match = _CONTAINS_CONTROL_CHAR_RE.search(method)
        if match:
            # Both halves must be f-strings so the offending character is
            # interpolated rather than printed as a literal placeholder.
            raise ValueError(
                f"Method cannot contain non-token characters {method!r} "
                f"(found at least {match.group()!r})"
            )

        assert isinstance(url, URL), url
        assert isinstance(proxy, (URL, type(None))), proxy
        # FIXME: session is None in tests only, need to fix tests
        # assert session is not None
        self._session = cast("ClientSession", session)
        if params:
            # Merge the explicit params into the query already on the URL.
            q = MultiDict(url.query)
            url2 = url.with_query(params)
            q.extend(url2.query)
            url = url.with_query(q)
        self.original_url = url
        self.url = url.with_fragment(None)
        self.method = method.upper()
        self.chunked = chunked
        self.compress = compress
        self.loop = loop
        self.length = None
        if response_class is None:
            real_response_class = ClientResponse
        else:
            real_response_class = response_class
        self.response_class: Type[ClientResponse] = real_response_class
        self._timer = timer if timer is not None else TimerNoop()
        self._ssl = ssl
        self.server_hostname = server_hostname

        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

        self.update_version(version)
        self.update_host(url)
        self.update_headers(headers)
        self.update_auto_headers(skip_auto_headers)
        self.update_cookies(cookies)
        self.update_content_encoding(data)
        self.update_auth(auth, trust_env)
        self.update_proxy(proxy, proxy_auth, proxy_headers)

        self.update_body_from_data(data)
        if data is not None or self.method not in self.GET_METHODS:
            self.update_transfer_encoding()
        self.update_expect_continue(expect100)
        if traces is None:
            traces = []
        self._traces = traces

    def __reset_writer(self, _: object = None) -> None:
        """Done-callback for the writer task: forget the task."""
        self.__writer = None

    @property
    def _writer(self) -> Optional["asyncio.Task[None]"]:
        """The task currently streaming the request body, if any."""
        return self.__writer

    @_writer.setter
    def _writer(self, writer: Optional["asyncio.Task[None]"]) -> None:
        # Move the done-callback from the previous task to the new one.
        if self.__writer is not None:
            self.__writer.remove_done_callback(self.__reset_writer)
        self.__writer = writer
        if writer is not None:
            writer.add_done_callback(self.__reset_writer)

    def is_ssl(self) -> bool:
        """Return True when the URL scheme is ``https`` or ``wss``."""
        return self.url.scheme in ("https", "wss")

    @property
    def ssl(self) -> Union["SSLContext", None, Literal[False], Fingerprint]:
        """The ``ssl`` value given to the constructor."""
        return self._ssl

    @property
    def connection_key(self) -> ConnectionKey:
        """Build the ConnectionKey for this request."""
        proxy_headers = self.proxy_headers
        if proxy_headers:
            h: Optional[int] = hash(tuple((k, v) for k, v in proxy_headers.items()))
        else:
            h = None
        return ConnectionKey(
            self.host,
            self.port,
            self.is_ssl(),
            self.ssl,
            self.proxy,
            self.proxy_auth,
            h,
        )

    @property
    def host(self) -> str:
        """The raw host of the request URL."""
        ret = self.url.raw_host
        assert ret is not None
        return ret

    @property
    def port(self) -> Optional[int]:
        """The port of the request URL."""
        return self.url.port

    @property
    def request_info(self) -> RequestInfo:
        """A RequestInfo snapshot with a read-only view of the headers."""
        headers: CIMultiDictProxy[str] = CIMultiDictProxy(self.headers)
        return RequestInfo(self.url, self.method, headers, self.original_url)

    def update_host(self, url: URL) -> None:
        """Update destination host, port and connection type (ssl)."""
        # get host/port
        if not url.raw_host:
            raise InvalidURL(url)

        # basic auth info
        username, password = url.user, url.password
        if username:
            self.auth = helpers.BasicAuth(username, password or "")

    def update_version(self, version: Union[http.HttpVersion, str]) -> None:
        """Convert request version to two elements tuple.

        parser HTTP version '1.1' => (1, 1)

        Raises:
            ValueError: if a string version cannot be parsed, including a
                string without a minor part such as ``"1"``.
        """
        if isinstance(version, str):
            v = [part.strip() for part in version.split(".", 1)]
            try:
                version = http.HttpVersion(int(v[0]), int(v[1]))
            except (ValueError, IndexError):
                # IndexError: no "." in the string, so there is no minor part.
                raise ValueError(
                    f"Can not parse http version number: {version}"
                ) from None
        self.version = version

    def update_headers(self, headers: Optional[LooseHeaders]) -> None:
        """Update request headers."""
        self.headers: CIMultiDict[str] = CIMultiDict()

        # add host
        netloc = cast(str, self.url.raw_host)
        if helpers.is_ipv6_address(netloc):
            netloc = f"[{netloc}]"
        # See https://github.com/aio-libs/aiohttp/issues/3636.
        netloc = netloc.rstrip(".")
        if self.url.port is not None and not self.url.is_default_port():
            netloc += ":" + str(self.url.port)
        self.headers[hdrs.HOST] = netloc

        if headers:
            if isinstance(headers, (dict, MultiDictProxy, MultiDict)):
                headers = headers.items()  # type: ignore[assignment]

            for key, value in headers:  # type: ignore[misc]
                # A special case for Host header
                if key.lower() == "host":
                    # Replace the generated Host instead of adding a second one.
                    self.headers[key] = value
                else:
                    self.headers.add(key, value)

    def update_auto_headers(self, skip_auto_headers: Iterable[str]) -> None:
        """Add default headers unless already present or explicitly skipped."""
        self.skip_auto_headers = CIMultiDict(
            (hdr, None) for hdr in sorted(skip_auto_headers)
        )
        used_headers = self.headers.copy()
        used_headers.extend(self.skip_auto_headers)  # type: ignore[arg-type]

        for hdr, val in self.DEFAULT_HEADERS.items():
            if hdr not in used_headers:
                self.headers.add(hdr, val)

        if hdrs.USER_AGENT not in used_headers:
            self.headers[hdrs.USER_AGENT] = SERVER_SOFTWARE

    def update_cookies(self, cookies: Optional[LooseCookies]) -> None:
        """Update request cookies header."""
        if not cookies:
            return

        c = SimpleCookie()
        if hdrs.COOKIE in self.headers:
            # Start from cookies already present in the Cookie header.
            c.load(self.headers.get(hdrs.COOKIE, ""))
            del self.headers[hdrs.COOKIE]

        if isinstance(cookies, Mapping):
            iter_cookies = cookies.items()
        else:
            iter_cookies = cookies  # type: ignore[assignment]
        for name, value in iter_cookies:
            if isinstance(value, Morsel):
                # Preserve coded_value
                mrsl_val = value.get(value.key, Morsel())
                mrsl_val.set(value.key, value.value, value.coded_value)
                c[name] = mrsl_val
            else:
                c[name] = value  # type: ignore[assignment]

        self.headers[hdrs.COOKIE] = c.output(header="", sep=";").strip()

    def update_content_encoding(self, data: Any) -> None:
        """Set request content encoding."""
        if data is None:
            return

        enc = self.headers.get(hdrs.CONTENT_ENCODING, "").lower()
        if enc:
            if self.compress:
                raise ValueError(
                    "compress can not be set if Content-Encoding header is set"
                )
        elif self.compress:
            if not isinstance(self.compress, str):
                self.compress = "deflate"
            self.headers[hdrs.CONTENT_ENCODING] = self.compress
            self.chunked = True  # enable chunked, no need to deal with length

    def update_transfer_encoding(self) -> None:
        """Analyze transfer-encoding header."""
        te = self.headers.get(hdrs.TRANSFER_ENCODING, "").lower()

        if "chunked" in te:
            if self.chunked:
                raise ValueError(
                    "chunked can not be set "
                    'if "Transfer-Encoding: chunked" header is set'
                )

        elif self.chunked:
            if hdrs.CONTENT_LENGTH in self.headers:
                raise ValueError(
                    "chunked can not be set if Content-Length header is set"
                )

            self.headers[hdrs.TRANSFER_ENCODING] = "chunked"
        else:
            if hdrs.CONTENT_LENGTH not in self.headers:
                self.headers[hdrs.CONTENT_LENGTH] = str(len(self.body))

    def update_auth(self, auth: Optional[BasicAuth], trust_env: bool = False) -> None:
        """Set basic auth."""
        if auth is None:
            # Fall back to credentials taken from the URL by update_host().
            auth = self.auth
        if auth is None and trust_env and self.url.host is not None:
            netrc_obj = netrc_from_env()
            with contextlib.suppress(LookupError):
                auth = basicauth_from_netrc(netrc_obj, self.url.host)
        if auth is None:
            return

        if not isinstance(auth, helpers.BasicAuth):
            raise TypeError("BasicAuth() tuple is required instead")

        self.headers[hdrs.AUTHORIZATION] = auth.encode()

    def update_body_from_data(self, body: Any) -> None:
        """Wrap *body* in a payload object and derive length/chunking headers."""
        if body is None:
            return

        # FormData
        if isinstance(body, FormData):
            body = body()

        try:
            body = payload.PAYLOAD_REGISTRY.get(body, disposition=None)
        except payload.LookupError:
            body = FormData(body)()

        self.body = body

        # enable chunked encoding if needed
        if not self.chunked and hdrs.CONTENT_LENGTH not in self.headers:
            size = body.size
            if size is None:
                self.chunked = True
            else:
                self.headers[hdrs.CONTENT_LENGTH] = str(size)

        # copy payload headers
        assert body.headers
        for key, value in body.headers.items():
            if key in self.headers:
                continue
            if key in self.skip_auto_headers:
                continue
            self.headers[key] = value

    def update_expect_continue(self, expect: bool = False) -> None:
        """Set up the '100 Continue' waiter when the request expects one."""
        if expect:
            self.headers[hdrs.EXPECT] = "100-continue"
        elif self.headers.get(hdrs.EXPECT, "").lower() == "100-continue":
            expect = True

        if expect:
            self._continue = self.loop.create_future()

    def update_proxy(
        self,
        proxy: Optional[URL],
        proxy_auth: Optional[BasicAuth],
        proxy_headers: Optional[LooseHeaders],
    ) -> None:
        """Store proxy settings after validating *proxy_auth*."""
        if proxy_auth and not isinstance(proxy_auth, helpers.BasicAuth):
            raise ValueError("proxy_auth must be None or BasicAuth() tuple")
        self.proxy = proxy
        self.proxy_auth = proxy_auth
        self.proxy_headers = proxy_headers

    def keep_alive(self) -> bool:
        """Tell whether the connection should be kept open after the request."""
        if self.version < HttpVersion10:
            # keep alive not supported at all
            return False
        if self.version == HttpVersion10:
            if self.headers.get(hdrs.CONNECTION) == "keep-alive":
                return True
            else:  # no headers means we close for Http 1.0
                return False
        elif self.headers.get(hdrs.CONNECTION) == "close":
            return False

        return True

    async def write_bytes(
        self, writer: AbstractStreamWriter, conn: "Connection"
    ) -> None:
        """Support coroutines that yields bytes objects."""
        # 100 response
        if self._continue is not None:
            try:
                await writer.drain()
                await self._continue
            except asyncio.CancelledError:
                return

        protocol = conn.protocol
        assert protocol is not None
        try:
            if isinstance(self.body, payload.Payload):
                await self.body.write(writer)
            else:
                if isinstance(self.body, (bytes, bytearray)):
                    self.body = (self.body,)  # type: ignore[assignment]

                for chunk in self.body:
                    await writer.write(chunk)  # type: ignore[arg-type]
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                protocol.set_exception(exc)
            else:
                new_exc = ClientOSError(
                    exc.errno, "Can not write request body for %s" % self.url
                )
                new_exc.__context__ = exc
                new_exc.__cause__ = exc
                protocol.set_exception(new_exc)
        except asyncio.CancelledError:
            await writer.write_eof()
        except Exception as exc:
            protocol.set_exception(exc)
        else:
            await writer.write_eof()
            protocol.start_timeout()

    async def send(self, conn: "Connection") -> "ClientResponse":
        """Write the request line and headers, start the body writer task
        and return the (not yet started) response object."""
        # Specify request target:
        # - CONNECT request must send authority form URI
        # - not CONNECT proxy must send absolute form URI
        # - most common is origin form URI
        if self.method == hdrs.METH_CONNECT:
            connect_host = self.url.raw_host
            assert connect_host is not None
            if helpers.is_ipv6_address(connect_host):
                connect_host = f"[{connect_host}]"
            path = f"{connect_host}:{self.url.port}"
        elif self.proxy and not self.is_ssl():
            path = str(self.url)
        else:
            path = self.url.raw_path
            if self.url.raw_query_string:
                path += "?" + self.url.raw_query_string

        protocol = conn.protocol
        assert protocol is not None
        writer = StreamWriter(
            protocol,
            self.loop,
            on_chunk_sent=functools.partial(
                self._on_chunk_request_sent, self.method, self.url
            ),
            on_headers_sent=functools.partial(
                self._on_headers_request_sent, self.method, self.url
            ),
        )

        if self.compress:
            writer.enable_compression(self.compress)

        # NOTE(review): this also enables chunking for an explicit
        # ``chunked=False``; confirm whether a truthiness test was intended.
        if self.chunked is not None:
            writer.enable_chunking()

        # set default content-type
        if (
            self.method in self.POST_METHODS
            and hdrs.CONTENT_TYPE not in self.skip_auto_headers
            and hdrs.CONTENT_TYPE not in self.headers
        ):
            self.headers[hdrs.CONTENT_TYPE] = "application/octet-stream"

        # set the connection header
        connection = self.headers.get(hdrs.CONNECTION)
        if not connection:
            if self.keep_alive():
                if self.version == HttpVersion10:
                    connection = "keep-alive"
            else:
                if self.version == HttpVersion11:
                    connection = "close"

        if connection is not None:
            self.headers[hdrs.CONNECTION] = connection

        # status + headers
        status_line = "{0} {1} HTTP/{v.major}.{v.minor}".format(
            self.method, path, v=self.version
        )
        await writer.write_headers(status_line, self.headers)

        self._writer = self.loop.create_task(self.write_bytes(writer, conn))

        response_class = self.response_class
        assert response_class is not None
        self.response = response_class(
            self.method,
            self.original_url,
            writer=self._writer,
            continue100=self._continue,
            timer=self._timer,
            request_info=self.request_info,
            traces=self._traces,
            loop=self.loop,
            session=self._session,
        )
        return self.response

    async def close(self) -> None:
        """Wait for the body writer task, ignoring its cancellation."""
        if self._writer is not None:
            with contextlib.suppress(asyncio.CancelledError):
                await self._writer

    def terminate(self) -> None:
        """Cancel the body writer task (if the loop is still open) and drop it."""
        if self._writer is not None:
            if not self.loop.is_closed():
                self._writer.cancel()
            self._writer.remove_done_callback(self.__reset_writer)
            self._writer = None

    async def _on_chunk_request_sent(self, method: str, url: URL, chunk: bytes) -> None:
        """Notify every trace that a body chunk was sent."""
        for trace in self._traces:
            await trace.send_request_chunk_sent(method, url, chunk)

    async def _on_headers_request_sent(
        self, method: str, url: URL, headers: "CIMultiDict[str]"
    ) -> None:
        """Notify every trace that the request headers were sent."""
        for trace in self._traces:
            await trace.send_request_headers(method, url, headers)

752 

753 

754class ClientResponse(HeadersMixin): 

755 

756 # Some of these attributes are None when created, 

757 # but will be set by the start() method. 

758 # As the end user will likely never see the None values, we cheat the types below. 

759 # from the Status-Line of the response 

760 version: Optional[HttpVersion] = None # HTTP-Version 

761 status: int = None # type: ignore[assignment] # Status-Code 

762 reason: Optional[str] = None # Reason-Phrase 

763 

764 content: StreamReader = None # type: ignore[assignment] # Payload stream 

765 _headers: CIMultiDictProxy[str] = None # type: ignore[assignment] 

766 _raw_headers: RawHeaders = None # type: ignore[assignment] 

767 

768 _connection = None # current connection 

769 _source_traceback: Optional[traceback.StackSummary] = None 

770 # set up by ClientRequest after ClientResponse object creation 

771 # post-init stage allows to not change ctor signature 

772 _closed = True # to allow __del__ for non-initialized properly response 

773 _released = False 

774 __writer = None 

775 

def __init__(
    self,
    method: str,
    url: URL,
    *,
    writer: "asyncio.Task[None]",
    continue100: Optional["asyncio.Future[bool]"],
    timer: BaseTimerContext,
    request_info: RequestInfo,
    traces: List["Trace"],
    loop: asyncio.AbstractEventLoop,
    session: "ClientSession",
) -> None:
    """Record request metadata; status, headers and content are set by start()."""
    assert isinstance(url, URL)

    # Request identity.
    self.method = method
    self._real_url = url
    self._url = url.with_fragment(None)
    self._request_info = request_info

    # Response state that is filled in later.
    self.cookies = SimpleCookie()
    self._body: Any = None
    self._history: Tuple[ClientResponse, ...] = ()
    self._cache: Dict[str, Any] = {}
    self._closed = True

    # Collaborators.
    self._writer: Optional[asyncio.Task[None]] = writer
    self._continue = continue100  # None by default
    if timer is None:
        self._timer = TimerNoop()
    else:
        self._timer = timer
    self._traces = traces
    self._loop = loop
    # store a reference to session #1985
    self._session: Optional[ClientSession] = session

    # Save reference to _resolve_charset, so that get_encoding() will still
    # work after the response has finished reading the body.
    if session is not None:
        self._resolve_charset: Callable[
            ["ClientResponse", bytes], str
        ] = session._resolve_charset
    else:
        # TODO: Fix session=None in tests (see ClientRequest.__init__).
        self._resolve_charset = lambda *_: "utf-8"

    if loop.get_debug():
        self._source_traceback = traceback.extract_stack(sys._getframe(1))

819 

def __reset_writer(self, _: object = None) -> None:
    """Forget the writer task; registered as its done-callback by the ``_writer`` setter."""
    self.__writer = None

822 

@property
def _writer(self) -> Optional["asyncio.Task[None]"]:
    """The request body writer task, or None when there is none."""
    return self.__writer

@_writer.setter
def _writer(self, writer: Optional["asyncio.Task[None]"]) -> None:
    # Detach the reset callback from the task being replaced ...
    previous = self.__writer
    if previous is not None:
        previous.remove_done_callback(self.__reset_writer)
    # ... and attach it to the new task, if there is one.
    self.__writer = writer
    if writer is not None:
        writer.add_done_callback(self.__reset_writer)

834 

@reify
def url(self) -> URL:
    """The request URL with its fragment removed."""
    return self._url

838 

@reify
def url_obj(self) -> URL:
    """Deprecated alias of ``url``; emits a DeprecationWarning."""
    warnings.warn("Deprecated, use .url #1654", DeprecationWarning, stacklevel=2)
    return self._url

843 

@reify
def real_url(self) -> URL:
    """The URL exactly as passed to the constructor (fragment kept)."""
    return self._real_url

847 

@reify
def host(self) -> str:
    """The host of the response URL; asserts that the URL has one."""
    assert self._url.host is not None
    return self._url.host

852 

@reify
def headers(self) -> "CIMultiDictProxy[str]":
    """The response headers stored by start()."""
    return self._headers

856 

@reify
def raw_headers(self) -> RawHeaders:
    """The raw response headers stored by start()."""
    return self._raw_headers

860 

@reify
def request_info(self) -> RequestInfo:
    """The RequestInfo passed to the constructor."""
    return self._request_info

864 

@reify
def content_disposition(self) -> Optional[ContentDisposition]:
    """Parse the ``Content-Disposition`` header, or return None if absent."""
    header_value = self._headers.get(hdrs.CONTENT_DISPOSITION)
    if header_value is None:
        return None
    kind, raw_params = multipart.parse_content_disposition(header_value)
    # Expose the parameters through a read-only mapping.
    frozen_params = MappingProxyType(raw_params)
    return ContentDisposition(
        kind,
        frozen_params,
        multipart.content_disposition_filename(frozen_params),
    )

874 

def __del__(self, _warnings: Any = warnings) -> None:
    """Release a still-held connection and, in debug mode, report the leak."""
    # Nothing to do for a closed response or one without a connection.
    if self._closed or self._connection is None:
        return

    self._connection.release()
    self._cleanup_writer()

    if not self._loop.get_debug():
        return

    _warnings.warn(f"Unclosed response {self!r}", ResourceWarning, source=self)
    context = {"client_response": self, "message": "Unclosed response"}
    if self._source_traceback:
        context["source_traceback"] = self._source_traceback
    self._loop.call_exception_handler(context)

890 

def __repr__(self) -> str:
    """Return a status line followed by the headers, each newline-terminated."""
    if self.reason:
        # Escape non-ASCII characters in the reason phrase.
        reason_text = self.reason.encode("ascii", "backslashreplace").decode("ascii")
    else:
        reason_text = "None"
    status_line = "<ClientResponse({}) [{} {}]>".format(
        str(self.url), self.status, reason_text
    )
    # The trailing empty item yields the final newline.
    return "\n".join((status_line, str(self.headers), ""))

908 

@property
def connection(self) -> Optional["Connection"]:
    """The connection currently attached to this response, or None."""
    return self._connection

912 

@reify
def history(self) -> Tuple["ClientResponse", ...]:
    """A sequence of responses, if redirects occurred."""
    return self._history

917 

@reify
def links(self) -> "MultiDictProxy[MultiDictProxy[Union[str, URL]]]":
    """Parse the ``Link`` headers into a read-only multidict.

    Each entry is keyed by its ``rel`` parameter (or by the target when
    there is no ``rel``) and holds the link parameters plus a ``url`` item
    resolved against the response URL.
    """
    joined = ", ".join(self.headers.getall("link", []))
    if not joined:
        return MultiDictProxy(MultiDict())

    result: MultiDict[MultiDictProxy[Union[str, URL]]] = MultiDict()

    # Split on commas that are followed by the "<" opening the next link.
    for entry in re.split(r",(?=\s*<)", joined):
        entry_match = re.match(r"\s*<(.*)>(.*)", entry)
        if entry_match is None:  # pragma: no cover
            # the check exists to suppress mypy error
            continue
        target, raw_params = entry_match.groups()

        link: MultiDict[Union[str, URL]] = MultiDict()

        # The text before the first ";" is not a parameter.
        for raw_param in raw_params.split(";")[1:]:
            param_match = re.match(
                r"^\s*(\S*)\s*=\s*(['\"]?)(.*?)(\2)\s*$", raw_param, re.M
            )
            if param_match is None:  # pragma: no cover
                # the check exists to suppress mypy error
                continue
            name, _, value, _ = param_match.groups()
            link.add(name, value)

        # Read "rel" before the "url" item is added.
        rel = link.get("rel", target)
        link.add("url", self.url.join(URL(target)))
        result.add(str(rel), MultiDictProxy(link))

    return MultiDictProxy(result)

953 

async def start(self, connection: "Connection") -> "ClientResponse":
    """Start response processing.

    Reads messages from the connection's protocol until a final one
    arrives, then stores status, headers, payload and cookies on ``self``.

    Raises:
        ClientResponseError: if the protocol raises HttpProcessingError.
    """
    self._closed = False
    self._protocol = connection.protocol
    self._connection = connection

    with self._timer:
        while True:
            # read response
            try:
                protocol = self._protocol
                message, payload = await protocol.read()  # type: ignore[union-attr]
            except http.HttpProcessingError as exc:
                raise ClientResponseError(
                    self.request_info,
                    self.history,
                    status=exc.code,
                    message=exc.message,
                    headers=exc.headers,
                ) from exc

            # Stop on anything that is not a 1xx status; 101 also stops.
            if message.code < 100 or message.code > 199 or message.code == 101:
                break

            # Another 1xx arrived: wake a pending '100 Continue' waiter, once.
            if self._continue is not None:
                set_result(self._continue, True)
                self._continue = None

    # payload eof handler
    payload.on_eof(self._response_eof)

    # response status
    self.version = message.version
    self.status = message.code
    self.reason = message.reason

    # headers
    self._headers = message.headers  # type is CIMultiDictProxy
    self._raw_headers = message.raw_headers  # type is Tuple[bytes, bytes]

    # payload
    self.content = payload

    # cookies
    for hdr in self.headers.getall(hdrs.SET_COOKIE, ()):
        try:
            self.cookies.load(hdr)
        except CookieError as exc:
            # A malformed Set-Cookie header is logged, not raised.
            client_logger.warning("Can not load response cookies: %s", exc)
    return self

1004 

1005 def _response_eof(self) -> None: 

1006 if self._closed: 

1007 return 

1008 

1009 # protocol could be None because connection could be detached 

1010 protocol = self._connection and self._connection.protocol 

1011 if protocol is not None and protocol.upgraded: 

1012 return 

1013 

1014 self._closed = True 

1015 self._cleanup_writer() 

1016 self._release_connection() 

1017 

@property
def closed(self) -> bool:
    """Return the current value of the internal ``_closed`` flag."""
    return self._closed

1021 

def close(self) -> None:
    """Mark the response closed and close the underlying connection.

    The content stream is notified first unless the response was already
    released.  When there is no loop, or the loop is already closed, only
    the ``_closed`` flag is set and nothing else is touched.
    """
    if not self._released:
        self._notify_content()

    self._closed = True

    loop = self._loop
    if loop is None or loop.is_closed():
        return

    self._cleanup_writer()

    conn = self._connection
    if conn is None:
        return
    conn.close()
    self._connection = None

1034 

def release(self) -> Any:
    """Mark the response closed and release its connection.

    The content stream is notified first unless the response was already
    released.  Returns the result of ``noop()``.
    """
    already_released = self._released
    if not already_released:
        self._notify_content()

    self._closed = True

    self._cleanup_writer()
    self._release_connection()
    return noop()

1044 

@property
def ok(self) -> bool:
    """Returns ``True`` if ``status`` is less than ``400``, ``False`` if not.

    This is **not** a check for ``200 OK`` but a check that the response
    status is under 400.
    """
    return self.status < 400

1053 

def raise_for_status(self) -> None:
    """Raise ``ClientResponseError`` when the response is not ``ok``.

    The response is released before the exception is raised.  Nothing
    happens when ``ok`` is true.
    """
    if self.ok:
        return

    # reason should always be not None for a started response
    assert self.reason is not None
    self.release()
    raise ClientResponseError(
        self.request_info,
        self.history,
        status=self.status,
        message=self.reason,
        headers=self.headers,
    )

1066 

1067 def _release_connection(self) -> None: 

1068 if self._connection is not None: 

1069 if self._writer is None: 

1070 self._connection.release() 

1071 self._connection = None 

1072 else: 

1073 self._writer.add_done_callback(lambda f: self._release_connection()) 

1074 

1075 async def _wait_released(self) -> None: 

1076 if self._writer is not None: 

1077 await self._writer 

1078 self._release_connection() 

1079 

1080 def _cleanup_writer(self) -> None: 

1081 if self._writer is not None: 

1082 self._writer.cancel() 

1083 self._session = None 

1084 

1085 def _notify_content(self) -> None: 

1086 content = self.content 

1087 if content and content.exception() is None: 

1088 content.set_exception(ClientConnectionError("Connection closed")) 

1089 self._released = True 

1090 

async def wait_for_close(self) -> None:
    """Await the writer, if one is set, then release the response."""
    writer = self._writer
    if writer is not None:
        await writer
    self.release()

1095 

async def read(self) -> bytes:
    """Read response payload.

    The first call reads the whole content stream, caches it in
    ``_body`` and reports it to every trace.  Any exception raised while
    doing so closes the response and is re-raised.

    Later calls return the cached body, unless the response has been
    released, in which case ``ClientConnectionError`` is raised.
    """
    if self._body is not None:
        if self._released:  # Response explicitly released
            raise ClientConnectionError("Connection closed")
    else:
        try:
            self._body = await self.content.read()
            for trace in self._traces:
                await trace.send_response_chunk_received(
                    self.method, self.url, self._body
                )
        except BaseException:
            self.close()
            raise

    proto = self._connection and self._connection.protocol
    if proto is None or not proto.upgraded:
        await self._wait_released()  # Underlying connection released
    return self._body  # type: ignore[no-any-return]

1115 

def get_encoding(self) -> str:
    """Return the name of the encoding to use for decoding the body.

    Resolution order:

    1. The ``charset`` parameter of the ``Content-Type`` header, when
       ``codecs.lookup()`` recognises it.
    2. ``"utf-8"`` for ``application/json`` and ``application/rdap``
       mimetypes.
    3. Whatever ``self._resolve_charset`` returns for the body.

    Raises ``RuntimeError`` when step 3 is reached before the body has
    been read.
    """
    ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower()
    mimetype = helpers.parse_mimetype(ctype)

    encoding = mimetype.parameters.get("charset")
    if encoding:
        # The charset comes from the server and may be garbage.
        # codecs.lookup() raises LookupError for unknown names, but
        # ValueError for names it cannot process at all (e.g. one with an
        # embedded NUL character).  Both mean "unusable charset", so fall
        # through to the next strategy instead of crashing.
        with contextlib.suppress(LookupError, ValueError):
            return codecs.lookup(encoding).name

    if mimetype.type == "application" and (
        mimetype.subtype == "json" or mimetype.subtype == "rdap"
    ):
        # RFC 7159 states that the default encoding is UTF-8.
        # RFC 7483 defines application/rdap+json
        return "utf-8"

    if self._body is None:
        raise RuntimeError(
            "Cannot compute fallback encoding of a not yet read body"
        )

    return self._resolve_charset(self, self._body)

1138 

async def text(self, encoding: Optional[str] = None, errors: str = "strict") -> str:
    """Read response payload and decode.

    ``encoding`` defaults to the result of ``get_encoding()``;
    ``errors`` is passed through to ``bytes.decode``.
    """
    if self._body is None:
        await self.read()

    charset = self.get_encoding() if encoding is None else encoding
    body = self._body
    return body.decode(  # type: ignore[no-any-return,union-attr]
        charset, errors=errors
    )

1150 

async def json(
    self,
    *,
    encoding: Optional[str] = None,
    loads: JSONDecoder = DEFAULT_JSON_DECODER,
    content_type: Optional[str] = "application/json",
) -> Any:
    """Read and decodes JSON response.

    When ``content_type`` is truthy, the lower-cased ``Content-Type``
    header must match it, otherwise ``ContentTypeError`` is raised.
    A body that is empty after stripping whitespace yields ``None``.
    ``encoding`` defaults to the result of ``get_encoding()``.
    """
    if self._body is None:
        await self.read()

    if content_type:
        actual_type = self.headers.get(hdrs.CONTENT_TYPE, "").lower()
        if not _is_expected_content_type(actual_type, content_type):
            reason = (
                "Attempt to decode JSON with unexpected mimetype: %s" % actual_type
            )
            raise ContentTypeError(
                self.request_info,
                self.history,
                message=reason,
                headers=self.headers,
            )

    raw = self._body.strip()  # type: ignore[union-attr]
    if not raw:
        return None

    charset = self.get_encoding() if encoding is None else encoding
    return loads(raw.decode(charset))

1182 

1183 async def __aenter__(self) -> "ClientResponse": 

1184 return self 

1185 

1186 async def __aexit__( 

1187 self, 

1188 exc_type: Optional[Type[BaseException]], 

1189 exc_val: Optional[BaseException], 

1190 exc_tb: Optional[TracebackType], 

1191 ) -> None: 

1192 # similar to _RequestContextManager, we do not need to check 

1193 # for exceptions, response object can close connection 

1194 # if state is broken 

1195 self.release() 

1196 await self.wait_for_close()