Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/client_reqrep.py: 41%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

756 statements  

1import asyncio 

2import codecs 

3import contextlib 

4import functools 

5import io 

6import re 

7import sys 

8import traceback 

9import warnings 

10from collections.abc import Mapping 

11from hashlib import md5, sha1, sha256 

12from http.cookies import Morsel, SimpleCookie 

13from types import MappingProxyType, TracebackType 

14from typing import ( 

15 TYPE_CHECKING, 

16 Any, 

17 Callable, 

18 Dict, 

19 Iterable, 

20 List, 

21 Literal, 

22 NamedTuple, 

23 Optional, 

24 Tuple, 

25 Type, 

26 Union, 

27) 

28 

29import attr 

30from multidict import CIMultiDict, CIMultiDictProxy, MultiDict, MultiDictProxy 

31from yarl import URL 

32 

33from . import hdrs, helpers, http, multipart, payload 

34from ._cookie_helpers import ( 

35 parse_cookie_header, 

36 parse_set_cookie_headers, 

37 preserve_morsel_with_coded_value, 

38) 

39from .abc import AbstractStreamWriter 

40from .client_exceptions import ( 

41 ClientConnectionError, 

42 ClientOSError, 

43 ClientResponseError, 

44 ContentTypeError, 

45 InvalidURL, 

46 ServerFingerprintMismatch, 

47) 

48from .compression_utils import HAS_BROTLI, HAS_ZSTD 

49from .formdata import FormData 

50from .helpers import ( 

51 _SENTINEL, 

52 BaseTimerContext, 

53 BasicAuth, 

54 HeadersMixin, 

55 TimerNoop, 

56 basicauth_from_netrc, 

57 netrc_from_env, 

58 noop, 

59 reify, 

60 sentinel, 

61 set_exception, 

62 set_result, 

63) 

64from .http import ( 

65 SERVER_SOFTWARE, 

66 HttpVersion, 

67 HttpVersion10, 

68 HttpVersion11, 

69 StreamWriter, 

70) 

71from .streams import StreamReader 

72from .typedefs import ( 

73 DEFAULT_JSON_DECODER, 

74 JSONDecoder, 

75 LooseCookies, 

76 LooseHeaders, 

77 Query, 

78 RawHeaders, 

79) 

80 

81if TYPE_CHECKING: 

82 import ssl 

83 from ssl import SSLContext 

84else: 

85 try: 

86 import ssl 

87 from ssl import SSLContext 

88 except ImportError: # pragma: no cover 

89 ssl = None # type: ignore[assignment] 

90 SSLContext = object # type: ignore[misc,assignment] 

91 

92 

93__all__ = ("ClientRequest", "ClientResponse", "RequestInfo", "Fingerprint") 

94 

95 

96if TYPE_CHECKING: 

97 from .client import ClientSession 

98 from .connector import Connection 

99 from .tracing import Trace 

100 

101 

102_CONNECTION_CLOSED_EXCEPTION = ClientConnectionError("Connection closed") 

103_CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]") 

104json_re = re.compile(r"^application/(?:[\w.+-]+?\+)?json") 

105 

106 

107def _gen_default_accept_encoding() -> str: 

108 encodings = [ 

109 "gzip", 

110 "deflate", 

111 ] 

112 if HAS_BROTLI: 

113 encodings.append("br") 

114 if HAS_ZSTD: 

115 encodings.append("zstd") 

116 return ", ".join(encodings) 

117 

118 

119@attr.s(auto_attribs=True, frozen=True, slots=True) 

120class ContentDisposition: 

121 type: Optional[str] 

122 parameters: "MappingProxyType[str, str]" 

123 filename: Optional[str] 

124 

125 

126class _RequestInfo(NamedTuple): 

127 url: URL 

128 method: str 

129 headers: "CIMultiDictProxy[str]" 

130 real_url: URL 

131 

132 

133class RequestInfo(_RequestInfo): 

134 

135 def __new__( 

136 cls, 

137 url: URL, 

138 method: str, 

139 headers: "CIMultiDictProxy[str]", 

140 real_url: Union[URL, _SENTINEL] = sentinel, 

141 ) -> "RequestInfo": 

142 """Create a new RequestInfo instance. 

143 

144 For backwards compatibility, the real_url parameter is optional. 

145 """ 

146 return tuple.__new__( 

147 cls, (url, method, headers, url if real_url is sentinel else real_url) 

148 ) 

149 

150 

151class Fingerprint: 

152 HASHFUNC_BY_DIGESTLEN = { 

153 16: md5, 

154 20: sha1, 

155 32: sha256, 

156 } 

157 

158 def __init__(self, fingerprint: bytes) -> None: 

159 digestlen = len(fingerprint) 

160 hashfunc = self.HASHFUNC_BY_DIGESTLEN.get(digestlen) 

161 if not hashfunc: 

162 raise ValueError("fingerprint has invalid length") 

163 elif hashfunc is md5 or hashfunc is sha1: 

164 raise ValueError("md5 and sha1 are insecure and not supported. Use sha256.") 

165 self._hashfunc = hashfunc 

166 self._fingerprint = fingerprint 

167 

168 @property 

169 def fingerprint(self) -> bytes: 

170 return self._fingerprint 

171 

172 def check(self, transport: asyncio.Transport) -> None: 

173 if not transport.get_extra_info("sslcontext"): 

174 return 

175 sslobj = transport.get_extra_info("ssl_object") 

176 cert = sslobj.getpeercert(binary_form=True) 

177 got = self._hashfunc(cert).digest() 

178 if got != self._fingerprint: 

179 host, port, *_ = transport.get_extra_info("peername") 

180 raise ServerFingerprintMismatch(self._fingerprint, got, host, port) 

181 

182 

183if ssl is not None: 

184 SSL_ALLOWED_TYPES = (ssl.SSLContext, bool, Fingerprint, type(None)) 

185else: # pragma: no cover 

186 SSL_ALLOWED_TYPES = (bool, type(None)) 

187 

188 

189def _merge_ssl_params( 

190 ssl: Union["SSLContext", bool, Fingerprint], 

191 verify_ssl: Optional[bool], 

192 ssl_context: Optional["SSLContext"], 

193 fingerprint: Optional[bytes], 

194) -> Union["SSLContext", bool, Fingerprint]: 

195 if ssl is None: 

196 ssl = True # Double check for backwards compatibility 

197 if verify_ssl is not None and not verify_ssl: 

198 warnings.warn( 

199 "verify_ssl is deprecated, use ssl=False instead", 

200 DeprecationWarning, 

201 stacklevel=3, 

202 ) 

203 if ssl is not True: 

204 raise ValueError( 

205 "verify_ssl, ssl_context, fingerprint and ssl " 

206 "parameters are mutually exclusive" 

207 ) 

208 else: 

209 ssl = False 

210 if ssl_context is not None: 

211 warnings.warn( 

212 "ssl_context is deprecated, use ssl=context instead", 

213 DeprecationWarning, 

214 stacklevel=3, 

215 ) 

