Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/client_reqrep.py: 32%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

727 statements  

1import asyncio 

2import codecs 

3import contextlib 

4import functools 

5import io 

6import re 

7import sys 

8import traceback 

9import warnings 

10from collections.abc import Callable, Iterable, Sequence 

11from hashlib import md5, sha1, sha256 

12from http.cookies import BaseCookie, SimpleCookie 

13from types import MappingProxyType, TracebackType 

14from typing import TYPE_CHECKING, Any, Literal, NamedTuple, TypedDict 

15 

16from multidict import CIMultiDict, CIMultiDictProxy, MultiDict, MultiDictProxy 

17from yarl import URL, Query 

18 

19from . import hdrs, multipart, payload 

20from ._cookie_helpers import ( 

21 parse_cookie_header, 

22 parse_set_cookie_headers, 

23 preserve_morsel_with_coded_value, 

24) 

25from .abc import AbstractStreamWriter 

26from .base_protocol import BaseProtocol 

27from .client_exceptions import ( 

28 ClientConnectionError, 

29 ClientOSError, 

30 ClientResponseError, 

31 ContentTypeError, 

32 InvalidURL, 

33 ServerFingerprintMismatch, 

34) 

35from .compression_utils import HAS_BROTLI, HAS_ZSTD 

36from .formdata import FormData 

37from .helpers import ( 

38 _SENTINEL, 

39 BaseTimerContext, 

40 HeadersDictProxy, 

41 HeadersMixin, 

42 TimerNoop, 

43 encode_basic_auth, 

44 frozen_dataclass_decorator, 

45 is_expected_content_type, 

46 parse_mimetype, 

47 reify, 

48 sentinel, 

49 set_exception, 

50 set_result, 

51) 

52from .http import ( 

53 SERVER_SOFTWARE, 

54 HttpProcessingError, 

55 HttpVersion, 

56 HttpVersion10, 

57 HttpVersion11, 

58 StreamWriter, 

59) 

60from .streams import StreamReader 

61from .typedefs import DEFAULT_JSON_DECODER, JSONDecoder, RawHeaders 

62 

# ``ssl`` is optional: when it cannot be imported the module still loads,
# with ``ssl`` bound to None and ``SSLContext`` bound to ``object`` so that
# annotations referring to it keep working.
try:
    import ssl
    from ssl import SSLContext
except ImportError:  # pragma: no cover
    ssl = None  # type: ignore[assignment]
    SSLContext = object  # type: ignore[misc,assignment]


__all__ = ("ClientRequest", "ClientResponse", "RequestInfo", "Fingerprint")


# Imported for annotations only.
if TYPE_CHECKING:
    from .client import ClientSession
    from .connector import Connection
    from .tracing import Trace


# Shared exception instance attached to a response's content stream when the
# response is closed or released.
_CONNECTION_CLOSED_EXCEPTION = ClientConnectionError("Connection closed")
# Despite its name this matches any single character that is NOT in the
# allowed set ``-!#$%&'*+.^_`|~``, digits and ASCII letters; it is used to
# reject request methods containing such characters.
_CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]")
# One or more ASCII digits; used with ``fullmatch`` to validate Content-Length.
_DIGITS_RE = re.compile(r"\d+", flags=re.ASCII)

83 

84 

def _gen_default_accept_encoding() -> str:
    """Build the default ``Accept-Encoding`` header value.

    ``gzip`` and ``deflate`` are always listed; ``br`` and ``zstd`` are
    appended, in that order, when ``HAS_BROTLI`` / ``HAS_ZSTD`` are truthy.
    """
    optional = (("br", HAS_BROTLI), ("zstd", HAS_ZSTD))
    names = ["gzip", "deflate"]
    names.extend(name for name, available in optional if available)
    return ", ".join(names)

95 

96 

@frozen_dataclass_decorator
class ContentDisposition:
    """Parsed ``Content-Disposition`` header of a response."""

    # Disposition type as returned by the multipart parser.
    type: str | None
    # Read-only view of the header parameters.
    parameters: "MappingProxyType[str, str]"
    # File name derived from the parameters, if any.
    filename: str | None

102 

103 

class _RequestInfo(NamedTuple):
    """Field layout shared with the public :class:`RequestInfo`."""

    url: URL
    method: str
    headers: "CIMultiDictProxy[str]"
    real_url: URL

109 

110 

class RequestInfo(_RequestInfo):
    """Immutable description of the request that produced a response."""

    def __new__(
        cls,
        url: URL,
        method: str,
        headers: "CIMultiDictProxy[str]",
        real_url: URL | _SENTINEL = sentinel,
    ) -> "RequestInfo":
        """Create a new RequestInfo instance.

        ``real_url`` is optional for backwards compatibility; when it is
        omitted the value of ``url`` is stored in its place.
        """
        if real_url is sentinel:
            real_url = url
        return tuple.__new__(cls, (url, method, headers, real_url))

127 

128 

class Fingerprint:
    """Expected digest of a peer certificate, used to pin a TLS server.

    Only 32-byte (sha256) digests are accepted.  16-byte (md5) and 20-byte
    (sha1) digests are recognised by length but rejected as insecure.
    """

    # Digest length in bytes -> hash constructor.
    HASHFUNC_BY_DIGESTLEN = {
        16: md5,
        20: sha1,
        32: sha256,
    }

    def __init__(self, fingerprint: bytes) -> None:
        """Store *fingerprint* after selecting the hash by its length.

        Raises:
            ValueError: if the length matches no known digest, or matches
                md5 / sha1.
        """
        hashfunc = self.HASHFUNC_BY_DIGESTLEN.get(len(fingerprint))
        if not hashfunc:
            raise ValueError("fingerprint has invalid length")
        if hashfunc is md5 or hashfunc is sha1:
            raise ValueError("md5 and sha1 are insecure and not supported. Use sha256.")
        self._hashfunc = hashfunc
        self._fingerprint = fingerprint

    @property
    def fingerprint(self) -> bytes:
        """The expected digest passed to the constructor."""
        return self._fingerprint

    def check(self, transport: asyncio.Transport) -> None:
        """Compare the peer certificate on *transport* with the fingerprint.

        Does nothing when the transport reports no ``sslcontext``.

        Raises:
            ServerFingerprintMismatch: if the digest of the binary peer
                certificate differs from the expected one.
        """
        if not transport.get_extra_info("sslcontext"):
            return
        ssl_object = transport.get_extra_info("ssl_object")
        der_cert = ssl_object.getpeercert(binary_form=True)
        actual = self._hashfunc(der_cert).digest()
        if actual == self._fingerprint:
            return
        # peername may carry more than two items; only host and port are used.
        host, port, *_ = transport.get_extra_info("peername")
        raise ServerFingerprintMismatch(self._fingerprint, actual, host, port)

