Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fastjsonschema/ref_resolver.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

91 statements  

1""" 

2JSON Schema URI resolution scopes and dereferencing 

3 

4https://tools.ietf.org/id/draft-zyp-json-schema-04.html#rfc.section.7 

5 

6Code adapted from https://github.com/Julian/jsonschema 

7""" 

8 

9import contextlib 

10import json 

11import re 

12from urllib import parse as urlparse 

13from urllib.parse import unquote 

14 

15from .exceptions import JsonSchemaDefinitionException 

16 

17 

18def get_id(schema): 

19 """ 

20 Originally ID was `id` and since v7 it's `$id`. 

21 """ 

22 return schema.get('$id', schema.get('id', '')) 

23 

24 

25def resolve_path(schema, fragment): 

26 """ 

27 Return definition from path. 

28 

29 Path is unescaped according https://tools.ietf.org/html/rfc6901 

30 """ 

31 fragment = fragment.lstrip('/') 

32 parts = unquote(fragment).split('/') if fragment else [] 

33 for part in parts: 

34 part = part.replace('~1', '/').replace('~0', '~') 

35 if isinstance(schema, list): 

36 schema = schema[int(part)] 

37 elif part in schema: 

38 schema = schema[part] 

39 else: 

40 raise JsonSchemaDefinitionException('Unresolvable ref: {}'.format(part)) 

41 return schema 

42 

43 

44def normalize(uri): 

45 return urlparse.urlsplit(uri).geturl() 

46 

47 

48def resolve_remote(uri, handlers): 

49 """ 

50 Resolve a remote ``uri``. 

51 

52 .. note:: 

53 

54 urllib library is used to fetch requests from the remote ``uri`` 

55 if handlers does notdefine otherwise. 

56 """ 

57 scheme = urlparse.urlsplit(uri).scheme 

58 if scheme in handlers: 

59 result = handlers[scheme](uri) 

60 else: 

61 from urllib.request import urlopen 

62 

63 req = urlopen(uri) 

64 encoding = req.info().get_content_charset() or 'utf-8' 

65 try: 

66 result = json.loads(req.read().decode(encoding),) 

67 except ValueError as exc: 

68 raise JsonSchemaDefinitionException('{} failed to decode: {}'.format(uri, exc)) 

69 return result 

70 

71 

72class RefResolver: 

73 """ 

74 Resolve JSON References. 

75 """ 

76 

77 # pylint: disable=dangerous-default-value,too-many-arguments 

78 def __init__(self, base_uri, schema, store={}, cache=True, handlers={}): 

79 """ 

80 `base_uri` is URI of the referring document from the `schema`. 

81 `store` is an dictionary that will be used to cache the fetched schemas 

82 (if `cache=True`). 

83 

84 Please notice that you can have caching problems when compiling schemas 

85 with colliding `$ref`. To force overwriting use `cache=False` or 

86 explicitly pass the `store` argument (with a brand new dictionary) 

87 """ 

88 self.base_uri = base_uri 

89 self.resolution_scope = base_uri 

90 self.schema = schema 

91 self.store = store 

92 self.cache = cache 

93 self.handlers = handlers 

94 self.walk(schema) 

95 

96 @classmethod 

97 def from_schema(cls, schema, handlers={}, **kwargs): 

98 """ 

99 Construct a resolver from a JSON schema object. 

100 """ 

101 return cls( 

102 get_id(schema) if isinstance(schema, dict) else '', 

103 schema, 

104 handlers=handlers, 

105 **kwargs 

106 ) 

107 

108 @contextlib.contextmanager 

109 def in_scope(self, scope: str): 

110 """ 

111 Context manager to handle current scope. 

112 """ 

113 old_scope = self.resolution_scope 

114 self.resolution_scope = urlparse.urljoin(old_scope, scope) 

115 try: 

116 yield 

117 finally: 

118 self.resolution_scope = old_scope 

119 

120 @contextlib.contextmanager 

121 def resolving(self, ref: str): 

122 """ 

123 Context manager which resolves a JSON ``ref`` and enters the 

124 resolution scope of this ref. 

125 """ 

126 new_uri = urlparse.urljoin(self.resolution_scope, ref) 

127 uri, fragment = urlparse.urldefrag(new_uri) 

128 

129 if uri and normalize(uri) in self.store: 

130 schema = self.store[normalize(uri)] 

131 elif not uri or uri == self.base_uri: 

132 schema = self.schema 

133 else: 

134 schema = resolve_remote(uri, self.handlers) 

135 if self.cache: 

136 self.store[normalize(uri)] = schema 

137 

138 old_base_uri, old_schema = self.base_uri, self.schema 

139 self.base_uri, self.schema = uri, schema 

140 try: 

141 with self.in_scope(uri): 

142 yield resolve_path(schema, fragment) 

143 finally: 

144 self.base_uri, self.schema = old_base_uri, old_schema 

145 

146 def get_uri(self): 

147 return normalize(self.resolution_scope) 

148 

149 def get_scope_name(self): 

150 """ 

151 Get current scope and return it as a valid function name. 

152 """ 

153 name = 'validate_' + unquote(self.resolution_scope).replace('~1', '_').replace('~0', '_').replace('"', '') 

154 name = re.sub(r'($[^a-zA-Z]|[^a-zA-Z0-9])', '_', name) 

155 name = name.lower().rstrip('_') 

156 return name 

157 

158 def walk(self, node: dict): 

159 """ 

160 Walk thru schema and dereferencing ``id`` and ``$ref`` instances 

161 """ 

162 if isinstance(node, bool): 

163 pass 

164 elif '$ref' in node and isinstance(node['$ref'], str): 

165 ref = node['$ref'] 

166 node['$ref'] = urlparse.urljoin(self.resolution_scope, ref) 

167 elif ('$id' in node or 'id' in node) and isinstance(get_id(node), str): 

168 with self.in_scope(get_id(node)): 

169 self.store[normalize(self.resolution_scope)] = node 

170 for _, item in node.items(): 

171 if isinstance(item, dict): 

172 self.walk(item) 

173 else: 

174 for _, item in node.items(): 

175 if isinstance(item, dict): 

176 self.walk(item)