Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/web_urldispatcher.py: 37%

728 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-02-09 06:47 +0000

1import abc 

2import asyncio 

3import base64 

4import hashlib 

5import keyword 

6import os 

7import re 

8from contextlib import contextmanager 

9from pathlib import Path 

10from types import MappingProxyType 

11from typing import ( 

12 TYPE_CHECKING, 

13 Any, 

14 Awaitable, 

15 Callable, 

16 Container, 

17 Dict, 

18 Final, 

19 Generator, 

20 Iterable, 

21 Iterator, 

22 List, 

23 Mapping, 

24 NoReturn, 

25 Optional, 

26 Pattern, 

27 Set, 

28 Sized, 

29 Tuple, 

30 Type, 

31 TypedDict, 

32 Union, 

33 cast, 

34) 

35 

36from yarl import URL, __version__ as yarl_version # type: ignore[attr-defined] 

37 

38from . import hdrs 

39from .abc import AbstractMatchInfo, AbstractRouter, AbstractView 

40from .helpers import DEBUG 

41from .http import HttpVersion11 

42from .typedefs import Handler, PathLike 

43from .web_exceptions import ( 

44 HTTPException, 

45 HTTPExpectationFailed, 

46 HTTPForbidden, 

47 HTTPMethodNotAllowed, 

48 HTTPNotFound, 

49) 

50from .web_fileresponse import FileResponse 

51from .web_request import Request 

52from .web_response import Response, StreamResponse 

53from .web_routedef import AbstractRouteDef 

54 

# Names exported by ``from <module> import *``.
__all__ = (
    "UrlDispatcher",
    "UrlMappingMatchInfo",
    "AbstractResource",
    "Resource",
    "PlainResource",
    "DynamicResource",
    "AbstractRoute",
    "ResourceRoute",
    "StaticResource",
    "View",
)


if TYPE_CHECKING:
    from .web_app import Application

    # Parametrised form is only for the type checker; the runtime base
    # class is the plain builtin ``dict`` (see the ``else`` branch).
    BaseDict = Dict[str, str]
else:
    BaseDict = dict

# First two numeric components of the installed yarl version; compared
# against (1, 6) when building static-file URLs.
YARL_VERSION: Final[Tuple[int, ...]] = tuple(
    int(component) for component in yarl_version.split(".")[:2]
)

# Whole-string pattern that a route's (upper-cased) method name must match.
HTTP_METHOD_RE: Final[Pattern[str]] = re.compile(r"^[0-9A-Za-z!#\$%&'\*\+\-\.\^_`\|~]+$")
# Used with ``split`` to cut a route path into literal text and
# ``{...}`` placeholders (a placeholder may contain one nested ``{...}``).
ROUTE_RE: Final[Pattern[str]] = re.compile(r"(\{[_a-zA-Z][^{}]*(?:\{[^{}]*\}[^{}]*)*\})")
# Regex-escaped path separator; compiled dynamic patterns must start with it.
PATH_SEP: Final[str] = re.escape("/")


# Coroutine invoked for requests carrying an ``Expect`` header.
_ExpectHandler = Callable[[Request], Awaitable[Optional[StreamResponse]]]
# Result of ``resolve``: (match info or None, set of allowed method names).
_Resolve = Tuple[Optional["UrlMappingMatchInfo"], Set[str]]

89 

90 

class _InfoDict(TypedDict, total=False):
    """Introspection payload returned by the various ``get_info()`` methods.

    All keys are optional (``total=False``); each producer fills in only
    the subset that applies to it.
    """

    # PlainResource
    path: str

    # DynamicResource
    formatter: str
    pattern: Pattern[str]

    # StaticResource ("prefix" is also reported by PrefixedSubAppResource)
    directory: Path
    prefix: str
    routes: Mapping[str, "AbstractRoute"]

    # PrefixedSubAppResource / MatchedSubAppResource
    app: "Application"

    # Domain
    domain: str

    # MatchedSubAppResource
    rule: "AbstractRuleMatching"

    # SystemRoute
    http_exception: HTTPException

108 

109 

class AbstractResource(Sized, Iterable["AbstractRoute"]):
    """Interface of a routable resource: a sized, iterable group of routes."""

    def __init__(self, *, name: Optional[str] = None) -> None:
        """Store the optional resource *name*."""
        self._name = name

    @property
    def name(self) -> Optional[str]:
        """The name passed to the constructor, or ``None``."""
        return self._name

    @property
    @abc.abstractmethod
    def canonical(self) -> str:
        """Canonical path of the resource.

        Example: '/foo/bar/{name}'
        """

    @abc.abstractmethod  # pragma: no branch
    def url_for(self, **kwargs: str) -> URL:
        """Build a URL for this resource from the given parameters."""

    @abc.abstractmethod  # pragma: no branch
    async def resolve(self, request: Request) -> _Resolve:
        """Try to resolve *request* against this resource.

        Returns a ``(UrlMappingMatchInfo, allowed_methods)`` pair.
        """

    @abc.abstractmethod
    def add_prefix(self, prefix: str) -> None:
        """Prepend *prefix* to the URLs handled by the resource.

        Needed to support sub-applications.
        """

    @abc.abstractmethod
    def get_info(self) -> _InfoDict:
        """Return a dict of extra details intended for introspection."""

    def freeze(self) -> None:
        """Do nothing by default; subclasses may override this hook."""
        return None

    @abc.abstractmethod
    def raw_match(self, path: str) -> bool:
        """Perform a raw match of the resource against *path*."""

155 

156 

