1import asyncio
2import codecs
3import contextlib
4import functools
5import io
6import re
7import sys
8import traceback
9import warnings
10from collections.abc import Callable, Iterable, Mapping
11from hashlib import md5, sha1, sha256
12from http.cookies import Morsel, SimpleCookie
13from types import MappingProxyType, TracebackType
14from typing import TYPE_CHECKING, Any, Literal, NamedTuple, Optional, Union
15
16import attr
17from multidict import CIMultiDict, CIMultiDictProxy, MultiDict, MultiDictProxy
18from yarl import URL
19
20from . import hdrs, helpers, http, multipart, payload
21from ._cookie_helpers import (
22 parse_cookie_header,
23 parse_set_cookie_headers,
24 preserve_morsel_with_coded_value,
25)
26from .abc import AbstractStreamWriter
27from .client_exceptions import (
28 ClientConnectionError,
29 ClientOSError,
30 ClientResponseError,
31 ContentTypeError,
32 InvalidURL,
33 ServerFingerprintMismatch,
34)
35from .compression_utils import HAS_BROTLI, HAS_ZSTD
36from .formdata import FormData
37from .helpers import (
38 _SENTINEL,
39 BaseTimerContext,
40 BasicAuth,
41 HeadersMixin,
42 TimerNoop,
43 _basic_auth_no_warn,
44 noop,
45 reify,
46 sentinel,
47 set_exception,
48 set_result,
49)
50from .http import (
51 SERVER_SOFTWARE,
52 HttpVersion,
53 HttpVersion10,
54 HttpVersion11,
55 StreamWriter,
56)
57from .streams import StreamReader
58from .typedefs import (
59 DEFAULT_JSON_DECODER,
60 JSONDecoder,
61 LooseCookies,
62 LooseHeaders,
63 Query,
64 RawHeaders,
65)
66
67if TYPE_CHECKING:
68 import ssl
69 from ssl import SSLContext
70else:
71 try:
72 import ssl
73 from ssl import SSLContext
74 except ImportError: # pragma: no cover
75 ssl = None # type: ignore[assignment]
76 SSLContext = object # type: ignore[misc,assignment]
77
78
79__all__ = ("ClientRequest", "ClientResponse", "RequestInfo", "Fingerprint")
80
81
82if TYPE_CHECKING:
83 from .client import ClientSession
84 from .connector import Connection
85 from .tracing import Trace
86
87
88_CONNECTION_CLOSED_EXCEPTION = ClientConnectionError("Connection closed")
89_CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]")
90json_re = re.compile(r"^application/(?:[\w.+-]+?\+)?json")
91_DIGITS_RE = re.compile(r"\d+", re.ASCII)
92
93
94def _gen_default_accept_encoding() -> str:
95 encodings = [
96 "gzip",
97 "deflate",
98 ]
99 if HAS_BROTLI:
100 encodings.append("br")
101 if HAS_ZSTD:
102 encodings.append("zstd")
103 return ", ".join(encodings)
104
105
106@attr.s(auto_attribs=True, frozen=True, slots=True)
107class ContentDisposition:
108 type: str | None
109 parameters: "MappingProxyType[str, str]"
110 filename: str | None
111
112
113class _RequestInfo(NamedTuple):
114 url: URL
115 method: str
116 headers: "CIMultiDictProxy[str]"
117 real_url: URL
118
119
120class RequestInfo(_RequestInfo):
121
122 def __new__(
123 cls,
124 url: URL,
125 method: str,
126 headers: "CIMultiDictProxy[str]",
127 real_url: URL | _SENTINEL = sentinel,
128 ) -> "RequestInfo":
129 """Create a new RequestInfo instance.
130
131 For backwards compatibility, the real_url parameter is optional.
132 """
133 return tuple.__new__(
134 cls, (url, method, headers, url if real_url is sentinel else real_url)
135 )
136
137
138class Fingerprint:
139 HASHFUNC_BY_DIGESTLEN = {
140 16: md5,
141 20: sha1,
142 32: sha256,
143 }
144
145 def __init__(self, fingerprint: bytes) -> None:
146 digestlen = len(fingerprint)
147 hashfunc = self.HASHFUNC_BY_DIGESTLEN.get(digestlen)
148 if not hashfunc:
149 raise ValueError("fingerprint has invalid length")
150 elif hashfunc is md5 or hashfunc is sha1:
151 raise ValueError("md5 and sha1 are insecure and not supported. Use sha256.")
152 self._hashfunc = hashfunc
153 self._fingerprint = fingerprint
154
155 @property
156 def fingerprint(self) -> bytes:
157 return self._fingerprint
158
159 def check(self, transport: asyncio.Transport) -> None:
160 if not transport.get_extra_info("sslcontext"):
161 return
162 sslobj = transport.get_extra_info("ssl_object")
163 cert = sslobj.getpeercert(binary_form=True)
164 got = self._hashfunc(cert).digest()
165 if got != self._fingerprint:
166 host, port, *_ = transport.get_extra_info("peername")
167 raise ServerFingerprintMismatch(self._fingerprint, got, host, port)
168
169
170if ssl is not None:
171 SSL_ALLOWED_TYPES = (ssl.SSLContext, bool, Fingerprint, type(None))
172else: # pragma: no cover
173 SSL_ALLOWED_TYPES = (bool, type(None))
174
175
176def _merge_ssl_params(
177 ssl: Union["SSLContext", bool, Fingerprint],
178 verify_ssl: bool | None,
179 ssl_context: Optional["SSLContext"],
180 fingerprint: bytes | None,
181) -> Union["SSLContext", bool, Fingerprint]:
182 if ssl is None:
183 ssl = True # Double check for backwards compatibility
184 if verify_ssl is not None and not verify_ssl:
185 warnings.warn(
186 "verify_ssl is deprecated, use ssl=False instead",
187 DeprecationWarning,
188 stacklevel=3,
189 )
190 if ssl is not True:
191 raise ValueError(
192 "verify_ssl, ssl_context, fingerprint and ssl "
193 "parameters are mutually exclusive"
194 )
195 else:
196 ssl = False
197 if ssl_context is not None:
198 warnings.warn(
199 "ssl_context is deprecated, use ssl=context instead",
200 DeprecationWarning,
201 stacklevel=3,
202 )
203 if ssl is not True:
204 raise ValueError(
205 "verify_ssl, ssl_context, fingerprint and ssl "
206 "parameters are mutually exclusive"
207 )
208 else:
209 ssl = ssl_context
210 if fingerprint is not None:
211 warnings.warn(
212 "fingerprint is deprecated, use ssl=Fingerprint(fingerprint) instead",
213 DeprecationWarning,
214 stacklevel=3,
215 )
216 if ssl is not True:
217 raise ValueError(
218 "verify_ssl, ssl_context, fingerprint and ssl "
219 "parameters are mutually exclusive"
220 )
221 else:
222 ssl = Fingerprint(fingerprint)
223 if not isinstance(ssl, SSL_ALLOWED_TYPES):
224 raise TypeError(
225 "ssl should be SSLContext, bool, Fingerprint or None, "
226 f"got {ssl!r} instead."
227 )
228 return ssl
229
230
231_SSL_SCHEMES = frozenset(("https", "wss"))
232
233
234# ConnectionKey is a NamedTuple because it is used as a key in a dict
235# and a set in the connector. Since a NamedTuple is a tuple it uses
236# the fast native tuple __hash__ and __eq__ implementation in CPython.
237class ConnectionKey(NamedTuple):
238 # the key should contain an information about used proxy / TLS
239 # to prevent reusing wrong connections from a pool
240 host: str
241 port: int | None
242 is_ssl: bool
243 ssl: SSLContext | bool | Fingerprint
244 proxy: URL | None
245 proxy_auth: BasicAuth | None
246 proxy_headers_hash: int | None # hash(CIMultiDict)
247 server_hostname: str | None = None
248
249
250def _is_expected_content_type(
251 response_content_type: str, expected_content_type: str
252) -> bool:
253 if expected_content_type == "application/json":
254 return json_re.match(response_content_type) is not None
255 return expected_content_type in response_content_type
256
257
258def _warn_if_unclosed_payload(payload: payload.Payload, stacklevel: int = 2) -> None:
259 """Warn if the payload is not closed.
260
261 Callers must check that the body is a Payload before calling this method.
262
263 Args:
264 payload: The payload to check
265 stacklevel: Stack level for the warning (default 2 for direct callers)
266 """
267 if not payload.autoclose and not payload.consumed:
268 warnings.warn(
269 "The previous request body contains unclosed resources. "
270 "Use await request.update_body() instead of setting request.body "
271 "directly to properly close resources and avoid leaks.",
272 ResourceWarning,
273 stacklevel=stacklevel,
274 )
275
276
277class ClientResponse(HeadersMixin):
278
279 # Some of these attributes are None when created,
280 # but will be set by the start() method.
281 # As the end user will likely never see the None values, we cheat the types below.
282 # from the Status-Line of the response
283 version: HttpVersion | None = None # HTTP-Version
284 status: int = None # type: ignore[assignment] # Status-Code
285 reason: str | None = None # Reason-Phrase
286
287 content: StreamReader = None # type: ignore[assignment] # Payload stream
288 _body: bytes | None = None
289 _headers: CIMultiDictProxy[str] = None # type: ignore[assignment]
290 _history: tuple["ClientResponse", ...] = ()
291 _raw_headers: RawHeaders = None # type: ignore[assignment]
292
293 _connection: Optional["Connection"] = None # current connection
294 _cookies: SimpleCookie | None = None
295 _raw_cookie_headers: tuple[str, ...] | None = None
296 _continue: Optional["asyncio.Future[bool]"] = None
297 _source_traceback: traceback.StackSummary | None = None
298 _session: Optional["ClientSession"] = None
299 # set up by ClientRequest after ClientResponse object creation
300 # post-init stage allows to not change ctor signature
301 _closed = True # to allow __del__ for non-initialized properly response
302 _released = False
303 _in_context = False
304
305 _resolve_charset: Callable[["ClientResponse", bytes], str] = lambda *_: "utf-8"
306
307 __writer: Optional["asyncio.Task[None]"] = None
308 _stream_writer: Optional[AbstractStreamWriter] = None
309 _output_size: int = 0
310 _upload_complete: Optional[asyncio.Future[None]] = None
311
312 def __init__(
313 self,
314 method: str,
315 url: URL,
316 *,
317 writer: "asyncio.Task[None] | None",
318 continue100: Optional["asyncio.Future[bool]"],
319 timer: BaseTimerContext,
320 request_info: RequestInfo,
321 traces: list["Trace"],
322 loop: asyncio.AbstractEventLoop,
323 session: "ClientSession",
324 stream_writer: AbstractStreamWriter,
325 ) -> None:
326 # URL forbids subclasses, so a simple type check is enough.
327 assert type(url) is URL
328
329 self.method = method
330
331 self._real_url = url
332 self._url = url.with_fragment(None) if url.raw_fragment else url
333 if writer is None: # Request already sent
334 self._output_size = stream_writer.output_size
335 else:
336 self._stream_writer = stream_writer
337 self._writer = writer
338 if continue100 is not None:
339 self._continue = continue100
340 self._request_info = request_info
341 self._timer = timer if timer is not None else TimerNoop()
342 self._cache: dict[str, Any] = {}
343 self._traces = traces
344 self._loop = loop
345 # Save reference to _resolve_charset, so that get_encoding() will still
346 # work after the response has finished reading the body.
347 # TODO: Fix session=None in tests (see ClientRequest.__init__).
348 if session is not None:
349 # store a reference to session #1985
350 self._session = session
351 self._resolve_charset = session._resolve_charset
352 if loop.get_debug():
353 self._source_traceback = traceback.extract_stack(sys._getframe(1))
354
355 def __reset_writer(self, _: object = None) -> None:
356 self.__writer = None
357 if self._stream_writer is not None:
358 self._output_size = self._stream_writer.output_size
359 self._stream_writer = None
360 if self._upload_complete is not None and not self._upload_complete.done():
361 self._upload_complete.set_result(None)
362
363 @property
364 def _writer(self) -> Optional["asyncio.Task[None]"]:
365 """The writer task for streaming data.
366
367 _writer is only provided for backwards compatibility
368 for subclasses that may need to access it.
369 """
370 return self.__writer
371
372 @_writer.setter
373 def _writer(self, writer: Optional["asyncio.Task[None]"]) -> None:
374 """Set the writer task for streaming data."""
375 if self.__writer is not None:
376 self.__writer.remove_done_callback(self.__reset_writer)
377 self.__writer = writer
378 if writer is None:
379 return
380 if writer.done():
381 # The writer is already done, so we can clear it immediately.
382 self.__reset_writer()
383 else:
384 writer.add_done_callback(self.__reset_writer)
385
386 @property
387 def output_size(self) -> int:
388 """Number of bytes sent for this request."""
389 if self._stream_writer is not None:
390 return self._stream_writer.output_size
391 return self._output_size
392
393 @property
394 def upload_complete(self) -> "asyncio.Future[None]":
395 """Future set when the request body has been fully sent.
396
397 Already done when the request had no body or was written eagerly.
398 """
399 if self._upload_complete is None:
400 self._upload_complete = self._loop.create_future()
401 if self._stream_writer is None: # upload already finished
402 self._upload_complete.set_result(None)
403 return self._upload_complete
404
405 @property
406 def cookies(self) -> SimpleCookie:
407 if self._cookies is None:
408 if self._raw_cookie_headers is not None:
409 # Parse cookies for response.cookies (SimpleCookie for backward compatibility)
410 cookies = SimpleCookie()
411 # Use parse_set_cookie_headers for more lenient parsing that handles
412 # malformed cookies better than SimpleCookie.load
413 cookies.update(parse_set_cookie_headers(self._raw_cookie_headers))
414 self._cookies = cookies
415 else:
416 self._cookies = SimpleCookie()
417 return self._cookies
418
419 @cookies.setter
420 def cookies(self, cookies: SimpleCookie) -> None:
421 self._cookies = cookies
422 # Generate raw cookie headers from the SimpleCookie
423 if cookies:
424 self._raw_cookie_headers = tuple(
425 morsel.OutputString() for morsel in cookies.values()
426 )
427 else:
428 self._raw_cookie_headers = None
429
430 @reify
431 def url(self) -> URL:
432 return self._url
433
434 @reify
435 def url_obj(self) -> URL:
436 warnings.warn("Deprecated, use .url #1654", DeprecationWarning, stacklevel=2)
437 return self._url
438
439 @reify
440 def real_url(self) -> URL:
441 return self._real_url
442
443 @reify
444 def host(self) -> str:
445 assert self._url.host is not None
446 return self._url.host
447
448 @reify
449 def headers(self) -> "CIMultiDictProxy[str]":
450 return self._headers
451
452 @reify
453 def raw_headers(self) -> RawHeaders:
454 return self._raw_headers
455
456 @reify
457 def request_info(self) -> RequestInfo:
458 return self._request_info
459
460 @reify
461 def content_disposition(self) -> ContentDisposition | None:
462 raw = self._headers.get(hdrs.CONTENT_DISPOSITION)
463 if raw is None:
464 return None
465 disposition_type, params_dct = multipart.parse_content_disposition(raw)
466 params = MappingProxyType(params_dct)
467 filename = multipart.content_disposition_filename(params)
468 return ContentDisposition(disposition_type, params, filename)
469
470 def __del__(self, _warnings: Any = warnings) -> None:
471 if self._closed:
472 return
473
474 if self._connection is not None:
475 self._connection.release()
476 self._cleanup_writer()
477
478 if self._loop.get_debug():
479 kwargs = {"source": self}
480 _warnings.warn(f"Unclosed response {self!r}", ResourceWarning, **kwargs)
481 context = {"client_response": self, "message": "Unclosed response"}
482 if self._source_traceback:
483 context["source_traceback"] = self._source_traceback
484 self._loop.call_exception_handler(context)
485
486 def __repr__(self) -> str:
487 out = io.StringIO()
488 ascii_encodable_url = str(self.url)
489 if self.reason:
490 ascii_encodable_reason = self.reason.encode(
491 "ascii", "backslashreplace"
492 ).decode("ascii")
493 else:
494 ascii_encodable_reason = "None"
495 print(
496 f"<ClientResponse({ascii_encodable_url}) [{self.status} {ascii_encodable_reason}]>",
497 file=out,
498 )
499 print(self.headers, file=out)
500 return out.getvalue()
501
502 @property
503 def connection(self) -> Optional["Connection"]:
504 return self._connection
505
506 @reify
507 def history(self) -> tuple["ClientResponse", ...]:
508 """A sequence of of responses, if redirects occurred."""
509 return self._history
510
511 @reify
512 def links(self) -> "MultiDictProxy[MultiDictProxy[str | URL]]":
513 links_str = ", ".join(self.headers.getall("link", []))
514
515 if not links_str:
516 return MultiDictProxy(MultiDict())
517
518 links: MultiDict[MultiDictProxy[str | URL]] = MultiDict()
519
520 for val in re.split(r",(?=\s*<)", links_str):
521 match = re.match(r"\s*<(.*)>(.*)", val)
522 if match is None: # pragma: no cover
523 # the check exists to suppress mypy error
524 continue
525 url, params_str = match.groups()
526 params = params_str.split(";")[1:]
527
528 link: MultiDict[str | URL] = MultiDict()
529
530 for param in params:
531 match = re.match(r"^\s*(\S*)\s*=\s*(['\"]?)(.*?)(\2)\s*$", param, re.M)
532 if match is None: # pragma: no cover
533 # the check exists to suppress mypy error
534 continue
535 key, _, value, _ = match.groups()
536
537 link.add(key, value)
538
539 key = link.get("rel", url)
540
541 link.add("url", self.url.join(URL(url)))
542
543 links.add(str(key), MultiDictProxy(link))
544
545 return MultiDictProxy(links)
546
547 async def start(self, connection: "Connection") -> "ClientResponse":
548 """Start response processing."""
549 self._closed = False
550 self._protocol = connection.protocol
551 self._connection = connection
552
553 with self._timer:
554 while True:
555 # read response
556 try:
557 protocol = self._protocol
558 message, payload = await protocol.read() # type: ignore[union-attr]
559 except http.HttpProcessingError as exc:
560 raise ClientResponseError(
561 self.request_info,
562 self.history,
563 status=exc.code,
564 message=exc.message,
565 headers=exc.headers,
566 ) from exc
567
568 if message.code < 100 or message.code > 199 or message.code == 101:
569 break
570
571 if self._continue is not None:
572 set_result(self._continue, True)
573 self._continue = None
574
575 # payload eof handler
576 payload.on_eof(self._response_eof)
577
578 # response status
579 self.version = message.version
580 self.status = message.code
581 self.reason = message.reason
582
583 # headers
584 self._headers = message.headers # type is CIMultiDictProxy
585 self._raw_headers = message.raw_headers # type is Tuple[bytes, bytes]
586
587 # payload
588 self.content = payload
589
590 # cookies
591 if cookie_hdrs := self.headers.getall(hdrs.SET_COOKIE, ()):
592 # Store raw cookie headers for CookieJar
593 self._raw_cookie_headers = tuple(cookie_hdrs)
594 return self
595
596 def _response_eof(self) -> None:
597 if self._closed:
598 return
599
600 # protocol could be None because connection could be detached
601 protocol = self._connection and self._connection.protocol
602 if protocol is not None and protocol.upgraded:
603 return
604
605 self._closed = True
606 self._cleanup_writer()
607 self._release_connection()
608
609 @property
610 def closed(self) -> bool:
611 return self._closed
612
613 def close(self) -> None:
614 if not self._released:
615 self._notify_content()
616
617 self._closed = True
618 if self._loop is None or self._loop.is_closed():
619 return
620
621 self._cleanup_writer()
622 if self._connection is not None:
623 self._connection.close()
624 self._connection = None
625
626 def release(self) -> Any:
627 if not self._released:
628 self._notify_content()
629
630 self._closed = True
631
632 self._cleanup_writer()
633 self._release_connection()
634 return noop()
635
636 @property
637 def ok(self) -> bool:
638 """Returns ``True`` if ``status`` is less than ``400``, ``False`` if not.
639
640 This is **not** a check for ``200 OK`` but a check that the response
641 status is under 400.
642 """
643 return 400 > self.status
644
645 def raise_for_status(self) -> None:
646 if not self.ok:
647 # reason should always be not None for a started response
648 assert self.reason is not None
649
650 # If we're in a context we can rely on __aexit__() to release as the
651 # exception propagates.
652 if not self._in_context:
653 self.release()
654
655 raise ClientResponseError(
656 self.request_info,
657 self.history,
658 status=self.status,
659 message=self.reason,
660 headers=self.headers,
661 )
662
663 def _release_connection(self) -> None:
664 if self._connection is not None:
665 if self.__writer is None:
666 self._connection.release()
667 self._connection = None
668 else:
669 self.__writer.add_done_callback(lambda f: self._release_connection())
670
671 async def _wait_released(self) -> None:
672 if self.__writer is not None:
673 try:
674 await self.__writer
675 except asyncio.CancelledError:
676 if (
677 sys.version_info >= (3, 11)
678 and (task := asyncio.current_task())
679 and task.cancelling()
680 ):
681 raise
682 self._release_connection()
683
684 def _cleanup_writer(self) -> None:
685 if self.__writer is not None:
686 self.__writer.cancel()
687 if self._stream_writer is not None:
688 self._output_size = self._stream_writer.output_size
689 self._stream_writer = None
690 self._session = None
691
692 def _notify_content(self) -> None:
693 content = self.content
694 if content and content.exception() is None:
695 set_exception(content, _CONNECTION_CLOSED_EXCEPTION)
696 self._released = True
697
698 async def wait_for_close(self) -> None:
699 if self.__writer is not None:
700 try:
701 await self.__writer
702 except asyncio.CancelledError:
703 if (
704 sys.version_info >= (3, 11)
705 and (task := asyncio.current_task())
706 and task.cancelling()
707 ):
708 raise
709 self.release()
710
711 async def read(self) -> bytes:
712 """Read response payload."""
713 if self._body is None:
714 try:
715 self._body = await self.content.read()
716 for trace in self._traces:
717 await trace.send_response_chunk_received(
718 self.method, self.url, self._body
719 )
720 except BaseException:
721 self.close()
722 raise
723 elif self._released: # Response explicitly released
724 raise ClientConnectionError("Connection closed")
725
726 protocol = self._connection and self._connection.protocol
727 if protocol is None or not protocol.upgraded:
728 await self._wait_released() # Underlying connection released
729 return self._body
730
731 def get_encoding(self) -> str:
732 ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower()
733 mimetype = helpers.parse_mimetype(ctype)
734
735 encoding = mimetype.parameters.get("charset")
736 if encoding:
737 with contextlib.suppress(LookupError, ValueError):
738 return codecs.lookup(encoding).name
739
740 if mimetype.type == "application" and (
741 mimetype.subtype == "json" or mimetype.subtype == "rdap"
742 ):
743 # RFC 7159 states that the default encoding is UTF-8.
744 # RFC 7483 defines application/rdap+json
745 return "utf-8"
746
747 if self._body is None:
748 raise RuntimeError(
749 "Cannot compute fallback encoding of a not yet read body"
750 )
751
752 return self._resolve_charset(self, self._body)
753
754 async def text(self, encoding: str | None = None, errors: str = "strict") -> str:
755 """Read response payload and decode."""
756 if self._body is None:
757 await self.read()
758
759 if encoding is None:
760 encoding = self.get_encoding()
761
762 return self._body.decode(encoding, errors=errors) # type: ignore[union-attr]
763
764 async def json(
765 self,
766 *,
767 encoding: str | None = None,
768 loads: JSONDecoder = DEFAULT_JSON_DECODER,
769 content_type: str | None = "application/json",
770 ) -> Any:
771 """Read and decodes JSON response."""
772 if self._body is None:
773 await self.read()
774
775 if content_type:
776 ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower()
777 if not _is_expected_content_type(ctype, content_type):
778 raise ContentTypeError(
779 self.request_info,
780 self.history,
781 status=self.status,
782 message=(
783 "Attempt to decode JSON with unexpected mimetype: %s" % ctype
784 ),
785 headers=self.headers,
786 )
787
788 stripped = self._body.strip() # type: ignore[union-attr]
789 if not stripped:
790 return None
791
792 if encoding is None:
793 encoding = self.get_encoding()
794
795 return loads(stripped.decode(encoding))
796
797 async def __aenter__(self) -> "ClientResponse":
798 self._in_context = True
799 return self
800
801 async def __aexit__(
802 self,
803 exc_type: type[BaseException] | None,
804 exc_val: BaseException | None,
805 exc_tb: TracebackType | None,
806 ) -> None:
807 self._in_context = False
808 # similar to _RequestContextManager, we do not need to check
809 # for exceptions, response object can close connection
810 # if state is broken
811 self.release()
812 await self.wait_for_close()
813
814
815class ClientRequest:
816 GET_METHODS = {
817 hdrs.METH_GET,
818 hdrs.METH_HEAD,
819 hdrs.METH_OPTIONS,
820 hdrs.METH_TRACE,
821 }
822 POST_METHODS = {hdrs.METH_PATCH, hdrs.METH_POST, hdrs.METH_PUT}
823 ALL_METHODS = GET_METHODS.union(POST_METHODS).union({hdrs.METH_DELETE})
824
825 DEFAULT_HEADERS = {
826 hdrs.ACCEPT: "*/*",
827 hdrs.ACCEPT_ENCODING: _gen_default_accept_encoding(),
828 }
829
830 # Type of body depends on PAYLOAD_REGISTRY, which is dynamic.
831 _body: None | payload.Payload = None
832 auth = None
833 response = None
834
835 __writer: Optional["asyncio.Task[None]"] = None # async task for streaming data
836
837 # These class defaults help create_autospec() work correctly.
838 # If autospec is improved in future, maybe these can be removed.
839 url = URL()
840 method = "GET"
841
842 _continue = None # waiter future for '100 Continue' response
843
844 _skip_auto_headers: Optional["CIMultiDict[None]"] = None
845
846 # N.B.
847 # Adding __del__ method with self._writer closing doesn't make sense
848 # because _writer is instance method, thus it keeps a reference to self.
849 # Until writer has finished finalizer will not be called.
850
851 def __init__(
852 self,
853 method: str,
854 url: URL,
855 *,
856 params: Query = None,
857 headers: LooseHeaders | None = None,
858 skip_auto_headers: Iterable[str] | None = None,
859 data: Any = None,
860 cookies: LooseCookies | None = None,
861 auth: BasicAuth | None = None,
862 version: http.HttpVersion = http.HttpVersion11,
863 compress: str | bool | None = None,
864 chunked: bool | None = None,
865 expect100: bool = False,
866 loop: asyncio.AbstractEventLoop | None = None,
867 response_class: type["ClientResponse"] | None = None,
868 proxy: URL | None = None,
869 proxy_auth: BasicAuth | None = None,
870 timer: BaseTimerContext | None = None,
871 session: Optional["ClientSession"] = None,
872 ssl: SSLContext | bool | Fingerprint = True,
873 proxy_headers: LooseHeaders | None = None,
874 traces: list["Trace"] | None = None,
875 trust_env: bool = False,
876 server_hostname: str | None = None,
877 ):
878 if loop is None:
879 loop = asyncio.get_event_loop()
880 if match := _CONTAINS_CONTROL_CHAR_RE.search(method):
881 raise ValueError(
882 f"Method cannot contain non-token characters {method!r} "
883 f"(found at least {match.group()!r})"
884 )
885 # URL forbids subclasses, so a simple type check is enough.
886 assert type(url) is URL, url
887 if proxy is not None:
888 assert type(proxy) is URL, proxy
889 # FIXME: session is None in tests only, need to fix tests
890 # assert session is not None
891 if TYPE_CHECKING:
892 assert session is not None
893 self._session = session
894 if params:
895 url = url.extend_query(params)
896 self.original_url = url
897 self.url = url.with_fragment(None) if url.raw_fragment else url
898 self.method = method.upper()
899 self.chunked = chunked
900 self.compress = compress
901 self.loop = loop
902 self.length = None
903 if response_class is None:
904 real_response_class = ClientResponse
905 else:
906 real_response_class = response_class
907 self.response_class: type[ClientResponse] = real_response_class
908 self._timer = timer if timer is not None else TimerNoop()
909 self._ssl = ssl if ssl is not None else True
910 self.server_hostname = server_hostname
911
912 if loop.get_debug():
913 self._source_traceback = traceback.extract_stack(sys._getframe(1))
914
915 self.update_version(version)
916 self.update_host(url)
917 self.update_headers(headers)
918 self.update_auto_headers(skip_auto_headers)
919 self.update_cookies(cookies)
920 self.update_content_encoding(data)
921 self.update_auth(auth, trust_env)
922 self.update_proxy(proxy, proxy_auth, proxy_headers)
923
924 self.update_body_from_data(data)
925 if data is not None or self.method not in self.GET_METHODS:
926 self.update_transfer_encoding()
927 self.update_expect_continue(expect100)
928 self._traces = [] if traces is None else traces
929
930 def __reset_writer(self, _: object = None) -> None:
931 self.__writer = None
932
933 def _get_content_length(self) -> int | None:
934 """Extract and validate Content-Length header value.
935
936 Returns parsed Content-Length value or None if not set.
937 Raises ValueError if header exists but cannot be parsed as an integer.
938 """
939 if hdrs.CONTENT_LENGTH not in self.headers:
940 return None
941
942 content_length_hdr = self.headers[hdrs.CONTENT_LENGTH]
943 if not _DIGITS_RE.fullmatch(content_length_hdr):
944 raise ValueError(f"Invalid Content-Length header: {content_length_hdr!r}")
945 return int(content_length_hdr)
946
947 @property
948 def skip_auto_headers(self) -> CIMultiDict[None]:
949 return self._skip_auto_headers or CIMultiDict()
950
951 @property
952 def _writer(self) -> Optional["asyncio.Task[None]"]:
953 return self.__writer
954
955 @_writer.setter
956 def _writer(self, writer: "asyncio.Task[None]") -> None:
957 if self.__writer is not None:
958 self.__writer.remove_done_callback(self.__reset_writer)
959 self.__writer = writer
960 writer.add_done_callback(self.__reset_writer)
961
962 def is_ssl(self) -> bool:
963 return self.url.scheme in _SSL_SCHEMES
964
965 @property
966 def ssl(self) -> Union["SSLContext", bool, Fingerprint]:
967 return self._ssl
968
969 @property
970 def connection_key(self) -> ConnectionKey:
971 if proxy_headers := self.proxy_headers:
972 h: int | None = hash(tuple(proxy_headers.items()))
973 else:
974 h = None
975 url = self.url
976 return tuple.__new__(
977 ConnectionKey,
978 (
979 url.raw_host or "",
980 url.port,
981 url.scheme in _SSL_SCHEMES,
982 self._ssl,
983 self.proxy,
984 self.proxy_auth,
985 h,
986 self.server_hostname,
987 ),
988 )
989
990 @property
991 def host(self) -> str:
992 ret = self.url.raw_host
993 assert ret is not None
994 return ret
995
996 @property
997 def port(self) -> int | None:
998 return self.url.port
999
1000 @property
1001 def body(self) -> payload.Payload | Literal[b""]:
1002 """Request body."""
1003 # empty body is represented as bytes for backwards compatibility
1004 return self._body or b""
1005
1006 @body.setter
1007 def body(self, value: Any) -> None:
1008 """Set request body with warning for non-autoclose payloads.
1009
1010 WARNING: This setter must be called from within an event loop and is not
1011 thread-safe. Setting body outside of an event loop may raise RuntimeError
1012 when closing file-based payloads.
1013
1014 DEPRECATED: Direct assignment to body is deprecated and will be removed
1015 in a future version. Use await update_body() instead for proper resource
1016 management.
1017 """
1018 # Close existing payload if present
1019 if self._body is not None:
1020 # Warn if the payload needs manual closing
1021 # stacklevel=3: user code -> body setter -> _warn_if_unclosed_payload
1022 _warn_if_unclosed_payload(self._body, stacklevel=3)
1023 # NOTE: In the future, when we remove sync close support,
1024 # this setter will need to be removed and only the async
1025 # update_body() method will be available. For now, we call
1026 # _close() for backwards compatibility.
1027 self._body._close()
1028 self._update_body(value)
1029
1030 @property
1031 def request_info(self) -> RequestInfo:
1032 headers: CIMultiDictProxy[str] = CIMultiDictProxy(self.headers)
1033 # These are created on every request, so we use a NamedTuple
1034 # for performance reasons. We don't use the RequestInfo.__new__
1035 # method because it has a different signature which is provided
1036 # for backwards compatibility only.
1037 return tuple.__new__(
1038 RequestInfo, (self.url, self.method, headers, self.original_url)
1039 )
1040
1041 @property
1042 def session(self) -> "ClientSession":
1043 """Return the ClientSession instance.
1044
1045 This property provides access to the ClientSession that initiated
1046 this request, allowing middleware to make additional requests
1047 using the same session.
1048 """
1049 return self._session
1050
1051 def update_host(self, url: URL) -> None:
1052 """Update destination host, port and connection type (ssl)."""
1053 # get host/port
1054 if not url.raw_host:
1055 raise InvalidURL(url)
1056
1057 # basic auth info
1058 if url.raw_user or url.raw_password:
1059 self.auth = _basic_auth_no_warn(url.user or "", url.password or "")
1060
1061 def update_version(self, version: http.HttpVersion | str) -> None:
1062 """Convert request version to two elements tuple.
1063
1064 parser HTTP version '1.1' => (1, 1)
1065 """
1066 if isinstance(version, str):
1067 v = [part.strip() for part in version.split(".", 1)]
1068 try:
1069 version = http.HttpVersion(int(v[0]), int(v[1]))
1070 except ValueError:
1071 raise ValueError(
1072 f"Can not parse http version number: {version}"
1073 ) from None
1074 self.version = version
1075
1076 def update_headers(self, headers: LooseHeaders | None) -> None:
1077 """Update request headers."""
1078 self.headers: CIMultiDict[str] = CIMultiDict()
1079
1080 # Build the host header
1081 host = self.url.host_port_subcomponent
1082
1083 # host_port_subcomponent is None when the URL is a relative URL.
1084 # but we know we do not have a relative URL here.
1085 assert host is not None
1086 self.headers[hdrs.HOST] = host
1087
1088 if not headers:
1089 return
1090
1091 if isinstance(headers, (dict, MultiDictProxy, MultiDict)):
1092 headers = headers.items()
1093
1094 for key, value in headers: # type: ignore[str-unpack]
1095 # A special case for Host header
1096 if key in hdrs.HOST_ALL:
1097 self.headers[key] = value
1098 else:
1099 self.headers.add(key, value)
1100
1101 def update_auto_headers(self, skip_auto_headers: Iterable[str] | None) -> None:
1102 if skip_auto_headers is not None:
1103 self._skip_auto_headers = CIMultiDict(
1104 (hdr, None) for hdr in sorted(skip_auto_headers)
1105 )
1106 used_headers = self.headers.copy()
1107 used_headers.extend(self._skip_auto_headers) # type: ignore[arg-type]
1108 else:
1109 # Fast path when there are no headers to skip
1110 # which is the most common case.
1111 used_headers = self.headers
1112
1113 for hdr, val in self.DEFAULT_HEADERS.items():
1114 if hdr not in used_headers:
1115 self.headers[hdr] = val
1116
1117 if hdrs.USER_AGENT not in used_headers:
1118 self.headers[hdrs.USER_AGENT] = SERVER_SOFTWARE
1119
1120 def update_cookies(self, cookies: LooseCookies | None) -> None:
1121 """Update request cookies header."""
1122 if not cookies:
1123 return
1124
1125 c = SimpleCookie()
1126 if hdrs.COOKIE in self.headers:
1127 # parse_cookie_header for RFC 6265 compliant Cookie header parsing
1128 c.update(parse_cookie_header(self.headers.get(hdrs.COOKIE, "")))
1129 del self.headers[hdrs.COOKIE]
1130
1131 if isinstance(cookies, Mapping):
1132 iter_cookies = cookies.items()
1133 else:
1134 iter_cookies = cookies # type: ignore[assignment]
1135 for name, value in iter_cookies:
1136 if isinstance(value, Morsel):
1137 # Use helper to preserve coded_value exactly as sent by server
1138 c[name] = preserve_morsel_with_coded_value(value)
1139 else:
1140 c[name] = value # type: ignore[assignment]
1141
1142 self.headers[hdrs.COOKIE] = c.output(header="", sep=";").strip()
1143
1144 def update_content_encoding(self, data: Any) -> None:
1145 """Set request content encoding."""
1146 if not data:
1147 # Don't compress an empty body.
1148 self.compress = None
1149 return
1150
1151 if self.headers.get(hdrs.CONTENT_ENCODING):
1152 if self.compress:
1153 raise ValueError(
1154 "compress can not be set if Content-Encoding header is set"
1155 )
1156 elif self.compress:
1157 if not isinstance(self.compress, str):
1158 self.compress = "deflate"
1159 self.headers[hdrs.CONTENT_ENCODING] = self.compress
1160 self.chunked = True # enable chunked, no need to deal with length
1161
1162 def update_transfer_encoding(self) -> None:
1163 """Analyze transfer-encoding header."""
1164 te = self.headers.get(hdrs.TRANSFER_ENCODING, "").lower()
1165
1166 if "chunked" in te:
1167 if self.chunked:
1168 raise ValueError(
1169 "chunked can not be set "
1170 'if "Transfer-Encoding: chunked" header is set'
1171 )
1172
1173 elif self.chunked:
1174 if hdrs.CONTENT_LENGTH in self.headers:
1175 raise ValueError(
1176 "chunked can not be set if Content-Length header is set"
1177 )
1178
1179 self.headers[hdrs.TRANSFER_ENCODING] = "chunked"
1180
1181 def update_auth(self, auth: BasicAuth | None, trust_env: bool = False) -> None:
1182 """Set basic auth."""
1183 if auth is None:
1184 auth = self.auth
1185 if auth is None:
1186 return
1187
1188 if not isinstance(auth, helpers.BasicAuth):
1189 raise TypeError("BasicAuth() tuple is required instead")
1190
1191 self.headers[hdrs.AUTHORIZATION] = auth.encode()
1192
1193 def update_body_from_data(self, body: Any, _stacklevel: int = 3) -> None:
1194 """Update request body from data."""
1195 if self._body is not None:
1196 _warn_if_unclosed_payload(self._body, stacklevel=_stacklevel)
1197
1198 if body is None:
1199 self._body = None
1200 # Set Content-Length to 0 when body is None for methods that expect a body
1201 if (
1202 self.method not in self.GET_METHODS
1203 and not self.chunked
1204 and hdrs.CONTENT_LENGTH not in self.headers
1205 ):
1206 self.headers[hdrs.CONTENT_LENGTH] = "0"
1207 return
1208
1209 # FormData
1210 maybe_payload = body() if isinstance(body, FormData) else body
1211
1212 try:
1213 body_payload = payload.PAYLOAD_REGISTRY.get(maybe_payload, disposition=None)
1214 except payload.LookupError:
1215 body_payload = FormData(maybe_payload)() # type: ignore[arg-type]
1216
1217 self._body = body_payload
1218 # enable chunked encoding if needed
1219 if not self.chunked and hdrs.CONTENT_LENGTH not in self.headers:
1220 if (size := body_payload.size) is not None:
1221 self.headers[hdrs.CONTENT_LENGTH] = str(size)
1222 else:
1223 self.chunked = True
1224
1225 # copy payload headers
1226 assert body_payload.headers
1227 headers = self.headers
1228 skip_headers = self._skip_auto_headers
1229 for key, value in body_payload.headers.items():
1230 if key in headers or (skip_headers is not None and key in skip_headers):
1231 continue
1232 headers[key] = value
1233
1234 def _update_body(self, body: Any) -> None:
1235 """Update request body after its already been set."""
1236 # Remove existing Content-Length header since body is changing
1237 if hdrs.CONTENT_LENGTH in self.headers:
1238 del self.headers[hdrs.CONTENT_LENGTH]
1239
1240 # Remove existing Transfer-Encoding header to avoid conflicts
1241 if self.chunked and hdrs.TRANSFER_ENCODING in self.headers:
1242 del self.headers[hdrs.TRANSFER_ENCODING]
1243
1244 # Now update the body using the existing method
1245 # Called from _update_body, add 1 to stacklevel from caller
1246 self.update_body_from_data(body, _stacklevel=4)
1247
1248 # Update transfer encoding headers if needed (same logic as __init__)
1249 if body is not None or self.method not in self.GET_METHODS:
1250 self.update_transfer_encoding()
1251
1252 async def update_body(self, body: Any) -> None:
1253 """
1254 Update request body and close previous payload if needed.
1255
1256 This method safely updates the request body by first closing any existing
1257 payload to prevent resource leaks, then setting the new body.
1258
1259 IMPORTANT: Always use this method instead of setting request.body directly.
1260 Direct assignment to request.body will leak resources if the previous body
1261 contains file handles, streams, or other resources that need cleanup.
1262
1263 Args:
1264 body: The new body content. Can be:
1265 - bytes/bytearray: Raw binary data
1266 - str: Text data (will be encoded using charset from Content-Type)
1267 - FormData: Form data that will be encoded as multipart/form-data
1268 - Payload: A pre-configured payload object
1269 - AsyncIterable: An async iterable of bytes chunks
1270 - File-like object: Will be read and sent as binary data
1271 - None: Clears the body
1272
1273 Usage:
1274 # CORRECT: Use update_body
1275 await request.update_body(b"new request data")
1276
1277 # WRONG: Don't set body directly
1278 # request.body = b"new request data" # This will leak resources!
1279
1280 # Update with form data
1281 form_data = FormData()
1282 form_data.add_field('field', 'value')
1283 await request.update_body(form_data)
1284
1285 # Clear body
1286 await request.update_body(None)
1287
1288 Note:
1289 This method is async because it may need to close file handles or
1290 other resources associated with the previous payload. Always await
1291 this method to ensure proper cleanup.
1292
1293 Warning:
1294 Setting request.body directly is highly discouraged and can lead to:
1295 - Resource leaks (unclosed file handles, streams)
1296 - Memory leaks (unreleased buffers)
1297 - Unexpected behavior with streaming payloads
1298
1299 It is not recommended to change the payload type in middleware. If the
1300 body was already set (e.g., as bytes), it's best to keep the same type
1301 rather than converting it (e.g., to str) as this may result in unexpected
1302 behavior.
1303
1304 See Also:
1305 - update_body_from_data: Synchronous body update without cleanup
1306 - body property: Direct body access (STRONGLY DISCOURAGED)
1307
1308 """
1309 # Close existing payload if it exists and needs closing
1310 if self._body is not None:
1311 await self._body.close()
1312 self._update_body(body)
1313
1314 def update_expect_continue(self, expect: bool = False) -> None:
1315 if expect:
1316 self.headers[hdrs.EXPECT] = "100-continue"
1317 elif (
1318 hdrs.EXPECT in self.headers
1319 and self.headers[hdrs.EXPECT].lower() == "100-continue"
1320 ):
1321 expect = True
1322
1323 if expect:
1324 self._continue = self.loop.create_future()
1325
1326 def update_proxy(
1327 self,
1328 proxy: URL | None,
1329 proxy_auth: BasicAuth | None,
1330 proxy_headers: LooseHeaders | None,
1331 ) -> None:
1332 self.proxy = proxy
1333 if proxy is None:
1334 self.proxy_auth = None
1335 self.proxy_headers = None
1336 return
1337
1338 if proxy_auth and not isinstance(proxy_auth, helpers.BasicAuth):
1339 raise ValueError("proxy_auth must be None or BasicAuth() tuple")
1340 self.proxy_auth = proxy_auth
1341
1342 if proxy_headers is not None and not isinstance(
1343 proxy_headers, (MultiDict, MultiDictProxy)
1344 ):
1345 proxy_headers = CIMultiDict(proxy_headers)
1346 self.proxy_headers = proxy_headers
1347
1348 async def write_bytes(
1349 self,
1350 writer: AbstractStreamWriter,
1351 conn: "Connection",
1352 content_length: int | None = None,
1353 ) -> None:
1354 """
1355 Write the request body to the connection stream.
1356
1357 This method handles writing different types of request bodies:
1358 1. Payload objects (using their specialized write_with_length method)
1359 2. Bytes/bytearray objects
1360 3. Iterable body content
1361
1362 Args:
1363 writer: The stream writer to write the body to
1364 conn: The connection being used for this request
1365 content_length: Optional maximum number of bytes to write from the body
1366 (None means write the entire body)
1367
1368 The method properly handles:
1369 - Waiting for 100-Continue responses if required
1370 - Content length constraints for chunked encoding
1371 - Error handling for network issues, cancellation, and other exceptions
1372 - Signaling EOF and timeout management
1373
1374 Raises:
1375 ClientOSError: When there's an OS-level error writing the body
1376 ClientConnectionError: When there's a general connection error
1377 asyncio.CancelledError: When the operation is cancelled
1378
1379 """
1380 # 100 response
1381 if self._continue is not None:
1382 # Force headers to be sent before waiting for 100-continue
1383 writer.send_headers()
1384 await writer.drain()
1385 await self._continue
1386
1387 protocol = conn.protocol
1388 assert protocol is not None
1389 try:
1390 # This should be a rare case but the
1391 # self._body can be set to None while
1392 # the task is being started or we wait above
1393 # for the 100-continue response.
1394 # The more likely case is we have an empty
1395 # payload, but 100-continue is still expected.
1396 if self._body is not None:
1397 await self._body.write_with_length(writer, content_length)
1398 except OSError as underlying_exc:
1399 reraised_exc = underlying_exc
1400
1401 # Distinguish between timeout and other OS errors for better error reporting
1402 exc_is_not_timeout = underlying_exc.errno is not None or not isinstance(
1403 underlying_exc, asyncio.TimeoutError
1404 )
1405 if exc_is_not_timeout:
1406 reraised_exc = ClientOSError(
1407 underlying_exc.errno,
1408 f"Can not write request body for {self.url !s}",
1409 )
1410
1411 set_exception(protocol, reraised_exc, underlying_exc)
1412 except asyncio.CancelledError:
1413 # Body hasn't been fully sent, so connection can't be reused
1414 conn.close()
1415 raise
1416 except Exception as underlying_exc:
1417 set_exception(
1418 protocol,
1419 ClientConnectionError(
1420 "Failed to send bytes into the underlying connection "
1421 f"{conn !s}: {underlying_exc!r}",
1422 ),
1423 underlying_exc,
1424 )
1425 else:
1426 # Successfully wrote the body, signal EOF and start response timeout
1427 await writer.write_eof()
1428 protocol.start_timeout()
1429
1430 async def send(self, conn: "Connection") -> "ClientResponse":
1431 # Specify request target:
1432 # - CONNECT request must send authority form URI
1433 # - not CONNECT proxy must send absolute form URI
1434 # - most common is origin form URI
1435 if self.method == hdrs.METH_CONNECT:
1436 connect_host = self.url.host_subcomponent
1437 assert connect_host is not None
1438 path = f"{connect_host}:{self.url.port}"
1439 elif self.proxy and not self.is_ssl():
1440 path = str(self.url)
1441 else:
1442 path = self.url.raw_path_qs
1443
1444 protocol = conn.protocol
1445 assert protocol is not None
1446 writer = StreamWriter(
1447 protocol,
1448 self.loop,
1449 on_chunk_sent=(
1450 functools.partial(self._on_chunk_request_sent, self.method, self.url)
1451 if self._traces
1452 else None
1453 ),
1454 on_headers_sent=(
1455 functools.partial(self._on_headers_request_sent, self.method, self.url)
1456 if self._traces
1457 else None
1458 ),
1459 )
1460
1461 if self.compress:
1462 writer.enable_compression(self.compress) # type: ignore[arg-type]
1463
1464 if self.chunked is not None:
1465 writer.enable_chunking()
1466
1467 # set default content-type
1468 if (
1469 self.method in self.POST_METHODS
1470 and (
1471 self._skip_auto_headers is None
1472 or hdrs.CONTENT_TYPE not in self._skip_auto_headers
1473 )
1474 and hdrs.CONTENT_TYPE not in self.headers
1475 ):
1476 self.headers[hdrs.CONTENT_TYPE] = "application/octet-stream"
1477
1478 v = self.version
1479 if hdrs.CONNECTION not in self.headers:
1480 if conn._connector.force_close:
1481 if v == HttpVersion11:
1482 self.headers[hdrs.CONNECTION] = "close"
1483 elif v == HttpVersion10:
1484 self.headers[hdrs.CONNECTION] = "keep-alive"
1485
1486 # status + headers
1487 status_line = f"{self.method} {path} HTTP/{v.major}.{v.minor}"
1488
1489 # Buffer headers for potential coalescing with body
1490 await writer.write_headers(status_line, self.headers)
1491
1492 task: asyncio.Task[None] | None
1493 if self._body or self._continue is not None or protocol.writing_paused:
1494 coro = self.write_bytes(writer, conn, self._get_content_length())
1495 if sys.version_info >= (3, 12):
1496 # Optimization for Python 3.12, try to write
1497 # bytes immediately to avoid having to schedule
1498 # the task on the event loop.
1499 task = asyncio.Task(coro, loop=self.loop, eager_start=True)
1500 else:
1501 task = self.loop.create_task(coro)
1502 if task.done():
1503 task = None
1504 else:
1505 self._writer = task
1506 else:
1507 # We have nothing to write because
1508 # - there is no body
1509 # - the protocol does not have writing paused
1510 # - we are not waiting for a 100-continue response
1511 protocol.start_timeout()
1512 writer.set_eof()
1513 task = None
1514 response_class = self.response_class
1515 assert response_class is not None
1516 self.response = response_class(
1517 self.method,
1518 self.original_url,
1519 writer=task,
1520 continue100=self._continue,
1521 timer=self._timer,
1522 request_info=self.request_info,
1523 traces=self._traces,
1524 loop=self.loop,
1525 session=self._session,
1526 stream_writer=writer,
1527 )
1528 return self.response
1529
1530 async def close(self) -> None:
1531 if self.__writer is not None:
1532 try:
1533 await self.__writer
1534 except asyncio.CancelledError:
1535 if (
1536 sys.version_info >= (3, 11)
1537 and (task := asyncio.current_task())
1538 and task.cancelling()
1539 ):
1540 raise
1541
1542 def terminate(self) -> None:
1543 if self.__writer is not None:
1544 if not self.loop.is_closed():
1545 self.__writer.cancel()
1546 self.__writer.remove_done_callback(self.__reset_writer)
1547 self.__writer = None
1548
1549 async def _on_chunk_request_sent(self, method: str, url: URL, chunk: bytes) -> None:
1550 for trace in self._traces:
1551 await trace.send_request_chunk_sent(method, url, chunk)
1552
1553 async def _on_headers_request_sent(
1554 self, method: str, url: URL, headers: "CIMultiDict[str]"
1555 ) -> None:
1556 for trace in self._traces:
1557 await trace.send_request_headers(method, url, headers)