1import asyncio
2import codecs
3import contextlib
4import functools
5import io
6import re
7import sys
8import traceback
9import warnings
10from collections.abc import Mapping
11from hashlib import md5, sha1, sha256
12from http.cookies import Morsel, SimpleCookie
13from types import MappingProxyType, TracebackType
14from typing import (
15 TYPE_CHECKING,
16 Any,
17 Callable,
18 Dict,
19 Iterable,
20 List,
21 Literal,
22 NamedTuple,
23 Optional,
24 Tuple,
25 Type,
26 Union,
27)
28
29import attr
30from multidict import CIMultiDict, CIMultiDictProxy, MultiDict, MultiDictProxy
31from yarl import URL
32
33from . import hdrs, helpers, http, multipart, payload
34from ._cookie_helpers import (
35 parse_cookie_header,
36 parse_set_cookie_headers,
37 preserve_morsel_with_coded_value,
38)
39from .abc import AbstractStreamWriter
40from .client_exceptions import (
41 ClientConnectionError,
42 ClientOSError,
43 ClientResponseError,
44 ContentTypeError,
45 InvalidURL,
46 ServerFingerprintMismatch,
47)
48from .compression_utils import HAS_BROTLI, HAS_ZSTD
49from .formdata import FormData
50from .helpers import (
51 _SENTINEL,
52 BaseTimerContext,
53 BasicAuth,
54 HeadersMixin,
55 TimerNoop,
56 basicauth_from_netrc,
57 netrc_from_env,
58 noop,
59 reify,
60 sentinel,
61 set_exception,
62 set_result,
63)
64from .http import (
65 SERVER_SOFTWARE,
66 HttpVersion,
67 HttpVersion10,
68 HttpVersion11,
69 StreamWriter,
70)
71from .streams import StreamReader
72from .typedefs import (
73 DEFAULT_JSON_DECODER,
74 JSONDecoder,
75 LooseCookies,
76 LooseHeaders,
77 Query,
78 RawHeaders,
79)
80
81if TYPE_CHECKING:
82 import ssl
83 from ssl import SSLContext
84else:
85 try:
86 import ssl
87 from ssl import SSLContext
88 except ImportError: # pragma: no cover
89 ssl = None # type: ignore[assignment]
90 SSLContext = object # type: ignore[misc,assignment]
91
92
# Public names exported by this module.
__all__ = ("ClientRequest", "ClientResponse", "RequestInfo", "Fingerprint")


if TYPE_CHECKING:
    from .client import ClientSession
    from .connector import Connection
    from .tracing import Trace


# Shared exception instance that ClientResponse._notify_content() sets on the
# response content stream.
_CONNECTION_CLOSED_EXCEPTION = ClientConnectionError("Connection closed")
# Despite its name, this pattern matches any single character OUTSIDE the set
# -!#$%&'*+.^_`|~0-9a-zA-Z; ClientRequest.__init__ uses it to reject methods
# that contain such characters.
_CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]")
# Matches content types that start with "application/json" or with
# "application/<prefix>+json".
json_re = re.compile(r"^application/(?:[\w.+-]+?\+)?json")
105
106
def _gen_default_accept_encoding() -> str:
    """Build the default ``Accept-Encoding`` header value.

    ``gzip`` and ``deflate`` are always listed; ``br`` and ``zstd`` follow,
    in that order, only when ``HAS_BROTLI`` / ``HAS_ZSTD`` are truthy.
    """
    optional_codecs = (("br", HAS_BROTLI), ("zstd", HAS_ZSTD))
    supported = ["gzip", "deflate"]
    supported.extend(name for name, available in optional_codecs if available)
    return ", ".join(supported)
117
118
@attr.s(auto_attribs=True, frozen=True, slots=True)
class ContentDisposition:
    """Frozen record holding a parsed ``Content-Disposition`` header.

    Built by ``ClientResponse.content_disposition``.
    """

    # Disposition type, as returned by multipart.parse_content_disposition().
    type: Optional[str]
    # Read-only view of the header parameters.
    parameters: "MappingProxyType[str, str]"
    # Result of multipart.content_disposition_filename() on the parameters.
    filename: Optional[str]
124
125
class _RequestInfo(NamedTuple):
    """Field layout for ``RequestInfo`` (which subclasses this tuple)."""

    # ClientRequest.request_info fills this with the fragment-less request URL.
    url: URL
    method: str
    headers: "CIMultiDictProxy[str]"
    # ClientRequest.request_info fills this with the original URL, which may
    # still carry a fragment.
    real_url: URL
131
132
class RequestInfo(_RequestInfo):
    """Tuple of ``(url, method, headers, real_url)`` describing a request."""

    def __new__(
        cls,
        url: URL,
        method: str,
        headers: "CIMultiDictProxy[str]",
        real_url: Union[URL, _SENTINEL] = sentinel,
    ) -> "RequestInfo":
        """Create a new RequestInfo instance.

        ``real_url`` may be omitted for backwards compatibility; in that
        case ``url`` is stored in its place.
        """
        if real_url is sentinel:
            # Legacy three-argument form: reuse the request URL.
            real_url = url
        return tuple.__new__(cls, (url, method, headers, real_url))
149
150
class Fingerprint:
    """Expected digest of a peer certificate, used to pin a TLS connection.

    Only 32-byte (SHA-256) digests are accepted. 16- and 20-byte digests are
    recognised as MD5 / SHA-1 and rejected explicitly.
    """

    HASHFUNC_BY_DIGESTLEN = {
        16: md5,
        20: sha1,
        32: sha256,
    }

    def __init__(self, fingerprint: bytes) -> None:
        """Store *fingerprint* and pick the hash function by digest length.

        Raises:
            ValueError: if the length matches no known digest, or matches
                MD5 / SHA-1.
        """
        hashfunc = self.HASHFUNC_BY_DIGESTLEN.get(len(fingerprint))
        if not hashfunc:
            raise ValueError("fingerprint has invalid length")
        if hashfunc is md5 or hashfunc is sha1:
            raise ValueError("md5 and sha1 are insecure and not supported. Use sha256.")
        self._fingerprint = fingerprint
        self._hashfunc = hashfunc

    @property
    def fingerprint(self) -> bytes:
        """The expected digest bytes passed to the constructor."""
        return self._fingerprint

    def check(self, transport: asyncio.Transport) -> None:
        """Verify the peer certificate of *transport* against the digest.

        Does nothing when the transport reports no ``sslcontext``.

        Raises:
            ServerFingerprintMismatch: if the digest of the DER-encoded peer
                certificate differs from the expected one.
        """
        if not transport.get_extra_info("sslcontext"):
            return
        ssl_object = transport.get_extra_info("ssl_object")
        peer_cert = ssl_object.getpeercert(binary_form=True)
        actual = self._hashfunc(peer_cert).digest()
        if actual == self._fingerprint:
            return
        # peername may carry extra items (e.g. for IPv6); keep host and port.
        host, port, *_ = transport.get_extra_info("peername")
        raise ServerFingerprintMismatch(self._fingerprint, actual, host, port)
181
182
# Types accepted for the ``ssl`` argument; _merge_ssl_params() checks its
# result against this tuple.
if ssl is not None:
    SSL_ALLOWED_TYPES = (ssl.SSLContext, bool, Fingerprint, type(None))
else:  # pragma: no cover
    # Without the ssl module only booleans and None are accepted.
    SSL_ALLOWED_TYPES = (bool, type(None))
