Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/client_reqrep.py: 27%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

699 statements  

1import asyncio 

2import codecs 

3import contextlib 

4import functools 

5import io 

6import re 

7import sys 

8import traceback 

9import warnings 

10from hashlib import md5, sha1, sha256 

11from http.cookies import CookieError, Morsel, SimpleCookie 

12from types import MappingProxyType, TracebackType 

13from typing import ( 

14 TYPE_CHECKING, 

15 Any, 

16 Callable, 

17 Dict, 

18 Iterable, 

19 List, 

20 Mapping, 

21 NamedTuple, 

22 Optional, 

23 Tuple, 

24 Type, 

25 Union, 

26) 

27 

28from multidict import CIMultiDict, CIMultiDictProxy, MultiDict, MultiDictProxy 

29from yarl import URL 

30 

31from . import hdrs, helpers, http, multipart, payload 

32from .abc import AbstractStreamWriter 

33from .client_exceptions import ( 

34 ClientConnectionError, 

35 ClientOSError, 

36 ClientResponseError, 

37 ContentTypeError, 

38 InvalidURL, 

39 ServerFingerprintMismatch, 

40) 

41from .compression_utils import HAS_BROTLI 

42from .formdata import FormData 

43from .hdrs import CONTENT_TYPE 

44from .helpers import ( 

45 _SENTINEL, 

46 BaseTimerContext, 

47 BasicAuth, 

48 HeadersMixin, 

49 TimerNoop, 

50 basicauth_from_netrc, 

51 frozen_dataclass_decorator, 

52 is_expected_content_type, 

53 netrc_from_env, 

54 parse_mimetype, 

55 reify, 

56 set_exception, 

57 set_result, 

58) 

59from .http import ( 

60 SERVER_SOFTWARE, 

61 HttpVersion, 

62 HttpVersion10, 

63 HttpVersion11, 

64 StreamWriter, 

65) 

66from .log import client_logger 

67from .streams import StreamReader 

68from .typedefs import ( 

69 DEFAULT_JSON_DECODER, 

70 JSONDecoder, 

71 LooseCookies, 

72 LooseHeaders, 

73 Query, 

74 RawHeaders, 

75) 

76 

77if TYPE_CHECKING: 

78 import ssl 

79 from ssl import SSLContext 

80else: 

81 try: 

82 import ssl 

83 from ssl import SSLContext 

84 except ImportError: # pragma: no cover 

85 ssl = None # type: ignore[assignment] 

86 SSLContext = object # type: ignore[misc,assignment] 

87 

88 

89__all__ = ("ClientRequest", "ClientResponse", "RequestInfo", "Fingerprint") 

90 

91 

92if TYPE_CHECKING: 

93 from .client import ClientSession 

94 from .connector import Connection 

95 from .tracing import Trace 

96 

97 

98_CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]") 

99 

100 

101def _gen_default_accept_encoding() -> str: 

102 return "gzip, deflate, br" if HAS_BROTLI else "gzip, deflate" 

103 

104 

105@frozen_dataclass_decorator 

106class ContentDisposition: 

107 type: Optional[str] 

108 parameters: "MappingProxyType[str, str]" 

109 filename: Optional[str] 

110 

111 

112class _RequestInfo(NamedTuple): 

113 url: URL 

114 method: str 

115 headers: "CIMultiDictProxy[str]" 

116 real_url: URL 

117 

118 

119class RequestInfo(_RequestInfo): 

120 

121 def __new__( 

122 cls, 

123 url: URL, 

124 method: str, 

125 headers: "CIMultiDictProxy[str]", 

126 real_url: URL = _SENTINEL, # type: ignore[assignment] 

127 ) -> "RequestInfo": 

128 """Create a new RequestInfo instance. 

129 

130 For backwards compatibility, the real_url parameter is optional. 

131 """ 

132 return tuple.__new__( 

133 cls, (url, method, headers, url if real_url is _SENTINEL else real_url) 

134 ) 

135 

136 

137class Fingerprint: 

138 HASHFUNC_BY_DIGESTLEN = { 

139 16: md5, 

140 20: sha1, 

141 32: sha256, 

142 } 

143 

144 def __init__(self, fingerprint: bytes) -> None: 

145 digestlen = len(fingerprint) 

146 hashfunc = self.HASHFUNC_BY_DIGESTLEN.get(digestlen) 

147 if not hashfunc: 

148 raise ValueError("fingerprint has invalid length") 

149 elif hashfunc is md5 or hashfunc is sha1: 

150 raise ValueError("md5 and sha1 are insecure and not supported. Use sha256.") 

151 self._hashfunc = hashfunc 

152 self._fingerprint = fingerprint 

153 

154 @property 

155 def fingerprint(self) -> bytes: 

156 return self._fingerprint 

157 

158 def check(self, transport: asyncio.Transport) -> None: 

159 if not transport.get_extra_info("sslcontext"): 

160 return 

161 sslobj = transport.get_extra_info("ssl_object") 

162 cert = sslobj.getpeercert(binary_form=True) 

163 got = self._hashfunc(cert).digest() 

164 if got != self._fingerprint: 

165 host, port, *_ = transport.get_extra_info("peername") 

166 raise ServerFingerprintMismatch(self._fingerprint, got, host, port) 

167 

168 

169if ssl is not None: 

170 SSL_ALLOWED_TYPES = (ssl.SSLContext, bool, Fingerprint) 

171else: # pragma: no cover 

172 SSL_ALLOWED_TYPES = (bool,) # type: ignore[unreachable] 

173 

174 

175_SSL_SCHEMES = frozenset(("https", "wss")) 

176 

177 

178# ConnectionKey is a NamedTuple because it is used as a key in a dict 

179# and a set in the connector. Since a NamedTuple is a tuple it uses 

180# the fast native tuple __hash__ and __eq__ implementation in CPython. 