216 if ssl is not True: 

217 raise ValueError( 

218 "verify_ssl, ssl_context, fingerprint and ssl " 

219 "parameters are mutually exclusive" 

220 ) 

221 else: 

222 ssl = ssl_context 

223 if fingerprint is not None: 

224 warnings.warn( 

225 "fingerprint is deprecated, use ssl=Fingerprint(fingerprint) instead", 

226 DeprecationWarning, 

227 stacklevel=3, 

228 ) 

229 if ssl is not True: 

230 raise ValueError( 

231 "verify_ssl, ssl_context, fingerprint and ssl " 

232 "parameters are mutually exclusive" 

233 ) 

234 else: 

235 ssl = Fingerprint(fingerprint) 

236 if not isinstance(ssl, SSL_ALLOWED_TYPES): 

237 raise TypeError( 

238 "ssl should be SSLContext, bool, Fingerprint or None, " 

239 "got {!r} instead.".format(ssl) 

240 ) 

241 return ssl 

242 

243 

244_SSL_SCHEMES = frozenset(("https", "wss")) 

245 

246 

247# ConnectionKey is a NamedTuple because it is used as a key in a dict 

248# and a set in the connector. Since a NamedTuple is a tuple it uses 

249# the fast native tuple __hash__ and __eq__ implementation in CPython. 

250class ConnectionKey(NamedTuple): 

251 # the key should contain an information about used proxy / TLS 

252 # to prevent reusing wrong connections from a pool 

253 host: str 

254 port: Optional[int] 

255 is_ssl: bool 

256 ssl: Union[SSLContext, bool, Fingerprint] 

257 proxy: Optional[URL] 

258 proxy_auth: Optional[BasicAuth] 

259 proxy_headers_hash: Optional[int] # hash(CIMultiDict) 

260 

261 

262def _is_expected_content_type( 

263 response_content_type: str, expected_content_type: str 

264) -> bool: 

265 if expected_content_type == "application/json": 

266 return json_re.match(response_content_type) is not None 

267 return expected_content_type in response_content_type 

268 

269 

270def _warn_if_unclosed_payload(payload: payload.Payload, stacklevel: int = 2) -> None: 

271 """Warn if the payload is not closed. 

272 

273 Callers must check that the body is a Payload before calling this method. 

274 

275 Args: 

276 payload: The payload to check 

277 stacklevel: Stack level for the warning (default 2 for direct callers) 

278 """ 

279 if not payload.autoclose and not payload.consumed: 

280 warnings.warn( 

281 "The previous request body contains unclosed resources. " 

282 "Use await request.update_body() instead of setting request.body " 

283 "directly to properly close resources and avoid leaks.", 

284 ResourceWarning, 

285 stacklevel=stacklevel, 

286 ) 

287 

288 

289class ClientResponse(HeadersMixin): 

290 

291 # Some of these attributes are None when created, 

292 # but will be set by the start() method. 

293 # As the end user will likely never see the None values, we cheat the types below. 

294 # from the Status-Line of the response 

295 version: Optional[HttpVersion] = None # HTTP-Version 

296 status: int = None # type: ignore[assignment] # Status-Code 

297 reason: Optional[str] = None # Reason-Phrase 

298 

299 content: StreamReader = None # type: ignore[assignment] # Payload stream 

300 _body: Optional[bytes] = None 

301 _headers: CIMultiDictProxy[str] = None # type: ignore[assignment] 

302 _history: Tuple["ClientResponse", ...] = () 

303 _raw_headers: RawHeaders = None # type: ignore[assignment] 

304 

305 _connection: Optional["Connection"] = None # current connection 

306 _cookies: Optional[SimpleCookie] = None 

307 _raw_cookie_headers: Optional[Tuple[str, ...]] = None 

308 _continue: Optional["asyncio.Future[bool]"] = None 

309 _source_traceback: Optional[traceback.StackSummary] = None 

310 _session: Optional["ClientSession"] = None 

311 # set up by ClientRequest after ClientResponse object creation 

312 # post-init stage allows to not change ctor signature 

313 _closed = True # to allow __del__ for non-initialized properly response 

314 _released = False 

315 _in_context = False 

316 

317 _resolve_charset: Callable[["ClientResponse", bytes], str] = lambda *_: "utf-8" 

318 

319 __writer: Optional["asyncio.Task[None]"] = None 

320 

321 def __init__( 

322 self, 

323 method: str, 

324 url: URL, 

325 *, 

326 writer: "Optional[asyncio.Task[None]]", 

327 continue100: Optional["asyncio.Future[bool]"], 

328 timer: BaseTimerContext, 

329 request_info: RequestInfo, 

330 traces: List["Trace"], 

331 loop: asyncio.AbstractEventLoop, 

332 session: "ClientSession", 

333 ) -> None: 

334 # URL forbids subclasses, so a simple type check is enough. 

335 assert type(url) is URL 

336 

337 self.method = method 

338 

339 self._real_url = url 

340 self._url = url.with_fragment(None) if url.raw_fragment else url 

341 if writer is not None: 

342 self._writer = writer 

343 if continue100 is not None: 

344 self._continue = continue100 

345 self._request_info = request_info 

346 self._timer = timer if timer is not None else TimerNoop() 

347 self._cache: Dict[str, Any] = {} 

348 self._traces = traces 

349 self._loop = loop 

350 # Save reference to _resolve_charset, so that get_encoding() will still 

351 # work after the response has finished reading the body. 

352 # TODO: Fix session=None in tests (see ClientRequest.__init__). 

353 if session is not None: 

354 # store a reference to session #1985 

355 self._session = session 

356 self._resolve_charset = session._resolve_charset 

357 if loop.get_debug(): 

358 self._source_traceback = traceback.extract_stack(sys._getframe(1)) 

359 

360 def __reset_writer(self, _: object = None) -> None: 

361 self.__writer = None 

362 

363 @property 

364 def _writer(self) -> Optional["asyncio.Task[None]"]: 

365 """The writer task for streaming data. 

366 

367 _writer is only provided for backwards compatibility 

368 for subclasses that may need to access it. 

369 """ 

370 return self.__writer 

371 

372 @_writer.setter 

373 def _writer(self, writer: Optional["asyncio.Task[None]"]) -> None: 

374 """Set the writer task for streaming data.""" 

375 if self.__writer is not None: 

376 self.__writer.remove_done_callback(self.__reset_writer) 

377 self.__writer = writer 

378 if writer is None: 

379 return 

380 if writer.done(): 

381 # The writer is already done, so we can clear it immediately. 

382 self.__writer = None 

383 else: 

384 writer.add_done_callback(self.__reset_writer) 

385 

386 @property 

387 def cookies(self) -> SimpleCookie: 

388 if self._cookies is None: 

389 if self._raw_cookie_headers is not None: 

390 # Parse cookies for response.cookies (SimpleCookie for backward compatibility) 

391 cookies = SimpleCookie() 

392 # Use parse_set_cookie_headers for more lenient parsing that handles 