159 

160 

# Types accepted for an ``ssl`` argument.  When the ``ssl`` module could not
# be imported only ``bool`` is accepted.
if ssl is not None:
    SSL_ALLOWED_TYPES = (ssl.SSLContext, bool, Fingerprint)
else:  # pragma: no cover
    SSL_ALLOWED_TYPES = (bool,)  # type: ignore[unreachable]


# NOTE: _CONNECTION_CLOSED_EXCEPTION is already bound next to the other
# module-level constants near the top of the file; the identical second
# assignment that used to live here was redundant and has been dropped.

# URL schemes that imply a TLS connection.
_SSL_SCHEMES = frozenset(("https", "wss"))

169 

170 

171# ConnectionKey is a NamedTuple because it is used as a key in a dict 

172# and a set in the connector. Since a NamedTuple is a tuple it uses 

173# the fast native tuple __hash__ and __eq__ implementation in CPython. 

class ConnectionKey(NamedTuple):
    """Identity of a pooled connection.

    Kept as a NamedTuple so the connector can use it as a dict key and set
    member with the native tuple ``__hash__`` / ``__eq__``.
    """

    # The key includes the proxy / TLS settings so that a connection opened
    # with different settings is never reused from the pool.
    host: str
    port: int | None
    is_ssl: bool
    ssl: SSLContext | bool | Fingerprint
    proxy: URL | None
    proxy_headers_hash: int | None  # hash(CIMultiDict)
    server_hostname: str | None = None

184 

185 

