Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/fastjsonschema/generator.py: 20%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

166 statements  

1from collections import OrderedDict 

2from decimal import Decimal 

3import re 

4 

5from .exceptions import JsonSchemaValueException, JsonSchemaDefinitionException 

6from .indent import indent 

7from .ref_resolver import RefResolver 

8 

9 

10def enforce_list(variable): 

11 if isinstance(variable, list): 

12 return variable 

13 return [variable] 

14 

15 

16# pylint: disable=too-many-instance-attributes,too-many-public-methods 

17class CodeGenerator: 

18 """ 

19 This class is not supposed to be used directly. Anything 

20 inside of this class can be changed without noticing. 

21 

22 This class generates code of validation function from JSON 

23 schema object as string. Example: 

24 

25 .. code-block:: python 

26 

27 CodeGenerator(json_schema_definition).func_code 

28 """ 

29 

30 INDENT = 4 # spaces 

31 

32 def __init__(self, definition, resolver=None, detailed_exceptions=True): 

33 self._code = [] 

34 self._compile_regexps = {} 

35 self._custom_formats = {} 

36 self._detailed_exceptions = detailed_exceptions 

37 

38 # Any extra library should be here to be imported only once. 

39 # Lines are imports to be printed in the file and objects 

40 # key-value pair to pass to compile function directly. 

41 self._extra_imports_lines = [ 

42 "from decimal import Decimal", 

43 ] 

44 self._extra_imports_objects = { 

45 "Decimal": Decimal, 

46 } 

47 

48 self._variables = set() 

49 self._indent = 0 

50 self._indent_last_line = None 

51 self._variable = None 

52 self._variable_name = None 

53 self._root_definition = definition 

54 self._definition = None 

55 

56 # map schema URIs to validation function names for functions 

57 # that are not yet generated, but need to be generated 

58 self._needed_validation_functions = {} 

59 # validation function names that are already done 

60 self._validation_functions_done = set() 

61 

62 if resolver is None: 

63 resolver = RefResolver.from_schema(definition, store={}) 

64 self._resolver = resolver 

65 

66 # add main function to `self._needed_validation_functions` 

67 self._needed_validation_functions[self._resolver.get_uri()] = self._resolver.get_scope_name() 

68 

69 self._json_keywords_to_function = OrderedDict() 

70 

71 @property 

72 def func_code(self): 

73 """ 

74 Returns generated code of whole validation function as string. 

75 """ 

76 self._generate_func_code() 

77 

78 return '\n'.join(self._code) 

79 

80 @property 

81 def global_state(self): 

82 """ 

83 Returns global variables for generating function from ``func_code``. Includes 

84 compiled regular expressions and imports, so it does not have to do it every 

85 time when validation function is called. 

86 """ 

87 self._generate_func_code() 

88 

89 return dict( 

90 **self._extra_imports_objects, 

91 REGEX_PATTERNS=self._compile_regexps, 

92 re=re, 

93 JsonSchemaValueException=JsonSchemaValueException, 

94 ) 

95 

96 @property 

97 def global_state_code(self): 

98 """ 

99 Returns global variables for generating function from ``func_code`` as code. 

100 Includes compiled regular expressions and imports. 

101 """ 

102 self._generate_func_code() 

103 

104 if not self._compile_regexps: 

105 return '\n'.join(self._extra_imports_lines + [ 

106 'from fastjsonschema import JsonSchemaValueException', 

107 '', 

108 '', 

109 ]) 

110 return '\n'.join(self._extra_imports_lines + [ 

111 'import re', 

112 'from fastjsonschema import JsonSchemaValueException', 

113 '', 

114 '', 

115 'REGEX_PATTERNS = ' + serialize_regexes(self._compile_regexps), 

116 '', 

117 ]) 

118 

119 

120 def _generate_func_code(self): 

121 if not self._code: 

122 self.generate_func_code() 

123 

124 def generate_func_code(self): 

125 """ 

126 Creates base code of validation function and calls helper 

127 for creating code by definition. 

128 """ 

129 self.l('NoneType = type(None)') 

130 # Generate parts that are referenced and not yet generated 

131 while self._needed_validation_functions: 

132 # During generation of validation function, could be needed to generate 

133 # new one that is added again to `_needed_validation_functions`. 

134 # Therefore usage of while instead of for loop. 