181class ConnectionKey(NamedTuple): 

182 # the key should contain an information about used proxy / TLS 

183 # to prevent reusing wrong connections from a pool 

184 host: str 

185 port: Optional[int] 

186 is_ssl: bool 

187 ssl: Union[SSLContext, bool, Fingerprint] 

188 proxy: Optional[URL] 

189 proxy_auth: Optional[BasicAuth] 

190 proxy_headers_hash: Optional[int] # hash(CIMultiDict) 

191 

192 

193class ClientRequest: 

194 GET_METHODS = { 

195 hdrs.METH_GET, 

196 hdrs.METH_HEAD, 

197 hdrs.METH_OPTIONS, 

198 hdrs.METH_TRACE, 

199 } 

200 POST_METHODS = {hdrs.METH_PATCH, hdrs.METH_POST, hdrs.METH_PUT} 

201 ALL_METHODS = GET_METHODS.union(POST_METHODS).union({hdrs.METH_DELETE}) 

202 

203 DEFAULT_HEADERS = { 

204 hdrs.ACCEPT: "*/*", 

205 hdrs.ACCEPT_ENCODING: _gen_default_accept_encoding(), 

206 } 

207 

208 # Type of body depends on PAYLOAD_REGISTRY, which is dynamic. 

209 body: Any = b"" 

210 auth = None 

211 response = None 

212 

213 # These class defaults help create_autospec() work correctly. 

214 # If autospec is improved in future, maybe these can be removed. 

215 url = URL() 

216 method = "GET" 

217 

218 __writer: Optional["asyncio.Task[None]"] = None # async task for streaming data 

219 _continue = None # waiter future for '100 Continue' response 

220 

221 _skip_auto_headers: Optional["CIMultiDict[None]"] = None 

222 

223 # N.B. 

224 # Adding __del__ method with self._writer closing doesn't make sense 

225 # because _writer is instance method, thus it keeps a reference to self. 

226 # Until writer has finished finalizer will not be called. 

227 

228 def __init__( 

229 self, 

230 method: str, 

231 url: URL, 

232 *, 

233 params: Query = None, 

234 headers: Optional[LooseHeaders] = None, 

235 skip_auto_headers: Optional[Iterable[str]] = None, 

236 data: Any = None, 

237 cookies: Optional[LooseCookies] = None, 

238 auth: Optional[BasicAuth] = None, 

239 version: http.HttpVersion = http.HttpVersion11, 

240 compress: Union[str, bool] = False, 

241 chunked: Optional[bool] = None, 

242 expect100: bool = False, 

243 loop: asyncio.AbstractEventLoop, 

244 response_class: Optional[Type["ClientResponse"]] = None, 

245 proxy: Optional[URL] = None, 

246 proxy_auth: Optional[BasicAuth] = None, 

247 timer: Optional[BaseTimerContext] = None, 

248 session: Optional["ClientSession"] = None, 

249 ssl: Union[SSLContext, bool, Fingerprint] = True, 

250 proxy_headers: Optional[LooseHeaders] = None, 

251 traces: Optional[List["Trace"]] = None, 

252 trust_env: bool = False, 

253 server_hostname: Optional[str] = None, 

254 ): 

255 if match := _CONTAINS_CONTROL_CHAR_RE.search(method): 

256 raise ValueError( 

257 f"Method cannot contain non-token characters {method!r} " 

258 f"(found at least {match.group()!r})" 

259 ) 

260 # URL forbids subclasses, so a simple type check is enough. 

261 assert type(url) is URL, url 

262 if proxy is not None: 

263 assert type(proxy) is URL, proxy 

264 # FIXME: session is None in tests only, need to fix tests 

265 # assert session is not None 

266 if TYPE_CHECKING: 

267 assert session is not None 

268 self._session = session 

269 if params: 

270 url = url.extend_query(params) 

271 self.original_url = url 

272 self.url = url.with_fragment(None) if url.raw_fragment else url 

273 self.method = method.upper() 

274 self.chunked = chunked 

275 self.loop = loop 

276 self.length = None 

277 if response_class is None: 

278 real_response_class = ClientResponse 

279 else: 

280 real_response_class = response_class 

281 self.response_class: Type[ClientResponse] = real_response_class 

282 self._timer = timer if timer is not None else TimerNoop() 

283 self._ssl = ssl 

284 self.server_hostname = server_hostname 

285 

286 if loop.get_debug(): 

287 self._source_traceback = traceback.extract_stack(sys._getframe(1)) 

288 

289 self.update_version(version) 

290 self.update_host(url) 

291 self.update_headers(headers) 

292 self.update_auto_headers(skip_auto_headers) 

293 self.update_cookies(cookies) 

294 self.update_content_encoding(data, compress) 

295 self.update_auth(auth, trust_env) 

296 self.update_proxy(proxy, proxy_auth, proxy_headers) 

297 

298 self.update_body_from_data(data) 

299 if data is not None or self.method not in self.GET_METHODS: 

300 self.update_transfer_encoding() 

301 self.update_expect_continue(expect100) 

302 self._traces = [] if traces is None else traces 

303 

304 def __reset_writer(self, _: object = None) -> None: 

305 self.__writer = None 

306 

307 def _get_content_length(self) -> Optional[int]: 

308 """Extract and validate Content-Length header value. 

309 

310 Returns parsed Content-Length value or None if not set. 

311 Raises ValueError if header exists but cannot be parsed as an integer. 

312 """ 

313 if hdrs.CONTENT_LENGTH not in self.headers: 

314 return None 

315 

316 content_length_hdr = self.headers[hdrs.CONTENT_LENGTH] 

317 try: 

318 return int(content_length_hdr) 

319 except ValueError: 

320 raise ValueError( 

321 f"Invalid Content-Length header: {content_length_hdr}" 

322 ) from None 

323 

324 @property 

325 def skip_auto_headers(self) -> CIMultiDict[None]: 

