Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/web_urldispatcher.py: 38%
691 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:52 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:52 +0000
1import abc
2import base64
3import hashlib
4import keyword
5import os
6import re
7from contextlib import contextmanager
8from pathlib import Path
9from types import MappingProxyType
10from typing import (
11 TYPE_CHECKING,
12 Any,
13 Awaitable,
14 Callable,
15 Container,
16 Dict,
17 Generator,
18 Iterable,
19 Iterator,
20 List,
21 Mapping,
22 NoReturn,
23 Optional,
24 Pattern,
25 Set,
26 Sized,
27 Tuple,
28 Type,
29 Union,
30 cast,
31)
33from typing_extensions import Final, TypedDict
34from yarl import URL, __version__ as yarl_version # type: ignore[attr-defined]
36from . import hdrs
37from .abc import AbstractMatchInfo, AbstractRouter, AbstractView
38from .helpers import DEBUG, iscoroutinefunction
39from .http import HttpVersion11
40from .typedefs import Handler, PathLike
41from .web_exceptions import (
42 HTTPException,
43 HTTPExpectationFailed,
44 HTTPForbidden,
45 HTTPMethodNotAllowed,
46 HTTPNotFound,
47)
48from .web_fileresponse import FileResponse
49from .web_request import Request
50from .web_response import Response, StreamResponse
51from .web_routedef import AbstractRouteDef
53__all__ = (
54 "UrlDispatcher",
55 "UrlMappingMatchInfo",
56 "AbstractResource",
57 "Resource",
58 "PlainResource",
59 "DynamicResource",
60 "AbstractRoute",
61 "ResourceRoute",
62 "StaticResource",
63 "View",
64)
67if TYPE_CHECKING: # pragma: no cover
68 from .web_app import Application
70 BaseDict = Dict[str, str]
71else:
72 BaseDict = dict
74YARL_VERSION: Final[Tuple[int, ...]] = tuple(map(int, yarl_version.split(".")[:2]))
76HTTP_METHOD_RE: Final[Pattern[str]] = re.compile(
77 r"^[0-9A-Za-z!#\$%&'\*\+\-\.\^_`\|~]+$"
78)
79ROUTE_RE: Final[Pattern[str]] = re.compile(
80 r"(\{[_a-zA-Z][^{}]*(?:\{[^{}]*\}[^{}]*)*\})"
81)
82PATH_SEP: Final[str] = re.escape("/")
85_ExpectHandler = Callable[[Request], Awaitable[Optional[StreamResponse]]]
86_Resolve = Tuple[Optional["UrlMappingMatchInfo"], Set[str]]
89class _InfoDict(TypedDict, total=False):
90 path: str
92 formatter: str
93 pattern: Pattern[str]
95 directory: Path
96 prefix: str
97 routes: Mapping[str, "AbstractRoute"]
99 app: "Application"
101 domain: str
103 rule: "AbstractRuleMatching"
105 http_exception: HTTPException
108class AbstractResource(Sized, Iterable["AbstractRoute"]):
109 def __init__(self, *, name: Optional[str] = None) -> None:
110 self._name = name
112 @property
113 def name(self) -> Optional[str]:
114 return self._name
116 @property
117 @abc.abstractmethod
118 def canonical(self) -> str:
119 """Exposes the resource's canonical path.
121 For example '/foo/bar/{name}'
123 """
125 @abc.abstractmethod # pragma: no branch
126 def url_for(self, **kwargs: str) -> URL:
127 """Construct url for resource with additional params."""
129 @abc.abstractmethod # pragma: no branch
130 async def resolve(self, request: Request) -> _Resolve:
131 """Resolve resource.
133 Return (UrlMappingMatchInfo, allowed_methods) pair.
134 """
136 @abc.abstractmethod
137 def add_prefix(self, prefix: str) -> None:
138 """Add a prefix to processed URLs.
140 Required for subapplications support.
141 """
143 @abc.abstractmethod
144 def get_info(self) -> _InfoDict:
145 """Return a dict with additional info useful for introspection"""
147 def freeze(self) -> None:
148 pass
150 @abc.abstractmethod
151 def raw_match(self, path: str) -> bool:
152 """Perform a raw match against path"""
155class AbstractRoute(abc.ABC):
156 def __init__(
157 self,
158 method: str,
159 handler: Union[Handler, Type[AbstractView]],
160 *,
161 expect_handler: Optional[_ExpectHandler] = None,
162 resource: Optional[AbstractResource] = None,
163 ) -> None:
164 if expect_handler is None:
165 expect_handler = _default_expect_handler
167 assert iscoroutinefunction(
168 expect_handler
169 ), f"Coroutine is expected, got {expect_handler!r}"
171 method = method.upper()
172 if not HTTP_METHOD_RE.match(method):
173 raise ValueError(f"{method} is not allowed HTTP method")
175 if iscoroutinefunction(handler):
176 pass
177 elif isinstance(handler, type) and issubclass(handler, AbstractView):
178 pass
179 else:
180 raise TypeError(
181 "Only async functions are allowed as web-handlers "
182 ", got {!r}".format(handler)
183 )
185 self._method = method
186 self._handler = handler
187 self._expect_handler = expect_handler
188 self._resource = resource
190 @property
191 def method(self) -> str:
192 return self._method
194 @property
195 def handler(self) -> Handler:
196 return self._handler
198 @property
199 @abc.abstractmethod
200 def name(self) -> Optional[str]:
201 """Optional route's name, always equals to resource's name."""
203 @property
204 def resource(self) -> Optional[AbstractResource]:
205 return self._resource
207 @abc.abstractmethod
208 def get_info(self) -> _InfoDict:
209 """Return a dict with additional info useful for introspection"""
211 @abc.abstractmethod # pragma: no branch
212 def url_for(self, *args: str, **kwargs: str) -> URL:
213 """Construct url for route with additional params."""
215 async def handle_expect_header(self, request: Request) -> Optional[StreamResponse]:
216 return await self._expect_handler(request)
219class UrlMappingMatchInfo(BaseDict, AbstractMatchInfo):
220 def __init__(self, match_dict: Dict[str, str], route: AbstractRoute):
221 super().__init__(match_dict)
222 self._route = route
223 self._apps: List[Application] = []
224 self._current_app: Optional[Application] = None
225 self._frozen = False
227 @property
228 def handler(self) -> Handler:
229 return self._route.handler
231 @property
232 def route(self) -> AbstractRoute:
233 return self._route
235 @property
236 def expect_handler(self) -> _ExpectHandler:
237 return self._route.handle_expect_header
239 @property
240 def http_exception(self) -> Optional[HTTPException]:
241 return None
243 def get_info(self) -> _InfoDict: # type: ignore[override]
244 return self._route.get_info()
246 @property
247 def apps(self) -> Tuple["Application", ...]:
248 return tuple(self._apps)
250 def add_app(self, app: "Application") -> None:
251 if self._frozen:
252 raise RuntimeError("Cannot change apps stack after .freeze() call")
253 if self._current_app is None:
254 self._current_app = app
255 self._apps.insert(0, app)
257 @property
258 def current_app(self) -> "Application":
259 app = self._current_app
260 assert app is not None
261 return app
263 @contextmanager
264 def set_current_app(self, app: "Application") -> Generator[None, None, None]:
265 if DEBUG: # pragma: no cover
266 if app not in self._apps:
267 raise RuntimeError(
268 "Expected one of the following apps {!r}, got {!r}".format(
269 self._apps, app
270 )
271 )
272 prev = self._current_app
273 self._current_app = app
274 try:
275 yield
276 finally:
277 self._current_app = prev
279 def freeze(self) -> None:
280 self._frozen = True
282 def __repr__(self) -> str:
283 return f"<MatchInfo {super().__repr__()}: {self._route}>"
286class MatchInfoError(UrlMappingMatchInfo):
287 def __init__(self, http_exception: HTTPException) -> None:
288 self._exception = http_exception
289 super().__init__({}, SystemRoute(self._exception))
291 @property
292 def http_exception(self) -> HTTPException:
293 return self._exception
295 def __repr__(self) -> str:
296 return "<MatchInfoError {}: {}>".format(
297 self._exception.status, self._exception.reason
298 )
301async def _default_expect_handler(request: Request) -> None:
302 """Default handler for Expect header.
304 Just send "100 Continue" to client.
305 raise HTTPExpectationFailed if value of header is not "100-continue"
306 """
307 expect = request.headers.get(hdrs.EXPECT, "")
308 if request.version == HttpVersion11:
309 if expect.lower() == "100-continue":
310 await request.writer.write(b"HTTP/1.1 100 Continue\r\n\r\n")
311 else:
312 raise HTTPExpectationFailed(text="Unknown Expect: %s" % expect)
315class Resource(AbstractResource):
316 def __init__(self, *, name: Optional[str] = None) -> None:
317 super().__init__(name=name)
318 self._routes: List[ResourceRoute] = []
320 def add_route(
321 self,
322 method: str,
323 handler: Union[Type[AbstractView], Handler],
324 *,
325 expect_handler: Optional[_ExpectHandler] = None,
326 ) -> "ResourceRoute":
327 for route_obj in self._routes:
328 if route_obj.method == method or route_obj.method == hdrs.METH_ANY:
329 raise RuntimeError(
330 "Added route will never be executed, "
331 "method {route.method} is already "
332 "registered".format(route=route_obj)
333 )
335 route_obj = ResourceRoute(method, handler, self, expect_handler=expect_handler)
336 self.register_route(route_obj)
337 return route_obj
339 def register_route(self, route: "ResourceRoute") -> None:
340 assert isinstance(
341 route, ResourceRoute
342 ), f"Instance of Route class is required, got {route!r}"
343 self._routes.append(route)
345 async def resolve(self, request: Request) -> _Resolve:
346 allowed_methods: Set[str] = set()
348 match_dict = self._match(request.rel_url.raw_path)
349 if match_dict is None:
350 return None, allowed_methods
352 for route_obj in self._routes:
353 route_method = route_obj.method
354 allowed_methods.add(route_method)
356 if route_method == request.method or route_method == hdrs.METH_ANY:
357 return (UrlMappingMatchInfo(match_dict, route_obj), allowed_methods)
358 else:
359 return None, allowed_methods
361 @abc.abstractmethod
362 def _match(self, path: str) -> Optional[Dict[str, str]]:
363 pass # pragma: no cover
365 def __len__(self) -> int:
366 return len(self._routes)
368 def __iter__(self) -> Iterator["ResourceRoute"]:
369 return iter(self._routes)
371 # TODO: implement all abstract methods
374class PlainResource(Resource):
375 def __init__(self, path: str, *, name: Optional[str] = None) -> None:
376 super().__init__(name=name)
377 assert not path or path.startswith("/")
378 self._path = path
380 @property
381 def canonical(self) -> str:
382 return self._path
384 def freeze(self) -> None:
385 if not self._path:
386 self._path = "/"
388 def add_prefix(self, prefix: str) -> None:
389 assert prefix.startswith("/")
390 assert not prefix.endswith("/")
391 assert len(prefix) > 1
392 self._path = prefix + self._path
394 def _match(self, path: str) -> Optional[Dict[str, str]]:
395 # string comparison is about 10 times faster than regexp matching
396 if self._path == path:
397 return {}
398 else:
399 return None
401 def raw_match(self, path: str) -> bool:
402 return self._path == path
404 def get_info(self) -> _InfoDict:
405 return {"path": self._path}
407 def url_for(self) -> URL: # type: ignore[override]
408 return URL.build(path=self._path, encoded=True)
410 def __repr__(self) -> str:
411 name = "'" + self.name + "' " if self.name is not None else ""
412 return f"<PlainResource {name} {self._path}>"
415class DynamicResource(Resource):
416 DYN = re.compile(r"\{(?P<var>[_a-zA-Z][_a-zA-Z0-9]*)\}")
417 DYN_WITH_RE = re.compile(r"\{(?P<var>[_a-zA-Z][_a-zA-Z0-9]*):(?P<re>.+)\}")
418 GOOD = r"[^{}/]+"
420 def __init__(self, path: str, *, name: Optional[str] = None) -> None:
421 super().__init__(name=name)
422 pattern = ""
423 formatter = ""
424 for part in ROUTE_RE.split(path):
425 match = self.DYN.fullmatch(part)
426 if match:
427 pattern += "(?P<{}>{})".format(match.group("var"), self.GOOD)
428 formatter += "{" + match.group("var") + "}"
429 continue
431 match = self.DYN_WITH_RE.fullmatch(part)
432 if match:
433 pattern += "(?P<{var}>{re})".format(**match.groupdict())
434 formatter += "{" + match.group("var") + "}"
435 continue
437 if "{" in part or "}" in part:
438 raise ValueError(f"Invalid path '{path}'['{part}']")
440 part = _requote_path(part)
441 formatter += part
442 pattern += re.escape(part)
444 try:
445 compiled = re.compile(pattern)
446 except re.error as exc:
447 raise ValueError(f"Bad pattern '{pattern}': {exc}") from None
448 assert compiled.pattern.startswith(PATH_SEP)
449 assert formatter.startswith("/")
450 self._pattern = compiled
451 self._formatter = formatter
453 @property
454 def canonical(self) -> str:
455 return self._formatter
457 def add_prefix(self, prefix: str) -> None:
458 assert prefix.startswith("/")
459 assert not prefix.endswith("/")
460 assert len(prefix) > 1
461 self._pattern = re.compile(re.escape(prefix) + self._pattern.pattern)
462 self._formatter = prefix + self._formatter
464 def _match(self, path: str) -> Optional[Dict[str, str]]:
465 match = self._pattern.fullmatch(path)
466 if match is None:
467 return None
468 else:
469 return {
470 key: _unquote_path(value) for key, value in match.groupdict().items()
471 }
473 def raw_match(self, path: str) -> bool:
474 return self._formatter == path
476 def get_info(self) -> _InfoDict:
477 return {"formatter": self._formatter, "pattern": self._pattern}
479 def url_for(self, **parts: str) -> URL:
480 url = self._formatter.format_map({k: _quote_path(v) for k, v in parts.items()})
481 return URL.build(path=url, encoded=True)
483 def __repr__(self) -> str:
484 name = "'" + self.name + "' " if self.name is not None else ""
485 return "<DynamicResource {name} {formatter}>".format(
486 name=name, formatter=self._formatter
487 )
490class PrefixResource(AbstractResource):
491 def __init__(self, prefix: str, *, name: Optional[str] = None) -> None:
492 assert not prefix or prefix.startswith("/"), prefix
493 assert prefix in ("", "/") or not prefix.endswith("/"), prefix
494 super().__init__(name=name)
495 self._prefix = _requote_path(prefix)
496 self._prefix2 = self._prefix + "/"
498 @property
499 def canonical(self) -> str:
500 return self._prefix
502 def add_prefix(self, prefix: str) -> None:
503 assert prefix.startswith("/")
504 assert not prefix.endswith("/")
505 assert len(prefix) > 1
506 self._prefix = prefix + self._prefix
507 self._prefix2 = self._prefix + "/"
509 def raw_match(self, prefix: str) -> bool:
510 return False
512 # TODO: impl missing abstract methods
515class StaticResource(PrefixResource):
516 VERSION_KEY = "v"
518 def __init__(
519 self,
520 prefix: str,
521 directory: PathLike,
522 *,
523 name: Optional[str] = None,
524 expect_handler: Optional[_ExpectHandler] = None,
525 chunk_size: int = 256 * 1024,
526 show_index: bool = False,
527 follow_symlinks: bool = False,
528 append_version: bool = False,
529 ) -> None:
530 super().__init__(prefix, name=name)
531 try:
532 directory = Path(directory)
533 if str(directory).startswith("~"):
534 directory = Path(os.path.expanduser(str(directory)))
535 directory = directory.resolve()
536 if not directory.is_dir():
537 raise ValueError("Not a directory")
538 except (FileNotFoundError, ValueError) as error:
539 raise ValueError(f"No directory exists at '{directory}'") from error
540 self._directory = directory
541 self._show_index = show_index
542 self._chunk_size = chunk_size
543 self._follow_symlinks = follow_symlinks
544 self._expect_handler = expect_handler
545 self._append_version = append_version
547 self._routes = {
548 "GET": ResourceRoute(
549 "GET", self._handle, self, expect_handler=expect_handler
550 ),
551 "HEAD": ResourceRoute(
552 "HEAD", self._handle, self, expect_handler=expect_handler
553 ),
554 }
556 def url_for( # type: ignore[override]
557 self,
558 *,
559 filename: PathLike,
560 append_version: Optional[bool] = None,
561 ) -> URL:
562 if append_version is None:
563 append_version = self._append_version
564 filename = str(filename).lstrip("/")
566 url = URL.build(path=self._prefix, encoded=True)
567 # filename is not encoded
568 if YARL_VERSION < (1, 6):
569 url = url / filename.replace("%", "%25")
570 else:
571 url = url / filename
573 if append_version:
574 try:
575 filepath = self._directory.joinpath(filename).resolve()
576 if not self._follow_symlinks:
577 filepath.relative_to(self._directory)
578 except (ValueError, FileNotFoundError):
579 # ValueError for case when path point to symlink
580 # with follow_symlinks is False
581 return url # relatively safe
582 if filepath.is_file():
583 # TODO cache file content
584 # with file watcher for cache invalidation
585 with filepath.open("rb") as f:
586 file_bytes = f.read()
587 h = self._get_file_hash(file_bytes)
588 url = url.with_query({self.VERSION_KEY: h})
589 return url
590 return url
592 @staticmethod
593 def _get_file_hash(byte_array: bytes) -> str:
594 m = hashlib.sha256() # todo sha256 can be configurable param
595 m.update(byte_array)
596 b64 = base64.urlsafe_b64encode(m.digest())
597 return b64.decode("ascii")
599 def get_info(self) -> _InfoDict:
600 return {
601 "directory": self._directory,
602 "prefix": self._prefix,
603 "routes": self._routes,
604 }
606 def set_options_route(self, handler: Handler) -> None:
607 if "OPTIONS" in self._routes:
608 raise RuntimeError("OPTIONS route was set already")
609 self._routes["OPTIONS"] = ResourceRoute(
610 "OPTIONS", handler, self, expect_handler=self._expect_handler
611 )
613 async def resolve(self, request: Request) -> _Resolve:
614 path = request.rel_url.raw_path
615 method = request.method
616 allowed_methods = set(self._routes)
617 if not path.startswith(self._prefix2) and path != self._prefix:
618 return None, set()
620 if method not in allowed_methods:
621 return None, allowed_methods
623 match_dict = {"filename": _unquote_path(path[len(self._prefix) + 1 :])}
624 return (UrlMappingMatchInfo(match_dict, self._routes[method]), allowed_methods)
626 def __len__(self) -> int:
627 return len(self._routes)
629 def __iter__(self) -> Iterator[AbstractRoute]:
630 return iter(self._routes.values())
632 async def _handle(self, request: Request) -> StreamResponse:
633 rel_url = request.match_info["filename"]
634 try:
635 filename = Path(rel_url)
636 if filename.anchor:
637 # rel_url is an absolute name like
638 # /static/\\machine_name\c$ or /static/D:\path
639 # where the static dir is totally different
640 raise HTTPForbidden()
641 filepath = self._directory.joinpath(filename).resolve()
642 if not self._follow_symlinks:
643 filepath.relative_to(self._directory)
644 except (ValueError, FileNotFoundError) as error:
645 # relatively safe
646 raise HTTPNotFound() from error
647 except HTTPForbidden:
648 raise
649 except Exception as error:
650 # perm error or other kind!
651 request.app.logger.exception(error)
652 raise HTTPNotFound() from error
654 # on opening a dir, load its contents if allowed
655 if filepath.is_dir():
656 if self._show_index:
657 try:
658 return Response(
659 text=self._directory_as_html(filepath), content_type="text/html"
660 )
661 except PermissionError:
662 raise HTTPForbidden()
663 else:
664 raise HTTPForbidden()
665 elif filepath.is_file():
666 return FileResponse(filepath, chunk_size=self._chunk_size)
667 else:
668 raise HTTPNotFound
670 def _directory_as_html(self, filepath: Path) -> str:
671 # returns directory's index as html
673 # sanity check
674 assert filepath.is_dir()
676 relative_path_to_dir = filepath.relative_to(self._directory).as_posix()
677 index_of = f"Index of /{relative_path_to_dir}"
678 h1 = f"<h1>{index_of}</h1>"
680 index_list = []
681 dir_index = filepath.iterdir()
682 for _file in sorted(dir_index):
683 # show file url as relative to static path
684 rel_path = _file.relative_to(self._directory).as_posix()
685 file_url = self._prefix + "/" + rel_path
687 # if file is a directory, add '/' to the end of the name
688 if _file.is_dir():
689 file_name = f"{_file.name}/"
690 else:
691 file_name = _file.name
693 index_list.append(
694 '<li><a href="{url}">{name}</a></li>'.format(
695 url=file_url, name=file_name
696 )
697 )
698 ul = "<ul>\n{}\n</ul>".format("\n".join(index_list))
699 body = f"<body>\n{h1}\n{ul}\n</body>"
701 head_str = f"<head>\n<title>{index_of}</title>\n</head>"
702 html = f"<html>\n{head_str}\n{body}\n</html>"
704 return html
706 def __repr__(self) -> str:
707 name = "'" + self.name + "'" if self.name is not None else ""
708 return "<StaticResource {name} {path} -> {directory!r}>".format(
709 name=name, path=self._prefix, directory=self._directory
710 )
713class PrefixedSubAppResource(PrefixResource):
714 def __init__(self, prefix: str, app: "Application") -> None:
715 super().__init__(prefix)
716 self._app = app
717 for resource in app.router.resources():
718 resource.add_prefix(prefix)
720 def add_prefix(self, prefix: str) -> None:
721 super().add_prefix(prefix)
722 for resource in self._app.router.resources():
723 resource.add_prefix(prefix)
725 def url_for(self, *args: str, **kwargs: str) -> URL:
726 raise RuntimeError(".url_for() is not supported " "by sub-application root")
728 def get_info(self) -> _InfoDict:
729 return {"app": self._app, "prefix": self._prefix}
731 async def resolve(self, request: Request) -> _Resolve:
732 if (
733 not request.url.raw_path.startswith(self._prefix2)
734 and request.url.raw_path != self._prefix
735 ):
736 return None, set()
737 match_info = await self._app.router.resolve(request)
738 match_info.add_app(self._app)
739 if isinstance(match_info.http_exception, HTTPMethodNotAllowed):
740 methods = match_info.http_exception.allowed_methods
741 else:
742 methods = set()
743 return match_info, methods
745 def __len__(self) -> int:
746 return len(self._app.router.routes())
748 def __iter__(self) -> Iterator[AbstractRoute]:
749 return iter(self._app.router.routes())
751 def __repr__(self) -> str:
752 return "<PrefixedSubAppResource {prefix} -> {app!r}>".format(
753 prefix=self._prefix, app=self._app
754 )
757class AbstractRuleMatching(abc.ABC):
758 @abc.abstractmethod # pragma: no branch
759 async def match(self, request: Request) -> bool:
760 """Return bool if the request satisfies the criteria"""
762 @abc.abstractmethod # pragma: no branch
763 def get_info(self) -> _InfoDict:
764 """Return a dict with additional info useful for introspection"""
766 @property
767 @abc.abstractmethod # pragma: no branch
768 def canonical(self) -> str:
769 """Return a str"""
772class Domain(AbstractRuleMatching):
773 re_part = re.compile(r"(?!-)[a-z\d-]{1,63}(?<!-)")
775 def __init__(self, domain: str) -> None:
776 super().__init__()
777 self._domain = self.validation(domain)
779 @property
780 def canonical(self) -> str:
781 return self._domain
783 def validation(self, domain: str) -> str:
784 if not isinstance(domain, str):
785 raise TypeError("Domain must be str")
786 domain = domain.rstrip(".").lower()
787 if not domain:
788 raise ValueError("Domain cannot be empty")
789 elif "://" in domain:
790 raise ValueError("Scheme not supported")
791 url = URL("http://" + domain)
792 assert url.raw_host is not None
793 if not all(self.re_part.fullmatch(x) for x in url.raw_host.split(".")):
794 raise ValueError("Domain not valid")
795 if url.port == 80:
796 return url.raw_host
797 return f"{url.raw_host}:{url.port}"
799 async def match(self, request: Request) -> bool:
800 host = request.headers.get(hdrs.HOST)
801 if not host:
802 return False
803 return self.match_domain(host)
805 def match_domain(self, host: str) -> bool:
806 return host.lower() == self._domain
808 def get_info(self) -> _InfoDict:
809 return {"domain": self._domain}
812class MaskDomain(Domain):
813 re_part = re.compile(r"(?!-)[a-z\d\*-]{1,63}(?<!-)")
815 def __init__(self, domain: str) -> None:
816 super().__init__(domain)
817 mask = self._domain.replace(".", r"\.").replace("*", ".*")
818 self._mask = re.compile(mask)
820 @property
821 def canonical(self) -> str:
822 return self._mask.pattern
824 def match_domain(self, host: str) -> bool:
825 return self._mask.fullmatch(host) is not None
828class MatchedSubAppResource(PrefixedSubAppResource):
829 def __init__(self, rule: AbstractRuleMatching, app: "Application") -> None:
830 AbstractResource.__init__(self)
831 self._prefix = ""
832 self._app = app
833 self._rule = rule
835 @property
836 def canonical(self) -> str:
837 return self._rule.canonical
839 def get_info(self) -> _InfoDict:
840 return {"app": self._app, "rule": self._rule}
842 async def resolve(self, request: Request) -> _Resolve:
843 if not await self._rule.match(request):
844 return None, set()
845 match_info = await self._app.router.resolve(request)
846 match_info.add_app(self._app)
847 if isinstance(match_info.http_exception, HTTPMethodNotAllowed):
848 methods = match_info.http_exception.allowed_methods
849 else:
850 methods = set()
851 return match_info, methods
853 def __repr__(self) -> str:
854 return "<MatchedSubAppResource -> {app!r}>" "".format(app=self._app)
857class ResourceRoute(AbstractRoute):
858 """A route with resource"""
860 def __init__(
861 self,
862 method: str,
863 handler: Union[Handler, Type[AbstractView]],
864 resource: AbstractResource,
865 *,
866 expect_handler: Optional[_ExpectHandler] = None,
867 ) -> None:
868 super().__init__(
869 method, handler, expect_handler=expect_handler, resource=resource
870 )
872 def __repr__(self) -> str:
873 return "<ResourceRoute [{method}] {resource} -> {handler!r}".format(
874 method=self.method, resource=self._resource, handler=self.handler
875 )
877 @property
878 def name(self) -> Optional[str]:
879 if self._resource is None:
880 return None
881 return self._resource.name
883 def url_for(self, *args: str, **kwargs: str) -> URL:
884 """Construct url for route with additional params."""
885 assert self._resource is not None
886 return self._resource.url_for(*args, **kwargs)
888 def get_info(self) -> _InfoDict:
889 assert self._resource is not None
890 return self._resource.get_info()
893class SystemRoute(AbstractRoute):
894 def __init__(self, http_exception: HTTPException) -> None:
895 super().__init__(hdrs.METH_ANY, self._handle)
896 self._http_exception = http_exception
898 def url_for(self, *args: str, **kwargs: str) -> URL:
899 raise RuntimeError(".url_for() is not allowed for SystemRoute")
901 @property
902 def name(self) -> Optional[str]:
903 return None
905 def get_info(self) -> _InfoDict:
906 return {"http_exception": self._http_exception}
908 async def _handle(self, request: Request) -> StreamResponse:
909 raise self._http_exception
911 @property
912 def status(self) -> int:
913 return self._http_exception.status
915 @property
916 def reason(self) -> str:
917 return self._http_exception.reason
919 def __repr__(self) -> str:
920 return "<SystemRoute {self.status}: {self.reason}>".format(self=self)
923class View(AbstractView):
924 async def _iter(self) -> StreamResponse:
925 if self.request.method not in hdrs.METH_ALL:
926 self._raise_allowed_methods()
927 method: Optional[Callable[[], Awaitable[StreamResponse]]] = getattr(
928 self, self.request.method.lower(), None
929 )
930 if method is None:
931 self._raise_allowed_methods()
932 return await method()
934 def __await__(self) -> Generator[Any, None, StreamResponse]:
935 return self._iter().__await__()
937 def _raise_allowed_methods(self) -> NoReturn:
938 allowed_methods = {m for m in hdrs.METH_ALL if hasattr(self, m.lower())}
939 raise HTTPMethodNotAllowed(self.request.method, allowed_methods)
942class ResourcesView(Sized, Iterable[AbstractResource], Container[AbstractResource]):
943 def __init__(self, resources: List[AbstractResource]) -> None:
944 self._resources = resources
946 def __len__(self) -> int:
947 return len(self._resources)
949 def __iter__(self) -> Iterator[AbstractResource]:
950 yield from self._resources
952 def __contains__(self, resource: object) -> bool:
953 return resource in self._resources
956class RoutesView(Sized, Iterable[AbstractRoute], Container[AbstractRoute]):
957 def __init__(self, resources: List[AbstractResource]):
958 self._routes: List[AbstractRoute] = []
959 for resource in resources:
960 for route in resource:
961 self._routes.append(route)
963 def __len__(self) -> int:
964 return len(self._routes)
966 def __iter__(self) -> Iterator[AbstractRoute]:
967 yield from self._routes
969 def __contains__(self, route: object) -> bool:
970 return route in self._routes
973class UrlDispatcher(AbstractRouter, Mapping[str, AbstractResource]):
974 NAME_SPLIT_RE = re.compile(r"[.:-]")
976 def __init__(self) -> None:
977 super().__init__()
978 self._resources: List[AbstractResource] = []
979 self._named_resources: Dict[str, AbstractResource] = {}
981 async def resolve(self, request: Request) -> UrlMappingMatchInfo:
982 method = request.method
983 allowed_methods: Set[str] = set()
985 for resource in self._resources:
986 match_dict, allowed = await resource.resolve(request)
987 if match_dict is not None:
988 return match_dict
989 else:
990 allowed_methods |= allowed
992 if allowed_methods:
993 return MatchInfoError(HTTPMethodNotAllowed(method, allowed_methods))
994 else:
995 return MatchInfoError(HTTPNotFound())
997 def __iter__(self) -> Iterator[str]:
998 return iter(self._named_resources)
1000 def __len__(self) -> int:
1001 return len(self._named_resources)
1003 def __contains__(self, resource: object) -> bool:
1004 return resource in self._named_resources
1006 def __getitem__(self, name: str) -> AbstractResource:
1007 return self._named_resources[name]
1009 def resources(self) -> ResourcesView:
1010 return ResourcesView(self._resources)
1012 def routes(self) -> RoutesView:
1013 return RoutesView(self._resources)
1015 def named_resources(self) -> Mapping[str, AbstractResource]:
1016 return MappingProxyType(self._named_resources)
1018 def register_resource(self, resource: AbstractResource) -> None:
1019 assert isinstance(
1020 resource, AbstractResource
1021 ), f"Instance of AbstractResource class is required, got {resource!r}"
1022 if self.frozen:
1023 raise RuntimeError("Cannot register a resource into frozen router.")
1025 name = resource.name
1027 if name is not None:
1028 parts = self.NAME_SPLIT_RE.split(name)
1029 for part in parts:
1030 if keyword.iskeyword(part):
1031 raise ValueError(
1032 f"Incorrect route name {name!r}, "
1033 "python keywords cannot be used "
1034 "for route name"
1035 )
1036 if not part.isidentifier():
1037 raise ValueError(
1038 "Incorrect route name {!r}, "
1039 "the name should be a sequence of "
1040 "python identifiers separated "
1041 "by dash, dot or column".format(name)
1042 )
1043 if name in self._named_resources:
1044 raise ValueError(
1045 "Duplicate {!r}, "
1046 "already handled by {!r}".format(name, self._named_resources[name])
1047 )
1048 self._named_resources[name] = resource
1049 self._resources.append(resource)
1051 def add_resource(self, path: str, *, name: Optional[str] = None) -> Resource:
1052 if path and not path.startswith("/"):
1053 raise ValueError("path should be started with / or be empty")
1054 # Reuse last added resource if path and name are the same
1055 if self._resources:
1056 resource = self._resources[-1]
1057 if resource.name == name and resource.raw_match(path):
1058 return cast(Resource, resource)
1059 if not ("{" in path or "}" in path or ROUTE_RE.search(path)):
1060 resource = PlainResource(_requote_path(path), name=name)
1061 self.register_resource(resource)
1062 return resource
1063 resource = DynamicResource(path, name=name)
1064 self.register_resource(resource)
1065 return resource
1067 def add_route(
1068 self,
1069 method: str,
1070 path: str,
1071 handler: Union[Handler, Type[AbstractView]],
1072 *,
1073 name: Optional[str] = None,
1074 expect_handler: Optional[_ExpectHandler] = None,
1075 ) -> AbstractRoute:
1076 resource = self.add_resource(path, name=name)
1077 return resource.add_route(method, handler, expect_handler=expect_handler)
1079 def add_static(
1080 self,
1081 prefix: str,
1082 path: PathLike,
1083 *,
1084 name: Optional[str] = None,
1085 expect_handler: Optional[_ExpectHandler] = None,
1086 chunk_size: int = 256 * 1024,
1087 show_index: bool = False,
1088 follow_symlinks: bool = False,
1089 append_version: bool = False,
1090 ) -> AbstractResource:
1091 """Add static files view.
1093 prefix - url prefix
1094 path - folder with files
1096 """
1097 assert prefix.startswith("/")
1098 if prefix.endswith("/"):
1099 prefix = prefix[:-1]
1100 resource = StaticResource(
1101 prefix,
1102 path,
1103 name=name,
1104 expect_handler=expect_handler,
1105 chunk_size=chunk_size,
1106 show_index=show_index,
1107 follow_symlinks=follow_symlinks,
1108 append_version=append_version,
1109 )
1110 self.register_resource(resource)
1111 return resource
1113 def add_head(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
1114 """Shortcut for add_route with method HEAD."""
1115 return self.add_route(hdrs.METH_HEAD, path, handler, **kwargs)
1117 def add_options(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
1118 """Shortcut for add_route with method OPTIONS."""
1119 return self.add_route(hdrs.METH_OPTIONS, path, handler, **kwargs)
1121 def add_get(
1122 self,
1123 path: str,
1124 handler: Handler,
1125 *,
1126 name: Optional[str] = None,
1127 allow_head: bool = True,
1128 **kwargs: Any,
1129 ) -> AbstractRoute:
1130 """Shortcut for add_route with method GET.
1132 If allow_head is true, another
1133 route is added allowing head requests to the same endpoint.
1134 """
1135 resource = self.add_resource(path, name=name)
1136 if allow_head:
1137 resource.add_route(hdrs.METH_HEAD, handler, **kwargs)
1138 return resource.add_route(hdrs.METH_GET, handler, **kwargs)
1140 def add_post(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
1141 """Shortcut for add_route with method POST."""
1142 return self.add_route(hdrs.METH_POST, path, handler, **kwargs)
1144 def add_put(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
1145 """Shortcut for add_route with method PUT."""
1146 return self.add_route(hdrs.METH_PUT, path, handler, **kwargs)
1148 def add_patch(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
1149 """Shortcut for add_route with method PATCH."""
1150 return self.add_route(hdrs.METH_PATCH, path, handler, **kwargs)
1152 def add_delete(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
1153 """Shortcut for add_route with method DELETE."""
1154 return self.add_route(hdrs.METH_DELETE, path, handler, **kwargs)
1156 def add_view(
1157 self, path: str, handler: Type[AbstractView], **kwargs: Any
1158 ) -> AbstractRoute:
1159 """Shortcut for add_route with ANY methods for a class-based view."""
1160 return self.add_route(hdrs.METH_ANY, path, handler, **kwargs)
1162 def freeze(self) -> None:
1163 super().freeze()
1164 for resource in self._resources:
1165 resource.freeze()
1167 def add_routes(self, routes: Iterable[AbstractRouteDef]) -> List[AbstractRoute]:
1168 """Append routes to route table.
1170 Parameter should be a sequence of RouteDef objects.
1172 Returns a list of registered AbstractRoute instances.
1173 """
1174 registered_routes = []
1175 for route_def in routes:
1176 registered_routes.extend(route_def.register(self))
1177 return registered_routes
1180def _quote_path(value: str) -> str:
1181 if YARL_VERSION < (1, 6):
1182 value = value.replace("%", "%25")
1183 return URL.build(path=value, encoded=False).raw_path
1186def _unquote_path(value: str) -> str:
1187 return URL.build(path=value, encoded=True).path
1190def _requote_path(value: str) -> str:
1191 # Quote non-ascii characters and other characters which must be quoted,
1192 # but preserve existing %-sequences.
1193 result = _quote_path(value)
1194 if "%" in value:
1195 result = result.replace("%25", "%")
1196 return result