Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/client_reqrep.py: 41%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

752 statements  

1import asyncio 

2import codecs 

3import contextlib 

4import functools 

5import io 

6import re 

7import sys 

8import traceback 

9import warnings 

10from collections.abc import Mapping 

11from hashlib import md5, sha1, sha256 

12from http.cookies import Morsel, SimpleCookie 

13from types import MappingProxyType, TracebackType 

14from typing import ( 

15 TYPE_CHECKING, 

16 Any, 

17 Callable, 

18 Dict, 

19 Iterable, 

20 List, 

21 Literal, 

22 NamedTuple, 

23 Optional, 

24 Tuple, 

25 Type, 

26 Union, 

27) 

28 

29import attr 

30from multidict import CIMultiDict, CIMultiDictProxy, MultiDict, MultiDictProxy 

31from yarl import URL 

32 

33from . import hdrs, helpers, http, multipart, payload 

34from ._cookie_helpers import ( 

35 parse_cookie_header, 

36 parse_set_cookie_headers, 

37 preserve_morsel_with_coded_value, 

38) 

39from .abc import AbstractStreamWriter 

40from .client_exceptions import ( 

41 ClientConnectionError, 

42 ClientOSError, 

43 ClientResponseError, 

44 ContentTypeError, 

45 InvalidURL, 

46 ServerFingerprintMismatch, 

47) 

48from .compression_utils import HAS_BROTLI, HAS_ZSTD 

49from .formdata import FormData 

50from .helpers import ( 

51 _SENTINEL, 

52 BaseTimerContext, 

53 BasicAuth, 

54 HeadersMixin, 

55 TimerNoop, 

56 noop, 

57 reify, 

58 sentinel, 

59 set_exception, 

60 set_result, 

61) 

62from .http import ( 

63 SERVER_SOFTWARE, 

64 HttpVersion, 

65 HttpVersion10, 

66 HttpVersion11, 

67 StreamWriter, 

68) 

69from .streams import StreamReader 

70from .typedefs import ( 

71 DEFAULT_JSON_DECODER, 

72 JSONDecoder, 

73 LooseCookies, 

74 LooseHeaders, 

75 Query, 

76 RawHeaders, 

77) 

78 

79if TYPE_CHECKING: 

80 import ssl 

81 from ssl import SSLContext 

82else: 

83 try: 

84 import ssl 

85 from ssl import SSLContext 

86 except ImportError: # pragma: no cover 

87 ssl = None # type: ignore[assignment] 

88 SSLContext = object # type: ignore[misc,assignment] 

89 

90 

91__all__ = ("ClientRequest", "ClientResponse", "RequestInfo", "Fingerprint") 

92 

93 

94if TYPE_CHECKING: 

95 from .client import ClientSession 

96 from .connector import Connection 

97 from .tracing import Trace 

98 

99 

100_CONNECTION_CLOSED_EXCEPTION = ClientConnectionError("Connection closed") 

101_CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]") 

102json_re = re.compile(r"^application/(?:[\w.+-]+?\+)?json") 

103 

104 

105def _gen_default_accept_encoding() -> str: 

106 encodings = [ 

107 "gzip", 

108 "deflate", 

109 ] 

110 if HAS_BROTLI: 

111 encodings.append("br") 

112 if HAS_ZSTD: 

113 encodings.append("zstd") 

114 return ", ".join(encodings) 

115 

116 

117@attr.s(auto_attribs=True, frozen=True, slots=True) 

118class ContentDisposition: 

119 type: Optional[str] 

120 parameters: "MappingProxyType[str, str]" 

121 filename: Optional[str] 

122 

123 

124class _RequestInfo(NamedTuple): 

125 url: URL 

126 method: str 

127 headers: "CIMultiDictProxy[str]" 

128 real_url: URL 

129 

130 

131class RequestInfo(_RequestInfo): 

132 

133 def __new__( 

134 cls, 

135 url: URL, 

136 method: str, 

137 headers: "CIMultiDictProxy[str]", 

138 real_url: Union[URL, _SENTINEL] = sentinel, 

139 ) -> "RequestInfo": 

140 """Create a new RequestInfo instance. 

141 

142 For backwards compatibility, the real_url parameter is optional. 

143 """ 

144 return tuple.__new__( 

145 cls, (url, method, headers, url if real_url is sentinel else real_url) 

146 ) 

147 

148 

149class Fingerprint: 

150 HASHFUNC_BY_DIGESTLEN = { 

151 16: md5, 

152 20: sha1, 

153 32: sha256, 

154 } 

155 

156 def __init__(self, fingerprint: bytes) -> None: 

157 digestlen = len(fingerprint) 

158 hashfunc = self.HASHFUNC_BY_DIGESTLEN.get(digestlen) 

159 if not hashfunc: 

160 raise ValueError("fingerprint has invalid length") 

161 elif hashfunc is md5 or hashfunc is sha1: 

162 raise ValueError("md5 and sha1 are insecure and not supported. Use sha256.") 

163 self._hashfunc = hashfunc 

164 self._fingerprint = fingerprint 

165 

166 @property 

167 def fingerprint(self) -> bytes: 

168 return self._fingerprint 

169 

170 def check(self, transport: asyncio.Transport) -> None: 

171 if not transport.get_extra_info("sslcontext"): 

172 return 

173 sslobj = transport.get_extra_info("ssl_object") 

174 cert = sslobj.getpeercert(binary_form=True) 

175 got = self._hashfunc(cert).digest() 

176 if got != self._fingerprint: 

177 host, port, *_ = transport.get_extra_info("peername") 

178 raise ServerFingerprintMismatch(self._fingerprint, got, host, port) 

179 

180 

181if ssl is not None: 

182 SSL_ALLOWED_TYPES = (ssl.SSLContext, bool, Fingerprint, type(None)) 

183else: # pragma: no cover 

184 SSL_ALLOWED_TYPES = (bool, type(None)) 

185 

186 

187def _merge_ssl_params( 

188 ssl: Union["SSLContext", bool, Fingerprint], 

189 verify_ssl: Optional[bool], 

190 ssl_context: Optional["SSLContext"], 

191 fingerprint: Optional[bytes], 

192) -> Union["SSLContext", bool, Fingerprint]: 

193 if ssl is None: 

194 ssl = True # Double check for backwards compatibility 

195 if verify_ssl is not None and not verify_ssl: 

196 warnings.warn( 

197 "verify_ssl is deprecated, use ssl=False instead", 

198 DeprecationWarning, 

199 stacklevel=3, 

200 ) 

201 if ssl is not True: 

202 raise ValueError( 

203 "verify_ssl, ssl_context, fingerprint and ssl " 

204 "parameters are mutually exclusive" 

205 ) 

206 else: 

207 ssl = False 

208 if ssl_context is not None: 

209 warnings.warn( 

210 "ssl_context is deprecated, use ssl=context instead", 

211 DeprecationWarning, 

212 stacklevel=3, 

213 ) 

214 if ssl is not True: 