326 return self._skip_auto_headers or CIMultiDict() 

327 

328 @property 

329 def _writer(self) -> Optional["asyncio.Task[None]"]: 

330 return self.__writer 

331 

332 @_writer.setter 

333 def _writer(self, writer: "asyncio.Task[None]") -> None: 

334 if self.__writer is not None: 

335 self.__writer.remove_done_callback(self.__reset_writer) 

336 self.__writer = writer 

337 writer.add_done_callback(self.__reset_writer) 

338 

339 def is_ssl(self) -> bool: 

340 return self.url.scheme in _SSL_SCHEMES 

341 

342 @property 

343 def ssl(self) -> Union["SSLContext", bool, Fingerprint]: 

344 return self._ssl 

345 

346 @property 

347 def connection_key(self) -> ConnectionKey: # type: ignore[misc] 

348 if proxy_headers := self.proxy_headers: 

349 h: Optional[int] = hash(tuple(proxy_headers.items())) 

350 else: 

351 h = None 

352 url = self.url 

353 return tuple.__new__( 

354 ConnectionKey, 

355 ( 

356 url.raw_host or "", 

357 url.port, 

358 url.scheme in _SSL_SCHEMES, 

359 self._ssl, 

360 self.proxy, 

361 self.proxy_auth, 

362 h, 

363 ), 

364 ) 

365 

366 @property 

367 def host(self) -> str: 

368 ret = self.url.raw_host 

369 assert ret is not None 

370 return ret 

371 

372 @property 

373 def port(self) -> Optional[int]: 

374 return self.url.port 

375 

376 @property 

377 def request_info(self) -> RequestInfo: 

378 headers: CIMultiDictProxy[str] = CIMultiDictProxy(self.headers) 

379 # These are created on every request, so we use a NamedTuple 

380 # for performance reasons. We don't use the RequestInfo.__new__ 

381 # method because it has a different signature which is provided 

382 # for backwards compatibility only. 

383 return tuple.__new__( 

384 RequestInfo, (self.url, self.method, headers, self.original_url) 

385 ) 

386 

387 @property 

388 def session(self) -> "ClientSession": 

389 """Return the ClientSession instance. 

390 

391 This property provides access to the ClientSession that initiated 

392 this request, allowing middleware to make additional requests 

393 using the same session. 

394 """ 

395 return self._session 

396 

397 def update_host(self, url: URL) -> None: 

398 """Update destination host, port and connection type (ssl).""" 

399 # get host/port 

400 if not url.raw_host: 

401 raise InvalidURL(url) 

402 

403 # basic auth info 

404 if url.raw_user or url.raw_password: 

405 self.auth = helpers.BasicAuth(url.user or "", url.password or "") 

406 

407 def update_version(self, version: Union[http.HttpVersion, str]) -> None: 

408 """Convert request version to two elements tuple. 

409 

410 parser HTTP version '1.1' => (1, 1) 

411 """ 

412 if isinstance(version, str): 

413 v = [part.strip() for part in version.split(".", 1)] 

414 try: 

415 version = http.HttpVersion(int(v[0]), int(v[1])) 

416 except ValueError: 

417 raise ValueError( 

418 f"Can not parse http version number: {version}" 

419 ) from None 

420 self.version = version 

421 

422 def update_headers(self, headers: Optional[LooseHeaders]) -> None: 

423 """Update request headers.""" 

424 self.headers: CIMultiDict[str] = CIMultiDict() 

425 

426 # Build the host header 

427 host = self.url.host_port_subcomponent 

428 

429 # host_port_subcomponent is None when the URL is a relative URL. 

430 # but we know we do not have a relative URL here. 

431 assert host is not None 

432 self.headers[hdrs.HOST] = host 

433 

434 if not headers: 

435 return 

436 

437 if isinstance(headers, (dict, MultiDictProxy, MultiDict)): 

438 headers = headers.items() 

439 

440 for key, value in headers: # type: ignore[misc] 

441 # A special case for Host header 

442 if key in hdrs.HOST_ALL: 

443 self.headers[key] = value 

444 else: 

445 self.headers.add(key, value) 

446 

447 def update_auto_headers(self, skip_auto_headers: Optional[Iterable[str]]) -> None: 

448 if skip_auto_headers is not None: 

449 self._skip_auto_headers = CIMultiDict( 

450 (hdr, None) for hdr in sorted(skip_auto_headers) 

451 ) 

452 used_headers = self.headers.copy() 

453 used_headers.extend(self._skip_auto_headers) # type: ignore[arg-type] 

454 else: 

455 # Fast path when there are no headers to skip 

456 # which is the most common case. 

457 used_headers = self.headers 

458 

459 for hdr, val in self.DEFAULT_HEADERS.items(): 

460 if hdr not in used_headers: 

461 self.headers[hdr] = val 

462 

463 if hdrs.USER_AGENT not in used_headers: 

464 self.headers[hdrs.USER_AGENT] = SERVER_SOFTWARE 

465 

466 def update_cookies(self, cookies: Optional[LooseCookies]) -> None: 

467 """Update request cookies header.""" 

468 if not cookies: 

469 return 

470 

471 c = SimpleCookie() 

472 if hdrs.COOKIE in self.headers: 

473 c.load(self.headers.get(hdrs.COOKIE, "")) 

474 del self.headers[hdrs.COOKIE] 

475 

476 if isinstance(cookies, Mapping): 

477 iter_cookies = cookies.items() 

478 else: 

479 iter_cookies = cookies # type: ignore[assignment] 

480 for name, value in iter_cookies: 

481 if isinstance(value, Morsel): 

482 # Preserve coded_value 

483 mrsl_val = value.get(value.key, Morsel()) 

484 mrsl_val.set(value.key, value.value, value.coded_value) 

485 c[name] = mrsl_val 

486 else: 

487 c[name] = value # type: ignore[assignment] 

