Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohttp/client_reqrep.py: 41%
680 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
1import asyncio
2import codecs
3import contextlib
4import functools
5import io
6import re
7import sys
8import traceback
9import warnings
10from hashlib import md5, sha1, sha256
11from http.cookies import CookieError, Morsel, SimpleCookie
12from types import MappingProxyType, TracebackType
13from typing import (
14 TYPE_CHECKING,
15 Any,
16 Callable,
17 Dict,
18 Iterable,
19 List,
20 Literal,
21 Mapping,
22 Optional,
23 Tuple,
24 Type,
25 Union,
26 cast,
27)
29import attr
30from multidict import CIMultiDict, CIMultiDictProxy, MultiDict, MultiDictProxy
31from yarl import URL
33from . import hdrs, helpers, http, multipart, payload
34from .abc import AbstractStreamWriter
35from .client_exceptions import (
36 ClientConnectionError,
37 ClientOSError,
38 ClientResponseError,
39 ContentTypeError,
40 InvalidURL,
41 ServerFingerprintMismatch,
42)
43from .compression_utils import HAS_BROTLI
44from .formdata import FormData
45from .helpers import (
46 BaseTimerContext,
47 BasicAuth,
48 HeadersMixin,
49 TimerNoop,
50 basicauth_from_netrc,
51 netrc_from_env,
52 noop,
53 reify,
54 set_result,
55)
56from .http import (
57 SERVER_SOFTWARE,
58 HttpVersion,
59 HttpVersion10,
60 HttpVersion11,
61 StreamWriter,
62)
63from .log import client_logger
64from .streams import StreamReader
65from .typedefs import (
66 DEFAULT_JSON_DECODER,
67 JSONDecoder,
68 LooseCookies,
69 LooseHeaders,
70 RawHeaders,
71)
73try:
74 import ssl
75 from ssl import SSLContext
76except ImportError: # pragma: no cover
77 ssl = None # type: ignore[assignment]
78 SSLContext = object # type: ignore[misc,assignment]
81__all__ = ("ClientRequest", "ClientResponse", "RequestInfo", "Fingerprint")
84if TYPE_CHECKING: # pragma: no cover
85 from .client import ClientSession
86 from .connector import Connection
87 from .tracing import Trace
# Matches any single character outside the set of characters accepted in a
# request method (letters, digits and -!#$%&'*+.^_`|~). Despite its name it is
# not limited to control characters.
_CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]")
# Matches content types starting with "application/json" or
# "application/<something>+json".
json_re = re.compile(r"^application/(?:[\w.+-]+?\+)?json")
def _gen_default_accept_encoding() -> str:
    """Return the default ``Accept-Encoding`` value.

    ``br`` is appended to the list only when ``HAS_BROTLI`` is truthy.
    """
    if HAS_BROTLI:
        return "gzip, deflate, br"
    return "gzip, deflate"
@attr.s(auto_attribs=True, frozen=True, slots=True)
class ContentDisposition:
    """Immutable record holding a parsed ``Content-Disposition`` header."""

    # Disposition type as returned by the header parser.
    type: Optional[str]
    # Read-only view of the header parameters.
    parameters: "MappingProxyType[str, str]"
    # File name extracted from the parameters, if any.
    filename: Optional[str]
@attr.s(auto_attribs=True, frozen=True, slots=True)
class RequestInfo:
    """Immutable snapshot of the URL, method and headers of a request."""

    url: URL
    method: str
    headers: "CIMultiDictProxy[str]"
    # Declared with attr.ib() so that a computed default can be attached below.
    real_url: URL = attr.ib()

    @real_url.default
    def real_url_default(self) -> URL:
        """Default ``real_url`` to ``url`` when it is not supplied."""
        return self.url
class Fingerprint:
    """Certificate pinning helper comparing a peer certificate digest.

    The hash function is selected from the byte length of the expected
    digest; only the SHA-256 length is accepted.
    """

    # Digest length in bytes -> hashlib constructor.
    HASHFUNC_BY_DIGESTLEN = {
        16: md5,
        20: sha1,
        32: sha256,
    }

    def __init__(self, fingerprint: bytes) -> None:
        """Store *fingerprint* and pick the matching hash function.

        Raises:
            ValueError: if the length matches no known digest, or matches
                md5/sha1, which are rejected as insecure.
        """
        hashfunc = self.HASHFUNC_BY_DIGESTLEN.get(len(fingerprint))
        if not hashfunc:
            raise ValueError("fingerprint has invalid length")
        if any(hashfunc is weak for weak in (md5, sha1)):
            raise ValueError("md5 and sha1 are insecure and not supported. Use sha256.")
        self._hashfunc = hashfunc
        self._fingerprint = fingerprint

    @property
    def fingerprint(self) -> bytes:
        """The expected digest passed to the constructor."""
        return self._fingerprint

    def check(self, transport: asyncio.Transport) -> None:
        """Verify the peer certificate of *transport* against the fingerprint.

        Does nothing when the transport reports no ``sslcontext``. Raises
        ``ServerFingerprintMismatch`` when the digests differ.
        """
        if not transport.get_extra_info("sslcontext"):
            return
        ssl_object = transport.get_extra_info("ssl_object")
        peer_cert = ssl_object.getpeercert(binary_form=True)
        actual = self._hashfunc(peer_cert).digest()
        if actual == self._fingerprint:
            return
        # peername may carry more than two items; only host and port are used.
        host, port, *_ = transport.get_extra_info("peername")
        raise ServerFingerprintMismatch(self._fingerprint, actual, host, port)
