Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/web_urldispatcher.py: 38%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

729 statements  

1import abc 

2import asyncio 

3import base64 

4import functools 

5import hashlib 

6import html 

7import inspect 

8import keyword 

9import os 

10import re 

11import sys 

12from pathlib import Path 

13from types import MappingProxyType 

14from typing import ( 

15 TYPE_CHECKING, 

16 Any, 

17 Awaitable, 

18 Callable, 

19 Container, 

20 Dict, 

21 Final, 

22 Generator, 

23 Iterable, 

24 Iterator, 

25 List, 

26 Mapping, 

27 NoReturn, 

28 Optional, 

29 Pattern, 

30 Set, 

31 Sized, 

32 Tuple, 

33 Type, 

34 TypedDict, 

35 Union, 

36 cast, 

37) 

38 

39from yarl import URL 

40 

41from . import hdrs 

42from .abc import AbstractMatchInfo, AbstractRouter, AbstractView 

43from .helpers import DEBUG 

44from .http import HttpVersion11 

45from .typedefs import Handler, PathLike 

46from .web_exceptions import ( 

47 HTTPException, 

48 HTTPExpectationFailed, 

49 HTTPForbidden, 

50 HTTPMethodNotAllowed, 

51 HTTPNotFound, 

52) 

53from .web_fileresponse import FileResponse 

54from .web_request import Request 

55from .web_response import Response, StreamResponse 

56from .web_routedef import AbstractRouteDef 

57 

# Public names exported by this module.
__all__ = (
    "UrlDispatcher",
    "UrlMappingMatchInfo",
    "AbstractResource",
    "Resource",
    "PlainResource",
    "DynamicResource",
    "AbstractRoute",
    "ResourceRoute",
    "StaticResource",
    "View",
)


if TYPE_CHECKING:
    # Imported for annotations only; never executed at runtime.
    from .web_app import Application

    # Type checkers see a str -> str mapping ...
    BaseDict = Dict[str, str]
else:
    # ... while at runtime the base class is the plain builtin dict.
    BaseDict = dict

# Exception types caught alongside ValueError when a static file path is
# resolved (see the "circular symlinks" comment at the use site):
# (OSError,) on win32 before Python 3.10, (RuntimeError,) on any other
# interpreter before Python 3.13, and an empty tuple from 3.13 onwards.
CIRCULAR_SYMLINK_ERROR = (
    (OSError,)
    if sys.version_info < (3, 10) and sys.platform.startswith("win32")
    else (RuntimeError,) if sys.version_info < (3, 13) else ()
)

# Anchored pattern: the whole string must consist of one or more characters
# from the listed class. Used to validate HTTP method names.
HTTP_METHOD_RE: Final[Pattern[str]] = re.compile(
    r"^[0-9A-Za-z!#\$%&'\*\+\-\.\^_`\|~]+$"
)
# Captures one "{...}" placeholder that starts with a letter or underscore and
# may contain one level of nested braces. Because the pattern is a capturing
# group, ``ROUTE_RE.split`` keeps the placeholders in its result.
ROUTE_RE: Final[Pattern[str]] = re.compile(
    r"(\{[_a-zA-Z][^{}]*(?:\{[^{}]*\}[^{}]*)*\})"
)
# Regex-escaped form of the path separator.
PATH_SEP: Final[str] = re.escape("/")


# Signature of a handler for the "Expect" request header.
_ExpectHandler = Callable[[Request], Awaitable[Optional[StreamResponse]]]
# Result of a resource's resolve(): (match info or None, allowed methods).
_Resolve = Tuple[Optional["UrlMappingMatchInfo"], Set[str]]

# html.escape with quote characters escaped as well.
html_escape = functools.partial(html.escape, quote=True)

98 

99 

class _InfoDict(TypedDict, total=False):
    """Introspection payload returned by the various ``get_info()`` methods.

    Every key is optional (``total=False``); each producer fills in only the
    keys that apply to it.
    """

    # Returned by PlainResource.get_info().
    path: str

    # Returned by DynamicResource.get_info().
    formatter: str
    pattern: Pattern[str]

    # Returned by StaticResource.get_info(); "prefix" is also used by
    # PrefixedSubAppResource.get_info().
    directory: Path
    prefix: str
    routes: Mapping[str, "AbstractRoute"]

    # Returned by the sub-application resources.
    app: "Application"

    # Returned by Domain.get_info().
    domain: str

    # Returned by MatchedSubAppResource.get_info().
    rule: "AbstractRuleMatching"

    # Returned by SystemRoute.get_info().
    http_exception: HTTPException

117 

118 

class AbstractResource(Sized, Iterable["AbstractRoute"]):
    """Abstract base class for routing-table entries.

    A resource is a sized, iterable collection of routes. Subclasses must
    provide ``__len__``/``__iter__`` plus the abstract members declared below.
    """

    def __init__(self, *, name: Optional[str] = None) -> None:
        self._name = name

    @property
    def name(self) -> Optional[str]:
        """Return the name passed to the constructor (may be None)."""
        return self._name

    @property
    @abc.abstractmethod
    def canonical(self) -> str:
        """Exposes the resource's canonical path.

        For example '/foo/bar/{name}'

        """

    @abc.abstractmethod  # pragma: no branch
    def url_for(self, **kwargs: str) -> URL:
        """Construct url for resource with additional params."""

    @abc.abstractmethod  # pragma: no branch
    async def resolve(self, request: Request) -> _Resolve:
        """Resolve resource.

        Return (UrlMappingMatchInfo, allowed_methods) pair.
        """

    @abc.abstractmethod
    def add_prefix(self, prefix: str) -> None:
        """Add a prefix to processed URLs.

        Required for subapplications support.
        """

    @abc.abstractmethod
    def get_info(self) -> _InfoDict:
        """Return a dict with additional info useful for introspection"""

    def freeze(self) -> None:
        """Hook with an empty default implementation; subclasses may override."""
        pass

    @abc.abstractmethod
    def raw_match(self, path: str) -> bool:
        """Perform a raw match against path"""

164 

165 

