Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/web_urldispatcher.py: 38%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

733 statements  

1import abc 

2import asyncio 

3import base64 

4import functools 

5import hashlib 

6import html 

7import inspect 

8import keyword 

9import os 

10import platform 

11import re 

12import sys 

13from collections.abc import ( 

14 Awaitable, 

15 Callable, 

16 Container, 

17 Generator, 

18 Iterable, 

19 Iterator, 

20 Mapping, 

21 Sized, 

22) 

23from pathlib import Path 

24from re import Pattern 

25from types import MappingProxyType 

26from typing import TYPE_CHECKING, Any, Final, NoReturn, Optional, TypedDict, cast 

27 

28from yarl import URL 

29 

30from . import hdrs 

31from .abc import AbstractMatchInfo, AbstractRouter, AbstractView 

32from .helpers import DEBUG 

33from .http import HttpVersion11 

34from .typedefs import Handler, PathLike 

35from .web_exceptions import ( 

36 HTTPException, 

37 HTTPExpectationFailed, 

38 HTTPForbidden, 

39 HTTPMethodNotAllowed, 

40 HTTPNotFound, 

41) 

42from .web_fileresponse import FileResponse 

43from .web_request import Request 

44from .web_response import Response, StreamResponse 

45from .web_routedef import AbstractRouteDef 

46 

47__all__ = ( 

48 "UrlDispatcher", 

49 "UrlMappingMatchInfo", 

50 "AbstractResource", 

51 "Resource", 

52 "PlainResource", 

53 "DynamicResource", 

54 "AbstractRoute", 

55 "ResourceRoute", 

56 "StaticResource", 

57 "View", 

58) 

59 

60 

61if TYPE_CHECKING: 

62 from .web_app import Application 

63 

64CIRCULAR_SYMLINK_ERROR = (RuntimeError,) if sys.version_info < (3, 13) else () 

65 

66HTTP_METHOD_RE: Final[Pattern[str]] = re.compile( 

67 r"^[0-9A-Za-z!#\$%&'\*\+\-\.\^_`\|~]+$" 

68) 

69ROUTE_RE: Final[Pattern[str]] = re.compile( 

70 r"(\{[_a-zA-Z][^{}]*(?:\{[^{}]*\}[^{}]*)*\})" 

71) 

72PATH_SEP: Final[str] = re.escape("/") 

73 

74IS_WINDOWS: Final[bool] = platform.system() == "Windows" 

75 

76_ExpectHandler = Callable[[Request], Awaitable[StreamResponse | None]] 

77_Resolve = tuple[Optional["UrlMappingMatchInfo"], set[str]] 

78 

79html_escape = functools.partial(html.escape, quote=True) 

80 

81 

82class _InfoDict(TypedDict, total=False): 

83 path: str 

84 

85 formatter: str 

86 pattern: Pattern[str] 

87 

88 directory: Path 

89 prefix: str 

90 routes: Mapping[str, "AbstractRoute"] 

91 

92 app: "Application" 

93 

94 domain: str 

95 

96 rule: "AbstractRuleMatching" 

97 

98 http_exception: HTTPException 

99 

100 

101class AbstractResource(Sized, Iterable["AbstractRoute"]): 

102 def __init__(self, *, name: str | None = None) -> None: 

103 self._name = name 

104 

105 @property 

106 def name(self) -> str | None: 

107 return self._name 

108 

109 @property 

110 @abc.abstractmethod 

111 def canonical(self) -> str: 

112 """Exposes the resource's canonical path. 

113 

114 For example '/foo/bar/{name}' 

115 

116 """ 

117 

118 @abc.abstractmethod # pragma: no branch 

119 def url_for(self, **kwargs: str) -> URL: 

120 """Construct url for resource with additional params.""" 

121 

122 @abc.abstractmethod # pragma: no branch 

123 async def resolve(self, request: Request) -> _Resolve: 

124 """Resolve resource. 

125 

126 Return (UrlMappingMatchInfo, allowed_methods) pair. 

127 """ 

128 

129 @abc.abstractmethod 

130 def add_prefix(self, prefix: str) -> None: 

131 """Add a prefix to processed URLs. 

132 

133 Required for subapplications support. 

134 """ 

135 

136 @abc.abstractmethod 

137 def get_info(self) -> _InfoDict: 

138 """Return a dict with additional info useful for introspection""" 

139 

140 def freeze(self) -> None: 

141 pass 

142 

143 @abc.abstractmethod 

144 def raw_match(self, path: str) -> bool: 

145 """Perform a raw match against path""" 

146 

147 

148class AbstractRoute(abc.ABC): 

149 def __init__( 

150 self, 

151 method: str, 

152 handler: Handler | type[AbstractView], 

153 *, 

154 expect_handler: _ExpectHandler | None = None, 

155 resource: AbstractResource | None = None, 

156 ) -> None: 

157 if expect_handler is None: 

158 expect_handler = _default_expect_handler 

159 

160 assert inspect.iscoroutinefunction(expect_handler) or ( 

161 sys.version_info < (3, 14) and asyncio.iscoroutinefunction(expect_handler) 

162 ), f"Coroutine is expected, got {expect_handler!r}" 

163 

164 method = method.upper() 

165 if not HTTP_METHOD_RE.match(method): 

166 raise ValueError(f"{method} is not allowed HTTP method") 

167 

168 if inspect.iscoroutinefunction(handler) or ( 

169 sys.version_info < (3, 14) and asyncio.iscoroutinefunction(handler) 

170 ): 

171 pass 

172 elif isinstance(handler, type) and issubclass(handler, AbstractView): 

173 pass 

174 else: 

175 raise TypeError( 

176 f"Only async functions are allowed as web-handlers, got {handler!r}" 

177 ) 

178 

