Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fastjsonschema/generator.py: 21%
162 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 06:54 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 06:54 +0000
1from collections import OrderedDict
2from decimal import Decimal
3import re
5from .exceptions import JsonSchemaValueException, JsonSchemaDefinitionException
6from .indent import indent
7from .ref_resolver import RefResolver
def enforce_list(variable):
    """
    Return ``variable`` unchanged when it is already a ``list``; otherwise
    return a new one-element list containing it.

    Only real ``list`` instances (including subclasses) pass through as-is;
    tuples and other iterables are wrapped, not converted.
    """
    return variable if isinstance(variable, list) else [variable]
16# pylint: disable=too-many-instance-attributes,too-many-public-methods
class CodeGenerator:
    """
    This class is not supposed to be used directly. Anything
    inside of this class can be changed without notice.

    This class generates code of validation function from JSON
    schema object as string. Example:

    .. code-block:: python

        CodeGenerator(json_schema_definition).func_code
    """

    INDENT = 4  # spaces

    def __init__(self, definition, resolver=None):
        """
        Prepare an empty generator for ``definition``.

        When ``resolver`` is not given, one is built from the definition
        with an empty store. The root validation function is queued for
        generation but no code is produced until it is requested.
        """
        # Generated source, one entry per output line.
        self._code = []
        self._compile_regexps = {}
        self._custom_formats = {}

        # Any extra library should be here to be imported only once.
        # Lines are imports to be printed in the file and objects
        # key-value pair to pass to compile function directly.
        self._extra_imports_lines = [
            "from decimal import Decimal",
        ]
        self._extra_imports_objects = {
            "Decimal": Decimal,
        }

        # Names of helper variables already emitted in the current scope.
        self._variables = set()
        self._indent = 0
        self._indent_last_line = None
        self._variable = None
        self._variable_name = None
        self._root_definition = definition
        self._definition = None

        # map schema URIs to validation function names for functions
        # that are not yet generated, but need to be generated
        self._needed_validation_functions = {}
        # validation function names that are already done
        self._validation_functions_done = set()

        if resolver is None:
            resolver = RefResolver.from_schema(definition, store={})
        self._resolver = resolver

        # add main function to `self._needed_validation_functions`
        self._needed_validation_functions[self._resolver.get_uri()] = self._resolver.get_scope_name()

        # Keyword -> generator method; iterated in insertion order by
        # `run_generate_functions`. Empty here.
        self._json_keywords_to_function = OrderedDict()

    @property
    def func_code(self):
        """
        Returns generated code of whole validation function as string.
        """
        self._generate_func_code()

        return '\n'.join(self._code)

    @property
    def global_state(self):
        """
        Returns global variables for generating function from ``func_code``. Includes
        compiled regular expressions and imports, so it does not have to do it every
        time when validation function is called.
        """
        self._generate_func_code()

        return dict(
            **self._extra_imports_objects,
            REGEX_PATTERNS=self._compile_regexps,
            re=re,
            JsonSchemaValueException=JsonSchemaValueException,
        )

    @property
    def global_state_code(self):
        """
        Returns global variables for generating function from ``func_code`` as code.
        Includes compiled regular expressions and imports.

        ``import re`` and the ``REGEX_PATTERNS`` assignment are emitted only
        when at least one regular expression was collected.
        """
        self._generate_func_code()

        if not self._compile_regexps:
            return '\n'.join(self._extra_imports_lines + [
                'from fastjsonschema import JsonSchemaValueException',
                '',
                '',
            ])
        return '\n'.join(self._extra_imports_lines + [
            'import re',
            'from fastjsonschema import JsonSchemaValueException',
            '',
            '',
            'REGEX_PATTERNS = ' + serialize_regexes(self._compile_regexps),
            '',
        ])

    def _generate_func_code(self):
        """Generate the code once; later calls are no-ops while ``_code`` is non-empty."""
        if not self._code:
            self.generate_func_code()

    def generate_func_code(self):
        """
        Creates base code of validation function and calls helper
        for creating code by definition.
        """
        self.l('NoneType = type(None)')
        # Generate parts that are referenced and not yet generated
        while self._needed_validation_functions:
            # During generation of validation function, could be needed to generate
            # new one that is added again to `_needed_validation_functions`.
            # Therefore usage of while instead of for loop.
            uri, name = self._needed_validation_functions.popitem()
            self.generate_validation_function(uri, name)

    def generate_validation_function(self, uri, name):
        """
        Generate validation function for given uri with given name
        """
        # Marked as done before generating so a reference back to this
        # uri met during generation is not queued again.
        self._validation_functions_done.add(uri)
        self.l('')
        with self._resolver.resolving(uri) as definition:
            with self.l('def {}(data, custom_formats={{}}, name_prefix=None):', name):
                self.generate_func_code_block(definition, 'data', 'data', clear_variables=True)
                self.l('return data')

    def generate_func_code_block(self, definition, variable, variable_name, clear_variables=False):
        """
        Creates validation rules for current definition.

        ``definition``, ``variable`` and ``variable_name`` become the current
        context for the duration of the call. With ``clear_variables`` the
        set of already emitted helper variables is replaced by an empty one
        for the nested block. The previous context is always restored, even
        when generation raises.

        Returns the number of validation rules generated as code, or
        ``None`` when the definition is a ``$ref`` (``generate_ref`` does
        not return a value).
        """
        backup = self._definition, self._variable, self._variable_name
        self._definition, self._variable, self._variable_name = definition, variable, variable_name
        if clear_variables:
            backup_variables = self._variables
            self._variables = set()

        try:
            return self._generate_func_code_block(definition)
        finally:
            # Restore on every exit path so a failure inside a nested block
            # does not leave the generator pointing at the inner schema.
            self._definition, self._variable, self._variable_name = backup
            if clear_variables:
                self._variables = backup_variables

    def _generate_func_code_block(self, definition):
        """
        Dispatch on the definition: a ``$ref`` is handled by ``generate_ref``,
        anything else by the registered keyword functions.

        Raises ``JsonSchemaDefinitionException`` when ``definition`` is not a dict.
        """
        if not isinstance(definition, dict):
            raise JsonSchemaDefinitionException("definition must be an object")
        if '$ref' in definition:
            # needed because ref overrides any sibling keywords
            return self.generate_ref()
        return self.run_generate_functions(definition)

    def run_generate_functions(self, definition):
        """Returns the number of generate functions that were executed."""
        count = 0
        for key, func in self._json_keywords_to_function.items():
            if key in definition:
                func()
                count += 1
        return count

    def generate_ref(self):
        """
        Ref can be link to remote or local definition.

        .. code-block:: python

            {'$ref': 'http://json-schema.org/draft-04/schema#'}
            {
                'properties': {
                    'foo': {'type': 'integer'},
                    'bar': {'$ref': '#/properties/foo'}
                }
            }
        """
        with self._resolver.in_scope(self._definition['$ref']):
            name = self._resolver.get_scope_name()
            uri = self._resolver.get_uri()
            if uri not in self._validation_functions_done:
                self._needed_validation_functions[uri] = name
            # call validation function
            assert self._variable_name.startswith("data")
            # Everything after the leading "data" is the path of the current value.
            path = self._variable_name[4:]
            name_arg = '(name_prefix or "data") + "{}"'.format(path)
            if '{' in name_arg:
                # The path holds placeholders that are filled in from the
                # generated function's local variables at run time.
                name_arg = name_arg + '.format(**locals())'
            self.l('{}({variable}, custom_formats, {name_arg})', name, name_arg=name_arg)

    # pylint: disable=invalid-name
    @indent
    def l(self, line, *args, **kwds):
        """
        Short-cut of line. Used for inserting line. It's formatted with parameters
        ``variable``, ``variable_name`` (as ``name`` for short-cut), all keys from
        current JSON schema ``definition`` and also passed arguments in ``args``
        and named ``kwds``.

        .. code-block:: python

            self.l('if {variable} not in {enum}: raise JsonSchemaValueException("Wrong!")')

        When you want to indent block, use it as context manager. For example:

        .. code-block:: python

            with self.l('if {variable} not in {enum}:'):
                self.l('raise JsonSchemaValueException("Wrong!")')
        """
        spaces = ' ' * self.INDENT * self._indent

        name = self._variable_name
        if name:
            # Add name_prefix to the name when it is being outputted.
            assert name.startswith('data')
            name = '" + (name_prefix or "data") + "' + name[4:]
            if '{' in name:
                name = name + '".format(**locals()) + "'

        context = dict(
            self._definition or {},
            variable=self._variable,
            name=name,
            **kwds
        )
        line = line.format(*args, **context)
        # Keep every generated statement on a single physical line.
        line = line.replace('\n', '\\n').replace('\r', '\\r')
        self._code.append(spaces + line)
        return line

    def e(self, string):
        """
        Short-cut of escape. Used for inserting user values into a string message.
        Only double quotes are escaped; other characters are left as they are.

        .. code-block:: python

            self.l('raise JsonSchemaValueException("Variable: {}")', self.e(variable))
        """
        return str(string).replace('"', '\\"')

    def exc(self, msg, *args, append_to_msg=None, rule=None):
        """
        Short-cut for creating raising exception in the code.

        ``msg`` is a message template placed inside double quotes in the
        generated code; ``append_to_msg`` is an optional code expression
        concatenated to it. ``rule`` names the schema keyword reported in
        the exception, and its escaped value is available to the template
        as ``{definition_rule}``.
        """
        arg = '"'+msg+'"'
        if append_to_msg:
            arg += ' + (' + append_to_msg + ')'
        msg = 'raise JsonSchemaValueException('+arg+', value={variable}, name="{name}", definition={definition}, rule={rule})'
        definition = self._expand_refs(self._definition)
        definition_rule = self.e(definition.get(rule) if isinstance(definition, dict) else None)
        self.l(msg, *args, definition=repr(definition), rule=repr(rule), definition_rule=definition_rule)

    def _expand_refs(self, definition):
        """
        Return a copy of ``definition`` with string ``$ref`` objects replaced
        by the schema the resolver yields for them.

        Lists and dicts are walked recursively; a resolved schema is returned
        as-is and is not expanded further. Other values are returned unchanged.
        """
        if isinstance(definition, list):
            return [self._expand_refs(v) for v in definition]
        if not isinstance(definition, dict):
            return definition
        if "$ref" in definition and isinstance(definition["$ref"], str):
            with self._resolver.resolving(definition["$ref"]) as schema:
                return schema
        return {k: self._expand_refs(v) for k, v in definition.items()}

    def create_variable_with_length(self):
        """
        Append code for creating variable with length of that variable
        (for example length of list or dictionary) with name ``{variable}_len``.
        It can be called several times and the code is emitted only when that
        variable does not exist yet.
        """
        variable_name = '{}_len'.format(self._variable)
        if variable_name in self._variables:
            return
        self._variables.add(variable_name)
        self.l('{variable}_len = len({variable})')

    def create_variable_keys(self):
        """
        Append code for creating variable with keys of that variable (dictionary)
        with a name ``{variable}_keys``. Similar to `create_variable_with_length`.
        """
        variable_name = '{}_keys'.format(self._variable)
        if variable_name in self._variables:
            return
        self._variables.add(variable_name)
        self.l('{variable}_keys = set({variable}.keys())')

    def create_variable_is_list(self):
        """
        Append code for creating variable with bool if it's instance of list
        or tuple with a name ``{variable}_is_list``. Similar to
        `create_variable_with_length`.
        """
        variable_name = '{}_is_list'.format(self._variable)
        if variable_name in self._variables:
            return
        self._variables.add(variable_name)
        self.l('{variable}_is_list = isinstance({variable}, (list, tuple))')

    def create_variable_is_dict(self):
        """
        Append code for creating variable with bool if it's instance of dict
        with a name ``{variable}_is_dict``. Similar to `create_variable_with_length`.
        """
        variable_name = '{}_is_dict'.format(self._variable)
        if variable_name in self._variables:
            return
        self._variables.add(variable_name)
        self.l('{variable}_is_dict = isinstance({variable}, dict)')
def serialize_regexes(patterns_dict):
    """
    Render a mapping of compiled regular expressions as the text of a
    dict literal, one ``key: re.compile(...)`` entry per line.

    Keys are written with ``repr`` and values with ``repr_regex``.
    """
    # `pprint.pformat` is deliberately not used here: it is causing
    # errors, especially with big regexes.
    entries = []
    for key, compiled in patterns_dict.items():
        entries.append(repr(key) + ": " + repr_regex(compiled))
    body = ",\n ".join(entries)
    return '{\n ' + body + "\n}"
def repr_regex(regex):
    """
    Return ``re.compile(...)`` source text for a compiled pattern.

    The pattern is written with ``repr``. Of the flags set on ``regex``,
    only A, I, DEBUG, L, M, S and X are written, in that order and joined
    with ``|``; the flags argument is left out when none of them is set.
    """
    flag_names = ("A", "I", "DEBUG", "L", "M", "S", "X")
    active = []
    for flag_name in flag_names:
        if regex.flags & getattr(re, flag_name):
            active.append("re." + flag_name)
    suffix = ""
    if active:
        suffix = ", " + " | ".join(active)
    return "re.compile({!r}{})".format(regex.pattern, suffix)