Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/routing/matcher.py: 19%

116 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1import re 

2import typing as t 

3from dataclasses import dataclass 

4from dataclasses import field 

5 

6from .converters import ValidationError 

7from .exceptions import NoMatch 

8from .exceptions import RequestAliasRedirect 

9from .exceptions import RequestPath 

10from .rules import Rule 

11from .rules import RulePart 

12 

13 

14class SlashRequired(Exception): 

15 pass 

16 

17 

18@dataclass 

19class State: 

20 """A representation of a rule state. 

21 

22 This includes the *rules* that correspond to the state and the 

23 possible *static* and *dynamic* transitions to the next state. 

24 """ 

25 

26 dynamic: t.List[t.Tuple[RulePart, "State"]] = field(default_factory=list) 

27 rules: t.List[Rule] = field(default_factory=list) 

28 static: t.Dict[str, "State"] = field(default_factory=dict) 

29 

30 

31class StateMachineMatcher: 

32 def __init__(self, merge_slashes: bool) -> None: 

33 self._root = State() 

34 self.merge_slashes = merge_slashes 

35 

36 def add(self, rule: Rule) -> None: 

37 state = self._root 

38 for part in rule._parts: 

39 if part.static: 

40 state.static.setdefault(part.content, State()) 

41 state = state.static[part.content] 

42 else: 

43 for test_part, new_state in state.dynamic: 

44 if test_part == part: 

45 state = new_state 

46 break 

47 else: 

48 new_state = State() 

49 state.dynamic.append((part, new_state)) 

50 state = new_state 

51 state.rules.append(rule) 

52 

53 def update(self) -> None: 

54 # For every state the dynamic transitions should be sorted by 

55 # the weight of the transition 

56 state = self._root 

57 

58 def _update_state(state: State) -> None: 

59 state.dynamic.sort(key=lambda entry: entry[0].weight) 

60 for new_state in state.static.values(): 

61 _update_state(new_state) 

62 for _, new_state in state.dynamic: 

63 _update_state(new_state) 

64 

65 _update_state(state) 

66 

67 def match( 

68 self, domain: str, path: str, method: str, websocket: bool 

69 ) -> t.Tuple[Rule, t.MutableMapping[str, t.Any]]: 

70 # To match to a rule we need to start at the root state and 

71 # try to follow the transitions until we find a match, or find 

72 # there is no transition to follow. 

73 

74 have_match_for = set() 

75 websocket_mismatch = False 

76 

77 def _match( 

78 state: State, parts: t.List[str], values: t.List[str] 

79 ) -> t.Optional[t.Tuple[Rule, t.List[str]]]: 

80 # This function is meant to be called recursively, and will attempt 

81 # to match the head part to the state's transitions. 

82 nonlocal have_match_for, websocket_mismatch 

83 

84 # The base case is when all parts have been matched via 

85 # transitions. Hence if there is a rule with methods & 

86 # websocket that work return it and the dynamic values 

87 # extracted. 

88 if parts == []: 

89 for rule in state.rules: 

90 if rule.methods is not None and method not in rule.methods: 

91 have_match_for.update(rule.methods) 

92 elif rule.websocket != websocket: 

93 websocket_mismatch = True 

94 else: 

95 return rule, values 

96 

97 # Test if there is a match with this path with a 

98 # trailing slash, if so raise an exception to report 

99 # that matching is possible with an additional slash 

100 if "" in state.static: 

101 for rule in state.static[""].rules: 

102 if websocket == rule.websocket and ( 

103 rule.methods is None or method in rule.methods 

104 ): 

105 if rule.strict_slashes: 

106 raise SlashRequired() 

107 else: 

108 return rule, values 

109 return None 

110 

111 part = parts[0] 

112 # To match this part try the static transitions first 

113 if part in state.static: 

114 rv = _match(state.static[part], parts[1:], values) 

115 if rv is not None: 

116 return rv 

117 # No match via the static transitions, so try the dynamic 

118 # ones. 

119 for test_part, new_state in state.dynamic: 

120 target = part 

121 remaining = parts[1:] 

122 # A final part indicates a transition that always 

123 # consumes the remaining parts i.e. transitions to a 

124 # final state. 

125 if test_part.final: 

126 target = "/".join(parts) 

127 remaining = [] 

128 match = re.compile(test_part.content).match(target) 

129 if match is not None: 

130 rv = _match(new_state, remaining, values + list(match.groups())) 

131 if rv is not None: 

132 return rv 

133 

134 # If there is no match and the only part left is a 

135 # trailing slash ("") consider rules that aren't 

136 # strict-slashes as these should match if there is a final 

137 # slash part. 

138 if parts == [""]: 

139 for rule in state.rules: 

140 if rule.strict_slashes: 

141 continue 

142 if rule.methods is not None and method not in rule.methods: 

143 have_match_for.update(rule.methods) 

144 elif rule.websocket != websocket: 

145 websocket_mismatch = True 

146 else: 

147 return rule, values 

148 

149 return None 

150 

151 try: 

152 rv = _match(self._root, [domain, *path.split("/")], []) 

153 except SlashRequired: 

154 raise RequestPath(f"{path}/") from None 

155 

156 if self.merge_slashes and rv is None: 

157 # Try to match again, but with slashes merged 

158 path = re.sub("/{2,}?", "/", path) 

159 try: 

160 rv = _match(self._root, [domain, *path.split("/")], []) 

161 except SlashRequired: 

162 raise RequestPath(f"{path}/") from None 

163 if rv is None: 

164 raise NoMatch(have_match_for, websocket_mismatch) 

165 else: 

166 raise RequestPath(f"{path}") 

167 elif rv is not None: 

168 rule, values = rv 

169 

170 result = {} 

171 for name, value in zip(rule._converters.keys(), values): 

172 try: 

173 value = rule._converters[name].to_python(value) 

174 except ValidationError: 

175 raise NoMatch(have_match_for, websocket_mismatch) from None 

176 result[str(name)] = value 

177 if rule.defaults: 

178 result.update(rule.defaults) 

179 

180 if rule.alias and rule.map.redirect_defaults: 

181 raise RequestAliasRedirect(result, rule.endpoint) 

182 

183 return rule, result 

184 

185 raise NoMatch(have_match_for, websocket_mismatch)