179 self._method = method 

180 self._handler = handler 

181 self._expect_handler = expect_handler 

182 self._resource = resource 

183 

184 @property 

185 def method(self) -> str: 

186 return self._method 

187 

188 @property 

189 def handler(self) -> Handler: 

190 return self._handler 

191 

192 @property 

193 @abc.abstractmethod 

194 def name(self) -> str | None: 

195 """Optional route's name, always equals to resource's name.""" 

196 

197 @property 

198 def resource(self) -> AbstractResource | None: 

199 return self._resource 

200 

201 @abc.abstractmethod 

202 def get_info(self) -> _InfoDict: 

203 """Return a dict with additional info useful for introspection""" 

204 

205 @abc.abstractmethod # pragma: no branch 

206 def url_for(self, *args: str, **kwargs: str) -> URL: 

207 """Construct url for route with additional params.""" 

208 

209 async def handle_expect_header(self, request: Request) -> StreamResponse | None: 

210 return await self._expect_handler(request) 

211 

212 

213class UrlMappingMatchInfo(dict[str, str], AbstractMatchInfo): 

214 

215 __slots__ = ("_route", "_apps", "_current_app", "_frozen") 

216 

217 def __init__(self, match_dict: dict[str, str], route: AbstractRoute) -> None: 

218 super().__init__(match_dict) 

219 self._route = route 

220 self._apps: list[Application] = [] 

221 self._current_app: Application | None = None 

222 self._frozen = False 

223 

224 @property 

225 def handler(self) -> Handler: 

226 return self._route.handler 

227 

228 @property 

229 def route(self) -> AbstractRoute: 

230 return self._route 

231 

232 @property 

233 def expect_handler(self) -> _ExpectHandler: 

234 return self._route.handle_expect_header 

235 

236 @property 

237 def http_exception(self) -> HTTPException | None: 

238 return None 

239 

240 def get_info(self) -> _InfoDict: # type: ignore[override] 

241 return self._route.get_info() 

242 

243 @property 

244 def apps(self) -> tuple["Application", ...]: 

245 return tuple(self._apps) 

246 

247 def add_app(self, app: "Application") -> None: 

248 if self._frozen: 

249 raise RuntimeError("Cannot change apps stack after .freeze() call") 

250 if self._current_app is None: 

251 self._current_app = app 

252 self._apps.insert(0, app) 

253 

254 @property 

255 def current_app(self) -> "Application": 

256 app = self._current_app 

257 assert app is not None 

258 return app 

259 

260 @current_app.setter 

261 def current_app(self, app: "Application") -> None: 

262 if DEBUG: 

263 if app not in self._apps: 

264 raise RuntimeError( 

265 f"Expected one of the following apps {self._apps!r}, got {app!r}" 

266 ) 

267 self._current_app = app 

268 

269 def freeze(self) -> None: 

270 self._frozen = True 

271 

272 def __repr__(self) -> str: 

273 return f"<MatchInfo {super().__repr__()}: {self._route}>" 

274 

275 

276class MatchInfoError(UrlMappingMatchInfo): 

277 

278 __slots__ = ("_exception",) 

279 

280 def __init__(self, http_exception: HTTPException) -> None: 

281 self._exception = http_exception 

282 super().__init__({}, SystemRoute(self._exception)) 

283 

284 @property 

285 def http_exception(self) -> HTTPException: 

286 return self._exception 

287 

288 def __repr__(self) -> str: 

289 return f"<MatchInfoError {self._exception.status}: {self._exception.reason}>" 

290 

291 

292async def _default_expect_handler(request: Request) -> None: 

293 """Default handler for Expect header. 

294 

295 Just send "100 Continue" to client. 

296 raise HTTPExpectationFailed if value of header is not "100-continue" 

297 """ 

298 expect = request.headers.get(hdrs.EXPECT, "") 

299 if request.version == HttpVersion11: 

300 if expect.lower() == "100-continue": 

301 await request.writer.write(b"HTTP/1.1 100 Continue\r\n\r\n") 

302 # Reset output_size as we haven't started the main body yet. 

303 request.writer.output_size = 0 

304 else: 

305 raise HTTPExpectationFailed(text="Unknown Expect: %s" % expect) 

306 

307 

308class Resource(AbstractResource): 

309 def __init__(self, *, name: str | None = None) -> None: 

310 super().__init__(name=name) 

311 self._routes: dict[str, ResourceRoute] = {} 

312 self._any_route: ResourceRoute | None = None 

313 self._allowed_methods: set[str] = set() 

314 

315 def add_route( 

316 self, 

317 method: str, 

318 handler: type[AbstractView] | Handler, 

319 *, 

320 expect_handler: _ExpectHandler | None = None, 

321 ) -> "ResourceRoute": 

322 if route := self._routes.get(method, self._any_route): 

323 raise RuntimeError( 

324 "Added route will never be executed, " 

325 f"method {route.method} is already " 

326 "registered" 

327 ) 

328 

329 route_obj = ResourceRoute(method, handler, self, expect_handler=expect_handler) 

330 self.register_route(route_obj) 

331 return route_obj 

332 

333 def register_route(self, route: "ResourceRoute") -> None: 

334 assert isinstance( 

335 route, ResourceRoute 

336 ), f"Instance of Route class is required, got {route!r}" 

337 if route.method == hdrs.METH_ANY: 

338 self._any_route = route 

339 self._allowed_methods.add(route.method) 

340 self._routes[route.method] = route 

341 

342 async def resolve(self, request: Request) -> _Resolve: 

343 if (match_dict := self._match(request.rel_url.path_safe)) is None: 

344 return None, set() 

345 if route := self._routes.get(request.method, self._any_route): 

346 return UrlMappingMatchInfo(match_dict, route), self._allowed_methods 