488 

489 self.headers[hdrs.COOKIE] = c.output(header="", sep=";").strip() 

490 

491 def update_content_encoding(self, data: Any, compress: Union[bool, str]) -> None: 

492 """Set request content encoding.""" 

493 self.compress = None 

494 if not data: 

495 return 

496 

497 if self.headers.get(hdrs.CONTENT_ENCODING): 

498 if compress: 

499 raise ValueError( 

500 "compress can not be set if Content-Encoding header is set" 

501 ) 

502 elif compress: 

503 self.compress = compress if isinstance(compress, str) else "deflate" 

504 self.headers[hdrs.CONTENT_ENCODING] = self.compress 

505 self.chunked = True # enable chunked, no need to deal with length 

506 

507 def update_transfer_encoding(self) -> None: 

508 """Analyze transfer-encoding header.""" 

509 te = self.headers.get(hdrs.TRANSFER_ENCODING, "").lower() 

510 

511 if "chunked" in te: 

512 if self.chunked: 

513 raise ValueError( 

514 "chunked can not be set " 

515 'if "Transfer-Encoding: chunked" header is set' 

516 ) 

517 

518 elif self.chunked: 

519 if hdrs.CONTENT_LENGTH in self.headers: 

520 raise ValueError( 

521 "chunked can not be set if Content-Length header is set" 

522 ) 

523 

524 self.headers[hdrs.TRANSFER_ENCODING] = "chunked" 

525 else: 

526 if hdrs.CONTENT_LENGTH not in self.headers: 

527 self.headers[hdrs.CONTENT_LENGTH] = str(len(self.body)) 

528 

529 def update_auth(self, auth: Optional[BasicAuth], trust_env: bool = False) -> None: 

530 """Set basic auth.""" 

531 if auth is None: 

532 auth = self.auth 

533 if auth is None and trust_env and self.url.host is not None: 

534 netrc_obj = netrc_from_env() 

535 with contextlib.suppress(LookupError): 

536 auth = basicauth_from_netrc(netrc_obj, self.url.host) 

537 if auth is None: 

538 return 

539 

540 if not isinstance(auth, helpers.BasicAuth): 

541 raise TypeError("BasicAuth() tuple is required instead") 

542 

543 self.headers[hdrs.AUTHORIZATION] = auth.encode() 

544 

545 def update_body_from_data(self, body: Any) -> None: 

546 if body is None: 

547 return 

548 

549 # FormData 

550 if isinstance(body, FormData): 

551 body = body() 

552 

553 try: 

554 body = payload.PAYLOAD_REGISTRY.get(body, disposition=None) 

555 except payload.LookupError: 

556 boundary = None 

557 if CONTENT_TYPE in self.headers: 

558 boundary = parse_mimetype(self.headers[CONTENT_TYPE]).parameters.get( 

559 "boundary" 

560 ) 

561 body = FormData(body, boundary=boundary)() 

562 

563 self.body = body 

564 

565 # enable chunked encoding if needed 

566 if not self.chunked and hdrs.CONTENT_LENGTH not in self.headers: 

567 if (size := body.size) is not None: 

568 self.headers[hdrs.CONTENT_LENGTH] = str(size) 

569 else: 

570 self.chunked = True 

571 

572 # copy payload headers 

573 assert body.headers 

574 headers = self.headers 

575 skip_headers = self._skip_auto_headers 

576 for key, value in body.headers.items(): 

577 if key in headers or (skip_headers is not None and key in skip_headers): 

578 continue 

579 headers[key] = value 

580 

581 def update_expect_continue(self, expect: bool = False) -> None: 

582 if expect: 

583 self.headers[hdrs.EXPECT] = "100-continue" 

584 elif ( 

585 hdrs.EXPECT in self.headers 

586 and self.headers[hdrs.EXPECT].lower() == "100-continue" 

587 ): 

588 expect = True 

589 

590 if expect: 

591 self._continue = self.loop.create_future() 

592 

593 def update_proxy( 

594 self, 

595 proxy: Optional[URL], 

596 proxy_auth: Optional[BasicAuth], 

597 proxy_headers: Optional[LooseHeaders], 

598 ) -> None: 

599 self.proxy = proxy 

600 if proxy is None: 

601 self.proxy_auth = None 

602 self.proxy_headers = None 

603 return 

604 

605 if proxy_auth and not isinstance(proxy_auth, helpers.BasicAuth): 

606 raise ValueError("proxy_auth must be None or BasicAuth() tuple") 

607 self.proxy_auth = proxy_auth 

608 

609 if proxy_headers is not None and not isinstance( 

610 proxy_headers, (MultiDict, MultiDictProxy) 

611 ): 

612 proxy_headers = CIMultiDict(proxy_headers) 

613 self.proxy_headers = proxy_headers 

614 

615 async def write_bytes( 

616 self, 

617 writer: AbstractStreamWriter, 

618 conn: "Connection", 

619 content_length: Optional[int], 

620 ) -> None: 

621 """ 

622 Write the request body to the connection stream. 

623 

624 This method handles writing different types of request bodies: 

625 1. Payload objects (using their specialized write_with_length method) 

626 2. Bytes/bytearray objects 

627 3. Iterable body content 

628 

629 Args: 

630 writer: The stream writer to write the body to 

631 conn: The connection being used for this request 

632 content_length: Optional maximum number of bytes to write from the body 

633 (None means write the entire body) 

634 

635 The method properly handles: 

636 - Waiting for 100-Continue responses if required 

637 - Content length constraints for chunked encoding 

638 - Error handling for network issues, cancellation, and other exceptions 

639 - Signaling EOF and timeout management 

640 

641 Raises: 

642 ClientOSError: When there's an OS-level error writing the body 

643 ClientConnectionError: When there's a general connection error 

644 asyncio.CancelledError: When the operation is cancelled 

645 

646 """ 

