1"""
2JSON Schema URI resolution scopes and dereferencing
3
4https://tools.ietf.org/id/draft-zyp-json-schema-04.html#rfc.section.7
5
6Code adapted from https://github.com/Julian/jsonschema
7"""
8
9import contextlib
10import json
11import re
12from urllib import parse as urlparse
13from urllib.parse import unquote
14
15from .exceptions import JsonSchemaDefinitionException
16
17
def get_id(schema):
    """
    Return the identifier of ``schema``.

    The modern ``$id`` keyword wins; the legacy ``id`` keyword is the
    fallback, and an empty string is returned when neither is present.
    """
    legacy_id = schema.get('id', '')
    return schema.get('$id', legacy_id)
23
24
def resolve_path(schema, fragment):
    """
    Return the definition found in ``schema`` at the path ``fragment``.

    ``fragment`` is percent-decoded and split on ``/``; each segment is then
    unescaped according to https://tools.ietf.org/html/rfc6901 (``~1`` becomes
    ``/`` and ``~0`` becomes ``~``). Segments index into lists as integers and
    into every other node as keys. An empty fragment returns ``schema`` itself.

    Raises ``JsonSchemaDefinitionException`` when any segment cannot be
    followed: missing key, non-integer or out-of-range list index, or an
    attempt to descend into a node that is not a container.
    """
    fragment = fragment.lstrip('/')
    parts = unquote(fragment).split('/') if fragment else []
    for part in parts:
        # Order matters: ``~1`` must be decoded before ``~0`` so that the
        # escaped sequence ``~01`` ends up as ``~1`` and not as ``/``.
        part = part.replace('~1', '/').replace('~0', '~')
        try:
            schema = schema[int(part)] if isinstance(schema, list) else schema[part]
        except (KeyError, IndexError, ValueError, TypeError) as exc:
            # Report every kind of dead end uniformly instead of leaking
            # the built-in lookup error to the caller.
            raise JsonSchemaDefinitionException('Unresolvable ref: {}'.format(part)) from exc
    return schema
42
43
def normalize(uri):
    """
    Round-trip ``uri`` through ``urlsplit`` and return it as a string again.
    """
    split_uri = urlparse.urlsplit(uri)
    return split_uri.geturl()
46
47
def resolve_remote(uri, handlers):
    """
    Resolve a remote ``uri`` and return the document it points to.

    If ``handlers`` contains an entry for the scheme of ``uri``, that handler
    is called with ``uri`` and its return value is returned unchanged.
    Otherwise the document is fetched, decoded using the charset announced by
    the response (UTF-8 when none is announced) and parsed as JSON.

    Raises ``JsonSchemaDefinitionException`` when the fetched body cannot be
    decoded or is not valid JSON.

    .. note::

        urllib library is used to fetch requests from the remote ``uri``
        if handlers does not define otherwise.
    """
    scheme = urlparse.urlsplit(uri).scheme
    if scheme in handlers:
        return handlers[scheme](uri)

    from urllib.request import urlopen

    # NOTE(review): urlopen opens whatever scheme the URI names; if refs can
    # come from untrusted schemas, consider restricting schemes via handlers.
    # The context manager closes the response even if reading headers fails.
    with urlopen(uri) as response:
        encoding = response.info().get_content_charset() or 'utf-8'
        try:
            return json.loads(response.read().decode(encoding))
        except (ValueError, LookupError) as exc:
            # ValueError covers both invalid JSON and undecodable bytes;
            # LookupError covers a charset name Python does not know.
            raise JsonSchemaDefinitionException('{} failed to decode: {}'.format(uri, exc)) from exc