187
188
def _merge_ssl_params(
    ssl: Union["SSLContext", bool, Fingerprint],
    verify_ssl: Optional[bool],
    ssl_context: Optional["SSLContext"],
    fingerprint: Optional[bytes],
) -> Union["SSLContext", bool, Fingerprint]:
    """Fold the deprecated TLS keyword arguments into one ``ssl`` value.

    ``ssl=None`` is treated as ``True``. Each deprecated argument that is
    supplied (``verify_ssl`` only when falsy) emits a DeprecationWarning and
    replaces ``ssl``; it may only be used while ``ssl`` is still ``True``.

    Raises:
        ValueError: if a deprecated argument is combined with a non-default
            ``ssl`` value or with another deprecated argument.
        TypeError: if the resulting value is not one of SSL_ALLOWED_TYPES.
    """
    exclusive_msg = (
        "verify_ssl, ssl_context, fingerprint and ssl "
        "parameters are mutually exclusive"
    )

    if ssl is None:
        ssl = True  # Double check for backwards compatibility

    if verify_ssl is not None and not verify_ssl:
        warnings.warn(
            "verify_ssl is deprecated, use ssl=False instead",
            DeprecationWarning,
            stacklevel=3,
        )
        if ssl is not True:
            raise ValueError(exclusive_msg)
        ssl = False

    if ssl_context is not None:
        warnings.warn(
            "ssl_context is deprecated, use ssl=context instead",
            DeprecationWarning,
            stacklevel=3,
        )
        if ssl is not True:
            raise ValueError(exclusive_msg)
        ssl = ssl_context

    if fingerprint is not None:
        warnings.warn(
            "fingerprint is deprecated, use ssl=Fingerprint(fingerprint) instead",
            DeprecationWarning,
            stacklevel=3,
        )
        if ssl is not True:
            raise ValueError(exclusive_msg)
        ssl = Fingerprint(fingerprint)

    if isinstance(ssl, SSL_ALLOWED_TYPES):
        return ssl
    raise TypeError(
        f"ssl should be SSLContext, bool, Fingerprint or None, got {ssl!r} instead."
    )
242
243
# URL schemes for which ClientRequest.is_ssl() returns True and for which
# ClientRequest.connection_key sets is_ssl.
_SSL_SCHEMES = frozenset(("https", "wss"))
245
246
# ConnectionKey is a NamedTuple because it is used as a key in a dict
# and a set in the connector. Since a NamedTuple is a tuple it uses
# the fast native tuple __hash__ and __eq__ implementation in CPython.
class ConnectionKey(NamedTuple):
    """Hashable key describing the connection a request needs.

    Built by ``ClientRequest.connection_key``.
    """

    # the key should contain an information about used proxy / TLS
    # to prevent reusing wrong connections from a pool
    host: str  # url.raw_host, or "" when it is missing
    port: Optional[int]
    is_ssl: bool  # True when the URL scheme is in _SSL_SCHEMES
    ssl: Union[SSLContext, bool, Fingerprint]
    proxy: Optional[URL]
    proxy_auth: Optional[BasicAuth]
    proxy_headers_hash: Optional[int]  # hash(CIMultiDict)
260
261
262def _is_expected_content_type(
263 response_content_type: str, expected_content_type: str
264) -> bool:
265 if expected_content_type == "application/json":
266 return json_re.match(response_content_type) is not None
267 return expected_content_type in response_content_type
268
269
def _warn_if_unclosed_payload(payload: payload.Payload, stacklevel: int = 2) -> None:
    """Emit a ResourceWarning for a payload that still needs closing.

    The warning is issued only when the payload is neither ``autoclose`` nor
    ``consumed``. Callers must make sure the body is a Payload first.

    Args:
        payload: The payload to check
        stacklevel: Stack level for the warning (default 2 for direct callers)
    """
    if payload.autoclose or payload.consumed:
        # Nothing left open that the caller would have to close manually.
        return
    warnings.warn(
        "The previous request body contains unclosed resources. "
        "Use await request.update_body() instead of setting request.body "
        "directly to properly close resources and avoid leaks.",
        ResourceWarning,
        stacklevel=stacklevel,
    )
