Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/client_reqrep.py: 32%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

726 statements  

1import asyncio 

2import codecs 

3import contextlib 

4import functools 

5import io 

6import re 

7import sys 

8import traceback 

9import warnings 

10from collections.abc import Callable, Iterable, Sequence 

11from hashlib import md5, sha1, sha256 

12from http.cookies import BaseCookie, SimpleCookie 

13from types import MappingProxyType, TracebackType 

14from typing import TYPE_CHECKING, Any, Literal, NamedTuple, TypedDict 

15 

16from multidict import CIMultiDict, CIMultiDictProxy, MultiDict, MultiDictProxy 

17from yarl import URL, Query 

18 

19from . import hdrs, multipart, payload 

20from ._cookie_helpers import ( 

21 parse_cookie_header, 

22 parse_set_cookie_headers, 

23 preserve_morsel_with_coded_value, 

24) 

25from .abc import AbstractStreamWriter 

26from .base_protocol import BaseProtocol 

27from .client_exceptions import ( 

28 ClientConnectionError, 

29 ClientOSError, 

30 ClientResponseError, 

31 ContentTypeError, 

32 InvalidURL, 

33 ServerFingerprintMismatch, 

34) 

35from .compression_utils import HAS_BROTLI, HAS_ZSTD 

36from .formdata import FormData 

37from .helpers import ( 

38 _SENTINEL, 

39 BaseTimerContext, 

40 HeadersDictProxy, 

41 HeadersMixin, 

42 TimerNoop, 

43 encode_basic_auth, 

44 frozen_dataclass_decorator, 

45 is_expected_content_type, 

46 parse_mimetype, 

47 reify, 

48 sentinel, 

49 set_exception, 

50 set_result, 

51) 

52from .http import ( 

53 SERVER_SOFTWARE, 

54 HttpProcessingError, 

55 HttpVersion, 

56 HttpVersion10, 

57 HttpVersion11, 

58 StreamWriter, 

59) 

60from .streams import StreamReader 

61from .typedefs import DEFAULT_JSON_DECODER, JSONDecoder, RawHeaders 

62 

63try: 

64 import ssl 

65 from ssl import SSLContext 

66except ImportError: # pragma: no cover 

67 ssl = None # type: ignore[assignment] 

68 SSLContext = object # type: ignore[misc,assignment] 

69 

70 

71__all__ = ("ClientRequest", "ClientResponse", "RequestInfo", "Fingerprint") 

72 

73 

74if TYPE_CHECKING: 

75 from .client import ClientSession 

76 from .connector import Connection 

77 from .tracing import Trace 

78 

79 

80_CONNECTION_CLOSED_EXCEPTION = ClientConnectionError("Connection closed") 

81_CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]") 

82_DIGITS_RE = re.compile(r"\d+", re.ASCII) 

83 

84 

85def _gen_default_accept_encoding() -> str: 

86 encodings = [ 

87 "gzip", 

88 "deflate", 

89 ] 

90 if HAS_BROTLI: 

91 encodings.append("br") 

92 if HAS_ZSTD: 

93 encodings.append("zstd") 

94 return ", ".join(encodings) 

95 

96 

97@frozen_dataclass_decorator 

98class ContentDisposition: 

99 type: str | None 

100 parameters: "MappingProxyType[str, str]" 

101 filename: str | None 

102 

103 

104class _RequestInfo(NamedTuple): 

105 url: URL 

106 method: str 

107 headers: "CIMultiDictProxy[str]" 

108 real_url: URL 

109 

110 

111class RequestInfo(_RequestInfo): 

112 

113 def __new__( 

114 cls, 

115 url: URL, 

116 method: str, 

117 headers: "CIMultiDictProxy[str]", 

118 real_url: URL | _SENTINEL = sentinel, 

119 ) -> "RequestInfo": 

120 """Create a new RequestInfo instance. 

121 

122 For backwards compatibility, the real_url parameter is optional. 

123 """ 

124 return tuple.__new__( 

125 cls, (url, method, headers, url if real_url is sentinel else real_url) 

126 ) 

127 

128 

129class Fingerprint: 

130 HASHFUNC_BY_DIGESTLEN = { 

131 16: md5, 

132 20: sha1, 

133 32: sha256, 

134 } 

135 

136 def __init__(self, fingerprint: bytes) -> None: 

137 digestlen = len(fingerprint) 

138 hashfunc = self.HASHFUNC_BY_DIGESTLEN.get(digestlen) 

139 if not hashfunc: 

140 raise ValueError("fingerprint has invalid length") 

141 elif hashfunc is md5 or hashfunc is sha1: 

142 raise ValueError("md5 and sha1 are insecure and not supported. Use sha256.") 

143 self._hashfunc = hashfunc 

144 self._fingerprint = fingerprint 

145 

146 @property 

147 def fingerprint(self) -> bytes: 

148 return self._fingerprint 

149 

150 def check(self, transport: asyncio.Transport) -> None: 

151 if not transport.get_extra_info("sslcontext"): 

152 return 

153 sslobj = transport.get_extra_info("ssl_object") 

154 cert = sslobj.getpeercert(binary_form=True) 

155 got = self._hashfunc(cert).digest() 

156 if got != self._fingerprint: 

157 host, port, *_ = transport.get_extra_info("peername") 

158 raise ServerFingerprintMismatch(self._fingerprint, got, host, port) 

159 

160 

161if ssl is not None: 

162 SSL_ALLOWED_TYPES = (ssl.SSLContext, bool, Fingerprint) 

163else: # pragma: no cover 

164 SSL_ALLOWED_TYPES = (bool,) # type: ignore[unreachable] 

165 

166 

167_CONNECTION_CLOSED_EXCEPTION = ClientConnectionError("Connection closed") 

168_SSL_SCHEMES = frozenset(("https", "wss")) 

169 

170 

171# ConnectionKey is a NamedTuple because it is used as a key in a dict 

172# and a set in the connector. Since a NamedTuple is a tuple it uses 

173# the fast native tuple __hash__ and __eq__ implementation in CPython. 

174class ConnectionKey(NamedTuple): 

175 # the key should contain an information about used proxy / TLS 

176 # to prevent reusing wrong connections from a pool 

177 host: str 

178 port: int | None 

179 is_ssl: bool 

180 ssl: SSLContext | bool | Fingerprint 

181 proxy: URL | None 

182 proxy_headers_hash: int | None # hash(CIMultiDict) 

183 

184 

185class ClientResponse(HeadersMixin): 

186 # Some of these attributes are None when created, 

