Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/client_reqrep.py: 32%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import codecs
3import contextlib
4import functools
5import io
6import re
7import sys
8import traceback
9import warnings
10from collections.abc import Callable, Iterable, Sequence
11from hashlib import md5, sha1, sha256
12from http.cookies import BaseCookie, SimpleCookie
13from types import MappingProxyType, TracebackType
14from typing import TYPE_CHECKING, Any, Literal, NamedTuple, TypedDict
16from multidict import CIMultiDict, CIMultiDictProxy, MultiDict, MultiDictProxy
17from yarl import URL, Query
19from . import hdrs, multipart, payload
20from ._cookie_helpers import (
21 parse_cookie_header,
22 parse_set_cookie_headers,
23 preserve_morsel_with_coded_value,
24)
25from .abc import AbstractStreamWriter
26from .base_protocol import BaseProtocol
27from .client_exceptions import (
28 ClientConnectionError,
29 ClientOSError,
30 ClientResponseError,
31 ContentTypeError,
32 InvalidURL,
33 ServerFingerprintMismatch,
34)
35from .compression_utils import HAS_BROTLI, HAS_ZSTD
36from .formdata import FormData
37from .helpers import (
38 _SENTINEL,
39 BaseTimerContext,
40 HeadersDictProxy,
41 HeadersMixin,
42 TimerNoop,
43 encode_basic_auth,
44 frozen_dataclass_decorator,
45 is_expected_content_type,
46 parse_mimetype,
47 reify,
48 sentinel,
49 set_exception,
50 set_result,
51)
52from .http import (
53 SERVER_SOFTWARE,
54 HttpProcessingError,
55 HttpVersion,
56 HttpVersion10,
57 HttpVersion11,
58 StreamWriter,
59)
60from .streams import StreamReader
61from .typedefs import DEFAULT_JSON_DECODER, JSONDecoder, RawHeaders
63try:
64 import ssl
65 from ssl import SSLContext
66except ImportError: # pragma: no cover
67 ssl = None # type: ignore[assignment]
68 SSLContext = object # type: ignore[misc,assignment]
71__all__ = ("ClientRequest", "ClientResponse", "RequestInfo", "Fingerprint")
74if TYPE_CHECKING:
75 from .client import ClientSession
76 from .connector import Connection
77 from .tracing import Trace
# Shared exception instance that is set on a response's content stream when
# the response is closed or released before the payload was fully consumed.
_CONNECTION_CLOSED_EXCEPTION = ClientConnectionError("Connection closed")
# Matches any character that is NOT a valid HTTP token character.  Despite
# the name, it is used to reject request methods with non-token characters.
_CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]")
# One or more ASCII digits; used to validate Content-Length header values.
_DIGITS_RE = re.compile(r"\d+", re.ASCII)
def _gen_default_accept_encoding() -> str:
    """Build the default ``Accept-Encoding`` header value.

    ``gzip`` and ``deflate`` are always offered; ``br`` and ``zstd`` are
    appended only when ``HAS_BROTLI`` / ``HAS_ZSTD`` are truthy.
    """
    encodings = ["gzip", "deflate"]
    if HAS_BROTLI:
        encodings.append("br")
    if HAS_ZSTD:
        encodings.append("zstd")
    return ", ".join(encodings)
@frozen_dataclass_decorator
class ContentDisposition:
    """Parsed ``Content-Disposition`` header of a response."""

    # Disposition type as returned by multipart.parse_content_disposition().
    type: str | None
    # Read-only view of the parameters parsed from the header.
    parameters: "MappingProxyType[str, str]"
    # Result of multipart.content_disposition_filename() for the parameters.
    filename: str | None
class _RequestInfo(NamedTuple):
    """Field layout shared by :class:`RequestInfo`."""

    url: URL
    method: str
    headers: "CIMultiDictProxy[str]"
    real_url: URL
class RequestInfo(_RequestInfo):
    """Immutable description of the request that produced a response."""

    def __new__(
        cls,
        url: URL,
        method: str,
        headers: "CIMultiDictProxy[str]",
        real_url: URL | _SENTINEL = sentinel,
    ) -> "RequestInfo":
        """Create a new RequestInfo instance.

        For backwards compatibility, the real_url parameter is optional;
        when it is omitted, ``url`` is stored as ``real_url`` as well.
        """
        effective_real_url = url if real_url is sentinel else real_url
        # Bypass the generated NamedTuple constructor so the default can be
        # resolved from another field.
        return tuple.__new__(cls, (url, method, headers, effective_real_url))
class Fingerprint:
    """Expected digest of a server certificate, used for certificate pinning.

    The hash function is selected from the length of the supplied digest.
    Only SHA-256 (32 bytes) is accepted; MD5 and SHA-1 lengths are recognised
    solely so that a dedicated error message can be raised for them.
    """

    HASHFUNC_BY_DIGESTLEN = {
        16: md5,
        20: sha1,
        32: sha256,
    }

    def __init__(self, fingerprint: bytes) -> None:
        """Store ``fingerprint`` and pick the matching hash function.

        Raises:
            ValueError: if the digest length is unknown, or if it matches
                MD5 or SHA-1.
        """
        hashfunc = self.HASHFUNC_BY_DIGESTLEN.get(len(fingerprint))
        if hashfunc is None:
            raise ValueError("fingerprint has invalid length")
        if hashfunc is md5 or hashfunc is sha1:
            raise ValueError("md5 and sha1 are insecure and not supported. Use sha256.")
        self._hashfunc = hashfunc
        self._fingerprint = fingerprint

    @property
    def fingerprint(self) -> bytes:
        """The expected certificate digest passed to the constructor."""
        return self._fingerprint

    def check(self, transport: asyncio.Transport) -> None:
        """Compare the peer certificate of ``transport`` with the fingerprint.

        Does nothing when the transport reports no ``sslcontext``.

        Raises:
            ServerFingerprintMismatch: if the digest of the peer certificate
                differs from the expected fingerprint.
        """
        if not transport.get_extra_info("sslcontext"):
            return
        sslobj = transport.get_extra_info("ssl_object")
        cert = sslobj.getpeercert(binary_form=True)
        got = self._hashfunc(cert).digest()
        if got != self._fingerprint:
            host, port, *_ = transport.get_extra_info("peername")
            raise ServerFingerprintMismatch(self._fingerprint, got, host, port)
# Types accepted for the ``ssl`` argument of a request.  Without the ``ssl``
# module only plain booleans are possible.
if ssl is not None:
    SSL_ALLOWED_TYPES = (ssl.SSLContext, bool, Fingerprint)