287
288
class ClientResponse(HeadersMixin):
    """Client-side HTTP response.

    Status, headers and the payload stream are populated by start(). The
    body can then be read through ``content`` or via read() / text() /
    json(). release() hands the connection back via ``Connection.release()``
    while close() calls ``Connection.close()``; using the object as an async
    context manager releases it on exit.
    """

    # Some of these attributes are None when created,
    # but will be set by the start() method.
    # As the end user will likely never see the None values, we cheat the types below.
    # from the Status-Line of the response
    version: Optional[HttpVersion] = None  # HTTP-Version
    status: int = None  # type: ignore[assignment]  # Status-Code
    reason: Optional[str] = None  # Reason-Phrase

    content: StreamReader = None  # type: ignore[assignment]  # Payload stream
    _body: Optional[bytes] = None
    _headers: CIMultiDictProxy[str] = None  # type: ignore[assignment]
    _history: Tuple["ClientResponse", ...] = ()
    _raw_headers: RawHeaders = None  # type: ignore[assignment]

    _connection: Optional["Connection"] = None  # current connection
    _cookies: Optional[SimpleCookie] = None
    _raw_cookie_headers: Optional[Tuple[str, ...]] = None
    _continue: Optional["asyncio.Future[bool]"] = None
    _source_traceback: Optional[traceback.StackSummary] = None
    _session: Optional["ClientSession"] = None
    # set up by ClientRequest after ClientResponse object creation
    # post-init stage allows to not change ctor signature
    _closed = True  # to allow __del__ for non-initialized properly response
    _released = False
    _in_context = False

    # Fallback charset resolver; replaced in __init__ by the session's
    # resolver when a session is given.
    _resolve_charset: Callable[["ClientResponse", bytes], str] = lambda *_: "utf-8"

    # Name-mangled storage behind the ``_writer`` property.
    __writer: Optional["asyncio.Task[None]"] = None

    def __init__(
        self,
        method: str,
        url: URL,
        *,
        writer: "Optional[asyncio.Task[None]]",
        continue100: Optional["asyncio.Future[bool]"],
        timer: BaseTimerContext,
        request_info: RequestInfo,
        traces: List["Trace"],
        loop: asyncio.AbstractEventLoop,
        session: "ClientSession",
    ) -> None:
        """Store request context; the response itself is filled in by start().

        ``url`` is kept verbatim as ``real_url``, while ``url`` exposes it
        with any fragment removed. A ``None`` timer is replaced by
        ``TimerNoop()``.
        """
        # URL forbids subclasses, so a simple type check is enough.
        assert type(url) is URL

        self.method = method

        self._real_url = url
        self._url = url.with_fragment(None) if url.raw_fragment else url
        if writer is not None:
            self._writer = writer
        if continue100 is not None:
            self._continue = continue100
        self._request_info = request_info
        self._timer = timer if timer is not None else TimerNoop()
        self._cache: Dict[str, Any] = {}
        self._traces = traces
        self._loop = loop
        # Save reference to _resolve_charset, so that get_encoding() will still
        # work after the response has finished reading the body.
        # TODO: Fix session=None in tests (see ClientRequest.__init__).
        if session is not None:
            # store a reference to session #1985
            self._session = session
            self._resolve_charset = session._resolve_charset
        if loop.get_debug():
            # Remember where the response was created for the debug report
            # produced by __del__.
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

    def __reset_writer(self, _: object = None) -> None:
        """Done-callback for the writer task: forget the task."""
        self.__writer = None

    @property
    def _writer(self) -> Optional["asyncio.Task[None]"]:
        """The writer task for streaming data.

        _writer is only provided for backwards compatibility
        for subclasses that may need to access it.
        """
        return self.__writer

    @_writer.setter
    def _writer(self, writer: Optional["asyncio.Task[None]"]) -> None:
        """Set the writer task for streaming data."""
        # Detach the reset callback from the task being replaced.
        if self.__writer is not None:
            self.__writer.remove_done_callback(self.__reset_writer)
        self.__writer = writer
        if writer is None:
            return
        if writer.done():
            # The writer is already done, so we can clear it immediately.
            self.__writer = None
        else:
            writer.add_done_callback(self.__reset_writer)

    @property
    def cookies(self) -> SimpleCookie:
        """Response cookies, parsed lazily from the raw Set-Cookie values.

        The result is cached; an empty SimpleCookie is returned when no raw
        cookie headers were stored.
        """
        if self._cookies is None:
            if self._raw_cookie_headers is not None:
                # Parse cookies for response.cookies (SimpleCookie for backward compatibility)
                cookies = SimpleCookie()
                # Use parse_set_cookie_headers for more lenient parsing that handles
                # malformed cookies better than SimpleCookie.load
                cookies.update(parse_set_cookie_headers(self._raw_cookie_headers))
                self._cookies = cookies
            else:
                self._cookies = SimpleCookie()
        return self._cookies

    @cookies.setter
    def cookies(self, cookies: SimpleCookie) -> None:
        """Replace the cookies and regenerate the stored raw header values."""
        self._cookies = cookies
        # Generate raw cookie headers from the SimpleCookie
        if cookies:
            self._raw_cookie_headers = tuple(
                morsel.OutputString() for morsel in cookies.values()
            )
        else:
            self._raw_cookie_headers = None

    @reify
    def url(self) -> URL:
        """Request URL with the fragment removed."""
        return self._url

    @reify
    def url_obj(self) -> URL:
        """Deprecated alias of ``url``; emits a DeprecationWarning."""
        warnings.warn("Deprecated, use .url #1654", DeprecationWarning, stacklevel=2)
        return self._url

    @reify
    def real_url(self) -> URL:
        """Request URL exactly as passed to the constructor."""
        return self._real_url

    @reify
    def host(self) -> str:
        """Host part of ``url``."""
        assert self._url.host is not None
        return self._url.host

    @reify
    def headers(self) -> "CIMultiDictProxy[str]":
        """Response headers as set by start()."""
        return self._headers

    @reify
    def raw_headers(self) -> RawHeaders:
        """Raw response headers as set by start()."""
        return self._raw_headers

    @reify
    def request_info(self) -> RequestInfo:
        """The RequestInfo passed to the constructor."""
        return self._request_info

    @reify
    def content_disposition(self) -> Optional[ContentDisposition]:
        """Parsed Content-Disposition header, or None if it is absent."""
        raw = self._headers.get(hdrs.CONTENT_DISPOSITION)
        if raw is None:
            return None
        disposition_type, params_dct = multipart.parse_content_disposition(raw)
        params = MappingProxyType(params_dct)
        filename = multipart.content_disposition_filename(params)
        return ContentDisposition(disposition_type, params, filename)

    def __del__(self, _warnings: Any = warnings) -> None:
        """Release a connection still held by a response that was never closed.

        In loop debug mode a ResourceWarning is also emitted and the leak is
        reported to the loop's exception handler.
        """
        if self._closed:
            return

        if self._connection is not None:
            self._connection.release()
            self._cleanup_writer()

            if self._loop.get_debug():
                kwargs = {"source": self}
                _warnings.warn(f"Unclosed response {self!r}", ResourceWarning, **kwargs)
                context = {"client_response": self, "message": "Unclosed response"}
                if self._source_traceback:
                    context["source_traceback"] = self._source_traceback
                self._loop.call_exception_handler(context)

    def __repr__(self) -> str:
        """Return URL, status and reason on one line, followed by the headers."""
        out = io.StringIO()
        ascii_encodable_url = str(self.url)
        if self.reason:
            # Escape non-ASCII characters so the repr is always ASCII-safe.
            ascii_encodable_reason = self.reason.encode(
                "ascii", "backslashreplace"
            ).decode("ascii")
        else:
            ascii_encodable_reason = "None"
        print(
            "<ClientResponse({}) [{} {}]>".format(
                ascii_encodable_url, self.status, ascii_encodable_reason
            ),
            file=out,
        )
        print(self.headers, file=out)
        return out.getvalue()

    @property
    def connection(self) -> Optional["Connection"]:
        """The connection currently held by this response, if any."""
        return self._connection

    @reify
    def history(self) -> Tuple["ClientResponse", ...]:
        """A sequence of responses, if redirects occurred."""
        return self._history

    @reify
    def links(self) -> "MultiDictProxy[MultiDictProxy[Union[str, URL]]]":
        """Parsed ``Link`` headers.

        The outer multidict is keyed by each link's ``rel`` parameter, or by
        the link target when ``rel`` is missing. Each value holds the link's
        parameters plus a ``url`` entry with the target joined onto the
        response URL.
        """
        links_str = ", ".join(self.headers.getall("link", []))

        if not links_str:
            return MultiDictProxy(MultiDict())

        links: MultiDict[MultiDictProxy[Union[str, URL]]] = MultiDict()

        # Split on commas that are followed by the "<" opening the next link.
        for val in re.split(r",(?=\s*<)", links_str):
            match = re.match(r"\s*<(.*)>(.*)", val)
            if match is None:  # pragma: no cover
                # the check exists to suppress mypy error
                continue
            url, params_str = match.groups()
            # The text before the first ";" is dropped; the rest are params.
            params = params_str.split(";")[1:]

            link: MultiDict[Union[str, URL]] = MultiDict()

            for param in params:
                # key = value, where value may be wrapped in matching quotes
                match = re.match(r"^\s*(\S*)\s*=\s*(['\"]?)(.*?)(\2)\s*$", param, re.M)
                if match is None:  # pragma: no cover
                    # the check exists to suppress mypy error
                    continue
                key, _, value, _ = match.groups()

                link.add(key, value)

            key = link.get("rel", url)

            link.add("url", self.url.join(URL(url)))

            links.add(str(key), MultiDictProxy(link))

        return MultiDictProxy(links)

    async def start(self, connection: "Connection") -> "ClientResponse":
        """Start response processing.

        Reads messages from the connection's protocol until one arrives whose
        status code is outside 100-199 or is exactly 101. Each other 1xx
        message resolves the pending ``continue100`` future, if any. Then
        status, headers, payload stream and raw Set-Cookie values are stored.

        Raises:
            ClientResponseError: if the protocol raises HttpProcessingError.
        """
        self._closed = False
        self._protocol = connection.protocol
        self._connection = connection

        with self._timer:
            while True:
                # read response
                try:
                    protocol = self._protocol
                    message, payload = await protocol.read()  # type: ignore[union-attr]
                except http.HttpProcessingError as exc:
                    raise ClientResponseError(
                        self.request_info,
                        self.history,
                        status=exc.code,
                        message=exc.message,
                        headers=exc.headers,
                    ) from exc

                if message.code < 100 or message.code > 199 or message.code == 101:
                    break

                if self._continue is not None:
                    set_result(self._continue, True)
                    self._continue = None

        # payload eof handler
        payload.on_eof(self._response_eof)

        # response status
        self.version = message.version
        self.status = message.code
        self.reason = message.reason

        # headers
        self._headers = message.headers  # type is CIMultiDictProxy
        self._raw_headers = message.raw_headers  # type is Tuple[bytes, bytes]

        # payload
        self.content = payload

        # cookies
        if cookie_hdrs := self.headers.getall(hdrs.SET_COOKIE, ()):
            # Store raw cookie headers for CookieJar
            self._raw_cookie_headers = tuple(cookie_hdrs)
        return self

    def _response_eof(self) -> None:
        """Payload EOF callback registered by start().

        Marks the response closed, cancels the writer and releases the
        connection, unless the response is already closed or the protocol
        has been upgraded.
        """
        if self._closed:
            return

        # protocol could be None because connection could be detached
        protocol = self._connection and self._connection.protocol
        if protocol is not None and protocol.upgraded:
            return

        self._closed = True
        self._cleanup_writer()
        self._release_connection()

    @property
    def closed(self) -> bool:
        """Whether the response is marked closed."""
        return self._closed

    def close(self) -> None:
        """Mark the response closed and close the underlying connection.

        The connection is left untouched when the loop is missing or already
        closed.
        """
        if not self._released:
            self._notify_content()

        self._closed = True
        if self._loop is None or self._loop.is_closed():
            return

        self._cleanup_writer()
        if self._connection is not None:
            self._connection.close()
            self._connection = None

    def release(self) -> Any:
        """Mark the response closed and release the underlying connection.

        Returns whatever ``noop()`` returns.
        NOTE(review): presumably an awaitable kept so that
        ``await resp.release()`` still works -- confirm against helpers.noop.
        """
        if not self._released:
            self._notify_content()

        self._closed = True

        self._cleanup_writer()
        self._release_connection()
        return noop()

    @property
    def ok(self) -> bool:
        """Returns ``True`` if ``status`` is less than ``400``, ``False`` if not.

        This is **not** a check for ``200 OK`` but a check that the response
        status is under 400.
        """
        return 400 > self.status

    def raise_for_status(self) -> None:
        """Raise ClientResponseError when ``ok`` is false.

        Outside of an ``async with`` block the response is released first.
        """
        if not self.ok:
            # reason should always be not None for a started response
            assert self.reason is not None

            # If we're in a context we can rely on __aexit__() to release as the
            # exception propagates.
            if not self._in_context:
                self.release()

            raise ClientResponseError(
                self.request_info,
                self.history,
                status=self.status,
                message=self.reason,
                headers=self.headers,
            )

    def _release_connection(self) -> None:
        """Release the connection now, or once the writer task has finished."""
        if self._connection is not None:
            if self.__writer is None:
                self._connection.release()
                self._connection = None
            else:
                # Writer still running: retry when it completes.
                self.__writer.add_done_callback(lambda f: self._release_connection())

    async def _wait_released(self) -> None:
        """Wait for the writer task (if any), then release the connection.

        CancelledError from the writer is swallowed unless, on Python 3.11+,
        the current task itself is being cancelled.
        """
        if self.__writer is not None:
            try:
                await self.__writer
            except asyncio.CancelledError:
                if (
                    sys.version_info >= (3, 11)
                    and (task := asyncio.current_task())
                    and task.cancelling()
                ):
                    raise
        self._release_connection()

    def _cleanup_writer(self) -> None:
        """Cancel the writer task (if any) and drop the session reference."""
        if self.__writer is not None:
            self.__writer.cancel()
        self._session = None

    def _notify_content(self) -> None:
        """Set the shared "Connection closed" error on the content stream.

        The error is set only if the stream has no exception yet; the
        response is then flagged as released.
        """
        content = self.content
        if content and content.exception() is None:
            set_exception(content, _CONNECTION_CLOSED_EXCEPTION)
        self._released = True

    async def wait_for_close(self) -> None:
        """Wait for the writer task (if any), then release the response.

        CancelledError from the writer is swallowed unless, on Python 3.11+,
        the current task itself is being cancelled.
        """
        if self.__writer is not None:
            try:
                await self.__writer
            except asyncio.CancelledError:
                if (
                    sys.version_info >= (3, 11)
                    and (task := asyncio.current_task())
                    and task.cancelling()
                ):
                    raise
        self.release()

    async def read(self) -> bytes:
        """Read response payload.

        The body is read once and cached. Every trace is notified with the
        complete body. Any exception while reading closes the response
        before propagating.

        Raises:
            ClientConnectionError: if the body is already cached and the
                response has been released.
        """
        if self._body is None:
            try:
                self._body = await self.content.read()
                for trace in self._traces:
                    await trace.send_response_chunk_received(
                        self.method, self.url, self._body
                    )
            except BaseException:
                self.close()
                raise
        elif self._released:  # Response explicitly released
            raise ClientConnectionError("Connection closed")

        protocol = self._connection and self._connection.protocol
        if protocol is None or not protocol.upgraded:
            await self._wait_released()  # Underlying connection released
        return self._body

    def get_encoding(self) -> str:
        """Determine the text encoding of the body.

        Order of precedence: a ``charset`` parameter of Content-Type that
        ``codecs.lookup`` recognises; ``utf-8`` for ``application/json`` and
        ``application/rdap``; otherwise the ``_resolve_charset`` callback.

        Raises:
            RuntimeError: if the callback is needed but the body has not
                been read yet.
        """
        ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower()
        mimetype = helpers.parse_mimetype(ctype)

        encoding = mimetype.parameters.get("charset")
        if encoding:
            # Unknown or invalid charset names fall through to the defaults.
            with contextlib.suppress(LookupError, ValueError):
                return codecs.lookup(encoding).name

        if mimetype.type == "application" and (
            mimetype.subtype == "json" or mimetype.subtype == "rdap"
        ):
            # RFC 7159 states that the default encoding is UTF-8.
            # RFC 7483 defines application/rdap+json
            return "utf-8"

        if self._body is None:
            raise RuntimeError(
                "Cannot compute fallback encoding of a not yet read body"
            )

        return self._resolve_charset(self, self._body)

    async def text(self, encoding: Optional[str] = None, errors: str = "strict") -> str:
        """Read response payload and decode.

        When *encoding* is None it is taken from get_encoding(); *errors* is
        passed to ``bytes.decode``.
        """
        if self._body is None:
            await self.read()

        if encoding is None:
            encoding = self.get_encoding()

        return self._body.decode(encoding, errors=errors)  # type: ignore[union-attr]

    async def json(
        self,
        *,
        encoding: Optional[str] = None,
        loads: JSONDecoder = DEFAULT_JSON_DECODER,
        content_type: Optional[str] = "application/json",
    ) -> Any:
        """Read and decodes JSON response.

        When *content_type* is truthy the response Content-Type must satisfy
        it. A body that is empty after stripping whitespace yields ``None``
        without calling *loads*.

        Raises:
            ContentTypeError: if the Content-Type check fails.
        """
        if self._body is None:
            await self.read()

        if content_type:
            ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower()
            if not _is_expected_content_type(ctype, content_type):
                raise ContentTypeError(
                    self.request_info,
                    self.history,
                    status=self.status,
                    message=(
                        "Attempt to decode JSON with unexpected mimetype: %s" % ctype
                    ),
                    headers=self.headers,
                )

        stripped = self._body.strip()  # type: ignore[union-attr]
        if not stripped:
            return None

        if encoding is None:
            encoding = self.get_encoding()

        return loads(stripped.decode(encoding))

    async def __aenter__(self) -> "ClientResponse":
        """Enter the context; raise_for_status() then leaves release to exit."""
        self._in_context = True
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Release the response and wait for the writer to finish."""
        self._in_context = False
        # similar to _RequestContextManager, we do not need to check
        # for exceptions, response object can close connection
        # if state is broken
        self.release()
        await self.wait_for_close()
793
794
class ClientRequest:
    """Client-side HTTP request assembled by __init__ and the update_* methods."""

    GET_METHODS = {
        hdrs.METH_GET,
        hdrs.METH_HEAD,
        hdrs.METH_OPTIONS,
        hdrs.METH_TRACE,
    }
    POST_METHODS = {hdrs.METH_PATCH, hdrs.METH_POST, hdrs.METH_PUT}
    ALL_METHODS = GET_METHODS.union(POST_METHODS).union({hdrs.METH_DELETE})

    # Headers added by update_auto_headers() unless already present or skipped.
    # The Accept-Encoding value is computed once, when the class is created.
    DEFAULT_HEADERS = {
        hdrs.ACCEPT: "*/*",
        hdrs.ACCEPT_ENCODING: _gen_default_accept_encoding(),
    }

    # Type of body depends on PAYLOAD_REGISTRY, which is dynamic.
    _body: Union[None, payload.Payload] = None
    auth = None
    response = None

    __writer: Optional["asyncio.Task[None]"] = None  # async task for streaming data

    # These class defaults help create_autospec() work correctly.
    # If autospec is improved in future, maybe these can be removed.
    url = URL()
    method = "GET"

    _continue = None  # waiter future for '100 Continue' response

    _skip_auto_headers: Optional["CIMultiDict[None]"] = None

    # N.B.
    # Adding __del__ method with self._writer closing doesn't make sense
    # because _writer is instance method, thus it keeps a reference to self.
    # Until writer has finished finalizer will not be called.
830
    def __init__(
        self,
        method: str,
        url: URL,
        *,
        params: Query = None,
        headers: Optional[LooseHeaders] = None,
        skip_auto_headers: Optional[Iterable[str]] = None,
        data: Any = None,
        cookies: Optional[LooseCookies] = None,
        auth: Optional[BasicAuth] = None,
        version: http.HttpVersion = http.HttpVersion11,
        compress: Union[str, bool, None] = None,
        chunked: Optional[bool] = None,
        expect100: bool = False,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        response_class: Optional[Type["ClientResponse"]] = None,
        proxy: Optional[URL] = None,
        proxy_auth: Optional[BasicAuth] = None,
        timer: Optional[BaseTimerContext] = None,
        session: Optional["ClientSession"] = None,
        ssl: Union[SSLContext, bool, Fingerprint] = True,
        proxy_headers: Optional[LooseHeaders] = None,
        traces: Optional[List["Trace"]] = None,
        trust_env: bool = False,
        server_hostname: Optional[str] = None,
    ):
        """Validate the arguments and assemble the request.

        The method is upper-cased, *params* are appended to the URL query,
        and the URL is stored twice: ``original_url`` as given and ``url``
        without its fragment. The ``update_*`` helpers then fill in version,
        headers, cookies, auth, proxy and body, in the order written below.

        Raises:
            ValueError: if *method* contains a character matched by
                ``_CONTAINS_CONTROL_CHAR_RE``.
        """
        if loop is None:
            loop = asyncio.get_event_loop()
        if match := _CONTAINS_CONTROL_CHAR_RE.search(method):
            raise ValueError(
                f"Method cannot contain non-token characters {method!r} "
                f"(found at least {match.group()!r})"
            )
        # URL forbids subclasses, so a simple type check is enough.
        assert type(url) is URL, url
        if proxy is not None:
            assert type(proxy) is URL, proxy
        # FIXME: session is None in tests only, need to fix tests
        # assert session is not None
        if TYPE_CHECKING:
            assert session is not None
        self._session = session
        if params:
            url = url.extend_query(params)
        self.original_url = url
        self.url = url.with_fragment(None) if url.raw_fragment else url
        self.method = method.upper()
        self.chunked = chunked
        self.compress = compress
        self.loop = loop
        self.length = None
        if response_class is None:
            real_response_class = ClientResponse
        else:
            real_response_class = response_class
        self.response_class: Type[ClientResponse] = real_response_class
        self._timer = timer if timer is not None else TimerNoop()
        # ssl=None is treated the same as the default ssl=True.
        self._ssl = ssl if ssl is not None else True
        self.server_hostname = server_hostname

        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

        # update_headers() creates self.headers, which update_auto_headers()
        # and update_cookies() then read and modify.
        self.update_version(version)
        self.update_host(url)
        self.update_headers(headers)
        self.update_auto_headers(skip_auto_headers)
        self.update_cookies(cookies)
        self.update_content_encoding(data)
        self.update_auth(auth, trust_env)
        self.update_proxy(proxy, proxy_auth, proxy_headers)

        self.update_body_from_data(data)
        if data is not None or self.method not in self.GET_METHODS:
            self.update_transfer_encoding()
        self.update_expect_continue(expect100)
        self._traces = [] if traces is None else traces
909
    def __reset_writer(self, _: object = None) -> None:
        """Done-callback for the writer task: forget the task."""
        self.__writer = None
912
913 def _get_content_length(self) -> Optional[int]:
914 """Extract and validate Content-Length header value.
915
916 Returns parsed Content-Length value or None if not set.
917 Raises ValueError if header exists but cannot be parsed as an integer.
918 """
919 if hdrs.CONTENT_LENGTH not in self.headers:
920 return None
921
922 content_length_hdr = self.headers[hdrs.CONTENT_LENGTH]
923 try:
924 return int(content_length_hdr)
925 except ValueError:
926 raise ValueError(
927 f"Invalid Content-Length header: {content_length_hdr}"
928 ) from None
929
930 @property
931 def skip_auto_headers(self) -> CIMultiDict[None]:
932 return self._skip_auto_headers or CIMultiDict()
933
934 @property
935 def _writer(self) -> Optional["asyncio.Task[None]"]:
936 return self.__writer
937
938 @_writer.setter
939 def _writer(self, writer: "asyncio.Task[None]") -> None:
940 if self.__writer is not None:
941 self.__writer.remove_done_callback(self.__reset_writer)
942 self.__writer = writer
943 writer.add_done_callback(self.__reset_writer)
944
    def is_ssl(self) -> bool:
        """Return True when the URL scheme is one of ``_SSL_SCHEMES``."""
        return self.url.scheme in _SSL_SCHEMES
947
    @property
    def ssl(self) -> Union["SSLContext", bool, Fingerprint]:
        """The ``ssl`` setting stored by __init__ (``None`` is stored as True)."""
        return self._ssl
951
952 @property
953 def connection_key(self) -> ConnectionKey:
954 if proxy_headers := self.proxy_headers:
955 h: Optional[int] = hash(tuple(proxy_headers.items()))
956 else:
957 h = None
958 url = self.url
959 return tuple.__new__(
960 ConnectionKey,
961 (
962 url.raw_host or "",
963 url.port,
964 url.scheme in _SSL_SCHEMES,
965 self._ssl,
966 self.proxy,
967 self.proxy_auth,
968 h,
969 ),
970 )
971
    @property
    def host(self) -> str:
        """The URL's ``raw_host``; asserted to be present."""
        ret = self.url.raw_host
        assert ret is not None
        return ret
