Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/routing/map.py: 19%

312 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1import posixpath 

2import typing as t 

3import warnings 

4from pprint import pformat 

5from threading import Lock 

6 

7from .._internal import _encode_idna 

8from .._internal import _get_environ 

9from .._internal import _to_str 

10from .._internal import _wsgi_decoding_dance 

11from ..datastructures import ImmutableDict 

12from ..datastructures import MultiDict 

13from ..exceptions import BadHost 

14from ..exceptions import HTTPException 

15from ..exceptions import MethodNotAllowed 

16from ..exceptions import NotFound 

17from ..urls import url_encode 

18from ..urls import url_join 

19from ..urls import url_quote 

20from ..wsgi import get_host 

21from .converters import DEFAULT_CONVERTERS 

22from .exceptions import BuildError 

23from .exceptions import NoMatch 

24from .exceptions import RequestAliasRedirect 

25from .exceptions import RequestPath 

26from .exceptions import RequestRedirect 

27from .exceptions import WebsocketMismatch 

28from .matcher import StateMachineMatcher 

29from .rules import _simple_rule_re 

30from .rules import Rule 

31 

32if t.TYPE_CHECKING: 

33 import typing_extensions as te 

34 from _typeshed.wsgi import WSGIApplication 

35 from _typeshed.wsgi import WSGIEnvironment 

36 from .converters import BaseConverter 

37 from .rules import RuleFactory 

38 from ..wrappers.request import Request 

39 

40 

41class Map: 

42 """The map class stores all the URL rules and some configuration 

43 parameters. Some of the configuration values are only stored on the 

44 `Map` instance since those affect all rules, others are just defaults 

45 and can be overridden for each rule. Note that you have to specify all 

46 arguments besides the `rules` as keyword arguments! 

47 

48 :param rules: sequence of url rules for this map. 

49 :param default_subdomain: The default subdomain for rules without a 

50 subdomain defined. 

51 :param charset: charset of the url. defaults to ``"utf-8"`` 

52 :param strict_slashes: If a rule ends with a slash but the matched 

53 URL does not, redirect to the URL with a trailing slash. 

54 :param merge_slashes: Merge consecutive slashes when matching or 

55 building URLs. Matches will redirect to the normalized URL. 

56 Slashes in variable parts are not merged. 

57 :param redirect_defaults: This will redirect to the default rule if it 

58 wasn't visited that way. This helps creating 

59 unique URLs. 

60 :param converters: A dict of converters that adds additional converters 

61 to the list of converters. If you redefine one 

62 converter this will override the original one. 

63 :param sort_parameters: If set to `True` the url parameters are sorted. 

64 See `url_encode` for more details. 

65 :param sort_key: The sort key function for `url_encode`. 

66 :param encoding_errors: the error method to use for decoding 

67 :param host_matching: if set to `True` it enables the host matching 

68 feature and disables the subdomain one. If 

69 enabled the `host` parameter to rules is used 

70 instead of the `subdomain` one. 

71 

72 .. versionchanged:: 1.0 

73 If ``url_scheme`` is ``ws`` or ``wss``, only WebSocket rules 

74 will match. 

75 

76 .. versionchanged:: 1.0 

77 Added ``merge_slashes``. 

78 

79 .. versionchanged:: 0.7 

80 Added ``encoding_errors`` and ``host_matching``. 

81 

82 .. versionchanged:: 0.5 

83 Added ``sort_parameters`` and ``sort_key``. 

84 """ 

85 

86 #: A dict of default converters to be used. 

87 default_converters = ImmutableDict(DEFAULT_CONVERTERS) 

88 

89 #: The type of lock to use when updating. 

90 #: 

91 #: .. versionadded:: 1.0 

92 lock_class = Lock 

93 

94 def __init__( 

95 self, 

96 rules: t.Optional[t.Iterable["RuleFactory"]] = None, 

97 default_subdomain: str = "", 

98 charset: str = "utf-8", 

99 strict_slashes: bool = True, 

100 merge_slashes: bool = True, 

101 redirect_defaults: bool = True, 

102 converters: t.Optional[t.Mapping[str, t.Type["BaseConverter"]]] = None, 

103 sort_parameters: bool = False, 

104 sort_key: t.Optional[t.Callable[[t.Any], t.Any]] = None, 

105 encoding_errors: str = "replace", 

106 host_matching: bool = False, 

107 ) -> None: 

108 self._matcher = StateMachineMatcher(merge_slashes) 

109 self._rules_by_endpoint: t.Dict[str, t.List[Rule]] = {} 

