Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/client_reqrep.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

751 statements  

1import asyncio 

2import codecs 

3import contextlib 

4import functools 

5import io 

6import re 

7import sys 

8import traceback 

9import warnings 

10from collections.abc import Mapping 

11from hashlib import md5, sha1, sha256 

12from http.cookies import Morsel, SimpleCookie 

13from types import MappingProxyType, TracebackType 

14from typing import ( 

15 TYPE_CHECKING, 

16 Any, 

17 Callable, 

18 Dict, 

19 Iterable, 

20 List, 

21 Literal, 

22 NamedTuple, 

23 Optional, 

24 Tuple, 

25 Type, 

26 Union, 

27) 

28 

29import attr 

30from multidict import CIMultiDict, CIMultiDictProxy, MultiDict, MultiDictProxy 

31from yarl import URL 

32 

33from . import hdrs, helpers, http, multipart, payload 

34from ._cookie_helpers import ( 

35 parse_cookie_header, 

36 parse_set_cookie_headers, 

37 preserve_morsel_with_coded_value, 

38) 

39from .abc import AbstractStreamWriter 

40from .client_exceptions import ( 

41 ClientConnectionError, 

42 ClientOSError, 

43 ClientResponseError, 

44 ContentTypeError, 

45 InvalidURL, 

46 ServerFingerprintMismatch, 

47) 

48from .compression_utils import HAS_BROTLI 

49from .formdata import FormData 

50from .helpers import ( 

51 _SENTINEL, 

52 BaseTimerContext, 

53 BasicAuth, 

54 HeadersMixin, 

55 TimerNoop, 

56 basicauth_from_netrc, 

57 netrc_from_env, 

58 noop, 

59 reify, 

60 set_exception, 

61 set_result, 

62) 

63from .http import ( 

64 SERVER_SOFTWARE, 

65 HttpVersion, 

66 HttpVersion10, 

67 HttpVersion11, 

68 StreamWriter, 

69) 

70from .streams import StreamReader 

71from .typedefs import ( 

72 DEFAULT_JSON_DECODER, 

73 JSONDecoder, 

74 LooseCookies, 

75 LooseHeaders, 

76 Query, 

77 RawHeaders, 

78) 

79 

80if TYPE_CHECKING: 

81 import ssl 

82 from ssl import SSLContext 

83else: 

84 try: 

85 import ssl 

86 from ssl import SSLContext 

87 except ImportError: # pragma: no cover 

88 ssl = None # type: ignore[assignment] 

89 SSLContext = object # type: ignore[misc,assignment] 

90 

91 

92__all__ = ("ClientRequest", "ClientResponse", "RequestInfo", "Fingerprint") 

93 

94 

95if TYPE_CHECKING: 

96 from .client import ClientSession 

97 from .connector import Connection 

98 from .tracing import Trace 

99 

100 

101_CONNECTION_CLOSED_EXCEPTION = ClientConnectionError("Connection closed") 

102_CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]") 

103json_re = re.compile(r"^application/(?:[\w.+-]+?\+)?json") 

104 

105 

106def _gen_default_accept_encoding() -> str: 

107 return "gzip, deflate, br" if HAS_BROTLI else "gzip, deflate" 

108 

109 

110@attr.s(auto_attribs=True, frozen=True, slots=True) 

111class ContentDisposition: 

112 type: Optional[str] 

113 parameters: "MappingProxyType[str, str]" 

114 filename: Optional[str] 

115 

116 

117class _RequestInfo(NamedTuple): 

118 url: URL 

119 method: str 

120 headers: "CIMultiDictProxy[str]" 

121 real_url: URL 

122 

123 

124class RequestInfo(_RequestInfo): 

125 

126 def __new__( 

127 cls, 

128 url: URL, 

129 method: str, 

130 headers: "CIMultiDictProxy[str]", 

131 real_url: URL = _SENTINEL, # type: ignore[assignment] 

132 ) -> "RequestInfo": 

133 """Create a new RequestInfo instance. 

134 

135 For backwards compatibility, the real_url parameter is optional. 

136 """ 

137 return tuple.__new__( 

138 cls, (url, method, headers, url if real_url is _SENTINEL else real_url) 

139 ) 

140 

141 

142class Fingerprint: 

143 HASHFUNC_BY_DIGESTLEN = { 

144 16: md5, 

145 20: sha1, 

146 32: sha256, 

147 } 

148 

149 def __init__(self, fingerprint: bytes) -> None: 

150 digestlen = len(fingerprint) 

151 hashfunc = self.HASHFUNC_BY_DIGESTLEN.get(digestlen) 

152 if not hashfunc: 

153 raise ValueError("fingerprint has invalid length") 

154 elif hashfunc is md5 or hashfunc is sha1: 

155 raise ValueError("md5 and sha1 are insecure and not supported. Use sha256.") 

156 self._hashfunc = hashfunc 

157 self._fingerprint = fingerprint 

158 

159 @property 

160 def fingerprint(self) -> bytes: 

161 return self._fingerprint 

162 

163 def check(self, transport: asyncio.Transport) -> None: 

164 if not transport.get_extra_info("sslcontext"): 

165 return 

166 sslobj = transport.get_extra_info("ssl_object") 

167 cert = sslobj.getpeercert(binary_form=True) 

168 got = self._hashfunc(cert).digest() 

169 if got != self._fingerprint: 

170 host, port, *_ = transport.get_extra_info("peername") 

171 raise ServerFingerprintMismatch(self._fingerprint, got, host, port) 

172 

173 

174if ssl is not None: 

175 SSL_ALLOWED_TYPES = (ssl.SSLContext, bool, Fingerprint, type(None)) 

176else: # pragma: no cover 

177 SSL_ALLOWED_TYPES = (bool, type(None)) 

178 

179 

180def _merge_ssl_params( 

181 ssl: Union["SSLContext", bool, Fingerprint], 

182 verify_ssl: Optional[bool], 

183 ssl_context: Optional["SSLContext"], 

184 fingerprint: Optional[bytes], 

185) -> Union["SSLContext", bool, Fingerprint]: 

186 if ssl is None: 

187 ssl = True # Double check for backwards compatibility 

188 if verify_ssl is not None and not verify_ssl: 

189 warnings.warn( 

190 "verify_ssl is deprecated, use ssl=False instead", 

191 DeprecationWarning, 

192 stacklevel=3, 

193 ) 

194 if ssl is not True: 

195 raise ValueError( 

196 "verify_ssl, ssl_context, fingerprint and ssl " 

197 "parameters are mutually exclusive" 

198 ) 

199 else: 

200 ssl = False 

201 if ssl_context is not None: 

202 warnings.warn( 

203 "ssl_context is deprecated, use ssl=context instead", 

204 DeprecationWarning, 

205 stacklevel=3, 

206 ) 

207 if ssl is not True: 

208 raise ValueError( 

209 "verify_ssl, ssl_context, fingerprint and ssl " 

210 "parameters are mutually exclusive" 

211 ) 