977
    @property
    def port(self) -> Optional[int]:
        """The URL's ``port`` attribute."""
        return self.url.port
981
982 @property
983 def body(self) -> Union[payload.Payload, Literal[b""]]:
984 """Request body."""
985 # empty body is represented as bytes for backwards compatibility
986 return self._body or b""
987
988 @body.setter
989 def body(self, value: Any) -> None:
990 """Set request body with warning for non-autoclose payloads.
991
992 WARNING: This setter must be called from within an event loop and is not
993 thread-safe. Setting body outside of an event loop may raise RuntimeError
994 when closing file-based payloads.
995
996 DEPRECATED: Direct assignment to body is deprecated and will be removed
997 in a future version. Use await update_body() instead for proper resource
998 management.
999 """
1000 # Close existing payload if present
1001 if self._body is not None:
1002 # Warn if the payload needs manual closing
1003 # stacklevel=3: user code -> body setter -> _warn_if_unclosed_payload
1004 _warn_if_unclosed_payload(self._body, stacklevel=3)
1005 # NOTE: In the future, when we remove sync close support,
1006 # this setter will need to be removed and only the async
1007 # update_body() method will be available. For now, we call
1008 # _close() for backwards compatibility.
1009 self._body._close()
1010 self._update_body(value)
1011
1012 @property
1013 def request_info(self) -> RequestInfo:
1014 headers: CIMultiDictProxy[str] = CIMultiDictProxy(self.headers)
1015 # These are created on every request, so we use a NamedTuple
1016 # for performance reasons. We don't use the RequestInfo.__new__
1017 # method because it has a different signature which is provided
1018 # for backwards compatibility only.
1019 return tuple.__new__(
1020 RequestInfo, (self.url, self.method, headers, self.original_url)
1021 )
1022
    @property
    def session(self) -> "ClientSession":
        """Return the ClientSession instance.

        This property provides access to the ClientSession that initiated
        this request, allowing middleware to make additional requests
        using the same session.

        The value is whatever was passed as ``session`` to ``__init__``.
        """
        return self._session