187 # but will be set by the start() method. 

188 # As the end user will likely never see the None values, we cheat the types below. 

189 # from the Status-Line of the response 

190 version: HttpVersion | None = None # HTTP-Version 

191 status: int = None # type: ignore[assignment] # Status-Code 

192 reason: str | None = None # Reason-Phrase 

193 

194 content: StreamReader = None # type: ignore[assignment] # Payload stream 

195 _body: bytes | None = None 

196 _headers: HeadersDictProxy = None # type: ignore[assignment] 

197 _history: tuple["ClientResponse", ...] = () 

198 _raw_headers: RawHeaders = None # type: ignore[assignment] 

199 

200 _connection: "Connection | None" = None # current connection 

201 _cookies: SimpleCookie | None = None 

202 _raw_cookie_headers: tuple[str, ...] | None = None 

203 _continue: asyncio.Future[bool] | None = None 

204 _source_traceback: traceback.StackSummary | None = None 

205 _session: "ClientSession | None" = None 

206 # set up by ClientRequest after ClientResponse object creation 

207 # post-init stage allows to not change ctor signature 

208 _closed = True # to allow __del__ for non-initialized properly response 

209 _released = False 

210 _in_context = False 

211 

212 _resolve_charset: Callable[["ClientResponse", bytes], str] = lambda *_: "utf-8" 

213 

214 __writer: asyncio.Task[None] | None = None 

215 _stream_writer: AbstractStreamWriter | None = None 

216 _output_size: int = 0 

217 _upload_complete: asyncio.Future[None] | None = None 

218 

219 def __init__( 

220 self, 

221 method: str, 

222 url: URL, 

223 *, 

224 writer: asyncio.Task[None] | None, 

225 continue100: asyncio.Future[bool] | None, 

226 timer: BaseTimerContext | None, 

227 traces: Sequence["Trace"], 

228 loop: asyncio.AbstractEventLoop, 

229 session: "ClientSession | None", 

230 request_headers: CIMultiDict[str], 

231 original_url: URL, 

232 stream_writer: AbstractStreamWriter, 

233 **kwargs: object, 

234 ) -> None: 

235 # kwargs exists so authors of subclasses should expect to pass through unknown 

236 # arguments. This allows us to safely add new arguments in future releases. 

237 # But, we should never receive unknown arguments here in the parent class, this 

238 # would indicate an argument has been named wrong or similar in the subclass. 

239 assert not kwargs, "Unexpected arguments to ClientResponse" 

240 # URL forbids subclasses, so a simple type check is enough. 

241 assert type(url) is URL 

242 

243 self.method = method 

244 

245 self._real_url = url 

246 self._url = url.with_fragment(None) if url.raw_fragment else url 

247 if writer is None: # Request already sent 

248 self._output_size = stream_writer.output_size 

249 else: 

250 self._stream_writer = stream_writer 

251 self._writer = writer 

252 if continue100 is not None: 

253 self._continue = continue100 

254 self._request_headers = request_headers 

255 self._original_url = original_url 

256 self._timer = timer if timer is not None else TimerNoop() 

257 self._cache: dict[str, Any] = {} 

258 self._traces = traces 

259 self._loop = loop 

260 # Save reference to _resolve_charset, so that get_encoding() will still 

261 # work after the response has finished reading the body. 

262 if session is not None: 

263 # store a reference to session #1985 

264 self._session = session 

265 self._resolve_charset = session._resolve_charset 

266 if loop.get_debug(): 

267 self._source_traceback = traceback.extract_stack(sys._getframe(1)) 

268 

269 def __reset_writer(self, _: object = None) -> None: 

270 self.__writer = None 

271 if self._stream_writer is not None: 

272 self._output_size = self._stream_writer.output_size 

273 self._stream_writer = None 

274 if self._upload_complete is not None and not self._upload_complete.done(): 

275 self._upload_complete.set_result(None) 

276 

277 @property 

278 def _writer(self) -> asyncio.Task[None] | None: 

279 """The writer task for streaming data. 

280 

281 _writer is only provided for backwards compatibility 

282 for subclasses that may need to access it. 

283 """ 

284 return self.__writer 

285 

286 @_writer.setter 

287 def _writer(self, writer: asyncio.Task[None] | None) -> None: 

288 """Set the writer task for streaming data.""" 

289 if self.__writer is not None: 

290 self.__writer.remove_done_callback(self.__reset_writer) 

291 self.__writer = writer 

292 if writer is None: 

293 return 

294 if writer.done(): 

295 # The writer is already done, so we can clear it immediately. 

296 self.__reset_writer() 

297 else: 

298 writer.add_done_callback(self.__reset_writer) 

299 

300 @property 

301 def output_size(self) -> int: 

302 """Number of bytes sent for this request.""" 

303 if self._stream_writer is not None: 

304 return self._stream_writer.output_size 

305 return self._output_size 

306 

307 @property 

308 def upload_complete(self) -> "asyncio.Future[None]": 

309 """Future set when the request body has been fully sent. 

310 

311 Already done when the request had no body or was written eagerly. 

312 """ 

313 if self._upload_complete is None: 

314 self._upload_complete = self._loop.create_future() 

315 if self._stream_writer is None: # upload already finished 

316 self._upload_complete.set_result(None) 

317 return self._upload_complete 

318 

319 @property 

320 def cookies(self) -> SimpleCookie: 

321 if self._cookies is None: 

322 if self._raw_cookie_headers is not None: 

323 # Parse cookies for response.cookies (SimpleCookie for backward compatibility) 

324 cookies = SimpleCookie() 

325 # Use parse_set_cookie_headers for more lenient parsing that handles 

326 # malformed cookies better than SimpleCookie.load 

327 cookies.update(parse_set_cookie_headers(self._raw_cookie_headers)) 

328 self._cookies = cookies 

329 else: 

330 self._cookies = SimpleCookie() 

331 return self._cookies 

332 

333 @cookies.setter 

334 def cookies(self, cookies: SimpleCookie) -> None: 

335 self._cookies = cookies 

336 # Generate raw cookie headers from the SimpleCookie 

337 if cookies: 

338 self._raw_cookie_headers = tuple( 

339 morsel.OutputString() for morsel in cookies.values() 

340 ) 

341 else: 

342 self._raw_cookie_headers = None 

343 

344 @reify 

345 def url(self) -> URL: 

346 return self._url 

347 

348 @reify 

349 def real_url(self) -> URL: 

350 return self._real_url 

351 

352 @reify 

353 def host(self) -> str: 

354 assert self._url.host is not None 

355 return self._url.host 