class AbstractRoute(abc.ABC):
    """Base class pairing an HTTP method with a handler.

    The constructor validates its arguments: the method name is upper-cased
    and checked against ``HTTP_METHOD_RE``, and the handler must be either a
    coroutine function or a subclass of ``AbstractView``.
    """

    def __init__(
        self,
        method: str,
        handler: Union[Handler, Type[AbstractView]],
        *,
        expect_handler: Optional[_ExpectHandler] = None,
        resource: Optional[AbstractResource] = None,
    ) -> None:
        # Fall back to the module default when no Expect handler is supplied.
        expect_cb = (
            _default_expect_handler if expect_handler is None else expect_handler
        )

        # asyncio.iscoroutinefunction is only consulted before Python 3.14.
        assert inspect.iscoroutinefunction(expect_cb) or (
            sys.version_info < (3, 14) and asyncio.iscoroutinefunction(expect_cb)
        ), f"Coroutine is expected, got {expect_cb!r}"

        verb = method.upper()
        if not HTTP_METHOD_RE.match(verb):
            raise ValueError(f"{verb} is not allowed HTTP method")

        handler_is_coroutine = inspect.iscoroutinefunction(handler) or (
            sys.version_info < (3, 14) and asyncio.iscoroutinefunction(handler)
        )
        # Class-based views are only checked when the handler is not a coroutine.
        if not handler_is_coroutine and not (
            isinstance(handler, type) and issubclass(handler, AbstractView)
        ):
            raise TypeError(
                f"Only async functions are allowed as web-handlers , got {handler!r}"
            )

        self._method = verb
        self._handler = handler
        self._expect_handler = expect_cb
        self._resource = resource

    @property
    def method(self) -> str:
        """Upper-cased HTTP method this route was created with."""
        return self._method

    @property
    def handler(self) -> Handler:
        """Handler object supplied at construction time."""
        return self._handler

    @property
    @abc.abstractmethod
    def name(self) -> Optional[str]:
        """Optional route's name, always equals to resource's name."""

    @property
    def resource(self) -> Optional[AbstractResource]:
        """Resource this route belongs to, or None."""
        return self._resource

    @abc.abstractmethod
    def get_info(self) -> _InfoDict:
        """Return a dict with additional info useful for introspection"""

    @abc.abstractmethod  # pragma: no branch
    def url_for(self, *args: str, **kwargs: str) -> URL:
        """Construct url for route with additional params."""

    async def handle_expect_header(self, request: Request) -> Optional[StreamResponse]:
        """Delegate to the configured Expect handler and return its result."""
        result = await self._expect_handler(request)
        return result

230 

231 

class UrlMappingMatchInfo(BaseDict, AbstractMatchInfo):
    """Dict of matched path values plus the route that produced the match.

    Also tracks the stack of applications the request travelled through and
    which of them is the current one.
    """

    __slots__ = ("_route", "_apps", "_current_app", "_frozen")

    def __init__(self, match_dict: Dict[str, str], route: AbstractRoute) -> None:
        super().__init__(match_dict)
        self._route = route
        self._apps: List[Application] = []
        self._current_app: Optional[Application] = None
        self._frozen = False

    @property
    def handler(self) -> Handler:
        """Handler of the matched route."""
        return self._route.handler

    @property
    def route(self) -> AbstractRoute:
        """The matched route."""
        return self._route

    @property
    def expect_handler(self) -> _ExpectHandler:
        """Bound ``handle_expect_header`` of the matched route."""
        return self._route.handle_expect_header

    @property
    def http_exception(self) -> Optional[HTTPException]:
        """Always None for a successful match."""
        return None

    def get_info(self) -> _InfoDict:  # type: ignore[override]
        """Return the matched route's introspection info."""
        return self._route.get_info()

    @property
    def apps(self) -> Tuple["Application", ...]:
        """Snapshot of the application stack as a tuple."""
        return tuple(self._apps)

    def add_app(self, app: "Application") -> None:
        """Put *app* at the front of the stack.

        The first app ever added also becomes the current app.
        Raises RuntimeError once the match info has been frozen.
        """
        if self._frozen:
            raise RuntimeError("Cannot change apps stack after .freeze() call")
        if self._current_app is None:
            self._current_app = app
        self._apps.insert(0, app)

    @property
    def current_app(self) -> "Application":
        """Currently selected application; asserts that one has been set."""
        assert self._current_app is not None
        return self._current_app

    @current_app.setter
    def current_app(self, app: "Application") -> None:
        # Membership is verified only when DEBUG is enabled.
        if DEBUG and app not in self._apps:
            raise RuntimeError(
                f"Expected one of the following apps {self._apps!r}, got {app!r}"
            )
        self._current_app = app

    def freeze(self) -> None:
        """Forbid further ``add_app`` calls."""
        self._frozen = True

    def __repr__(self) -> str:
        mapping_repr = super().__repr__()
        return "<MatchInfo {}: {}>".format(mapping_repr, self._route)

295 

296 

class MatchInfoError(UrlMappingMatchInfo):
    """Match info representing a failed resolution.

    Holds an empty match dict and a ``SystemRoute`` built from the exception.
    """

    __slots__ = ("_exception",)

    def __init__(self, http_exception: HTTPException) -> None:
        self._exception = http_exception
        system_route = SystemRoute(http_exception)
        super().__init__({}, system_route)

    @property
    def http_exception(self) -> HTTPException:
        """The HTTP exception this match info was created with."""
        return self._exception

    def __repr__(self) -> str:
        exc = self._exception
        return f"<MatchInfoError {exc.status}: {exc.reason}>"

313 

314 

async def _default_expect_handler(request: Request) -> None:
    """Default handler for the Expect header.

    For HTTP/1.1 requests, answer "100-continue" with an interim
    "100 Continue" response; any other Expect value raises
    HTTPExpectationFailed. Requests of other versions are ignored.
    """
    expect_value = request.headers.get(hdrs.EXPECT, "")
    if not request.version == HttpVersion11:
        return

    if expect_value.lower() != "100-continue":
        raise HTTPExpectationFailed(text=f"Unknown Expect: {expect_value}")

    await request.writer.write(b"HTTP/1.1 100 Continue\r\n\r\n")
    # Reset output_size as we haven't started the main body yet.
    request.writer.output_size = 0

329 

330 