else:  # pragma: no cover
    SSL_ALLOWED_TYPES = (bool,)  # type: ignore[unreachable]


# _CONNECTION_CLOSED_EXCEPTION is already defined near the top of the module;
# the redundant second definition that used to live here has been dropped.
# URL schemes that imply a TLS connection.
_SSL_SCHEMES = frozenset(("https", "wss"))
# ConnectionKey is a NamedTuple because it is used as a key in a dict
# and a set in the connector. Since a NamedTuple is a tuple it uses
# the fast native tuple __hash__ and __eq__ implementation in CPython.
class ConnectionKey(NamedTuple):
    """Key identifying which pooled connections a request may reuse."""

    # the key should contain an information about used proxy / TLS
    # to prevent reusing wrong connections from a pool
    host: str
    port: int | None
    is_ssl: bool
    ssl: SSLContext | bool | Fingerprint
    proxy: URL | None
    proxy_headers_hash: int | None  # hash(CIMultiDict)
    server_hostname: str | None = None
class ClientResponse(HeadersMixin):
    """Client-side HTTP response.

    The object is created by the request once the request has been (or is
    being) written; :meth:`start` then reads the status line and headers
    from the connection and exposes the payload as ``content``.
    """

    # Some of these attributes are None when created,
    # but will be set by the start() method.
    # As the end user will likely never see the None values, we cheat the types below.
    # from the Status-Line of the response
    version: HttpVersion | None = None  # HTTP-Version
    status: int = None  # type: ignore[assignment] # Status-Code
    reason: str | None = None  # Reason-Phrase

    content: StreamReader = None  # type: ignore[assignment] # Payload stream
    _body: bytes | None = None
    _headers: HeadersDictProxy = None  # type: ignore[assignment]
    _history: tuple["ClientResponse", ...] = ()
    _raw_headers: RawHeaders = None  # type: ignore[assignment]

    _connection: "Connection | None" = None  # current connection
    _cookies: SimpleCookie | None = None
    _raw_cookie_headers: tuple[str, ...] | None = None
    _continue: asyncio.Future[bool] | None = None
    _source_traceback: traceback.StackSummary | None = None
    _session: "ClientSession | None" = None
    # set up by ClientRequest after ClientResponse object creation
    # post-init stage allows to not change ctor signature
    _closed = True  # to allow __del__ for non-initialized properly response
    _released = False
    _in_context = False

    _resolve_charset: Callable[["ClientResponse", bytes], str] = lambda *_: "utf-8"

    __writer: asyncio.Task[None] | None = None
    _stream_writer: AbstractStreamWriter | None = None
    _output_size: int = 0
    _upload_complete: asyncio.Future[None] | None = None

    def __init__(
        self,
        method: str,
        url: URL,
        *,
        writer: asyncio.Task[None] | None,
        continue100: asyncio.Future[bool] | None,
        timer: BaseTimerContext | None,
        traces: Sequence["Trace"],
        loop: asyncio.AbstractEventLoop,
        session: "ClientSession | None",
        request_headers: CIMultiDict[str],
        original_url: URL,
        stream_writer: AbstractStreamWriter,
        **kwargs: object,
    ) -> None:
        """Initialise the response for a request that was just sent.

        ``writer`` is the task still writing the request body, or ``None``
        when the request has already been written completely.
        """
        # kwargs exists so authors of subclasses should expect to pass through unknown
        # arguments. This allows us to safely add new arguments in future releases.
        # But, we should never receive unknown arguments here in the parent class, this
        # would indicate an argument has been named wrong or similar in the subclass.
        assert not kwargs, "Unexpected arguments to ClientResponse"
        # URL forbids subclasses, so a simple type check is enough.
        assert type(url) is URL

        self.method = method

        self._real_url = url
        self._url = url.with_fragment(None) if url.raw_fragment else url
        if writer is None:  # Request already sent
            self._output_size = stream_writer.output_size
        else:
            # Keep the stream writer before installing the task: the setter
            # may reset immediately and snapshot the output size.
            self._stream_writer = stream_writer
            self._writer = writer
        if continue100 is not None:
            self._continue = continue100
        self._request_headers = request_headers
        self._original_url = original_url
        self._timer = timer if timer is not None else TimerNoop()
        self._cache: dict[str, Any] = {}
        self._traces = traces
        self._loop = loop
        # Save reference to _resolve_charset, so that get_encoding() will still
        # work after the response has finished reading the body.
        if session is not None:
            # store a reference to session #1985
            self._session = session
            self._resolve_charset = session._resolve_charset
        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

    def __reset_writer(self, _: object = None) -> None:
        """Forget the writer task and record the final upload size."""
        self.__writer = None
        if self._stream_writer is not None:
            self._output_size = self._stream_writer.output_size
            self._stream_writer = None
        if self._upload_complete is not None and not self._upload_complete.done():
            self._upload_complete.set_result(None)

    @property
    def _writer(self) -> asyncio.Task[None] | None:
        """The writer task for streaming data.

        _writer is only provided for backwards compatibility
        for subclasses that may need to access it.
        """
        return self.__writer

    @_writer.setter
    def _writer(self, writer: asyncio.Task[None] | None) -> None:
        """Set the writer task for streaming data."""
        if self.__writer is not None:
            self.__writer.remove_done_callback(self.__reset_writer)
        self.__writer = writer
        if writer is None:
            return
        if writer.done():
            # The writer is already done, so we can clear it immediately.
            self.__reset_writer()
        else:
            writer.add_done_callback(self.__reset_writer)

    @property
    def output_size(self) -> int:
        """Number of bytes sent for this request."""
        if self._stream_writer is not None:
            return self._stream_writer.output_size
        return self._output_size

    @property
    def upload_complete(self) -> "asyncio.Future[None]":
        """Future set when the request body has been fully sent.

        Already done when the request had no body or was written eagerly.
        """
        if self._upload_complete is None:
            # Created lazily on first access.
            self._upload_complete = self._loop.create_future()
            if self._stream_writer is None:  # upload already finished
                self._upload_complete.set_result(None)
        return self._upload_complete

    @property
    def cookies(self) -> SimpleCookie:
        """Response cookies, parsed lazily from the raw Set-Cookie headers."""
        if self._cookies is None:
            if self._raw_cookie_headers is not None:
                # Parse cookies for response.cookies (SimpleCookie for backward compatibility)
                cookies = SimpleCookie()
                # Use parse_set_cookie_headers for more lenient parsing that handles
                # malformed cookies better than SimpleCookie.load
                cookies.update(parse_set_cookie_headers(self._raw_cookie_headers))
                self._cookies = cookies
            else:
                self._cookies = SimpleCookie()
        return self._cookies

    @cookies.setter
    def cookies(self, cookies: SimpleCookie) -> None:
        self._cookies = cookies
        # Generate raw cookie headers from the SimpleCookie
        if cookies:
            self._raw_cookie_headers = tuple(
                morsel.OutputString() for morsel in cookies.values()
            )
        else:
            self._raw_cookie_headers = None

    @reify
    def url(self) -> URL:
        """Request URL with any fragment removed."""
        return self._url

    @reify
    def real_url(self) -> URL:
        """Request URL exactly as passed to the constructor."""
        return self._real_url

    @reify
    def host(self) -> str:
        """Host part of :attr:`url`."""
        assert self._url.host is not None
        return self._url.host

    @reify
    def headers(self) -> HeadersDictProxy:
        """Response headers."""
        return self._headers

    @reify
    def raw_headers(self) -> RawHeaders:
        """Response headers in their raw form."""
        return self._raw_headers

    @reify
    def request_info(self) -> RequestInfo:
        """Information about the request that produced this response."""
        # Build RequestInfo lazily from components
        headers = CIMultiDictProxy(self._request_headers)
        return tuple.__new__(
            RequestInfo, (self._url, self.method, headers, self._original_url)
        )

    @reify
    def content_disposition(self) -> ContentDisposition | None:
        """Parsed Content-Disposition header, or ``None`` when absent."""
        raw = self._headers.get(hdrs.CONTENT_DISPOSITION)
        if raw is None:
            return None
        disposition_type, params_dct = multipart.parse_content_disposition(raw)
        params = MappingProxyType(params_dct)
        filename = multipart.content_disposition_filename(params)
        return ContentDisposition(disposition_type, params, filename)

    def __del__(self, _warnings: Any = warnings) -> None:
        """Release a still-attached connection and report the unclosed response."""
        if self._closed:
            return

        if self._connection is not None:
            self._connection.release()
            self._cleanup_writer()

            if self._loop.get_debug():
                _warnings.warn(
                    f"Unclosed response {self!r}", ResourceWarning, source=self
                )
                context = {"client_response": self, "message": "Unclosed response"}
                if self._source_traceback:
                    context["source_traceback"] = self._source_traceback
                self._loop.call_exception_handler(context)

    def __repr__(self) -> str:
        out = io.StringIO()
        ascii_encodable_url = str(self.url)
        if self.reason:
            # Escape non-ASCII characters so the repr is always ASCII-safe.
            ascii_encodable_reason = self.reason.encode(
                "ascii", "backslashreplace"
            ).decode("ascii")
        else:
            ascii_encodable_reason = "None"
        print(
            f"<ClientResponse({ascii_encodable_url}) [{self.status} {ascii_encodable_reason}]>",
            file=out,
        )
        print(self.headers, file=out)
        return out.getvalue()

    @property
    def connection(self) -> "Connection | None":
        """The connection currently attached to the response, if any."""
        return self._connection

    @reify
    def history(self) -> tuple["ClientResponse", ...]:
        """A sequence of responses, if redirects occurred."""
        return self._history

    @reify
    def links(self) -> "MultiDictProxy[MultiDictProxy[str | URL]]":
        """Parsed ``Link`` headers, keyed by ``rel`` (or the URL without one)."""
        links: MultiDict[MultiDictProxy[str | URL]] = MultiDict()
        for val in self.headers.getall("link"):
            match = re.match(r"\s*<(.*)>(.*)", val)
            if match is None:  # Malformed link
                continue
            url, params_str = match.groups()
            # Everything before the first ";" is not a parameter.
            params = params_str.split(";")[1:]

            link: MultiDict[str | URL] = MultiDict()

            for param in params:
                match = re.match(r"^\s*(\S*)\s*=\s*(['\"]?)(.*?)(\2)\s*$", param, re.M)
                if match is None:  # Malformed param
                    continue
                key, _, value, _ = match.groups()

                link.add(key, value)

            key = link.get("rel", url)

            link.add("url", self.url.join(URL(url)))

            links.add(str(key), MultiDictProxy(link))

        return MultiDictProxy(links)

    async def start(self, connection: "Connection") -> "ClientResponse":
        """Start response processing."""
        self._closed = False
        self._protocol = connection.protocol
        self._connection = connection

        with self._timer:
            while True:
                # read response
                try:
                    protocol = self._protocol
                    message, payload = await protocol.read()  # type: ignore[union-attr]
                except HttpProcessingError as exc:
                    raise ClientResponseError(
                        self.request_info,
                        self.history,
                        status=exc.code,
                        message=exc.message,
                        headers=exc.headers,
                    ) from exc

                # Keep reading while the message is a 1xx response other than 101.
                if message.code < 100 or message.code > 199 or message.code == 101:
                    break

                if self._continue is not None:
                    set_result(self._continue, True)
                    self._continue = None

        # payload eof handler
        payload.on_eof(self._response_eof)

        # response status
        self.version = message.version
        self.status = message.code
        self.reason = message.reason

        # headers
        self._headers = message.headers
        self._raw_headers = message.raw_headers

        # payload
        self.content = payload

        # cookies
        if cookie_hdrs := self.headers._md.getall(hdrs.SET_COOKIE, ()):
            # Store raw cookie headers for CookieJar
            self._raw_cookie_headers = tuple(cookie_hdrs)
        return self

    def _response_eof(self) -> None:
        """Payload EOF callback: close the response and release the connection."""
        if self._closed:
            return

        # protocol could be None because connection could be detached
        protocol = self._connection and self._connection.protocol
        if protocol is not None and protocol.upgraded:
            return

        self._closed = True
        self._cleanup_writer()
        self._release_connection()

    @property
    def closed(self) -> bool:
        """Whether the response has been closed."""
        return self._closed

    def close(self) -> None:
        """Close the response and the underlying connection."""
        if not self._released:
            self._notify_content()

        self._closed = True
        if self._loop.is_closed():
            return

        self._cleanup_writer()
        if self._connection is not None:
            self._connection.close()
            self._connection = None

    def release(self) -> None:
        """Close the response and release the connection."""
        if not self._released:
            self._notify_content()

        self._closed = True

        self._cleanup_writer()
        self._release_connection()

    @property
    def ok(self) -> bool:
        """Returns ``True`` if ``status`` is less than ``400``, ``False`` if not.

        This is **not** a check for ``200 OK`` but a check that the response
        status is under 400.
        """
        return self.status < 400

    def raise_for_status(self) -> None:
        """Raise ``ClientResponseError`` when the status is 400 or higher."""
        if not self.ok:
            # reason should always be not None for a started response
            assert self.reason is not None

            # If we're in a context we can rely on __aexit__() to release as the
            # exception propagates.
            if not self._in_context:
                self.release()

            raise ClientResponseError(
                self.request_info,
                self.history,
                status=self.status,
                message=self.reason,
                headers=self.headers,
            )

    def _release_connection(self) -> None:
        """Release the connection now, or once the writer task has finished."""
        if self._connection is not None:
            if self.__writer is None:
                self._connection.release()
                self._connection = None
            else:
                self.__writer.add_done_callback(lambda f: self._release_connection())

    async def _wait_released(self) -> None:
        """Wait for the writer task, then release the connection."""
        if self.__writer is not None:
            try:
                await self.__writer
            except asyncio.CancelledError:
                # Re-raise only when the current task itself is being cancelled.
                if (
                    sys.version_info >= (3, 11)
                    and (task := asyncio.current_task())
                    and task.cancelling()
                ):
                    raise
        self._release_connection()

    def _cleanup_writer(self) -> None:
        """Cancel the writer task and drop the writer and session references."""
        if self.__writer is not None:
            self.__writer.cancel()
        if self._stream_writer is not None:
            self._output_size = self._stream_writer.output_size
            self._stream_writer = None
        self._session = None

    def _notify_content(self) -> None:
        """Flag the content stream as closed and mark the response released."""
        content = self.content
        # content can be None here, but the types are cheated elsewhere.
        if content and content.exception() is None:  # type: ignore[truthy-bool]
            set_exception(content, _CONNECTION_CLOSED_EXCEPTION)
        self._released = True

    async def wait_for_close(self) -> None:
        """Wait for the writer task, then release the response."""
        if self.__writer is not None:
            try:
                await self.__writer
            except asyncio.CancelledError:
                # Re-raise only when the current task itself is being cancelled.
                if (
                    sys.version_info >= (3, 11)
                    and (task := asyncio.current_task())
                    and task.cancelling()
                ):
                    raise
        self.release()

    async def read(self) -> bytes:
        """Read response payload."""
        if self._body is None:
            try:
                self._body = await self.content.read()
                for trace in self._traces:
                    await trace.send_response_chunk_received(
                        self.method, self.url, self._body
                    )
            except BaseException:
                self.close()
                raise
        elif self._released:  # Response explicitly released
            raise ClientConnectionError("Connection closed")

        protocol = self._connection and self._connection.protocol
        if protocol is None or not protocol.upgraded:
            await self._wait_released()  # Underlying connection released
        return self._body

    def get_encoding(self) -> str:
        """Return the encoding to use for decoding the body.

        Raises:
            RuntimeError: if no encoding can be derived from the headers and
                the body has not been read yet.
        """
        ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower()
        mimetype = parse_mimetype(ctype)

        encoding = mimetype.parameters.get("charset")
        if encoding:
            # An unknown charset falls through to the checks below.
            with contextlib.suppress(LookupError, ValueError):
                return codecs.lookup(encoding).name

        if mimetype.type == "application" and (
            mimetype.subtype == "json" or mimetype.subtype == "rdap"
        ):
            # RFC 7159 states that the default encoding is UTF-8.
            # RFC 7483 defines application/rdap+json
            return "utf-8"

        if self._body is None:
            raise RuntimeError(
                "Cannot compute fallback encoding of a not yet read body"
            )

        return self._resolve_charset(self, self._body)

    async def text(self, encoding: str | None = None, errors: str = "strict") -> str:
        """Read response payload and decode."""
        await self.read()

        if encoding is None:
            encoding = self.get_encoding()

        return self._body.decode(encoding, errors=errors)  # type: ignore[union-attr]

    async def json(
        self,
        *,
        encoding: str | None = None,
        loads: JSONDecoder = DEFAULT_JSON_DECODER,
        content_type: str | None = "application/json",
    ) -> Any:
        """Read and decodes JSON response."""
        await self.read()

        if content_type:
            if not is_expected_content_type(self.content_type, content_type):
                raise ContentTypeError(
                    self.request_info,
                    self.history,
                    status=self.status,
                    message=(
                        "Attempt to decode JSON with "
                        "unexpected mimetype: %s" % self.content_type
                    ),
                    headers=self.headers,
                )

        if encoding is None:
            encoding = self.get_encoding()

        return loads(self._body.decode(encoding))  # type: ignore[union-attr]

    async def __aenter__(self) -> "ClientResponse":
        self._in_context = True
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self._in_context = False
        # similar to _RequestContextManager, we do not need to check
        # for exceptions, response object can close connection
        # if state is broken
        self.release()
        await self.wait_for_close()