647 # 100 response 

648 if self._continue is not None: 

649 await writer.drain() 

650 await self._continue 

651 

652 protocol = conn.protocol 

653 assert protocol is not None 

654 try: 

655 if isinstance(self.body, payload.Payload): 

656 # Specialized handling for Payload objects that know how to write themselves 

657 await self.body.write_with_length(writer, content_length) 

658 else: 

659 # Handle bytes/bytearray by converting to an iterable for consistent handling 

660 if isinstance(self.body, (bytes, bytearray)): 

661 self.body = (self.body,) 

662 

663 if content_length is None: 

664 # Write the entire body without length constraint 

665 for chunk in self.body: 

666 await writer.write(chunk) 

667 else: 

668 # Write with length constraint, respecting content_length limit 

669 # If the body is larger than content_length, we truncate it 

670 remaining_bytes = content_length 

671 for chunk in self.body: 

672 await writer.write(chunk[:remaining_bytes]) 

673 remaining_bytes -= len(chunk) 

674 if remaining_bytes <= 0: 

675 break 

676 except OSError as underlying_exc: 

677 reraised_exc = underlying_exc 

678 

679 # Distinguish between timeout and other OS errors for better error reporting 

680 exc_is_not_timeout = underlying_exc.errno is not None or not isinstance( 

681 underlying_exc, asyncio.TimeoutError 

682 ) 

683 if exc_is_not_timeout: 

684 reraised_exc = ClientOSError( 

685 underlying_exc.errno, 

686 f"Can not write request body for {self.url !s}", 

687 ) 

688 

689 set_exception(protocol, reraised_exc, underlying_exc) 

690 except asyncio.CancelledError: 

691 # Body hasn't been fully sent, so connection can't be reused 

692 conn.close() 

693 raise 

694 except Exception as underlying_exc: 

695 set_exception( 

696 protocol, 

697 ClientConnectionError( 

698 "Failed to send bytes into the underlying connection " 

699 f"{conn !s}: {underlying_exc!r}", 

700 ), 

701 underlying_exc, 

702 ) 

703 else: 

704 # Successfully wrote the body, signal EOF and start response timeout 

705 await writer.write_eof() 

706 protocol.start_timeout() 

707 

708 async def send(self, conn: "Connection") -> "ClientResponse": 

709 # Specify request target: 

710 # - CONNECT request must send authority form URI 

711 # - not CONNECT proxy must send absolute form URI 

712 # - most common is origin form URI 

713 if self.method == hdrs.METH_CONNECT: 

714 connect_host = self.url.host_subcomponent 

715 assert connect_host is not None 

716 path = f"{connect_host}:{self.url.port}" 

717 elif self.proxy and not self.is_ssl(): 

718 path = str(self.url) 

719 else: 

720 path = self.url.raw_path_qs 

721 

722 protocol = conn.protocol 

723 assert protocol is not None 

724 writer = StreamWriter( 

725 protocol, 

726 self.loop, 

727 on_chunk_sent=( 

728 functools.partial(self._on_chunk_request_sent, self.method, self.url) 

729 if self._traces 

730 else None 

731 ), 

732 on_headers_sent=( 

733 functools.partial(self._on_headers_request_sent, self.method, self.url) 

734 if self._traces 

735 else None 

736 ), 

737 ) 

738 

739 if self.compress: 

740 writer.enable_compression(self.compress) 

741 

742 if self.chunked is not None: 

743 writer.enable_chunking() 

744 

745 # set default content-type 

746 if ( 

747 self.method in self.POST_METHODS 

748 and ( 

749 self._skip_auto_headers is None 

750 or hdrs.CONTENT_TYPE not in self._skip_auto_headers 

751 ) 

752 and hdrs.CONTENT_TYPE not in self.headers 

753 ): 

754 self.headers[hdrs.CONTENT_TYPE] = "application/octet-stream" 

755 

756 v = self.version 

757 if hdrs.CONNECTION not in self.headers: 

758 if conn._connector.force_close: 

759 if v == HttpVersion11: 

760 self.headers[hdrs.CONNECTION] = "close" 

761 elif v == HttpVersion10: 

762 self.headers[hdrs.CONNECTION] = "keep-alive" 

763 

764 # status + headers 

765 status_line = f"{self.method} {path} HTTP/{v.major}.{v.minor}" 

766 await writer.write_headers(status_line, self.headers) 

767 task: Optional["asyncio.Task[None]"] 

768 if self.body or self._continue is not None or protocol.writing_paused: 

769 coro = self.write_bytes(writer, conn, self._get_content_length()) 

770 if sys.version_info >= (3, 12): 

771 # Optimization for Python 3.12, try to write 

772 # bytes immediately to avoid having to schedule 

773 # the task on the event loop. 

774 task = asyncio.Task(coro, loop=self.loop, eager_start=True) 

775 else: 

776 task = self.loop.create_task(coro) 

777 if task.done(): 

778 task = None 

779 else: 

780 self._writer = task 

781 else: 

782 # We have nothing to write because 

783 # - there is no body 

784 # - the protocol does not have writing paused 

785 # - we are not waiting for a 100-continue response 

786 protocol.start_timeout() 

787 writer.set_eof() 

788 task = None 

789 response_class = self.response_class 

790 assert response_class is not None 

791 self.response = response_class( 

792 self.method, 

793 self.original_url, 

794 writer=task, 

795 continue100=self._continue, 

796 timer=self._timer, 

797 request_info=self.request_info, 

798 traces=self._traces, 

799 loop=self.loop, 

800 session=self._session, 

801 ) 

802 return self.response 

803 

804 async def close(self) -> None: 

805 if self.__writer is not None: 

806 try: 

807 await self.__writer 

808 except asyncio.CancelledError: 

809 if ( 

810 sys.version_info >= (3, 11) 

811 and (task := asyncio.current_task()) 

812 and task.cancelling() 

813 ): 