class AbstractRoute(abc.ABC):
    """Base class binding an HTTP method to a handler."""

    def __init__(
        self,
        method: str,
        handler: Union[Handler, Type[AbstractView]],
        *,
        expect_handler: Optional[_ExpectHandler] = None,
        resource: Optional[AbstractResource] = None,
    ) -> None:
        """Validate the arguments and store them on the route.

        *method* is upper-cased and must match ``HTTP_METHOD_RE``
        (``ValueError`` otherwise).  *handler* must be a coroutine function
        or an ``AbstractView`` subclass (``TypeError`` otherwise).  When no
        *expect_handler* is given, ``_default_expect_handler`` is used.
        """
        chosen_expect = (
            _default_expect_handler if expect_handler is None else expect_handler
        )
        assert asyncio.iscoroutinefunction(
            chosen_expect
        ), f"Coroutine is expected, got {chosen_expect!r}"

        method = method.upper()
        if not HTTP_METHOD_RE.match(method):
            raise ValueError(f"{method} is not allowed HTTP method")

        # Accept either a coroutine function or a class-based view.
        if not asyncio.iscoroutinefunction(handler) and not (
            isinstance(handler, type) and issubclass(handler, AbstractView)
        ):
            raise TypeError(
                f"Only async functions are allowed as web-handlers , got {handler!r}"
            )

        self._method = method
        self._handler = handler
        self._expect_handler = chosen_expect
        self._resource = resource

    @property
    def method(self) -> str:
        """Upper-cased HTTP method of the route."""
        return self._method

    @property
    def handler(self) -> Handler:
        """The handler object given to the constructor."""
        return self._handler

    @property
    @abc.abstractmethod
    def name(self) -> Optional[str]:
        """Optional name of the route; always equal to the resource's name."""

    @property
    def resource(self) -> Optional[AbstractResource]:
        """Resource this route is attached to, if any."""
        return self._resource

    @abc.abstractmethod
    def get_info(self) -> _InfoDict:
        """Return a dict of extra details intended for introspection."""

    @abc.abstractmethod  # pragma: no branch
    def url_for(self, *args: str, **kwargs: str) -> URL:
        """Build a URL for the route from the given parameters."""

    async def handle_expect_header(self, request: Request) -> Optional[StreamResponse]:
        """Await the configured expect handler with *request*."""
        result = await self._expect_handler(request)
        return result

219 

220 

class UrlMappingMatchInfo(BaseDict, AbstractMatchInfo):
    """Dict of matched URL variables plus the route that matched."""

    def __init__(self, match_dict: Dict[str, str], route: AbstractRoute):
        """Initialise the mapping from *match_dict* and remember *route*."""
        super().__init__(match_dict)
        self._route = route
        # Application chain; ``add_app`` inserts each app at the front.
        self._apps: List[Application] = []
        self._current_app: Optional[Application] = None
        self._frozen = False

    @property
    def handler(self) -> Handler:
        """Handler of the matched route."""
        return self._route.handler

    @property
    def route(self) -> AbstractRoute:
        """The matched route."""
        return self._route

    @property
    def expect_handler(self) -> _ExpectHandler:
        """Bound ``handle_expect_header`` of the matched route."""
        return self._route.handle_expect_header

    @property
    def http_exception(self) -> Optional[HTTPException]:
        """Always ``None`` for a successful match."""
        return None

    def get_info(self) -> _InfoDict:  # type: ignore[override]
        """Delegate to the matched route's ``get_info()``."""
        return self._route.get_info()

    @property
    def apps(self) -> Tuple["Application", ...]:
        """Immutable snapshot of the registered applications."""
        return tuple(self._apps)

    def add_app(self, app: "Application") -> None:
        """Insert *app* at the front of the application chain.

        The first app ever added also becomes the current app.  Raises
        ``RuntimeError`` once the match info has been frozen.
        """
        if self._frozen:
            raise RuntimeError("Cannot change apps stack after .freeze() call")
        if self._current_app is None:
            self._current_app = app
        self._apps.insert(0, app)

    @property
    def current_app(self) -> "Application":
        """The current application; asserts that one has been set."""
        current = self._current_app
        assert current is not None
        return current

    @contextmanager
    def set_current_app(self, app: "Application") -> Generator[None, None, None]:
        """Temporarily make *app* the current application.

        The previous value is restored when the context exits, even on
        error.  In DEBUG mode *app* must already be in the chain.
        """
        if DEBUG:  # pragma: no cover
            if app not in self._apps:
                raise RuntimeError(
                    f"Expected one of the following apps {self._apps!r}, got {app!r}"
                )
        previous = self._current_app
        self._current_app = app
        try:
            yield
        finally:
            self._current_app = previous

    def freeze(self) -> None:
        """Forbid further ``add_app`` calls."""
        self._frozen = True

    def __repr__(self) -> str:
        mapping_repr = super().__repr__()
        return f"<MatchInfo {mapping_repr}: {self._route}>"

286 

287 

class MatchInfoError(UrlMappingMatchInfo):
    """Match info standing for a failed resolution.

    It carries an HTTP exception and is backed by a ``SystemRoute`` built
    from that exception, with an empty match dict.
    """

    def __init__(self, http_exception: HTTPException) -> None:
        self._exception = http_exception
        error_route = SystemRoute(self._exception)
        super().__init__({}, error_route)

    @property
    def http_exception(self) -> HTTPException:
        """The exception supplied at construction."""
        return self._exception

    def __repr__(self) -> str:
        exc = self._exception
        return f"<MatchInfoError {exc.status}: {exc.reason}>"

301 

302 

async def _default_expect_handler(request: Request) -> None:
    """Default handling of the ``Expect`` request header.

    For HTTP/1.1 requests, writes "100 Continue" to the client when the
    header value is "100-continue" (case-insensitive) and raises
    HTTPExpectationFailed for any other value.  Requests with another
    protocol version are left untouched.
    """
    expect = request.headers.get(hdrs.EXPECT, "")
    if request.version != HttpVersion11:
        return
    if expect.lower() != "100-continue":
        raise HTTPExpectationFailed(text=f"Unknown Expect: {expect}")
    await request.writer.write(b"HTTP/1.1 100 Continue\r\n\r\n")

315 

316 

class Resource(AbstractResource):
    """Resource holding an ordered list of ``ResourceRoute`` objects.

    Subclasses supply the path matching via ``_match``.
    """

    def __init__(self, *, name: Optional[str] = None) -> None:
        super().__init__(name=name)
        self._routes: List[ResourceRoute] = []

    def add_route(
        self,
        method: str,
        handler: Union[Type[AbstractView], Handler],
        *,
        expect_handler: Optional[_ExpectHandler] = None,
    ) -> "ResourceRoute":
        """Create a route for *method* and register it on this resource.

        Raises ``RuntimeError`` if a route for the same method, or a
        catch-all (``hdrs.METH_ANY``) route, is already registered, because
        the new route could never be reached.
        """
        # Routes store their method upper-cased, so compare against the
        # upper-cased name; otherwise "get" would slip past an existing "GET".
        normalized = method.upper()
        for route_obj in self._routes:
            if route_obj.method in (normalized, hdrs.METH_ANY):
                raise RuntimeError(
                    "Added route will never be executed, "
                    "method {route.method} is already "
                    "registered".format(route=route_obj)
                )

        route_obj = ResourceRoute(method, handler, self, expect_handler=expect_handler)
        self.register_route(route_obj)
        return route_obj

    def register_route(self, route: "ResourceRoute") -> None:
        """Append an already constructed *route* to this resource."""
        assert isinstance(
            route, ResourceRoute
        ), f"Instance of Route class is required, got {route!r}"
        self._routes.append(route)

    async def resolve(self, request: Request) -> _Resolve:
        """Match the request path, then pick the first route for its method.

        Returns ``(None, set())`` when the path does not match.  When the
        path matches but no route accepts the method, returns ``None``
        together with the methods of all registered routes.
        """
        allowed_methods: Set[str] = set()

        match_dict = self._match(request.rel_url.raw_path)
        if match_dict is None:
            return None, allowed_methods

        for route_obj in self._routes:
            route_method = route_obj.method
            allowed_methods.add(route_method)

            if route_method == request.method or route_method == hdrs.METH_ANY:
                return (UrlMappingMatchInfo(match_dict, route_obj), allowed_methods)

        # Path matched, but no route accepts this method.
        return None, allowed_methods

    @abc.abstractmethod
    def _match(self, path: str) -> Optional[Dict[str, str]]:
        """Return the matched variables for *path*, or ``None`` on no match."""
        pass  # pragma: no cover

    def __len__(self) -> int:
        return len(self._routes)

    def __iter__(self) -> Iterator["ResourceRoute"]:
        return iter(self._routes)

    # TODO: implement all abstract methods