212 else: 

213 ssl = ssl_context 

214 if fingerprint is not None: 

215 warnings.warn( 

216 "fingerprint is deprecated, use ssl=Fingerprint(fingerprint) instead", 

217 DeprecationWarning, 

218 stacklevel=3, 

219 ) 

220 if ssl is not True: 

221 raise ValueError( 

222 "verify_ssl, ssl_context, fingerprint and ssl " 

223 "parameters are mutually exclusive" 

224 ) 

225 else: 

226 ssl = Fingerprint(fingerprint) 

227 if not isinstance(ssl, SSL_ALLOWED_TYPES): 

228 raise TypeError( 

229 "ssl should be SSLContext, bool, Fingerprint or None, " 

230 "got {!r} instead.".format(ssl) 

231 ) 

232 return ssl 

233 

234 

235_SSL_SCHEMES = frozenset(("https", "wss")) 

236 

237 

238# ConnectionKey is a NamedTuple because it is used as a key in a dict 

239# and a set in the connector. Since a NamedTuple is a tuple it uses 

240# the fast native tuple __hash__ and __eq__ implementation in CPython. 

241class ConnectionKey(NamedTuple): 

242 # the key should contain an information about used proxy / TLS 

243 # to prevent reusing wrong connections from a pool 

244 host: str 

245 port: Optional[int] 

246 is_ssl: bool 

247 ssl: Union[SSLContext, bool, Fingerprint] 

248 proxy: Optional[URL] 

249 proxy_auth: Optional[BasicAuth] 

250 proxy_headers_hash: Optional[int] # hash(CIMultiDict) 

251 

252 

253def _is_expected_content_type( 

254 response_content_type: str, expected_content_type: str 

255) -> bool: 

256 if expected_content_type == "application/json": 

257 return json_re.match(response_content_type) is not None 

258 return expected_content_type in response_content_type 

259 

260 

261def _warn_if_unclosed_payload(payload: payload.Payload, stacklevel: int = 2) -> None: 

262 """Warn if the payload is not closed. 

263 

264 Callers must check that the body is a Payload before calling this method. 

265 

266 Args: 

267 payload: The payload to check 

268 stacklevel: Stack level for the warning (default 2 for direct callers) 

269 """ 

270 if not payload.autoclose and not payload.consumed: 

271 warnings.warn( 

272 "The previous request body contains unclosed resources. " 

273 "Use await request.update_body() instead of setting request.body " 

274 "directly to properly close resources and avoid leaks.", 

275 ResourceWarning, 

276 stacklevel=stacklevel, 

277 ) 

278 

279 

280class ClientResponse(HeadersMixin): 

281 

282 # Some of these attributes are None when created, 

283 # but will be set by the start() method. 

284 # As the end user will likely never see the None values, we cheat the types below. 

285 # from the Status-Line of the response 

286 version: Optional[HttpVersion] = None # HTTP-Version 

287 status: int = None # type: ignore[assignment] # Status-Code 

288 reason: Optional[str] = None # Reason-Phrase 

289 

290 content: StreamReader = None # type: ignore[assignment] # Payload stream 

291 _body: Optional[bytes] = None 

292 _headers: CIMultiDictProxy[str] = None # type: ignore[assignment] 

293 _history: Tuple["ClientResponse", ...] = () 

294 _raw_headers: RawHeaders = None # type: ignore[assignment] 

295 

296 _connection: Optional["Connection"] = None # current connection 

297 _cookies: Optional[SimpleCookie] = None 

298 _raw_cookie_headers: Optional[Tuple[str, ...]] = None 

299 _continue: Optional["asyncio.Future[bool]"] = None 

300 _source_traceback: Optional[traceback.StackSummary] = None 

301 _session: Optional["ClientSession"] = None 

302 # set up by ClientRequest after ClientResponse object creation 

303 # post-init stage allows to not change ctor signature 

304 _closed = True # to allow __del__ for non-initialized properly response 

305 _released = False 

306 _in_context = False 

307 

308 _resolve_charset: Callable[["ClientResponse", bytes], str] = lambda *_: "utf-8" 

309 

310 __writer: Optional["asyncio.Task[None]"] = None 

311 

312 def __init__( 

313 self, 

314 method: str, 

315 url: URL, 

316 *, 

317 writer: "Optional[asyncio.Task[None]]", 

318 continue100: Optional["asyncio.Future[bool]"], 

319 timer: BaseTimerContext, 

320 request_info: RequestInfo, 

321 traces: List["Trace"], 

322 loop: asyncio.AbstractEventLoop, 

323 session: "ClientSession", 

324 ) -> None: 

325 # URL forbids subclasses, so a simple type check is enough. 

326 assert type(url) is URL 

327 

328 self.method = method 

329 

330 self._real_url = url 

331 self._url = url.with_fragment(None) if url.raw_fragment else url 

332 if writer is not None: 

333 self._writer = writer 

334 if continue100 is not None: 

335 self._continue = continue100 

336 self._request_info = request_info 

337 self._timer = timer if timer is not None else TimerNoop() 

338 self._cache: Dict[str, Any] = {} 

339 self._traces = traces 

340 self._loop = loop 

341 # Save reference to _resolve_charset, so that get_encoding() will still 

342 # work after the response has finished reading the body. 

343 # TODO: Fix session=None in tests (see ClientRequest.__init__). 

344 if session is not None: 

345 # store a reference to session #1985 

346 self._session = session 

347 self._resolve_charset = session._resolve_charset 

348 if loop.get_debug(): 

349 self._source_traceback = traceback.extract_stack(sys._getframe(1)) 

350 

351 def __reset_writer(self, _: object = None) -> None: 

352 self.__writer = None 

353 

354 @property 

355 def _writer(self) -> Optional["asyncio.Task[None]"]: 

356 """The writer task for streaming data. 

357 

358 _writer is only provided for backwards compatibility 

359 for subclasses that may need to access it. 

360 """ 

361 return self.__writer 

362 

363 @_writer.setter 

364 def _writer(self, writer: Optional["asyncio.Task[None]"]) -> None: 

365 """Set the writer task for streaming data.""" 

366 if self.__writer is not None: 

367 self.__writer.remove_done_callback(self.__reset_writer) 

368 self.__writer = writer 

369 if writer is None: 

370 return 

371 if writer.done(): 

372 # The writer is already done, so we can clear it immediately. 

373 self.__writer = None 

374 else: 

375 writer.add_done_callback(self.__reset_writer) 

376 

377 @property 

378 def cookies(self) -> SimpleCookie: 

379 if self._cookies is None: 

380 if self._raw_cookie_headers is not None: 

381 # Parse cookies for response.cookies (SimpleCookie for backward compatibility) 

382 cookies = SimpleCookie() 

383 # Use parse_set_cookie_headers for more lenient parsing that handles 

384 # malformed cookies better than SimpleCookie.load 

385 cookies.update(parse_set_cookie_headers(self._raw_cookie_headers)) 

386 self._cookies = cookies 