110 self._remap = True 

111 self._remap_lock = self.lock_class() 

112 

113 self.default_subdomain = default_subdomain 

114 self.charset = charset 

115 self.encoding_errors = encoding_errors 

116 self.strict_slashes = strict_slashes 

117 self.merge_slashes = merge_slashes 

118 self.redirect_defaults = redirect_defaults 

119 self.host_matching = host_matching 

120 

121 self.converters = self.default_converters.copy() 

122 if converters: 

123 self.converters.update(converters) 

124 

125 self.sort_parameters = sort_parameters 

126 self.sort_key = sort_key 

127 

128 for rulefactory in rules or (): 

129 self.add(rulefactory) 

130 

131 def is_endpoint_expecting(self, endpoint: str, *arguments: str) -> bool: 

132 """Iterate over all rules and check if the endpoint expects 

133 the arguments provided. This is for example useful if you have 

134 some URLs that expect a language code and others that do not and 

135 you want to wrap the builder a bit so that the current language 

136 code is automatically added if not provided but endpoints expect 

137 it. 

138 

139 :param endpoint: the endpoint to check. 

140 :param arguments: this function accepts one or more arguments 

141 as positional arguments. Each one of them is 

142 checked. 

143 """ 

144 self.update() 

145 arguments = set(arguments) 

146 for rule in self._rules_by_endpoint[endpoint]: 

147 if arguments.issubset(rule.arguments): 

148 return True 

149 return False 

150 

151 @property 

152 def _rules(self) -> t.List[Rule]: 

153 return [rule for rules in self._rules_by_endpoint.values() for rule in rules] 

154 

155 def iter_rules(self, endpoint: t.Optional[str] = None) -> t.Iterator[Rule]: 

156 """Iterate over all rules or the rules of an endpoint. 

157 

158 :param endpoint: if provided only the rules for that endpoint 

159 are returned. 

160 :return: an iterator 

161 """ 

162 self.update() 

163 if endpoint is not None: 

164 return iter(self._rules_by_endpoint[endpoint]) 

165 return iter(self._rules) 

166 

167 def add(self, rulefactory: "RuleFactory") -> None: 

168 """Add a new rule or factory to the map and bind it. Requires that the 

169 rule is not bound to another map. 

170 

171 :param rulefactory: a :class:`Rule` or :class:`RuleFactory` 

172 """ 

173 for rule in rulefactory.get_rules(self): 

174 rule.bind(self) 

175 if not rule.build_only: 

176 self._matcher.add(rule) 

177 self._rules_by_endpoint.setdefault(rule.endpoint, []).append(rule) 

178 self._remap = True 

179 

180 def bind( 

181 self, 

182 server_name: str, 

183 script_name: t.Optional[str] = None, 

184 subdomain: t.Optional[str] = None, 

185 url_scheme: str = "http", 

186 default_method: str = "GET", 

187 path_info: t.Optional[str] = None, 

188 query_args: t.Optional[t.Union[t.Mapping[str, t.Any], str]] = None, 

189 ) -> "MapAdapter": 

190 """Return a new :class:`MapAdapter` with the details specified to the 

191 call. Note that `script_name` will default to ``'/'`` if not further 

192 specified or `None`. The `server_name` at least is a requirement 

193 because the HTTP RFC requires absolute URLs for redirects and so all 

194 redirect exceptions raised by Werkzeug will contain the full canonical 

195 URL. 

196 

197 If no path_info is passed to :meth:`match` it will use the default path 

198 info passed to bind. While this doesn't really make sense for 

199 manual bind calls, it's useful if you bind a map to a WSGI 

200 environment which already contains the path info. 

201 

202 `subdomain` will default to the `default_subdomain` for this map if 

203 no defined. If there is no `default_subdomain` you cannot use the 

204 subdomain feature. 

205 

206 .. versionchanged:: 1.0 

207 If ``url_scheme`` is ``ws`` or ``wss``, only WebSocket rules 

208 will match. 

209 

210 .. versionchanged:: 0.15 

211 ``path_info`` defaults to ``'/'`` if ``None``. 

212 

213 .. versionchanged:: 0.8 

214 ``query_args`` can be a string. 

215 

216 .. versionchanged:: 0.7 

217 Added ``query_args``. 

218 """ 

219 server_name = server_name.lower() 

220 if self.host_matching: 

221 if subdomain is not None: 

222 raise RuntimeError("host matching enabled and a subdomain was provided") 

223 elif subdomain is None: 

224 subdomain = self.default_subdomain 

