Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/web_urldispatcher.py: 37%
720 statements
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-26 06:16 +0000
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-26 06:16 +0000
1import abc
2import asyncio
3import base64
4import hashlib
5import keyword
6import os
7import re
8from contextlib import contextmanager
9from pathlib import Path
10from types import MappingProxyType
11from typing import (
12 TYPE_CHECKING,
13 Any,
14 Awaitable,
15 Callable,
16 Container,
17 Dict,
18 Final,
19 Generator,
20 Iterable,
21 Iterator,
22 List,
23 Mapping,
24 NoReturn,
25 Optional,
26 Pattern,
27 Set,
28 Sized,
29 Tuple,
30 Type,
31 TypedDict,
32 Union,
33 cast,
34)
36from yarl import URL, __version__ as yarl_version # type: ignore[attr-defined]
38from . import hdrs
39from .abc import AbstractMatchInfo, AbstractRouter, AbstractView
40from .helpers import DEBUG
41from .http import HttpVersion11
42from .typedefs import Handler, PathLike
43from .web_exceptions import (
44 HTTPException,
45 HTTPExpectationFailed,
46 HTTPForbidden,
47 HTTPMethodNotAllowed,
48 HTTPNotFound,
49)
50from .web_fileresponse import FileResponse
51from .web_request import Request
52from .web_response import Response, StreamResponse
53from .web_routedef import AbstractRouteDef
55__all__ = (
56 "UrlDispatcher",
57 "UrlMappingMatchInfo",
58 "AbstractResource",
59 "Resource",
60 "PlainResource",
61 "DynamicResource",
62 "AbstractRoute",
63 "ResourceRoute",
64 "StaticResource",
65 "View",
66)
69if TYPE_CHECKING:
70 from .web_app import Application
72 BaseDict = Dict[str, str]
73else:
74 BaseDict = dict
76YARL_VERSION: Final[Tuple[int, ...]] = tuple(map(int, yarl_version.split(".")[:2]))
78HTTP_METHOD_RE: Final[Pattern[str]] = re.compile(
79 r"^[0-9A-Za-z!#\$%&'\*\+\-\.\^_`\|~]+$"
80)
81ROUTE_RE: Final[Pattern[str]] = re.compile(
82 r"(\{[_a-zA-Z][^{}]*(?:\{[^{}]*\}[^{}]*)*\})"
83)
84PATH_SEP: Final[str] = re.escape("/")
87_ExpectHandler = Callable[[Request], Awaitable[Optional[StreamResponse]]]
88_Resolve = Tuple[Optional["UrlMappingMatchInfo"], Set[str]]
91class _InfoDict(TypedDict, total=False):
92 path: str
94 formatter: str
95 pattern: Pattern[str]
97 directory: Path
98 prefix: str
99 routes: Mapping[str, "AbstractRoute"]
101 app: "Application"
103 domain: str
105 rule: "AbstractRuleMatching"
107 http_exception: HTTPException
110class AbstractResource(Sized, Iterable["AbstractRoute"]):
111 def __init__(self, *, name: Optional[str] = None) -> None:
112 self._name = name
114 @property
115 def name(self) -> Optional[str]:
116 return self._name
118 @property
119 @abc.abstractmethod
120 def canonical(self) -> str:
121 """Exposes the resource's canonical path.
123 For example '/foo/bar/{name}'
125 """
127 @abc.abstractmethod # pragma: no branch
128 def url_for(self, **kwargs: str) -> URL:
129 """Construct url for resource with additional params."""
131 @abc.abstractmethod # pragma: no branch
132 async def resolve(self, request: Request) -> _Resolve:
133 """Resolve resource.
135 Return (UrlMappingMatchInfo, allowed_methods) pair.
136 """
138 @abc.abstractmethod
139 def add_prefix(self, prefix: str) -> None:
140 """Add a prefix to processed URLs.
142 Required for subapplications support.
143 """
145 @abc.abstractmethod
146 def get_info(self) -> _InfoDict:
147 """Return a dict with additional info useful for introspection"""
149 def freeze(self) -> None:
150 pass
152 @abc.abstractmethod
153 def raw_match(self, path: str) -> bool:
154 """Perform a raw match against path"""
157class AbstractRoute(abc.ABC):
158 def __init__(
159 self,
160 method: str,
161 handler: Union[Handler, Type[AbstractView]],
162 *,
163 expect_handler: Optional[_ExpectHandler] = None,
164 resource: Optional[AbstractResource] = None,
165 ) -> None:
166 if expect_handler is None:
167 expect_handler = _default_expect_handler
169 assert asyncio.iscoroutinefunction(
170 expect_handler
171 ), f"Coroutine is expected, got {expect_handler!r}"
173 method = method.upper()
174 if not HTTP_METHOD_RE.match(method):
175 raise ValueError(f"{method} is not allowed HTTP method")
177 if asyncio.iscoroutinefunction(handler):
178 pass
179 elif isinstance(handler, type) and issubclass(handler, AbstractView):
180 pass
181 else:
182 raise TypeError(
183 "Only async functions are allowed as web-handlers "
184 ", got {!r}".format(handler)
185 )
187 self._method = method
188 self._handler = handler
189 self._expect_handler = expect_handler
190 self._resource = resource
192 @property
193 def method(self) -> str:
194 return self._method
196 @property
197 def handler(self) -> Handler:
198 return self._handler
200 @property
201 @abc.abstractmethod
202 def name(self) -> Optional[str]:
203 """Optional route's name, always equals to resource's name."""
205 @property
206 def resource(self) -> Optional[AbstractResource]:
207 return self._resource
209 @abc.abstractmethod
210 def get_info(self) -> _InfoDict:
211 """Return a dict with additional info useful for introspection"""
213 @abc.abstractmethod # pragma: no branch
214 def url_for(self, *args: str, **kwargs: str) -> URL:
215 """Construct url for route with additional params."""
217 async def handle_expect_header(self, request: Request) -> Optional[StreamResponse]:
218 return await self._expect_handler(request)
221class UrlMappingMatchInfo(BaseDict, AbstractMatchInfo):
222 def __init__(self, match_dict: Dict[str, str], route: AbstractRoute):
223 super().__init__(match_dict)
224 self._route = route
225 self._apps: List[Application] = []
226 self._current_app: Optional[Application] = None
227 self._frozen = False
229 @property
230 def handler(self) -> Handler:
231 return self._route.handler
233 @property
234 def route(self) -> AbstractRoute:
235 return self._route
237 @property
238 def expect_handler(self) -> _ExpectHandler:
239 return self._route.handle_expect_header
241 @property
242 def http_exception(self) -> Optional[HTTPException]:
243 return None
245 def get_info(self) -> _InfoDict: # type: ignore[override]
246 return self._route.get_info()
248 @property
249 def apps(self) -> Tuple["Application", ...]:
250 return tuple(self._apps)
252 def add_app(self, app: "Application") -> None:
253 if self._frozen:
254 raise RuntimeError("Cannot change apps stack after .freeze() call")
255 if self._current_app is None:
256 self._current_app = app
257 self._apps.insert(0, app)
259 @property
260 def current_app(self) -> "Application":
261 app = self._current_app
262 assert app is not None
263 return app
265 @contextmanager
266 def set_current_app(self, app: "Application") -> Generator[None, None, None]:
267 if DEBUG: # pragma: no cover
268 if app not in self._apps:
269 raise RuntimeError(
270 "Expected one of the following apps {!r}, got {!r}".format(
271 self._apps, app
272 )
273 )
274 prev = self._current_app
275 self._current_app = app
276 try:
277 yield
278 finally:
279 self._current_app = prev
281 def freeze(self) -> None:
282 self._frozen = True
284 def __repr__(self) -> str:
285 return f"<MatchInfo {super().__repr__()}: {self._route}>"
288class MatchInfoError(UrlMappingMatchInfo):
289 def __init__(self, http_exception: HTTPException) -> None:
290 self._exception = http_exception
291 super().__init__({}, SystemRoute(self._exception))
293 @property
294 def http_exception(self) -> HTTPException:
295 return self._exception
297 def __repr__(self) -> str:
298 return "<MatchInfoError {}: {}>".format(
299 self._exception.status, self._exception.reason
300 )
303async def _default_expect_handler(request: Request) -> None:
304 """Default handler for Expect header.
306 Just send "100 Continue" to client.
307 raise HTTPExpectationFailed if value of header is not "100-continue"
308 """
309 expect = request.headers.get(hdrs.EXPECT, "")
310 if request.version == HttpVersion11:
311 if expect.lower() == "100-continue":
312 await request.writer.write(b"HTTP/1.1 100 Continue\r\n\r\n")
313 else:
314 raise HTTPExpectationFailed(text="Unknown Expect: %s" % expect)
317class Resource(AbstractResource):
318 def __init__(self, *, name: Optional[str] = None) -> None:
319 super().__init__(name=name)
320 self._routes: List[ResourceRoute] = []
322 def add_route(
323 self,
324 method: str,
325 handler: Union[Type[AbstractView], Handler],
326 *,
327 expect_handler: Optional[_ExpectHandler] = None,
328 ) -> "ResourceRoute":
329 for route_obj in self._routes:
330 if route_obj.method == method or route_obj.method == hdrs.METH_ANY:
331 raise RuntimeError(
332 "Added route will never be executed, "
333 "method {route.method} is already "
334 "registered".format(route=route_obj)
335 )
337 route_obj = ResourceRoute(method, handler, self, expect_handler=expect_handler)
338 self.register_route(route_obj)
339 return route_obj
341 def register_route(self, route: "ResourceRoute") -> None:
342 assert isinstance(
343 route, ResourceRoute
344 ), f"Instance of Route class is required, got {route!r}"
345 self._routes.append(route)
347 async def resolve(self, request: Request) -> _Resolve:
348 allowed_methods: Set[str] = set()
350 match_dict = self._match(request.rel_url.raw_path)
351 if match_dict is None:
352 return None, allowed_methods
354 for route_obj in self._routes:
355 route_method = route_obj.method
356 allowed_methods.add(route_method)
358 if route_method == request.method or route_method == hdrs.METH_ANY:
359 return (UrlMappingMatchInfo(match_dict, route_obj), allowed_methods)
360 else:
361 return None, allowed_methods
363 @abc.abstractmethod
364 def _match(self, path: str) -> Optional[Dict[str, str]]:
365 pass # pragma: no cover
367 def __len__(self) -> int:
368 return len(self._routes)
370 def __iter__(self) -> Iterator["ResourceRoute"]:
371 return iter(self._routes)
373 # TODO: implement all abstract methods
376class PlainResource(Resource):
377 def __init__(self, path: str, *, name: Optional[str] = None) -> None:
378 super().__init__(name=name)
379 assert not path or path.startswith("/")
380 self._path = path
382 @property
383 def canonical(self) -> str:
384 return self._path
386 def freeze(self) -> None:
387 if not self._path:
388 self._path = "/"
390 def add_prefix(self, prefix: str) -> None:
391 assert prefix.startswith("/")
392 assert not prefix.endswith("/")
393 assert len(prefix) > 1
394 self._path = prefix + self._path
396 def _match(self, path: str) -> Optional[Dict[str, str]]:
397 # string comparison is about 10 times faster than regexp matching
398 if self._path == path:
399 return {}
400 else:
401 return None
403 def raw_match(self, path: str) -> bool:
404 return self._path == path
406 def get_info(self) -> _InfoDict:
407 return {"path": self._path}
409 def url_for(self) -> URL: # type: ignore[override]
410 return URL.build(path=self._path, encoded=True)
412 def __repr__(self) -> str:
413 name = "'" + self.name + "' " if self.name is not None else ""
414 return f"<PlainResource {name} {self._path}>"
417class DynamicResource(Resource):
418 DYN = re.compile(r"\{(?P<var>[_a-zA-Z][_a-zA-Z0-9]*)\}")
419 DYN_WITH_RE = re.compile(r"\{(?P<var>[_a-zA-Z][_a-zA-Z0-9]*):(?P<re>.+)\}")
420 GOOD = r"[^{}/]+"
422 def __init__(self, path: str, *, name: Optional[str] = None) -> None:
423 super().__init__(name=name)
424 pattern = ""
425 formatter = ""
426 for part in ROUTE_RE.split(path):
427 match = self.DYN.fullmatch(part)
428 if match:
429 pattern += "(?P<{}>{})".format(match.group("var"), self.GOOD)
430 formatter += "{" + match.group("var") + "}"
431 continue
433 match = self.DYN_WITH_RE.fullmatch(part)
434 if match:
435 pattern += "(?P<{var}>{re})".format(**match.groupdict())
436 formatter += "{" + match.group("var") + "}"
437 continue
439 if "{" in part or "}" in part:
440 raise ValueError(f"Invalid path '{path}'['{part}']")
442 part = _requote_path(part)
443 formatter += part
444 pattern += re.escape(part)
446 try:
447 compiled = re.compile(pattern)
448 except re.error as exc:
449 raise ValueError(f"Bad pattern '{pattern}': {exc}") from None
450 assert compiled.pattern.startswith(PATH_SEP)
451 assert formatter.startswith("/")
452 self._pattern = compiled
453 self._formatter = formatter
455 @property
456 def canonical(self) -> str:
457 return self._formatter
459 def add_prefix(self, prefix: str) -> None:
460 assert prefix.startswith("/")
461 assert not prefix.endswith("/")
462 assert len(prefix) > 1
463 self._pattern = re.compile(re.escape(prefix) + self._pattern.pattern)
464 self._formatter = prefix + self._formatter
466 def _match(self, path: str) -> Optional[Dict[str, str]]:
467 match = self._pattern.fullmatch(path)
468 if match is None:
469 return None
470 else:
471 return {
472 key: _unquote_path(value) for key, value in match.groupdict().items()
473 }
475 def raw_match(self, path: str) -> bool:
476 return self._formatter == path
478 def get_info(self) -> _InfoDict:
479 return {"formatter": self._formatter, "pattern": self._pattern}
481 def url_for(self, **parts: str) -> URL:
482 url = self._formatter.format_map({k: _quote_path(v) for k, v in parts.items()})
483 return URL.build(path=url, encoded=True)
485 def __repr__(self) -> str:
486 name = "'" + self.name + "' " if self.name is not None else ""
487 return "<DynamicResource {name} {formatter}>".format(
488 name=name, formatter=self._formatter
489 )
492class PrefixResource(AbstractResource):
493 def __init__(self, prefix: str, *, name: Optional[str] = None) -> None:
494 assert not prefix or prefix.startswith("/"), prefix
495 assert prefix in ("", "/") or not prefix.endswith("/"), prefix
496 super().__init__(name=name)
497 self._prefix = _requote_path(prefix)
498 self._prefix2 = self._prefix + "/"
500 @property
501 def canonical(self) -> str:
502 return self._prefix
504 def add_prefix(self, prefix: str) -> None:
505 assert prefix.startswith("/")
506 assert not prefix.endswith("/")
507 assert len(prefix) > 1
508 self._prefix = prefix + self._prefix
509 self._prefix2 = self._prefix + "/"
511 def raw_match(self, prefix: str) -> bool:
512 return False
514 # TODO: impl missing abstract methods
517class StaticResource(PrefixResource):
518 VERSION_KEY = "v"
520 def __init__(
521 self,
522 prefix: str,
523 directory: PathLike,
524 *,
525 name: Optional[str] = None,
526 expect_handler: Optional[_ExpectHandler] = None,
527 chunk_size: int = 256 * 1024,
528 show_index: bool = False,
529 follow_symlinks: bool = False,
530 append_version: bool = False,
531 ) -> None:
532 super().__init__(prefix, name=name)
533 try:
534 directory = Path(directory)
535 if str(directory).startswith("~"):
536 directory = Path(os.path.expanduser(str(directory)))
537 directory = directory.resolve()
538 if not directory.is_dir():
539 raise ValueError("Not a directory")
540 except (FileNotFoundError, ValueError) as error:
541 raise ValueError(f"No directory exists at '{directory}'") from error
542 self._directory = directory
543 self._show_index = show_index
544 self._chunk_size = chunk_size
545 self._follow_symlinks = follow_symlinks
546 self._expect_handler = expect_handler
547 self._append_version = append_version
549 self._routes = {
550 "GET": ResourceRoute(
551 "GET", self._handle, self, expect_handler=expect_handler
552 ),
553 "HEAD": ResourceRoute(
554 "HEAD", self._handle, self, expect_handler=expect_handler
555 ),
556 }
558 def url_for( # type: ignore[override]
559 self,
560 *,
561 filename: PathLike,
562 append_version: Optional[bool] = None,
563 ) -> URL:
564 if append_version is None:
565 append_version = self._append_version
566 filename = str(filename).lstrip("/")
568 url = URL.build(path=self._prefix, encoded=True)
569 # filename is not encoded
570 if YARL_VERSION < (1, 6):
571 url = url / filename.replace("%", "%25")
572 else:
573 url = url / filename
575 if append_version:
576 try:
577 filepath = self._directory.joinpath(filename).resolve()
578 if not self._follow_symlinks:
579 filepath.relative_to(self._directory)
580 except (ValueError, FileNotFoundError):
581 # ValueError for case when path point to symlink
582 # with follow_symlinks is False
583 return url # relatively safe
584 if filepath.is_file():
585 # TODO cache file content
586 # with file watcher for cache invalidation
587 with filepath.open("rb") as f:
588 file_bytes = f.read()
589 h = self._get_file_hash(file_bytes)
590 url = url.with_query({self.VERSION_KEY: h})
591 return url
592 return url
594 @staticmethod
595 def _get_file_hash(byte_array: bytes) -> str:
596 m = hashlib.sha256() # todo sha256 can be configurable param
597 m.update(byte_array)
598 b64 = base64.urlsafe_b64encode(m.digest())
599 return b64.decode("ascii")
601 def get_info(self) -> _InfoDict:
602 return {
603 "directory": self._directory,
604 "prefix": self._prefix,
605 "routes": self._routes,
606 }
608 def set_options_route(self, handler: Handler) -> None:
609 if "OPTIONS" in self._routes:
610 raise RuntimeError("OPTIONS route was set already")
611 self._routes["OPTIONS"] = ResourceRoute(
612 "OPTIONS", handler, self, expect_handler=self._expect_handler
613 )
615 async def resolve(self, request: Request) -> _Resolve:
616 path = request.rel_url.raw_path
617 method = request.method
618 allowed_methods = set(self._routes)
619 if not path.startswith(self._prefix2) and path != self._prefix:
620 return None, set()
622 if method not in allowed_methods:
623 return None, allowed_methods
625 match_dict = {"filename": _unquote_path(path[len(self._prefix) + 1 :])}
626 return (UrlMappingMatchInfo(match_dict, self._routes[method]), allowed_methods)
628 def __len__(self) -> int:
629 return len(self._routes)
631 def __iter__(self) -> Iterator[AbstractRoute]:
632 return iter(self._routes.values())
634 async def _handle(self, request: Request) -> StreamResponse:
635 rel_url = request.match_info["filename"]
636 try:
637 filename = Path(rel_url)
638 if filename.anchor:
639 # rel_url is an absolute name like
640 # /static/\\machine_name\c$ or /static/D:\path
641 # where the static dir is totally different
642 raise HTTPForbidden()
643 filepath = self._directory.joinpath(filename).resolve()
644 if not self._follow_symlinks:
645 filepath.relative_to(self._directory)
646 except (ValueError, FileNotFoundError) as error:
647 # relatively safe
648 raise HTTPNotFound() from error
649 except HTTPForbidden:
650 raise
651 except Exception as error:
652 # perm error or other kind!
653 request.app.logger.exception(error)
654 raise HTTPNotFound() from error
656 # on opening a dir, load its contents if allowed
657 if filepath.is_dir():
658 if self._show_index:
659 try:
660 return Response(
661 text=self._directory_as_html(filepath), content_type="text/html"
662 )
663 except PermissionError:
664 raise HTTPForbidden()
665 else:
666 raise HTTPForbidden()
667 elif filepath.is_file():
668 return FileResponse(filepath, chunk_size=self._chunk_size)
669 else:
670 raise HTTPNotFound
672 def _directory_as_html(self, filepath: Path) -> str:
673 # returns directory's index as html
675 # sanity check
676 assert filepath.is_dir()
678 relative_path_to_dir = filepath.relative_to(self._directory).as_posix()
679 index_of = f"Index of /{relative_path_to_dir}"
680 h1 = f"<h1>{index_of}</h1>"
682 index_list = []
683 dir_index = filepath.iterdir()
684 for _file in sorted(dir_index):
685 # show file url as relative to static path
686 rel_path = _file.relative_to(self._directory).as_posix()
687 file_url = self._prefix + "/" + rel_path
689 # if file is a directory, add '/' to the end of the name
690 if _file.is_dir():
691 file_name = f"{_file.name}/"
692 else:
693 file_name = _file.name
695 index_list.append(
696 '<li><a href="{url}">{name}</a></li>'.format(
697 url=file_url, name=file_name
698 )
699 )
700 ul = "<ul>\n{}\n</ul>".format("\n".join(index_list))
701 body = f"<body>\n{h1}\n{ul}\n</body>"
703 head_str = f"<head>\n<title>{index_of}</title>\n</head>"
704 html = f"<html>\n{head_str}\n{body}\n</html>"
706 return html
708 def __repr__(self) -> str:
709 name = "'" + self.name + "'" if self.name is not None else ""
710 return "<StaticResource {name} {path} -> {directory!r}>".format(
711 name=name, path=self._prefix, directory=self._directory
712 )
715class PrefixedSubAppResource(PrefixResource):
716 def __init__(self, prefix: str, app: "Application") -> None:
717 super().__init__(prefix)
718 self._app = app
719 self._add_prefix_to_resources(prefix)
721 def add_prefix(self, prefix: str) -> None:
722 super().add_prefix(prefix)
723 self._add_prefix_to_resources(prefix)
725 def _add_prefix_to_resources(self, prefix: str) -> None:
726 router = self._app.router
727 for resource in router.resources():
728 # Since the canonical path of a resource is about
729 # to change, we need to unindex it and then reindex
730 router.unindex_resource(resource)
731 resource.add_prefix(prefix)
732 router.index_resource(resource)
734 def url_for(self, *args: str, **kwargs: str) -> URL:
735 raise RuntimeError(".url_for() is not supported " "by sub-application root")
737 def get_info(self) -> _InfoDict:
738 return {"app": self._app, "prefix": self._prefix}
740 async def resolve(self, request: Request) -> _Resolve:
741 match_info = await self._app.router.resolve(request)
742 match_info.add_app(self._app)
743 if isinstance(match_info.http_exception, HTTPMethodNotAllowed):
744 methods = match_info.http_exception.allowed_methods
745 else:
746 methods = set()
747 return match_info, methods
749 def __len__(self) -> int:
750 return len(self._app.router.routes())
752 def __iter__(self) -> Iterator[AbstractRoute]:
753 return iter(self._app.router.routes())
755 def __repr__(self) -> str:
756 return "<PrefixedSubAppResource {prefix} -> {app!r}>".format(
757 prefix=self._prefix, app=self._app
758 )
761class AbstractRuleMatching(abc.ABC):
762 @abc.abstractmethod # pragma: no branch
763 async def match(self, request: Request) -> bool:
764 """Return bool if the request satisfies the criteria"""
766 @abc.abstractmethod # pragma: no branch
767 def get_info(self) -> _InfoDict:
768 """Return a dict with additional info useful for introspection"""
770 @property
771 @abc.abstractmethod # pragma: no branch
772 def canonical(self) -> str:
773 """Return a str"""
776class Domain(AbstractRuleMatching):
777 re_part = re.compile(r"(?!-)[a-z\d-]{1,63}(?<!-)")
779 def __init__(self, domain: str) -> None:
780 super().__init__()
781 self._domain = self.validation(domain)
783 @property
784 def canonical(self) -> str:
785 return self._domain
787 def validation(self, domain: str) -> str:
788 if not isinstance(domain, str):
789 raise TypeError("Domain must be str")
790 domain = domain.rstrip(".").lower()
791 if not domain:
792 raise ValueError("Domain cannot be empty")
793 elif "://" in domain:
794 raise ValueError("Scheme not supported")
795 url = URL("http://" + domain)
796 assert url.raw_host is not None
797 if not all(self.re_part.fullmatch(x) for x in url.raw_host.split(".")):
798 raise ValueError("Domain not valid")
799 if url.port == 80:
800 return url.raw_host
801 return f"{url.raw_host}:{url.port}"
803 async def match(self, request: Request) -> bool:
804 host = request.headers.get(hdrs.HOST)
805 if not host:
806 return False
807 return self.match_domain(host)
809 def match_domain(self, host: str) -> bool:
810 return host.lower() == self._domain
812 def get_info(self) -> _InfoDict:
813 return {"domain": self._domain}
816class MaskDomain(Domain):
817 re_part = re.compile(r"(?!-)[a-z\d\*-]{1,63}(?<!-)")
819 def __init__(self, domain: str) -> None:
820 super().__init__(domain)
821 mask = self._domain.replace(".", r"\.").replace("*", ".*")
822 self._mask = re.compile(mask)
824 @property
825 def canonical(self) -> str:
826 return self._mask.pattern
828 def match_domain(self, host: str) -> bool:
829 return self._mask.fullmatch(host) is not None
832class MatchedSubAppResource(PrefixedSubAppResource):
833 def __init__(self, rule: AbstractRuleMatching, app: "Application") -> None:
834 AbstractResource.__init__(self)
835 self._prefix = ""
836 self._app = app
837 self._rule = rule
839 @property
840 def canonical(self) -> str:
841 return self._rule.canonical
843 def get_info(self) -> _InfoDict:
844 return {"app": self._app, "rule": self._rule}
846 async def resolve(self, request: Request) -> _Resolve:
847 if not await self._rule.match(request):
848 return None, set()
849 match_info = await self._app.router.resolve(request)
850 match_info.add_app(self._app)
851 if isinstance(match_info.http_exception, HTTPMethodNotAllowed):
852 methods = match_info.http_exception.allowed_methods
853 else:
854 methods = set()
855 return match_info, methods
857 def __repr__(self) -> str:
858 return "<MatchedSubAppResource -> {app!r}>" "".format(app=self._app)
861class ResourceRoute(AbstractRoute):
862 """A route with resource"""
864 def __init__(
865 self,
866 method: str,
867 handler: Union[Handler, Type[AbstractView]],
868 resource: AbstractResource,
869 *,
870 expect_handler: Optional[_ExpectHandler] = None,
871 ) -> None:
872 super().__init__(
873 method, handler, expect_handler=expect_handler, resource=resource
874 )
876 def __repr__(self) -> str:
877 return "<ResourceRoute [{method}] {resource} -> {handler!r}".format(
878 method=self.method, resource=self._resource, handler=self.handler
879 )
881 @property
882 def name(self) -> Optional[str]:
883 if self._resource is None:
884 return None
885 return self._resource.name
887 def url_for(self, *args: str, **kwargs: str) -> URL:
888 """Construct url for route with additional params."""
889 assert self._resource is not None
890 return self._resource.url_for(*args, **kwargs)
892 def get_info(self) -> _InfoDict:
893 assert self._resource is not None
894 return self._resource.get_info()
897class SystemRoute(AbstractRoute):
898 def __init__(self, http_exception: HTTPException) -> None:
899 super().__init__(hdrs.METH_ANY, self._handle)
900 self._http_exception = http_exception
902 def url_for(self, *args: str, **kwargs: str) -> URL:
903 raise RuntimeError(".url_for() is not allowed for SystemRoute")
905 @property
906 def name(self) -> Optional[str]:
907 return None
909 def get_info(self) -> _InfoDict:
910 return {"http_exception": self._http_exception}
912 async def _handle(self, request: Request) -> StreamResponse:
913 raise self._http_exception
915 @property
916 def status(self) -> int:
917 return self._http_exception.status
919 @property
920 def reason(self) -> str:
921 return self._http_exception.reason
923 def __repr__(self) -> str:
924 return "<SystemRoute {self.status}: {self.reason}>".format(self=self)
927class View(AbstractView):
928 async def _iter(self) -> StreamResponse:
929 if self.request.method not in hdrs.METH_ALL:
930 self._raise_allowed_methods()
931 method: Optional[Callable[[], Awaitable[StreamResponse]]] = getattr(
932 self, self.request.method.lower(), None
933 )
934 if method is None:
935 self._raise_allowed_methods()
936 return await method()
938 def __await__(self) -> Generator[Any, None, StreamResponse]:
939 return self._iter().__await__()
941 def _raise_allowed_methods(self) -> NoReturn:
942 allowed_methods = {m for m in hdrs.METH_ALL if hasattr(self, m.lower())}
943 raise HTTPMethodNotAllowed(self.request.method, allowed_methods)
946class ResourcesView(Sized, Iterable[AbstractResource], Container[AbstractResource]):
947 def __init__(self, resources: List[AbstractResource]) -> None:
948 self._resources = resources
950 def __len__(self) -> int:
951 return len(self._resources)
953 def __iter__(self) -> Iterator[AbstractResource]:
954 yield from self._resources
956 def __contains__(self, resource: object) -> bool:
957 return resource in self._resources
960class RoutesView(Sized, Iterable[AbstractRoute], Container[AbstractRoute]):
961 def __init__(self, resources: List[AbstractResource]):
962 self._routes: List[AbstractRoute] = []
963 for resource in resources:
964 for route in resource:
965 self._routes.append(route)
967 def __len__(self) -> int:
968 return len(self._routes)
970 def __iter__(self) -> Iterator[AbstractRoute]:
971 yield from self._routes
973 def __contains__(self, route: object) -> bool:
974 return route in self._routes
977class UrlDispatcher(AbstractRouter, Mapping[str, AbstractResource]):
978 NAME_SPLIT_RE = re.compile(r"[.:-]")
979 HTTP_NOT_FOUND = HTTPNotFound()
981 def __init__(self) -> None:
982 super().__init__()
983 self._resources: List[AbstractResource] = []
984 self._named_resources: Dict[str, AbstractResource] = {}
985 self._resource_index: dict[str, list[AbstractResource]] = {}
986 self._matched_sub_app_resources: List[MatchedSubAppResource] = []
988 async def resolve(self, request: Request) -> UrlMappingMatchInfo:
989 resource_index = self._resource_index
990 allowed_methods: Set[str] = set()
992 # Walk the url parts looking for candidates. We walk the url backwards
993 # to ensure the most explicit match is found first. If there are multiple
994 # candidates for a given url part because there are multiple resources
995 # registered for the same canonical path, we resolve them in a linear
996 # fashion to ensure registration order is respected.
997 url_part = request.rel_url.raw_path
998 while url_part:
999 for candidate in resource_index.get(url_part, ()):
1000 match_dict, allowed = await candidate.resolve(request)
1001 if match_dict is not None:
1002 return match_dict
1003 else:
1004 allowed_methods |= allowed
1005 if url_part == "/":
1006 break
1007 url_part = url_part.rpartition("/")[0] or "/"
1009 #
1010 # We didn't find any candidates, so we'll try the matched sub-app
1011 # resources which we have to walk in a linear fashion because they
1012 # have regex/wildcard match rules and we cannot index them.
1013 #
1014 # For most cases we do not expect there to be many of these since
1015 # currently they are only added by `add_domain`
1016 #
1017 for resource in self._matched_sub_app_resources:
1018 match_dict, allowed = await resource.resolve(request)
1019 if match_dict is not None:
1020 return match_dict
1021 else:
1022 allowed_methods |= allowed
1024 if allowed_methods:
1025 return MatchInfoError(HTTPMethodNotAllowed(request.method, allowed_methods))
1027 return MatchInfoError(self.HTTP_NOT_FOUND)
1029 def __iter__(self) -> Iterator[str]:
1030 return iter(self._named_resources)
1032 def __len__(self) -> int:
1033 return len(self._named_resources)
1035 def __contains__(self, resource: object) -> bool:
1036 return resource in self._named_resources
1038 def __getitem__(self, name: str) -> AbstractResource:
1039 return self._named_resources[name]
1041 def resources(self) -> ResourcesView:
1042 return ResourcesView(self._resources)
1044 def routes(self) -> RoutesView:
1045 return RoutesView(self._resources)
1047 def named_resources(self) -> Mapping[str, AbstractResource]:
1048 return MappingProxyType(self._named_resources)
1050 def register_resource(self, resource: AbstractResource) -> None:
1051 assert isinstance(
1052 resource, AbstractResource
1053 ), f"Instance of AbstractResource class is required, got {resource!r}"
1054 if self.frozen:
1055 raise RuntimeError("Cannot register a resource into frozen router.")
1057 name = resource.name
1059 if name is not None:
1060 parts = self.NAME_SPLIT_RE.split(name)
1061 for part in parts:
1062 if keyword.iskeyword(part):
1063 raise ValueError(
1064 f"Incorrect route name {name!r}, "
1065 "python keywords cannot be used "
1066 "for route name"
1067 )
1068 if not part.isidentifier():
1069 raise ValueError(
1070 "Incorrect route name {!r}, "
1071 "the name should be a sequence of "
1072 "python identifiers separated "
1073 "by dash, dot or column".format(name)
1074 )
1075 if name in self._named_resources:
1076 raise ValueError(
1077 "Duplicate {!r}, "
1078 "already handled by {!r}".format(name, self._named_resources[name])
1079 )
1080 self._named_resources[name] = resource
1081 self._resources.append(resource)
1083 if isinstance(resource, MatchedSubAppResource):
1084 # We cannot index match sub-app resources because they have match rules
1085 self._matched_sub_app_resources.append(resource)
1086 else:
1087 self.index_resource(resource)
1089 def _get_resource_index_key(self, resource: AbstractResource) -> str:
1090 """Return a key to index the resource in the resource index."""
1091 # strip at the first { to allow for variables
1092 return resource.canonical.partition("{")[0].rstrip("/") or "/"
1094 def index_resource(self, resource: AbstractResource) -> None:
1095 """Add a resource to the resource index."""
1096 resource_key = self._get_resource_index_key(resource)
1097 # There may be multiple resources for a canonical path
1098 # so we keep them in a list to ensure that registration
1099 # order is respected.
1100 self._resource_index.setdefault(resource_key, []).append(resource)
1102 def unindex_resource(self, resource: AbstractResource) -> None:
1103 """Remove a resource from the resource index."""
1104 resource_key = self._get_resource_index_key(resource)
1105 self._resource_index[resource_key].remove(resource)
1107 def add_resource(self, path: str, *, name: Optional[str] = None) -> Resource:
1108 if path and not path.startswith("/"):
1109 raise ValueError("path should be started with / or be empty")
1110 # Reuse last added resource if path and name are the same
1111 if self._resources:
1112 resource = self._resources[-1]
1113 if resource.name == name and resource.raw_match(path):
1114 return cast(Resource, resource)
1115 if not ("{" in path or "}" in path or ROUTE_RE.search(path)):
1116 resource = PlainResource(_requote_path(path), name=name)
1117 self.register_resource(resource)
1118 return resource
1119 resource = DynamicResource(path, name=name)
1120 self.register_resource(resource)
1121 return resource
1123 def add_route(
1124 self,
1125 method: str,
1126 path: str,
1127 handler: Union[Handler, Type[AbstractView]],
1128 *,
1129 name: Optional[str] = None,
1130 expect_handler: Optional[_ExpectHandler] = None,
1131 ) -> AbstractRoute:
1132 resource = self.add_resource(path, name=name)
1133 return resource.add_route(method, handler, expect_handler=expect_handler)
1135 def add_static(
1136 self,
1137 prefix: str,
1138 path: PathLike,
1139 *,
1140 name: Optional[str] = None,
1141 expect_handler: Optional[_ExpectHandler] = None,
1142 chunk_size: int = 256 * 1024,
1143 show_index: bool = False,
1144 follow_symlinks: bool = False,
1145 append_version: bool = False,
1146 ) -> AbstractResource:
1147 """Add static files view.
1149 prefix - url prefix
1150 path - folder with files
1152 """
1153 assert prefix.startswith("/")
1154 if prefix.endswith("/"):
1155 prefix = prefix[:-1]
1156 resource = StaticResource(
1157 prefix,
1158 path,
1159 name=name,
1160 expect_handler=expect_handler,
1161 chunk_size=chunk_size,
1162 show_index=show_index,
1163 follow_symlinks=follow_symlinks,
1164 append_version=append_version,
1165 )
1166 self.register_resource(resource)
1167 return resource
1169 def add_head(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
1170 """Shortcut for add_route with method HEAD."""
1171 return self.add_route(hdrs.METH_HEAD, path, handler, **kwargs)
1173 def add_options(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
1174 """Shortcut for add_route with method OPTIONS."""
1175 return self.add_route(hdrs.METH_OPTIONS, path, handler, **kwargs)
1177 def add_get(
1178 self,
1179 path: str,
1180 handler: Handler,
1181 *,
1182 name: Optional[str] = None,
1183 allow_head: bool = True,
1184 **kwargs: Any,
1185 ) -> AbstractRoute:
1186 """Shortcut for add_route with method GET.
1188 If allow_head is true, another
1189 route is added allowing head requests to the same endpoint.
1190 """
1191 resource = self.add_resource(path, name=name)
1192 if allow_head:
1193 resource.add_route(hdrs.METH_HEAD, handler, **kwargs)
1194 return resource.add_route(hdrs.METH_GET, handler, **kwargs)
1196 def add_post(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
1197 """Shortcut for add_route with method POST."""
1198 return self.add_route(hdrs.METH_POST, path, handler, **kwargs)
1200 def add_put(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
1201 """Shortcut for add_route with method PUT."""
1202 return self.add_route(hdrs.METH_PUT, path, handler, **kwargs)
1204 def add_patch(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
1205 """Shortcut for add_route with method PATCH."""
1206 return self.add_route(hdrs.METH_PATCH, path, handler, **kwargs)
1208 def add_delete(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
1209 """Shortcut for add_route with method DELETE."""
1210 return self.add_route(hdrs.METH_DELETE, path, handler, **kwargs)
1212 def add_view(
1213 self, path: str, handler: Type[AbstractView], **kwargs: Any
1214 ) -> AbstractRoute:
1215 """Shortcut for add_route with ANY methods for a class-based view."""
1216 return self.add_route(hdrs.METH_ANY, path, handler, **kwargs)
1218 def freeze(self) -> None:
1219 super().freeze()
1220 for resource in self._resources:
1221 resource.freeze()
1223 def add_routes(self, routes: Iterable[AbstractRouteDef]) -> List[AbstractRoute]:
1224 """Append routes to route table.
1226 Parameter should be a sequence of RouteDef objects.
1228 Returns a list of registered AbstractRoute instances.
1229 """
1230 registered_routes = []
1231 for route_def in routes:
1232 registered_routes.extend(route_def.register(self))
1233 return registered_routes
1236def _quote_path(value: str) -> str:
1237 if YARL_VERSION < (1, 6):
1238 value = value.replace("%", "%25")
1239 return URL.build(path=value, encoded=False).raw_path
1242def _unquote_path(value: str) -> str:
1243 return URL.build(path=value, encoded=True).path
1246def _requote_path(value: str) -> str:
1247 # Quote non-ascii characters and other characters which must be quoted,
1248 # but preserve existing %-sequences.
1249 result = _quote_path(value)
1250 if "%" in value:
1251 result = result.replace("%25", "%")
1252 return result