class Resource(AbstractResource):
    """Resource holding one route per HTTP method.

    Routes are stored in a dict keyed by the route's (upper-cased) method; a
    route registered for ``hdrs.METH_ANY`` is additionally remembered as the
    fallback for every method. Subclasses implement ``_match``.
    """

    def __init__(self, *, name: Optional[str] = None) -> None:
        super().__init__(name=name)
        self._routes: Dict[str, ResourceRoute] = {}
        self._any_route: Optional[ResourceRoute] = None
        self._allowed_methods: Set[str] = set()

    def add_route(
        self,
        method: str,
        handler: Union[Type[AbstractView], Handler],
        *,
        expect_handler: Optional[_ExpectHandler] = None,
    ) -> "ResourceRoute":
        """Create a route for *method* and register it on this resource.

        Raises RuntimeError if a route for the same method, or a catch-all
        route, is already registered.
        """
        # Routes are keyed by the upper-cased method (AbstractRoute normalizes
        # it), so the duplicate check has to use the same form; otherwise
        # e.g. "get" would slip past an existing "GET" route and silently
        # replace it.
        if route := self._routes.get(method.upper(), self._any_route):
            raise RuntimeError(
                "Added route will never be executed, "
                f"method {route.method} is already "
                "registered"
            )

        route_obj = ResourceRoute(method, handler, self, expect_handler=expect_handler)
        self.register_route(route_obj)
        return route_obj

    def register_route(self, route: "ResourceRoute") -> None:
        """Store *route* under its method, replacing any previous entry."""
        assert isinstance(
            route, ResourceRoute
        ), f"Instance of Route class is required, got {route!r}"
        if route.method == hdrs.METH_ANY:
            self._any_route = route
        self._allowed_methods.add(route.method)
        self._routes[route.method] = route

    async def resolve(self, request: Request) -> _Resolve:
        """Match the request path and method against this resource.

        Returns ``(None, set())`` when the path does not match,
        ``(match_info, allowed_methods)`` on success and
        ``(None, allowed_methods)`` when only the method is unsupported.
        """
        if (match_dict := self._match(request.rel_url.path_safe)) is None:
            return None, set()
        if route := self._routes.get(request.method, self._any_route):
            return UrlMappingMatchInfo(match_dict, route), self._allowed_methods
        return None, self._allowed_methods

    @abc.abstractmethod
    def _match(self, path: str) -> Optional[Dict[str, str]]:
        """Return dict of path values if path matches this resource, otherwise None."""

    def __len__(self) -> int:
        return len(self._routes)

    def __iter__(self) -> Iterator["ResourceRoute"]:
        return iter(self._routes.values())

    # TODO: implement all abstract methods

383 

384 

class PlainResource(Resource):
    """Resource matched by exact string comparison against a fixed path."""

    def __init__(self, path: str, *, name: Optional[str] = None) -> None:
        super().__init__(name=name)
        assert not path or path.startswith("/")
        self._path = path

    @property
    def canonical(self) -> str:
        """The stored path."""
        return self._path

    def freeze(self) -> None:
        """Replace an empty path with "/"."""
        if self._path:
            return
        self._path = "/"

    def add_prefix(self, prefix: str) -> None:
        """Prepend *prefix* (must start with "/", not end with it, length > 1)."""
        assert prefix.startswith("/")
        assert not prefix.endswith("/")
        assert len(prefix) > 1
        self._path = f"{prefix}{self._path}"

    def _match(self, path: str) -> Optional[Dict[str, str]]:
        # string comparison is about 10 times faster than regexp matching
        return {} if self._path == path else None

    def raw_match(self, path: str) -> bool:
        """True when *path* equals the stored path."""
        return self._path == path

    def get_info(self) -> _InfoDict:
        """Return ``{"path": <stored path>}``."""
        info: _InfoDict = {"path": self._path}
        return info

    def url_for(self) -> URL:  # type: ignore[override]
        """Build a URL from the stored path, treated as already encoded."""
        return URL.build(path=self._path, encoded=True)

    def __repr__(self) -> str:
        label = "" if self.name is None else "'" + self.name + "' "
        return "<PlainResource {} {}>".format(label, self._path)

423 

424 

class DynamicResource(Resource):
    """Resource whose path contains ``{name}`` / ``{name:regex}`` placeholders.

    The constructor translates the path into a compiled regular expression
    (for matching) and a format string (for URL building).
    """

    DYN = re.compile(r"\{(?P<var>[_a-zA-Z][_a-zA-Z0-9]*)\}")
    DYN_WITH_RE = re.compile(r"\{(?P<var>[_a-zA-Z][_a-zA-Z0-9]*):(?P<re>.+)\}")
    GOOD = r"[^{}/]+"

    def __init__(self, path: str, *, name: Optional[str] = None) -> None:
        super().__init__(name=name)
        self._orig_path = path
        regex_chunks: List[str] = []
        format_chunks: List[str] = []
        for part in ROUTE_RE.split(path):
            # "{var}" -> named group using the default GOOD pattern.
            simple = self.DYN.fullmatch(part)
            if simple is not None:
                var = simple.group("var")
                regex_chunks.append("(?P<{}>{})".format(var, self.GOOD))
                format_chunks.append("{" + var + "}")
                continue

            # "{var:regex}" -> named group using the caller-supplied regex.
            custom = self.DYN_WITH_RE.fullmatch(part)
            if custom is not None:
                regex_chunks.append("(?P<{var}>{re})".format(**custom.groupdict()))
                format_chunks.append("{" + custom.group("var") + "}")
                continue

            # Anything else must be literal text without stray braces.
            if "{" in part or "}" in part:
                raise ValueError(f"Invalid path '{path}'['{part}']")

            literal = _requote_path(part)
            format_chunks.append(literal)
            regex_chunks.append(re.escape(literal))

        pattern = "".join(regex_chunks)
        formatter = "".join(format_chunks)

        try:
            compiled = re.compile(pattern)
        except re.error as exc:
            raise ValueError(f"Bad pattern '{pattern}': {exc}") from None
        assert compiled.pattern.startswith(PATH_SEP)
        assert formatter.startswith("/")
        self._pattern = compiled
        self._formatter = formatter

    @property
    def canonical(self) -> str:
        """The format string derived from the path."""
        return self._formatter

    def add_prefix(self, prefix: str) -> None:
        """Prepend *prefix* to both the compiled pattern and the formatter."""
        assert prefix.startswith("/")
        assert not prefix.endswith("/")
        assert len(prefix) > 1
        prefixed_pattern = re.escape(prefix) + self._pattern.pattern
        self._pattern = re.compile(prefixed_pattern)
        self._formatter = prefix + self._formatter

    def _match(self, path: str) -> Optional[Dict[str, str]]:
        found = self._pattern.fullmatch(path)
        if found is None:
            return None
        values: Dict[str, str] = {}
        for group_name, raw in found.groupdict().items():
            values[group_name] = _unquote_path_safe(raw)
        return values

    def raw_match(self, path: str) -> bool:
        """True when *path* equals the path originally passed to the constructor."""
        return self._orig_path == path

    def get_info(self) -> _InfoDict:
        """Return the formatter string and the compiled pattern."""
        return {"formatter": self._formatter, "pattern": self._pattern}

    def url_for(self, **parts: str) -> URL:
        """Fill the formatter with the quoted *parts* and build a URL from it."""
        quoted_parts = {key: _quote_path(value) for key, value in parts.items()}
        url = self._formatter.format_map(quoted_parts)
        return URL.build(path=url, encoded=True)

    def __repr__(self) -> str:
        name = "'" + self.name + "' " if self.name is not None else ""
        return f"<DynamicResource {name} {self._formatter}>"