356 

357 @reify 

358 def headers(self) -> HeadersDictProxy: 

359 return self._headers 

360 

361 @reify 

362 def raw_headers(self) -> RawHeaders: 

363 return self._raw_headers 

364 

365 @reify 

366 def request_info(self) -> RequestInfo: 

367 # Build RequestInfo lazily from components 

368 headers = CIMultiDictProxy(self._request_headers) 

369 return tuple.__new__( 

370 RequestInfo, (self._url, self.method, headers, self._original_url) 

371 ) 

372 

373 @reify 

374 def content_disposition(self) -> ContentDisposition | None: 

375 raw = self._headers.get(hdrs.CONTENT_DISPOSITION) 

376 if raw is None: 

377 return None 

378 disposition_type, params_dct = multipart.parse_content_disposition(raw) 

379 params = MappingProxyType(params_dct) 

380 filename = multipart.content_disposition_filename(params) 

381 return ContentDisposition(disposition_type, params, filename) 

382 

383 def __del__(self, _warnings: Any = warnings) -> None: 

384 if self._closed: 

385 return 

386 

387 if self._connection is not None: 

388 self._connection.release() 

389 self._cleanup_writer() 

390 

391 if self._loop.get_debug(): 

392 _warnings.warn( 

393 f"Unclosed response {self!r}", ResourceWarning, source=self 

394 ) 

395 context = {"client_response": self, "message": "Unclosed response"} 

396 if self._source_traceback: 

397 context["source_traceback"] = self._source_traceback 

398 self._loop.call_exception_handler(context) 

399 

400 def __repr__(self) -> str: 

401 out = io.StringIO() 

402 ascii_encodable_url = str(self.url) 

403 if self.reason: 

404 ascii_encodable_reason = self.reason.encode( 

405 "ascii", "backslashreplace" 

406 ).decode("ascii") 

407 else: 

408 ascii_encodable_reason = "None" 

409 print( 

410 f"<ClientResponse({ascii_encodable_url}) [{self.status} {ascii_encodable_reason}]>", 

411 file=out, 

412 ) 

413 print(self.headers, file=out) 

414 return out.getvalue() 

415 

416 @property 

417 def connection(self) -> "Connection | None": 

418 return self._connection 

419 

420 @reify 

421 def history(self) -> tuple["ClientResponse", ...]: 

422 """A sequence of responses, if redirects occurred.""" 

423 return self._history 

424 

425 @reify 

426 def links(self) -> "MultiDictProxy[MultiDictProxy[str | URL]]": 

427 links: MultiDict[MultiDictProxy[str | URL]] = MultiDict() 

428 for val in self.headers.getall("link"): 

429 match = re.match(r"\s*<(.*)>(.*)", val) 

430 if match is None: # Malformed link 

431 continue 

432 url, params_str = match.groups() 

433 params = params_str.split(";")[1:] 

434 

435 link: MultiDict[str | URL] = MultiDict() 

436 

437 for param in params: 

438 match = re.match(r"^\s*(\S*)\s*=\s*(['\"]?)(.*?)(\2)\s*$", param, re.M) 

439 if match is None: # Malformed param 

440 continue 

441 key, _, value, _ = match.groups() 

442 

443 link.add(key, value) 

444 

445 key = link.get("rel", url) 

446 

447 link.add("url", self.url.join(URL(url))) 

448 

449 links.add(str(key), MultiDictProxy(link)) 

450 

451 return MultiDictProxy(links) 

452 

453 async def start(self, connection: "Connection") -> "ClientResponse": 

454 """Start response processing.""" 

455 self._closed = False 

456 self._protocol = connection.protocol 

457 self._connection = connection 

458 

459 with self._timer: 

460 while True: 

461 # read response 

462 try: 

463 protocol = self._protocol 

464 message, payload = await protocol.read() # type: ignore[union-attr] 

465 except HttpProcessingError as exc: 

466 raise ClientResponseError( 

467 self.request_info, 

468 self.history, 

469 status=exc.code, 

470 message=exc.message, 

471 headers=exc.headers, 

472 ) from exc 

473 

474 if message.code < 100 or message.code > 199 or message.code == 101: 

475 break 

476 

477 if self._continue is not None: 

478 set_result(self._continue, True) 

479 self._continue = None 

480 

481 # payload eof handler 

482 payload.on_eof(self._response_eof) 

483 

484 # response status 

485 self.version = message.version 

486 self.status = message.code 

487 self.reason = message.reason 

488 

489 # headers 

490 self._headers = message.headers 

491 self._raw_headers = message.raw_headers 

492 

493 # payload 

494 self.content = payload 

495 

496 # cookies 

497 if cookie_hdrs := self.headers._md.getall(hdrs.SET_COOKIE, ()): 

498 # Store raw cookie headers for CookieJar 

499 self._raw_cookie_headers = tuple(cookie_hdrs) 

500 return self 

501 

502 def _response_eof(self) -> None: 

503 if self._closed: 

504 return 

505 

506 # protocol could be None because connection could be detached 

507 protocol = self._connection and self._connection.protocol 

508 if protocol is not None and protocol.upgraded: 

509 return 

510 

511 self._closed = True 

512 self._cleanup_writer() 

513 self._release_connection() 

514 

515 @property 

516 def closed(self) -> bool: 

517 return self._closed 

518 

519 def close(self) -> None: 

520 if not self._released: 

521 self._notify_content() 

522 

523 self._closed = True 

524 if self._loop.is_closed(): 

525 return 

526 

527 self._cleanup_writer() 

528 if self._connection is not None: 

529 self._connection.close() 

530 self._connection = None 

531 

532 def release(self) -> None: 

533 if not self._released: 

534 self._notify_content() 

535 

536 self._closed = True 

537 

538 self._cleanup_writer() 

539 self._release_connection() 

540 

541 @property 

542 def ok(self) -> bool: 

543 """Returns ``True`` if ``status`` is less than ``400``, ``False`` if not. 

544 

545 This is **not** a check for ``200 OK`` but a check that the response 

546 status is under 400. 

547 """ 

548 return 400 > self.status 

549 

550 def raise_for_status(self) -> None: 

551 if not self.ok: 

552 # reason should always be not None for a started response 

553 assert self.reason is not None 

554 

555 # If we're in a context we can rely on __aexit__() to release as the 

556 # exception propagates. 

557 if not self._in_context: 

558 self.release() 

559 

560 raise ClientResponseError( 

561 self.request_info, 

562 self.history, 

563 status=self.status, 

564 message=self.reason, 

565 headers=self.headers, 

566 ) 