215 raise ValueError( 

216 "verify_ssl, ssl_context, fingerprint and ssl " 

217 "parameters are mutually exclusive" 

218 ) 

219 else: 

220 ssl = ssl_context 

221 if fingerprint is not None: 

222 warnings.warn( 

223 "fingerprint is deprecated, use ssl=Fingerprint(fingerprint) instead", 

224 DeprecationWarning, 

225 stacklevel=3, 

226 ) 

227 if ssl is not True: 

228 raise ValueError( 

229 "verify_ssl, ssl_context, fingerprint and ssl " 

230 "parameters are mutually exclusive" 

231 ) 

232 else: 

233 ssl = Fingerprint(fingerprint) 

234 if not isinstance(ssl, SSL_ALLOWED_TYPES): 

235 raise TypeError( 

236 "ssl should be SSLContext, bool, Fingerprint or None, " 

237 "got {!r} instead.".format(ssl) 

238 ) 

239 return ssl 

240 

241 

242_SSL_SCHEMES = frozenset(("https", "wss")) 

243 

244 

245# ConnectionKey is a NamedTuple because it is used as a key in a dict 

246# and a set in the connector. Since a NamedTuple is a tuple it uses 

247# the fast native tuple __hash__ and __eq__ implementation in CPython. 

248class ConnectionKey(NamedTuple): 

249 # the key should contain an information about used proxy / TLS 

250 # to prevent reusing wrong connections from a pool 

251 host: str 

252 port: Optional[int] 

253 is_ssl: bool 

254 ssl: Union[SSLContext, bool, Fingerprint] 

255 proxy: Optional[URL] 

256 proxy_auth: Optional[BasicAuth] 

257 proxy_headers_hash: Optional[int] # hash(CIMultiDict) 

258 

259 

260def _is_expected_content_type( 

261 response_content_type: str, expected_content_type: str 

262) -> bool: 

263 if expected_content_type == "application/json": 

264 return json_re.match(response_content_type) is not None 

265 return expected_content_type in response_content_type 

266 

267 

268def _warn_if_unclosed_payload(payload: payload.Payload, stacklevel: int = 2) -> None: 

269 """Warn if the payload is not closed. 

270 

271 Callers must check that the body is a Payload before calling this method. 

272 

273 Args: 

274 payload: The payload to check 

275 stacklevel: Stack level for the warning (default 2 for direct callers) 

276 """ 

277 if not payload.autoclose and not payload.consumed: 

278 warnings.warn( 

279 "The previous request body contains unclosed resources. " 

280 "Use await request.update_body() instead of setting request.body " 

281 "directly to properly close resources and avoid leaks.", 

282 ResourceWarning, 

283 stacklevel=stacklevel, 

284 ) 

285 

286 

287class ClientResponse(HeadersMixin): 

288 

289 # Some of these attributes are None when created, 

290 # but will be set by the start() method. 

291 # As the end user will likely never see the None values, we cheat the types below. 

292 # from the Status-Line of the response 

293 version: Optional[HttpVersion] = None # HTTP-Version 

294 status: int = None # type: ignore[assignment] # Status-Code 

295 reason: Optional[str] = None # Reason-Phrase 

296 

297 content: StreamReader = None # type: ignore[assignment] # Payload stream 

298 _body: Optional[bytes] = None 

299 _headers: CIMultiDictProxy[str] = None # type: ignore[assignment] 

300 _history: Tuple["ClientResponse", ...] = () 

301 _raw_headers: RawHeaders = None # type: ignore[assignment] 

302 

303 _connection: Optional["Connection"] = None # current connection 

304 _cookies: Optional[SimpleCookie] = None 

305 _raw_cookie_headers: Optional[Tuple[str, ...]] = None 

306 _continue: Optional["asyncio.Future[bool]"] = None 

307 _source_traceback: Optional[traceback.StackSummary] = None 

308 _session: Optional["ClientSession"] = None 

309 # set up by ClientRequest after ClientResponse object creation 

310 # post-init stage allows to not change ctor signature 

311 _closed = True # to allow __del__ for non-initialized properly response 

312 _released = False 

313 _in_context = False 

314 

315 _resolve_charset: Callable[["ClientResponse", bytes], str] = lambda *_: "utf-8" 

316 

317 __writer: Optional["asyncio.Task[None]"] = None 

318 

319 def __init__( 

320 self, 

321 method: str, 

322 url: URL, 

323 *, 

324 writer: "Optional[asyncio.Task[None]]", 

325 continue100: Optional["asyncio.Future[bool]"], 

326 timer: BaseTimerContext, 

327 request_info: RequestInfo, 

328 traces: List["Trace"], 

329 loop: asyncio.AbstractEventLoop, 

330 session: "ClientSession", 

331 ) -> None: 

332 # URL forbids subclasses, so a simple type check is enough. 

333 assert type(url) is URL 

334 

335 self.method = method 

336 

337 self._real_url = url 

338 self._url = url.with_fragment(None) if url.raw_fragment else url 

339 if writer is not None: 

340 self._writer = writer 

341 if continue100 is not None: 

342 self._continue = continue100 

343 self._request_info = request_info 

344 self._timer = timer if timer is not None else TimerNoop() 

345 self._cache: Dict[str, Any] = {} 

346 self._traces = traces 

347 self._loop = loop 

348 # Save reference to _resolve_charset, so that get_encoding() will still 

349 # work after the response has finished reading the body. 

350 # TODO: Fix session=None in tests (see ClientRequest.__init__). 

351 if session is not None: 

352 # store a reference to session #1985 

353 self._session = session 

354 self._resolve_charset = session._resolve_charset 

355 if loop.get_debug(): 

356 self._source_traceback = traceback.extract_stack(sys._getframe(1)) 

357 

358 def __reset_writer(self, _: object = None) -> None: 

359 self.__writer = None 

360 

361 @property 

362 def _writer(self) -> Optional["asyncio.Task[None]"]: 

363 """The writer task for streaming data. 

364 

365 _writer is only provided for backwards compatibility 

366 for subclasses that may need to access it. 

367 """ 

368 return self.__writer 

369 

370 @_writer.setter 

371 def _writer(self, writer: Optional["asyncio.Task[None]"]) -> None: 

372 """Set the writer task for streaming data.""" 

373 if self.__writer is not None: 

374 self.__writer.remove_done_callback(self.__reset_writer) 

375 self.__writer = writer 

376 if writer is None: 

377 return 

378 if writer.done(): 

379 # The writer is already done, so we can clear it immediately. 

380 self.__writer = None 

381 else: 

382 writer.add_done_callback(self.__reset_writer) 

383 

384 @property 

385 def cookies(self) -> SimpleCookie: 

386 if self._cookies is None: 

387 if self._raw_cookie_headers is not None: 

388 # Parse cookies for response.cookies (SimpleCookie for backward compatibility) 

389 cookies = SimpleCookie() 

390 # Use parse_set_cookie_headers for more lenient parsing that handles 