387 else: 

388 self._cookies = SimpleCookie() 

389 return self._cookies 

390 

391 @cookies.setter 

392 def cookies(self, cookies: SimpleCookie) -> None: 

393 self._cookies = cookies 

394 # Generate raw cookie headers from the SimpleCookie 

395 if cookies: 

396 self._raw_cookie_headers = tuple( 

397 morsel.OutputString() for morsel in cookies.values() 

398 ) 

399 else: 

400 self._raw_cookie_headers = None 

401 

402 @reify 

403 def url(self) -> URL: 

404 return self._url 

405 

406 @reify 

407 def url_obj(self) -> URL: 

408 warnings.warn("Deprecated, use .url #1654", DeprecationWarning, stacklevel=2) 

409 return self._url 

410 

411 @reify 

412 def real_url(self) -> URL: 

413 return self._real_url 

414 

415 @reify 

416 def host(self) -> str: 

417 assert self._url.host is not None 

418 return self._url.host 

419 

420 @reify 

421 def headers(self) -> "CIMultiDictProxy[str]": 

422 return self._headers 

423 

424 @reify 

425 def raw_headers(self) -> RawHeaders: 

426 return self._raw_headers 

427 

428 @reify 

429 def request_info(self) -> RequestInfo: 

430 return self._request_info 

431 

432 @reify 

433 def content_disposition(self) -> Optional[ContentDisposition]: 

434 raw = self._headers.get(hdrs.CONTENT_DISPOSITION) 

435 if raw is None: 

436 return None 

437 disposition_type, params_dct = multipart.parse_content_disposition(raw) 

438 params = MappingProxyType(params_dct) 

439 filename = multipart.content_disposition_filename(params) 

440 return ContentDisposition(disposition_type, params, filename) 

441 

442 def __del__(self, _warnings: Any = warnings) -> None: 

443 if self._closed: 

444 return 

445 

446 if self._connection is not None: 

447 self._connection.release() 

448 self._cleanup_writer() 

449 

450 if self._loop.get_debug(): 

451 kwargs = {"source": self} 

452 _warnings.warn(f"Unclosed response {self!r}", ResourceWarning, **kwargs) 

453 context = {"client_response": self, "message": "Unclosed response"} 

454 if self._source_traceback: 

455 context["source_traceback"] = self._source_traceback 

456 self._loop.call_exception_handler(context) 

457 

458 def __repr__(self) -> str: 

459 out = io.StringIO() 

460 ascii_encodable_url = str(self.url) 

461 if self.reason: 

462 ascii_encodable_reason = self.reason.encode( 

463 "ascii", "backslashreplace" 

464 ).decode("ascii") 

465 else: 

466 ascii_encodable_reason = "None" 

467 print( 

468 "<ClientResponse({}) [{} {}]>".format( 

469 ascii_encodable_url, self.status, ascii_encodable_reason 

470 ), 

471 file=out, 

472 ) 

473 print(self.headers, file=out) 

474 return out.getvalue() 

475 

476 @property 

477 def connection(self) -> Optional["Connection"]: 

478 return self._connection 

479 

480 @reify 

481 def history(self) -> Tuple["ClientResponse", ...]: 

482 """A sequence of of responses, if redirects occurred.""" 

483 return self._history 

484 

485 @reify 

486 def links(self) -> "MultiDictProxy[MultiDictProxy[Union[str, URL]]]": 

487 links_str = ", ".join(self.headers.getall("link", [])) 

488 

489 if not links_str: 

490 return MultiDictProxy(MultiDict()) 

491 

492 links: MultiDict[MultiDictProxy[Union[str, URL]]] = MultiDict() 

493 

494 for val in re.split(r",(?=\s*<)", links_str): 

495 match = re.match(r"\s*<(.*)>(.*)", val) 

496 if match is None: # pragma: no cover 

497 # the check exists to suppress mypy error 

498 continue 

499 url, params_str = match.groups() 

500 params = params_str.split(";")[1:] 

501 

502 link: MultiDict[Union[str, URL]] = MultiDict() 

503 

504 for param in params: 

505 match = re.match(r"^\s*(\S*)\s*=\s*(['\"]?)(.*?)(\2)\s*$", param, re.M) 

506 if match is None: # pragma: no cover 

507 # the check exists to suppress mypy error 

508 continue 

509 key, _, value, _ = match.groups() 

510 

511 link.add(key, value) 

512 

513 key = link.get("rel", url) 

514 

515 link.add("url", self.url.join(URL(url))) 

516 

517 links.add(str(key), MultiDictProxy(link)) 

518 

519 return MultiDictProxy(links) 

520 

521 async def start(self, connection: "Connection") -> "ClientResponse": 

522 """Start response processing.""" 

523 self._closed = False 

524 self._protocol = connection.protocol 

525 self._connection = connection 

526 

527 with self._timer: 

528 while True: 

529 # read response 

530 try: 

531 protocol = self._protocol 

532 message, payload = await protocol.read() # type: ignore[union-attr] 

533 except http.HttpProcessingError as exc: 

534 raise ClientResponseError( 

535 self.request_info, 

536 self.history, 

537 status=exc.code, 

538 message=exc.message, 

539 headers=exc.headers, 

540 ) from exc 

541 

542 if message.code < 100 or message.code > 199 or message.code == 101: 

543 break 

544 

545 if self._continue is not None: 

546 set_result(self._continue, True) 

547 self._continue = None 

548 

549 # payload eof handler 

550 payload.on_eof(self._response_eof) 

551 

552 # response status 

553 self.version = message.version 

554 self.status = message.code 

555 self.reason = message.reason 

556 

557 # headers 

558 self._headers = message.headers # type is CIMultiDictProxy 

559 self._raw_headers = message.raw_headers # type is Tuple[bytes, bytes] 

560 

561 # payload 

562 self.content = payload 

563 

564 # cookies 

565 if cookie_hdrs := self.headers.getall(hdrs.SET_COOKIE, ()): 

566 # Store raw cookie headers for CookieJar 

567 self._raw_cookie_headers = tuple(cookie_hdrs) 

568 return self 

569 

570 def _response_eof(self) -> None: 

571 if self._closed: 

572 return 

573 

574 # protocol could be None because connection could be detached 

575 protocol = self._connection and self._connection.protocol 

576 if protocol is not None and protocol.upgraded: 

577 return 

578 

579 self._closed = True 

580 self._cleanup_writer() 

581 self._release_connection() 

582 

583 @property 

584 def closed(self) -> bool: 

585 return self._closed 

586 

587 def close(self) -> None: 

588 if not self._released: 

589 self._notify_content() 

590 

591 self._closed = True 

592 if self._loop is None or self._loop.is_closed(): 

593 return 

594 

595 self._cleanup_writer() 

596 if self._connection is not None: 

597 self._connection.close() 

598 self._connection = None 

599 

600 def release(self) -> Any: 

601 if not self._released: 

602 self._notify_content() 

603 

604 self._closed = True 

605 