347 return None, self._allowed_methods 

348 

349 @abc.abstractmethod 

350 def _match(self, path: str) -> dict[str, str] | None: 

351 """Return dict of path values if path matches this resource, otherwise None.""" 

352 

353 def __len__(self) -> int: 

354 return len(self._routes) 

355 

356 def __iter__(self) -> Iterator["ResourceRoute"]: 

357 return iter(self._routes.values()) 

358 

359 # TODO: implement all abstract methods 

360 

361 

362class PlainResource(Resource): 

363 def __init__(self, path: str, *, name: str | None = None) -> None: 

364 super().__init__(name=name) 

365 assert not path or path.startswith("/") 

366 self._path = path 

367 

368 @property 

369 def canonical(self) -> str: 

370 return self._path 

371 

372 def freeze(self) -> None: 

373 if not self._path: 

374 self._path = "/" 

375 

376 def add_prefix(self, prefix: str) -> None: 

377 assert prefix.startswith("/") 

378 assert not prefix.endswith("/") 

379 assert len(prefix) > 1 

380 self._path = prefix + self._path 

381 

382 def _match(self, path: str) -> dict[str, str] | None: 

383 # string comparison is about 10 times faster than regexp matching 

384 if self._path == path: 

385 return {} 

386 return None 

387 

388 def raw_match(self, path: str) -> bool: 

389 return self._path == path 

390 

391 def get_info(self) -> _InfoDict: 

392 return {"path": self._path} 

393 

394 def url_for(self) -> URL: # type: ignore[override] 

395 return URL.build(path=self._path, encoded=True) 

396 

397 def __repr__(self) -> str: 

398 name = "'" + self.name + "' " if self.name is not None else "" 

399 return f"<PlainResource {name} {self._path}>" 

400 

401 

402class DynamicResource(Resource): 

403 DYN = re.compile(r"\{(?P<var>[_a-zA-Z][_a-zA-Z0-9]*)\}") 

404 DYN_WITH_RE = re.compile(r"\{(?P<var>[_a-zA-Z][_a-zA-Z0-9]*):(?P<re>.+)\}") 

405 GOOD = r"[^{}/]+" 

406 

407 def __init__(self, path: str, *, name: str | None = None) -> None: 

408 super().__init__(name=name) 

409 self._orig_path = path 

410 pattern = "" 

411 formatter = "" 

412 for part in ROUTE_RE.split(path): 

413 match = self.DYN.fullmatch(part) 

414 if match: 

415 pattern += "(?P<{}>{})".format(match.group("var"), self.GOOD) 

416 formatter += "{" + match.group("var") + "}" 

417 continue 

418 

419 match = self.DYN_WITH_RE.fullmatch(part) 

420 if match: 

421 pattern += "(?P<{var}>{re})".format(**match.groupdict()) 

422 formatter += "{" + match.group("var") + "}" 

423 continue 

424 

425 if "{" in part or "}" in part: 

426 raise ValueError(f"Invalid path '{path}'['{part}']") 

427 

428 part = _requote_path(part) 

429 formatter += part 

430 pattern += re.escape(part) 

431 

432 try: 

433 compiled = re.compile(pattern) 

434 except re.error as exc: 

435 raise ValueError(f"Bad pattern '{pattern}': {exc}") from None 

436 assert compiled.pattern.startswith(PATH_SEP) 

437 assert formatter.startswith("/") 

438 self._pattern = compiled 

439 self._formatter = formatter 

440 

441 @property 

442 def canonical(self) -> str: 

443 return self._formatter 

444 

445 def add_prefix(self, prefix: str) -> None: 

446 assert prefix.startswith("/") 

447 assert not prefix.endswith("/") 

448 assert len(prefix) > 1 

449 self._pattern = re.compile(re.escape(prefix) + self._pattern.pattern) 

450 self._formatter = prefix + self._formatter 

451 

452 def _match(self, path: str) -> dict[str, str] | None: 

453 match = self._pattern.fullmatch(path) 

454 if match is None: 

455 return None 

456 return { 

457 key: _unquote_path_safe(value) for key, value in match.groupdict().items() 

458 } 

459 

460 def raw_match(self, path: str) -> bool: 

461 return self._orig_path == path 

462 

463 def get_info(self) -> _InfoDict: 

464 return {"formatter": self._formatter, "pattern": self._pattern} 

465 

466 def url_for(self, **parts: str) -> URL: 

467 url = self._formatter.format_map({k: _quote_path(v) for k, v in parts.items()}) 

468 return URL.build(path=url, encoded=True) 

469 

470 def __repr__(self) -> str: 

471 name = "'" + self.name + "' " if self.name is not None else "" 

472 return f"<DynamicResource {name} {self._formatter}>" 

473 

474 

475class PrefixResource(AbstractResource): 

476 def __init__(self, prefix: str, *, name: str | None = None) -> None: 

477 assert not prefix or prefix.startswith("/"), prefix 

478 assert prefix in ("", "/") or not prefix.endswith("/"), prefix 

479 super().__init__(name=name) 

480 self._prefix = _requote_path(prefix) 

481 self._prefix2 = self._prefix + "/" 

482 

483 @property 

484 def canonical(self) -> str: 

485 return self._prefix 

486 

487 def add_prefix(self, prefix: str) -> None: 

488 assert prefix.startswith("/") 

489 assert not prefix.endswith("/") 

490 assert len(prefix) > 1 

491 self._prefix = prefix + self._prefix 

492 self._prefix2 = self._prefix + "/" 

493 

494 def raw_match(self, prefix: str) -> bool: 

495 return False 

496 

497 # TODO: impl missing abstract methods 

498 

499 

500class StaticResource(PrefixResource): 

501 VERSION_KEY = "v" 

502 