# Types accepted for the ``ssl`` argument; checked with isinstance() in
# _merge_ssl_params().
if ssl is not None:
    SSL_ALLOWED_TYPES = (ssl.SSLContext, bool, Fingerprint, type(None))
else:  # pragma: no cover
    # Without the ssl module there is no SSLContext type to accept, but the
    # remaining documented values (bool, Fingerprint, None) must still pass
    # validation; a bare ``type(None)`` rejected ``ssl=False``.
    SSL_ALLOWED_TYPES = (bool, Fingerprint, type(None))
def _merge_ssl_params(
    ssl: Union["SSLContext", Literal[False], Fingerprint, None],
    verify_ssl: Optional[bool],
    ssl_context: Optional["SSLContext"],
    fingerprint: Optional[bytes],
) -> Union["SSLContext", Literal[False], Fingerprint, None]:
    """Fold the deprecated TLS arguments into a single ``ssl`` value.

    Each deprecated argument that is in use emits a ``DeprecationWarning``
    and is translated to its ``ssl`` equivalent.

    Raises:
        ValueError: if a deprecated argument is combined with a non-None
            ``ssl`` (including one produced by an earlier deprecated argument).
        TypeError: if the resulting value is not one of SSL_ALLOWED_TYPES.
    """
    exclusive_msg = (
        "verify_ssl, ssl_context, fingerprint and ssl "
        "parameters are mutually exclusive"
    )

    # verify_ssl=False -> ssl=False (verify_ssl=True/None changes nothing).
    if verify_ssl is not None and not verify_ssl:
        warnings.warn(
            "verify_ssl is deprecated, use ssl=False instead",
            DeprecationWarning,
            stacklevel=3,
        )
        if ssl is not None:
            raise ValueError(exclusive_msg)
        ssl = False

    # ssl_context=ctx -> ssl=ctx
    if ssl_context is not None:
        warnings.warn(
            "ssl_context is deprecated, use ssl=context instead",
            DeprecationWarning,
            stacklevel=3,
        )
        if ssl is not None:
            raise ValueError(exclusive_msg)
        ssl = ssl_context

    # fingerprint=digest -> ssl=Fingerprint(digest)
    if fingerprint is not None:
        warnings.warn(
            "fingerprint is deprecated, use ssl=Fingerprint(fingerprint) instead",
            DeprecationWarning,
            stacklevel=3,
        )
        if ssl is not None:
            raise ValueError(exclusive_msg)
        ssl = Fingerprint(fingerprint)

    if isinstance(ssl, SSL_ALLOWED_TYPES):
        return ssl
    raise TypeError(
        "ssl should be SSLContext, bool, Fingerprint or None, "
        "got {!r} instead.".format(ssl)
    )
@attr.s(auto_attribs=True, slots=True, frozen=True)
class ConnectionKey:
    """Immutable key describing the endpoint and proxy/TLS setup of a request."""

    # the key should contain an information about used proxy / TLS
    # to prevent reusing wrong connections from a pool
    host: str
    port: Optional[int]
    is_ssl: bool
    ssl: Union[SSLContext, None, Literal[False], Fingerprint]
    proxy: Optional[URL]
    proxy_auth: Optional[BasicAuth]
    proxy_headers_hash: Optional[int]  # hash(CIMultiDict)
def _is_expected_content_type(
    response_content_type: str, expected_content_type: str
) -> bool:
    """Tell whether the response content type satisfies the expected one.

    ``application/json`` is matched through ``json_re``; any other expected
    value is a plain substring test against the response value.
    """
    if expected_content_type != "application/json":
        return expected_content_type in response_content_type
    return json_re.match(response_content_type) is not None
class ClientRequest:
    """Outgoing HTTP request: holds URL, headers and body, and sends them."""

    # Method groups; GET_METHODS are the ones for which the constructor skips
    # the transfer-encoding analysis when no data is given.
    GET_METHODS = {
        hdrs.METH_GET,
        hdrs.METH_HEAD,
        hdrs.METH_OPTIONS,
        hdrs.METH_TRACE,
    }
    # Methods that get a default Content-Type in send().
    POST_METHODS = {hdrs.METH_PATCH, hdrs.METH_POST, hdrs.METH_PUT}
    ALL_METHODS = GET_METHODS.union(POST_METHODS).union({hdrs.METH_DELETE})

    # Headers added automatically unless already present or explicitly skipped.
    DEFAULT_HEADERS = {
        hdrs.ACCEPT: "*/*",
        hdrs.ACCEPT_ENCODING: _gen_default_accept_encoding(),
    }

    # Class-level defaults, overridden per instance by the update_* methods.
    body = b""
    auth = None
    response = None

    __writer = None  # async task for streaming data
    _continue = None  # waiter future for '100 Continue' response

    # N.B.
    # Adding __del__ method with self._writer closing doesn't make sense
    # because _writer is instance method, thus it keeps a reference to self.
    # Until writer has finished finalizer will not be called.