606 self._cleanup_writer() 

607 self._release_connection() 

608 return noop() 

609 

610 @property 

611 def ok(self) -> bool: 

612 """Returns ``True`` if ``status`` is less than ``400``, ``False`` if not. 

613 

614 This is **not** a check for ``200 OK`` but a check that the response 

615 status is under 400. 

616 """ 

617 return 400 > self.status 

618 

619 def raise_for_status(self) -> None: 

620 if not self.ok: 

621 # reason should always be not None for a started response 

622 assert self.reason is not None 

623 

624 # If we're in a context we can rely on __aexit__() to release as the 

625 # exception propagates. 

626 if not self._in_context: 

627 self.release() 

628 

629 raise ClientResponseError( 

630 self.request_info, 

631 self.history, 

632 status=self.status, 

633 message=self.reason, 

634 headers=self.headers, 

635 ) 

636 

637 def _release_connection(self) -> None: 

638 if self._connection is not None: 

639 if self.__writer is None: 

640 self._connection.release() 

641 self._connection = None 

642 else: 

643 self.__writer.add_done_callback(lambda f: self._release_connection()) 

644 

645 async def _wait_released(self) -> None: 

646 if self.__writer is not None: 

647 try: 

648 await self.__writer 

649 except asyncio.CancelledError: 

650 if ( 

651 sys.version_info >= (3, 11) 

652 and (task := asyncio.current_task()) 

653 and task.cancelling() 

654 ): 

655 raise 

656 self._release_connection() 

657 

658 def _cleanup_writer(self) -> None: 

659 if self.__writer is not None: 

660 self.__writer.cancel() 

661 self._session = None 

662 

663 def _notify_content(self) -> None: 

664 content = self.content 

665 if content and content.exception() is None: 

666 set_exception(content, _CONNECTION_CLOSED_EXCEPTION) 

667 self._released = True 

668 

669 async def wait_for_close(self) -> None: 

670 if self.__writer is not None: 

671 try: 

672 await self.__writer 

673 except asyncio.CancelledError: 

674 if ( 

675 sys.version_info >= (3, 11) 

676 and (task := asyncio.current_task()) 

677 and task.cancelling() 

678 ): 

679 raise 

680 self.release() 

681 

682 async def read(self) -> bytes: 

683 """Read response payload.""" 

684 if self._body is None: 

685 try: 

686 self._body = await self.content.read() 

687 for trace in self._traces: 

688 await trace.send_response_chunk_received( 

689 self.method, self.url, self._body 

690 ) 

691 except BaseException: 

692 self.close() 

693 raise 

694 elif self._released: # Response explicitly released 

695 raise ClientConnectionError("Connection closed") 

696 

697 protocol = self._connection and self._connection.protocol 

698 if protocol is None or not protocol.upgraded: 

699 await self._wait_released() # Underlying connection released 

700 return self._body 

701 

702 def get_encoding(self) -> str: 

703 ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower() 

704 mimetype = helpers.parse_mimetype(ctype) 

705 

706 encoding = mimetype.parameters.get("charset") 

707 if encoding: 

708 with contextlib.suppress(LookupError, ValueError): 

709 return codecs.lookup(encoding).name 

710 

711 if mimetype.type == "application" and ( 

712 mimetype.subtype == "json" or mimetype.subtype == "rdap" 

713 ): 

714 # RFC 7159 states that the default encoding is UTF-8. 

715 # RFC 7483 defines application/rdap+json 

716 return "utf-8" 

717 

718 if self._body is None: 

719 raise RuntimeError( 

720 "Cannot compute fallback encoding of a not yet read body" 

721 ) 

722 

723 return self._resolve_charset(self, self._body) 

724 

725 async def text(self, encoding: Optional[str] = None, errors: str = "strict") -> str: 

726 """Read response payload and decode.""" 

727 if self._body is None: 

728 await self.read() 

729 

730 if encoding is None: 

731 encoding = self.get_encoding() 

732 

733 return self._body.decode(encoding, errors=errors) # type: ignore[union-attr] 

734 

735 async def json( 

736 self, 

737 *, 

738 encoding: Optional[str] = None, 

739 loads: JSONDecoder = DEFAULT_JSON_DECODER, 

740 content_type: Optional[str] = "application/json", 

741 ) -> Any: 

742 """Read and decodes JSON response.""" 

743 if self._body is None: 

744 await self.read() 

745 

746 if content_type: 

747 ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower() 

748 if not _is_expected_content_type(ctype, content_type): 

749 raise ContentTypeError( 

750 self.request_info, 

751 self.history, 

752 status=self.status, 

753 message=( 

754 "Attempt to decode JSON with unexpected mimetype: %s" % ctype 

755 ), 

756 headers=self.headers, 

757 ) 

758 

759 stripped = self._body.strip() # type: ignore[union-attr] 

760 if not stripped: 

761 return None 

762 

763 if encoding is None: 

764 encoding = self.get_encoding() 

765 

766 return loads(stripped.decode(encoding)) 

767 

768 async def __aenter__(self) -> "ClientResponse": 

769 self._in_context = True 

770 return self 

771 

772 async def __aexit__( 

773 self, 

774 exc_type: Optional[Type[BaseException]], 

775 exc_val: Optional[BaseException], 

776 exc_tb: Optional[TracebackType], 

777 ) -> None: 

778 self._in_context = False 

779 # similar to _RequestContextManager, we do not need to check 

780 # for exceptions, response object can close connection 

781 # if state is broken 

782 self.release() 

783 await self.wait_for_close() 

784 

785 

786class ClientRequest: 

787 GET_METHODS = { 

788 hdrs.METH_GET, 

789 hdrs.METH_HEAD, 

790 hdrs.METH_OPTIONS, 

791 hdrs.METH_TRACE, 

792 } 

793 POST_METHODS = {hdrs.METH_PATCH, hdrs.METH_POST, hdrs.METH_PUT} 

794 ALL_METHODS = GET_METHODS.union(POST_METHODS).union({hdrs.METH_DELETE}) 

795 

796 DEFAULT_HEADERS = { 

797 hdrs.ACCEPT: "*/*", 

798 hdrs.ACCEPT_ENCODING: _gen_default_accept_encoding(), 

799 } 

800 

801 # Type of body depends on PAYLOAD_REGISTRY, which is dynamic. 

802 _body: Union[None, payload.Payload] = None 

803 auth = None 

804 response = None 

805 

806 __writer: Optional["asyncio.Task[None]"] = None # async task for streaming data 

807 

808 # These class defaults help create_autospec() work correctly. 

809 # If autospec is improved in future, maybe these can be removed. 

810 url = URL() 

811 method = "GET" 

812 

813 _continue = None # waiter future for '100 Continue' response 

814 

815 _skip_auto_headers: Optional["CIMultiDict[None]"] = None 

816 

817 # N.B. 

818 # Adding __del__ method with self._writer closing doesn't make sense 

819 # because _writer is instance method, thus it keeps a reference to self. 