374 

375 

class PlainResource(Resource):
    """Resource matched by exact string equality with a fixed path."""

    def __init__(self, path: str, *, name: Optional[str] = None) -> None:
        """Store *path*, which must be empty or start with "/"."""
        super().__init__(name=name)
        assert not path or path.startswith("/")
        self._path = path

    @property
    def canonical(self) -> str:
        """The stored path."""
        return self._path

    def freeze(self) -> None:
        """Replace an empty path with "/"."""
        if self._path:
            return
        self._path = "/"

    def add_prefix(self, prefix: str) -> None:
        """Prepend *prefix* (starts with "/", no trailing "/", longer than "/")."""
        assert prefix.startswith("/")
        assert not prefix.endswith("/")
        assert len(prefix) > 1
        self._path = prefix + self._path

    def _match(self, path: str) -> Optional[Dict[str, str]]:
        """Return an empty dict on an exact match, otherwise ``None``."""
        # string comparison is about 10 times faster than regexp matching
        return {} if self._path == path else None

    def raw_match(self, path: str) -> bool:
        """Tell whether *path* equals the stored path."""
        return self._path == path

    def get_info(self) -> _InfoDict:
        """Report the stored path under the "path" key."""
        return {"path": self._path}

    def url_for(self) -> URL:  # type: ignore[override]
        """Build a URL from the stored, already encoded path."""
        return URL.build(path=self._path, encoded=True)

    def __repr__(self) -> str:
        resource_name = self.name
        name = f"'{resource_name}' " if resource_name is not None else ""
        return f"<PlainResource {name} {self._path}>"

415 

416 

class DynamicResource(Resource):
    """Resource whose path contains ``{var}`` or ``{var:regex}`` placeholders."""

    # ``{name}`` placeholder
    DYN = re.compile(r"\{(?P<var>[_a-zA-Z][_a-zA-Z0-9]*)\}")
    # ``{name:regex}`` placeholder
    DYN_WITH_RE = re.compile(r"\{(?P<var>[_a-zA-Z][_a-zA-Z0-9]*):(?P<re>.+)\}")
    # Pattern used for a placeholder that has no explicit regex
    GOOD = r"[^{}/]+"

    def __init__(self, path: str, *, name: Optional[str] = None) -> None:
        """Compile *path* into a regex pattern and a ``str.format`` template.

        Raises ``ValueError`` for stray braces or a pattern that does not
        compile.
        """
        super().__init__(name=name)
        pattern_chunks: List[str] = []
        formatter_chunks: List[str] = []
        for part in ROUTE_RE.split(path):
            plain = self.DYN.fullmatch(part)
            if plain:
                var = plain.group("var")
                pattern_chunks.append(f"(?P<{var}>{self.GOOD})")
                formatter_chunks.append("{" + var + "}")
                continue

            with_regex = self.DYN_WITH_RE.fullmatch(part)
            if with_regex:
                var = with_regex.group("var")
                regex = with_regex.group("re")
                pattern_chunks.append(f"(?P<{var}>{regex})")
                formatter_chunks.append("{" + var + "}")
                continue

            # Literal text must not contain unbalanced braces.
            if "{" in part or "}" in part:
                raise ValueError(f"Invalid path '{path}'['{part}']")

            literal = _requote_path(part)
            formatter_chunks.append(literal)
            pattern_chunks.append(re.escape(literal))

        pattern = "".join(pattern_chunks)
        formatter = "".join(formatter_chunks)

        try:
            compiled = re.compile(pattern)
        except re.error as exc:
            raise ValueError(f"Bad pattern '{pattern}': {exc}") from None
        assert compiled.pattern.startswith(PATH_SEP)
        assert formatter.startswith("/")
        self._pattern = compiled
        self._formatter = formatter

    @property
    def canonical(self) -> str:
        """The format template, e.g. with ``{var}`` placeholders."""
        return self._formatter

    def add_prefix(self, prefix: str) -> None:
        """Prepend *prefix* to both the compiled pattern and the template."""
        assert prefix.startswith("/")
        assert not prefix.endswith("/")
        assert len(prefix) > 1
        prefixed_pattern = re.escape(prefix) + self._pattern.pattern
        self._pattern = re.compile(prefixed_pattern)
        self._formatter = prefix + self._formatter

    def _match(self, path: str) -> Optional[Dict[str, str]]:
        """Return the unquoted named groups if *path* fully matches."""
        found = self._pattern.fullmatch(path)
        if found is None:
            return None
        return {key: _unquote_path(value) for key, value in found.groupdict().items()}

    def raw_match(self, path: str) -> bool:
        """Tell whether *path* equals the format template."""
        return self._formatter == path

    def get_info(self) -> _InfoDict:
        """Report the template and the compiled pattern."""
        return {"formatter": self._formatter, "pattern": self._pattern}

    def url_for(self, **parts: str) -> URL:
        """Fill the template with the quoted *parts* and build a URL."""
        quoted_parts = {key: _quote_path(value) for key, value in parts.items()}
        url = self._formatter.format_map(quoted_parts)
        return URL.build(path=url, encoded=True)

    def __repr__(self) -> str:
        resource_name = self.name
        name = f"'{resource_name}' " if resource_name is not None else ""
        return f"<DynamicResource {name} {self._formatter}>"

490 

491 