class ClientResponse(HeadersMixin):
    """Response object produced for a client request.

    The status line, headers and payload stream are only populated once
    :meth:`start` has read the response head from the connection.
    """

    # Several attributes default to None here and are filled in by start().
    # End users should never observe the None values, so some annotations
    # below deliberately leave None out.

    # Taken from the Status-Line of the response
    version: HttpVersion | None = None  # HTTP-Version
    status: int = None  # type: ignore[assignment] # Status-Code
    reason: str | None = None  # Reason-Phrase

    content: StreamReader = None  # type: ignore[assignment] # Payload stream
    _body: bytes | None = None
    _headers: HeadersDictProxy = None  # type: ignore[assignment]
    _history: tuple["ClientResponse", ...] = ()
    _raw_headers: RawHeaders = None  # type: ignore[assignment]

    _connection: "Connection | None" = None  # current connection
    _cookies: SimpleCookie | None = None
    _raw_cookie_headers: tuple[str, ...] | None = None
    _continue: asyncio.Future[bool] | None = None
    _source_traceback: traceback.StackSummary | None = None
    _session: "ClientSession | None" = None
    # set up by ClientRequest after ClientResponse object creation
    # post-init stage allows to not change ctor signature
    _closed = True  # lets __del__ run safely on a half-initialised response
    _released = False
    _in_context = False

    # Fallback charset resolver used when no session supplies one.
    _resolve_charset: Callable[["ClientResponse", bytes], str] = lambda *_: "utf-8"

    # Task still writing the request body, if any (name-mangled on purpose;
    # subclasses go through the ``_writer`` property).
    __writer: asyncio.Task[None] | None = None
    _stream_writer: AbstractStreamWriter | None = None
    _output_size: int = 0
    _upload_complete: asyncio.Future[None] | None = None

    def __init__(
        self,
        method: str,
        url: URL,
        *,
        writer: asyncio.Task[None] | None,
        continue100: asyncio.Future[bool] | None,
        timer: BaseTimerContext | None,
        traces: Sequence["Trace"],
        loop: asyncio.AbstractEventLoop,
        session: "ClientSession | None",
        request_headers: CIMultiDict[str],
        original_url: URL,
        stream_writer: AbstractStreamWriter,
        **kwargs: object,
    ) -> None:
        """Record the request context; the response itself is read by start().

        ``writer`` is the task still sending the request body, or None when
        the request has already been written in full.
        """
        # **kwargs exists so that subclass authors forward unknown arguments,
        # which lets new arguments be added in future releases.  The parent
        # class itself must never receive any: that would indicate a
        # misnamed argument in a subclass.
        assert not kwargs, "Unexpected arguments to ClientResponse"
        # URL forbids subclasses, so a simple type check is enough.
        assert type(url) is URL

        self.method = method

        self._real_url = url
        if url.raw_fragment:
            self._url = url.with_fragment(None)
        else:
            self._url = url

        if writer is not None:
            # Body still being written: keep the stream writer so the output
            # size can be read live and captured when the task finishes.
            self._stream_writer = stream_writer
        else:
            # Request already sent
            self._output_size = stream_writer.output_size
        self._writer = writer

        if continue100 is not None:
            self._continue = continue100
        self._request_headers = request_headers
        self._original_url = original_url
        self._timer = TimerNoop() if timer is None else timer
        self._cache: dict[str, Any] = {}
        self._traces = traces
        self._loop = loop
        if session is not None:
            # store a reference to session #1985
            self._session = session
            # Keep our own reference to the resolver so get_encoding() still
            # works after the session reference has been dropped.
            self._resolve_charset = session._resolve_charset
        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

    def __reset_writer(self, _: object = None) -> None:
        """Forget the writer task and capture the final upload state.

        Also used as the done-callback of the writer task, hence the
        ignored positional argument.
        """
        self.__writer = None
        stream_writer = self._stream_writer
        if stream_writer is not None:
            self._output_size = stream_writer.output_size
            self._stream_writer = None
        waiter = self._upload_complete
        if waiter is not None and not waiter.done():
            waiter.set_result(None)

    @property
    def _writer(self) -> asyncio.Task[None] | None:
        """The writer task for streaming data.

        _writer is only provided for backwards compatibility
        for subclasses that may need to access it.
        """
        return self.__writer

    @_writer.setter
    def _writer(self, writer: asyncio.Task[None] | None) -> None:
        """Set the writer task for streaming data."""
        previous = self.__writer
        if previous is not None:
            previous.remove_done_callback(self.__reset_writer)
        self.__writer = writer
        if writer is None:
            return
        if not writer.done():
            writer.add_done_callback(self.__reset_writer)
        else:
            # The writer is already done, so we can clear it immediately.
            self.__reset_writer()

    @property
    def output_size(self) -> int:
        """Number of bytes sent for this request."""
        stream_writer = self._stream_writer
        if stream_writer is None:
            return self._output_size
        return stream_writer.output_size

    @property
    def upload_complete(self) -> "asyncio.Future[None]":
        """Future set when the request body has been fully sent.

        Already done when the request had no body or was written eagerly.
        """
        waiter = self._upload_complete
        if waiter is None:
            waiter = self._upload_complete = self._loop.create_future()
            if self._stream_writer is None:  # upload already finished
                waiter.set_result(None)
        return waiter

    @property
    def cookies(self) -> SimpleCookie:
        """Response cookies, parsed lazily from the stored Set-Cookie headers."""
        if self._cookies is None:
            # SimpleCookie is kept for backward compatibility of
            # response.cookies.
            jar = SimpleCookie()
            if self._raw_cookie_headers is not None:
                # parse_set_cookie_headers is more lenient than
                # SimpleCookie.load and copes better with malformed cookies.
                jar.update(parse_set_cookie_headers(self._raw_cookie_headers))
            self._cookies = jar
        return self._cookies

    @cookies.setter
    def cookies(self, cookies: SimpleCookie) -> None:
        self._cookies = cookies
        # Regenerate the raw cookie headers from the SimpleCookie.
        self._raw_cookie_headers = (
            tuple(morsel.OutputString() for morsel in cookies.values())
            if cookies
            else None
        )

    @reify
    def url(self) -> URL:
        """Request URL with any fragment removed."""
        return self._url

    @reify
    def real_url(self) -> URL:
        """Request URL exactly as passed to the constructor."""
        return self._real_url

    @reify
    def host(self) -> str:
        """Host part of :attr:`url`."""
        assert self._url.host is not None
        return self._url.host

    @reify
    def headers(self) -> HeadersDictProxy:
        """Response headers as stored by start()."""
        return self._headers

    @reify
    def raw_headers(self) -> RawHeaders:
        """Raw response headers as stored by start()."""
        return self._raw_headers

    @reify
    def request_info(self) -> RequestInfo:
        """RequestInfo built lazily from the stored request components."""
        proxied_headers = CIMultiDictProxy(self._request_headers)
        fields = (self._url, self.method, proxied_headers, self._original_url)
        return tuple.__new__(RequestInfo, fields)

    @reify
    def content_disposition(self) -> ContentDisposition | None:
        """Parsed Content-Disposition header, or None when it is absent."""
        header = self._headers.get(hdrs.CONTENT_DISPOSITION)
        if header is None:
            return None
        disposition_type, raw_params = multipart.parse_content_disposition(header)
        params = MappingProxyType(raw_params)
        return ContentDisposition(
            disposition_type, params, multipart.content_disposition_filename(params)
        )

    def __del__(self, _warnings: Any = warnings) -> None:
        """Release a connection still held by an unclosed response."""
        if self._closed or self._connection is None:
            return

        self._connection.release()
        self._cleanup_writer()

        if not self._loop.get_debug():
            return
        _warnings.warn(f"Unclosed response {self!r}", ResourceWarning, source=self)
        context = {"client_response": self, "message": "Unclosed response"}
        if self._source_traceback:
            context["source_traceback"] = self._source_traceback
        self._loop.call_exception_handler(context)

    def __repr__(self) -> str:
        url_text = str(self.url)
        if self.reason:
            # Non-ASCII characters in the reason are backslash-escaped.
            reason_text = self.reason.encode("ascii", "backslashreplace").decode(
                "ascii"
            )
        else:
            reason_text = "None"
        head = f"<ClientResponse({url_text}) [{self.status} {reason_text}]>"
        return head + "\n" + str(self.headers) + "\n"

    @property
    def connection(self) -> "Connection | None":
        """Connection currently attached to this response, if any."""
        return self._connection

    @reify
    def history(self) -> tuple["ClientResponse", ...]:
        """A sequence of responses, if redirects occurred."""
        return self._history

    @reify
    def links(self) -> "MultiDictProxy[MultiDictProxy[str | URL]]":
        """Parsed ``Link`` headers, keyed by ``rel`` (or by the target URL).

        Each entry holds the link parameters plus a ``url`` item with the
        target resolved against :attr:`url`.  Malformed links and malformed
        parameters are skipped.
        """
        parsed: MultiDict[MultiDictProxy[str | URL]] = MultiDict()
        for header_value in self.headers.getall("link"):
            target = re.match(r"\s*<(.*)>(.*)", header_value)
            if target is None:  # Malformed link
                continue
            url, params_str = target.groups()

            link: MultiDict[str | URL] = MultiDict()
            # The piece before the first ";" is not a parameter.
            for param in params_str.split(";")[1:]:
                pair = re.match(r"^\s*(\S*)\s*=\s*(['\"]?)(.*?)(\2)\s*$", param, re.M)
                if pair is None:  # Malformed param
                    continue
                name, _, value, _ = pair.groups()
                link.add(name, value)

            key = link.get("rel", url)
            link.add("url", self.url.join(URL(url)))
            parsed.add(str(key), MultiDictProxy(link))

        return MultiDictProxy(parsed)

    async def start(self, connection: "Connection") -> "ClientResponse":
        """Start response processing."""
        self._closed = False
        self._protocol = connection.protocol
        self._connection = connection

        with self._timer:
            while True:
                # read response
                try:
                    protocol = self._protocol
                    message, stream = await protocol.read()  # type: ignore[union-attr]
                except HttpProcessingError as exc:
                    raise ClientResponseError(
                        self.request_info,
                        self.history,
                        status=exc.code,
                        message=exc.message,
                        headers=exc.headers,
                    ) from exc

                code = message.code
                # Keep reading past 1xx responses other than 101.
                if code == 101 or not 100 <= code <= 199:
                    break

                if self._continue is not None:
                    set_result(self._continue, True)
                    self._continue = None

        # payload eof handler
        stream.on_eof(self._response_eof)

        # response status
        self.version = message.version
        self.status = message.code
        self.reason = message.reason

        # headers
        self._headers = message.headers
        self._raw_headers = message.raw_headers

        # payload
        self.content = stream

        # cookies
        if cookie_hdrs := self.headers._md.getall(hdrs.SET_COOKIE, ()):
            # Store raw cookie headers for CookieJar
            self._raw_cookie_headers = tuple(cookie_hdrs)
        return self

    def _response_eof(self) -> None:
        """Payload EOF handler: close the response and release the connection."""
        if self._closed:
            return

        # protocol could be None because connection could be detached
        connection = self._connection
        protocol = connection and connection.protocol
        if protocol is not None and protocol.upgraded:
            return

        self._closed = True
        self._cleanup_writer()
        self._release_connection()

    @property
    def closed(self) -> bool:
        """True once the response has been closed or released."""
        return self._closed

    def close(self) -> None:
        """Close the response together with its underlying connection."""
        if not self._released:
            self._notify_content()

        self._closed = True
        if self._loop.is_closed():
            return

        self._cleanup_writer()
        connection = self._connection
        if connection is not None:
            connection.close()
            self._connection = None

    def release(self) -> None:
        """Mark the response closed and release its connection."""
        if not self._released:
            self._notify_content()

        self._closed = True

        self._cleanup_writer()
        self._release_connection()

    @property
    def ok(self) -> bool:
        """Returns ``True`` if ``status`` is less than ``400``, ``False`` if not.

        This is **not** a check for ``200 OK`` but a check that the response
        status is under 400.
        """
        return self.status < 400

    def raise_for_status(self) -> None:
        """Raise ClientResponseError when the status is 400 or above."""
        if self.ok:
            return

        # reason should always be not None for a started response
        assert self.reason is not None

        # If we're in a context we can rely on __aexit__() to release as the
        # exception propagates.
        if not self._in_context:
            self.release()

        raise ClientResponseError(
            self.request_info,
            self.history,
            status=self.status,
            message=self.reason,
            headers=self.headers,
        )

    def _release_connection(self) -> None:
        """Release the connection now, or once the writer task has finished."""
        if self._connection is None:
            return
        if self.__writer is not None:
            # Still writing: retry when the writer task is done.
            self.__writer.add_done_callback(lambda f: self._release_connection())
            return
        self._connection.release()
        self._connection = None

    async def _wait_released(self) -> None:
        """Wait for the writer task, then release the connection."""
        if self.__writer is not None:
            try:
                await self.__writer
            except asyncio.CancelledError:
                # Re-raise only when the current task itself is being
                # cancelled (detectable on Python 3.11+).
                if (
                    sys.version_info >= (3, 11)
                    and (task := asyncio.current_task())
                    and task.cancelling()
                ):
                    raise
        self._release_connection()

    def _cleanup_writer(self) -> None:
        """Cancel the writer task, capture the output size, drop the session."""
        if self.__writer is not None:
            self.__writer.cancel()
        stream_writer = self._stream_writer
        if stream_writer is not None:
            self._output_size = stream_writer.output_size
            self._stream_writer = None
        self._session = None

    def _notify_content(self) -> None:
        """Flag the content stream as closed and mark the response released."""
        content = self.content
        # content can be None here, but the types are cheated elsewhere.
        if content and content.exception() is None:  # type: ignore[truthy-bool]
            set_exception(content, _CONNECTION_CLOSED_EXCEPTION)
        self._released = True

    async def wait_for_close(self) -> None:
        """Wait for the writer task, then release the response."""
        if self.__writer is not None:
            try:
                await self.__writer
            except asyncio.CancelledError:
                # Re-raise only when the current task itself is being
                # cancelled (detectable on Python 3.11+).
                if (
                    sys.version_info >= (3, 11)
                    and (task := asyncio.current_task())
                    and task.cancelling()
                ):
                    raise
        self.release()

    async def read(self) -> bytes:
        """Read response payload."""
        if self._body is None:
            try:
                self._body = await self.content.read()
                for trace in self._traces:
                    await trace.send_response_chunk_received(
                        self.method, self.url, self._body
                    )
            except BaseException:
                self.close()
                raise
        elif self._released:  # Response explicitly released
            raise ClientConnectionError("Connection closed")

        connection = self._connection
        protocol = connection and connection.protocol
        if protocol is None or not protocol.upgraded:
            await self._wait_released()  # Underlying connection released
        return self._body

    def get_encoding(self) -> str:
        """Return the encoding to use for decoding the body.

        Order of preference: a ``charset`` parameter of Content-Type that
        ``codecs`` recognises, ``utf-8`` for ``application/json`` and
        ``application/rdap``, then the charset resolver applied to the body.

        Raises:
            RuntimeError: if the resolver is needed but the body has not
                been read yet.
        """
        content_type = self.headers.get(hdrs.CONTENT_TYPE, "").lower()
        mimetype = parse_mimetype(content_type)

        charset = mimetype.parameters.get("charset")
        if charset:
            try:
                return codecs.lookup(charset).name
            except (LookupError, ValueError):
                # Unknown or invalid charset: fall through to the defaults.
                pass

        if mimetype.type == "application" and (
            mimetype.subtype == "json" or mimetype.subtype == "rdap"
        ):
            # RFC 7159 states that the default encoding is UTF-8.
            # RFC 7483 defines application/rdap+json
            return "utf-8"

        if self._body is None:
            raise RuntimeError(
                "Cannot compute fallback encoding of a not yet read body"
            )

        return self._resolve_charset(self, self._body)

    async def text(self, encoding: str | None = None, errors: str = "strict") -> str:
        """Read response payload and decode."""
        await self.read()

        chosen = self.get_encoding() if encoding is None else encoding
        return self._body.decode(chosen, errors=errors)  # type: ignore[union-attr]

    async def json(
        self,
        *,
        encoding: str | None = None,
        loads: JSONDecoder = DEFAULT_JSON_DECODER,
        content_type: str | None = "application/json",
    ) -> Any:
        """Read and decodes JSON response."""
        await self.read()

        # A falsy content_type disables the mimetype check.
        if content_type and not is_expected_content_type(
            self.content_type, content_type
        ):
            raise ContentTypeError(
                self.request_info,
                self.history,
                status=self.status,
                message=(
                    "Attempt to decode JSON with "
                    "unexpected mimetype: %s" % self.content_type
                ),
                headers=self.headers,
            )

        chosen = self.get_encoding() if encoding is None else encoding
        return loads(self._body.decode(chosen))  # type: ignore[union-attr]

    async def __aenter__(self) -> "ClientResponse":
        self._in_context = True
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self._in_context = False
        # similar to _RequestContextManager, we do not need to check
        # for exceptions, response object can close connection
        # if state is broken
        self.release()
        await self.wait_for_close()

