1from collections import OrderedDict
2from decimal import Decimal
3import re
4
5from .exceptions import JsonSchemaValueException, JsonSchemaDefinitionException
6from .indent import indent
7from .ref_resolver import RefResolver
8
9
def enforce_list(variable):
    """
    Return ``variable`` unchanged when it already is a ``list``,
    otherwise return a new one-element list containing it.

    Only real ``list`` instances (and subclasses) are passed through;
    tuples, strings and other iterables get wrapped like any scalar.
    """
    return variable if isinstance(variable, list) else [variable]
14
15
# pylint: disable=too-many-instance-attributes,too-many-public-methods
class CodeGenerator:
    """
    This class is not supposed to be used directly. Anything
    inside of this class can be changed without notice.

    This class generates code of validation function from JSON
    schema object as string. Example:

    .. code-block:: python

        CodeGenerator(json_schema_definition).func_code
    """

    INDENT = 4  # spaces

    def __init__(self, definition, resolver=None, detailed_exceptions=True):
        """
        Prepare the generator state; no code is generated yet.

        :param definition: root JSON schema definition.
        :param resolver: reference resolver to use; when ``None`` a new
            ``RefResolver`` is created from ``definition``.
        :param detailed_exceptions: when true, generated ``raise`` statements
            also carry ``value``, ``name``, ``definition`` and ``rule``.
        """
        self._code = []
        self._compile_regexps = {}
        self._custom_formats = {}
        self._detailed_exceptions = detailed_exceptions

        # Any extra library should be here to be imported only once.
        # Lines are imports to be printed in the file and objects
        # key-value pair to pass to compile function directly.
        self._extra_imports_lines = [
            "from decimal import Decimal",
        ]
        self._extra_imports_objects = {
            "Decimal": Decimal,
        }

        self._variables = set()
        self._indent = 0
        self._indent_last_line = None
        self._variable = None
        self._variable_name = None
        self._root_definition = definition
        self._definition = None

        # map schema URIs to validation function names for functions
        # that are not yet generated, but need to be generated
        self._needed_validation_functions = {}
        # validation function names that are already done
        self._validation_functions_done = set()

        if resolver is None:
            resolver = RefResolver.from_schema(definition, store={})
        self._resolver = resolver

        # add main function to `self._needed_validation_functions`
        self._needed_validation_functions[self._resolver.get_uri()] = self._resolver.get_scope_name()

        # Maps a JSON schema keyword to the method generating code for it;
        # iterated in insertion order by `run_generate_functions`.
        self._json_keywords_to_function = OrderedDict()

    @property
    def func_code(self):
        """
        Returns generated code of whole validation function as string.
        """
        self._generate_func_code()

        return '\n'.join(self._code)

    @property
    def global_state(self):
        """
        Returns global variables for generating function from ``func_code``. Includes
        compiled regular expressions and imports, so it does not have to do it every
        time when validation function is called.
        """
        self._generate_func_code()

        return dict(
            **self._extra_imports_objects,
            REGEX_PATTERNS=self._compile_regexps,
            re=re,
            JsonSchemaValueException=JsonSchemaValueException,
        )

    @property
    def global_state_code(self):
        """
        Returns global variables for generating function from ``func_code`` as code.
        Includes compiled regular expressions and imports.

        ``import re`` and the ``REGEX_PATTERNS`` assignment are emitted only
        when at least one regular expression was collected.
        """
        self._generate_func_code()

        has_regexps = bool(self._compile_regexps)

        lines = list(self._extra_imports_lines)
        if has_regexps:
            lines.append('import re')
        lines.extend([
            'from fastjsonschema import JsonSchemaValueException',
            '',
            '',
        ])
        if has_regexps:
            lines.extend([
                'REGEX_PATTERNS = ' + serialize_regexes(self._compile_regexps),
                '',
            ])
        return '\n'.join(lines)

    def _generate_func_code(self):
        """Generate the code once; later calls are no-ops while ``_code`` is non-empty."""
        if not self._code:
            self.generate_func_code()

    def generate_func_code(self):
        """
        Creates base code of validation function and calls helper
        for creating code by definition.
        """
        self.l('NoneType = type(None)')
        # Generate parts that are referenced and not yet generated
        while self._needed_validation_functions:
            # During generation of validation function, could be needed to generate
            # new one that is added again to `_needed_validation_functions`.
            # Therefore usage of while instead of for loop.
            uri, name = self._needed_validation_functions.popitem()
            self.generate_validation_function(uri, name)

    def generate_validation_function(self, uri, name):
        """
        Generate validation function for given uri with given name
        """
        # Mark as done before generating so that a reference back to the
        # same uri from inside its own body is not queued again.
        self._validation_functions_done.add(uri)
        self.l('')
        with self._resolver.resolving(uri) as definition:
            with self.l('def {}(data, custom_formats={{}}, name_prefix=None):', name):
                self.generate_func_code_block(definition, 'data', 'data', clear_variables=True)
                self.l('return data')

    def generate_func_code_block(self, definition, variable, variable_name, clear_variables=False):
        """
        Creates validation rules for current definition.

        The current definition, variable and variable name (and, with
        ``clear_variables``, the set of already created helper variables) are
        swapped in for the duration of the call and always restored afterwards,
        even when generation raises.

        Returns the number of validation rules generated as code
        (``None`` when the definition is a ``$ref``).
        """
        backup = self._definition, self._variable, self._variable_name
        self._definition, self._variable, self._variable_name = definition, variable, variable_name
        if clear_variables:
            backup_variables = self._variables
            self._variables = set()

        try:
            return self._generate_func_code_block(definition)
        finally:
            # Restore in `finally` so a failure in a nested block cannot
            # leave the generator pointing at the nested definition.
            self._definition, self._variable, self._variable_name = backup
            if clear_variables:
                self._variables = backup_variables

    def _generate_func_code_block(self, definition):
        """
        Dispatch generation for ``definition``.

        Raises ``JsonSchemaDefinitionException`` when ``definition`` is not a dict.
        """
        if not isinstance(definition, dict):
            raise JsonSchemaDefinitionException("definition must be an object")
        if '$ref' in definition:
            # needed because ref overrides any sibling keywords
            return self.generate_ref()
        return self.run_generate_functions(definition)

    def run_generate_functions(self, definition):
        """Returns the number of generate functions that were executed."""
        count = 0
        for key, func in self._json_keywords_to_function.items():
            if key in definition:
                func()
                count += 1
        return count

    def generate_ref(self):
        """
        Ref can be link to remote or local definition.

        .. code-block:: python

            {'$ref': 'http://json-schema.org/draft-04/schema#'}
            {
                'properties': {
                    'foo': {'type': 'integer'},
                    'bar': {'$ref': '#/properties/foo'}
                }
            }
        """
        with self._resolver.in_scope(self._definition['$ref']):
            name = self._resolver.get_scope_name()
            uri = self._resolver.get_uri()
            if uri not in self._validation_functions_done:
                self._needed_validation_functions[uri] = name
            # call validation function
            assert self._variable_name.startswith("data")
            # Everything after the leading "data" is the path of the current
            # value; it is appended to the caller-supplied prefix at runtime.
            path = self._variable_name[4:]
            name_arg = '(name_prefix or "data") + "{}"'.format(path)
            if '{' in name_arg:
                # The path contains placeholders that the generated code
                # fills in from its own local variables.
                name_arg = name_arg + '.format(**locals())'
            self.l('{}({variable}, custom_formats, {name_arg})', name, name_arg=name_arg)

    # pylint: disable=invalid-name
    @indent
    def l(self, line, *args, **kwds):
        """
        Short-cut of line. Used for inserting line. It's formatted with parameters
        ``variable``, ``variable_name`` (as ``name`` for short-cut), all keys from
        current JSON schema ``definition`` and also passed arguments in ``args``
        and named ``kwds``.

        .. code-block:: python

            self.l('if {variable} not in {enum}: raise JsonSchemaValueException("Wrong!")')

        When you want to indent block, use it as context manager. For example:

        .. code-block:: python

            with self.l('if {variable} not in {enum}:'):
                self.l('raise JsonSchemaValueException("Wrong!")')

        Returns the formatted line (without the leading indentation).
        """
        spaces = ' ' * self.INDENT * self._indent

        name = self._variable_name
        if name:
            # Add name_prefix to the name when it is being outputted.
            assert name.startswith('data')
            name = '" + (name_prefix or "data") + "' + name[4:]
            if '{' in name:
                name = name + '".format(**locals()) + "'

        context = dict(
            self._definition if self._definition and self._definition is not True else {},
            variable=self._variable,
            name=name,
            **kwds
        )
        line = line.format(*args, **context)
        # Every call must produce exactly one physical line of generated code,
        # so literal line breaks coming from formatted values are escaped.
        line = line.replace('\n', '\\n').replace('\r', '\\r')
        self._code.append(spaces + line)
        return line

    def e(self, string):
        """
        Short-cut of escape. Used for inserting user values into a string message.

        The result is safe to place between double quotes in generated code:
        backslashes are escaped first (so that a value cannot swallow the
        closing quote or be reinterpreted as an escape sequence), then
        double quotes.

        .. code-block:: python

            self.l('raise JsonSchemaValueException("Variable: {}")', self.e(variable))
        """
        return str(string).replace('\\', '\\\\').replace('"', '\\"')

    def exc(self, msg, *args, append_to_msg=None, rule=None):
        """
        Short-cut for creating raising exception in the code.

        ``msg`` is a template for ``l``; besides the usual context it may use
        ``{definition_rule}``, the escaped value of ``rule`` in the current
        definition. ``append_to_msg`` is a code expression appended to the
        message and is used only with detailed exceptions.
        """
        if not self._detailed_exceptions:
            extra = {}
            if '{definition_rule' in msg:
                # Only computed when the template needs it, so other messages
                # are generated exactly as before and without extra work.
                extra['definition_rule'] = self._escaped_definition_rule(
                    self._expand_refs(self._definition), rule,
                )
            self.l('raise JsonSchemaValueException("'+msg+'")', *args, **extra)
            return

        arg = '"'+msg+'"'
        if append_to_msg:
            arg += ' + (' + append_to_msg + ')'
        msg = 'raise JsonSchemaValueException('+arg+', value={variable}, name="{name}", definition={definition}, rule={rule})'
        definition = self._expand_refs(self._definition)
        definition_rule = self._escaped_definition_rule(definition, rule)
        self.l(msg, *args, definition=repr(definition), rule=repr(rule), definition_rule=definition_rule)

    def _escaped_definition_rule(self, definition, rule):
        """Return the escaped value stored under ``rule`` in ``definition`` (``None`` if unavailable)."""
        return self.e(definition.get(rule) if isinstance(definition, dict) else None)

    def _expand_refs(self, definition):
        """
        Return ``definition`` with ``$ref`` objects replaced by what the
        resolver yields for them. Lists and dicts are walked recursively;
        a resolved schema itself is returned as-is, not expanded further.
        """
        if isinstance(definition, list):
            return [self._expand_refs(v) for v in definition]
        if not isinstance(definition, dict):
            return definition
        if "$ref" in definition and isinstance(definition["$ref"], str):
            with self._resolver.resolving(definition["$ref"]) as schema:
                return schema
        return {k: self._expand_refs(v) for k, v in definition.items()}

    def create_variable_with_length(self):
        """
        Append code for creating variable with length of that variable
        (for example length of list or dictionary) with name ``{variable}_len``.
        It can be called several times; the code is emitted only when that
        variable does not exist yet.
        """
        variable_name = '{}_len'.format(self._variable)
        if variable_name in self._variables:
            return
        self._variables.add(variable_name)
        self.l('{variable}_len = len({variable})')

    def create_variable_keys(self):
        """
        Append code for creating variable with keys of that variable (dictionary)
        with a name ``{variable}_keys``. Similar to `create_variable_with_length`.
        """
        variable_name = '{}_keys'.format(self._variable)
        if variable_name in self._variables:
            return
        self._variables.add(variable_name)
        self.l('{variable}_keys = set({variable}.keys())')

    def create_variable_is_list(self):
        """
        Append code for creating variable with bool if it's instance of list
        or tuple with a name ``{variable}_is_list``. Similar to
        `create_variable_with_length`.
        """
        variable_name = '{}_is_list'.format(self._variable)
        if variable_name in self._variables:
            return
        self._variables.add(variable_name)
        self.l('{variable}_is_list = isinstance({variable}, (list, tuple))')

    def create_variable_is_dict(self):
        """
        Append code for creating variable with bool if it's instance of dict
        with a name ``{variable}_is_dict``. Similar to `create_variable_with_length`.
        """
        variable_name = '{}_is_dict'.format(self._variable)
        if variable_name in self._variables:
            return
        self._variables.add(variable_name)
        self.l('{variable}_is_dict = isinstance({variable}, dict)')
337
338
def serialize_regexes(patterns_dict):
    """
    Render a mapping of keys to compiled regular expressions as the source
    text of a dict literal, one ``key: re.compile(...)`` entry per line.
    """
    # Unfortunately using `pprint.pformat` is causing errors
    # specially with big regexes
    entries = []
    for key, compiled in patterns_dict.items():
        entries.append("{!r}: {}".format(key, repr_regex(compiled)))
    body = ",\n ".join(entries)
    return "{\n " + body + "\n}"
347
348
def repr_regex(regex):
    """
    Return source code of a ``re.compile(...)`` call for ``regex``.

    The pattern is rendered with ``repr``. Of the flags set on ``regex``
    only A, I, DEBUG, L, M, S and X are written out, in that order and
    joined with ``|``; the flags argument is omitted when none of them is set.
    """
    flag_names = ("A", "I", "DEBUG", "L", "M", "S", "X")
    active = []
    for flag_name in flag_names:
        if regex.flags & getattr(re, flag_name):
            active.append("re." + flag_name)
    if not active:
        return "re.compile({!r})".format(regex.pattern)
    return "re.compile({!r}, {})".format(regex.pattern, " | ".join(active))