820 # Until writer has finished finalizer will not be called. 

821 

822 def __init__( 

823 self, 

824 method: str, 

825 url: URL, 

826 *, 

827 params: Query = None, 

828 headers: Optional[LooseHeaders] = None, 

829 skip_auto_headers: Optional[Iterable[str]] = None, 

830 data: Any = None, 

831 cookies: Optional[LooseCookies] = None, 

832 auth: Optional[BasicAuth] = None, 

833 version: http.HttpVersion = http.HttpVersion11, 

834 compress: Union[str, bool, None] = None, 

835 chunked: Optional[bool] = None, 

836 expect100: bool = False, 

837 loop: Optional[asyncio.AbstractEventLoop] = None, 

838 response_class: Optional[Type["ClientResponse"]] = None, 

839 proxy: Optional[URL] = None, 

840 proxy_auth: Optional[BasicAuth] = None, 

841 timer: Optional[BaseTimerContext] = None, 

842 session: Optional["ClientSession"] = None, 

843 ssl: Union[SSLContext, bool, Fingerprint] = True, 

844 proxy_headers: Optional[LooseHeaders] = None, 

845 traces: Optional[List["Trace"]] = None, 

846 trust_env: bool = False, 

847 server_hostname: Optional[str] = None, 

848 ): 

849 if loop is None: 

850 loop = asyncio.get_event_loop() 

851 if match := _CONTAINS_CONTROL_CHAR_RE.search(method): 

852 raise ValueError( 

853 f"Method cannot contain non-token characters {method!r} " 

854 f"(found at least {match.group()!r})" 

855 ) 

856 # URL forbids subclasses, so a simple type check is enough. 

857 assert type(url) is URL, url 

858 if proxy is not None: 

859 assert type(proxy) is URL, proxy 

860 # FIXME: session is None in tests only, need to fix tests 

861 # assert session is not None 

862 if TYPE_CHECKING: 

863 assert session is not None 

864 self._session = session 

865 if params: 

866 url = url.extend_query(params) 

867 self.original_url = url 

868 self.url = url.with_fragment(None) if url.raw_fragment else url 

869 self.method = method.upper() 

870 self.chunked = chunked 

871 self.compress = compress 

872 self.loop = loop 

873 self.length = None 

874 if response_class is None: 

875 real_response_class = ClientResponse 

876 else: 

877 real_response_class = response_class 

878 self.response_class: Type[ClientResponse] = real_response_class 

879 self._timer = timer if timer is not None else TimerNoop() 

880 self._ssl = ssl if ssl is not None else True 

881 self.server_hostname = server_hostname 

882 

883 if loop.get_debug(): 

884 self._source_traceback = traceback.extract_stack(sys._getframe(1)) 

885 

886 self.update_version(version) 

887 self.update_host(url) 

888 self.update_headers(headers) 

889 self.update_auto_headers(skip_auto_headers) 

890 self.update_cookies(cookies) 

891 self.update_content_encoding(data) 

892 self.update_auth(auth, trust_env) 

893 self.update_proxy(proxy, proxy_auth, proxy_headers) 

894 

895 self.update_body_from_data(data) 

896 if data is not None or self.method not in self.GET_METHODS: 

897 self.update_transfer_encoding() 

898 self.update_expect_continue(expect100) 

899 self._traces = [] if traces is None else traces 

900 

901 def __reset_writer(self, _: object = None) -> None: 

902 self.__writer = None 

903 

904 def _get_content_length(self) -> Optional[int]: 

905 """Extract and validate Content-Length header value. 

906 

907 Returns parsed Content-Length value or None if not set. 

908 Raises ValueError if header exists but cannot be parsed as an integer. 

909 """ 

910 if hdrs.CONTENT_LENGTH not in self.headers: 

911 return None 

912 

913 content_length_hdr = self.headers[hdrs.CONTENT_LENGTH] 

914 try: 

915 return int(content_length_hdr) 

916 except ValueError: 

917 raise ValueError( 

918 f"Invalid Content-Length header: {content_length_hdr}" 

919 ) from None 

920 

921 @property 

922 def skip_auto_headers(self) -> CIMultiDict[None]: 

923 return self._skip_auto_headers or CIMultiDict() 

924 

925 @property 

926 def _writer(self) -> Optional["asyncio.Task[None]"]: 

927 return self.__writer 

928 

929 @_writer.setter 

930 def _writer(self, writer: "asyncio.Task[None]") -> None: 

931 if self.__writer is not None: 

932 self.__writer.remove_done_callback(self.__reset_writer) 

933 self.__writer = writer 

934 writer.add_done_callback(self.__reset_writer) 

935 

936 def is_ssl(self) -> bool: 

937 return self.url.scheme in _SSL_SCHEMES 

938 

939 @property 

940 def ssl(self) -> Union["SSLContext", bool, Fingerprint]: 

941 return self._ssl 

942 

943 @property 

944 def connection_key(self) -> ConnectionKey: 

945 if proxy_headers := self.proxy_headers: 

946 h: Optional[int] = hash(tuple(proxy_headers.items())) 

947 else: 

948 h = None 

949 url = self.url 

950 return tuple.__new__( 

951 ConnectionKey, 

952 ( 

953 url.raw_host or "", 

954 url.port, 

955 url.scheme in _SSL_SCHEMES, 

956 self._ssl, 

957 self.proxy, 

958 self.proxy_auth, 

959 h, 

960 ), 

961 ) 

962 

963 @property 

964 def host(self) -> str: 

965 ret = self.url.raw_host 

966 assert ret is not None 

967 return ret 

968 

969 @property 

970 def port(self) -> Optional[int]: 

971 return self.url.port 

972 

973 @property 

974 def body(self) -> Union[payload.Payload, Literal[b""]]: 

975 """Request body.""" 

976 # empty body is represented as bytes for backwards compatibility 

977 return self._body or b"" 

978 

979 @body.setter 

980 def body(self, value: Any) -> None: 

981 """Set request body with warning for non-autoclose payloads. 

982 

983 WARNING: This setter must be called from within an event loop and is not 

984 thread-safe. Setting body outside of an event loop may raise RuntimeError 

985 when closing file-based payloads. 

986 

987 DEPRECATED: Direct assignment to body is deprecated and will be removed 

988 in a future version. Use await update_body() instead for proper resource 

989 management. 

990 """ 

991 # Close existing payload if present 

992 if self._body is not None: 

993 # Warn if the payload needs manual closing 

994 # stacklevel=3: user code -> body setter -> _warn_if_unclosed_payload 

995 _warn_if_unclosed_payload(self._body, stacklevel=3) 

996 # NOTE: In the future, when we remove sync close support, 

997 # this setter will need to be removed and only the async 

998 # update_body() method will be available. For now, we call 

999 # _close() for backwards compatibility. 

1000 self._body._close() 

1001 self._update_body(value) 

1002 

1003 @property 

1004 def request_info(self) -> RequestInfo: 

1005 headers: CIMultiDictProxy[str] = CIMultiDictProxy(self.headers) 