72
73
class RefResolver:
    """
    Resolve JSON References.

    The resolver keeps track of the document currently being processed
    (``base_uri`` / ``schema``), of the current resolution scope used to turn
    relative references into absolute ones, and of a ``store`` mapping
    normalized URIs to already known schemas.
    """

    # ``store`` keeps its shared mutable default on purpose: the constructor
    # docstring documents it as a cache that outlives a single resolver.
    # pylint: disable=dangerous-default-value,too-many-arguments
    def __init__(self, base_uri, schema, store={}, cache=True, handlers=None):
        """
        `base_uri` is URI of the referring document from the `schema`.
        `store` is a dictionary that will be used to cache the fetched schemas
        (if `cache=True`).
        `handlers` maps URI schemes to callables taking the URI and returning
        the referenced document; when omitted, a fresh empty mapping is used.

        Please notice that you can have caching problems when compiling schemas
        with colliding `$ref`. To force overwriting use `cache=False` or
        explicitly pass the `store` argument (with a brand new dictionary)

        Constructing the resolver walks `schema` right away, rewriting its
        string `$ref` values in place and registering sub-schemas that carry
        an id in `store`.
        """
        self.base_uri = base_uri
        self.resolution_scope = base_uri
        self.schema = schema
        self.store = store
        self.cache = cache
        # A new dict per instance, so registering a handler on one resolver
        # can never leak into resolvers created with the default.
        self.handlers = {} if handlers is None else handlers
        self.walk(schema)

    @classmethod
    def from_schema(cls, schema, handlers=None, **kwargs):
        """
        Construct a resolver from a JSON schema object.

        The base URI is the id of ``schema`` when it is a dictionary and an
        empty string otherwise. Extra keyword arguments are passed on to the
        constructor.
        """
        base_uri = get_id(schema) if isinstance(schema, dict) else ''
        return cls(base_uri, schema, handlers=handlers, **kwargs)

    @contextlib.contextmanager
    def in_scope(self, scope: str):
        """
        Context manager to handle current scope.

        ``scope`` is joined onto the current resolution scope for the
        duration of the ``with`` block; the previous scope is restored on
        exit, even when the block raises.
        """
        old_scope = self.resolution_scope
        self.resolution_scope = urlparse.urljoin(old_scope, scope)
        try:
            yield
        finally:
            self.resolution_scope = old_scope

    @contextlib.contextmanager
    def resolving(self, ref: str):
        """
        Context manager which resolves a JSON ``ref`` and enters the
        resolution scope of this ref.

        The document part of the reference is looked up in this order: the
        store, the document currently being processed, and finally a remote
        fetch (stored afterwards when caching is enabled). The value yielded
        is the part of that document addressed by the reference's fragment.
        The previous document and scope are restored on exit.
        """
        new_uri = urlparse.urljoin(self.resolution_scope, ref)
        uri, fragment = urlparse.urldefrag(new_uri)
        # Only a non-empty URI can be a store key; normalize it just once.
        store_key = normalize(uri) if uri else None

        if uri and store_key in self.store:
            schema = self.store[store_key]
        elif not uri or uri == self.base_uri:
            schema = self.schema
        else:
            schema = resolve_remote(uri, self.handlers)
            if self.cache:
                self.store[store_key] = schema

        old_base_uri, old_schema = self.base_uri, self.schema
        self.base_uri, self.schema = uri, schema
        try:
            with self.in_scope(uri):
                yield resolve_path(schema, fragment)
        finally:
            self.base_uri, self.schema = old_base_uri, old_schema

    def get_uri(self):
        """
        Return the current resolution scope in normalized form.
        """
        return normalize(self.resolution_scope)

    def get_scope_name(self):
        """
        Get current scope and return it as a valid function name.

        The name is ``validate_`` followed by the percent-decoded scope with
        every character other than an ASCII letter or digit replaced by an
        underscore, lower-cased and without trailing underscores.
        """
        scope = unquote(self.resolution_scope)
        # Escaped pointer characters collapse into a single underscore and
        # double quotes are dropped before the generic clean-up below.
        scope = scope.replace('~1', '_').replace('~0', '_').replace('"', '')
        # The former ``$[^a-zA-Z]`` alternative was dropped: it could only
        # ever match a trailing newline, which this class covers as well.
        name = re.sub(r'[^a-zA-Z0-9]', '_', 'validate_' + scope)
        return name.lower().rstrip('_')

    def walk(self, node: dict):
        """
        Walk thru schema and dereferencing ``id`` and ``$ref`` instances

        Boolean schemas are ignored. A node with a string ``$ref`` gets that
        reference rewritten in place to its absolute form and is not
        descended into. A node with a string id is registered in the store
        under its normalized scope and its children are walked inside that
        scope. Any other node just has its children walked.
        """
        if isinstance(node, bool):
            return
        if '$ref' in node and isinstance(node['$ref'], str):
            node['$ref'] = urlparse.urljoin(self.resolution_scope, node['$ref'])
            return
        if ('$id' in node or 'id' in node) and isinstance(get_id(node), str):
            with self.in_scope(get_id(node)):
                self.store[normalize(self.resolution_scope)] = node
                self._walk_children(node)
            return
        self._walk_children(node)

    def _walk_children(self, node):
        """
        Walk every direct child of ``node`` that is itself a dictionary.
        """
        for item in node.values():
            if isinstance(item, dict):
                self.walk(item)