225 if script_name is None: 

226 script_name = "/" 

227 if path_info is None: 

228 path_info = "/" 

229 

230 try: 

231 server_name = _encode_idna(server_name) # type: ignore 

232 except UnicodeError as e: 

233 raise BadHost() from e 

234 

235 return MapAdapter( 

236 self, 

237 server_name, 

238 script_name, 

239 subdomain, 

240 url_scheme, 

241 path_info, 

242 default_method, 

243 query_args, 

244 ) 

245 

246 def bind_to_environ( 

247 self, 

248 environ: t.Union["WSGIEnvironment", "Request"], 

249 server_name: t.Optional[str] = None, 

250 subdomain: t.Optional[str] = None, 

251 ) -> "MapAdapter": 

252 """Like :meth:`bind` but you can pass it an WSGI environment and it 

253 will fetch the information from that dictionary. Note that because of 

254 limitations in the protocol there is no way to get the current 

255 subdomain and real `server_name` from the environment. If you don't 

256 provide it, Werkzeug will use `SERVER_NAME` and `SERVER_PORT` (or 

257 `HTTP_HOST` if provided) as used `server_name` with disabled subdomain 

258 feature. 

259 

260 If `subdomain` is `None` but an environment and a server name is 

261 provided it will calculate the current subdomain automatically. 

262 Example: `server_name` is ``'example.com'`` and the `SERVER_NAME` 

263 in the wsgi `environ` is ``'staging.dev.example.com'`` the calculated 

264 subdomain will be ``'staging.dev'``. 

265 

266 If the object passed as environ has an environ attribute, the value of 

267 this attribute is used instead. This allows you to pass request 

268 objects. Additionally `PATH_INFO` added as a default of the 

269 :class:`MapAdapter` so that you don't have to pass the path info to 

270 the match method. 

271 

272 .. versionchanged:: 1.0.0 

273 If the passed server name specifies port 443, it will match 

274 if the incoming scheme is ``https`` without a port. 

275 

276 .. versionchanged:: 1.0.0 

277 A warning is shown when the passed server name does not 

278 match the incoming WSGI server name. 

279 

280 .. versionchanged:: 0.8 

281 This will no longer raise a ValueError when an unexpected server 

282 name was passed. 

283 

284 .. versionchanged:: 0.5 

285 previously this method accepted a bogus `calculate_subdomain` 

286 parameter that did not have any effect. It was removed because 

287 of that. 

288 

289 :param environ: a WSGI environment. 

290 :param server_name: an optional server name hint (see above). 

291 :param subdomain: optionally the current subdomain (see above). 

292 """ 

293 env = _get_environ(environ) 

294 wsgi_server_name = get_host(env).lower() 

295 scheme = env["wsgi.url_scheme"] 

296 upgrade = any( 

297 v.strip() == "upgrade" 

298 for v in env.get("HTTP_CONNECTION", "").lower().split(",") 

299 ) 

300 

301 if upgrade and env.get("HTTP_UPGRADE", "").lower() == "websocket": 

302 scheme = "wss" if scheme == "https" else "ws" 

303 

304 if server_name is None: 

305 server_name = wsgi_server_name 

306 else: 

307 server_name = server_name.lower() 

308 

309 # strip standard port to match get_host() 

310 if scheme in {"http", "ws"} and server_name.endswith(":80"): 

311 server_name = server_name[:-3] 

312 elif scheme in {"https", "wss"} and server_name.endswith(":443"): 

313 server_name = server_name[:-4] 

314 

315 if subdomain is None and not self.host_matching: 

316 cur_server_name = wsgi_server_name.split(".") 

317 real_server_name = server_name.split(".") 

318 offset = -len(real_server_name) 

319 

320 if cur_server_name[offset:] != real_server_name: 

321 # This can happen even with valid configs if the server was 

322 # accessed directly by IP address under some situations. 

323 # Instead of raising an exception like in Werkzeug 0.7 or 

324 # earlier we go by an invalid subdomain which will result 

325 # in a 404 error on matching. 

326 warnings.warn( 

327 f"Current server name {wsgi_server_name!r} doesn't match configured" 

328 f" server name {server_name!r}", 

329 stacklevel=2, 

330 ) 

331 subdomain = "<invalid>" 

332 else: 

333 subdomain = ".".join(filter(None, cur_server_name[:offset])) 

334 

335 def _get_wsgi_string(name: str) -> t.Optional[str]: 

336 val = env.get(name) 

337 if val is not None: 