class ClientRequestBase:
    """An internal class for proxy requests."""

    POST_METHODS = {hdrs.METH_PATCH, hdrs.METH_POST, hdrs.METH_PUT}

    proxy: URL | None = None
    response_class = ClientResponse
    server_hostname: str | None = None  # Needed in connector.py
    version = HttpVersion11
    _response = None

    # These class defaults help create_autospec() work correctly.
    # If autospec is improved in future, maybe these can be removed.
    url = URL()
    method = "GET"

    _writer_task: asyncio.Task[None] | None = None  # async task for streaming data

    _skip_auto_headers: "CIMultiDict[None] | None" = None

    # N.B.
    # Adding __del__ method with self._writer closing doesn't make sense
    # because _writer is instance method, thus it keeps a reference to self.
    # Until writer has finished finalizer will not be called.

    def __init__(
        self,
        method: str,
        url: URL,
        *,
        headers: CIMultiDict[str],
        loop: asyncio.AbstractEventLoop,
        ssl: SSLContext | bool | Fingerprint,
        trust_env: bool = False,
    ):
        """Validate the method and URL and build the initial headers.

        Raises:
            ValueError: if ``method`` contains non-token characters.
            InvalidURL: if ``url`` has no host.
        """
        if match := _CONTAINS_CONTROL_CHAR_RE.search(method):
            raise ValueError(
                f"Method cannot contain non-token characters {method!r} "
                f"(found at least {match.group()!r})"
            )
        # URL forbids subclasses, so a simple type check is enough.
        assert type(url) is URL, url
        self.original_url = url
        self.url = url.with_fragment(None) if url.raw_fragment else url
        self.method = method.upper()
        self.loop = loop
        self._ssl = ssl

        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

        if not url.raw_host:
            raise InvalidURL(url)
        self._update_headers(headers)
        # Credentials embedded in the URL become a Basic Authorization header.
        if url.raw_user or url.raw_password:
            self.headers[hdrs.AUTHORIZATION] = encode_basic_auth(
                url.user or "", url.password or ""
            )

    def _reset_writer(self, _: object = None) -> None:
        """Done-callback that forgets the finished writer task."""
        self._writer_task = None

    def _get_content_length(self) -> int | None:
        """Extract and validate Content-Length header value.

        Returns parsed Content-Length value or None if not set.
        Raises ValueError if header exists but cannot be parsed as an integer.
        """
        content_length_hdr = self.headers.get(hdrs.CONTENT_LENGTH)
        if content_length_hdr is None:
            return None

        if not _DIGITS_RE.fullmatch(content_length_hdr):
            raise ValueError(f"Invalid Content-Length header: {content_length_hdr!r}")
        return int(content_length_hdr)

    @property
    def _writer(self) -> asyncio.Task[None] | None:
        """The task currently writing the request body, if any."""
        return self._writer_task

    @_writer.setter
    def _writer(self, writer: asyncio.Task[None]) -> None:
        # Detach the callback from any previous task before replacing it.
        if self._writer_task is not None:
            self._writer_task.remove_done_callback(self._reset_writer)
        self._writer_task = writer
        writer.add_done_callback(self._reset_writer)

    def is_ssl(self) -> bool:
        """Return True when the URL scheme is one of the TLS schemes."""
        return self.url.scheme in _SSL_SCHEMES

    @property
    def ssl(self) -> "SSLContext | bool | Fingerprint":
        """The ``ssl`` argument the request was created with."""
        return self._ssl

    @property
    def connection_key(self) -> ConnectionKey:
        """Connection pool key for this request (no proxy information)."""
        url = self.url
        # tuple.__new__ skips the generated NamedTuple constructor.
        return tuple.__new__(
            ConnectionKey,
            (
                url.raw_host or "",
                url.port,
                url.scheme in _SSL_SCHEMES,
                self._ssl,
                None,
                None,
                self.server_hostname,
            ),
        )

    def _update_headers(self, headers: CIMultiDict[str]) -> None:
        """Update request headers."""
        self.headers: CIMultiDict[str] = CIMultiDict()

        # Build the host header
        host = self.url.host_port_subcomponent

        # host_port_subcomponent is None when the URL is a relative URL.
        # but we know we do not have a relative URL here.
        assert host is not None
        # A caller-supplied Host header wins; it is popped from ``headers``
        # so that it is not added a second time below.
        self.headers[hdrs.HOST] = headers.pop(hdrs.HOST, host)
        self.headers.extend(headers)

    def _create_response(
        self,
        task: asyncio.Task[None] | None,
        stream_writer: AbstractStreamWriter,
    ) -> ClientResponse:
        """Instantiate ``response_class`` for this request."""
        return self.response_class(
            self.method,
            self.original_url,
            writer=task,
            continue100=None,
            timer=TimerNoop(),
            traces=(),
            loop=self.loop,
            session=None,
            request_headers=self.headers,
            original_url=self.original_url,
            stream_writer=stream_writer,
        )

    def _create_writer(self, protocol: BaseProtocol) -> StreamWriter:
        """Create the stream writer used to send this request."""
        return StreamWriter(protocol, self.loop)

    def _should_write(self, protocol: BaseProtocol) -> bool:
        """Return True when a writer task must be started for the request."""
        return protocol.writing_paused

    async def _send(self, conn: "Connection") -> ClientResponse:
        """Write the request line and headers to ``conn``.

        Returns the response object created for this request.
        """
        # Specify request target:
        # - CONNECT request must send authority form URI
        # - not CONNECT proxy must send absolute form URI
        # - most common is origin form URI
        if self.method == hdrs.METH_CONNECT:
            connect_host = self.url.host_subcomponent
            assert connect_host is not None
            path = f"{connect_host}:{self.url.port}"
        elif self.proxy and not self.is_ssl():
            path = str(self.url)
        else:
            path = self.url.raw_path_qs

        protocol = conn.protocol
        assert protocol is not None
        writer = self._create_writer(protocol)

        # set default content-type
        if (
            self.method in self.POST_METHODS
            and (
                self._skip_auto_headers is None
                or hdrs.CONTENT_TYPE not in self._skip_auto_headers
            )
            and hdrs.CONTENT_TYPE not in self.headers
        ):
            self.headers[hdrs.CONTENT_TYPE] = "application/octet-stream"

        v = self.version
        if hdrs.CONNECTION not in self.headers:
            if conn._connector.force_close:
                if v == HttpVersion11:
                    self.headers[hdrs.CONNECTION] = "close"
            elif v == HttpVersion10:
                self.headers[hdrs.CONNECTION] = "keep-alive"

        # status + headers
        status_line = f"{self.method} {path} HTTP/{v.major}.{v.minor}"

        # Buffer headers for potential coalescing with body
        await writer.write_headers(status_line, self.headers)

        task: asyncio.Task[None] | None
        if self._should_write(protocol):
            coro = self._write_bytes(writer, conn, self._get_content_length())
            if sys.version_info >= (3, 12):
                # Optimization for Python 3.12, try to write
                # bytes immediately to avoid having to schedule
                # the task on the event loop.
                task = asyncio.Task(coro, loop=self.loop, eager_start=True)
            else:
                task = self.loop.create_task(coro)
            if task.done():
                task = None
            else:
                self._writer = task
        else:
            # We have nothing to write because
            # - there is no body
            # - the protocol does not have writing paused
            # - we are not waiting for a 100-continue response
            protocol.start_timeout()
            writer.set_eof()
            task = None
        self._response = self._create_response(task, stream_writer=writer)
        return self._response

    async def _write_bytes(
        self,
        writer: AbstractStreamWriter,
        conn: "Connection",
        content_length: int | None,
    ) -> None:
        """Write the request body; never expected to run on the base class."""
        # Base class never has a body, this will never be run.
        # Raise explicitly: a bare ``assert False`` is stripped under ``-O``.
        raise AssertionError("ClientRequestBase has no body to write")