498 

499 

class PrefixResource(AbstractResource):
    """Base for resources identified by a URL prefix.

    Keeps the requoted prefix and the same prefix with a trailing "/".
    """

    def __init__(self, prefix: str, *, name: Optional[str] = None) -> None:
        assert not prefix or prefix.startswith("/"), prefix
        assert prefix in ("", "/") or not prefix.endswith("/"), prefix
        super().__init__(name=name)
        requoted = _requote_path(prefix)
        self._prefix = requoted
        self._prefix2 = requoted + "/"

    @property
    def canonical(self) -> str:
        """The stored prefix."""
        return self._prefix

    def add_prefix(self, prefix: str) -> None:
        """Prepend *prefix* and refresh the trailing-slash variant."""
        assert prefix.startswith("/")
        assert not prefix.endswith("/")
        assert len(prefix) > 1
        combined = prefix + self._prefix
        self._prefix = combined
        self._prefix2 = combined + "/"

    def raw_match(self, prefix: str) -> bool:
        """Always False for this class."""
        return False

    # TODO: impl missing abstract methods

523 

524 

class StaticResource(PrefixResource):
    """Resource serving files from a directory under a URL prefix.

    GET and HEAD routes are created in the constructor; an OPTIONS route can
    be added later with ``set_options_route``.
    """

    VERSION_KEY = "v"

    def __init__(
        self,
        prefix: str,
        directory: PathLike,
        *,
        name: Optional[str] = None,
        expect_handler: Optional[_ExpectHandler] = None,
        chunk_size: int = 256 * 1024,
        show_index: bool = False,
        follow_symlinks: bool = False,
        append_version: bool = False,
    ) -> None:
        """Resolve *directory* and set up the GET/HEAD routes.

        Raises ValueError if the directory does not exist or is not a
        directory.
        """
        super().__init__(prefix, name=name)
        try:
            directory = Path(directory).expanduser().resolve(strict=True)
        except FileNotFoundError as error:
            raise ValueError(f"'{directory}' does not exist") from error
        if not directory.is_dir():
            raise ValueError(f"'{directory}' is not a directory")
        self._directory = directory
        self._show_index = show_index
        self._chunk_size = chunk_size
        self._follow_symlinks = follow_symlinks
        self._expect_handler = expect_handler
        self._append_version = append_version

        self._routes = {
            "GET": ResourceRoute(
                "GET", self._handle, self, expect_handler=expect_handler
            ),
            "HEAD": ResourceRoute(
                "HEAD", self._handle, self, expect_handler=expect_handler
            ),
        }
        self._allowed_methods = set(self._routes)

    def url_for(  # type: ignore[override]
        self,
        *,
        filename: PathLike,
        append_version: Optional[bool] = None,
    ) -> URL:
        """Build the URL for *filename* under this resource's prefix.

        When versioning is requested (explicitly, or via the constructor
        default when *append_version* is None) and the file exists inside the
        served directory, a ``v=<hash>`` query parameter derived from the file
        content is appended. If the file cannot be resolved inside the
        directory, the plain URL is returned.
        """
        if append_version is None:
            append_version = self._append_version
        filename = str(filename).lstrip("/")

        url = URL.build(path=self._prefix, encoded=True)
        # filename is not encoded
        url = url / filename

        if append_version:
            unresolved_path = self._directory.joinpath(filename)
            try:
                if self._follow_symlinks:
                    normalized_path = Path(os.path.normpath(unresolved_path))
                    normalized_path.relative_to(self._directory)
                    filepath = normalized_path.resolve()
                else:
                    filepath = unresolved_path.resolve()
                    filepath.relative_to(self._directory)
            except (ValueError, FileNotFoundError, *CIRCULAR_SYMLINK_ERROR):
                # ValueError for case when path point to symlink
                # with follow_symlinks is False.
                # Circular symlinks are handled the same way as in
                # _resolve_path_to_response: treat the file as unavailable
                # instead of leaking the resolver's exception to the caller.
                return url  # relatively safe
            if filepath.is_file():
                # TODO cache file content
                # with file watcher for cache invalidation
                with filepath.open("rb") as f:
                    file_bytes = f.read()
                h = self._get_file_hash(file_bytes)
                url = url.with_query({self.VERSION_KEY: h})
        return url

    @staticmethod
    def _get_file_hash(byte_array: bytes) -> str:
        """Return the URL-safe base64 text of the SHA-256 digest of *byte_array*."""
        m = hashlib.sha256()  # todo sha256 can be configurable param
        m.update(byte_array)
        b64 = base64.urlsafe_b64encode(m.digest())
        return b64.decode("ascii")

    def get_info(self) -> _InfoDict:
        """Return the served directory, the prefix and the routes mapping."""
        return {
            "directory": self._directory,
            "prefix": self._prefix,
            "routes": self._routes,
        }

    def set_options_route(self, handler: Handler) -> None:
        """Register *handler* for OPTIONS; raises RuntimeError if already set."""
        if "OPTIONS" in self._routes:
            raise RuntimeError("OPTIONS route was set already")
        self._routes["OPTIONS"] = ResourceRoute(
            "OPTIONS", handler, self, expect_handler=self._expect_handler
        )
        self._allowed_methods.add("OPTIONS")

    async def resolve(self, request: Request) -> _Resolve:
        """Match requests whose path equals the prefix or lies beneath it.

        The part of the path after the prefix is exposed as the "filename"
        entry of the match info.
        """
        path = request.rel_url.path_safe
        method = request.method
        if not path.startswith(self._prefix2) and path != self._prefix:
            return None, set()

        allowed_methods = self._allowed_methods
        if method not in allowed_methods:
            return None, allowed_methods

        match_dict = {"filename": _unquote_path_safe(path[len(self._prefix) + 1 :])}
        return (UrlMappingMatchInfo(match_dict, self._routes[method]), allowed_methods)

    def __len__(self) -> int:
        return len(self._routes)

    def __iter__(self) -> Iterator[AbstractRoute]:
        return iter(self._routes.values())

    async def _handle(self, request: Request) -> StreamResponse:
        """Serve the file named by the request's "filename" match value.

        File-system access is delegated to the loop's default executor.
        """
        rel_url = request.match_info["filename"]
        filename = Path(rel_url)
        if filename.anchor:
            # rel_url is an absolute name like
            # /static/\\machine_name\c$ or /static/D:\path
            # where the static dir is totally different
            raise HTTPForbidden()

        unresolved_path = self._directory.joinpath(filename)
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, self._resolve_path_to_response, unresolved_path
        )

    def _resolve_path_to_response(self, unresolved_path: Path) -> StreamResponse:
        """Take the unresolved path and query the file system to form a response."""
        # Check for access outside the root directory. For follow symlinks, URI
        # cannot traverse out, but symlinks can. Otherwise, no access outside
        # root is permitted.
        try:
            if self._follow_symlinks:
                normalized_path = Path(os.path.normpath(unresolved_path))
                normalized_path.relative_to(self._directory)
                file_path = normalized_path.resolve()
            else:
                file_path = unresolved_path.resolve()
                file_path.relative_to(self._directory)
        except (ValueError, *CIRCULAR_SYMLINK_ERROR) as error:
            # ValueError is raised for the relative check. Circular symlinks
            # raise here on resolving for python < 3.13.
            raise HTTPNotFound() from error

        # if path is a directory, return the contents if permitted. Note the
        # directory check will raise if a segment is not readable.
        try:
            if file_path.is_dir():
                if self._show_index:
                    return Response(
                        text=self._directory_as_html(file_path),
                        content_type="text/html",
                    )
                else:
                    raise HTTPForbidden()
        except PermissionError as error:
            raise HTTPForbidden() from error

        # Return the file response, which handles all other checks.
        return FileResponse(file_path, chunk_size=self._chunk_size)

    def _directory_as_html(self, dir_path: Path) -> str:
        """returns directory's index as html."""
        assert dir_path.is_dir()

        relative_path_to_dir = dir_path.relative_to(self._directory).as_posix()
        index_of = f"Index of /{html_escape(relative_path_to_dir)}"
        h1 = f"<h1>{index_of}</h1>"

        index_list = []
        dir_index = dir_path.iterdir()
        for _file in sorted(dir_index):
            # show file url as relative to static path
            rel_path = _file.relative_to(self._directory).as_posix()
            quoted_file_url = _quote_path(f"{self._prefix}/{rel_path}")

            # if file is a directory, add '/' to the end of the name
            if _file.is_dir():
                file_name = f"{_file.name}/"
            else:
                file_name = _file.name

            index_list.append(
                f'<li><a href="{quoted_file_url}">{html_escape(file_name)}</a></li>'
            )
        ul = "<ul>\n{}\n</ul>".format("\n".join(index_list))
        body = f"<body>\n{h1}\n{ul}\n</body>"

        head_str = f"<head>\n<title>{index_of}</title>\n</head>"
        # Named "page" rather than "html" to avoid shadowing the html module.
        page = f"<html>\n{head_str}\n{body}\n</html>"

        return page

    def __repr__(self) -> str:
        name = "'" + self.name + "'" if self.name is not None else ""
        return "<StaticResource {name} {path} -> {directory!r}>".format(
            name=name, path=self._prefix, directory=self._directory
        )