338 return _wsgi_decoding_dance(val, self.charset) 

339 return None 

340 

341 script_name = _get_wsgi_string("SCRIPT_NAME") 

342 path_info = _get_wsgi_string("PATH_INFO") 

343 query_args = _get_wsgi_string("QUERY_STRING") 

344 return Map.bind( 

345 self, 

346 server_name, 

347 script_name, 

348 subdomain, 

349 scheme, 

350 env["REQUEST_METHOD"], 

351 path_info, 

352 query_args=query_args, 

353 ) 

354 

355 def update(self) -> None: 

356 """Called before matching and building to keep the compiled rules 

357 in the correct order after things changed. 

358 """ 

359 if not self._remap: 

360 return 

361 

362 with self._remap_lock: 

363 if not self._remap: 

364 return 

365 

366 self._matcher.update() 

367 for rules in self._rules_by_endpoint.values(): 

368 rules.sort(key=lambda x: x.build_compare_key()) 

369 self._remap = False 

370 

371 def __repr__(self) -> str: 

372 rules = self.iter_rules() 

373 return f"{type(self).__name__}({pformat(list(rules))})" 

374 

375 

376class MapAdapter: 

377 

378 """Returned by :meth:`Map.bind` or :meth:`Map.bind_to_environ` and does 

379 the URL matching and building based on runtime information. 

380 """ 

381 

382 def __init__( 

383 self, 

384 map: Map, 

385 server_name: str, 

386 script_name: str, 

387 subdomain: t.Optional[str], 

388 url_scheme: str, 

389 path_info: str, 

390 default_method: str, 

391 query_args: t.Optional[t.Union[t.Mapping[str, t.Any], str]] = None, 

392 ): 

393 self.map = map 

394 self.server_name = _to_str(server_name) 

395 script_name = _to_str(script_name) 

396 if not script_name.endswith("/"): 

397 script_name += "/" 

398 self.script_name = script_name 

399 self.subdomain = _to_str(subdomain) 

400 self.url_scheme = _to_str(url_scheme) 

401 self.path_info = _to_str(path_info) 

402 self.default_method = _to_str(default_method) 

403 self.query_args = query_args 

404 self.websocket = self.url_scheme in {"ws", "wss"} 

405 

406 def dispatch( 

407 self, 

408 view_func: t.Callable[[str, t.Mapping[str, t.Any]], "WSGIApplication"], 

409 path_info: t.Optional[str] = None, 

410 method: t.Optional[str] = None, 

411 catch_http_exceptions: bool = False, 

412 ) -> "WSGIApplication": 

413 """Does the complete dispatching process. `view_func` is called with 

414 the endpoint and a dict with the values for the view. It should 

415 look up the view function, call it, and return a response object 

416 or WSGI application. http exceptions are not caught by default 

417 so that applications can display nicer error messages by just 

418 catching them by hand. If you want to stick with the default 

419 error messages you can pass it ``catch_http_exceptions=True`` and 

420 it will catch the http exceptions. 

421 

422 Here a small example for the dispatch usage:: 

423 

424 from werkzeug.wrappers import Request, Response 

425 from werkzeug.wsgi import responder 

426 from werkzeug.routing import Map, Rule 

427 

428 def on_index(request): 

429 return Response('Hello from the index') 

430 

431 url_map = Map([Rule('/', endpoint='index')]) 

432 views = {'index': on_index} 

433 

434 @responder 

435 def application(environ, start_response): 

436 request = Request(environ) 

437 urls = url_map.bind_to_environ(environ) 

438 return urls.dispatch(lambda e, v: views[e](request, **v), 

439 catch_http_exceptions=True) 

440 

441 Keep in mind that this method might return exception objects, too, so 

442 use :class:`Response.force_type` to get a response object. 

443 

444 :param view_func: a function that is called with the endpoint as 

445 first argument and the value dict as second. Has 

446 to dispatch to the actual view function with this 

447 information. (see above) 

448 :param path_info: the path info to use for matching. Overrides the 

449 path info specified on binding. 

450 :param method: the HTTP method used for matching. Overrides the 

451 method specified on binding. 

452 :param catch_http_exceptions: set to `True` to catch any of the 

453 werkzeug :class:`HTTPException`\\s. 

454 """ 

455 try: 

456 try: 

457 endpoint, args = self.match(path_info, method) 

458 except RequestRedirect as e: 

459 return e 

460 return view_func(endpoint, args) 

461 except HTTPException as e: 

462 if catch_http_exceptions: 

463 return e 

464 raise 

465 