391 # malformed cookies better than SimpleCookie.load 

392 cookies.update(parse_set_cookie_headers(self._raw_cookie_headers)) 

393 self._cookies = cookies 

394 else: 

395 self._cookies = SimpleCookie() 

396 return self._cookies 

397 

398 @cookies.setter 

399 def cookies(self, cookies: SimpleCookie) -> None: 

400 self._cookies = cookies 

401 # Generate raw cookie headers from the SimpleCookie 

402 if cookies: 

403 self._raw_cookie_headers = tuple( 

404 morsel.OutputString() for morsel in cookies.values() 

405 ) 

406 else: 

407 self._raw_cookie_headers = None 

408 

409 @reify 

410 def url(self) -> URL: 

411 return self._url 

412 

413 @reify 

414 def url_obj(self) -> URL: 

415 warnings.warn("Deprecated, use .url #1654", DeprecationWarning, stacklevel=2) 

416 return self._url 

417 

418 @reify 

419 def real_url(self) -> URL: 

420 return self._real_url 

421 

422 @reify 

423 def host(self) -> str: 

424 assert self._url.host is not None 

425 return self._url.host 

426 

427 @reify 

428 def headers(self) -> "CIMultiDictProxy[str]": 

429 return self._headers 

430 

431 @reify 

432 def raw_headers(self) -> RawHeaders: 

433 return self._raw_headers 

434 

435 @reify 

436 def request_info(self) -> RequestInfo: 

437 return self._request_info 

438 

439 @reify 

440 def content_disposition(self) -> Optional[ContentDisposition]: 

441 raw = self._headers.get(hdrs.CONTENT_DISPOSITION) 

442 if raw is None: 

443 return None 

444 disposition_type, params_dct = multipart.parse_content_disposition(raw) 

445 params = MappingProxyType(params_dct) 

446 filename = multipart.content_disposition_filename(params) 

447 return ContentDisposition(disposition_type, params, filename) 

448 

449 def __del__(self, _warnings: Any = warnings) -> None: 

450 if self._closed: 

451 return 

452 

453 if self._connection is not None: 

454 self._connection.release() 

455 self._cleanup_writer() 

456 

457 if self._loop.get_debug(): 

458 kwargs = {"source": self} 

459 _warnings.warn(f"Unclosed response {self!r}", ResourceWarning, **kwargs) 

460 context = {"client_response": self, "message": "Unclosed response"} 

461 if self._source_traceback: 

462 context["source_traceback"] = self._source_traceback 

463 self._loop.call_exception_handler(context) 

464 

465 def __repr__(self) -> str: 

466 out = io.StringIO() 

467 ascii_encodable_url = str(self.url) 

468 if self.reason: 

469 ascii_encodable_reason = self.reason.encode( 

470 "ascii", "backslashreplace" 

471 ).decode("ascii") 

472 else: 

473 ascii_encodable_reason = "None" 

474 print( 

475 "<ClientResponse({}) [{} {}]>".format( 

476 ascii_encodable_url, self.status, ascii_encodable_reason 

477 ), 

478 file=out, 

479 ) 

480 print(self.headers, file=out) 

481 return out.getvalue() 

482 

483 @property 

484 def connection(self) -> Optional["Connection"]: 

485 return self._connection 

486 

487 @reify 

488 def history(self) -> Tuple["ClientResponse", ...]: 

489 """A sequence of of responses, if redirects occurred.""" 

490 return self._history 

491 

492 @reify 

493 def links(self) -> "MultiDictProxy[MultiDictProxy[Union[str, URL]]]": 

494 links_str = ", ".join(self.headers.getall("link", [])) 

495 

496 if not links_str: 

497 return MultiDictProxy(MultiDict()) 

498 

499 links: MultiDict[MultiDictProxy[Union[str, URL]]] = MultiDict() 

500 

501 for val in re.split(r",(?=\s*<)", links_str): 

502 match = re.match(r"\s*<(.*)>(.*)", val) 

503 if match is None: # pragma: no cover 

504 # the check exists to suppress mypy error 

505 continue 

506 url, params_str = match.groups() 

507 params = params_str.split(";")[1:] 

508 

509 link: MultiDict[Union[str, URL]] = MultiDict() 

510 

511 for param in params: 

512 match = re.match(r"^\s*(\S*)\s*=\s*(['\"]?)(.*?)(\2)\s*$", param, re.M) 

513 if match is None: # pragma: no cover 

514 # the check exists to suppress mypy error 

515 continue 

516 key, _, value, _ = match.groups() 

517 

518 link.add(key, value) 

519 

520 key = link.get("rel", url) 

521 

522 link.add("url", self.url.join(URL(url))) 

523 

524 links.add(str(key), MultiDictProxy(link)) 

525 

526 return MultiDictProxy(links) 

527 

528 async def start(self, connection: "Connection") -> "ClientResponse": 

529 """Start response processing.""" 

530 self._closed = False 

531 self._protocol = connection.protocol 

532 self._connection = connection 

533 

534 with self._timer: 

535 while True: 

536 # read response 

537 try: 

538 protocol = self._protocol 

539 message, payload = await protocol.read() # type: ignore[union-attr] 

540 except http.HttpProcessingError as exc: 

541 raise ClientResponseError( 

542 self.request_info, 

543 self.history, 

544 status=exc.code, 

545 message=exc.message, 

546 headers=exc.headers, 

547 ) from exc 

548 

549 if message.code < 100 or message.code > 199 or message.code == 101: 

550 break 

551 

552 if self._continue is not None: 

553 set_result(self._continue, True) 

554 self._continue = None 

555 

556 # payload eof handler 

557 payload.on_eof(self._response_eof) 

558 

559 # response status 

560 self.version = message.version 

561 self.status = message.code 

562 self.reason = message.reason 

563 

564 # headers 

565 self._headers = message.headers # type is CIMultiDictProxy 

566 self._raw_headers = message.raw_headers # type is Tuple[bytes, bytes] 

567 

568 # payload 

569 self.content = payload 

570 

571 # cookies 

572 if cookie_hdrs := self.headers.getall(hdrs.SET_COOKIE, ()): 

573 # Store raw cookie headers for CookieJar 

574 self._raw_cookie_headers = tuple(cookie_hdrs) 

575 return self 

576 

577 def _response_eof(self) -> None: 

578 if self._closed: 

579 return 

580 

581 # protocol could be None because connection could be detached 

582 protocol = self._connection and self._connection.protocol 

583 if protocol is not None and protocol.upgraded: 

584 return 

585 

586 self._closed = True 

587 self._cleanup_writer() 

588 self._release_connection() 

589 

590 @property 

591 def closed(self) -> bool: 

592 return self._closed 

593 

594 def close(self) -> None: 

595 if not self._released: 

596 self._notify_content() 

597 

598 self._closed = True 

599 if self._loop is None or self._loop.is_closed(): 

600 return 

601 

602 self._cleanup_writer() 

603 if self._connection is not None: 