730 

731 

class PrefixedSubAppResource(PrefixResource):
    """Resource that mounts a sub-application under a URL prefix."""

    def __init__(self, prefix: str, app: "Application") -> None:
        super().__init__(prefix)
        self._app = app
        self._add_prefix_to_resources(prefix)

    def add_prefix(self, prefix: str) -> None:
        """Prefix this resource and every resource of the sub-application."""
        super().add_prefix(prefix)
        self._add_prefix_to_resources(prefix)

    def _add_prefix_to_resources(self, prefix: str) -> None:
        sub_router = self._app.router
        for res in sub_router.resources():
            # Since the canonical path of a resource is about
            # to change, we need to unindex it and then reindex
            sub_router.unindex_resource(res)
            res.add_prefix(prefix)
            sub_router.index_resource(res)

    def url_for(self, *args: str, **kwargs: str) -> URL:
        """Not supported; always raises RuntimeError."""
        raise RuntimeError(".url_for() is not supported by sub-application root")

    def get_info(self) -> _InfoDict:
        """Return the sub-application and the prefix."""
        return {"app": self._app, "prefix": self._prefix}

    async def resolve(self, request: Request) -> _Resolve:
        """Delegate resolution to the sub-application's router.

        The sub-application is added to the returned match info. Allowed
        methods are reported only for a "method not allowed" result.
        """
        match_info = await self._app.router.resolve(request)
        match_info.add_app(self._app)
        failure = match_info.http_exception
        methods = (
            failure.allowed_methods
            if isinstance(failure, HTTPMethodNotAllowed)
            else set()
        )
        return match_info, methods

    def __len__(self) -> int:
        return len(self._app.router.routes())

    def __iter__(self) -> Iterator[AbstractRoute]:
        return iter(self._app.router.routes())

    def __repr__(self) -> str:
        return f"<PrefixedSubAppResource {self._prefix} -> {self._app!r}>"

776 

777 

class AbstractRuleMatching(abc.ABC):
    """Interface for a rule that decides whether a request qualifies.

    Used by MatchedSubAppResource, which consults ``match`` before delegating
    to its sub-application and exposes ``canonical`` as its own.
    """

    @abc.abstractmethod  # pragma: no branch
    async def match(self, request: Request) -> bool:
        """Return bool if the request satisfies the criteria"""

    @abc.abstractmethod  # pragma: no branch
    def get_info(self) -> _InfoDict:
        """Return a dict with additional info useful for introspection"""

    @property
    @abc.abstractmethod  # pragma: no branch
    def canonical(self) -> str:
        """Return a str"""

791 

792 