466 @t.overload 

467 def match( # type: ignore 

468 self, 

469 path_info: t.Optional[str] = None, 

470 method: t.Optional[str] = None, 

471 return_rule: "te.Literal[False]" = False, 

472 query_args: t.Optional[t.Union[t.Mapping[str, t.Any], str]] = None, 

473 websocket: t.Optional[bool] = None, 

474 ) -> t.Tuple[str, t.Mapping[str, t.Any]]: 

475 ... 

476 

477 @t.overload 

478 def match( 

479 self, 

480 path_info: t.Optional[str] = None, 

481 method: t.Optional[str] = None, 

482 return_rule: "te.Literal[True]" = True, 

483 query_args: t.Optional[t.Union[t.Mapping[str, t.Any], str]] = None, 

484 websocket: t.Optional[bool] = None, 

485 ) -> t.Tuple[Rule, t.Mapping[str, t.Any]]: 

486 ... 

487 

488 def match( 

489 self, 

490 path_info: t.Optional[str] = None, 

491 method: t.Optional[str] = None, 

492 return_rule: bool = False, 

493 query_args: t.Optional[t.Union[t.Mapping[str, t.Any], str]] = None, 

494 websocket: t.Optional[bool] = None, 

495 ) -> t.Tuple[t.Union[str, Rule], t.Mapping[str, t.Any]]: 

496 """The usage is simple: you just pass the match method the current 

497 path info as well as the method (which defaults to `GET`). The 

498 following things can then happen: 

499 

500 - you receive a `NotFound` exception that indicates that no URL is 

501 matching. A `NotFound` exception is also a WSGI application you 

502 can call to get a default page not found page (happens to be the 

503 same object as `werkzeug.exceptions.NotFound`) 

504 

505 - you receive a `MethodNotAllowed` exception that indicates that there 

506 is a match for this URL but not for the current request method. 

507 This is useful for RESTful applications. 

508 

509 - you receive a `RequestRedirect` exception with a `new_url` 

510 attribute. This exception is used to notify you about a request 

511 Werkzeug requests from your WSGI application. This is for example the 

512 case if you request ``/foo`` although the correct URL is ``/foo/`` 

513 You can use the `RequestRedirect` instance as response-like object 

514 similar to all other subclasses of `HTTPException`. 

515 

516 - you receive a ``WebsocketMismatch`` exception if the only 

517 match is a WebSocket rule but the bind is an HTTP request, or 

518 if the match is an HTTP rule but the bind is a WebSocket 

519 request. 

520 

521 - you get a tuple in the form ``(endpoint, arguments)`` if there is 

522 a match (unless `return_rule` is True, in which case you get a tuple 

523 in the form ``(rule, arguments)``) 

524 

525 If the path info is not passed to the match method the default path 

526 info of the map is used (defaults to the root URL if not defined 

527 explicitly). 

528 

529 All of the exceptions raised are subclasses of `HTTPException` so they 

530 can be used as WSGI responses. They will all render generic error or 

531 redirect pages. 

532 

533 Here is a small example for matching: 

534 

535 >>> m = Map([ 

536 ... Rule('/', endpoint='index'), 

537 ... Rule('/downloads/', endpoint='downloads/index'), 

538 ... Rule('/downloads/<int:id>', endpoint='downloads/show') 

539 ... ]) 

540 >>> urls = m.bind("example.com", "/") 

541 >>> urls.match("/", "GET") 

542 ('index', {}) 

543 >>> urls.match("/downloads/42") 

544 ('downloads/show', {'id': 42}) 

545 

546 And here is what happens on redirect and missing URLs: 

547 

548 >>> urls.match("/downloads") 

549 Traceback (most recent call last): 

550 ... 

551 RequestRedirect: http://example.com/downloads/ 

552 >>> urls.match("/missing") 

553 Traceback (most recent call last): 

554 ... 

555 NotFound: 404 Not Found 

556 

557 :param path_info: the path info to use for matching. Overrides the 

558 path info specified on binding. 

559 :param method: the HTTP method used for matching. Overrides the 

560 method specified on binding. 

561 :param return_rule: return the rule that matched instead of just the 

562 endpoint (defaults to `False`). 

563 :param query_args: optional query arguments that are used for 

564 automatic redirects as string or dictionary. It's 

565 currently not possible to use the query arguments 

566 for URL matching. 

567 :param websocket: Match WebSocket instead of HTTP requests. A 

568 websocket request has a ``ws`` or ``wss`` 

569 :attr:`url_scheme`. This overrides that detection. 

570 

571 .. versionadded:: 1.0 

572 Added ``websocket``. 

573 

574 .. versionchanged:: 0.8 

575 ``query_args`` can be a string. 

576 

577 .. versionadded:: 0.7 

578 Added ``query_args``. 

579 

580 .. versionadded:: 0.6 

581 Added ``return_rule``. 

582 """ 