258 def __init__(
259 self,
260 method: str,
261 url: URL,
262 *,
263 params: Optional[Mapping[str, str]] = None,
264 headers: Optional[LooseHeaders] = None,
265 skip_auto_headers: Iterable[str] = frozenset(),
266 data: Any = None,
267 cookies: Optional[LooseCookies] = None,
268 auth: Optional[BasicAuth] = None,
269 version: http.HttpVersion = http.HttpVersion11,
270 compress: Optional[str] = None,
271 chunked: Optional[bool] = None,
272 expect100: bool = False,
273 loop: Optional[asyncio.AbstractEventLoop] = None,
274 response_class: Optional[Type["ClientResponse"]] = None,
275 proxy: Optional[URL] = None,
276 proxy_auth: Optional[BasicAuth] = None,
277 timer: Optional[BaseTimerContext] = None,
278 session: Optional["ClientSession"] = None,
279 ssl: Union[SSLContext, Literal[False], Fingerprint, None] = None,
280 proxy_headers: Optional[LooseHeaders] = None,
281 traces: Optional[List["Trace"]] = None,
282 trust_env: bool = False,
283 server_hostname: Optional[str] = None,
284 ):
285 if loop is None:
286 loop = asyncio.get_event_loop()
288 match = _CONTAINS_CONTROL_CHAR_RE.search(method)
289 if match:
290 raise ValueError(
291 f"Method cannot contain non-token characters {method!r} "
292 "(found at least {match.group()!r})"
293 )
295 assert isinstance(url, URL), url
296 assert isinstance(proxy, (URL, type(None))), proxy
297 # FIXME: session is None in tests only, need to fix tests
298 # assert session is not None
299 self._session = cast("ClientSession", session)
300 if params:
301 q = MultiDict(url.query)
302 url2 = url.with_query(params)
303 q.extend(url2.query)
304 url = url.with_query(q)
305 self.original_url = url
306 self.url = url.with_fragment(None)
307 self.method = method.upper()
308 self.chunked = chunked
309 self.compress = compress
310 self.loop = loop
311 self.length = None
312 if response_class is None:
313 real_response_class = ClientResponse
314 else:
315 real_response_class = response_class
316 self.response_class: Type[ClientResponse] = real_response_class
317 self._timer = timer if timer is not None else TimerNoop()
318 self._ssl = ssl
319 self.server_hostname = server_hostname
321 if loop.get_debug():
322 self._source_traceback = traceback.extract_stack(sys._getframe(1))
324 self.update_version(version)
325 self.update_host(url)
326 self.update_headers(headers)
327 self.update_auto_headers(skip_auto_headers)
328 self.update_cookies(cookies)
329 self.update_content_encoding(data)
330 self.update_auth(auth, trust_env)
331 self.update_proxy(proxy, proxy_auth, proxy_headers)
333 self.update_body_from_data(data)
334 if data is not None or self.method not in self.GET_METHODS:
335 self.update_transfer_encoding()
336 self.update_expect_continue(expect100)
337 if traces is None:
338 traces = []
339 self._traces = traces
341 def __reset_writer(self, _: object = None) -> None:
342 self.__writer = None
344 @property
345 def _writer(self) -> Optional["asyncio.Task[None]"]:
346 return self.__writer
348 @_writer.setter
349 def _writer(self, writer: Optional["asyncio.Task[None]"]) -> None:
350 if self.__writer is not None:
351 self.__writer.remove_done_callback(self.__reset_writer)
352 self.__writer = writer
353 if writer is not None:
354 writer.add_done_callback(self.__reset_writer)
356 def is_ssl(self) -> bool:
357 return self.url.scheme in ("https", "wss")
359 @property
360 def ssl(self) -> Union["SSLContext", None, Literal[False], Fingerprint]:
361 return self._ssl
363 @property
364 def connection_key(self) -> ConnectionKey:
365 proxy_headers = self.proxy_headers
366 if proxy_headers:
367 h: Optional[int] = hash(tuple((k, v) for k, v in proxy_headers.items()))
368 else:
369 h = None
370 return ConnectionKey(
371 self.host,
372 self.port,
373 self.is_ssl(),
374 self.ssl,
375 self.proxy,
376 self.proxy_auth,
377 h,
378 )
380 @property
381 def host(self) -> str:
382 ret = self.url.raw_host
383 assert ret is not None
384 return ret
386 @property
387 def port(self) -> Optional[int]:
388 return self.url.port
390 @property
391 def request_info(self) -> RequestInfo:
392 headers: CIMultiDictProxy[str] = CIMultiDictProxy(self.headers)
393 return RequestInfo(self.url, self.method, headers, self.original_url)
395 def update_host(self, url: URL) -> None:
396 """Update destination host, port and connection type (ssl)."""
397 # get host/port
398 if not url.raw_host:
399 raise InvalidURL(url)
401 # basic auth info
402 username, password = url.user, url.password
403 if username:
404 self.auth = helpers.BasicAuth(username, password or "")
406 def update_version(self, version: Union[http.HttpVersion, str]) -> None:
407 """Convert request version to two elements tuple.
409 parser HTTP version '1.1' => (1, 1)
410 """
411 if isinstance(version, str):
412 v = [part.strip() for part in version.split(".", 1)]
413 try:
414 version = http.HttpVersion(int(v[0]), int(v[1]))
415 except ValueError:
416 raise ValueError(
417 f"Can not parse http version number: {version}"
418 ) from None
419 self.version = version
421 def update_headers(self, headers: Optional[LooseHeaders]) -> None:
422 """Update request headers."""
423 self.headers: CIMultiDict[str] = CIMultiDict()
425 # add host
426 netloc = cast(str, self.url.raw_host)
427 if helpers.is_ipv6_address(netloc):
428 netloc = f"[{netloc}]"
429 # See https://github.com/aio-libs/aiohttp/issues/3636.
430 netloc = netloc.rstrip(".")
431 if self.url.port is not None and not self.url.is_default_port():
432 netloc += ":" + str(self.url.port)
433 self.headers[hdrs.HOST] = netloc
435 if headers:
436 if isinstance(headers, (dict, MultiDictProxy, MultiDict)):
437 headers = headers.items() # type: ignore[assignment]
439 for key, value in headers: # type: ignore[misc]
440 # A special case for Host header
441 if key.lower() == "host":
442 self.headers[key] = value
443 else:
444 self.headers.add(key, value)
446 def update_auto_headers(self, skip_auto_headers: Iterable[str]) -> None:
447 self.skip_auto_headers = CIMultiDict(
448 (hdr, None) for hdr in sorted(skip_auto_headers)
449 )
450 used_headers = self.headers.copy()
451 used_headers.extend(self.skip_auto_headers) # type: ignore[arg-type]
453 for hdr, val in self.DEFAULT_HEADERS.items():
454 if hdr not in used_headers:
455 self.headers.add(hdr, val)
457 if hdrs.USER_AGENT not in used_headers:
458 self.headers[hdrs.USER_AGENT] = SERVER_SOFTWARE
460 def update_cookies(self, cookies: Optional[LooseCookies]) -> None:
461 """Update request cookies header."""
462 if not cookies:
463 return
465 c = SimpleCookie()
466 if hdrs.COOKIE in self.headers:
467 c.load(self.headers.get(hdrs.COOKIE, ""))
468 del self.headers[hdrs.COOKIE]
470 if isinstance(cookies, Mapping):
471 iter_cookies = cookies.items()
472 else:
473 iter_cookies = cookies # type: ignore[assignment]
474 for name, value in iter_cookies:
475 if isinstance(value, Morsel):
476 # Preserve coded_value
477 mrsl_val = value.get(value.key, Morsel())
478 mrsl_val.set(value.key, value.value, value.coded_value)
479 c[name] = mrsl_val
480 else:
481 c[name] = value # type: ignore[assignment]
483 self.headers[hdrs.COOKIE] = c.output(header="", sep=";").strip()
485 def update_content_encoding(self, data: Any) -> None:
486 """Set request content encoding."""
487 if data is None:
488 return
490 enc = self.headers.get(hdrs.CONTENT_ENCODING, "").lower()
491 if enc:
492 if self.compress:
493 raise ValueError(
494 "compress can not be set " "if Content-Encoding header is set"
495 )
496 elif self.compress:
497 if not isinstance(self.compress, str):
498 self.compress = "deflate"
499 self.headers[hdrs.CONTENT_ENCODING] = self.compress
500 self.chunked = True # enable chunked, no need to deal with length
502 def update_transfer_encoding(self) -> None:
503 """Analyze transfer-encoding header."""
504 te = self.headers.get(hdrs.TRANSFER_ENCODING, "").lower()
506 if "chunked" in te:
507 if self.chunked:
508 raise ValueError(
509 "chunked can not be set "
510 'if "Transfer-Encoding: chunked" header is set'
511 )
513 elif self.chunked:
514 if hdrs.CONTENT_LENGTH in self.headers:
515 raise ValueError(
516 "chunked can not be set " "if Content-Length header is set"
517 )
519 self.headers[hdrs.TRANSFER_ENCODING] = "chunked"
520 else:
521 if hdrs.CONTENT_LENGTH not in self.headers:
522 self.headers[hdrs.CONTENT_LENGTH] = str(len(self.body))
524 def update_auth(self, auth: Optional[BasicAuth], trust_env: bool = False) -> None:
525 """Set basic auth."""
526 if auth is None:
527 auth = self.auth
528 if auth is None and trust_env and self.url.host is not None:
529 netrc_obj = netrc_from_env()
530 with contextlib.suppress(LookupError):
531 auth = basicauth_from_netrc(netrc_obj, self.url.host)
532 if auth is None:
533 return
535 if not isinstance(auth, helpers.BasicAuth):
536 raise TypeError("BasicAuth() tuple is required instead")
538 self.headers[hdrs.AUTHORIZATION] = auth.encode()
540 def update_body_from_data(self, body: Any) -> None:
541 if body is None:
542 return
544 # FormData
545 if isinstance(body, FormData):
546 body = body()
548 try:
549 body = payload.PAYLOAD_REGISTRY.get(body, disposition=None)
550 except payload.LookupError:
551 body = FormData(body)()
553 self.body = body
555 # enable chunked encoding if needed
556 if not self.chunked:
557 if hdrs.CONTENT_LENGTH not in self.headers:
558 size = body.size
559 if size is None:
560 self.chunked = True
561 else:
562 if hdrs.CONTENT_LENGTH not in self.headers:
563 self.headers[hdrs.CONTENT_LENGTH] = str(size)
565 # copy payload headers
566 assert body.headers
567 for (key, value) in body.headers.items():
568 if key in self.headers:
569 continue
570 if key in self.skip_auto_headers:
571 continue
572 self.headers[key] = value
574 def update_expect_continue(self, expect: bool = False) -> None:
575 if expect:
576 self.headers[hdrs.EXPECT] = "100-continue"
577 elif self.headers.get(hdrs.EXPECT, "").lower() == "100-continue":
578 expect = True
580 if expect:
581 self._continue = self.loop.create_future()
583 def update_proxy(
584 self,
585 proxy: Optional[URL],
586 proxy_auth: Optional[BasicAuth],
587 proxy_headers: Optional[LooseHeaders],
588 ) -> None:
589 if proxy_auth and not isinstance(proxy_auth, helpers.BasicAuth):
590 raise ValueError("proxy_auth must be None or BasicAuth() tuple")
591 self.proxy = proxy
592 self.proxy_auth = proxy_auth
593 self.proxy_headers = proxy_headers
595 def keep_alive(self) -> bool:
596 if self.version < HttpVersion10:
597 # keep alive not supported at all
598 return False
599 if self.version == HttpVersion10:
600 if self.headers.get(hdrs.CONNECTION) == "keep-alive":
601 return True
602 else: # no headers means we close for Http 1.0
603 return False
604 elif self.headers.get(hdrs.CONNECTION) == "close":
605 return False
607 return True
    async def write_bytes(
        self, writer: AbstractStreamWriter, conn: "Connection"
    ) -> None:
        """Write the request body to *writer*, reporting failures to the protocol.

        Errors are not raised to the caller; they are stored on
        ``conn.protocol`` via ``set_exception``.
        """
        # 100 response
        if self._continue is not None:
            # Flush what was written so far, then wait for the continue waiter.
            try:
                await writer.drain()
                await self._continue
            except asyncio.CancelledError:
                return

        protocol = conn.protocol
        assert protocol is not None
        try:
            if isinstance(self.body, payload.Payload):
                await self.body.write(writer)
            else:
                # Raw bytes are wrapped so they can be iterated as one chunk.
                if isinstance(self.body, (bytes, bytearray)):
                    self.body = (self.body,)  # type: ignore[assignment]

                for chunk in self.body:
                    await writer.write(chunk)  # type: ignore[arg-type]
        except OSError as exc:
            # A timeout without errno is passed through as-is; any other
            # OSError is wrapped in ClientOSError with the original chained.
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                protocol.set_exception(exc)
            else:
                new_exc = ClientOSError(
                    exc.errno, "Can not write request body for %s" % self.url
                )
                new_exc.__context__ = exc
                new_exc.__cause__ = exc
                protocol.set_exception(new_exc)
        except asyncio.CancelledError:
            # On cancellation the body is still terminated.
            await writer.write_eof()
        except Exception as exc:
            protocol.set_exception(exc)
        else:
            await writer.write_eof()
            protocol.start_timeout()
    async def send(self, conn: "Connection") -> "ClientResponse":
        """Write the request line and headers, start the body writer task.

        Returns the response object created from ``self.response_class``;
        it is also stored in ``self.response``.
        """
        # Specify request target:
        # - CONNECT request must send authority form URI
        # - not CONNECT proxy must send absolute form URI
        # - most common is origin form URI
        if self.method == hdrs.METH_CONNECT:
            connect_host = self.url.raw_host
            assert connect_host is not None
            if helpers.is_ipv6_address(connect_host):
                connect_host = f"[{connect_host}]"
            path = f"{connect_host}:{self.url.port}"
        elif self.proxy and not self.is_ssl():
            path = str(self.url)
        else:
            path = self.url.raw_path
            if self.url.raw_query_string:
                path += "?" + self.url.raw_query_string

        protocol = conn.protocol
        assert protocol is not None
        # The callbacks forward chunk/header events to the registered traces.
        writer = StreamWriter(
            protocol,
            self.loop,
            on_chunk_sent=functools.partial(
                self._on_chunk_request_sent, self.method, self.url
            ),
            on_headers_sent=functools.partial(
                self._on_headers_request_sent, self.method, self.url
            ),
        )

        if self.compress:
            writer.enable_compression(self.compress)

        # NOTE(review): this enables chunking for any non-None value,
        # including an explicit False - confirm that is intended.
        if self.chunked is not None:
            writer.enable_chunking()

        # set default content-type
        if (
            self.method in self.POST_METHODS
            and hdrs.CONTENT_TYPE not in self.skip_auto_headers
            and hdrs.CONTENT_TYPE not in self.headers
        ):
            self.headers[hdrs.CONTENT_TYPE] = "application/octet-stream"

        # set the connection header
        connection = self.headers.get(hdrs.CONNECTION)
        if not connection:
            # Only state the non-default behaviour for the HTTP version.
            if self.keep_alive():
                if self.version == HttpVersion10:
                    connection = "keep-alive"
            else:
                if self.version == HttpVersion11:
                    connection = "close"

        if connection is not None:
            self.headers[hdrs.CONNECTION] = connection

        # status + headers
        status_line = "{0} {1} HTTP/{v.major}.{v.minor}".format(
            self.method, path, v=self.version
        )
        await writer.write_headers(status_line, self.headers)

        # The body is written by a separate task.
        self._writer = self.loop.create_task(self.write_bytes(writer, conn))

        response_class = self.response_class
        assert response_class is not None
        self.response = response_class(
            self.method,
            self.original_url,
            writer=self._writer,
            continue100=self._continue,
            timer=self._timer,
            request_info=self.request_info,
            traces=self._traces,
            loop=self.loop,
            session=self._session,
        )
        return self.response
