Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/client_reqrep.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

778 statements  

1import asyncio 

2import codecs 

3import contextlib 

4import functools 

5import io 

6import re 

7import sys 

8import traceback 

9import warnings 

10from collections.abc import Callable, Iterable, Mapping 

11from hashlib import md5, sha1, sha256 

12from http.cookies import Morsel, SimpleCookie 

13from types import MappingProxyType, TracebackType 

14from typing import TYPE_CHECKING, Any, Literal, NamedTuple, Optional, Union 

15 

16import attr 

17from multidict import CIMultiDict, CIMultiDictProxy, MultiDict, MultiDictProxy 

18from yarl import URL 

19 

20from . import hdrs, helpers, http, multipart, payload 

21from ._cookie_helpers import ( 

22 parse_cookie_header, 

23 parse_set_cookie_headers, 

24 preserve_morsel_with_coded_value, 

25) 

26from .abc import AbstractStreamWriter 

27from .client_exceptions import ( 

28 ClientConnectionError, 

29 ClientOSError, 

30 ClientResponseError, 

31 ContentTypeError, 

32 InvalidURL, 

33 ServerFingerprintMismatch, 

34) 

35from .compression_utils import HAS_BROTLI, HAS_ZSTD 

36from .formdata import FormData 

37from .helpers import ( 

38 _SENTINEL, 

39 BaseTimerContext, 

40 BasicAuth, 

41 HeadersMixin, 

42 TimerNoop, 

43 _basic_auth_no_warn, 

44 noop, 

45 reify, 

46 sentinel, 

47 set_exception, 

48 set_result, 

49) 

50from .http import ( 

51 SERVER_SOFTWARE, 

52 HttpVersion, 

53 HttpVersion10, 

54 HttpVersion11, 

55 StreamWriter, 

56) 

57from .streams import StreamReader 

58from .typedefs import ( 

59 DEFAULT_JSON_DECODER, 

60 JSONDecoder, 

61 LooseCookies, 

62 LooseHeaders, 

63 Query, 

64 RawHeaders, 

65) 

66 

67if TYPE_CHECKING: 

68 import ssl 

69 from ssl import SSLContext 

70else: 

71 try: 

72 import ssl 

73 from ssl import SSLContext 

74 except ImportError: # pragma: no cover 

75 ssl = None # type: ignore[assignment] 

76 SSLContext = object # type: ignore[misc,assignment] 

77 

78 

79__all__ = ("ClientRequest", "ClientResponse", "RequestInfo", "Fingerprint") 

80 

81 

82if TYPE_CHECKING: 

83 from .client import ClientSession 

84 from .connector import Connection 

85 from .tracing import Trace 

86 

87 

# Shared exception instance that ClientResponse._notify_content() sets on the
# content stream when a response is closed or released.
_CONNECTION_CLOSED_EXCEPTION = ClientConnectionError("Connection closed")
# NOTE: despite the name, this matches any single character that is NOT in
# the set listed inside the character class. ClientRequest.__init__ rejects
# a method string in which this pattern finds a match.
_CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]")
# Matches strings beginning with "application/json" or with
# "application/<prefix>+json"; used by _is_expected_content_type().
json_re = re.compile(r"^application/(?:[\w.+-]+?\+)?json")
# One or more ASCII digits (re.ASCII keeps \d from matching other Unicode
# digits); used to validate the Content-Length header value.
_DIGITS_RE = re.compile(r"\d+", re.ASCII)

92 

93 

def _gen_default_accept_encoding() -> str:
    """Build the default ``Accept-Encoding`` header value.

    ``gzip`` and ``deflate`` are always listed; ``br`` and ``zstd`` are
    appended, in that order, when ``HAS_BROTLI`` / ``HAS_ZSTD`` are truthy.

    Returns:
        The encoding names joined with ``", "``.
    """
    optional_codecs = (("br", HAS_BROTLI), ("zstd", HAS_ZSTD))
    names = ["gzip", "deflate"]
    names.extend(name for name, available in optional_codecs if available)
    return ", ".join(names)

104 

105 

@attr.s(auto_attribs=True, frozen=True, slots=True)
class ContentDisposition:
    """Immutable record built by ``ClientResponse.content_disposition``."""

    # Disposition type returned by multipart.parse_content_disposition().
    type: str | None
    # Read-only view of the parameters returned by the same parse call.
    parameters: "MappingProxyType[str, str]"
    # Result of multipart.content_disposition_filename() on the parameters.
    filename: str | None

111 

112 

class _RequestInfo(NamedTuple):
    """Plain tuple layout shared by ``RequestInfo``."""

    # Request URL; ClientRequest.request_info passes its fragment-less URL here.
    url: URL
    # HTTP method string.
    method: str
    # Read-only view of the request headers.
    headers: "CIMultiDictProxy[str]"
    # ClientRequest.request_info passes the URL as given (fragment kept) here.
    real_url: URL

118 

119 

class RequestInfo(_RequestInfo):
    """``_RequestInfo`` tuple whose constructor lets ``real_url`` be omitted."""

    def __new__(
        cls,
        url: URL,
        method: str,
        headers: "CIMultiDictProxy[str]",
        real_url: URL | _SENTINEL = sentinel,
    ) -> "RequestInfo":
        """Create a new RequestInfo instance.

        For backwards compatibility, the real_url parameter is optional;
        when it is left at the sentinel default, ``url`` is stored in its
        place.
        """
        if real_url is sentinel:
            real_url = url
        return tuple.__new__(cls, (url, method, headers, real_url))

136 

137 

class Fingerprint:
    """Expected digest of a peer certificate, checked against a transport.

    The hash function is selected from the byte length of the expected
    digest. Only the 32-byte (sha256) length is accepted.
    """

    # Digest length in bytes -> hash constructor. md5 and sha1 are listed only
    # so that their lengths get a dedicated error message in __init__.
    HASHFUNC_BY_DIGESTLEN = {
        16: md5,
        20: sha1,
        32: sha256,
    }

    def __init__(self, fingerprint: bytes) -> None:
        """Store the expected digest and pick the matching hash function.

        Args:
            fingerprint: Expected digest bytes.

        Raises:
            ValueError: If the length maps to no known hash function, or maps
                to md5 or sha1.
        """
        hashfunc = self.HASHFUNC_BY_DIGESTLEN.get(len(fingerprint))
        if not hashfunc:
            raise ValueError("fingerprint has invalid length")
        if any(hashfunc is rejected for rejected in (md5, sha1)):
            raise ValueError("md5 and sha1 are insecure and not supported. Use sha256.")
        self._hashfunc = hashfunc
        self._fingerprint = fingerprint

    @property
    def fingerprint(self) -> bytes:
        """The expected digest bytes passed to the constructor."""
        return self._fingerprint

    def check(self, transport: asyncio.Transport) -> None:
        """Compare the peer certificate digest with the expected one.

        Does nothing when the transport reports no ``sslcontext``.

        Raises:
            ServerFingerprintMismatch: If the digest of the peer certificate
                (binary form) differs from the expected fingerprint.
        """
        if not transport.get_extra_info("sslcontext"):
            return
        ssl_object = transport.get_extra_info("ssl_object")
        peer_cert = ssl_object.getpeercert(binary_form=True)
        actual = self._hashfunc(peer_cert).digest()
        if actual == self._fingerprint:
            return
        # Extra trailing peername elements, if any, are ignored.
        host, port, *_ = transport.get_extra_info("peername")
        raise ServerFingerprintMismatch(self._fingerprint, actual, host, port)

168 

169 

# Types accepted by the final isinstance() check in _merge_ssl_params().
if ssl is not None:
    SSL_ALLOWED_TYPES = (ssl.SSLContext, bool, Fingerprint, type(None))