714 

715 

class ClientRequestBase:
    """An internal class for proxy requests."""

    POST_METHODS = {hdrs.METH_PATCH, hdrs.METH_POST, hdrs.METH_PUT}

    proxy: URL | None = None
    response_class = ClientResponse
    server_hostname: str | None = None  # Needed in connector.py
    version = HttpVersion11
    _response = None

    # These class defaults help create_autospec() work correctly.
    # If autospec is improved in future, maybe these can be removed.
    url = URL()
    method = "GET"

    _writer_task: asyncio.Task[None] | None = None  # async task for streaming data

    _skip_auto_headers: "CIMultiDict[None] | None" = None

    # N.B.
    # Adding __del__ method with self._writer closing doesn't make sense
    # because _writer is instance method, thus it keeps a reference to self.
    # Until writer has finished finalizer will not be called.

    def __init__(
        self,
        method: str,
        url: URL,
        *,
        headers: CIMultiDict[str],
        loop: asyncio.AbstractEventLoop,
        ssl: SSLContext | bool | Fingerprint,
        trust_env: bool = False,
    ):
        """Validate the method and URL and build the initial headers.

        Raises:
            ValueError: if *method* contains a non-token character.
            InvalidURL: if *url* has no host.
        """
        offending = _CONTAINS_CONTROL_CHAR_RE.search(method)
        if offending:
            raise ValueError(
                f"Method cannot contain non-token characters {method!r} "
                f"(found at least {offending.group()!r})"
            )
        # URL forbids subclasses, so a simple type check is enough.
        assert type(url) is URL, url
        self.original_url = url
        if url.raw_fragment:
            self.url = url.with_fragment(None)
        else:
            self.url = url
        self.method = method.upper()
        self.loop = loop
        self._ssl = ssl

        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

        if not url.raw_host:
            raise InvalidURL(url)
        self._update_headers(headers)
        # Credentials embedded in the URL become a Basic Authorization header.
        if url.raw_user or url.raw_password:
            self.headers[hdrs.AUTHORIZATION] = encode_basic_auth(
                url.user or "", url.password or ""
            )

    def _reset_writer(self, _: object = None) -> None:
        """Forget the writer task; also serves as its done-callback."""
        self._writer_task = None

    def _get_content_length(self) -> int | None:
        """Extract and validate Content-Length header value.

        Returns parsed Content-Length value or None if not set.
        Raises ValueError if header exists but cannot be parsed as an integer.
        """
        if hdrs.CONTENT_LENGTH not in self.headers:
            return None

        raw_value = self.headers[hdrs.CONTENT_LENGTH]
        if _DIGITS_RE.fullmatch(raw_value):
            return int(raw_value)
        raise ValueError(f"Invalid Content-Length header: {raw_value!r}")

    @property
    def _writer(self) -> asyncio.Task[None] | None:
        """Task currently writing the request body, if any."""
        return self._writer_task

    @_writer.setter
    def _writer(self, writer: asyncio.Task[None]) -> None:
        current = self._writer_task
        if current is not None:
            current.remove_done_callback(self._reset_writer)
        self._writer_task = writer
        writer.add_done_callback(self._reset_writer)

    def is_ssl(self) -> bool:
        """Return True when the URL scheme is ``https`` or ``wss``."""
        return self.url.scheme in _SSL_SCHEMES

    @property
    def ssl(self) -> "SSLContext | bool | Fingerprint":
        """The ``ssl`` value passed to the constructor."""
        return self._ssl

    @property
    def connection_key(self) -> ConnectionKey:
        """Pool key for this request; the proxy fields are always None here."""
        url = self.url
        fields = (
            url.raw_host or "",
            url.port,
            url.scheme in _SSL_SCHEMES,
            self._ssl,
            None,
            None,
            self.server_hostname,
        )
        return tuple.__new__(ConnectionKey, fields)

    def _update_headers(self, headers: CIMultiDict[str]) -> None:
        """Update request headers."""
        self.headers: CIMultiDict[str] = CIMultiDict()

        # Build the host header
        default_host = self.url.host_port_subcomponent

        # host_port_subcomponent is None when the URL is a relative URL.
        # but we know we do not have a relative URL here.
        assert default_host is not None
        # A caller-supplied Host wins; it is popped so that extend() below
        # does not add it a second time.
        self.headers[hdrs.HOST] = headers.pop(hdrs.HOST, default_host)
        self.headers.extend(headers)

    def _create_response(
        self,
        task: asyncio.Task[None] | None,
        stream_writer: AbstractStreamWriter,
    ) -> ClientResponse:
        """Instantiate ``response_class`` for this request."""
        return self.response_class(
            self.method,
            self.original_url,
            writer=task,
            continue100=None,
            timer=TimerNoop(),
            traces=(),
            loop=self.loop,
            session=None,
            request_headers=self.headers,
            original_url=self.original_url,
            stream_writer=stream_writer,
        )

    def _create_writer(self, protocol: BaseProtocol) -> StreamWriter:
        """Create the stream writer used to send this request."""
        return StreamWriter(protocol, self.loop)

    def _should_write(self, protocol: BaseProtocol) -> bool:
        """Return True when a writer task must be started."""
        return protocol.writing_paused

    async def _send(self, conn: "Connection") -> ClientResponse:
        """Write the request head on *conn* and return the response object."""
        # Specify request target:
        # - CONNECT request must send authority form URI
        # - not CONNECT proxy must send absolute form URI
        # - most common is origin form URI
        if self.method == hdrs.METH_CONNECT:
            connect_host = self.url.host_subcomponent
            assert connect_host is not None
            path = f"{connect_host}:{self.url.port}"
        elif self.proxy and not self.is_ssl():
            path = str(self.url)
        else:
            path = self.url.raw_path_qs

        protocol = conn.protocol
        assert protocol is not None
        writer = self._create_writer(protocol)

        # set default content-type
        if (
            self.method in self.POST_METHODS
            and (
                self._skip_auto_headers is None
                or hdrs.CONTENT_TYPE not in self._skip_auto_headers
            )
            and hdrs.CONTENT_TYPE not in self.headers
        ):
            self.headers[hdrs.CONTENT_TYPE] = "application/octet-stream"

        version = self.version
        if hdrs.CONNECTION not in self.headers:
            if conn._connector.force_close:
                if version == HttpVersion11:
                    self.headers[hdrs.CONNECTION] = "close"
            elif version == HttpVersion10:
                self.headers[hdrs.CONNECTION] = "keep-alive"

        # status + headers
        status_line = f"{self.method} {path} HTTP/{version.major}.{version.minor}"

        # Buffer headers for potential coalescing with body
        await writer.write_headers(status_line, self.headers)

        task: asyncio.Task[None] | None
        if not self._should_write(protocol):
            # We have nothing to write because
            # - there is no body
            # - the protocol does not have writing paused
            # - we are not waiting for a 100-continue response
            protocol.start_timeout()
            writer.set_eof()
            task = None
        else:
            coro = self._write_bytes(writer, conn, self._get_content_length())
            if sys.version_info >= (3, 12):
                # Optimization for Python 3.12, try to write
                # bytes immediately to avoid having to schedule
                # the task on the event loop.
                task = asyncio.Task(coro, loop=self.loop, eager_start=True)
            else:
                task = self.loop.create_task(coro)
            if task.done():
                task = None
            else:
                self._writer = task
        self._response = self._create_response(task, stream_writer=writer)
        return self._response

    async def _write_bytes(
        self,
        writer: AbstractStreamWriter,
        conn: "Connection",
        content_length: int | None,
    ) -> None:
        """Write the request body; not expected to run for the base class."""
        # Base class never has a body, this will never be run.
        assert False