604 self._connection.close() 

605 self._connection = None 

606 

607 def release(self) -> Any: 

608 if not self._released: 

609 self._notify_content() 

610 

611 self._closed = True 

612 

613 self._cleanup_writer() 

614 self._release_connection() 

615 return noop() 

616 

617 @property 

618 def ok(self) -> bool: 

619 """Returns ``True`` if ``status`` is less than ``400``, ``False`` if not. 

620 

621 This is **not** a check for ``200 OK`` but a check that the response 

622 status is under 400. 

623 """ 

624 return 400 > self.status 

625 

626 def raise_for_status(self) -> None: 

627 if not self.ok: 

628 # reason should always be not None for a started response 

629 assert self.reason is not None 

630 

631 # If we're in a context we can rely on __aexit__() to release as the 

632 # exception propagates. 

633 if not self._in_context: 

634 self.release() 

635 

636 raise ClientResponseError( 

637 self.request_info, 

638 self.history, 

639 status=self.status, 

640 message=self.reason, 

641 headers=self.headers, 

642 ) 

643 

644 def _release_connection(self) -> None: 

645 if self._connection is not None: 

646 if self.__writer is None: 

647 self._connection.release() 

648 self._connection = None 

649 else: 

650 self.__writer.add_done_callback(lambda f: self._release_connection()) 

651 

652 async def _wait_released(self) -> None: 

653 if self.__writer is not None: 

654 try: 

655 await self.__writer 

656 except asyncio.CancelledError: 

657 if ( 

658 sys.version_info >= (3, 11) 

659 and (task := asyncio.current_task()) 

660 and task.cancelling() 

661 ): 

662 raise 

663 self._release_connection() 

664 

665 def _cleanup_writer(self) -> None: 

666 if self.__writer is not None: 

667 self.__writer.cancel() 

668 self._session = None 

669 

670 def _notify_content(self) -> None: 

671 content = self.content 

672 if content and content.exception() is None: 

673 set_exception(content, _CONNECTION_CLOSED_EXCEPTION) 

674 self._released = True 

675 

676 async def wait_for_close(self) -> None: 

677 if self.__writer is not None: 

678 try: 

679 await self.__writer 

680 except asyncio.CancelledError: 

681 if ( 

682 sys.version_info >= (3, 11) 

683 and (task := asyncio.current_task()) 

684 and task.cancelling() 

685 ): 

686 raise 

687 self.release() 

688 

689 async def read(self) -> bytes: 

690 """Read response payload.""" 

691 if self._body is None: 

692 try: 

693 self._body = await self.content.read() 

694 for trace in self._traces: 

695 await trace.send_response_chunk_received( 

696 self.method, self.url, self._body 

697 ) 

698 except BaseException: 

699 self.close() 

700 raise 

701 elif self._released: # Response explicitly released 

702 raise ClientConnectionError("Connection closed") 

703 

704 protocol = self._connection and self._connection.protocol 

705 if protocol is None or not protocol.upgraded: 

706 await self._wait_released() # Underlying connection released 

707 return self._body 

708 

709 def get_encoding(self) -> str: 

710 ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower() 

711 mimetype = helpers.parse_mimetype(ctype) 

712 

713 encoding = mimetype.parameters.get("charset") 

714 if encoding: 

715 with contextlib.suppress(LookupError, ValueError): 

716 return codecs.lookup(encoding).name 

717 

718 if mimetype.type == "application" and ( 

719 mimetype.subtype == "json" or mimetype.subtype == "rdap" 

720 ): 

721 # RFC 7159 states that the default encoding is UTF-8. 

722 # RFC 7483 defines application/rdap+json 

723 return "utf-8" 

724 

725 if self._body is None: 

726 raise RuntimeError( 

727 "Cannot compute fallback encoding of a not yet read body" 

728 ) 

729 

730 return self._resolve_charset(self, self._body) 

731 

732 async def text(self, encoding: Optional[str] = None, errors: str = "strict") -> str: 

733 """Read response payload and decode.""" 

734 if self._body is None: 

735 await self.read() 

736 

737 if encoding is None: 

738 encoding = self.get_encoding() 

739 

740 return self._body.decode(encoding, errors=errors) # type: ignore[union-attr] 

741 

742 async def json( 

743 self, 

744 *, 

745 encoding: Optional[str] = None, 

746 loads: JSONDecoder = DEFAULT_JSON_DECODER, 

747 content_type: Optional[str] = "application/json", 

748 ) -> Any: 

749 """Read and decodes JSON response.""" 

750 if self._body is None: 

751 await self.read() 

752 

753 if content_type: 

754 ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower() 

755 if not _is_expected_content_type(ctype, content_type): 

756 raise ContentTypeError( 

757 self.request_info, 

758 self.history, 

759 status=self.status, 

760 message=( 

761 "Attempt to decode JSON with unexpected mimetype: %s" % ctype 

762 ), 

763 headers=self.headers, 

764 ) 

765 

766 stripped = self._body.strip() # type: ignore[union-attr] 

767 if not stripped: 

768 return None 

769 

770 if encoding is None: 

771 encoding = self.get_encoding() 

772 

773 return loads(stripped.decode(encoding)) 

774 

775 async def __aenter__(self) -> "ClientResponse": 

776 self._in_context = True 

777 return self 

778 

779 async def __aexit__( 

780 self, 

781 exc_type: Optional[Type[BaseException]], 

782 exc_val: Optional[BaseException], 

783 exc_tb: Optional[TracebackType], 

784 ) -> None: 

785 self._in_context = False 

786 # similar to _RequestContextManager, we do not need to check 

787 # for exceptions, response object can close connection 

788 # if state is broken 

789 self.release() 

790 await self.wait_for_close() 

791 

792 

793class ClientRequest: 

794 GET_METHODS = { 

795 hdrs.METH_GET, 

796 hdrs.METH_HEAD, 

797 hdrs.METH_OPTIONS, 

798 hdrs.METH_TRACE, 

799 } 

800 POST_METHODS = {hdrs.METH_PATCH, hdrs.METH_POST, hdrs.METH_PUT} 

801 ALL_METHODS = GET_METHODS.union(POST_METHODS).union({hdrs.METH_DELETE}) 

802 

803 DEFAULT_HEADERS = { 

804 hdrs.ACCEPT: "*/*", 

805 hdrs.ACCEPT_ENCODING: _gen_default_accept_encoding(), 

806 } 

807 

808 # Type of body depends on PAYLOAD_REGISTRY, which is dynamic. 

809 _body: Union[None, payload.Payload] = None 

810 auth = None 

811 response = None 

812 

813 __writer: Optional["asyncio.Task[None]"] = None # async task for streaming data 

814 

815 # These class defaults help create_autospec() work correctly. 

816 # If autospec is improved in future, maybe these can be removed. 

817 url = URL() 

818 method = "GET" 

819 

820 _continue = None # waiter future for '100 Continue' response 

821 

822 _skip_auto_headers: Optional["CIMultiDict[None]"] = None 

