Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohttp/client_reqrep.py: 41%
642 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:56 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:56 +0000
1import asyncio
2import codecs
3import functools
4import io
5import re
6import sys
7import traceback
8import warnings
9from hashlib import md5, sha1, sha256
10from http.cookies import CookieError, Morsel, SimpleCookie
11from types import MappingProxyType, TracebackType
12from typing import (
13 TYPE_CHECKING,
14 Any,
15 Dict,
16 Iterable,
17 List,
18 Mapping,
19 Optional,
20 Tuple,
21 Type,
22 Union,
23 cast,
24)
26import attr
27from multidict import CIMultiDict, CIMultiDictProxy, MultiDict, MultiDictProxy
28from yarl import URL
30from . import hdrs, helpers, http, multipart, payload
31from .abc import AbstractStreamWriter
32from .client_exceptions import (
33 ClientConnectionError,
34 ClientOSError,
35 ClientResponseError,
36 ContentTypeError,
37 InvalidURL,
38 ServerFingerprintMismatch,
39)
40from .formdata import FormData
41from .helpers import (
42 PY_36,
43 BaseTimerContext,
44 BasicAuth,
45 HeadersMixin,
46 TimerNoop,
47 noop,
48 reify,
49 set_result,
50)
51from .http import SERVER_SOFTWARE, HttpVersion10, HttpVersion11, StreamWriter
52from .log import client_logger
53from .streams import StreamReader
54from .typedefs import (
55 DEFAULT_JSON_DECODER,
56 JSONDecoder,
57 LooseCookies,
58 LooseHeaders,
59 RawHeaders,
60)
62try:
63 import ssl
64 from ssl import SSLContext
65except ImportError: # pragma: no cover
66 ssl = None # type: ignore[assignment]
67 SSLContext = object # type: ignore[misc,assignment]
69try:
70 import cchardet as chardet
71except ImportError: # pragma: no cover
72 import charset_normalizer as chardet # type: ignore[no-redef]
75__all__ = ("ClientRequest", "ClientResponse", "RequestInfo", "Fingerprint")
78if TYPE_CHECKING: # pragma: no cover
79 from .client import ClientSession
80 from .connector import Connection
81 from .tracing import Trace
84json_re = re.compile(r"^application/(?:[\w.+-]+?\+)?json")
87@attr.s(auto_attribs=True, frozen=True, slots=True)
88class ContentDisposition:
89 type: Optional[str]
90 parameters: "MappingProxyType[str, str]"
91 filename: Optional[str]
94@attr.s(auto_attribs=True, frozen=True, slots=True)
95class RequestInfo:
96 url: URL
97 method: str
98 headers: "CIMultiDictProxy[str]"
99 real_url: URL = attr.ib()
101 @real_url.default
102 def real_url_default(self) -> URL:
103 return self.url
106class Fingerprint:
107 HASHFUNC_BY_DIGESTLEN = {
108 16: md5,
109 20: sha1,
110 32: sha256,
111 }
113 def __init__(self, fingerprint: bytes) -> None:
114 digestlen = len(fingerprint)
115 hashfunc = self.HASHFUNC_BY_DIGESTLEN.get(digestlen)
116 if not hashfunc:
117 raise ValueError("fingerprint has invalid length")
118 elif hashfunc is md5 or hashfunc is sha1:
119 raise ValueError(
120 "md5 and sha1 are insecure and " "not supported. Use sha256."
121 )
122 self._hashfunc = hashfunc
123 self._fingerprint = fingerprint
125 @property
126 def fingerprint(self) -> bytes:
127 return self._fingerprint
129 def check(self, transport: asyncio.Transport) -> None:
130 if not transport.get_extra_info("sslcontext"):
131 return
132 sslobj = transport.get_extra_info("ssl_object")
133 cert = sslobj.getpeercert(binary_form=True)
134 got = self._hashfunc(cert).digest()
135 if got != self._fingerprint:
136 host, port, *_ = transport.get_extra_info("peername")
137 raise ServerFingerprintMismatch(self._fingerprint, got, host, port)
140if ssl is not None:
141 SSL_ALLOWED_TYPES = (ssl.SSLContext, bool, Fingerprint, type(None))
142else: # pragma: no cover
143 SSL_ALLOWED_TYPES = type(None)
146def _merge_ssl_params(
147 ssl: Union["SSLContext", bool, Fingerprint, None],
148 verify_ssl: Optional[bool],
149 ssl_context: Optional["SSLContext"],
150 fingerprint: Optional[bytes],
151) -> Union["SSLContext", bool, Fingerprint, None]:
152 if verify_ssl is not None and not verify_ssl:
153 warnings.warn(
154 "verify_ssl is deprecated, use ssl=False instead",
155 DeprecationWarning,
156 stacklevel=3,
157 )
158 if ssl is not None:
159 raise ValueError(
160 "verify_ssl, ssl_context, fingerprint and ssl "
161 "parameters are mutually exclusive"
162 )
163 else:
164 ssl = False
165 if ssl_context is not None:
166 warnings.warn(
167 "ssl_context is deprecated, use ssl=context instead",
168 DeprecationWarning,
169 stacklevel=3,
170 )
171 if ssl is not None:
172 raise ValueError(
173 "verify_ssl, ssl_context, fingerprint and ssl "
174 "parameters are mutually exclusive"
175 )
176 else:
177 ssl = ssl_context
178 if fingerprint is not None:
179 warnings.warn(
180 "fingerprint is deprecated, " "use ssl=Fingerprint(fingerprint) instead",
181 DeprecationWarning,
182 stacklevel=3,
183 )
184 if ssl is not None:
185 raise ValueError(
186 "verify_ssl, ssl_context, fingerprint and ssl "
187 "parameters are mutually exclusive"
188 )
189 else:
190 ssl = Fingerprint(fingerprint)
191 if not isinstance(ssl, SSL_ALLOWED_TYPES):
192 raise TypeError(
193 "ssl should be SSLContext, bool, Fingerprint or None, "
194 "got {!r} instead.".format(ssl)
195 )
196 return ssl
199@attr.s(auto_attribs=True, slots=True, frozen=True)
200class ConnectionKey:
201 # the key should contain an information about used proxy / TLS
202 # to prevent reusing wrong connections from a pool
203 host: str
204 port: Optional[int]
205 is_ssl: bool
206 ssl: Union[SSLContext, None, bool, Fingerprint]
207 proxy: Optional[URL]
208 proxy_auth: Optional[BasicAuth]
209 proxy_headers_hash: Optional[int] # hash(CIMultiDict)
212def _is_expected_content_type(
213 response_content_type: str, expected_content_type: str
214) -> bool:
215 if expected_content_type == "application/json":
216 return json_re.match(response_content_type) is not None
217 return expected_content_type in response_content_type
220class ClientRequest:
221 GET_METHODS = {
222 hdrs.METH_GET,
223 hdrs.METH_HEAD,
224 hdrs.METH_OPTIONS,
225 hdrs.METH_TRACE,
226 }
227 POST_METHODS = {hdrs.METH_PATCH, hdrs.METH_POST, hdrs.METH_PUT}
228 ALL_METHODS = GET_METHODS.union(POST_METHODS).union({hdrs.METH_DELETE})
230 DEFAULT_HEADERS = {
231 hdrs.ACCEPT: "*/*",
232 hdrs.ACCEPT_ENCODING: "gzip, deflate",
233 }
235 body = b""
236 auth = None
237 response = None
239 _writer = None # async task for streaming data
240 _continue = None # waiter future for '100 Continue' response
242 # N.B.
243 # Adding __del__ method with self._writer closing doesn't make sense
244 # because _writer is instance method, thus it keeps a reference to self.
245 # Until writer has finished finalizer will not be called.
247 def __init__(
248 self,
249 method: str,
250 url: URL,
251 *,
252 params: Optional[Mapping[str, str]] = None,
253 headers: Optional[LooseHeaders] = None,
254 skip_auto_headers: Iterable[str] = frozenset(),
255 data: Any = None,
256 cookies: Optional[LooseCookies] = None,
257 auth: Optional[BasicAuth] = None,
258 version: http.HttpVersion = http.HttpVersion11,
259 compress: Optional[str] = None,
260 chunked: Optional[bool] = None,
261 expect100: bool = False,
262 loop: Optional[asyncio.AbstractEventLoop] = None,
263 response_class: Optional[Type["ClientResponse"]] = None,
264 proxy: Optional[URL] = None,
265 proxy_auth: Optional[BasicAuth] = None,
266 timer: Optional[BaseTimerContext] = None,
267 session: Optional["ClientSession"] = None,
268 ssl: Union[SSLContext, bool, Fingerprint, None] = None,
269 proxy_headers: Optional[LooseHeaders] = None,
270 traces: Optional[List["Trace"]] = None,
271 ):
273 if loop is None:
274 loop = asyncio.get_event_loop()
276 assert isinstance(url, URL), url
277 assert isinstance(proxy, (URL, type(None))), proxy
278 # FIXME: session is None in tests only, need to fix tests
279 # assert session is not None
280 self._session = cast("ClientSession", session)
281 if params:
282 q = MultiDict(url.query)
283 url2 = url.with_query(params)
284 q.extend(url2.query)
285 url = url.with_query(q)
286 self.original_url = url
287 self.url = url.with_fragment(None)
288 self.method = method.upper()
289 self.chunked = chunked
290 self.compress = compress
291 self.loop = loop
292 self.length = None
293 if response_class is None:
294 real_response_class = ClientResponse
295 else:
296 real_response_class = response_class
297 self.response_class: Type[ClientResponse] = real_response_class
298 self._timer = timer if timer is not None else TimerNoop()
299 self._ssl = ssl
301 if loop.get_debug():
302 self._source_traceback = traceback.extract_stack(sys._getframe(1))
304 self.update_version(version)
305 self.update_host(url)
306 self.update_headers(headers)
307 self.update_auto_headers(skip_auto_headers)
308 self.update_cookies(cookies)
309 self.update_content_encoding(data)
310 self.update_auth(auth)
311 self.update_proxy(proxy, proxy_auth, proxy_headers)
313 self.update_body_from_data(data)
314 if data is not None or self.method not in self.GET_METHODS:
315 self.update_transfer_encoding()
316 self.update_expect_continue(expect100)
317 if traces is None:
318 traces = []
319 self._traces = traces
321 def is_ssl(self) -> bool:
322 return self.url.scheme in ("https", "wss")
324 @property
325 def ssl(self) -> Union["SSLContext", None, bool, Fingerprint]:
326 return self._ssl
328 @property
329 def connection_key(self) -> ConnectionKey:
330 proxy_headers = self.proxy_headers
331 if proxy_headers:
332 h: Optional[int] = hash(tuple((k, v) for k, v in proxy_headers.items()))
333 else:
334 h = None
335 return ConnectionKey(
336 self.host,
337 self.port,
338 self.is_ssl(),
339 self.ssl,
340 self.proxy,
341 self.proxy_auth,
342 h,
343 )
345 @property
346 def host(self) -> str:
347 ret = self.url.raw_host
348 assert ret is not None
349 return ret
351 @property
352 def port(self) -> Optional[int]:
353 return self.url.port
355 @property
356 def request_info(self) -> RequestInfo:
357 headers: CIMultiDictProxy[str] = CIMultiDictProxy(self.headers)
358 return RequestInfo(self.url, self.method, headers, self.original_url)
360 def update_host(self, url: URL) -> None:
361 """Update destination host, port and connection type (ssl)."""
362 # get host/port
363 if not url.raw_host:
364 raise InvalidURL(url)
366 # basic auth info
367 username, password = url.user, url.password
368 if username:
369 self.auth = helpers.BasicAuth(username, password or "")
371 def update_version(self, version: Union[http.HttpVersion, str]) -> None:
372 """Convert request version to two elements tuple.
374 parser HTTP version '1.1' => (1, 1)
375 """
376 if isinstance(version, str):
377 v = [part.strip() for part in version.split(".", 1)]
378 try:
379 version = http.HttpVersion(int(v[0]), int(v[1]))
380 except ValueError:
381 raise ValueError(
382 f"Can not parse http version number: {version}"
383 ) from None
384 self.version = version
386 def update_headers(self, headers: Optional[LooseHeaders]) -> None:
387 """Update request headers."""
388 self.headers: CIMultiDict[str] = CIMultiDict()
390 # add host
391 netloc = cast(str, self.url.raw_host)
392 if helpers.is_ipv6_address(netloc):
393 netloc = f"[{netloc}]"
394 if self.url.port is not None and not self.url.is_default_port():
395 netloc += ":" + str(self.url.port)
396 self.headers[hdrs.HOST] = netloc
398 if headers:
399 if isinstance(headers, (dict, MultiDictProxy, MultiDict)):
400 headers = headers.items() # type: ignore[assignment]
402 for key, value in headers: # type: ignore[misc]
403 # A special case for Host header
404 if key.lower() == "host":
405 self.headers[key] = value
406 else:
407 self.headers.add(key, value)
409 def update_auto_headers(self, skip_auto_headers: Iterable[str]) -> None:
410 self.skip_auto_headers = CIMultiDict(
411 (hdr, None) for hdr in sorted(skip_auto_headers)
412 )
413 used_headers = self.headers.copy()
414 used_headers.extend(self.skip_auto_headers) # type: ignore[arg-type]
416 for hdr, val in self.DEFAULT_HEADERS.items():
417 if hdr not in used_headers:
418 self.headers.add(hdr, val)
420 if hdrs.USER_AGENT not in used_headers:
421 self.headers[hdrs.USER_AGENT] = SERVER_SOFTWARE
423 def update_cookies(self, cookies: Optional[LooseCookies]) -> None:
424 """Update request cookies header."""
425 if not cookies:
426 return
428 c: SimpleCookie[str] = SimpleCookie()
429 if hdrs.COOKIE in self.headers:
430 c.load(self.headers.get(hdrs.COOKIE, ""))
431 del self.headers[hdrs.COOKIE]
433 if isinstance(cookies, Mapping):
434 iter_cookies = cookies.items()
435 else:
436 iter_cookies = cookies # type: ignore[assignment]
437 for name, value in iter_cookies:
438 if isinstance(value, Morsel):
439 # Preserve coded_value
440 mrsl_val = value.get(value.key, Morsel())
441 mrsl_val.set(value.key, value.value, value.coded_value)
442 c[name] = mrsl_val
443 else:
444 c[name] = value # type: ignore[assignment]
446 self.headers[hdrs.COOKIE] = c.output(header="", sep=";").strip()
448 def update_content_encoding(self, data: Any) -> None:
449 """Set request content encoding."""
450 if data is None:
451 return
453 enc = self.headers.get(hdrs.CONTENT_ENCODING, "").lower()
454 if enc:
455 if self.compress:
456 raise ValueError(
457 "compress can not be set " "if Content-Encoding header is set"
458 )
459 elif self.compress:
460 if not isinstance(self.compress, str):
461 self.compress = "deflate"
462 self.headers[hdrs.CONTENT_ENCODING] = self.compress
463 self.chunked = True # enable chunked, no need to deal with length
465 def update_transfer_encoding(self) -> None:
466 """Analyze transfer-encoding header."""
467 te = self.headers.get(hdrs.TRANSFER_ENCODING, "").lower()
469 if "chunked" in te:
470 if self.chunked:
471 raise ValueError(
472 "chunked can not be set "
473 'if "Transfer-Encoding: chunked" header is set'
474 )
476 elif self.chunked:
477 if hdrs.CONTENT_LENGTH in self.headers:
478 raise ValueError(
479 "chunked can not be set " "if Content-Length header is set"
480 )
482 self.headers[hdrs.TRANSFER_ENCODING] = "chunked"
483 else:
484 if hdrs.CONTENT_LENGTH not in self.headers:
485 self.headers[hdrs.CONTENT_LENGTH] = str(len(self.body))
487 def update_auth(self, auth: Optional[BasicAuth]) -> None:
488 """Set basic auth."""
489 if auth is None:
490 auth = self.auth
491 if auth is None:
492 return
494 if not isinstance(auth, helpers.BasicAuth):
495 raise TypeError("BasicAuth() tuple is required instead")
497 self.headers[hdrs.AUTHORIZATION] = auth.encode()
499 def update_body_from_data(self, body: Any) -> None:
500 if body is None:
501 return
503 # FormData
504 if isinstance(body, FormData):
505 body = body()
507 try:
508 body = payload.PAYLOAD_REGISTRY.get(body, disposition=None)
509 except payload.LookupError:
510 body = FormData(body)()
512 self.body = body
514 # enable chunked encoding if needed
515 if not self.chunked:
516 if hdrs.CONTENT_LENGTH not in self.headers:
517 size = body.size
518 if size is None:
519 self.chunked = True
520 else:
521 if hdrs.CONTENT_LENGTH not in self.headers:
522 self.headers[hdrs.CONTENT_LENGTH] = str(size)
524 # copy payload headers
525 assert body.headers
526 for (key, value) in body.headers.items():
527 if key in self.headers:
528 continue
529 if key in self.skip_auto_headers:
530 continue
531 self.headers[key] = value
533 def update_expect_continue(self, expect: bool = False) -> None:
534 if expect:
535 self.headers[hdrs.EXPECT] = "100-continue"
536 elif self.headers.get(hdrs.EXPECT, "").lower() == "100-continue":
537 expect = True
539 if expect:
540 self._continue = self.loop.create_future()
542 def update_proxy(
543 self,
544 proxy: Optional[URL],
545 proxy_auth: Optional[BasicAuth],
546 proxy_headers: Optional[LooseHeaders],
547 ) -> None:
548 if proxy_auth and not isinstance(proxy_auth, helpers.BasicAuth):
549 raise ValueError("proxy_auth must be None or BasicAuth() tuple")
550 self.proxy = proxy
551 self.proxy_auth = proxy_auth
552 self.proxy_headers = proxy_headers
554 def keep_alive(self) -> bool:
555 if self.version < HttpVersion10:
556 # keep alive not supported at all
557 return False
558 if self.version == HttpVersion10:
559 if self.headers.get(hdrs.CONNECTION) == "keep-alive":
560 return True
561 else: # no headers means we close for Http 1.0
562 return False
563 elif self.headers.get(hdrs.CONNECTION) == "close":
564 return False
566 return True
568 async def write_bytes(
569 self, writer: AbstractStreamWriter, conn: "Connection"
570 ) -> None:
571 """Support coroutines that yields bytes objects."""
572 # 100 response
573 if self._continue is not None:
574 await writer.drain()
575 await self._continue
577 protocol = conn.protocol
578 assert protocol is not None
579 try:
580 if isinstance(self.body, payload.Payload):
581 await self.body.write(writer)
582 else:
583 if isinstance(self.body, (bytes, bytearray)):
584 self.body = (self.body,) # type: ignore[assignment]
586 for chunk in self.body:
587 await writer.write(chunk) # type: ignore[arg-type]
589 await writer.write_eof()
590 except OSError as exc:
591 if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
592 protocol.set_exception(exc)
593 else:
594 new_exc = ClientOSError(
595 exc.errno, "Can not write request body for %s" % self.url
596 )
597 new_exc.__context__ = exc
598 new_exc.__cause__ = exc
599 protocol.set_exception(new_exc)
600 except asyncio.CancelledError as exc:
601 if not conn.closed:
602 protocol.set_exception(exc)
603 except Exception as exc:
604 protocol.set_exception(exc)
605 finally:
606 self._writer = None
608 async def send(self, conn: "Connection") -> "ClientResponse":
609 # Specify request target:
610 # - CONNECT request must send authority form URI
611 # - not CONNECT proxy must send absolute form URI
612 # - most common is origin form URI
613 if self.method == hdrs.METH_CONNECT:
614 connect_host = self.url.raw_host
615 assert connect_host is not None
616 if helpers.is_ipv6_address(connect_host):
617 connect_host = f"[{connect_host}]"
618 path = f"{connect_host}:{self.url.port}"
619 elif self.proxy and not self.is_ssl():
620 path = str(self.url)
621 else:
622 path = self.url.raw_path
623 if self.url.raw_query_string:
624 path += "?" + self.url.raw_query_string
626 protocol = conn.protocol
627 assert protocol is not None
628 writer = StreamWriter(
629 protocol,
630 self.loop,
631 on_chunk_sent=functools.partial(
632 self._on_chunk_request_sent, self.method, self.url
633 ),
634 on_headers_sent=functools.partial(
635 self._on_headers_request_sent, self.method, self.url
636 ),
637 )
639 if self.compress:
640 writer.enable_compression(self.compress)
642 if self.chunked is not None:
643 writer.enable_chunking()
645 # set default content-type
646 if (
647 self.method in self.POST_METHODS
648 and hdrs.CONTENT_TYPE not in self.skip_auto_headers
649 and hdrs.CONTENT_TYPE not in self.headers
650 ):
651 self.headers[hdrs.CONTENT_TYPE] = "application/octet-stream"
653 # set the connection header
654 connection = self.headers.get(hdrs.CONNECTION)
655 if not connection:
656 if self.keep_alive():
657 if self.version == HttpVersion10:
658 connection = "keep-alive"
659 else:
660 if self.version == HttpVersion11:
661 connection = "close"
663 if connection is not None:
664 self.headers[hdrs.CONNECTION] = connection
666 # status + headers
667 status_line = "{0} {1} HTTP/{2[0]}.{2[1]}".format(
668 self.method, path, self.version
669 )
670 await writer.write_headers(status_line, self.headers)
672 self._writer = self.loop.create_task(self.write_bytes(writer, conn))
674 response_class = self.response_class
675 assert response_class is not None
676 self.response = response_class(
677 self.method,
678 self.original_url,
679 writer=self._writer,
680 continue100=self._continue,
681 timer=self._timer,
682 request_info=self.request_info,
683 traces=self._traces,
684 loop=self.loop,
685 session=self._session,
686 )
687 return self.response
689 async def close(self) -> None:
690 if self._writer is not None:
691 try:
692 await self._writer
693 finally:
694 self._writer = None
696 def terminate(self) -> None:
697 if self._writer is not None:
698 if not self.loop.is_closed():
699 self._writer.cancel()
700 self._writer = None
702 async def _on_chunk_request_sent(self, method: str, url: URL, chunk: bytes) -> None:
703 for trace in self._traces:
704 await trace.send_request_chunk_sent(method, url, chunk)
706 async def _on_headers_request_sent(
707 self, method: str, url: URL, headers: "CIMultiDict[str]"
708 ) -> None:
709 for trace in self._traces:
710 await trace.send_request_headers(method, url, headers)
713class ClientResponse(HeadersMixin):
715 # from the Status-Line of the response
716 version = None # HTTP-Version
717 status: int = None # type: ignore[assignment] # Status-Code
718 reason = None # Reason-Phrase
720 content: StreamReader = None # type: ignore[assignment] # Payload stream
721 _headers: "CIMultiDictProxy[str]" = None # type: ignore[assignment]
722 _raw_headers: RawHeaders = None # type: ignore[assignment] # Response raw headers
724 _connection = None # current connection
725 _source_traceback = None
726 # setted up by ClientRequest after ClientResponse object creation
727 # post-init stage allows to not change ctor signature
728 _closed = True # to allow __del__ for non-initialized properly response
729 _released = False
731 def __init__(
732 self,
733 method: str,
734 url: URL,
735 *,
736 writer: "asyncio.Task[None]",
737 continue100: Optional["asyncio.Future[bool]"],
738 timer: BaseTimerContext,
739 request_info: RequestInfo,
740 traces: List["Trace"],
741 loop: asyncio.AbstractEventLoop,
742 session: "ClientSession",
743 ) -> None:
744 assert isinstance(url, URL)
746 self.method = method
747 self.cookies: SimpleCookie[str] = SimpleCookie()
749 self._real_url = url
750 self._url = url.with_fragment(None)
751 self._body: Any = None
752 self._writer: Optional[asyncio.Task[None]] = writer
753 self._continue = continue100 # None by default
754 self._closed = True
755 self._history: Tuple[ClientResponse, ...] = ()
756 self._request_info = request_info
757 self._timer = timer if timer is not None else TimerNoop()
758 self._cache: Dict[str, Any] = {}
759 self._traces = traces
760 self._loop = loop
761 # store a reference to session #1985
762 self._session: Optional[ClientSession] = session
763 if loop.get_debug():
764 self._source_traceback = traceback.extract_stack(sys._getframe(1))
766 @reify
767 def url(self) -> URL:
768 return self._url
770 @reify
771 def url_obj(self) -> URL:
772 warnings.warn("Deprecated, use .url #1654", DeprecationWarning, stacklevel=2)
773 return self._url
775 @reify
776 def real_url(self) -> URL:
777 return self._real_url
779 @reify
780 def host(self) -> str:
781 assert self._url.host is not None
782 return self._url.host
784 @reify
785 def headers(self) -> "CIMultiDictProxy[str]":
786 return self._headers
788 @reify
789 def raw_headers(self) -> RawHeaders:
790 return self._raw_headers
792 @reify
793 def request_info(self) -> RequestInfo:
794 return self._request_info
796 @reify
797 def content_disposition(self) -> Optional[ContentDisposition]:
798 raw = self._headers.get(hdrs.CONTENT_DISPOSITION)
799 if raw is None:
800 return None
801 disposition_type, params_dct = multipart.parse_content_disposition(raw)
802 params = MappingProxyType(params_dct)
803 filename = multipart.content_disposition_filename(params)
804 return ContentDisposition(disposition_type, params, filename)
806 def __del__(self, _warnings: Any = warnings) -> None:
807 if self._closed:
808 return
810 if self._connection is not None:
811 self._connection.release()
812 self._cleanup_writer()
814 if self._loop.get_debug():
815 if PY_36:
816 kwargs = {"source": self}
817 else:
818 kwargs = {}
819 _warnings.warn(f"Unclosed response {self!r}", ResourceWarning, **kwargs)
820 context = {"client_response": self, "message": "Unclosed response"}
821 if self._source_traceback:
822 context["source_traceback"] = self._source_traceback
823 self._loop.call_exception_handler(context)
825 def __repr__(self) -> str:
826 out = io.StringIO()
827 ascii_encodable_url = str(self.url)
828 if self.reason:
829 ascii_encodable_reason = self.reason.encode(
830 "ascii", "backslashreplace"
831 ).decode("ascii")
832 else:
833 ascii_encodable_reason = self.reason
834 print(
835 "<ClientResponse({}) [{} {}]>".format(
836 ascii_encodable_url, self.status, ascii_encodable_reason
837 ),
838 file=out,
839 )
840 print(self.headers, file=out)
841 return out.getvalue()
843 @property
844 def connection(self) -> Optional["Connection"]:
845 return self._connection
847 @reify
848 def history(self) -> Tuple["ClientResponse", ...]:
849 """A sequence of of responses, if redirects occurred."""
850 return self._history
852 @reify
853 def links(self) -> "MultiDictProxy[MultiDictProxy[Union[str, URL]]]":
854 links_str = ", ".join(self.headers.getall("link", []))
856 if not links_str:
857 return MultiDictProxy(MultiDict())
859 links: MultiDict[MultiDictProxy[Union[str, URL]]] = MultiDict()
861 for val in re.split(r",(?=\s*<)", links_str):
862 match = re.match(r"\s*<(.*)>(.*)", val)
863 if match is None: # pragma: no cover
864 # the check exists to suppress mypy error
865 continue
866 url, params_str = match.groups()
867 params = params_str.split(";")[1:]
869 link: MultiDict[Union[str, URL]] = MultiDict()
871 for param in params:
872 match = re.match(r"^\s*(\S*)\s*=\s*(['\"]?)(.*?)(\2)\s*$", param, re.M)
873 if match is None: # pragma: no cover
874 # the check exists to suppress mypy error
875 continue
876 key, _, value, _ = match.groups()
878 link.add(key, value)
880 key = link.get("rel", url) # type: ignore[assignment]
882 link.add("url", self.url.join(URL(url)))
884 links.add(key, MultiDictProxy(link))
886 return MultiDictProxy(links)
888 async def start(self, connection: "Connection") -> "ClientResponse":
889 """Start response processing."""
890 self._closed = False
891 self._protocol = connection.protocol
892 self._connection = connection
894 with self._timer:
895 while True:
896 # read response
897 try:
898 protocol = self._protocol
899 message, payload = await protocol.read() # type: ignore[union-attr]
900 except http.HttpProcessingError as exc:
901 raise ClientResponseError(
902 self.request_info,
903 self.history,
904 status=exc.code,
905 message=exc.message,
906 headers=exc.headers,
907 ) from exc
909 if message.code < 100 or message.code > 199 or message.code == 101:
910 break
912 if self._continue is not None:
913 set_result(self._continue, True)
914 self._continue = None
916 # payload eof handler
917 payload.on_eof(self._response_eof)
919 # response status
920 self.version = message.version
921 self.status = message.code
922 self.reason = message.reason
924 # headers
925 self._headers = message.headers # type is CIMultiDictProxy
926 self._raw_headers = message.raw_headers # type is Tuple[bytes, bytes]
928 # payload
929 self.content = payload
931 # cookies
932 for hdr in self.headers.getall(hdrs.SET_COOKIE, ()):
933 try:
934 self.cookies.load(hdr)
935 except CookieError as exc:
936 client_logger.warning("Can not load response cookies: %s", exc)
937 return self
939 def _response_eof(self) -> None:
940 if self._closed:
941 return
943 if self._connection is not None:
944 # websocket, protocol could be None because
945 # connection could be detached
946 if (
947 self._connection.protocol is not None
948 and self._connection.protocol.upgraded
949 ):
950 return
952 self._connection.release()
953 self._connection = None
955 self._closed = True
956 self._cleanup_writer()
958 @property
959 def closed(self) -> bool:
960 return self._closed
962 def close(self) -> None:
963 if not self._released:
964 self._notify_content()
965 if self._closed:
966 return
968 self._closed = True
969 if self._loop is None or self._loop.is_closed():
970 return
972 if self._connection is not None:
973 self._connection.close()
974 self._connection = None
975 self._cleanup_writer()
977 def release(self) -> Any:
978 if not self._released:
979 self._notify_content()
980 if self._closed:
981 return noop()
983 self._closed = True
984 if self._connection is not None:
985 self._connection.release()
986 self._connection = None
988 self._cleanup_writer()
989 return noop()
991 @property
992 def ok(self) -> bool:
993 """Returns ``True`` if ``status`` is less than ``400``, ``False`` if not.
995 This is **not** a check for ``200 OK`` but a check that the response
996 status is under 400.
997 """
998 return 400 > self.status
1000 def raise_for_status(self) -> None:
1001 if not self.ok:
1002 # reason should always be not None for a started response
1003 assert self.reason is not None
1004 self.release()
1005 raise ClientResponseError(
1006 self.request_info,
1007 self.history,
1008 status=self.status,
1009 message=self.reason,
1010 headers=self.headers,
1011 )
1013 def _cleanup_writer(self) -> None:
1014 if self._writer is not None:
1015 self._writer.cancel()
1016 self._writer = None
1017 self._session = None
1019 def _notify_content(self) -> None:
1020 content = self.content
1021 if content and content.exception() is None:
1022 content.set_exception(ClientConnectionError("Connection closed"))
1023 self._released = True
1025 async def wait_for_close(self) -> None:
1026 if self._writer is not None:
1027 try:
1028 await self._writer
1029 finally:
1030 self._writer = None
1031 self.release()
1033 async def read(self) -> bytes:
1034 """Read response payload."""
1035 if self._body is None:
1036 try:
1037 self._body = await self.content.read()
1038 for trace in self._traces:
1039 await trace.send_response_chunk_received(
1040 self.method, self.url, self._body
1041 )
1042 except BaseException:
1043 self.close()
1044 raise
1045 elif self._released:
1046 raise ClientConnectionError("Connection closed")
1048 return self._body # type: ignore[no-any-return]
1050 def get_encoding(self) -> str:
1051 ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower()
1052 mimetype = helpers.parse_mimetype(ctype)
1054 encoding = mimetype.parameters.get("charset")
1055 if encoding:
1056 try:
1057 codecs.lookup(encoding)
1058 except LookupError:
1059 encoding = None
1060 if not encoding:
1061 if mimetype.type == "application" and (
1062 mimetype.subtype == "json" or mimetype.subtype == "rdap"
1063 ):
1064 # RFC 7159 states that the default encoding is UTF-8.
1065 # RFC 7483 defines application/rdap+json
1066 encoding = "utf-8"
1067 elif self._body is None:
1068 raise RuntimeError(
1069 "Cannot guess the encoding of " "a not yet read body"
1070 )
1071 else:
1072 encoding = chardet.detect(self._body)["encoding"]
1073 if not encoding:
1074 encoding = "utf-8"
1076 return encoding
1078 async def text(self, encoding: Optional[str] = None, errors: str = "strict") -> str:
1079 """Read response payload and decode."""
1080 if self._body is None:
1081 await self.read()
1083 if encoding is None:
1084 encoding = self.get_encoding()
1086 return self._body.decode( # type: ignore[no-any-return,union-attr]
1087 encoding, errors=errors
1088 )
1090 async def json(
1091 self,
1092 *,
1093 encoding: Optional[str] = None,
1094 loads: JSONDecoder = DEFAULT_JSON_DECODER,
1095 content_type: Optional[str] = "application/json",
1096 ) -> Any:
1097 """Read and decodes JSON response."""
1098 if self._body is None:
1099 await self.read()
1101 if content_type:
1102 ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower()
1103 if not _is_expected_content_type(ctype, content_type):
1104 raise ContentTypeError(
1105 self.request_info,
1106 self.history,
1107 message=(
1108 "Attempt to decode JSON with " "unexpected mimetype: %s" % ctype
1109 ),
1110 headers=self.headers,
1111 )
1113 stripped = self._body.strip() # type: ignore[union-attr]
1114 if not stripped:
1115 return None
1117 if encoding is None:
1118 encoding = self.get_encoding()
1120 return loads(stripped.decode(encoding))
1122 async def __aenter__(self) -> "ClientResponse":
1123 return self
1125 async def __aexit__(
1126 self,
1127 exc_type: Optional[Type[BaseException]],
1128 exc_val: Optional[BaseException],
1129 exc_tb: Optional[TracebackType],
1130 ) -> None:
1131 # similar to _RequestContextManager, we do not need to check
1132 # for exceptions, response object can close connection
1133 # if state is broken
1134 self.release()