Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/client_reqrep.py: 25%
620 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:52 +0000
1import asyncio
2import codecs
3import contextlib
4import dataclasses
5import functools
6import io
7import re
8import sys
9import traceback
10import warnings
11from hashlib import md5, sha1, sha256
12from http.cookies import CookieError, Morsel, SimpleCookie
13from types import MappingProxyType, TracebackType
14from typing import (
15 TYPE_CHECKING,
16 Any,
17 Dict,
18 Iterable,
19 List,
20 Mapping,
21 Optional,
22 Tuple,
23 Type,
24 Union,
25 cast,
26)
28from multidict import CIMultiDict, CIMultiDictProxy, MultiDict, MultiDictProxy
29from yarl import URL
31from . import hdrs, helpers, http, multipart, payload
32from .abc import AbstractStreamWriter
33from .client_exceptions import (
34 ClientConnectionError,
35 ClientOSError,
36 ClientResponseError,
37 ContentTypeError,
38 InvalidURL,
39 ServerFingerprintMismatch,
40)
41from .compression_utils import HAS_BROTLI
42from .formdata import FormData
43from .hdrs import CONTENT_TYPE
44from .helpers import (
45 BaseTimerContext,
46 BasicAuth,
47 HeadersMixin,
48 TimerNoop,
49 basicauth_from_netrc,
50 is_expected_content_type,
51 netrc_from_env,
52 noop,
53 parse_mimetype,
54 reify,
55 set_result,
56)
57from .http import SERVER_SOFTWARE, HttpVersion10, HttpVersion11, StreamWriter
58from .log import client_logger
59from .streams import StreamReader
60from .typedefs import (
61 DEFAULT_JSON_DECODER,
62 JSONDecoder,
63 LooseCookies,
64 LooseHeaders,
65 RawHeaders,
66)
68try:
69 import ssl
70 from ssl import SSLContext
71except ImportError: # pragma: no cover
72 ssl = None # type: ignore[assignment]
73 SSLContext = object # type: ignore[misc,assignment]
75try:
76 import cchardet as chardet
77except ImportError: # pragma: no cover
78 import charset_normalizer as chardet # type: ignore[no-redef]
81__all__ = ("ClientRequest", "ClientResponse", "RequestInfo", "Fingerprint")
84if TYPE_CHECKING: # pragma: no cover
85 from .client import ClientSession
86 from .connector import Connection
87 from .tracing import Trace
# Matches any single character that is NOT in the allowed set below
# (ASCII letters, digits and ``-!#$%&'*+.^_`|~``).  Despite the name, it is
# used by ClientRequest.__init__ to reject methods containing "non-token
# characters", not only control characters.
_CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]")
def _gen_default_accept_encoding() -> str:
    """Return the default ``Accept-Encoding`` value.

    ``br`` is advertised only when ``HAS_BROTLI`` is true.
    """
    if HAS_BROTLI:
        return "gzip, deflate, br"
    return "gzip, deflate"
@dataclasses.dataclass(frozen=True)
class ContentDisposition:
    """Immutable parsed form of a ``Content-Disposition`` header.

    Instances are built by ``ClientResponse.content_disposition`` from the
    result of ``multipart.parse_content_disposition``.
    """

    # Disposition type as returned by the parser.
    type: Optional[str]
    # Read-only view of the header parameters.
    parameters: "MappingProxyType[str, str]"
    # File name derived from the parameters, if any.
    filename: Optional[str]
@dataclasses.dataclass(frozen=True)
class RequestInfo:
    """Immutable snapshot of a request, as built by ``ClientRequest.request_info``."""

    # Request URL with the fragment removed (``ClientRequest.url``).
    url: URL
    # HTTP method (``ClientRequest.method``, stored upper-cased).
    method: str
    # Read-only proxy over the request headers.
    headers: "CIMultiDictProxy[str]"
    # URL as requested, fragment included (``ClientRequest.original_url``).
    real_url: URL
class Fingerprint:
    """Expected digest of a peer certificate, checked against a transport.

    Only 32-byte (SHA-256) fingerprints are accepted; 16- and 20-byte
    values are recognised as MD5/SHA-1 and rejected explicitly.
    """

    # Digest length in bytes -> hash constructor producing that length.
    HASHFUNC_BY_DIGESTLEN = {16: md5, 20: sha1, 32: sha256}

    def __init__(self, fingerprint: bytes) -> None:
        """Store *fingerprint* and select the hash function by its length.

        Raises:
            ValueError: if the length is unknown, or maps to md5/sha1.
        """
        chosen = self.HASHFUNC_BY_DIGESTLEN.get(len(fingerprint))
        if not chosen:
            raise ValueError("fingerprint has invalid length")
        if chosen is md5 or chosen is sha1:
            raise ValueError("md5 and sha1 are insecure and not supported. Use sha256.")
        self._hashfunc = chosen
        self._fingerprint = fingerprint

    @property
    def fingerprint(self) -> bytes:
        """The expected digest bytes given at construction."""
        return self._fingerprint

    def check(self, transport: asyncio.Transport) -> None:
        """Compare the peer certificate digest with the expected one.

        Does nothing when the transport reports no ``sslcontext``.

        Raises:
            ServerFingerprintMismatch: if the digests differ.
        """
        if not transport.get_extra_info("sslcontext"):
            return
        ssl_object = transport.get_extra_info("ssl_object")
        # DER-encoded certificate (binary_form=True), hashed as-is.
        der_cert = ssl_object.getpeercert(binary_form=True)
        actual = self._hashfunc(der_cert).digest()
        if actual == self._fingerprint:
            return
        host, port, *_ = transport.get_extra_info("peername")
        raise ServerFingerprintMismatch(self._fingerprint, actual, host, port)