823 

824 # N.B. 

825 # Adding __del__ method with self._writer closing doesn't make sense 

826 # because _writer is instance method, thus it keeps a reference to self. 

827 # Until writer has finished finalizer will not be called. 

828 

829 def __init__( 

830 self, 

831 method: str, 

832 url: URL, 

833 *, 

834 params: Query = None, 

835 headers: Optional[LooseHeaders] = None, 

836 skip_auto_headers: Optional[Iterable[str]] = None, 

837 data: Any = None, 

838 cookies: Optional[LooseCookies] = None, 

839 auth: Optional[BasicAuth] = None, 

840 version: http.HttpVersion = http.HttpVersion11, 

841 compress: Union[str, bool, None] = None, 

842 chunked: Optional[bool] = None, 

843 expect100: bool = False, 

844 loop: Optional[asyncio.AbstractEventLoop] = None, 

845 response_class: Optional[Type["ClientResponse"]] = None, 

846 proxy: Optional[URL] = None, 

847 proxy_auth: Optional[BasicAuth] = None, 

848 timer: Optional[BaseTimerContext] = None, 

849 session: Optional["ClientSession"] = None, 

850 ssl: Union[SSLContext, bool, Fingerprint] = True, 

851 proxy_headers: Optional[LooseHeaders] = None, 

852 traces: Optional[List["Trace"]] = None, 

853 trust_env: bool = False, 

854 server_hostname: Optional[str] = None, 

855 ): 

856 if loop is None: 

857 loop = asyncio.get_event_loop() 

858 if match := _CONTAINS_CONTROL_CHAR_RE.search(method): 

859 raise ValueError( 

860 f"Method cannot contain non-token characters {method!r} " 

861 f"(found at least {match.group()!r})" 

862 ) 

863 # URL forbids subclasses, so a simple type check is enough. 

864 assert type(url) is URL, url 

865 if proxy is not None: 

866 assert type(proxy) is URL, proxy 

867 # FIXME: session is None in tests only, need to fix tests 

868 # assert session is not None 

869 if TYPE_CHECKING: 

870 assert session is not None 

871 self._session = session 

872 if params: 

873 url = url.extend_query(params) 

874 self.original_url = url 

875 self.url = url.with_fragment(None) if url.raw_fragment else url 

876 self.method = method.upper() 

877 self.chunked = chunked 

878 self.compress = compress 

879 self.loop = loop 

880 self.length = None 

881 if response_class is None: 

882 real_response_class = ClientResponse 

883 else: 

884 real_response_class = response_class 

885 self.response_class: Type[ClientResponse] = real_response_class 

886 self._timer = timer if timer is not None else TimerNoop() 

887 self._ssl = ssl if ssl is not None else True 

888 self.server_hostname = server_hostname 

889 

890 if loop.get_debug(): 

891 self._source_traceback = traceback.extract_stack(sys._getframe(1)) 

892 

893 self.update_version(version) 

894 self.update_host(url) 

895 self.update_headers(headers) 

896 self.update_auto_headers(skip_auto_headers) 

897 self.update_cookies(cookies) 

898 self.update_content_encoding(data) 

899 self.update_auth(auth, trust_env) 

900 self.update_proxy(proxy, proxy_auth, proxy_headers) 

901 

902 self.update_body_from_data(data) 

903 if data is not None or self.method not in self.GET_METHODS: 

904 self.update_transfer_encoding() 

905 self.update_expect_continue(expect100) 

906 self._traces = [] if traces is None else traces 

907 

908 def __reset_writer(self, _: object = None) -> None: 

909 self.__writer = None 

910 

911 def _get_content_length(self) -> Optional[int]: 

912 """Extract and validate Content-Length header value. 

913 

914 Returns parsed Content-Length value or None if not set. 

915 Raises ValueError if header exists but cannot be parsed as an integer. 

916 """ 

917 if hdrs.CONTENT_LENGTH not in self.headers: 

918 return None 

919 

920 content_length_hdr = self.headers[hdrs.CONTENT_LENGTH] 

921 try: 

922 return int(content_length_hdr) 

923 except ValueError: 

924 raise ValueError( 

925 f"Invalid Content-Length header: {content_length_hdr}" 

926 ) from None 

927 

928 @property 

929 def skip_auto_headers(self) -> CIMultiDict[None]: 

930 return self._skip_auto_headers or CIMultiDict() 

931 

932 @property 

933 def _writer(self) -> Optional["asyncio.Task[None]"]: 

934 return self.__writer 

935 

936 @_writer.setter 

937 def _writer(self, writer: "asyncio.Task[None]") -> None: 

938 if self.__writer is not None: 

939 self.__writer.remove_done_callback(self.__reset_writer) 

940 self.__writer = writer 

941 writer.add_done_callback(self.__reset_writer) 

942 

943 def is_ssl(self) -> bool: 

944 return self.url.scheme in _SSL_SCHEMES 

945 

946 @property 

947 def ssl(self) -> Union["SSLContext", bool, Fingerprint]: 

948 return self._ssl 

949 

950 @property 

951 def connection_key(self) -> ConnectionKey: 

952 if proxy_headers := self.proxy_headers: 

953 h: Optional[int] = hash(tuple(proxy_headers.items())) 

954 else: 

955 h = None 

956 url = self.url 

957 return tuple.__new__( 

958 ConnectionKey, 

959 ( 

960 url.raw_host or "", 

961 url.port, 

962 url.scheme in _SSL_SCHEMES, 

963 self._ssl, 

964 self.proxy, 

965 self.proxy_auth, 

966 h, 

967 ), 

968 ) 

969 

970 @property 

971 def host(self) -> str: 

972 ret = self.url.raw_host 

973 assert ret is not None 

974 return ret 

975 

976 @property 

977 def port(self) -> Optional[int]: 

978 return self.url.port 

979 

980 @property 

981 def body(self) -> Union[payload.Payload, Literal[b""]]: 

982 """Request body.""" 

983 # empty body is represented as bytes for backwards compatibility 

984 return self._body or b"" 

985 

986 @body.setter 

987 def body(self, value: Any) -> None: 

988 """Set request body with warning for non-autoclose payloads. 

989 

990 WARNING: This setter must be called from within an event loop and is not 

991 thread-safe. Setting body outside of an event loop may raise RuntimeError 

992 when closing file-based payloads. 

993 

994 DEPRECATED: Direct assignment to body is deprecated and will be removed 

995 in a future version. Use await update_body() instead for proper resource 

996 management. 

997 """ 

998 # Close existing payload if present 

999 if self._body is not None: 

1000 # Warn if the payload needs manual closing 

1001 # stacklevel=3: user code -> body setter -> _warn_if_unclosed_payload 

1002 _warn_if_unclosed_payload(self._body, stacklevel=3) 

1003 # NOTE: In the future, when we remove sync close support, 

1004 # this setter will need to be removed and only the async 

1005 # update_body() method will be available. For now, we call 

1006 # _close() for backwards compatibility. 

