1# pylint: disable=import-outside-toplevel
2
3"""
4JSON Schema URI resolution scopes and dereferencing
5
6https://tools.ietf.org/id/draft-zyp-json-schema-04.html#rfc.section.7
7
8Code adapted from https://github.com/Julian/jsonschema
9"""
10
11import contextlib
12import json
13import re
14import sys
15from urllib import parse as urlparse
16from urllib.parse import unquote
17
18from .exceptions import JsonSchemaDefinitionException
19
# Deepest nesting level RefResolver.walk accepts before rejecting a schema.
# Capped at 500 and never more than half of the interpreter's recursion limit,
# so the recursive walk fails with a schema error rather than a RecursionError.
MAX_SCHEMA_WALK_DEPTH = min(sys.getrecursionlimit() // 2, 500)
21
22
def get_id(schema):
    """
    Return the identifier of ``schema``.

    The keyword was originally ``id`` and is ``$id`` since v7; ``$id`` wins
    when both are present, and an empty string is returned when neither is.
    """
    legacy_id = schema.get('id', '')
    return schema.get('$id', legacy_id)
28
29
def resolve_path(schema, fragment):
    """
    Return the definition that ``fragment`` points to inside ``schema``.

    The path is percent-decoded and then unescaped according to
    https://tools.ietf.org/html/rfc6901 (``~1`` -> ``/``, ``~0`` -> ``~``).
    An empty fragment returns ``schema`` itself.

    Raises ``JsonSchemaDefinitionException`` when any step of the path cannot
    be followed: a missing key, a list index that is not a number or is out
    of range (negative indices are rejected rather than counted from the
    end), or an attempt to descend into a value that is neither a dict nor
    a list.
    """
    fragment = fragment.lstrip('/')
    parts = unquote(fragment).split('/') if fragment else []
    for part in parts:
        # Order matters: ``~1`` must be decoded before ``~0`` so that the
        # escaped sequence ``~01`` ends up as ``~1`` and not as ``/``.
        part = part.replace('~1', '/').replace('~0', '~')
        if isinstance(schema, list):
            try:
                index = int(part)
            except ValueError:
                # Not a number at all; force the range check below to fail.
                index = -1
            if not 0 <= index < len(schema):
                raise JsonSchemaDefinitionException('Unresolvable ref: {}'.format(part))
            schema = schema[index]
        elif isinstance(schema, dict) and part in schema:
            schema = schema[part]
        else:
            raise JsonSchemaDefinitionException('Unresolvable ref: {}'.format(part))
    return schema
47
48
def normalize(uri):
    """
    Return ``uri`` re-assembled from its parsed components.

    The result is the canonical spelling used as a key for the resolver's
    store and for its set of already walked documents.
    """
    components = urlparse.urlsplit(uri)
    return components.geturl()
51
52
def resolve_remote(uri, handlers):
    """
    Fetch and return the document a remote ``uri`` points to.

    ``handlers`` maps URI schemes to callables; when the scheme of ``uri``
    has an entry, that callable is invoked with ``uri`` and its result is
    returned as is.

    .. note::

        Without a matching handler the document is downloaded with
        ``urllib`` and parsed as JSON, using the charset announced by the
        response (UTF-8 when none is given).

    Raises ``JsonSchemaDefinitionException`` when the downloaded payload
    cannot be decoded.
    """
    scheme = urlparse.urlsplit(uri).scheme
    if scheme in handlers:
        return handlers[scheme](uri)

    # Imported lazily so the HTTP machinery is only loaded when really needed.
    from urllib.request import urlopen

    with urlopen(uri) as response:
        charset = response.info().get_content_charset() or 'utf-8'
        try:
            return json.loads(response.read().decode(charset))
        except ValueError as exc:
            raise JsonSchemaDefinitionException('{} failed to decode'.format(uri)) from exc
75
76
class RefResolver:
    """
    Resolve JSON References (``$ref``) relative to a schema document.

    The resolver tracks the current resolution scope, keeps a store of
    documents and sub-schemas keyed by normalized URI, and rewrites every
    ``$ref`` it walks over into an absolute URI.
    """

    # pylint: disable=dangerous-default-value,too-many-arguments
    def __init__(self, base_uri, schema, store={}, cache=True, handlers={}):
        """
        `base_uri` is the URI of the document that `schema` comes from.
        `store` is a dictionary used to cache fetched schemas (when
        `cache=True`) and every sub-schema that declares an id.
        `handlers` maps URI schemes to callables used to fetch remote refs.

        Please note that the default `store` is one dictionary shared by all
        resolvers created without an explicit one, so compiling schemas with
        colliding `$ref` can hit stale entries. To force overwriting use
        `cache=False` or explicitly pass a brand new dictionary as `store`.
        """
        self.base_uri = base_uri
        self.resolution_scope = base_uri
        self.schema = schema
        self.store = store
        self.cache = cache
        self.handlers = handlers
        self._walked_uris = set()

        # Walk the root document first; only then mark it as walked.
        self.walk(schema)
        root_key = normalize(base_uri) if base_uri else ''
        self._walked_uris.add(root_key)

    @classmethod
    def from_schema(cls, schema, handlers={}, **kwargs):
        """
        Build a resolver whose base URI is the id of ``schema``.

        Schemas that are not dictionaries get an empty base URI.
        """
        base_uri = get_id(schema) if isinstance(schema, dict) else ''
        return cls(base_uri, schema, handlers=handlers, **kwargs)

    @contextlib.contextmanager
    def in_scope(self, scope: str):
        """
        Context manager that joins ``scope`` onto the current resolution
        scope for the duration of the block and restores it afterwards.
        """
        previous_scope = self.resolution_scope
        self.resolution_scope = urlparse.urljoin(previous_scope, scope)
        try:
            yield
        finally:
            self.resolution_scope = previous_scope

    @contextlib.contextmanager
    def resolving(self, ref: str):
        """
        Context manager which resolves a JSON ``ref``, yields the schema it
        points to and enters the resolution scope of this ref.

        While the block runs, ``base_uri`` and ``schema`` point at the
        referenced document; both are restored on exit.
        """
        target = urlparse.urljoin(self.resolution_scope, ref)
        uri, fragment = urlparse.urldefrag(target)
        document_uri = uri or self.base_uri

        if not uri:
            # Fragment-only reference: stays inside the current document.
            schema = self.schema
        else:
            store_key = normalize(uri)
            if store_key in self.store:
                schema = self.store[store_key]
            elif uri == self.base_uri:
                schema = self.schema
            else:
                schema = resolve_remote(uri, self.handlers)
                if self.cache:
                    self.store[store_key] = schema

        saved_state = (self.base_uri, self.schema)
        self.base_uri, self.schema = document_uri, schema
        try:
            with self.in_scope(document_uri):
                self._ensure_walked(document_uri, schema)

                is_plain_name = bool(fragment) and not fragment.startswith('/')
                if not is_plain_name:
                    # Empty fragment or JSON pointer: follow the path.
                    yield resolve_path(schema, fragment)
                    return

                # Plain-name fragment: must match an id registered by walk().
                anchor = normalize(urlparse.urljoin(document_uri, '#' + fragment))
                if anchor not in self.store:
                    raise JsonSchemaDefinitionException('Unresolvable ref: {}'.format(fragment))
                yield self.store[anchor]
        finally:
            self.base_uri, self.schema = saved_state

    def _ensure_walked(self, uri, schema):
        """
        Walk ``schema`` unless the document at ``uri`` was walked already.
        """
        walked_key = normalize(uri) if uri else ''
        if walked_key not in self._walked_uris:
            self.walk(schema)
            self._walked_uris.add(walked_key)

    def get_uri(self):
        """
        Return the current resolution scope in normalized form.
        """
        return normalize(self.resolution_scope)

    def get_scope_name(self):
        """
        Get current scope and return it as a valid function name.
        """
        scope = unquote(self.resolution_scope)
        scope = scope.replace('~1', '_').replace('~0', '_')
        scope = scope.replace('"', '')
        # NOTE(review): the `$[^a-zA-Z]` alternative looks like it was meant
        # to be `^[^a-zA-Z]`; as written everything it could match is also
        # matched by the second alternative.
        sanitized = re.sub(r'($[^a-zA-Z]|[^a-zA-Z0-9])', '_', 'validate_' + scope)
        return sanitized.lower().rstrip('_')

    def walk(self, node: dict, depth=0):
        """
        Walk through the schema, registering every ``id``/``$id`` in the
        store and turning every ``$ref`` into an absolute URI.

        Raises ``JsonSchemaDefinitionException`` when the schema is nested
        deeper than ``MAX_SCHEMA_WALK_DEPTH``.
        """
        if depth >= MAX_SCHEMA_WALK_DEPTH:
            raise JsonSchemaDefinitionException(
                'Schema is too deeply nested (maximum depth is {})'.format(MAX_SCHEMA_WALK_DEPTH)
            )

        # Boolean schemas have nothing to dereference.
        if isinstance(node, bool):
            return

        # A reference is made absolute in place; its siblings are not walked.
        if '$ref' in node and isinstance(node['$ref'], str):
            node['$ref'] = urlparse.urljoin(self.resolution_scope, node['$ref'])
            return

        # The loops stay inline on purpose: a helper would add a stack frame
        # per nesting level, and the depth limit assumes one frame per level.
        child_depth = depth + 1
        if ('$id' in node or 'id' in node) and isinstance(get_id(node), str):
            with self.in_scope(get_id(node)):
                self.store[normalize(self.resolution_scope)] = node
                for child in node.values():
                    if isinstance(child, dict):
                        self.walk(child, child_depth)
            return

        for child in node.values():
            if isinstance(child, dict):
                self.walk(child, child_depth)