1032
1033 def update_host(self, url: URL) -> None:
1034 """Update destination host, port and connection type (ssl)."""
1035 # get host/port
1036 if not url.raw_host:
1037 raise InvalidURL(url)
1038
1039 # basic auth info
1040 if url.raw_user or url.raw_password:
1041 self.auth = helpers.BasicAuth(url.user or "", url.password or "")
1042
1043 def update_version(self, version: Union[http.HttpVersion, str]) -> None:
1044 """Convert request version to two elements tuple.
1045
1046 parser HTTP version '1.1' => (1, 1)
1047 """
1048 if isinstance(version, str):
1049 v = [part.strip() for part in version.split(".", 1)]
1050 try:
1051 version = http.HttpVersion(int(v[0]), int(v[1]))
1052 except ValueError:
1053 raise ValueError(
1054 f"Can not parse http version number: {version}"
1055 ) from None
1056 self.version = version
1057
def update_headers(self, headers: Optional[LooseHeaders]) -> None:
    """Rebuild ``self.headers`` from the URL and the caller's headers.

    The header collection is recreated from scratch with a Host header
    derived from ``self.url``.  Caller-supplied headers are then added;
    a caller-supplied Host header replaces the derived one, while all
    other headers are appended (duplicates are kept).
    """
    self.headers: CIMultiDict[str] = CIMultiDict()

    # host_port_subcomponent is None only for relative URLs, and the
    # request URL is known not to be relative at this point.
    host_header = self.url.host_port_subcomponent
    assert host_header is not None
    self.headers[hdrs.HOST] = host_header

    if headers:
        if isinstance(headers, (dict, MultiDictProxy, MultiDict)):
            pairs = headers.items()
        else:
            pairs = headers

        for name, val in pairs:  # type: ignore[misc]
            if name in hdrs.HOST_ALL:
                # Host is special: override instead of appending
                self.headers[name] = val
            else:
                self.headers.add(name, val)