class Domain(AbstractRuleMatching):
    """Rule matching requests whose Host header equals a fixed domain."""

    re_part = re.compile(r"(?!-)[a-z\d-]{1,63}(?<!-)")

    def __init__(self, domain: str) -> None:
        super().__init__()
        self._domain = self.validation(domain)

    @property
    def canonical(self) -> str:
        """The validated, normalized domain."""
        return self._domain

    def validation(self, domain: str) -> str:
        """Validate and normalize *domain*.

        Trailing dots are stripped and the value is lower-cased. Returns the
        host alone when the port is 80, otherwise ``host:port``.
        Raises TypeError for non-str input and ValueError for an empty value,
        a value containing a scheme, or an invalid host label.
        """
        if not isinstance(domain, str):
            raise TypeError("Domain must be str")
        domain = domain.rstrip(".").lower()
        if not domain:
            raise ValueError("Domain cannot be empty")
        if "://" in domain:
            raise ValueError("Scheme not supported")

        url = URL("http://" + domain)
        assert url.raw_host is not None
        for label in url.raw_host.split("."):
            if not self.re_part.fullmatch(label):
                raise ValueError("Domain not valid")

        if url.port == 80:
            return url.raw_host
        return "{}:{}".format(url.raw_host, url.port)

    async def match(self, request: Request) -> bool:
        """True when the request has a Host header accepted by ``match_domain``."""
        host = request.headers.get(hdrs.HOST)
        return self.match_domain(host) if host else False

    def match_domain(self, host: str) -> bool:
        """Case-insensitive equality against the stored domain."""
        return self._domain == host.lower()

    def get_info(self) -> _InfoDict:
        """Return ``{"domain": <stored domain>}``."""
        return {"domain": self._domain}

831 

832 

class MaskDomain(Domain):
    """Domain rule in which "*" acts as a wildcard."""

    re_part = re.compile(r"(?!-)[a-z\d\*-]{1,63}(?<!-)")

    def __init__(self, domain: str) -> None:
        super().__init__(domain)
        # Escape literal dots first, then expand each "*" into ".*".
        escaped_dots = self._domain.replace(".", r"\.")
        self._mask = re.compile(escaped_dots.replace("*", ".*"))

    @property
    def canonical(self) -> str:
        """The regular-expression source of the compiled mask."""
        return self._mask.pattern

    def match_domain(self, host: str) -> bool:
        """True when the whole *host* matches the mask."""
        matched = self._mask.fullmatch(host)
        return matched is not None

847 

848 

class MatchedSubAppResource(PrefixedSubAppResource):
    """Sub-application resource selected by a rule instead of a URL prefix."""

    def __init__(self, rule: AbstractRuleMatching, app: "Application") -> None:
        # Only the AbstractResource initializer is run; the prefix is set to
        # an empty string directly.
        AbstractResource.__init__(self)
        self._prefix = ""
        self._app = app
        self._rule = rule

    @property
    def canonical(self) -> str:
        """The rule's canonical string."""
        return self._rule.canonical

    def get_info(self) -> _InfoDict:
        """Return the sub-application and the rule."""
        return {"app": self._app, "rule": self._rule}

    async def resolve(self, request: Request) -> _Resolve:
        """Resolve through the sub-application when the rule accepts the request.

        Returns ``(None, set())`` if the rule rejects the request.
        """
        accepted = await self._rule.match(request)
        if not accepted:
            return None, set()

        match_info = await self._app.router.resolve(request)
        match_info.add_app(self._app)
        failure = match_info.http_exception
        methods = (
            failure.allowed_methods
            if isinstance(failure, HTTPMethodNotAllowed)
            else set()
        )
        return match_info, methods

    def __repr__(self) -> str:
        return "<MatchedSubAppResource -> {!r}>".format(self._app)

876 

877 

class ResourceRoute(AbstractRoute):
    """A route with resource.

    Name, URL building and introspection are all delegated to the resource.
    """

    def __init__(
        self,
        method: str,
        handler: Union[Handler, Type[AbstractView]],
        resource: AbstractResource,
        *,
        expect_handler: Optional[_ExpectHandler] = None,
    ) -> None:
        super().__init__(
            method, handler, expect_handler=expect_handler, resource=resource
        )

    def __repr__(self) -> str:
        # The closing ">" was previously missing, unlike every other repr here.
        return "<ResourceRoute [{method}] {resource} -> {handler!r}>".format(
            method=self.method, resource=self._resource, handler=self.handler
        )

    @property
    def name(self) -> Optional[str]:
        """The resource's name, or None when no resource is attached."""
        if self._resource is None:
            return None
        return self._resource.name

    def url_for(self, *args: str, **kwargs: str) -> URL:
        """Construct url for route with additional params."""
        assert self._resource is not None
        return self._resource.url_for(*args, **kwargs)

    def get_info(self) -> _InfoDict:
        """Return the resource's introspection info."""
        assert self._resource is not None
        return self._resource.get_info()

912 

913 

class SystemRoute(AbstractRoute):
    """Route whose handler raises a stored HTTP exception."""

    def __init__(self, http_exception: HTTPException) -> None:
        super().__init__(hdrs.METH_ANY, self._handle)
        self._http_exception = http_exception

    def url_for(self, *args: str, **kwargs: str) -> URL:
        """Not supported; always raises RuntimeError."""
        raise RuntimeError(".url_for() is not allowed for SystemRoute")

    @property
    def name(self) -> Optional[str]:
        """System routes are unnamed."""
        return None

    def get_info(self) -> _InfoDict:
        """Return ``{"http_exception": <stored exception>}``."""
        return {"http_exception": self._http_exception}

    async def _handle(self, request: Request) -> StreamResponse:
        raise self._http_exception

    @property
    def status(self) -> int:
        """Status of the stored exception."""
        return self._http_exception.status

    @property
    def reason(self) -> str:
        """Reason of the stored exception."""
        return self._http_exception.reason

    def __repr__(self) -> str:
        return f"<SystemRoute {self.status}: {self.reason}>"

942 

943 

class View(AbstractView):
    """Class-based view dispatching to a method named after the HTTP verb."""

    async def _iter(self) -> StreamResponse:
        verb = self.request.method
        if verb not in hdrs.METH_ALL:
            self._raise_allowed_methods()
        # Look up e.g. ``self.get`` for a GET request.
        bound: Optional[Callable[[], Awaitable[StreamResponse]]] = getattr(
            self, verb.lower(), None
        )
        if bound is None:
            self._raise_allowed_methods()
        response = await bound()
        return response

    def __await__(self) -> Generator[None, None, StreamResponse]:
        return self._iter().__await__()

    def _raise_allowed_methods(self) -> NoReturn:
        """Raise HTTPMethodNotAllowed listing the verbs this view implements."""
        supported = set()
        for candidate in hdrs.METH_ALL:
            if hasattr(self, candidate.lower()):
                supported.add(candidate)
        raise HTTPMethodNotAllowed(self.request.method, supported)

961 

962 

class ResourcesView(Sized, Iterable[AbstractResource], Container[AbstractResource]):
    """Sized, iterable, membership-testable view over a list of resources.

    The list passed in is stored by reference, not copied.
    """

    def __init__(self, resources: List[AbstractResource]) -> None:
        self._resources = resources

    def __len__(self) -> int:
        count = len(self._resources)
        return count

    def __iter__(self) -> Iterator[AbstractResource]:
        for item in self._resources:
            yield item

    def __contains__(self, resource: object) -> bool:
        found = resource in self._resources
        return found