class ClientRequestArgs(TypedDict, total=False):
    """Keyword arguments accepted by ``ClientRequest.__init__``.

    ``total=False`` makes every key optional for type-checking purposes.
    """

    params: Query
    headers: CIMultiDict[str]
    skip_auto_headers: Iterable[str] | None
    data: Any
    cookies: BaseCookie[str]
    version: HttpVersion
    compress: Literal["deflate", "gzip"] | bool
    chunked: bool | None
    expect100: bool
    loop: asyncio.AbstractEventLoop
    response_class: type[ClientResponse]
    proxy: URL | None
    timer: BaseTimerContext
    session: "ClientSession"
    ssl: SSLContext | bool | Fingerprint
    proxy_headers: CIMultiDict[str] | None
    traces: list["Trace"]
    trust_env: bool
    server_hostname: str | None
964class ClientRequest(ClientRequestBase):
965 _EMPTY_BODY = payload.PAYLOAD_REGISTRY.get(b"", disposition=None)
966 _body = _EMPTY_BODY
967 _continue = None # waiter future for '100 Continue' response
969 GET_METHODS = {
970 hdrs.METH_GET,
971 hdrs.METH_HEAD,
972 hdrs.METH_OPTIONS,
973 hdrs.METH_TRACE,
974 }
975 DEFAULT_HEADERS = {
976 hdrs.ACCEPT: "*/*",
977 hdrs.ACCEPT_ENCODING: _gen_default_accept_encoding(),
978 }
980 def __init__(
981 self,
982 method: str,
983 url: URL,
984 *,
985 params: Query,
986 headers: CIMultiDict[str],
987 skip_auto_headers: Iterable[str] | None,
988 data: Any,
989 cookies: BaseCookie[str],
990 version: HttpVersion,
991 compress: Literal["deflate", "gzip"] | bool,
992 chunked: bool | None,
993 expect100: bool,
994 loop: asyncio.AbstractEventLoop,
995 response_class: type[ClientResponse],
996 proxy: URL | None,
997 timer: BaseTimerContext,
998 session: "ClientSession",
999 ssl: SSLContext | bool | Fingerprint,
1000 proxy_headers: CIMultiDict[str] | None,
1001 traces: list["Trace"],
1002 trust_env: bool,
1003 server_hostname: str | None,
1004 **kwargs: object,
1005 ):
1006 # kwargs exists so authors of subclasses should expect to pass through unknown
1007 # arguments. This allows us to safely add new arguments in future releases.
1008 # But, we should never receive unknown arguments here in the parent class, this
1009 # would indicate an argument has been named wrong or similar in the subclass.
1010 assert not kwargs, "Unexpected arguments to ClientRequest"
1012 if params:
1013 url = url.extend_query(params)
1014 super().__init__(method, url, headers=headers, loop=loop, ssl=ssl)
1016 if proxy is not None:
1017 assert type(proxy) is URL, proxy
1018 self._session = session
1019 self.chunked = chunked
1020 self.response_class = response_class
1021 self._timer = timer
1022 self.server_hostname = server_hostname
1023 self.version = version
1025 self._update_auto_headers(skip_auto_headers)
1026 self._update_cookies(cookies)
1027 self._update_content_encoding(data, compress)
1028 self._update_proxy(proxy, proxy_headers)
1030 self._update_body_from_data(data)
1031 if data is not None or self.method not in self.GET_METHODS:
1032 self._update_transfer_encoding()
1033 self._update_expect_continue(expect100)
1034 self._traces = traces
1036 @property
1037 def body(self) -> payload.Payload:
1038 return self._body
1040 @property
1041 def skip_auto_headers(self) -> CIMultiDict[None]:
1042 return self._skip_auto_headers or CIMultiDict()
1044 @property
1045 def connection_key(self) -> ConnectionKey:
1046 if proxy_headers := self.proxy_headers:
1047 h: int | None = hash(tuple(proxy_headers.items()))
1048 else:
1049 h = None
1050 url = self.url
1051 return tuple.__new__(
1052 ConnectionKey,
1053 (
1054 url.raw_host or "",
1055 url.port,
1056 url.scheme in _SSL_SCHEMES,
1057 self._ssl,
1058 self.proxy,
1059 h,
1060 self.server_hostname,
1061 ),
1062 )
1064 @property
1065 def session(self) -> "ClientSession":
1066 """Return the ClientSession instance.
1068 This property provides access to the ClientSession that initiated
1069 this request, allowing middleware to make additional requests
1070 using the same session.
1071 """
1072 return self._session
def _update_auto_headers(self, skip_auto_headers: Iterable[str] | None) -> None:
    """Fill in default headers and ``User-Agent`` unless present or skipped.

    Args:
        skip_auto_headers: Header names that must not be auto-generated,
            or ``None`` when nothing is skipped.
    """
    if skip_auto_headers is None:
        # Fast path (the most common case): nothing to skip, so the
        # request headers themselves tell us what is already present.
        present = self.headers
    else:
        self._skip_auto_headers = CIMultiDict(
            (name, None) for name in sorted(skip_auto_headers)
        )
        # Work on a copy so the skipped names count as "present" without
        # actually being added to the request headers.
        present = self.headers.copy()
        present.extend(self._skip_auto_headers)  # type: ignore[arg-type]

    for name, default in self.DEFAULT_HEADERS.items():
        if name not in present:
            self.headers[name] = default

    if hdrs.USER_AGENT not in present:
        self.headers[hdrs.USER_AGENT] = SERVER_SOFTWARE
