Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fastjsonschema/ref_resolver.py: 24%

91 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-07-01 06:54 +0000

1""" 

2JSON Schema URI resolution scopes and dereferencing 

3 

4https://tools.ietf.org/id/draft-zyp-json-schema-04.html#rfc.section.7 

5 

6Code adapted from https://github.com/Julian/jsonschema 

7""" 

8 

9import contextlib 

10import json 

11import re 

12from urllib import parse as urlparse 

13from urllib.parse import unquote 

14from urllib.request import urlopen 

15 

16from .exceptions import JsonSchemaDefinitionException 

17 

18 

19def get_id(schema): 

20 """ 

21 Originally ID was `id` and since v7 it's `$id`. 

22 """ 

23 return schema.get('$id', schema.get('id', '')) 

24 

25 

26def resolve_path(schema, fragment): 

27 """ 

28 Return definition from path. 

29 

30 Path is unescaped according https://tools.ietf.org/html/rfc6901 

31 """ 

32 fragment = fragment.lstrip('/') 

33 parts = unquote(fragment).split('/') if fragment else [] 

34 for part in parts: 

35 part = part.replace('~1', '/').replace('~0', '~') 

36 if isinstance(schema, list): 

37 schema = schema[int(part)] 

38 elif part in schema: 

39 schema = schema[part] 

40 else: 

41 raise JsonSchemaDefinitionException('Unresolvable ref: {}'.format(part)) 

42 return schema 

43 

44 

45def normalize(uri): 

46 return urlparse.urlsplit(uri).geturl() 

47 

48 

49def resolve_remote(uri, handlers): 

50 """ 

51 Resolve a remote ``uri``. 

52 

53 .. note:: 

54 

55 urllib library is used to fetch requests from the remote ``uri`` 

56 if handlers does notdefine otherwise. 

57 """ 

58 scheme = urlparse.urlsplit(uri).scheme 

59 if scheme in handlers: 

60 result = handlers[scheme](uri) 

61 else: 

62 req = urlopen(uri) 

63 encoding = req.info().get_content_charset() or 'utf-8' 

64 try: 

65 result = json.loads(req.read().decode(encoding),) 

66 except ValueError as exc: 

67 raise JsonSchemaDefinitionException('{} failed to decode: {}'.format(uri, exc)) 

68 return result 

69 

70 

71class RefResolver: 

72 """ 

73 Resolve JSON References. 

74 """ 

75 

76 # pylint: disable=dangerous-default-value,too-many-arguments 

77 def __init__(self, base_uri, schema, store={}, cache=True, handlers={}): 

78 """ 

79 `base_uri` is URI of the referring document from the `schema`. 

80 `store` is an dictionary that will be used to cache the fetched schemas 

81 (if `cache=True`). 

82 

83 Please notice that you can have caching problems when compiling schemas 

84 with colliding `$ref`. To force overwriting use `cache=False` or 

85 explicitly pass the `store` argument (with a brand new dictionary) 

86 """ 

87 self.base_uri = base_uri 

88 self.resolution_scope = base_uri 

89 self.schema = schema 

90 self.store = store 

91 self.cache = cache 

92 self.handlers = handlers 

93 self.walk(schema) 

94 

95 @classmethod 

96 def from_schema(cls, schema, handlers={}, **kwargs): 

97 """ 

98 Construct a resolver from a JSON schema object. 

99 """ 

100 return cls( 

101 get_id(schema) if isinstance(schema, dict) else '', 

102 schema, 

103 handlers=handlers, 

104 **kwargs 

105 ) 

106 

107 @contextlib.contextmanager 

108 def in_scope(self, scope: str): 

109 """ 

110 Context manager to handle current scope. 

111 """ 

112 old_scope = self.resolution_scope 

113 self.resolution_scope = urlparse.urljoin(old_scope, scope) 

114 try: 

115 yield 

116 finally: 

117 self.resolution_scope = old_scope 

118 

119 @contextlib.contextmanager 

120 def resolving(self, ref: str): 

121 """ 

122 Context manager which resolves a JSON ``ref`` and enters the 

123 resolution scope of this ref. 

124 """ 

125 new_uri = urlparse.urljoin(self.resolution_scope, ref) 

126 uri, fragment = urlparse.urldefrag(new_uri) 

127 

128 if uri and normalize(uri) in self.store: 

129 schema = self.store[normalize(uri)] 

130 elif not uri or uri == self.base_uri: 

131 schema = self.schema 

132 else: 

133 schema = resolve_remote(uri, self.handlers) 

134 if self.cache: 

135 self.store[normalize(uri)] = schema 

136 

137 old_base_uri, old_schema = self.base_uri, self.schema 

138 self.base_uri, self.schema = uri, schema 

139 try: 

140 with self.in_scope(uri): 

141 yield resolve_path(schema, fragment) 

142 finally: 

143 self.base_uri, self.schema = old_base_uri, old_schema 

144 

145 def get_uri(self): 

146 return normalize(self.resolution_scope) 

147 

148 def get_scope_name(self): 

149 """ 

150 Get current scope and return it as a valid function name. 

151 """ 

152 name = 'validate_' + unquote(self.resolution_scope).replace('~1', '_').replace('~0', '_').replace('"', '') 

153 name = re.sub(r'($[^a-zA-Z]|[^a-zA-Z0-9])', '_', name) 

154 name = name.lower().rstrip('_') 

155 return name 

156 

157 def walk(self, node: dict): 

158 """ 

159 Walk thru schema and dereferencing ``id`` and ``$ref`` instances 

160 """ 

161 if isinstance(node, bool): 

162 pass 

163 elif '$ref' in node and isinstance(node['$ref'], str): 

164 ref = node['$ref'] 

165 node['$ref'] = urlparse.urljoin(self.resolution_scope, ref) 

166 elif ('$id' in node or 'id' in node) and isinstance(get_id(node), str): 

167 with self.in_scope(get_id(node)): 

168 self.store[normalize(self.resolution_scope)] = node 

169 for _, item in node.items(): 

170 if isinstance(item, dict): 

171 self.walk(item) 

172 else: 

173 for _, item in node.items(): 

174 if isinstance(item, dict): 

175 self.walk(item)