567 

568 def _release_connection(self) -> None: 

569 if self._connection is not None: 

570 if self.__writer is None: 

571 self._connection.release() 

572 self._connection = None 

573 else: 

574 self.__writer.add_done_callback(lambda f: self._release_connection()) 

575 

576 async def _wait_released(self) -> None: 

577 if self.__writer is not None: 

578 try: 

579 await self.__writer 

580 except asyncio.CancelledError: 

581 if ( 

582 sys.version_info >= (3, 11) 

583 and (task := asyncio.current_task()) 

584 and task.cancelling() 

585 ): 

586 raise 

587 self._release_connection() 

588 

589 def _cleanup_writer(self) -> None: 

590 if self.__writer is not None: 

591 self.__writer.cancel() 

592 if self._stream_writer is not None: 

593 self._output_size = self._stream_writer.output_size 

594 self._stream_writer = None 

595 self._session = None 

596 

597 def _notify_content(self) -> None: 

598 content = self.content 

599 # content can be None here, but the types are cheated elsewhere. 

600 if content and content.exception() is None: # type: ignore[truthy-bool] 

601 set_exception(content, _CONNECTION_CLOSED_EXCEPTION) 

602 self._released = True 

603 

604 async def wait_for_close(self) -> None: 

605 if self.__writer is not None: 

606 try: 

607 await self.__writer 

608 except asyncio.CancelledError: 

609 if ( 

610 sys.version_info >= (3, 11) 

611 and (task := asyncio.current_task()) 

612 and task.cancelling() 

613 ): 

614 raise 

615 self.release() 

616 

617 async def read(self) -> bytes: 

618 """Read response payload.""" 

619 if self._body is None: 

620 try: 

621 self._body = await self.content.read() 

622 for trace in self._traces: 

623 await trace.send_response_chunk_received( 

624 self.method, self.url, self._body 

625 ) 

626 except BaseException: 

627 self.close() 

628 raise 

629 elif self._released: # Response explicitly released 

630 raise ClientConnectionError("Connection closed") 

631 

632 protocol = self._connection and self._connection.protocol 

633 if protocol is None or not protocol.upgraded: 

634 await self._wait_released() # Underlying connection released 

635 return self._body 

636 

637 def get_encoding(self) -> str: 

638 ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower() 

639 mimetype = parse_mimetype(ctype) 

640 

641 encoding = mimetype.parameters.get("charset") 

642 if encoding: 

643 with contextlib.suppress(LookupError, ValueError): 

644 return codecs.lookup(encoding).name 

645 

646 if mimetype.type == "application" and ( 

647 mimetype.subtype == "json" or mimetype.subtype == "rdap" 

648 ): 

649 # RFC 7159 states that the default encoding is UTF-8. 

650 # RFC 7483 defines application/rdap+json 

651 return "utf-8" 

652 

653 if self._body is None: 

654 raise RuntimeError( 

655 "Cannot compute fallback encoding of a not yet read body" 

656 ) 

657 

658 return self._resolve_charset(self, self._body) 

659 

660 async def text(self, encoding: str | None = None, errors: str = "strict") -> str: 

661 """Read response payload and decode.""" 

662 await self.read() 

663 

664 if encoding is None: 

665 encoding = self.get_encoding() 

666 

667 return self._body.decode(encoding, errors=errors) # type: ignore[union-attr] 

668 

669 async def json( 

670 self, 

671 *, 

672 encoding: str | None = None, 

673 loads: JSONDecoder = DEFAULT_JSON_DECODER, 

674 content_type: str | None = "application/json", 

675 ) -> Any: 

676 """Read and decodes JSON response.""" 

677 await self.read() 

678 

679 if content_type: 

680 if not is_expected_content_type(self.content_type, content_type): 

681 raise ContentTypeError( 

682 self.request_info, 

683 self.history, 

684 status=self.status, 

685 message=( 

686 "Attempt to decode JSON with " 

687 "unexpected mimetype: %s" % self.content_type 

688 ), 

689 headers=self.headers, 

690 ) 

691 

692 if encoding is None: 

693 encoding = self.get_encoding() 

694 

695 return loads(self._body.decode(encoding)) # type: ignore[union-attr] 

696 

697 async def __aenter__(self) -> "ClientResponse": 

698 self._in_context = True 

699 return self 

700 

701 async def __aexit__( 

702 self, 

703 exc_type: type[BaseException] | None, 

704 exc_val: BaseException | None, 

705 exc_tb: TracebackType | None, 

706 ) -> None: 

707 self._in_context = False 

708 # similar to _RequestContextManager, we do not need to check 

709 # for exceptions, response object can close connection 

710 # if state is broken 

711 self.release() 

712 await self.wait_for_close() 

713 

714 

715class ClientRequestBase: 

716 """An internal class for proxy requests.""" 

717 

718 POST_METHODS = {hdrs.METH_PATCH, hdrs.METH_POST, hdrs.METH_PUT} 

719 

720 proxy: URL | None = None 

721 response_class = ClientResponse 

722 server_hostname: str | None = None # Needed in connector.py 

723 version = HttpVersion11 

724 _response = None 

725 

726 # These class defaults help create_autospec() work correctly. 

727 # If autospec is improved in future, maybe these can be removed. 

728 url = URL() 

729 method = "GET" 

730 

731 _writer_task: asyncio.Task[None] | None = None # async task for streaming data 

732 

733 _skip_auto_headers: "CIMultiDict[None] | None" = None 

734 

735 # N.B. 

736 # Adding __del__ method with self._writer closing doesn't make sense 

737 # because _writer is instance method, thus it keeps a reference to self. 

738 # Until writer has finished finalizer will not be called. 

739 

740 def __init__( 

741 self, 

742 method: str, 

743 url: URL, 

744 *, 

745 headers: CIMultiDict[str], 

746 loop: asyncio.AbstractEventLoop, 

747 ssl: SSLContext | bool | Fingerprint, 

748 trust_env: bool = False, 

749 ): 

750 if match := _CONTAINS_CONTROL_CHAR_RE.search(method): 

751 raise ValueError( 

752 f"Method cannot contain non-token characters {method!r} " 

753 f"(found at least {match.group()!r})" 

754 ) 

755 # URL forbids subclasses, so a simple type check is enough. 

756 assert type(url) is URL, url 

757 self.original_url = url 

758 self.url = url.with_fragment(None) if url.raw_fragment else url 

759 self.method = method.upper() 

760 self.loop = loop 

761 self._ssl = ssl 

762 

763 if loop.get_debug(): 