1007 self._body._close() 

1008 self._update_body(value) 

1009 

1010 @property 

1011 def request_info(self) -> RequestInfo: 

1012 headers: CIMultiDictProxy[str] = CIMultiDictProxy(self.headers) 

1013 # These are created on every request, so we use a NamedTuple 

1014 # for performance reasons. We don't use the RequestInfo.__new__ 

1015 # method because it has a different signature which is provided 

1016 # for backwards compatibility only. 

1017 return tuple.__new__( 

1018 RequestInfo, (self.url, self.method, headers, self.original_url) 

1019 ) 

1020 

1021 @property 

1022 def session(self) -> "ClientSession": 

1023 """Return the ClientSession instance. 

1024 

1025 This property provides access to the ClientSession that initiated 

1026 this request, allowing middleware to make additional requests 

1027 using the same session. 

1028 """ 

1029 return self._session 

1030 

1031 def update_host(self, url: URL) -> None: 

1032 """Update destination host, port and connection type (ssl).""" 

1033 # get host/port 

1034 if not url.raw_host: 

1035 raise InvalidURL(url) 

1036 

1037 # basic auth info 

1038 if url.raw_user or url.raw_password: 

1039 self.auth = helpers.BasicAuth(url.user or "", url.password or "") 

1040 

1041 def update_version(self, version: Union[http.HttpVersion, str]) -> None: 

1042 """Convert request version to two elements tuple. 

1043 

1044 parser HTTP version '1.1' => (1, 1) 

1045 """ 

1046 if isinstance(version, str): 

1047 v = [part.strip() for part in version.split(".", 1)] 

1048 try: 

1049 version = http.HttpVersion(int(v[0]), int(v[1])) 

1050 except ValueError: 

1051 raise ValueError( 

1052 f"Can not parse http version number: {version}" 

1053 ) from None 

1054 self.version = version 

1055 

1056 def update_headers(self, headers: Optional[LooseHeaders]) -> None: 

1057 """Update request headers.""" 

1058 self.headers: CIMultiDict[str] = CIMultiDict() 

1059 

1060 # Build the host header 

1061 host = self.url.host_port_subcomponent 

1062 

1063 # host_port_subcomponent is None when the URL is a relative URL. 

1064 # but we know we do not have a relative URL here. 

1065 assert host is not None 

1066 self.headers[hdrs.HOST] = host 

1067 

1068 if not headers: 

1069 return 

1070 

1071 if isinstance(headers, (dict, MultiDictProxy, MultiDict)): 

1072 headers = headers.items() 

1073 

1074 for key, value in headers: # type: ignore[misc] 

1075 # A special case for Host header 

1076 if key in hdrs.HOST_ALL: 

1077 self.headers[key] = value 

1078 else: 

1079 self.headers.add(key, value) 

1080 

1081 def update_auto_headers(self, skip_auto_headers: Optional[Iterable[str]]) -> None: 

1082 if skip_auto_headers is not None: 

1083 self._skip_auto_headers = CIMultiDict( 

1084 (hdr, None) for hdr in sorted(skip_auto_headers) 

1085 ) 

1086 used_headers = self.headers.copy() 

1087 used_headers.extend(self._skip_auto_headers) # type: ignore[arg-type] 

1088 else: 

1089 # Fast path when there are no headers to skip 

1090 # which is the most common case. 

1091 used_headers = self.headers 

1092 

1093 for hdr, val in self.DEFAULT_HEADERS.items(): 

1094 if hdr not in used_headers: 

1095 self.headers[hdr] = val 

1096 

1097 if hdrs.USER_AGENT not in used_headers: 

1098 self.headers[hdrs.USER_AGENT] = SERVER_SOFTWARE 

1099 

1100 def update_cookies(self, cookies: Optional[LooseCookies]) -> None: 

1101 """Update request cookies header.""" 

1102 if not cookies: 

1103 return 

1104 

1105 c = SimpleCookie() 

1106 if hdrs.COOKIE in self.headers: 

1107 # parse_cookie_header for RFC 6265 compliant Cookie header parsing 

1108 c.update(parse_cookie_header(self.headers.get(hdrs.COOKIE, ""))) 

1109 del self.headers[hdrs.COOKIE] 

1110 

1111 if isinstance(cookies, Mapping): 

1112 iter_cookies = cookies.items() 

1113 else: 

1114 iter_cookies = cookies # type: ignore[assignment] 

1115 for name, value in iter_cookies: 

1116 if isinstance(value, Morsel): 

1117 # Use helper to preserve coded_value exactly as sent by server 

1118 c[name] = preserve_morsel_with_coded_value(value) 

1119 else: 

1120 c[name] = value # type: ignore[assignment] 

1121 

1122 self.headers[hdrs.COOKIE] = c.output(header="", sep=";").strip() 

1123 

1124 def update_content_encoding(self, data: Any) -> None: 

1125 """Set request content encoding.""" 

1126 if not data: 

1127 # Don't compress an empty body. 

1128 self.compress = None 

1129 return 

1130 

1131 if self.headers.get(hdrs.CONTENT_ENCODING): 

1132 if self.compress: 

1133 raise ValueError( 

1134 "compress can not be set if Content-Encoding header is set" 

1135 ) 

1136 elif self.compress: 

1137 if not isinstance(self.compress, str): 

1138 self.compress = "deflate" 

1139 self.headers[hdrs.CONTENT_ENCODING] = self.compress 

1140 self.chunked = True # enable chunked, no need to deal with length 

1141 

1142 def update_transfer_encoding(self) -> None: 

1143 """Analyze transfer-encoding header.""" 

1144 te = self.headers.get(hdrs.TRANSFER_ENCODING, "").lower() 

1145 

1146 if "chunked" in te: 

1147 if self.chunked: 

1148 raise ValueError( 

1149 "chunked can not be set " 

1150 'if "Transfer-Encoding: chunked" header is set' 

1151 ) 

1152 

1153 elif self.chunked: 

1154 if hdrs.CONTENT_LENGTH in self.headers: 

1155 raise ValueError( 

1156 "chunked can not be set if Content-Length header is set" 

1157 ) 

1158 

1159 self.headers[hdrs.TRANSFER_ENCODING] = "chunked" 

1160 

1161 def update_auth(self, auth: Optional[BasicAuth], trust_env: bool = False) -> None: 

1162 """Set basic auth.""" 

1163 if auth is None: 

1164 auth = self.auth 

1165 if auth is None: 

1166 return 

1167 

1168 if not isinstance(auth, helpers.BasicAuth): 

1169 raise TypeError("BasicAuth() tuple is required instead") 

1170 

1171 self.headers[hdrs.AUTHORIZATION] = auth.encode() 

1172 

1173 def update_body_from_data(self, body: Any, _stacklevel: int = 3) -> None: 

1174 """Update request body from data.""" 

1175 if self._body is not None: 

1176 _warn_if_unclosed_payload(self._body, stacklevel=_stacklevel) 