393 # malformed cookies better than SimpleCookie.load 

394 cookies.update(parse_set_cookie_headers(self._raw_cookie_headers)) 

395 self._cookies = cookies 

396 else: 

397 self._cookies = SimpleCookie() 

398 return self._cookies 

399 

400 @cookies.setter 

401 def cookies(self, cookies: SimpleCookie) -> None: 

402 self._cookies = cookies 

403 # Generate raw cookie headers from the SimpleCookie 

404 if cookies: 

405 self._raw_cookie_headers = tuple( 

406 morsel.OutputString() for morsel in cookies.values() 

407 ) 

408 else: 

409 self._raw_cookie_headers = None 

410 

411 @reify 

412 def url(self) -> URL: 

413 return self._url 

414 

415 @reify 

416 def url_obj(self) -> URL: 

417 warnings.warn("Deprecated, use .url #1654", DeprecationWarning, stacklevel=2) 

418 return self._url 

419 

420 @reify 

421 def real_url(self) -> URL: 

422 return self._real_url 

423 

424 @reify 

425 def host(self) -> str: 

426 assert self._url.host is not None 

427 return self._url.host 

428 

429 @reify 

430 def headers(self) -> "CIMultiDictProxy[str]": 

431 return self._headers 

432 

433 @reify 

434 def raw_headers(self) -> RawHeaders: 

435 return self._raw_headers 

436 

437 @reify 

438 def request_info(self) -> RequestInfo: 

439 return self._request_info 

440 

441 @reify 

442 def content_disposition(self) -> Optional[ContentDisposition]: 

443 raw = self._headers.get(hdrs.CONTENT_DISPOSITION) 

444 if raw is None: 

445 return None 

446 disposition_type, params_dct = multipart.parse_content_disposition(raw) 

447 params = MappingProxyType(params_dct) 

448 filename = multipart.content_disposition_filename(params) 

449 return ContentDisposition(disposition_type, params, filename) 

450 

451 def __del__(self, _warnings: Any = warnings) -> None: 

452 if self._closed: 

453 return 

454 

455 if self._connection is not None: 

456 self._connection.release() 

457 self._cleanup_writer() 

458 

459 if self._loop.get_debug(): 

460 kwargs = {"source": self} 

461 _warnings.warn(f"Unclosed response {self!r}", ResourceWarning, **kwargs) 

462 context = {"client_response": self, "message": "Unclosed response"} 

463 if self._source_traceback: 

464 context["source_traceback"] = self._source_traceback 

465 self._loop.call_exception_handler(context) 

466 

467 def __repr__(self) -> str: 

468 out = io.StringIO() 

469 ascii_encodable_url = str(self.url) 

470 if self.reason: 

471 ascii_encodable_reason = self.reason.encode( 

472 "ascii", "backslashreplace" 

473 ).decode("ascii") 

474 else: 

475 ascii_encodable_reason = "None" 

476 print( 

477 "<ClientResponse({}) [{} {}]>".format( 

478 ascii_encodable_url, self.status, ascii_encodable_reason 

479 ), 

480 file=out, 

481 ) 

482 print(self.headers, file=out) 

483 return out.getvalue() 

484 

485 @property 

486 def connection(self) -> Optional["Connection"]: 

487 return self._connection 

488 

489 @reify 

490 def history(self) -> Tuple["ClientResponse", ...]: 

491 """A sequence of of responses, if redirects occurred.""" 

492 return self._history 

493 

494 @reify 

495 def links(self) -> "MultiDictProxy[MultiDictProxy[Union[str, URL]]]": 

496 links_str = ", ".join(self.headers.getall("link", [])) 

497 

498 if not links_str: 

499 return MultiDictProxy(MultiDict()) 

500 

501 links: MultiDict[MultiDictProxy[Union[str, URL]]] = MultiDict() 

502 

503 for val in re.split(r",(?=\s*<)", links_str): 

504 match = re.match(r"\s*<(.*)>(.*)", val) 

505 if match is None: # pragma: no cover 

506 # the check exists to suppress mypy error 

507 continue 

508 url, params_str = match.groups() 

509 params = params_str.split(";")[1:] 

510 

511 link: MultiDict[Union[str, URL]] = MultiDict() 

512 

513 for param in params: 

514 match = re.match(r"^\s*(\S*)\s*=\s*(['\"]?)(.*?)(\2)\s*$", param, re.M) 

515 if match is None: # pragma: no cover 

516 # the check exists to suppress mypy error 

517 continue 

518 key, _, value, _ = match.groups() 

519 

520 link.add(key, value) 

521 

522 key = link.get("rel", url) 

523 

524 link.add("url", self.url.join(URL(url))) 

525 

526 links.add(str(key), MultiDictProxy(link)) 

527 

528 return MultiDictProxy(links) 

529 

530 async def start(self, connection: "Connection") -> "ClientResponse": 

531 """Start response processing.""" 

532 self._closed = False 

533 self._protocol = connection.protocol 

534 self._connection = connection 

535 

536 with self._timer: 

537 while True: 

538 # read response 

539 try: 

540 protocol = self._protocol 

541 message, payload = await protocol.read() # type: ignore[union-attr] 

542 except http.HttpProcessingError as exc: 

543 raise ClientResponseError( 

544 self.request_info, 

545 self.history, 

546 status=exc.code, 

547 message=exc.message, 

548 headers=exc.headers, 

549 ) from exc 

550 

551 if message.code < 100 or message.code > 199 or message.code == 101: 

552 break 

553 

554 if self._continue is not None: 

555 set_result(self._continue, True) 

556 self._continue = None 

557 

558 # payload eof handler 

559 payload.on_eof(self._response_eof) 

560 

561 # response status 

562 self.version = message.version 

563 self.status = message.code 

564 self.reason = message.reason 

565 

566 # headers 

567 self._headers = message.headers # type is CIMultiDictProxy 

568 self._raw_headers = message.raw_headers # type is Tuple[bytes, bytes] 

569 

570 # payload 

571 self.content = payload 

572 

573 # cookies 

574 if cookie_hdrs := self.headers.getall(hdrs.SET_COOKIE, ()): 

575 # Store raw cookie headers for CookieJar 

576 self._raw_cookie_headers = tuple(cookie_hdrs) 

577 return self 

578 

579 def _response_eof(self) -> None: 

580 if self._closed: 

581 return 

582 

583 # protocol could be None because connection could be detached 

584 protocol = self._connection and self._connection.protocol 

585 if protocol is not None and protocol.upgraded: 

586 return 

587 

588 self._closed = True 

589 self._cleanup_writer() 

590 self._release_connection() 

591 

592 @property 

593 def closed(self) -> bool: 

594 return self._closed 

595 

596 def close(self) -> None: 

597 if not self._released: 

598 self._notify_content() 

599 

600 self._closed = True 

601 if self._loop is None or self._loop.is_closed(): 

602 return 

603 

604 self._cleanup_writer() 

605 if self._connection is not None: 

606 self._connection.close() 

607 self._connection = None 

608 