764 self._source_traceback = traceback.extract_stack(sys._getframe(1)) 

765 

766 if not url.raw_host: 

767 raise InvalidURL(url) 

768 self._update_headers(headers) 

769 if url.raw_user or url.raw_password: 

770 self.headers[hdrs.AUTHORIZATION] = encode_basic_auth( 

771 url.user or "", url.password or "" 

772 ) 

773 

774 def _reset_writer(self, _: object = None) -> None: 

775 self._writer_task = None 

776 

777 def _get_content_length(self) -> int | None: 

778 """Extract and validate Content-Length header value. 

779 

780 Returns parsed Content-Length value or None if not set. 

781 Raises ValueError if header exists but cannot be parsed as an integer. 

782 """ 

783 if hdrs.CONTENT_LENGTH not in self.headers: 

784 return None 

785 

786 content_length_hdr = self.headers[hdrs.CONTENT_LENGTH] 

787 if not _DIGITS_RE.fullmatch(content_length_hdr): 

788 raise ValueError(f"Invalid Content-Length header: {content_length_hdr!r}") 

789 return int(content_length_hdr) 

790 

791 @property 

792 def _writer(self) -> asyncio.Task[None] | None: 

793 return self._writer_task 

794 

795 @_writer.setter 

796 def _writer(self, writer: asyncio.Task[None]) -> None: 

797 if self._writer_task is not None: 

798 self._writer_task.remove_done_callback(self._reset_writer) 

799 self._writer_task = writer 

800 writer.add_done_callback(self._reset_writer) 

801 

802 def is_ssl(self) -> bool: 

803 return self.url.scheme in _SSL_SCHEMES 

804 

805 @property 

806 def ssl(self) -> "SSLContext | bool | Fingerprint": 

807 return self._ssl 

808 

809 @property 

810 def connection_key(self) -> ConnectionKey: 

811 url = self.url 

812 return tuple.__new__( 

813 ConnectionKey, 

814 ( 

815 url.raw_host or "", 

816 url.port, 

817 url.scheme in _SSL_SCHEMES, 

818 self._ssl, 

819 None, 

820 None, 

821 ), 

822 ) 

823 

824 def _update_headers(self, headers: CIMultiDict[str]) -> None: 

825 """Update request headers.""" 

826 self.headers: CIMultiDict[str] = CIMultiDict() 

827 

828 # Build the host header 

829 host = self.url.host_port_subcomponent 

830 

831 # host_port_subcomponent is None when the URL is a relative URL. 

832 # but we know we do not have a relative URL here. 

833 assert host is not None 

834 self.headers[hdrs.HOST] = headers.pop(hdrs.HOST, host) 

835 self.headers.extend(headers) 

836 

837 def _create_response( 

838 self, 

839 task: asyncio.Task[None] | None, 

840 stream_writer: AbstractStreamWriter, 

841 ) -> ClientResponse: 

842 return self.response_class( 

843 self.method, 

844 self.original_url, 

845 writer=task, 

846 continue100=None, 

847 timer=TimerNoop(), 

848 traces=(), 

849 loop=self.loop, 

850 session=None, 

851 request_headers=self.headers, 

852 original_url=self.original_url, 

853 stream_writer=stream_writer, 

854 ) 

855 

856 def _create_writer(self, protocol: BaseProtocol) -> StreamWriter: 

857 return StreamWriter(protocol, self.loop) 

858 

859 def _should_write(self, protocol: BaseProtocol) -> bool: 

860 return protocol.writing_paused 

861 

862 async def _send(self, conn: "Connection") -> ClientResponse: 

863 # Specify request target: 

864 # - CONNECT request must send authority form URI 

865 # - not CONNECT proxy must send absolute form URI 

866 # - most common is origin form URI 

867 if self.method == hdrs.METH_CONNECT: 

868 connect_host = self.url.host_subcomponent 

869 assert connect_host is not None 

870 path = f"{connect_host}:{self.url.port}" 

871 elif self.proxy and not self.is_ssl(): 

872 path = str(self.url) 

873 else: 

874 path = self.url.raw_path_qs 

875 

876 protocol = conn.protocol 

877 assert protocol is not None 

878 writer = self._create_writer(protocol) 

879 

880 # set default content-type 

881 if ( 

882 self.method in self.POST_METHODS 

883 and ( 

884 self._skip_auto_headers is None 

885 or hdrs.CONTENT_TYPE not in self._skip_auto_headers 

886 ) 

887 and hdrs.CONTENT_TYPE not in self.headers 

888 ): 

889 self.headers[hdrs.CONTENT_TYPE] = "application/octet-stream" 

890 

891 v = self.version 

892 if hdrs.CONNECTION not in self.headers: 

893 if conn._connector.force_close: 

894 if v == HttpVersion11: 

895 self.headers[hdrs.CONNECTION] = "close" 

896 elif v == HttpVersion10: 

897 self.headers[hdrs.CONNECTION] = "keep-alive" 

898 

899 # status + headers 

900 status_line = f"{self.method} {path} HTTP/{v.major}.{v.minor}" 

901 

902 # Buffer headers for potential coalescing with body 

903 await writer.write_headers(status_line, self.headers) 

904 

905 task: asyncio.Task[None] | None 

906 if self._should_write(protocol): 

907 coro = self._write_bytes(writer, conn, self._get_content_length()) 

908 if sys.version_info >= (3, 12): 

909 # Optimization for Python 3.12, try to write 

910 # bytes immediately to avoid having to schedule 

911 # the task on the event loop. 

912 task = asyncio.Task(coro, loop=self.loop, eager_start=True) 

913 else: 

914 task = self.loop.create_task(coro) 

915 if task.done(): 

916 task = None 

917 else: 

918 self._writer = task 

919 else: 

920 # We have nothing to write because 

921 # - there is no body 

922 # - the protocol does not have writing paused 

923 # - we are not waiting for a 100-continue response 

924 protocol.start_timeout() 

925 writer.set_eof() 

926 task = None 

927 self._response = self._create_response(task, stream_writer=writer) 

928 return self._response 

929 

930 async def _write_bytes( 

931 self, 

932 writer: AbstractStreamWriter, 

933 conn: "Connection", 

934 content_length: int | None, 

935 ) -> None: 

936 # Base class never has a body, this will never be run. 

937 assert False 

938 

939 

940class ClientRequestArgs(TypedDict, total=False): 

941 params: Query 

942 headers: CIMultiDict[str] 

943 skip_auto_headers: Iterable[str] | None 

944 data: Any 