940 

941 

class ClientRequestArgs(TypedDict, total=False):
    """Keyword arguments of ``ClientRequest``; every key is optional."""

    params: Query
    headers: CIMultiDict[str]
    skip_auto_headers: Iterable[str] | None
    data: Any
    cookies: BaseCookie[str]
    version: HttpVersion
    compress: Literal["deflate", "gzip"] | bool
    chunked: bool | None
    expect100: bool
    loop: asyncio.AbstractEventLoop
    response_class: type[ClientResponse]
    proxy: URL | None
    timer: BaseTimerContext
    session: "ClientSession"
    ssl: SSLContext | bool | Fingerprint
    proxy_headers: CIMultiDict[str] | None
    traces: list["Trace"]
    trust_env: bool
    server_hostname: str | None

962 

963 

964class ClientRequest(ClientRequestBase): 

965 _EMPTY_BODY = payload.PAYLOAD_REGISTRY.get(b"", disposition=None) 

966 _body = _EMPTY_BODY 

967 _continue = None # waiter future for '100 Continue' response 

968 

969 GET_METHODS = { 

970 hdrs.METH_GET, 

971 hdrs.METH_HEAD, 

972 hdrs.METH_OPTIONS, 

973 hdrs.METH_TRACE, 

974 } 

975 DEFAULT_HEADERS = { 

976 hdrs.ACCEPT: "*/*", 

977 hdrs.ACCEPT_ENCODING: _gen_default_accept_encoding(), 

978 } 

