Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/fastjsonschema/ref_resolver.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

93 statements  

1# pylint: disable=import-outside-toplevel 

2 

3""" 

4JSON Schema URI resolution scopes and dereferencing 

5 

6https://tools.ietf.org/id/draft-zyp-json-schema-04.html#rfc.section.7 

7 

8Code adapted from https://github.com/Julian/jsonschema 

9""" 

10 

11import contextlib 

12import json 

13import re 

14from urllib import parse as urlparse 

15from urllib.parse import unquote 

16 

17from .exceptions import JsonSchemaDefinitionException 

18 

19 

def get_id(schema):
    """
    Return the identifier of ``schema``, or ``''`` when it has none.

    The keyword was originally spelled ``id`` and later became ``$id``;
    when both are present the ``$id`` value is returned.
    """
    legacy_id = schema.get('id', '')
    return schema.get('$id', legacy_id)

25 

26 

def resolve_path(schema, fragment):
    """
    Return the definition found in ``schema`` at the JSON pointer ``fragment``.

    The fragment is percent-decoded, split on ``/`` and every segment is
    unescaped according to https://tools.ietf.org/html/rfc6901
    (``~1`` becomes ``/`` and ``~0`` becomes ``~``). Leading slashes are
    ignored and an empty fragment returns ``schema`` itself.

    Raises ``JsonSchemaDefinitionException`` when a segment cannot be
    resolved: a missing key, a non-numeric or out-of-range list index, or an
    attempt to descend into a value that is neither a list nor a dict.
    """
    fragment = fragment.lstrip('/')
    parts = unquote(fragment).split('/') if fragment else []
    for part in parts:
        # ``~1`` has to be decoded before ``~0`` so that ``~01`` ends up as
        # the literal ``~1`` instead of ``/``.
        part = part.replace('~1', '/').replace('~0', '~')
        if isinstance(schema, list):
            try:
                schema = schema[int(part)]
            except (ValueError, IndexError) as exc:
                # Report a bad index the same way as a missing key instead of
                # leaking the low-level error to the caller.
                raise JsonSchemaDefinitionException('Unresolvable ref: {}'.format(part)) from exc
        elif isinstance(schema, dict) and part in schema:
            schema = schema[part]
        else:
            # Missing key, or the path tries to go below a scalar value.
            raise JsonSchemaDefinitionException('Unresolvable ref: {}'.format(part))
    return schema

44 

45 

def normalize(uri):
    """
    Return ``uri`` after splitting it into components and reassembling it.

    The result is used as the canonical key under which schemas are stored.
    """
    components = urlparse.urlsplit(uri)
    return components.geturl()

48 

49 

def resolve_remote(uri, handlers):
    """
    Resolve a remote ``uri`` and return the document it points to.

    ``handlers`` maps a URI scheme to a callable taking the URI. When the
    scheme of ``uri`` is present in ``handlers``, the handler's return value
    is returned as is.

    .. note::

        urllib library is used to fetch requests from the remote ``uri``
        if handlers does not define otherwise.

    Raises ``JsonSchemaDefinitionException`` when the fetched body cannot be
    decoded as JSON, including when the declared charset is unknown or does
    not match the payload.
    """
    scheme = urlparse.urlsplit(uri).scheme
    if scheme in handlers:
        return handlers[scheme](uri)

    from urllib.request import urlopen

    # NOTE(security): the URI is handed to ``urlopen`` unchanged, so a schema
    # from an untrusted source decides what gets fetched. Register handlers
    # for the schemes you want to control or restrict.
    with urlopen(uri) as response:
        encoding = response.info().get_content_charset() or 'utf-8'
        try:
            return json.loads(response.read().decode(encoding))
        except (ValueError, LookupError) as exc:
            # ValueError covers invalid JSON and undecodable bytes;
            # LookupError covers a charset name Python does not know.
            raise JsonSchemaDefinitionException('{} failed to decode'.format(uri)) from exc

72 

73 