class PrefixResource(AbstractResource):
    """Base for resources identified by a URL prefix."""

    def __init__(self, prefix: str, *, name: Optional[str] = None) -> None:
        """Store the requoted *prefix*.

        *prefix* must be empty or start with "/", and must not end with "/"
        unless it is exactly "" or "/".
        """
        assert not prefix or prefix.startswith("/"), prefix
        assert prefix in ("", "/") or not prefix.endswith("/"), prefix
        super().__init__(name=name)
        requoted = _requote_path(prefix)
        self._prefix = requoted
        # Prefix with a trailing slash, kept alongside the bare prefix.
        self._prefix2 = requoted + "/"

    @property
    def canonical(self) -> str:
        """The stored prefix."""
        return self._prefix

    def add_prefix(self, prefix: str) -> None:
        """Prepend *prefix* and refresh the trailing-slash variant."""
        assert prefix.startswith("/")
        assert not prefix.endswith("/")
        assert len(prefix) > 1
        combined = prefix + self._prefix
        self._prefix = combined
        self._prefix2 = combined + "/"

    def raw_match(self, prefix: str) -> bool:
        """Always ``False``."""
        return False

    # TODO: impl missing abstract methods

515 

516 

class StaticResource(PrefixResource):
    """Resource that serves files from a directory below a URL prefix.

    GET and HEAD routes are created on construction; an OPTIONS route can
    be added afterwards with ``set_options_route``.
    """

    # Query parameter that carries the file hash when versioning is on.
    VERSION_KEY = "v"

    def __init__(
        self,
        prefix: str,
        directory: PathLike,
        *,
        name: Optional[str] = None,
        expect_handler: Optional[_ExpectHandler] = None,
        chunk_size: int = 256 * 1024,
        show_index: bool = False,
        follow_symlinks: bool = False,
        append_version: bool = False,
    ) -> None:
        """Resolve *directory* and register the GET/HEAD routes.

        A leading "~" in *directory* is expanded.  Raises ``ValueError``
        when the resolved path does not exist or is not a directory.
        """
        super().__init__(prefix, name=name)
        try:
            directory = Path(directory)
            if str(directory).startswith("~"):
                directory = Path(os.path.expanduser(str(directory)))
            directory = directory.resolve()
            if not directory.is_dir():
                raise ValueError("Not a directory")
        except (FileNotFoundError, ValueError) as error:
            raise ValueError(f"No directory exists at '{directory}'") from error
        self._directory = directory
        self._show_index = show_index
        self._chunk_size = chunk_size
        self._follow_symlinks = follow_symlinks
        self._expect_handler = expect_handler
        self._append_version = append_version

        self._routes = {
            "GET": ResourceRoute(
                "GET", self._handle, self, expect_handler=expect_handler
            ),
            "HEAD": ResourceRoute(
                "HEAD", self._handle, self, expect_handler=expect_handler
            ),
        }

    def url_for(  # type: ignore[override]
        self,
        *,
        filename: PathLike,
        append_version: Optional[bool] = None,
    ) -> URL:
        """Build the URL of *filename* below the prefix.

        When versioning is requested (explicitly, or through the instance
        default if *append_version* is ``None``) and the file exists inside
        the served directory, a hash of its content is added as the
        ``VERSION_KEY`` query parameter.  Otherwise the plain URL is
        returned.
        """
        if append_version is None:
            append_version = self._append_version
        filename = str(filename).lstrip("/")

        url = URL.build(path=self._prefix, encoded=True)
        # filename is not encoded
        if YARL_VERSION < (1, 6):
            url = url / filename.replace("%", "%25")
        else:
            url = url / filename

        if append_version:
            unresolved_path = self._directory.joinpath(filename)
            try:
                if self._follow_symlinks:
                    normalized_path = Path(os.path.normpath(unresolved_path))
                    normalized_path.relative_to(self._directory)
                    filepath = normalized_path.resolve()
                else:
                    filepath = unresolved_path.resolve()
                    filepath.relative_to(self._directory)
            except (ValueError, FileNotFoundError):
                # ValueError for case when path point to symlink
                # with follow_symlinks is False
                return url  # relatively safe
            if filepath.is_file():
                # TODO cache file content
                # with file watcher for cache invalidation
                with filepath.open("rb") as f:
                    file_bytes = f.read()
                h = self._get_file_hash(file_bytes)
                url = url.with_query({self.VERSION_KEY: h})
        return url

    @staticmethod
    def _get_file_hash(byte_array: bytes) -> str:
        """Return the URL-safe base64 text of the SHA-256 digest of the bytes."""
        m = hashlib.sha256()  # todo sha256 can be configurable param
        m.update(byte_array)
        b64 = base64.urlsafe_b64encode(m.digest())
        return b64.decode("ascii")

    def get_info(self) -> _InfoDict:
        """Report the served directory, the prefix and the route table."""
        return {
            "directory": self._directory,
            "prefix": self._prefix,
            "routes": self._routes,
        }

    def set_options_route(self, handler: Handler) -> None:
        """Register *handler* for OPTIONS; ``RuntimeError`` if already set."""
        if "OPTIONS" in self._routes:
            raise RuntimeError("OPTIONS route was set already")
        self._routes["OPTIONS"] = ResourceRoute(
            "OPTIONS", handler, self, expect_handler=self._expect_handler
        )

    async def resolve(self, request: Request) -> _Resolve:
        """Match requests whose path is the prefix or lies below it.

        Returns ``(None, set())`` for unrelated paths and ``None`` plus the
        registered methods when the path matches but the method does not.
        On success the match dict holds the unquoted path remainder under
        "filename".
        """
        path = request.rel_url.raw_path
        method = request.method
        allowed_methods = set(self._routes)
        if not path.startswith(self._prefix2) and path != self._prefix:
            return None, set()

        if method not in allowed_methods:
            return None, allowed_methods

        # Skip the prefix and the slash that follows it.
        match_dict = {"filename": _unquote_path(path[len(self._prefix) + 1 :])}
        return (UrlMappingMatchInfo(match_dict, self._routes[method]), allowed_methods)

    def __len__(self) -> int:
        return len(self._routes)

    def __iter__(self) -> Iterator[AbstractRoute]:
        return iter(self._routes.values())

    async def _handle(self, request: Request) -> StreamResponse:
        """Serve the file or directory index named by the matched "filename".

        Raises ``HTTPForbidden`` for anchored (absolute) names, for
        directories when the index is disabled, and when listing a
        directory is not permitted.  Raises ``HTTPNotFound`` when the path
        escapes the served directory, does not exist, or cannot be
        resolved.
        """
        rel_url = request.match_info["filename"]
        try:
            filename = Path(rel_url)
            if filename.anchor:
                # rel_url is an absolute name like
                # /static/\\machine_name\c$ or /static/D:\path
                # where the static dir is totally different
                raise HTTPForbidden()
            unresolved_path = self._directory.joinpath(filename)
            if self._follow_symlinks:
                normalized_path = Path(os.path.normpath(unresolved_path))
                normalized_path.relative_to(self._directory)
                filepath = normalized_path.resolve()
            else:
                filepath = unresolved_path.resolve()
                filepath.relative_to(self._directory)
        except (ValueError, FileNotFoundError) as error:
            # relatively safe
            raise HTTPNotFound() from error
        except HTTPForbidden:
            raise
        except Exception as error:
            # perm error or other kind!
            request.app.logger.exception(error)
            raise HTTPNotFound() from error

        # on opening a dir, load its contents if allowed
        if filepath.is_dir():
            if self._show_index:
                try:
                    return Response(
                        text=self._directory_as_html(filepath), content_type="text/html"
                    )
                except PermissionError:
                    raise HTTPForbidden()
            else:
                raise HTTPForbidden()
        elif filepath.is_file():
            return FileResponse(filepath, chunk_size=self._chunk_size)
        else:
            raise HTTPNotFound()

    def _directory_as_html(self, filepath: Path) -> str:
        """Return an HTML index page for the directory *filepath*.

        File and directory names come from the file system and may contain
        HTML metacharacters, so every name is HTML-escaped and every link
        target is percent-quoted before being placed in the markup.
        """
        # Local import: keeps this security fix self-contained in the block.
        from html import escape as html_escape

        # sanity check
        assert filepath.is_dir()

        relative_path_to_dir = filepath.relative_to(self._directory).as_posix()
        index_of = f"Index of /{html_escape(relative_path_to_dir)}"
        h1 = f"<h1>{index_of}</h1>"

        index_list = []
        for _file in sorted(filepath.iterdir()):
            # show file url as relative to static path
            rel_path = _file.relative_to(self._directory).as_posix()
            # Percent-quote with the module's path-quoting helper, then
            # escape what is left (e.g. "&", "'") for the attribute context.
            file_url = html_escape(
                _quote_path(f"{self._prefix}/{rel_path}"), quote=True
            )

            # if file is a directory, add '/' to the end of the name
            file_name = f"{_file.name}/" if _file.is_dir() else _file.name

            index_list.append(
                '<li><a href="{url}">{name}</a></li>'.format(
                    url=file_url, name=html_escape(file_name)
                )
            )
        ul = "<ul>\n{}\n</ul>".format("\n".join(index_list))
        body = f"<body>\n{h1}\n{ul}\n</body>"

        head_str = f"<head>\n<title>{index_of}</title>\n</head>"
        html = f"<html>\n{head_str}\n{body}\n</html>"

        return html

    def __repr__(self) -> str:
        name = "'" + self.name + "'" if self.name is not None else ""
        return "<StaticResource {name} {path} -> {directory!r}>".format(
            name=name, path=self._prefix, directory=self._directory
        )