else:  # pragma: no cover
    # The ssl module could not be imported (see the guarded import at the top
    # of the file), so only bool and None are accepted.
    SSL_ALLOWED_TYPES = (bool, type(None))

174 

175 

def _merge_ssl_params(
    ssl: Union["SSLContext", bool, Fingerprint],
    verify_ssl: bool | None,
    ssl_context: Optional["SSLContext"],
    fingerprint: bytes | None,
) -> Union["SSLContext", bool, Fingerprint]:
    """Fold the deprecated TLS arguments into a single ``ssl`` value.

    Each deprecated argument that is supplied emits a ``DeprecationWarning``
    and may only be used while the merged value is still ``True``.

    Args:
        ssl: The modern parameter; ``None`` is treated as ``True``.
        verify_ssl: Deprecated; a non-``None`` falsy value yields ``False``.
        ssl_context: Deprecated; when given, becomes the result.
        fingerprint: Deprecated; when given, is wrapped in ``Fingerprint``.

    Returns:
        The merged value.

    Raises:
        ValueError: If a deprecated argument is combined with a merged value
            that is no longer ``True``.
        TypeError: If the merged value is not one of ``SSL_ALLOWED_TYPES``.
    """
    exclusive_msg = (
        "verify_ssl, ssl_context, fingerprint and ssl "
        "parameters are mutually exclusive"
    )
    # Double check for backwards compatibility
    merged = True if ssl is None else ssl

    if verify_ssl is not None and not verify_ssl:
        warnings.warn(
            "verify_ssl is deprecated, use ssl=False instead",
            DeprecationWarning,
            stacklevel=3,
        )
        if merged is not True:
            raise ValueError(exclusive_msg)
        merged = False

    if ssl_context is not None:
        warnings.warn(
            "ssl_context is deprecated, use ssl=context instead",
            DeprecationWarning,
            stacklevel=3,
        )
        if merged is not True:
            raise ValueError(exclusive_msg)
        merged = ssl_context

    if fingerprint is not None:
        warnings.warn(
            "fingerprint is deprecated, use ssl=Fingerprint(fingerprint) instead",
            DeprecationWarning,
            stacklevel=3,
        )
        if merged is not True:
            raise ValueError(exclusive_msg)
        merged = Fingerprint(fingerprint)

    if isinstance(merged, SSL_ALLOWED_TYPES):
        return merged
    raise TypeError(
        "ssl should be SSLContext, bool, Fingerprint or None, "
        f"got {merged!r} instead."
    )

229 

230 

# URL schemes for which ClientRequest.is_ssl() returns True and for which
# ClientRequest.connection_key sets is_ssl.
_SSL_SCHEMES = frozenset(("https", "wss"))

232 

233 

# ConnectionKey is a NamedTuple because it is used as a key in a dict
# and a set in the connector. Since a NamedTuple is a tuple it uses
# the fast native tuple __hash__ and __eq__ implementation in CPython.
class ConnectionKey(NamedTuple):
    """Identity of a connection, built by ``ClientRequest.connection_key``."""

    # The key must carry information about the proxy and TLS settings in use
    # to prevent reusing the wrong connection from a pool.
    host: str
    port: int | None
    # True when the request URL scheme is in _SSL_SCHEMES.
    is_ssl: bool
    # The request's ssl setting (context, bool or Fingerprint).
    ssl: SSLContext | bool | Fingerprint
    proxy: URL | None
    proxy_auth: BasicAuth | None
    # hash() of the tuple of proxy header items, or None when there are no
    # proxy headers.
    proxy_headers_hash: int | None  # hash(CIMultiDict)
    server_hostname: str | None = None

248 

249 

250def _is_expected_content_type( 

251 response_content_type: str, expected_content_type: str 

252) -> bool: 

253 if expected_content_type == "application/json": 

254 return json_re.match(response_content_type) is not None 

255 return expected_content_type in response_content_type 

256 

257 

def _warn_if_unclosed_payload(payload: payload.Payload, stacklevel: int = 2) -> None:
    """Emit a ResourceWarning for a payload that still needs closing.

    Nothing happens when the payload reports ``autoclose`` or ``consumed``.
    Callers must check that the body is a Payload before calling this method.

    Args:
        payload: The payload to check
        stacklevel: Stack level for the warning (default 2 for direct callers)
    """
    if payload.autoclose or payload.consumed:
        return
    warnings.warn(
        "The previous request body contains unclosed resources. "
        "Use await request.update_body() instead of setting request.body "
        "directly to properly close resources and avoid leaks.",
        ResourceWarning,
        stacklevel=stacklevel,
    )

275 

276 

