Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fastjsonschema/generator.py: 93%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

162 statements  

1from collections import OrderedDict 

2from decimal import Decimal 

3import re 

4 

5from .exceptions import JsonSchemaValueException, JsonSchemaDefinitionException 

6from .indent import indent 

7from .ref_resolver import RefResolver 

8 

9 

def enforce_list(variable):
    """
    Return ``variable`` itself when it already is a ``list``, otherwise
    return a new one-item list containing it.

    Only real ``list`` instances (including subclasses) pass through
    untouched; tuples, strings, ``None`` and everything else get wrapped.
    """
    return variable if isinstance(variable, list) else [variable]

14 

15 

# pylint: disable=too-many-instance-attributes,too-many-public-methods
class CodeGenerator:
    """
    This class is not supposed to be used directly. Anything
    inside of this class can be changed without noticing.

    This class generates code of validation function from JSON
    schema object as string. Example:

    .. code-block:: python

        CodeGenerator(json_schema_definition).func_code
    """

    INDENT = 4  # spaces

    def __init__(self, definition, resolver=None):
        """
        Prepare an empty generator for ``definition``.

        :param definition: root JSON schema the validation code is built for.
        :param resolver: object used to resolve ``$ref``; when ``None`` a
            ``RefResolver`` is created from ``definition`` with an empty store.
        """
        self._code = []
        self._compile_regexps = {}
        self._custom_formats = {}

        # Any extra library should be here to be imported only once.
        # Lines are imports to be printed in the file and objects
        # key-value pair to pass to compile function directly.
        self._extra_imports_lines = [
            "from decimal import Decimal",
        ]
        self._extra_imports_objects = {
            "Decimal": Decimal,
        }

        self._variables = set()
        self._indent = 0
        self._indent_last_line = None
        self._variable = None
        self._variable_name = None
        self._root_definition = definition
        self._definition = None

        # map schema URIs to validation function names for functions
        # that are not yet generated, but need to be generated
        self._needed_validation_functions = {}
        # validation function names that are already done
        self._validation_functions_done = set()

        if resolver is None:
            resolver = RefResolver.from_schema(definition, store={})
        self._resolver = resolver

        # add main function to `self._needed_validation_functions`
        self._needed_validation_functions[self._resolver.get_uri()] = self._resolver.get_scope_name()

        # Maps a schema keyword to the bound method generating its code.
        # It is left empty here; `run_generate_functions` iterates over it.
        self._json_keywords_to_function = OrderedDict()

    @property
    def func_code(self):
        """
        Returns generated code of whole validation function as string.
        """
        self._generate_func_code()

        return '\n'.join(self._code)

    @property
    def global_state(self):
        """
        Returns global variables for generating function from ``func_code``. Includes
        compiled regular expressions and imports, so it does not have to do it every
        time when validation function is called.
        """
        self._generate_func_code()

        return dict(
            **self._extra_imports_objects,
            REGEX_PATTERNS=self._compile_regexps,
            re=re,
            JsonSchemaValueException=JsonSchemaValueException,
        )

    @property
    def global_state_code(self):
        """
        Returns global variables for generating function from ``func_code`` as code.
        Includes compiled regular expressions and imports.

        ``import re`` and the ``REGEX_PATTERNS`` assignment are emitted only
        when at least one regular expression was collected.
        """
        self._generate_func_code()

        has_regexps = bool(self._compile_regexps)

        lines = list(self._extra_imports_lines)
        if has_regexps:
            lines.append('import re')
        lines.extend([
            'from fastjsonschema import JsonSchemaValueException',
            '',
            '',
        ])
        if has_regexps:
            lines.extend([
                'REGEX_PATTERNS = ' + serialize_regexes(self._compile_regexps),
                '',
            ])
        return '\n'.join(lines)

    def _generate_func_code(self):
        """Generate the code lazily: only when no line has been produced yet."""
        if not self._code:
            self.generate_func_code()

    def generate_func_code(self):
        """
        Creates base code of validation function and calls helper
        for creating code by definition.
        """
        self.l('NoneType = type(None)')
        # Generate parts that are referenced and not yet generated
        while self._needed_validation_functions:
            # During generation of validation function, could be needed to generate
            # new one that is added again to `_needed_validation_functions`.
            # Therefore usage of while instead of for loop.
            uri, name = self._needed_validation_functions.popitem()
            self.generate_validation_function(uri, name)

    def generate_validation_function(self, uri, name):
        """
        Generate validation function for given uri with given name
        """
        # Marked as done before generating, so a reference back to the same
        # uri met during generation is not queued again by `generate_ref`.
        self._validation_functions_done.add(uri)
        self.l('')
        with self._resolver.resolving(uri) as definition:
            with self.l('def {}(data, custom_formats={{}}, name_prefix=None):', name):
                self.generate_func_code_block(definition, 'data', 'data', clear_variables=True)
                self.l('return data')

    def generate_func_code_block(self, definition, variable, variable_name, clear_variables=False):
        """
        Creates validation rules for current definition.

        :param definition: schema (sub)definition to generate code for.
        :param variable: name of the variable holding the validated value
            in the generated code.
        :param variable_name: name used for the value in error messages.
        :param clear_variables: when true, helper variables already created
            (``*_len``, ``*_keys``, ...) are forgotten for the duration of
            this block.

        Returns the number of validation rules generated as code. For a
        definition containing ``$ref`` the value is whatever ``generate_ref``
        returns, which is ``None``.

        The previous generator state is restored even when generation raises.
        """
        backup = self._definition, self._variable, self._variable_name
        self._definition, self._variable, self._variable_name = definition, variable, variable_name
        if clear_variables:
            backup_variables = self._variables
            self._variables = set()

        try:
            return self._generate_func_code_block(definition)
        finally:
            self._definition, self._variable, self._variable_name = backup
            if clear_variables:
                self._variables = backup_variables

    def _generate_func_code_block(self, definition):
        """
        Dispatch ``definition`` either to ``generate_ref`` or to the keyword
        generators.

        Raises ``JsonSchemaDefinitionException`` when ``definition`` is not a dict.
        """
        if not isinstance(definition, dict):
            raise JsonSchemaDefinitionException("definition must be an object")
        if '$ref' in definition:
            # needed because ref overrides any sibling keywords
            return self.generate_ref()
        return self.run_generate_functions(definition)

    def run_generate_functions(self, definition):
        """Returns the number of generate functions that were executed."""
        count = 0
        for key, func in self._json_keywords_to_function.items():
            if key in definition:
                func()
                count += 1
        return count

    def generate_ref(self):
        """
        Ref can be link to remote or local definition.

        .. code-block:: python

            {'$ref': 'http://json-schema.org/draft-04/schema#'}
            {
                'properties': {
                    'foo': {'type': 'integer'},
                    'bar': {'$ref': '#/properties/foo'}
                }
            }

        The referenced schema is not inlined: a call to its validation
        function is emitted, and the function is queued for generation
        unless it is already done.
        """
        with self._resolver.in_scope(self._definition['$ref']):
            name = self._resolver.get_scope_name()
            uri = self._resolver.get_uri()
            if uri not in self._validation_functions_done:
                self._needed_validation_functions[uri] = name
            # call validation function
            assert self._variable_name.startswith("data")
            # Everything after the leading "data" is the path inside the document.
            path = self._variable_name[4:]
            name_arg = '(name_prefix or "data") + "{}"'.format(path)
            if '{' in name_arg:
                # The path contains placeholders to be filled from the local
                # variables of the generated function at run time.
                name_arg = name_arg + '.format(**locals())'
            self.l('{}({variable}, custom_formats, {name_arg})', name, name_arg=name_arg)

    # pylint: disable=invalid-name
    @indent
    def l(self, line, *args, **kwds):
        """
        Short-cut of line. Used for inserting line. It's formatted with parameters
        ``variable``, ``variable_name`` (as ``name`` for short-cut), all keys from
        current JSON schema ``definition`` and also passed arguments in ``args``
        and named ``kwds``.

        .. code-block:: python

            self.l('if {variable} not in {enum}: raise JsonSchemaValueException("Wrong!")')

        When you want to indent block, use it as context manager. For example:

        .. code-block:: python

            with self.l('if {variable} not in {enum}:'):
                self.l('raise JsonSchemaValueException("Wrong!")')

        The formatted line (without indentation) is returned after the
        indented line has been appended to the generated code.
        """
        spaces = ' ' * self.INDENT * self._indent

        name = self._variable_name
        if name:
            # Add name_prefix to the name when it is being outputted.
            # The value starts and ends with a double quote on purpose: it is
            # meant to be placed inside a double-quoted literal of the
            # generated code, which it closes and re-opens.
            assert name.startswith('data')
            name = '" + (name_prefix or "data") + "' + name[4:]
            if '{' in name:
                name = name + '".format(**locals()) + "'

        context = dict(
            self._definition if self._definition and self._definition is not True else {},
            variable=self._variable,
            name=name,
            **kwds
        )
        line = line.format(*args, **context)
        # Real line breaks would split the statement in the generated source.
        line = line.replace('\n', '\\n').replace('\r', '\\r')
        self._code.append(spaces + line)
        return line

    def e(self, string):
        """
        Short-cut of escape. Used for inserting user values into a string message.

        .. code-block:: python

            self.l('raise JsonSchemaValueException("Variable: {}")', self.e(variable))

        Backslashes are escaped as well as double quotes. Otherwise a value
        ending with a backslash would swallow the closing quote of the
        generated literal, and a backslash followed by a letter would be read
        as an escape sequence by the generated code.
        """
        # Backslashes have to go first so the ones added for quotes stay single.
        return str(string).replace('\\', '\\\\').replace('"', '\\"')

    def exc(self, msg, *args, append_to_msg=None, rule=None):
        """
        Short-cut for creating raising exception in the code.

        :param msg: message template; it is wrapped in double quotes and
            formatted by ``l`` (``{name}``, ``{definition_rule}``, positional
            ``args`` and so on).
        :param append_to_msg: optional expression (as source code) appended
            to the message with ``+`` in the generated code.
        :param rule: schema keyword that failed; passed to the exception and
            used to look up ``{definition_rule}`` in the current definition.
        """
        arg = '"'+msg+'"'
        if append_to_msg:
            arg += ' + (' + append_to_msg + ')'
        msg = 'raise JsonSchemaValueException('+arg+', value={variable}, name="{name}", definition={definition}, rule={rule})'
        definition = self._expand_refs(self._definition)
        definition_rule = self.e(definition.get(rule) if isinstance(definition, dict) else None)
        self.l(msg, *args, definition=repr(definition), rule=repr(rule), definition_rule=definition_rule)

    def _expand_refs(self, definition):
        """
        Return ``definition`` with string ``$ref`` entries replaced by the
        schema they resolve to.

        Lists and dicts are walked recursively; a resolved schema is returned
        as is, without expanding references inside of it.
        """
        if isinstance(definition, list):
            return [self._expand_refs(v) for v in definition]
        if not isinstance(definition, dict):
            return definition
        if "$ref" in definition and isinstance(definition["$ref"], str):
            with self._resolver.resolving(definition["$ref"]) as schema:
                return schema
        return {k: self._expand_refs(v) for k, v in definition.items()}

    def create_variable_with_length(self):
        """
        Append code for creating variable with length of that variable
        (for example length of list or dictionary) with name ``{variable}_len``.
        It can be called several times and always it's done only when that variable
        still does not exists.
        """
        variable_name = '{}_len'.format(self._variable)
        if variable_name in self._variables:
            return
        self._variables.add(variable_name)
        self.l('{variable}_len = len({variable})')

    def create_variable_keys(self):
        """
        Append code for creating variable with keys of that variable (dictionary)
        with a name ``{variable}_keys``. Similar to `create_variable_with_length`.
        """
        variable_name = '{}_keys'.format(self._variable)
        if variable_name in self._variables:
            return
        self._variables.add(variable_name)
        self.l('{variable}_keys = set({variable}.keys())')

    def create_variable_is_list(self):
        """
        Append code for creating variable with bool if it's instance of list
        (or tuple) with a name ``{variable}_is_list``. Similar to
        `create_variable_with_length`.
        """
        variable_name = '{}_is_list'.format(self._variable)
        if variable_name in self._variables:
            return
        self._variables.add(variable_name)
        self.l('{variable}_is_list = isinstance({variable}, (list, tuple))')

    def create_variable_is_dict(self):
        """
        Append code for creating variable with bool if it's instance of dict
        with a name ``{variable}_is_dict``. Similar to `create_variable_with_length`.
        """
        variable_name = '{}_is_dict'.format(self._variable)
        if variable_name in self._variables:
            return
        self._variables.add(variable_name)
        self.l('{variable}_is_dict = isinstance({variable}, dict)')

332 

333 

def serialize_regexes(patterns_dict):
    """
    Render a mapping of keys to compiled regular expressions as the source
    code of a dict literal, one ``key: re.compile(...)`` entry per line.
    """
    # Unfortunately using `pprint.pformat` is causing errors
    # specially with big regexes
    entries = []
    for key, compiled in patterns_dict.items():
        entries.append(repr(key) + ": " + repr_regex(compiled))
    # NOTE(review): the whitespace after each line break is kept exactly as
    # it appears in the listing this was taken from; that listing may have
    # collapsed runs of spaces - confirm against the original file.
    return '{\n ' + ",\n ".join(entries) + "\n}"

342 

343 

def repr_regex(regex):
    """
    Return source code of a ``re.compile(...)`` call that recreates ``regex``.

    Only the flags listed below are carried over, in that order; when none
    of them is set the flags argument is left out completely.
    """
    flag_names = ("A", "I", "DEBUG", "L", "M", "S", "X")
    active = ["re." + flag for flag in flag_names if regex.flags & getattr(re, flag)]
    suffix = ""
    if active:
        suffix = ", " + " | ".join(active)
    return "re.compile({!r}{})".format(regex.pattern, suffix)