1082
def update_auto_headers(self, skip_auto_headers: Optional[Iterable[str]]) -> None:
    """Fill in default headers that are neither already set nor skipped.

    When ``skip_auto_headers`` is given it is recorded (sorted) in
    ``self._skip_auto_headers`` and those names are treated as if they
    were already present.  Every entry of ``DEFAULT_HEADERS`` and the
    User-Agent header are then added unless considered present.
    """
    if skip_auto_headers is None:
        # Common case: nothing to skip, so check the live headers directly.
        present = self.headers
    else:
        self._skip_auto_headers = CIMultiDict(
            (name, None) for name in sorted(skip_auto_headers)
        )
        present = self.headers.copy()
        present.extend(self._skip_auto_headers)  # type: ignore[arg-type]

    for name, default in self.DEFAULT_HEADERS.items():
        if name not in present:
            self.headers[name] = default

    if hdrs.USER_AGENT not in present:
        self.headers[hdrs.USER_AGENT] = SERVER_SOFTWARE
1101
def update_cookies(self, cookies: Optional[LooseCookies]) -> None:
    """Merge ``cookies`` into the request's Cookie header.

    Does nothing when ``cookies`` is empty.  Otherwise any Cookie header
    already present is parsed and removed, the given cookies are layered
    on top of it, and a single combined Cookie header is written back.
    """
    if not cookies:
        return

    jar = SimpleCookie()
    if hdrs.COOKIE in self.headers:
        # parse_cookie_header for RFC 6265 compliant Cookie header parsing
        existing = self.headers.get(hdrs.COOKIE, "")
        jar.update(parse_cookie_header(existing))
        del self.headers[hdrs.COOKIE]

    pairs = cookies.items() if isinstance(cookies, Mapping) else cookies
    for name, value in pairs:  # type: ignore[union-attr]
        if not isinstance(value, Morsel):
            jar[name] = value  # type: ignore[assignment]
            continue
        # Use helper to preserve coded_value exactly as sent by server
        jar[name] = preserve_morsel_with_coded_value(value)

    self.headers[hdrs.COOKIE] = jar.output(header="", sep=";").strip()
1125
def update_content_encoding(self, data: Any) -> None:
    """Reconcile ``self.compress`` with the Content-Encoding header.

    An empty body is never compressed.  If a Content-Encoding header is
    already set, requesting compression as well is an error.  Otherwise,
    when compression is requested, the header is set (a non-string
    ``compress`` value means "deflate") and chunked transfer is enabled.

    Raises:
        ValueError: if both ``compress`` and a Content-Encoding header
            are set.
    """
    if not data:
        # Don't compress an empty body.
        self.compress = None
        return

    if self.headers.get(hdrs.CONTENT_ENCODING):
        if self.compress:
            raise ValueError(
                "compress can not be set if Content-Encoding header is set"
            )
        return

    if not self.compress:
        return

    if not isinstance(self.compress, str):
        self.compress = "deflate"
    self.headers[hdrs.CONTENT_ENCODING] = self.compress
    # enable chunked, no need to deal with length
    self.chunked = True
1143
def update_transfer_encoding(self) -> None:
    """Reconcile ``self.chunked`` with the Transfer-Encoding header.

    If the header already asks for chunked encoding, ``self.chunked``
    must not be set as well.  If ``self.chunked`` is set, the header is
    added, which is only allowed when no Content-Length header exists.

    Raises:
        ValueError: on either of the conflicting combinations above.
    """
    current = self.headers.get(hdrs.TRANSFER_ENCODING, "").lower()

    if "chunked" in current:
        if self.chunked:
            raise ValueError(
                "chunked can not be set "
                'if "Transfer-Encoding: chunked" header is set'
            )
        return

    if not self.chunked:
        return

    if hdrs.CONTENT_LENGTH in self.headers:
        raise ValueError("chunked can not be set if Content-Length header is set")

    self.headers[hdrs.TRANSFER_ENCODING] = "chunked"
1162
def update_auth(self, auth: Optional[BasicAuth], trust_env: bool = False) -> None:
    """Set the Authorization header from basic auth credentials.

    Credentials are looked up in this order: the ``auth`` argument,
    ``self.auth``, and finally, when ``trust_env`` is true and the URL
    has a host, the netrc entry for that host.  If none is found the
    headers are left untouched.

    Raises:
        TypeError: if the credentials are not a BasicAuth instance.
    """
    credentials = self.auth if auth is None else auth

    if credentials is None and trust_env and (host := self.url.host) is not None:
        netrc_obj = netrc_from_env()
        # A failed lookup simply leaves the credentials unset.
        with contextlib.suppress(LookupError):
            credentials = basicauth_from_netrc(netrc_obj, host)

    if credentials is None:
        return

    if not isinstance(credentials, helpers.BasicAuth):
        raise TypeError("BasicAuth() tuple is required instead")

    self.headers[hdrs.AUTHORIZATION] = credentials.encode()