135 uri, name = self._needed_validation_functions.popitem() 

136 self.generate_validation_function(uri, name) 

137 

138 def generate_validation_function(self, uri, name): 

139 """ 

140 Generate validation function for given uri with given name 

141 """ 

142 self._validation_functions_done.add(uri) 

143 self.l('') 

144 with self._resolver.resolving(uri) as definition: 

145 with self.l('def {}(data, custom_formats={{}}, name_prefix=None):', name): 

146 self.generate_func_code_block(definition, 'data', 'data', clear_variables=True) 

147 self.l('return data') 

148 

149 def generate_func_code_block(self, definition, variable, variable_name, clear_variables=False): 

150 """ 

151 Creates validation rules for current definition. 

152 

153 Returns the number of validation rules generated as code. 

154 """ 

155 backup = self._definition, self._variable, self._variable_name 

156 self._definition, self._variable, self._variable_name = definition, variable, variable_name 

157 if clear_variables: 

158 backup_variables = self._variables 

159 self._variables = set() 

160 

161 count = self._generate_func_code_block(definition) 

162 

163 self._definition, self._variable, self._variable_name = backup 

164 if clear_variables: 

165 self._variables = backup_variables 

166 

167 return count 

168 

169 def _generate_func_code_block(self, definition): 

170 if not isinstance(definition, dict): 

171 raise JsonSchemaDefinitionException("definition must be an object") 

172 if '$ref' in definition: 

173 # needed because ref overrides any sibling keywords 

174 return self.generate_ref() 

175 else: 

176 return self.run_generate_functions(definition) 

177 

178 def run_generate_functions(self, definition): 

179 """Returns the number of generate functions that were executed.""" 

180 count = 0 

181 for key, func in self._json_keywords_to_function.items(): 

182 if key in definition: 

183 func() 

184 count += 1 

185 return count 

186 

187 def generate_ref(self): 

188 """ 

189 Ref can be link to remote or local definition. 

190 

191 .. code-block:: python 

192 

193 {'$ref': 'http://json-schema.org/draft-04/schema#'} 

194 { 

195 'properties': { 

196 'foo': {'type': 'integer'}, 

197 'bar': {'$ref': '#/properties/foo'} 

198 } 

199 } 

200 """ 

201 with self._resolver.in_scope(self._definition['$ref']): 

202 name = self._resolver.get_scope_name() 

203 uri = self._resolver.get_uri() 

204 if uri not in self._validation_functions_done: 

205 self._needed_validation_functions[uri] = name 

206 # call validation function 

207 assert self._variable_name.startswith("data") 

208 path = self._variable_name[4:] 

209 name_arg = '(name_prefix or "data") + "{}"'.format(path) 

210 if '{' in name_arg: 

211 name_arg = name_arg + '.format(**locals())' 

212 self.l('{}({variable}, custom_formats, {name_arg})', name, name_arg=name_arg) 

213 

214 

215 # pylint: disable=invalid-name 

216 @indent 

217 def l(self, line, *args, **kwds): 

218 """ 

219 Short-cut of line. Used for inserting line. It's formated with parameters 

220 ``variable``, ``variable_name`` (as ``name`` for short-cut), all keys from 

221 current JSON schema ``definition`` and also passed arguments in ``args`` 

222 and named ``kwds``. 

223 

224 .. code-block:: python 

225 

226 self.l('if {variable} not in {enum}: raise JsonSchemaValueException("Wrong!")') 

227 

228 When you want to indent block, use it as context manager. For example: 

229 

230 .. code-block:: python 

231 

232 with self.l('if {variable} not in {enum}:'): 

233 self.l('raise JsonSchemaValueException("Wrong!")') 

234 """ 

235 spaces = ' ' * self.INDENT * self._indent 

236 

237 name = self._variable_name 

238 if name: 

239 # Add name_prefix to the name when it is being outputted. 

240 assert name.startswith('data') 

241 name = '" + (name_prefix or "data") + "' + name[4:] 

242 if '{' in name: 

243 name = name + '".format(**locals()) + "' 

244 

245 context = dict( 

246 self._definition if self._definition and self._definition is not True else {}, 

247 variable=self._variable, 

248 name=name, 

249 **kwds 

250 ) 

251 line = line.format(*args, **context) 

