Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fastjsonschema/ref_resolver.py: 24%
91 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 06:54 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 06:54 +0000
1"""
2JSON Schema URI resolution scopes and dereferencing
4https://tools.ietf.org/id/draft-zyp-json-schema-04.html#rfc.section.7
6Code adapted from https://github.com/Julian/jsonschema
7"""
9import contextlib
10import json
11import re
12from urllib import parse as urlparse
13from urllib.parse import unquote
14from urllib.request import urlopen
16from .exceptions import JsonSchemaDefinitionException
def get_id(schema):
    """
    Return the identifier declared by ``schema``.

    The identifier used to live under ``id``; since v7 it is ``$id``.
    ``$id`` wins when both are present, and an empty string is returned
    when the schema declares neither.
    """
    legacy_id = schema.get('id', '')
    return schema.get('$id', legacy_id)
def resolve_path(schema, fragment):
    """
    Follow ``fragment`` through ``schema`` and return the definition found.

    Leading slashes are stripped, the remainder is percent-decoded and
    split on ``/``; each token is then unescaped according to
    https://tools.ietf.org/html/rfc6901 (``~1`` -> ``/``, ``~0`` -> ``~``).
    Lists are indexed by the integer value of the token, anything else is
    looked up by key.

    Raises JsonSchemaDefinitionException when a token is not present in a
    non-list node.
    """
    pointer = fragment.lstrip('/')
    if not pointer:
        # An empty pointer addresses the whole document.
        return schema

    node = schema
    for raw_token in unquote(pointer).split('/'):
        # Order matters: ``~1`` must be handled before ``~0`` so that
        # ``~01`` ends up as ``~1`` and not as ``/``.
        token = raw_token.replace('~1', '/').replace('~0', '~')
        if isinstance(node, list):
            node = node[int(token)]
            continue
        if token not in node:
            raise JsonSchemaDefinitionException('Unresolvable ref: {}'.format(token))
        node = node[token]
    return node
def normalize(uri):
    """
    Return ``uri`` after a split / re-assemble round trip through
    ``urllib.parse.urlsplit``.
    """
    split_uri = urlparse.urlsplit(uri)
    return split_uri.geturl()
def resolve_remote(uri, handlers):
    """
    Resolve a remote ``uri`` and return the document it points to.

    When ``handlers`` contains an entry for the scheme of ``uri``, that
    callable is invoked with ``uri`` and its result is returned as is.
    Otherwise the document is downloaded and parsed as JSON, decoded with
    the charset announced by the response (``utf-8`` when none is given).

    Raises JsonSchemaDefinitionException when the downloaded payload cannot
    be decoded.

    .. note::

        urllib library is used to fetch requests from the remote ``uri``
        if handlers does not define otherwise.
    """
    scheme = urlparse.urlsplit(uri).scheme
    if scheme in handlers:
        return handlers[scheme](uri)

    # NOTE(review): ``urlopen`` also serves non-HTTP schemes (e.g. ``file:``).
    # If ``$ref`` values can come from untrusted schemas, consider registering
    # handlers for the schemes that should be restricted.
    #
    # The ``with`` block closes the response even when reading or decoding
    # fails, so the underlying connection is never leaked.
    with urlopen(uri) as response:
        encoding = response.info().get_content_charset() or 'utf-8'
        try:
            return json.loads(response.read().decode(encoding))
        except ValueError as exc:
            raise JsonSchemaDefinitionException('{} failed to decode: {}'.format(uri, exc)) from exc