731 async def close(self) -> None:
732 if self._writer is not None:
733 with contextlib.suppress(asyncio.CancelledError):
734 await self._writer
736 def terminate(self) -> None:
737 if self._writer is not None:
738 if not self.loop.is_closed():
739 self._writer.cancel()
740 self._writer.remove_done_callback(self.__reset_writer)
741 self._writer = None
743 async def _on_chunk_request_sent(self, method: str, url: URL, chunk: bytes) -> None:
744 for trace in self._traces:
745 await trace.send_request_chunk_sent(method, url, chunk)
747 async def _on_headers_request_sent(
748 self, method: str, url: URL, headers: "CIMultiDict[str]"
749 ) -> None:
750 for trace in self._traces:
751 await trace.send_request_headers(method, url, headers)
class ClientResponse(HeadersMixin):
    """Response to a client request; populated by start()."""

    # Some of these attributes are None when created,
    # but will be set by the start() method.
    # As the end user will likely never see the None values, we cheat the types below.
    # from the Status-Line of the response
    version: Optional[HttpVersion] = None  # HTTP-Version
    status: int = None  # type: ignore[assignment]  # Status-Code
    reason: Optional[str] = None  # Reason-Phrase

    content: StreamReader = None  # type: ignore[assignment]  # Payload stream
    _headers: CIMultiDictProxy[str] = None  # type: ignore[assignment]
    _raw_headers: RawHeaders = None  # type: ignore[assignment]

    _connection = None  # current connection
    _source_traceback: Optional[traceback.StackSummary] = None
    # set up by ClientRequest after ClientResponse object creation
    # post-init stage allows to not change ctor signature
    _closed = True  # to allow __del__ for non-initialized properly response
    _released = False
    __writer = None  # backing field of the _writer property