def _update_cookies(self, cookies: BaseCookie[str]) -> None:
    """Merge *cookies* into the request's ``Cookie`` header.

    Nothing happens when *cookies* is empty. Otherwise any existing
    ``Cookie`` header is parsed, the given cookies are applied on top of it
    and the header is rewritten from the combined set.
    """
    if not cookies:
        return

    headers = self.headers
    jar = SimpleCookie()
    if hdrs.COOKIE in headers:
        # parse_cookie_header for RFC 6265 compliant Cookie header parsing
        jar.update(parse_cookie_header(headers.get(hdrs.COOKIE, "")))
        del headers[hdrs.COOKIE]

    for name, morsel in cookies.items():
        # Use helper to preserve coded_value exactly as sent by server
        jar[name] = preserve_morsel_with_coded_value(morsel)

    rendered = jar.output(header="", sep=";")
    headers[hdrs.COOKIE] = rendered.strip()
def _update_content_encoding(
    self, data: Any, compress: bool | Literal["deflate", "gzip"]
) -> None:
    """Decide the request's content encoding and record it in ``self.compress``.

    Args:
        data: The request body; when falsy no compression is configured.
        compress: ``True`` (meaning ``"deflate"``), ``False``, ``"deflate"``
            or ``"gzip"``.

    Raises:
        ValueError: If *compress* is set together with an existing
            ``Content-Encoding`` header, or is a string other than
            ``"deflate"`` / ``"gzip"``.
    """
    self.compress = None
    if not data:
        return

    if self.headers.get(hdrs.CONTENT_ENCODING):
        # The caller already declared an encoding; asking for compression
        # on top of it is contradictory.
        if compress:
            raise ValueError(
                "compress can not be set if Content-Encoding header is set"
            )
        return

    if not compress:
        return

    if isinstance(compress, str):
        if compress not in {"deflate", "gzip"}:
            raise ValueError(
                "compress must be one of True, False, 'deflate', or 'gzip'"
            )
        encoding = compress
    else:
        encoding = "deflate"

    self.compress = encoding
    self.headers[hdrs.CONTENT_ENCODING] = encoding
    self.chunked = True  # enable chunked, no need to deal with length
