1from collections import OrderedDict
2from decimal import Decimal
3import re
4
5from .exceptions import JsonSchemaValueException, JsonSchemaDefinitionException
6from .indent import indent
7from .ref_resolver import RefResolver
8
9
10def enforce_list(variable):
11 if isinstance(variable, list):
12 return variable
13 return [variable]
14
15
16# pylint: disable=too-many-instance-attributes,too-many-public-methods
17class CodeGenerator:
18 """
19 This class is not supposed to be used directly. Anything
20 inside of this class can be changed without noticing.
21
22 This class generates code of validation function from JSON
23 schema object as string. Example:
24
25 .. code-block:: python
26
27 CodeGenerator(json_schema_definition).func_code
28 """
29
30 INDENT = 4 # spaces
31
32 def __init__(self, definition, resolver=None):
33 self._code = []
34 self._compile_regexps = {}
35 self._custom_formats = {}
36
37 # Any extra library should be here to be imported only once.
38 # Lines are imports to be printed in the file and objects
39 # key-value pair to pass to compile function directly.
40 self._extra_imports_lines = [
41 "from decimal import Decimal",
42 ]
43 self._extra_imports_objects = {
44 "Decimal": Decimal,
45 }
46
47 self._variables = set()
48 self._indent = 0
49 self._indent_last_line = None
50 self._variable = None
51 self._variable_name = None
52 self._root_definition = definition
53 self._definition = None
54
55 # map schema URIs to validation function names for functions
56 # that are not yet generated, but need to be generated
57 self._needed_validation_functions = {}
58 # validation function names that are already done
59 self._validation_functions_done = set()
60
61 if resolver is None:
62 resolver = RefResolver.from_schema(definition, store={})
63 self._resolver = resolver
64
65 # add main function to `self._needed_validation_functions`
66 self._needed_validation_functions[self._resolver.get_uri()] = self._resolver.get_scope_name()
67
68 self._json_keywords_to_function = OrderedDict()
69
70 @property
71 def func_code(self):
72 """
73 Returns generated code of whole validation function as string.
74 """
75 self._generate_func_code()
76
77 return '\n'.join(self._code)
78
79 @property
80 def global_state(self):
81 """
82 Returns global variables for generating function from ``func_code``. Includes
83 compiled regular expressions and imports, so it does not have to do it every
84 time when validation function is called.
85 """
86 self._generate_func_code()
87
88 return dict(
89 **self._extra_imports_objects,
90 REGEX_PATTERNS=self._compile_regexps,
91 re=re,
92 JsonSchemaValueException=JsonSchemaValueException,
93 )
94
95 @property
96 def global_state_code(self):
97 """
98 Returns global variables for generating function from ``func_code`` as code.
99 Includes compiled regular expressions and imports.
100 """
101 self._generate_func_code()
102
103 if not self._compile_regexps:
104 return '\n'.join(self._extra_imports_lines + [
105 'from fastjsonschema import JsonSchemaValueException',
106 '',
107 '',
108 ])
109 return '\n'.join(self._extra_imports_lines + [
110 'import re',
111 'from fastjsonschema import JsonSchemaValueException',
112 '',
113 '',
114 'REGEX_PATTERNS = ' + serialize_regexes(self._compile_regexps),
115 '',
116 ])
117
118
119 def _generate_func_code(self):
120 if not self._code:
121 self.generate_func_code()
122
123 def generate_func_code(self):
124 """
125 Creates base code of validation function and calls helper
126 for creating code by definition.
127 """
128 self.l('NoneType = type(None)')
129 # Generate parts that are referenced and not yet generated
130 while self._needed_validation_functions:
131 # During generation of validation function, could be needed to generate
132 # new one that is added again to `_needed_validation_functions`.
133 # Therefore usage of while instead of for loop.
134 uri, name = self._needed_validation_functions.popitem()
135 self.generate_validation_function(uri, name)
136
137 def generate_validation_function(self, uri, name):
138 """
139 Generate validation function for given uri with given name
140 """
141 self._validation_functions_done.add(uri)
142 self.l('')
143 with self._resolver.resolving(uri) as definition:
144 with self.l('def {}(data, custom_formats={{}}, name_prefix=None):', name):
145 self.generate_func_code_block(definition, 'data', 'data', clear_variables=True)
146 self.l('return data')
147
148 def generate_func_code_block(self, definition, variable, variable_name, clear_variables=False):
149 """
150 Creates validation rules for current definition.
151
152 Returns the number of validation rules generated as code.
153 """
154 backup = self._definition, self._variable, self._variable_name
155 self._definition, self._variable, self._variable_name = definition, variable, variable_name
156 if clear_variables:
157 backup_variables = self._variables
158 self._variables = set()
159
160 count = self._generate_func_code_block(definition)
161
162 self._definition, self._variable, self._variable_name = backup
163 if clear_variables:
164 self._variables = backup_variables
165
166 return count
167
168 def _generate_func_code_block(self, definition):
169 if not isinstance(definition, dict):
170 raise JsonSchemaDefinitionException("definition must be an object")
171 if '$ref' in definition:
172 # needed because ref overrides any sibling keywords
173 return self.generate_ref()
174 else:
175 return self.run_generate_functions(definition)
176
177 def run_generate_functions(self, definition):
178 """Returns the number of generate functions that were executed."""
179 count = 0
180 for key, func in self._json_keywords_to_function.items():
181 if key in definition:
182 func()
183 count += 1
184 return count
185
186 def generate_ref(self):
187 """
188 Ref can be link to remote or local definition.
189
190 .. code-block:: python
191
192 {'$ref': 'http://json-schema.org/draft-04/schema#'}
193 {
194 'properties': {
195 'foo': {'type': 'integer'},
196 'bar': {'$ref': '#/properties/foo'}
197 }
198 }
199 """
200 with self._resolver.in_scope(self._definition['$ref']):
201 name = self._resolver.get_scope_name()
202 uri = self._resolver.get_uri()
203 if uri not in self._validation_functions_done:
204 self._needed_validation_functions[uri] = name
205 # call validation function
206 assert self._variable_name.startswith("data")
207 path = self._variable_name[4:]
208 name_arg = '(name_prefix or "data") + "{}"'.format(path)
209 if '{' in name_arg:
210 name_arg = name_arg + '.format(**locals())'
211 self.l('{}({variable}, custom_formats, {name_arg})', name, name_arg=name_arg)
212
213
214 # pylint: disable=invalid-name
215 @indent
216 def l(self, line, *args, **kwds):
217 """
218 Short-cut of line. Used for inserting line. It's formated with parameters
219 ``variable``, ``variable_name`` (as ``name`` for short-cut), all keys from
220 current JSON schema ``definition`` and also passed arguments in ``args``
221 and named ``kwds``.
222
223 .. code-block:: python
224
225 self.l('if {variable} not in {enum}: raise JsonSchemaValueException("Wrong!")')
226
227 When you want to indent block, use it as context manager. For example:
228
229 .. code-block:: python
230
231 with self.l('if {variable} not in {enum}:'):
232 self.l('raise JsonSchemaValueException("Wrong!")')
233 """
234 spaces = ' ' * self.INDENT * self._indent
235
236 name = self._variable_name
237 if name:
238 # Add name_prefix to the name when it is being outputted.
239 assert name.startswith('data')
240 name = '" + (name_prefix or "data") + "' + name[4:]
241 if '{' in name:
242 name = name + '".format(**locals()) + "'
243
244 context = dict(
245 self._definition if self._definition and self._definition is not True else {},
246 variable=self._variable,
247 name=name,
248 **kwds
249 )
250 line = line.format(*args, **context)
251 line = line.replace('\n', '\\n').replace('\r', '\\r')
252 self._code.append(spaces + line)
253 return line
254
255 def e(self, string):
256 """
257 Short-cut of escape. Used for inserting user values into a string message.
258
259 .. code-block:: python
260
261 self.l('raise JsonSchemaValueException("Variable: {}")', self.e(variable))
262 """
263 return str(string).replace('"', '\\"')
264
265 def exc(self, msg, *args, append_to_msg=None, rule=None):
266 """
267 Short-cut for creating raising exception in the code.
268 """
269 arg = '"'+msg+'"'
270 if append_to_msg:
271 arg += ' + (' + append_to_msg + ')'
272 msg = 'raise JsonSchemaValueException('+arg+', value={variable}, name="{name}", definition={definition}, rule={rule})'
273 definition = self._expand_refs(self._definition)
274 definition_rule = self.e(definition.get(rule) if isinstance(definition, dict) else None)
275 self.l(msg, *args, definition=repr(definition), rule=repr(rule), definition_rule=definition_rule)
276
277 def _expand_refs(self, definition):
278 if isinstance(definition, list):
279 return [self._expand_refs(v) for v in definition]
280 if not isinstance(definition, dict):
281 return definition
282 if "$ref" in definition and isinstance(definition["$ref"], str):
283 with self._resolver.resolving(definition["$ref"]) as schema:
284 return schema
285 return {k: self._expand_refs(v) for k, v in definition.items()}
286
287 def create_variable_with_length(self):
288 """
289 Append code for creating variable with length of that variable
290 (for example length of list or dictionary) with name ``{variable}_len``.
291 It can be called several times and always it's done only when that variable
292 still does not exists.
293 """
294 variable_name = '{}_len'.format(self._variable)
295 if variable_name in self._variables:
296 return
297 self._variables.add(variable_name)
298 self.l('{variable}_len = len({variable})')
299
300 def create_variable_keys(self):
301 """
302 Append code for creating variable with keys of that variable (dictionary)
303 with a name ``{variable}_keys``. Similar to `create_variable_with_length`.
304 """
305 variable_name = '{}_keys'.format(self._variable)
306 if variable_name in self._variables:
307 return
308 self._variables.add(variable_name)
309 self.l('{variable}_keys = set({variable}.keys())')
310
311 def create_variable_is_list(self):
312 """
313 Append code for creating variable with bool if it's instance of list
314 with a name ``{variable}_is_list``. Similar to `create_variable_with_length`.
315 """
316 variable_name = '{}_is_list'.format(self._variable)
317 if variable_name in self._variables:
318 return
319 self._variables.add(variable_name)
320 self.l('{variable}_is_list = isinstance({variable}, (list, tuple))')
321
322 def create_variable_is_dict(self):
323 """
324 Append code for creating variable with bool if it's instance of list
325 with a name ``{variable}_is_dict``. Similar to `create_variable_with_length`.
326 """
327 variable_name = '{}_is_dict'.format(self._variable)
328 if variable_name in self._variables:
329 return
330 self._variables.add(variable_name)
331 self.l('{variable}_is_dict = isinstance({variable}, dict)')
332
333
334def serialize_regexes(patterns_dict):
335 # Unfortunately using `pprint.pformat` is causing errors
336 # specially with big regexes
337 regex_patterns = (
338 repr(k) + ": " + repr_regex(v)
339 for k, v in patterns_dict.items()
340 )
341 return '{\n ' + ",\n ".join(regex_patterns) + "\n}"
342
343
344def repr_regex(regex):
345 all_flags = ("A", "I", "DEBUG", "L", "M", "S", "X")
346 flags = " | ".join(f"re.{f}" for f in all_flags if regex.flags & getattr(re, f))
347 flags = ", " + flags if flags else ""
348 return "re.compile({!r}{})".format(regex.pattern, flags)