814 raise 

815 

816 def terminate(self) -> None: 

817 if self.__writer is not None: 

818 if not self.loop.is_closed(): 

819 self.__writer.cancel() 

820 self.__writer.remove_done_callback(self.__reset_writer) 

821 self.__writer = None 

822 

823 async def _on_chunk_request_sent(self, method: str, url: URL, chunk: bytes) -> None: 

824 for trace in self._traces: 

825 await trace.send_request_chunk_sent(method, url, chunk) 

826 

827 async def _on_headers_request_sent( 

828 self, method: str, url: URL, headers: "CIMultiDict[str]" 

829 ) -> None: 

830 for trace in self._traces: 

831 await trace.send_request_headers(method, url, headers) 

832 

833 

834_CONNECTION_CLOSED_EXCEPTION = ClientConnectionError("Connection closed") 

835 

836 

837class ClientResponse(HeadersMixin): 

838 # Some of these attributes are None when created, 

839 # but will be set by the start() method. 

840 # As the end user will likely never see the None values, we cheat the types below. 

841 # from the Status-Line of the response 

842 version: Optional[HttpVersion] = None # HTTP-Version 

843 status: int = None # type: ignore[assignment] # Status-Code 

844 reason: Optional[str] = None # Reason-Phrase 

845 

846 content: StreamReader = None # type: ignore[assignment] # Payload stream 

847 _body: Optional[bytes] = None 

848 _headers: CIMultiDictProxy[str] = None # type: ignore[assignment] 

849 _history: Tuple["ClientResponse", ...] = () 

850 _raw_headers: RawHeaders = None # type: ignore[assignment] 

851 

852 _connection: Optional["Connection"] = None # current connection 

853 _cookies: Optional[SimpleCookie] = None 

854 _continue: Optional["asyncio.Future[bool]"] = None 

855 _source_traceback: Optional[traceback.StackSummary] = None 

856 _session: Optional["ClientSession"] = None 

857 # set up by ClientRequest after ClientResponse object creation 

858 # post-init stage allows to not change ctor signature 

859 _closed = True # to allow __del__ for non-initialized properly response 

860 _released = False 

861 _in_context = False 

862 

863 _resolve_charset: Callable[["ClientResponse", bytes], str] = lambda *_: "utf-8" 

864 

865 __writer: Optional["asyncio.Task[None]"] = None 

866 

867 def __init__( 

868 self, 

869 method: str, 

870 url: URL, 

871 *, 

872 writer: "Optional[asyncio.Task[None]]", 

873 continue100: Optional["asyncio.Future[bool]"], 

874 timer: Optional[BaseTimerContext], 

875 request_info: RequestInfo, 

876 traces: List["Trace"], 

877 loop: asyncio.AbstractEventLoop, 

878 session: "ClientSession", 

879 ) -> None: 

880 # URL forbids subclasses, so a simple type check is enough. 

881 assert type(url) is URL 

882 

883 self.method = method 

884 

885 self._real_url = url 

886 self._url = url.with_fragment(None) if url.raw_fragment else url 

887 if writer is not None: 

888 self._writer = writer 

889 if continue100 is not None: 

890 self._continue = continue100 

891 self._request_info = request_info 

892 self._timer = timer if timer is not None else TimerNoop() 

893 self._cache: Dict[str, Any] = {} 

894 self._traces = traces 

895 self._loop = loop 

896 # Save reference to _resolve_charset, so that get_encoding() will still 

897 # work after the response has finished reading the body. 

898 # TODO: Fix session=None in tests (see ClientRequest.__init__). 

899 if session is not None: 

900 # store a reference to session #1985 

901 self._session = session 

902 self._resolve_charset = session._resolve_charset 

903 if loop.get_debug(): 

904 self._source_traceback = traceback.extract_stack(sys._getframe(1)) 

905 

906 def __reset_writer(self, _: object = None) -> None: 

907 self.__writer = None 

908 

909 @property 

910 def _writer(self) -> Optional["asyncio.Task[None]"]: 

911 """The writer task for streaming data. 

912 

913 _writer is only provided for backwards compatibility 

914 for subclasses that may need to access it. 

915 """ 

916 return self.__writer 

917 

918 @_writer.setter 

919 def _writer(self, writer: Optional["asyncio.Task[None]"]) -> None: 

920 """Set the writer task for streaming data.""" 

921 if self.__writer is not None: 

922 self.__writer.remove_done_callback(self.__reset_writer) 

923 self.__writer = writer 

924 if writer is None: 

925 return 

926 if writer.done(): 

927 # The writer is already done, so we can clear it immediately. 

928 self.__writer = None 

929 else: 

930 writer.add_done_callback(self.__reset_writer) 

931 

932 @property 

933 def cookies(self) -> SimpleCookie: 

934 if self._cookies is None: 

935 self._cookies = SimpleCookie() 

936 return self._cookies 

937 

938 @cookies.setter 

939 def cookies(self, cookies: SimpleCookie) -> None: 

940 self._cookies = cookies 

941 

942 @reify 

943 def url(self) -> URL: 

944 return self._url 

945 

946 @reify 

947 def real_url(self) -> URL: 

948 return self._real_url 

949 

950 @reify 

951 def host(self) -> str: 

952 assert self._url.host is not None 

953 return self._url.host 

954 

955 @reify 

956 def headers(self) -> "CIMultiDictProxy[str]": 

957 return self._headers 

958 

959 @reify 

960 def raw_headers(self) -> RawHeaders: 

961 return self._raw_headers 

962 

963 @reify 

964 def request_info(self) -> RequestInfo: 

965 return self._request_info 

966 

967 @reify 

968 def content_disposition(self) -> Optional[ContentDisposition]: 

969 raw = self._headers.get(hdrs.CONTENT_DISPOSITION) 

970 if raw is None: 

971 return None 