def _update_transfer_encoding(self) -> None:
    """Reconcile ``self.chunked`` with the ``Transfer-Encoding`` header.

    Adds ``Transfer-Encoding: chunked`` when chunking is requested and the
    header does not already mention it.

    Raises:
        ValueError: If ``self.chunked`` is set while the header already
            contains ``chunked``, or while ``Content-Length`` is present.
    """
    headers = self.headers
    declared = headers.get(hdrs.TRANSFER_ENCODING, "").lower()

    if "chunked" in declared:
        if self.chunked:
            raise ValueError(
                "chunked can not be set "
                'if "Transfer-Encoding: chunked" header is set'
            )
        return

    if not self.chunked:
        return

    if hdrs.CONTENT_LENGTH in headers:
        raise ValueError("chunked can not be set if Content-Length header is set")

    headers[hdrs.TRANSFER_ENCODING] = "chunked"
def _update_body_from_data(self, body: Any) -> None:
    """Turn *body* into a payload and adjust the length-related headers.

    ``None`` installs the empty body. A ``FormData`` is called to produce
    its payload. Anything else is looked up in the payload registry and,
    if no payload type matches, wrapped in a ``FormData`` first.
    """
    headers = self.headers

    if body is None:
        self._body = self._EMPTY_BODY
        # Set Content-Length to 0 when body is None for methods that expect a body
        if not (
            self.method in self.GET_METHODS
            or self.chunked
            or hdrs.CONTENT_LENGTH in headers
        ):
            headers[hdrs.CONTENT_LENGTH] = "0"
        return

    if isinstance(body, FormData):
        resolved = body()
    else:
        try:
            resolved = payload.PAYLOAD_REGISTRY.get(body, disposition=None)
        except payload.LookupError:
            # No registered payload type: fall back to form encoding and
            # reuse the boundary from an existing Content-Type, if any.
            boundary = (
                parse_mimetype(headers[hdrs.CONTENT_TYPE]).parameters.get("boundary")
                if hdrs.CONTENT_TYPE in headers
                else None
            )
            resolved = FormData(body, boundary=boundary)()

    self._body = resolved

    # enable chunked encoding if needed
    if not self.chunked and hdrs.CONTENT_LENGTH not in headers:
        size = resolved.size
        if size is None:
            self.chunked = True
        else:
            headers[hdrs.CONTENT_LENGTH] = str(size)

    # copy payload headers
    assert resolved.headers
    skipped = self._skip_auto_headers
    for key, value in resolved.headers.items():
        # Keep explicit request headers and honour the skip list.
        if key not in headers and (skipped is None or key not in skipped):
            headers[key] = value