945 cookies: BaseCookie[str] 

946 version: HttpVersion 

947 compress: Literal["deflate", "gzip"] | bool 

948 chunked: bool | None 

949 expect100: bool 

950 loop: asyncio.AbstractEventLoop 

951 response_class: type[ClientResponse] 

952 proxy: URL | None 

953 timer: BaseTimerContext 

954 session: "ClientSession" 

955 ssl: SSLContext | bool | Fingerprint 

956 proxy_headers: CIMultiDict[str] | None 

957 traces: list["Trace"] 

958 trust_env: bool 

959 server_hostname: str | None 

960 

961 

962class ClientRequest(ClientRequestBase): 

963 _EMPTY_BODY = payload.PAYLOAD_REGISTRY.get(b"", disposition=None) 

964 _body = _EMPTY_BODY 

965 _continue = None # waiter future for '100 Continue' response 

966 

967 GET_METHODS = { 

968 hdrs.METH_GET, 

969 hdrs.METH_HEAD, 

970 hdrs.METH_OPTIONS, 

971 hdrs.METH_TRACE, 

972 } 

973 DEFAULT_HEADERS = { 

974 hdrs.ACCEPT: "*/*", 

975 hdrs.ACCEPT_ENCODING: _gen_default_accept_encoding(), 

976 } 

977 

978 def __init__( 

979 self, 

980 method: str, 

981 url: URL, 

982 *, 

983 params: Query, 

984 headers: CIMultiDict[str], 

985 skip_auto_headers: Iterable[str] | None, 

986 data: Any, 

987 cookies: BaseCookie[str], 

988 version: HttpVersion, 

989 compress: Literal["deflate", "gzip"] | bool, 

990 chunked: bool | None, 

991 expect100: bool, 

992 loop: asyncio.AbstractEventLoop, 

993 response_class: type[ClientResponse], 

994 proxy: URL | None, 

995 timer: BaseTimerContext, 

996 session: "ClientSession", 

997 ssl: SSLContext | bool | Fingerprint, 

998 proxy_headers: CIMultiDict[str] | None, 

999 traces: list["Trace"], 

1000 trust_env: bool, 

1001 server_hostname: str | None, 

1002 **kwargs: object, 

1003 ): 

1004 # kwargs exists so authors of subclasses should expect to pass through unknown 

1005 # arguments. This allows us to safely add new arguments in future releases. 

1006 # But, we should never receive unknown arguments here in the parent class, this 

1007 # would indicate an argument has been named wrong or similar in the subclass. 

1008 assert not kwargs, "Unexpected arguments to ClientRequest" 

1009 

1010 if params: 

1011 url = url.extend_query(params) 

1012 super().__init__(method, url, headers=headers, loop=loop, ssl=ssl) 

1013 

1014 if proxy is not None: 

1015 assert type(proxy) is URL, proxy 

1016 self._session = session 

1017 self.chunked = chunked 

1018 self.response_class = response_class 

1019 self._timer = timer 

1020 self.server_hostname = server_hostname 

1021 self.version = version 

1022 

1023 self._update_auto_headers(skip_auto_headers) 

1024 self._update_cookies(cookies) 

1025 self._update_content_encoding(data, compress) 

1026 self._update_proxy(proxy, proxy_headers) 

1027 

1028 self._update_body_from_data(data) 

1029 if data is not None or self.method not in self.GET_METHODS: 

1030 self._update_transfer_encoding() 

1031 self._update_expect_continue(expect100) 

1032 self._traces = traces 

1033 

1034 @property 

1035 def body(self) -> payload.Payload: 

1036 return self._body 

1037 

1038 @property 

1039 def skip_auto_headers(self) -> CIMultiDict[None]: 

1040 return self._skip_auto_headers or CIMultiDict() 

1041 

1042 @property 

1043 def connection_key(self) -> ConnectionKey: 

1044 if proxy_headers := self.proxy_headers: 

1045 h: int | None = hash(tuple(proxy_headers.items())) 

1046 else: 

1047 h = None 

1048 url = self.url 

1049 return tuple.__new__( 

1050 ConnectionKey, 

1051 ( 

1052 url.raw_host or "", 

1053 url.port, 

1054 url.scheme in _SSL_SCHEMES, 

1055 self._ssl, 

1056 self.proxy, 

1057 h, 

1058 ), 

1059 ) 

1060 

1061 @property 

1062 def session(self) -> "ClientSession": 

1063 """Return the ClientSession instance. 

1064 

1065 This property provides access to the ClientSession that initiated 

1066 this request, allowing middleware to make additional requests 

1067 using the same session. 

1068 """ 

1069 return self._session 

1070 

1071 def _update_auto_headers(self, skip_auto_headers: Iterable[str] | None) -> None: 

1072 if skip_auto_headers is not None: 

1073 self._skip_auto_headers = CIMultiDict( 

1074 (hdr, None) for hdr in sorted(skip_auto_headers) 

1075 ) 

1076 used_headers = self.headers.copy() 

1077 used_headers.extend(self._skip_auto_headers) # type: ignore[arg-type] 

1078 else: 

1079 # Fast path when there are no headers to skip 

1080 # which is the most common case. 

1081 used_headers = self.headers 

1082 

1083 for hdr, val in self.DEFAULT_HEADERS.items(): 

1084 if hdr not in used_headers: 

1085 self.headers[hdr] = val 

1086 

1087 if hdrs.USER_AGENT not in used_headers: 

1088 self.headers[hdrs.USER_AGENT] = SERVER_SOFTWARE 

1089 

1090 def _update_cookies(self, cookies: BaseCookie[str]) -> None: 

1091 """Update request cookies header.""" 

1092 if not cookies: 

1093 return 

1094 

1095 c = SimpleCookie() 

1096 if hdrs.COOKIE in self.headers: 

1097 # parse_cookie_header for RFC 6265 compliant Cookie header parsing 

1098 c.update(parse_cookie_header(self.headers.get(hdrs.COOKIE, ""))) 

1099 del self.headers[hdrs.COOKIE] 

1100 

1101 for name, value in cookies.items(): 

1102 # Use helper to preserve coded_value exactly as sent by server 

1103 c[name] = preserve_morsel_with_coded_value(value) 

1104 

1105 self.headers[hdrs.COOKIE] = c.output(header="", sep=";").strip() 

1106 

1107 def _update_content_encoding( 

1108 self, data: Any, compress: bool | Literal["deflate", "gzip"] 

1109 ) -> None: 

1110 """Set request content encoding.""" 

1111 self.compress = None 

1112 if not data: 