609 def release(self) -> Any: 

610 if not self._released: 

611 self._notify_content() 

612 

613 self._closed = True 

614 

615 self._cleanup_writer() 

616 self._release_connection() 

617 return noop() 

618 

619 @property 

620 def ok(self) -> bool: 

621 """Returns ``True`` if ``status`` is less than ``400``, ``False`` if not. 

622 

623 This is **not** a check for ``200 OK`` but a check that the response 

624 status is under 400. 

625 """ 

626 return 400 > self.status 

627 

628 def raise_for_status(self) -> None: 

629 if not self.ok: 

630 # reason should always be not None for a started response 

631 assert self.reason is not None 

632 

633 # If we're in a context we can rely on __aexit__() to release as the 

634 # exception propagates. 

635 if not self._in_context: 

636 self.release() 

637 

638 raise ClientResponseError( 

639 self.request_info, 

640 self.history, 

641 status=self.status, 

642 message=self.reason, 

643 headers=self.headers, 

644 ) 

645 

646 def _release_connection(self) -> None: 

647 if self._connection is not None: 

648 if self.__writer is None: 

649 self._connection.release() 

650 self._connection = None 

651 else: 

652 self.__writer.add_done_callback(lambda f: self._release_connection()) 

653 

654 async def _wait_released(self) -> None: 

655 if self.__writer is not None: 

656 try: 

657 await self.__writer 

658 except asyncio.CancelledError: 

659 if ( 

660 sys.version_info >= (3, 11) 

661 and (task := asyncio.current_task()) 

662 and task.cancelling() 

663 ): 

664 raise 

665 self._release_connection() 

666 

667 def _cleanup_writer(self) -> None: 

668 if self.__writer is not None: 

669 self.__writer.cancel() 

670 self._session = None 

671 

672 def _notify_content(self) -> None: 

673 content = self.content 

674 if content and content.exception() is None: 

675 set_exception(content, _CONNECTION_CLOSED_EXCEPTION) 

676 self._released = True 

677 

678 async def wait_for_close(self) -> None: 

679 if self.__writer is not None: 

680 try: 

681 await self.__writer 

682 except asyncio.CancelledError: 

683 if ( 

684 sys.version_info >= (3, 11) 

685 and (task := asyncio.current_task()) 

686 and task.cancelling() 

687 ): 

688 raise 

689 self.release() 

690 

691 async def read(self) -> bytes: 

692 """Read response payload.""" 

693 if self._body is None: 

694 try: 

695 self._body = await self.content.read() 

696 for trace in self._traces: 

697 await trace.send_response_chunk_received( 

698 self.method, self.url, self._body 

699 ) 

700 except BaseException: 

701 self.close() 

702 raise 

703 elif self._released: # Response explicitly released 

704 raise ClientConnectionError("Connection closed") 

705 

706 protocol = self._connection and self._connection.protocol 

707 if protocol is None or not protocol.upgraded: 

708 await self._wait_released() # Underlying connection released 

709 return self._body 

710 

711 def get_encoding(self) -> str: 

712 ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower() 

713 mimetype = helpers.parse_mimetype(ctype) 

714 

715 encoding = mimetype.parameters.get("charset") 

716 if encoding: 

717 with contextlib.suppress(LookupError, ValueError): 

718 return codecs.lookup(encoding).name 

719 

720 if mimetype.type == "application" and ( 

721 mimetype.subtype == "json" or mimetype.subtype == "rdap" 

722 ): 

723 # RFC 7159 states that the default encoding is UTF-8. 

724 # RFC 7483 defines application/rdap+json 

725 return "utf-8" 

726 

727 if self._body is None: 

728 raise RuntimeError( 

729 "Cannot compute fallback encoding of a not yet read body" 

730 ) 

731 

732 return self._resolve_charset(self, self._body) 

733 

734 async def text(self, encoding: Optional[str] = None, errors: str = "strict") -> str: 

735 """Read response payload and decode.""" 

736 if self._body is None: 

737 await self.read() 

738 

739 if encoding is None: 

740 encoding = self.get_encoding() 

741 

742 return self._body.decode(encoding, errors=errors) # type: ignore[union-attr] 

743 

744 async def json( 

745 self, 

746 *, 

747 encoding: Optional[str] = None, 

748 loads: JSONDecoder = DEFAULT_JSON_DECODER, 

749 content_type: Optional[str] = "application/json", 

750 ) -> Any: 

751 """Read and decodes JSON response.""" 

752 if self._body is None: 

753 await self.read() 

754 

755 if content_type: 

756 ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower() 

757 if not _is_expected_content_type(ctype, content_type): 

758 raise ContentTypeError( 

759 self.request_info, 

760 self.history, 

761 status=self.status, 

762 message=( 

763 "Attempt to decode JSON with unexpected mimetype: %s" % ctype 

764 ), 

765 headers=self.headers, 

766 ) 

767 

768 stripped = self._body.strip() # type: ignore[union-attr] 

769 if not stripped: 

770 return None 

771 

772 if encoding is None: 

773 encoding = self.get_encoding() 

774 

775 return loads(stripped.decode(encoding)) 

776 

777 async def __aenter__(self) -> "ClientResponse": 

778 self._in_context = True 

779 return self 

780 

781 async def __aexit__( 

782 self, 

783 exc_type: Optional[Type[BaseException]], 

784 exc_val: Optional[BaseException], 

785 exc_tb: Optional[TracebackType], 

786 ) -> None: 

787 self._in_context = False 

788 # similar to _RequestContextManager, we do not need to check 

789 # for exceptions, response object can close connection 

790 # if state is broken 

791 self.release() 

792 await self.wait_for_close() 

793 

794 

795class ClientRequest: 

796 GET_METHODS = { 

797 hdrs.METH_GET, 

798 hdrs.METH_HEAD, 

799 hdrs.METH_OPTIONS, 

800 hdrs.METH_TRACE, 

801 } 

802 POST_METHODS = {hdrs.METH_PATCH, hdrs.METH_POST, hdrs.METH_PUT} 

803 ALL_METHODS = GET_METHODS.union(POST_METHODS).union({hdrs.METH_DELETE}) 

804 

805 DEFAULT_HEADERS = { 

806 hdrs.ACCEPT: "*/*", 

807 hdrs.ACCEPT_ENCODING: _gen_default_accept_encoding(), 

808 } 

809 

810 # Type of body depends on PAYLOAD_REGISTRY, which is dynamic. 

811 _body: Union[None, payload.Payload] = None 

812 auth = None 

813 response = None 

814 

815 __writer: Optional["asyncio.Task[None]"] = None # async task for streaming data 

816 

817 # These class defaults help create_autospec() work correctly. 

818 # If autospec is improved in future, maybe these can be removed. 

819 url = URL() 

820 method = "GET" 

821 

822 _continue = None # waiter future for '100 Continue' response 

823 

824 _skip_auto_headers: Optional["CIMultiDict[None]"] = None 

825 

826 # N.B. 

827 # Adding __del__ method with self._writer closing doesn't make sense 

