Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/fastjsonschema/ref_resolver.py: 22%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

113 statements  

1# pylint: disable=import-outside-toplevel 

2 

3""" 

4JSON Schema URI resolution scopes and dereferencing 

5 

6https://tools.ietf.org/id/draft-zyp-json-schema-04.html#rfc.section.7 

7 

8Code adapted from https://github.com/Julian/jsonschema 

9""" 

10 

11import contextlib 

12import json 

13import re 

14import sys 

15from urllib import parse as urlparse 

16from urllib.parse import unquote 

17 

18from .exceptions import JsonSchemaDefinitionException 

19 

# Deepest nesting level ``RefResolver.walk`` descends to before giving up:
# at most 500, and never more than half of the interpreter's recursion limit.
MAX_SCHEMA_WALK_DEPTH = min(sys.getrecursionlimit() // 2, 500)

21 

22 

def get_id(schema):
    """
    Return the identifier of ``schema``.

    The modern ``$id`` key wins when present; otherwise the legacy ``id`` key
    is used, and an empty string is returned when neither exists.
    """
    if '$id' in schema:
        return schema['$id']
    return schema.get('id', '')

28 

29 

def resolve_path(schema, fragment):
    """
    Return the definition that ``fragment`` points to inside ``schema``.

    ``fragment`` is a JSON Pointer in URI-fragment form: it is percent-decoded,
    split on ``/`` and each token is unescaped (``~1`` -> ``/``, ``~0`` -> ``~``)
    according to https://tools.ietf.org/html/rfc6901

    An empty fragment returns ``schema`` itself.

    Raises ``JsonSchemaDefinitionException`` whenever a token cannot be
    followed: missing key, non-numeric or out-of-range array index, or an
    attempt to descend into a value that is neither a list nor a dict.
    """
    fragment = fragment.lstrip('/')
    parts = unquote(fragment).split('/') if fragment else []
    for part in parts:
        # Order matters: ``~1`` must be unescaped before ``~0`` (RFC 6901, section 4).
        part = part.replace('~1', '/').replace('~0', '~')
        if isinstance(schema, list):
            try:
                schema = schema[int(part)]
            except (ValueError, IndexError) as exc:
                # Report a bad index the same way as a missing key instead of
                # leaking the low-level exception.
                raise JsonSchemaDefinitionException('Unresolvable ref: {}'.format(part)) from exc
        elif isinstance(schema, dict) and part in schema:
            schema = schema[part]
        else:
            # Either the key is missing or the current value is a scalar
            # (string, number, boolean, null) that cannot be descended into.
            raise JsonSchemaDefinitionException('Unresolvable ref: {}'.format(part))
    return schema

47 

48 

def normalize(uri):
    """
    Return ``uri`` in canonical text form.

    The URI is split into its components and re-assembled, which gives
    equivalent spellings of the same URI a single representation that is
    usable as a dictionary key.
    """
    components = urlparse.urlsplit(uri)
    return components.geturl()

51 

52 

def resolve_remote(uri, handlers):
    """
    Resolve a remote ``uri`` and return the decoded document.

    ``handlers`` maps a URI scheme to a callable taking the URI; when the
    scheme of ``uri`` has a handler, its return value is used as is.

    .. note::

        urllib library is used to fetch requests from the remote ``uri``
        if handlers does not define otherwise. The response body is decoded
        with the charset announced by the response (UTF-8 when none is given)
        and parsed as JSON.

    Raises ``JsonSchemaDefinitionException`` when the fetched body cannot be
    decoded.
    """
    scheme = urlparse.urlsplit(uri).scheme
    if scheme in handlers:
        return handlers[scheme](uri)

    from urllib.request import urlopen

    with urlopen(uri) as response:
        charset = response.info().get_content_charset() or 'utf-8'
        try:
            return json.loads(response.read().decode(charset))
        except ValueError as exc:
            raise JsonSchemaDefinitionException('{} failed to decode'.format(uri)) from exc

75 

76 

class RefResolver:
    """
    Resolve JSON References.

    The resolver keeps track of the current document (``base_uri`` and
    ``schema``), the current ``resolution_scope`` that relative references are
    joined against, and a ``store`` mapping normalized URIs to schema nodes.
    """

    # The mutable defaults are kept on purpose for backward compatibility: the
    # default ``store`` is shared between resolvers, which is the caching
    # behaviour described in the ``__init__`` docstring.
    # pylint: disable=dangerous-default-value,too-many-arguments
    def __init__(self, base_uri, schema, store={}, cache=True, handlers={}):
        """
        `base_uri` is URI of the referring document from the `schema`.
        `store` is a dictionary that will be used to cache the fetched schemas
        (if `cache=True`).
        `handlers` maps URI schemes to callables used to fetch remote schemas.

        Please notice that you can have caching problems when compiling schemas
        with colliding `$ref`. To force overwriting use `cache=False` or
        explicitly pass the `store` argument (with a brand new dictionary)
        """
        self.base_uri = base_uri
        self.resolution_scope = base_uri
        self.schema = schema
        self.store = store
        self.cache = cache
        self.handlers = handlers
        # Normalized URIs of the documents that were already walked.
        self._walked_uris = set()
        self.walk(schema)
        self._walked_uris.add(normalize(base_uri) if base_uri else '')

    @classmethod
    def from_schema(cls, schema, handlers={}, **kwargs):
        """
        Construct a resolver from a JSON schema object.

        The base URI is the schema's own id; a schema that is not a dict has
        no id and gets an empty base URI.
        """
        return cls(
            get_id(schema) if isinstance(schema, dict) else '',
            schema,
            handlers=handlers,
            **kwargs
        )

    @contextlib.contextmanager
    def in_scope(self, scope: str):
        """
        Context manager to handle current scope.

        Inside the ``with`` block the resolution scope is ``scope`` joined onto
        the previous scope; the previous scope is restored on exit, even when
        the block raises.
        """
        old_scope = self.resolution_scope
        self.resolution_scope = urlparse.urljoin(old_scope, scope)
        try:
            yield
        finally:
            self.resolution_scope = old_scope

    @contextlib.contextmanager
    def resolving(self, ref: str):
        """
        Context manager which resolves a JSON ``ref`` and enters the
        resolution scope of this ref.

        Yields the referenced schema node. While the block runs, ``base_uri``
        and ``schema`` point at the document containing the node; both are
        restored on exit.

        Raises ``JsonSchemaDefinitionException`` when the fragment cannot be
        resolved.
        """
        new_uri = urlparse.urljoin(self.resolution_scope, ref)
        uri, fragment = urlparse.urldefrag(new_uri)

        document_uri = uri or self.base_uri
        normalized_uri = normalize(uri) if uri else ''

        if uri and normalized_uri in self.store:
            schema = self.store[normalized_uri]
        elif not uri or uri == self.base_uri:
            schema = self.schema
        else:
            schema = resolve_remote(uri, self.handlers)
            if self.cache:
                self.store[normalized_uri] = schema

        old_base_uri, old_schema = self.base_uri, self.schema
        self.base_uri, self.schema = document_uri, schema
        try:
            with self.in_scope(document_uri):
                # Make sure ids and refs of a freshly loaded document are
                # registered before anything is looked up in it.
                self._ensure_walked(document_uri, schema)
                if fragment and not fragment.startswith('/'):
                    # Plain-name fragment (not a JSON Pointer): it must have
                    # been registered in the store under its full URI.
                    plain_name = normalize(urlparse.urljoin(document_uri, '#' + fragment))
                    if plain_name not in self.store:
                        raise JsonSchemaDefinitionException('Unresolvable ref: {}'.format(fragment))
                    target = self.store[plain_name]
                else:
                    target = resolve_path(schema, fragment)
                yield target
        finally:
            self.base_uri, self.schema = old_base_uri, old_schema

    def _ensure_walked(self, uri, schema):
        """
        Walk ``schema`` unless the document at ``uri`` was walked before.
        """
        normalized = normalize(uri) if uri else ''
        if normalized in self._walked_uris:
            return
        self.walk(schema)
        self._walked_uris.add(normalized)

    def get_uri(self):
        """
        Return the current resolution scope as a normalized URI.
        """
        return normalize(self.resolution_scope)

    def get_scope_name(self):
        """
        Get current scope and return it as a valid function name.

        The name is ``validate_`` followed by the percent-decoded scope, with
        every non-alphanumeric character turned into ``_``, lower-cased and
        without trailing underscores.
        """
        name = 'validate_' + unquote(self.resolution_scope).replace('~1', '_').replace('~0', '_').replace('"', '')
        # The former ``$[^a-zA-Z]`` alternative could only match a trailing
        # newline, which this character class already covers.
        name = re.sub(r'[^a-zA-Z0-9]', '_', name)
        name = name.lower().rstrip('_')
        return name

    def walk(self, node: dict, depth=0):
        """
        Walk through the schema, dereferencing ``id`` and ``$ref`` instances.

        * A node with a string ``$ref`` gets the reference rewritten as joined
          onto the current resolution scope; its children are not visited.
        * A node with a string ``$id``/``id`` is registered in ``store`` under
          its normalized URI and its children are walked inside that scope.
        * Any other dict just has its children walked.

        Only dict values are descended into. Nodes that are not dicts (boolean
        schemas, lists, scalars) are ignored.

        Raises ``JsonSchemaDefinitionException`` when the schema is nested
        ``MAX_SCHEMA_WALK_DEPTH`` levels deep or more.
        """
        if not isinstance(node, dict):
            # Nothing to register or rewrite in boolean schemas and other
            # non-dict values.
            return

        if depth >= MAX_SCHEMA_WALK_DEPTH:
            raise JsonSchemaDefinitionException(
                'Schema is too deeply nested (maximum depth is {})'.format(MAX_SCHEMA_WALK_DEPTH)
            )

        if '$ref' in node and isinstance(node['$ref'], str):
            node['$ref'] = urlparse.urljoin(self.resolution_scope, node['$ref'])
        elif ('$id' in node or 'id' in node) and isinstance(get_id(node), str):
            with self.in_scope(get_id(node)):
                self.store[normalize(self.resolution_scope)] = node
                self._walk_children(node, depth)
        else:
            self._walk_children(node, depth)

    def _walk_children(self, node, depth):
        """
        Walk every dict value of ``node`` one nesting level deeper.
        """
        for item in node.values():
            if isinstance(item, dict):
                self.walk(item, depth + 1)