1177 

1178 if body is None: 

1179 self._body = None 

1180 # Set Content-Length to 0 when body is None for methods that expect a body 

1181 if ( 

1182 self.method not in self.GET_METHODS 

1183 and not self.chunked 

1184 and hdrs.CONTENT_LENGTH not in self.headers 

1185 ): 

1186 self.headers[hdrs.CONTENT_LENGTH] = "0" 

1187 return 

1188 

1189 # FormData 

1190 maybe_payload = body() if isinstance(body, FormData) else body 

1191 

1192 try: 

1193 body_payload = payload.PAYLOAD_REGISTRY.get(maybe_payload, disposition=None) 

1194 except payload.LookupError: 

1195 body_payload = FormData(maybe_payload)() # type: ignore[arg-type] 

1196 

1197 self._body = body_payload 

1198 # enable chunked encoding if needed 

1199 if not self.chunked and hdrs.CONTENT_LENGTH not in self.headers: 

1200 if (size := body_payload.size) is not None: 

1201 self.headers[hdrs.CONTENT_LENGTH] = str(size) 

1202 else: 

1203 self.chunked = True 

1204 

1205 # copy payload headers 

1206 assert body_payload.headers 

1207 headers = self.headers 

1208 skip_headers = self._skip_auto_headers 

1209 for key, value in body_payload.headers.items(): 

1210 if key in headers or (skip_headers is not None and key in skip_headers): 

1211 continue 

1212 headers[key] = value 

1213 

1214 def _update_body(self, body: Any) -> None: 

1215 """Update request body after its already been set.""" 

1216 # Remove existing Content-Length header since body is changing 

1217 if hdrs.CONTENT_LENGTH in self.headers: 

1218 del self.headers[hdrs.CONTENT_LENGTH] 

1219 

1220 # Remove existing Transfer-Encoding header to avoid conflicts 

1221 if self.chunked and hdrs.TRANSFER_ENCODING in self.headers: 

1222 del self.headers[hdrs.TRANSFER_ENCODING] 

1223 

1224 # Now update the body using the existing method 

1225 # Called from _update_body, add 1 to stacklevel from caller 

1226 self.update_body_from_data(body, _stacklevel=4) 

1227 

1228 # Update transfer encoding headers if needed (same logic as __init__) 

1229 if body is not None or self.method not in self.GET_METHODS: 

1230 self.update_transfer_encoding() 

1231 

1232 async def update_body(self, body: Any) -> None: 

1233 """ 

1234 Update request body and close previous payload if needed. 

1235 

1236 This method safely updates the request body by first closing any existing 

1237 payload to prevent resource leaks, then setting the new body. 

1238 

1239 IMPORTANT: Always use this method instead of setting request.body directly. 

1240 Direct assignment to request.body will leak resources if the previous body 

1241 contains file handles, streams, or other resources that need cleanup. 

1242 

1243 Args: 

1244 body: The new body content. Can be: 

1245 - bytes/bytearray: Raw binary data 

1246 - str: Text data (will be encoded using charset from Content-Type) 

1247 - FormData: Form data that will be encoded as multipart/form-data 

1248 - Payload: A pre-configured payload object 

1249 - AsyncIterable: An async iterable of bytes chunks 

1250 - File-like object: Will be read and sent as binary data 

1251 - None: Clears the body 

1252 

1253 Usage: 

1254 # CORRECT: Use update_body 

1255 await request.update_body(b"new request data") 

1256 

1257 # WRONG: Don't set body directly 

1258 # request.body = b"new request data" # This will leak resources! 

1259 

1260 # Update with form data 

1261 form_data = FormData() 

1262 form_data.add_field('field', 'value') 

1263 await request.update_body(form_data) 

1264 

1265 # Clear body 

1266 await request.update_body(None) 

1267 

1268 Note: 

1269 This method is async because it may need to close file handles or 

1270 other resources associated with the previous payload. Always await 

1271 this method to ensure proper cleanup. 

1272 

1273 Warning: 

1274 Setting request.body directly is highly discouraged and can lead to: 

1275 - Resource leaks (unclosed file handles, streams) 

1276 - Memory leaks (unreleased buffers) 

1277 - Unexpected behavior with streaming payloads 

1278 

1279 It is not recommended to change the payload type in middleware. If the 

1280 body was already set (e.g., as bytes), it's best to keep the same type 

1281 rather than converting it (e.g., to str) as this may result in unexpected 

1282 behavior. 

1283 

1284 See Also: 

1285 - update_body_from_data: Synchronous body update without cleanup 

1286 - body property: Direct body access (STRONGLY DISCOURAGED) 

1287 

1288 """ 

1289 # Close existing payload if it exists and needs closing 

1290 if self._body is not None: 

1291 await self._body.close() 

1292 self._update_body(body) 

1293 

1294 def update_expect_continue(self, expect: bool = False) -> None: 

1295 if expect: 

1296 self.headers[hdrs.EXPECT] = "100-continue" 

1297 elif ( 

1298 hdrs.EXPECT in self.headers 

1299 and self.headers[hdrs.EXPECT].lower() == "100-continue" 

1300 ): 

1301 expect = True 

1302 

1303 if expect: 

1304 self._continue = self.loop.create_future() 

1305 

1306 def update_proxy( 

1307 self, 

1308 proxy: Optional[URL], 

1309 proxy_auth: Optional[BasicAuth], 

1310 proxy_headers: Optional[LooseHeaders], 

1311 ) -> None: 

1312 self.proxy = proxy 

1313 if proxy is None: 

1314 self.proxy_auth = None 

1315 self.proxy_headers = None 

1316 return 

1317 

1318 if proxy_auth and not isinstance(proxy_auth, helpers.BasicAuth): 

1319 raise ValueError("proxy_auth must be None or BasicAuth() tuple") 

1320 self.proxy_auth = proxy_auth 

1321 

1322 if proxy_headers is not None and not isinstance( 

1323 proxy_headers, (MultiDict, MultiDictProxy) 

1324 ): 

1325 proxy_headers = CIMultiDict(proxy_headers) 

1326 self.proxy_headers = proxy_headers 

1327 

1328 async def write_bytes( 

1329 self, 

1330 writer: AbstractStreamWriter, 

1331 conn: "Connection", 

1332 content_length: Optional[int] = None, 

1333 ) -> None: 

1334 """ 

1335 Write the request body to the connection stream. 

1336 

1337 This method handles writing different types of request bodies: 

1338 1. Payload objects (using their specialized write_with_length method) 

1339 2. Bytes/bytearray objects 

1340 3. Iterable body content 

1341 

1342 Args: 

1343 writer: The stream writer to write the body to 

1344 conn: The connection being used for this request 

1345 content_length: Optional maximum number of bytes to write from the body 

1346 (None means write the entire body) 

1347 

1348 The method properly handles: 

1349 - Waiting for 100-Continue responses if required 

1350 - Content length constraints for chunked encoding 

1351 - Error handling for network issues, cancellation, and other exceptions 

1352 - Signaling EOF and timeout management 

1353 

1354 Raises: 

1355 ClientOSError: When there's an OS-level error writing the body 

1356 ClientConnectionError: When there's a general connection error 

1357 asyncio.CancelledError: When the operation is cancelled 

1358 

1359 """ 