class RefResolver:
    """
    Resolve JSON References.

    Keeps track of the current resolution scope, the document currently being
    resolved and a store of schemas keyed by normalized URI.
    """

    # pylint: disable=dangerous-default-value,too-many-arguments
    def __init__(self, base_uri, schema, store={}, cache=True, handlers={}):
        """
        `base_uri` is URI of the referring document from the `schema`.
        `store` is a dictionary that will be used to cache the fetched schemas
        (if `cache=True`).
        `handlers` maps URI schemes to callables used to fetch remote schemas.

        Please notice that you can have caching problems when compiling schemas
        with colliding `$ref`. To force overwriting use `cache=False` or
        explicitly pass the `store` argument (with a brand new dictionary)

        The default `store` is a single dictionary shared by every resolver
        created without an explicit `store`, and schemas carrying an `id` /
        `$id` are written into the store while walking regardless of `cache`.
        """
        self.base_uri = base_uri
        self.resolution_scope = base_uri
        self.schema = schema
        self.store = store
        self.cache = cache
        self.handlers = handlers
        # Walking makes string `$ref` values absolute in place and registers
        # every sub-schema that declares an id.
        self.walk(schema)

    @classmethod
    def from_schema(cls, schema, handlers={}, **kwargs):
        """
        Construct a resolver from a JSON schema object.

        The base URI is the schema's own id when the schema is a dict and
        an empty string otherwise. Extra keyword arguments are forwarded to
        the constructor.
        """
        return cls(
            get_id(schema) if isinstance(schema, dict) else '',
            schema,
            handlers=handlers,
            **kwargs
        )

    @contextlib.contextmanager
    def in_scope(self, scope: str):
        """
        Context manager to handle current scope.

        ``scope`` is joined onto the current resolution scope for the
        duration of the ``with`` block; the previous scope is restored on
        exit, also when the block raises.
        """
        old_scope = self.resolution_scope
        self.resolution_scope = urlparse.urljoin(old_scope, scope)
        try:
            yield
        finally:
            self.resolution_scope = old_scope

    @contextlib.contextmanager
    def resolving(self, ref: str):
        """
        Context manager which resolves a JSON ``ref`` and enters the
        resolution scope of this ref.

        Yields the definition the reference points to. While the block runs,
        ``base_uri`` and ``schema`` refer to the document that contains the
        definition; both are restored on exit.
        """
        new_uri = urlparse.urljoin(self.resolution_scope, ref)
        uri, fragment = urlparse.urldefrag(new_uri)
        # An empty URI means "the current document" and never hits the store.
        store_key = normalize(uri) if uri else None

        if store_key is not None and store_key in self.store:
            schema = self.store[store_key]
        elif not uri or uri == self.base_uri:
            schema = self.schema
        else:
            schema = resolve_remote(uri, self.handlers)
            if self.cache:
                self.store[store_key] = schema

        old_base_uri, old_schema = self.base_uri, self.schema
        self.base_uri, self.schema = uri, schema
        try:
            with self.in_scope(uri):
                yield resolve_path(schema, fragment)
        finally:
            self.base_uri, self.schema = old_base_uri, old_schema

    def get_uri(self):
        """
        Return the current resolution scope as a normalized URI.
        """
        return normalize(self.resolution_scope)

    def get_scope_name(self):
        """
        Get current scope and return it as a valid function name.

        The name is ``validate_`` followed by the percent-decoded scope with
        every character outside ``[a-zA-Z0-9]`` replaced by ``_``, lowercased
        and with trailing underscores removed.
        """
        name = 'validate_' + unquote(self.resolution_scope).replace('~1', '_').replace('~0', '_').replace('"', '')
        # The former extra alternative ``$[^a-zA-Z]`` could only ever match a
        # trailing newline, which this character class replaces as well.
        name = re.sub(r'[^a-zA-Z0-9]', '_', name)
        return name.lower().rstrip('_')

    def walk(self, node: dict):
        """
        Walk thru schema and dereferencing ``id`` and ``$ref`` instances

        A string ``$ref`` is rewritten in place to an absolute URI based on
        the current resolution scope; such a node is not descended into. A
        node with a string ``id`` / ``$id`` is stored under its normalized
        URI and its children are walked inside that scope. Only dict values
        are followed, so schemas nested inside lists are not visited.

        Anything that is not a dict (boolean schemas in particular) is left
        untouched.
        """
        if not isinstance(node, dict):
            # Nothing to rewrite or register; avoids failing with a
            # TypeError/AttributeError on non-object schemas.
            return
        if '$ref' in node and isinstance(node['$ref'], str):
            node['$ref'] = urlparse.urljoin(self.resolution_scope, node['$ref'])
        elif ('$id' in node or 'id' in node) and isinstance(get_id(node), str):
            with self.in_scope(get_id(node)):
                self.store[normalize(self.resolution_scope)] = node
                self._walk_children(node)
        else:
            self._walk_children(node)

    def _walk_children(self, node):
        """
        Walk every direct value of ``node`` that is itself a dict.
        """
        for item in node.values():
            if isinstance(item, dict):
                self.walk(item)