1178
def update_body_from_data(self, body: Any, _stacklevel: int = 3) -> None:
    """Replace the request body and adjust the length-related headers.

    If a body is already set, ``_warn_if_unclosed_payload`` is called for
    it (with ``_stacklevel``) before it is replaced.

    With ``body=None`` the body is cleared and ``Content-Length: 0`` is
    set for methods outside ``GET_METHODS``, unless chunked is enabled or
    a Content-Length header already exists.

    Otherwise the body is converted to a payload (falling back to
    FormData when no payload type is registered for it), Content-Length
    is set from the payload size or chunked is enabled when the size is
    unknown, and payload headers that are neither already present nor
    listed in the skipped auto headers are copied to the request.
    """
    previous = self._body
    if previous is not None:
        _warn_if_unclosed_payload(previous, stacklevel=_stacklevel)

    if body is None:
        self._body = None
        # Set Content-Length to 0 when body is None for methods that expect a body
        needs_zero_length = (
            self.method not in self.GET_METHODS
            and not self.chunked
            and hdrs.CONTENT_LENGTH not in self.headers
        )
        if needs_zero_length:
            self.headers[hdrs.CONTENT_LENGTH] = "0"
        return

    # FormData instances are called to produce their payload
    candidate = body() if isinstance(body, FormData) else body

    try:
        new_payload = payload.PAYLOAD_REGISTRY.get(candidate, disposition=None)
    except payload.LookupError:
        new_payload = FormData(candidate)()  # type: ignore[arg-type]

    self._body = new_payload

    # enable chunked encoding if needed
    if not self.chunked and hdrs.CONTENT_LENGTH not in self.headers:
        size = new_payload.size
        if size is None:
            self.chunked = True
        else:
            self.headers[hdrs.CONTENT_LENGTH] = str(size)

    # copy payload headers
    assert new_payload.headers
    request_headers = self.headers
    skipped = self._skip_auto_headers
    for key, value in new_payload.headers.items():
        if key in request_headers:
            continue
        if skipped is not None and key in skipped:
            continue
        request_headers[key] = value
1219
def _update_body(self, body: Any) -> None:
    """Swap in a new body on a request whose body was already set up.

    Stale Content-Length (always) and Transfer-Encoding (only when
    chunked is enabled) headers are dropped first so that the body
    update can recompute them.
    """
    request_headers = self.headers

    # Remove existing Content-Length header since body is changing
    if hdrs.CONTENT_LENGTH in request_headers:
        del request_headers[hdrs.CONTENT_LENGTH]

    # Remove existing Transfer-Encoding header to avoid conflicts
    if self.chunked and hdrs.TRANSFER_ENCODING in request_headers:
        del request_headers[hdrs.TRANSFER_ENCODING]

    # One extra frame compared to a direct call, hence stacklevel 4
    self.update_body_from_data(body, _stacklevel=4)

    # Update transfer encoding headers if needed (same logic as __init__)
    if body is None and self.method in self.GET_METHODS:
        return
    self.update_transfer_encoding()
1237
async def update_body(self, body: Any) -> None:
    """
    Replace the request body, closing the previous payload first.

    Prefer this method over assigning to request.body: a direct
    assignment skips closing the old payload and will leak whatever
    resources it holds (file handles, streams, buffers).

    Args:
        body: The replacement body. Accepted values:
            - bytes/bytearray: Raw binary data
            - str: Text data (will be encoded using charset from Content-Type)
            - FormData: Form data that will be encoded as multipart/form-data
            - Payload: A pre-configured payload object
            - AsyncIterable: An async iterable of bytes chunks
            - File-like object: Will be read and sent as binary data
            - None: Clears the body

    Usage:
        # Replace the body with raw bytes
        await request.update_body(b"new request data")

        # Replace the body with form data
        form_data = FormData()
        form_data.add_field('field', 'value')
        await request.update_body(form_data)

        # Remove the body
        await request.update_body(None)

        # Do NOT do this, it leaks the previous payload's resources:
        # request.body = b"new request data"

    Note:
        The method is a coroutine because closing the previous payload
        may involve closing file handles or other resources. It must be
        awaited for the cleanup to take place.

    Warning:
        Assigning request.body directly is strongly discouraged; it can
        cause:
        - Resource leaks (unclosed file handles, streams)
        - Memory leaks (unreleased buffers)
        - Unexpected behavior with streaming payloads

        Changing the payload type from middleware is not recommended
        either. When the body was already set (e.g., as bytes), keep the
        same type instead of converting it (e.g., to str), as conversion
        may result in unexpected behavior.

    See Also:
        - update_body_from_data: Synchronous body update without cleanup
        - body property: Direct body access (STRONGLY DISCOURAGED)

    """
    previous = self._body
    if previous is not None:
        # Release the old payload before it is replaced
        await previous.close()
    self._update_body(body)
1299
def update_expect_continue(self, expect: bool = False) -> None:
    """Set up "Expect: 100-continue" handling for this request.

    When ``expect`` is true the Expect header is set.  When it is false,
    an existing ``Expect: 100-continue`` header (case-insensitive value)
    switches the behaviour on anyway.  In both cases a future is created
    on the request's loop and stored in ``self._continue``.
    """
    if expect:
        self.headers[hdrs.EXPECT] = "100-continue"
    else:
        # The caller may have requested it through the header instead
        expect = (
            hdrs.EXPECT in self.headers
            and self.headers[hdrs.EXPECT].lower() == "100-continue"
        )

    if not expect:
        return
    self._continue = self.loop.create_future()
1311
def update_proxy(
    self,
    proxy: Optional[URL],
    proxy_auth: Optional[BasicAuth],
    proxy_headers: Optional[LooseHeaders],
) -> None:
    """Record the proxy settings for this request.

    Without a proxy, ``proxy_auth`` and ``proxy_headers`` are reset to
    None regardless of the arguments.  With a proxy, the auth is stored
    as given and the headers are stored as given when they are None or
    already a multidict, and wrapped in a CIMultiDict otherwise.

    Raises:
        ValueError: if a proxy is given and ``proxy_auth`` is truthy but
            not a BasicAuth instance.
    """
    self.proxy = proxy
    if proxy is None:
        self.proxy_auth = self.proxy_headers = None
        return

    if proxy_auth and not isinstance(proxy_auth, helpers.BasicAuth):
        raise ValueError("proxy_auth must be None or BasicAuth() tuple")
    self.proxy_auth = proxy_auth

    if proxy_headers is None or isinstance(
        proxy_headers, (MultiDict, MultiDictProxy)
    ):
        self.proxy_headers = proxy_headers
    else:
        self.proxy_headers = CIMultiDict(proxy_headers)