828 # because _writer is instance method, thus it keeps a reference to self. 

829 # Until writer has finished finalizer will not be called. 

830 

831 def __init__( 

832 self, 

833 method: str, 

834 url: URL, 

835 *, 

836 params: Query = None, 

837 headers: Optional[LooseHeaders] = None, 

838 skip_auto_headers: Optional[Iterable[str]] = None, 

839 data: Any = None, 

840 cookies: Optional[LooseCookies] = None, 

841 auth: Optional[BasicAuth] = None, 

842 version: http.HttpVersion = http.HttpVersion11, 

843 compress: Union[str, bool, None] = None, 

844 chunked: Optional[bool] = None, 

845 expect100: bool = False, 

846 loop: Optional[asyncio.AbstractEventLoop] = None, 

847 response_class: Optional[Type["ClientResponse"]] = None, 

848 proxy: Optional[URL] = None, 

849 proxy_auth: Optional[BasicAuth] = None, 

850 timer: Optional[BaseTimerContext] = None, 

851 session: Optional["ClientSession"] = None, 

852 ssl: Union[SSLContext, bool, Fingerprint] = True, 

853 proxy_headers: Optional[LooseHeaders] = None, 

854 traces: Optional[List["Trace"]] = None, 

855 trust_env: bool = False, 

856 server_hostname: Optional[str] = None, 

857 ): 

858 if loop is None: 

859 loop = asyncio.get_event_loop() 

860 if match := _CONTAINS_CONTROL_CHAR_RE.search(method): 

861 raise ValueError( 

862 f"Method cannot contain non-token characters {method!r} " 

863 f"(found at least {match.group()!r})" 

864 ) 

865 # URL forbids subclasses, so a simple type check is enough. 

866 assert type(url) is URL, url 

867 if proxy is not None: 

868 assert type(proxy) is URL, proxy 

869 # FIXME: session is None in tests only, need to fix tests 

870 # assert session is not None 

871 if TYPE_CHECKING: 

872 assert session is not None 

873 self._session = session 

874 if params: 

875 url = url.extend_query(params) 

876 self.original_url = url 

877 self.url = url.with_fragment(None) if url.raw_fragment else url 

878 self.method = method.upper() 

879 self.chunked = chunked 

880 self.compress = compress 

881 self.loop = loop 

882 self.length = None 

883 if response_class is None: 

884 real_response_class = ClientResponse 

885 else: 

886 real_response_class = response_class 

887 self.response_class: Type[ClientResponse] = real_response_class 

888 self._timer = timer if timer is not None else TimerNoop() 

889 self._ssl = ssl if ssl is not None else True 

890 self.server_hostname = server_hostname 

891 

892 if loop.get_debug(): 

893 self._source_traceback = traceback.extract_stack(sys._getframe(1)) 

894 

895 self.update_version(version) 

896 self.update_host(url) 

897 self.update_headers(headers) 

898 self.update_auto_headers(skip_auto_headers) 

899 self.update_cookies(cookies) 

900 self.update_content_encoding(data) 

901 self.update_auth(auth, trust_env) 

902 self.update_proxy(proxy, proxy_auth, proxy_headers) 

903 

904 self.update_body_from_data(data) 

905 if data is not None or self.method not in self.GET_METHODS: 

906 self.update_transfer_encoding() 

907 self.update_expect_continue(expect100) 

908 self._traces = [] if traces is None else traces 

909 

910 def __reset_writer(self, _: object = None) -> None: 

911 self.__writer = None 

912 

913 def _get_content_length(self) -> Optional[int]: 

914 """Extract and validate Content-Length header value. 

915 

916 Returns parsed Content-Length value or None if not set. 

917 Raises ValueError if header exists but cannot be parsed as an integer. 

918 """ 

919 if hdrs.CONTENT_LENGTH not in self.headers: 

920 return None 

921 

922 content_length_hdr = self.headers[hdrs.CONTENT_LENGTH] 

923 try: 

924 return int(content_length_hdr) 

925 except ValueError: 

926 raise ValueError( 

927 f"Invalid Content-Length header: {content_length_hdr}" 

928 ) from None 

929 

930 @property 

931 def skip_auto_headers(self) -> CIMultiDict[None]: 

932 return self._skip_auto_headers or CIMultiDict() 

933 

934 @property 

935 def _writer(self) -> Optional["asyncio.Task[None]"]: 

936 return self.__writer 

937 

938 @_writer.setter 

939 def _writer(self, writer: "asyncio.Task[None]") -> None: 

940 if self.__writer is not None: 

941 self.__writer.remove_done_callback(self.__reset_writer) 

942 self.__writer = writer 

943 writer.add_done_callback(self.__reset_writer) 

944 

945 def is_ssl(self) -> bool: 

946 return self.url.scheme in _SSL_SCHEMES 

947 

948 @property 

949 def ssl(self) -> Union["SSLContext", bool, Fingerprint]: 

950 return self._ssl 

951 

952 @property 

953 def connection_key(self) -> ConnectionKey: 

954 if proxy_headers := self.proxy_headers: 

955 h: Optional[int] = hash(tuple(proxy_headers.items())) 

956 else: 

957 h = None 

958 url = self.url 

959 return tuple.__new__( 

960 ConnectionKey, 

961 ( 

962 url.raw_host or "", 

963 url.port, 

964 url.scheme in _SSL_SCHEMES, 

965 self._ssl, 

966 self.proxy, 

967 self.proxy_auth, 

968 h, 

969 ), 

970 ) 

971 

972 @property 

973 def host(self) -> str: 

974 ret = self.url.raw_host 

975 assert ret is not None 

976 return ret 

977 

978 @property 

979 def port(self) -> Optional[int]: 

980 return self.url.port 

981 

982 @property 

983 def body(self) -> Union[payload.Payload, Literal[b""]]: 

984 """Request body.""" 

985 # empty body is represented as bytes for backwards compatibility 

986 return self._body or b"" 

987 

988 @body.setter 

989 def body(self, value: Any) -> None: 

990 """Set request body with warning for non-autoclose payloads. 

991 

992 WARNING: This setter must be called from within an event loop and is not 

993 thread-safe. Setting body outside of an event loop may raise RuntimeError 

994 when closing file-based payloads. 

995 

996 DEPRECATED: Direct assignment to body is deprecated and will be removed 

997 in a future version. Use await update_body() instead for proper resource 

998 management. 

999 """ 

1000 # Close existing payload if present 

1001 if self._body is not None: 

1002 # Warn if the payload needs manual closing 

1003 # stacklevel=3: user code -> body setter -> _warn_if_unclosed_payload 

1004 _warn_if_unclosed_payload(self._body, stacklevel=3) 

1005 # NOTE: In the future, when we remove sync close support, 

1006 # this setter will need to be removed and only the async 

1007 # update_body() method will be available. For now, we call 

1008 # _close() for backwards compatibility. 

1009 self._body._close() 

1010 self._update_body(value) 

1011 

1012 @property 

