import asyncio
import codecs
import contextlib
import functools
import io
import re
import sys
import traceback
import warnings
from collections.abc import Mapping
from hashlib import md5, sha1, sha256
from http.cookies import Morsel, SimpleCookie
from types import MappingProxyType, TracebackType
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    Iterable,
    List,
    Literal,
    NamedTuple,
    Optional,
    Tuple,
    Type,
    Union,
)

import attr
from multidict import CIMultiDict, CIMultiDictProxy, MultiDict, MultiDictProxy
from yarl import URL

from . import hdrs, helpers, http, multipart, payload
from ._cookie_helpers import (
    parse_cookie_header,
    parse_set_cookie_headers,
    preserve_morsel_with_coded_value,
)
from .abc import AbstractStreamWriter
from .client_exceptions import (
    ClientConnectionError,
    ClientOSError,
    ClientResponseError,
    ContentTypeError,
    InvalidURL,
    ServerFingerprintMismatch,
)
from .compression_utils import HAS_BROTLI, HAS_ZSTD
from .formdata import FormData
from .helpers import (
    _SENTINEL,
    BaseTimerContext,
    BasicAuth,
    HeadersMixin,
    TimerNoop,
    noop,
    reify,
    sentinel,
    set_exception,
    set_result,
)
from .http import (
    SERVER_SOFTWARE,
    HttpVersion,
    HttpVersion10,
    HttpVersion11,
    StreamWriter,
)
from .streams import StreamReader
from .typedefs import (
    DEFAULT_JSON_DECODER,
    JSONDecoder,
    LooseCookies,
    LooseHeaders,
    Query,
    RawHeaders,
)

if TYPE_CHECKING:
    import ssl
    from ssl import SSLContext
else:
    try:
        import ssl
        from ssl import SSLContext
    except ImportError:  # pragma: no cover
        ssl = None  # type: ignore[assignment]
        SSLContext = object  # type: ignore[misc,assignment]


__all__ = ("ClientRequest", "ClientResponse", "RequestInfo", "Fingerprint")


if TYPE_CHECKING:
    from .client import ClientSession
    from .connector import Connection
    from .tracing import Trace


_CONNECTION_CLOSED_EXCEPTION = ClientConnectionError("Connection closed")
_CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]")
json_re = re.compile(r"^application/(?:[\w.+-]+?\+)?json")


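# Note: with the optional Brotli and zstd packages installed, the helper below
# produces "gzip, deflate, br, zstd" for the default Accept-Encoding header;
# without them it falls back to "gzip, deflate".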
def _gen_default_accept_encoding() -> str:
    encodings = [
        "gzip",
        "deflate",
    ]
    if HAS_BROTLI:
        encodings.append("br")
    if HAS_ZSTD:
        encodings.append("zstd")
    return ", ".join(encodings)


@attr.s(auto_attribs=True, frozen=True, slots=True)
class ContentDisposition:
    type: Optional[str]
    parameters: "MappingProxyType[str, str]"
    filename: Optional[str]


class _RequestInfo(NamedTuple):
    url: URL
    method: str
    headers: "CIMultiDictProxy[str]"
    real_url: URL


class RequestInfo(_RequestInfo):

    def __new__(
        cls,
        url: URL,
        method: str,
        headers: "CIMultiDictProxy[str]",
        real_url: Union[URL, _SENTINEL] = sentinel,
    ) -> "RequestInfo":
        """Create a new RequestInfo instance.

        For backwards compatibility, the real_url parameter is optional.
        """
        return tuple.__new__(
            cls, (url, method, headers, url if real_url is sentinel else real_url)
        )


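# Usage sketch (illustrative): hash the server's DER-encoded certificate with
# SHA-256 and pass the digest, e.g. Fingerprint(hashlib.sha256(der_cert).digest());
# the connector then calls check() on the transport once TLS is established.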
class Fingerprint:
    HASHFUNC_BY_DIGESTLEN = {
        16: md5,
        20: sha1,
        32: sha256,
    }

    def __init__(self, fingerprint: bytes) -> None:
        digestlen = len(fingerprint)
        hashfunc = self.HASHFUNC_BY_DIGESTLEN.get(digestlen)
        if not hashfunc:
            raise ValueError("fingerprint has invalid length")
        elif hashfunc is md5 or hashfunc is sha1:
            raise ValueError("md5 and sha1 are insecure and not supported. Use sha256.")
        self._hashfunc = hashfunc
        self._fingerprint = fingerprint

    @property
    def fingerprint(self) -> bytes:
        return self._fingerprint

    def check(self, transport: asyncio.Transport) -> None:
        if not transport.get_extra_info("sslcontext"):
            return
        sslobj = transport.get_extra_info("ssl_object")
        cert = sslobj.getpeercert(binary_form=True)
        got = self._hashfunc(cert).digest()
        if got != self._fingerprint:
            host, port, *_ = transport.get_extra_info("peername")
            raise ServerFingerprintMismatch(self._fingerprint, got, host, port)


if ssl is not None:
    SSL_ALLOWED_TYPES = (ssl.SSLContext, bool, Fingerprint, type(None))
else:  # pragma: no cover
    SSL_ALLOWED_TYPES = (bool, type(None))


def _merge_ssl_params(
    ssl: Union["SSLContext", bool, Fingerprint],
    verify_ssl: Optional[bool],
    ssl_context: Optional["SSLContext"],
    fingerprint: Optional[bytes],
) -> Union["SSLContext", bool, Fingerprint]:
    if ssl is None:
        ssl = True  # Double check for backwards compatibility
    if verify_ssl is not None and not verify_ssl:
        warnings.warn(
            "verify_ssl is deprecated, use ssl=False instead",
            DeprecationWarning,
            stacklevel=3,
        )
        if ssl is not True:
            raise ValueError(
                "verify_ssl, ssl_context, fingerprint and ssl "
                "parameters are mutually exclusive"
            )
        else:
            ssl = False
    if ssl_context is not None:
        warnings.warn(
            "ssl_context is deprecated, use ssl=context instead",
            DeprecationWarning,
            stacklevel=3,
        )
        if ssl is not True:
            raise ValueError(
                "verify_ssl, ssl_context, fingerprint and ssl "
                "parameters are mutually exclusive"
            )
        else:
            ssl = ssl_context
    if fingerprint is not None:
        warnings.warn(
            "fingerprint is deprecated, use ssl=Fingerprint(fingerprint) instead",
            DeprecationWarning,
            stacklevel=3,
        )
        if ssl is not True:
            raise ValueError(
                "verify_ssl, ssl_context, fingerprint and ssl "
                "parameters are mutually exclusive"
            )
        else:
            ssl = Fingerprint(fingerprint)
    if not isinstance(ssl, SSL_ALLOWED_TYPES):
        raise TypeError(
            "ssl should be SSLContext, bool, Fingerprint or None, "
            "got {!r} instead.".format(ssl)
        )
    return ssl


_SSL_SCHEMES = frozenset(("https", "wss"))


# ConnectionKey is a NamedTuple because it is used as a key in a dict
# and a set in the connector. Since a NamedTuple is a tuple it uses
# the fast native tuple __hash__ and __eq__ implementation in CPython.
class ConnectionKey(NamedTuple):
    # The key should contain information about the proxy / TLS settings used
    # to prevent reusing the wrong connections from the pool.
    host: str
    port: Optional[int]
    is_ssl: bool
    ssl: Union[SSLContext, bool, Fingerprint]
    proxy: Optional[URL]
    proxy_auth: Optional[BasicAuth]
    proxy_headers_hash: Optional[int]  # hash(CIMultiDict)


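# e.g. a response of "application/ld+json; charset=utf-8" is accepted when the
# expected type is "application/json", via the structured-suffix regex above.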
def _is_expected_content_type(
    response_content_type: str, expected_content_type: str
) -> bool:
    if expected_content_type == "application/json":
        return json_re.match(response_content_type) is not None
    return expected_content_type in response_content_type


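# Illustrative scenario: a file-backed payload from a previous request body is
# neither autoclose nor consumed, so re-assigning request.body directly would
# trigger the warning below instead of silently leaking the open file.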
def _warn_if_unclosed_payload(payload: payload.Payload, stacklevel: int = 2) -> None:
    """Warn if the payload is not closed.

    Callers must check that the body is a Payload before calling this method.

    Args:
        payload: The payload to check
        stacklevel: Stack level for the warning (default 2 for direct callers)
    """
    if not payload.autoclose and not payload.consumed:
        warnings.warn(
            "The previous request body contains unclosed resources. "
            "Use await request.update_body() instead of setting request.body "
            "directly to properly close resources and avoid leaks.",
            ResourceWarning,
            stacklevel=stacklevel,
        )


class ClientResponse(HeadersMixin):

    # Some of these attributes are None when created,
    # but will be set by the start() method.
    # As the end user will likely never see the None values, we cheat the types below.
    # from the Status-Line of the response
    version: Optional[HttpVersion] = None  # HTTP-Version
    status: int = None  # type: ignore[assignment]  # Status-Code
    reason: Optional[str] = None  # Reason-Phrase

    content: StreamReader = None  # type: ignore[assignment]  # Payload stream
    _body: Optional[bytes] = None
    _headers: CIMultiDictProxy[str] = None  # type: ignore[assignment]
    _history: Tuple["ClientResponse", ...] = ()
    _raw_headers: RawHeaders = None  # type: ignore[assignment]

    _connection: Optional["Connection"] = None  # current connection
    _cookies: Optional[SimpleCookie] = None
    _raw_cookie_headers: Optional[Tuple[str, ...]] = None
    _continue: Optional["asyncio.Future[bool]"] = None
    _source_traceback: Optional[traceback.StackSummary] = None
    _session: Optional["ClientSession"] = None
    # set up by ClientRequest after ClientResponse object creation
    # the post-init stage lets us keep the ctor signature unchanged
    _closed = True  # to allow __del__ for non-initialized properly response
    _released = False
    _in_context = False

    _resolve_charset: Callable[["ClientResponse", bytes], str] = lambda *_: "utf-8"

    __writer: Optional["asyncio.Task[None]"] = None

    def __init__(
        self,
        method: str,
        url: URL,
        *,
        writer: "Optional[asyncio.Task[None]]",
        continue100: Optional["asyncio.Future[bool]"],
        timer: BaseTimerContext,
        request_info: RequestInfo,
        traces: List["Trace"],
        loop: asyncio.AbstractEventLoop,
        session: "ClientSession",
    ) -> None:
        # URL forbids subclasses, so a simple type check is enough.
        assert type(url) is URL

        self.method = method

        self._real_url = url
        self._url = url.with_fragment(None) if url.raw_fragment else url
        if writer is not None:
            self._writer = writer
        if continue100 is not None:
            self._continue = continue100
        self._request_info = request_info
        self._timer = timer if timer is not None else TimerNoop()
        self._cache: Dict[str, Any] = {}
        self._traces = traces
        self._loop = loop
        # Save reference to _resolve_charset, so that get_encoding() will still
        # work after the response has finished reading the body.
        # TODO: Fix session=None in tests (see ClientRequest.__init__).
        if session is not None:
            # store a reference to session #1985
            self._session = session
            self._resolve_charset = session._resolve_charset
        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

    def __reset_writer(self, _: object = None) -> None:
        self.__writer = None

    @property
    def _writer(self) -> Optional["asyncio.Task[None]"]:
        """The writer task for streaming data.

        _writer is only provided for backwards compatibility
        for subclasses that may need to access it.
        """
        return self.__writer

    @_writer.setter
    def _writer(self, writer: Optional["asyncio.Task[None]"]) -> None:
        """Set the writer task for streaming data."""
        if self.__writer is not None:
            self.__writer.remove_done_callback(self.__reset_writer)
        self.__writer = writer
        if writer is None:
            return
        if writer.done():
            # The writer is already done, so we can clear it immediately.
            self.__writer = None
        else:
            writer.add_done_callback(self.__reset_writer)

    @property
    def cookies(self) -> SimpleCookie:
        if self._cookies is None:
            if self._raw_cookie_headers is not None:
                # Parse cookies for response.cookies (SimpleCookie for backward compatibility)
                cookies = SimpleCookie()
                # Use parse_set_cookie_headers for more lenient parsing that handles
                # malformed cookies better than SimpleCookie.load
                cookies.update(parse_set_cookie_headers(self._raw_cookie_headers))
                self._cookies = cookies
            else:
                self._cookies = SimpleCookie()
        return self._cookies

    @cookies.setter
    def cookies(self, cookies: SimpleCookie) -> None:
        self._cookies = cookies
        # Generate raw cookie headers from the SimpleCookie
        if cookies:
            self._raw_cookie_headers = tuple(
                morsel.OutputString() for morsel in cookies.values()
            )
        else:
            self._raw_cookie_headers = None

    @reify
    def url(self) -> URL:
        return self._url

    @reify
    def url_obj(self) -> URL:
        warnings.warn("Deprecated, use .url #1654", DeprecationWarning, stacklevel=2)
        return self._url

    @reify
    def real_url(self) -> URL:
        return self._real_url

    @reify
    def host(self) -> str:
        assert self._url.host is not None
        return self._url.host

    @reify
    def headers(self) -> "CIMultiDictProxy[str]":
        return self._headers

    @reify
    def raw_headers(self) -> RawHeaders:
        return self._raw_headers

    @reify
    def request_info(self) -> RequestInfo:
        return self._request_info

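    # e.g. a 'Content-Disposition: attachment; filename="report.pdf"' header
    # (illustrative) yields type="attachment" and filename="report.pdf".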
    @reify
    def content_disposition(self) -> Optional[ContentDisposition]:
        raw = self._headers.get(hdrs.CONTENT_DISPOSITION)
        if raw is None:
            return None
        disposition_type, params_dct = multipart.parse_content_disposition(raw)
        params = MappingProxyType(params_dct)
        filename = multipart.content_disposition_filename(params)
        return ContentDisposition(disposition_type, params, filename)

    def __del__(self, _warnings: Any = warnings) -> None:
        if self._closed:
            return

        if self._connection is not None:
            self._connection.release()
            self._cleanup_writer()

            if self._loop.get_debug():
                kwargs = {"source": self}
                _warnings.warn(f"Unclosed response {self!r}", ResourceWarning, **kwargs)
                context = {"client_response": self, "message": "Unclosed response"}
                if self._source_traceback:
                    context["source_traceback"] = self._source_traceback
                self._loop.call_exception_handler(context)

    def __repr__(self) -> str:
        out = io.StringIO()
        ascii_encodable_url = str(self.url)
        if self.reason:
            ascii_encodable_reason = self.reason.encode(
                "ascii", "backslashreplace"
            ).decode("ascii")
        else:
            ascii_encodable_reason = "None"
        print(
            "<ClientResponse({}) [{} {}]>".format(
                ascii_encodable_url, self.status, ascii_encodable_reason
            ),
            file=out,
        )
        print(self.headers, file=out)
        return out.getvalue()

    @property
    def connection(self) -> Optional["Connection"]:
        return self._connection

    @reify
    def history(self) -> Tuple["ClientResponse", ...]:
489 """A sequence of of responses, if redirects occurred."""
        return self._history

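    # Illustrative: a 'Link: <https://example.com/?page=2>; rel="next"' response
    # header is exposed as resp.links["next"]["url"] == URL("https://example.com/?page=2").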
    @reify
    def links(self) -> "MultiDictProxy[MultiDictProxy[Union[str, URL]]]":
        links_str = ", ".join(self.headers.getall("link", []))

        if not links_str:
            return MultiDictProxy(MultiDict())

        links: MultiDict[MultiDictProxy[Union[str, URL]]] = MultiDict()

        for val in re.split(r",(?=\s*<)", links_str):
            match = re.match(r"\s*<(.*)>(.*)", val)
            if match is None:  # pragma: no cover
                # the check exists to suppress mypy error
                continue
            url, params_str = match.groups()
            params = params_str.split(";")[1:]

            link: MultiDict[Union[str, URL]] = MultiDict()

            for param in params:
                match = re.match(r"^\s*(\S*)\s*=\s*(['\"]?)(.*?)(\2)\s*$", param, re.M)
                if match is None:  # pragma: no cover
                    # the check exists to suppress mypy error
                    continue
                key, _, value, _ = match.groups()

                link.add(key, value)

            key = link.get("rel", url)

            link.add("url", self.url.join(URL(url)))

            links.add(str(key), MultiDictProxy(link))

        return MultiDictProxy(links)

    async def start(self, connection: "Connection") -> "ClientResponse":
        """Start response processing."""
        self._closed = False
        self._protocol = connection.protocol
        self._connection = connection

        with self._timer:
            while True:
                # read response
                try:
                    protocol = self._protocol
                    message, payload = await protocol.read()  # type: ignore[union-attr]
                except http.HttpProcessingError as exc:
                    raise ClientResponseError(
                        self.request_info,
                        self.history,
                        status=exc.code,
                        message=exc.message,
                        headers=exc.headers,
                    ) from exc

                if message.code < 100 or message.code > 199 or message.code == 101:
                    break

                if self._continue is not None:
                    set_result(self._continue, True)
                    self._continue = None

        # payload eof handler
        payload.on_eof(self._response_eof)

        # response status
        self.version = message.version
        self.status = message.code
        self.reason = message.reason

        # headers
        self._headers = message.headers  # type is CIMultiDictProxy
        self._raw_headers = message.raw_headers  # type is Tuple[Tuple[bytes, bytes], ...]

        # payload
        self.content = payload

        # cookies
        if cookie_hdrs := self.headers.getall(hdrs.SET_COOKIE, ()):
            # Store raw cookie headers for CookieJar
            self._raw_cookie_headers = tuple(cookie_hdrs)
        return self

    def _response_eof(self) -> None:
        if self._closed:
            return

        # protocol could be None because connection could be detached
        protocol = self._connection and self._connection.protocol
        if protocol is not None and protocol.upgraded:
            return

        self._closed = True
        self._cleanup_writer()
        self._release_connection()

    @property
    def closed(self) -> bool:
        return self._closed

    def close(self) -> None:
        if not self._released:
            self._notify_content()

        self._closed = True
        if self._loop is None or self._loop.is_closed():
            return

        self._cleanup_writer()
        if self._connection is not None:
            self._connection.close()
            self._connection = None

    def release(self) -> Any:
        if not self._released:
            self._notify_content()

        self._closed = True

        self._cleanup_writer()
        self._release_connection()
        return noop()

    @property
    def ok(self) -> bool:
        """Returns ``True`` if ``status`` is less than ``400``, ``False`` if not.

        This is **not** a check for ``200 OK`` but a check that the response
        status is under 400.
        """
        return 400 > self.status

    def raise_for_status(self) -> None:
        if not self.ok:
            # reason should always be not None for a started response
            assert self.reason is not None

            # If we're in a context we can rely on __aexit__() to release as the
            # exception propagates.
            if not self._in_context:
                self.release()

            raise ClientResponseError(
                self.request_info,
                self.history,
                status=self.status,
                message=self.reason,
                headers=self.headers,
            )

    def _release_connection(self) -> None:
        if self._connection is not None:
            if self.__writer is None:
                self._connection.release()
                self._connection = None
            else:
                self.__writer.add_done_callback(lambda f: self._release_connection())

    async def _wait_released(self) -> None:
        if self.__writer is not None:
            try:
                await self.__writer
            except asyncio.CancelledError:
                if (
                    sys.version_info >= (3, 11)
                    and (task := asyncio.current_task())
                    and task.cancelling()
                ):
                    raise
        self._release_connection()

    def _cleanup_writer(self) -> None:
        if self.__writer is not None:
            self.__writer.cancel()
        self._session = None

    def _notify_content(self) -> None:
        content = self.content
        if content and content.exception() is None:
            set_exception(content, _CONNECTION_CLOSED_EXCEPTION)
        self._released = True

    async def wait_for_close(self) -> None:
        if self.__writer is not None:
            try:
                await self.__writer
            except asyncio.CancelledError:
                if (
                    sys.version_info >= (3, 11)
                    and (task := asyncio.current_task())
                    and task.cancelling()
                ):
                    raise
        self.release()

    async def read(self) -> bytes:
        """Read response payload."""
        if self._body is None:
            try:
                self._body = await self.content.read()
                for trace in self._traces:
                    await trace.send_response_chunk_received(
                        self.method, self.url, self._body
                    )
            except BaseException:
                self.close()
                raise
        elif self._released:  # Response explicitly released
            raise ClientConnectionError("Connection closed")

        protocol = self._connection and self._connection.protocol
        if protocol is None or not protocol.upgraded:
            await self._wait_released()  # Underlying connection released
        return self._body

    def get_encoding(self) -> str:
        ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower()
        mimetype = helpers.parse_mimetype(ctype)

        encoding = mimetype.parameters.get("charset")
        if encoding:
            with contextlib.suppress(LookupError, ValueError):
                return codecs.lookup(encoding).name

        if mimetype.type == "application" and (
            mimetype.subtype == "json" or mimetype.subtype == "rdap"
        ):
            # RFC 7159 states that the default encoding is UTF-8.
            # RFC 7483 defines application/rdap+json
            return "utf-8"

        if self._body is None:
            raise RuntimeError(
                "Cannot compute fallback encoding of a not yet read body"
            )

        return self._resolve_charset(self, self._body)

    async def text(self, encoding: Optional[str] = None, errors: str = "strict") -> str:
        """Read response payload and decode."""
        if self._body is None:
            await self.read()

        if encoding is None:
            encoding = self.get_encoding()

        return self._body.decode(encoding, errors=errors)  # type: ignore[union-attr]

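    # Note: json(content_type=None) skips the mimetype check below, which is
    # useful for servers that return JSON bodies labelled as text/plain.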
    async def json(
        self,
        *,
        encoding: Optional[str] = None,
        loads: JSONDecoder = DEFAULT_JSON_DECODER,
        content_type: Optional[str] = "application/json",
    ) -> Any:
749 """Read and decodes JSON response."""
        if self._body is None:
            await self.read()

        if content_type:
            ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower()
            if not _is_expected_content_type(ctype, content_type):
                raise ContentTypeError(
                    self.request_info,
                    self.history,
                    status=self.status,
                    message=(
                        "Attempt to decode JSON with unexpected mimetype: %s" % ctype
                    ),
                    headers=self.headers,
                )

        stripped = self._body.strip()  # type: ignore[union-attr]
        if not stripped:
            return None

        if encoding is None:
            encoding = self.get_encoding()

        return loads(stripped.decode(encoding))

    async def __aenter__(self) -> "ClientResponse":
        self._in_context = True
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        self._in_context = False
        # similar to _RequestContextManager, we do not need to check
        # for exceptions, response object can close connection
        # if state is broken
        self.release()
        await self.wait_for_close()


class ClientRequest:
    GET_METHODS = {
        hdrs.METH_GET,
        hdrs.METH_HEAD,
        hdrs.METH_OPTIONS,
        hdrs.METH_TRACE,
    }
    POST_METHODS = {hdrs.METH_PATCH, hdrs.METH_POST, hdrs.METH_PUT}
    ALL_METHODS = GET_METHODS.union(POST_METHODS).union({hdrs.METH_DELETE})

    DEFAULT_HEADERS = {
        hdrs.ACCEPT: "*/*",
        hdrs.ACCEPT_ENCODING: _gen_default_accept_encoding(),
    }

    # Type of body depends on PAYLOAD_REGISTRY, which is dynamic.
    _body: Union[None, payload.Payload] = None
    auth = None
    response = None

    __writer: Optional["asyncio.Task[None]"] = None  # async task for streaming data

    # These class defaults help create_autospec() work correctly.
    # If autospec is improved in future, maybe these can be removed.
    url = URL()
    method = "GET"

    _continue = None  # waiter future for '100 Continue' response

    _skip_auto_headers: Optional["CIMultiDict[None]"] = None

    # N.B.
    # Adding a __del__ method that closes self._writer doesn't make sense,
    # because the writer wraps an instance method and thus keeps a reference
    # to self. Until the writer has finished, the finalizer will not be called.

    def __init__(
        self,
        method: str,
        url: URL,
        *,
        params: Query = None,
        headers: Optional[LooseHeaders] = None,
        skip_auto_headers: Optional[Iterable[str]] = None,
        data: Any = None,
        cookies: Optional[LooseCookies] = None,
        auth: Optional[BasicAuth] = None,
        version: http.HttpVersion = http.HttpVersion11,
        compress: Union[str, bool, None] = None,
        chunked: Optional[bool] = None,
        expect100: bool = False,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        response_class: Optional[Type["ClientResponse"]] = None,
        proxy: Optional[URL] = None,
        proxy_auth: Optional[BasicAuth] = None,
        timer: Optional[BaseTimerContext] = None,
        session: Optional["ClientSession"] = None,
        ssl: Union[SSLContext, bool, Fingerprint] = True,
        proxy_headers: Optional[LooseHeaders] = None,
        traces: Optional[List["Trace"]] = None,
        trust_env: bool = False,
        server_hostname: Optional[str] = None,
    ):
        if loop is None:
            loop = asyncio.get_event_loop()
        if match := _CONTAINS_CONTROL_CHAR_RE.search(method):
            raise ValueError(
                f"Method cannot contain non-token characters {method!r} "
                f"(found at least {match.group()!r})"
            )
        # URL forbids subclasses, so a simple type check is enough.
        assert type(url) is URL, url
        if proxy is not None:
            assert type(proxy) is URL, proxy
        # FIXME: session is None in tests only, need to fix tests
        # assert session is not None
        if TYPE_CHECKING:
            assert session is not None
        self._session = session
        if params:
            url = url.extend_query(params)
        self.original_url = url
        self.url = url.with_fragment(None) if url.raw_fragment else url
        self.method = method.upper()
        self.chunked = chunked
        self.compress = compress
        self.loop = loop
        self.length = None
        if response_class is None:
            real_response_class = ClientResponse
        else:
            real_response_class = response_class
        self.response_class: Type[ClientResponse] = real_response_class
        self._timer = timer if timer is not None else TimerNoop()
        self._ssl = ssl if ssl is not None else True
        self.server_hostname = server_hostname

        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

        self.update_version(version)
        self.update_host(url)
        self.update_headers(headers)
        self.update_auto_headers(skip_auto_headers)
        self.update_cookies(cookies)
        self.update_content_encoding(data)
        self.update_auth(auth, trust_env)
        self.update_proxy(proxy, proxy_auth, proxy_headers)

        self.update_body_from_data(data)
        if data is not None or self.method not in self.GET_METHODS:
            self.update_transfer_encoding()
        self.update_expect_continue(expect100)
        self._traces = [] if traces is None else traces

    def __reset_writer(self, _: object = None) -> None:
        self.__writer = None

    def _get_content_length(self) -> Optional[int]:
        """Extract and validate Content-Length header value.

        Returns parsed Content-Length value or None if not set.
        Raises ValueError if header exists but cannot be parsed as an integer.
        """
        if hdrs.CONTENT_LENGTH not in self.headers:
            return None

        content_length_hdr = self.headers[hdrs.CONTENT_LENGTH]
        try:
            return int(content_length_hdr)
        except ValueError:
            raise ValueError(
                f"Invalid Content-Length header: {content_length_hdr}"
            ) from None

    @property
    def skip_auto_headers(self) -> CIMultiDict[None]:
        return self._skip_auto_headers or CIMultiDict()

    @property
    def _writer(self) -> Optional["asyncio.Task[None]"]:
        return self.__writer

    @_writer.setter
    def _writer(self, writer: "asyncio.Task[None]") -> None:
        if self.__writer is not None:
            self.__writer.remove_done_callback(self.__reset_writer)
        self.__writer = writer
        writer.add_done_callback(self.__reset_writer)

    def is_ssl(self) -> bool:
        return self.url.scheme in _SSL_SCHEMES

    @property
    def ssl(self) -> Union["SSLContext", bool, Fingerprint]:
        return self._ssl

    @property
    def connection_key(self) -> ConnectionKey:
        if proxy_headers := self.proxy_headers:
            h: Optional[int] = hash(tuple(proxy_headers.items()))
        else:
            h = None
        url = self.url
        return tuple.__new__(
            ConnectionKey,
            (
                url.raw_host or "",
                url.port,
                url.scheme in _SSL_SCHEMES,
                self._ssl,
                self.proxy,
                self.proxy_auth,
                h,
            ),
        )

    @property
    def host(self) -> str:
        ret = self.url.raw_host
        assert ret is not None
        return ret

    @property
    def port(self) -> Optional[int]:
        return self.url.port

    @property
    def body(self) -> Union[payload.Payload, Literal[b""]]:
        """Request body."""
        # empty body is represented as bytes for backwards compatibility
        return self._body or b""

    @body.setter
    def body(self, value: Any) -> None:
        """Set request body with warning for non-autoclose payloads.

        WARNING: This setter must be called from within an event loop and is not
        thread-safe. Setting body outside of an event loop may raise RuntimeError
        when closing file-based payloads.

        DEPRECATED: Direct assignment to body is deprecated and will be removed
        in a future version. Use await update_body() instead for proper resource
        management.
        """
        # Close existing payload if present
        if self._body is not None:
            # Warn if the payload needs manual closing
            # stacklevel=3: user code -> body setter -> _warn_if_unclosed_payload
            _warn_if_unclosed_payload(self._body, stacklevel=3)
            # NOTE: In the future, when we remove sync close support,
            # this setter will need to be removed and only the async
            # update_body() method will be available. For now, we call
            # _close() for backwards compatibility.
            self._body._close()
        self._update_body(value)

    @property
    def request_info(self) -> RequestInfo:
        headers: CIMultiDictProxy[str] = CIMultiDictProxy(self.headers)
        # These are created on every request, so we use a NamedTuple
        # for performance reasons. We don't use the RequestInfo.__new__
        # method because it has a different signature which is provided
        # for backwards compatibility only.
        return tuple.__new__(
            RequestInfo, (self.url, self.method, headers, self.original_url)
        )

    @property
    def session(self) -> "ClientSession":
        """Return the ClientSession instance.

        This property provides access to the ClientSession that initiated
        this request, allowing middleware to make additional requests
        using the same session.
        """
        return self._session

    def update_host(self, url: URL) -> None:
        """Update destination host, port and connection type (ssl)."""
        # get host/port
        if not url.raw_host:
            raise InvalidURL(url)

        # basic auth info
        if url.raw_user or url.raw_password:
            self.auth = helpers.BasicAuth(url.user or "", url.password or "")

    def update_version(self, version: Union[http.HttpVersion, str]) -> None:
        """Convert request version to a two-element tuple.

        Parses an HTTP version string, e.g. '1.1' => (1, 1).
        """
        if isinstance(version, str):
            v = [part.strip() for part in version.split(".", 1)]
            try:
                version = http.HttpVersion(int(v[0]), int(v[1]))
            except ValueError:
                raise ValueError(
                    f"Can not parse http version number: {version}"
                ) from None
        self.version = version

    def update_headers(self, headers: Optional[LooseHeaders]) -> None:
        """Update request headers."""
        self.headers: CIMultiDict[str] = CIMultiDict()

        # Build the host header
        host = self.url.host_port_subcomponent

        # host_port_subcomponent is None when the URL is a relative URL.
        # but we know we do not have a relative URL here.
        assert host is not None
        self.headers[hdrs.HOST] = host

        if not headers:
            return

        if isinstance(headers, (dict, MultiDictProxy, MultiDict)):
            headers = headers.items()

        for key, value in headers:  # type: ignore[misc]
            # A special case for Host header
            if key in hdrs.HOST_ALL:
                self.headers[key] = value
            else:
                self.headers.add(key, value)

    def update_auto_headers(self, skip_auto_headers: Optional[Iterable[str]]) -> None:
        if skip_auto_headers is not None:
            self._skip_auto_headers = CIMultiDict(
                (hdr, None) for hdr in sorted(skip_auto_headers)
            )
            used_headers = self.headers.copy()
            used_headers.extend(self._skip_auto_headers)  # type: ignore[arg-type]
        else:
            # Fast path when there are no headers to skip
            # which is the most common case.
            used_headers = self.headers

        for hdr, val in self.DEFAULT_HEADERS.items():
            if hdr not in used_headers:
                self.headers[hdr] = val

        if hdrs.USER_AGENT not in used_headers:
            self.headers[hdrs.USER_AGENT] = SERVER_SOFTWARE

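    # Illustrative: update_cookies({"session": "abc123"}) merges with any cookies
    # already present in the Cookie header and re-emits "Cookie: session=abc123".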
    def update_cookies(self, cookies: Optional[LooseCookies]) -> None:
        """Update request cookies header."""
        if not cookies:
            return

        c = SimpleCookie()
        if hdrs.COOKIE in self.headers:
            # parse_cookie_header for RFC 6265 compliant Cookie header parsing
            c.update(parse_cookie_header(self.headers.get(hdrs.COOKIE, "")))
            del self.headers[hdrs.COOKIE]

        if isinstance(cookies, Mapping):
            iter_cookies = cookies.items()
        else:
            iter_cookies = cookies  # type: ignore[assignment]
        for name, value in iter_cookies:
            if isinstance(value, Morsel):
                # Use helper to preserve coded_value exactly as sent by server
                c[name] = preserve_morsel_with_coded_value(value)
            else:
                c[name] = value  # type: ignore[assignment]

        self.headers[hdrs.COOKIE] = c.output(header="", sep=";").strip()

    def update_content_encoding(self, data: Any) -> None:
        """Set request content encoding."""
        if not data:
            # Don't compress an empty body.
            self.compress = None
            return

        if self.headers.get(hdrs.CONTENT_ENCODING):
            if self.compress:
                raise ValueError(
                    "compress can not be set if Content-Encoding header is set"
                )
        elif self.compress:
            if not isinstance(self.compress, str):
                self.compress = "deflate"
            self.headers[hdrs.CONTENT_ENCODING] = self.compress
            self.chunked = True  # enable chunked, no need to deal with length

    def update_transfer_encoding(self) -> None:
        """Analyze transfer-encoding header."""
        te = self.headers.get(hdrs.TRANSFER_ENCODING, "").lower()

        if "chunked" in te:
            if self.chunked:
                raise ValueError(
                    "chunked can not be set "
                    'if "Transfer-Encoding: chunked" header is set'
                )

        elif self.chunked:
            if hdrs.CONTENT_LENGTH in self.headers:
                raise ValueError(
                    "chunked can not be set if Content-Length header is set"
                )

            self.headers[hdrs.TRANSFER_ENCODING] = "chunked"

    def update_auth(self, auth: Optional[BasicAuth], trust_env: bool = False) -> None:
        """Set basic auth."""
        if auth is None:
            auth = self.auth
        if auth is None:
            return

        if not isinstance(auth, helpers.BasicAuth):
            raise TypeError("BasicAuth() tuple is required instead")

        self.headers[hdrs.AUTHORIZATION] = auth.encode()

    def update_body_from_data(self, body: Any, _stacklevel: int = 3) -> None:
        """Update request body from data."""
        if self._body is not None:
            _warn_if_unclosed_payload(self._body, stacklevel=_stacklevel)

        if body is None:
            self._body = None
            # Set Content-Length to 0 when body is None for methods that expect a body
            if (
                self.method not in self.GET_METHODS
                and not self.chunked
                and hdrs.CONTENT_LENGTH not in self.headers
            ):
                self.headers[hdrs.CONTENT_LENGTH] = "0"
            return

        # FormData
        maybe_payload = body() if isinstance(body, FormData) else body

        try:
            body_payload = payload.PAYLOAD_REGISTRY.get(maybe_payload, disposition=None)
        except payload.LookupError:
            body_payload = FormData(maybe_payload)()  # type: ignore[arg-type]

        self._body = body_payload
        # enable chunked encoding if needed
        if not self.chunked and hdrs.CONTENT_LENGTH not in self.headers:
            if (size := body_payload.size) is not None:
                self.headers[hdrs.CONTENT_LENGTH] = str(size)
            else:
                self.chunked = True

        # copy payload headers
        assert body_payload.headers
        headers = self.headers
        skip_headers = self._skip_auto_headers
        for key, value in body_payload.headers.items():
            if key in headers or (skip_headers is not None and key in skip_headers):
                continue
            headers[key] = value

    def _update_body(self, body: Any) -> None:
1215 """Update request body after its already been set."""
        # Remove existing Content-Length header since body is changing
        if hdrs.CONTENT_LENGTH in self.headers:
            del self.headers[hdrs.CONTENT_LENGTH]

        # Remove existing Transfer-Encoding header to avoid conflicts
        if self.chunked and hdrs.TRANSFER_ENCODING in self.headers:
            del self.headers[hdrs.TRANSFER_ENCODING]

        # Now update the body using the existing method
        # Called from _update_body, add 1 to stacklevel from caller
        self.update_body_from_data(body, _stacklevel=4)

        # Update transfer encoding headers if needed (same logic as __init__)
        if body is not None or self.method not in self.GET_METHODS:
            self.update_transfer_encoding()

    async def update_body(self, body: Any) -> None:
        """
        Update request body and close previous payload if needed.

        This method safely updates the request body by first closing any existing
        payload to prevent resource leaks, then setting the new body.

        IMPORTANT: Always use this method instead of setting request.body directly.
        Direct assignment to request.body will leak resources if the previous body
        contains file handles, streams, or other resources that need cleanup.

        Args:
            body: The new body content. Can be:
                - bytes/bytearray: Raw binary data
                - str: Text data (will be encoded using charset from Content-Type)
                - FormData: Form data that will be encoded as multipart/form-data
                - Payload: A pre-configured payload object
                - AsyncIterable: An async iterable of bytes chunks
                - File-like object: Will be read and sent as binary data
                - None: Clears the body

        Usage:
            # CORRECT: Use update_body
            await request.update_body(b"new request data")

            # WRONG: Don't set body directly
            # request.body = b"new request data"  # This will leak resources!

            # Update with form data
            form_data = FormData()
            form_data.add_field('field', 'value')
            await request.update_body(form_data)

            # Clear body
            await request.update_body(None)

        Note:
            This method is async because it may need to close file handles or
            other resources associated with the previous payload. Always await
            this method to ensure proper cleanup.

        Warning:
            Setting request.body directly is highly discouraged and can lead to:
            - Resource leaks (unclosed file handles, streams)
            - Memory leaks (unreleased buffers)
            - Unexpected behavior with streaming payloads

            It is not recommended to change the payload type in middleware. If the
            body was already set (e.g., as bytes), it's best to keep the same type
            rather than converting it (e.g., to str) as this may result in unexpected
            behavior.

        See Also:
            - update_body_from_data: Synchronous body update without cleanup
            - body property: Direct body access (STRONGLY DISCOURAGED)

        """
        # Close existing payload if it exists and needs closing
        if self._body is not None:
            await self._body.close()
        self._update_body(body)

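    # Illustrative: expect100=True (or a pre-set "Expect: 100-continue" header)
    # creates the waiter future that write_bytes() awaits before sending the body.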
    def update_expect_continue(self, expect: bool = False) -> None:
        if expect:
            self.headers[hdrs.EXPECT] = "100-continue"
        elif (
            hdrs.EXPECT in self.headers
            and self.headers[hdrs.EXPECT].lower() == "100-continue"
        ):
            expect = True

        if expect:
            self._continue = self.loop.create_future()

    def update_proxy(
        self,
        proxy: Optional[URL],
        proxy_auth: Optional[BasicAuth],
        proxy_headers: Optional[LooseHeaders],
    ) -> None:
        self.proxy = proxy
        if proxy is None:
            self.proxy_auth = None
            self.proxy_headers = None
            return

        if proxy_auth and not isinstance(proxy_auth, helpers.BasicAuth):
            raise ValueError("proxy_auth must be None or BasicAuth() tuple")
        self.proxy_auth = proxy_auth

        if proxy_headers is not None and not isinstance(
            proxy_headers, (MultiDict, MultiDictProxy)
        ):
            proxy_headers = CIMultiDict(proxy_headers)
        self.proxy_headers = proxy_headers

    async def write_bytes(
        self,
        writer: AbstractStreamWriter,
        conn: "Connection",
        content_length: Optional[int] = None,
    ) -> None:
        """
        Write the request body to the connection stream.

        This method handles writing different types of request bodies:
        1. Payload objects (using their specialized write_with_length method)
        2. Bytes/bytearray objects
        3. Iterable body content

        Args:
            writer: The stream writer to write the body to
            conn: The connection being used for this request
            content_length: Optional maximum number of bytes to write from the body
                (None means write the entire body)

        The method properly handles:
        - Waiting for 100-Continue responses if required
        - Content length constraints for chunked encoding
        - Error handling for network issues, cancellation, and other exceptions
        - Signaling EOF and timeout management

        Raises:
            ClientOSError: When there's an OS-level error writing the body
            ClientConnectionError: When there's a general connection error
            asyncio.CancelledError: When the operation is cancelled

        """
        # 100 response
        if self._continue is not None:
            # Force headers to be sent before waiting for 100-continue
            writer.send_headers()
            await writer.drain()
            await self._continue

        protocol = conn.protocol
        assert protocol is not None
        try:
            # This should be a rare case, but self._body can be set to None
            # while the task is being started or while we wait above for the
            # 100-continue response. The more likely case is an empty payload
            # with 100-continue still expected.
            if self._body is not None:
                await self._body.write_with_length(writer, content_length)
        except OSError as underlying_exc:
            reraised_exc = underlying_exc

            # Distinguish between timeout and other OS errors for better error reporting
            exc_is_not_timeout = underlying_exc.errno is not None or not isinstance(
                underlying_exc, asyncio.TimeoutError
            )
            if exc_is_not_timeout:
                reraised_exc = ClientOSError(
                    underlying_exc.errno,
                    f"Can not write request body for {self.url !s}",
                )

            set_exception(protocol, reraised_exc, underlying_exc)
        except asyncio.CancelledError:
            # Body hasn't been fully sent, so connection can't be reused
            conn.close()
            raise
        except Exception as underlying_exc:
            set_exception(
                protocol,
                ClientConnectionError(
                    "Failed to send bytes into the underlying connection "
                    f"{conn !s}: {underlying_exc!r}",
                ),
                underlying_exc,
            )
        else:
            # Successfully wrote the body, signal EOF and start response timeout
            await writer.write_eof()
            protocol.start_timeout()

    async def send(self, conn: "Connection") -> "ClientResponse":
        # Specify request target:
        # - CONNECT request must send authority form URI
        # - not CONNECT proxy must send absolute form URI
        # - most common is origin form URI
        if self.method == hdrs.METH_CONNECT:
            connect_host = self.url.host_subcomponent
            assert connect_host is not None
            path = f"{connect_host}:{self.url.port}"
        elif self.proxy and not self.is_ssl():
            path = str(self.url)
        else:
            path = self.url.raw_path_qs

        protocol = conn.protocol
        assert protocol is not None
        writer = StreamWriter(
            protocol,
            self.loop,
            on_chunk_sent=(
                functools.partial(self._on_chunk_request_sent, self.method, self.url)
                if self._traces
                else None
            ),
            on_headers_sent=(
                functools.partial(self._on_headers_request_sent, self.method, self.url)
                if self._traces
                else None
            ),
        )

        if self.compress:
            writer.enable_compression(self.compress)  # type: ignore[arg-type]

        if self.chunked is not None:
            writer.enable_chunking()

        # set default content-type
        if (
            self.method in self.POST_METHODS
            and (
                self._skip_auto_headers is None
                or hdrs.CONTENT_TYPE not in self._skip_auto_headers
            )
            and hdrs.CONTENT_TYPE not in self.headers
        ):
            self.headers[hdrs.CONTENT_TYPE] = "application/octet-stream"

        v = self.version
        if hdrs.CONNECTION not in self.headers:
            if conn._connector.force_close:
                if v == HttpVersion11:
                    self.headers[hdrs.CONNECTION] = "close"
            elif v == HttpVersion10:
                self.headers[hdrs.CONNECTION] = "keep-alive"

        # status + headers
        status_line = f"{self.method} {path} HTTP/{v.major}.{v.minor}"

        # Buffer headers for potential coalescing with body
        await writer.write_headers(status_line, self.headers)

        task: Optional["asyncio.Task[None]"]
        if self._body or self._continue is not None or protocol.writing_paused:
            coro = self.write_bytes(writer, conn, self._get_content_length())
            if sys.version_info >= (3, 12):
                # Optimization for Python 3.12, try to write
                # bytes immediately to avoid having to schedule
                # the task on the event loop.
                task = asyncio.Task(coro, loop=self.loop, eager_start=True)
            else:
                task = self.loop.create_task(coro)
            if task.done():
                task = None
            else:
                self._writer = task
        else:
            # We have nothing to write because
            # - there is no body
            # - the protocol does not have writing paused
            # - we are not waiting for a 100-continue response
            protocol.start_timeout()
            writer.set_eof()
            task = None
        response_class = self.response_class
        assert response_class is not None
        self.response = response_class(
            self.method,
            self.original_url,
            writer=task,
            continue100=self._continue,
            timer=self._timer,
            request_info=self.request_info,
            traces=self._traces,
            loop=self.loop,
            session=self._session,
        )
        return self.response

    async def close(self) -> None:
        if self.__writer is not None:
            try:
                await self.__writer
            except asyncio.CancelledError:
                if (
                    sys.version_info >= (3, 11)
                    and (task := asyncio.current_task())
                    and task.cancelling()
                ):
                    raise

    def terminate(self) -> None:
        if self.__writer is not None:
            if not self.loop.is_closed():
                self.__writer.cancel()
            self.__writer.remove_done_callback(self.__reset_writer)
            self.__writer = None

    async def _on_chunk_request_sent(self, method: str, url: URL, chunk: bytes) -> None:
        for trace in self._traces:
            await trace.send_request_chunk_sent(method, url, chunk)

    async def _on_headers_request_sent(
        self, method: str, url: URL, headers: "CIMultiDict[str]"
    ) -> None:
        for trace in self._traces:
            await trace.send_request_headers(method, url, headers)