776 def __init__(
777 self,
778 method: str,
779 url: URL,
780 *,
781 writer: "asyncio.Task[None]",
782 continue100: Optional["asyncio.Future[bool]"],
783 timer: BaseTimerContext,
784 request_info: RequestInfo,
785 traces: List["Trace"],
786 loop: asyncio.AbstractEventLoop,
787 session: "ClientSession",
788 ) -> None:
789 assert isinstance(url, URL)
791 self.method = method
792 self.cookies = SimpleCookie()
794 self._real_url = url
795 self._url = url.with_fragment(None)
796 self._body: Any = None
797 self._writer: Optional[asyncio.Task[None]] = writer
798 self._continue = continue100 # None by default
799 self._closed = True
800 self._history: Tuple[ClientResponse, ...] = ()
801 self._request_info = request_info
802 self._timer = timer if timer is not None else TimerNoop()
803 self._cache: Dict[str, Any] = {}
804 self._traces = traces
805 self._loop = loop
806 # store a reference to session #1985
807 self._session: Optional[ClientSession] = session
808 # Save reference to _resolve_charset, so that get_encoding() will still
809 # work after the response has finished reading the body.
810 if session is None:
811 # TODO: Fix session=None in tests (see ClientRequest.__init__).
812 self._resolve_charset: Callable[
813 ["ClientResponse", bytes], str
814 ] = lambda *_: "utf-8"
815 else:
816 self._resolve_charset = session._resolve_charset
817 if loop.get_debug():
818 self._source_traceback = traceback.extract_stack(sys._getframe(1))
820 def __reset_writer(self, _: object = None) -> None:
821 self.__writer = None
823 @property
824 def _writer(self) -> Optional["asyncio.Task[None]"]:
825 return self.__writer
827 @_writer.setter
828 def _writer(self, writer: Optional["asyncio.Task[None]"]) -> None:
829 if self.__writer is not None:
830 self.__writer.remove_done_callback(self.__reset_writer)
831 self.__writer = writer
832 if writer is not None:
833 writer.add_done_callback(self.__reset_writer)
    @reify
    def url(self) -> URL:
        """The response URL with the fragment removed."""
        return self._url

    @reify
    def url_obj(self) -> URL:
        """Deprecated alias of ``url``; emits a DeprecationWarning."""
        warnings.warn("Deprecated, use .url #1654", DeprecationWarning, stacklevel=2)
        return self._url

    @reify
    def real_url(self) -> URL:
        """The URL exactly as passed to the constructor."""
        return self._real_url

    @reify
    def host(self) -> str:
        """Host of the response URL (asserted to be present)."""
        assert self._url.host is not None
        return self._url.host

    @reify
    def headers(self) -> "CIMultiDictProxy[str]":
        """Response headers as stored by start()."""
        return self._headers

    @reify
    def raw_headers(self) -> RawHeaders:
        """Raw response headers as stored by start()."""
        return self._raw_headers

    @reify
    def request_info(self) -> RequestInfo:
        """The RequestInfo passed to the constructor."""
        return self._request_info