def _update_body(self, body: Any) -> None:
    """Replace a body that was already set, resetting the framing headers first."""
    headers = self.headers

    # The body is changing, so a previously computed length is stale.
    if hdrs.CONTENT_LENGTH in headers:
        del headers[hdrs.CONTENT_LENGTH]

    # Drop Transfer-Encoding too (only while chunking is on) to avoid conflicts.
    if self.chunked and hdrs.TRANSFER_ENCODING in headers:
        del headers[hdrs.TRANSFER_ENCODING]

    # Install the new body through the regular path.
    self._update_body_from_data(body)

    # Same transfer-encoding rule as the one applied in __init__.
    if not (body is None and self.method in self.GET_METHODS):
        self._update_transfer_encoding()
async def update_body(self, body: Any) -> None:
    """
    Replace the request body, closing the previous payload first.

    The existing payload (if any) is awaited closed before the new body is
    installed via ``_update_body``, so resources held by the old payload
    are not leaked.

    IMPORTANT: Always go through this method rather than assigning to
    request.body. A direct assignment would skip closing the previous
    payload, which leaks file handles, streams or other resources it holds.

    Args:
        body: The new body content. Can be:
            - bytes/bytearray: Raw binary data
            - str: Text data (will be encoded using charset from Content-Type)
            - FormData: Form data that will be encoded as multipart/form-data
            - Payload: A pre-configured payload object
            - AsyncIterable: An async iterable of bytes chunks
            - File-like object: Will be read and sent as binary data
            - None: Clears the body

    Usage:
        # CORRECT: Use update_body
        await request.update_body(b"new request data")

        # WRONG: Don't set body directly
        # request.body = b"new request data"  # This will leak resources!

        # Update with form data
        form_data = FormData()
        form_data.add_field('field', 'value')
        await request.update_body(form_data)

        # Clear body
        await request.update_body(None)

    Note:
        The method is async because closing the previous payload is
        awaited. Always await it so the cleanup actually runs.

    Warning:
        Setting request.body directly is highly discouraged and can lead to:
        - Resource leaks (unclosed file handles, streams)
        - Memory leaks (unreleased buffers)
        - Unexpected behavior with streaming payloads

        Changing the payload type in middleware is not recommended. If the
        body was already set (e.g., as bytes), keep the same type rather
        than converting it (e.g., to str), as this may result in unexpected
        behavior.

    See Also:
        - _update_body / _update_body_from_data: Synchronous body update
          without closing the previous payload
        - body property: Direct body access (STRONGLY DISCOURAGED)

    """
    previous = self._body
    # Close the outgoing payload before it is replaced.
    if previous is not None:
        await previous.close()
    self._update_body(body)
def _update_expect_continue(self, expect: bool = False) -> None:
    """Set up ``Expect: 100-continue`` handling for the request.

    When *expect* is truthy the header is written. Otherwise an existing
    ``Expect`` header equal to ``100-continue`` (case-insensitive) is
    honoured. In both cases a future is created on the loop and stored in
    ``self._continue``.
    """
    if expect:
        self.headers[hdrs.EXPECT] = "100-continue"
        wants_continue = True
    else:
        wants_continue = (
            hdrs.EXPECT in self.headers
            and self.headers[hdrs.EXPECT].lower() == "100-continue"
        )

    if wants_continue:
        self._continue = self.loop.create_future()