1006 # These are created on every request, so we use a NamedTuple 

1007 # for performance reasons. We don't use the RequestInfo.__new__ 

1008 # method because it has a different signature which is provided 

1009 # for backwards compatibility only. 

1010 return tuple.__new__( 

1011 RequestInfo, (self.url, self.method, headers, self.original_url) 

1012 ) 

1013 

1014 @property 

1015 def session(self) -> "ClientSession": 

1016 """Return the ClientSession instance. 

1017 

1018 This property provides access to the ClientSession that initiated 

1019 this request, allowing middleware to make additional requests 

1020 using the same session. 

1021 """ 

1022 return self._session 

1023 

1024 def update_host(self, url: URL) -> None: 

1025 """Update destination host, port and connection type (ssl).""" 

1026 # get host/port 

1027 if not url.raw_host: 

1028 raise InvalidURL(url) 

1029 

1030 # basic auth info 

1031 if url.raw_user or url.raw_password: 

1032 self.auth = helpers.BasicAuth(url.user or "", url.password or "") 

1033 

1034 def update_version(self, version: Union[http.HttpVersion, str]) -> None: 

1035 """Convert request version to two elements tuple. 

1036 

1037 parser HTTP version '1.1' => (1, 1) 

1038 """ 

1039 if isinstance(version, str): 

1040 v = [part.strip() for part in version.split(".", 1)] 

1041 try: 

1042 version = http.HttpVersion(int(v[0]), int(v[1])) 

1043 except ValueError: 

1044 raise ValueError( 

1045 f"Can not parse http version number: {version}" 

1046 ) from None 

1047 self.version = version 

1048 

1049 def update_headers(self, headers: Optional[LooseHeaders]) -> None: 

1050 """Update request headers.""" 

1051 self.headers: CIMultiDict[str] = CIMultiDict() 

1052 

1053 # Build the host header 

1054 host = self.url.host_port_subcomponent 

1055 

1056 # host_port_subcomponent is None when the URL is a relative URL. 

1057 # but we know we do not have a relative URL here. 

1058 assert host is not None 

1059 self.headers[hdrs.HOST] = host 

1060 

1061 if not headers: 

1062 return 

1063 

1064 if isinstance(headers, (dict, MultiDictProxy, MultiDict)): 

1065 headers = headers.items() 

1066 

1067 for key, value in headers: # type: ignore[misc] 

1068 # A special case for Host header 

1069 if key in hdrs.HOST_ALL: 

1070 self.headers[key] = value 

1071 else: 

1072 self.headers.add(key, value) 

1073 

1074 def update_auto_headers(self, skip_auto_headers: Optional[Iterable[str]]) -> None: 

1075 if skip_auto_headers is not None: 

1076 self._skip_auto_headers = CIMultiDict( 

1077 (hdr, None) for hdr in sorted(skip_auto_headers) 

1078 ) 

1079 used_headers = self.headers.copy() 

1080 used_headers.extend(self._skip_auto_headers) # type: ignore[arg-type] 

1081 else: 

1082 # Fast path when there are no headers to skip 

1083 # which is the most common case. 

1084 used_headers = self.headers 

1085 

1086 for hdr, val in self.DEFAULT_HEADERS.items(): 

1087 if hdr not in used_headers: 

1088 self.headers[hdr] = val 

1089 

1090 if hdrs.USER_AGENT not in used_headers: 

1091 self.headers[hdrs.USER_AGENT] = SERVER_SOFTWARE 

1092 

1093 def update_cookies(self, cookies: Optional[LooseCookies]) -> None: 

1094 """Update request cookies header.""" 

1095 if not cookies: 

1096 return 

1097 

1098 c = SimpleCookie() 

1099 if hdrs.COOKIE in self.headers: 

1100 # parse_cookie_header for RFC 6265 compliant Cookie header parsing 

1101 c.update(parse_cookie_header(self.headers.get(hdrs.COOKIE, ""))) 

1102 del self.headers[hdrs.COOKIE] 

1103 

1104 if isinstance(cookies, Mapping): 

1105 iter_cookies = cookies.items() 

1106 else: 

1107 iter_cookies = cookies # type: ignore[assignment] 

1108 for name, value in iter_cookies: 

1109 if isinstance(value, Morsel): 

1110 # Use helper to preserve coded_value exactly as sent by server 

1111 c[name] = preserve_morsel_with_coded_value(value) 

1112 else: 

1113 c[name] = value # type: ignore[assignment] 

1114 

1115 self.headers[hdrs.COOKIE] = c.output(header="", sep=";").strip() 

1116 

1117 def update_content_encoding(self, data: Any) -> None: 

1118 """Set request content encoding.""" 

1119 if not data: 

1120 # Don't compress an empty body. 

1121 self.compress = None 

1122 return 

1123 

1124 if self.headers.get(hdrs.CONTENT_ENCODING): 

1125 if self.compress: 

1126 raise ValueError( 

1127 "compress can not be set if Content-Encoding header is set" 

1128 ) 

1129 elif self.compress: 

1130 if not isinstance(self.compress, str): 

1131 self.compress = "deflate" 

1132 self.headers[hdrs.CONTENT_ENCODING] = self.compress 

1133 self.chunked = True # enable chunked, no need to deal with length 

1134 

1135 def update_transfer_encoding(self) -> None: 

1136 """Analyze transfer-encoding header.""" 

1137 te = self.headers.get(hdrs.TRANSFER_ENCODING, "").lower() 

1138 

1139 if "chunked" in te: 

1140 if self.chunked: 

1141 raise ValueError( 

1142 "chunked can not be set " 

1143 'if "Transfer-Encoding: chunked" header is set' 

1144 ) 

1145 

1146 elif self.chunked: 

1147 if hdrs.CONTENT_LENGTH in self.headers: 

1148 raise ValueError( 

1149 "chunked can not be set if Content-Length header is set" 

1150 ) 

1151 

1152 self.headers[hdrs.TRANSFER_ENCODING] = "chunked" 

1153 

1154 def update_auth(self, auth: Optional[BasicAuth], trust_env: bool = False) -> None: 

1155 """Set basic auth.""" 

1156 if auth is None: 

1157 auth = self.auth 

1158 if auth is None and trust_env and self.url.host is not None: 

1159 netrc_obj = netrc_from_env() 

1160 with contextlib.suppress(LookupError): 

1161 auth = basicauth_from_netrc(netrc_obj, self.url.host) 

1162 if auth is None: 

1163 return 

1164 

1165 if not isinstance(auth, helpers.BasicAuth): 

1166 raise TypeError("BasicAuth() tuple is required instead") 

1167 

1168 self.headers[hdrs.AUTHORIZATION] = auth.encode() 

1169 

1170 def update_body_from_data(self, body: Any, _stacklevel: int = 3) -> None: 

1171 """Update request body from data.""" 

1172 if self._body is not None: 

1173 _warn_if_unclosed_payload(self._body, stacklevel=_stacklevel) 