1013 def request_info(self) -> RequestInfo: 

1014 headers: CIMultiDictProxy[str] = CIMultiDictProxy(self.headers) 

1015 # These are created on every request, so we use a NamedTuple 

1016 # for performance reasons. We don't use the RequestInfo.__new__ 

1017 # method because it has a different signature which is provided 

1018 # for backwards compatibility only. 

1019 return tuple.__new__( 

1020 RequestInfo, (self.url, self.method, headers, self.original_url) 

1021 ) 

1022 

1023 @property 

1024 def session(self) -> "ClientSession": 

1025 """Return the ClientSession instance. 

1026 

1027 This property provides access to the ClientSession that initiated 

1028 this request, allowing middleware to make additional requests 

1029 using the same session. 

1030 """ 

1031 return self._session 

1032 

1033 def update_host(self, url: URL) -> None: 

1034 """Update destination host, port and connection type (ssl).""" 

1035 # get host/port 

1036 if not url.raw_host: 

1037 raise InvalidURL(url) 

1038 

1039 # basic auth info 

1040 if url.raw_user or url.raw_password: 

1041 self.auth = helpers.BasicAuth(url.user or "", url.password or "") 

1042 

1043 def update_version(self, version: Union[http.HttpVersion, str]) -> None: 

1044 """Convert request version to two elements tuple. 

1045 

1046 parser HTTP version '1.1' => (1, 1) 

1047 """ 

1048 if isinstance(version, str): 

1049 v = [part.strip() for part in version.split(".", 1)] 

1050 try: 

1051 version = http.HttpVersion(int(v[0]), int(v[1])) 

1052 except ValueError: 

1053 raise ValueError( 

1054 f"Can not parse http version number: {version}" 

1055 ) from None 

1056 self.version = version 

1057 

1058 def update_headers(self, headers: Optional[LooseHeaders]) -> None: 

1059 """Update request headers.""" 

1060 self.headers: CIMultiDict[str] = CIMultiDict() 

1061 

1062 # Build the host header 

1063 host = self.url.host_port_subcomponent 

1064 

1065 # host_port_subcomponent is None when the URL is a relative URL. 

1066 # but we know we do not have a relative URL here. 

1067 assert host is not None 

1068 self.headers[hdrs.HOST] = host 

1069 

1070 if not headers: 

1071 return 

1072 

1073 if isinstance(headers, (dict, MultiDictProxy, MultiDict)): 

1074 headers = headers.items() 

1075 

1076 for key, value in headers: # type: ignore[misc] 

1077 # A special case for Host header 

1078 if key in hdrs.HOST_ALL: 

1079 self.headers[key] = value 

1080 else: 

1081 self.headers.add(key, value) 

1082 

1083 def update_auto_headers(self, skip_auto_headers: Optional[Iterable[str]]) -> None: 

1084 if skip_auto_headers is not None: 

1085 self._skip_auto_headers = CIMultiDict( 

1086 (hdr, None) for hdr in sorted(skip_auto_headers) 

1087 ) 

1088 used_headers = self.headers.copy() 

1089 used_headers.extend(self._skip_auto_headers) # type: ignore[arg-type] 

1090 else: 

1091 # Fast path when there are no headers to skip 

1092 # which is the most common case. 

1093 used_headers = self.headers 

1094 

1095 for hdr, val in self.DEFAULT_HEADERS.items(): 

1096 if hdr not in used_headers: 

1097 self.headers[hdr] = val 

1098 

1099 if hdrs.USER_AGENT not in used_headers: 

1100 self.headers[hdrs.USER_AGENT] = SERVER_SOFTWARE 

1101 

1102 def update_cookies(self, cookies: Optional[LooseCookies]) -> None: 

1103 """Update request cookies header.""" 

1104 if not cookies: 

1105 return 

1106 

1107 c = SimpleCookie() 

1108 if hdrs.COOKIE in self.headers: 

1109 # parse_cookie_header for RFC 6265 compliant Cookie header parsing 

1110 c.update(parse_cookie_header(self.headers.get(hdrs.COOKIE, ""))) 

1111 del self.headers[hdrs.COOKIE] 

1112 

1113 if isinstance(cookies, Mapping): 

1114 iter_cookies = cookies.items() 

1115 else: 

1116 iter_cookies = cookies # type: ignore[assignment] 

1117 for name, value in iter_cookies: 

1118 if isinstance(value, Morsel): 

1119 # Use helper to preserve coded_value exactly as sent by server 

1120 c[name] = preserve_morsel_with_coded_value(value) 

1121 else: 

1122 c[name] = value # type: ignore[assignment] 

1123 

1124 self.headers[hdrs.COOKIE] = c.output(header="", sep=";").strip() 

1125 

1126 def update_content_encoding(self, data: Any) -> None: 

1127 """Set request content encoding.""" 

1128 if not data: 

1129 # Don't compress an empty body. 

1130 self.compress = None 

1131 return 

1132 

1133 if self.headers.get(hdrs.CONTENT_ENCODING): 

1134 if self.compress: 

1135 raise ValueError( 

1136 "compress can not be set if Content-Encoding header is set" 

1137 ) 

1138 elif self.compress: 

1139 if not isinstance(self.compress, str): 

1140 self.compress = "deflate" 

1141 self.headers[hdrs.CONTENT_ENCODING] = self.compress 

1142 self.chunked = True # enable chunked, no need to deal with length 

1143 

1144 def update_transfer_encoding(self) -> None: 

1145 """Analyze transfer-encoding header.""" 

1146 te = self.headers.get(hdrs.TRANSFER_ENCODING, "").lower() 

1147 

1148 if "chunked" in te: 

1149 if self.chunked: 

1150 raise ValueError( 

1151 "chunked can not be set " 

1152 'if "Transfer-Encoding: chunked" header is set' 

1153 ) 

1154 

1155 elif self.chunked: 

1156 if hdrs.CONTENT_LENGTH in self.headers: 

1157 raise ValueError( 

1158 "chunked can not be set if Content-Length header is set" 

1159 ) 

1160 

1161 self.headers[hdrs.TRANSFER_ENCODING] = "chunked" 

1162 

1163 def update_auth(self, auth: Optional[BasicAuth], trust_env: bool = False) -> None: 

1164 """Set basic auth.""" 

1165 if auth is None: 

1166 auth = self.auth 

1167 if auth is None and trust_env and self.url.host is not None: 

1168 netrc_obj = netrc_from_env() 

1169 with contextlib.suppress(LookupError): 

1170 auth = basicauth_from_netrc(netrc_obj, self.url.host) 

1171 if auth is None: 

1172 return 

1173 

1174 if not isinstance(auth, helpers.BasicAuth): 

1175 raise TypeError("BasicAuth() tuple is required instead") 

1176 

1177 self.headers[hdrs.AUTHORIZATION] = auth.encode() 

1178 

1179 def update_body_from_data(self, body: Any, _stacklevel: int = 3) -> None: 

1180 """Update request body from data.""" 

1181 if self._body is not None: 