def _update_proxy(
    self,
    proxy: URL | None,
    proxy_headers: CIMultiDict[str] | None,
) -> None:
    """Store the proxy URL and proxy headers on the request.

    Without a proxy both attributes are set to ``None``. Credentials
    embedded in the proxy URL are converted into a ``Proxy-Authorization``
    header, which is only added if *proxy_headers* does not already carry
    one, and are then removed from the stored URL.
    """
    if proxy is None:
        self.proxy = self.proxy_headers = None
        return

    # URL-embedded credentials on the proxy map to Proxy-Authorization.
    has_credentials = proxy.raw_user or proxy.raw_password
    if has_credentials:
        basic_auth = encode_basic_auth(proxy.user or "", proxy.password or "")
        if proxy_headers is None:
            proxy_headers = CIMultiDict()
        # setdefault keeps an explicitly supplied Proxy-Authorization value.
        proxy_headers.setdefault(hdrs.PROXY_AUTHORIZATION, basic_auth)
        proxy = proxy.with_user(None)

    self.proxy = proxy
    self.proxy_headers = proxy_headers
def _create_response(
    self,
    task: asyncio.Task[None] | None,
    stream_writer: AbstractStreamWriter,
) -> ClientResponse:
    """Instantiate ``self.response_class`` for this request.

    Args:
        task: Passed to the response as its ``writer`` argument.
        stream_writer: The stream writer handed to the response.
    """
    response_options = {
        "writer": task,
        "continue100": self._continue,
        "timer": self._timer,
        "traces": self._traces,
        "loop": self.loop,
        "session": self._session,
        "request_headers": self.headers,
        "original_url": self.original_url,
        "stream_writer": stream_writer,
    }
    return self.response_class(self.method, self.original_url, **response_options)
def _create_writer(self, protocol: BaseProtocol) -> StreamWriter:
    """Create the ``StreamWriter`` that sends this request over *protocol*.

    Trace callbacks are attached only when the request has traces.
    Compression is enabled when ``self.compress`` is set, and chunked
    framing is enabled only when ``self.chunked`` is truthy.

    Args:
        protocol: The protocol the writer writes to.

    Returns:
        The configured stream writer.
    """
    writer = StreamWriter(
        protocol,
        self.loop,
        on_chunk_sent=(
            functools.partial(self._on_chunk_request_sent, self.method, self.url)
            if self._traces
            else None
        ),
        on_headers_sent=(
            functools.partial(self._on_headers_request_sent, self.method, self.url)
            if self._traces
            else None
        ),
    )

    if self.compress:
        writer.enable_compression(self.compress)

    # Test truthiness rather than ``is not None``: an explicit
    # ``chunked=False`` must not switch on chunked framing, because no
    # "Transfer-Encoding: chunked" header is added for a falsy value and a
    # Content-Length may already have been set for the body.
    if self.chunked:
        writer.enable_chunking()
    return writer
def _should_write(self, protocol: BaseProtocol) -> bool:
    """Tell whether any of the three write conditions holds.

    The answer is true when the body size is not ``0``, when a
    100-continue future is pending, or when ``protocol.writing_paused``
    is set.
    """
    if self.body.size != 0:
        return True
    if self._continue is not None:
        return True
    return protocol.writing_paused
async def _write_bytes(
    self,
    writer: AbstractStreamWriter,
    conn: "Connection",
    content_length: int | None,
) -> None:
    """
    Send the request body over the connection.

    The body payload is written through its ``write_with_length`` method.
    If a 100-continue future is pending, the headers are flushed and the
    future is awaited before any body bytes are written.

    Args:
        writer: The stream writer to write the body to
        conn: The connection being used for this request
        content_length: Optional maximum number of bytes to write from the body
            (None means write the entire body)

    Failures while writing are reported on the connection's protocol via
    ``set_exception`` rather than raised from here:
    - an ``OSError`` becomes a ``ClientOSError``, unless it is a timeout
      without an errno, which is passed through unchanged
    - any other ``Exception`` becomes a ``ClientConnectionError``

    On success EOF is written and the protocol's timeout is started.

    Raises:
        asyncio.CancelledError: When the operation is cancelled; the
            connection is closed before re-raising

    """
    # 100 response
    if self._continue is not None:
        # Force headers to be sent before waiting for 100-continue
        writer.send_headers()
        await writer.drain()
        await self._continue

    protocol = conn.protocol
    assert protocol is not None
    try:
        await self._body.write_with_length(writer, content_length)
    except OSError as os_error:
        # A timeout that carries no errno is reported as-is; every other
        # OS error is wrapped for better error reporting.
        is_plain_timeout = os_error.errno is None and isinstance(
            os_error, asyncio.TimeoutError
        )
        if is_plain_timeout:
            reported: Exception = os_error
        else:
            reported = ClientOSError(
                os_error.errno,
                f"Can not write request body for {self.url !s}",
            )
        set_exception(protocol, reported, os_error)
    except asyncio.CancelledError:
        # Body hasn't been fully sent, so connection can't be reused
        conn.close()
        raise
    except Exception as unexpected:
        wrapped = ClientConnectionError(
            "Failed to send bytes into the underlying connection "
            f"{conn !s}: {unexpected!r}",
        )
        set_exception(protocol, wrapped, unexpected)
    else:
        # Successfully wrote the body, signal EOF and start response timeout
        await writer.write_eof()
        protocol.start_timeout()
async def _close(self) -> None:
    """Wait for the writer task, if any, to finish.

    A ``CancelledError`` coming out of the awaited writer task is
    swallowed, except on Python 3.11+ when the current task itself is
    being cancelled; in that case it is re-raised.
    """
    if self._writer_task is None:
        return
    try:
        await self._writer_task
    except asyncio.CancelledError:
        if sys.version_info >= (3, 11):
            current = asyncio.current_task()
            if current and current.cancelling():
                raise
def _terminate(self) -> None:
    """Cancel and forget the writer task, if there is one.

    The task is only cancelled while the loop is still open; the
    ``_reset_writer`` done-callback is removed in either case.
    """
    pending = self._writer_task
    if pending is None:
        return
    if not self.loop.is_closed():
        pending.cancel()
    pending.remove_done_callback(self._reset_writer)
    self._writer_task = None
async def _on_chunk_request_sent(self, method: str, url: URL, chunk: bytes) -> None:
    """Notify every trace, in order, that a request body chunk was sent."""
    for tracer in self._traces:
        await tracer.send_request_chunk_sent(method, url, chunk)
async def _on_headers_request_sent(
    self, method: str, url: URL, headers: "CIMultiDict[str]"
) -> None:
    """Notify every trace, in order, that the request headers were sent."""
    for tracer in self._traces:
        await tracer.send_request_headers(method, url, headers)