1"""
2JSON Schema URI resolution scopes and dereferencing
3
4https://tools.ietf.org/id/draft-zyp-json-schema-04.html#rfc.section.7
5
6Code adapted from https://github.com/Julian/jsonschema
7"""
8
9import contextlib
10import json
11import re
12from urllib import parse as urlparse
13from urllib.parse import unquote
14
15from .exceptions import JsonSchemaDefinitionException
16
17
def get_id(schema):
    """
    Return the identifier of ``schema``, or an empty string if it has none.

    Older drafts spell the keyword ``id`` while newer ones use ``$id``;
    ``$id`` wins when both keys are present.
    """
    if '$id' in schema:
        return schema['$id']
    return schema.get('id', '')
23
24
def resolve_path(schema, fragment):
    """
    Return definition from path.

    Path is unescaped according https://tools.ietf.org/html/rfc6901

    ``schema`` is the document to descend into and ``fragment`` is the
    (possibly percent-encoded) JSON pointer, with or without leading
    slashes. An empty fragment returns ``schema`` itself.

    Raises ``JsonSchemaDefinitionException`` when a path part cannot be
    resolved: a missing key, an array index that is not a plain
    non-negative integer or is out of range, or an attempt to descend
    into a value that is neither a list nor a dict.
    """
    fragment = fragment.lstrip('/')
    parts = unquote(fragment).split('/') if fragment else []
    for part in parts:
        # ``~1`` has to be decoded before ``~0`` so that ``~01`` ends up as
        # the literal ``~1`` instead of ``/``.
        part = part.replace('~1', '/').replace('~0', '~')
        if isinstance(schema, list):
            # JSON pointer array indices are unsigned decimal numbers, so
            # reject things like ``-1`` which Python would otherwise
            # happily resolve to the last element.
            if not part.isdigit():
                raise JsonSchemaDefinitionException('Unresolvable ref: {}'.format(part))
            try:
                schema = schema[int(part)]
            except (ValueError, IndexError):
                raise JsonSchemaDefinitionException('Unresolvable ref: {}'.format(part))
        elif isinstance(schema, dict) and part in schema:
            schema = schema[part]
        else:
            # Missing key, or a scalar (string, number, ...) that cannot be
            # descended into.
            raise JsonSchemaDefinitionException('Unresolvable ref: {}'.format(part))
    return schema
42
43
def normalize(uri):
    """
    Return ``uri`` after splitting it into its components with ``urlsplit``
    and assembling those components back into a single URL.
    """
    components = urlparse.urlsplit(uri)
    return components.geturl()
46
47
def resolve_remote(uri, handlers):
    """
    Resolve a remote ``uri`` and return the document it points to.

    ``handlers`` maps a URI scheme to a callable that receives the URI and
    returns the loaded document; when the scheme of ``uri`` has a handler,
    its result is returned unchanged.

    .. note::

        urllib library is used to fetch requests from the remote ``uri``
        if handlers does not define otherwise.

    Raises ``JsonSchemaDefinitionException`` when the fetched content
    cannot be decoded as JSON.
    """
    scheme = urlparse.urlsplit(uri).scheme
    if scheme in handlers:
        return handlers[scheme](uri)

    from urllib.request import urlopen

    # NOTE: ``urlopen`` is not limited to HTTP(S); it also follows schemes
    # such as ``file:``. Register a handler for a scheme to control how
    # (or whether) it is fetched.
    # The ``with`` block closes the response even when decoding fails.
    with urlopen(uri) as response:
        encoding = response.info().get_content_charset() or 'utf-8'
        try:
            return json.loads(response.read().decode(encoding))
        except ValueError as exc:
            raise JsonSchemaDefinitionException('{} failed to decode: {}'.format(uri, exc)) from exc
