Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/web_urldispatcher.py: 38%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import abc
2import asyncio
3import base64
4import functools
5import hashlib
6import html
7import inspect
8import keyword
9import os
10import platform
11import re
12import sys
13from collections.abc import (
14 Awaitable,
15 Callable,
16 Container,
17 Generator,
18 Iterable,
19 Iterator,
20 Mapping,
21 Sized,
22)
23from pathlib import Path
24from re import Pattern
25from types import MappingProxyType
26from typing import TYPE_CHECKING, Any, Final, NoReturn, Optional, TypedDict, cast
28from yarl import URL
30from . import hdrs
31from .abc import AbstractMatchInfo, AbstractRouter, AbstractView
32from .helpers import DEBUG
33from .http import HttpVersion11
34from .typedefs import Handler, PathLike
35from .web_exceptions import (
36 HTTPException,
37 HTTPExpectationFailed,
38 HTTPForbidden,
39 HTTPMethodNotAllowed,
40 HTTPNotFound,
41)
42from .web_fileresponse import FileResponse
43from .web_request import Request
44from .web_response import Response, StreamResponse
45from .web_routedef import AbstractRouteDef
47__all__ = (
48 "UrlDispatcher",
49 "UrlMappingMatchInfo",
50 "AbstractResource",
51 "Resource",
52 "PlainResource",
53 "DynamicResource",
54 "AbstractRoute",
55 "ResourceRoute",
56 "StaticResource",
57 "View",
58)
61if TYPE_CHECKING:
62 from .web_app import Application
64CIRCULAR_SYMLINK_ERROR = (RuntimeError,) if sys.version_info < (3, 13) else ()
66HTTP_METHOD_RE: Final[Pattern[str]] = re.compile(
67 r"^[0-9A-Za-z!#\$%&'\*\+\-\.\^_`\|~]+$"
68)
69ROUTE_RE: Final[Pattern[str]] = re.compile(
70 r"(\{[_a-zA-Z][^{}]*(?:\{[^{}]*\}[^{}]*)*\})"
71)
72PATH_SEP: Final[str] = re.escape("/")
74IS_WINDOWS: Final[bool] = platform.system() == "Windows"
76_ExpectHandler = Callable[[Request], Awaitable[StreamResponse | None]]
77_Resolve = tuple[Optional["UrlMappingMatchInfo"], set[str]]
79html_escape = functools.partial(html.escape, quote=True)
82class _InfoDict(TypedDict, total=False):
83 path: str
85 formatter: str
86 pattern: Pattern[str]
88 directory: Path
89 prefix: str
90 routes: Mapping[str, "AbstractRoute"]
92 app: "Application"
94 domain: str
96 rule: "AbstractRuleMatching"
98 http_exception: HTTPException
101class AbstractResource(Sized, Iterable["AbstractRoute"]):
102 def __init__(self, *, name: str | None = None) -> None:
103 self._name = name
105 @property
106 def name(self) -> str | None:
107 return self._name
109 @property
110 @abc.abstractmethod
111 def canonical(self) -> str:
112 """Exposes the resource's canonical path.
114 For example '/foo/bar/{name}'
116 """
118 @abc.abstractmethod # pragma: no branch
119 def url_for(self, **kwargs: str) -> URL:
120 """Construct url for resource with additional params."""
122 @abc.abstractmethod # pragma: no branch
123 async def resolve(self, request: Request) -> _Resolve:
124 """Resolve resource.
126 Return (UrlMappingMatchInfo, allowed_methods) pair.
127 """
129 @abc.abstractmethod
130 def add_prefix(self, prefix: str) -> None:
131 """Add a prefix to processed URLs.
133 Required for subapplications support.
134 """
136 @abc.abstractmethod
137 def get_info(self) -> _InfoDict:
138 """Return a dict with additional info useful for introspection"""
140 def freeze(self) -> None:
141 pass
143 @abc.abstractmethod
144 def raw_match(self, path: str) -> bool:
145 """Perform a raw match against path"""
148class AbstractRoute(abc.ABC):
149 def __init__(
150 self,
151 method: str,
152 handler: Handler | type[AbstractView],
153 *,
154 expect_handler: _ExpectHandler | None = None,
155 resource: AbstractResource | None = None,
156 ) -> None:
157 if expect_handler is None:
158 expect_handler = _default_expect_handler
160 assert inspect.iscoroutinefunction(expect_handler) or (
161 sys.version_info < (3, 14) and asyncio.iscoroutinefunction(expect_handler)
162 ), f"Coroutine is expected, got {expect_handler!r}"
164 method = method.upper()
165 if not HTTP_METHOD_RE.match(method):
166 raise ValueError(f"{method} is not allowed HTTP method")
168 if inspect.iscoroutinefunction(handler) or (
169 sys.version_info < (3, 14) and asyncio.iscoroutinefunction(handler)
170 ):
171 pass
172 elif isinstance(handler, type) and issubclass(handler, AbstractView):
173 pass
174 else:
175 raise TypeError(
176 f"Only async functions are allowed as web-handlers, got {handler!r}"
177 )
179 self._method = method
180 self._handler = handler
181 self._expect_handler = expect_handler
182 self._resource = resource
184 @property
185 def method(self) -> str:
186 return self._method
188 @property
189 def handler(self) -> Handler:
190 return self._handler
192 @property
193 @abc.abstractmethod
194 def name(self) -> str | None:
195 """Optional route's name, always equals to resource's name."""
197 @property
198 def resource(self) -> AbstractResource | None:
199 return self._resource
201 @abc.abstractmethod
202 def get_info(self) -> _InfoDict:
203 """Return a dict with additional info useful for introspection"""
205 @abc.abstractmethod # pragma: no branch
206 def url_for(self, *args: str, **kwargs: str) -> URL:
207 """Construct url for route with additional params."""
209 async def handle_expect_header(self, request: Request) -> StreamResponse | None:
210 return await self._expect_handler(request)
213class UrlMappingMatchInfo(dict[str, str], AbstractMatchInfo):
215 __slots__ = ("_route", "_apps", "_current_app", "_frozen")
217 def __init__(self, match_dict: dict[str, str], route: AbstractRoute) -> None:
218 super().__init__(match_dict)
219 self._route = route
220 self._apps: list[Application] = []
221 self._current_app: Application | None = None
222 self._frozen = False
224 @property
225 def handler(self) -> Handler:
226 return self._route.handler
228 @property
229 def route(self) -> AbstractRoute:
230 return self._route
232 @property
233 def expect_handler(self) -> _ExpectHandler:
234 return self._route.handle_expect_header
236 @property
237 def http_exception(self) -> HTTPException | None:
238 return None
240 def get_info(self) -> _InfoDict: # type: ignore[override]
241 return self._route.get_info()
243 @property
244 def apps(self) -> tuple["Application", ...]:
245 return tuple(self._apps)
247 def add_app(self, app: "Application") -> None:
248 if self._frozen:
249 raise RuntimeError("Cannot change apps stack after .freeze() call")
250 if self._current_app is None:
251 self._current_app = app
252 self._apps.insert(0, app)
254 @property
255 def current_app(self) -> "Application":
256 app = self._current_app
257 assert app is not None
258 return app
260 @current_app.setter
261 def current_app(self, app: "Application") -> None:
262 if DEBUG:
263 if app not in self._apps:
264 raise RuntimeError(
265 f"Expected one of the following apps {self._apps!r}, got {app!r}"
266 )
267 self._current_app = app
269 def freeze(self) -> None:
270 self._frozen = True
272 def __repr__(self) -> str:
273 return f"<MatchInfo {super().__repr__()}: {self._route}>"
276class MatchInfoError(UrlMappingMatchInfo):
278 __slots__ = ("_exception",)
280 def __init__(self, http_exception: HTTPException) -> None:
281 self._exception = http_exception
282 super().__init__({}, SystemRoute(self._exception))
284 @property
285 def http_exception(self) -> HTTPException:
286 return self._exception
288 def __repr__(self) -> str:
289 return f"<MatchInfoError {self._exception.status}: {self._exception.reason}>"
292async def _default_expect_handler(request: Request) -> None:
293 """Default handler for Expect header.
295 Just send "100 Continue" to client.
296 raise HTTPExpectationFailed if value of header is not "100-continue"
297 """
298 expect = request.headers.get(hdrs.EXPECT, "")
299 if request.version == HttpVersion11:
300 if expect.lower() == "100-continue":
301 await request.writer.write(b"HTTP/1.1 100 Continue\r\n\r\n")
302 # Reset output_size as we haven't started the main body yet.
303 request.writer.output_size = 0
304 else:
305 raise HTTPExpectationFailed(text="Unknown Expect: %s" % expect)
308class Resource(AbstractResource):
309 def __init__(self, *, name: str | None = None) -> None:
310 super().__init__(name=name)
311 self._routes: dict[str, ResourceRoute] = {}
312 self._any_route: ResourceRoute | None = None
313 self._allowed_methods: set[str] = set()
315 def add_route(
316 self,
317 method: str,
318 handler: type[AbstractView] | Handler,
319 *,
320 expect_handler: _ExpectHandler | None = None,
321 ) -> "ResourceRoute":
322 if route := self._routes.get(method, self._any_route):
323 raise RuntimeError(
324 "Added route will never be executed, "
325 f"method {route.method} is already "
326 "registered"
327 )
329 route_obj = ResourceRoute(method, handler, self, expect_handler=expect_handler)
330 self.register_route(route_obj)
331 return route_obj
333 def register_route(self, route: "ResourceRoute") -> None:
334 assert isinstance(
335 route, ResourceRoute
336 ), f"Instance of Route class is required, got {route!r}"
337 if route.method == hdrs.METH_ANY:
338 self._any_route = route
339 self._allowed_methods.add(route.method)
340 self._routes[route.method] = route
342 async def resolve(self, request: Request) -> _Resolve:
343 if (match_dict := self._match(request.rel_url.path_safe)) is None:
344 return None, set()
345 if route := self._routes.get(request.method, self._any_route):
346 return UrlMappingMatchInfo(match_dict, route), self._allowed_methods
347 return None, self._allowed_methods
349 @abc.abstractmethod
350 def _match(self, path: str) -> dict[str, str] | None:
351 """Return dict of path values if path matches this resource, otherwise None."""
353 def __len__(self) -> int:
354 return len(self._routes)
356 def __iter__(self) -> Iterator["ResourceRoute"]:
357 return iter(self._routes.values())
359 # TODO: implement all abstract methods
362class PlainResource(Resource):
363 def __init__(self, path: str, *, name: str | None = None) -> None:
364 super().__init__(name=name)
365 assert not path or path.startswith("/")
366 self._path = path
368 @property
369 def canonical(self) -> str:
370 return self._path
372 def freeze(self) -> None:
373 if not self._path:
374 self._path = "/"
376 def add_prefix(self, prefix: str) -> None:
377 assert prefix.startswith("/")
378 assert not prefix.endswith("/")
379 assert len(prefix) > 1
380 self._path = prefix + self._path
382 def _match(self, path: str) -> dict[str, str] | None:
383 # string comparison is about 10 times faster than regexp matching
384 if self._path == path:
385 return {}
386 return None
388 def raw_match(self, path: str) -> bool:
389 return self._path == path
391 def get_info(self) -> _InfoDict:
392 return {"path": self._path}
394 def url_for(self) -> URL: # type: ignore[override]
395 return URL.build(path=self._path, encoded=True)
397 def __repr__(self) -> str:
398 name = "'" + self.name + "' " if self.name is not None else ""
399 return f"<PlainResource {name} {self._path}>"
402class DynamicResource(Resource):
403 DYN = re.compile(r"\{(?P<var>[_a-zA-Z][_a-zA-Z0-9]*)\}")
404 DYN_WITH_RE = re.compile(r"\{(?P<var>[_a-zA-Z][_a-zA-Z0-9]*):(?P<re>.+)\}")
405 GOOD = r"[^{}/]+"
407 def __init__(self, path: str, *, name: str | None = None) -> None:
408 super().__init__(name=name)
409 self._orig_path = path
410 pattern = ""
411 formatter = ""
412 for part in ROUTE_RE.split(path):
413 match = self.DYN.fullmatch(part)
414 if match:
415 pattern += "(?P<{}>{})".format(match.group("var"), self.GOOD)
416 formatter += "{" + match.group("var") + "}"
417 continue
419 match = self.DYN_WITH_RE.fullmatch(part)
420 if match:
421 pattern += "(?P<{var}>{re})".format(**match.groupdict())
422 formatter += "{" + match.group("var") + "}"
423 continue
425 if "{" in part or "}" in part:
426 raise ValueError(f"Invalid path '{path}'['{part}']")
428 part = _requote_path(part)
429 formatter += part
430 pattern += re.escape(part)
432 try:
433 compiled = re.compile(pattern)
434 except re.error as exc:
435 raise ValueError(f"Bad pattern '{pattern}': {exc}") from None
436 assert compiled.pattern.startswith(PATH_SEP)
437 assert formatter.startswith("/")
438 self._pattern = compiled
439 self._formatter = formatter
441 @property
442 def canonical(self) -> str:
443 return self._formatter
445 def add_prefix(self, prefix: str) -> None:
446 assert prefix.startswith("/")
447 assert not prefix.endswith("/")
448 assert len(prefix) > 1
449 self._pattern = re.compile(re.escape(prefix) + self._pattern.pattern)
450 self._formatter = prefix + self._formatter
452 def _match(self, path: str) -> dict[str, str] | None:
453 match = self._pattern.fullmatch(path)
454 if match is None:
455 return None
456 return {
457 key: _unquote_path_safe(value) for key, value in match.groupdict().items()
458 }
460 def raw_match(self, path: str) -> bool:
461 return self._orig_path == path
463 def get_info(self) -> _InfoDict:
464 return {"formatter": self._formatter, "pattern": self._pattern}
466 def url_for(self, **parts: str) -> URL:
467 url = self._formatter.format_map({k: _quote_path(v) for k, v in parts.items()})
468 return URL.build(path=url, encoded=True)
470 def __repr__(self) -> str:
471 name = "'" + self.name + "' " if self.name is not None else ""
472 return f"<DynamicResource {name} {self._formatter}>"
475class PrefixResource(AbstractResource):
476 def __init__(self, prefix: str, *, name: str | None = None) -> None:
477 assert not prefix or prefix.startswith("/"), prefix
478 assert prefix in ("", "/") or not prefix.endswith("/"), prefix
479 super().__init__(name=name)
480 self._prefix = _requote_path(prefix)
481 self._prefix2 = self._prefix + "/"
483 @property
484 def canonical(self) -> str:
485 return self._prefix
487 def add_prefix(self, prefix: str) -> None:
488 assert prefix.startswith("/")
489 assert not prefix.endswith("/")
490 assert len(prefix) > 1
491 self._prefix = prefix + self._prefix
492 self._prefix2 = self._prefix + "/"
494 def raw_match(self, prefix: str) -> bool:
495 return False
497 # TODO: impl missing abstract methods
500class StaticResource(PrefixResource):
501 VERSION_KEY = "v"
503 def __init__(
504 self,
505 prefix: str,
506 directory: PathLike,
507 *,
508 name: str | None = None,
509 expect_handler: _ExpectHandler | None = None,
510 chunk_size: int = 256 * 1024,
511 show_index: bool = False,
512 follow_symlinks: bool = False,
513 append_version: bool = False,
514 ) -> None:
515 super().__init__(prefix, name=name)
516 try:
517 directory = Path(directory).expanduser().resolve(strict=True)
518 except FileNotFoundError as error:
519 raise ValueError(f"'{directory}' does not exist") from error
520 if not directory.is_dir():
521 raise ValueError(f"'{directory}' is not a directory")
522 self._directory = directory
523 self._show_index = show_index
524 self._chunk_size = chunk_size
525 self._follow_symlinks = follow_symlinks
526 self._expect_handler = expect_handler
527 self._append_version = append_version
529 self._routes = {
530 "GET": ResourceRoute(
531 "GET", self._handle, self, expect_handler=expect_handler
532 ),
533 "HEAD": ResourceRoute(
534 "HEAD", self._handle, self, expect_handler=expect_handler
535 ),
536 }
537 self._allowed_methods = set(self._routes)
539 def url_for( # type: ignore[override]
540 self,
541 *,
542 filename: PathLike,
543 append_version: bool | None = None,
544 ) -> URL:
545 if append_version is None:
546 append_version = self._append_version
547 filename = str(filename).lstrip("/")
549 url = URL.build(path=self._prefix, encoded=True)
550 # filename is not encoded
551 url = url / filename
553 if append_version:
554 unresolved_path = self._directory.joinpath(filename)
555 try:
556 if self._follow_symlinks:
557 normalized_path = Path(os.path.normpath(unresolved_path))
558 normalized_path.relative_to(self._directory)
559 filepath = normalized_path.resolve()
560 else:
561 filepath = unresolved_path.resolve()
562 filepath.relative_to(self._directory)
563 except (ValueError, FileNotFoundError):
564 # ValueError for case when path point to symlink
565 # with follow_symlinks is False
566 return url # relatively safe
567 if filepath.is_file():
568 # TODO cache file content
569 # with file watcher for cache invalidation
570 with filepath.open("rb") as f:
571 file_bytes = f.read()
572 h = self._get_file_hash(file_bytes)
573 url = url.with_query({self.VERSION_KEY: h})
574 return url
575 return url
577 @staticmethod
578 def _get_file_hash(byte_array: bytes) -> str:
579 m = hashlib.sha256() # todo sha256 can be configurable param
580 m.update(byte_array)
581 b64 = base64.urlsafe_b64encode(m.digest())
582 return b64.decode("ascii")
584 def get_info(self) -> _InfoDict:
585 return {
586 "directory": self._directory,
587 "prefix": self._prefix,
588 "routes": self._routes,
589 }
591 def set_options_route(self, handler: Handler) -> None:
592 if "OPTIONS" in self._routes:
593 raise RuntimeError("OPTIONS route was set already")
594 self._routes["OPTIONS"] = ResourceRoute(
595 "OPTIONS", handler, self, expect_handler=self._expect_handler
596 )
597 self._allowed_methods.add("OPTIONS")
599 async def resolve(self, request: Request) -> _Resolve:
600 path = request.rel_url.path_safe
601 method = request.method
602 # We normalise here to avoid matches that traverse below the static root.
603 # e.g. /static/../../../../home/user/webapp/static/
604 norm_path = os.path.normpath(path)
605 if IS_WINDOWS:
606 norm_path = norm_path.replace("\\", "/")
607 if not norm_path.startswith(self._prefix2) and norm_path != self._prefix:
608 return None, set()
610 allowed_methods = self._allowed_methods
611 if method not in allowed_methods:
612 return None, allowed_methods
614 match_dict = {"filename": _unquote_path_safe(path[len(self._prefix) + 1 :])}
615 return (UrlMappingMatchInfo(match_dict, self._routes[method]), allowed_methods)
617 def __len__(self) -> int:
618 return len(self._routes)
620 def __iter__(self) -> Iterator[AbstractRoute]:
621 return iter(self._routes.values())
623 async def _handle(self, request: Request) -> StreamResponse:
624 filename = request.match_info["filename"]
625 if Path(filename).is_absolute():
626 # filename is an absolute path e.g. //network/share or D:\path
627 # which could be a UNC path leading to NTLM credential theft
628 raise HTTPNotFound()
629 unresolved_path = self._directory.joinpath(filename)
630 loop = asyncio.get_running_loop()
631 return await loop.run_in_executor(
632 None, self._resolve_path_to_response, unresolved_path
633 )
635 def _resolve_path_to_response(self, unresolved_path: Path) -> StreamResponse:
636 """Take the unresolved path and query the file system to form a response."""
637 # Check for access outside the root directory. For follow symlinks, URI
638 # cannot traverse out, but symlinks can. Otherwise, no access outside
639 # root is permitted.
640 try:
641 if self._follow_symlinks:
642 normalized_path = Path(os.path.normpath(unresolved_path))
643 normalized_path.relative_to(self._directory)
644 file_path = normalized_path.resolve()
645 else:
646 file_path = unresolved_path.resolve()
647 file_path.relative_to(self._directory)
648 except (ValueError, *CIRCULAR_SYMLINK_ERROR) as error:
649 # ValueError is raised for the relative check. Circular symlinks
650 # raise here on resolving for python < 3.13.
651 raise HTTPNotFound() from error
653 # if path is a directory, return the contents if permitted. Note the
654 # directory check will raise if a segment is not readable.
655 try:
656 if file_path.is_dir():
657 if self._show_index:
658 return Response(
659 text=self._directory_as_html(file_path),
660 content_type="text/html",
661 )
662 else:
663 raise HTTPForbidden()
664 except PermissionError as error:
665 raise HTTPForbidden() from error
667 # Return the file response, which handles all other checks.
668 return FileResponse(file_path, chunk_size=self._chunk_size)
670 def _directory_as_html(self, dir_path: Path) -> str:
671 """returns directory's index as html."""
672 assert dir_path.is_dir()
674 relative_path_to_dir = dir_path.relative_to(self._directory).as_posix()
675 index_of = f"Index of /{html_escape(relative_path_to_dir)}"
676 h1 = f"<h1>{index_of}</h1>"
678 index_list = []
679 dir_index = dir_path.iterdir()
680 for _file in sorted(dir_index):
681 # show file url as relative to static path
682 rel_path = _file.relative_to(self._directory).as_posix()
683 quoted_file_url = _quote_path(f"{self._prefix}/{rel_path}")
685 # if file is a directory, add '/' to the end of the name
686 if _file.is_dir():
687 file_name = f"{_file.name}/"
688 else:
689 file_name = _file.name
691 index_list.append(
692 f'<li><a href="{quoted_file_url}">{html_escape(file_name)}</a></li>'
693 )
694 ul = "<ul>\n{}\n</ul>".format("\n".join(index_list))
695 body = f"<body>\n{h1}\n{ul}\n</body>"
697 head_str = f"<head>\n<title>{index_of}</title>\n</head>"
698 html = f"<html>\n{head_str}\n{body}\n</html>"
700 return html
702 def __repr__(self) -> str:
703 name = "'" + self.name + "'" if self.name is not None else ""
704 return f"<StaticResource {name} {self._prefix} -> {self._directory!r}>"
707class PrefixedSubAppResource(PrefixResource):
708 def __init__(self, prefix: str, app: "Application") -> None:
709 super().__init__(prefix)
710 self._app = app
711 self._add_prefix_to_resources(prefix)
713 def add_prefix(self, prefix: str) -> None:
714 super().add_prefix(prefix)
715 self._add_prefix_to_resources(prefix)
717 def _add_prefix_to_resources(self, prefix: str) -> None:
718 router = self._app.router
719 for resource in router.resources():
720 # Since the canonical path of a resource is about
721 # to change, we need to unindex it and then reindex
722 router.unindex_resource(resource)
723 resource.add_prefix(prefix)
724 router.index_resource(resource)
726 def url_for(self, *args: str, **kwargs: str) -> URL:
727 raise RuntimeError(".url_for() is not supported by sub-application root")
729 def get_info(self) -> _InfoDict:
730 return {"app": self._app, "prefix": self._prefix}
732 async def resolve(self, request: Request) -> _Resolve:
733 match_info = await self._app.router.resolve(request)
734 match_info.add_app(self._app)
735 if isinstance(match_info.http_exception, HTTPMethodNotAllowed):
736 methods = match_info.http_exception.allowed_methods
737 else:
738 methods = set()
739 return match_info, methods
741 def __len__(self) -> int:
742 return len(self._app.router.routes())
744 def __iter__(self) -> Iterator[AbstractRoute]:
745 return iter(self._app.router.routes())
747 def __repr__(self) -> str:
748 return f"<PrefixedSubAppResource {self._prefix} -> {self._app!r}>"
751class AbstractRuleMatching(abc.ABC):
752 @abc.abstractmethod # pragma: no branch
753 async def match(self, request: Request) -> bool:
754 """Return bool if the request satisfies the criteria"""
756 @abc.abstractmethod # pragma: no branch
757 def get_info(self) -> _InfoDict:
758 """Return a dict with additional info useful for introspection"""
760 @property
761 @abc.abstractmethod # pragma: no branch
762 def canonical(self) -> str:
763 """Return a str"""
766class Domain(AbstractRuleMatching):
767 re_part = re.compile(r"(?!-)[a-z\d-]{1,63}(?<!-)")
769 def __init__(self, domain: str) -> None:
770 super().__init__()
771 self._domain = self.validation(domain)
773 @property
774 def canonical(self) -> str:
775 return self._domain
777 def validation(self, domain: str) -> str:
778 if not isinstance(domain, str):
779 raise TypeError("Domain must be str")
780 domain = domain.rstrip(".").lower()
781 if not domain:
782 raise ValueError("Domain cannot be empty")
783 elif "://" in domain:
784 raise ValueError("Scheme not supported")
785 url = URL("http://" + domain)
786 assert url.raw_host is not None
787 if not all(self.re_part.fullmatch(x) for x in url.raw_host.split(".")):
788 raise ValueError("Domain not valid")
789 if url.port == 80:
790 return url.raw_host
791 return f"{url.raw_host}:{url.port}"
793 async def match(self, request: Request) -> bool:
794 host = request.headers.get(hdrs.HOST)
795 if not host:
796 return False
797 return self.match_domain(host)
799 def match_domain(self, host: str) -> bool:
800 return host.lower() == self._domain
802 def get_info(self) -> _InfoDict:
803 return {"domain": self._domain}
806class MaskDomain(Domain):
807 re_part = re.compile(r"(?!-)[a-z\d\*-]{1,63}(?<!-)")
809 def __init__(self, domain: str) -> None:
810 super().__init__(domain)
811 mask = self._domain.replace(".", r"\.").replace("*", ".*")
812 self._mask = re.compile(mask)
814 @property
815 def canonical(self) -> str:
816 return self._mask.pattern
818 def match_domain(self, host: str) -> bool:
819 return self._mask.fullmatch(host) is not None
822class MatchedSubAppResource(PrefixedSubAppResource):
823 def __init__(self, rule: AbstractRuleMatching, app: "Application") -> None:
824 AbstractResource.__init__(self)
825 self._prefix = ""
826 self._app = app
827 self._rule = rule
829 @property
830 def canonical(self) -> str:
831 return self._rule.canonical
833 def get_info(self) -> _InfoDict:
834 return {"app": self._app, "rule": self._rule}
836 async def resolve(self, request: Request) -> _Resolve:
837 if not await self._rule.match(request):
838 return None, set()
839 match_info = await self._app.router.resolve(request)
840 match_info.add_app(self._app)
841 if isinstance(match_info.http_exception, HTTPMethodNotAllowed):
842 methods = match_info.http_exception.allowed_methods
843 else:
844 methods = set()
845 return match_info, methods
847 def __repr__(self) -> str:
848 return f"<MatchedSubAppResource -> {self._app!r}>"
851class ResourceRoute(AbstractRoute):
852 """A route with resource"""
854 def __init__(
855 self,
856 method: str,
857 handler: Handler | type[AbstractView],
858 resource: AbstractResource,
859 *,
860 expect_handler: _ExpectHandler | None = None,
861 ) -> None:
862 super().__init__(
863 method, handler, expect_handler=expect_handler, resource=resource
864 )
866 def __repr__(self) -> str:
867 return f"<ResourceRoute [{self.method}] {self._resource} -> {self.handler!r}"
869 @property
870 def name(self) -> str | None:
871 if self._resource is None:
872 return None
873 return self._resource.name
875 def url_for(self, *args: str, **kwargs: str) -> URL:
876 """Construct url for route with additional params."""
877 assert self._resource is not None
878 return self._resource.url_for(*args, **kwargs)
880 def get_info(self) -> _InfoDict:
881 assert self._resource is not None
882 return self._resource.get_info()
885class SystemRoute(AbstractRoute):
886 def __init__(self, http_exception: HTTPException) -> None:
887 super().__init__(hdrs.METH_ANY, self._handle)
888 self._http_exception = http_exception
890 def url_for(self, *args: str, **kwargs: str) -> URL:
891 raise RuntimeError(".url_for() is not allowed for SystemRoute")
893 @property
894 def name(self) -> str | None:
895 return None
897 def get_info(self) -> _InfoDict:
898 return {"http_exception": self._http_exception}
900 async def _handle(self, request: Request) -> StreamResponse:
901 raise self._http_exception
903 @property
904 def status(self) -> int:
905 return self._http_exception.status
907 @property
908 def reason(self) -> str:
909 return self._http_exception.reason
911 def __repr__(self) -> str:
912 return f"<SystemRoute {self.status}: {self.reason}>"
915class View(AbstractView):
916 async def _iter(self) -> StreamResponse:
917 if self.request.method not in hdrs.METH_ALL:
918 self._raise_allowed_methods()
919 method: Callable[[], Awaitable[StreamResponse]] | None = getattr(
920 self, self.request.method.lower(), None
921 )
922 if method is None:
923 self._raise_allowed_methods()
924 return await method()
926 def __await__(self) -> Generator[None, None, StreamResponse]:
927 return self._iter().__await__()
929 def _raise_allowed_methods(self) -> NoReturn:
930 allowed_methods = {m for m in hdrs.METH_ALL if hasattr(self, m.lower())}
931 raise HTTPMethodNotAllowed(self.request.method, allowed_methods)
934class ResourcesView(Sized, Iterable[AbstractResource], Container[AbstractResource]):
935 def __init__(self, resources: list[AbstractResource]) -> None:
936 self._resources = resources
938 def __len__(self) -> int:
939 return len(self._resources)
941 def __iter__(self) -> Iterator[AbstractResource]:
942 yield from self._resources
944 def __contains__(self, resource: object) -> bool:
945 return resource in self._resources
948class RoutesView(Sized, Iterable[AbstractRoute], Container[AbstractRoute]):
949 def __init__(self, resources: list[AbstractResource]):
950 self._routes: list[AbstractRoute] = []
951 for resource in resources:
952 for route in resource:
953 self._routes.append(route)
955 def __len__(self) -> int:
956 return len(self._routes)
958 def __iter__(self) -> Iterator[AbstractRoute]:
959 yield from self._routes
961 def __contains__(self, route: object) -> bool:
962 return route in self._routes
965class UrlDispatcher(AbstractRouter, Mapping[str, AbstractResource]):
966 NAME_SPLIT_RE = re.compile(r"[.:-]")
967 HTTP_NOT_FOUND = HTTPNotFound()
969 def __init__(self) -> None:
970 super().__init__()
971 self._resources: list[AbstractResource] = []
972 self._named_resources: dict[str, AbstractResource] = {}
973 self._resource_index: dict[str, list[AbstractResource]] = {}
974 self._matched_sub_app_resources: list[MatchedSubAppResource] = []
976 async def resolve(self, request: Request) -> UrlMappingMatchInfo:
977 resource_index = self._resource_index
978 allowed_methods: set[str] = set()
980 # MatchedSubAppResource is primarily used to match on domain names
981 # (though custom rules could match on other things). This means that
982 # the traversal algorithm below can't be applied, and that we likely
983 # need to check these first so a sub app that defines the same path
984 # as a parent app will get priority if there's a domain match.
985 #
986 # For most cases we do not expect there to be many of these since
987 # currently they are only added by `.add_domain()`.
988 for resource in self._matched_sub_app_resources:
989 match_dict, allowed = await resource.resolve(request)
990 if match_dict is not None:
991 return match_dict
992 else:
993 allowed_methods |= allowed
995 # Walk the url parts looking for candidates. We walk the url backwards
996 # to ensure the most explicit match is found first. If there are multiple
997 # candidates for a given url part because there are multiple resources
998 # registered for the same canonical path, we resolve them in a linear
999 # fashion to ensure registration order is respected.
1000 url_part = request.rel_url.path_safe
1001 while url_part:
1002 for candidate in resource_index.get(url_part, ()):
1003 match_dict, allowed = await candidate.resolve(request)
1004 if match_dict is not None:
1005 return match_dict
1006 else:
1007 allowed_methods |= allowed
1008 if url_part == "/":
1009 break
1010 url_part = url_part.rpartition("/")[0] or "/"
1012 if allowed_methods:
1013 return MatchInfoError(HTTPMethodNotAllowed(request.method, allowed_methods))
1015 return MatchInfoError(self.HTTP_NOT_FOUND)
1017 def __iter__(self) -> Iterator[str]:
1018 return iter(self._named_resources)
1020 def __len__(self) -> int:
1021 return len(self._named_resources)
1023 def __contains__(self, resource: object) -> bool:
1024 return resource in self._named_resources
1026 def __getitem__(self, name: str) -> AbstractResource:
1027 return self._named_resources[name]
1029 def resources(self) -> ResourcesView:
1030 return ResourcesView(self._resources)
1032 def routes(self) -> RoutesView:
1033 return RoutesView(self._resources)
1035 def named_resources(self) -> Mapping[str, AbstractResource]:
1036 return MappingProxyType(self._named_resources)
1038 def register_resource(self, resource: AbstractResource) -> None:
1039 assert isinstance(
1040 resource, AbstractResource
1041 ), f"Instance of AbstractResource class is required, got {resource!r}"
1042 if self.frozen:
1043 raise RuntimeError("Cannot register a resource into frozen router.")
1045 name = resource.name
1047 if name is not None:
1048 parts = self.NAME_SPLIT_RE.split(name)
1049 for part in parts:
1050 if keyword.iskeyword(part):
1051 raise ValueError(
1052 f"Incorrect route name {name!r}, "
1053 "python keywords cannot be used "
1054 "for route name"
1055 )
1056 if not part.isidentifier():
1057 raise ValueError(
1058 f"Incorrect route name {name!r}, "
1059 "the name should be a sequence of "
1060 "python identifiers separated "
1061 "by dash, dot or column"
1062 )
1063 if name in self._named_resources:
1064 raise ValueError(
1065 f"Duplicate {name!r}, "
1066 f"already handled by {self._named_resources[name]!r}"
1067 )
1068 self._named_resources[name] = resource
1069 self._resources.append(resource)
1071 if isinstance(resource, MatchedSubAppResource):
1072 # We cannot index match sub-app resources because they have match rules
1073 self._matched_sub_app_resources.append(resource)
1074 else:
1075 self.index_resource(resource)
1077 def _get_resource_index_key(self, resource: AbstractResource) -> str:
1078 """Return a key to index the resource in the resource index."""
1079 if "{" in (index_key := resource.canonical):
1080 # strip at the first { to allow for variables, and than
1081 # rpartition at / to allow for variable parts in the path
1082 # For example if the canonical path is `/core/locations{tail:.*}`
1083 # the index key will be `/core` since index is based on the
1084 # url parts split by `/`
1085 index_key = index_key.partition("{")[0].rpartition("/")[0]
1086 return index_key.rstrip("/") or "/"
1088 def index_resource(self, resource: AbstractResource) -> None:
1089 """Add a resource to the resource index."""
1090 resource_key = self._get_resource_index_key(resource)
1091 # There may be multiple resources for a canonical path
1092 # so we keep them in a list to ensure that registration
1093 # order is respected.
1094 self._resource_index.setdefault(resource_key, []).append(resource)
1096 def unindex_resource(self, resource: AbstractResource) -> None:
1097 """Remove a resource from the resource index."""
1098 resource_key = self._get_resource_index_key(resource)
1099 self._resource_index[resource_key].remove(resource)
1101 def add_resource(self, path: str, *, name: str | None = None) -> Resource:
1102 if path and not path.startswith("/"):
1103 raise ValueError("path should be started with / or be empty")
1104 # Reuse last added resource if path and name are the same
1105 if self._resources:
1106 resource = self._resources[-1]
1107 if resource.name == name and resource.raw_match(path):
1108 return cast(Resource, resource)
1109 if not ("{" in path or "}" in path or ROUTE_RE.search(path)):
1110 resource = PlainResource(path, name=name)
1111 self.register_resource(resource)
1112 return resource
1113 resource = DynamicResource(path, name=name)
1114 self.register_resource(resource)
1115 return resource
1117 def add_route(
1118 self,
1119 method: str,
1120 path: str,
1121 handler: Handler | type[AbstractView],
1122 *,
1123 name: str | None = None,
1124 expect_handler: _ExpectHandler | None = None,
1125 ) -> AbstractRoute:
1126 resource = self.add_resource(path, name=name)
1127 return resource.add_route(method, handler, expect_handler=expect_handler)
1129 def add_static(
1130 self,
1131 prefix: str,
1132 path: PathLike,
1133 *,
1134 name: str | None = None,
1135 expect_handler: _ExpectHandler | None = None,
1136 chunk_size: int = 256 * 1024,
1137 show_index: bool = False,
1138 follow_symlinks: bool = False,
1139 append_version: bool = False,
1140 ) -> StaticResource:
1141 """Add static files view.
1143 prefix - url prefix
1144 path - folder with files
1146 """
1147 assert prefix.startswith("/")
1148 if prefix.endswith("/"):
1149 prefix = prefix[:-1]
1150 resource = StaticResource(
1151 prefix,
1152 path,
1153 name=name,
1154 expect_handler=expect_handler,
1155 chunk_size=chunk_size,
1156 show_index=show_index,
1157 follow_symlinks=follow_symlinks,
1158 append_version=append_version,
1159 )
1160 self.register_resource(resource)
1161 return resource
1163 def add_head(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
1164 """Shortcut for add_route with method HEAD."""
1165 return self.add_route(hdrs.METH_HEAD, path, handler, **kwargs)
1167 def add_options(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
1168 """Shortcut for add_route with method OPTIONS."""
1169 return self.add_route(hdrs.METH_OPTIONS, path, handler, **kwargs)
1171 def add_get(
1172 self,
1173 path: str,
1174 handler: Handler,
1175 *,
1176 name: str | None = None,
1177 allow_head: bool = True,
1178 **kwargs: Any,
1179 ) -> AbstractRoute:
1180 """Shortcut for add_route with method GET.
1182 If allow_head is true, another
1183 route is added allowing head requests to the same endpoint.
1184 """
1185 resource = self.add_resource(path, name=name)
1186 if allow_head:
1187 resource.add_route(hdrs.METH_HEAD, handler, **kwargs)
1188 return resource.add_route(hdrs.METH_GET, handler, **kwargs)
1190 def add_post(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
1191 """Shortcut for add_route with method POST."""
1192 return self.add_route(hdrs.METH_POST, path, handler, **kwargs)
1194 def add_put(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
1195 """Shortcut for add_route with method PUT."""
1196 return self.add_route(hdrs.METH_PUT, path, handler, **kwargs)
1198 def add_patch(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
1199 """Shortcut for add_route with method PATCH."""
1200 return self.add_route(hdrs.METH_PATCH, path, handler, **kwargs)
1202 def add_delete(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
1203 """Shortcut for add_route with method DELETE."""
1204 return self.add_route(hdrs.METH_DELETE, path, handler, **kwargs)
1206 def add_view(
1207 self, path: str, handler: type[AbstractView], **kwargs: Any
1208 ) -> AbstractRoute:
1209 """Shortcut for add_route with ANY methods for a class-based view."""
1210 return self.add_route(hdrs.METH_ANY, path, handler, **kwargs)
1212 def freeze(self) -> None:
1213 super().freeze()
1214 for resource in self._resources:
1215 resource.freeze()
1217 def add_routes(self, routes: Iterable[AbstractRouteDef]) -> list[AbstractRoute]:
1218 """Append routes to route table.
1220 Parameter should be a sequence of RouteDef objects.
1222 Returns a list of registered AbstractRoute instances.
1223 """
1224 registered_routes = []
1225 for route_def in routes:
1226 registered_routes.extend(route_def.register(self))
1227 return registered_routes
1230def _quote_path(value: str) -> str:
1231 return URL.build(path=value, encoded=False).raw_path
1234def _unquote_path_safe(value: str) -> str:
1235 if "%" not in value:
1236 return value
1237 return value.replace("%2F", "/").replace("%25", "%")
1240def _requote_path(value: str) -> str:
1241 # Quote non-ascii characters and other characters which must be quoted,
1242 # but preserve existing %-sequences.
1243 result = _quote_path(value)
1244 if "%" in value:
1245 result = result.replace("%25", "%")
1246 return result