503 def __init__( 

504 self, 

505 prefix: str, 

506 directory: PathLike, 

507 *, 

508 name: str | None = None, 

509 expect_handler: _ExpectHandler | None = None, 

510 chunk_size: int = 256 * 1024, 

511 show_index: bool = False, 

512 follow_symlinks: bool = False, 

513 append_version: bool = False, 

514 ) -> None: 

515 super().__init__(prefix, name=name) 

516 try: 

517 directory = Path(directory).expanduser().resolve(strict=True) 

518 except FileNotFoundError as error: 

519 raise ValueError(f"'{directory}' does not exist") from error 

520 if not directory.is_dir(): 

521 raise ValueError(f"'{directory}' is not a directory") 

522 self._directory = directory 

523 self._show_index = show_index 

524 self._chunk_size = chunk_size 

525 self._follow_symlinks = follow_symlinks 

526 self._expect_handler = expect_handler 

527 self._append_version = append_version 

528 

529 self._routes = { 

530 "GET": ResourceRoute( 

531 "GET", self._handle, self, expect_handler=expect_handler 

532 ), 

533 "HEAD": ResourceRoute( 

534 "HEAD", self._handle, self, expect_handler=expect_handler 

535 ), 

536 } 

537 self._allowed_methods = set(self._routes) 

538 

539 def url_for( # type: ignore[override] 

540 self, 

541 *, 

542 filename: PathLike, 

543 append_version: bool | None = None, 

544 ) -> URL: 

545 if append_version is None: 

546 append_version = self._append_version 

547 filename = str(filename).lstrip("/") 

548 

549 url = URL.build(path=self._prefix, encoded=True) 

550 # filename is not encoded 

551 url = url / filename 

552 

553 if append_version: 

554 unresolved_path = self._directory.joinpath(filename) 

555 try: 

556 if self._follow_symlinks: 

557 normalized_path = Path(os.path.normpath(unresolved_path)) 

558 normalized_path.relative_to(self._directory) 

559 filepath = normalized_path.resolve() 

560 else: 

561 filepath = unresolved_path.resolve() 

562 filepath.relative_to(self._directory) 

563 except (ValueError, FileNotFoundError): 

564 # ValueError for case when path point to symlink 

565 # with follow_symlinks is False 

566 return url # relatively safe 

567 if filepath.is_file(): 

568 # TODO cache file content 

569 # with file watcher for cache invalidation 

570 with filepath.open("rb") as f: 

571 file_bytes = f.read() 

572 h = self._get_file_hash(file_bytes) 

573 url = url.with_query({self.VERSION_KEY: h}) 

574 return url 

575 return url 

576 

577 @staticmethod 

578 def _get_file_hash(byte_array: bytes) -> str: 

579 m = hashlib.sha256() # todo sha256 can be configurable param 

580 m.update(byte_array) 

581 b64 = base64.urlsafe_b64encode(m.digest()) 

582 return b64.decode("ascii") 

583 

584 def get_info(self) -> _InfoDict: 

585 return { 

586 "directory": self._directory, 

587 "prefix": self._prefix, 

588 "routes": self._routes, 

589 } 

590 

591 def set_options_route(self, handler: Handler) -> None: 

592 if "OPTIONS" in self._routes: 

593 raise RuntimeError("OPTIONS route was set already") 

594 self._routes["OPTIONS"] = ResourceRoute( 

595 "OPTIONS", handler, self, expect_handler=self._expect_handler 

596 ) 

597 self._allowed_methods.add("OPTIONS") 

598 

599 async def resolve(self, request: Request) -> _Resolve: 

600 path = request.rel_url.path_safe 

601 method = request.method 

602 # We normalise here to avoid matches that traverse below the static root. 

603 # e.g. /static/../../../../home/user/webapp/static/ 

604 norm_path = os.path.normpath(path) 

605 if IS_WINDOWS: 

606 norm_path = norm_path.replace("\\", "/") 

607 if not norm_path.startswith(self._prefix2) and norm_path != self._prefix: 

608 return None, set() 

609 

610 allowed_methods = self._allowed_methods 

611 if method not in allowed_methods: 

612 return None, allowed_methods 

613 

614 match_dict = {"filename": _unquote_path_safe(path[len(self._prefix) + 1 :])} 

615 return (UrlMappingMatchInfo(match_dict, self._routes[method]), allowed_methods) 

616 

617 def __len__(self) -> int: 

618 return len(self._routes) 

619 

620 def __iter__(self) -> Iterator[AbstractRoute]: 

621 return iter(self._routes.values()) 

622 

623 async def _handle(self, request: Request) -> StreamResponse: 

624 filename = request.match_info["filename"] 

625 if Path(filename).is_absolute(): 

626 # filename is an absolute path e.g. //network/share or D:\path 

627 # which could be a UNC path leading to NTLM credential theft 

628 raise HTTPNotFound() 

629 unresolved_path = self._directory.joinpath(filename) 

630 loop = asyncio.get_running_loop() 

631 return await loop.run_in_executor( 

632 None, self._resolve_path_to_response, unresolved_path 

633 ) 

634 

635 def _resolve_path_to_response(self, unresolved_path: Path) -> StreamResponse: 

636 """Take the unresolved path and query the file system to form a response.""" 

637 # Check for access outside the root directory. For follow symlinks, URI 

638 # cannot traverse out, but symlinks can. Otherwise, no access outside 

639 # root is permitted. 

640 try: 

641 if self._follow_symlinks: 

642 normalized_path = Path(os.path.normpath(unresolved_path)) 

643 normalized_path.relative_to(self._directory) 

644 file_path = normalized_path.resolve() 

645 else: 

646 file_path = unresolved_path.resolve() 

647 file_path.relative_to(self._directory) 