972 disposition_type, params_dct = multipart.parse_content_disposition(raw) 

973 params = MappingProxyType(params_dct) 

974 filename = multipart.content_disposition_filename(params) 

975 return ContentDisposition(disposition_type, params, filename) 

976 

977 def __del__(self, _warnings: Any = warnings) -> None: 

978 if self._closed: 

979 return 

980 

981 if self._connection is not None: 

982 self._connection.release() 

983 self._cleanup_writer() 

984 

985 if self._loop.get_debug(): 

986 _warnings.warn( 

987 f"Unclosed response {self!r}", ResourceWarning, source=self 

988 ) 

989 context = {"client_response": self, "message": "Unclosed response"} 

990 if self._source_traceback: 

991 context["source_traceback"] = self._source_traceback 

992 self._loop.call_exception_handler(context) 

993 

994 def __repr__(self) -> str: 

995 out = io.StringIO() 

996 ascii_encodable_url = str(self.url) 

997 if self.reason: 

998 ascii_encodable_reason = self.reason.encode( 

999 "ascii", "backslashreplace" 

1000 ).decode("ascii") 

1001 else: 

1002 ascii_encodable_reason = "None" 

1003 print( 

1004 "<ClientResponse({}) [{} {}]>".format( 

1005 ascii_encodable_url, self.status, ascii_encodable_reason 

1006 ), 

1007 file=out, 

1008 ) 

1009 print(self.headers, file=out) 

1010 return out.getvalue() 

1011 

1012 @property 

1013 def connection(self) -> Optional["Connection"]: 

1014 return self._connection 

1015 

1016 @reify 

1017 def history(self) -> Tuple["ClientResponse", ...]: 

1018 """A sequence of responses, if redirects occurred.""" 

1019 return self._history 

1020 

1021 @reify 

1022 def links(self) -> "MultiDictProxy[MultiDictProxy[Union[str, URL]]]": 

1023 links_str = ", ".join(self.headers.getall("link", [])) 

1024 

1025 if not links_str: 

1026 return MultiDictProxy(MultiDict()) 

1027 

1028 links: MultiDict[MultiDictProxy[Union[str, URL]]] = MultiDict() 

1029 

1030 for val in re.split(r",(?=\s*<)", links_str): 

1031 match = re.match(r"\s*<(.*)>(.*)", val) 

1032 if match is None: # Malformed link 

1033 continue 

1034 url, params_str = match.groups() 

1035 params = params_str.split(";")[1:] 

1036 

1037 link: MultiDict[Union[str, URL]] = MultiDict() 

1038 

1039 for param in params: 

1040 match = re.match(r"^\s*(\S*)\s*=\s*(['\"]?)(.*?)(\2)\s*$", param, re.M) 

1041 if match is None: # Malformed param 

1042 continue 

1043 key, _, value, _ = match.groups() 

1044 

1045 link.add(key, value) 

1046 

1047 key = link.get("rel", url) 

1048 

1049 link.add("url", self.url.join(URL(url))) 

1050 

1051 links.add(str(key), MultiDictProxy(link)) 

1052 

1053 return MultiDictProxy(links) 

1054 

1055 async def start(self, connection: "Connection") -> "ClientResponse": 

1056 """Start response processing.""" 

1057 self._closed = False 

1058 self._protocol = connection.protocol 

1059 self._connection = connection 

1060 

1061 with self._timer: 

1062 while True: 

1063 # read response 

1064 try: 

1065 protocol = self._protocol 

1066 message, payload = await protocol.read() # type: ignore[union-attr] 

1067 except http.HttpProcessingError as exc: 

1068 raise ClientResponseError( 

1069 self.request_info, 

1070 self.history, 

1071 status=exc.code, 

1072 message=exc.message, 

1073 headers=exc.headers, 

1074 ) from exc 

1075 

1076 if message.code < 100 or message.code > 199 or message.code == 101: 

1077 break 

1078 

1079 if self._continue is not None: 

1080 set_result(self._continue, True) 

1081 self._continue = None 

1082 

1083 # payload eof handler 

1084 payload.on_eof(self._response_eof) 

1085 

1086 # response status 

1087 self.version = message.version 

1088 self.status = message.code 

1089 self.reason = message.reason 

1090 

1091 # headers 

1092 self._headers = message.headers # type is CIMultiDictProxy 

1093 self._raw_headers = message.raw_headers # type is Tuple[bytes, bytes] 

1094 

1095 # payload 

1096 self.content = payload 

1097 

1098 # cookies 

1099 if cookie_hdrs := self.headers.getall(hdrs.SET_COOKIE, ()): 

1100 cookies = SimpleCookie() 

1101 for hdr in cookie_hdrs: 

1102 try: 

1103 cookies.load(hdr) 

1104 except CookieError as exc: 

1105 client_logger.warning("Can not load response cookies: %s", exc) 

1106 self._cookies = cookies 

1107 return self 

1108 

1109 def _response_eof(self) -> None: 

1110 if self._closed: 

1111 return 

1112 

1113 # protocol could be None because connection could be detached 

1114 protocol = self._connection and self._connection.protocol 

1115 if protocol is not None and protocol.upgraded: 

1116 return 

1117 

1118 self._closed = True 

1119 self._cleanup_writer() 

1120 self._release_connection() 

1121 

1122 @property 

1123 def closed(self) -> bool: 

1124 return self._closed 

1125 

1126 def close(self) -> None: 

1127 if not self._released: 

1128 self._notify_content() 

1129 

1130 self._closed = True 

1131 if self._loop.is_closed(): 

1132 return 

1133 

1134 self._cleanup_writer() 

1135 if self._connection is not None: 

1136 self._connection.close() 

1137 self._connection = None 

1138 

1139 def release(self) -> None: 

1140 if not self._released: 

1141 self._notify_content() 

1142 

1143 self._closed = True 

1144 

1145 self._cleanup_writer() 