583 self.map.update() 

584 if path_info is None: 

585 path_info = self.path_info 

586 else: 

587 path_info = _to_str(path_info, self.map.charset) 

588 if query_args is None: 

589 query_args = self.query_args or {} 

590 method = (method or self.default_method).upper() 

591 

592 if websocket is None: 

593 websocket = self.websocket 

594 

595 domain_part = self.server_name if self.map.host_matching else self.subdomain 

596 path_part = f"/{path_info.lstrip('/')}" if path_info else "" 

597 

598 try: 

599 result = self.map._matcher.match(domain_part, path_part, method, websocket) 

600 except RequestPath as e: 

601 raise RequestRedirect( 

602 self.make_redirect_url( 

603 url_quote(e.path_info, self.map.charset, safe="/:|+"), 

604 query_args, 

605 ) 

606 ) from None 

607 except RequestAliasRedirect as e: 

608 raise RequestRedirect( 

609 self.make_alias_redirect_url( 

610 f"{domain_part}|{path_part}", 

611 e.endpoint, 

612 e.matched_values, 

613 method, 

614 query_args, 

615 ) 

616 ) from None 

617 except NoMatch as e: 

618 if e.have_match_for: 

619 raise MethodNotAllowed(valid_methods=list(e.have_match_for)) from None 

620 

621 if e.websocket_mismatch: 

622 raise WebsocketMismatch() from None 

623 

624 raise NotFound() from None 

625 else: 

626 rule, rv = result 

627 

628 if self.map.redirect_defaults: 

629 redirect_url = self.get_default_redirect(rule, method, rv, query_args) 

630 if redirect_url is not None: 

631 raise RequestRedirect(redirect_url) 

632 

633 if rule.redirect_to is not None: 

634 if isinstance(rule.redirect_to, str): 

635 

636 def _handle_match(match: t.Match[str]) -> str: 

637 value = rv[match.group(1)] 

638 return rule._converters[match.group(1)].to_url(value) 

639 

640 redirect_url = _simple_rule_re.sub(_handle_match, rule.redirect_to) 

641 else: 

642 redirect_url = rule.redirect_to(self, **rv) 

643 

644 if self.subdomain: 

645 netloc = f"{self.subdomain}.{self.server_name}" 

646 else: 

647 netloc = self.server_name 

648 

649 raise RequestRedirect( 

650 url_join( 

651 f"{self.url_scheme or 'http'}://{netloc}{self.script_name}", 

652 redirect_url, 

653 ) 

654 ) 

655 

656 if return_rule: 

657 return rule, rv 

658 else: 

659 return rule.endpoint, rv 

660 

661 def test( 

662 self, path_info: t.Optional[str] = None, method: t.Optional[str] = None 

663 ) -> bool: 

664 """Test if a rule would match. Works like `match` but returns `True` 

665 if the URL matches, or `False` if it does not exist. 

666 

667 :param path_info: the path info to use for matching. Overrides the 

668 path info specified on binding. 

669 :param method: the HTTP method used for matching. Overrides the 

670 method specified on binding. 

671 """ 

672 try: 

673 self.match(path_info, method) 

674 except RequestRedirect: 

675 pass 

676 except HTTPException: 

677 return False 

678 return True 

679 

680 def allowed_methods(self, path_info: t.Optional[str] = None) -> t.Iterable[str]: 

681 """Returns the valid methods that match for a given path. 

682 

683 .. versionadded:: 0.7 

684 """ 

685 try: 

686 self.match(path_info, method="--") 

687 except MethodNotAllowed as e: 

688 return e.valid_methods # type: ignore 

689 except HTTPException: 

690 pass 

691 return [] 

692 

693 def get_host(self, domain_part: t.Optional[str]) -> str: 

694 """Figures out the full host name for the given domain part. The 

695 domain part is a subdomain in case host matching is disabled or 

696 a full host name. 

697 """ 

698 if self.map.host_matching: 

699 if domain_part is None: 

700 return self.server_name 

701 return _to_str(domain_part, "ascii") 

702 subdomain = domain_part 

703 if subdomain is None: 

704 subdomain = self.subdomain 

705 else: 