648 except (ValueError, *CIRCULAR_SYMLINK_ERROR) as error: 

649 # ValueError is raised for the relative check. Circular symlinks 

650 # raise here on resolving for python < 3.13. 

651 raise HTTPNotFound() from error 

652 

653 # if path is a directory, return the contents if permitted. Note the 

654 # directory check will raise if a segment is not readable. 

655 try: 

656 if file_path.is_dir(): 

657 if self._show_index: 

658 return Response( 

659 text=self._directory_as_html(file_path), 

660 content_type="text/html", 

661 ) 

662 else: 

663 raise HTTPForbidden() 

664 except PermissionError as error: 

665 raise HTTPForbidden() from error 

666 

667 # Return the file response, which handles all other checks. 

668 return FileResponse(file_path, chunk_size=self._chunk_size) 

669 

670 def _directory_as_html(self, dir_path: Path) -> str: 

671 """returns directory's index as html.""" 

672 assert dir_path.is_dir() 

673 

674 relative_path_to_dir = dir_path.relative_to(self._directory).as_posix() 

675 index_of = f"Index of /{html_escape(relative_path_to_dir)}" 

676 h1 = f"<h1>{index_of}</h1>" 

677 

678 index_list = [] 

679 dir_index = dir_path.iterdir() 

680 for _file in sorted(dir_index): 

681 # show file url as relative to static path 

682 rel_path = _file.relative_to(self._directory).as_posix() 

683 quoted_file_url = _quote_path(f"{self._prefix}/{rel_path}") 

684 

685 # if file is a directory, add '/' to the end of the name 

686 if _file.is_dir(): 

687 file_name = f"{_file.name}/" 

688 else: 

689 file_name = _file.name 

690 

691 index_list.append( 

692 f'<li><a href="{quoted_file_url}">{html_escape(file_name)}</a></li>' 

693 ) 

694 ul = "<ul>\n{}\n</ul>".format("\n".join(index_list)) 

695 body = f"<body>\n{h1}\n{ul}\n</body>" 

696 

697 head_str = f"<head>\n<title>{index_of}</title>\n</head>" 

698 html = f"<html>\n{head_str}\n{body}\n</html>" 

699 

700 return html 

701 

702 def __repr__(self) -> str: 

703 name = "'" + self.name + "'" if self.name is not None else "" 

704 return f"<StaticResource {name} {self._prefix} -> {self._directory!r}>" 

705 

706 

707class PrefixedSubAppResource(PrefixResource): 

708 def __init__(self, prefix: str, app: "Application") -> None: 

709 super().__init__(prefix) 

710 self._app = app 

711 self._add_prefix_to_resources(prefix) 

712 

713 def add_prefix(self, prefix: str) -> None: 

714 super().add_prefix(prefix) 

715 self._add_prefix_to_resources(prefix) 

716 

717 def _add_prefix_to_resources(self, prefix: str) -> None: 

718 router = self._app.router 

719 for resource in router.resources(): 

720 # Since the canonical path of a resource is about 

721 # to change, we need to unindex it and then reindex 

722 router.unindex_resource(resource) 

723 resource.add_prefix(prefix) 

724 router.index_resource(resource) 

725 

726 def url_for(self, *args: str, **kwargs: str) -> URL: 

727 raise RuntimeError(".url_for() is not supported by sub-application root") 

728 

729 def get_info(self) -> _InfoDict: 

730 return {"app": self._app, "prefix": self._prefix} 

731 

732 async def resolve(self, request: Request) -> _Resolve: 

733 match_info = await self._app.router.resolve(request) 

734 match_info.add_app(self._app) 

735 if isinstance(match_info.http_exception, HTTPMethodNotAllowed): 

736 methods = match_info.http_exception.allowed_methods 

737 else: 

738 methods = set() 

739 return match_info, methods 

740 

741 def __len__(self) -> int: 

742 return len(self._app.router.routes()) 

743 

744 def __iter__(self) -> Iterator[AbstractRoute]: 

745 return iter(self._app.router.routes()) 

746 

747 def __repr__(self) -> str: 

748 return f"<PrefixedSubAppResource {self._prefix} -> {self._app!r}>" 

749 

750 

751class AbstractRuleMatching(abc.ABC): 

752 @abc.abstractmethod # pragma: no branch 

753 async def match(self, request: Request) -> bool: 

754 """Return bool if the request satisfies the criteria""" 

755 

756 @abc.abstractmethod # pragma: no branch 

757 def get_info(self) -> _InfoDict: 

758 """Return a dict with additional info useful for introspection""" 

759 

760 @property 

761 @abc.abstractmethod # pragma: no branch 

762 def canonical(self) -> str: 

763 """Return a str""" 

764 

765 

766class Domain(AbstractRuleMatching): 

767 re_part = re.compile(r"(?!-)[a-z\d-]{1,63}(?<!-)") 

768 

769 def __init__(self, domain: str) -> None: 

770 super().__init__() 

771 self._domain = self.validation(domain) 

772 

773 @property 

774 def canonical(self) -> str: 

775 return self._domain 

776 

777 def validation(self, domain: str) -> str: 

778 if not isinstance(domain, str): 

779 raise TypeError("Domain must be str") 

780 domain = domain.rstrip(".").lower() 

781 if not domain: 

782 raise ValueError("Domain cannot be empty") 

783 elif "://" in domain: 

784 raise ValueError("Scheme not supported") 

785 url = URL("http://" + domain) 

786 assert url.raw_host is not None 

787 if not all(self.re_part.fullmatch(x) for x in url.raw_host.split(".")): 

788 raise ValueError("Domain not valid") 

789 if url.port == 80: 

790 return url.raw_host 

791 return f"{url.raw_host}:{url.port}" 

792 

793 async def match(self, request: Request) -> bool: 