class RefResolver:
    """
    Resolve JSON References.

    The resolver keeps track of the document currently being processed
    (``base_uri`` and ``schema``) and of ``resolution_scope``, the URI that
    relative references and identifiers are joined against.
    """

    # pylint: disable=dangerous-default-value,too-many-arguments
    def __init__(self, base_uri, schema, store={}, cache=True, handlers={}):
        """
        `base_uri` is URI of the referring document from the `schema`.
        `store` is a dictionary that will be used to cache the fetched schemas
        (if `cache=True`).
        `handlers` maps URI schemes to callables used to fetch remote
        documents.

        Please notice that you can have caching problems when compiling schemas
        with colliding `$ref`. To force overwriting use `cache=False` or
        explicitly pass the `store` argument (with a brand new dictionary);
        the default dictionary is shared by every resolver created without
        an explicit `store`.
        """
        self.base_uri = base_uri
        self.resolution_scope = base_uri
        self.schema = schema
        self.store = store
        self.cache = cache
        self.handlers = handlers
        # Rewrites ``$ref`` values in place and registers sub-schemas that
        # declare an id in ``store``.
        self.walk(schema)

    @classmethod
    def from_schema(cls, schema, handlers={}, **kwargs):
        """
        Construct a resolver from a JSON schema object.

        The base URI is the id declared by ``schema`` when it is a dict and
        an empty string otherwise. Extra keyword arguments are forwarded to
        the constructor.
        """
        return cls(
            get_id(schema) if isinstance(schema, dict) else '',
            schema,
            handlers=handlers,
            **kwargs
        )

    @contextlib.contextmanager
    def in_scope(self, scope: str):
        """
        Context manager to handle current scope.

        ``scope`` is joined onto the current resolution scope for the
        duration of the block; the previous scope is restored on exit, even
        when the block raises.
        """
        old_scope = self.resolution_scope
        self.resolution_scope = urlparse.urljoin(old_scope, scope)
        try:
            yield
        finally:
            self.resolution_scope = old_scope

    @contextlib.contextmanager
    def resolving(self, ref: str):
        """
        Context manager which resolves a JSON ``ref`` and enters the
        resolution scope of this ref.

        Yields the definition addressed by the fragment of ``ref``. While the
        block runs, ``base_uri`` and ``schema`` point at the referenced
        document; both are restored on exit.
        """
        new_uri = urlparse.urljoin(self.resolution_scope, ref)
        uri, fragment = urlparse.urldefrag(new_uri)

        # Normalise once; an empty ``uri`` is never looked up in the store.
        store_key = normalize(uri) if uri else None
        if uri and store_key in self.store:
            schema = self.store[store_key]
        elif not uri or uri == self.base_uri:
            # The reference points into the document being processed.
            schema = self.schema
        else:
            schema = resolve_remote(uri, self.handlers)
            if self.cache:
                self.store[store_key] = schema

        old_base_uri, old_schema = self.base_uri, self.schema
        self.base_uri, self.schema = uri, schema
        try:
            with self.in_scope(uri):
                yield resolve_path(schema, fragment)
        finally:
            self.base_uri, self.schema = old_base_uri, old_schema

    def get_uri(self):
        """
        Return the current resolution scope in normalised form.
        """
        return normalize(self.resolution_scope)

    def get_scope_name(self):
        """
        Get current scope and return it as a valid function name.

        The name is ``validate_`` followed by the percent-decoded scope,
        where every character that is not an ASCII letter or digit becomes
        an underscore; the result is lower-cased and trailing underscores
        are removed.
        """
        name = 'validate_' + unquote(self.resolution_scope).replace('~1', '_').replace('~0', '_').replace('"', '')
        name = re.sub(r'($[^a-zA-Z]|[^a-zA-Z0-9])', '_', name)
        name = name.lower().rstrip('_')
        return name

    def walk(self, node: dict):
        """
        Walk thru schema and dereferencing ``id`` and ``$ref`` instances

        A string ``$ref`` is replaced in place by its absolute form. A node
        declaring a string id is stored under that (normalised) URI and its
        dict children are walked inside the new scope. Other dicts just have
        their dict children walked.

        Nodes that are not dicts (boolean schemas, but also ``None``, numbers,
        strings or lists) are skipped instead of raising, because
        ``from_schema`` accepts non-dict schemas.
        """
        if not isinstance(node, dict):
            return

        if '$ref' in node and isinstance(node['$ref'], str):
            ref = node['$ref']
            node['$ref'] = urlparse.urljoin(self.resolution_scope, ref)
        elif ('$id' in node or 'id' in node) and isinstance(get_id(node), str):
            with self.in_scope(get_id(node)):
                self.store[normalize(self.resolution_scope)] = node
                # Children are visited inside the scope declared by this node.
                for item in node.values():
                    if isinstance(item, dict):
                        self.walk(item)
        else:
            for item in node.values():
                if isinstance(item, dict):
                    self.walk(item)