Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/client_reqrep.py: 32%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import codecs
3import contextlib
4import functools
5import io
6import re
7import sys
8import traceback
9import warnings
10from collections.abc import Callable, Iterable, Sequence
11from hashlib import md5, sha1, sha256
12from http.cookies import BaseCookie, SimpleCookie
13from types import MappingProxyType, TracebackType
14from typing import TYPE_CHECKING, Any, Literal, NamedTuple, TypedDict
16from multidict import CIMultiDict, CIMultiDictProxy, MultiDict, MultiDictProxy
17from yarl import URL, Query
19from . import hdrs, multipart, payload
20from ._cookie_helpers import (
21 parse_cookie_header,
22 parse_set_cookie_headers,
23 preserve_morsel_with_coded_value,
24)
25from .abc import AbstractStreamWriter
26from .base_protocol import BaseProtocol
27from .client_exceptions import (
28 ClientConnectionError,
29 ClientOSError,
30 ClientResponseError,
31 ContentTypeError,
32 InvalidURL,
33 ServerFingerprintMismatch,
34)
35from .compression_utils import HAS_BROTLI, HAS_ZSTD
36from .formdata import FormData
37from .helpers import (
38 _SENTINEL,
39 BaseTimerContext,
40 HeadersDictProxy,
41 HeadersMixin,
42 TimerNoop,
43 encode_basic_auth,
44 frozen_dataclass_decorator,
45 is_expected_content_type,
46 parse_mimetype,
47 reify,
48 sentinel,
49 set_exception,
50 set_result,
51)
52from .http import (
53 SERVER_SOFTWARE,
54 HttpProcessingError,
55 HttpVersion,
56 HttpVersion10,
57 HttpVersion11,
58 StreamWriter,
59)
60from .streams import StreamReader
61from .typedefs import DEFAULT_JSON_DECODER, JSONDecoder, RawHeaders
63try:
64 import ssl
65 from ssl import SSLContext
66except ImportError: # pragma: no cover
67 ssl = None # type: ignore[assignment]
68 SSLContext = object # type: ignore[misc,assignment]
71__all__ = ("ClientRequest", "ClientResponse", "RequestInfo", "Fingerprint")
74if TYPE_CHECKING:
75 from .client import ClientSession
76 from .connector import Connection
77 from .tracing import Trace
80_CONNECTION_CLOSED_EXCEPTION = ClientConnectionError("Connection closed")
81_CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]")
82_DIGITS_RE = re.compile(r"\d+", re.ASCII)
85def _gen_default_accept_encoding() -> str:
86 encodings = [
87 "gzip",
88 "deflate",
89 ]
90 if HAS_BROTLI:
91 encodings.append("br")
92 if HAS_ZSTD:
93 encodings.append("zstd")
94 return ", ".join(encodings)
97@frozen_dataclass_decorator
98class ContentDisposition:
99 type: str | None
100 parameters: "MappingProxyType[str, str]"
101 filename: str | None
104class _RequestInfo(NamedTuple):
105 url: URL
106 method: str
107 headers: "CIMultiDictProxy[str]"
108 real_url: URL
111class RequestInfo(_RequestInfo):
113 def __new__(
114 cls,
115 url: URL,
116 method: str,
117 headers: "CIMultiDictProxy[str]",
118 real_url: URL | _SENTINEL = sentinel,
119 ) -> "RequestInfo":
120 """Create a new RequestInfo instance.
122 For backwards compatibility, the real_url parameter is optional.
123 """
124 return tuple.__new__(
125 cls, (url, method, headers, url if real_url is sentinel else real_url)
126 )
129class Fingerprint:
130 HASHFUNC_BY_DIGESTLEN = {
131 16: md5,
132 20: sha1,
133 32: sha256,
134 }
136 def __init__(self, fingerprint: bytes) -> None:
137 digestlen = len(fingerprint)
138 hashfunc = self.HASHFUNC_BY_DIGESTLEN.get(digestlen)
139 if not hashfunc:
140 raise ValueError("fingerprint has invalid length")
141 elif hashfunc is md5 or hashfunc is sha1:
142 raise ValueError("md5 and sha1 are insecure and not supported. Use sha256.")
143 self._hashfunc = hashfunc
144 self._fingerprint = fingerprint
146 @property
147 def fingerprint(self) -> bytes:
148 return self._fingerprint
150 def check(self, transport: asyncio.Transport) -> None:
151 if not transport.get_extra_info("sslcontext"):
152 return
153 sslobj = transport.get_extra_info("ssl_object")
154 cert = sslobj.getpeercert(binary_form=True)
155 got = self._hashfunc(cert).digest()
156 if got != self._fingerprint:
157 host, port, *_ = transport.get_extra_info("peername")
158 raise ServerFingerprintMismatch(self._fingerprint, got, host, port)
161if ssl is not None:
162 SSL_ALLOWED_TYPES = (ssl.SSLContext, bool, Fingerprint)
163else: # pragma: no cover
164 SSL_ALLOWED_TYPES = (bool,) # type: ignore[unreachable]
167_CONNECTION_CLOSED_EXCEPTION = ClientConnectionError("Connection closed")
168_SSL_SCHEMES = frozenset(("https", "wss"))
171# ConnectionKey is a NamedTuple because it is used as a key in a dict
172# and a set in the connector. Since a NamedTuple is a tuple it uses
173# the fast native tuple __hash__ and __eq__ implementation in CPython.
174class ConnectionKey(NamedTuple):
175 # the key should contain an information about used proxy / TLS
176 # to prevent reusing wrong connections from a pool
177 host: str
178 port: int | None
179 is_ssl: bool
180 ssl: SSLContext | bool | Fingerprint
181 proxy: URL | None
182 proxy_headers_hash: int | None # hash(CIMultiDict)
185class ClientResponse(HeadersMixin):
186 # Some of these attributes are None when created,
187 # but will be set by the start() method.
188 # As the end user will likely never see the None values, we cheat the types below.
189 # from the Status-Line of the response
190 version: HttpVersion | None = None # HTTP-Version
191 status: int = None # type: ignore[assignment] # Status-Code
192 reason: str | None = None # Reason-Phrase
194 content: StreamReader = None # type: ignore[assignment] # Payload stream
195 _body: bytes | None = None
196 _headers: HeadersDictProxy = None # type: ignore[assignment]
197 _history: tuple["ClientResponse", ...] = ()
198 _raw_headers: RawHeaders = None # type: ignore[assignment]
200 _connection: "Connection | None" = None # current connection
201 _cookies: SimpleCookie | None = None
202 _raw_cookie_headers: tuple[str, ...] | None = None
203 _continue: asyncio.Future[bool] | None = None
204 _source_traceback: traceback.StackSummary | None = None
205 _session: "ClientSession | None" = None
206 # set up by ClientRequest after ClientResponse object creation
207 # post-init stage allows to not change ctor signature
208 _closed = True # to allow __del__ for non-initialized properly response
209 _released = False
210 _in_context = False
212 _resolve_charset: Callable[["ClientResponse", bytes], str] = lambda *_: "utf-8"
214 __writer: asyncio.Task[None] | None = None
215 _stream_writer: AbstractStreamWriter | None = None
216 _output_size: int = 0
217 _upload_complete: asyncio.Future[None] | None = None
219 def __init__(
220 self,
221 method: str,
222 url: URL,
223 *,
224 writer: asyncio.Task[None] | None,
225 continue100: asyncio.Future[bool] | None,
226 timer: BaseTimerContext | None,
227 traces: Sequence["Trace"],
228 loop: asyncio.AbstractEventLoop,
229 session: "ClientSession | None",
230 request_headers: CIMultiDict[str],
231 original_url: URL,
232 stream_writer: AbstractStreamWriter,
233 **kwargs: object,
234 ) -> None:
235 # kwargs exists so authors of subclasses should expect to pass through unknown
236 # arguments. This allows us to safely add new arguments in future releases.
237 # But, we should never receive unknown arguments here in the parent class, this
238 # would indicate an argument has been named wrong or similar in the subclass.
239 assert not kwargs, "Unexpected arguments to ClientResponse"
240 # URL forbids subclasses, so a simple type check is enough.
241 assert type(url) is URL
243 self.method = method
245 self._real_url = url
246 self._url = url.with_fragment(None) if url.raw_fragment else url
247 if writer is None: # Request already sent
248 self._output_size = stream_writer.output_size
249 else:
250 self._stream_writer = stream_writer
251 self._writer = writer
252 if continue100 is not None:
253 self._continue = continue100
254 self._request_headers = request_headers
255 self._original_url = original_url
256 self._timer = timer if timer is not None else TimerNoop()
257 self._cache: dict[str, Any] = {}
258 self._traces = traces
259 self._loop = loop
260 # Save reference to _resolve_charset, so that get_encoding() will still
261 # work after the response has finished reading the body.
262 if session is not None:
263 # store a reference to session #1985
264 self._session = session
265 self._resolve_charset = session._resolve_charset
266 if loop.get_debug():
267 self._source_traceback = traceback.extract_stack(sys._getframe(1))
269 def __reset_writer(self, _: object = None) -> None:
270 self.__writer = None
271 if self._stream_writer is not None:
272 self._output_size = self._stream_writer.output_size
273 self._stream_writer = None
274 if self._upload_complete is not None and not self._upload_complete.done():
275 self._upload_complete.set_result(None)
277 @property
278 def _writer(self) -> asyncio.Task[None] | None:
279 """The writer task for streaming data.
281 _writer is only provided for backwards compatibility
282 for subclasses that may need to access it.
283 """
284 return self.__writer
286 @_writer.setter
287 def _writer(self, writer: asyncio.Task[None] | None) -> None:
288 """Set the writer task for streaming data."""
289 if self.__writer is not None:
290 self.__writer.remove_done_callback(self.__reset_writer)
291 self.__writer = writer
292 if writer is None:
293 return
294 if writer.done():
295 # The writer is already done, so we can clear it immediately.
296 self.__reset_writer()
297 else:
298 writer.add_done_callback(self.__reset_writer)
300 @property
301 def output_size(self) -> int:
302 """Number of bytes sent for this request."""
303 if self._stream_writer is not None:
304 return self._stream_writer.output_size
305 return self._output_size
307 @property
308 def upload_complete(self) -> "asyncio.Future[None]":
309 """Future set when the request body has been fully sent.
311 Already done when the request had no body or was written eagerly.
312 """
313 if self._upload_complete is None:
314 self._upload_complete = self._loop.create_future()
315 if self._stream_writer is None: # upload already finished
316 self._upload_complete.set_result(None)
317 return self._upload_complete
319 @property
320 def cookies(self) -> SimpleCookie:
321 if self._cookies is None:
322 if self._raw_cookie_headers is not None:
323 # Parse cookies for response.cookies (SimpleCookie for backward compatibility)
324 cookies = SimpleCookie()
325 # Use parse_set_cookie_headers for more lenient parsing that handles
326 # malformed cookies better than SimpleCookie.load
327 cookies.update(parse_set_cookie_headers(self._raw_cookie_headers))
328 self._cookies = cookies
329 else:
330 self._cookies = SimpleCookie()
331 return self._cookies
333 @cookies.setter
334 def cookies(self, cookies: SimpleCookie) -> None:
335 self._cookies = cookies
336 # Generate raw cookie headers from the SimpleCookie
337 if cookies:
338 self._raw_cookie_headers = tuple(
339 morsel.OutputString() for morsel in cookies.values()
340 )
341 else:
342 self._raw_cookie_headers = None
344 @reify
345 def url(self) -> URL:
346 return self._url
348 @reify
349 def real_url(self) -> URL:
350 return self._real_url
352 @reify
353 def host(self) -> str:
354 assert self._url.host is not None
355 return self._url.host
357 @reify
358 def headers(self) -> HeadersDictProxy:
359 return self._headers
361 @reify
362 def raw_headers(self) -> RawHeaders:
363 return self._raw_headers
365 @reify
366 def request_info(self) -> RequestInfo:
367 # Build RequestInfo lazily from components
368 headers = CIMultiDictProxy(self._request_headers)
369 return tuple.__new__(
370 RequestInfo, (self._url, self.method, headers, self._original_url)
371 )
373 @reify
374 def content_disposition(self) -> ContentDisposition | None:
375 raw = self._headers.get(hdrs.CONTENT_DISPOSITION)
376 if raw is None:
377 return None
378 disposition_type, params_dct = multipart.parse_content_disposition(raw)
379 params = MappingProxyType(params_dct)
380 filename = multipart.content_disposition_filename(params)
381 return ContentDisposition(disposition_type, params, filename)
383 def __del__(self, _warnings: Any = warnings) -> None:
384 if self._closed:
385 return
387 if self._connection is not None:
388 self._connection.release()
389 self._cleanup_writer()
391 if self._loop.get_debug():
392 _warnings.warn(
393 f"Unclosed response {self!r}", ResourceWarning, source=self
394 )
395 context = {"client_response": self, "message": "Unclosed response"}
396 if self._source_traceback:
397 context["source_traceback"] = self._source_traceback
398 self._loop.call_exception_handler(context)
400 def __repr__(self) -> str:
401 out = io.StringIO()
402 ascii_encodable_url = str(self.url)
403 if self.reason:
404 ascii_encodable_reason = self.reason.encode(
405 "ascii", "backslashreplace"
406 ).decode("ascii")
407 else:
408 ascii_encodable_reason = "None"
409 print(
410 f"<ClientResponse({ascii_encodable_url}) [{self.status} {ascii_encodable_reason}]>",
411 file=out,
412 )
413 print(self.headers, file=out)
414 return out.getvalue()
416 @property
417 def connection(self) -> "Connection | None":
418 return self._connection
420 @reify
421 def history(self) -> tuple["ClientResponse", ...]:
422 """A sequence of responses, if redirects occurred."""
423 return self._history
425 @reify
426 def links(self) -> "MultiDictProxy[MultiDictProxy[str | URL]]":
427 links: MultiDict[MultiDictProxy[str | URL]] = MultiDict()
428 for val in self.headers.getall("link"):
429 match = re.match(r"\s*<(.*)>(.*)", val)
430 if match is None: # Malformed link
431 continue
432 url, params_str = match.groups()
433 params = params_str.split(";")[1:]
435 link: MultiDict[str | URL] = MultiDict()
437 for param in params:
438 match = re.match(r"^\s*(\S*)\s*=\s*(['\"]?)(.*?)(\2)\s*$", param, re.M)
439 if match is None: # Malformed param
440 continue
441 key, _, value, _ = match.groups()
443 link.add(key, value)
445 key = link.get("rel", url)
447 link.add("url", self.url.join(URL(url)))
449 links.add(str(key), MultiDictProxy(link))
451 return MultiDictProxy(links)
453 async def start(self, connection: "Connection") -> "ClientResponse":
454 """Start response processing."""
455 self._closed = False
456 self._protocol = connection.protocol
457 self._connection = connection
459 with self._timer:
460 while True:
461 # read response
462 try:
463 protocol = self._protocol
464 message, payload = await protocol.read() # type: ignore[union-attr]
465 except HttpProcessingError as exc:
466 raise ClientResponseError(
467 self.request_info,
468 self.history,
469 status=exc.code,
470 message=exc.message,
471 headers=exc.headers,
472 ) from exc
474 if message.code < 100 or message.code > 199 or message.code == 101:
475 break
477 if self._continue is not None:
478 set_result(self._continue, True)
479 self._continue = None
481 # payload eof handler
482 payload.on_eof(self._response_eof)
484 # response status
485 self.version = message.version
486 self.status = message.code
487 self.reason = message.reason
489 # headers
490 self._headers = message.headers
491 self._raw_headers = message.raw_headers
493 # payload
494 self.content = payload
496 # cookies
497 if cookie_hdrs := self.headers._md.getall(hdrs.SET_COOKIE, ()):
498 # Store raw cookie headers for CookieJar
499 self._raw_cookie_headers = tuple(cookie_hdrs)
500 return self
502 def _response_eof(self) -> None:
503 if self._closed:
504 return
506 # protocol could be None because connection could be detached
507 protocol = self._connection and self._connection.protocol
508 if protocol is not None and protocol.upgraded:
509 return
511 self._closed = True
512 self._cleanup_writer()
513 self._release_connection()
515 @property
516 def closed(self) -> bool:
517 return self._closed
519 def close(self) -> None:
520 if not self._released:
521 self._notify_content()
523 self._closed = True
524 if self._loop.is_closed():
525 return
527 self._cleanup_writer()
528 if self._connection is not None:
529 self._connection.close()
530 self._connection = None
532 def release(self) -> None:
533 if not self._released:
534 self._notify_content()
536 self._closed = True
538 self._cleanup_writer()
539 self._release_connection()
541 @property
542 def ok(self) -> bool:
543 """Returns ``True`` if ``status`` is less than ``400``, ``False`` if not.
545 This is **not** a check for ``200 OK`` but a check that the response
546 status is under 400.
547 """
548 return 400 > self.status
550 def raise_for_status(self) -> None:
551 if not self.ok:
552 # reason should always be not None for a started response
553 assert self.reason is not None
555 # If we're in a context we can rely on __aexit__() to release as the
556 # exception propagates.
557 if not self._in_context:
558 self.release()
560 raise ClientResponseError(
561 self.request_info,
562 self.history,
563 status=self.status,
564 message=self.reason,
565 headers=self.headers,
566 )
568 def _release_connection(self) -> None:
569 if self._connection is not None:
570 if self.__writer is None:
571 self._connection.release()
572 self._connection = None
573 else:
574 self.__writer.add_done_callback(lambda f: self._release_connection())
576 async def _wait_released(self) -> None:
577 if self.__writer is not None:
578 try:
579 await self.__writer
580 except asyncio.CancelledError:
581 if (
582 sys.version_info >= (3, 11)
583 and (task := asyncio.current_task())
584 and task.cancelling()
585 ):
586 raise
587 self._release_connection()
589 def _cleanup_writer(self) -> None:
590 if self.__writer is not None:
591 self.__writer.cancel()
592 if self._stream_writer is not None:
593 self._output_size = self._stream_writer.output_size
594 self._stream_writer = None
595 self._session = None
597 def _notify_content(self) -> None:
598 content = self.content
599 # content can be None here, but the types are cheated elsewhere.
600 if content and content.exception() is None: # type: ignore[truthy-bool]
601 set_exception(content, _CONNECTION_CLOSED_EXCEPTION)
602 self._released = True
604 async def wait_for_close(self) -> None:
605 if self.__writer is not None:
606 try:
607 await self.__writer
608 except asyncio.CancelledError:
609 if (
610 sys.version_info >= (3, 11)
611 and (task := asyncio.current_task())
612 and task.cancelling()
613 ):
614 raise
615 self.release()
617 async def read(self) -> bytes:
618 """Read response payload."""
619 if self._body is None:
620 try:
621 self._body = await self.content.read()
622 for trace in self._traces:
623 await trace.send_response_chunk_received(
624 self.method, self.url, self._body
625 )
626 except BaseException:
627 self.close()
628 raise
629 elif self._released: # Response explicitly released
630 raise ClientConnectionError("Connection closed")
632 protocol = self._connection and self._connection.protocol
633 if protocol is None or not protocol.upgraded:
634 await self._wait_released() # Underlying connection released
635 return self._body
637 def get_encoding(self) -> str:
638 ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower()
639 mimetype = parse_mimetype(ctype)
641 encoding = mimetype.parameters.get("charset")
642 if encoding:
643 with contextlib.suppress(LookupError, ValueError):
644 return codecs.lookup(encoding).name
646 if mimetype.type == "application" and (
647 mimetype.subtype == "json" or mimetype.subtype == "rdap"
648 ):
649 # RFC 7159 states that the default encoding is UTF-8.
650 # RFC 7483 defines application/rdap+json
651 return "utf-8"
653 if self._body is None:
654 raise RuntimeError(
655 "Cannot compute fallback encoding of a not yet read body"
656 )
658 return self._resolve_charset(self, self._body)
660 async def text(self, encoding: str | None = None, errors: str = "strict") -> str:
661 """Read response payload and decode."""
662 await self.read()
664 if encoding is None:
665 encoding = self.get_encoding()
667 return self._body.decode(encoding, errors=errors) # type: ignore[union-attr]
669 async def json(
670 self,
671 *,
672 encoding: str | None = None,
673 loads: JSONDecoder = DEFAULT_JSON_DECODER,
674 content_type: str | None = "application/json",
675 ) -> Any:
676 """Read and decodes JSON response."""
677 await self.read()
679 if content_type:
680 if not is_expected_content_type(self.content_type, content_type):
681 raise ContentTypeError(
682 self.request_info,
683 self.history,
684 status=self.status,
685 message=(
686 "Attempt to decode JSON with "
687 "unexpected mimetype: %s" % self.content_type
688 ),
689 headers=self.headers,
690 )
692 if encoding is None:
693 encoding = self.get_encoding()
695 return loads(self._body.decode(encoding)) # type: ignore[union-attr]
697 async def __aenter__(self) -> "ClientResponse":
698 self._in_context = True
699 return self
701 async def __aexit__(
702 self,
703 exc_type: type[BaseException] | None,
704 exc_val: BaseException | None,
705 exc_tb: TracebackType | None,
706 ) -> None:
707 self._in_context = False
708 # similar to _RequestContextManager, we do not need to check
709 # for exceptions, response object can close connection
710 # if state is broken
711 self.release()
712 await self.wait_for_close()
715class ClientRequestBase:
716 """An internal class for proxy requests."""
718 POST_METHODS = {hdrs.METH_PATCH, hdrs.METH_POST, hdrs.METH_PUT}
720 proxy: URL | None = None
721 response_class = ClientResponse
722 server_hostname: str | None = None # Needed in connector.py
723 version = HttpVersion11
724 _response = None
726 # These class defaults help create_autospec() work correctly.
727 # If autospec is improved in future, maybe these can be removed.
728 url = URL()
729 method = "GET"
731 _writer_task: asyncio.Task[None] | None = None # async task for streaming data
733 _skip_auto_headers: "CIMultiDict[None] | None" = None
735 # N.B.
736 # Adding __del__ method with self._writer closing doesn't make sense
737 # because _writer is instance method, thus it keeps a reference to self.
738 # Until writer has finished finalizer will not be called.
740 def __init__(
741 self,
742 method: str,
743 url: URL,
744 *,
745 headers: CIMultiDict[str],
746 loop: asyncio.AbstractEventLoop,
747 ssl: SSLContext | bool | Fingerprint,
748 trust_env: bool = False,
749 ):
750 if match := _CONTAINS_CONTROL_CHAR_RE.search(method):
751 raise ValueError(
752 f"Method cannot contain non-token characters {method!r} "
753 f"(found at least {match.group()!r})"
754 )
755 # URL forbids subclasses, so a simple type check is enough.
756 assert type(url) is URL, url
757 self.original_url = url
758 self.url = url.with_fragment(None) if url.raw_fragment else url
759 self.method = method.upper()
760 self.loop = loop
761 self._ssl = ssl
763 if loop.get_debug():
764 self._source_traceback = traceback.extract_stack(sys._getframe(1))
766 if not url.raw_host:
767 raise InvalidURL(url)
768 self._update_headers(headers)
769 if url.raw_user or url.raw_password:
770 self.headers[hdrs.AUTHORIZATION] = encode_basic_auth(
771 url.user or "", url.password or ""
772 )
774 def _reset_writer(self, _: object = None) -> None:
775 self._writer_task = None
777 def _get_content_length(self) -> int | None:
778 """Extract and validate Content-Length header value.
780 Returns parsed Content-Length value or None if not set.
781 Raises ValueError if header exists but cannot be parsed as an integer.
782 """
783 if hdrs.CONTENT_LENGTH not in self.headers:
784 return None
786 content_length_hdr = self.headers[hdrs.CONTENT_LENGTH]
787 if not _DIGITS_RE.fullmatch(content_length_hdr):
788 raise ValueError(f"Invalid Content-Length header: {content_length_hdr!r}")
789 return int(content_length_hdr)
791 @property
792 def _writer(self) -> asyncio.Task[None] | None:
793 return self._writer_task
795 @_writer.setter
796 def _writer(self, writer: asyncio.Task[None]) -> None:
797 if self._writer_task is not None:
798 self._writer_task.remove_done_callback(self._reset_writer)
799 self._writer_task = writer
800 writer.add_done_callback(self._reset_writer)
802 def is_ssl(self) -> bool:
803 return self.url.scheme in _SSL_SCHEMES
805 @property
806 def ssl(self) -> "SSLContext | bool | Fingerprint":
807 return self._ssl
809 @property
810 def connection_key(self) -> ConnectionKey:
811 url = self.url
812 return tuple.__new__(
813 ConnectionKey,
814 (
815 url.raw_host or "",
816 url.port,
817 url.scheme in _SSL_SCHEMES,
818 self._ssl,
819 None,
820 None,
821 ),
822 )
824 def _update_headers(self, headers: CIMultiDict[str]) -> None:
825 """Update request headers."""
826 self.headers: CIMultiDict[str] = CIMultiDict()
828 # Build the host header
829 host = self.url.host_port_subcomponent
831 # host_port_subcomponent is None when the URL is a relative URL.
832 # but we know we do not have a relative URL here.
833 assert host is not None
834 self.headers[hdrs.HOST] = headers.pop(hdrs.HOST, host)
835 self.headers.extend(headers)
837 def _create_response(
838 self,
839 task: asyncio.Task[None] | None,
840 stream_writer: AbstractStreamWriter,
841 ) -> ClientResponse:
842 return self.response_class(
843 self.method,
844 self.original_url,
845 writer=task,
846 continue100=None,
847 timer=TimerNoop(),
848 traces=(),
849 loop=self.loop,
850 session=None,
851 request_headers=self.headers,
852 original_url=self.original_url,
853 stream_writer=stream_writer,
854 )
856 def _create_writer(self, protocol: BaseProtocol) -> StreamWriter:
857 return StreamWriter(protocol, self.loop)
859 def _should_write(self, protocol: BaseProtocol) -> bool:
860 return protocol.writing_paused
862 async def _send(self, conn: "Connection") -> ClientResponse:
863 # Specify request target:
864 # - CONNECT request must send authority form URI
865 # - not CONNECT proxy must send absolute form URI
866 # - most common is origin form URI
867 if self.method == hdrs.METH_CONNECT:
868 connect_host = self.url.host_subcomponent
869 assert connect_host is not None
870 path = f"{connect_host}:{self.url.port}"
871 elif self.proxy and not self.is_ssl():
872 path = str(self.url)
873 else:
874 path = self.url.raw_path_qs
876 protocol = conn.protocol
877 assert protocol is not None
878 writer = self._create_writer(protocol)
880 # set default content-type
881 if (
882 self.method in self.POST_METHODS
883 and (
884 self._skip_auto_headers is None
885 or hdrs.CONTENT_TYPE not in self._skip_auto_headers
886 )
887 and hdrs.CONTENT_TYPE not in self.headers
888 ):
889 self.headers[hdrs.CONTENT_TYPE] = "application/octet-stream"
891 v = self.version
892 if hdrs.CONNECTION not in self.headers:
893 if conn._connector.force_close:
894 if v == HttpVersion11:
895 self.headers[hdrs.CONNECTION] = "close"
896 elif v == HttpVersion10:
897 self.headers[hdrs.CONNECTION] = "keep-alive"
899 # status + headers
900 status_line = f"{self.method} {path} HTTP/{v.major}.{v.minor}"
902 # Buffer headers for potential coalescing with body
903 await writer.write_headers(status_line, self.headers)
905 task: asyncio.Task[None] | None
906 if self._should_write(protocol):
907 coro = self._write_bytes(writer, conn, self._get_content_length())
908 if sys.version_info >= (3, 12):
909 # Optimization for Python 3.12, try to write
910 # bytes immediately to avoid having to schedule
911 # the task on the event loop.
912 task = asyncio.Task(coro, loop=self.loop, eager_start=True)
913 else:
914 task = self.loop.create_task(coro)
915 if task.done():
916 task = None
917 else:
918 self._writer = task
919 else:
920 # We have nothing to write because
921 # - there is no body
922 # - the protocol does not have writing paused
923 # - we are not waiting for a 100-continue response
924 protocol.start_timeout()
925 writer.set_eof()
926 task = None
927 self._response = self._create_response(task, stream_writer=writer)
928 return self._response
930 async def _write_bytes(
931 self,
932 writer: AbstractStreamWriter,
933 conn: "Connection",
934 content_length: int | None,
935 ) -> None:
936 # Base class never has a body, this will never be run.
937 assert False
940class ClientRequestArgs(TypedDict, total=False):
941 params: Query
942 headers: CIMultiDict[str]
943 skip_auto_headers: Iterable[str] | None
944 data: Any
945 cookies: BaseCookie[str]
946 version: HttpVersion
947 compress: Literal["deflate", "gzip"] | bool
948 chunked: bool | None
949 expect100: bool
950 loop: asyncio.AbstractEventLoop
951 response_class: type[ClientResponse]
952 proxy: URL | None
953 timer: BaseTimerContext
954 session: "ClientSession"
955 ssl: SSLContext | bool | Fingerprint
956 proxy_headers: CIMultiDict[str] | None
957 traces: list["Trace"]
958 trust_env: bool
959 server_hostname: str | None
962class ClientRequest(ClientRequestBase):
963 _EMPTY_BODY = payload.PAYLOAD_REGISTRY.get(b"", disposition=None)
964 _body = _EMPTY_BODY
965 _continue = None # waiter future for '100 Continue' response
967 GET_METHODS = {
968 hdrs.METH_GET,
969 hdrs.METH_HEAD,
970 hdrs.METH_OPTIONS,
971 hdrs.METH_TRACE,
972 }
973 DEFAULT_HEADERS = {
974 hdrs.ACCEPT: "*/*",
975 hdrs.ACCEPT_ENCODING: _gen_default_accept_encoding(),
976 }
978 def __init__(
979 self,
980 method: str,
981 url: URL,
982 *,
983 params: Query,
984 headers: CIMultiDict[str],
985 skip_auto_headers: Iterable[str] | None,
986 data: Any,
987 cookies: BaseCookie[str],
988 version: HttpVersion,
989 compress: Literal["deflate", "gzip"] | bool,
990 chunked: bool | None,
991 expect100: bool,
992 loop: asyncio.AbstractEventLoop,
993 response_class: type[ClientResponse],
994 proxy: URL | None,
995 timer: BaseTimerContext,
996 session: "ClientSession",
997 ssl: SSLContext | bool | Fingerprint,
998 proxy_headers: CIMultiDict[str] | None,
999 traces: list["Trace"],
1000 trust_env: bool,
1001 server_hostname: str | None,
1002 **kwargs: object,
1003 ):
1004 # kwargs exists so authors of subclasses should expect to pass through unknown
1005 # arguments. This allows us to safely add new arguments in future releases.
1006 # But, we should never receive unknown arguments here in the parent class, this
1007 # would indicate an argument has been named wrong or similar in the subclass.
1008 assert not kwargs, "Unexpected arguments to ClientRequest"
1010 if params:
1011 url = url.extend_query(params)
1012 super().__init__(method, url, headers=headers, loop=loop, ssl=ssl)
1014 if proxy is not None:
1015 assert type(proxy) is URL, proxy
1016 self._session = session
1017 self.chunked = chunked
1018 self.response_class = response_class
1019 self._timer = timer
1020 self.server_hostname = server_hostname
1021 self.version = version
1023 self._update_auto_headers(skip_auto_headers)
1024 self._update_cookies(cookies)
1025 self._update_content_encoding(data, compress)
1026 self._update_proxy(proxy, proxy_headers)
1028 self._update_body_from_data(data)
1029 if data is not None or self.method not in self.GET_METHODS:
1030 self._update_transfer_encoding()
1031 self._update_expect_continue(expect100)
1032 self._traces = traces
1034 @property
1035 def body(self) -> payload.Payload:
1036 return self._body
1038 @property
1039 def skip_auto_headers(self) -> CIMultiDict[None]:
1040 return self._skip_auto_headers or CIMultiDict()
1042 @property
1043 def connection_key(self) -> ConnectionKey:
1044 if proxy_headers := self.proxy_headers:
1045 h: int | None = hash(tuple(proxy_headers.items()))
1046 else:
1047 h = None
1048 url = self.url
1049 return tuple.__new__(
1050 ConnectionKey,
1051 (
1052 url.raw_host or "",
1053 url.port,
1054 url.scheme in _SSL_SCHEMES,
1055 self._ssl,
1056 self.proxy,
1057 h,
1058 ),
1059 )
1061 @property
1062 def session(self) -> "ClientSession":
1063 """Return the ClientSession instance.
1065 This property provides access to the ClientSession that initiated
1066 this request, allowing middleware to make additional requests
1067 using the same session.
1068 """
1069 return self._session
1071 def _update_auto_headers(self, skip_auto_headers: Iterable[str] | None) -> None:
1072 if skip_auto_headers is not None:
1073 self._skip_auto_headers = CIMultiDict(
1074 (hdr, None) for hdr in sorted(skip_auto_headers)
1075 )
1076 used_headers = self.headers.copy()
1077 used_headers.extend(self._skip_auto_headers) # type: ignore[arg-type]
1078 else:
1079 # Fast path when there are no headers to skip
1080 # which is the most common case.
1081 used_headers = self.headers
1083 for hdr, val in self.DEFAULT_HEADERS.items():
1084 if hdr not in used_headers:
1085 self.headers[hdr] = val
1087 if hdrs.USER_AGENT not in used_headers:
1088 self.headers[hdrs.USER_AGENT] = SERVER_SOFTWARE
1090 def _update_cookies(self, cookies: BaseCookie[str]) -> None:
1091 """Update request cookies header."""
1092 if not cookies:
1093 return
1095 c = SimpleCookie()
1096 if hdrs.COOKIE in self.headers:
1097 # parse_cookie_header for RFC 6265 compliant Cookie header parsing
1098 c.update(parse_cookie_header(self.headers.get(hdrs.COOKIE, "")))
1099 del self.headers[hdrs.COOKIE]
1101 for name, value in cookies.items():
1102 # Use helper to preserve coded_value exactly as sent by server
1103 c[name] = preserve_morsel_with_coded_value(value)
1105 self.headers[hdrs.COOKIE] = c.output(header="", sep=";").strip()
1107 def _update_content_encoding(
1108 self, data: Any, compress: bool | Literal["deflate", "gzip"]
1109 ) -> None:
1110 """Set request content encoding."""
1111 self.compress = None
1112 if not data:
1113 return
1115 if self.headers.get(hdrs.CONTENT_ENCODING):
1116 if compress:
1117 raise ValueError(
1118 "compress can not be set if Content-Encoding header is set"
1119 )
1120 elif compress:
1121 if isinstance(compress, str) and compress not in {"deflate", "gzip"}:
1122 raise ValueError(
1123 "compress must be one of True, False, 'deflate', or 'gzip'"
1124 )
1125 self.compress = compress if isinstance(compress, str) else "deflate"
1126 self.headers[hdrs.CONTENT_ENCODING] = self.compress
1127 self.chunked = True # enable chunked, no need to deal with length
1129 def _update_transfer_encoding(self) -> None:
1130 """Analyze transfer-encoding header."""
1131 te = self.headers.get(hdrs.TRANSFER_ENCODING, "").lower()
1133 if "chunked" in te:
1134 if self.chunked:
1135 raise ValueError(
1136 "chunked can not be set "
1137 'if "Transfer-Encoding: chunked" header is set'
1138 )
1140 elif self.chunked:
1141 if hdrs.CONTENT_LENGTH in self.headers:
1142 raise ValueError(
1143 "chunked can not be set if Content-Length header is set"
1144 )
1146 self.headers[hdrs.TRANSFER_ENCODING] = "chunked"
1148 def _update_body_from_data(self, body: Any) -> None:
1149 """Update request body from data."""
1150 if body is None:
1151 self._body = self._EMPTY_BODY
1152 # Set Content-Length to 0 when body is None for methods that expect a body
1153 if (
1154 self.method not in self.GET_METHODS
1155 and not self.chunked
1156 and hdrs.CONTENT_LENGTH not in self.headers
1157 ):
1158 self.headers[hdrs.CONTENT_LENGTH] = "0"
1159 return
1161 # FormData
1162 if isinstance(body, FormData):
1163 body = body()
1164 else:
1165 try:
1166 body = payload.PAYLOAD_REGISTRY.get(body, disposition=None)
1167 except payload.LookupError:
1168 boundary = None
1169 if hdrs.CONTENT_TYPE in self.headers:
1170 boundary = parse_mimetype(
1171 self.headers[hdrs.CONTENT_TYPE]
1172 ).parameters.get("boundary")
1173 body = FormData(body, boundary=boundary)()
1175 self._body = body
1177 # enable chunked encoding if needed
1178 if not self.chunked and hdrs.CONTENT_LENGTH not in self.headers:
1179 if (size := body.size) is not None:
1180 self.headers[hdrs.CONTENT_LENGTH] = str(size)
1181 else:
1182 self.chunked = True
1184 # copy payload headers
1185 assert body.headers
1186 headers = self.headers
1187 skip_headers = self._skip_auto_headers
1188 for key, value in body.headers.items():
1189 if key in headers or (skip_headers is not None and key in skip_headers):
1190 continue
1191 headers[key] = value
1193 def _update_body(self, body: Any) -> None:
1194 """Update request body after its already been set."""
1195 # Remove existing Content-Length header since body is changing
1196 if hdrs.CONTENT_LENGTH in self.headers:
1197 del self.headers[hdrs.CONTENT_LENGTH]
1199 # Remove existing Transfer-Encoding header to avoid conflicts
1200 if self.chunked and hdrs.TRANSFER_ENCODING in self.headers:
1201 del self.headers[hdrs.TRANSFER_ENCODING]
1203 # Now update the body using the existing method
1204 self._update_body_from_data(body)
1206 # Update transfer encoding headers if needed (same logic as __init__)
1207 if body is not None or self.method not in self.GET_METHODS:
1208 self._update_transfer_encoding()
1210 async def update_body(self, body: Any) -> None:
1211 """
1212 Update request body and close previous payload if needed.
1214 This method safely updates the request body by first closing any existing
1215 payload to prevent resource leaks, then setting the new body.
1217 IMPORTANT: Always use this method instead of setting request.body directly.
1218 Direct assignment to request.body will leak resources if the previous body
1219 contains file handles, streams, or other resources that need cleanup.
1221 Args:
1222 body: The new body content. Can be:
1223 - bytes/bytearray: Raw binary data
1224 - str: Text data (will be encoded using charset from Content-Type)
1225 - FormData: Form data that will be encoded as multipart/form-data
1226 - Payload: A pre-configured payload object
1227 - AsyncIterable: An async iterable of bytes chunks
1228 - File-like object: Will be read and sent as binary data
1229 - None: Clears the body
1231 Usage:
1232 # CORRECT: Use update_body
1233 await request.update_body(b"new request data")
1235 # WRONG: Don't set body directly
1236 # request.body = b"new request data" # This will leak resources!
1238 # Update with form data
1239 form_data = FormData()
1240 form_data.add_field('field', 'value')
1241 await request.update_body(form_data)
1243 # Clear body
1244 await request.update_body(None)
1246 Note:
1247 This method is async because it may need to close file handles or
1248 other resources associated with the previous payload. Always await
1249 this method to ensure proper cleanup.
1251 Warning:
1252 Setting request.body directly is highly discouraged and can lead to:
1253 - Resource leaks (unclosed file handles, streams)
1254 - Memory leaks (unreleased buffers)
1255 - Unexpected behavior with streaming payloads
1257 It is not recommended to change the payload type in middleware. If the
1258 body was already set (e.g., as bytes), it's best to keep the same type
1259 rather than converting it (e.g., to str) as this may result in unexpected
1260 behavior.
1262 See Also:
1263 - update_body_from_data: Synchronous body update without cleanup
1264 - body property: Direct body access (STRONGLY DISCOURAGED)
1266 """
1267 # Close existing payload if it exists and needs closing
1268 if self._body is not None:
1269 await self._body.close()
1270 self._update_body(body)
1272 def _update_expect_continue(self, expect: bool = False) -> None:
1273 if expect:
1274 self.headers[hdrs.EXPECT] = "100-continue"
1275 elif (
1276 hdrs.EXPECT in self.headers
1277 and self.headers[hdrs.EXPECT].lower() == "100-continue"
1278 ):
1279 expect = True
1281 if expect:
1282 self._continue = self.loop.create_future()
1284 def _update_proxy(
1285 self,
1286 proxy: URL | None,
1287 proxy_headers: CIMultiDict[str] | None,
1288 ) -> None:
1289 if proxy is None:
1290 self.proxy = None
1291 self.proxy_headers = None
1292 return
1293 # URL-embedded credentials on the proxy map to Proxy-Authorization.
1294 if proxy.raw_user or proxy.raw_password:
1295 auth_header = encode_basic_auth(proxy.user or "", proxy.password or "")
1296 if proxy_headers is None:
1297 proxy_headers = CIMultiDict()
1298 proxy_headers.setdefault(hdrs.PROXY_AUTHORIZATION, auth_header)
1299 proxy = proxy.with_user(None)
1300 self.proxy = proxy
1301 self.proxy_headers = proxy_headers
1303 def _create_response(
1304 self,
1305 task: asyncio.Task[None] | None,
1306 stream_writer: AbstractStreamWriter,
1307 ) -> ClientResponse:
1308 return self.response_class(
1309 self.method,
1310 self.original_url,
1311 writer=task,
1312 continue100=self._continue,
1313 timer=self._timer,
1314 traces=self._traces,
1315 loop=self.loop,
1316 session=self._session,
1317 request_headers=self.headers,
1318 original_url=self.original_url,
1319 stream_writer=stream_writer,
1320 )
1322 def _create_writer(self, protocol: BaseProtocol) -> StreamWriter:
1323 writer = StreamWriter(
1324 protocol,
1325 self.loop,
1326 on_chunk_sent=(
1327 functools.partial(self._on_chunk_request_sent, self.method, self.url)
1328 if self._traces
1329 else None
1330 ),
1331 on_headers_sent=(
1332 functools.partial(self._on_headers_request_sent, self.method, self.url)
1333 if self._traces
1334 else None
1335 ),
1336 )
1338 if self.compress:
1339 writer.enable_compression(self.compress)
1341 if self.chunked is not None:
1342 writer.enable_chunking()
1343 return writer
1345 def _should_write(self, protocol: BaseProtocol) -> bool:
1346 return (
1347 self.body.size != 0 or self._continue is not None or protocol.writing_paused
1348 )
1350 async def _write_bytes(
1351 self,
1352 writer: AbstractStreamWriter,
1353 conn: "Connection",
1354 content_length: int | None,
1355 ) -> None:
1356 """
1357 Write the request body to the connection stream.
1359 This method handles writing different types of request bodies:
1360 1. Payload objects (using their specialized write_with_length method)
1361 2. Bytes/bytearray objects
1362 3. Iterable body content
1364 Args:
1365 writer: The stream writer to write the body to
1366 conn: The connection being used for this request
1367 content_length: Optional maximum number of bytes to write from the body
1368 (None means write the entire body)
1370 The method properly handles:
1371 - Waiting for 100-Continue responses if required
1372 - Content length constraints for chunked encoding
1373 - Error handling for network issues, cancellation, and other exceptions
1374 - Signaling EOF and timeout management
1376 Raises:
1377 ClientOSError: When there's an OS-level error writing the body
1378 ClientConnectionError: When there's a general connection error
1379 asyncio.CancelledError: When the operation is cancelled
1381 """
1382 # 100 response
1383 if self._continue is not None:
1384 # Force headers to be sent before waiting for 100-continue
1385 writer.send_headers()
1386 await writer.drain()
1387 await self._continue
1389 protocol = conn.protocol
1390 assert protocol is not None
1391 try:
1392 await self._body.write_with_length(writer, content_length)
1393 except OSError as underlying_exc:
1394 reraised_exc = underlying_exc
1396 # Distinguish between timeout and other OS errors for better error reporting
1397 exc_is_not_timeout = underlying_exc.errno is not None or not isinstance(
1398 underlying_exc, asyncio.TimeoutError
1399 )
1400 if exc_is_not_timeout:
1401 reraised_exc = ClientOSError(
1402 underlying_exc.errno,
1403 f"Can not write request body for {self.url !s}",
1404 )
1406 set_exception(protocol, reraised_exc, underlying_exc)
1407 except asyncio.CancelledError:
1408 # Body hasn't been fully sent, so connection can't be reused
1409 conn.close()
1410 raise
1411 except Exception as underlying_exc:
1412 set_exception(
1413 protocol,
1414 ClientConnectionError(
1415 "Failed to send bytes into the underlying connection "
1416 f"{conn !s}: {underlying_exc!r}",
1417 ),
1418 underlying_exc,
1419 )
1420 else:
1421 # Successfully wrote the body, signal EOF and start response timeout
1422 await writer.write_eof()
1423 protocol.start_timeout()
1425 async def _close(self) -> None:
1426 if self._writer_task is not None:
1427 try:
1428 await self._writer_task
1429 except asyncio.CancelledError:
1430 if (
1431 sys.version_info >= (3, 11)
1432 and (task := asyncio.current_task())
1433 and task.cancelling()
1434 ):
1435 raise
1437 def _terminate(self) -> None:
1438 if self._writer_task is not None:
1439 if not self.loop.is_closed():
1440 self._writer_task.cancel()
1441 self._writer_task.remove_done_callback(self._reset_writer)
1442 self._writer_task = None
1444 async def _on_chunk_request_sent(self, method: str, url: URL, chunk: bytes) -> None:
1445 for trace in self._traces:
1446 await trace.send_request_chunk_sent(method, url, chunk)
1448 async def _on_headers_request_sent(
1449 self, method: str, url: URL, headers: "CIMultiDict[str]"
1450 ) -> None:
1451 for trace in self._traces:
1452 await trace.send_request_headers(method, url, headers)