252 line = line.replace('\n', '\\n').replace('\r', '\\r') 

253 self._code.append(spaces + line) 

254 return line 

255 

256 def e(self, string): 

257 """ 

258 Short-cut of escape. Used for inserting user values into a string message. 

259 

260 .. code-block:: python 

261 

262 self.l('raise JsonSchemaValueException("Variable: {}")', self.e(variable)) 

263 """ 

264 return str(string).replace('"', '\\"') 

265 

266 def exc(self, msg, *args, append_to_msg=None, rule=None): 

267 """ 

268 Short-cut for creating raising exception in the code. 

269 """ 

270 if not self._detailed_exceptions: 

271 self.l('raise JsonSchemaValueException("'+msg+'")', *args) 

272 return 

273 

274 arg = '"'+msg+'"' 

275 if append_to_msg: 

276 arg += ' + (' + append_to_msg + ')' 

277 msg = 'raise JsonSchemaValueException('+arg+', value={variable}, name="{name}", definition={definition}, rule={rule})' 

278 definition = self._expand_refs(self._definition) 

279 definition_rule = self.e(definition.get(rule) if isinstance(definition, dict) else None) 

280 self.l(msg, *args, definition=repr(definition), rule=repr(rule), definition_rule=definition_rule) 

281 

282 def _expand_refs(self, definition): 

283 if isinstance(definition, list): 

284 return [self._expand_refs(v) for v in definition] 

285 if not isinstance(definition, dict): 

286 return definition 

287 if "$ref" in definition and isinstance(definition["$ref"], str): 

288 with self._resolver.resolving(definition["$ref"]) as schema: 

289 return schema 

290 return {k: self._expand_refs(v) for k, v in definition.items()} 

291 

292 def create_variable_with_length(self): 

293 """ 

294 Append code for creating variable with length of that variable 

295 (for example length of list or dictionary) with name ``{variable}_len``. 

296 It can be called several times and always it's done only when that variable 

297 still does not exists. 

298 """ 

299 variable_name = '{}_len'.format(self._variable) 

300 if variable_name in self._variables: 

301 return 

302 self._variables.add(variable_name) 

303 self.l('{variable}_len = len({variable})') 

304 

305 def create_variable_keys(self): 

306 """ 

307 Append code for creating variable with keys of that variable (dictionary) 

308 with a name ``{variable}_keys``. Similar to `create_variable_with_length`. 

309 """ 

310 variable_name = '{}_keys'.format(self._variable) 

311 if variable_name in self._variables: 

312 return 

313 self._variables.add(variable_name) 

314 self.l('{variable}_keys = set({variable}.keys())') 

315 

316 def create_variable_is_list(self): 

317 """ 

318 Append code for creating variable with bool if it's instance of list 

319 with a name ``{variable}_is_list``. Similar to `create_variable_with_length`. 

320 """ 

321 variable_name = '{}_is_list'.format(self._variable) 

322 if variable_name in self._variables: 

323 return 

324 self._variables.add(variable_name) 

325 self.l('{variable}_is_list = isinstance({variable}, (list, tuple))') 

326 

327 def create_variable_is_dict(self): 

328 """ 

329 Append code for creating variable with bool if it's instance of list 

330 with a name ``{variable}_is_dict``. Similar to `create_variable_with_length`. 

331 """ 

332 variable_name = '{}_is_dict'.format(self._variable) 

333 if variable_name in self._variables: 

334 return 

335 self._variables.add(variable_name) 

336 self.l('{variable}_is_dict = isinstance({variable}, dict)') 

337 

338 

339def serialize_regexes(patterns_dict): 

340 # Unfortunately using `pprint.pformat` is causing errors 

341 # specially with big regexes 

342 regex_patterns = ( 

343 repr(k) + ": " + repr_regex(v) 

344 for k, v in patterns_dict.items() 

345 ) 

346 return '{\n ' + ",\n ".join(regex_patterns) + "\n}" 

347 

348 

349def repr_regex(regex): 

350 all_flags = ("A", "I", "DEBUG", "L", "M", "S", "X") 

351 flags = " | ".join(f"re.{f}" for f in all_flags if regex.flags & getattr(re, f)) 

352 flags = ", " + flags if flags else "" 

353 return "re.compile({!r}{})".format(regex.pattern, flags)