Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/routing/matcher.py: 19%
116 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1import re
2import typing as t
3from dataclasses import dataclass
4from dataclasses import field
6from .converters import ValidationError
7from .exceptions import NoMatch
8from .exceptions import RequestAliasRedirect
9from .exceptions import RequestPath
10from .rules import Rule
11from .rules import RulePart
14class SlashRequired(Exception):
15 pass
18@dataclass
19class State:
20 """A representation of a rule state.
22 This includes the *rules* that correspond to the state and the
23 possible *static* and *dynamic* transitions to the next state.
24 """
26 dynamic: t.List[t.Tuple[RulePart, "State"]] = field(default_factory=list)
27 rules: t.List[Rule] = field(default_factory=list)
28 static: t.Dict[str, "State"] = field(default_factory=dict)
31class StateMachineMatcher:
32 def __init__(self, merge_slashes: bool) -> None:
33 self._root = State()
34 self.merge_slashes = merge_slashes
36 def add(self, rule: Rule) -> None:
37 state = self._root
38 for part in rule._parts:
39 if part.static:
40 state.static.setdefault(part.content, State())
41 state = state.static[part.content]
42 else:
43 for test_part, new_state in state.dynamic:
44 if test_part == part:
45 state = new_state
46 break
47 else:
48 new_state = State()
49 state.dynamic.append((part, new_state))
50 state = new_state
51 state.rules.append(rule)
53 def update(self) -> None:
54 # For every state the dynamic transitions should be sorted by
55 # the weight of the transition
56 state = self._root
58 def _update_state(state: State) -> None:
59 state.dynamic.sort(key=lambda entry: entry[0].weight)
60 for new_state in state.static.values():
61 _update_state(new_state)
62 for _, new_state in state.dynamic:
63 _update_state(new_state)
65 _update_state(state)
67 def match(
68 self, domain: str, path: str, method: str, websocket: bool
69 ) -> t.Tuple[Rule, t.MutableMapping[str, t.Any]]:
70 # To match to a rule we need to start at the root state and
71 # try to follow the transitions until we find a match, or find
72 # there is no transition to follow.
74 have_match_for = set()
75 websocket_mismatch = False
77 def _match(
78 state: State, parts: t.List[str], values: t.List[str]
79 ) -> t.Optional[t.Tuple[Rule, t.List[str]]]:
80 # This function is meant to be called recursively, and will attempt
81 # to match the head part to the state's transitions.
82 nonlocal have_match_for, websocket_mismatch
84 # The base case is when all parts have been matched via
85 # transitions. Hence if there is a rule with methods &
86 # websocket that work return it and the dynamic values
87 # extracted.
88 if parts == []:
89 for rule in state.rules:
90 if rule.methods is not None and method not in rule.methods:
91 have_match_for.update(rule.methods)
92 elif rule.websocket != websocket:
93 websocket_mismatch = True
94 else:
95 return rule, values
97 # Test if there is a match with this path with a
98 # trailing slash, if so raise an exception to report
99 # that matching is possible with an additional slash
100 if "" in state.static:
101 for rule in state.static[""].rules:
102 if websocket == rule.websocket and (
103 rule.methods is None or method in rule.methods
104 ):
105 if rule.strict_slashes:
106 raise SlashRequired()
107 else:
108 return rule, values
109 return None
111 part = parts[0]
112 # To match this part try the static transitions first
113 if part in state.static:
114 rv = _match(state.static[part], parts[1:], values)
115 if rv is not None:
116 return rv
117 # No match via the static transitions, so try the dynamic
118 # ones.
119 for test_part, new_state in state.dynamic:
120 target = part
121 remaining = parts[1:]
122 # A final part indicates a transition that always
123 # consumes the remaining parts i.e. transitions to a
124 # final state.
125 if test_part.final:
126 target = "/".join(parts)
127 remaining = []
128 match = re.compile(test_part.content).match(target)
129 if match is not None:
130 rv = _match(new_state, remaining, values + list(match.groups()))
131 if rv is not None:
132 return rv
134 # If there is no match and the only part left is a
135 # trailing slash ("") consider rules that aren't
136 # strict-slashes as these should match if there is a final
137 # slash part.
138 if parts == [""]:
139 for rule in state.rules:
140 if rule.strict_slashes:
141 continue
142 if rule.methods is not None and method not in rule.methods:
143 have_match_for.update(rule.methods)
144 elif rule.websocket != websocket:
145 websocket_mismatch = True
146 else:
147 return rule, values
149 return None
151 try:
152 rv = _match(self._root, [domain, *path.split("/")], [])
153 except SlashRequired:
154 raise RequestPath(f"{path}/") from None
156 if self.merge_slashes and rv is None:
157 # Try to match again, but with slashes merged
158 path = re.sub("/{2,}?", "/", path)
159 try:
160 rv = _match(self._root, [domain, *path.split("/")], [])
161 except SlashRequired:
162 raise RequestPath(f"{path}/") from None
163 if rv is None:
164 raise NoMatch(have_match_for, websocket_mismatch)
165 else:
166 raise RequestPath(f"{path}")
167 elif rv is not None:
168 rule, values = rv
170 result = {}
171 for name, value in zip(rule._converters.keys(), values):
172 try:
173 value = rule._converters[name].to_python(value)
174 except ValidationError:
175 raise NoMatch(have_match_for, websocket_mismatch) from None
176 result[str(name)] = value
177 if rule.defaults:
178 result.update(rule.defaults)
180 if rule.alias and rule.map.redirect_defaults:
181 raise RequestAliasRedirect(result, rule.endpoint)
183 return rule, result
185 raise NoMatch(have_match_for, websocket_mismatch)