Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/client_reqrep.py: 26%
651 statements
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-26 06:16 +0000
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-26 06:16 +0000
1import asyncio
2import codecs
3import contextlib
4import dataclasses
5import functools
6import io
7import re
8import sys
9import traceback
10import warnings
11from hashlib import md5, sha1, sha256
12from http.cookies import CookieError, Morsel, SimpleCookie
13from types import MappingProxyType, TracebackType
14from typing import (
15 TYPE_CHECKING,
16 Any,
17 Callable,
18 Dict,
19 Iterable,
20 List,
21 Mapping,
22 Optional,
23 Tuple,
24 Type,
25 Union,
26 cast,
27)
29from multidict import CIMultiDict, CIMultiDictProxy, MultiDict, MultiDictProxy
30from yarl import URL
32from . import hdrs, helpers, http, multipart, payload
33from .abc import AbstractStreamWriter
34from .client_exceptions import (
35 ClientConnectionError,
36 ClientOSError,
37 ClientResponseError,
38 ContentTypeError,
39 InvalidURL,
40 ServerFingerprintMismatch,
41)
42from .compression_utils import HAS_BROTLI
43from .formdata import FormData
44from .hdrs import CONTENT_TYPE
45from .helpers import (
46 BaseTimerContext,
47 BasicAuth,
48 HeadersMixin,
49 TimerNoop,
50 basicauth_from_netrc,
51 is_expected_content_type,
52 netrc_from_env,
53 noop,
54 parse_mimetype,
55 reify,
56 set_result,
57)
58from .http import (
59 SERVER_SOFTWARE,
60 HttpVersion,
61 HttpVersion10,
62 HttpVersion11,
63 StreamWriter,
64)
65from .log import client_logger
66from .streams import StreamReader
67from .typedefs import (
68 DEFAULT_JSON_DECODER,
69 JSONDecoder,
70 LooseCookies,
71 LooseHeaders,
72 RawHeaders,
73)
75try:
76 import ssl
77 from ssl import SSLContext
78except ImportError: # pragma: no cover
79 ssl = None # type: ignore[assignment]
80 SSLContext = object # type: ignore[misc,assignment]
83__all__ = ("ClientRequest", "ClientResponse", "RequestInfo", "Fingerprint")
86if TYPE_CHECKING:
87 from .client import ClientSession
88 from .connector import Connection
89 from .tracing import Trace
92_CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]")
95def _gen_default_accept_encoding() -> str:
96 return "gzip, deflate, br" if HAS_BROTLI else "gzip, deflate"
99@dataclasses.dataclass(frozen=True)
100class ContentDisposition:
101 type: Optional[str]
102 parameters: "MappingProxyType[str, str]"
103 filename: Optional[str]
106@dataclasses.dataclass(frozen=True)
107class RequestInfo:
108 url: URL
109 method: str
110 headers: "CIMultiDictProxy[str]"
111 real_url: URL
114class Fingerprint:
115 HASHFUNC_BY_DIGESTLEN = {
116 16: md5,
117 20: sha1,
118 32: sha256,
119 }
121 def __init__(self, fingerprint: bytes) -> None:
122 digestlen = len(fingerprint)
123 hashfunc = self.HASHFUNC_BY_DIGESTLEN.get(digestlen)
124 if not hashfunc:
125 raise ValueError("fingerprint has invalid length")
126 elif hashfunc is md5 or hashfunc is sha1:
127 raise ValueError(
128 "md5 and sha1 are insecure and " "not supported. Use sha256."
129 )
130 self._hashfunc = hashfunc
131 self._fingerprint = fingerprint
133 @property
134 def fingerprint(self) -> bytes:
135 return self._fingerprint
137 def check(self, transport: asyncio.Transport) -> None:
138 if not transport.get_extra_info("sslcontext"):
139 return
140 sslobj = transport.get_extra_info("ssl_object")
141 cert = sslobj.getpeercert(binary_form=True)
142 got = self._hashfunc(cert).digest()
143 if got != self._fingerprint:
144 host, port, *_ = transport.get_extra_info("peername")
145 raise ServerFingerprintMismatch(self._fingerprint, got, host, port)
148if ssl is not None:
149 SSL_ALLOWED_TYPES = (ssl.SSLContext, bool, Fingerprint)
150else: # pragma: no cover
151 SSL_ALLOWED_TYPES = (bool,)
154@dataclasses.dataclass(frozen=True)
155class ConnectionKey:
156 # the key should contain an information about used proxy / TLS
157 # to prevent reusing wrong connections from a pool
158 host: str
159 port: Optional[int]
160 is_ssl: bool
161 ssl: Union[SSLContext, bool, Fingerprint]
162 proxy: Optional[URL]
163 proxy_auth: Optional[BasicAuth]
164 proxy_headers_hash: Optional[int] # hash(CIMultiDict)
167class ClientRequest:
168 GET_METHODS = {
169 hdrs.METH_GET,
170 hdrs.METH_HEAD,
171 hdrs.METH_OPTIONS,
172 hdrs.METH_TRACE,
173 }
174 POST_METHODS = {hdrs.METH_PATCH, hdrs.METH_POST, hdrs.METH_PUT}
175 ALL_METHODS = GET_METHODS.union(POST_METHODS).union({hdrs.METH_DELETE})
177 DEFAULT_HEADERS = {
178 hdrs.ACCEPT: "*/*",
179 hdrs.ACCEPT_ENCODING: _gen_default_accept_encoding(),
180 }
182 body = b""
183 auth = None
184 response = None
186 __writer = None # async task for streaming data
187 _continue = None # waiter future for '100 Continue' response
189 # N.B.
190 # Adding __del__ method with self._writer closing doesn't make sense
191 # because _writer is instance method, thus it keeps a reference to self.
192 # Until writer has finished finalizer will not be called.
194 def __init__(
195 self,
196 method: str,
197 url: URL,
198 *,
199 params: Optional[Mapping[str, str]] = None,
200 headers: Optional[LooseHeaders] = None,
201 skip_auto_headers: Iterable[str] = frozenset(),
202 data: Any = None,
203 cookies: Optional[LooseCookies] = None,
204 auth: Optional[BasicAuth] = None,
205 version: http.HttpVersion = http.HttpVersion11,
206 compress: Optional[str] = None,
207 chunked: Optional[bool] = None,
208 expect100: bool = False,
209 loop: asyncio.AbstractEventLoop,
210 response_class: Optional[Type["ClientResponse"]] = None,
211 proxy: Optional[URL] = None,
212 proxy_auth: Optional[BasicAuth] = None,
213 timer: Optional[BaseTimerContext] = None,
214 session: Optional["ClientSession"] = None,
215 ssl: Union[SSLContext, bool, Fingerprint] = True,
216 proxy_headers: Optional[LooseHeaders] = None,
217 traces: Optional[List["Trace"]] = None,
218 trust_env: bool = False,
219 server_hostname: Optional[str] = None,
220 ):
221 match = _CONTAINS_CONTROL_CHAR_RE.search(method)
222 if match:
223 raise ValueError(
224 f"Method cannot contain non-token characters {method!r} "
225 f"(found at least {match.group()!r})"
226 )
227 assert isinstance(url, URL), url
228 assert isinstance(proxy, (URL, type(None))), proxy
229 # FIXME: session is None in tests only, need to fix tests
230 # assert session is not None
231 self._session = cast("ClientSession", session)
232 if params:
233 q = MultiDict(url.query)
234 url2 = url.with_query(params)
235 q.extend(url2.query)
236 url = url.with_query(q)
237 self.original_url = url
238 self.url = url.with_fragment(None)
239 self.method = method.upper()
240 self.chunked = chunked
241 self.compress = compress
242 self.loop = loop
243 self.length = None
244 if response_class is None:
245 real_response_class = ClientResponse
246 else:
247 real_response_class = response_class
248 self.response_class: Type[ClientResponse] = real_response_class
249 self._timer = timer if timer is not None else TimerNoop()
250 self._ssl = ssl
251 self.server_hostname = server_hostname
253 if loop.get_debug():
254 self._source_traceback = traceback.extract_stack(sys._getframe(1))
256 self.update_version(version)
257 self.update_host(url)
258 self.update_headers(headers)
259 self.update_auto_headers(skip_auto_headers)
260 self.update_cookies(cookies)
261 self.update_content_encoding(data)
262 self.update_auth(auth, trust_env)
263 self.update_proxy(proxy, proxy_auth, proxy_headers)
265 self.update_body_from_data(data)
266 if data is not None or self.method not in self.GET_METHODS:
267 self.update_transfer_encoding()
268 self.update_expect_continue(expect100)
269 if traces is None:
270 traces = []
271 self._traces = traces
273 def __reset_writer(self, _: object = None) -> None:
274 self.__writer = None
276 @property
277 def _writer(self) -> Optional["asyncio.Task[None]"]:
278 return self.__writer
280 @_writer.setter
281 def _writer(self, writer: Optional["asyncio.Task[None]"]) -> None:
282 if self.__writer is not None:
283 self.__writer.remove_done_callback(self.__reset_writer)
284 self.__writer = writer
285 if writer is not None:
286 writer.add_done_callback(self.__reset_writer)
288 def is_ssl(self) -> bool:
289 return self.url.scheme in ("https", "wss")
291 @property
292 def ssl(self) -> Union["SSLContext", bool, Fingerprint]:
293 return self._ssl
295 @property
296 def connection_key(self) -> ConnectionKey:
297 proxy_headers = self.proxy_headers
298 if proxy_headers:
299 h: Optional[int] = hash(tuple((k, v) for k, v in proxy_headers.items()))
300 else:
301 h = None
302 return ConnectionKey(
303 self.host,
304 self.port,
305 self.is_ssl(),
306 self.ssl,
307 self.proxy,
308 self.proxy_auth,
309 h,
310 )
312 @property
313 def host(self) -> str:
314 ret = self.url.raw_host
315 assert ret is not None
316 return ret
318 @property
319 def port(self) -> Optional[int]:
320 return self.url.port
322 @property
323 def request_info(self) -> RequestInfo:
324 headers: CIMultiDictProxy[str] = CIMultiDictProxy(self.headers)
325 return RequestInfo(self.url, self.method, headers, self.original_url)
327 def update_host(self, url: URL) -> None:
328 """Update destination host, port and connection type (ssl)."""
329 # get host/port
330 if not url.raw_host:
331 raise InvalidURL(url)
333 # basic auth info
334 username, password = url.user, url.password
335 if username:
336 self.auth = helpers.BasicAuth(username, password or "")
338 def update_version(self, version: Union[http.HttpVersion, str]) -> None:
339 """Convert request version to two elements tuple.
341 parser HTTP version '1.1' => (1, 1)
342 """
343 if isinstance(version, str):
344 v = [part.strip() for part in version.split(".", 1)]
345 try:
346 version = http.HttpVersion(int(v[0]), int(v[1]))
347 except ValueError:
348 raise ValueError(
349 f"Can not parse http version number: {version}"
350 ) from None
351 self.version = version
353 def update_headers(self, headers: Optional[LooseHeaders]) -> None:
354 """Update request headers."""
355 self.headers: CIMultiDict[str] = CIMultiDict()
357 # add host
358 netloc = cast(str, self.url.raw_host)
359 if helpers.is_ipv6_address(netloc):
360 netloc = f"[{netloc}]"
361 # See https://github.com/aio-libs/aiohttp/issues/3636.
362 netloc = netloc.rstrip(".")
363 if self.url.port is not None and not self.url.is_default_port():
364 netloc += ":" + str(self.url.port)
365 self.headers[hdrs.HOST] = netloc
367 if headers:
368 if isinstance(headers, (dict, MultiDictProxy, MultiDict)):
369 headers = headers.items() # type: ignore[assignment]
371 for key, value in headers: # type: ignore[misc]
372 # A special case for Host header
373 if key.lower() == "host":
374 self.headers[key] = value
375 else:
376 self.headers.add(key, value)
378 def update_auto_headers(self, skip_auto_headers: Iterable[str]) -> None:
379 self.skip_auto_headers = CIMultiDict(
380 (hdr, None) for hdr in sorted(skip_auto_headers)
381 )
382 used_headers = self.headers.copy()
383 used_headers.extend(self.skip_auto_headers) # type: ignore[arg-type]
385 for hdr, val in self.DEFAULT_HEADERS.items():
386 if hdr not in used_headers:
387 self.headers.add(hdr, val)
389 if hdrs.USER_AGENT not in used_headers:
390 self.headers[hdrs.USER_AGENT] = SERVER_SOFTWARE
392 def update_cookies(self, cookies: Optional[LooseCookies]) -> None:
393 """Update request cookies header."""
394 if not cookies:
395 return
397 c = SimpleCookie()
398 if hdrs.COOKIE in self.headers:
399 c.load(self.headers.get(hdrs.COOKIE, ""))
400 del self.headers[hdrs.COOKIE]
402 if isinstance(cookies, Mapping):
403 iter_cookies = cookies.items()
404 else:
405 iter_cookies = cookies # type: ignore[assignment]
406 for name, value in iter_cookies:
407 if isinstance(value, Morsel):
408 # Preserve coded_value
409 mrsl_val = value.get(value.key, Morsel())
410 mrsl_val.set(value.key, value.value, value.coded_value)
411 c[name] = mrsl_val
412 else:
413 c[name] = value # type: ignore[assignment]
415 self.headers[hdrs.COOKIE] = c.output(header="", sep=";").strip()
417 def update_content_encoding(self, data: Any) -> None:
418 """Set request content encoding."""
419 if data is None:
420 return
422 enc = self.headers.get(hdrs.CONTENT_ENCODING, "").lower()
423 if enc:
424 if self.compress:
425 raise ValueError(
426 "compress can not be set " "if Content-Encoding header is set"
427 )
428 elif self.compress:
429 if not isinstance(self.compress, str):
430 self.compress = "deflate"
431 self.headers[hdrs.CONTENT_ENCODING] = self.compress
432 self.chunked = True # enable chunked, no need to deal with length
434 def update_transfer_encoding(self) -> None:
435 """Analyze transfer-encoding header."""
436 te = self.headers.get(hdrs.TRANSFER_ENCODING, "").lower()
438 if "chunked" in te:
439 if self.chunked:
440 raise ValueError(
441 "chunked can not be set "
442 'if "Transfer-Encoding: chunked" header is set'
443 )
445 elif self.chunked:
446 if hdrs.CONTENT_LENGTH in self.headers:
447 raise ValueError(
448 "chunked can not be set " "if Content-Length header is set"
449 )
451 self.headers[hdrs.TRANSFER_ENCODING] = "chunked"
452 else:
453 if hdrs.CONTENT_LENGTH not in self.headers:
454 self.headers[hdrs.CONTENT_LENGTH] = str(len(self.body))
456 def update_auth(self, auth: Optional[BasicAuth], trust_env: bool = False) -> None:
457 """Set basic auth."""
458 if auth is None:
459 auth = self.auth
460 if auth is None and trust_env and self.url.host is not None:
461 netrc_obj = netrc_from_env()
462 with contextlib.suppress(LookupError):
463 auth = basicauth_from_netrc(netrc_obj, self.url.host)
464 if auth is None:
465 return
467 if not isinstance(auth, helpers.BasicAuth):
468 raise TypeError("BasicAuth() tuple is required instead")
470 self.headers[hdrs.AUTHORIZATION] = auth.encode()
472 def update_body_from_data(self, body: Any) -> None:
473 if body is None:
474 return
476 # FormData
477 if isinstance(body, FormData):
478 body = body()
480 try:
481 body = payload.PAYLOAD_REGISTRY.get(body, disposition=None)
482 except payload.LookupError:
483 boundary = None
484 if CONTENT_TYPE in self.headers:
485 boundary = parse_mimetype(self.headers[CONTENT_TYPE]).parameters.get(
486 "boundary"
487 )
488 body = FormData(body, boundary=boundary)()
490 self.body = body
492 # enable chunked encoding if needed
493 if not self.chunked:
494 if hdrs.CONTENT_LENGTH not in self.headers:
495 size = body.size
496 if size is None:
497 self.chunked = True
498 else:
499 if hdrs.CONTENT_LENGTH not in self.headers:
500 self.headers[hdrs.CONTENT_LENGTH] = str(size)
502 # copy payload headers
503 assert body.headers
504 for key, value in body.headers.items():
505 if key in self.headers:
506 continue
507 if key in self.skip_auto_headers:
508 continue
509 self.headers[key] = value
511 def update_expect_continue(self, expect: bool = False) -> None:
512 if expect:
513 self.headers[hdrs.EXPECT] = "100-continue"
514 elif self.headers.get(hdrs.EXPECT, "").lower() == "100-continue":
515 expect = True
517 if expect:
518 self._continue = self.loop.create_future()
520 def update_proxy(
521 self,
522 proxy: Optional[URL],
523 proxy_auth: Optional[BasicAuth],
524 proxy_headers: Optional[LooseHeaders],
525 ) -> None:
526 if proxy_auth and not isinstance(proxy_auth, helpers.BasicAuth):
527 raise ValueError("proxy_auth must be None or BasicAuth() tuple")
528 self.proxy = proxy
529 self.proxy_auth = proxy_auth
530 self.proxy_headers = proxy_headers
532 def keep_alive(self) -> bool:
533 if self.version < HttpVersion10:
534 # keep alive not supported at all
535 return False
536 if self.version == HttpVersion10:
537 if self.headers.get(hdrs.CONNECTION) == "keep-alive":
538 return True
539 else: # no headers means we close for Http 1.0
540 return False
541 elif self.headers.get(hdrs.CONNECTION) == "close":
542 return False
544 return True
546 async def write_bytes(
547 self, writer: AbstractStreamWriter, conn: "Connection"
548 ) -> None:
549 """Support coroutines that yields bytes objects."""
550 # 100 response
551 if self._continue is not None:
552 try:
553 await writer.drain()
554 await self._continue
555 except asyncio.CancelledError:
556 return
558 protocol = conn.protocol
559 assert protocol is not None
560 try:
561 if isinstance(self.body, payload.Payload):
562 await self.body.write(writer)
563 else:
564 if isinstance(self.body, (bytes, bytearray)):
565 self.body = (self.body,) # type: ignore[assignment]
567 for chunk in self.body:
568 await writer.write(chunk) # type: ignore[arg-type]
569 except OSError as exc:
570 if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
571 protocol.set_exception(exc)
572 else:
573 new_exc = ClientOSError(
574 exc.errno, "Can not write request body for %s" % self.url
575 )
576 new_exc.__context__ = exc
577 new_exc.__cause__ = exc
578 protocol.set_exception(new_exc)
579 except asyncio.CancelledError:
580 await writer.write_eof()
581 except Exception as exc:
582 protocol.set_exception(exc)
583 else:
584 await writer.write_eof()
585 protocol.start_timeout()
587 async def send(self, conn: "Connection") -> "ClientResponse":
588 # Specify request target:
589 # - CONNECT request must send authority form URI
590 # - not CONNECT proxy must send absolute form URI
591 # - most common is origin form URI
592 if self.method == hdrs.METH_CONNECT:
593 connect_host = self.url.raw_host
594 assert connect_host is not None
595 if helpers.is_ipv6_address(connect_host):
596 connect_host = f"[{connect_host}]"
597 path = f"{connect_host}:{self.url.port}"
598 elif self.proxy and not self.is_ssl():
599 path = str(self.url)
600 else:
601 path = self.url.raw_path
602 if self.url.raw_query_string:
603 path += "?" + self.url.raw_query_string
605 protocol = conn.protocol
606 assert protocol is not None
607 writer = StreamWriter(
608 protocol,
609 self.loop,
610 on_chunk_sent=functools.partial(
611 self._on_chunk_request_sent, self.method, self.url
612 ),
613 on_headers_sent=functools.partial(
614 self._on_headers_request_sent, self.method, self.url
615 ),
616 )
618 if self.compress:
619 writer.enable_compression(self.compress)
621 if self.chunked is not None:
622 writer.enable_chunking()
624 # set default content-type
625 if (
626 self.method in self.POST_METHODS
627 and hdrs.CONTENT_TYPE not in self.skip_auto_headers
628 and hdrs.CONTENT_TYPE not in self.headers
629 ):
630 self.headers[hdrs.CONTENT_TYPE] = "application/octet-stream"
632 # set the connection header
633 connection = self.headers.get(hdrs.CONNECTION)
634 if not connection:
635 if self.keep_alive():
636 if self.version == HttpVersion10:
637 connection = "keep-alive"
638 else:
639 if self.version == HttpVersion11:
640 connection = "close"
642 if connection is not None:
643 self.headers[hdrs.CONNECTION] = connection
645 # status + headers
646 status_line = "{0} {1} HTTP/{v.major}.{v.minor}".format(
647 self.method, path, v=self.version
648 )
649 await writer.write_headers(status_line, self.headers)
651 self._writer = self.loop.create_task(self.write_bytes(writer, conn))
653 response_class = self.response_class
654 assert response_class is not None
655 self.response = response_class(
656 self.method,
657 self.original_url,
658 writer=self._writer,
659 continue100=self._continue,
660 timer=self._timer,
661 request_info=self.request_info,
662 traces=self._traces,
663 loop=self.loop,
664 session=self._session,
665 )
666 return self.response
668 async def close(self) -> None:
669 if self._writer is not None:
670 with contextlib.suppress(asyncio.CancelledError):
671 await self._writer
673 def terminate(self) -> None:
674 if self._writer is not None:
675 if not self.loop.is_closed():
676 self._writer.cancel()
677 self._writer.remove_done_callback(self.__reset_writer)
678 self._writer = None
680 async def _on_chunk_request_sent(self, method: str, url: URL, chunk: bytes) -> None:
681 for trace in self._traces:
682 await trace.send_request_chunk_sent(method, url, chunk)
684 async def _on_headers_request_sent(
685 self, method: str, url: URL, headers: "CIMultiDict[str]"
686 ) -> None:
687 for trace in self._traces:
688 await trace.send_request_headers(method, url, headers)
691class ClientResponse(HeadersMixin):
692 # Some of these attributes are None when created,
693 # but will be set by the start() method.
694 # As the end user will likely never see the None values, we cheat the types below.
695 # from the Status-Line of the response
696 version: Optional[HttpVersion] = None # HTTP-Version
697 status: int = None # type: ignore[assignment] # Status-Code
698 reason: Optional[str] = None # Reason-Phrase
700 content: StreamReader = None # type: ignore[assignment] # Payload stream
701 _headers: CIMultiDictProxy[str] = None # type: ignore[assignment]
702 _raw_headers: RawHeaders = None # type: ignore[assignment]
704 _connection = None # current connection
705 _source_traceback: Optional[traceback.StackSummary] = None
706 # set up by ClientRequest after ClientResponse object creation
707 # post-init stage allows to not change ctor signature
708 _closed = True # to allow __del__ for non-initialized properly response
709 _released = False
710 __writer = None
712 def __init__(
713 self,
714 method: str,
715 url: URL,
716 *,
717 writer: "asyncio.Task[None]",
718 continue100: Optional["asyncio.Future[bool]"],
719 timer: Optional[BaseTimerContext],
720 request_info: RequestInfo,
721 traces: List["Trace"],
722 loop: asyncio.AbstractEventLoop,
723 session: "ClientSession",
724 ) -> None:
725 assert isinstance(url, URL)
726 super().__init__()
728 self.method = method
729 self.cookies = SimpleCookie()
731 self._real_url = url
732 self._url = url.with_fragment(None)
733 self._body: Optional[bytes] = None
734 self._writer: Optional[asyncio.Task[None]] = writer
735 self._continue = continue100 # None by default
736 self._closed = True
737 self._history: Tuple[ClientResponse, ...] = ()
738 self._request_info = request_info
739 self._timer = timer if timer is not None else TimerNoop()
740 self._cache: Dict[str, Any] = {}
741 self._traces = traces
742 self._loop = loop
743 # store a reference to session #1985
744 self._session: Optional[ClientSession] = session
745 # Save reference to _resolve_charset, so that get_encoding() will still
746 # work after the response has finished reading the body.
747 if session is None:
748 # TODO: Fix session=None in tests (see ClientRequest.__init__).
749 self._resolve_charset: Callable[
750 ["ClientResponse", bytes], str
751 ] = lambda *_: "utf-8"
752 else:
753 self._resolve_charset = session._resolve_charset
754 if loop.get_debug():
755 self._source_traceback = traceback.extract_stack(sys._getframe(1))
757 def __reset_writer(self, _: object = None) -> None:
758 self.__writer = None
760 @property
761 def _writer(self) -> Optional["asyncio.Task[None]"]:
762 return self.__writer
764 @_writer.setter
765 def _writer(self, writer: Optional["asyncio.Task[None]"]) -> None:
766 if self.__writer is not None:
767 self.__writer.remove_done_callback(self.__reset_writer)
768 self.__writer = writer
769 if writer is not None:
770 writer.add_done_callback(self.__reset_writer)
772 @reify
773 def url(self) -> URL:
774 return self._url
776 @reify
777 def real_url(self) -> URL:
778 return self._real_url
780 @reify
781 def host(self) -> str:
782 assert self._url.host is not None
783 return self._url.host
785 @reify
786 def headers(self) -> "CIMultiDictProxy[str]":
787 return self._headers
789 @reify
790 def raw_headers(self) -> RawHeaders:
791 return self._raw_headers
793 @reify
794 def request_info(self) -> RequestInfo:
795 return self._request_info
797 @reify
798 def content_disposition(self) -> Optional[ContentDisposition]:
799 raw = self._headers.get(hdrs.CONTENT_DISPOSITION)
800 if raw is None:
801 return None
802 disposition_type, params_dct = multipart.parse_content_disposition(raw)
803 params = MappingProxyType(params_dct)
804 filename = multipart.content_disposition_filename(params)
805 return ContentDisposition(disposition_type, params, filename)
807 def __del__(self, _warnings: Any = warnings) -> None:
808 if self._closed:
809 return
811 if self._connection is not None:
812 self._connection.release()
813 self._cleanup_writer()
815 if self._loop.get_debug():
816 _warnings.warn(
817 f"Unclosed response {self!r}", ResourceWarning, source=self
818 )
819 context = {"client_response": self, "message": "Unclosed response"}
820 if self._source_traceback:
821 context["source_traceback"] = self._source_traceback
822 self._loop.call_exception_handler(context)
824 def __repr__(self) -> str:
825 out = io.StringIO()
826 ascii_encodable_url = str(self.url)
827 if self.reason:
828 ascii_encodable_reason = self.reason.encode(
829 "ascii", "backslashreplace"
830 ).decode("ascii")
831 else:
832 ascii_encodable_reason = "None"
833 print(
834 "<ClientResponse({}) [{} {}]>".format(
835 ascii_encodable_url, self.status, ascii_encodable_reason
836 ),
837 file=out,
838 )
839 print(self.headers, file=out)
840 return out.getvalue()
842 @property
843 def connection(self) -> Optional["Connection"]:
844 return self._connection
846 @reify
847 def history(self) -> Tuple["ClientResponse", ...]:
848 """A sequence of responses, if redirects occurred."""
849 return self._history
851 @reify
852 def links(self) -> "MultiDictProxy[MultiDictProxy[Union[str, URL]]]":
853 links_str = ", ".join(self.headers.getall("link", []))
855 if not links_str:
856 return MultiDictProxy(MultiDict())
858 links: MultiDict[MultiDictProxy[Union[str, URL]]] = MultiDict()
860 for val in re.split(r",(?=\s*<)", links_str):
861 match = re.match(r"\s*<(.*)>(.*)", val)
862 if match is None: # pragma: no cover
863 # the check exists to suppress mypy error
864 continue
865 url, params_str = match.groups()
866 params = params_str.split(";")[1:]
868 link: MultiDict[Union[str, URL]] = MultiDict()
870 for param in params:
871 match = re.match(r"^\s*(\S*)\s*=\s*(['\"]?)(.*?)(\2)\s*$", param, re.M)
872 if match is None: # pragma: no cover
873 # the check exists to suppress mypy error
874 continue
875 key, _, value, _ = match.groups()
877 link.add(key, value)
879 key = link.get("rel", url)
881 link.add("url", self.url.join(URL(url)))
883 links.add(str(key), MultiDictProxy(link))
885 return MultiDictProxy(links)
887 async def start(self, connection: "Connection") -> "ClientResponse":
888 """Start response processing."""
889 self._closed = False
890 self._protocol = connection.protocol
891 self._connection = connection
893 with self._timer:
894 while True:
895 # read response
896 try:
897 protocol = self._protocol
898 message, payload = await protocol.read() # type: ignore[union-attr]
899 except http.HttpProcessingError as exc:
900 raise ClientResponseError(
901 self.request_info,
902 self.history,
903 status=exc.code,
904 message=exc.message,
905 headers=exc.headers,
906 ) from exc
908 if message.code < 100 or message.code > 199 or message.code == 101:
909 break
911 if self._continue is not None:
912 set_result(self._continue, True)
913 self._continue = None
915 # payload eof handler
916 payload.on_eof(self._response_eof)
918 # response status
919 self.version = message.version
920 self.status = message.code
921 self.reason = message.reason
923 # headers
924 self._headers = message.headers # type is CIMultiDictProxy
925 self._raw_headers = message.raw_headers # type is Tuple[bytes, bytes]
927 # payload
928 self.content = payload
930 # cookies
931 for hdr in self.headers.getall(hdrs.SET_COOKIE, ()):
932 try:
933 self.cookies.load(hdr)
934 except CookieError as exc:
935 client_logger.warning("Can not load response cookies: %s", exc)
936 return self
938 def _response_eof(self) -> None:
939 if self._closed:
940 return
942 # protocol could be None because connection could be detached
943 protocol = self._connection and self._connection.protocol
944 if protocol is not None and protocol.upgraded:
945 return
947 self._closed = True
948 self._cleanup_writer()
949 self._release_connection()
951 @property
952 def closed(self) -> bool:
953 return self._closed
955 def close(self) -> None:
956 if not self._released:
957 self._notify_content()
959 self._closed = True
960 if self._loop.is_closed():
961 return
963 self._cleanup_writer()
964 if self._connection is not None:
965 self._connection.close()
966 self._connection = None
968 def release(self) -> Any:
969 if not self._released:
970 self._notify_content()
972 self._closed = True
974 self._cleanup_writer()
975 self._release_connection()
976 return noop()
978 @property
979 def ok(self) -> bool:
980 """Returns ``True`` if ``status`` is less than ``400``, ``False`` if not.
982 This is **not** a check for ``200 OK`` but a check that the response
983 status is under 400.
984 """
985 return 400 > self.status
987 def raise_for_status(self) -> None:
988 if not self.ok:
989 # reason should always be not None for a started response
990 assert self.reason is not None
991 self.release()
992 raise ClientResponseError(
993 self.request_info,
994 self.history,
995 status=self.status,
996 message=self.reason,
997 headers=self.headers,
998 )
1000 def _release_connection(self) -> None:
1001 if self._connection is not None:
1002 if self._writer is None:
1003 self._connection.release()
1004 self._connection = None
1005 else:
1006 self._writer.add_done_callback(lambda f: self._release_connection())
1008 async def _wait_released(self) -> None:
1009 if self._writer is not None:
1010 await self._writer
1011 self._release_connection()
1013 def _cleanup_writer(self) -> None:
1014 if self._writer is not None:
1015 self._writer.cancel()
1016 self._session = None
1018 def _notify_content(self) -> None:
1019 content = self.content
1020 # content can be None here, but the types are cheated elsewhere.
1021 if content and content.exception() is None: # type: ignore[truthy-bool]
1022 content.set_exception(ClientConnectionError("Connection closed"))
1023 self._released = True
1025 async def wait_for_close(self) -> None:
1026 if self._writer is not None:
1027 await self._writer
1028 self.release()
1030 async def read(self) -> bytes:
1031 """Read response payload."""
1032 if self._body is None:
1033 try:
1034 self._body = await self.content.read()
1035 for trace in self._traces:
1036 await trace.send_response_chunk_received(
1037 self.method, self.url, self._body
1038 )
1039 except BaseException:
1040 self.close()
1041 raise
1042 elif self._released: # Response explicitly released
1043 raise ClientConnectionError("Connection closed")
1045 protocol = self._connection and self._connection.protocol
1046 if protocol is None or not protocol.upgraded:
1047 await self._wait_released() # Underlying connection released
1048 return self._body
1050 def get_encoding(self) -> str:
1051 ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower()
1052 mimetype = helpers.parse_mimetype(ctype)
1054 encoding = mimetype.parameters.get("charset")
1055 if encoding:
1056 with contextlib.suppress(LookupError):
1057 return codecs.lookup(encoding).name
1059 if mimetype.type == "application" and (
1060 mimetype.subtype == "json" or mimetype.subtype == "rdap"
1061 ):
1062 # RFC 7159 states that the default encoding is UTF-8.
1063 # RFC 7483 defines application/rdap+json
1064 return "utf-8"
1066 if self._body is None:
1067 raise RuntimeError(
1068 "Cannot compute fallback encoding of a not yet read body"
1069 )
1071 return self._resolve_charset(self, self._body)
1073 async def text(self, encoding: Optional[str] = None, errors: str = "strict") -> str:
1074 """Read response payload and decode."""
1075 if self._body is None:
1076 await self.read()
1078 if encoding is None:
1079 encoding = self.get_encoding()
1081 return self._body.decode(encoding, errors=errors) # type: ignore[union-attr]
1083 async def json(
1084 self,
1085 *,
1086 encoding: Optional[str] = None,
1087 loads: JSONDecoder = DEFAULT_JSON_DECODER,
1088 content_type: Optional[str] = "application/json",
1089 ) -> Any:
1090 """Read and decodes JSON response."""
1091 if self._body is None:
1092 await self.read()
1094 if content_type:
1095 if not is_expected_content_type(self.content_type, content_type):
1096 raise ContentTypeError(
1097 self.request_info,
1098 self.history,
1099 message=(
1100 "Attempt to decode JSON with "
1101 "unexpected mimetype: %s" % self.content_type
1102 ),
1103 headers=self.headers,
1104 )
1106 if encoding is None:
1107 encoding = self.get_encoding()
1109 return loads(self._body.decode(encoding)) # type: ignore[union-attr]
1111 async def __aenter__(self) -> "ClientResponse":
1112 return self
1114 async def __aexit__(
1115 self,
1116 exc_type: Optional[Type[BaseException]],
1117 exc_val: Optional[BaseException],
1118 exc_tb: Optional[TracebackType],
1119 ) -> None:
1120 # similar to _RequestContextManager, we do not need to check
1121 # for exceptions, response object can close connection
1122 # if state is broken
1123 self.release()
1124 await self.wait_for_close()