979 

def __init__(
    self,
    method: str,
    url: URL,
    *,
    params: Query,
    headers: CIMultiDict[str],
    skip_auto_headers: Iterable[str] | None,
    data: Any,
    cookies: BaseCookie[str],
    version: HttpVersion,
    compress: Literal["deflate", "gzip"] | bool,
    chunked: bool | None,
    expect100: bool,
    loop: asyncio.AbstractEventLoop,
    response_class: type[ClientResponse],
    proxy: URL | None,
    timer: BaseTimerContext,
    session: "ClientSession",
    ssl: SSLContext | bool | Fingerprint,
    proxy_headers: CIMultiDict[str] | None,
    traces: list["Trace"],
    trust_env: bool,
    server_hostname: str | None,
    **kwargs: object,
):
    """Build a request: URL/query, headers, cookies, encoding, proxy, body.

    The ``_update_*`` helpers run in a fixed order after the base class
    has validated the method and URL.
    """
    # **kwargs exists so that subclass authors forward unknown arguments,
    # which lets new arguments be added in future releases.  The parent
    # class itself must never receive any: that would indicate a misnamed
    # argument in a subclass.
    assert not kwargs, "Unexpected arguments to ClientRequest"

    if params:
        url = url.extend_query(params)
    super().__init__(method, url, headers=headers, loop=loop, ssl=ssl)

    if proxy is not None:
        assert type(proxy) is URL, proxy
    self._session = session
    self.chunked = chunked
    self.response_class = response_class
    self._timer = timer
    self.server_hostname = server_hostname
    self.version = version

    self._update_auto_headers(skip_auto_headers)
    self._update_cookies(cookies)
    self._update_content_encoding(data, compress)
    self._update_proxy(proxy, proxy_headers)

    self._update_body_from_data(data)
    # Transfer encoding is only set up when there is data, or when the
    # method is not one of GET_METHODS.
    has_data = data is not None
    if has_data or self.method not in self.GET_METHODS:
        self._update_transfer_encoding()
    self._update_expect_continue(expect100)
    self._traces = traces

1035 

@property
def body(self) -> payload.Payload:
    """Return the payload currently stored as the request body."""
    return self._body

1039 

@property
def skip_auto_headers(self) -> CIMultiDict[None]:
    """Return the headers excluded from auto-generation.

    Falls back to a fresh, empty ``CIMultiDict`` when nothing is stored
    (or the stored mapping is empty).
    """
    skipped = self._skip_auto_headers
    if skipped:
        return skipped
    return CIMultiDict()

1043 

@property
def connection_key(self) -> ConnectionKey:
    """Return the ``ConnectionKey`` tuple derived from this request.

    The key holds, in order: raw host (``""`` when missing), port, whether
    the scheme is one of ``_SSL_SCHEMES``, the ssl setting, the proxy, a
    hash of the proxy headers (``None`` when there are none) and the
    server hostname.
    """
    proxy_headers = self.proxy_headers
    headers_hash: int | None = (
        hash(tuple(proxy_headers.items())) if proxy_headers else None
    )
    target = self.url
    fields = (
        target.raw_host or "",
        target.port,
        target.scheme in _SSL_SCHEMES,
        self._ssl,
        self.proxy,
        headers_hash,
        self.server_hostname,
    )
    # tuple.__new__ builds the instance directly from the prepared tuple,
    # without going through ConnectionKey's own constructor.
    return tuple.__new__(ConnectionKey, fields)

1063 

@property
def session(self) -> "ClientSession":
    """Return the ClientSession that initiated this request.

    Exposing the session lets middleware issue additional requests
    through the same session.
    """
    return self._session

1073 

def _update_auto_headers(self, skip_auto_headers: Iterable[str] | None) -> None:
    """Add default headers and User-Agent unless present or skipped.

    Args:
        skip_auto_headers: Header names that must not be auto-generated,
            or None when nothing is skipped.
    """
    headers = self.headers
    if skip_auto_headers is None:
        # Fast path when there are no headers to skip
        # which is the most common case.
        present = headers
    else:
        skipped = CIMultiDict((name, None) for name in sorted(skip_auto_headers))
        self._skip_auto_headers = skipped
        # Treat skipped names as already present so no default is added.
        present = headers.copy()
        present.extend(skipped)  # type: ignore[arg-type]

    for name, default in self.DEFAULT_HEADERS.items():
        if name not in present:
            headers[name] = default

    if hdrs.USER_AGENT not in present:
        headers[hdrs.USER_AGENT] = SERVER_SOFTWARE

1092 