1146 self._release_connection() 

1147 

1148 @property 

1149 def ok(self) -> bool: 

1150 """Returns ``True`` if ``status`` is less than ``400``, ``False`` if not. 

1151 

1152 This is **not** a check for ``200 OK`` but a check that the response 

1153 status is under 400. 

1154 """ 

1155 return 400 > self.status 

1156 

1157 def raise_for_status(self) -> None: 

1158 if not self.ok: 

1159 # reason should always be not None for a started response 

1160 assert self.reason is not None 

1161 

1162 # If we're in a context we can rely on __aexit__() to release as the 

1163 # exception propagates. 

1164 if not self._in_context: 

1165 self.release() 

1166 

1167 raise ClientResponseError( 

1168 self.request_info, 

1169 self.history, 

1170 status=self.status, 

1171 message=self.reason, 

1172 headers=self.headers, 

1173 ) 

1174 

1175 def _release_connection(self) -> None: 

1176 if self._connection is not None: 

1177 if self.__writer is None: 

1178 self._connection.release() 

1179 self._connection = None 

1180 else: 

1181 self.__writer.add_done_callback(lambda f: self._release_connection()) 

1182 

1183 async def _wait_released(self) -> None: 

1184 if self.__writer is not None: 

1185 try: 

1186 await self.__writer 

1187 except asyncio.CancelledError: 

1188 if ( 

1189 sys.version_info >= (3, 11) 

1190 and (task := asyncio.current_task()) 

1191 and task.cancelling() 

1192 ): 

1193 raise 

1194 self._release_connection() 

1195 

1196 def _cleanup_writer(self) -> None: 

1197 if self.__writer is not None: 

1198 self.__writer.cancel() 

1199 self._session = None 

1200 

1201 def _notify_content(self) -> None: 

1202 content = self.content 

1203 # content can be None here, but the types are cheated elsewhere. 

1204 if content and content.exception() is None: # type: ignore[truthy-bool] 

1205 set_exception(content, _CONNECTION_CLOSED_EXCEPTION) 

1206 self._released = True 

1207 

1208 async def wait_for_close(self) -> None: 

1209 if self.__writer is not None: 

1210 try: 

1211 await self.__writer 

1212 except asyncio.CancelledError: 

1213 if ( 

1214 sys.version_info >= (3, 11) 

1215 and (task := asyncio.current_task()) 

1216 and task.cancelling() 

1217 ): 

1218 raise 

1219 self.release() 

1220 

1221 async def read(self) -> bytes: 

1222 """Read response payload.""" 

1223 if self._body is None: 

1224 try: 

1225 self._body = await self.content.read() 

1226 for trace in self._traces: 

1227 await trace.send_response_chunk_received( 

1228 self.method, self.url, self._body 

1229 ) 

1230 except BaseException: 

1231 self.close() 

1232 raise 

1233 elif self._released: # Response explicitly released 

1234 raise ClientConnectionError("Connection closed") 

1235 

1236 protocol = self._connection and self._connection.protocol 

1237 if protocol is None or not protocol.upgraded: 

1238 await self._wait_released() # Underlying connection released 

1239 return self._body 

1240 

1241 def get_encoding(self) -> str: 

1242 ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower() 

1243 mimetype = helpers.parse_mimetype(ctype) 

1244 

1245 encoding = mimetype.parameters.get("charset") 

1246 if encoding: 

1247 with contextlib.suppress(LookupError, ValueError): 

1248 return codecs.lookup(encoding).name 

1249 

1250 if mimetype.type == "application" and ( 

1251 mimetype.subtype == "json" or mimetype.subtype == "rdap" 

1252 ): 

1253 # RFC 7159 states that the default encoding is UTF-8. 

1254 # RFC 7483 defines application/rdap+json 

1255 return "utf-8" 

1256 

1257 if self._body is None: 

1258 raise RuntimeError( 

1259 "Cannot compute fallback encoding of a not yet read body" 

1260 ) 

1261 

1262 return self._resolve_charset(self, self._body) 

1263 

1264 async def text(self, encoding: Optional[str] = None, errors: str = "strict") -> str: 

1265 """Read response payload and decode.""" 

1266 await self.read() 

1267 

1268 if encoding is None: 

1269 encoding = self.get_encoding() 

1270 

1271 return self._body.decode(encoding, errors=errors) # type: ignore[union-attr] 

1272 

1273 async def json( 

1274 self, 

1275 *, 

1276 encoding: Optional[str] = None, 

1277 loads: JSONDecoder = DEFAULT_JSON_DECODER, 

1278 content_type: Optional[str] = "application/json", 

1279 ) -> Any: 

1280 """Read and decodes JSON response.""" 

1281 await self.read() 

1282 

1283 if content_type: 

1284 if not is_expected_content_type(self.content_type, content_type): 

1285 raise ContentTypeError( 

1286 self.request_info, 

1287 self.history, 

1288 status=self.status, 

1289 message=( 

1290 "Attempt to decode JSON with " 

1291 "unexpected mimetype: %s" % self.content_type 

1292 ), 

1293 headers=self.headers, 

1294 ) 

1295 

1296 if encoding is None: 

1297 encoding = self.get_encoding() 

1298 

1299 return loads(self._body.decode(encoding)) # type: ignore[union-attr] 

1300 

1301 async def __aenter__(self) -> "ClientResponse": 

1302 self._in_context = True 

1303 return self 

1304 

1305 async def __aexit__( 

1306 self, 

1307 exc_type: Optional[Type[BaseException]], 

1308 exc_val: Optional[BaseException], 

1309 exc_tb: Optional[TracebackType], 

1310 ) -> None: 

1311 self._in_context = False 

1312 # similar to _RequestContextManager, we do not need to check 

1313 # for exceptions, response object can close connection 

1314 # if state is broken 

1315 self.release() 

1316 await self.wait_for_close()