975 

976 

class RoutesView(Sized, Iterable[AbstractRoute], Container[AbstractRoute]):
    """Sized, iterable, membership-testable view over routes.

    The routes of all given resources are flattened into one list when the
    view is constructed.
    """

    def __init__(self, resources: List[AbstractResource]):
        self._routes: List[AbstractRoute] = [
            route for resource in resources for route in resource
        ]

    def __len__(self) -> int:
        count = len(self._routes)
        return count

    def __iter__(self) -> Iterator[AbstractRoute]:
        for item in self._routes:
            yield item

    def __contains__(self, route: object) -> bool:
        found = route in self._routes
        return found

992 

993 

class UrlDispatcher(AbstractRouter, Mapping[str, AbstractResource]):
    """Router that resolves requests against a table of registered resources.

    Every registered resource is kept, in registration order, in
    ``_resources``. Resources that carry a name are also stored in
    ``_named_resources``; the ``Mapping`` interface of this class (iteration,
    ``len``, ``in`` and ``[]``) operates on those names.

    For lookup, resources other than ``MatchedSubAppResource`` are bucketed in
    ``_resource_index`` under a key derived from their canonical path, while
    ``MatchedSubAppResource`` instances are kept in a separate list that is
    scanned linearly.
    """

    # Separators allowed between the identifier parts of a resource name:
    # dot, colon and dash.
    NAME_SPLIT_RE = re.compile(r"[.:-]")
    # Single exception instance reused for every "not found" resolution.
    HTTP_NOT_FOUND = HTTPNotFound()

    def __init__(self) -> None:
        """Create an empty router."""
        super().__init__()
        self._resources: List[AbstractResource] = []
        self._named_resources: Dict[str, AbstractResource] = {}
        self._resource_index: Dict[str, List[AbstractResource]] = {}
        self._matched_sub_app_resources: List[MatchedSubAppResource] = []

    async def resolve(self, request: Request) -> UrlMappingMatchInfo:
        """Find the match info for *request*.

        Returns the first non-``None`` match produced by a resource. When no
        resource matches, returns a ``MatchInfoError`` wrapping
        ``HTTPMethodNotAllowed`` if at least one resource reported allowed
        methods, otherwise one wrapping the shared ``HTTP_NOT_FOUND``.
        """
        resource_index = self._resource_index
        allowed_methods: Set[str] = set()

        # Walk the url parts looking for candidates. We walk the url backwards
        # to ensure the most explicit match is found first. If there are multiple
        # candidates for a given url part because there are multiple resources
        # registered for the same canonical path, we resolve them in a linear
        # fashion to ensure registration order is respected.
        url_part = request.rel_url.path_safe
        while url_part:
            for candidate in resource_index.get(url_part, ()):
                match_dict, allowed = await candidate.resolve(request)
                if match_dict is not None:
                    return match_dict
                allowed_methods |= allowed
            if url_part == "/":
                break
            # Drop the last path segment; an empty remainder means the root.
            url_part = url_part.rpartition("/")[0] or "/"

        #
        # We didn't find any candidates, so we'll try the matched sub-app
        # resources which we have to walk in a linear fashion because they
        # have regex/wildcard match rules and we cannot index them.
        #
        # For most cases we do not expect there to be many of these since
        # currently they are only added by `add_domain`
        #
        for resource in self._matched_sub_app_resources:
            match_dict, allowed = await resource.resolve(request)
            if match_dict is not None:
                return match_dict
            allowed_methods |= allowed

        if allowed_methods:
            return MatchInfoError(HTTPMethodNotAllowed(request.method, allowed_methods))

        return MatchInfoError(self.HTTP_NOT_FOUND)

    def __iter__(self) -> Iterator[str]:
        """Iterate over the names of the named resources."""
        return iter(self._named_resources)

    def __len__(self) -> int:
        """Return the number of named resources."""
        return len(self._named_resources)

    def __contains__(self, resource: object) -> bool:
        """Return whether *resource* is a key (a name) of the named resources."""
        return resource in self._named_resources

    def __getitem__(self, name: str) -> AbstractResource:
        """Return the resource registered under *name*; ``KeyError`` if absent."""
        return self._named_resources[name]

    def resources(self) -> ResourcesView:
        """Return a ``ResourcesView`` over all registered resources."""
        return ResourcesView(self._resources)

    def routes(self) -> RoutesView:
        """Return a ``RoutesView`` built from all registered resources."""
        return RoutesView(self._resources)

    def named_resources(self) -> Mapping[str, AbstractResource]:
        """Return a read-only mapping proxy of name -> resource."""
        return MappingProxyType(self._named_resources)

    def register_resource(self, resource: AbstractResource) -> None:
        """Add *resource* to the router.

        A named resource must have a name made of Python identifiers (none of
        them a keyword) separated by dash, dot or colon, and the name must not
        be registered already.

        Raises:
            RuntimeError: the router is frozen.
            ValueError: the name is malformed or already in use.
        """
        assert isinstance(
            resource, AbstractResource
        ), f"Instance of AbstractResource class is required, got {resource!r}"
        if self.frozen:
            raise RuntimeError("Cannot register a resource into frozen router.")

        name = resource.name

        if name is not None:
            for part in self.NAME_SPLIT_RE.split(name):
                if keyword.iskeyword(part):
                    raise ValueError(
                        f"Incorrect route name {name!r}, "
                        "python keywords cannot be used "
                        "for route name"
                    )
                if not part.isidentifier():
                    raise ValueError(
                        f"Incorrect route name {name!r}, "
                        "the name should be a sequence of "
                        "python identifiers separated "
                        "by dash, dot or colon"
                    )
            if name in self._named_resources:
                raise ValueError(
                    f"Duplicate {name!r}, "
                    f"already handled by {self._named_resources[name]!r}"
                )
            self._named_resources[name] = resource
        self._resources.append(resource)

        if isinstance(resource, MatchedSubAppResource):
            # We cannot index match sub-app resources because they have match rules
            self._matched_sub_app_resources.append(resource)
        else:
            self.index_resource(resource)

    def _get_resource_index_key(self, resource: AbstractResource) -> str:
        """Return a key to index the resource in the resource index."""
        if "{" in (index_key := resource.canonical):
            # strip at the first { to allow for variables, and then
            # rpartition at / to allow for variable parts in the path
            # For example if the canonical path is `/core/locations{tail:.*}`
            # the index key will be `/core` since index is based on the
            # url parts split by `/`
            index_key = index_key.partition("{")[0].rpartition("/")[0]
        # Keys never end with a slash, except for the root key "/" itself.
        return index_key.rstrip("/") or "/"

    def index_resource(self, resource: AbstractResource) -> None:
        """Add a resource to the resource index."""
        resource_key = self._get_resource_index_key(resource)
        # There may be multiple resources for a canonical path
        # so we keep them in a list to ensure that registration
        # order is respected.
        self._resource_index.setdefault(resource_key, []).append(resource)

    def unindex_resource(self, resource: AbstractResource) -> None:
        """Remove a resource from the resource index.

        Raises ``KeyError`` or ``ValueError`` when the resource is not present
        under its current index key.
        """
        resource_key = self._get_resource_index_key(resource)
        self._resource_index[resource_key].remove(resource)

    def add_resource(self, path: str, *, name: Optional[str] = None) -> Resource:
        """Return a resource for *path*, creating and registering it if needed.

        The most recently registered resource is returned instead of a new one
        when its name equals *name* and its ``raw_match(path)`` is true.
        Otherwise a ``PlainResource`` is created for a path without braces
        that does not match ``ROUTE_RE``, and a ``DynamicResource`` for any
        other path.

        Raises:
            ValueError: *path* is non-empty and does not start with ``/``.
        """
        if path and not path.startswith("/"):
            raise ValueError("path should be started with / or be empty")
        # Reuse last added resource if path and name are the same
        if self._resources:
            resource = self._resources[-1]
            if resource.name == name and resource.raw_match(path):
                return cast(Resource, resource)
        if not ("{" in path or "}" in path or ROUTE_RE.search(path)):
            resource = PlainResource(path, name=name)
            self.register_resource(resource)
            return resource
        resource = DynamicResource(path, name=name)
        self.register_resource(resource)
        return resource

    def add_route(
        self,
        method: str,
        path: str,
        handler: Union[Handler, Type[AbstractView]],
        *,
        name: Optional[str] = None,
        expect_handler: Optional[_ExpectHandler] = None,
    ) -> AbstractRoute:
        """Add a route for *method* on the resource for *path* and return it.

        The resource is obtained through ``add_resource(path, name=name)``.
        """
        resource = self.add_resource(path, name=name)
        return resource.add_route(method, handler, expect_handler=expect_handler)

    def add_static(
        self,
        prefix: str,
        path: PathLike,
        *,
        name: Optional[str] = None,
        expect_handler: Optional[_ExpectHandler] = None,
        chunk_size: int = 256 * 1024,
        show_index: bool = False,
        follow_symlinks: bool = False,
        append_version: bool = False,
    ) -> StaticResource:
        """Add static files view.

        prefix - url prefix; must start with ``/``. One trailing ``/`` is
                 stripped before the resource is created.
        path - folder with files

        The remaining keyword arguments are forwarded unchanged to
        ``StaticResource``. The new resource is registered and returned.
        """
        assert prefix.startswith("/")
        if prefix.endswith("/"):
            prefix = prefix[:-1]
        resource = StaticResource(
            prefix,
            path,
            name=name,
            expect_handler=expect_handler,
            chunk_size=chunk_size,
            show_index=show_index,
            follow_symlinks=follow_symlinks,
            append_version=append_version,
        )
        self.register_resource(resource)
        return resource

    def add_head(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
        """Shortcut for add_route with method HEAD."""
        return self.add_route(hdrs.METH_HEAD, path, handler, **kwargs)

    def add_options(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
        """Shortcut for add_route with method OPTIONS."""
        return self.add_route(hdrs.METH_OPTIONS, path, handler, **kwargs)

    def add_get(
        self,
        path: str,
        handler: Handler,
        *,
        name: Optional[str] = None,
        allow_head: bool = True,
        **kwargs: Any,
    ) -> AbstractRoute:
        """Shortcut for add_route with method GET.

        If allow_head is true, another
        route is added allowing head requests to the same endpoint.
        The GET route is the one returned.
        """
        resource = self.add_resource(path, name=name)
        if allow_head:
            # Registered before GET, on the same resource and handler.
            resource.add_route(hdrs.METH_HEAD, handler, **kwargs)
        return resource.add_route(hdrs.METH_GET, handler, **kwargs)

    def add_post(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
        """Shortcut for add_route with method POST."""
        return self.add_route(hdrs.METH_POST, path, handler, **kwargs)

    def add_put(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
        """Shortcut for add_route with method PUT."""
        return self.add_route(hdrs.METH_PUT, path, handler, **kwargs)

    def add_patch(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
        """Shortcut for add_route with method PATCH."""
        return self.add_route(hdrs.METH_PATCH, path, handler, **kwargs)

    def add_delete(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
        """Shortcut for add_route with method DELETE."""
        return self.add_route(hdrs.METH_DELETE, path, handler, **kwargs)

    def add_view(
        self, path: str, handler: Type[AbstractView], **kwargs: Any
    ) -> AbstractRoute:
        """Shortcut for add_route with ANY methods for a class-based view."""
        return self.add_route(hdrs.METH_ANY, path, handler, **kwargs)

    def freeze(self) -> None:
        """Freeze the router and every registered resource."""
        super().freeze()
        for resource in self._resources:
            resource.freeze()

    def add_routes(self, routes: Iterable[AbstractRouteDef]) -> List[AbstractRoute]:
        """Append routes to route table.

        Parameter should be a sequence of RouteDef objects.

        Returns a list of registered AbstractRoute instances.
        """
        registered_routes: List[AbstractRoute] = []
        for route_def in routes:
            # Each definition registers itself and reports the routes it added.
            registered_routes.extend(route_def.register(self))
        return registered_routes

1257 

1258 

def _quote_path(value: str) -> str:
    """Return the raw (percent-encoded) path yarl produces for unencoded *value*."""
    built = URL.build(path=value, encoded=False)
    return built.raw_path

1261 

1262 

1263def _unquote_path_safe(value: str) -> str: 

1264 if "%" not in value: 

1265 return value 

1266 return value.replace("%2F", "/").replace("%25", "%") 

1267 

1268 

def _requote_path(value: str) -> str:
    """Quote *value* as a path while keeping its existing %-sequences.

    Non-ASCII characters and other characters which must be quoted are
    quoted by ``_quote_path``. If *value* contained a ``%``, every ``%25`` in
    the quoted result is turned back into ``%``.
    """
    quoted = _quote_path(value)
    return quoted.replace("%25", "%") if "%" in value else quoted