723 

724 

class PrefixedSubAppResource(PrefixResource):
    """Resource that mounts a sub-application under a URL prefix."""

    def __init__(self, prefix: str, app: "Application") -> None:
        """Store *app* and push *prefix* onto all of its resources."""
        super().__init__(prefix)
        self._app = app
        self._add_prefix_to_resources(prefix)

    def add_prefix(self, prefix: str) -> None:
        """Extend the own prefix and those of the sub-app's resources."""
        super().add_prefix(prefix)
        self._add_prefix_to_resources(prefix)

    def _add_prefix_to_resources(self, prefix: str) -> None:
        """Apply *prefix* to every resource of the sub-app's router."""
        sub_router = self._app.router
        for res in sub_router.resources():
            # Since the canonical path of a resource is about
            # to change, we need to unindex it and then reindex
            sub_router.unindex_resource(res)
            res.add_prefix(prefix)
            sub_router.index_resource(res)

    def url_for(self, *args: str, **kwargs: str) -> URL:
        """Always raise ``RuntimeError``: not supported for this resource."""
        raise RuntimeError(".url_for() is not supported by sub-application root")

    def get_info(self) -> _InfoDict:
        """Report the sub-application and the prefix."""
        return {"app": self._app, "prefix": self._prefix}

    async def resolve(self, request: Request) -> _Resolve:
        """Resolve *request* with the sub-app's router.

        The sub-app is added to the resulting match info.  The returned
        method set is the ``allowed_methods`` of an ``HTTPMethodNotAllowed``
        result and empty otherwise.
        """
        match_info = await self._app.router.resolve(request)
        match_info.add_app(self._app)
        failure = match_info.http_exception
        methods = (
            failure.allowed_methods
            if isinstance(failure, HTTPMethodNotAllowed)
            else set()
        )
        return match_info, methods

    def __len__(self) -> int:
        sub_routes = self._app.router.routes()
        return len(sub_routes)

    def __iter__(self) -> Iterator[AbstractRoute]:
        sub_routes = self._app.router.routes()
        return iter(sub_routes)

    def __repr__(self) -> str:
        return f"<PrefixedSubAppResource {self._prefix} -> {self._app!r}>"

769 

770 

class AbstractRuleMatching(abc.ABC):
    """Interface of a rule that decides whether a request qualifies."""

    @abc.abstractmethod  # pragma: no branch
    async def match(self, request: Request) -> bool:
        """Tell whether *request* satisfies the rule's criteria."""

    @abc.abstractmethod  # pragma: no branch
    def get_info(self) -> _InfoDict:
        """Return a dict of extra details intended for introspection."""

    @property
    @abc.abstractmethod  # pragma: no branch
    def canonical(self) -> str:
        """Return the rule as a string."""

784 

785 

class Domain(AbstractRuleMatching):
    """Rule matching requests whose ``Host`` header equals a fixed domain."""

    # One dot-separated label: 1-63 chars of [a-z0-9-], no hyphen at the ends.
    re_part = re.compile(r"(?!-)[a-z\d-]{1,63}(?<!-)")

    def __init__(self, domain: str) -> None:
        """Validate *domain* and keep its normalised form."""
        super().__init__()
        self._domain = self.validation(domain)

    @property
    def canonical(self) -> str:
        """The normalised domain."""
        return self._domain

    def validation(self, domain: str) -> str:
        """Normalise *domain* and check every label of its host part.

        Trailing dots are stripped and the value is lower-cased.  The
        result is the raw host, followed by ``:port`` unless the port is 80.
        Raises ``TypeError`` for non-strings and ``ValueError`` for empty
        input, input containing a scheme, or an invalid label.
        """
        if not isinstance(domain, str):
            raise TypeError("Domain must be str")
        domain = domain.rstrip(".").lower()
        if not domain:
            raise ValueError("Domain cannot be empty")
        if "://" in domain:
            raise ValueError("Scheme not supported")
        url = URL("http://" + domain)
        assert url.raw_host is not None
        labels = url.raw_host.split(".")
        if not all(self.re_part.fullmatch(label) for label in labels):
            raise ValueError("Domain not valid")
        return url.raw_host if url.port == 80 else f"{url.raw_host}:{url.port}"

    async def match(self, request: Request) -> bool:
        """Match on the ``Host`` header; ``False`` if missing or empty."""
        host = request.headers.get(hdrs.HOST)
        return self.match_domain(host) if host else False

    def match_domain(self, host: str) -> bool:
        """Compare the lower-cased *host* with the stored domain."""
        return host.lower() == self._domain

    def get_info(self) -> _InfoDict:
        """Report the normalised domain."""
        return {"domain": self._domain}