def _update_cookies(self, cookies: BaseCookie[str]) -> None:
    """Merge ``cookies`` into the request's Cookie header.

    Cookies already present in the header are parsed and kept; entries in
    ``cookies`` with the same name replace them.  Does nothing when
    ``cookies`` is empty.
    """
    if not cookies:
        return

    headers = self.headers
    merged = SimpleCookie()
    if hdrs.COOKIE in headers:
        # parse_cookie_header for RFC 6265 compliant Cookie header parsing
        existing = headers.get(hdrs.COOKIE, "")
        merged.update(parse_cookie_header(existing))
        del headers[hdrs.COOKIE]

    for name, morsel in cookies.items():
        # Use helper to preserve coded_value exactly as sent by server
        merged[name] = preserve_morsel_with_coded_value(morsel)

    rendered = merged.output(header="", sep=";")
    headers[hdrs.COOKIE] = rendered.strip()

1109 

def _update_content_encoding(
    self, data: Any, compress: bool | Literal["deflate", "gzip"]
) -> None:
    """Choose the request compression and set Content-Encoding.

    ``self.compress`` is always reset to None first.  Nothing else happens
    when ``data`` is falsy.  A truthy non-string ``compress`` means
    ``"deflate"``.

    Raises:
        ValueError: If ``compress`` is set together with an existing
            Content-Encoding header, or is an unsupported string.
    """
    self.compress = None
    if not data:
        return

    if self.headers.get(hdrs.CONTENT_ENCODING):
        if compress:
            raise ValueError(
                "compress can not be set if Content-Encoding header is set"
            )
        return

    if not compress:
        return

    if isinstance(compress, str):
        if compress not in {"deflate", "gzip"}:
            raise ValueError(
                "compress must be one of True, False, 'deflate', or 'gzip'"
            )
        algorithm = compress
    else:
        algorithm = "deflate"

    self.compress = algorithm
    self.headers[hdrs.CONTENT_ENCODING] = algorithm
    self.chunked = True  # enable chunked, no need to deal with length

1131 

def _update_transfer_encoding(self) -> None:
    """Reconcile ``self.chunked`` with the Transfer-Encoding header.

    Adds ``Transfer-Encoding: chunked`` when chunking is enabled and the
    header does not already mention it.

    Raises:
        ValueError: If chunking is enabled while the header already says
            chunked, or while a Content-Length header is present.
    """
    headers = self.headers
    declared = headers.get(hdrs.TRANSFER_ENCODING, "").lower()

    if "chunked" in declared:
        if self.chunked:
            raise ValueError(
                'chunked can not be set if "Transfer-Encoding: chunked" header is set'
            )
        return

    if not self.chunked:
        return

    if hdrs.CONTENT_LENGTH in headers:
        raise ValueError("chunked can not be set if Content-Length header is set")

    headers[hdrs.TRANSFER_ENCODING] = "chunked"

1150 

def _update_body_from_data(self, body: Any) -> None:
    """Turn ``body`` into a payload and update length/chunking headers.

    ``None`` selects the shared empty payload.  ``FormData`` is called to
    produce its payload.  Anything else goes through the payload registry,
    falling back to ``FormData`` when the registry has no match.
    """
    headers = self.headers

    if body is None:
        self._body = self._EMPTY_BODY
        # Set Content-Length to 0 when body is None for methods that expect a body
        if not (
            self.method in self.GET_METHODS
            or self.chunked
            or hdrs.CONTENT_LENGTH in headers
        ):
            headers[hdrs.CONTENT_LENGTH] = "0"
        return

    if isinstance(body, FormData):
        body = body()
    else:
        try:
            body = payload.PAYLOAD_REGISTRY.get(body, disposition=None)
        except payload.LookupError:
            # Reuse a boundary already declared in Content-Type, if any.
            boundary = (
                parse_mimetype(headers[hdrs.CONTENT_TYPE]).parameters.get(
                    "boundary"
                )
                if hdrs.CONTENT_TYPE in headers
                else None
            )
            body = FormData(body, boundary=boundary)()

    self._body = body

    # enable chunked encoding if needed
    if not (self.chunked or hdrs.CONTENT_LENGTH in headers):
        size = body.size
        if size is None:
            self.chunked = True
        else:
            headers[hdrs.CONTENT_LENGTH] = str(size)

    # copy payload headers
    assert body.headers
    skipped = self._skip_auto_headers
    for key, value in body.headers.items():
        if key not in headers and (skipped is None or key not in skipped):
            headers[key] = value

1195 

def _update_body(self, body: Any) -> None:
    """Replace a body that was already set and refresh framing headers."""
    headers = self.headers

    # Content-Length always describes the old body; Transfer-Encoding is
    # only dropped when chunking is on, to avoid conflicts when re-derived.
    stale = [hdrs.CONTENT_LENGTH]
    if self.chunked:
        stale.append(hdrs.TRANSFER_ENCODING)
    for name in stale:
        if name in headers:
            del headers[name]

    # Now update the body using the existing method
    self._update_body_from_data(body)

    # Update transfer encoding headers if needed (same logic as __init__)
    if body is not None or self.method not in self.GET_METHODS:
        self._update_transfer_encoding()

1212 

async def update_body(self, body: Any) -> None:
    """
    Close the current payload, then install ``body`` as the new one.

    Closing first prevents leaking resources (file handles, streams)
    held by the previous payload.  Prefer this method over touching the
    stored payload directly.

    Args:
        body: The new body content. Can be:
            - bytes/bytearray: Raw binary data
            - str: Text data (will be encoded using charset from Content-Type)
            - FormData: Form data that will be encoded as multipart/form-data
            - Payload: A pre-configured payload object
            - AsyncIterable: An async iterable of bytes chunks
            - File-like object: Will be read and sent as binary data
            - None: Clears the body

    Usage:
        await request.update_body(b"new request data")

        # Update with form data
        form_data = FormData()
        form_data.add_field('field', 'value')
        await request.update_body(form_data)

        # Clear body
        await request.update_body(None)

    Note:
        This method is async because closing the previous payload is
        awaited.  Always await it so the cleanup actually happens.

    Warning:
        It is not recommended to change the payload type in middleware.
        If the body was already set (e.g., as bytes), keep the same type
        rather than converting it (e.g., to str), as this may result in
        unexpected behavior.

    See Also:
        - _update_body_from_data: Synchronous body update without cleanup

    """
    # Close existing payload if it exists and needs closing
    previous = self._body
    if previous is not None:
        await previous.close()
    self._update_body(body)

1274 

def _update_expect_continue(self, expect: bool = False) -> None:
    """Set up ``Expect: 100-continue`` handling.

    When ``expect`` is true the header is written.  Otherwise an existing
    ``Expect: 100-continue`` header (case-insensitive value) turns the
    behaviour on.  In both cases a waiter future is stored in
    ``self._continue``.
    """
    headers = self.headers
    if expect:
        headers[hdrs.EXPECT] = "100-continue"
    else:
        expect = (
            hdrs.EXPECT in headers
            and headers[hdrs.EXPECT].lower() == "100-continue"
        )

    if expect:
        self._continue = self.loop.create_future()