865 @reify
866 def content_disposition(self) -> Optional[ContentDisposition]:
867 raw = self._headers.get(hdrs.CONTENT_DISPOSITION)
868 if raw is None:
869 return None
870 disposition_type, params_dct = multipart.parse_content_disposition(raw)
871 params = MappingProxyType(params_dct)
872 filename = multipart.content_disposition_filename(params)
873 return ContentDisposition(disposition_type, params, filename)
    def __del__(self, _warnings: Any = warnings) -> None:
        """Release resources of a response that was never closed.

        ``_warnings`` is bound as a default argument so the module stays
        reachable from the finalizer.
        """
        if self._closed:
            return

        if self._connection is not None:
            self._connection.release()
            self._cleanup_writer()

            # In debug mode, report the leak with the creation traceback.
            if self._loop.get_debug():
                kwargs = {"source": self}
                _warnings.warn(f"Unclosed response {self!r}", ResourceWarning, **kwargs)
                context = {"client_response": self, "message": "Unclosed response"}
                if self._source_traceback:
                    context["source_traceback"] = self._source_traceback
                self._loop.call_exception_handler(context)
891 def __repr__(self) -> str:
892 out = io.StringIO()
893 ascii_encodable_url = str(self.url)
894 if self.reason:
895 ascii_encodable_reason = self.reason.encode(
896 "ascii", "backslashreplace"
897 ).decode("ascii")
898 else:
899 ascii_encodable_reason = "None"
900 print(
901 "<ClientResponse({}) [{} {}]>".format(
902 ascii_encodable_url, self.status, ascii_encodable_reason
903 ),
904 file=out,
905 )
906 print(self.headers, file=out)
907 return out.getvalue()
    @property
    def connection(self) -> Optional["Connection"]:
        """The connection currently attached to the response, or None."""
        return self._connection

    @reify
    def history(self) -> Tuple["ClientResponse", ...]:
        """A sequence of responses, if redirects occurred."""
        return self._history