1182 _warn_if_unclosed_payload(self._body, stacklevel=_stacklevel) 

1183 

1184 if body is None: 

1185 self._body = None 

1186 # Set Content-Length to 0 when body is None for methods that expect a body 

1187 if ( 

1188 self.method not in self.GET_METHODS 

1189 and not self.chunked 

1190 and hdrs.CONTENT_LENGTH not in self.headers 

1191 ): 

1192 self.headers[hdrs.CONTENT_LENGTH] = "0" 

1193 return 

1194 

1195 # FormData 

1196 maybe_payload = body() if isinstance(body, FormData) else body 

1197 

1198 try: 

1199 body_payload = payload.PAYLOAD_REGISTRY.get(maybe_payload, disposition=None) 

1200 except payload.LookupError: 

1201 body_payload = FormData(maybe_payload)() # type: ignore[arg-type] 

1202 

1203 self._body = body_payload 

1204 # enable chunked encoding if needed 

1205 if not self.chunked and hdrs.CONTENT_LENGTH not in self.headers: 

1206 if (size := body_payload.size) is not None: 

1207 self.headers[hdrs.CONTENT_LENGTH] = str(size) 

1208 else: 

1209 self.chunked = True 

1210 

1211 # copy payload headers 

1212 assert body_payload.headers 

1213 headers = self.headers 

1214 skip_headers = self._skip_auto_headers 

1215 for key, value in body_payload.headers.items(): 

1216 if key in headers or (skip_headers is not None and key in skip_headers): 

1217 continue 

1218 headers[key] = value 

1219 

1220 def _update_body(self, body: Any) -> None: 

1221 """Update request body after its already been set.""" 

1222 # Remove existing Content-Length header since body is changing 

1223 if hdrs.CONTENT_LENGTH in self.headers: 

1224 del self.headers[hdrs.CONTENT_LENGTH] 

1225 

1226 # Remove existing Transfer-Encoding header to avoid conflicts 

1227 if self.chunked and hdrs.TRANSFER_ENCODING in self.headers: 

1228 del self.headers[hdrs.TRANSFER_ENCODING] 

1229 

1230 # Now update the body using the existing method 

1231 # Called from _update_body, add 1 to stacklevel from caller 

1232 self.update_body_from_data(body, _stacklevel=4) 

1233 

1234 # Update transfer encoding headers if needed (same logic as __init__) 

1235 if body is not None or self.method not in self.GET_METHODS: 

1236 self.update_transfer_encoding() 

1237 

1238 async def update_body(self, body: Any) -> None: 

1239 """ 

1240 Update request body and close previous payload if needed. 

1241 

1242 This method safely updates the request body by first closing any existing 

1243 payload to prevent resource leaks, then setting the new body. 

1244 

1245 IMPORTANT: Always use this method instead of setting request.body directly. 

1246 Direct assignment to request.body will leak resources if the previous body 

1247 contains file handles, streams, or other resources that need cleanup. 

1248 

1249 Args: 

1250 body: The new body content. Can be: 

1251 - bytes/bytearray: Raw binary data 

1252 - str: Text data (will be encoded using charset from Content-Type) 

1253 - FormData: Form data that will be encoded as multipart/form-data 

1254 - Payload: A pre-configured payload object 

1255 - AsyncIterable: An async iterable of bytes chunks 

1256 - File-like object: Will be read and sent as binary data 

1257 - None: Clears the body 

1258 

1259 Usage: 

1260 # CORRECT: Use update_body 

1261 await request.update_body(b"new request data") 

1262 

1263 # WRONG: Don't set body directly 

1264 # request.body = b"new request data" # This will leak resources! 

1265 

1266 # Update with form data 

1267 form_data = FormData() 

1268 form_data.add_field('field', 'value') 

1269 await request.update_body(form_data) 

1270 

1271 # Clear body 

1272 await request.update_body(None) 

1273 

1274 Note: 

1275 This method is async because it may need to close file handles or 

1276 other resources associated with the previous payload. Always await 

1277 this method to ensure proper cleanup. 

1278 

1279 Warning: 

1280 Setting request.body directly is highly discouraged and can lead to: 

1281 - Resource leaks (unclosed file handles, streams) 

1282 - Memory leaks (unreleased buffers) 

1283 - Unexpected behavior with streaming payloads 

1284 

1285 It is not recommended to change the payload type in middleware. If the 

1286 body was already set (e.g., as bytes), it's best to keep the same type 

1287 rather than converting it (e.g., to str) as this may result in unexpected 

1288 behavior. 

1289 

1290 See Also: 

1291 - update_body_from_data: Synchronous body update without cleanup 

1292 - body property: Direct body access (STRONGLY DISCOURAGED) 

1293 

1294 """ 

1295 # Close existing payload if it exists and needs closing 

1296 if self._body is not None: 

1297 await self._body.close() 

1298 self._update_body(body) 

1299 

1300 def update_expect_continue(self, expect: bool = False) -> None: 

1301 if expect: 

1302 self.headers[hdrs.EXPECT] = "100-continue" 

1303 elif ( 

1304 hdrs.EXPECT in self.headers 

1305 and self.headers[hdrs.EXPECT].lower() == "100-continue" 

1306 ): 

1307 expect = True 

1308 

1309 if expect: 

1310 self._continue = self.loop.create_future() 

1311 

1312 def update_proxy( 

1313 self, 

1314 proxy: Optional[URL], 

1315 proxy_auth: Optional[BasicAuth], 

1316 proxy_headers: Optional[LooseHeaders], 

1317 ) -> None: 

1318 self.proxy = proxy 

1319 if proxy is None: 

1320 self.proxy_auth = None 

1321 self.proxy_headers = None 

1322 return 

1323 

1324 if proxy_auth and not isinstance(proxy_auth, helpers.BasicAuth): 

1325 raise ValueError("proxy_auth must be None or BasicAuth() tuple") 

1326 self.proxy_auth = proxy_auth 

1327 

1328 if proxy_headers is not None and not isinstance( 

1329 proxy_headers, (MultiDict, MultiDictProxy) 

1330 ): 

1331 proxy_headers = CIMultiDict(proxy_headers) 

1332 self.proxy_headers = proxy_headers 

1333 

1334 async def write_bytes( 

1335 self, 

1336 writer: AbstractStreamWriter, 

1337 conn: "Connection", 

1338 content_length: Optional[int] = None, 

1339 ) -> None: 

1340 """ 

1341 Write the request body to the connection stream. 

1342 

1343 This method handles writing different types of request bodies: 

1344 1. Payload objects (using their specialized write_with_length method) 

1345 2. Bytes/bytearray objects 

1346 3. Iterable body content 

1347 

1348 Args: 

1349 writer: The stream writer to write the body to 

1350 conn: The connection being used for this request 

1351 content_length: Optional maximum number of bytes to write from the body 

1352 (None means write the entire body) 

1353 

1354 The method properly handles: 

1355 - Waiting for 100-Continue responses if required 

1356 - Content length constraints for chunked encoding 

1357 - Error handling for network issues, cancellation, and other exceptions 

1358 - Signaling EOF and timeout management 

1359 

1360 Raises: 

1361 ClientOSError: When there's an OS-level error writing the body 

1362 ClientConnectionError: When there's a general connection error 

1363 asyncio.CancelledError: When the operation is cancelled 

1364 

1365 """ 