# Types accepted for the ``ssl`` request argument.  Both branches bind a
# tuple so the constant has the same shape whether or not the ``ssl``
# module could be imported; ``bool`` and ``None`` do not depend on that
# module and are accepted either way.
if ssl is not None:
    SSL_ALLOWED_TYPES = (ssl.SSLContext, bool, Fingerprint, type(None))
else:  # pragma: no cover
    SSL_ALLOWED_TYPES = (bool, type(None))
@dataclasses.dataclass(frozen=True)
class ConnectionKey:
    """Immutable key built by ``ClientRequest.connection_key``."""

    # the key should contain an information about used proxy / TLS
    # to prevent reusing wrong connections from a pool
    host: str
    port: Optional[int]
    # True when the request URL scheme is "https" or "wss".
    is_ssl: bool
    # The ``ssl`` argument the request was created with.
    ssl: Union[SSLContext, None, bool, Fingerprint]
    proxy: Optional[URL]
    proxy_auth: Optional[BasicAuth]
    proxy_headers_hash: Optional[int]  # hash(CIMultiDict)
class ClientRequest:
    """A single outgoing HTTP request.

    The constructor normalises the URL, headers, cookies, auth, proxy
    settings and body through the ``update_*`` methods; ``send`` then writes
    the request line and headers and schedules ``write_bytes`` to stream the
    body, returning the response object.
    """

    GET_METHODS = {
        hdrs.METH_GET,
        hdrs.METH_HEAD,
        hdrs.METH_OPTIONS,
        hdrs.METH_TRACE,
    }
    POST_METHODS = {hdrs.METH_PATCH, hdrs.METH_POST, hdrs.METH_PUT}
    ALL_METHODS = GET_METHODS.union(POST_METHODS).union({hdrs.METH_DELETE})

    # Headers added by update_auto_headers() unless already present/skipped.
    DEFAULT_HEADERS = {
        hdrs.ACCEPT: "*/*",
        hdrs.ACCEPT_ENCODING: _gen_default_accept_encoding(),
    }

    body = b""
    auth = None
    response = None

    _writer = None  # async task for streaming data
    _continue = None  # waiter future for '100 Continue' response

    # N.B.
    # Adding __del__ method with self._writer closing doesn't make sense
    # because _writer is instance method, thus it keeps a reference to self.
    # Until writer has finished finalizer will not be called.

    def __init__(
        self,
        method: str,
        url: URL,
        *,
        params: Optional[Mapping[str, str]] = None,
        headers: Optional[LooseHeaders] = None,
        skip_auto_headers: Iterable[str] = frozenset(),
        data: Any = None,
        cookies: Optional[LooseCookies] = None,
        auth: Optional[BasicAuth] = None,
        version: http.HttpVersion = http.HttpVersion11,
        compress: Optional[str] = None,
        chunked: Optional[bool] = None,
        expect100: bool = False,
        loop: asyncio.AbstractEventLoop,
        response_class: Optional[Type["ClientResponse"]] = None,
        proxy: Optional[URL] = None,
        proxy_auth: Optional[BasicAuth] = None,
        timer: Optional[BaseTimerContext] = None,
        session: Optional["ClientSession"] = None,
        ssl: Union[SSLContext, bool, Fingerprint, None] = None,
        proxy_headers: Optional[LooseHeaders] = None,
        traces: Optional[List["Trace"]] = None,
        trust_env: bool = False,
    ):
        """Validate the arguments and build the request state.

        Args:
            method: HTTP method; must contain only token characters and is
                stored upper-cased.
            url: target URL; its fragment is dropped for ``self.url`` and
                kept in ``self.original_url``.
            params: extra query parameters appended to the URL's own query.
            headers: user headers, see ``update_headers``.
            skip_auto_headers: names of default headers not to add.
            data: request body, see ``update_body_from_data``.
            cookies: cookies merged into the ``Cookie`` header.
            auth: basic-auth credentials; falls back to URL credentials and,
                with ``trust_env``, to netrc.
            version: HTTP version (an ``HttpVersion`` or a ``"X.Y"`` string).
            compress: content encoding to apply to the body.
            chunked: request chunked transfer encoding.
            expect100: send ``Expect: 100-continue`` and wait for it.
            loop: event loop used for futures and the writer task.
            response_class: response type to instantiate; defaults to
                ``ClientResponse``.
            proxy, proxy_auth, proxy_headers: stored by ``update_proxy``.
            timer: timeout context; defaults to ``TimerNoop()``.
            session: owning session, passed on to the response.
            ssl: TLS setting, exposed through the ``ssl`` property.
            traces: trace objects notified on headers/chunks sent.
            trust_env: allow the netrc lookup in ``update_auth``.

        Raises:
            ValueError: if ``method`` contains a non-token character.
        """
        match = _CONTAINS_CONTROL_CHAR_RE.search(method)
        if match:
            raise ValueError(
                f"Method cannot contain non-token characters {method!r} "
                f"(found at least {match.group()!r})"
            )
        assert isinstance(url, URL), url
        assert isinstance(proxy, (URL, type(None))), proxy
        # FIXME: session is None in tests only, need to fix tests
        # assert session is not None
        self._session = cast("ClientSession", session)
        if params:
            # Keep the URL's existing query and append ``params`` to it.
            q = MultiDict(url.query)
            url2 = url.with_query(params)
            q.extend(url2.query)
            url = url.with_query(q)
        self.original_url = url
        self.url = url.with_fragment(None)
        self.method = method.upper()
        self.chunked = chunked
        self.compress = compress
        self.loop = loop
        self.length = None
        if response_class is None:
            real_response_class = ClientResponse
        else:
            real_response_class = response_class
        self.response_class: Type[ClientResponse] = real_response_class
        self._timer = timer if timer is not None else TimerNoop()
        self._ssl = ssl

        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

        # Order matters: later steps read headers/attributes set earlier.
        self.update_version(version)
        self.update_host(url)
        self.update_headers(headers)
        self.update_auto_headers(skip_auto_headers)
        self.update_cookies(cookies)
        self.update_content_encoding(data)
        self.update_auth(auth, trust_env)
        self.update_proxy(proxy, proxy_auth, proxy_headers)

        self.update_body_from_data(data)
        if data is not None or self.method not in self.GET_METHODS:
            self.update_transfer_encoding()
        self.update_expect_continue(expect100)
        if traces is None:
            traces = []
        self._traces = traces

    def is_ssl(self) -> bool:
        """Return True when the URL scheme is ``https`` or ``wss``."""
        return self.url.scheme in ("https", "wss")

    @property
    def ssl(self) -> Union["SSLContext", None, bool, Fingerprint]:
        """The ``ssl`` argument given to the constructor."""
        return self._ssl

    @property
    def connection_key(self) -> ConnectionKey:
        """Build the ``ConnectionKey`` describing this request's connection."""
        proxy_headers = self.proxy_headers
        if proxy_headers:
            h: Optional[int] = hash(tuple((k, v) for k, v in proxy_headers.items()))
        else:
            h = None
        return ConnectionKey(
            self.host,
            self.port,
            self.is_ssl(),
            self.ssl,
            self.proxy,
            self.proxy_auth,
            h,
        )

    @property
    def host(self) -> str:
        """The URL's raw host; asserted to be present."""
        ret = self.url.raw_host
        assert ret is not None
        return ret

    @property
    def port(self) -> Optional[int]:
        """The URL's port as reported by ``URL.port``."""
        return self.url.port

    @property
    def request_info(self) -> RequestInfo:
        """Snapshot of url, method, headers (read-only proxy) and original URL."""
        headers: CIMultiDictProxy[str] = CIMultiDictProxy(self.headers)
        return RequestInfo(self.url, self.method, headers, self.original_url)

    def update_host(self, url: URL) -> None:
        """Validate that *url* has a host and pick up credentials from it.

        Raises:
            InvalidURL: if the URL has no host.
        """
        # get host/port
        if not url.raw_host:
            raise InvalidURL(url)

        # basic auth info
        username, password = url.user, url.password
        if username:
            self.auth = helpers.BasicAuth(username, password or "")

    def update_version(self, version: Union[http.HttpVersion, str]) -> None:
        """Store the HTTP version, parsing ``"X.Y"`` strings.

        A string such as ``'1.1'`` becomes ``HttpVersion(1, 1)``; any other
        value is stored unchanged.

        Raises:
            ValueError: if a string version cannot be parsed, including
                strings with no ``.`` separator.
        """
        if isinstance(version, str):
            v = [part.strip() for part in version.split(".", 1)]
            try:
                version = http.HttpVersion(int(v[0]), int(v[1]))
            except (ValueError, IndexError):
                # IndexError: no "." in the string, so there is no minor part.
                raise ValueError(
                    f"Can not parse http version number: {version}"
                ) from None
        self.version = version

    def update_headers(self, headers: Optional[LooseHeaders]) -> None:
        """Reset ``self.headers`` to a ``Host`` header plus the user headers.

        *headers* may be any mapping or an iterable of ``(key, value)``
        pairs.  A user-supplied ``Host`` replaces the computed one; every
        other header is appended, so repeated names are kept.
        """
        self.headers: CIMultiDict[str] = CIMultiDict()

        # add host
        netloc = cast(str, self.url.raw_host)
        if helpers.is_ipv6_address(netloc):
            netloc = f"[{netloc}]"
        if self.url.port is not None and not self.url.is_default_port():
            netloc += ":" + str(self.url.port)
        self.headers[hdrs.HOST] = netloc

        if headers:
            # Any Mapping is turned into pairs (same test as update_cookies);
            # the multidict types are listed explicitly as well.
            if isinstance(headers, (Mapping, MultiDictProxy, MultiDict)):
                headers = headers.items()  # type: ignore[assignment]

            for key, value in headers:  # type: ignore[misc]
                # A special case for Host header
                if key.lower() == "host":
                    self.headers[key] = value
                else:
                    self.headers.add(key, value)

    def update_auto_headers(self, skip_auto_headers: Iterable[str]) -> None:
        """Add default headers and ``User-Agent`` unless present or skipped."""
        self.skip_auto_headers = CIMultiDict(
            (hdr, None) for hdr in sorted(skip_auto_headers)
        )
        # Names that must not be auto-added: already set, or explicitly skipped.
        used_headers = self.headers.copy()
        used_headers.extend(self.skip_auto_headers)  # type: ignore[arg-type]

        for hdr, val in self.DEFAULT_HEADERS.items():
            if hdr not in used_headers:
                self.headers.add(hdr, val)

        if hdrs.USER_AGENT not in used_headers:
            self.headers[hdrs.USER_AGENT] = SERVER_SOFTWARE

    def update_cookies(self, cookies: Optional[LooseCookies]) -> None:
        """Merge *cookies* with any existing ``Cookie`` header.

        Does nothing when *cookies* is empty.  Otherwise the existing header
        is parsed, the new cookies are set on top of it, and the header is
        rewritten.
        """
        if not cookies:
            return

        c: SimpleCookie[str] = SimpleCookie()
        if hdrs.COOKIE in self.headers:
            c.load(self.headers.get(hdrs.COOKIE, ""))
            del self.headers[hdrs.COOKIE]

        if isinstance(cookies, Mapping):
            iter_cookies = cookies.items()
        else:
            iter_cookies = cookies  # type: ignore[assignment]
        for name, value in iter_cookies:
            if isinstance(value, Morsel):
                # Preserve coded_value
                mrsl_val = value.get(value.key, Morsel())
                mrsl_val.set(value.key, value.value, value.coded_value)
                c[name] = mrsl_val
            else:
                c[name] = value  # type: ignore[assignment]

        self.headers[hdrs.COOKIE] = c.output(header="", sep=";").strip()

    def update_content_encoding(self, data: Any) -> None:
        """Set ``Content-Encoding`` from ``self.compress`` when there is a body.

        Raises:
            ValueError: if both ``compress`` and a ``Content-Encoding``
                header are given.
        """
        if data is None:
            return

        enc = self.headers.get(hdrs.CONTENT_ENCODING, "").lower()
        if enc:
            if self.compress:
                raise ValueError(
                    "compress can not be set if Content-Encoding header is set"
                )
        elif self.compress:
            # A truthy non-string value (e.g. True) selects "deflate".
            if not isinstance(self.compress, str):
                self.compress = "deflate"
            self.headers[hdrs.CONTENT_ENCODING] = self.compress
            self.chunked = True  # enable chunked, no need to deal with length

    def update_transfer_encoding(self) -> None:
        """Reconcile ``chunked`` with Transfer-Encoding / Content-Length.

        Sets ``Transfer-Encoding: chunked`` when ``self.chunked`` is true,
        otherwise fills in ``Content-Length`` from ``len(self.body)`` if the
        header is missing.

        Raises:
            ValueError: if ``chunked`` is set together with a
                ``Transfer-Encoding: chunked`` or ``Content-Length`` header.
        """
        te = self.headers.get(hdrs.TRANSFER_ENCODING, "").lower()

        if "chunked" in te:
            if self.chunked:
                raise ValueError(
                    "chunked can not be set "
                    'if "Transfer-Encoding: chunked" header is set'
                )

        elif self.chunked:
            if hdrs.CONTENT_LENGTH in self.headers:
                raise ValueError(
                    "chunked can not be set if Content-Length header is set"
                )

            self.headers[hdrs.TRANSFER_ENCODING] = "chunked"
        else:
            if hdrs.CONTENT_LENGTH not in self.headers:
                self.headers[hdrs.CONTENT_LENGTH] = str(len(self.body))

    def update_auth(self, auth: Optional[BasicAuth], trust_env: bool = False) -> None:
        """Set the ``Authorization`` header from basic-auth credentials.

        Lookup order: the *auth* argument, then ``self.auth`` (set from the
        URL by ``update_host``), then -- only with *trust_env* -- netrc.

        Raises:
            TypeError: if the resolved value is not a ``BasicAuth``.
        """
        if auth is None:
            auth = self.auth
        if auth is None and trust_env and self.url.host is not None:
            netrc_obj = netrc_from_env()
            # A LookupError from the netrc helper just leaves auth unset.
            with contextlib.suppress(LookupError):
                auth = basicauth_from_netrc(netrc_obj, self.url.host)
        if auth is None:
            return

        if not isinstance(auth, helpers.BasicAuth):
            raise TypeError("BasicAuth() tuple is required instead")

        self.headers[hdrs.AUTHORIZATION] = auth.encode()

    def update_body_from_data(self, body: Any) -> None:
        """Wrap *body* in a payload object and derive body-related headers.

        Does nothing when *body* is None.  Objects the payload registry does
        not recognise are wrapped in ``FormData``, using the ``boundary``
        parameter of an existing ``Content-Type`` header if there is one.
        """
        if body is None:
            return

        # FormData
        if isinstance(body, FormData):
            body = body()

        try:
            body = payload.PAYLOAD_REGISTRY.get(body, disposition=None)
        except payload.LookupError:
            boundary = None
            if CONTENT_TYPE in self.headers:
                boundary = parse_mimetype(self.headers[CONTENT_TYPE]).parameters.get(
                    "boundary"
                )
            body = FormData(body, boundary=boundary)()

        self.body = body

        # enable chunked encoding if needed
        if not self.chunked and hdrs.CONTENT_LENGTH not in self.headers:
            size = body.size
            if size is None:
                # Unknown size: fall back to chunked transfer.
                self.chunked = True
            else:
                self.headers[hdrs.CONTENT_LENGTH] = str(size)

        # copy payload headers
        assert body.headers
        for key, value in body.headers.items():
            if key in self.headers:
                continue
            if key in self.skip_auto_headers:
                continue
            self.headers[key] = value

    def update_expect_continue(self, expect: bool = False) -> None:
        """Arm the '100 Continue' waiter when requested by flag or header."""
        if expect:
            self.headers[hdrs.EXPECT] = "100-continue"
        elif self.headers.get(hdrs.EXPECT, "").lower() == "100-continue":
            expect = True

        if expect:
            self._continue = self.loop.create_future()

    def update_proxy(
        self,
        proxy: Optional[URL],
        proxy_auth: Optional[BasicAuth],
        proxy_headers: Optional[LooseHeaders],
    ) -> None:
        """Store the proxy settings.

        Raises:
            ValueError: if *proxy_auth* is truthy but not a ``BasicAuth``.
        """
        if proxy_auth and not isinstance(proxy_auth, helpers.BasicAuth):
            raise ValueError("proxy_auth must be None or BasicAuth() tuple")
        self.proxy = proxy
        self.proxy_auth = proxy_auth
        self.proxy_headers = proxy_headers

    def keep_alive(self) -> bool:
        """Return whether the connection should be kept alive.

        Below HTTP/1.0: never.  HTTP/1.0: only with ``Connection:
        keep-alive``.  Otherwise: unless ``Connection: close`` is set.
        """
        if self.version < HttpVersion10:
            # keep alive not supported at all
            return False
        if self.version == HttpVersion10:
            if self.headers.get(hdrs.CONNECTION) == "keep-alive":
                return True
            else:  # no headers means we close for Http 1.0
                return False
        elif self.headers.get(hdrs.CONNECTION) == "close":
            return False

        return True

    async def write_bytes(
        self, writer: AbstractStreamWriter, conn: "Connection"
    ) -> None:
        """Write the request body to *writer*.

        Waits for the '100 Continue' future first when one is armed.  Write
        errors are not raised; they are reported through
        ``protocol.set_exception``.  ``self._writer`` is always cleared at
        the end.
        """
        # 100 response
        if self._continue is not None:
            await writer.drain()
            await self._continue

        protocol = conn.protocol
        assert protocol is not None
        try:
            if isinstance(self.body, payload.Payload):
                await self.body.write(writer)
            else:
                if isinstance(self.body, (bytes, bytearray)):
                    # Treat a single bytes object as a one-chunk iterable.
                    self.body = (self.body,)  # type: ignore[assignment]

                for chunk in self.body:
                    await writer.write(chunk)  # type: ignore[arg-type]

            await writer.write_eof()
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                protocol.set_exception(exc)
            else:
                new_exc = ClientOSError(
                    exc.errno, "Can not write request body for %s" % self.url
                )
                new_exc.__context__ = exc
                new_exc.__cause__ = exc
                protocol.set_exception(new_exc)
        except asyncio.CancelledError as exc:
            if not conn.closed:
                protocol.set_exception(exc)
        except Exception as exc:
            protocol.set_exception(exc)
        else:
            protocol.start_timeout()
        finally:
            self._writer = None

    async def send(self, conn: "Connection") -> "ClientResponse":
        """Write the request line and headers and start streaming the body.

        Returns:
            The response object (not yet started), also stored in
            ``self.response``.
        """
        # Specify request target:
        # - CONNECT request must send authority form URI
        # - not CONNECT proxy must send absolute form URI
        # - most common is origin form URI
        if self.method == hdrs.METH_CONNECT:
            connect_host = self.url.raw_host
            assert connect_host is not None
            if helpers.is_ipv6_address(connect_host):
                connect_host = f"[{connect_host}]"
            path = f"{connect_host}:{self.url.port}"
        elif self.proxy and not self.is_ssl():
            path = str(self.url)
        else:
            path = self.url.raw_path
            if self.url.raw_query_string:
                path += "?" + self.url.raw_query_string

        protocol = conn.protocol
        assert protocol is not None
        writer = StreamWriter(
            protocol,
            self.loop,
            on_chunk_sent=functools.partial(
                self._on_chunk_request_sent, self.method, self.url
            ),
            on_headers_sent=functools.partial(
                self._on_headers_request_sent, self.method, self.url
            ),
        )

        if self.compress:
            writer.enable_compression(self.compress)

        # NOTE(review): this test is also true for ``chunked=False``; only
        # ``None`` skips chunking here -- confirm that is intended.
        if self.chunked is not None:
            writer.enable_chunking()

        # set default content-type
        if (
            self.method in self.POST_METHODS
            and hdrs.CONTENT_TYPE not in self.skip_auto_headers
            and hdrs.CONTENT_TYPE not in self.headers
        ):
            self.headers[hdrs.CONTENT_TYPE] = "application/octet-stream"

        # set the connection header
        connection = self.headers.get(hdrs.CONNECTION)
        if not connection:
            if self.keep_alive():
                if self.version == HttpVersion10:
                    connection = "keep-alive"
            else:
                if self.version == HttpVersion11:
                    connection = "close"

        if connection is not None:
            self.headers[hdrs.CONNECTION] = connection

        # status + headers
        status_line = "{0} {1} HTTP/{2[0]}.{2[1]}".format(
            self.method, path, self.version
        )
        await writer.write_headers(status_line, self.headers)

        self._writer = self.loop.create_task(self.write_bytes(writer, conn))

        response_class = self.response_class
        assert response_class is not None
        self.response = response_class(
            self.method,
            self.original_url,
            writer=self._writer,
            continue100=self._continue,
            timer=self._timer,
            request_info=self.request_info,
            traces=self._traces,
            loop=self.loop,
            session=self._session,
        )
        return self.response

    async def close(self) -> None:
        """Wait for the body-writer task, if any, then forget it."""
        if self._writer is not None:
            try:
                await self._writer
            finally:
                self._writer = None

    def terminate(self) -> None:
        """Cancel the body-writer task (unless the loop is closed) and forget it."""
        if self._writer is not None:
            if not self.loop.is_closed():
                self._writer.cancel()
            self._writer = None

    async def _on_chunk_request_sent(self, method: str, url: URL, chunk: bytes) -> None:
        """Notify every trace that a body chunk was sent."""
        for trace in self._traces:
            await trace.send_request_chunk_sent(method, url, chunk)

    async def _on_headers_request_sent(
        self, method: str, url: URL, headers: "CIMultiDict[str]"
    ) -> None:
        """Notify every trace that the request headers were sent."""
        for trace in self._traces:
            await trace.send_request_headers(method, url, headers)