class ClientResponse(HeadersMixin):
    """Client-side view of one HTTP response.

    An instance is created with the request's method and URL before any
    response data has been read. ``start()`` reads the status line and
    headers from the connection and fills in ``version``, ``status``,
    ``reason``, ``content`` and the header attributes.
    """

    # Some of these attributes are None when created,
    # but will be set by the start() method.
    # As the end user will likely never see the None values, we cheat the types below.
    # from the Status-Line of the response
    version: HttpVersion | None = None  # HTTP-Version
    status: int = None  # type: ignore[assignment]  # Status-Code
    reason: str | None = None  # Reason-Phrase

    content: StreamReader = None  # type: ignore[assignment]  # Payload stream
    _body: bytes | None = None
    _headers: CIMultiDictProxy[str] = None  # type: ignore[assignment]
    _history: tuple["ClientResponse", ...] = ()
    _raw_headers: RawHeaders = None  # type: ignore[assignment]

    _connection: Optional["Connection"] = None  # current connection
    _cookies: SimpleCookie | None = None
    _raw_cookie_headers: tuple[str, ...] | None = None
    _continue: Optional["asyncio.Future[bool]"] = None
    _source_traceback: traceback.StackSummary | None = None
    _session: Optional["ClientSession"] = None
    # set up by ClientRequest after ClientResponse object creation
    # post-init stage allows to not change ctor signature
    _closed = True  # to allow __del__ for non-initialized properly response
    _released = False
    _in_context = False

    # Fallback charset resolver; replaced by the session's resolver in __init__.
    _resolve_charset: Callable[["ClientResponse", bytes], str] = lambda *_: "utf-8"

    __writer: Optional["asyncio.Task[None]"] = None
    _stream_writer: Optional[AbstractStreamWriter] = None
    _output_size: int = 0
    _upload_complete: Optional[asyncio.Future[None]] = None

    def __init__(
        self,
        method: str,
        url: URL,
        *,
        writer: "asyncio.Task[None] | None",
        continue100: Optional["asyncio.Future[bool]"],
        timer: BaseTimerContext,
        request_info: RequestInfo,
        traces: list["Trace"],
        loop: asyncio.AbstractEventLoop,
        session: "ClientSession",
        stream_writer: AbstractStreamWriter,
    ) -> None:
        """Record request-side state; no response data is read here.

        Args:
            method: HTTP method of the request.
            url: Request URL; must be exactly a ``URL`` instance.
            writer: Task sending the request body, or ``None`` when the
                request has already been sent.
            continue100: Future resolved by ``start()`` when an interim 1xx
                response (other than 101) is received, if any.
            timer: Timer context entered by ``start()``; ``None`` falls back
                to ``TimerNoop()``.
            request_info: Value exposed through ``request_info``.
            traces: Trace objects notified by ``read()``.
            loop: Event loop used for futures, debug checks and the
                exception handler.
            session: Owning session; may be ``None`` in tests.
            stream_writer: Writer whose ``output_size`` backs ``output_size``.
        """
        # URL forbids subclasses, so a simple type check is enough.
        assert type(url) is URL

        self.method = method

        self._real_url = url
        self._url = url.with_fragment(None) if url.raw_fragment else url
        if writer is None:  # Request already sent
            self._output_size = stream_writer.output_size
        else:
            self._stream_writer = stream_writer
        self._writer = writer
        if continue100 is not None:
            self._continue = continue100
        self._request_info = request_info
        self._timer = timer if timer is not None else TimerNoop()
        self._cache: dict[str, Any] = {}
        self._traces = traces
        self._loop = loop
        # Save reference to _resolve_charset, so that get_encoding() will still
        # work after the response has finished reading the body.
        # TODO: Fix session=None in tests (see ClientRequest.__init__).
        if session is not None:
            # store a reference to session #1985
            self._session = session
            self._resolve_charset = session._resolve_charset
        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

    def __reset_writer(self, _: object = None) -> None:
        """Forget the writer task and snapshot the number of bytes sent.

        Used as the writer task's done callback, hence the ignored argument.
        """
        self.__writer = None
        if self._stream_writer is not None:
            self._output_size = self._stream_writer.output_size
            self._stream_writer = None
        if self._upload_complete is not None and not self._upload_complete.done():
            self._upload_complete.set_result(None)

    @property
    def _writer(self) -> Optional["asyncio.Task[None]"]:
        """The writer task for streaming data.

        _writer is only provided for backwards compatibility
        for subclasses that may need to access it.
        """
        return self.__writer

    @_writer.setter
    def _writer(self, writer: Optional["asyncio.Task[None]"]) -> None:
        """Set the writer task for streaming data."""
        if self.__writer is not None:
            self.__writer.remove_done_callback(self.__reset_writer)
        self.__writer = writer
        if writer is None:
            return
        if writer.done():
            # The writer is already done, so we can clear it immediately.
            self.__reset_writer()
        else:
            writer.add_done_callback(self.__reset_writer)

    @property
    def output_size(self) -> int:
        """Number of bytes sent for this request."""
        if self._stream_writer is not None:
            return self._stream_writer.output_size
        return self._output_size

    @property
    def upload_complete(self) -> "asyncio.Future[None]":
        """Future set when the request body has been fully sent.

        Already done when the request had no body or was written eagerly.
        """
        if self._upload_complete is None:
            self._upload_complete = self._loop.create_future()
            if self._stream_writer is None:  # upload already finished
                self._upload_complete.set_result(None)
        return self._upload_complete

    @property
    def cookies(self) -> SimpleCookie:
        """Response cookies, parsed lazily from the stored Set-Cookie headers."""
        if self._cookies is None:
            if self._raw_cookie_headers is not None:
                # Parse cookies for response.cookies (SimpleCookie for backward compatibility)
                cookies = SimpleCookie()
                # Use parse_set_cookie_headers for more lenient parsing that handles
                # malformed cookies better than SimpleCookie.load
                cookies.update(parse_set_cookie_headers(self._raw_cookie_headers))
                self._cookies = cookies
            else:
                self._cookies = SimpleCookie()
        return self._cookies

    @cookies.setter
    def cookies(self, cookies: SimpleCookie) -> None:
        """Replace the cookies and regenerate the raw header tuple from them."""
        self._cookies = cookies
        # Generate raw cookie headers from the SimpleCookie
        if cookies:
            self._raw_cookie_headers = tuple(
                morsel.OutputString() for morsel in cookies.values()
            )
        else:
            self._raw_cookie_headers = None

    @reify
    def url(self) -> URL:
        """Request URL with any fragment removed."""
        return self._url

    @reify
    def url_obj(self) -> URL:
        """Deprecated alias of ``url``; emits a DeprecationWarning."""
        warnings.warn("Deprecated, use .url #1654", DeprecationWarning, stacklevel=2)
        return self._url

    @reify
    def real_url(self) -> URL:
        """Request URL exactly as passed to the constructor."""
        return self._real_url

    @reify
    def host(self) -> str:
        """Host component of ``url``; asserted to be present."""
        assert self._url.host is not None
        return self._url.host

    @reify
    def headers(self) -> "CIMultiDictProxy[str]":
        """Response headers as stored by ``start()``."""
        return self._headers

    @reify
    def raw_headers(self) -> RawHeaders:
        """Raw response headers as stored by ``start()``."""
        return self._raw_headers

    @reify
    def request_info(self) -> RequestInfo:
        """The ``RequestInfo`` passed to the constructor."""
        return self._request_info

    @reify
    def content_disposition(self) -> ContentDisposition | None:
        """Parsed Content-Disposition header, or ``None`` when it is absent."""
        raw = self._headers.get(hdrs.CONTENT_DISPOSITION)
        if raw is None:
            return None
        disposition_type, params_dct = multipart.parse_content_disposition(raw)
        params = MappingProxyType(params_dct)
        filename = multipart.content_disposition_filename(params)
        return ContentDisposition(disposition_type, params, filename)

    def __del__(self, _warnings: Any = warnings) -> None:
        """Release a still-attached connection when the object is collected.

        ``_warnings`` is bound as a default argument so the module stays
        reachable from the finalizer.
        """
        if self._closed:
            return

        if self._connection is not None:
            self._connection.release()
            self._cleanup_writer()

            if self._loop.get_debug():
                kwargs = {"source": self}
                _warnings.warn(f"Unclosed response {self!r}", ResourceWarning, **kwargs)
                context = {"client_response": self, "message": "Unclosed response"}
                if self._source_traceback:
                    context["source_traceback"] = self._source_traceback
                self._loop.call_exception_handler(context)

    def __repr__(self) -> str:
        """Two-line representation: status summary, then the headers."""
        if self.reason:
            # Keep the first line ASCII-only even for non-ASCII reasons.
            ascii_encodable_reason = self.reason.encode(
                "ascii", "backslashreplace"
            ).decode("ascii")
        else:
            ascii_encodable_reason = "None"
        return (
            f"<ClientResponse({self.url!s}) [{self.status} {ascii_encodable_reason}]>\n"
            f"{self.headers!s}\n"
        )

    @property
    def connection(self) -> Optional["Connection"]:
        """The connection currently attached to this response, if any."""
        return self._connection

    @reify
    def history(self) -> tuple["ClientResponse", ...]:
        """A sequence of responses, if redirects occurred."""
        return self._history

    @reify
    def links(self) -> "MultiDictProxy[MultiDictProxy[str | URL]]":
        """Parsed ``Link`` headers.

        Each entry is keyed by its ``rel`` parameter (or by the link target
        when ``rel`` is absent) and holds the link parameters plus a ``url``
        item resolved against the response URL.
        """
        links_str = ", ".join(self.headers.getall("link", []))

        if not links_str:
            return MultiDictProxy(MultiDict())

        links: MultiDict[MultiDictProxy[str | URL]] = MultiDict()

        # Split only on commas that are followed by the "<" of the next link.
        for val in re.split(r",(?=\s*<)", links_str):
            match = re.match(r"\s*<(.*)>(.*)", val)
            if match is None:  # pragma: no cover
                # the check exists to suppress mypy error
                continue
            url, params_str = match.groups()
            # The text before the first ";" is not a parameter.
            params = params_str.split(";")[1:]

            link: MultiDict[str | URL] = MultiDict()

            for param in params:
                match = re.match(r"^\s*(\S*)\s*=\s*(['\"]?)(.*?)(\2)\s*$", param, re.M)
                if match is None:  # pragma: no cover
                    # the check exists to suppress mypy error
                    continue
                key, _, value, _ = match.groups()

                link.add(key, value)

            key = link.get("rel", url)

            link.add("url", self.url.join(URL(url)))

            links.add(str(key), MultiDictProxy(link))

        return MultiDictProxy(links)

    async def start(self, connection: "Connection") -> "ClientResponse":
        """Start response processing.

        Reads messages from the connection's protocol until one arrives whose
        status is outside 100-199 or is 101, then stores its status line,
        headers, payload stream and Set-Cookie headers.

        Raises:
            ClientResponseError: If the protocol raises ``HttpProcessingError``.
        """
        self._closed = False
        self._protocol = connection.protocol
        self._connection = connection

        with self._timer:
            while True:
                # read response
                try:
                    protocol = self._protocol
                    message, payload = await protocol.read()  # type: ignore[union-attr]
                except http.HttpProcessingError as exc:
                    raise ClientResponseError(
                        self.request_info,
                        self.history,
                        status=exc.code,
                        message=exc.message,
                        headers=exc.headers,
                    ) from exc

                if message.code < 100 or message.code > 199 or message.code == 101:
                    break

                if self._continue is not None:
                    set_result(self._continue, True)
                    self._continue = None

        # payload eof handler
        payload.on_eof(self._response_eof)

        # response status
        self.version = message.version
        self.status = message.code
        self.reason = message.reason

        # headers
        self._headers = message.headers  # type is CIMultiDictProxy
        self._raw_headers = message.raw_headers  # type is Tuple[bytes, bytes]

        # payload
        self.content = payload

        # cookies
        if cookie_hdrs := self.headers.getall(hdrs.SET_COOKIE, ()):
            # Store raw cookie headers for CookieJar
            self._raw_cookie_headers = tuple(cookie_hdrs)
        return self

    def _response_eof(self) -> None:
        """Payload EOF callback: close and release unless upgraded."""
        if self._closed:
            return

        # protocol could be None because connection could be detached
        protocol = self._connection and self._connection.protocol
        if protocol is not None and protocol.upgraded:
            return

        self._closed = True
        self._cleanup_writer()
        self._release_connection()

    @property
    def closed(self) -> bool:
        """Whether the response is currently marked closed."""
        return self._closed

    def close(self) -> None:
        """Mark the response closed and close the attached connection."""
        if not self._released:
            self._notify_content()

        self._closed = True
        if self._loop is None or self._loop.is_closed():
            return

        self._cleanup_writer()
        if self._connection is not None:
            self._connection.close()
            self._connection = None

    def release(self) -> Any:
        """Mark the response closed and release the connection.

        Returns:
            The result of ``noop()``.
        """
        if not self._released:
            self._notify_content()

        self._closed = True

        self._cleanup_writer()
        self._release_connection()
        return noop()

    @property
    def ok(self) -> bool:
        """Returns ``True`` if ``status`` is less than ``400``, ``False`` if not.

        This is **not** a check for ``200 OK`` but a check that the response
        status is under 400.
        """
        return self.status < 400

    def raise_for_status(self) -> None:
        """Raise ``ClientResponseError`` when ``ok`` is false."""
        if not self.ok:
            # reason should always be not None for a started response
            assert self.reason is not None

            # If we're in a context we can rely on __aexit__() to release as the
            # exception propagates.
            if not self._in_context:
                self.release()

            raise ClientResponseError(
                self.request_info,
                self.history,
                status=self.status,
                message=self.reason,
                headers=self.headers,
            )

    def _release_connection(self) -> None:
        """Release the connection, deferring until the writer task is done."""
        if self._connection is not None:
            if self.__writer is None:
                self._connection.release()
                self._connection = None
            else:
                # Retry once the writer task has finished.
                self.__writer.add_done_callback(lambda f: self._release_connection())

    async def _wait_for_writer(self) -> None:
        """Await the writer task, if one is still registered.

        A ``CancelledError`` from the writer is swallowed unless, on
        Python 3.11+, the current task itself is being cancelled.
        """
        writer = self.__writer
        if writer is None:
            return
        try:
            await writer
        except asyncio.CancelledError:
            if (
                sys.version_info >= (3, 11)
                and (task := asyncio.current_task())
                and task.cancelling()
            ):
                raise

    async def _wait_released(self) -> None:
        """Wait for the writer task, then release the connection."""
        await self._wait_for_writer()
        self._release_connection()

    def _cleanup_writer(self) -> None:
        """Cancel the writer task and drop the stream writer and session."""
        if self.__writer is not None:
            self.__writer.cancel()
        if self._stream_writer is not None:
            # Keep the byte count available after the writer is dropped.
            self._output_size = self._stream_writer.output_size
            self._stream_writer = None
        self._session = None

    def _notify_content(self) -> None:
        """Set the connection-closed exception on the content stream."""
        content = self.content
        if content and content.exception() is None:
            set_exception(content, _CONNECTION_CLOSED_EXCEPTION)
        self._released = True

    async def wait_for_close(self) -> None:
        """Wait for the writer task, then release the response."""
        await self._wait_for_writer()
        self.release()

    async def read(self) -> bytes:
        """Read response payload.

        The body is cached after the first read. Any exception raised while
        reading closes the response before propagating.

        Raises:
            ClientConnectionError: If the body was already read and the
                response has been released.
        """
        if self._body is None:
            try:
                self._body = await self.content.read()
                for trace in self._traces:
                    await trace.send_response_chunk_received(
                        self.method, self.url, self._body
                    )
            except BaseException:
                self.close()
                raise
        elif self._released:  # Response explicitly released
            raise ClientConnectionError("Connection closed")

        protocol = self._connection and self._connection.protocol
        if protocol is None or not protocol.upgraded:
            await self._wait_released()  # Underlying connection released
        return self._body

    def get_encoding(self) -> str:
        """Determine the text encoding of the body.

        Order of preference: a ``charset`` parameter that ``codecs`` knows,
        then ``utf-8`` for ``application`` types with subtype ``json`` or
        ``rdap``, then the charset resolver applied to the body.

        Raises:
            RuntimeError: If the resolver is needed but the body has not
                been read yet.
        """
        ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower()
        mimetype = helpers.parse_mimetype(ctype)

        encoding = mimetype.parameters.get("charset")
        if encoding:
            # An unknown or invalid charset name falls through to the checks below.
            with contextlib.suppress(LookupError, ValueError):
                return codecs.lookup(encoding).name

        if mimetype.type == "application" and mimetype.subtype in ("json", "rdap"):
            # RFC 7159 states that the default encoding is UTF-8.
            # RFC 7483 defines application/rdap+json
            return "utf-8"

        if self._body is None:
            raise RuntimeError(
                "Cannot compute fallback encoding of a not yet read body"
            )

        return self._resolve_charset(self, self._body)

    async def text(self, encoding: str | None = None, errors: str = "strict") -> str:
        """Read response payload and decode.

        Args:
            encoding: Encoding to use; ``get_encoding()`` is used when ``None``.
            errors: Error handler passed to ``bytes.decode``.
        """
        if self._body is None:
            await self.read()

        if encoding is None:
            encoding = self.get_encoding()

        return self._body.decode(encoding, errors=errors)  # type: ignore[union-attr]

    async def json(
        self,
        *,
        encoding: str | None = None,
        loads: JSONDecoder = DEFAULT_JSON_DECODER,
        content_type: str | None = "application/json",
    ) -> Any:
        """Read and decode a JSON response.

        Args:
            encoding: Encoding to use; ``get_encoding()`` is used when ``None``.
            loads: Callable turning the decoded text into an object.
            content_type: Expected content type; a falsy value skips the check.

        Returns:
            The decoded object, or ``None`` when the body is empty or
            whitespace only.

        Raises:
            ContentTypeError: If the Content-Type header does not satisfy
                ``content_type``.
        """
        if self._body is None:
            await self.read()

        if content_type:
            ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower()
            if not _is_expected_content_type(ctype, content_type):
                raise ContentTypeError(
                    self.request_info,
                    self.history,
                    status=self.status,
                    message=(
                        f"Attempt to decode JSON with unexpected mimetype: {ctype}"
                    ),
                    headers=self.headers,
                )

        stripped = self._body.strip()  # type: ignore[union-attr]
        if not stripped:
            return None

        if encoding is None:
            encoding = self.get_encoding()

        return loads(stripped.decode(encoding))

    async def __aenter__(self) -> "ClientResponse":
        """Enter the context; ``raise_for_status()`` then defers release to exit."""
        self._in_context = True
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Release the response and wait for the writer to finish."""
        self._in_context = False
        # similar to _RequestContextManager, we do not need to check
        # for exceptions, response object can close connection
        # if state is broken
        self.release()
        await self.wait_for_close()

813 

814 

815class ClientRequest: 

816 GET_METHODS = { 

817 hdrs.METH_GET, 

818 hdrs.METH_HEAD, 

819 hdrs.METH_OPTIONS, 

820 hdrs.METH_TRACE, 

821 } 

822 POST_METHODS = {hdrs.METH_PATCH, hdrs.METH_POST, hdrs.METH_PUT} 

823 ALL_METHODS = GET_METHODS.union(POST_METHODS).union({hdrs.METH_DELETE}) 

824 

825 DEFAULT_HEADERS = { 

826 hdrs.ACCEPT: "*/*", 

827 hdrs.ACCEPT_ENCODING: _gen_default_accept_encoding(), 

828 } 

829 

830 # Type of body depends on PAYLOAD_REGISTRY, which is dynamic. 

831 _body: None | payload.Payload = None 

832 auth = None 

833 response = None 

834 

835 __writer: Optional["asyncio.Task[None]"] = None # async task for streaming data 

836 

837 # These class defaults help create_autospec() work correctly. 

838 # If autospec is improved in future, maybe these can be removed. 

839 url = URL() 

840 method = "GET" 

841 

842 _continue = None # waiter future for '100 Continue' response 

843 

844 _skip_auto_headers: Optional["CIMultiDict[None]"] = None 

845 

846 # N.B. 

847 # Adding __del__ method with self._writer closing doesn't make sense 

848 # because _writer is instance method, thus it keeps a reference to self. 

849 # Until writer has finished finalizer will not be called. 

850 

def __init__(
    self,
    method: str,
    url: URL,
    *,
    params: Query = None,
    headers: LooseHeaders | None = None,
    skip_auto_headers: Iterable[str] | None = None,
    data: Any = None,
    cookies: LooseCookies | None = None,
    auth: BasicAuth | None = None,
    version: http.HttpVersion = http.HttpVersion11,
    compress: str | bool | None = None,
    chunked: bool | None = None,
    expect100: bool = False,
    loop: asyncio.AbstractEventLoop | None = None,
    response_class: type["ClientResponse"] | None = None,
    proxy: URL | None = None,
    proxy_auth: BasicAuth | None = None,
    timer: BaseTimerContext | None = None,
    session: Optional["ClientSession"] = None,
    ssl: SSLContext | bool | Fingerprint = True,
    proxy_headers: LooseHeaders | None = None,
    traces: list["Trace"] | None = None,
    trust_env: bool = False,
    server_hostname: str | None = None,
) -> None:
    """Validate the arguments and assemble the request state.

    Args:
        method: HTTP method; stored upper-cased.
        url: Target URL; must be exactly a ``URL`` instance.
        params: When truthy, merged into ``url`` with ``extend_query``.
        loop: Event loop; ``asyncio.get_event_loop()`` is used when ``None``.
        response_class: Response type; ``ClientResponse`` when ``None``.
        proxy: Proxy URL; must be exactly a ``URL`` instance when given.
        timer: Timer context; ``TimerNoop()`` when ``None``.
        session: Owning session (``None`` only in tests, per the FIXME below).
        ssl: TLS setting; ``None`` is stored as ``True``.
        traces: Trace objects; an empty list when ``None``.
        server_hostname: Stored unchanged on the instance.

    The remaining arguments are handed to the ``update_*`` methods called
    below, which are not shown in this chunk.

    Raises:
        ValueError: If ``method`` contains a character matched by
            ``_CONTAINS_CONTROL_CHAR_RE``.
    """
    if loop is None:
        loop = asyncio.get_event_loop()
    if match := _CONTAINS_CONTROL_CHAR_RE.search(method):
        raise ValueError(
            f"Method cannot contain non-token characters {method!r} "
            f"(found at least {match.group()!r})"
        )
    # URL forbids subclasses, so a simple type check is enough.
    assert type(url) is URL, url
    if proxy is not None:
        assert type(proxy) is URL, proxy
    # FIXME: session is None in tests only, need to fix tests
    # assert session is not None
    if TYPE_CHECKING:
        assert session is not None
    self._session = session
    if params:
        url = url.extend_query(params)
    # original_url keeps the fragment; url has it stripped.
    self.original_url = url
    self.url = url.with_fragment(None) if url.raw_fragment else url
    self.method = method.upper()
    self.chunked = chunked
    self.compress = compress
    self.loop = loop
    self.length = None
    if response_class is None:
        real_response_class = ClientResponse
    else:
        real_response_class = response_class
    self.response_class: type[ClientResponse] = real_response_class
    self._timer = timer if timer is not None else TimerNoop()
    self._ssl = ssl if ssl is not None else True
    self.server_hostname = server_hostname

    if loop.get_debug():
        self._source_traceback = traceback.extract_stack(sys._getframe(1))

    # NOTE(review): the update_* methods are defined outside this chunk;
    # their call order is presumably significant, so it is left as is.
    self.update_version(version)
    self.update_host(url)
    self.update_headers(headers)
    self.update_auto_headers(skip_auto_headers)
    self.update_cookies(cookies)
    self.update_content_encoding(data)
    self.update_auth(auth, trust_env)
    self.update_proxy(proxy, proxy_auth, proxy_headers)

    self.update_body_from_data(data)
    # Skipped only for a GET-like method without data.
    if data is not None or self.method not in self.GET_METHODS:
        self.update_transfer_encoding()
    self.update_expect_continue(expect100)
    self._traces = [] if traces is None else traces

929 

def __reset_writer(self, _: object = None) -> None:
    """Forget the writer task.

    Registered as the writer task's done callback by the ``_writer`` setter,
    hence the ignored positional argument.
    """
    self.__writer = None

932 

def _get_content_length(self) -> int | None:
    """Extract and validate Content-Length header value.

    Returns parsed Content-Length value or None if not set.
    Raises ValueError if the header value is not made up solely of
    ASCII digits.
    """
    raw_length = self.headers.get(hdrs.CONTENT_LENGTH)
    if raw_length is None:
        return None
    if _DIGITS_RE.fullmatch(raw_length) is None:
        raise ValueError(f"Invalid Content-Length header: {raw_length!r}")
    return int(raw_length)

946 

@property
def skip_auto_headers(self) -> CIMultiDict[None]:
    """Stored skip-auto-headers mapping, or a new empty one.

    A fresh ``CIMultiDict`` is returned whenever the stored value is
    ``None`` or empty.
    """
    stored = self._skip_auto_headers
    if stored:
        return stored
    return CIMultiDict()

950 

@property
def _writer(self) -> Optional["asyncio.Task[None]"]:
    """The task currently writing the request, if any."""
    return self.__writer

@_writer.setter
def _writer(self, writer: "asyncio.Task[None]") -> None:
    """Install a new writer task and move the done callback onto it."""
    previous = self.__writer
    if previous is not None:
        # The replaced task must no longer clear the new task on completion.
        previous.remove_done_callback(self.__reset_writer)
    self.__writer = writer
    writer.add_done_callback(self.__reset_writer)

961 

def is_ssl(self) -> bool:
    """Return True when the request URL scheme is in ``_SSL_SCHEMES``."""
    return self.url.scheme in _SSL_SCHEMES

964 

@property
def ssl(self) -> Union["SSLContext", bool, Fingerprint]:
    """The TLS setting stored by ``__init__`` (``None`` is stored as ``True``)."""
    return self._ssl

968 

@property
def connection_key(self) -> ConnectionKey:
    """Build the ``ConnectionKey`` describing this request's connection.

    The proxy headers contribute only through the hash of their item tuple,
    which is ``None`` when there are no proxy headers.
    """
    proxy_headers = self.proxy_headers
    headers_hash: int | None = (
        hash(tuple(proxy_headers.items())) if proxy_headers else None
    )
    target = self.url
    fields = (
        target.raw_host or "",
        target.port,
        target.scheme in _SSL_SCHEMES,
        self._ssl,
        self.proxy,
        self.proxy_auth,
        headers_hash,
        self.server_hostname,
    )
    # tuple.__new__ builds the NamedTuple directly from the prepared tuple.
    return tuple.__new__(ConnectionKey, fields)

989 

@property
def host(self) -> str:
    """The ``raw_host`` of the request URL; asserted to be present."""
    ret = self.url.raw_host
    assert ret is not None
    return ret

995 

996 @property 

997 def port(self) -> int | None: 

998 return self.url.port 

999 

@property
def body(self) -> payload.Payload | Literal[b""]:
    """Request body."""
    # An absent (or falsy) body is reported as empty bytes for backwards
    # compatibility.
    current = self._body
    return current if current else b""

@body.setter
def body(self, value: Any) -> None:
    """Replace the request body synchronously.

    WARNING: call this only from within an event loop; it is not
    thread-safe, and closing a file-based payload outside of a loop may
    raise RuntimeError.

    DEPRECATED: assigning to ``body`` is deprecated and will be removed in
    a future version. Prefer ``await update_body(...)``, which closes the
    previous payload asynchronously.
    """
    previous = self._body
    if previous is not None:
        # Emit the warning for payloads that need manual closing.
        # stacklevel=3: user code -> body setter -> _warn_if_unclosed_payload
        _warn_if_unclosed_payload(previous, stacklevel=3)
        # NOTE: once sync close support is removed this setter has to go
        # and only the async update_body() will remain. Until then the
        # payload is closed synchronously for backwards compatibility.
        previous._close()
    self._update_body(value)

1029 

@property
def request_info(self) -> RequestInfo:
    """Build a ``RequestInfo`` from the URL, method, headers and original URL.

    The headers are exposed through a ``CIMultiDictProxy`` wrapping the
    request's header collection.
    """
    read_only_headers: CIMultiDictProxy[str] = CIMultiDictProxy(self.headers)
    # One of these is created per request, so a NamedTuple is used for
    # speed. RequestInfo.__new__ is bypassed because its signature differs
    # and exists for backwards compatibility only.
    fields = (self.url, self.method, read_only_headers, self.original_url)
    return tuple.__new__(RequestInfo, fields)

1040 

@property
def session(self) -> "ClientSession":
    """The ClientSession that initiated this request.

    Exposed so that middleware can issue further requests through the
    same session.
    """
    owner = self._session
    return owner

1050 

def update_host(self, url: URL) -> None:
    """Validate the destination URL and adopt credentials embedded in it.

    When the URL carries a user or password, ``self.auth`` is set from
    them (missing parts become empty strings).

    Raises:
        InvalidURL: if *url* has no host.
    """
    if not url.raw_host:
        raise InvalidURL(url)

    has_credentials = url.raw_user or url.raw_password
    if not has_credentials:
        return
    self.auth = _basic_auth_no_warn(url.user or "", url.password or "")

1060 

def update_version(self, version: http.HttpVersion | str) -> None:
    """Store the HTTP version, parsing it first when given as a string.

    A string such as ``'1.1'`` is converted to ``HttpVersion(1, 1)``; any
    other value is stored unchanged.

    Raises:
        ValueError: if the string is not two integers separated by a dot.
    """
    if isinstance(version, str):
        try:
            # Unpacking raises ValueError when the "." separator is missing
            # (e.g. "1"), so that case is reported like any other malformed
            # version instead of leaking an IndexError.
            major, minor = (int(part.strip()) for part in version.split(".", 1))
        except ValueError:
            raise ValueError(
                f"Can not parse http version number: {version}"
            ) from None
        version = http.HttpVersion(major, minor)
    self.version = version

1075 

def update_headers(self, headers: LooseHeaders | None) -> None:
    """Rebuild ``self.headers`` from the URL's host and the supplied headers.

    The header collection is always recreated and seeded with a Host header
    taken from ``self.url.host_port_subcomponent``. Entries from *headers*
    are then appended; keys found in ``hdrs.HOST_ALL`` replace the
    generated Host value instead of being added next to it.

    *headers* may be any mapping or an iterable of ``(name, value)`` pairs;
    a falsy value leaves only the Host header.
    """
    self.headers: CIMultiDict[str] = CIMultiDict()

    # host_port_subcomponent is None only for a relative URL, and the URL
    # is known not to be relative at this point.
    host = self.url.host_port_subcomponent
    assert host is not None
    self.headers[hdrs.HOST] = host

    if not headers:
        return

    # The concrete types come first so the common cases skip the slower
    # ABC check. Mapping covers every other mapping type (for example
    # types.MappingProxyType), which would otherwise be iterated as bare
    # keys and fail to unpack into (name, value) pairs.
    if isinstance(headers, (dict, MultiDictProxy, MultiDict, Mapping)):
        headers = headers.items()

    for key, value in headers:  # type: ignore[str-unpack]
        # A special case for Host header
        if key in hdrs.HOST_ALL:
            self.headers[key] = value
        else:
            self.headers.add(key, value)

1100 

def update_auto_headers(self, skip_auto_headers: Iterable[str] | None) -> None:
    """Fill in default headers that are neither set nor explicitly skipped.

    When *skip_auto_headers* is given it is stored (sorted) on the request
    and its names count as already present. Each entry of
    ``DEFAULT_HEADERS`` and the User-Agent header are then added unless
    present.
    """
    if skip_auto_headers is None:
        # Fast path when there are no headers to skip,
        # which is the most common case.
        present = self.headers
    else:
        self._skip_auto_headers = CIMultiDict(
            (name, None) for name in sorted(skip_auto_headers)
        )
        present = self.headers.copy()
        present.extend(self._skip_auto_headers)  # type: ignore[arg-type]

    for name, default in self.DEFAULT_HEADERS.items():
        if name not in present:
            self.headers[name] = default

    if hdrs.USER_AGENT not in present:
        self.headers[hdrs.USER_AGENT] = SERVER_SOFTWARE

1119 

def update_cookies(self, cookies: LooseCookies | None) -> None:
    """Merge *cookies* into the request's Cookie header.

    Cookies already present in the Cookie header are parsed and kept; the
    supplied ones are applied on top and the header is rewritten. A falsy
    *cookies* leaves the headers untouched.
    """
    if not cookies:
        return

    jar = SimpleCookie()
    if hdrs.COOKIE in self.headers:
        # parse_cookie_header for RFC 6265 compliant Cookie header parsing
        jar.update(parse_cookie_header(self.headers.get(hdrs.COOKIE, "")))
        del self.headers[hdrs.COOKIE]

    pairs = cookies.items() if isinstance(cookies, Mapping) else cookies
    for name, value in pairs:  # type: ignore[assignment]
        if not isinstance(value, Morsel):
            jar[name] = value  # type: ignore[assignment]
            continue
        # Use helper to preserve coded_value exactly as sent by server
        jar[name] = preserve_morsel_with_coded_value(value)

    self.headers[hdrs.COOKIE] = jar.output(header="", sep=";").strip()

1143 

def update_content_encoding(self, data: Any) -> None:
    """Reconcile the ``compress`` setting with the Content-Encoding header.

    An empty body disables compression. Otherwise, when compression is
    requested and no Content-Encoding header is set, the header is added
    (a non-string ``compress`` becomes "deflate") and chunking is enabled.

    Raises:
        ValueError: if both ``compress`` and a Content-Encoding header are set.
    """
    if not data:
        # Don't compress an empty body.
        self.compress = None
        return

    if self.headers.get(hdrs.CONTENT_ENCODING):
        if self.compress:
            raise ValueError(
                "compress can not be set if Content-Encoding header is set"
            )
        return

    if not self.compress:
        return

    if not isinstance(self.compress, str):
        self.compress = "deflate"
    self.headers[hdrs.CONTENT_ENCODING] = self.compress
    # enable chunked, no need to deal with length
    self.chunked = True

1161 

def update_transfer_encoding(self) -> None:
    """Reconcile the ``chunked`` setting with the Transfer-Encoding header.

    When ``chunked`` is set and the header does not already mention
    "chunked", the header is set to "chunked".

    Raises:
        ValueError: if ``chunked`` is set together with a chunked
            Transfer-Encoding header or with a Content-Length header.
    """
    header_value = self.headers.get(hdrs.TRANSFER_ENCODING, "").lower()
    header_is_chunked = "chunked" in header_value

    if header_is_chunked and self.chunked:
        raise ValueError(
            "chunked can not be set "
            'if "Transfer-Encoding: chunked" header is set'
        )
    if header_is_chunked or not self.chunked:
        return

    if hdrs.CONTENT_LENGTH in self.headers:
        raise ValueError(
            "chunked can not be set if Content-Length header is set"
        )
    self.headers[hdrs.TRANSFER_ENCODING] = "chunked"

1180 

def update_auth(self, auth: BasicAuth | None, trust_env: bool = False) -> None:
    """Set the Authorization header from basic-auth credentials.

    Falls back to ``self.auth`` when *auth* is None and does nothing when
    neither is available. *trust_env* is accepted but not used here.

    Raises:
        TypeError: if the credentials are not a ``BasicAuth`` instance.
    """
    credentials = self.auth if auth is None else auth
    if credentials is None:
        return

    if not isinstance(credentials, helpers.BasicAuth):
        raise TypeError("BasicAuth() tuple is required instead")

    self.headers[hdrs.AUTHORIZATION] = credentials.encode()

1192 

def update_body_from_data(self, body: Any, _stacklevel: int = 3) -> None:
    """Install *body* as the request payload and adjust related headers.

    Warns about a previous payload that may need closing, wraps *body* in
    a payload object (falling back to form encoding when no payload type
    is registered for it), then sets Content-Length or enables chunking
    and copies over payload headers that are neither already set nor
    skipped. A None body clears the payload.
    """
    if self._body is not None:
        _warn_if_unclosed_payload(self._body, stacklevel=_stacklevel)

    request_headers = self.headers

    if body is None:
        self._body = None
        # Set Content-Length to 0 when body is None for methods that
        # expect a body
        needs_zero_length = (
            self.method not in self.GET_METHODS
            and not self.chunked
            and hdrs.CONTENT_LENGTH not in request_headers
        )
        if needs_zero_length:
            request_headers[hdrs.CONTENT_LENGTH] = "0"
        return

    # FormData instances are called to produce their payload
    candidate = body() if isinstance(body, FormData) else body

    try:
        new_payload = payload.PAYLOAD_REGISTRY.get(candidate, disposition=None)
    except payload.LookupError:
        new_payload = FormData(candidate)()  # type: ignore[arg-type]

    self._body = new_payload

    # enable chunked encoding if needed
    if not self.chunked and hdrs.CONTENT_LENGTH not in request_headers:
        size = new_payload.size
        if size is None:
            self.chunked = True
        else:
            request_headers[hdrs.CONTENT_LENGTH] = str(size)

    # copy payload headers
    assert new_payload.headers
    skipped = self._skip_auto_headers
    for key, value in new_payload.headers.items():
        if key in request_headers:
            continue
        if skipped is not None and key in skipped:
            continue
        request_headers[key] = value

1233 

def _update_body(self, body: Any) -> None:
    """Swap in a new body on a request whose body was already set.

    Stale Content-Length and (when chunked) Transfer-Encoding headers are
    removed before the new body is installed, and transfer encoding is
    re-evaluated afterwards.
    """
    request_headers = self.headers

    # The body is changing, so the old Content-Length no longer applies
    if hdrs.CONTENT_LENGTH in request_headers:
        del request_headers[hdrs.CONTENT_LENGTH]

    # Drop the existing Transfer-Encoding header to avoid conflicts
    if self.chunked and hdrs.TRANSFER_ENCODING in request_headers:
        del request_headers[hdrs.TRANSFER_ENCODING]

    # Delegate to the regular body update; one extra frame (this method)
    # sits between the caller and the warning, hence stacklevel 4
    self.update_body_from_data(body, _stacklevel=4)

    # Update transfer encoding headers if needed (same logic as __init__)
    body_expected = body is not None or self.method not in self.GET_METHODS
    if body_expected:
        self.update_transfer_encoding()

1251 

async def update_body(self, body: Any) -> None:
    """
    Replace the request body, closing the previous payload first.

    Any existing payload is closed (awaited) before the new body is
    installed, so file handles, streams and similar resources held by the
    old payload are released.

    IMPORTANT: prefer this method over assigning to ``request.body``.
    Direct assignment is discouraged because it can leak resources held by
    the previous payload and behave unexpectedly with streaming payloads.

    Args:
        body: The new body content. Can be:
            - bytes/bytearray: Raw binary data
            - str: Text data (will be encoded using charset from Content-Type)
            - FormData: Form data that will be encoded as multipart/form-data
            - Payload: A pre-configured payload object
            - AsyncIterable: An async iterable of bytes chunks
            - File-like object: Will be read and sent as binary data
            - None: Clears the body

    Usage:
        # Replace the body
        await request.update_body(b"new request data")

        # Replace it with form data
        form_data = FormData()
        form_data.add_field('field', 'value')
        await request.update_body(form_data)

        # Clear the body
        await request.update_body(None)

        # Discouraged: request.body = b"new request data"

    Note:
        This method is async because closing the previous payload may
        involve closing file handles or other resources. Always await it.

        Changing the payload type in middleware is not recommended. If the
        body was already set (e.g., as bytes), keep the same type rather
        than converting it (e.g., to str), as this may result in
        unexpected behavior.

    See Also:
        - update_body_from_data: Synchronous body update without cleanup
        - body property: Direct body access (STRONGLY DISCOURAGED)

    """
    stale_payload = self._body
    if stale_payload is not None:
        await stale_payload.close()
    self._update_body(body)

1313 

def update_expect_continue(self, expect: bool = False) -> None:
    """Prepare the request for a "100-continue" handshake when required.

    With *expect* true the Expect header is set; otherwise an existing
    ``Expect: 100-continue`` header (case-insensitive) is honoured. In
    either case a future is created on the loop and stored in
    ``self._continue``.
    """
    if expect:
        self.headers[hdrs.EXPECT] = "100-continue"
    else:
        expect = (
            hdrs.EXPECT in self.headers
            and self.headers[hdrs.EXPECT].lower() == "100-continue"
        )

    if expect:
        self._continue = self.loop.create_future()

1325 

def update_proxy(
    self,
    proxy: URL | None,
    proxy_auth: BasicAuth | None,
    proxy_headers: LooseHeaders | None,
) -> None:
    """Record the proxy settings for this request.

    Without a proxy, proxy auth and proxy headers are reset to None.
    Otherwise the auth is stored as given and the headers are wrapped in a
    ``CIMultiDict`` unless they already are a MultiDict or MultiDictProxy.

    Raises:
        ValueError: if *proxy_auth* is truthy but not a ``BasicAuth``.
    """
    self.proxy = proxy
    if proxy is None:
        self.proxy_auth = self.proxy_headers = None
        return

    if proxy_auth and not isinstance(proxy_auth, helpers.BasicAuth):
        raise ValueError("proxy_auth must be None or BasicAuth() tuple")
    self.proxy_auth = proxy_auth

    needs_wrapping = proxy_headers is not None and not isinstance(
        proxy_headers, (MultiDict, MultiDictProxy)
    )
    self.proxy_headers = (
        CIMultiDict(proxy_headers) if needs_wrapping else proxy_headers
    )

1347 

async def write_bytes(
    self,
    writer: AbstractStreamWriter,
    conn: "Connection",
    content_length: int | None = None,
) -> None:
    """
    Write the request body to the connection stream.

    If a 100-continue handshake is pending, the headers are flushed and
    the continue future is awaited before the body is written. The body,
    when present, is written through its ``write_with_length`` method.

    Args:
        writer: The stream writer to write the body to
        conn: The connection being used for this request
        content_length: Optional maximum number of bytes to write from the
            body (None means write the entire body)

    Error handling:
        - OSError is recorded on the protocol, wrapped in ClientOSError
          unless it is a timeout without an errno
        - Any other Exception is recorded on the protocol wrapped in
          ClientConnectionError
        - On success EOF is written and the response timeout is started

    Raises:
        asyncio.CancelledError: When the operation is cancelled (the
            connection is closed first)

    """
    # 100 response
    if self._continue is not None:
        # Force headers to be sent before waiting for 100-continue
        writer.send_headers()
        await writer.drain()
        await self._continue

    protocol = conn.protocol
    assert protocol is not None
    try:
        # Rarely, self._body is set to None while the task is starting or
        # while waiting above for the 100-continue response. More commonly
        # the payload is simply empty but 100-continue is still expected.
        if self._body is not None:
            await self._body.write_with_length(writer, content_length)
    except OSError as underlying_exc:
        # A timeout without an errno is passed through untouched; every
        # other OS error is wrapped for better error reporting.
        is_bare_timeout = underlying_exc.errno is None and isinstance(
            underlying_exc, asyncio.TimeoutError
        )
        if is_bare_timeout:
            reraised_exc = underlying_exc
        else:
            reraised_exc = ClientOSError(
                underlying_exc.errno,
                f"Can not write request body for {self.url !s}",
            )
        set_exception(protocol, reraised_exc, underlying_exc)
    except asyncio.CancelledError:
        # Body hasn't been fully sent, so connection can't be reused
        conn.close()
        raise
    except Exception as underlying_exc:
        wrapped_exc = ClientConnectionError(
            "Failed to send bytes into the underlying connection "
            f"{conn !s}: {underlying_exc!r}",
        )
        set_exception(protocol, wrapped_exc, underlying_exc)
    else:
        # Successfully wrote the body, signal EOF and start response timeout
        await writer.write_eof()
        protocol.start_timeout()

1429 

async def send(self, conn: "Connection") -> "ClientResponse":
    """Write the request head on *conn* and return the response object.

    The headers are written through a ``StreamWriter``. When there is a
    body, a pending 100-continue handshake, or the protocol has writing
    paused, the body is written by a task running ``write_bytes``;
    otherwise EOF is set immediately and the response timeout is started.
    The created response is stored on ``self.response`` and returned.
    """
    # Specify request target:
    # - CONNECT request must send authority form URI
    # - not CONNECT proxy must send absolute form URI
    # - most common is origin form URI
    if self.method == hdrs.METH_CONNECT:
        connect_host = self.url.host_subcomponent
        assert connect_host is not None
        request_target = f"{connect_host}:{self.url.port}"
    elif self.proxy and not self.is_ssl():
        request_target = str(self.url)
    else:
        request_target = self.url.raw_path_qs

    protocol = conn.protocol
    assert protocol is not None

    # Trace callbacks are only wired up when traces are configured
    if self._traces:
        on_chunk_sent = functools.partial(
            self._on_chunk_request_sent, self.method, self.url
        )
        on_headers_sent = functools.partial(
            self._on_headers_request_sent, self.method, self.url
        )
    else:
        on_chunk_sent = on_headers_sent = None
    writer = StreamWriter(
        protocol,
        self.loop,
        on_chunk_sent=on_chunk_sent,
        on_headers_sent=on_headers_sent,
    )

    if self.compress:
        writer.enable_compression(self.compress)  # type: ignore[arg-type]

    if self.chunked is not None:
        writer.enable_chunking()

    # set default content-type
    skipped = self._skip_auto_headers
    wants_default_content_type = (
        self.method in self.POST_METHODS
        and (skipped is None or hdrs.CONTENT_TYPE not in skipped)
        and hdrs.CONTENT_TYPE not in self.headers
    )
    if wants_default_content_type:
        self.headers[hdrs.CONTENT_TYPE] = "application/octet-stream"

    version = self.version
    if hdrs.CONNECTION not in self.headers:
        if conn._connector.force_close:
            if version == HttpVersion11:
                self.headers[hdrs.CONNECTION] = "close"
        elif version == HttpVersion10:
            self.headers[hdrs.CONNECTION] = "keep-alive"

    # status + headers
    status_line = (
        f"{self.method} {request_target} HTTP/{version.major}.{version.minor}"
    )

    # Buffer headers for potential coalescing with body
    await writer.write_headers(status_line, self.headers)

    task: asyncio.Task[None] | None = None
    if self._body or self._continue is not None or protocol.writing_paused:
        coro = self.write_bytes(writer, conn, self._get_content_length())
        if sys.version_info >= (3, 12):
            # Optimization for Python 3.12, try to write
            # bytes immediately to avoid having to schedule
            # the task on the event loop.
            task = asyncio.Task(coro, loop=self.loop, eager_start=True)
        else:
            task = self.loop.create_task(coro)
        if task.done():
            task = None
        else:
            self._writer = task
    else:
        # We have nothing to write because
        # - there is no body
        # - the protocol does not have writing paused
        # - we are not waiting for a 100-continue response
        protocol.start_timeout()
        writer.set_eof()

    response_class = self.response_class
    assert response_class is not None
    self.response = response_class(
        self.method,
        self.original_url,
        writer=task,
        continue100=self._continue,
        timer=self._timer,
        request_info=self.request_info,
        traces=self._traces,
        loop=self.loop,
        session=self._session,
        stream_writer=writer,
    )
    return self.response

1529 

async def close(self) -> None:
    """Wait for the stored writer task, if any, to finish.

    A ``CancelledError`` raised while waiting is swallowed unless, on
    Python 3.11+, the current task itself is being cancelled, in which
    case it is re-raised.
    """
    pending = self.__writer
    if pending is None:
        return
    try:
        await pending
    except asyncio.CancelledError:
        if sys.version_info < (3, 11):
            return
        current = asyncio.current_task()
        if current and current.cancelling():
            raise

1541 

def terminate(self) -> None:
    """Abandon the stored writer task, if any.

    The task is cancelled unless the loop is already closed, the reset
    callback is detached from it and the stored reference is cleared.
    """
    active = self.__writer
    if active is None:
        return
    if not self.loop.is_closed():
        active.cancel()
    active.remove_done_callback(self.__reset_writer)
    self.__writer = None

1548 

async def _on_chunk_request_sent(self, method: str, url: URL, chunk: bytes) -> None:
    """Notify each configured trace, in order, that a body chunk was sent."""
    for tracer in self._traces:
        await tracer.send_request_chunk_sent(method, url, chunk)

1552 

async def _on_headers_request_sent(
    self, method: str, url: URL, headers: "CIMultiDict[str]"
) -> None:
    """Notify each configured trace, in order, that the request headers were sent."""
    for tracer in self._traces:
        await tracer.send_request_headers(method, url, headers)

1557 await trace.send_request_headers(method, url, headers)