1360 # 100 response 

1361 if self._continue is not None: 

1362 # Force headers to be sent before waiting for 100-continue 

1363 writer.send_headers() 

1364 await writer.drain() 

1365 await self._continue 

1366 

1367 protocol = conn.protocol 

1368 assert protocol is not None 

1369 try: 

1370 # This should be a rare case but the 

1371 # self._body can be set to None while 

1372 # the task is being started or we wait above 

1373 # for the 100-continue response. 

1374 # The more likely case is we have an empty 

1375 # payload, but 100-continue is still expected. 

1376 if self._body is not None: 

1377 await self._body.write_with_length(writer, content_length) 

1378 except OSError as underlying_exc: 

1379 reraised_exc = underlying_exc 

1380 

1381 # Distinguish between timeout and other OS errors for better error reporting 

1382 exc_is_not_timeout = underlying_exc.errno is not None or not isinstance( 

1383 underlying_exc, asyncio.TimeoutError 

1384 ) 

1385 if exc_is_not_timeout: 

1386 reraised_exc = ClientOSError( 

1387 underlying_exc.errno, 

1388 f"Can not write request body for {self.url !s}", 

1389 ) 

1390 

1391 set_exception(protocol, reraised_exc, underlying_exc) 

1392 except asyncio.CancelledError: 

1393 # Body hasn't been fully sent, so connection can't be reused 

1394 conn.close() 

1395 raise 

1396 except Exception as underlying_exc: 

1397 set_exception( 

1398 protocol, 

1399 ClientConnectionError( 

1400 "Failed to send bytes into the underlying connection " 

1401 f"{conn !s}: {underlying_exc!r}", 

1402 ), 

1403 underlying_exc, 

1404 ) 

1405 else: 

1406 # Successfully wrote the body, signal EOF and start response timeout 

1407 await writer.write_eof() 

1408 protocol.start_timeout() 

1409 

1410 async def send(self, conn: "Connection") -> "ClientResponse": 

1411 # Specify request target: 

1412 # - CONNECT request must send authority form URI 

1413 # - not CONNECT proxy must send absolute form URI 

1414 # - most common is origin form URI 

1415 if self.method == hdrs.METH_CONNECT: 

1416 connect_host = self.url.host_subcomponent 

1417 assert connect_host is not None 

1418 path = f"{connect_host}:{self.url.port}" 

1419 elif self.proxy and not self.is_ssl(): 

1420 path = str(self.url) 

1421 else: 

1422 path = self.url.raw_path_qs 

1423 

1424 protocol = conn.protocol 

1425 assert protocol is not None 

1426 writer = StreamWriter( 

1427 protocol, 

1428 self.loop, 

1429 on_chunk_sent=( 

1430 functools.partial(self._on_chunk_request_sent, self.method, self.url) 

1431 if self._traces 

1432 else None 

1433 ), 

1434 on_headers_sent=( 

1435 functools.partial(self._on_headers_request_sent, self.method, self.url) 

1436 if self._traces 

1437 else None 

1438 ), 

1439 ) 

1440 

1441 if self.compress: 

1442 writer.enable_compression(self.compress) # type: ignore[arg-type] 

1443 

1444 if self.chunked is not None: 

1445 writer.enable_chunking() 

1446 

1447 # set default content-type 

1448 if ( 

1449 self.method in self.POST_METHODS 

1450 and ( 

1451 self._skip_auto_headers is None 

1452 or hdrs.CONTENT_TYPE not in self._skip_auto_headers 

1453 ) 

1454 and hdrs.CONTENT_TYPE not in self.headers 

1455 ): 

1456 self.headers[hdrs.CONTENT_TYPE] = "application/octet-stream" 

1457 

1458 v = self.version 

1459 if hdrs.CONNECTION not in self.headers: 

1460 if conn._connector.force_close: 

1461 if v == HttpVersion11: 

1462 self.headers[hdrs.CONNECTION] = "close" 

1463 elif v == HttpVersion10: 

1464 self.headers[hdrs.CONNECTION] = "keep-alive" 

1465 

1466 # status + headers 

1467 status_line = f"{self.method} {path} HTTP/{v.major}.{v.minor}" 

1468 

1469 # Buffer headers for potential coalescing with body 

1470 await writer.write_headers(status_line, self.headers) 

1471 

1472 task: Optional["asyncio.Task[None]"] 

1473 if self._body or self._continue is not None or protocol.writing_paused: 

1474 coro = self.write_bytes(writer, conn, self._get_content_length()) 

1475 if sys.version_info >= (3, 12): 

1476 # Optimization for Python 3.12, try to write 

1477 # bytes immediately to avoid having to schedule 

1478 # the task on the event loop. 

1479 task = asyncio.Task(coro, loop=self.loop, eager_start=True) 

1480 else: 

1481 task = self.loop.create_task(coro) 

1482 if task.done(): 

1483 task = None 

1484 else: 

1485 self._writer = task 

1486 else: 

1487 # We have nothing to write because 

1488 # - there is no body 

1489 # - the protocol does not have writing paused 

1490 # - we are not waiting for a 100-continue response 

1491 protocol.start_timeout() 

1492 writer.set_eof() 

1493 task = None 

1494 response_class = self.response_class 

1495 assert response_class is not None 

1496 self.response = response_class( 

1497 self.method, 

1498 self.original_url, 

1499 writer=task, 

1500 continue100=self._continue, 

1501 timer=self._timer, 

1502 request_info=self.request_info, 

1503 traces=self._traces, 

1504 loop=self.loop, 

1505 session=self._session, 

1506 ) 

1507 return self.response 

1508 

1509 async def close(self) -> None: 

1510 if self.__writer is not None: 

1511 try: 

1512 await self.__writer 

1513 except asyncio.CancelledError: 

1514 if ( 

1515 sys.version_info >= (3, 11) 

1516 and (task := asyncio.current_task()) 

1517 and task.cancelling() 

1518 ): 

1519 raise 

1520 

1521 def terminate(self) -> None: 

1522 if self.__writer is not None: 

1523 if not self.loop.is_closed(): 

1524 self.__writer.cancel() 

1525 self.__writer.remove_done_callback(self.__reset_writer) 

1526 self.__writer = None 

1527 

1528 async def _on_chunk_request_sent(self, method: str, url: URL, chunk: bytes) -> None: 

1529 for trace in self._traces: 

1530 await trace.send_request_chunk_sent(method, url, chunk) 

1531 

1532 async def _on_headers_request_sent( 

1533 self, method: str, url: URL, headers: "CIMultiDict[str]" 

1534 ) -> None: 

1535 for trace in self._traces: 

1536 await trace.send_request_headers(method, url, headers)