824 

825 

class MaskDomain(Domain):
    """Domain rule in which ``*`` acts as a wildcard."""

    # Same label rule as the base class, with "*" additionally allowed.
    re_part = re.compile(r"(?!-)[a-z\d\*-]{1,63}(?<!-)")

    def __init__(self, domain: str) -> None:
        """Validate *domain* and compile it into a regular expression."""
        super().__init__(domain)
        # Dots become literal; each "*" matches any run of characters.
        mask = self._domain.replace(".", r"\.").replace("*", ".*")
        self._mask = re.compile(mask)

    @property
    def canonical(self) -> str:
        """The regular-expression source built from the domain mask."""
        return self._mask.pattern

    def match_domain(self, host: str) -> bool:
        """Tell whether the whole of *host* matches the mask.

        The mask is built from the lower-cased domain, so *host* is
        lower-cased as well; this keeps the comparison case-insensitive,
        in line with ``Domain.match_domain``.
        """
        return self._mask.fullmatch(host.lower()) is not None

840 

841 

class MatchedSubAppResource(PrefixedSubAppResource):
    """Sub-application resource selected by a rule rather than a prefix."""

    def __init__(self, rule: AbstractRuleMatching, app: "Application") -> None:
        """Store *rule* and *app*; the prefix is the empty string."""
        # Only the AbstractResource initialiser runs: the prefix-based
        # initialisers of the parent classes are deliberately not called.
        AbstractResource.__init__(self)
        self._prefix = ""
        self._app = app
        self._rule = rule

    @property
    def canonical(self) -> str:
        """Canonical string of the underlying rule."""
        return self._rule.canonical

    def get_info(self) -> _InfoDict:
        """Report the sub-application and the rule."""
        return {"app": self._app, "rule": self._rule}

    async def resolve(self, request: Request) -> _Resolve:
        """Resolve through the sub-app only when the rule accepts *request*.

        Returns ``(None, set())`` when the rule rejects the request;
        otherwise the inherited sub-application resolution is used.
        """
        rule_accepts = await self._rule.match(request)
        if not rule_accepts:
            return None, set()
        return await super().resolve(request)

    def __repr__(self) -> str:
        return f"<MatchedSubAppResource -> {self._app!r}>"

869 

870 

class ResourceRoute(AbstractRoute):
    """A route with resource"""

    def __init__(
        self,
        method: str,
        handler: Union[Handler, Type[AbstractView]],
        resource: AbstractResource,
        *,
        expect_handler: Optional[_ExpectHandler] = None,
    ) -> None:
        """Create a route for *method* and *handler* bound to *resource*."""
        super().__init__(
            method, handler, expect_handler=expect_handler, resource=resource
        )

    def __repr__(self) -> str:
        # The closing ">" was missing from the original format string.
        return "<ResourceRoute [{method}] {resource} -> {handler!r}>".format(
            method=self.method, resource=self._resource, handler=self.handler
        )

    @property
    def name(self) -> Optional[str]:
        """Name of the owning resource, or ``None`` if there is no resource."""
        if self._resource is None:
            return None
        return self._resource.name

    def url_for(self, *args: str, **kwargs: str) -> URL:
        """Construct url for route with additional params."""
        assert self._resource is not None
        return self._resource.url_for(*args, **kwargs)

    def get_info(self) -> _InfoDict:
        """Return the introspection info of the owning resource."""
        assert self._resource is not None
        return self._resource.get_info()

905 

906 

class SystemRoute(AbstractRoute):
    """Catch-all route whose handler raises a stored HTTP exception."""

    def __init__(self, http_exception: HTTPException) -> None:
        """Register for ``hdrs.METH_ANY`` and keep *http_exception*."""
        super().__init__(hdrs.METH_ANY, self._handle)
        self._http_exception = http_exception

    def url_for(self, *args: str, **kwargs: str) -> URL:
        """Always raise ``RuntimeError``: system routes have no URL."""
        raise RuntimeError(".url_for() is not allowed for SystemRoute")

    @property
    def name(self) -> Optional[str]:
        """System routes are unnamed."""
        return None

    def get_info(self) -> _InfoDict:
        """Report the stored exception under "http_exception"."""
        return {"http_exception": self._http_exception}

    async def _handle(self, request: Request) -> StreamResponse:
        """Raise the stored exception, whatever the request."""
        raise self._http_exception

    @property
    def status(self) -> int:
        """Status of the stored exception."""
        return self._http_exception.status

    @property
    def reason(self) -> str:
        """Reason of the stored exception."""
        return self._http_exception.reason

    def __repr__(self) -> str:
        return f"<SystemRoute {self.status}: {self.reason}>"

935 

936 

class View(AbstractView):
    """Class-based view that dispatches to a method named after the HTTP verb."""

    async def _iter(self) -> StreamResponse:
        """Await the method matching the lower-cased request method.

        Raises ``HTTPMethodNotAllowed`` when the request method is not in
        ``hdrs.METH_ALL`` or the view has no attribute of that name.
        """
        verb = self.request.method
        if verb not in hdrs.METH_ALL:
            self._raise_allowed_methods()
        bound: Optional[Callable[[], Awaitable[StreamResponse]]] = getattr(
            self, verb.lower(), None
        )
        if bound is None:
            self._raise_allowed_methods()
        return await bound()

    def __await__(self) -> Generator[Any, None, StreamResponse]:
        """Make the view awaitable by delegating to ``_iter()``."""
        return self._iter().__await__()

    def _raise_allowed_methods(self) -> NoReturn:
        """Raise ``HTTPMethodNotAllowed`` listing the verbs this view defines."""
        allowed_methods = {
            verb for verb in hdrs.METH_ALL if hasattr(self, verb.lower())
        }
        raise HTTPMethodNotAllowed(self.request.method, allowed_methods)

954 

955 

