Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/fastjsonschema/ref_resolver.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

93 statements  

1""" 

2JSON Schema URI resolution scopes and dereferencing 

3 

4https://tools.ietf.org/id/draft-zyp-json-schema-04.html#rfc.section.7 

5 

6Code adapted from https://github.com/Julian/jsonschema 

7""" 

8 

9import contextlib 

10import json 

11import re 

12from urllib import parse as urlparse 

13from urllib.parse import unquote 

14 

15from .exceptions import JsonSchemaDefinitionException 

16 

17 

18def get_id(schema): 

19 """ 

20 Originally ID was `id` and since v7 it's `$id`. 

21 """ 

22 return schema.get('$id', schema.get('id', '')) 

23 

24 

25def resolve_path(schema, fragment): 

26 """ 

27 Return definition from path. 

28 

29 Path is unescaped according https://tools.ietf.org/html/rfc6901 

30 """ 

31 fragment = fragment.lstrip('/') 

32 parts = unquote(fragment).split('/') if fragment else [] 

33 for part in parts: 

34 part = part.replace('~1', '/').replace('~0', '~') 

35 if isinstance(schema, list): 

36 schema = schema[int(part)] 

37 elif part in schema: 

38 schema = schema[part] 

39 else: 

40 raise JsonSchemaDefinitionException('Unresolvable ref: {}'.format(part)) 

41 return schema 

42 

43 

44def normalize(uri): 

45 return urlparse.urlsplit(uri).geturl() 

46 

47 

48def resolve_remote(uri, handlers): 

49 """ 

50 Resolve a remote ``uri``. 

51 

52 .. note:: 

53 

54 urllib library is used to fetch requests from the remote ``uri`` 

55 if handlers does notdefine otherwise. 

56 """ 

57 scheme = urlparse.urlsplit(uri).scheme 

58 if scheme in handlers: 

59 result = handlers[scheme](uri) 

60 else: 

61 from urllib.request import urlopen 

62 

63 req = urlopen(uri) 

64 encoding = req.info().get_content_charset() or 'utf-8' 

65 try: 

66 result = json.loads(req.read().decode(encoding),) 

67 except ValueError as exc: 

68 raise JsonSchemaDefinitionException('{} failed to decode: {}'.format(uri, exc)) 

69 finally: 

70 req.close() 

71 return result 

72 

73 

74class RefResolver: 

75 """ 

76 Resolve JSON References. 

77 """ 

78 

79 # pylint: disable=dangerous-default-value,too-many-arguments 

80 def __init__(self, base_uri, schema, store={}, cache=True, handlers={}): 

81 """ 

82 `base_uri` is URI of the referring document from the `schema`. 

83 `store` is an dictionary that will be used to cache the fetched schemas 

84 (if `cache=True`). 

85 

86 Please notice that you can have caching problems when compiling schemas 

87 with colliding `$ref`. To force overwriting use `cache=False` or 

88 explicitly pass the `store` argument (with a brand new dictionary) 

89 """ 

90 self.base_uri = base_uri 

91 self.resolution_scope = base_uri 

92 self.schema = schema 

93 self.store = store 

94 self.cache = cache 

95 self.handlers = handlers 

96 self.walk(schema) 

97 

98 @classmethod 

99 def from_schema(cls, schema, handlers={}, **kwargs): 

100 """ 

101 Construct a resolver from a JSON schema object. 

102 """ 

103 return cls( 

104 get_id(schema) if isinstance(schema, dict) else '', 

105 schema, 

106 handlers=handlers, 

107 **kwargs 

108 ) 

109 

110 @contextlib.contextmanager 

111 def in_scope(self, scope: str): 

112 """ 

113 Context manager to handle current scope. 

114 """ 

115 old_scope = self.resolution_scope 

116 self.resolution_scope = urlparse.urljoin(old_scope, scope) 

117 try: 

118 yield 

119 finally: 

120 self.resolution_scope = old_scope 

121 

122 @contextlib.contextmanager 

123 def resolving(self, ref: str): 

124 """ 

125 Context manager which resolves a JSON ``ref`` and enters the 

126 resolution scope of this ref. 

127 """ 

128 new_uri = urlparse.urljoin(self.resolution_scope, ref) 

129 uri, fragment = urlparse.urldefrag(new_uri) 

130 

131 if uri and normalize(uri) in self.store: 

132 schema = self.store[normalize(uri)] 

133 elif not uri or uri == self.base_uri: 

134 schema = self.schema 

135 else: 

136 schema = resolve_remote(uri, self.handlers) 

137 if self.cache: 

138 self.store[normalize(uri)] = schema 

139 

140 old_base_uri, old_schema = self.base_uri, self.schema 

141 self.base_uri, self.schema = uri, schema 

142 try: 

143 with self.in_scope(uri): 

144 yield resolve_path(schema, fragment) 

145 finally: 

146 self.base_uri, self.schema = old_base_uri, old_schema 

147 

148 def get_uri(self): 

149 return normalize(self.resolution_scope) 

150 

151 def get_scope_name(self): 

152 """ 

153 Get current scope and return it as a valid function name. 

154 """ 

155 name = 'validate_' + unquote(self.resolution_scope).replace('~1', '_').replace('~0', '_').replace('"', '') 

156 name = re.sub(r'($[^a-zA-Z]|[^a-zA-Z0-9])', '_', name) 

157 name = name.lower().rstrip('_') 

158 return name 

159 

160 def walk(self, node: dict): 

161 """ 

162 Walk thru schema and dereferencing ``id`` and ``$ref`` instances 

163 """ 

164 if isinstance(node, bool): 

165 pass 

166 elif '$ref' in node and isinstance(node['$ref'], str): 

167 ref = node['$ref'] 

168 node['$ref'] = urlparse.urljoin(self.resolution_scope, ref) 

169 elif ('$id' in node or 'id' in node) and isinstance(get_id(node), str): 

170 with self.in_scope(get_id(node)): 

171 self.store[normalize(self.resolution_scope)] = node 

172 for _, item in node.items(): 

173 if isinstance(item, dict): 

174 self.walk(item) 

175 else: 

176 for _, item in node.items(): 

177 if isinstance(item, dict): 

178 self.walk(item)