70
71
class RefResolver:
    """
    Resolve JSON References.

    The resolver keeps track of the document currently being resolved
    against (``base_uri`` and ``schema``) and of the ``resolution_scope``,
    the URI that relative references are joined with.
    """

    # pylint: disable=dangerous-default-value,too-many-arguments
    def __init__(self, base_uri, schema, store={}, cache=True, handlers={}):
        """
        `base_uri` is URI of the referring document from the `schema`.
        `store` is a dictionary that will be used to cache the fetched schemas
        (if `cache=True`).
        `handlers` maps a URI scheme to a callable used to load documents
        with that scheme.

        Please notice that you can have caching problems when compiling schemas
        with colliding `$ref`. To force overwriting use `cache=False` or
        explicitly pass the `store` argument (with a brand new dictionary)

        The default `store` is one dictionary shared by every resolver that
        does not receive its own, which is where the caching problems
        mentioned above come from.
        """
        self.base_uri = base_uri
        self.resolution_scope = base_uri
        self.schema = schema
        self.store = store
        self.cache = cache
        self.handlers = handlers
        # Rewrites string ``$ref`` values in place and registers
        # sub-schemas carrying an id in the store.
        self.walk(schema)

    @classmethod
    def from_schema(cls, schema, handlers={}, **kwargs):
        """
        Construct a resolver from a JSON schema object.

        The id of ``schema`` is used as base URI; a schema that is not a
        dictionary gets an empty base URI. Remaining keyword arguments are
        forwarded to the constructor.
        """
        base_uri = get_id(schema) if isinstance(schema, dict) else ''
        return cls(base_uri, schema, handlers=handlers, **kwargs)

    @contextlib.contextmanager
    def in_scope(self, scope: str):
        """
        Context manager to handle current scope.

        ``scope`` is joined with the current resolution scope for the
        duration of the ``with`` block; the previous scope is restored on
        exit, even when the block raises.
        """
        old_scope = self.resolution_scope
        self.resolution_scope = urlparse.urljoin(old_scope, scope)
        try:
            yield
        finally:
            self.resolution_scope = old_scope

    @contextlib.contextmanager
    def resolving(self, ref: str):
        """
        Context manager which resolves a JSON ``ref`` and enters the
        resolution scope of this ref.

        The document is looked up in the store first, then the current
        document is used when the reference has no URI part or points to
        the current base URI, and only otherwise the document is fetched
        with ``resolve_remote`` (and stored when caching is enabled).
        The value yielded is the definition the fragment of ``ref`` points
        to. Base URI, schema and scope are restored on exit.
        """
        new_uri = urlparse.urljoin(self.resolution_scope, ref)
        uri, fragment = urlparse.urldefrag(new_uri)
        # Only a non-empty URI can be a key of the store.
        store_key = normalize(uri) if uri else None

        if store_key is not None and store_key in self.store:
            schema = self.store[store_key]
        elif not uri or uri == self.base_uri:
            schema = self.schema
        else:
            schema = resolve_remote(uri, self.handlers)
            if self.cache:
                self.store[store_key] = schema

        old_base_uri, old_schema = self.base_uri, self.schema
        self.base_uri, self.schema = uri, schema
        try:
            with self.in_scope(uri):
                yield resolve_path(schema, fragment)
        finally:
            self.base_uri, self.schema = old_base_uri, old_schema

    def get_uri(self):
        """
        Return the current resolution scope in normalized form.
        """
        return normalize(self.resolution_scope)

    def get_scope_name(self):
        """
        Get current scope and return it as a valid function name.

        The name is ``validate_`` followed by the unquoted scope in lower
        case, with every character other than an ASCII letter or digit
        replaced by ``_`` and trailing underscores removed.
        """
        name = 'validate_' + unquote(self.resolution_scope).replace('~1', '_').replace('~0', '_').replace('"', '')
        # A single character class is enough: the name always starts with
        # ``validate_`` so no special handling of the first character is
        # needed.
        name = re.sub(r'[^a-zA-Z0-9]', '_', name)
        return name.lower().rstrip('_')

    def walk(self, node: dict):
        """
        Walk through schema and dereference ``id`` and ``$ref`` instances.

        A string ``$ref`` is replaced in place by its value joined with the
        current resolution scope, and such a node is not descended into.
        A node with a string id is stored under its normalized URI and its
        children are walked inside that scope. Boolean schemas are left
        alone. Only dictionary values are followed.
        """
        if isinstance(node, bool):
            return
        if '$ref' in node and isinstance(node['$ref'], str):
            node['$ref'] = urlparse.urljoin(self.resolution_scope, node['$ref'])
        elif ('$id' in node or 'id' in node) and isinstance(get_id(node), str):
            with self.in_scope(get_id(node)):
                self.store[normalize(self.resolution_scope)] = node
                self._walk_children(node)
        else:
            self._walk_children(node)

    def _walk_children(self, node):
        """
        Walk every value of ``node`` that is itself a dictionary.
        """
        for item in node.values():
            if isinstance(item, dict):
                self.walk(item)