class ResourcesView(Sized, Iterable[AbstractResource], Container[AbstractResource]):
    """Sized, iterable, membership-testable wrapper around a resource list."""

    def __init__(self, resources: List[AbstractResource]) -> None:
        # The list is stored as given, not copied.
        self._resources = resources

    def __len__(self) -> int:
        return len(self._resources)

    def __iter__(self) -> Iterator[AbstractResource]:
        for resource in self._resources:
            yield resource

    def __contains__(self, resource: object) -> bool:
        return resource in self._resources

968 

969 

class RoutesView(Sized, Iterable[AbstractRoute], Container[AbstractRoute]):
    """Flat view over the routes of a list of resources."""

    def __init__(self, resources: List[AbstractResource]):
        # The routes are collected once, at construction time, in
        # resource order.
        self._routes: List[AbstractRoute] = [
            route for resource in resources for route in resource
        ]

    def __len__(self) -> int:
        return len(self._routes)

    def __iter__(self) -> Iterator[AbstractRoute]:
        for route in self._routes:
            yield route

    def __contains__(self, route: object) -> bool:
        return route in self._routes

985 

986 

class UrlDispatcher(AbstractRouter, Mapping[str, AbstractResource]):
    """Router that resolves requests against the registered resources.

    The dispatcher is also a read-only mapping from resource name to the
    resource registered under that name; resources registered without a name
    are only reachable through ``resources()`` / ``routes()``.
    """

    # Separators allowed between the identifier parts of a resource name.
    NAME_SPLIT_RE = re.compile(r"[.:-]")
    # Shared exception instance used for every "no resource matched" result.
    HTTP_NOT_FOUND = HTTPNotFound()

    def __init__(self) -> None:
        super().__init__()
        # Every registered resource, in registration order.
        self._resources: List[AbstractResource] = []
        # Only the resources that were registered with a name.
        self._named_resources: Dict[str, AbstractResource] = {}
        # Index key (see _get_resource_index_key) -> resources registered
        # under that key, in registration order.
        self._resource_index: Dict[str, List[AbstractResource]] = {}
        # Resources that cannot be indexed by path and are scanned linearly.
        self._matched_sub_app_resources: List[MatchedSubAppResource] = []

    async def resolve(self, request: Request) -> UrlMappingMatchInfo:
        """Find the match info for *request*.

        Returns the first non-None match produced by a candidate resource.
        If nothing matches, returns a ``MatchInfoError`` wrapping
        ``HTTPMethodNotAllowed`` when at least one candidate reported allowed
        methods, otherwise one wrapping the shared ``HTTP_NOT_FOUND``.
        """
        resource_index = self._resource_index
        allowed_methods: Set[str] = set()

        # Walk the url parts looking for candidates. We walk the url backwards
        # to ensure the most explicit match is found first. If there are multiple
        # candidates for a given url part because there are multiple resources
        # registered for the same canonical path, we resolve them in a linear
        # fashion to ensure registration order is respected.
        url_part = request.rel_url.raw_path
        while url_part:
            for candidate in resource_index.get(url_part, ()):
                match_dict, allowed = await candidate.resolve(request)
                if match_dict is not None:
                    return match_dict
                else:
                    allowed_methods |= allowed
            if url_part == "/":
                break
            # Drop the last path segment; an empty result means we reached
            # the root, which is looked up as "/".
            url_part = url_part.rpartition("/")[0] or "/"

        #
        # We didn't find any candidates, so we'll try the matched sub-app
        # resources which we have to walk in a linear fashion because they
        # have regex/wildcard match rules and we cannot index them.
        #
        # For most cases we do not expect there to be many of these since
        # currently they are only added by `add_domain`
        #
        for resource in self._matched_sub_app_resources:
            match_dict, allowed = await resource.resolve(request)
            if match_dict is not None:
                return match_dict
            else:
                allowed_methods |= allowed

        if allowed_methods:
            return MatchInfoError(HTTPMethodNotAllowed(request.method, allowed_methods))

        return MatchInfoError(self.HTTP_NOT_FOUND)

    def __iter__(self) -> Iterator[str]:
        """Iterate over the names of the named resources."""
        return iter(self._named_resources)

    def __len__(self) -> int:
        """Return the number of named resources."""
        return len(self._named_resources)

    def __contains__(self, resource: object) -> bool:
        """Return True if *resource* is a registered resource name (a key)."""
        return resource in self._named_resources

    def __getitem__(self, name: str) -> AbstractResource:
        """Return the resource registered as *name*; KeyError if absent."""
        return self._named_resources[name]

    def resources(self) -> ResourcesView:
        """Return a view over all registered resources."""
        return ResourcesView(self._resources)

    def routes(self) -> RoutesView:
        """Return a view over the routes of all registered resources."""
        return RoutesView(self._resources)

    def named_resources(self) -> Mapping[str, AbstractResource]:
        """Return a read-only proxy of the name -> resource mapping."""
        return MappingProxyType(self._named_resources)

    def register_resource(self, resource: AbstractResource) -> None:
        """Add *resource* to the router.

        Raises RuntimeError if the router is frozen, and ValueError if the
        resource name contains a Python keyword, contains a part that is not
        an identifier, or is already registered.
        """
        assert isinstance(
            resource, AbstractResource
        ), f"Instance of AbstractResource class is required, got {resource!r}"
        if self.frozen:
            raise RuntimeError("Cannot register a resource into frozen router.")

        name = resource.name

        if name is not None:
            # A name is a sequence of identifiers joined by ".", ":" or "-".
            parts = self.NAME_SPLIT_RE.split(name)
            for part in parts:
                if keyword.iskeyword(part):
                    raise ValueError(
                        f"Incorrect route name {name!r}, "
                        "python keywords cannot be used "
                        "for route name"
                    )
                if not part.isidentifier():
                    raise ValueError(
                        f"Incorrect route name {name!r}, "
                        "the name should be a sequence of "
                        "python identifiers separated "
                        "by dash, dot or colon"
                    )
            if name in self._named_resources:
                raise ValueError(
                    f"Duplicate {name!r}, "
                    f"already handled by {self._named_resources[name]!r}"
                )
            self._named_resources[name] = resource
        self._resources.append(resource)

        if isinstance(resource, MatchedSubAppResource):
            # We cannot index match sub-app resources because they have match rules
            self._matched_sub_app_resources.append(resource)
        else:
            self.index_resource(resource)

    def _get_resource_index_key(self, resource: AbstractResource) -> str:
        """Return a key to index the resource in the resource index."""
        index_key = resource.canonical
        if "{" in index_key:
            # Strip at the first { to allow for variables, then cut back to
            # the preceding / so the key is always made of whole path
            # segments. resolve() only looks up whole-segment prefixes of the
            # request path, so a canonical path such as
            # `/core/locations{tail}` must be indexed under `/core`;
            # indexing it under `/core/locations` would make a request for
            # `/core/locationsfoo` miss the resource entirely.
            index_key = index_key.partition("{")[0].rpartition("/")[0]
        return index_key.rstrip("/") or "/"

    def index_resource(self, resource: AbstractResource) -> None:
        """Add a resource to the resource index."""
        resource_key = self._get_resource_index_key(resource)
        # There may be multiple resources for a canonical path
        # so we keep them in a list to ensure that registration
        # order is respected.
        self._resource_index.setdefault(resource_key, []).append(resource)

    def unindex_resource(self, resource: AbstractResource) -> None:
        """Remove a resource from the resource index.

        Raises KeyError if nothing is indexed under the resource's key and
        ValueError if the resource is not in that key's list.
        """
        resource_key = self._get_resource_index_key(resource)
        self._resource_index[resource_key].remove(resource)

    def add_resource(self, path: str, *, name: Optional[str] = None) -> Resource:
        """Return a resource for *path*, registering a new one if needed.

        The most recently registered resource is reused when it has the same
        name and its ``raw_match(path)`` is truthy. Otherwise a PlainResource
        is created when the path has no braces and does not match ROUTE_RE,
        and a DynamicResource is created in every other case.

        Raises ValueError if *path* is non-empty and does not start with "/".
        """
        if path and not path.startswith("/"):
            raise ValueError("path should be started with / or be empty")
        # Reuse last added resource if path and name are the same
        if self._resources:
            resource = self._resources[-1]
            if resource.name == name and resource.raw_match(path):
                return cast(Resource, resource)
        if not ("{" in path or "}" in path or ROUTE_RE.search(path)):
            resource = PlainResource(_requote_path(path), name=name)
            self.register_resource(resource)
            return resource
        resource = DynamicResource(path, name=name)
        self.register_resource(resource)
        return resource

    def add_route(
        self,
        method: str,
        path: str,
        handler: Union[Handler, Type[AbstractView]],
        *,
        name: Optional[str] = None,
        expect_handler: Optional[_ExpectHandler] = None,
    ) -> AbstractRoute:
        """Add a route for *method* on the resource for *path* and return it."""
        resource = self.add_resource(path, name=name)
        return resource.add_route(method, handler, expect_handler=expect_handler)

    def add_static(
        self,
        prefix: str,
        path: PathLike,
        *,
        name: Optional[str] = None,
        expect_handler: Optional[_ExpectHandler] = None,
        chunk_size: int = 256 * 1024,
        show_index: bool = False,
        follow_symlinks: bool = False,
        append_version: bool = False,
    ) -> AbstractResource:
        """Add static files view.

        prefix - url prefix
        path - folder with files

        The remaining keyword arguments are passed to StaticResource
        unchanged. Returns the registered StaticResource.
        """
        assert prefix.startswith("/")
        # A single trailing slash is dropped before building the resource.
        if prefix.endswith("/"):
            prefix = prefix[:-1]
        resource = StaticResource(
            prefix,
            path,
            name=name,
            expect_handler=expect_handler,
            chunk_size=chunk_size,
            show_index=show_index,
            follow_symlinks=follow_symlinks,
            append_version=append_version,
        )
        self.register_resource(resource)
        return resource

    def add_head(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
        """Shortcut for add_route with method HEAD."""
        return self.add_route(hdrs.METH_HEAD, path, handler, **kwargs)

    def add_options(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
        """Shortcut for add_route with method OPTIONS."""
        return self.add_route(hdrs.METH_OPTIONS, path, handler, **kwargs)

    def add_get(
        self,
        path: str,
        handler: Handler,
        *,
        name: Optional[str] = None,
        allow_head: bool = True,
        **kwargs: Any,
    ) -> AbstractRoute:
        """Shortcut for add_route with method GET.

        If allow_head is true, another
        route is added allowing head requests to the same endpoint.
        Only the GET route is returned.
        """
        resource = self.add_resource(path, name=name)
        if allow_head:
            resource.add_route(hdrs.METH_HEAD, handler, **kwargs)
        return resource.add_route(hdrs.METH_GET, handler, **kwargs)

    def add_post(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
        """Shortcut for add_route with method POST."""
        return self.add_route(hdrs.METH_POST, path, handler, **kwargs)

    def add_put(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
        """Shortcut for add_route with method PUT."""
        return self.add_route(hdrs.METH_PUT, path, handler, **kwargs)

    def add_patch(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
        """Shortcut for add_route with method PATCH."""
        return self.add_route(hdrs.METH_PATCH, path, handler, **kwargs)

    def add_delete(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
        """Shortcut for add_route with method DELETE."""
        return self.add_route(hdrs.METH_DELETE, path, handler, **kwargs)

    def add_view(
        self, path: str, handler: Type[AbstractView], **kwargs: Any
    ) -> AbstractRoute:
        """Shortcut for add_route with ANY methods for a class-based view."""
        return self.add_route(hdrs.METH_ANY, path, handler, **kwargs)

    def freeze(self) -> None:
        """Freeze the router and every registered resource."""
        super().freeze()
        for resource in self._resources:
            resource.freeze()

    def add_routes(self, routes: Iterable[AbstractRouteDef]) -> List[AbstractRoute]:
        """Append routes to route table.

        Parameter should be a sequence of RouteDef objects.

        Returns a list of registered AbstractRoute instances.
        """
        registered_routes = []
        for route_def in routes:
            registered_routes.extend(route_def.register(self))
        return registered_routes

1244 

1245 

def _quote_path(value: str) -> str:
    """Return *value* percent-encoded as a URL path (yarl's ``raw_path``)."""
    # With yarl older than 1.6 every literal "%" is escaped by hand before
    # the path is handed to yarl for quoting.
    escape_percent_first = YARL_VERSION < (1, 6)
    path = value.replace("%", "%25") if escape_percent_first else value
    quoted_url = URL.build(path=path, encoded=False)
    return quoted_url.raw_path

1250 

1251 

def _unquote_path(value: str) -> str:
    """Return the decoded path for the already-encoded path *value*."""
    # encoded=True tells yarl that *value* is already percent-encoded.
    encoded_url = URL.build(path=value, encoded=True)
    return encoded_url.path

1254 

1255 

def _requote_path(value: str) -> str:
    """Quote *value* as a URL path while keeping its existing %-sequences.

    Non-ascii characters and other characters which must be quoted are
    percent-encoded by ``_quote_path``.
    """
    quoted = _quote_path(value)
    if "%" not in value:
        # Nothing was percent-encoded in the input, so nothing to restore.
        return quoted
    # Turn every "%25" in the quoted result back into a bare "%" so that the
    # caller's original %-sequences are preserved.
    return quoted.replace("%25", "%")