1import asyncio
2import codecs
3import contextlib
4import functools
5import io
6import re
7import sys
8import traceback
9import warnings
10from collections.abc import Mapping
11from hashlib import md5, sha1, sha256
12from http.cookies import Morsel, SimpleCookie
13from types import MappingProxyType, TracebackType
14from typing import (
15 TYPE_CHECKING,
16 Any,
17 Callable,
18 Dict,
19 Iterable,
20 List,
21 Literal,
22 NamedTuple,
23 Optional,
24 Tuple,
25 Type,
26 Union,
27)
28
29import attr
30from multidict import CIMultiDict, CIMultiDictProxy, MultiDict, MultiDictProxy
31from yarl import URL
32
33from . import hdrs, helpers, http, multipart, payload
34from ._cookie_helpers import (
35 parse_cookie_header,
36 parse_set_cookie_headers,
37 preserve_morsel_with_coded_value,
38)
39from .abc import AbstractStreamWriter
40from .client_exceptions import (
41 ClientConnectionError,
42 ClientOSError,
43 ClientResponseError,
44 ContentTypeError,
45 InvalidURL,
46 ServerFingerprintMismatch,
47)
48from .compression_utils import HAS_BROTLI
49from .formdata import FormData
50from .helpers import (
51 _SENTINEL,
52 BaseTimerContext,
53 BasicAuth,
54 HeadersMixin,
55 TimerNoop,
56 basicauth_from_netrc,
57 netrc_from_env,
58 noop,
59 reify,
60 set_exception,
61 set_result,
62)
63from .http import (
64 SERVER_SOFTWARE,
65 HttpVersion,
66 HttpVersion10,
67 HttpVersion11,
68 StreamWriter,
69)
70from .streams import StreamReader
71from .typedefs import (
72 DEFAULT_JSON_DECODER,
73 JSONDecoder,
74 LooseCookies,
75 LooseHeaders,
76 Query,
77 RawHeaders,
78)
79
80if TYPE_CHECKING:
81 import ssl
82 from ssl import SSLContext
83else:
84 try:
85 import ssl
86 from ssl import SSLContext
87 except ImportError: # pragma: no cover
88 ssl = None # type: ignore[assignment]
89 SSLContext = object # type: ignore[misc,assignment]
90
91
92__all__ = ("ClientRequest", "ClientResponse", "RequestInfo", "Fingerprint")
93
94
95if TYPE_CHECKING:
96 from .client import ClientSession
97 from .connector import Connection
98 from .tracing import Trace
99
100
101_CONNECTION_CLOSED_EXCEPTION = ClientConnectionError("Connection closed")
102_CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]")
103json_re = re.compile(r"^application/(?:[\w.+-]+?\+)?json")
104
105
106def _gen_default_accept_encoding() -> str:
107 return "gzip, deflate, br" if HAS_BROTLI else "gzip, deflate"
108
109
110@attr.s(auto_attribs=True, frozen=True, slots=True)
111class ContentDisposition:
112 type: Optional[str]
113 parameters: "MappingProxyType[str, str]"
114 filename: Optional[str]
115
116
117class _RequestInfo(NamedTuple):
118 url: URL
119 method: str
120 headers: "CIMultiDictProxy[str]"
121 real_url: URL
122
123
124class RequestInfo(_RequestInfo):
125
126 def __new__(
127 cls,
128 url: URL,
129 method: str,
130 headers: "CIMultiDictProxy[str]",
131 real_url: URL = _SENTINEL, # type: ignore[assignment]
132 ) -> "RequestInfo":
133 """Create a new RequestInfo instance.
134
135 For backwards compatibility, the real_url parameter is optional.
136 """
137 return tuple.__new__(
138 cls, (url, method, headers, url if real_url is _SENTINEL else real_url)
139 )
140
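
# A hedged illustration of the backwards-compatible constructor above: when
# ``real_url`` is omitted, it falls back to ``url``. This helper exists only
# for demonstration and is not used elsewhere in the module.
def _example_request_info(url: URL, headers: "CIMultiDictProxy[str]") -> bool:
    """Return True when omitting real_url makes it default to url."""
    info = RequestInfo(url, "GET", headers)
    return info.real_url is url
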
141
142class Fingerprint:
143 HASHFUNC_BY_DIGESTLEN = {
144 16: md5,
145 20: sha1,
146 32: sha256,
147 }
148
149 def __init__(self, fingerprint: bytes) -> None:
150 digestlen = len(fingerprint)
151 hashfunc = self.HASHFUNC_BY_DIGESTLEN.get(digestlen)
152 if not hashfunc:
153 raise ValueError("fingerprint has invalid length")
154 elif hashfunc is md5 or hashfunc is sha1:
155 raise ValueError("md5 and sha1 are insecure and not supported. Use sha256.")
156 self._hashfunc = hashfunc
157 self._fingerprint = fingerprint
158
159 @property
160 def fingerprint(self) -> bytes:
161 return self._fingerprint
162
163 def check(self, transport: asyncio.Transport) -> None:
164 if not transport.get_extra_info("sslcontext"):
165 return
166 sslobj = transport.get_extra_info("ssl_object")
167 cert = sslobj.getpeercert(binary_form=True)
168 got = self._hashfunc(cert).digest()
169 if got != self._fingerprint:
170 host, port, *_ = transport.get_extra_info("peername")
171 raise ServerFingerprintMismatch(self._fingerprint, got, host, port)
172
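
# Hedged usage sketch for Fingerprint above: pin a server certificate by its
# SHA-256 digest and pass the result as the ``ssl`` argument of a request.
# ``der_cert`` is assumed to be the DER-encoded certificate bytes obtained out
# of band; this helper exists only for illustration.
def _example_fingerprint(der_cert: bytes) -> Fingerprint:
    """Build a Fingerprint usable as ``session.get(url, ssl=...)``."""
    return Fingerprint(sha256(der_cert).digest())
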
173
174if ssl is not None:
175 SSL_ALLOWED_TYPES = (ssl.SSLContext, bool, Fingerprint, type(None))
176else: # pragma: no cover
177 SSL_ALLOWED_TYPES = (bool, type(None))
178
179
180def _merge_ssl_params(
181 ssl: Union["SSLContext", bool, Fingerprint],
182 verify_ssl: Optional[bool],
183 ssl_context: Optional["SSLContext"],
184 fingerprint: Optional[bytes],
185) -> Union["SSLContext", bool, Fingerprint]:
186 if ssl is None:
187 ssl = True # Double check for backwards compatibility
188 if verify_ssl is not None and not verify_ssl:
189 warnings.warn(
190 "verify_ssl is deprecated, use ssl=False instead",
191 DeprecationWarning,
192 stacklevel=3,
193 )
194 if ssl is not True:
195 raise ValueError(
196 "verify_ssl, ssl_context, fingerprint and ssl "
197 "parameters are mutually exclusive"
198 )
199 else:
200 ssl = False
201 if ssl_context is not None:
202 warnings.warn(
203 "ssl_context is deprecated, use ssl=context instead",
204 DeprecationWarning,
205 stacklevel=3,
206 )
207 if ssl is not True:
208 raise ValueError(
209 "verify_ssl, ssl_context, fingerprint and ssl "
210 "parameters are mutually exclusive"
211 )
212 else:
213 ssl = ssl_context
214 if fingerprint is not None:
215 warnings.warn(
216 "fingerprint is deprecated, use ssl=Fingerprint(fingerprint) instead",
217 DeprecationWarning,
218 stacklevel=3,
219 )
220 if ssl is not True:
221 raise ValueError(
222 "verify_ssl, ssl_context, fingerprint and ssl "
223 "parameters are mutually exclusive"
224 )
225 else:
226 ssl = Fingerprint(fingerprint)
227 if not isinstance(ssl, SSL_ALLOWED_TYPES):
228 raise TypeError(
229 "ssl should be SSLContext, bool, Fingerprint or None, "
230 "got {!r} instead.".format(ssl)
231 )
232 return ssl
233
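
# A minimal sketch of how the deprecated parameters collapse into the single
# ``ssl`` argument handled by ``_merge_ssl_params`` above; the values passed
# below are illustrative.
def _example_merge_ssl_params() -> Union["SSLContext", bool, Fingerprint]:
    """Show that ``verify_ssl=False`` maps to ``ssl=False``."""
    with warnings.catch_warnings():
        # The mapping emits a DeprecationWarning, silenced here for the example.
        warnings.simplefilter("ignore", DeprecationWarning)
        return _merge_ssl_params(True, False, None, None)
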
234
235_SSL_SCHEMES = frozenset(("https", "wss"))
236
237
238# ConnectionKey is a NamedTuple because it is used as a key in a dict
239# and a set in the connector. Since a NamedTuple is a tuple it uses
240# the fast native tuple __hash__ and __eq__ implementation in CPython.
241class ConnectionKey(NamedTuple):
    # The key should contain information about the proxy / TLS settings used,
    # to prevent reusing the wrong connections from the pool.
244 host: str
245 port: Optional[int]
246 is_ssl: bool
247 ssl: Union[SSLContext, bool, Fingerprint]
248 proxy: Optional[URL]
249 proxy_auth: Optional[BasicAuth]
250 proxy_headers_hash: Optional[int] # hash(CIMultiDict)
251
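
# Hedged sketch: because ConnectionKey is a plain NamedTuple, two requests with
# identical host, port, TLS and proxy settings hash to the same pool key and
# can therefore reuse a pooled connection. Host and port below are placeholders.
def _example_connection_keys_match() -> bool:
    """Return True when two identical connection keys compare and hash equal."""
    key_a = ConnectionKey("example.com", 443, True, True, None, None, None)
    key_b = ConnectionKey("example.com", 443, True, True, None, None, None)
    return key_a == key_b and hash(key_a) == hash(key_b)
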
252
253def _is_expected_content_type(
254 response_content_type: str, expected_content_type: str
255) -> bool:
256 if expected_content_type == "application/json":
257 return json_re.match(response_content_type) is not None
258 return expected_content_type in response_content_type
259
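
# Short illustration for ``_is_expected_content_type`` above (inputs are
# illustrative): the JSON regex also accepts structured-suffix media types
# such as ``application/hal+json``.
def _example_expected_content_type() -> bool:
    """Return True for a ``+json`` structured suffix checked against application/json."""
    return _is_expected_content_type("application/hal+json", "application/json")
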
260
261def _warn_if_unclosed_payload(payload: payload.Payload, stacklevel: int = 2) -> None:
262 """Warn if the payload is not closed.
263
264 Callers must check that the body is a Payload before calling this method.
265
266 Args:
267 payload: The payload to check
268 stacklevel: Stack level for the warning (default 2 for direct callers)
269 """
270 if not payload.autoclose and not payload.consumed:
271 warnings.warn(
272 "The previous request body contains unclosed resources. "
273 "Use await request.update_body() instead of setting request.body "
274 "directly to properly close resources and avoid leaks.",
275 ResourceWarning,
276 stacklevel=stacklevel,
277 )
278
279
280class ClientResponse(HeadersMixin):
281
282 # Some of these attributes are None when created,
283 # but will be set by the start() method.
284 # As the end user will likely never see the None values, we cheat the types below.
285 # from the Status-Line of the response
286 version: Optional[HttpVersion] = None # HTTP-Version
287 status: int = None # type: ignore[assignment] # Status-Code
288 reason: Optional[str] = None # Reason-Phrase
289
290 content: StreamReader = None # type: ignore[assignment] # Payload stream
291 _body: Optional[bytes] = None
292 _headers: CIMultiDictProxy[str] = None # type: ignore[assignment]
293 _history: Tuple["ClientResponse", ...] = ()
294 _raw_headers: RawHeaders = None # type: ignore[assignment]
295
296 _connection: Optional["Connection"] = None # current connection
297 _cookies: Optional[SimpleCookie] = None
298 _raw_cookie_headers: Optional[Tuple[str, ...]] = None
299 _continue: Optional["asyncio.Future[bool]"] = None
300 _source_traceback: Optional[traceback.StackSummary] = None
301 _session: Optional["ClientSession"] = None
    # Set up by ClientRequest after the ClientResponse object is created;
    # this post-init stage allows keeping the constructor signature unchanged.
    _closed = True  # to allow __del__ for a response that was not properly initialized
305 _released = False
306 _in_context = False
307
308 _resolve_charset: Callable[["ClientResponse", bytes], str] = lambda *_: "utf-8"
309
310 __writer: Optional["asyncio.Task[None]"] = None
311
312 def __init__(
313 self,
314 method: str,
315 url: URL,
316 *,
317 writer: "Optional[asyncio.Task[None]]",
318 continue100: Optional["asyncio.Future[bool]"],
319 timer: BaseTimerContext,
320 request_info: RequestInfo,
321 traces: List["Trace"],
322 loop: asyncio.AbstractEventLoop,
323 session: "ClientSession",
324 ) -> None:
325 # URL forbids subclasses, so a simple type check is enough.
326 assert type(url) is URL
327
328 self.method = method
329
330 self._real_url = url
331 self._url = url.with_fragment(None) if url.raw_fragment else url
332 if writer is not None:
333 self._writer = writer
334 if continue100 is not None:
335 self._continue = continue100
336 self._request_info = request_info
337 self._timer = timer if timer is not None else TimerNoop()
338 self._cache: Dict[str, Any] = {}
339 self._traces = traces
340 self._loop = loop
341 # Save reference to _resolve_charset, so that get_encoding() will still
342 # work after the response has finished reading the body.
343 # TODO: Fix session=None in tests (see ClientRequest.__init__).
344 if session is not None:
345 # store a reference to session #1985
346 self._session = session
347 self._resolve_charset = session._resolve_charset
348 if loop.get_debug():
349 self._source_traceback = traceback.extract_stack(sys._getframe(1))
350
351 def __reset_writer(self, _: object = None) -> None:
352 self.__writer = None
353
354 @property
355 def _writer(self) -> Optional["asyncio.Task[None]"]:
356 """The writer task for streaming data.
357
358 _writer is only provided for backwards compatibility
359 for subclasses that may need to access it.
360 """
361 return self.__writer
362
363 @_writer.setter
364 def _writer(self, writer: Optional["asyncio.Task[None]"]) -> None:
365 """Set the writer task for streaming data."""
366 if self.__writer is not None:
367 self.__writer.remove_done_callback(self.__reset_writer)
368 self.__writer = writer
369 if writer is None:
370 return
371 if writer.done():
372 # The writer is already done, so we can clear it immediately.
373 self.__writer = None
374 else:
375 writer.add_done_callback(self.__reset_writer)
376
377 @property
378 def cookies(self) -> SimpleCookie:
379 if self._cookies is None:
380 if self._raw_cookie_headers is not None:
381 # Parse cookies for response.cookies (SimpleCookie for backward compatibility)
382 cookies = SimpleCookie()
383 # Use parse_set_cookie_headers for more lenient parsing that handles
384 # malformed cookies better than SimpleCookie.load
385 cookies.update(parse_set_cookie_headers(self._raw_cookie_headers))
386 self._cookies = cookies
387 else:
388 self._cookies = SimpleCookie()
389 return self._cookies
390
391 @cookies.setter
392 def cookies(self, cookies: SimpleCookie) -> None:
393 self._cookies = cookies
394 # Generate raw cookie headers from the SimpleCookie
395 if cookies:
396 self._raw_cookie_headers = tuple(
397 morsel.OutputString() for morsel in cookies.values()
398 )
399 else:
400 self._raw_cookie_headers = None
401
402 @reify
403 def url(self) -> URL:
404 return self._url
405
406 @reify
407 def url_obj(self) -> URL:
408 warnings.warn("Deprecated, use .url #1654", DeprecationWarning, stacklevel=2)
409 return self._url
410
411 @reify
412 def real_url(self) -> URL:
413 return self._real_url
414
415 @reify
416 def host(self) -> str:
417 assert self._url.host is not None
418 return self._url.host
419
420 @reify
421 def headers(self) -> "CIMultiDictProxy[str]":
422 return self._headers
423
424 @reify
425 def raw_headers(self) -> RawHeaders:
426 return self._raw_headers
427
428 @reify
429 def request_info(self) -> RequestInfo:
430 return self._request_info
431
432 @reify
433 def content_disposition(self) -> Optional[ContentDisposition]:
434 raw = self._headers.get(hdrs.CONTENT_DISPOSITION)
435 if raw is None:
436 return None
437 disposition_type, params_dct = multipart.parse_content_disposition(raw)
438 params = MappingProxyType(params_dct)
439 filename = multipart.content_disposition_filename(params)
440 return ContentDisposition(disposition_type, params, filename)
441
442 def __del__(self, _warnings: Any = warnings) -> None:
443 if self._closed:
444 return
445
446 if self._connection is not None:
447 self._connection.release()
448 self._cleanup_writer()
449
450 if self._loop.get_debug():
451 kwargs = {"source": self}
452 _warnings.warn(f"Unclosed response {self!r}", ResourceWarning, **kwargs)
453 context = {"client_response": self, "message": "Unclosed response"}
454 if self._source_traceback:
455 context["source_traceback"] = self._source_traceback
456 self._loop.call_exception_handler(context)
457
458 def __repr__(self) -> str:
459 out = io.StringIO()
460 ascii_encodable_url = str(self.url)
461 if self.reason:
462 ascii_encodable_reason = self.reason.encode(
463 "ascii", "backslashreplace"
464 ).decode("ascii")
465 else:
466 ascii_encodable_reason = "None"
467 print(
468 "<ClientResponse({}) [{} {}]>".format(
469 ascii_encodable_url, self.status, ascii_encodable_reason
470 ),
471 file=out,
472 )
473 print(self.headers, file=out)
474 return out.getvalue()
475
476 @property
477 def connection(self) -> Optional["Connection"]:
478 return self._connection
479
480 @reify
481 def history(self) -> Tuple["ClientResponse", ...]:
        """A sequence of responses, if redirects occurred."""
483 return self._history
484
485 @reify
486 def links(self) -> "MultiDictProxy[MultiDictProxy[Union[str, URL]]]":
487 links_str = ", ".join(self.headers.getall("link", []))
488
489 if not links_str:
490 return MultiDictProxy(MultiDict())
491
492 links: MultiDict[MultiDictProxy[Union[str, URL]]] = MultiDict()
493
494 for val in re.split(r",(?=\s*<)", links_str):
495 match = re.match(r"\s*<(.*)>(.*)", val)
496 if match is None: # pragma: no cover
497 # the check exists to suppress mypy error
498 continue
499 url, params_str = match.groups()
500 params = params_str.split(";")[1:]
501
502 link: MultiDict[Union[str, URL]] = MultiDict()
503
504 for param in params:
505 match = re.match(r"^\s*(\S*)\s*=\s*(['\"]?)(.*?)(\2)\s*$", param, re.M)
506 if match is None: # pragma: no cover
507 # the check exists to suppress mypy error
508 continue
509 key, _, value, _ = match.groups()
510
511 link.add(key, value)
512
513 key = link.get("rel", url)
514
515 link.add("url", self.url.join(URL(url)))
516
517 links.add(str(key), MultiDictProxy(link))
518
519 return MultiDictProxy(links)
520
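    # Hedged usage sketch for the ``links`` property above (header value and
    # ``resp`` are illustrative assumptions):
    #
    #     # Given a response carrying
    #     #   Link: <https://example.com/page/2>; rel="next"
    #     next_url = resp.links["next"]["url"]  # yarl.URL, already joined with resp.url
    #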
521 async def start(self, connection: "Connection") -> "ClientResponse":
522 """Start response processing."""
523 self._closed = False
524 self._protocol = connection.protocol
525 self._connection = connection
526
527 with self._timer:
528 while True:
529 # read response
530 try:
531 protocol = self._protocol
532 message, payload = await protocol.read() # type: ignore[union-attr]
533 except http.HttpProcessingError as exc:
534 raise ClientResponseError(
535 self.request_info,
536 self.history,
537 status=exc.code,
538 message=exc.message,
539 headers=exc.headers,
540 ) from exc
541
542 if message.code < 100 or message.code > 199 or message.code == 101:
543 break
544
545 if self._continue is not None:
546 set_result(self._continue, True)
547 self._continue = None
548
549 # payload eof handler
550 payload.on_eof(self._response_eof)
551
552 # response status
553 self.version = message.version
554 self.status = message.code
555 self.reason = message.reason
556
557 # headers
558 self._headers = message.headers # type is CIMultiDictProxy
        self._raw_headers = message.raw_headers  # type is Tuple[Tuple[bytes, bytes], ...]
560
561 # payload
562 self.content = payload
563
564 # cookies
565 if cookie_hdrs := self.headers.getall(hdrs.SET_COOKIE, ()):
566 # Store raw cookie headers for CookieJar
567 self._raw_cookie_headers = tuple(cookie_hdrs)
568 return self
569
570 def _response_eof(self) -> None:
571 if self._closed:
572 return
573
574 # protocol could be None because connection could be detached
575 protocol = self._connection and self._connection.protocol
576 if protocol is not None and protocol.upgraded:
577 return
578
579 self._closed = True
580 self._cleanup_writer()
581 self._release_connection()
582
583 @property
584 def closed(self) -> bool:
585 return self._closed
586
587 def close(self) -> None:
588 if not self._released:
589 self._notify_content()
590
591 self._closed = True
592 if self._loop is None or self._loop.is_closed():
593 return
594
595 self._cleanup_writer()
596 if self._connection is not None:
597 self._connection.close()
598 self._connection = None
599
600 def release(self) -> Any:
601 if not self._released:
602 self._notify_content()
603
604 self._closed = True
605
606 self._cleanup_writer()
607 self._release_connection()
608 return noop()
609
610 @property
611 def ok(self) -> bool:
612 """Returns ``True`` if ``status`` is less than ``400``, ``False`` if not.
613
614 This is **not** a check for ``200 OK`` but a check that the response
615 status is under 400.
616 """
617 return 400 > self.status
618
619 def raise_for_status(self) -> None:
620 if not self.ok:
621 # reason should always be not None for a started response
622 assert self.reason is not None
623
624 # If we're in a context we can rely on __aexit__() to release as the
625 # exception propagates.
626 if not self._in_context:
627 self.release()
628
629 raise ClientResponseError(
630 self.request_info,
631 self.history,
632 status=self.status,
633 message=self.reason,
634 headers=self.headers,
635 )
636
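    # Hedged sketch of the usual check-and-raise pattern built on ``ok`` and
    # ``raise_for_status`` above (``session`` and the URL are placeholders):
    #
    #     async with session.get("https://example.com/data") as resp:
    #         resp.raise_for_status()  # raises ClientResponseError when not resp.ok
    #         body = await resp.read()
    #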
637 def _release_connection(self) -> None:
638 if self._connection is not None:
639 if self.__writer is None:
640 self._connection.release()
641 self._connection = None
642 else:
643 self.__writer.add_done_callback(lambda f: self._release_connection())
644
645 async def _wait_released(self) -> None:
646 if self.__writer is not None:
647 try:
648 await self.__writer
649 except asyncio.CancelledError:
650 if (
651 sys.version_info >= (3, 11)
652 and (task := asyncio.current_task())
653 and task.cancelling()
654 ):
655 raise
656 self._release_connection()
657
658 def _cleanup_writer(self) -> None:
659 if self.__writer is not None:
660 self.__writer.cancel()
661 self._session = None
662
663 def _notify_content(self) -> None:
664 content = self.content
665 if content and content.exception() is None:
666 set_exception(content, _CONNECTION_CLOSED_EXCEPTION)
667 self._released = True
668
669 async def wait_for_close(self) -> None:
670 if self.__writer is not None:
671 try:
672 await self.__writer
673 except asyncio.CancelledError:
674 if (
675 sys.version_info >= (3, 11)
676 and (task := asyncio.current_task())
677 and task.cancelling()
678 ):
679 raise
680 self.release()
681
682 async def read(self) -> bytes:
683 """Read response payload."""
684 if self._body is None:
685 try:
686 self._body = await self.content.read()
687 for trace in self._traces:
688 await trace.send_response_chunk_received(
689 self.method, self.url, self._body
690 )
691 except BaseException:
692 self.close()
693 raise
694 elif self._released: # Response explicitly released
695 raise ClientConnectionError("Connection closed")
696
697 protocol = self._connection and self._connection.protocol
698 if protocol is None or not protocol.upgraded:
699 await self._wait_released() # Underlying connection released
700 return self._body
701
702 def get_encoding(self) -> str:
703 ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower()
704 mimetype = helpers.parse_mimetype(ctype)
705
706 encoding = mimetype.parameters.get("charset")
707 if encoding:
708 with contextlib.suppress(LookupError, ValueError):
709 return codecs.lookup(encoding).name
710
711 if mimetype.type == "application" and (
712 mimetype.subtype == "json" or mimetype.subtype == "rdap"
713 ):
714 # RFC 7159 states that the default encoding is UTF-8.
715 # RFC 7483 defines application/rdap+json
716 return "utf-8"
717
718 if self._body is None:
719 raise RuntimeError(
720 "Cannot compute fallback encoding of a not yet read body"
721 )
722
723 return self._resolve_charset(self, self._body)
724
725 async def text(self, encoding: Optional[str] = None, errors: str = "strict") -> str:
726 """Read response payload and decode."""
727 if self._body is None:
728 await self.read()
729
730 if encoding is None:
731 encoding = self.get_encoding()
732
733 return self._body.decode(encoding, errors=errors) # type: ignore[union-attr]
734
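    # Hedged sketch for ``get_encoding()`` / ``text()`` above (``resp`` is a
    # placeholder): an explicit charset in Content-Type wins, JSON/RDAP default
    # to UTF-8, and otherwise the session's fallback resolver inspects the
    # already-read body.
    #
    #     text = await resp.text()           # charset resolved via get_encoding()
    #     text = await resp.text("latin-1")  # explicit override of the charset
    #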
735 async def json(
736 self,
737 *,
738 encoding: Optional[str] = None,
739 loads: JSONDecoder = DEFAULT_JSON_DECODER,
740 content_type: Optional[str] = "application/json",
741 ) -> Any:
        """Read and decode the JSON response."""
743 if self._body is None:
744 await self.read()
745
746 if content_type:
747 ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower()
748 if not _is_expected_content_type(ctype, content_type):
749 raise ContentTypeError(
750 self.request_info,
751 self.history,
752 status=self.status,
753 message=(
754 "Attempt to decode JSON with unexpected mimetype: %s" % ctype
755 ),
756 headers=self.headers,
757 )
758
759 stripped = self._body.strip() # type: ignore[union-attr]
760 if not stripped:
761 return None
762
763 if encoding is None:
764 encoding = self.get_encoding()
765
766 return loads(stripped.decode(encoding))
767
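    # Hedged usage sketch for ``json()`` above (``resp`` is a placeholder):
    #
    #     data = await resp.json()                   # requires an application/json mimetype
    #     data = await resp.json(content_type=None)  # skip the content-type check entirely
    #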
768 async def __aenter__(self) -> "ClientResponse":
769 self._in_context = True
770 return self
771
772 async def __aexit__(
773 self,
774 exc_type: Optional[Type[BaseException]],
775 exc_val: Optional[BaseException],
776 exc_tb: Optional[TracebackType],
777 ) -> None:
778 self._in_context = False
779 # similar to _RequestContextManager, we do not need to check
780 # for exceptions, response object can close connection
781 # if state is broken
782 self.release()
783 await self.wait_for_close()
784
785
786class ClientRequest:
787 GET_METHODS = {
788 hdrs.METH_GET,
789 hdrs.METH_HEAD,
790 hdrs.METH_OPTIONS,
791 hdrs.METH_TRACE,
792 }
793 POST_METHODS = {hdrs.METH_PATCH, hdrs.METH_POST, hdrs.METH_PUT}
794 ALL_METHODS = GET_METHODS.union(POST_METHODS).union({hdrs.METH_DELETE})
795
796 DEFAULT_HEADERS = {
797 hdrs.ACCEPT: "*/*",
798 hdrs.ACCEPT_ENCODING: _gen_default_accept_encoding(),
799 }
800
801 # Type of body depends on PAYLOAD_REGISTRY, which is dynamic.
802 _body: Union[None, payload.Payload] = None
803 auth = None
804 response = None
805
806 __writer: Optional["asyncio.Task[None]"] = None # async task for streaming data
807
808 # These class defaults help create_autospec() work correctly.
809 # If autospec is improved in future, maybe these can be removed.
810 url = URL()
811 method = "GET"
812
813 _continue = None # waiter future for '100 Continue' response
814
815 _skip_auto_headers: Optional["CIMultiDict[None]"] = None
816
    # N.B.
    # Adding a __del__ method that closes self._writer doesn't make sense,
    # because _writer is an instance method and thus it keeps a reference to
    # self. Until the writer has finished, the finalizer will not be called.
821
822 def __init__(
823 self,
824 method: str,
825 url: URL,
826 *,
827 params: Query = None,
828 headers: Optional[LooseHeaders] = None,
829 skip_auto_headers: Optional[Iterable[str]] = None,
830 data: Any = None,
831 cookies: Optional[LooseCookies] = None,
832 auth: Optional[BasicAuth] = None,
833 version: http.HttpVersion = http.HttpVersion11,
834 compress: Union[str, bool, None] = None,
835 chunked: Optional[bool] = None,
836 expect100: bool = False,
837 loop: Optional[asyncio.AbstractEventLoop] = None,
838 response_class: Optional[Type["ClientResponse"]] = None,
839 proxy: Optional[URL] = None,
840 proxy_auth: Optional[BasicAuth] = None,
841 timer: Optional[BaseTimerContext] = None,
842 session: Optional["ClientSession"] = None,
843 ssl: Union[SSLContext, bool, Fingerprint] = True,
844 proxy_headers: Optional[LooseHeaders] = None,
845 traces: Optional[List["Trace"]] = None,
846 trust_env: bool = False,
847 server_hostname: Optional[str] = None,
848 ):
849 if loop is None:
850 loop = asyncio.get_event_loop()
851 if match := _CONTAINS_CONTROL_CHAR_RE.search(method):
852 raise ValueError(
853 f"Method cannot contain non-token characters {method!r} "
854 f"(found at least {match.group()!r})"
855 )
856 # URL forbids subclasses, so a simple type check is enough.
857 assert type(url) is URL, url
858 if proxy is not None:
859 assert type(proxy) is URL, proxy
860 # FIXME: session is None in tests only, need to fix tests
861 # assert session is not None
862 if TYPE_CHECKING:
863 assert session is not None
864 self._session = session
865 if params:
866 url = url.extend_query(params)
867 self.original_url = url
868 self.url = url.with_fragment(None) if url.raw_fragment else url
869 self.method = method.upper()
870 self.chunked = chunked
871 self.compress = compress
872 self.loop = loop
873 self.length = None
874 if response_class is None:
875 real_response_class = ClientResponse
876 else:
877 real_response_class = response_class
878 self.response_class: Type[ClientResponse] = real_response_class
879 self._timer = timer if timer is not None else TimerNoop()
880 self._ssl = ssl if ssl is not None else True
881 self.server_hostname = server_hostname
882
883 if loop.get_debug():
884 self._source_traceback = traceback.extract_stack(sys._getframe(1))
885
886 self.update_version(version)
887 self.update_host(url)
888 self.update_headers(headers)
889 self.update_auto_headers(skip_auto_headers)
890 self.update_cookies(cookies)
891 self.update_content_encoding(data)
892 self.update_auth(auth, trust_env)
893 self.update_proxy(proxy, proxy_auth, proxy_headers)
894
895 self.update_body_from_data(data)
896 if data is not None or self.method not in self.GET_METHODS:
897 self.update_transfer_encoding()
898 self.update_expect_continue(expect100)
899 self._traces = [] if traces is None else traces
900
901 def __reset_writer(self, _: object = None) -> None:
902 self.__writer = None
903
904 def _get_content_length(self) -> Optional[int]:
905 """Extract and validate Content-Length header value.
906
907 Returns parsed Content-Length value or None if not set.
908 Raises ValueError if header exists but cannot be parsed as an integer.
909 """
910 if hdrs.CONTENT_LENGTH not in self.headers:
911 return None
912
913 content_length_hdr = self.headers[hdrs.CONTENT_LENGTH]
914 try:
915 return int(content_length_hdr)
916 except ValueError:
917 raise ValueError(
918 f"Invalid Content-Length header: {content_length_hdr}"
919 ) from None
920
921 @property
922 def skip_auto_headers(self) -> CIMultiDict[None]:
923 return self._skip_auto_headers or CIMultiDict()
924
925 @property
926 def _writer(self) -> Optional["asyncio.Task[None]"]:
927 return self.__writer
928
929 @_writer.setter
930 def _writer(self, writer: "asyncio.Task[None]") -> None:
931 if self.__writer is not None:
932 self.__writer.remove_done_callback(self.__reset_writer)
933 self.__writer = writer
934 writer.add_done_callback(self.__reset_writer)
935
936 def is_ssl(self) -> bool:
937 return self.url.scheme in _SSL_SCHEMES
938
939 @property
940 def ssl(self) -> Union["SSLContext", bool, Fingerprint]:
941 return self._ssl
942
943 @property
944 def connection_key(self) -> ConnectionKey:
945 if proxy_headers := self.proxy_headers:
946 h: Optional[int] = hash(tuple(proxy_headers.items()))
947 else:
948 h = None
949 url = self.url
950 return tuple.__new__(
951 ConnectionKey,
952 (
953 url.raw_host or "",
954 url.port,
955 url.scheme in _SSL_SCHEMES,
956 self._ssl,
957 self.proxy,
958 self.proxy_auth,
959 h,
960 ),
961 )
962
963 @property
964 def host(self) -> str:
965 ret = self.url.raw_host
966 assert ret is not None
967 return ret
968
969 @property
970 def port(self) -> Optional[int]:
971 return self.url.port
972
973 @property
974 def body(self) -> Union[payload.Payload, Literal[b""]]:
975 """Request body."""
976 # empty body is represented as bytes for backwards compatibility
977 return self._body or b""
978
979 @body.setter
980 def body(self, value: Any) -> None:
981 """Set request body with warning for non-autoclose payloads.
982
983 WARNING: This setter must be called from within an event loop and is not
984 thread-safe. Setting body outside of an event loop may raise RuntimeError
985 when closing file-based payloads.
986
987 DEPRECATED: Direct assignment to body is deprecated and will be removed
988 in a future version. Use await update_body() instead for proper resource
989 management.
990 """
991 # Close existing payload if present
992 if self._body is not None:
993 # Warn if the payload needs manual closing
994 # stacklevel=3: user code -> body setter -> _warn_if_unclosed_payload
995 _warn_if_unclosed_payload(self._body, stacklevel=3)
996 # NOTE: In the future, when we remove sync close support,
997 # this setter will need to be removed and only the async
998 # update_body() method will be available. For now, we call
999 # _close() for backwards compatibility.
1000 self._body._close()
1001 self._update_body(value)
1002
1003 @property
1004 def request_info(self) -> RequestInfo:
1005 headers: CIMultiDictProxy[str] = CIMultiDictProxy(self.headers)
1006 # These are created on every request, so we use a NamedTuple
1007 # for performance reasons. We don't use the RequestInfo.__new__
1008 # method because it has a different signature which is provided
1009 # for backwards compatibility only.
1010 return tuple.__new__(
1011 RequestInfo, (self.url, self.method, headers, self.original_url)
1012 )
1013
1014 @property
1015 def session(self) -> "ClientSession":
1016 """Return the ClientSession instance.
1017
1018 This property provides access to the ClientSession that initiated
1019 this request, allowing middleware to make additional requests
1020 using the same session.
1021 """
1022 return self._session
1023
1024 def update_host(self, url: URL) -> None:
1025 """Update destination host, port and connection type (ssl)."""
1026 # get host/port
1027 if not url.raw_host:
1028 raise InvalidURL(url)
1029
1030 # basic auth info
1031 if url.raw_user or url.raw_password:
1032 self.auth = helpers.BasicAuth(url.user or "", url.password or "")
1033
1034 def update_version(self, version: Union[http.HttpVersion, str]) -> None:
        """Convert the request version to a two-element tuple.

        Parses an HTTP version string: '1.1' => (1, 1)
        """
1039 if isinstance(version, str):
1040 v = [part.strip() for part in version.split(".", 1)]
1041 try:
1042 version = http.HttpVersion(int(v[0]), int(v[1]))
1043 except ValueError:
1044 raise ValueError(
1045 f"Can not parse http version number: {version}"
1046 ) from None
1047 self.version = version
1048
1049 def update_headers(self, headers: Optional[LooseHeaders]) -> None:
1050 """Update request headers."""
1051 self.headers: CIMultiDict[str] = CIMultiDict()
1052
1053 # Build the host header
1054 host = self.url.host_port_subcomponent
1055
        # host_port_subcomponent is None when the URL is relative,
        # but we know we do not have a relative URL here.
1058 assert host is not None
1059 self.headers[hdrs.HOST] = host
1060
1061 if not headers:
1062 return
1063
1064 if isinstance(headers, (dict, MultiDictProxy, MultiDict)):
1065 headers = headers.items()
1066
1067 for key, value in headers: # type: ignore[misc]
1068 # A special case for Host header
1069 if key in hdrs.HOST_ALL:
1070 self.headers[key] = value
1071 else:
1072 self.headers.add(key, value)
1073
1074 def update_auto_headers(self, skip_auto_headers: Optional[Iterable[str]]) -> None:
1075 if skip_auto_headers is not None:
1076 self._skip_auto_headers = CIMultiDict(
1077 (hdr, None) for hdr in sorted(skip_auto_headers)
1078 )
1079 used_headers = self.headers.copy()
1080 used_headers.extend(self._skip_auto_headers) # type: ignore[arg-type]
1081 else:
1082 # Fast path when there are no headers to skip
1083 # which is the most common case.
1084 used_headers = self.headers
1085
1086 for hdr, val in self.DEFAULT_HEADERS.items():
1087 if hdr not in used_headers:
1088 self.headers[hdr] = val
1089
1090 if hdrs.USER_AGENT not in used_headers:
1091 self.headers[hdrs.USER_AGENT] = SERVER_SOFTWARE
1092
1093 def update_cookies(self, cookies: Optional[LooseCookies]) -> None:
1094 """Update request cookies header."""
1095 if not cookies:
1096 return
1097
1098 c = SimpleCookie()
1099 if hdrs.COOKIE in self.headers:
1100 # parse_cookie_header for RFC 6265 compliant Cookie header parsing
1101 c.update(parse_cookie_header(self.headers.get(hdrs.COOKIE, "")))
1102 del self.headers[hdrs.COOKIE]
1103
1104 if isinstance(cookies, Mapping):
1105 iter_cookies = cookies.items()
1106 else:
1107 iter_cookies = cookies # type: ignore[assignment]
1108 for name, value in iter_cookies:
1109 if isinstance(value, Morsel):
1110 # Use helper to preserve coded_value exactly as sent by server
1111 c[name] = preserve_morsel_with_coded_value(value)
1112 else:
1113 c[name] = value # type: ignore[assignment]
1114
1115 self.headers[hdrs.COOKIE] = c.output(header="", sep=";").strip()
1116
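    # Hedged sketch for ``update_cookies()`` above (cookie name/value are
    # placeholders): the new cookies are merged with any Cookie header that was
    # already set on the request.
    #
    #     req.update_cookies({"session_id": "abc123"})
    #     # self.headers[hdrs.COOKIE] is now "session_id=abc123"
    #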
1117 def update_content_encoding(self, data: Any) -> None:
1118 """Set request content encoding."""
1119 if not data:
1120 # Don't compress an empty body.
1121 self.compress = None
1122 return
1123
1124 if self.headers.get(hdrs.CONTENT_ENCODING):
1125 if self.compress:
1126 raise ValueError(
1127 "compress can not be set if Content-Encoding header is set"
1128 )
1129 elif self.compress:
1130 if not isinstance(self.compress, str):
1131 self.compress = "deflate"
1132 self.headers[hdrs.CONTENT_ENCODING] = self.compress
1133 self.chunked = True # enable chunked, no need to deal with length
1134
1135 def update_transfer_encoding(self) -> None:
1136 """Analyze transfer-encoding header."""
1137 te = self.headers.get(hdrs.TRANSFER_ENCODING, "").lower()
1138
1139 if "chunked" in te:
1140 if self.chunked:
1141 raise ValueError(
1142 "chunked can not be set "
1143 'if "Transfer-Encoding: chunked" header is set'
1144 )
1145
1146 elif self.chunked:
1147 if hdrs.CONTENT_LENGTH in self.headers:
1148 raise ValueError(
1149 "chunked can not be set if Content-Length header is set"
1150 )
1151
1152 self.headers[hdrs.TRANSFER_ENCODING] = "chunked"
1153
1154 def update_auth(self, auth: Optional[BasicAuth], trust_env: bool = False) -> None:
1155 """Set basic auth."""
1156 if auth is None:
1157 auth = self.auth
1158 if auth is None and trust_env and self.url.host is not None:
1159 netrc_obj = netrc_from_env()
1160 with contextlib.suppress(LookupError):
1161 auth = basicauth_from_netrc(netrc_obj, self.url.host)
1162 if auth is None:
1163 return
1164
1165 if not isinstance(auth, helpers.BasicAuth):
1166 raise TypeError("BasicAuth() tuple is required instead")
1167
1168 self.headers[hdrs.AUTHORIZATION] = auth.encode()
1169
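    # Hedged sketch for ``update_auth()`` above (credentials are placeholders):
    # an explicit BasicAuth is encoded directly into the Authorization header.
    #
    #     req.update_auth(BasicAuth("user", "pass"))
    #     # self.headers[hdrs.AUTHORIZATION] == "Basic dXNlcjpwYXNz"
    #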
1170 def update_body_from_data(self, body: Any, _stacklevel: int = 3) -> None:
1171 """Update request body from data."""
1172 if self._body is not None:
1173 _warn_if_unclosed_payload(self._body, stacklevel=_stacklevel)
1174
1175 if body is None:
1176 self._body = None
1177 # Set Content-Length to 0 when body is None for methods that expect a body
1178 if (
1179 self.method not in self.GET_METHODS
1180 and not self.chunked
1181 and hdrs.CONTENT_LENGTH not in self.headers
1182 ):
1183 self.headers[hdrs.CONTENT_LENGTH] = "0"
1184 return
1185
1186 # FormData
1187 maybe_payload = body() if isinstance(body, FormData) else body
1188
1189 try:
1190 body_payload = payload.PAYLOAD_REGISTRY.get(maybe_payload, disposition=None)
1191 except payload.LookupError:
1192 body_payload = FormData(maybe_payload)() # type: ignore[arg-type]
1193
1194 self._body = body_payload
1195 # enable chunked encoding if needed
1196 if not self.chunked and hdrs.CONTENT_LENGTH not in self.headers:
1197 if (size := body_payload.size) is not None:
1198 self.headers[hdrs.CONTENT_LENGTH] = str(size)
1199 else:
1200 self.chunked = True
1201
1202 # copy payload headers
1203 assert body_payload.headers
1204 headers = self.headers
1205 skip_headers = self._skip_auto_headers
1206 for key, value in body_payload.headers.items():
1207 if key in headers or (skip_headers is not None and key in skip_headers):
1208 continue
1209 headers[key] = value
1210
1211 def _update_body(self, body: Any) -> None:
        """Update the request body after it has already been set."""
1213 # Remove existing Content-Length header since body is changing
1214 if hdrs.CONTENT_LENGTH in self.headers:
1215 del self.headers[hdrs.CONTENT_LENGTH]
1216
1217 # Remove existing Transfer-Encoding header to avoid conflicts
1218 if self.chunked and hdrs.TRANSFER_ENCODING in self.headers:
1219 del self.headers[hdrs.TRANSFER_ENCODING]
1220
1221 # Now update the body using the existing method
1222 # Called from _update_body, add 1 to stacklevel from caller
1223 self.update_body_from_data(body, _stacklevel=4)
1224
1225 # Update transfer encoding headers if needed (same logic as __init__)
1226 if body is not None or self.method not in self.GET_METHODS:
1227 self.update_transfer_encoding()
1228
1229 async def update_body(self, body: Any) -> None:
1230 """
1231 Update request body and close previous payload if needed.
1232
1233 This method safely updates the request body by first closing any existing
1234 payload to prevent resource leaks, then setting the new body.
1235
1236 IMPORTANT: Always use this method instead of setting request.body directly.
1237 Direct assignment to request.body will leak resources if the previous body
1238 contains file handles, streams, or other resources that need cleanup.
1239
1240 Args:
1241 body: The new body content. Can be:
1242 - bytes/bytearray: Raw binary data
1243 - str: Text data (will be encoded using charset from Content-Type)
1244 - FormData: Form data that will be encoded as multipart/form-data
1245 - Payload: A pre-configured payload object
1246 - AsyncIterable: An async iterable of bytes chunks
1247 - File-like object: Will be read and sent as binary data
1248 - None: Clears the body
1249
1250 Usage:
1251 # CORRECT: Use update_body
1252 await request.update_body(b"new request data")
1253
1254 # WRONG: Don't set body directly
1255 # request.body = b"new request data" # This will leak resources!
1256
1257 # Update with form data
1258 form_data = FormData()
1259 form_data.add_field('field', 'value')
1260 await request.update_body(form_data)
1261
1262 # Clear body
1263 await request.update_body(None)
1264
1265 Note:
1266 This method is async because it may need to close file handles or
1267 other resources associated with the previous payload. Always await
1268 this method to ensure proper cleanup.
1269
1270 Warning:
1271 Setting request.body directly is highly discouraged and can lead to:
1272 - Resource leaks (unclosed file handles, streams)
1273 - Memory leaks (unreleased buffers)
1274 - Unexpected behavior with streaming payloads
1275
1276 It is not recommended to change the payload type in middleware. If the
1277 body was already set (e.g., as bytes), it's best to keep the same type
1278 rather than converting it (e.g., to str) as this may result in unexpected
1279 behavior.
1280
1281 See Also:
1282 - update_body_from_data: Synchronous body update without cleanup
1283 - body property: Direct body access (STRONGLY DISCOURAGED)
1284
1285 """
1286 # Close existing payload if it exists and needs closing
1287 if self._body is not None:
1288 await self._body.close()
1289 self._update_body(body)
1290
1291 def update_expect_continue(self, expect: bool = False) -> None:
1292 if expect:
1293 self.headers[hdrs.EXPECT] = "100-continue"
1294 elif (
1295 hdrs.EXPECT in self.headers
1296 and self.headers[hdrs.EXPECT].lower() == "100-continue"
1297 ):
1298 expect = True
1299
1300 if expect:
1301 self._continue = self.loop.create_future()
1302
1303 def update_proxy(
1304 self,
1305 proxy: Optional[URL],
1306 proxy_auth: Optional[BasicAuth],
1307 proxy_headers: Optional[LooseHeaders],
1308 ) -> None:
1309 self.proxy = proxy
1310 if proxy is None:
1311 self.proxy_auth = None
1312 self.proxy_headers = None
1313 return
1314
1315 if proxy_auth and not isinstance(proxy_auth, helpers.BasicAuth):
1316 raise ValueError("proxy_auth must be None or BasicAuth() tuple")
1317 self.proxy_auth = proxy_auth
1318
1319 if proxy_headers is not None and not isinstance(
1320 proxy_headers, (MultiDict, MultiDictProxy)
1321 ):
1322 proxy_headers = CIMultiDict(proxy_headers)
1323 self.proxy_headers = proxy_headers
1324
1325 async def write_bytes(
1326 self,
1327 writer: AbstractStreamWriter,
1328 conn: "Connection",
1329 content_length: Optional[int],
1330 ) -> None:
1331 """
1332 Write the request body to the connection stream.
1333
1334 This method handles writing different types of request bodies:
1335 1. Payload objects (using their specialized write_with_length method)
1336 2. Bytes/bytearray objects
1337 3. Iterable body content
1338
1339 Args:
1340 writer: The stream writer to write the body to
1341 conn: The connection being used for this request
1342 content_length: Optional maximum number of bytes to write from the body
1343 (None means write the entire body)
1344
1345 The method properly handles:
1346 - Waiting for 100-Continue responses if required
1347 - Content length constraints for chunked encoding
1348 - Error handling for network issues, cancellation, and other exceptions
1349 - Signaling EOF and timeout management
1350
1351 Raises:
1352 ClientOSError: When there's an OS-level error writing the body
1353 ClientConnectionError: When there's a general connection error
1354 asyncio.CancelledError: When the operation is cancelled
1355
1356 """
1357 # 100 response
1358 if self._continue is not None:
1359 # Force headers to be sent before waiting for 100-continue
1360 writer.send_headers()
1361 await writer.drain()
1362 await self._continue
1363
1364 protocol = conn.protocol
1365 assert protocol is not None
1366 try:
            # This should be a rare case, but self._body can be set to None
            # while the task is being started or while we wait above for the
            # 100-continue response. The more likely case is that we have an
            # empty payload while a 100-continue response is still expected.
1373 if self._body is not None:
1374 await self._body.write_with_length(writer, content_length)
1375 except OSError as underlying_exc:
1376 reraised_exc = underlying_exc
1377
1378 # Distinguish between timeout and other OS errors for better error reporting
1379 exc_is_not_timeout = underlying_exc.errno is not None or not isinstance(
1380 underlying_exc, asyncio.TimeoutError
1381 )
1382 if exc_is_not_timeout:
1383 reraised_exc = ClientOSError(
1384 underlying_exc.errno,
1385 f"Can not write request body for {self.url !s}",
1386 )
1387
1388 set_exception(protocol, reraised_exc, underlying_exc)
1389 except asyncio.CancelledError:
1390 # Body hasn't been fully sent, so connection can't be reused
1391 conn.close()
1392 raise
1393 except Exception as underlying_exc:
1394 set_exception(
1395 protocol,
1396 ClientConnectionError(
1397 "Failed to send bytes into the underlying connection "
1398 f"{conn !s}: {underlying_exc!r}",
1399 ),
1400 underlying_exc,
1401 )
1402 else:
1403 # Successfully wrote the body, signal EOF and start response timeout
1404 await writer.write_eof()
1405 protocol.start_timeout()
1406
1407 async def send(self, conn: "Connection") -> "ClientResponse":
        # Specify the request target:
        # - a CONNECT request must send the authority-form URI
        # - a non-CONNECT request via a proxy must send the absolute-form URI
        # - the most common case is the origin-form URI
1412 if self.method == hdrs.METH_CONNECT:
1413 connect_host = self.url.host_subcomponent
1414 assert connect_host is not None
1415 path = f"{connect_host}:{self.url.port}"
1416 elif self.proxy and not self.is_ssl():
1417 path = str(self.url)
1418 else:
1419 path = self.url.raw_path_qs
1420
1421 protocol = conn.protocol
1422 assert protocol is not None
1423 writer = StreamWriter(
1424 protocol,
1425 self.loop,
1426 on_chunk_sent=(
1427 functools.partial(self._on_chunk_request_sent, self.method, self.url)
1428 if self._traces
1429 else None
1430 ),
1431 on_headers_sent=(
1432 functools.partial(self._on_headers_request_sent, self.method, self.url)
1433 if self._traces
1434 else None
1435 ),
1436 )
1437
1438 if self.compress:
1439 writer.enable_compression(self.compress) # type: ignore[arg-type]
1440
1441 if self.chunked is not None:
1442 writer.enable_chunking()
1443
1444 # set default content-type
1445 if (
1446 self.method in self.POST_METHODS
1447 and (
1448 self._skip_auto_headers is None
1449 or hdrs.CONTENT_TYPE not in self._skip_auto_headers
1450 )
1451 and hdrs.CONTENT_TYPE not in self.headers
1452 ):
1453 self.headers[hdrs.CONTENT_TYPE] = "application/octet-stream"
1454
1455 v = self.version
1456 if hdrs.CONNECTION not in self.headers:
1457 if conn._connector.force_close:
1458 if v == HttpVersion11:
1459 self.headers[hdrs.CONNECTION] = "close"
1460 elif v == HttpVersion10:
1461 self.headers[hdrs.CONNECTION] = "keep-alive"
1462
1463 # status + headers
1464 status_line = f"{self.method} {path} HTTP/{v.major}.{v.minor}"
1465
1466 # Buffer headers for potential coalescing with body
1467 await writer.write_headers(status_line, self.headers)
1468
1469 task: Optional["asyncio.Task[None]"]
1470 if self._body or self._continue is not None or protocol.writing_paused:
1471 coro = self.write_bytes(writer, conn, self._get_content_length())
1472 if sys.version_info >= (3, 12):
1473 # Optimization for Python 3.12, try to write
1474 # bytes immediately to avoid having to schedule
1475 # the task on the event loop.
1476 task = asyncio.Task(coro, loop=self.loop, eager_start=True)
1477 else:
1478 task = self.loop.create_task(coro)
1479 if task.done():
1480 task = None
1481 else:
1482 self._writer = task
1483 else:
1484 # We have nothing to write because
1485 # - there is no body
1486 # - the protocol does not have writing paused
1487 # - we are not waiting for a 100-continue response
1488 protocol.start_timeout()
1489 writer.set_eof()
1490 task = None
1491 response_class = self.response_class
1492 assert response_class is not None
1493 self.response = response_class(
1494 self.method,
1495 self.original_url,
1496 writer=task,
1497 continue100=self._continue,
1498 timer=self._timer,
1499 request_info=self.request_info,
1500 traces=self._traces,
1501 loop=self.loop,
1502 session=self._session,
1503 )
1504 return self.response
1505
1506 async def close(self) -> None:
1507 if self.__writer is not None:
1508 try:
1509 await self.__writer
1510 except asyncio.CancelledError:
1511 if (
1512 sys.version_info >= (3, 11)
1513 and (task := asyncio.current_task())
1514 and task.cancelling()
1515 ):
1516 raise
1517
1518 def terminate(self) -> None:
1519 if self.__writer is not None:
1520 if not self.loop.is_closed():
1521 self.__writer.cancel()
1522 self.__writer.remove_done_callback(self.__reset_writer)
1523 self.__writer = None
1524
1525 async def _on_chunk_request_sent(self, method: str, url: URL, chunk: bytes) -> None:
1526 for trace in self._traces:
1527 await trace.send_request_chunk_sent(method, url, chunk)
1528
1529 async def _on_headers_request_sent(
1530 self, method: str, url: URL, headers: "CIMultiDict[str]"
1531 ) -> None:
1532 for trace in self._traces:
1533 await trace.send_request_headers(method, url, headers)