918 @reify
919 def links(self) -> "MultiDictProxy[MultiDictProxy[Union[str, URL]]]":
920 links_str = ", ".join(self.headers.getall("link", []))
922 if not links_str:
923 return MultiDictProxy(MultiDict())
925 links: MultiDict[MultiDictProxy[Union[str, URL]]] = MultiDict()
927 for val in re.split(r",(?=\s*<)", links_str):
928 match = re.match(r"\s*<(.*)>(.*)", val)
929 if match is None: # pragma: no cover
930 # the check exists to suppress mypy error
931 continue
932 url, params_str = match.groups()
933 params = params_str.split(";")[1:]
935 link: MultiDict[Union[str, URL]] = MultiDict()
937 for param in params:
938 match = re.match(r"^\s*(\S*)\s*=\s*(['\"]?)(.*?)(\2)\s*$", param, re.M)
939 if match is None: # pragma: no cover
940 # the check exists to suppress mypy error
941 continue
942 key, _, value, _ = match.groups()
944 link.add(key, value)
946 key = link.get("rel", url)
948 link.add("url", self.url.join(URL(url)))
950 links.add(str(key), MultiDictProxy(link))
952 return MultiDictProxy(links)
    async def start(self, connection: "Connection") -> "ClientResponse":
        """Start response processing.

        Reads messages from the connection's protocol until one that is not
        an interim 1xx response (101 is treated as final), then stores its
        status, headers, payload and cookies on the response.

        Raises:
            ClientResponseError: if the protocol raises HttpProcessingError.
        """
        self._closed = False
        self._protocol = connection.protocol
        self._connection = connection

        with self._timer:
            while True:
                # read response
                try:
                    protocol = self._protocol
                    message, payload = await protocol.read()  # type: ignore[union-attr]
                except http.HttpProcessingError as exc:
                    raise ClientResponseError(
                        self.request_info,
                        self.history,
                        status=exc.code,
                        message=exc.message,
                        headers=exc.headers,
                    ) from exc

                if message.code < 100 or message.code > 199 or message.code == 101:
                    break

                # Interim response: wake up the '100 Continue' waiter once.
                if self._continue is not None:
                    set_result(self._continue, True)
                    self._continue = None

        # payload eof handler
        payload.on_eof(self._response_eof)

        # response status
        self.version = message.version
        self.status = message.code
        self.reason = message.reason

        # headers
        self._headers = message.headers  # type is CIMultiDictProxy
        self._raw_headers = message.raw_headers  # type is Tuple[bytes, bytes]

        # payload
        self.content = payload

        # cookies
        for hdr in self.headers.getall(hdrs.SET_COOKIE, ()):
            try:
                self.cookies.load(hdr)
            except CookieError as exc:
                # A malformed cookie is logged and skipped.
                client_logger.warning("Can not load response cookies: %s", exc)
        return self
