Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/helpers.py: 34%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Various helper functions"""
3import asyncio
4import base64
5import binascii
6import contextlib
7import dataclasses
8import datetime
9import enum
10import functools
11import inspect
12import netrc
13import os
14import platform
15import re
16import sys
17import time
18import warnings
19import weakref
20from collections import namedtuple
21from contextlib import suppress
22from email.parser import HeaderParser
23from email.utils import parsedate
24from http.cookies import SimpleCookie
25from math import ceil
26from pathlib import Path
27from types import MappingProxyType, TracebackType
28from typing import (
29 TYPE_CHECKING,
30 Any,
31 Callable,
32 ContextManager,
33 Dict,
34 Generic,
35 Iterable,
36 Iterator,
37 List,
38 Mapping,
39 Optional,
40 Protocol,
41 Tuple,
42 Type,
43 TypeVar,
44 Union,
45 final,
46 get_args,
47 overload,
48)
49from urllib.parse import quote
50from urllib.request import getproxies, proxy_bypass
52from multidict import CIMultiDict, MultiDict, MultiDictProxy, MultiMapping
53from propcache.api import under_cached_property as reify
54from yarl import URL
56from . import hdrs
57from .log import client_logger
58from .typedefs import PathLike # noqa
60if sys.version_info >= (3, 11):
61 import asyncio as async_timeout
62else:
63 import async_timeout
65if TYPE_CHECKING:
66 from dataclasses import dataclass as frozen_dataclass_decorator
67elif sys.version_info < (3, 10):
68 frozen_dataclass_decorator = functools.partial(dataclasses.dataclass, frozen=True)
69else:
70 frozen_dataclass_decorator = functools.partial(
71 dataclasses.dataclass, frozen=True, slots=True
72 )
74__all__ = ("BasicAuth", "ChainMapProxy", "ETag", "frozen_dataclass_decorator", "reify")
76PY_310 = sys.version_info >= (3, 10)
78COOKIE_MAX_LENGTH = 4096
80_T = TypeVar("_T")
81_S = TypeVar("_S")
83_SENTINEL = enum.Enum("_SENTINEL", "sentinel")
84sentinel = _SENTINEL.sentinel
86NO_EXTENSIONS = bool(os.environ.get("AIOHTTP_NO_EXTENSIONS"))
88# https://datatracker.ietf.org/doc/html/rfc9112#section-6.3-2.1
89EMPTY_BODY_STATUS_CODES = frozenset((204, 304, *range(100, 200)))
90# https://datatracker.ietf.org/doc/html/rfc9112#section-6.3-2.1
91# https://datatracker.ietf.org/doc/html/rfc9112#section-6.3-2.2
92EMPTY_BODY_METHODS = hdrs.METH_HEAD_ALL
94DEBUG = sys.flags.dev_mode or (
95 not sys.flags.ignore_environment and bool(os.environ.get("PYTHONASYNCIODEBUG"))
96)
99CHAR = {chr(i) for i in range(0, 128)}
100CTL = {chr(i) for i in range(0, 32)} | {
101 chr(127),
102}
103SEPARATORS = {
104 "(",
105 ")",
106 "<",
107 ">",
108 "@",
109 ",",
110 ";",
111 ":",
112 "\\",
113 '"',
114 "/",
115 "[",
116 "]",
117 "?",
118 "=",
119 "{",
120 "}",
121 " ",
122 chr(9),
123}
124TOKEN = CHAR ^ CTL ^ SEPARATORS
127json_re = re.compile(r"(?:application/|[\w.-]+/[\w.+-]+?\+)json$", re.IGNORECASE)
130class BasicAuth(namedtuple("BasicAuth", ["login", "password", "encoding"])):
131 """Http basic authentication helper."""
133 def __new__(
134 cls, login: str, password: str = "", encoding: str = "latin1"
135 ) -> "BasicAuth":
136 if login is None:
137 raise ValueError("None is not allowed as login value")
139 if password is None:
140 raise ValueError("None is not allowed as password value")
142 if ":" in login:
143 raise ValueError('A ":" is not allowed in login (RFC 1945#section-11.1)')
145 return super().__new__(cls, login, password, encoding)
147 @classmethod
148 def decode(cls, auth_header: str, encoding: str = "latin1") -> "BasicAuth": # type: ignore[misc]
149 """Create a BasicAuth object from an Authorization HTTP header."""
150 try:
151 auth_type, encoded_credentials = auth_header.split(" ", 1)
152 except ValueError:
153 raise ValueError("Could not parse authorization header.")
155 if auth_type.lower() != "basic":
156 raise ValueError("Unknown authorization method %s" % auth_type)
158 try:
159 decoded = base64.b64decode(
160 encoded_credentials.encode("ascii"), validate=True
161 ).decode(encoding)
162 except binascii.Error:
163 raise ValueError("Invalid base64 encoding.")
165 try:
166 # RFC 2617 HTTP Authentication
167 # https://www.ietf.org/rfc/rfc2617.txt
168 # the colon must be present, but the username and password may be
169 # otherwise blank.
170 username, password = decoded.split(":", 1)
171 except ValueError:
172 raise ValueError("Invalid credentials.")
174 return cls(username, password, encoding=encoding)
176 @classmethod
177 def from_url(cls, url: URL, *, encoding: str = "latin1") -> Optional["BasicAuth"]: # type: ignore[misc]
178 """Create BasicAuth from url."""
179 if not isinstance(url, URL):
180 raise TypeError("url should be yarl.URL instance")
181 # Check raw_user and raw_password first as yarl is likely
182 # to already have these values parsed from the netloc in the cache.
183 if url.raw_user is None and url.raw_password is None:
184 return None
185 return cls(url.user or "", url.password or "", encoding=encoding)
187 def encode(self) -> str:
188 """Encode credentials."""
189 creds = (f"{self.login}:{self.password}").encode(self.encoding)
190 return "Basic %s" % base64.b64encode(creds).decode(self.encoding)
193def strip_auth_from_url(url: URL) -> Tuple[URL, Optional[BasicAuth]]:
194 """Remove user and password from URL if present and return BasicAuth object."""
195 # Check raw_user and raw_password first as yarl is likely
196 # to already have these values parsed from the netloc in the cache.
197 if url.raw_user is None and url.raw_password is None:
198 return url, None
199 return url.with_user(None), BasicAuth(url.user or "", url.password or "")
202def netrc_from_env() -> Optional[netrc.netrc]:
203 """Load netrc from file.
205 Attempt to load it from the path specified by the env-var
206 NETRC or in the default location in the user's home directory.
208 Returns None if it couldn't be found or fails to parse.
209 """
210 netrc_env = os.environ.get("NETRC")
212 if netrc_env is not None:
213 netrc_path = Path(netrc_env)
214 else:
215 try:
216 home_dir = Path.home()
217 except RuntimeError as e:
218 # if pathlib can't resolve home, it may raise a RuntimeError
219 client_logger.debug(
220 "Could not resolve home directory when "
221 "trying to look for .netrc file: %s",
222 e,
223 )
224 return None
226 netrc_path = home_dir / (
227 "_netrc" if platform.system() == "Windows" else ".netrc"
228 )
230 try:
231 return netrc.netrc(str(netrc_path))
232 except netrc.NetrcParseError as e:
233 client_logger.warning("Could not parse .netrc file: %s", e)
234 except OSError as e:
235 netrc_exists = False
236 with contextlib.suppress(OSError):
237 netrc_exists = netrc_path.is_file()
238 # we couldn't read the file (doesn't exist, permissions, etc.)
239 if netrc_env or netrc_exists:
240 # only warn if the environment wanted us to load it,
241 # or it appears like the default file does actually exist
242 client_logger.warning("Could not read .netrc file: %s", e)
244 return None
247@frozen_dataclass_decorator
248class ProxyInfo: # type: ignore[misc]
249 proxy: URL
250 proxy_auth: Optional[BasicAuth]
253def basicauth_from_netrc(netrc_obj: Optional[netrc.netrc], host: str) -> BasicAuth:
254 """
255 Return :py:class:`~aiohttp.BasicAuth` credentials for ``host`` from ``netrc_obj``.
257 :raises LookupError: if ``netrc_obj`` is :py:data:`None` or if no
258 entry is found for the ``host``.
259 """
260 if netrc_obj is None:
261 raise LookupError("No .netrc file found")
262 auth_from_netrc = netrc_obj.authenticators(host)
264 if auth_from_netrc is None:
265 raise LookupError(f"No entry for {host!s} found in the `.netrc` file.")
266 login, account, password = auth_from_netrc
268 # TODO(PY311): username = login or account
269 # Up to python 3.10, account could be None if not specified,
270 # and login will be empty string if not specified. From 3.11,
271 # login and account will be empty string if not specified.
272 username = login if (login or account is None) else account
274 # TODO(PY311): Remove this, as password will be empty string
275 # if not specified
276 if password is None:
277 password = "" # type: ignore[unreachable]
279 return BasicAuth(username, password)
282def proxies_from_env() -> Dict[str, ProxyInfo]:
283 proxy_urls = {
284 k: URL(v)
285 for k, v in getproxies().items()
286 if k in ("http", "https", "ws", "wss")
287 }
288 netrc_obj = netrc_from_env()
289 stripped = {k: strip_auth_from_url(v) for k, v in proxy_urls.items()}
290 ret = {}
291 for proto, val in stripped.items():
292 proxy, auth = val
293 if proxy.scheme in ("https", "wss"):
294 client_logger.warning(
295 "%s proxies %s are not supported, ignoring", proxy.scheme.upper(), proxy
296 )
297 continue
298 if netrc_obj and auth is None:
299 if proxy.host is not None:
300 try:
301 auth = basicauth_from_netrc(netrc_obj, proxy.host)
302 except LookupError:
303 auth = None
304 ret[proto] = ProxyInfo(proxy, auth)
305 return ret
308def get_env_proxy_for_url(url: URL) -> Tuple[URL, Optional[BasicAuth]]:
309 """Get a permitted proxy for the given URL from the env."""
310 if url.host is not None and proxy_bypass(url.host):
311 raise LookupError(f"Proxying is disallowed for `{url.host!r}`")
313 proxies_in_env = proxies_from_env()
314 try:
315 proxy_info = proxies_in_env[url.scheme]
316 except KeyError:
317 raise LookupError(f"No proxies found for `{url!s}` in the env")
318 else:
319 return proxy_info.proxy, proxy_info.proxy_auth
322@frozen_dataclass_decorator
323class MimeType:
324 type: str
325 subtype: str
326 suffix: str
327 parameters: "MultiDictProxy[str]"
330@functools.lru_cache(maxsize=56)
331def parse_mimetype(mimetype: str) -> MimeType:
332 """Parses a MIME type into its components.
334 mimetype is a MIME type string.
336 Returns a MimeType object.
338 Example:
340 >>> parse_mimetype('text/html; charset=utf-8')
341 MimeType(type='text', subtype='html', suffix='',
342 parameters={'charset': 'utf-8'})
344 """
345 if not mimetype:
346 return MimeType(
347 type="", subtype="", suffix="", parameters=MultiDictProxy(MultiDict())
348 )
350 parts = mimetype.split(";")
351 params: MultiDict[str] = MultiDict()
352 for item in parts[1:]:
353 if not item:
354 continue
355 key, _, value = item.partition("=")
356 params.add(key.lower().strip(), value.strip(' "'))
358 fulltype = parts[0].strip().lower()
359 if fulltype == "*":
360 fulltype = "*/*"
362 mtype, _, stype = fulltype.partition("/")
363 stype, _, suffix = stype.partition("+")
365 return MimeType(
366 type=mtype, subtype=stype, suffix=suffix, parameters=MultiDictProxy(params)
367 )
370@functools.lru_cache(maxsize=56)
371def parse_content_type(raw: str) -> Tuple[str, MappingProxyType[str, str]]:
372 """Parse Content-Type header.
374 Returns a tuple of the parsed content type and a
375 MappingProxyType of parameters.
376 """
377 msg = HeaderParser().parsestr(f"Content-Type: {raw}")
378 content_type = msg.get_content_type()
379 params = msg.get_params(())
380 content_dict = dict(params[1:]) # First element is content type again
381 return content_type, MappingProxyType(content_dict)
384def guess_filename(obj: Any, default: Optional[str] = None) -> Optional[str]:
385 name = getattr(obj, "name", None)
386 if name and isinstance(name, str) and name[0] != "<" and name[-1] != ">":
387 return Path(name).name
388 return default
391not_qtext_re = re.compile(r"[^\041\043-\133\135-\176]")
392QCONTENT = {chr(i) for i in range(0x20, 0x7F)} | {"\t"}
395def quoted_string(content: str) -> str:
396 """Return 7-bit content as quoted-string.
398 Format content into a quoted-string as defined in RFC5322 for
399 Internet Message Format. Notice that this is not the 8-bit HTTP
400 format, but the 7-bit email format. Content must be in usascii or
401 a ValueError is raised.
402 """
403 if not (QCONTENT > set(content)):
404 raise ValueError(f"bad content for quoted-string {content!r}")
405 return not_qtext_re.sub(lambda x: "\\" + x.group(0), content)
408def content_disposition_header(
409 disptype: str,
410 quote_fields: bool = True,
411 _charset: str = "utf-8",
412 params: Optional[Dict[str, str]] = None,
413) -> str:
414 """Sets ``Content-Disposition`` header for MIME.
416 This is the MIME payload Content-Disposition header from RFC 2183
417 and RFC 7579 section 4.2, not the HTTP Content-Disposition from
418 RFC 6266.
420 disptype is a disposition type: inline, attachment, form-data.
421 Should be valid extension token (see RFC 2183)
423 quote_fields performs value quoting to 7-bit MIME headers
424 according to RFC 7578. Set to quote_fields to False if recipient
425 can take 8-bit file names and field values.
427 _charset specifies the charset to use when quote_fields is True.
429 params is a dict with disposition params.
430 """
431 if not disptype or not (TOKEN > set(disptype)):
432 raise ValueError(f"bad content disposition type {disptype!r}")
434 value = disptype
435 if params:
436 lparams = []
437 for key, val in params.items():
438 if not key or not (TOKEN > set(key)):
439 raise ValueError(f"bad content disposition parameter {key!r}={val!r}")
440 if quote_fields:
441 if key.lower() == "filename":
442 qval = quote(val, "", encoding=_charset)
443 lparams.append((key, '"%s"' % qval))
444 else:
445 try:
446 qval = quoted_string(val)
447 except ValueError:
448 qval = "".join(
449 (_charset, "''", quote(val, "", encoding=_charset))
450 )
451 lparams.append((key + "*", qval))
452 else:
453 lparams.append((key, '"%s"' % qval))
454 else:
455 qval = val.replace("\\", "\\\\").replace('"', '\\"')
456 lparams.append((key, '"%s"' % qval))
457 sparams = "; ".join("=".join(pair) for pair in lparams)
458 value = "; ".join((value, sparams))
459 return value
462def is_expected_content_type(
463 response_content_type: str, expected_content_type: str
464) -> bool:
465 """Checks if received content type is processable as an expected one.
467 Both arguments should be given without parameters.
468 """
469 if expected_content_type == "application/json":
470 return json_re.match(response_content_type) is not None
471 return expected_content_type in response_content_type
474def is_ip_address(host: Optional[str]) -> bool:
475 """Check if host looks like an IP Address.
477 This check is only meant as a heuristic to ensure that
478 a host is not a domain name.
479 """
480 if not host:
481 return False
482 # For a host to be an ipv4 address, it must be all numeric.
483 # The host must contain a colon to be an IPv6 address.
484 return ":" in host or host.replace(".", "").isdigit()
487_cached_current_datetime: Optional[int] = None
488_cached_formatted_datetime = ""
491def rfc822_formatted_time() -> str:
492 global _cached_current_datetime
493 global _cached_formatted_datetime
495 now = int(time.time())
496 if now != _cached_current_datetime:
497 # Weekday and month names for HTTP date/time formatting;
498 # always English!
499 # Tuples are constants stored in codeobject!
500 _weekdayname = ("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun")
501 _monthname = (
502 "", # Dummy so we can use 1-based month numbers
503 "Jan",
504 "Feb",
505 "Mar",
506 "Apr",
507 "May",
508 "Jun",
509 "Jul",
510 "Aug",
511 "Sep",
512 "Oct",
513 "Nov",
514 "Dec",
515 )
517 year, month, day, hh, mm, ss, wd, *tail = time.gmtime(now)
518 _cached_formatted_datetime = "%s, %02d %3s %4d %02d:%02d:%02d GMT" % (
519 _weekdayname[wd],
520 day,
521 _monthname[month],
522 year,
523 hh,
524 mm,
525 ss,
526 )
527 _cached_current_datetime = now
528 return _cached_formatted_datetime
531def _weakref_handle(info: "Tuple[weakref.ref[object], str]") -> None:
532 ref, name = info
533 ob = ref()
534 if ob is not None:
535 with suppress(Exception):
536 getattr(ob, name)()
539def weakref_handle(
540 ob: object,
541 name: str,
542 timeout: Optional[float],
543 loop: asyncio.AbstractEventLoop,
544 timeout_ceil_threshold: float = 5,
545) -> Optional[asyncio.TimerHandle]:
546 if timeout is not None and timeout > 0:
547 when = loop.time() + timeout
548 if timeout >= timeout_ceil_threshold:
549 when = ceil(when)
551 return loop.call_at(when, _weakref_handle, (weakref.ref(ob), name))
552 return None
555def call_later(
556 cb: Callable[[], Any],
557 timeout: Optional[float],
558 loop: asyncio.AbstractEventLoop,
559 timeout_ceil_threshold: float = 5,
560) -> Optional[asyncio.TimerHandle]:
561 if timeout is None or timeout <= 0:
562 return None
563 now = loop.time()
564 when = calculate_timeout_when(now, timeout, timeout_ceil_threshold)
565 return loop.call_at(when, cb)
568def calculate_timeout_when(
569 loop_time: float,
570 timeout: float,
571 timeout_ceiling_threshold: float,
572) -> float:
573 """Calculate when to execute a timeout."""
574 when = loop_time + timeout
575 if timeout > timeout_ceiling_threshold:
576 return ceil(when)
577 return when
580class TimeoutHandle:
581 """Timeout handle"""
583 __slots__ = ("_timeout", "_loop", "_ceil_threshold", "_callbacks")
585 def __init__(
586 self,
587 loop: asyncio.AbstractEventLoop,
588 timeout: Optional[float],
589 ceil_threshold: float = 5,
590 ) -> None:
591 self._timeout = timeout
592 self._loop = loop
593 self._ceil_threshold = ceil_threshold
594 self._callbacks: List[
595 Tuple[Callable[..., None], Tuple[Any, ...], Dict[str, Any]]
596 ] = []
598 def register(
599 self, callback: Callable[..., None], *args: Any, **kwargs: Any
600 ) -> None:
601 self._callbacks.append((callback, args, kwargs))
603 def close(self) -> None:
604 self._callbacks.clear()
606 def start(self) -> Optional[asyncio.TimerHandle]:
607 timeout = self._timeout
608 if timeout is not None and timeout > 0:
609 when = self._loop.time() + timeout
610 if timeout >= self._ceil_threshold:
611 when = ceil(when)
612 return self._loop.call_at(when, self.__call__)
613 else:
614 return None
616 def timer(self) -> "BaseTimerContext":
617 if self._timeout is not None and self._timeout > 0:
618 timer = TimerContext(self._loop)
619 self.register(timer.timeout)
620 return timer
621 else:
622 return TimerNoop()
624 def __call__(self) -> None:
625 for cb, args, kwargs in self._callbacks:
626 with suppress(Exception):
627 cb(*args, **kwargs)
629 self._callbacks.clear()
632class BaseTimerContext(ContextManager["BaseTimerContext"]):
634 __slots__ = ()
636 def assert_timeout(self) -> None:
637 """Raise TimeoutError if timeout has been exceeded."""
640class TimerNoop(BaseTimerContext):
642 __slots__ = ()
644 def __enter__(self) -> BaseTimerContext:
645 return self
647 def __exit__(
648 self,
649 exc_type: Optional[Type[BaseException]],
650 exc_val: Optional[BaseException],
651 exc_tb: Optional[TracebackType],
652 ) -> None:
653 return
656class TimerContext(BaseTimerContext):
657 """Low resolution timeout context manager"""
659 __slots__ = ("_loop", "_tasks", "_cancelled", "_cancelling")
661 def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
662 self._loop = loop
663 self._tasks: List[asyncio.Task[Any]] = []
664 self._cancelled = False
665 self._cancelling = 0
667 def assert_timeout(self) -> None:
668 """Raise TimeoutError if timer has already been cancelled."""
669 if self._cancelled:
670 raise asyncio.TimeoutError from None
672 def __enter__(self) -> BaseTimerContext:
673 task = asyncio.current_task(loop=self._loop)
674 if task is None:
675 raise RuntimeError("Timeout context manager should be used inside a task")
677 if sys.version_info >= (3, 11):
678 # Remember if the task was already cancelling
679 # so when we __exit__ we can decide if we should
680 # raise asyncio.TimeoutError or let the cancellation propagate
681 self._cancelling = task.cancelling()
683 if self._cancelled:
684 raise asyncio.TimeoutError from None
686 self._tasks.append(task)
687 return self
689 def __exit__(
690 self,
691 exc_type: Optional[Type[BaseException]],
692 exc_val: Optional[BaseException],
693 exc_tb: Optional[TracebackType],
694 ) -> Optional[bool]:
695 enter_task: Optional[asyncio.Task[Any]] = None
696 if self._tasks:
697 enter_task = self._tasks.pop()
699 if exc_type is asyncio.CancelledError and self._cancelled:
700 assert enter_task is not None
701 # The timeout was hit, and the task was cancelled
702 # so we need to uncancel the last task that entered the context manager
703 # since the cancellation should not leak out of the context manager
704 if sys.version_info >= (3, 11):
705 # If the task was already cancelling don't raise
706 # asyncio.TimeoutError and instead return None
707 # to allow the cancellation to propagate
708 if enter_task.uncancel() > self._cancelling:
709 return None
710 raise asyncio.TimeoutError from exc_val
711 return None
713 def timeout(self) -> None:
714 if not self._cancelled:
715 for task in set(self._tasks):
716 task.cancel()
718 self._cancelled = True
721def ceil_timeout(
722 delay: Optional[float], ceil_threshold: float = 5
723) -> async_timeout.Timeout:
724 if delay is None or delay <= 0:
725 return async_timeout.timeout(None)
727 loop = asyncio.get_running_loop()
728 now = loop.time()
729 when = now + delay
730 if delay > ceil_threshold:
731 when = ceil(when)
732 return async_timeout.timeout_at(when)
735class HeadersMixin:
736 """Mixin for handling headers."""
738 _headers: MultiMapping[str]
739 _content_type: Optional[str] = None
740 _content_dict: Optional[Dict[str, str]] = None
741 _stored_content_type: Union[str, None, _SENTINEL] = sentinel
743 def _parse_content_type(self, raw: Optional[str]) -> None:
744 self._stored_content_type = raw
745 if raw is None:
746 # default value according to RFC 2616
747 self._content_type = "application/octet-stream"
748 self._content_dict = {}
749 else:
750 content_type, content_mapping_proxy = parse_content_type(raw)
751 self._content_type = content_type
752 # _content_dict needs to be mutable so we can update it
753 self._content_dict = content_mapping_proxy.copy()
755 @property
756 def content_type(self) -> str:
757 """The value of content part for Content-Type HTTP header."""
758 raw = self._headers.get(hdrs.CONTENT_TYPE)
759 if self._stored_content_type != raw:
760 self._parse_content_type(raw)
761 assert self._content_type is not None
762 return self._content_type
764 @property
765 def charset(self) -> Optional[str]:
766 """The value of charset part for Content-Type HTTP header."""
767 raw = self._headers.get(hdrs.CONTENT_TYPE)
768 if self._stored_content_type != raw:
769 self._parse_content_type(raw)
770 assert self._content_dict is not None
771 return self._content_dict.get("charset")
773 @property
774 def content_length(self) -> Optional[int]:
775 """The value of Content-Length HTTP header."""
776 content_length = self._headers.get(hdrs.CONTENT_LENGTH)
777 return None if content_length is None else int(content_length)
780def set_result(fut: "asyncio.Future[_T]", result: _T) -> None:
781 if not fut.done():
782 fut.set_result(result)
785_EXC_SENTINEL = BaseException()
788class ErrorableProtocol(Protocol):
789 def set_exception(
790 self,
791 exc: Union[Type[BaseException], BaseException],
792 exc_cause: BaseException = ...,
793 ) -> None: ...
796def set_exception(
797 fut: Union["asyncio.Future[_T]", ErrorableProtocol],
798 exc: Union[Type[BaseException], BaseException],
799 exc_cause: BaseException = _EXC_SENTINEL,
800) -> None:
801 """Set future exception.
803 If the future is marked as complete, this function is a no-op.
805 :param exc_cause: An exception that is a direct cause of ``exc``.
806 Only set if provided.
807 """
808 if asyncio.isfuture(fut) and fut.done():
809 return
811 exc_is_sentinel = exc_cause is _EXC_SENTINEL
812 exc_causes_itself = exc is exc_cause
813 if not exc_is_sentinel and not exc_causes_itself:
814 exc.__cause__ = exc_cause
816 fut.set_exception(exc)
819@functools.total_ordering
820class AppKey(Generic[_T]):
821 """Keys for static typing support in Application."""
823 __slots__ = ("_name", "_t", "__orig_class__")
825 # This may be set by Python when instantiating with a generic type. We need to
826 # support this, in order to support types that are not concrete classes,
827 # like Iterable, which can't be passed as the second parameter to __init__.
828 __orig_class__: Type[object]
830 # TODO(PY314): Change Type to TypeForm (this should resolve unreachable below).
831 def __init__(self, name: str, t: Optional[Type[_T]] = None):
832 # Prefix with module name to help deduplicate key names.
833 frame = inspect.currentframe()
834 while frame:
835 if frame.f_code.co_name == "<module>":
836 module: str = frame.f_globals["__name__"]
837 break
838 frame = frame.f_back
839 else:
840 raise RuntimeError("Failed to get module name.")
842 # https://github.com/python/mypy/issues/14209
843 self._name = module + "." + name # type: ignore[possibly-undefined]
844 self._t = t
846 def __lt__(self, other: object) -> bool:
847 if isinstance(other, AppKey):
848 return self._name < other._name
849 return True # Order AppKey above other types.
851 def __repr__(self) -> str:
852 t = self._t
853 if t is None:
854 with suppress(AttributeError):
855 # Set to type arg.
856 t = get_args(self.__orig_class__)[0]
858 if t is None:
859 t_repr = "<<Unknown>>"
860 elif isinstance(t, type):
861 if t.__module__ == "builtins":
862 t_repr = t.__qualname__
863 else:
864 t_repr = f"{t.__module__}.{t.__qualname__}"
865 else:
866 t_repr = repr(t) # type: ignore[unreachable]
867 return f"<AppKey({self._name}, type={t_repr})>"
870@final
871class ChainMapProxy(Mapping[Union[str, AppKey[Any]], Any]):
872 __slots__ = ("_maps",)
874 def __init__(self, maps: Iterable[Mapping[Union[str, AppKey[Any]], Any]]) -> None:
875 self._maps = tuple(maps)
877 def __init_subclass__(cls) -> None:
878 raise TypeError(
879 "Inheritance class {} from ChainMapProxy "
880 "is forbidden".format(cls.__name__)
881 )
883 @overload # type: ignore[override]
884 def __getitem__(self, key: AppKey[_T]) -> _T: ...
886 @overload
887 def __getitem__(self, key: str) -> Any: ... # type: ignore[misc]
889 def __getitem__(self, key: Union[str, AppKey[_T]]) -> Any:
890 for mapping in self._maps:
891 try:
892 return mapping[key]
893 except KeyError:
894 pass
895 raise KeyError(key)
897 @overload # type: ignore[override]
898 def get(self, key: AppKey[_T], default: _S) -> Union[_T, _S]: ...
900 @overload
901 def get(self, key: AppKey[_T], default: None = ...) -> Optional[_T]: ...
903 @overload
904 def get(self, key: str, default: Any = ...) -> Any: ... # type: ignore[misc]
906 def get(self, key: Union[str, AppKey[_T]], default: Any = None) -> Any:
907 try:
908 return self[key]
909 except KeyError:
910 return default
912 def __len__(self) -> int:
913 # reuses stored hash values if possible
914 return len(set().union(*self._maps))
916 def __iter__(self) -> Iterator[Union[str, AppKey[Any]]]:
917 d: Dict[Union[str, AppKey[Any]], Any] = {}
918 for mapping in reversed(self._maps):
919 # reuses stored hash values if possible
920 d.update(mapping)
921 return iter(d)
923 def __contains__(self, key: object) -> bool:
924 return any(key in m for m in self._maps)
926 def __bool__(self) -> bool:
927 return any(self._maps)
929 def __repr__(self) -> str:
930 content = ", ".join(map(repr, self._maps))
931 return f"ChainMapProxy({content})"
934class CookieMixin:
935 """Mixin for handling cookies."""
937 _cookies: Optional[SimpleCookie] = None
939 @property
940 def cookies(self) -> SimpleCookie:
941 if self._cookies is None:
942 self._cookies = SimpleCookie()
943 return self._cookies
945 def set_cookie(
946 self,
947 name: str,
948 value: str,
949 *,
950 expires: Optional[str] = None,
951 domain: Optional[str] = None,
952 max_age: Optional[Union[int, str]] = None,
953 path: str = "/",
954 secure: Optional[bool] = None,
955 httponly: Optional[bool] = None,
956 samesite: Optional[str] = None,
957 partitioned: Optional[bool] = None,
958 ) -> None:
959 """Set or update response cookie.
961 Sets new cookie or updates existent with new value.
962 Also updates only those params which are not None.
963 """
964 if self._cookies is None:
965 self._cookies = SimpleCookie()
967 self._cookies[name] = value
968 c = self._cookies[name]
970 if expires is not None:
971 c["expires"] = expires
972 elif c.get("expires") == "Thu, 01 Jan 1970 00:00:00 GMT":
973 del c["expires"]
975 if domain is not None:
976 c["domain"] = domain
978 if max_age is not None:
979 c["max-age"] = str(max_age)
980 elif "max-age" in c:
981 del c["max-age"]
983 c["path"] = path
985 if secure is not None:
986 c["secure"] = secure
987 if httponly is not None:
988 c["httponly"] = httponly
989 if samesite is not None:
990 c["samesite"] = samesite
992 if partitioned is not None:
993 c["partitioned"] = partitioned
995 if DEBUG:
996 cookie_length = len(c.output(header="")[1:])
997 if cookie_length > COOKIE_MAX_LENGTH:
998 warnings.warn(
999 "The size of is too large, it might get ignored by the client.",
1000 UserWarning,
1001 stacklevel=2,
1002 )
1004 def del_cookie(
1005 self,
1006 name: str,
1007 *,
1008 domain: Optional[str] = None,
1009 path: str = "/",
1010 secure: Optional[bool] = None,
1011 httponly: Optional[bool] = None,
1012 samesite: Optional[str] = None,
1013 ) -> None:
1014 """Delete cookie.
1016 Creates new empty expired cookie.
1017 """
1018 # TODO: do we need domain/path here?
1019 if self._cookies is not None:
1020 self._cookies.pop(name, None)
1021 self.set_cookie(
1022 name,
1023 "",
1024 max_age=0,
1025 expires="Thu, 01 Jan 1970 00:00:00 GMT",
1026 domain=domain,
1027 path=path,
1028 secure=secure,
1029 httponly=httponly,
1030 samesite=samesite,
1031 )
1034def populate_with_cookies(headers: "CIMultiDict[str]", cookies: SimpleCookie) -> None:
1035 for cookie in cookies.values():
1036 value = cookie.output(header="")[1:]
1037 headers.add(hdrs.SET_COOKIE, value)
1040# https://tools.ietf.org/html/rfc7232#section-2.3
1041_ETAGC = r"[!\x23-\x7E\x80-\xff]+"
1042_ETAGC_RE = re.compile(_ETAGC)
1043_QUOTED_ETAG = rf'(W/)?"({_ETAGC})"'
1044QUOTED_ETAG_RE = re.compile(_QUOTED_ETAG)
1045LIST_QUOTED_ETAG_RE = re.compile(rf"({_QUOTED_ETAG})(?:\s*,\s*|$)|(.)")
1047ETAG_ANY = "*"
1050@frozen_dataclass_decorator
1051class ETag:
1052 value: str
1053 is_weak: bool = False
1056def validate_etag_value(value: str) -> None:
1057 if value != ETAG_ANY and not _ETAGC_RE.fullmatch(value):
1058 raise ValueError(
1059 f"Value {value!r} is not a valid etag. Maybe it contains '\"'?"
1060 )
1063def parse_http_date(date_str: Optional[str]) -> Optional[datetime.datetime]:
1064 """Process a date string, return a datetime object"""
1065 if date_str is not None:
1066 timetuple = parsedate(date_str)
1067 if timetuple is not None:
1068 with suppress(ValueError):
1069 return datetime.datetime(*timetuple[:6], tzinfo=datetime.timezone.utc)
1070 return None
1073@functools.lru_cache
1074def must_be_empty_body(method: str, code: int) -> bool:
1075 """Check if a request must return an empty body."""
1076 return (
1077 code in EMPTY_BODY_STATUS_CODES
1078 or method in EMPTY_BODY_METHODS
1079 or (200 <= code < 300 and method in hdrs.METH_CONNECT_ALL)
1080 )
1083def should_remove_content_length(method: str, code: int) -> bool:
1084 """Check if a Content-Length header should be removed.
1086 This should always be a subset of must_be_empty_body
1087 """
1088 # https://www.rfc-editor.org/rfc/rfc9110.html#section-8.6-8
1089 # https://www.rfc-editor.org/rfc/rfc9110.html#section-15.4.5-4
1090 return code in EMPTY_BODY_STATUS_CODES or (
1091 200 <= code < 300 and method in hdrs.METH_CONNECT_ALL
1092 )