Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/web_urldispatcher.py: 38%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import abc
2import asyncio
3import base64
4import functools
5import hashlib
6import html
7import inspect
8import keyword
9import os
10import re
11import sys
12from pathlib import Path
13from types import MappingProxyType
14from typing import (
15 TYPE_CHECKING,
16 Any,
17 Awaitable,
18 Callable,
19 Container,
20 Dict,
21 Final,
22 Generator,
23 Iterable,
24 Iterator,
25 List,
26 Mapping,
27 NoReturn,
28 Optional,
29 Pattern,
30 Set,
31 Sized,
32 Tuple,
33 Type,
34 TypedDict,
35 Union,
36 cast,
37)
39from yarl import URL
41from . import hdrs
42from .abc import AbstractMatchInfo, AbstractRouter, AbstractView
43from .helpers import DEBUG
44from .http import HttpVersion11
45from .typedefs import Handler, PathLike
46from .web_exceptions import (
47 HTTPException,
48 HTTPExpectationFailed,
49 HTTPForbidden,
50 HTTPMethodNotAllowed,
51 HTTPNotFound,
52)
53from .web_fileresponse import FileResponse
54from .web_request import Request
55from .web_response import Response, StreamResponse
56from .web_routedef import AbstractRouteDef
58__all__ = (
59 "UrlDispatcher",
60 "UrlMappingMatchInfo",
61 "AbstractResource",
62 "Resource",
63 "PlainResource",
64 "DynamicResource",
65 "AbstractRoute",
66 "ResourceRoute",
67 "StaticResource",
68 "View",
69)
72if TYPE_CHECKING:
73 from .web_app import Application
75 BaseDict = Dict[str, str]
76else:
77 BaseDict = dict
79CIRCULAR_SYMLINK_ERROR = (
80 (OSError,)
81 if sys.version_info < (3, 10) and sys.platform.startswith("win32")
82 else (RuntimeError,) if sys.version_info < (3, 13) else ()
83)
85HTTP_METHOD_RE: Final[Pattern[str]] = re.compile(
86 r"^[0-9A-Za-z!#\$%&'\*\+\-\.\^_`\|~]+$"
87)
88ROUTE_RE: Final[Pattern[str]] = re.compile(
89 r"(\{[_a-zA-Z][^{}]*(?:\{[^{}]*\}[^{}]*)*\})"
90)
91PATH_SEP: Final[str] = re.escape("/")
94_ExpectHandler = Callable[[Request], Awaitable[Optional[StreamResponse]]]
95_Resolve = Tuple[Optional["UrlMappingMatchInfo"], Set[str]]
97html_escape = functools.partial(html.escape, quote=True)
100class _InfoDict(TypedDict, total=False):
101 path: str
103 formatter: str
104 pattern: Pattern[str]
106 directory: Path
107 prefix: str
108 routes: Mapping[str, "AbstractRoute"]
110 app: "Application"
112 domain: str
114 rule: "AbstractRuleMatching"
116 http_exception: HTTPException
119class AbstractResource(Sized, Iterable["AbstractRoute"]):
120 def __init__(self, *, name: Optional[str] = None) -> None:
121 self._name = name
123 @property
124 def name(self) -> Optional[str]:
125 return self._name
127 @property
128 @abc.abstractmethod
129 def canonical(self) -> str:
130 """Exposes the resource's canonical path.
132 For example '/foo/bar/{name}'
134 """
136 @abc.abstractmethod # pragma: no branch
137 def url_for(self, **kwargs: str) -> URL:
138 """Construct url for resource with additional params."""
140 @abc.abstractmethod # pragma: no branch
141 async def resolve(self, request: Request) -> _Resolve:
142 """Resolve resource.
144 Return (UrlMappingMatchInfo, allowed_methods) pair.
145 """
147 @abc.abstractmethod
148 def add_prefix(self, prefix: str) -> None:
149 """Add a prefix to processed URLs.
151 Required for subapplications support.
152 """
154 @abc.abstractmethod
155 def get_info(self) -> _InfoDict:
156 """Return a dict with additional info useful for introspection"""
158 def freeze(self) -> None:
159 pass
161 @abc.abstractmethod
162 def raw_match(self, path: str) -> bool:
163 """Perform a raw match against path"""
166class AbstractRoute(abc.ABC):
167 def __init__(
168 self,
169 method: str,
170 handler: Union[Handler, Type[AbstractView]],
171 *,
172 expect_handler: Optional[_ExpectHandler] = None,
173 resource: Optional[AbstractResource] = None,
174 ) -> None:
175 if expect_handler is None:
176 expect_handler = _default_expect_handler
178 assert inspect.iscoroutinefunction(expect_handler) or (
179 sys.version_info < (3, 14) and asyncio.iscoroutinefunction(expect_handler)
180 ), f"Coroutine is expected, got {expect_handler!r}"
182 method = method.upper()
183 if not HTTP_METHOD_RE.match(method):
184 raise ValueError(f"{method} is not allowed HTTP method")
186 if inspect.iscoroutinefunction(handler) or (
187 sys.version_info < (3, 14) and asyncio.iscoroutinefunction(handler)
188 ):
189 pass
190 elif isinstance(handler, type) and issubclass(handler, AbstractView):
191 pass
192 else:
193 raise TypeError(
194 "Only async functions are allowed as web-handlers "
195 ", got {!r}".format(handler)
196 )
198 self._method = method
199 self._handler = handler
200 self._expect_handler = expect_handler
201 self._resource = resource
203 @property
204 def method(self) -> str:
205 return self._method
207 @property
208 def handler(self) -> Handler:
209 return self._handler
211 @property
212 @abc.abstractmethod
213 def name(self) -> Optional[str]:
214 """Optional route's name, always equals to resource's name."""
216 @property
217 def resource(self) -> Optional[AbstractResource]:
218 return self._resource
220 @abc.abstractmethod
221 def get_info(self) -> _InfoDict:
222 """Return a dict with additional info useful for introspection"""
224 @abc.abstractmethod # pragma: no branch
225 def url_for(self, *args: str, **kwargs: str) -> URL:
226 """Construct url for route with additional params."""
228 async def handle_expect_header(self, request: Request) -> Optional[StreamResponse]:
229 return await self._expect_handler(request)
232class UrlMappingMatchInfo(BaseDict, AbstractMatchInfo):
234 __slots__ = ("_route", "_apps", "_current_app", "_frozen")
236 def __init__(self, match_dict: Dict[str, str], route: AbstractRoute) -> None:
237 super().__init__(match_dict)
238 self._route = route
239 self._apps: List[Application] = []
240 self._current_app: Optional[Application] = None
241 self._frozen = False
243 @property
244 def handler(self) -> Handler:
245 return self._route.handler
247 @property
248 def route(self) -> AbstractRoute:
249 return self._route
251 @property
252 def expect_handler(self) -> _ExpectHandler:
253 return self._route.handle_expect_header
255 @property
256 def http_exception(self) -> Optional[HTTPException]:
257 return None
259 def get_info(self) -> _InfoDict: # type: ignore[override]
260 return self._route.get_info()
262 @property
263 def apps(self) -> Tuple["Application", ...]:
264 return tuple(self._apps)
266 def add_app(self, app: "Application") -> None:
267 if self._frozen:
268 raise RuntimeError("Cannot change apps stack after .freeze() call")
269 if self._current_app is None:
270 self._current_app = app
271 self._apps.insert(0, app)
273 @property
274 def current_app(self) -> "Application":
275 app = self._current_app
276 assert app is not None
277 return app
279 @current_app.setter
280 def current_app(self, app: "Application") -> None:
281 if DEBUG:
282 if app not in self._apps:
283 raise RuntimeError(
284 "Expected one of the following apps {!r}, got {!r}".format(
285 self._apps, app
286 )
287 )
288 self._current_app = app
290 def freeze(self) -> None:
291 self._frozen = True
293 def __repr__(self) -> str:
294 return f"<MatchInfo {super().__repr__()}: {self._route}>"
297class MatchInfoError(UrlMappingMatchInfo):
299 __slots__ = ("_exception",)
301 def __init__(self, http_exception: HTTPException) -> None:
302 self._exception = http_exception
303 super().__init__({}, SystemRoute(self._exception))
305 @property
306 def http_exception(self) -> HTTPException:
307 return self._exception
309 def __repr__(self) -> str:
310 return "<MatchInfoError {}: {}>".format(
311 self._exception.status, self._exception.reason
312 )
315async def _default_expect_handler(request: Request) -> None:
316 """Default handler for Expect header.
318 Just send "100 Continue" to client.
319 raise HTTPExpectationFailed if value of header is not "100-continue"
320 """
321 expect = request.headers.get(hdrs.EXPECT, "")
322 if request.version == HttpVersion11:
323 if expect.lower() == "100-continue":
324 await request.writer.write(b"HTTP/1.1 100 Continue\r\n\r\n")
325 # Reset output_size as we haven't started the main body yet.
326 request.writer.output_size = 0
327 else:
328 raise HTTPExpectationFailed(text="Unknown Expect: %s" % expect)
331class Resource(AbstractResource):
332 def __init__(self, *, name: Optional[str] = None) -> None:
333 super().__init__(name=name)
334 self._routes: Dict[str, ResourceRoute] = {}
335 self._any_route: Optional[ResourceRoute] = None
336 self._allowed_methods: Set[str] = set()
338 def add_route(
339 self,
340 method: str,
341 handler: Union[Type[AbstractView], Handler],
342 *,
343 expect_handler: Optional[_ExpectHandler] = None,
344 ) -> "ResourceRoute":
345 if route := self._routes.get(method, self._any_route):
346 raise RuntimeError(
347 "Added route will never be executed, "
348 f"method {route.method} is already "
349 "registered"
350 )
352 route_obj = ResourceRoute(method, handler, self, expect_handler=expect_handler)
353 self.register_route(route_obj)
354 return route_obj
356 def register_route(self, route: "ResourceRoute") -> None:
357 assert isinstance(
358 route, ResourceRoute
359 ), f"Instance of Route class is required, got {route!r}"
360 if route.method == hdrs.METH_ANY:
361 self._any_route = route
362 self._allowed_methods.add(route.method)
363 self._routes[route.method] = route
365 async def resolve(self, request: Request) -> _Resolve:
366 if (match_dict := self._match(request.rel_url.path_safe)) is None:
367 return None, set()
368 if route := self._routes.get(request.method, self._any_route):
369 return UrlMappingMatchInfo(match_dict, route), self._allowed_methods
370 return None, self._allowed_methods
372 @abc.abstractmethod
373 def _match(self, path: str) -> Optional[Dict[str, str]]:
374 """Return dict of path values if path matches this resource, otherwise None."""
376 def __len__(self) -> int:
377 return len(self._routes)
379 def __iter__(self) -> Iterator["ResourceRoute"]:
380 return iter(self._routes.values())
382 # TODO: implement all abstract methods
385class PlainResource(Resource):
386 def __init__(self, path: str, *, name: Optional[str] = None) -> None:
387 super().__init__(name=name)
388 assert not path or path.startswith("/")
389 self._path = path
391 @property
392 def canonical(self) -> str:
393 return self._path
395 def freeze(self) -> None:
396 if not self._path:
397 self._path = "/"
399 def add_prefix(self, prefix: str) -> None:
400 assert prefix.startswith("/")
401 assert not prefix.endswith("/")
402 assert len(prefix) > 1
403 self._path = prefix + self._path
405 def _match(self, path: str) -> Optional[Dict[str, str]]:
406 # string comparison is about 10 times faster than regexp matching
407 if self._path == path:
408 return {}
409 return None
411 def raw_match(self, path: str) -> bool:
412 return self._path == path
414 def get_info(self) -> _InfoDict:
415 return {"path": self._path}
417 def url_for(self) -> URL: # type: ignore[override]
418 return URL.build(path=self._path, encoded=True)
420 def __repr__(self) -> str:
421 name = "'" + self.name + "' " if self.name is not None else ""
422 return f"<PlainResource {name} {self._path}>"
425class DynamicResource(Resource):
426 DYN = re.compile(r"\{(?P<var>[_a-zA-Z][_a-zA-Z0-9]*)\}")
427 DYN_WITH_RE = re.compile(r"\{(?P<var>[_a-zA-Z][_a-zA-Z0-9]*):(?P<re>.+)\}")
428 GOOD = r"[^{}/]+"
430 def __init__(self, path: str, *, name: Optional[str] = None) -> None:
431 super().__init__(name=name)
432 self._orig_path = path
433 pattern = ""
434 formatter = ""
435 for part in ROUTE_RE.split(path):
436 match = self.DYN.fullmatch(part)
437 if match:
438 pattern += "(?P<{}>{})".format(match.group("var"), self.GOOD)
439 formatter += "{" + match.group("var") + "}"
440 continue
442 match = self.DYN_WITH_RE.fullmatch(part)
443 if match:
444 pattern += "(?P<{var}>{re})".format(**match.groupdict())
445 formatter += "{" + match.group("var") + "}"
446 continue
448 if "{" in part or "}" in part:
449 raise ValueError(f"Invalid path '{path}'['{part}']")
451 part = _requote_path(part)
452 formatter += part
453 pattern += re.escape(part)
455 try:
456 compiled = re.compile(pattern)
457 except re.error as exc:
458 raise ValueError(f"Bad pattern '{pattern}': {exc}") from None
459 assert compiled.pattern.startswith(PATH_SEP)
460 assert formatter.startswith("/")
461 self._pattern = compiled
462 self._formatter = formatter
464 @property
465 def canonical(self) -> str:
466 return self._formatter
468 def add_prefix(self, prefix: str) -> None:
469 assert prefix.startswith("/")
470 assert not prefix.endswith("/")
471 assert len(prefix) > 1
472 self._pattern = re.compile(re.escape(prefix) + self._pattern.pattern)
473 self._formatter = prefix + self._formatter
475 def _match(self, path: str) -> Optional[Dict[str, str]]:
476 match = self._pattern.fullmatch(path)
477 if match is None:
478 return None
479 return {
480 key: _unquote_path_safe(value) for key, value in match.groupdict().items()
481 }
483 def raw_match(self, path: str) -> bool:
484 return self._orig_path == path
486 def get_info(self) -> _InfoDict:
487 return {"formatter": self._formatter, "pattern": self._pattern}
489 def url_for(self, **parts: str) -> URL:
490 url = self._formatter.format_map({k: _quote_path(v) for k, v in parts.items()})
491 return URL.build(path=url, encoded=True)
493 def __repr__(self) -> str:
494 name = "'" + self.name + "' " if self.name is not None else ""
495 return "<DynamicResource {name} {formatter}>".format(
496 name=name, formatter=self._formatter
497 )
500class PrefixResource(AbstractResource):
501 def __init__(self, prefix: str, *, name: Optional[str] = None) -> None:
502 assert not prefix or prefix.startswith("/"), prefix
503 assert prefix in ("", "/") or not prefix.endswith("/"), prefix
504 super().__init__(name=name)
505 self._prefix = _requote_path(prefix)
506 self._prefix2 = self._prefix + "/"
508 @property
509 def canonical(self) -> str:
510 return self._prefix
512 def add_prefix(self, prefix: str) -> None:
513 assert prefix.startswith("/")
514 assert not prefix.endswith("/")
515 assert len(prefix) > 1
516 self._prefix = prefix + self._prefix
517 self._prefix2 = self._prefix + "/"
519 def raw_match(self, prefix: str) -> bool:
520 return False
522 # TODO: impl missing abstract methods
525class StaticResource(PrefixResource):
526 VERSION_KEY = "v"
528 def __init__(
529 self,
530 prefix: str,
531 directory: PathLike,
532 *,
533 name: Optional[str] = None,
534 expect_handler: Optional[_ExpectHandler] = None,
535 chunk_size: int = 256 * 1024,
536 show_index: bool = False,
537 follow_symlinks: bool = False,
538 append_version: bool = False,
539 ) -> None:
540 super().__init__(prefix, name=name)
541 try:
542 directory = Path(directory).expanduser().resolve(strict=True)
543 except FileNotFoundError as error:
544 raise ValueError(f"'{directory}' does not exist") from error
545 if not directory.is_dir():
546 raise ValueError(f"'{directory}' is not a directory")
547 self._directory = directory
548 self._show_index = show_index
549 self._chunk_size = chunk_size
550 self._follow_symlinks = follow_symlinks
551 self._expect_handler = expect_handler
552 self._append_version = append_version
554 self._routes = {
555 "GET": ResourceRoute(
556 "GET", self._handle, self, expect_handler=expect_handler
557 ),
558 "HEAD": ResourceRoute(
559 "HEAD", self._handle, self, expect_handler=expect_handler
560 ),
561 }
562 self._allowed_methods = set(self._routes)
564 def url_for( # type: ignore[override]
565 self,
566 *,
567 filename: PathLike,
568 append_version: Optional[bool] = None,
569 ) -> URL:
570 if append_version is None:
571 append_version = self._append_version
572 filename = str(filename).lstrip("/")
574 url = URL.build(path=self._prefix, encoded=True)
575 # filename is not encoded
576 url = url / filename
578 if append_version:
579 unresolved_path = self._directory.joinpath(filename)
580 try:
581 if self._follow_symlinks:
582 normalized_path = Path(os.path.normpath(unresolved_path))
583 normalized_path.relative_to(self._directory)
584 filepath = normalized_path.resolve()
585 else:
586 filepath = unresolved_path.resolve()
587 filepath.relative_to(self._directory)
588 except (ValueError, FileNotFoundError):
589 # ValueError for case when path point to symlink
590 # with follow_symlinks is False
591 return url # relatively safe
592 if filepath.is_file():
593 # TODO cache file content
594 # with file watcher for cache invalidation
595 with filepath.open("rb") as f:
596 file_bytes = f.read()
597 h = self._get_file_hash(file_bytes)
598 url = url.with_query({self.VERSION_KEY: h})
599 return url
600 return url
602 @staticmethod
603 def _get_file_hash(byte_array: bytes) -> str:
604 m = hashlib.sha256() # todo sha256 can be configurable param
605 m.update(byte_array)
606 b64 = base64.urlsafe_b64encode(m.digest())
607 return b64.decode("ascii")
609 def get_info(self) -> _InfoDict:
610 return {
611 "directory": self._directory,
612 "prefix": self._prefix,
613 "routes": self._routes,
614 }
616 def set_options_route(self, handler: Handler) -> None:
617 if "OPTIONS" in self._routes:
618 raise RuntimeError("OPTIONS route was set already")
619 self._routes["OPTIONS"] = ResourceRoute(
620 "OPTIONS", handler, self, expect_handler=self._expect_handler
621 )
622 self._allowed_methods.add("OPTIONS")
624 async def resolve(self, request: Request) -> _Resolve:
625 path = request.rel_url.path_safe
626 method = request.method
627 if not path.startswith(self._prefix2) and path != self._prefix:
628 return None, set()
630 allowed_methods = self._allowed_methods
631 if method not in allowed_methods:
632 return None, allowed_methods
634 match_dict = {"filename": _unquote_path_safe(path[len(self._prefix) + 1 :])}
635 return (UrlMappingMatchInfo(match_dict, self._routes[method]), allowed_methods)
637 def __len__(self) -> int:
638 return len(self._routes)
640 def __iter__(self) -> Iterator[AbstractRoute]:
641 return iter(self._routes.values())
643 async def _handle(self, request: Request) -> StreamResponse:
644 rel_url = request.match_info["filename"]
645 filename = Path(rel_url)
646 if filename.anchor:
647 # rel_url is an absolute name like
648 # /static/\\machine_name\c$ or /static/D:\path
649 # where the static dir is totally different
650 raise HTTPForbidden()
652 unresolved_path = self._directory.joinpath(filename)
653 loop = asyncio.get_running_loop()
654 return await loop.run_in_executor(
655 None, self._resolve_path_to_response, unresolved_path
656 )
658 def _resolve_path_to_response(self, unresolved_path: Path) -> StreamResponse:
659 """Take the unresolved path and query the file system to form a response."""
660 # Check for access outside the root directory. For follow symlinks, URI
661 # cannot traverse out, but symlinks can. Otherwise, no access outside
662 # root is permitted.
663 try:
664 if self._follow_symlinks:
665 normalized_path = Path(os.path.normpath(unresolved_path))
666 normalized_path.relative_to(self._directory)
667 file_path = normalized_path.resolve()
668 else:
669 file_path = unresolved_path.resolve()
670 file_path.relative_to(self._directory)
671 except (ValueError, *CIRCULAR_SYMLINK_ERROR) as error:
672 # ValueError is raised for the relative check. Circular symlinks
673 # raise here on resolving for python < 3.13.
674 raise HTTPNotFound() from error
676 # if path is a directory, return the contents if permitted. Note the
677 # directory check will raise if a segment is not readable.
678 try:
679 if file_path.is_dir():
680 if self._show_index:
681 return Response(
682 text=self._directory_as_html(file_path),
683 content_type="text/html",
684 )
685 else:
686 raise HTTPForbidden()
687 except PermissionError as error:
688 raise HTTPForbidden() from error
690 # Return the file response, which handles all other checks.
691 return FileResponse(file_path, chunk_size=self._chunk_size)
693 def _directory_as_html(self, dir_path: Path) -> str:
694 """returns directory's index as html."""
695 assert dir_path.is_dir()
697 relative_path_to_dir = dir_path.relative_to(self._directory).as_posix()
698 index_of = f"Index of /{html_escape(relative_path_to_dir)}"
699 h1 = f"<h1>{index_of}</h1>"
701 index_list = []
702 dir_index = dir_path.iterdir()
703 for _file in sorted(dir_index):
704 # show file url as relative to static path
705 rel_path = _file.relative_to(self._directory).as_posix()
706 quoted_file_url = _quote_path(f"{self._prefix}/{rel_path}")
708 # if file is a directory, add '/' to the end of the name
709 if _file.is_dir():
710 file_name = f"{_file.name}/"
711 else:
712 file_name = _file.name
714 index_list.append(
715 f'<li><a href="{quoted_file_url}">{html_escape(file_name)}</a></li>'
716 )
717 ul = "<ul>\n{}\n</ul>".format("\n".join(index_list))
718 body = f"<body>\n{h1}\n{ul}\n</body>"
720 head_str = f"<head>\n<title>{index_of}</title>\n</head>"
721 html = f"<html>\n{head_str}\n{body}\n</html>"
723 return html
725 def __repr__(self) -> str:
726 name = "'" + self.name + "'" if self.name is not None else ""
727 return "<StaticResource {name} {path} -> {directory!r}>".format(
728 name=name, path=self._prefix, directory=self._directory
729 )
732class PrefixedSubAppResource(PrefixResource):
733 def __init__(self, prefix: str, app: "Application") -> None:
734 super().__init__(prefix)
735 self._app = app
736 self._add_prefix_to_resources(prefix)
738 def add_prefix(self, prefix: str) -> None:
739 super().add_prefix(prefix)
740 self._add_prefix_to_resources(prefix)
742 def _add_prefix_to_resources(self, prefix: str) -> None:
743 router = self._app.router
744 for resource in router.resources():
745 # Since the canonical path of a resource is about
746 # to change, we need to unindex it and then reindex
747 router.unindex_resource(resource)
748 resource.add_prefix(prefix)
749 router.index_resource(resource)
751 def url_for(self, *args: str, **kwargs: str) -> URL:
752 raise RuntimeError(".url_for() is not supported by sub-application root")
754 def get_info(self) -> _InfoDict:
755 return {"app": self._app, "prefix": self._prefix}
757 async def resolve(self, request: Request) -> _Resolve:
758 match_info = await self._app.router.resolve(request)
759 match_info.add_app(self._app)
760 if isinstance(match_info.http_exception, HTTPMethodNotAllowed):
761 methods = match_info.http_exception.allowed_methods
762 else:
763 methods = set()
764 return match_info, methods
766 def __len__(self) -> int:
767 return len(self._app.router.routes())
769 def __iter__(self) -> Iterator[AbstractRoute]:
770 return iter(self._app.router.routes())
772 def __repr__(self) -> str:
773 return "<PrefixedSubAppResource {prefix} -> {app!r}>".format(
774 prefix=self._prefix, app=self._app
775 )
778class AbstractRuleMatching(abc.ABC):
779 @abc.abstractmethod # pragma: no branch
780 async def match(self, request: Request) -> bool:
781 """Return bool if the request satisfies the criteria"""
783 @abc.abstractmethod # pragma: no branch
784 def get_info(self) -> _InfoDict:
785 """Return a dict with additional info useful for introspection"""
787 @property
788 @abc.abstractmethod # pragma: no branch
789 def canonical(self) -> str:
790 """Return a str"""
793class Domain(AbstractRuleMatching):
794 re_part = re.compile(r"(?!-)[a-z\d-]{1,63}(?<!-)")
796 def __init__(self, domain: str) -> None:
797 super().__init__()
798 self._domain = self.validation(domain)
800 @property
801 def canonical(self) -> str:
802 return self._domain
804 def validation(self, domain: str) -> str:
805 if not isinstance(domain, str):
806 raise TypeError("Domain must be str")
807 domain = domain.rstrip(".").lower()
808 if not domain:
809 raise ValueError("Domain cannot be empty")
810 elif "://" in domain:
811 raise ValueError("Scheme not supported")
812 url = URL("http://" + domain)
813 assert url.raw_host is not None
814 if not all(self.re_part.fullmatch(x) for x in url.raw_host.split(".")):
815 raise ValueError("Domain not valid")
816 if url.port == 80:
817 return url.raw_host
818 return f"{url.raw_host}:{url.port}"
820 async def match(self, request: Request) -> bool:
821 host = request.headers.get(hdrs.HOST)
822 if not host:
823 return False
824 return self.match_domain(host)
826 def match_domain(self, host: str) -> bool:
827 return host.lower() == self._domain
829 def get_info(self) -> _InfoDict:
830 return {"domain": self._domain}
833class MaskDomain(Domain):
834 re_part = re.compile(r"(?!-)[a-z\d\*-]{1,63}(?<!-)")
836 def __init__(self, domain: str) -> None:
837 super().__init__(domain)
838 mask = self._domain.replace(".", r"\.").replace("*", ".*")
839 self._mask = re.compile(mask)
841 @property
842 def canonical(self) -> str:
843 return self._mask.pattern
845 def match_domain(self, host: str) -> bool:
846 return self._mask.fullmatch(host) is not None
849class MatchedSubAppResource(PrefixedSubAppResource):
850 def __init__(self, rule: AbstractRuleMatching, app: "Application") -> None:
851 AbstractResource.__init__(self)
852 self._prefix = ""
853 self._app = app
854 self._rule = rule
856 @property
857 def canonical(self) -> str:
858 return self._rule.canonical
860 def get_info(self) -> _InfoDict:
861 return {"app": self._app, "rule": self._rule}
863 async def resolve(self, request: Request) -> _Resolve:
864 if not await self._rule.match(request):
865 return None, set()
866 match_info = await self._app.router.resolve(request)
867 match_info.add_app(self._app)
868 if isinstance(match_info.http_exception, HTTPMethodNotAllowed):
869 methods = match_info.http_exception.allowed_methods
870 else:
871 methods = set()
872 return match_info, methods
874 def __repr__(self) -> str:
875 return f"<MatchedSubAppResource -> {self._app!r}>"
878class ResourceRoute(AbstractRoute):
879 """A route with resource"""
881 def __init__(
882 self,
883 method: str,
884 handler: Union[Handler, Type[AbstractView]],
885 resource: AbstractResource,
886 *,
887 expect_handler: Optional[_ExpectHandler] = None,
888 ) -> None:
889 super().__init__(
890 method, handler, expect_handler=expect_handler, resource=resource
891 )
893 def __repr__(self) -> str:
894 return "<ResourceRoute [{method}] {resource} -> {handler!r}".format(
895 method=self.method, resource=self._resource, handler=self.handler
896 )
898 @property
899 def name(self) -> Optional[str]:
900 if self._resource is None:
901 return None
902 return self._resource.name
904 def url_for(self, *args: str, **kwargs: str) -> URL:
905 """Construct url for route with additional params."""
906 assert self._resource is not None
907 return self._resource.url_for(*args, **kwargs)
909 def get_info(self) -> _InfoDict:
910 assert self._resource is not None
911 return self._resource.get_info()
914class SystemRoute(AbstractRoute):
915 def __init__(self, http_exception: HTTPException) -> None:
916 super().__init__(hdrs.METH_ANY, self._handle)
917 self._http_exception = http_exception
919 def url_for(self, *args: str, **kwargs: str) -> URL:
920 raise RuntimeError(".url_for() is not allowed for SystemRoute")
922 @property
923 def name(self) -> Optional[str]:
924 return None
926 def get_info(self) -> _InfoDict:
927 return {"http_exception": self._http_exception}
929 async def _handle(self, request: Request) -> StreamResponse:
930 raise self._http_exception
932 @property
933 def status(self) -> int:
934 return self._http_exception.status
936 @property
937 def reason(self) -> str:
938 return self._http_exception.reason
940 def __repr__(self) -> str:
941 return "<SystemRoute {self.status}: {self.reason}>".format(self=self)
944class View(AbstractView):
945 async def _iter(self) -> StreamResponse:
946 if self.request.method not in hdrs.METH_ALL:
947 self._raise_allowed_methods()
948 method: Optional[Callable[[], Awaitable[StreamResponse]]] = getattr(
949 self, self.request.method.lower(), None
950 )
951 if method is None:
952 self._raise_allowed_methods()
953 return await method()
955 def __await__(self) -> Generator[None, None, StreamResponse]:
956 return self._iter().__await__()
958 def _raise_allowed_methods(self) -> NoReturn:
959 allowed_methods = {m for m in hdrs.METH_ALL if hasattr(self, m.lower())}
960 raise HTTPMethodNotAllowed(self.request.method, allowed_methods)
963class ResourcesView(Sized, Iterable[AbstractResource], Container[AbstractResource]):
964 def __init__(self, resources: List[AbstractResource]) -> None:
965 self._resources = resources
967 def __len__(self) -> int:
968 return len(self._resources)
970 def __iter__(self) -> Iterator[AbstractResource]:
971 yield from self._resources
973 def __contains__(self, resource: object) -> bool:
974 return resource in self._resources
977class RoutesView(Sized, Iterable[AbstractRoute], Container[AbstractRoute]):
978 def __init__(self, resources: List[AbstractResource]):
979 self._routes: List[AbstractRoute] = []
980 for resource in resources:
981 for route in resource:
982 self._routes.append(route)
984 def __len__(self) -> int:
985 return len(self._routes)
987 def __iter__(self) -> Iterator[AbstractRoute]:
988 yield from self._routes
990 def __contains__(self, route: object) -> bool:
991 return route in self._routes
994class UrlDispatcher(AbstractRouter, Mapping[str, AbstractResource]):
995 NAME_SPLIT_RE = re.compile(r"[.:-]")
996 HTTP_NOT_FOUND = HTTPNotFound()
998 def __init__(self) -> None:
999 super().__init__()
1000 self._resources: List[AbstractResource] = []
1001 self._named_resources: Dict[str, AbstractResource] = {}
1002 self._resource_index: dict[str, list[AbstractResource]] = {}
1003 self._matched_sub_app_resources: List[MatchedSubAppResource] = []
1005 async def resolve(self, request: Request) -> UrlMappingMatchInfo:
1006 resource_index = self._resource_index
1007 allowed_methods: Set[str] = set()
1009 # Walk the url parts looking for candidates. We walk the url backwards
1010 # to ensure the most explicit match is found first. If there are multiple
1011 # candidates for a given url part because there are multiple resources
1012 # registered for the same canonical path, we resolve them in a linear
1013 # fashion to ensure registration order is respected.
1014 url_part = request.rel_url.path_safe
1015 while url_part:
1016 for candidate in resource_index.get(url_part, ()):
1017 match_dict, allowed = await candidate.resolve(request)
1018 if match_dict is not None:
1019 return match_dict
1020 else:
1021 allowed_methods |= allowed
1022 if url_part == "/":
1023 break
1024 url_part = url_part.rpartition("/")[0] or "/"
1026 #
1027 # We didn't find any candidates, so we'll try the matched sub-app
1028 # resources which we have to walk in a linear fashion because they
1029 # have regex/wildcard match rules and we cannot index them.
1030 #
1031 # For most cases we do not expect there to be many of these since
1032 # currently they are only added by `add_domain`
1033 #
1034 for resource in self._matched_sub_app_resources:
1035 match_dict, allowed = await resource.resolve(request)
1036 if match_dict is not None:
1037 return match_dict
1038 else:
1039 allowed_methods |= allowed
1041 if allowed_methods:
1042 return MatchInfoError(HTTPMethodNotAllowed(request.method, allowed_methods))
1044 return MatchInfoError(self.HTTP_NOT_FOUND)
1046 def __iter__(self) -> Iterator[str]:
1047 return iter(self._named_resources)
1049 def __len__(self) -> int:
1050 return len(self._named_resources)
1052 def __contains__(self, resource: object) -> bool:
1053 return resource in self._named_resources
1055 def __getitem__(self, name: str) -> AbstractResource:
1056 return self._named_resources[name]
1058 def resources(self) -> ResourcesView:
1059 return ResourcesView(self._resources)
1061 def routes(self) -> RoutesView:
1062 return RoutesView(self._resources)
1064 def named_resources(self) -> Mapping[str, AbstractResource]:
1065 return MappingProxyType(self._named_resources)
1067 def register_resource(self, resource: AbstractResource) -> None:
1068 assert isinstance(
1069 resource, AbstractResource
1070 ), f"Instance of AbstractResource class is required, got {resource!r}"
1071 if self.frozen:
1072 raise RuntimeError("Cannot register a resource into frozen router.")
1074 name = resource.name
1076 if name is not None:
1077 parts = self.NAME_SPLIT_RE.split(name)
1078 for part in parts:
1079 if keyword.iskeyword(part):
1080 raise ValueError(
1081 f"Incorrect route name {name!r}, "
1082 "python keywords cannot be used "
1083 "for route name"
1084 )
1085 if not part.isidentifier():
1086 raise ValueError(
1087 "Incorrect route name {!r}, "
1088 "the name should be a sequence of "
1089 "python identifiers separated "
1090 "by dash, dot or column".format(name)
1091 )
1092 if name in self._named_resources:
1093 raise ValueError(
1094 "Duplicate {!r}, "
1095 "already handled by {!r}".format(name, self._named_resources[name])
1096 )
1097 self._named_resources[name] = resource
1098 self._resources.append(resource)
1100 if isinstance(resource, MatchedSubAppResource):
1101 # We cannot index match sub-app resources because they have match rules
1102 self._matched_sub_app_resources.append(resource)
1103 else:
1104 self.index_resource(resource)
1106 def _get_resource_index_key(self, resource: AbstractResource) -> str:
1107 """Return a key to index the resource in the resource index."""
1108 if "{" in (index_key := resource.canonical):
1109 # strip at the first { to allow for variables, and than
1110 # rpartition at / to allow for variable parts in the path
1111 # For example if the canonical path is `/core/locations{tail:.*}`
1112 # the index key will be `/core` since index is based on the
1113 # url parts split by `/`
1114 index_key = index_key.partition("{")[0].rpartition("/")[0]
1115 return index_key.rstrip("/") or "/"
1117 def index_resource(self, resource: AbstractResource) -> None:
1118 """Add a resource to the resource index."""
1119 resource_key = self._get_resource_index_key(resource)
1120 # There may be multiple resources for a canonical path
1121 # so we keep them in a list to ensure that registration
1122 # order is respected.
1123 self._resource_index.setdefault(resource_key, []).append(resource)
1125 def unindex_resource(self, resource: AbstractResource) -> None:
1126 """Remove a resource from the resource index."""
1127 resource_key = self._get_resource_index_key(resource)
1128 self._resource_index[resource_key].remove(resource)
1130 def add_resource(self, path: str, *, name: Optional[str] = None) -> Resource:
1131 if path and not path.startswith("/"):
1132 raise ValueError("path should be started with / or be empty")
1133 # Reuse last added resource if path and name are the same
1134 if self._resources:
1135 resource = self._resources[-1]
1136 if resource.name == name and resource.raw_match(path):
1137 return cast(Resource, resource)
1138 if not ("{" in path or "}" in path or ROUTE_RE.search(path)):
1139 resource = PlainResource(path, name=name)
1140 self.register_resource(resource)
1141 return resource
1142 resource = DynamicResource(path, name=name)
1143 self.register_resource(resource)
1144 return resource
1146 def add_route(
1147 self,
1148 method: str,
1149 path: str,
1150 handler: Union[Handler, Type[AbstractView]],
1151 *,
1152 name: Optional[str] = None,
1153 expect_handler: Optional[_ExpectHandler] = None,
1154 ) -> AbstractRoute:
1155 resource = self.add_resource(path, name=name)
1156 return resource.add_route(method, handler, expect_handler=expect_handler)
1158 def add_static(
1159 self,
1160 prefix: str,
1161 path: PathLike,
1162 *,
1163 name: Optional[str] = None,
1164 expect_handler: Optional[_ExpectHandler] = None,
1165 chunk_size: int = 256 * 1024,
1166 show_index: bool = False,
1167 follow_symlinks: bool = False,
1168 append_version: bool = False,
1169 ) -> StaticResource:
1170 """Add static files view.
1172 prefix - url prefix
1173 path - folder with files
1175 """
1176 assert prefix.startswith("/")
1177 if prefix.endswith("/"):
1178 prefix = prefix[:-1]
1179 resource = StaticResource(
1180 prefix,
1181 path,
1182 name=name,
1183 expect_handler=expect_handler,
1184 chunk_size=chunk_size,
1185 show_index=show_index,
1186 follow_symlinks=follow_symlinks,
1187 append_version=append_version,
1188 )
1189 self.register_resource(resource)
1190 return resource
1192 def add_head(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
1193 """Shortcut for add_route with method HEAD."""
1194 return self.add_route(hdrs.METH_HEAD, path, handler, **kwargs)
1196 def add_options(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
1197 """Shortcut for add_route with method OPTIONS."""
1198 return self.add_route(hdrs.METH_OPTIONS, path, handler, **kwargs)
1200 def add_get(
1201 self,
1202 path: str,
1203 handler: Handler,
1204 *,
1205 name: Optional[str] = None,
1206 allow_head: bool = True,
1207 **kwargs: Any,
1208 ) -> AbstractRoute:
1209 """Shortcut for add_route with method GET.
1211 If allow_head is true, another
1212 route is added allowing head requests to the same endpoint.
1213 """
1214 resource = self.add_resource(path, name=name)
1215 if allow_head:
1216 resource.add_route(hdrs.METH_HEAD, handler, **kwargs)
1217 return resource.add_route(hdrs.METH_GET, handler, **kwargs)
1219 def add_post(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
1220 """Shortcut for add_route with method POST."""
1221 return self.add_route(hdrs.METH_POST, path, handler, **kwargs)
1223 def add_put(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
1224 """Shortcut for add_route with method PUT."""
1225 return self.add_route(hdrs.METH_PUT, path, handler, **kwargs)
1227 def add_patch(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
1228 """Shortcut for add_route with method PATCH."""
1229 return self.add_route(hdrs.METH_PATCH, path, handler, **kwargs)
1231 def add_delete(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
1232 """Shortcut for add_route with method DELETE."""
1233 return self.add_route(hdrs.METH_DELETE, path, handler, **kwargs)
1235 def add_view(
1236 self, path: str, handler: Type[AbstractView], **kwargs: Any
1237 ) -> AbstractRoute:
1238 """Shortcut for add_route with ANY methods for a class-based view."""
1239 return self.add_route(hdrs.METH_ANY, path, handler, **kwargs)
1241 def freeze(self) -> None:
1242 super().freeze()
1243 for resource in self._resources:
1244 resource.freeze()
1246 def add_routes(self, routes: Iterable[AbstractRouteDef]) -> List[AbstractRoute]:
1247 """Append routes to route table.
1249 Parameter should be a sequence of RouteDef objects.
1251 Returns a list of registered AbstractRoute instances.
1252 """
1253 registered_routes = []
1254 for route_def in routes:
1255 registered_routes.extend(route_def.register(self))
1256 return registered_routes
1259def _quote_path(value: str) -> str:
1260 return URL.build(path=value, encoded=False).raw_path
1263def _unquote_path_safe(value: str) -> str:
1264 if "%" not in value:
1265 return value
1266 return value.replace("%2F", "/").replace("%25", "%")
1269def _requote_path(value: str) -> str:
1270 # Quote non-ascii characters and other characters which must be quoted,
1271 # but preserve existing %-sequences.
1272 result = _quote_path(value)
1273 if "%" in value:
1274 result = result.replace("%25", "%")
1275 return result