1005 def _response_eof(self) -> None:
1006 if self._closed:
1007 return
1009 # protocol could be None because connection could be detached
1010 protocol = self._connection and self._connection.protocol
1011 if protocol is not None and protocol.upgraded:
1012 return
1014 self._closed = True
1015 self._cleanup_writer()
1016 self._release_connection()
    @property
    def closed(self) -> bool:
        """Whether the response is currently marked as closed."""
        return self._closed
1022 def close(self) -> None:
1023 if not self._released:
1024 self._notify_content()
1026 self._closed = True
1027 if self._loop is None or self._loop.is_closed():
1028 return
1030 self._cleanup_writer()
1031 if self._connection is not None:
1032 self._connection.close()
1033 self._connection = None
1035 def release(self) -> Any:
1036 if not self._released:
1037 self._notify_content()
1039 self._closed = True
1041 self._cleanup_writer()
1042 self._release_connection()
1043 return noop()
1045 @property
1046 def ok(self) -> bool:
1047 """Returns ``True`` if ``status`` is less than ``400``, ``False`` if not.
1049 This is **not** a check for ``200 OK`` but a check that the response
1050 status is under 400.
1051 """
1052 return 400 > self.status
1054 def raise_for_status(self) -> None:
1055 if not self.ok:
1056 # reason should always be not None for a started response
1057 assert self.reason is not None
1058 self.release()
1059 raise ClientResponseError(
1060 self.request_info,
1061 self.history,
1062 status=self.status,
1063 message=self.reason,
1064 headers=self.headers,
1065 )
1067 def _release_connection(self) -> None:
1068 if self._connection is not None:
1069 if self._writer is None:
1070 self._connection.release()
1071 self._connection = None
1072 else:
1073 self._writer.add_done_callback(lambda f: self._release_connection())
1075 async def _wait_released(self) -> None:
1076 if self._writer is not None:
1077 await self._writer
1078 self._release_connection()
1080 def _cleanup_writer(self) -> None:
1081 if self._writer is not None:
1082 self._writer.cancel()
1083 self._session = None
1085 def _notify_content(self) -> None:
1086 content = self.content
1087 if content and content.exception() is None:
1088 content.set_exception(ClientConnectionError("Connection closed"))
1089 self._released = True
1091 async def wait_for_close(self) -> None:
1092 if self._writer is not None:
1093 await self._writer
1094 self.release()
1096 async def read(self) -> bytes:
1097 """Read response payload."""
1098 if self._body is None:
1099 try:
1100 self._body = await self.content.read()
1101 for trace in self._traces:
1102 await trace.send_response_chunk_received(
1103 self.method, self.url, self._body
1104 )
1105 except BaseException:
1106 self.close()
1107 raise
1108 elif self._released: # Response explicitly released
1109 raise ClientConnectionError("Connection closed")
1111 protocol = self._connection and self._connection.protocol
1112 if protocol is None or not protocol.upgraded:
1113 await self._wait_released() # Underlying connection released
1114 return self._body # type: ignore[no-any-return]
1116 def get_encoding(self) -> str:
1117 ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower()
1118 mimetype = helpers.parse_mimetype(ctype)
1120 encoding = mimetype.parameters.get("charset")
1121 if encoding:
1122 with contextlib.suppress(LookupError):
1123 return codecs.lookup(encoding).name
1125 if mimetype.type == "application" and (
1126 mimetype.subtype == "json" or mimetype.subtype == "rdap"
1127 ):
1128 # RFC 7159 states that the default encoding is UTF-8.
1129 # RFC 7483 defines application/rdap+json
1130 return "utf-8"
1132 if self._body is None:
1133 raise RuntimeError(
1134 "Cannot compute fallback encoding of a not yet read body"
1135 )
1137 return self._resolve_charset(self, self._body)
1139 async def text(self, encoding: Optional[str] = None, errors: str = "strict") -> str:
1140 """Read response payload and decode."""
1141 if self._body is None:
1142 await self.read()
1144 if encoding is None:
1145 encoding = self.get_encoding()
1147 return self._body.decode( # type: ignore[no-any-return,union-attr]
1148 encoding, errors=errors
1149 )
1151 async def json(
1152 self,
1153 *,
1154 encoding: Optional[str] = None,
1155 loads: JSONDecoder = DEFAULT_JSON_DECODER,
1156 content_type: Optional[str] = "application/json",
1157 ) -> Any:
1158 """Read and decodes JSON response."""
1159 if self._body is None:
1160 await self.read()
1162 if content_type:
1163 ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower()
1164 if not _is_expected_content_type(ctype, content_type):
1165 raise ContentTypeError(
1166 self.request_info,
1167 self.history,
1168 message=(
1169 "Attempt to decode JSON with " "unexpected mimetype: %s" % ctype
1170 ),
1171 headers=self.headers,
1172 )
1174 stripped = self._body.strip() # type: ignore[union-attr]
1175 if not stripped:
1176 return None
1178 if encoding is None:
1179 encoding = self.get_encoding()
1181 return loads(stripped.decode(encoding))
    async def __aenter__(self) -> "ClientResponse":
        """Enter the async context; returns the response itself."""
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Release the response and wait for the writer task on exit."""
        # similar to _RequestContextManager, we do not need to check
        # for exceptions, response object can close connection
        # if state is broken
        self.release()
        await self.wait_for_close()