794 host = request.headers.get(hdrs.HOST) 

795 if not host: 

796 return False 

797 return self.match_domain(host) 

798 

799 def match_domain(self, host: str) -> bool: 

800 return host.lower() == self._domain 

801 

802 def get_info(self) -> _InfoDict: 

803 return {"domain": self._domain} 

804 

805 

806class MaskDomain(Domain): 

807 re_part = re.compile(r"(?!-)[a-z\d\*-]{1,63}(?<!-)") 

808 

809 def __init__(self, domain: str) -> None: 

810 super().__init__(domain) 

811 mask = self._domain.replace(".", r"\.").replace("*", ".*") 

812 self._mask = re.compile(mask) 

813 

814 @property 

815 def canonical(self) -> str: 

816 return self._mask.pattern 

817 

818 def match_domain(self, host: str) -> bool: 

819 return self._mask.fullmatch(host) is not None 

820 

821 

822class MatchedSubAppResource(PrefixedSubAppResource): 

823 def __init__(self, rule: AbstractRuleMatching, app: "Application") -> None: 

824 AbstractResource.__init__(self) 

825 self._prefix = "" 

826 self._app = app 

827 self._rule = rule 

828 

829 @property 

830 def canonical(self) -> str: 

831 return self._rule.canonical 

832 

833 def get_info(self) -> _InfoDict: 

834 return {"app": self._app, "rule": self._rule} 

835 

836 async def resolve(self, request: Request) -> _Resolve: 

837 if not await self._rule.match(request): 

838 return None, set() 

839 match_info = await self._app.router.resolve(request) 

840 match_info.add_app(self._app) 

841 if isinstance(match_info.http_exception, HTTPMethodNotAllowed): 

842 methods = match_info.http_exception.allowed_methods 

843 else: 

844 methods = set() 

845 return match_info, methods 

846 

847 def __repr__(self) -> str: 

848 return f"<MatchedSubAppResource -> {self._app!r}>" 

849 

850 

851class ResourceRoute(AbstractRoute): 

852 """A route with resource""" 

853 

854 def __init__( 

855 self, 

856 method: str, 

857 handler: Handler | type[AbstractView], 

858 resource: AbstractResource, 

859 *, 

860 expect_handler: _ExpectHandler | None = None, 

861 ) -> None: 

862 super().__init__( 

863 method, handler, expect_handler=expect_handler, resource=resource 

864 ) 

865 

866 def __repr__(self) -> str: 

867 return f"<ResourceRoute [{self.method}] {self._resource} -> {self.handler!r}" 

868 

869 @property 

870 def name(self) -> str | None: 

871 if self._resource is None: 

872 return None 

873 return self._resource.name 

874 

875 def url_for(self, *args: str, **kwargs: str) -> URL: 

876 """Construct url for route with additional params.""" 

877 assert self._resource is not None 

878 return self._resource.url_for(*args, **kwargs) 

879 

880 def get_info(self) -> _InfoDict: 

881 assert self._resource is not None 

882 return self._resource.get_info() 

883 

884 

885class SystemRoute(AbstractRoute): 

886 def __init__(self, http_exception: HTTPException) -> None: 

887 super().__init__(hdrs.METH_ANY, self._handle) 

888 self._http_exception = http_exception 

889 

890 def url_for(self, *args: str, **kwargs: str) -> URL: 

891 raise RuntimeError(".url_for() is not allowed for SystemRoute") 

892 

893 @property 

894 def name(self) -> str | None: 

895 return None 

896 

897 def get_info(self) -> _InfoDict: 

898 return {"http_exception": self._http_exception} 

899 

900 async def _handle(self, request: Request) -> StreamResponse: 

901 raise self._http_exception 

902 

903 @property 

904 def status(self) -> int: 

905 return self._http_exception.status 

906 

907 @property 

908 def reason(self) -> str: 

909 return self._http_exception.reason 

910 

911 def __repr__(self) -> str: 

912 return f"<SystemRoute {self.status}: {self.reason}>" 

913 

914 

915class View(AbstractView): 

916 async def _iter(self) -> StreamResponse: 

917 if self.request.method not in hdrs.METH_ALL: 

918 self._raise_allowed_methods() 

919 method: Callable[[], Awaitable[StreamResponse]] | None = getattr( 

920 self, self.request.method.lower(), None 

921 ) 

922 if method is None: 

923 self._raise_allowed_methods() 

924 return await method() 

925 

926 def __await__(self) -> Generator[None, None, StreamResponse]: 

927 return self._iter().__await__() 

928 

929 def _raise_allowed_methods(self) -> NoReturn: 

930 allowed_methods = {m for m in hdrs.METH_ALL if hasattr(self, m.lower())} 

931 raise HTTPMethodNotAllowed(self.request.method, allowed_methods) 

932 

933 

934class ResourcesView(Sized, Iterable[AbstractResource], Container[AbstractResource]): 

935 def __init__(self, resources: list[AbstractResource]) -> None: 

936 self._resources = resources 

937 

938 def __len__(self) -> int: 

939 return len(self._resources) 

940 

941 def __iter__(self) -> Iterator[AbstractResource]: 

942 yield from self._resources 

943 

944 def __contains__(self, resource: object) -> bool: 

945 return resource in self._resources 

946 

947 

948class RoutesView(Sized, Iterable[AbstractRoute], Container[AbstractRoute]): 

949 def __init__(self, resources: list[AbstractResource]): 

950 self._routes: list[AbstractRoute] = [] 

951 for resource in resources: 

952 for route in resource: 

953 self._routes.append(route) 

954 

955 def __len__(self) -> int: 

956 return len(self._routes) 

957 

958 def __iter__(self) -> Iterator[AbstractRoute]: 

959 yield from self._routes 

960 