1333
async def write_bytes(
    self,
    writer: AbstractStreamWriter,
    conn: "Connection",
    content_length: Optional[int] = None,
) -> None:
    """
    Write the request body to the connection stream.

    If a 100-continue future is pending, the headers are flushed and the
    future is awaited first. The body, when there is one, is then written
    by delegating to ``self._body.write_with_length``.

    Args:
        writer: The stream writer to write the body to
        conn: The connection being used for this request
        content_length: Optional maximum number of bytes to write from the body
            (None means write the entire body); passed through unchanged to
            ``write_with_length``

    Error handling while writing the body:
        - OSError: not re-raised here. It is handed to ``set_exception``
          together with the protocol, wrapped in ClientOSError unless it is
          an ``asyncio.TimeoutError`` without an errno, which is passed on
          as is.
        - asyncio.CancelledError: the connection is closed and the
          cancellation is re-raised.
        - Any other Exception: not re-raised here. A ClientConnectionError
          describing it is handed to ``set_exception`` together with the
          protocol.

    On success EOF is written and the protocol timeout is started.

    Raises:
        asyncio.CancelledError: When the operation is cancelled

    """
    # 100 response
    if self._continue is not None:
        # Force headers to be sent before waiting for 100-continue
        writer.send_headers()
        await writer.drain()
        await self._continue

    protocol = conn.protocol
    assert protocol is not None
    try:
        # This should be a rare case but the
        # self._body can be set to None while
        # the task is being started or we wait above
        # for the 100-continue response.
        # The more likely case is we have an empty
        # payload, but 100-continue is still expected.
        if self._body is not None:
            await self._body.write_with_length(writer, content_length)
    except OSError as underlying_exc:
        # Default to reporting the original exception unchanged
        reraised_exc = underlying_exc

        # Distinguish between timeout and other OS errors for better error reporting
        exc_is_not_timeout = underlying_exc.errno is not None or not isinstance(
            underlying_exc, asyncio.TimeoutError
        )
        if exc_is_not_timeout:
            reraised_exc = ClientOSError(
                underlying_exc.errno,
                f"Can not write request body for {self.url !s}",
            )

        # Report through the protocol instead of raising from this coroutine
        set_exception(protocol, reraised_exc, underlying_exc)
    except asyncio.CancelledError:
        # Body hasn't been fully sent, so connection can't be reused
        conn.close()
        raise
    except Exception as underlying_exc:
        # Report through the protocol instead of raising from this coroutine
        set_exception(
            protocol,
            ClientConnectionError(
                "Failed to send bytes into the underlying connection "
                f"{conn !s}: {underlying_exc!r}",
            ),
            underlying_exc,
        )
    else:
        # Successfully wrote the body, signal EOF and start response timeout
        await writer.write_eof()
        protocol.start_timeout()
1415
async def send(self, conn: "Connection") -> "ClientResponse":
    """Send the request line and headers and start sending the body.

    Steps, in order:
    1. Pick the request target form (authority, absolute or origin).
    2. Create a StreamWriter for the connection's protocol and enable
       compression and/or chunking on it as configured.
    3. Fill in a default Content-Type and a Connection header if needed.
    4. Write the status line and headers.
    5. Either start ``write_bytes`` as a task (body present, 100-continue
       pending, or protocol writing paused) or mark EOF right away.
    6. Create the response object, store it on ``self.response`` and
       return it.

    Args:
        conn: The connection to send the request over; its protocol
            must not be None.

    Returns:
        The newly created ``response_class`` instance.
    """
    # Specify request target:
    # - CONNECT request must send authority form URI
    # - not CONNECT proxy must send absolute form URI
    # - most common is origin form URI
    if self.method == hdrs.METH_CONNECT:
        connect_host = self.url.host_subcomponent
        assert connect_host is not None
        path = f"{connect_host}:{self.url.port}"
    elif self.proxy and not self.is_ssl():
        path = str(self.url)
    else:
        path = self.url.raw_path_qs

    protocol = conn.protocol
    assert protocol is not None
    # Trace callbacks are only wired up when there are traces to notify
    writer = StreamWriter(
        protocol,
        self.loop,
        on_chunk_sent=(
            functools.partial(self._on_chunk_request_sent, self.method, self.url)
            if self._traces
            else None
        ),
        on_headers_sent=(
            functools.partial(self._on_headers_request_sent, self.method, self.url)
            if self._traces
            else None
        ),
    )

    if self.compress:
        writer.enable_compression(self.compress)  # type: ignore[arg-type]

    # NOTE(review): this condition is also true when self.chunked is False,
    # not only when it is True -- confirm that is intended.
    if self.chunked is not None:
        writer.enable_chunking()

    # set default content-type
    if (
        self.method in self.POST_METHODS
        and (
            self._skip_auto_headers is None
            or hdrs.CONTENT_TYPE not in self._skip_auto_headers
        )
        and hdrs.CONTENT_TYPE not in self.headers
    ):
        self.headers[hdrs.CONTENT_TYPE] = "application/octet-stream"

    v = self.version
    # Only add a Connection header when the caller did not set one:
    # "close" for HTTP/1.1 on a force_close connector, "keep-alive" for
    # HTTP/1.0 otherwise; any other combination leaves it unset.
    if hdrs.CONNECTION not in self.headers:
        if conn._connector.force_close:
            if v == HttpVersion11:
                self.headers[hdrs.CONNECTION] = "close"
        elif v == HttpVersion10:
            self.headers[hdrs.CONNECTION] = "keep-alive"

    # status + headers
    status_line = f"{self.method} {path} HTTP/{v.major}.{v.minor}"

    # Buffer headers for potential coalescing with body
    await writer.write_headers(status_line, self.headers)

    task: Optional["asyncio.Task[None]"]
    if self._body or self._continue is not None or protocol.writing_paused:
        coro = self.write_bytes(writer, conn, self._get_content_length())
        if sys.version_info >= (3, 12):
            # Optimization for Python 3.12, try to write
            # bytes immediately to avoid having to schedule
            # the task on the event loop.
            task = asyncio.Task(coro, loop=self.loop, eager_start=True)
        else:
            task = self.loop.create_task(coro)
        if task.done():
            # Already finished, so there is no writer task to keep
            task = None
        else:
            self._writer = task
    else:
        # We have nothing to write because
        # - there is no body
        # - the protocol does not have writing paused
        # - we are not waiting for a 100-continue response
        protocol.start_timeout()
        writer.set_eof()
        task = None
    response_class = self.response_class
    assert response_class is not None
    # The response is created with the original URL and the still-running
    # writer task (or None when nothing is left to write).
    self.response = response_class(
        self.method,
        self.original_url,
        writer=task,
        continue100=self._continue,
        timer=self._timer,
        request_info=self.request_info,
        traces=self._traces,
        loop=self.loop,
        session=self._session,
    )
    return self.response
1514
async def close(self) -> None:
    """Wait for the pending body writer, if there is one.

    A CancelledError raised while waiting is swallowed, except on
    Python 3.11+ when the current task itself is being cancelled, in
    which case it is propagated.
    """
    pending = self.__writer
    if pending is None:
        return

    try:
        await pending
    except asyncio.CancelledError:
        if sys.version_info < (3, 11):
            return
        current = asyncio.current_task()
        if current and current.cancelling():
            raise
1526
def terminate(self) -> None:
    """Cancel and forget the pending body writer, if there is one.

    The writer is only cancelled while the loop is still open; its
    reset callback is detached and the reference dropped in any case.
    """
    pending = self.__writer
    if pending is None:
        return

    if not self.loop.is_closed():
        pending.cancel()
    pending.remove_done_callback(self.__reset_writer)
    self.__writer = None
1533
async def _on_chunk_request_sent(self, method: str, url: URL, chunk: bytes) -> None:
    """Tell each trace, one after another, that a body chunk was sent."""
    traces = self._traces
    for tracer in traces:
        await tracer.send_request_chunk_sent(method, url, chunk)
1537
async def _on_headers_request_sent(
    self, method: str, url: URL, headers: "CIMultiDict[str]"
) -> None:
    """Tell each trace, one after another, that the headers were sent."""
    traces = self._traces
    for tracer in traces:
        await tracer.send_request_headers(method, url, headers)