class ClientResponse(HeadersMixin):
    """Response to a client request.

    Created by ``ClientRequest.send``; status line, headers and the payload
    stream are filled in by ``start``.  The object owns the connection until
    it is released or closed.
    """

    # Some of these attributes are None when created,
    # but will be set by the start() method.
    # As the end user will likely never see the None values, we cheat the types below.
    # from the Status-Line of the response
    version = None  # HTTP-Version
    status: int = None  # type: ignore[assignment]  # Status-Code
    reason = None  # Reason-Phrase

    content: StreamReader = None  # type: ignore[assignment]  # Payload stream
    _headers: CIMultiDictProxy[str] = None  # type: ignore[assignment]
    _raw_headers: RawHeaders = None  # type: ignore[assignment]

    _connection = None  # current connection
    _source_traceback = None
    # set up by ClientRequest after ClientResponse object creation
    # post-init stage allows to not change ctor signature
    _closed = True  # to allow __del__ for non-initialized properly response
    _released = False

    def __init__(
        self,
        method: str,
        url: URL,
        *,
        writer: "asyncio.Task[None]",
        continue100: Optional["asyncio.Future[bool]"],
        timer: BaseTimerContext,
        request_info: RequestInfo,
        traces: List["Trace"],
        loop: asyncio.AbstractEventLoop,
        session: "ClientSession",
    ) -> None:
        """Record request context; the response stays closed until ``start``."""
        assert isinstance(url, URL)
        super().__init__()

        self.method = method
        self.cookies: SimpleCookie[str] = SimpleCookie()

        # URL as requested, and the same URL without its fragment.
        self._real_url = url
        self._url = url.with_fragment(None)

        self._body: Optional[bytes] = None
        self._writer: Optional[asyncio.Task[None]] = writer
        self._continue = continue100  # None by default
        self._closed = True
        self._history: Tuple[ClientResponse, ...] = ()
        self._request_info = request_info
        if timer is None:
            self._timer = TimerNoop()
        else:
            self._timer = timer
        self._cache: Dict[str, Any] = {}
        self._traces = traces
        self._loop = loop
        # store a reference to session #1985
        self._session: Optional[ClientSession] = session
        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

    @reify
    def url(self) -> URL:
        """Request URL without the fragment."""
        return self._url

    @reify
    def real_url(self) -> URL:
        """Request URL exactly as passed to the constructor."""
        return self._real_url

    @reify
    def host(self) -> str:
        """Host part of ``url``; asserted to be present."""
        assert self._url.host is not None
        return self._url.host

    @reify
    def headers(self) -> "CIMultiDictProxy[str]":
        """Response headers as set by ``start``."""
        return self._headers

    @reify
    def raw_headers(self) -> RawHeaders:
        """Raw response headers as set by ``start``."""
        return self._raw_headers

    @reify
    def request_info(self) -> RequestInfo:
        """The ``RequestInfo`` supplied at construction."""
        return self._request_info

    @reify
    def content_disposition(self) -> Optional[ContentDisposition]:
        """Parsed ``Content-Disposition`` header, or None when absent."""
        header_value = self._headers.get(hdrs.CONTENT_DISPOSITION)
        if header_value is None:
            return None
        kind, raw_params = multipart.parse_content_disposition(header_value)
        readonly_params = MappingProxyType(raw_params)
        return ContentDisposition(
            kind,
            readonly_params,
            multipart.content_disposition_filename(readonly_params),
        )

    def __del__(self, _warnings: Any = warnings) -> None:
        """Release a still-attached connection and, in debug mode, report it."""
        if self._closed or self._connection is None:
            return

        self._connection.release()
        self._cleanup_writer()

        if not self._loop.get_debug():
            return

        _warnings.warn(f"Unclosed response {self!r}", ResourceWarning, source=self)
        report = {"client_response": self, "message": "Unclosed response"}
        if self._source_traceback:
            report["source_traceback"] = self._source_traceback
        self._loop.call_exception_handler(report)

    def __repr__(self) -> str:
        """Two lines: ``<ClientResponse(url) [status reason]>`` then the headers."""
        url_text = str(self.url)
        # Non-ASCII characters in the reason are shown as backslash escapes.
        reason_text = (
            self.reason.encode("ascii", "backslashreplace").decode("ascii")
            if self.reason
            else self.reason
        )
        first_line = "<ClientResponse({}) [{} {}]>".format(
            url_text, self.status, reason_text
        )
        return first_line + "\n" + str(self.headers) + "\n"

    @property
    def connection(self) -> Optional["Connection"]:
        """The connection currently held, or None."""
        return self._connection

    @reify
    def history(self) -> Tuple["ClientResponse", ...]:
        """A sequence of responses, if redirects occurred."""
        return self._history

    @reify
    def links(self) -> "MultiDictProxy[MultiDictProxy[Union[str, URL]]]":
        """Parse the ``Link`` headers.

        Each link is stored under its ``rel`` parameter (or its target when
        there is no ``rel``) and carries its parameters plus a ``url`` entry
        resolved against the response URL.
        """
        joined = ", ".join(self.headers.getall("link", []))
        if not joined:
            return MultiDictProxy(MultiDict())

        parsed: MultiDict[MultiDictProxy[Union[str, URL]]] = MultiDict()

        # Split on commas that are followed by the "<" opening the next link.
        for item in re.split(r",(?=\s*<)", joined):
            head = re.match(r"\s*<(.*)>(.*)", item)
            if head is None:  # pragma: no cover
                # the check exists to suppress mypy error
                continue
            target, tail = head.groups()

            entry: MultiDict[Union[str, URL]] = MultiDict()

            # The text before the first ";" is not a parameter.
            for raw_param in tail.split(";")[1:]:
                kv = re.match(
                    r"^\s*(\S*)\s*=\s*(['\"]?)(.*?)(\2)\s*$", raw_param, re.M
                )
                if kv is None:  # pragma: no cover
                    # the check exists to suppress mypy error
                    continue
                name, value = kv.group(1, 3)
                entry.add(name, value)

            rel = entry.get("rel", target)
            entry.add("url", self.url.join(URL(target)))
            parsed.add(str(rel), MultiDictProxy(entry))

        return MultiDictProxy(parsed)

    async def start(self, connection: "Connection") -> "ClientResponse":
        """Start response processing.

        Reads messages until one is not an interim 1xx (101 counts as
        final), resolving the '100 Continue' waiter on the way, then fills
        in status, headers, payload and cookies.

        Raises:
            ClientResponseError: when the protocol reports an
                ``HttpProcessingError``.
        """
        self._closed = False
        self._protocol = connection.protocol
        self._connection = connection

        with self._timer:
            while True:
                # read response
                try:
                    protocol = self._protocol
                    message, stream = await protocol.read()  # type: ignore[union-attr]
                except http.HttpProcessingError as exc:
                    raise ClientResponseError(
                        self.request_info,
                        self.history,
                        status=exc.code,
                        message=exc.message,
                        headers=exc.headers,
                    ) from exc

                code = message.code
                if code < 100 or code > 199 or code == 101:
                    break

                if self._continue is not None:
                    set_result(self._continue, True)
                    self._continue = None

        # payload eof handler
        stream.on_eof(self._response_eof)

        # response status
        self.version = message.version
        self.status = message.code
        self.reason = message.reason

        # headers
        self._headers = message.headers  # type is CIMultiDictProxy
        self._raw_headers = message.raw_headers  # type is Tuple[bytes, bytes]

        # payload
        self.content = stream

        # cookies
        for cookie_header in self.headers.getall(hdrs.SET_COOKIE, ()):
            try:
                self.cookies.load(cookie_header)
            except CookieError as exc:
                client_logger.warning("Can not load response cookies: %s", exc)
        return self

    def _response_eof(self) -> None:
        """Payload EOF callback: release the connection and mark closed."""
        if self._closed:
            return

        conn = self._connection
        if conn is not None:
            # websocket, protocol could be None because
            # connection could be detached
            proto = conn.protocol
            if proto is not None and proto.upgraded:
                return

            conn.release()
            self._connection = None

        self._closed = True
        self._cleanup_writer()

    @property
    def closed(self) -> bool:
        """True once the response has been closed or released."""
        return self._closed

    def close(self) -> None:
        """Close the response and its connection (the connection is closed, not released)."""
        if not self._released:
            self._notify_content()
        if self._closed:
            return

        self._closed = True
        loop = self._loop
        if loop is None or loop.is_closed():
            return

        conn = self._connection
        if conn is not None:
            conn.close()
            self._connection = None
        self._cleanup_writer()

    def release(self) -> Any:
        """Release the connection and mark the response closed.

        Always returns ``noop()``.
        """
        if not self._released:
            self._notify_content()
        if self._closed:
            return noop()

        self._closed = True
        conn = self._connection
        if conn is not None:
            conn.release()
            self._connection = None

        self._cleanup_writer()
        return noop()

    @property
    def ok(self) -> bool:
        """Returns ``True`` if ``status`` is less than ``400``, ``False`` if not.

        This is **not** a check for ``200 OK`` but a check that the response
        status is under 400.
        """
        return 400 > self.status

    def raise_for_status(self) -> None:
        """Release the response and raise ``ClientResponseError`` if not ``ok``."""
        if self.ok:
            return
        # reason should always be not None for a started response
        assert self.reason is not None
        self.release()
        raise ClientResponseError(
            self.request_info,
            self.history,
            status=self.status,
            message=self.reason,
            headers=self.headers,
        )

    def _cleanup_writer(self) -> None:
        """Cancel the request-writer task and drop the session reference."""
        if self._writer is not None:
            self._writer.cancel()
        self._writer = None
        self._session = None

    def _notify_content(self) -> None:
        """Fail the payload stream with 'Connection closed' and mark released."""
        stream = self.content
        if stream and stream.exception() is None:
            stream.set_exception(ClientConnectionError("Connection closed"))
        self._released = True

    async def wait_for_close(self) -> None:
        """Wait for the request-writer task, if any, then release."""
        if self._writer is not None:
            try:
                await self._writer
            finally:
                self._writer = None
        self.release()

    async def read(self) -> bytes:
        """Read response payload.

        The body is read once and cached.  If reading fails the response is
        closed and the error re-raised.

        Raises:
            ClientConnectionError: if the body was already read and the
                response has since been released.
        """
        if self._body is not None:
            if self._released:
                raise ClientConnectionError("Connection closed")
            return self._body

        try:
            self._body = await self.content.read()
            for trace in self._traces:
                await trace.send_response_chunk_received(
                    self.method, self.url, self._body
                )
        except BaseException:
            self.close()
            raise

        return self._body

    def get_encoding(self) -> str:
        """Determine the text encoding of the body.

        Order: a ``charset`` parameter that ``codecs`` recognises; UTF-8 for
        ``application/json`` and ``application/rdap``; otherwise detection on
        the already-read body, falling back to UTF-8.

        Raises:
            RuntimeError: if detection is needed but the body is unread.
        """
        ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower()
        mimetype = helpers.parse_mimetype(ctype)

        declared = mimetype.parameters.get("charset")
        if declared:
            try:
                codecs.lookup(declared)
            except LookupError:
                pass  # unknown codec name: fall through to guessing
            else:
                return declared

        if mimetype.type == "application" and mimetype.subtype in ("json", "rdap"):
            # RFC 7159 states that the default encoding is UTF-8.
            # RFC 7483 defines application/rdap+json
            return "utf-8"

        if self._body is None:
            raise RuntimeError("Cannot guess the encoding of a not yet read body")

        return chardet.detect(self._body)["encoding"] or "utf-8"

    async def text(self, encoding: Optional[str] = None, errors: str = "strict") -> str:
        """Read response payload and decode."""
        if self._body is None:
            await self.read()

        codec = self.get_encoding() if encoding is None else encoding
        return self._body.decode(codec, errors=errors)  # type: ignore[union-attr]

    async def json(
        self,
        *,
        encoding: Optional[str] = None,
        loads: JSONDecoder = DEFAULT_JSON_DECODER,
        content_type: Optional[str] = "application/json",
    ) -> Any:
        """Read and decodes JSON response.

        Raises:
            ContentTypeError: if *content_type* is truthy and the response
                content type is not the expected one.
        """
        if self._body is None:
            await self.read()

        if content_type and not is_expected_content_type(
            self.content_type, content_type
        ):
            raise ContentTypeError(
                self.request_info,
                self.history,
                message=(
                    "Attempt to decode JSON with unexpected mimetype: %s"
                    % self.content_type
                ),
                headers=self.headers,
            )

        codec = self.get_encoding() if encoding is None else encoding
        return loads(self._body.decode(codec))  # type: ignore[union-attr]

    async def __aenter__(self) -> "ClientResponse":
        """Enter ``async with``; returns the response itself."""
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Release the response on leaving ``async with``."""
        # similar to _RequestContextManager, we do not need to check
        # for exceptions, response object can close connection
        # if state is broken
        self.release()