1174 

1175 if body is None: 

1176 self._body = None 

1177 # Set Content-Length to 0 when body is None for methods that expect a body 

1178 if ( 

1179 self.method not in self.GET_METHODS 

1180 and not self.chunked 

1181 and hdrs.CONTENT_LENGTH not in self.headers 

1182 ): 

1183 self.headers[hdrs.CONTENT_LENGTH] = "0" 

1184 return 

1185 

1186 # FormData 

1187 maybe_payload = body() if isinstance(body, FormData) else body 

1188 

1189 try: 

1190 body_payload = payload.PAYLOAD_REGISTRY.get(maybe_payload, disposition=None) 

1191 except payload.LookupError: 

1192 body_payload = FormData(maybe_payload)() # type: ignore[arg-type] 

1193 

1194 self._body = body_payload 

1195 # enable chunked encoding if needed 

1196 if not self.chunked and hdrs.CONTENT_LENGTH not in self.headers: 

1197 if (size := body_payload.size) is not None: 

1198 self.headers[hdrs.CONTENT_LENGTH] = str(size) 

1199 else: 

1200 self.chunked = True 

1201 

1202 # copy payload headers 

1203 assert body_payload.headers 

1204 headers = self.headers 

1205 skip_headers = self._skip_auto_headers 

1206 for key, value in body_payload.headers.items(): 

1207 if key in headers or (skip_headers is not None and key in skip_headers): 

1208 continue 

1209 headers[key] = value 

1210 

1211 def _update_body(self, body: Any) -> None: 

1212 """Update request body after its already been set.""" 

1213 # Remove existing Content-Length header since body is changing 

1214 if hdrs.CONTENT_LENGTH in self.headers: 

1215 del self.headers[hdrs.CONTENT_LENGTH] 

1216 

1217 # Remove existing Transfer-Encoding header to avoid conflicts 

1218 if self.chunked and hdrs.TRANSFER_ENCODING in self.headers: 

1219 del self.headers[hdrs.TRANSFER_ENCODING] 

1220 

1221 # Now update the body using the existing method 

1222 # Called from _update_body, add 1 to stacklevel from caller 

1223 self.update_body_from_data(body, _stacklevel=4) 

1224 

1225 # Update transfer encoding headers if needed (same logic as __init__) 

1226 if body is not None or self.method not in self.GET_METHODS: 

1227 self.update_transfer_encoding() 

1228 

1229 async def update_body(self, body: Any) -> None: 

1230 """ 

1231 Update request body and close previous payload if needed. 

1232 

1233 This method safely updates the request body by first closing any existing 

1234 payload to prevent resource leaks, then setting the new body. 

1235 

1236 IMPORTANT: Always use this method instead of setting request.body directly. 

1237 Direct assignment to request.body will leak resources if the previous body 

1238 contains file handles, streams, or other resources that need cleanup. 

1239 

1240 Args: 

1241 body: The new body content. Can be: 

1242 - bytes/bytearray: Raw binary data 

1243 - str: Text data (will be encoded using charset from Content-Type) 

1244 - FormData: Form data that will be encoded as multipart/form-data 

1245 - Payload: A pre-configured payload object 

1246 - AsyncIterable: An async iterable of bytes chunks 

1247 - File-like object: Will be read and sent as binary data 

1248 - None: Clears the body 

1249 

1250 Usage: 

1251 # CORRECT: Use update_body 

1252 await request.update_body(b"new request data") 

1253 

1254 # WRONG: Don't set body directly 

1255 # request.body = b"new request data" # This will leak resources! 

1256 

1257 # Update with form data 

1258 form_data = FormData() 

1259 form_data.add_field('field', 'value') 

1260 await request.update_body(form_data) 

1261 

1262 # Clear body 

1263 await request.update_body(None) 

1264 

1265 Note: 

1266 This method is async because it may need to close file handles or 

1267 other resources associated with the previous payload. Always await 

1268 this method to ensure proper cleanup. 

1269 

1270 Warning: 

1271 Setting request.body directly is highly discouraged and can lead to: 

1272 - Resource leaks (unclosed file handles, streams) 

1273 - Memory leaks (unreleased buffers) 

1274 - Unexpected behavior with streaming payloads 

1275 

1276 It is not recommended to change the payload type in middleware. If the 

1277 body was already set (e.g., as bytes), it's best to keep the same type 

1278 rather than converting it (e.g., to str) as this may result in unexpected 

1279 behavior. 

1280 

1281 See Also: 

1282 - update_body_from_data: Synchronous body update without cleanup 

1283 - body property: Direct body access (STRONGLY DISCOURAGED) 

1284 

1285 """ 

1286 # Close existing payload if it exists and needs closing 

1287 if self._body is not None: 

1288 await self._body.close() 

1289 self._update_body(body) 

1290 

1291 def update_expect_continue(self, expect: bool = False) -> None: 

1292 if expect: 

1293 self.headers[hdrs.EXPECT] = "100-continue" 

1294 elif ( 

1295 hdrs.EXPECT in self.headers 

1296 and self.headers[hdrs.EXPECT].lower() == "100-continue" 

1297 ): 

1298 expect = True 

1299 

1300 if expect: 

1301 self._continue = self.loop.create_future() 

1302 

1303 def update_proxy( 

1304 self, 

1305 proxy: Optional[URL], 

1306 proxy_auth: Optional[BasicAuth], 

1307 proxy_headers: Optional[LooseHeaders], 

1308 ) -> None: 

1309 self.proxy = proxy 

1310 if proxy is None: 

1311 self.proxy_auth = None 

1312 self.proxy_headers = None 

1313 return 

1314 

1315 if proxy_auth and not isinstance(proxy_auth, helpers.BasicAuth): 

1316 raise ValueError("proxy_auth must be None or BasicAuth() tuple") 

1317 self.proxy_auth = proxy_auth 

1318 

1319 if proxy_headers is not None and not isinstance( 

1320 proxy_headers, (MultiDict, MultiDictProxy) 

1321 ): 

1322 proxy_headers = CIMultiDict(proxy_headers) 

1323 self.proxy_headers = proxy_headers 

1324 

1325 async def write_bytes( 

1326 self, 

1327 writer: AbstractStreamWriter, 

1328 conn: "Connection", 

1329 content_length: Optional[int], 

1330 ) -> None: 

1331 """ 

1332 Write the request body to the connection stream. 

1333 

1334 This method handles writing different types of request bodies: 

1335 1. Payload objects (using their specialized write_with_length method) 

1336 2. Bytes/bytearray objects 

1337 3. Iterable body content 

1338 

1339 Args: 

1340 writer: The stream writer to write the body to 

1341 conn: The connection being used for this request 

1342 content_length: Optional maximum number of bytes to write from the body 

1343 (None means write the entire body) 

1344 

1345 The method properly handles: 

1346 - Waiting for 100-Continue responses if required 

1347 - Content length constraints for chunked encoding 

1348 - Error handling for network issues, cancellation, and other exceptions 

1349 - Signaling EOF and timeout management 

1350 

1351 Raises: 

1352 ClientOSError: When there's an OS-level error writing the body 

1353 ClientConnectionError: When there's a general connection error 

1354 asyncio.CancelledError: When the operation is cancelled 

1355 

1356 """ 