1113 return 

1114 

1115 if self.headers.get(hdrs.CONTENT_ENCODING): 

1116 if compress: 

1117 raise ValueError( 

1118 "compress can not be set if Content-Encoding header is set" 

1119 ) 

1120 elif compress: 

1121 if isinstance(compress, str) and compress not in {"deflate", "gzip"}: 

1122 raise ValueError( 

1123 "compress must be one of True, False, 'deflate', or 'gzip'" 

1124 ) 

1125 self.compress = compress if isinstance(compress, str) else "deflate" 

1126 self.headers[hdrs.CONTENT_ENCODING] = self.compress 

1127 self.chunked = True # enable chunked, no need to deal with length 

1128 

1129 def _update_transfer_encoding(self) -> None: 

1130 """Analyze transfer-encoding header.""" 

1131 te = self.headers.get(hdrs.TRANSFER_ENCODING, "").lower() 

1132 

1133 if "chunked" in te: 

1134 if self.chunked: 

1135 raise ValueError( 

1136 "chunked can not be set " 

1137 'if "Transfer-Encoding: chunked" header is set' 

1138 ) 

1139 

1140 elif self.chunked: 

1141 if hdrs.CONTENT_LENGTH in self.headers: 

1142 raise ValueError( 

1143 "chunked can not be set if Content-Length header is set" 

1144 ) 

1145 

1146 self.headers[hdrs.TRANSFER_ENCODING] = "chunked" 

1147 

1148 def _update_body_from_data(self, body: Any) -> None: 

1149 """Update request body from data.""" 

1150 if body is None: 

1151 self._body = self._EMPTY_BODY 

1152 # Set Content-Length to 0 when body is None for methods that expect a body 

1153 if ( 

1154 self.method not in self.GET_METHODS 

1155 and not self.chunked 

1156 and hdrs.CONTENT_LENGTH not in self.headers 

1157 ): 

1158 self.headers[hdrs.CONTENT_LENGTH] = "0" 

1159 return 

1160 

1161 # FormData 

1162 if isinstance(body, FormData): 

1163 body = body() 

1164 else: 

1165 try: 

1166 body = payload.PAYLOAD_REGISTRY.get(body, disposition=None) 

1167 except payload.LookupError: 

1168 boundary = None 

1169 if hdrs.CONTENT_TYPE in self.headers: 

1170 boundary = parse_mimetype( 

1171 self.headers[hdrs.CONTENT_TYPE] 

1172 ).parameters.get("boundary") 

1173 body = FormData(body, boundary=boundary)() 

1174 

1175 self._body = body 

1176 

1177 # enable chunked encoding if needed 

1178 if not self.chunked and hdrs.CONTENT_LENGTH not in self.headers: 

1179 if (size := body.size) is not None: 

1180 self.headers[hdrs.CONTENT_LENGTH] = str(size) 

1181 else: 

1182 self.chunked = True 

1183 

1184 # copy payload headers 

1185 assert body.headers 

1186 headers = self.headers 

1187 skip_headers = self._skip_auto_headers 

1188 for key, value in body.headers.items(): 

1189 if key in headers or (skip_headers is not None and key in skip_headers): 

1190 continue 

1191 headers[key] = value 

1192 

1193 def _update_body(self, body: Any) -> None: 

1194 """Update request body after its already been set.""" 

1195 # Remove existing Content-Length header since body is changing 

1196 if hdrs.CONTENT_LENGTH in self.headers: 

1197 del self.headers[hdrs.CONTENT_LENGTH] 

1198 

1199 # Remove existing Transfer-Encoding header to avoid conflicts 

1200 if self.chunked and hdrs.TRANSFER_ENCODING in self.headers: 

1201 del self.headers[hdrs.TRANSFER_ENCODING] 

1202 

1203 # Now update the body using the existing method 

1204 self._update_body_from_data(body) 

1205 

1206 # Update transfer encoding headers if needed (same logic as __init__) 

1207 if body is not None or self.method not in self.GET_METHODS: 

1208 self._update_transfer_encoding() 

1209 

1210 async def update_body(self, body: Any) -> None: 

1211 """ 

1212 Update request body and close previous payload if needed. 

1213 

1214 This method safely updates the request body by first closing any existing 

1215 payload to prevent resource leaks, then setting the new body. 

1216 

1217 IMPORTANT: Always use this method instead of setting request.body directly. 

1218 Direct assignment to request.body will leak resources if the previous body 

1219 contains file handles, streams, or other resources that need cleanup. 

1220 

1221 Args: 

1222 body: The new body content. Can be: 

1223 - bytes/bytearray: Raw binary data 

1224 - str: Text data (will be encoded using charset from Content-Type) 

1225 - FormData: Form data that will be encoded as multipart/form-data 

1226 - Payload: A pre-configured payload object 

1227 - AsyncIterable: An async iterable of bytes chunks 

1228 - File-like object: Will be read and sent as binary data 

1229 - None: Clears the body 

1230 

1231 Usage: 

1232 # CORRECT: Use update_body 

1233 await request.update_body(b"new request data") 

1234 

1235 # WRONG: Don't set body directly 

1236 # request.body = b"new request data" # This will leak resources! 

1237 

1238 # Update with form data 

1239 form_data = FormData() 

1240 form_data.add_field('field', 'value') 

1241 await request.update_body(form_data) 

1242 

1243 # Clear body 

1244 await request.update_body(None) 

1245 

1246 Note: 

1247 This method is async because it may need to close file handles or 

1248 other resources associated with the previous payload. Always await 

1249 this method to ensure proper cleanup. 

1250 

1251 Warning: 

1252 Setting request.body directly is highly discouraged and can lead to: 

1253 - Resource leaks (unclosed file handles, streams) 

1254 - Memory leaks (unreleased buffers) 

1255 - Unexpected behavior with streaming payloads 

1256 

1257 It is not recommended to change the payload type in middleware. If the 

1258 body was already set (e.g., as bytes), it's best to keep the same type 

1259 rather than converting it (e.g., to str) as this may result in unexpected 

1260 behavior. 

1261 

1262 See Also: 

1263 - update_body_from_data: Synchronous body update without cleanup 

1264 - body property: Direct body access (STRONGLY DISCOURAGED) 

1265 

1266 """ 

1267 # Close existing payload if it exists and needs closing 

1268 if self._body is not None: 

1269 await self._body.close() 

1270 self._update_body(body) 

1271 

1272 def _update_expect_continue(self, expect: bool = False) -> None: 

1273 if expect: 

1274 self.headers[hdrs.EXPECT] = "100-continue" 