961 def __contains__(self, route: object) -> bool: 

962 return route in self._routes 

963 

964 

965class UrlDispatcher(AbstractRouter, Mapping[str, AbstractResource]): 

966 NAME_SPLIT_RE = re.compile(r"[.:-]") 

967 HTTP_NOT_FOUND = HTTPNotFound() 

968 

969 def __init__(self) -> None: 

970 super().__init__() 

971 self._resources: list[AbstractResource] = [] 

972 self._named_resources: dict[str, AbstractResource] = {} 

973 self._resource_index: dict[str, list[AbstractResource]] = {} 

974 self._matched_sub_app_resources: list[MatchedSubAppResource] = [] 

975 

976 async def resolve(self, request: Request) -> UrlMappingMatchInfo: 

977 resource_index = self._resource_index 

978 allowed_methods: set[str] = set() 

979 

980 # MatchedSubAppResource is primarily used to match on domain names 

981 # (though custom rules could match on other things). This means that 

982 # the traversal algorithm below can't be applied, and that we likely 

983 # need to check these first so a sub app that defines the same path 

984 # as a parent app will get priority if there's a domain match. 

985 # 

986 # For most cases we do not expect there to be many of these since 

987 # currently they are only added by `.add_domain()`. 

988 for resource in self._matched_sub_app_resources: 

989 match_dict, allowed = await resource.resolve(request) 

990 if match_dict is not None: 

991 return match_dict 

992 else: 

993 allowed_methods |= allowed 

994 

995 # Walk the url parts looking for candidates. We walk the url backwards 

996 # to ensure the most explicit match is found first. If there are multiple 

997 # candidates for a given url part because there are multiple resources 

998 # registered for the same canonical path, we resolve them in a linear 

999 # fashion to ensure registration order is respected. 

1000 url_part = request.rel_url.path_safe 

1001 while url_part: 

1002 for candidate in resource_index.get(url_part, ()): 

1003 match_dict, allowed = await candidate.resolve(request) 

1004 if match_dict is not None: 

1005 return match_dict 

1006 else: 

1007 allowed_methods |= allowed 

1008 if url_part == "/": 

1009 break 

1010 url_part = url_part.rpartition("/")[0] or "/" 

1011 

1012 if allowed_methods: 

1013 return MatchInfoError(HTTPMethodNotAllowed(request.method, allowed_methods)) 

1014 

1015 return MatchInfoError(self.HTTP_NOT_FOUND) 

1016 

1017 def __iter__(self) -> Iterator[str]: 

1018 return iter(self._named_resources) 

1019 

1020 def __len__(self) -> int: 

1021 return len(self._named_resources) 

1022 

1023 def __contains__(self, resource: object) -> bool: 

1024 return resource in self._named_resources 

1025 

1026 def __getitem__(self, name: str) -> AbstractResource: 

1027 return self._named_resources[name] 

1028 

1029 def resources(self) -> ResourcesView: 

1030 return ResourcesView(self._resources) 

1031 

1032 def routes(self) -> RoutesView: 

1033 return RoutesView(self._resources) 

1034 

1035 def named_resources(self) -> Mapping[str, AbstractResource]: 

1036 return MappingProxyType(self._named_resources) 

1037 

1038 def register_resource(self, resource: AbstractResource) -> None: 

1039 assert isinstance( 

1040 resource, AbstractResource 

1041 ), f"Instance of AbstractResource class is required, got {resource!r}" 

1042 if self.frozen: 

1043 raise RuntimeError("Cannot register a resource into frozen router.") 

1044 

1045 name = resource.name 

1046 

1047 if name is not None: 

1048 parts = self.NAME_SPLIT_RE.split(name) 

1049 for part in parts: 

1050 if keyword.iskeyword(part): 

1051 raise ValueError( 

1052 f"Incorrect route name {name!r}, " 

1053 "python keywords cannot be used " 

1054 "for route name" 

1055 ) 

1056 if not part.isidentifier(): 

1057 raise ValueError( 

1058 f"Incorrect route name {name!r}, " 

1059 "the name should be a sequence of " 

1060 "python identifiers separated " 

1061 "by dash, dot or column" 

1062 ) 

1063 if name in self._named_resources: 

1064 raise ValueError( 

1065 f"Duplicate {name!r}, " 

1066 f"already handled by {self._named_resources[name]!r}" 

1067 ) 

1068 self._named_resources[name] = resource 

1069 self._resources.append(resource) 

1070 

1071 if isinstance(resource, MatchedSubAppResource): 

1072 # We cannot index match sub-app resources because they have match rules 

1073 self._matched_sub_app_resources.append(resource) 

1074 else: 

1075 self.index_resource(resource) 

1076 

1077 def _get_resource_index_key(self, resource: AbstractResource) -> str: 

1078 """Return a key to index the resource in the resource index.""" 

1079 if "{" in (index_key := resource.canonical): 

1080 # strip at the first { to allow for variables, and than 

1081 # rpartition at / to allow for variable parts in the path 

1082 # For example if the canonical path is `/core/locations{tail:.*}` 

1083 # the index key will be `/core` since index is based on the 

1084 # url parts split by `/` 

1085 index_key = index_key.partition("{")[0].rpartition("/")[0] 

1086 return index_key.rstrip("/") or "/" 

1087 

1088 def index_resource(self, resource: AbstractResource) -> None: 

1089 """Add a resource to the resource index.""" 

1090 resource_key = self._get_resource_index_key(resource) 

1091 # There may be multiple resources for a canonical path 

1092 # so we keep them in a list to ensure that registration 

1093 # order is respected. 

1094 self._resource_index.setdefault(resource_key, []).append(resource) 

1095 

1096 def unindex_resource(self, resource: AbstractResource) -> None: 

1097 """Remove a resource from the resource index.""" 