1286 

def _update_proxy(
    self,
    proxy: URL | None,
    proxy_headers: CIMultiDict[str] | None,
) -> None:
    """Store the proxy URL and the headers to send to it.

    Credentials embedded in the proxy URL become a Proxy-Authorization
    header (unless one is already set) and are then removed from the URL.
    """
    if proxy is None:
        self.proxy = self.proxy_headers = None
        return

    # URL-embedded credentials on the proxy map to Proxy-Authorization.
    has_credentials = proxy.raw_user or proxy.raw_password
    if has_credentials:
        credentials = encode_basic_auth(proxy.user or "", proxy.password or "")
        if proxy_headers is None:
            proxy_headers = CIMultiDict()
        # setdefault: an explicitly supplied header wins over the URL.
        proxy_headers.setdefault(hdrs.PROXY_AUTHORIZATION, credentials)
        proxy = proxy.with_user(None)

    self.proxy = proxy
    self.proxy_headers = proxy_headers

1305 

def _create_response(
    self,
    task: asyncio.Task[None] | None,
    stream_writer: AbstractStreamWriter,
) -> ClientResponse:
    """Instantiate ``self.response_class`` for this request.

    Args:
        task: Passed to the response as ``writer``; may be None.
        stream_writer: The stream writer handed on to the response.
    """
    response_cls = self.response_class
    original_url = self.original_url
    return response_cls(
        self.method,
        original_url,
        writer=task,
        continue100=self._continue,
        timer=self._timer,
        traces=self._traces,
        loop=self.loop,
        session=self._session,
        request_headers=self.headers,
        original_url=original_url,
        stream_writer=stream_writer,
    )

1324 

def _create_writer(self, protocol: BaseProtocol) -> StreamWriter:
    """Create the ``StreamWriter`` used to send this request.

    Trace callbacks are attached only when traces are configured.
    Compression and chunking are enabled on the writer according to
    ``self.compress`` and ``self.chunked``.

    Args:
        protocol: The protocol the writer sends through.

    Returns:
        The configured stream writer.
    """
    writer = StreamWriter(
        protocol,
        self.loop,
        on_chunk_sent=(
            functools.partial(self._on_chunk_request_sent, self.method, self.url)
            if self._traces
            else None
        ),
        on_headers_sent=(
            functools.partial(self._on_headers_request_sent, self.method, self.url)
            if self._traces
            else None
        ),
    )

    if self.compress:
        writer.enable_compression(self.compress)

    # Test truthiness, not ``is not None``: an explicit ``chunked=False``
    # must not enable chunking.  With ``is not None`` such a request got a
    # Content-Length header from _update_body_from_data() and then had its
    # body framed as chunks anyway.
    if self.chunked:
        writer.enable_chunking()
    return writer

1347 

def _should_write(self, protocol: BaseProtocol) -> bool:
    """Tell whether the body-writing step is needed for this request.

    True when the body size is not zero or a 100-continue waiter exists;
    otherwise the protocol's ``writing_paused`` value is returned.
    """
    if self.body.size != 0:
        return True
    if self._continue is not None:
        return True
    return protocol.writing_paused

1352 

async def _write_bytes(
    self,
    writer: AbstractStreamWriter,
    conn: "Connection",
    content_length: int | None,
) -> None:
    """
    Send the request body through ``writer``.

    When a 100-continue waiter exists, the headers are sent and drained
    first and the waiter is awaited before any body bytes are written.
    The body is then written by the payload's ``write_with_length``.

    Args:
        writer: The stream writer to write the body to
        conn: The connection being used for this request
        content_length: Optional maximum number of bytes to write from the body
            (None means write the entire body)

    Failures while writing are not raised from here; they are recorded on
    the connection's protocol via ``set_exception``:

    - ``OSError`` becomes ``ClientOSError``, except a timeout error
      without an errno, which is recorded unchanged
    - any other ``Exception`` becomes ``ClientConnectionError``

    On success EOF is written and the protocol timeout is started.

    Raises:
        asyncio.CancelledError: When the operation is cancelled; the
            connection is closed before re-raising.

    """
    # 100 response
    if self._continue is not None:
        # Force headers to be sent before waiting for 100-continue
        writer.send_headers()
        await writer.drain()
        await self._continue

    protocol = conn.protocol
    assert protocol is not None
    try:
        await self._body.write_with_length(writer, content_length)
    except OSError as os_error:
        # A timeout surfaces as an OSError subclass without an errno; it is
        # passed through as-is so it is not reported as an OS-level failure.
        is_bare_timeout = os_error.errno is None and isinstance(
            os_error, asyncio.TimeoutError
        )
        failure = (
            os_error
            if is_bare_timeout
            else ClientOSError(
                os_error.errno,
                f"Can not write request body for {self.url !s}",
            )
        )
        set_exception(protocol, failure, os_error)
    except asyncio.CancelledError:
        # Body hasn't been fully sent, so connection can't be reused
        conn.close()
        raise
    except Exception as error:
        wrapped = ClientConnectionError(
            "Failed to send bytes into the underlying connection "
            f"{conn !s}: {error!r}",
        )
        set_exception(protocol, wrapped, error)
    else:
        # Successfully wrote the body, signal EOF and start response timeout
        await writer.write_eof()
        protocol.start_timeout()

1427 

1428 async def _close(self) -> None: 

1429 if self._writer_task is not None: 

1430 try: 

1431 await self._writer_task 

1432 except asyncio.CancelledError: 

1433 if ( 

1434 sys.version_info >= (3, 11) 

1435 and (task := asyncio.current_task()) 

1436 and task.cancelling() 

1437 ): 

1438 raise 

1439 

1440 def _terminate(self) -> None: 

1441 if self._writer_task is not None: 

1442 if not self.loop.is_closed(): 

1443 self._writer_task.cancel() 

1444 self._writer_task.remove_done_callback(self._reset_writer) 

1445 self._writer_task = None 

1446 

async def _on_chunk_request_sent(self, method: str, url: URL, chunk: bytes) -> None:
    """Notify every configured trace, in order, that a body chunk was sent."""
    for tracer in self._traces:
        await tracer.send_request_chunk_sent(method, url, chunk)

1450 

async def _on_headers_request_sent(
    self, method: str, url: URL, headers: "CIMultiDict[str]"
) -> None:
    """Notify every configured trace, in order, that the headers were sent."""
    for tracer in self._traces:
        await tracer.send_request_headers(method, url, headers)