Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/client_reqrep.py: 27%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import codecs
3import contextlib
4import functools
5import io
6import re
7import sys
8import traceback
9import warnings
10from hashlib import md5, sha1, sha256
11from http.cookies import CookieError, Morsel, SimpleCookie
12from types import MappingProxyType, TracebackType
13from typing import (
14 TYPE_CHECKING,
15 Any,
16 Callable,
17 Dict,
18 Iterable,
19 List,
20 Mapping,
21 NamedTuple,
22 Optional,
23 Tuple,
24 Type,
25 Union,
26)
28from multidict import CIMultiDict, CIMultiDictProxy, MultiDict, MultiDictProxy
29from yarl import URL
31from . import hdrs, helpers, http, multipart, payload
32from .abc import AbstractStreamWriter
33from .client_exceptions import (
34 ClientConnectionError,
35 ClientOSError,
36 ClientResponseError,
37 ContentTypeError,
38 InvalidURL,
39 ServerFingerprintMismatch,
40)
41from .compression_utils import HAS_BROTLI
42from .formdata import FormData
43from .hdrs import CONTENT_TYPE
44from .helpers import (
45 _SENTINEL,
46 BaseTimerContext,
47 BasicAuth,
48 HeadersMixin,
49 TimerNoop,
50 basicauth_from_netrc,
51 frozen_dataclass_decorator,
52 is_expected_content_type,
53 netrc_from_env,
54 parse_mimetype,
55 reify,
56 set_exception,
57 set_result,
58)
59from .http import (
60 SERVER_SOFTWARE,
61 HttpVersion,
62 HttpVersion10,
63 HttpVersion11,
64 StreamWriter,
65)
66from .log import client_logger
67from .streams import StreamReader
68from .typedefs import (
69 DEFAULT_JSON_DECODER,
70 JSONDecoder,
71 LooseCookies,
72 LooseHeaders,
73 Query,
74 RawHeaders,
75)
77if TYPE_CHECKING:
78 import ssl
79 from ssl import SSLContext
80else:
81 try:
82 import ssl
83 from ssl import SSLContext
84 except ImportError: # pragma: no cover
85 ssl = None # type: ignore[assignment]
86 SSLContext = object # type: ignore[misc,assignment]
89__all__ = ("ClientRequest", "ClientResponse", "RequestInfo", "Fingerprint")
92if TYPE_CHECKING:
93 from .client import ClientSession
94 from .connector import Connection
95 from .tracing import Trace
98_CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]")
101def _gen_default_accept_encoding() -> str:
102 return "gzip, deflate, br" if HAS_BROTLI else "gzip, deflate"
105@frozen_dataclass_decorator
106class ContentDisposition:
107 type: Optional[str]
108 parameters: "MappingProxyType[str, str]"
109 filename: Optional[str]
112class _RequestInfo(NamedTuple):
113 url: URL
114 method: str
115 headers: "CIMultiDictProxy[str]"
116 real_url: URL
119class RequestInfo(_RequestInfo):
121 def __new__(
122 cls,
123 url: URL,
124 method: str,
125 headers: "CIMultiDictProxy[str]",
126 real_url: URL = _SENTINEL, # type: ignore[assignment]
127 ) -> "RequestInfo":
128 """Create a new RequestInfo instance.
130 For backwards compatibility, the real_url parameter is optional.
131 """
132 return tuple.__new__(
133 cls, (url, method, headers, url if real_url is _SENTINEL else real_url)
134 )
137class Fingerprint:
138 HASHFUNC_BY_DIGESTLEN = {
139 16: md5,
140 20: sha1,
141 32: sha256,
142 }
144 def __init__(self, fingerprint: bytes) -> None:
145 digestlen = len(fingerprint)
146 hashfunc = self.HASHFUNC_BY_DIGESTLEN.get(digestlen)
147 if not hashfunc:
148 raise ValueError("fingerprint has invalid length")
149 elif hashfunc is md5 or hashfunc is sha1:
150 raise ValueError("md5 and sha1 are insecure and not supported. Use sha256.")
151 self._hashfunc = hashfunc
152 self._fingerprint = fingerprint
154 @property
155 def fingerprint(self) -> bytes:
156 return self._fingerprint
158 def check(self, transport: asyncio.Transport) -> None:
159 if not transport.get_extra_info("sslcontext"):
160 return
161 sslobj = transport.get_extra_info("ssl_object")
162 cert = sslobj.getpeercert(binary_form=True)
163 got = self._hashfunc(cert).digest()
164 if got != self._fingerprint:
165 host, port, *_ = transport.get_extra_info("peername")
166 raise ServerFingerprintMismatch(self._fingerprint, got, host, port)
169if ssl is not None:
170 SSL_ALLOWED_TYPES = (ssl.SSLContext, bool, Fingerprint)
171else: # pragma: no cover
172 SSL_ALLOWED_TYPES = (bool,) # type: ignore[unreachable]
175_SSL_SCHEMES = frozenset(("https", "wss"))
178# ConnectionKey is a NamedTuple because it is used as a key in a dict
179# and a set in the connector. Since a NamedTuple is a tuple it uses
180# the fast native tuple __hash__ and __eq__ implementation in CPython.
181class ConnectionKey(NamedTuple):
182 # the key should contain an information about used proxy / TLS
183 # to prevent reusing wrong connections from a pool
184 host: str
185 port: Optional[int]
186 is_ssl: bool
187 ssl: Union[SSLContext, bool, Fingerprint]
188 proxy: Optional[URL]
189 proxy_auth: Optional[BasicAuth]
190 proxy_headers_hash: Optional[int] # hash(CIMultiDict)
193class ClientRequest:
194 GET_METHODS = {
195 hdrs.METH_GET,
196 hdrs.METH_HEAD,
197 hdrs.METH_OPTIONS,
198 hdrs.METH_TRACE,
199 }
200 POST_METHODS = {hdrs.METH_PATCH, hdrs.METH_POST, hdrs.METH_PUT}
201 ALL_METHODS = GET_METHODS.union(POST_METHODS).union({hdrs.METH_DELETE})
203 DEFAULT_HEADERS = {
204 hdrs.ACCEPT: "*/*",
205 hdrs.ACCEPT_ENCODING: _gen_default_accept_encoding(),
206 }
208 # Type of body depends on PAYLOAD_REGISTRY, which is dynamic.
209 body: Any = b""
210 auth = None
211 response = None
213 # These class defaults help create_autospec() work correctly.
214 # If autospec is improved in future, maybe these can be removed.
215 url = URL()
216 method = "GET"
218 __writer: Optional["asyncio.Task[None]"] = None # async task for streaming data
219 _continue = None # waiter future for '100 Continue' response
221 _skip_auto_headers: Optional["CIMultiDict[None]"] = None
223 # N.B.
224 # Adding __del__ method with self._writer closing doesn't make sense
225 # because _writer is instance method, thus it keeps a reference to self.
226 # Until writer has finished finalizer will not be called.
228 def __init__(
229 self,
230 method: str,
231 url: URL,
232 *,
233 params: Query = None,
234 headers: Optional[LooseHeaders] = None,
235 skip_auto_headers: Optional[Iterable[str]] = None,
236 data: Any = None,
237 cookies: Optional[LooseCookies] = None,
238 auth: Optional[BasicAuth] = None,
239 version: http.HttpVersion = http.HttpVersion11,
240 compress: Union[str, bool] = False,
241 chunked: Optional[bool] = None,
242 expect100: bool = False,
243 loop: asyncio.AbstractEventLoop,
244 response_class: Optional[Type["ClientResponse"]] = None,
245 proxy: Optional[URL] = None,
246 proxy_auth: Optional[BasicAuth] = None,
247 timer: Optional[BaseTimerContext] = None,
248 session: Optional["ClientSession"] = None,
249 ssl: Union[SSLContext, bool, Fingerprint] = True,
250 proxy_headers: Optional[LooseHeaders] = None,
251 traces: Optional[List["Trace"]] = None,
252 trust_env: bool = False,
253 server_hostname: Optional[str] = None,
254 ):
255 if match := _CONTAINS_CONTROL_CHAR_RE.search(method):
256 raise ValueError(
257 f"Method cannot contain non-token characters {method!r} "
258 f"(found at least {match.group()!r})"
259 )
260 # URL forbids subclasses, so a simple type check is enough.
261 assert type(url) is URL, url
262 if proxy is not None:
263 assert type(proxy) is URL, proxy
264 # FIXME: session is None in tests only, need to fix tests
265 # assert session is not None
266 if TYPE_CHECKING:
267 assert session is not None
268 self._session = session
269 if params:
270 url = url.extend_query(params)
271 self.original_url = url
272 self.url = url.with_fragment(None) if url.raw_fragment else url
273 self.method = method.upper()
274 self.chunked = chunked
275 self.loop = loop
276 self.length = None
277 if response_class is None:
278 real_response_class = ClientResponse
279 else:
280 real_response_class = response_class
281 self.response_class: Type[ClientResponse] = real_response_class
282 self._timer = timer if timer is not None else TimerNoop()
283 self._ssl = ssl
284 self.server_hostname = server_hostname
286 if loop.get_debug():
287 self._source_traceback = traceback.extract_stack(sys._getframe(1))
289 self.update_version(version)
290 self.update_host(url)
291 self.update_headers(headers)
292 self.update_auto_headers(skip_auto_headers)
293 self.update_cookies(cookies)
294 self.update_content_encoding(data, compress)
295 self.update_auth(auth, trust_env)
296 self.update_proxy(proxy, proxy_auth, proxy_headers)
298 self.update_body_from_data(data)
299 if data is not None or self.method not in self.GET_METHODS:
300 self.update_transfer_encoding()
301 self.update_expect_continue(expect100)
302 self._traces = [] if traces is None else traces
304 def __reset_writer(self, _: object = None) -> None:
305 self.__writer = None
307 def _get_content_length(self) -> Optional[int]:
308 """Extract and validate Content-Length header value.
310 Returns parsed Content-Length value or None if not set.
311 Raises ValueError if header exists but cannot be parsed as an integer.
312 """
313 if hdrs.CONTENT_LENGTH not in self.headers:
314 return None
316 content_length_hdr = self.headers[hdrs.CONTENT_LENGTH]
317 try:
318 return int(content_length_hdr)
319 except ValueError:
320 raise ValueError(
321 f"Invalid Content-Length header: {content_length_hdr}"
322 ) from None
324 @property
325 def skip_auto_headers(self) -> CIMultiDict[None]:
326 return self._skip_auto_headers or CIMultiDict()
328 @property
329 def _writer(self) -> Optional["asyncio.Task[None]"]:
330 return self.__writer
332 @_writer.setter
333 def _writer(self, writer: "asyncio.Task[None]") -> None:
334 if self.__writer is not None:
335 self.__writer.remove_done_callback(self.__reset_writer)
336 self.__writer = writer
337 writer.add_done_callback(self.__reset_writer)
339 def is_ssl(self) -> bool:
340 return self.url.scheme in _SSL_SCHEMES
342 @property
343 def ssl(self) -> Union["SSLContext", bool, Fingerprint]:
344 return self._ssl
346 @property
347 def connection_key(self) -> ConnectionKey: # type: ignore[misc]
348 if proxy_headers := self.proxy_headers:
349 h: Optional[int] = hash(tuple(proxy_headers.items()))
350 else:
351 h = None
352 url = self.url
353 return tuple.__new__(
354 ConnectionKey,
355 (
356 url.raw_host or "",
357 url.port,
358 url.scheme in _SSL_SCHEMES,
359 self._ssl,
360 self.proxy,
361 self.proxy_auth,
362 h,
363 ),
364 )
366 @property
367 def host(self) -> str:
368 ret = self.url.raw_host
369 assert ret is not None
370 return ret
372 @property
373 def port(self) -> Optional[int]:
374 return self.url.port
376 @property
377 def request_info(self) -> RequestInfo:
378 headers: CIMultiDictProxy[str] = CIMultiDictProxy(self.headers)
379 # These are created on every request, so we use a NamedTuple
380 # for performance reasons. We don't use the RequestInfo.__new__
381 # method because it has a different signature which is provided
382 # for backwards compatibility only.
383 return tuple.__new__(
384 RequestInfo, (self.url, self.method, headers, self.original_url)
385 )
387 @property
388 def session(self) -> "ClientSession":
389 """Return the ClientSession instance.
391 This property provides access to the ClientSession that initiated
392 this request, allowing middleware to make additional requests
393 using the same session.
394 """
395 return self._session
397 def update_host(self, url: URL) -> None:
398 """Update destination host, port and connection type (ssl)."""
399 # get host/port
400 if not url.raw_host:
401 raise InvalidURL(url)
403 # basic auth info
404 if url.raw_user or url.raw_password:
405 self.auth = helpers.BasicAuth(url.user or "", url.password or "")
407 def update_version(self, version: Union[http.HttpVersion, str]) -> None:
408 """Convert request version to two elements tuple.
410 parser HTTP version '1.1' => (1, 1)
411 """
412 if isinstance(version, str):
413 v = [part.strip() for part in version.split(".", 1)]
414 try:
415 version = http.HttpVersion(int(v[0]), int(v[1]))
416 except ValueError:
417 raise ValueError(
418 f"Can not parse http version number: {version}"
419 ) from None
420 self.version = version
422 def update_headers(self, headers: Optional[LooseHeaders]) -> None:
423 """Update request headers."""
424 self.headers: CIMultiDict[str] = CIMultiDict()
426 # Build the host header
427 host = self.url.host_port_subcomponent
429 # host_port_subcomponent is None when the URL is a relative URL.
430 # but we know we do not have a relative URL here.
431 assert host is not None
432 self.headers[hdrs.HOST] = host
434 if not headers:
435 return
437 if isinstance(headers, (dict, MultiDictProxy, MultiDict)):
438 headers = headers.items()
440 for key, value in headers: # type: ignore[misc]
441 # A special case for Host header
442 if key in hdrs.HOST_ALL:
443 self.headers[key] = value
444 else:
445 self.headers.add(key, value)
447 def update_auto_headers(self, skip_auto_headers: Optional[Iterable[str]]) -> None:
448 if skip_auto_headers is not None:
449 self._skip_auto_headers = CIMultiDict(
450 (hdr, None) for hdr in sorted(skip_auto_headers)
451 )
452 used_headers = self.headers.copy()
453 used_headers.extend(self._skip_auto_headers) # type: ignore[arg-type]
454 else:
455 # Fast path when there are no headers to skip
456 # which is the most common case.
457 used_headers = self.headers
459 for hdr, val in self.DEFAULT_HEADERS.items():
460 if hdr not in used_headers:
461 self.headers[hdr] = val
463 if hdrs.USER_AGENT not in used_headers:
464 self.headers[hdrs.USER_AGENT] = SERVER_SOFTWARE
466 def update_cookies(self, cookies: Optional[LooseCookies]) -> None:
467 """Update request cookies header."""
468 if not cookies:
469 return
471 c = SimpleCookie()
472 if hdrs.COOKIE in self.headers:
473 c.load(self.headers.get(hdrs.COOKIE, ""))
474 del self.headers[hdrs.COOKIE]
476 if isinstance(cookies, Mapping):
477 iter_cookies = cookies.items()
478 else:
479 iter_cookies = cookies # type: ignore[assignment]
480 for name, value in iter_cookies:
481 if isinstance(value, Morsel):
482 # Preserve coded_value
483 mrsl_val = value.get(value.key, Morsel())
484 mrsl_val.set(value.key, value.value, value.coded_value)
485 c[name] = mrsl_val
486 else:
487 c[name] = value # type: ignore[assignment]
489 self.headers[hdrs.COOKIE] = c.output(header="", sep=";").strip()
491 def update_content_encoding(self, data: Any, compress: Union[bool, str]) -> None:
492 """Set request content encoding."""
493 self.compress = None
494 if not data:
495 return
497 if self.headers.get(hdrs.CONTENT_ENCODING):
498 if compress:
499 raise ValueError(
500 "compress can not be set if Content-Encoding header is set"
501 )
502 elif compress:
503 self.compress = compress if isinstance(compress, str) else "deflate"
504 self.headers[hdrs.CONTENT_ENCODING] = self.compress
505 self.chunked = True # enable chunked, no need to deal with length
507 def update_transfer_encoding(self) -> None:
508 """Analyze transfer-encoding header."""
509 te = self.headers.get(hdrs.TRANSFER_ENCODING, "").lower()
511 if "chunked" in te:
512 if self.chunked:
513 raise ValueError(
514 "chunked can not be set "
515 'if "Transfer-Encoding: chunked" header is set'
516 )
518 elif self.chunked:
519 if hdrs.CONTENT_LENGTH in self.headers:
520 raise ValueError(
521 "chunked can not be set if Content-Length header is set"
522 )
524 self.headers[hdrs.TRANSFER_ENCODING] = "chunked"
525 else:
526 if hdrs.CONTENT_LENGTH not in self.headers:
527 self.headers[hdrs.CONTENT_LENGTH] = str(len(self.body))
529 def update_auth(self, auth: Optional[BasicAuth], trust_env: bool = False) -> None:
530 """Set basic auth."""
531 if auth is None:
532 auth = self.auth
533 if auth is None and trust_env and self.url.host is not None:
534 netrc_obj = netrc_from_env()
535 with contextlib.suppress(LookupError):
536 auth = basicauth_from_netrc(netrc_obj, self.url.host)
537 if auth is None:
538 return
540 if not isinstance(auth, helpers.BasicAuth):
541 raise TypeError("BasicAuth() tuple is required instead")
543 self.headers[hdrs.AUTHORIZATION] = auth.encode()
545 def update_body_from_data(self, body: Any) -> None:
546 if body is None:
547 return
549 # FormData
550 if isinstance(body, FormData):
551 body = body()
553 try:
554 body = payload.PAYLOAD_REGISTRY.get(body, disposition=None)
555 except payload.LookupError:
556 boundary = None
557 if CONTENT_TYPE in self.headers:
558 boundary = parse_mimetype(self.headers[CONTENT_TYPE]).parameters.get(
559 "boundary"
560 )
561 body = FormData(body, boundary=boundary)()
563 self.body = body
565 # enable chunked encoding if needed
566 if not self.chunked and hdrs.CONTENT_LENGTH not in self.headers:
567 if (size := body.size) is not None:
568 self.headers[hdrs.CONTENT_LENGTH] = str(size)
569 else:
570 self.chunked = True
572 # copy payload headers
573 assert body.headers
574 headers = self.headers
575 skip_headers = self._skip_auto_headers
576 for key, value in body.headers.items():
577 if key in headers or (skip_headers is not None and key in skip_headers):
578 continue
579 headers[key] = value
581 def update_expect_continue(self, expect: bool = False) -> None:
582 if expect:
583 self.headers[hdrs.EXPECT] = "100-continue"
584 elif (
585 hdrs.EXPECT in self.headers
586 and self.headers[hdrs.EXPECT].lower() == "100-continue"
587 ):
588 expect = True
590 if expect:
591 self._continue = self.loop.create_future()
593 def update_proxy(
594 self,
595 proxy: Optional[URL],
596 proxy_auth: Optional[BasicAuth],
597 proxy_headers: Optional[LooseHeaders],
598 ) -> None:
599 self.proxy = proxy
600 if proxy is None:
601 self.proxy_auth = None
602 self.proxy_headers = None
603 return
605 if proxy_auth and not isinstance(proxy_auth, helpers.BasicAuth):
606 raise ValueError("proxy_auth must be None or BasicAuth() tuple")
607 self.proxy_auth = proxy_auth
609 if proxy_headers is not None and not isinstance(
610 proxy_headers, (MultiDict, MultiDictProxy)
611 ):
612 proxy_headers = CIMultiDict(proxy_headers)
613 self.proxy_headers = proxy_headers
615 async def write_bytes(
616 self,
617 writer: AbstractStreamWriter,
618 conn: "Connection",
619 content_length: Optional[int],
620 ) -> None:
621 """
622 Write the request body to the connection stream.
624 This method handles writing different types of request bodies:
625 1. Payload objects (using their specialized write_with_length method)
626 2. Bytes/bytearray objects
627 3. Iterable body content
629 Args:
630 writer: The stream writer to write the body to
631 conn: The connection being used for this request
632 content_length: Optional maximum number of bytes to write from the body
633 (None means write the entire body)
635 The method properly handles:
636 - Waiting for 100-Continue responses if required
637 - Content length constraints for chunked encoding
638 - Error handling for network issues, cancellation, and other exceptions
639 - Signaling EOF and timeout management
641 Raises:
642 ClientOSError: When there's an OS-level error writing the body
643 ClientConnectionError: When there's a general connection error
644 asyncio.CancelledError: When the operation is cancelled
646 """
647 # 100 response
648 if self._continue is not None:
649 await writer.drain()
650 await self._continue
652 protocol = conn.protocol
653 assert protocol is not None
654 try:
655 if isinstance(self.body, payload.Payload):
656 # Specialized handling for Payload objects that know how to write themselves
657 await self.body.write_with_length(writer, content_length)
658 else:
659 # Handle bytes/bytearray by converting to an iterable for consistent handling
660 if isinstance(self.body, (bytes, bytearray)):
661 self.body = (self.body,)
663 if content_length is None:
664 # Write the entire body without length constraint
665 for chunk in self.body:
666 await writer.write(chunk)
667 else:
668 # Write with length constraint, respecting content_length limit
669 # If the body is larger than content_length, we truncate it
670 remaining_bytes = content_length
671 for chunk in self.body:
672 await writer.write(chunk[:remaining_bytes])
673 remaining_bytes -= len(chunk)
674 if remaining_bytes <= 0:
675 break
676 except OSError as underlying_exc:
677 reraised_exc = underlying_exc
679 # Distinguish between timeout and other OS errors for better error reporting
680 exc_is_not_timeout = underlying_exc.errno is not None or not isinstance(
681 underlying_exc, asyncio.TimeoutError
682 )
683 if exc_is_not_timeout:
684 reraised_exc = ClientOSError(
685 underlying_exc.errno,
686 f"Can not write request body for {self.url !s}",
687 )
689 set_exception(protocol, reraised_exc, underlying_exc)
690 except asyncio.CancelledError:
691 # Body hasn't been fully sent, so connection can't be reused
692 conn.close()
693 raise
694 except Exception as underlying_exc:
695 set_exception(
696 protocol,
697 ClientConnectionError(
698 "Failed to send bytes into the underlying connection "
699 f"{conn !s}: {underlying_exc!r}",
700 ),
701 underlying_exc,
702 )
703 else:
704 # Successfully wrote the body, signal EOF and start response timeout
705 await writer.write_eof()
706 protocol.start_timeout()
708 async def send(self, conn: "Connection") -> "ClientResponse":
709 # Specify request target:
710 # - CONNECT request must send authority form URI
711 # - not CONNECT proxy must send absolute form URI
712 # - most common is origin form URI
713 if self.method == hdrs.METH_CONNECT:
714 connect_host = self.url.host_subcomponent
715 assert connect_host is not None
716 path = f"{connect_host}:{self.url.port}"
717 elif self.proxy and not self.is_ssl():
718 path = str(self.url)
719 else:
720 path = self.url.raw_path_qs
722 protocol = conn.protocol
723 assert protocol is not None
724 writer = StreamWriter(
725 protocol,
726 self.loop,
727 on_chunk_sent=(
728 functools.partial(self._on_chunk_request_sent, self.method, self.url)
729 if self._traces
730 else None
731 ),
732 on_headers_sent=(
733 functools.partial(self._on_headers_request_sent, self.method, self.url)
734 if self._traces
735 else None
736 ),
737 )
739 if self.compress:
740 writer.enable_compression(self.compress)
742 if self.chunked is not None:
743 writer.enable_chunking()
745 # set default content-type
746 if (
747 self.method in self.POST_METHODS
748 and (
749 self._skip_auto_headers is None
750 or hdrs.CONTENT_TYPE not in self._skip_auto_headers
751 )
752 and hdrs.CONTENT_TYPE not in self.headers
753 ):
754 self.headers[hdrs.CONTENT_TYPE] = "application/octet-stream"
756 v = self.version
757 if hdrs.CONNECTION not in self.headers:
758 if conn._connector.force_close:
759 if v == HttpVersion11:
760 self.headers[hdrs.CONNECTION] = "close"
761 elif v == HttpVersion10:
762 self.headers[hdrs.CONNECTION] = "keep-alive"
764 # status + headers
765 status_line = f"{self.method} {path} HTTP/{v.major}.{v.minor}"
766 await writer.write_headers(status_line, self.headers)
767 task: Optional["asyncio.Task[None]"]
768 if self.body or self._continue is not None or protocol.writing_paused:
769 coro = self.write_bytes(writer, conn, self._get_content_length())
770 if sys.version_info >= (3, 12):
771 # Optimization for Python 3.12, try to write
772 # bytes immediately to avoid having to schedule
773 # the task on the event loop.
774 task = asyncio.Task(coro, loop=self.loop, eager_start=True)
775 else:
776 task = self.loop.create_task(coro)
777 if task.done():
778 task = None
779 else:
780 self._writer = task
781 else:
782 # We have nothing to write because
783 # - there is no body
784 # - the protocol does not have writing paused
785 # - we are not waiting for a 100-continue response
786 protocol.start_timeout()
787 writer.set_eof()
788 task = None
789 response_class = self.response_class
790 assert response_class is not None
791 self.response = response_class(
792 self.method,
793 self.original_url,
794 writer=task,
795 continue100=self._continue,
796 timer=self._timer,
797 request_info=self.request_info,
798 traces=self._traces,
799 loop=self.loop,
800 session=self._session,
801 )
802 return self.response
804 async def close(self) -> None:
805 if self.__writer is not None:
806 try:
807 await self.__writer
808 except asyncio.CancelledError:
809 if (
810 sys.version_info >= (3, 11)
811 and (task := asyncio.current_task())
812 and task.cancelling()
813 ):
814 raise
816 def terminate(self) -> None:
817 if self.__writer is not None:
818 if not self.loop.is_closed():
819 self.__writer.cancel()
820 self.__writer.remove_done_callback(self.__reset_writer)
821 self.__writer = None
823 async def _on_chunk_request_sent(self, method: str, url: URL, chunk: bytes) -> None:
824 for trace in self._traces:
825 await trace.send_request_chunk_sent(method, url, chunk)
827 async def _on_headers_request_sent(
828 self, method: str, url: URL, headers: "CIMultiDict[str]"
829 ) -> None:
830 for trace in self._traces:
831 await trace.send_request_headers(method, url, headers)
834_CONNECTION_CLOSED_EXCEPTION = ClientConnectionError("Connection closed")
837class ClientResponse(HeadersMixin):
838 # Some of these attributes are None when created,
839 # but will be set by the start() method.
840 # As the end user will likely never see the None values, we cheat the types below.
841 # from the Status-Line of the response
842 version: Optional[HttpVersion] = None # HTTP-Version
843 status: int = None # type: ignore[assignment] # Status-Code
844 reason: Optional[str] = None # Reason-Phrase
846 content: StreamReader = None # type: ignore[assignment] # Payload stream
847 _body: Optional[bytes] = None
848 _headers: CIMultiDictProxy[str] = None # type: ignore[assignment]
849 _history: Tuple["ClientResponse", ...] = ()
850 _raw_headers: RawHeaders = None # type: ignore[assignment]
852 _connection: Optional["Connection"] = None # current connection
853 _cookies: Optional[SimpleCookie] = None
854 _continue: Optional["asyncio.Future[bool]"] = None
855 _source_traceback: Optional[traceback.StackSummary] = None
856 _session: Optional["ClientSession"] = None
857 # set up by ClientRequest after ClientResponse object creation
858 # post-init stage allows to not change ctor signature
859 _closed = True # to allow __del__ for non-initialized properly response
860 _released = False
861 _in_context = False
863 _resolve_charset: Callable[["ClientResponse", bytes], str] = lambda *_: "utf-8"
865 __writer: Optional["asyncio.Task[None]"] = None
867 def __init__(
868 self,
869 method: str,
870 url: URL,
871 *,
872 writer: "Optional[asyncio.Task[None]]",
873 continue100: Optional["asyncio.Future[bool]"],
874 timer: Optional[BaseTimerContext],
875 request_info: RequestInfo,
876 traces: List["Trace"],
877 loop: asyncio.AbstractEventLoop,
878 session: "ClientSession",
879 ) -> None:
880 # URL forbids subclasses, so a simple type check is enough.
881 assert type(url) is URL
883 self.method = method
885 self._real_url = url
886 self._url = url.with_fragment(None) if url.raw_fragment else url
887 if writer is not None:
888 self._writer = writer
889 if continue100 is not None:
890 self._continue = continue100
891 self._request_info = request_info
892 self._timer = timer if timer is not None else TimerNoop()
893 self._cache: Dict[str, Any] = {}
894 self._traces = traces
895 self._loop = loop
896 # Save reference to _resolve_charset, so that get_encoding() will still
897 # work after the response has finished reading the body.
898 # TODO: Fix session=None in tests (see ClientRequest.__init__).
899 if session is not None:
900 # store a reference to session #1985
901 self._session = session
902 self._resolve_charset = session._resolve_charset
903 if loop.get_debug():
904 self._source_traceback = traceback.extract_stack(sys._getframe(1))
906 def __reset_writer(self, _: object = None) -> None:
907 self.__writer = None
909 @property
910 def _writer(self) -> Optional["asyncio.Task[None]"]:
911 """The writer task for streaming data.
913 _writer is only provided for backwards compatibility
914 for subclasses that may need to access it.
915 """
916 return self.__writer
918 @_writer.setter
919 def _writer(self, writer: Optional["asyncio.Task[None]"]) -> None:
920 """Set the writer task for streaming data."""
921 if self.__writer is not None:
922 self.__writer.remove_done_callback(self.__reset_writer)
923 self.__writer = writer
924 if writer is None:
925 return
926 if writer.done():
927 # The writer is already done, so we can clear it immediately.
928 self.__writer = None
929 else:
930 writer.add_done_callback(self.__reset_writer)
932 @property
933 def cookies(self) -> SimpleCookie:
934 if self._cookies is None:
935 self._cookies = SimpleCookie()
936 return self._cookies
938 @cookies.setter
939 def cookies(self, cookies: SimpleCookie) -> None:
940 self._cookies = cookies
942 @reify
943 def url(self) -> URL:
944 return self._url
946 @reify
947 def real_url(self) -> URL:
948 return self._real_url
950 @reify
951 def host(self) -> str:
952 assert self._url.host is not None
953 return self._url.host
955 @reify
956 def headers(self) -> "CIMultiDictProxy[str]":
957 return self._headers
959 @reify
960 def raw_headers(self) -> RawHeaders:
961 return self._raw_headers
963 @reify
964 def request_info(self) -> RequestInfo:
965 return self._request_info
967 @reify
968 def content_disposition(self) -> Optional[ContentDisposition]:
969 raw = self._headers.get(hdrs.CONTENT_DISPOSITION)
970 if raw is None:
971 return None
972 disposition_type, params_dct = multipart.parse_content_disposition(raw)
973 params = MappingProxyType(params_dct)
974 filename = multipart.content_disposition_filename(params)
975 return ContentDisposition(disposition_type, params, filename)
977 def __del__(self, _warnings: Any = warnings) -> None:
978 if self._closed:
979 return
981 if self._connection is not None:
982 self._connection.release()
983 self._cleanup_writer()
985 if self._loop.get_debug():
986 _warnings.warn(
987 f"Unclosed response {self!r}", ResourceWarning, source=self
988 )
989 context = {"client_response": self, "message": "Unclosed response"}
990 if self._source_traceback:
991 context["source_traceback"] = self._source_traceback
992 self._loop.call_exception_handler(context)
994 def __repr__(self) -> str:
995 out = io.StringIO()
996 ascii_encodable_url = str(self.url)
997 if self.reason:
998 ascii_encodable_reason = self.reason.encode(
999 "ascii", "backslashreplace"
1000 ).decode("ascii")
1001 else:
1002 ascii_encodable_reason = "None"
1003 print(
1004 "<ClientResponse({}) [{} {}]>".format(
1005 ascii_encodable_url, self.status, ascii_encodable_reason
1006 ),
1007 file=out,
1008 )
1009 print(self.headers, file=out)
1010 return out.getvalue()
1012 @property
1013 def connection(self) -> Optional["Connection"]:
1014 return self._connection
1016 @reify
1017 def history(self) -> Tuple["ClientResponse", ...]:
1018 """A sequence of responses, if redirects occurred."""
1019 return self._history
1021 @reify
1022 def links(self) -> "MultiDictProxy[MultiDictProxy[Union[str, URL]]]":
1023 links_str = ", ".join(self.headers.getall("link", []))
1025 if not links_str:
1026 return MultiDictProxy(MultiDict())
1028 links: MultiDict[MultiDictProxy[Union[str, URL]]] = MultiDict()
1030 for val in re.split(r",(?=\s*<)", links_str):
1031 match = re.match(r"\s*<(.*)>(.*)", val)
1032 if match is None: # Malformed link
1033 continue
1034 url, params_str = match.groups()
1035 params = params_str.split(";")[1:]
1037 link: MultiDict[Union[str, URL]] = MultiDict()
1039 for param in params:
1040 match = re.match(r"^\s*(\S*)\s*=\s*(['\"]?)(.*?)(\2)\s*$", param, re.M)
1041 if match is None: # Malformed param
1042 continue
1043 key, _, value, _ = match.groups()
1045 link.add(key, value)
1047 key = link.get("rel", url)
1049 link.add("url", self.url.join(URL(url)))
1051 links.add(str(key), MultiDictProxy(link))
1053 return MultiDictProxy(links)
1055 async def start(self, connection: "Connection") -> "ClientResponse":
1056 """Start response processing."""
1057 self._closed = False
1058 self._protocol = connection.protocol
1059 self._connection = connection
1061 with self._timer:
1062 while True:
1063 # read response
1064 try:
1065 protocol = self._protocol
1066 message, payload = await protocol.read() # type: ignore[union-attr]
1067 except http.HttpProcessingError as exc:
1068 raise ClientResponseError(
1069 self.request_info,
1070 self.history,
1071 status=exc.code,
1072 message=exc.message,
1073 headers=exc.headers,
1074 ) from exc
1076 if message.code < 100 or message.code > 199 or message.code == 101:
1077 break
1079 if self._continue is not None:
1080 set_result(self._continue, True)
1081 self._continue = None
1083 # payload eof handler
1084 payload.on_eof(self._response_eof)
1086 # response status
1087 self.version = message.version
1088 self.status = message.code
1089 self.reason = message.reason
1091 # headers
1092 self._headers = message.headers # type is CIMultiDictProxy
1093 self._raw_headers = message.raw_headers # type is Tuple[bytes, bytes]
1095 # payload
1096 self.content = payload
1098 # cookies
1099 if cookie_hdrs := self.headers.getall(hdrs.SET_COOKIE, ()):
1100 cookies = SimpleCookie()
1101 for hdr in cookie_hdrs:
1102 try:
1103 cookies.load(hdr)
1104 except CookieError as exc:
1105 client_logger.warning("Can not load response cookies: %s", exc)
1106 self._cookies = cookies
1107 return self
1109 def _response_eof(self) -> None:
1110 if self._closed:
1111 return
1113 # protocol could be None because connection could be detached
1114 protocol = self._connection and self._connection.protocol
1115 if protocol is not None and protocol.upgraded:
1116 return
1118 self._closed = True
1119 self._cleanup_writer()
1120 self._release_connection()
1122 @property
1123 def closed(self) -> bool:
1124 return self._closed
1126 def close(self) -> None:
1127 if not self._released:
1128 self._notify_content()
1130 self._closed = True
1131 if self._loop.is_closed():
1132 return
1134 self._cleanup_writer()
1135 if self._connection is not None:
1136 self._connection.close()
1137 self._connection = None
1139 def release(self) -> None:
1140 if not self._released:
1141 self._notify_content()
1143 self._closed = True
1145 self._cleanup_writer()
1146 self._release_connection()
1148 @property
1149 def ok(self) -> bool:
1150 """Returns ``True`` if ``status`` is less than ``400``, ``False`` if not.
1152 This is **not** a check for ``200 OK`` but a check that the response
1153 status is under 400.
1154 """
1155 return 400 > self.status
1157 def raise_for_status(self) -> None:
1158 if not self.ok:
1159 # reason should always be not None for a started response
1160 assert self.reason is not None
1162 # If we're in a context we can rely on __aexit__() to release as the
1163 # exception propagates.
1164 if not self._in_context:
1165 self.release()
1167 raise ClientResponseError(
1168 self.request_info,
1169 self.history,
1170 status=self.status,
1171 message=self.reason,
1172 headers=self.headers,
1173 )
1175 def _release_connection(self) -> None:
1176 if self._connection is not None:
1177 if self.__writer is None:
1178 self._connection.release()
1179 self._connection = None
1180 else:
1181 self.__writer.add_done_callback(lambda f: self._release_connection())
1183 async def _wait_released(self) -> None:
1184 if self.__writer is not None:
1185 try:
1186 await self.__writer
1187 except asyncio.CancelledError:
1188 if (
1189 sys.version_info >= (3, 11)
1190 and (task := asyncio.current_task())
1191 and task.cancelling()
1192 ):
1193 raise
1194 self._release_connection()
1196 def _cleanup_writer(self) -> None:
1197 if self.__writer is not None:
1198 self.__writer.cancel()
1199 self._session = None
1201 def _notify_content(self) -> None:
1202 content = self.content
1203 # content can be None here, but the types are cheated elsewhere.
1204 if content and content.exception() is None: # type: ignore[truthy-bool]
1205 set_exception(content, _CONNECTION_CLOSED_EXCEPTION)
1206 self._released = True
1208 async def wait_for_close(self) -> None:
1209 if self.__writer is not None:
1210 try:
1211 await self.__writer
1212 except asyncio.CancelledError:
1213 if (
1214 sys.version_info >= (3, 11)
1215 and (task := asyncio.current_task())
1216 and task.cancelling()
1217 ):
1218 raise
1219 self.release()
1221 async def read(self) -> bytes:
1222 """Read response payload."""
1223 if self._body is None:
1224 try:
1225 self._body = await self.content.read()
1226 for trace in self._traces:
1227 await trace.send_response_chunk_received(
1228 self.method, self.url, self._body
1229 )
1230 except BaseException:
1231 self.close()
1232 raise
1233 elif self._released: # Response explicitly released
1234 raise ClientConnectionError("Connection closed")
1236 protocol = self._connection and self._connection.protocol
1237 if protocol is None or not protocol.upgraded:
1238 await self._wait_released() # Underlying connection released
1239 return self._body
1241 def get_encoding(self) -> str:
1242 ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower()
1243 mimetype = helpers.parse_mimetype(ctype)
1245 encoding = mimetype.parameters.get("charset")
1246 if encoding:
1247 with contextlib.suppress(LookupError, ValueError):
1248 return codecs.lookup(encoding).name
1250 if mimetype.type == "application" and (
1251 mimetype.subtype == "json" or mimetype.subtype == "rdap"
1252 ):
1253 # RFC 7159 states that the default encoding is UTF-8.
1254 # RFC 7483 defines application/rdap+json
1255 return "utf-8"
1257 if self._body is None:
1258 raise RuntimeError(
1259 "Cannot compute fallback encoding of a not yet read body"
1260 )
1262 return self._resolve_charset(self, self._body)
1264 async def text(self, encoding: Optional[str] = None, errors: str = "strict") -> str:
1265 """Read response payload and decode."""
1266 await self.read()
1268 if encoding is None:
1269 encoding = self.get_encoding()
1271 return self._body.decode(encoding, errors=errors) # type: ignore[union-attr]
1273 async def json(
1274 self,
1275 *,
1276 encoding: Optional[str] = None,
1277 loads: JSONDecoder = DEFAULT_JSON_DECODER,
1278 content_type: Optional[str] = "application/json",
1279 ) -> Any:
1280 """Read and decodes JSON response."""
1281 await self.read()
1283 if content_type:
1284 if not is_expected_content_type(self.content_type, content_type):
1285 raise ContentTypeError(
1286 self.request_info,
1287 self.history,
1288 status=self.status,
1289 message=(
1290 "Attempt to decode JSON with "
1291 "unexpected mimetype: %s" % self.content_type
1292 ),
1293 headers=self.headers,
1294 )
1296 if encoding is None:
1297 encoding = self.get_encoding()
1299 return loads(self._body.decode(encoding)) # type: ignore[union-attr]
1301 async def __aenter__(self) -> "ClientResponse":
1302 self._in_context = True
1303 return self
1305 async def __aexit__(
1306 self,
1307 exc_type: Optional[Type[BaseException]],
1308 exc_val: Optional[BaseException],
1309 exc_tb: Optional[TracebackType],
1310 ) -> None:
1311 self._in_context = False
1312 # similar to _RequestContextManager, we do not need to check
1313 # for exceptions, response object can close connection
1314 # if state is broken
1315 self.release()
1316 await self.wait_for_close()