1098 resource_key = self._get_resource_index_key(resource) 

1099 self._resource_index[resource_key].remove(resource) 

1100 

1101 def add_resource(self, path: str, *, name: str | None = None) -> Resource: 

1102 if path and not path.startswith("/"): 

1103 raise ValueError("path should be started with / or be empty") 

1104 # Reuse last added resource if path and name are the same 

1105 if self._resources: 

1106 resource = self._resources[-1] 

1107 if resource.name == name and resource.raw_match(path): 

1108 return cast(Resource, resource) 

1109 if not ("{" in path or "}" in path or ROUTE_RE.search(path)): 

1110 resource = PlainResource(path, name=name) 

1111 self.register_resource(resource) 

1112 return resource 

1113 resource = DynamicResource(path, name=name) 

1114 self.register_resource(resource) 

1115 return resource 

1116 

1117 def add_route( 

1118 self, 

1119 method: str, 

1120 path: str, 

1121 handler: Handler | type[AbstractView], 

1122 *, 

1123 name: str | None = None, 

1124 expect_handler: _ExpectHandler | None = None, 

1125 ) -> AbstractRoute: 

1126 resource = self.add_resource(path, name=name) 

1127 return resource.add_route(method, handler, expect_handler=expect_handler) 

1128 

1129 def add_static( 

1130 self, 

1131 prefix: str, 

1132 path: PathLike, 

1133 *, 

1134 name: str | None = None, 

1135 expect_handler: _ExpectHandler | None = None, 

1136 chunk_size: int = 256 * 1024, 

1137 show_index: bool = False, 

1138 follow_symlinks: bool = False, 

1139 append_version: bool = False, 

1140 ) -> StaticResource: 

1141 """Add static files view. 

1142 

1143 prefix - url prefix 

1144 path - folder with files 

1145 

1146 """ 

1147 assert prefix.startswith("/") 

1148 if prefix.endswith("/"): 

1149 prefix = prefix[:-1] 

1150 resource = StaticResource( 

1151 prefix, 

1152 path, 

1153 name=name, 

1154 expect_handler=expect_handler, 

1155 chunk_size=chunk_size, 

1156 show_index=show_index, 

1157 follow_symlinks=follow_symlinks, 

1158 append_version=append_version, 

1159 ) 

1160 self.register_resource(resource) 

1161 return resource 

1162 

1163 def add_head(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute: 

1164 """Shortcut for add_route with method HEAD.""" 

1165 return self.add_route(hdrs.METH_HEAD, path, handler, **kwargs) 

1166 

1167 def add_options(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute: 

1168 """Shortcut for add_route with method OPTIONS.""" 

1169 return self.add_route(hdrs.METH_OPTIONS, path, handler, **kwargs) 

1170 

1171 def add_get( 

1172 self, 

1173 path: str, 

1174 handler: Handler, 

1175 *, 

1176 name: str | None = None, 

1177 allow_head: bool = True, 

1178 **kwargs: Any, 

1179 ) -> AbstractRoute: 

1180 """Shortcut for add_route with method GET. 

1181 

1182 If allow_head is true, another 

1183 route is added allowing head requests to the same endpoint. 

1184 """ 

1185 resource = self.add_resource(path, name=name) 

1186 if allow_head: 

1187 resource.add_route(hdrs.METH_HEAD, handler, **kwargs) 

1188 return resource.add_route(hdrs.METH_GET, handler, **kwargs) 

1189 

1190 def add_post(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute: 

1191 """Shortcut for add_route with method POST.""" 

1192 return self.add_route(hdrs.METH_POST, path, handler, **kwargs) 

1193 

1194 def add_put(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute: 

1195 """Shortcut for add_route with method PUT.""" 

1196 return self.add_route(hdrs.METH_PUT, path, handler, **kwargs) 

1197 

1198 def add_patch(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute: 

1199 """Shortcut for add_route with method PATCH.""" 

1200 return self.add_route(hdrs.METH_PATCH, path, handler, **kwargs) 

1201 

1202 def add_delete(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute: 

1203 """Shortcut for add_route with method DELETE.""" 

1204 return self.add_route(hdrs.METH_DELETE, path, handler, **kwargs) 

1205 

1206 def add_view( 

1207 self, path: str, handler: type[AbstractView], **kwargs: Any 

1208 ) -> AbstractRoute: 

1209 """Shortcut for add_route with ANY methods for a class-based view.""" 

1210 return self.add_route(hdrs.METH_ANY, path, handler, **kwargs) 

1211 

1212 def freeze(self) -> None: 

1213 super().freeze() 

1214 for resource in self._resources: 

1215 resource.freeze() 

1216 

1217 def add_routes(self, routes: Iterable[AbstractRouteDef]) -> list[AbstractRoute]: 

1218 """Append routes to route table. 

1219 

1220 Parameter should be a sequence of RouteDef objects. 

1221 

1222 Returns a list of registered AbstractRoute instances. 

1223 """ 

1224 registered_routes = [] 

1225 for route_def in routes: 

1226 registered_routes.extend(route_def.register(self)) 

1227 return registered_routes 

1228 

1229 

1230def _quote_path(value: str) -> str: 

1231 return URL.build(path=value, encoded=False).raw_path 

1232 

1233 

1234def _unquote_path_safe(value: str) -> str: 

1235 if "%" not in value: 

1236 return value 

1237 return value.replace("%2F", "/").replace("%25", "%") 

1238 

1239 

1240def _requote_path(value: str) -> str: 

1241 # Quote non-ascii characters and other characters which must be quoted, 

1242 # but preserve existing %-sequences. 

1243 result = _quote_path(value) 

1244 if "%" in value: 

1245 result = result.replace("%25", "%") 

1246 return result