1275 elif ( 

1276 hdrs.EXPECT in self.headers 

1277 and self.headers[hdrs.EXPECT].lower() == "100-continue" 

1278 ): 

1279 expect = True 

1280 

1281 if expect: 

1282 self._continue = self.loop.create_future() 

1283 

1284 def _update_proxy( 

1285 self, 

1286 proxy: URL | None, 

1287 proxy_headers: CIMultiDict[str] | None, 

1288 ) -> None: 

1289 if proxy is None: 

1290 self.proxy = None 

1291 self.proxy_headers = None 

1292 return 

1293 # URL-embedded credentials on the proxy map to Proxy-Authorization. 

1294 if proxy.raw_user or proxy.raw_password: 

1295 auth_header = encode_basic_auth(proxy.user or "", proxy.password or "") 

1296 if proxy_headers is None: 

1297 proxy_headers = CIMultiDict() 

1298 proxy_headers.setdefault(hdrs.PROXY_AUTHORIZATION, auth_header) 

1299 proxy = proxy.with_user(None) 

1300 self.proxy = proxy 

1301 self.proxy_headers = proxy_headers 

1302 

1303 def _create_response( 

1304 self, 

1305 task: asyncio.Task[None] | None, 

1306 stream_writer: AbstractStreamWriter, 

1307 ) -> ClientResponse: 

1308 return self.response_class( 

1309 self.method, 

1310 self.original_url, 

1311 writer=task, 

1312 continue100=self._continue, 

1313 timer=self._timer, 

1314 traces=self._traces, 

1315 loop=self.loop, 

1316 session=self._session, 

1317 request_headers=self.headers, 

1318 original_url=self.original_url, 

1319 stream_writer=stream_writer, 

1320 ) 

1321 

1322 def _create_writer(self, protocol: BaseProtocol) -> StreamWriter: 

1323 writer = StreamWriter( 

1324 protocol, 

1325 self.loop, 

1326 on_chunk_sent=( 

1327 functools.partial(self._on_chunk_request_sent, self.method, self.url) 

1328 if self._traces 

1329 else None 

1330 ), 

1331 on_headers_sent=( 

1332 functools.partial(self._on_headers_request_sent, self.method, self.url) 

1333 if self._traces 

1334 else None 

1335 ), 

1336 ) 

1337 

1338 if self.compress: 

1339 writer.enable_compression(self.compress) 

1340 

1341 if self.chunked is not None: 

1342 writer.enable_chunking() 

1343 return writer 

1344 

1345 def _should_write(self, protocol: BaseProtocol) -> bool: 

1346 return ( 

1347 self.body.size != 0 or self._continue is not None or protocol.writing_paused 

1348 ) 

1349 

1350 async def _write_bytes( 

1351 self, 

1352 writer: AbstractStreamWriter, 

1353 conn: "Connection", 

1354 content_length: int | None, 

1355 ) -> None: 

1356 """ 

1357 Write the request body to the connection stream. 

1358 

1359 This method handles writing different types of request bodies: 

1360 1. Payload objects (using their specialized write_with_length method) 

1361 2. Bytes/bytearray objects 

1362 3. Iterable body content 

1363 

1364 Args: 

1365 writer: The stream writer to write the body to 

1366 conn: The connection being used for this request 

1367 content_length: Optional maximum number of bytes to write from the body 

1368 (None means write the entire body) 

1369 

1370 The method properly handles: 

1371 - Waiting for 100-Continue responses if required 

1372 - Content length constraints for chunked encoding 

1373 - Error handling for network issues, cancellation, and other exceptions 

1374 - Signaling EOF and timeout management 

1375 

1376 Raises: 

1377 ClientOSError: When there's an OS-level error writing the body 

1378 ClientConnectionError: When there's a general connection error 

1379 asyncio.CancelledError: When the operation is cancelled 

1380 

1381 """ 

1382 # 100 response 

1383 if self._continue is not None: 

1384 # Force headers to be sent before waiting for 100-continue 

1385 writer.send_headers() 

1386 await writer.drain() 

1387 await self._continue 

1388 

1389 protocol = conn.protocol 

1390 assert protocol is not None 

1391 try: 

1392 await self._body.write_with_length(writer, content_length) 

1393 except OSError as underlying_exc: 

1394 reraised_exc = underlying_exc 

1395 

1396 # Distinguish between timeout and other OS errors for better error reporting 

1397 exc_is_not_timeout = underlying_exc.errno is not None or not isinstance( 

1398 underlying_exc, asyncio.TimeoutError 

1399 ) 

1400 if exc_is_not_timeout: 

1401 reraised_exc = ClientOSError( 

1402 underlying_exc.errno, 

1403 f"Can not write request body for {self.url !s}", 

1404 ) 

1405 

1406 set_exception(protocol, reraised_exc, underlying_exc) 

1407 except asyncio.CancelledError: 

1408 # Body hasn't been fully sent, so connection can't be reused 

1409 conn.close() 

1410 raise 

1411 except Exception as underlying_exc: 

1412 set_exception( 

1413 protocol, 

1414 ClientConnectionError( 

1415 "Failed to send bytes into the underlying connection " 

1416 f"{conn !s}: {underlying_exc!r}", 

1417 ), 

1418 underlying_exc, 

1419 ) 

1420 else: 

1421 # Successfully wrote the body, signal EOF and start response timeout 

1422 await writer.write_eof() 

1423 protocol.start_timeout() 

1424 

1425 async def _close(self) -> None: 

1426 if self._writer_task is not None: 

1427 try: 

1428 await self._writer_task 

1429 except asyncio.CancelledError: 

1430 if ( 

1431 sys.version_info >= (3, 11) 

1432 and (task := asyncio.current_task()) 

1433 and task.cancelling() 

1434 ): 

1435 raise 

1436 

1437 def _terminate(self) -> None: 

1438 if self._writer_task is not None: 

1439 if not self.loop.is_closed(): 

1440 self._writer_task.cancel() 

1441 self._writer_task.remove_done_callback(self._reset_writer) 

1442 self._writer_task = None 

1443 

1444 async def _on_chunk_request_sent(self, method: str, url: URL, chunk: bytes) -> None: 

1445 for trace in self._traces: 

1446 await trace.send_request_chunk_sent(method, url, chunk) 

1447 

1448 async def _on_headers_request_sent( 

1449 self, method: str, url: URL, headers: "CIMultiDict[str]" 

1450 ) -> None: 

1451 for trace in self._traces: 

1452 await trace.send_request_headers(method, url, headers)