1357 # 100 response 

1358 if self._continue is not None: 

1359 # Force headers to be sent before waiting for 100-continue 

1360 writer.send_headers() 

1361 await writer.drain() 

1362 await self._continue 

1363 

1364 protocol = conn.protocol 

1365 assert protocol is not None 

1366 try: 

1367 # This should be a rare case but the 

1368 # self._body can be set to None while 

1369 # the task is being started or we wait above 

1370 # for the 100-continue response. 

1371 # The more likely case is we have an empty 

1372 # payload, but 100-continue is still expected. 

1373 if self._body is not None: 

1374 await self._body.write_with_length(writer, content_length) 

1375 except OSError as underlying_exc: 

1376 reraised_exc = underlying_exc 

1377 

1378 # Distinguish between timeout and other OS errors for better error reporting 

1379 exc_is_not_timeout = underlying_exc.errno is not None or not isinstance( 

1380 underlying_exc, asyncio.TimeoutError 

1381 ) 

1382 if exc_is_not_timeout: 

1383 reraised_exc = ClientOSError( 

1384 underlying_exc.errno, 

1385 f"Can not write request body for {self.url !s}", 

1386 ) 

1387 

1388 set_exception(protocol, reraised_exc, underlying_exc) 

1389 except asyncio.CancelledError: 

1390 # Body hasn't been fully sent, so connection can't be reused 

1391 conn.close() 

1392 raise 

1393 except Exception as underlying_exc: 

1394 set_exception( 

1395 protocol, 

1396 ClientConnectionError( 

1397 "Failed to send bytes into the underlying connection " 

1398 f"{conn !s}: {underlying_exc!r}", 

1399 ), 

1400 underlying_exc, 

1401 ) 

1402 else: 

1403 # Successfully wrote the body, signal EOF and start response timeout 

1404 await writer.write_eof() 

1405 protocol.start_timeout() 

1406 

1407 async def send(self, conn: "Connection") -> "ClientResponse": 

1408 # Specify request target: 

1409 # - CONNECT request must send authority form URI 

1410 # - not CONNECT proxy must send absolute form URI 

1411 # - most common is origin form URI 

1412 if self.method == hdrs.METH_CONNECT: 

1413 connect_host = self.url.host_subcomponent 

1414 assert connect_host is not None 

1415 path = f"{connect_host}:{self.url.port}" 

1416 elif self.proxy and not self.is_ssl(): 

1417 path = str(self.url) 

1418 else: 

1419 path = self.url.raw_path_qs 

1420 

1421 protocol = conn.protocol 

1422 assert protocol is not None 

1423 writer = StreamWriter( 

1424 protocol, 

1425 self.loop, 

1426 on_chunk_sent=( 

1427 functools.partial(self._on_chunk_request_sent, self.method, self.url) 

1428 if self._traces 

1429 else None 

1430 ), 

1431 on_headers_sent=( 

1432 functools.partial(self._on_headers_request_sent, self.method, self.url) 

1433 if self._traces 

1434 else None 

1435 ), 

1436 ) 

1437 

1438 if self.compress: 

1439 writer.enable_compression(self.compress) # type: ignore[arg-type] 

1440 

1441 if self.chunked is not None: 

1442 writer.enable_chunking() 

1443 

1444 # set default content-type 

1445 if ( 

1446 self.method in self.POST_METHODS 

1447 and ( 

1448 self._skip_auto_headers is None 

1449 or hdrs.CONTENT_TYPE not in self._skip_auto_headers 

1450 ) 

1451 and hdrs.CONTENT_TYPE not in self.headers 

1452 ): 

1453 self.headers[hdrs.CONTENT_TYPE] = "application/octet-stream" 

1454 

1455 v = self.version 

1456 if hdrs.CONNECTION not in self.headers: 

1457 if conn._connector.force_close: 

1458 if v == HttpVersion11: 

1459 self.headers[hdrs.CONNECTION] = "close" 

1460 elif v == HttpVersion10: 

1461 self.headers[hdrs.CONNECTION] = "keep-alive" 

1462 

1463 # status + headers 

1464 status_line = f"{self.method} {path} HTTP/{v.major}.{v.minor}" 

1465 

1466 # Buffer headers for potential coalescing with body 

1467 await writer.write_headers(status_line, self.headers) 

1468 

1469 task: Optional["asyncio.Task[None]"] 

1470 if self._body or self._continue is not None or protocol.writing_paused: 

1471 coro = self.write_bytes(writer, conn, self._get_content_length()) 

1472 if sys.version_info >= (3, 12): 

1473 # Optimization for Python 3.12, try to write 

1474 # bytes immediately to avoid having to schedule 

1475 # the task on the event loop. 

1476 task = asyncio.Task(coro, loop=self.loop, eager_start=True) 

1477 else: 

1478 task = self.loop.create_task(coro) 

1479 if task.done(): 

1480 task = None 

1481 else: 

1482 self._writer = task 

1483 else: 

1484 # We have nothing to write because 

1485 # - there is no body 

1486 # - the protocol does not have writing paused 

1487 # - we are not waiting for a 100-continue response 

1488 protocol.start_timeout() 

1489 writer.set_eof() 

1490 task = None 

1491 response_class = self.response_class 

1492 assert response_class is not None 

1493 self.response = response_class( 

1494 self.method, 

1495 self.original_url, 

1496 writer=task, 

1497 continue100=self._continue, 

1498 timer=self._timer, 

1499 request_info=self.request_info, 

1500 traces=self._traces, 

1501 loop=self.loop, 

1502 session=self._session, 

1503 ) 

1504 return self.response 

1505 

1506 async def close(self) -> None: 

1507 if self.__writer is not None: 

1508 try: 

1509 await self.__writer 

1510 except asyncio.CancelledError: 

1511 if ( 

1512 sys.version_info >= (3, 11) 

1513 and (task := asyncio.current_task()) 

1514 and task.cancelling() 

1515 ): 

1516 raise 

1517 

1518 def terminate(self) -> None: 

1519 if self.__writer is not None: 

1520 if not self.loop.is_closed(): 

1521 self.__writer.cancel() 

1522 self.__writer.remove_done_callback(self.__reset_writer) 

1523 self.__writer = None 

1524 

1525 async def _on_chunk_request_sent(self, method: str, url: URL, chunk: bytes) -> None: 

1526 for trace in self._traces: 

1527 await trace.send_request_chunk_sent(method, url, chunk) 

1528 

1529 async def _on_headers_request_sent( 

1530 self, method: str, url: URL, headers: "CIMultiDict[str]" 

1531 ) -> None: 

1532 for trace in self._traces: 

1533 await trace.send_request_headers(method, url, headers)