1366 # 100 response 

1367 if self._continue is not None: 

1368 # Force headers to be sent before waiting for 100-continue 

1369 writer.send_headers() 

1370 await writer.drain() 

1371 await self._continue 

1372 

1373 protocol = conn.protocol 

1374 assert protocol is not None 

1375 try: 

1376 # This should be a rare case but the 

1377 # self._body can be set to None while 

1378 # the task is being started or we wait above 

1379 # for the 100-continue response. 

1380 # The more likely case is we have an empty 

1381 # payload, but 100-continue is still expected. 

1382 if self._body is not None: 

1383 await self._body.write_with_length(writer, content_length) 

1384 except OSError as underlying_exc: 

1385 reraised_exc = underlying_exc 

1386 

1387 # Distinguish between timeout and other OS errors for better error reporting 

1388 exc_is_not_timeout = underlying_exc.errno is not None or not isinstance( 

1389 underlying_exc, asyncio.TimeoutError 

1390 ) 

1391 if exc_is_not_timeout: 

1392 reraised_exc = ClientOSError( 

1393 underlying_exc.errno, 

1394 f"Can not write request body for {self.url !s}", 

1395 ) 

1396 

1397 set_exception(protocol, reraised_exc, underlying_exc) 

1398 except asyncio.CancelledError: 

1399 # Body hasn't been fully sent, so connection can't be reused 

1400 conn.close() 

1401 raise 

1402 except Exception as underlying_exc: 

1403 set_exception( 

1404 protocol, 

1405 ClientConnectionError( 

1406 "Failed to send bytes into the underlying connection " 

1407 f"{conn !s}: {underlying_exc!r}", 

1408 ), 

1409 underlying_exc, 

1410 ) 

1411 else: 

1412 # Successfully wrote the body, signal EOF and start response timeout 

1413 await writer.write_eof() 

1414 protocol.start_timeout() 

1415 

1416 async def send(self, conn: "Connection") -> "ClientResponse": 

1417 # Specify request target: 

1418 # - CONNECT request must send authority form URI 

1419 # - not CONNECT proxy must send absolute form URI 

1420 # - most common is origin form URI 

1421 if self.method == hdrs.METH_CONNECT: 

1422 connect_host = self.url.host_subcomponent 

1423 assert connect_host is not None 

1424 path = f"{connect_host}:{self.url.port}" 

1425 elif self.proxy and not self.is_ssl(): 

1426 path = str(self.url) 

1427 else: 

1428 path = self.url.raw_path_qs 

1429 

1430 protocol = conn.protocol 

1431 assert protocol is not None 

1432 writer = StreamWriter( 

1433 protocol, 

1434 self.loop, 

1435 on_chunk_sent=( 

1436 functools.partial(self._on_chunk_request_sent, self.method, self.url) 

1437 if self._traces 

1438 else None 

1439 ), 

1440 on_headers_sent=( 

1441 functools.partial(self._on_headers_request_sent, self.method, self.url) 

1442 if self._traces 

1443 else None 

1444 ), 

1445 ) 

1446 

1447 if self.compress: 

1448 writer.enable_compression(self.compress) # type: ignore[arg-type] 

1449 

1450 if self.chunked is not None: 

1451 writer.enable_chunking() 

1452 

1453 # set default content-type 

1454 if ( 

1455 self.method in self.POST_METHODS 

1456 and ( 

1457 self._skip_auto_headers is None 

1458 or hdrs.CONTENT_TYPE not in self._skip_auto_headers 

1459 ) 

1460 and hdrs.CONTENT_TYPE not in self.headers 

1461 ): 

1462 self.headers[hdrs.CONTENT_TYPE] = "application/octet-stream" 

1463 

1464 v = self.version 

1465 if hdrs.CONNECTION not in self.headers: 

1466 if conn._connector.force_close: 

1467 if v == HttpVersion11: 

1468 self.headers[hdrs.CONNECTION] = "close" 

1469 elif v == HttpVersion10: 

1470 self.headers[hdrs.CONNECTION] = "keep-alive" 

1471 

1472 # status + headers 

1473 status_line = f"{self.method} {path} HTTP/{v.major}.{v.minor}" 

1474 

1475 # Buffer headers for potential coalescing with body 

1476 await writer.write_headers(status_line, self.headers) 

1477 

1478 task: Optional["asyncio.Task[None]"] 

1479 if self._body or self._continue is not None or protocol.writing_paused: 

1480 coro = self.write_bytes(writer, conn, self._get_content_length()) 

1481 if sys.version_info >= (3, 12): 

1482 # Optimization for Python 3.12, try to write 

1483 # bytes immediately to avoid having to schedule 

1484 # the task on the event loop. 

1485 task = asyncio.Task(coro, loop=self.loop, eager_start=True) 

1486 else: 

1487 task = self.loop.create_task(coro) 

1488 if task.done(): 

1489 task = None 

1490 else: 

1491 self._writer = task 

1492 else: 

1493 # We have nothing to write because 

1494 # - there is no body 

1495 # - the protocol does not have writing paused 

1496 # - we are not waiting for a 100-continue response 

1497 protocol.start_timeout() 

1498 writer.set_eof() 

1499 task = None 

1500 response_class = self.response_class 

1501 assert response_class is not None 

1502 self.response = response_class( 

1503 self.method, 

1504 self.original_url, 

1505 writer=task, 

1506 continue100=self._continue, 

1507 timer=self._timer, 

1508 request_info=self.request_info, 

1509 traces=self._traces, 

1510 loop=self.loop, 

1511 session=self._session, 

1512 ) 

1513 return self.response 

1514 

1515 async def close(self) -> None: 

1516 if self.__writer is not None: 

1517 try: 

1518 await self.__writer 

1519 except asyncio.CancelledError: 

1520 if ( 

1521 sys.version_info >= (3, 11) 

1522 and (task := asyncio.current_task()) 

1523 and task.cancelling() 

1524 ): 

1525 raise 

1526 

1527 def terminate(self) -> None: 

1528 if self.__writer is not None: 

1529 if not self.loop.is_closed(): 

1530 self.__writer.cancel() 

1531 self.__writer.remove_done_callback(self.__reset_writer) 

1532 self.__writer = None 

1533 

1534 async def _on_chunk_request_sent(self, method: str, url: URL, chunk: bytes) -> None: 

1535 for trace in self._traces: 

1536 await trace.send_request_chunk_sent(method, url, chunk) 

1537 

1538 async def _on_headers_request_sent( 

1539 self, method: str, url: URL, headers: "CIMultiDict[str]" 

1540 ) -> None: 

1541 for trace in self._traces: 

1542 await trace.send_request_headers(method, url, headers)