706 subdomain = _to_str(subdomain, "ascii") 

707 

708 if subdomain: 

709 return f"{subdomain}.{self.server_name}" 

710 else: 

711 return self.server_name 

712 

713 def get_default_redirect( 

714 self, 

715 rule: Rule, 

716 method: str, 

717 values: t.MutableMapping[str, t.Any], 

718 query_args: t.Union[t.Mapping[str, t.Any], str], 

719 ) -> t.Optional[str]: 

720 """A helper that returns the URL to redirect to if it finds one. 

721 This is used for default redirecting only. 

722 

723 :internal: 

724 """ 

725 assert self.map.redirect_defaults 

726 for r in self.map._rules_by_endpoint[rule.endpoint]: 

727 # every rule that comes after this one, including ourself 

728 # has a lower priority for the defaults. We order the ones 

729 # with the highest priority up for building. 

730 if r is rule: 

731 break 

732 if r.provides_defaults_for(rule) and r.suitable_for(values, method): 

733 values.update(r.defaults) # type: ignore 

734 domain_part, path = r.build(values) # type: ignore 

735 return self.make_redirect_url(path, query_args, domain_part=domain_part) 

736 return None 

737 

738 def encode_query_args(self, query_args: t.Union[t.Mapping[str, t.Any], str]) -> str: 

739 if not isinstance(query_args, str): 

740 return url_encode(query_args, self.map.charset) 

741 return query_args 

742 

743 def make_redirect_url( 

744 self, 

745 path_info: str, 

746 query_args: t.Optional[t.Union[t.Mapping[str, t.Any], str]] = None, 

747 domain_part: t.Optional[str] = None, 

748 ) -> str: 

749 """Creates a redirect URL. 

750 

751 :internal: 

752 """ 

753 if query_args: 

754 suffix = f"?{self.encode_query_args(query_args)}" 

755 else: 

756 suffix = "" 

757 

758 scheme = self.url_scheme or "http" 

759 host = self.get_host(domain_part) 

760 path = posixpath.join(self.script_name.strip("/"), path_info.lstrip("/")) 

761 return f"{scheme}://{host}/{path}{suffix}" 

762 

763 def make_alias_redirect_url( 

764 self, 

765 path: str, 

766 endpoint: str, 

767 values: t.Mapping[str, t.Any], 

768 method: str, 

769 query_args: t.Union[t.Mapping[str, t.Any], str], 

770 ) -> str: 

771 """Internally called to make an alias redirect URL.""" 

772 url = self.build( 

773 endpoint, values, method, append_unknown=False, force_external=True 

774 ) 

775 if query_args: 

776 url += f"?{self.encode_query_args(query_args)}" 

777 assert url != path, "detected invalid alias setting. No canonical URL found" 

778 return url 

779 

780 def _partial_build( 

781 self, 

782 endpoint: str, 

783 values: t.Mapping[str, t.Any], 

784 method: t.Optional[str], 

785 append_unknown: bool, 

786 ) -> t.Optional[t.Tuple[str, str, bool]]: 

787 """Helper for :meth:`build`. Returns subdomain and path for the 

788 rule that accepts this endpoint, values and method. 

789 

790 :internal: 

791 """ 

792 # in case the method is none, try with the default method first 

793 if method is None: 

794 rv = self._partial_build( 

795 endpoint, values, self.default_method, append_unknown 

796 ) 

797 if rv is not None: 

798 return rv 

799 

800 # Default method did not match or a specific method is passed. 

801 # Check all for first match with matching host. If no matching 

802 # host is found, go with first result. 

803 first_match = None 

804 

805 for rule in self.map._rules_by_endpoint.get(endpoint, ()): 

806 if rule.suitable_for(values, method): 

807 build_rv = rule.build(values, append_unknown) 

808 

809 if build_rv is not None: 

810 rv = (build_rv[0], build_rv[1], rule.websocket) 

811 if self.map.host_matching: 

812 if rv[0] == self.server_name: 

813 return rv 

814 elif first_match is None: 

815 first_match = rv 

816 else: 

817 return rv 

818 

819 return first_match 

820 

821 def build( 

822 self, 

823 endpoint: str, 

824 values: t.Optional[t.Mapping[str, t.Any]] = None, 

825 method: t.Optional[str] = None, 

826 force_external: bool = False, 

827 append_unknown: bool = True, 

828 url_scheme: t.Optional[str] = None, 

829 ) -> str: 

830 """Building URLs works pretty much the other way round. Instead of 

831 `match` you call `build` and pass it the endpoint and a dict of 

832 arguments for the placeholders. 

833 

834 The `build` function also accepts an argument called `force_external` 

835 which, if you set it to `True` will force external URLs. Per default 

836 external URLs (include the server name) will only be used if the 

837 target URL is on a different subdomain. 

838 

839 >>> m = Map([ 

840 ... Rule('/', endpoint='index'), 

841 ... Rule('/downloads/', endpoint='downloads/index'), 

842 ... Rule('/downloads/<int:id>', endpoint='downloads/show') 

843 ... ]) 

844 >>> urls = m.bind("example.com", "/") 

845 >>> urls.build("index", {}) 

846 '/' 

847 >>> urls.build("downloads/show", {'id': 42}) 

848 '/downloads/42' 

849 >>> urls.build("downloads/show", {'id': 42}, force_external=True) 

850 'http://example.com/downloads/42' 

851 

852 Because URLs cannot contain non ASCII data you will always get 

853 bytes back. Non ASCII characters are urlencoded with the 

854 charset defined on the map instance. 

855 

856 Additional values are converted to strings and appended to the URL as 

857 URL querystring parameters: 

858 

859 >>> urls.build("index", {'q': 'My Searchstring'}) 

860 '/?q=My+Searchstring' 

861 

862 When processing those additional values, lists are furthermore 

863 interpreted as multiple values (as per 

864 :py:class:`werkzeug.datastructures.MultiDict`): 

865 

866 >>> urls.build("index", {'q': ['a', 'b', 'c']}) 

867 '/?q=a&q=b&q=c' 

868 

869 Passing a ``MultiDict`` will also add multiple values: 

870 

871 >>> urls.build("index", MultiDict((('p', 'z'), ('q', 'a'), ('q', 'b')))) 

872 '/?p=z&q=a&q=b' 

873 

874 If a rule does not exist when building a `BuildError` exception is 

875 raised. 

876 

877 The build method accepts an argument called `method` which allows you 

878 to specify the method you want to have an URL built for if you have 

879 different methods for the same endpoint specified. 

880 

881 :param endpoint: the endpoint of the URL to build. 

882 :param values: the values for the URL to build. Unhandled values are 

883 appended to the URL as query parameters. 

884 :param method: the HTTP method for the rule if there are different 

885 URLs for different methods on the same endpoint. 

886 :param force_external: enforce full canonical external URLs. If the URL 

887 scheme is not provided, this will generate 

888 a protocol-relative URL. 

889 :param append_unknown: unknown parameters are appended to the generated 

890 URL as query string argument. Disable this 

891 if you want the builder to ignore those. 

892 :param url_scheme: Scheme to use in place of the bound 

893 :attr:`url_scheme`. 

894 

895 .. versionchanged:: 2.0 

896 Added the ``url_scheme`` parameter. 

897 

898 .. versionadded:: 0.6 

899 Added the ``append_unknown`` parameter. 

900 """ 

901 self.map.update() 

902 

903 if values: 

904 if isinstance(values, MultiDict): 

905 values = { 

906 k: (v[0] if len(v) == 1 else v) 

907 for k, v in dict.items(values) 

908 if len(v) != 0 

909 } 

910 else: # plain dict 

911 values = {k: v for k, v in values.items() if v is not None} 

912 else: 

913 values = {} 

914 

915 rv = self._partial_build(endpoint, values, method, append_unknown) 

916 if rv is None: 

917 raise BuildError(endpoint, values, method, self) 

918 

919 domain_part, path, websocket = rv 

920 host = self.get_host(domain_part) 

921 

922 if url_scheme is None: 

923 url_scheme = self.url_scheme 

924 

925 # Always build WebSocket routes with the scheme (browsers 

926 # require full URLs). If bound to a WebSocket, ensure that HTTP 

927 # routes are built with an HTTP scheme. 

928 secure = url_scheme in {"https", "wss"} 

929 

930 if websocket: 

931 force_external = True 

932 url_scheme = "wss" if secure else "ws" 

933 elif url_scheme: 

934 url_scheme = "https" if secure else "http" 

935 

936 # shortcut this. 

937 if not force_external and ( 

938 (self.map.host_matching and host == self.server_name) 

939 or (not self.map.host_matching and domain_part == self.subdomain) 

940 ): 

941 return f"{self.script_name.rstrip('/')}/{path.lstrip('/')}" 

942 

943 scheme = f"{url_scheme}:" if url_scheme else "" 

944 return f"{scheme}//{host}{self.script_name[:-1]}/{path.lstrip('/')}"