Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/jmespath/functions.py: 38%
228 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1import math
2import json
4from jmespath import exceptions
5from jmespath.compat import string_type as STRING_TYPE
6from jmespath.compat import get_methods
# Lookup from the name of a Python type (``type(obj).__name__``) to the
# name of the JMESPath type it represents.
TYPES_MAP = dict(
    bool='boolean',
    list='array',
    dict='object',
    NoneType='null',
    unicode='string',
    str='string',
    float='number',
    int='number',
    long='number',
    OrderedDict='object',
    _Projection='array',
    _Expression='expref',
)
# Lookup from a JMESPath type name to the tuple of Python type names
# (``type(obj).__name__``) that count as that type.
REVERSE_TYPES_MAP = dict(
    boolean=('bool',),
    array=('list', '_Projection'),
    object=('dict', 'OrderedDict'),
    null=('NoneType',),
    string=('unicode', 'str'),
    number=('float', 'int', 'long'),
    expref=('_Expression',),
)
def signature(*arguments):
    """Build a decorator that records ``arguments`` on the decorated function.

    The positional arguments are stored, as a tuple, in the function's
    ``signature`` attribute; the function itself is returned unchanged.
    """
    def _attach(target):
        setattr(target, 'signature', arguments)
        return target

    return _attach
class FunctionRegistry(type):
    """Metaclass that builds a ``FUNCTION_TABLE`` for every class it creates."""

    def __init__(cls, name, bases, attrs):
        # The table is built before the regular ``type`` initialisation runs.
        cls._populate_function_table()
        super().__init__(name, bases, attrs)

    def _populate_function_table(cls):
        """Collect the decorated ``_func_*`` methods into ``cls.FUNCTION_TABLE``.

        A method is registered only when its name starts with ``_func_``
        and it carries a ``signature`` attribute (as set by the
        ``@signature`` decorator).  The table key is the method name with
        the prefix removed, e.g. ``_func_max_by`` -> ``max_by``.
        """
        prefix = '_func_'
        table = {}
        for attr_name, method in get_methods(cls):
            if attr_name.startswith(prefix):
                declared = getattr(method, 'signature', None)
                if declared is not None:
                    table[attr_name[len(prefix):]] = {
                        'function': method,
                        'signature': declared,
                    }
        cls.FUNCTION_TABLE = table
class Functions(metaclass=FunctionRegistry):
    """Library of built-in functions, dispatched by name.

    Every method named ``_func_<name>`` that carries a ``@signature`` is
    registered by the ``FunctionRegistry`` metaclass under ``<name>`` and
    can then be invoked through :meth:`call_function`.
    """

    # Replaced by the metaclass with the real table when the class is created.
    FUNCTION_TABLE = {
    }

    def call_function(self, function_name, resolved_args):
        """Validate ``resolved_args`` and invoke the function ``function_name``.

        Raises ``exceptions.UnknownFunctionError`` when the name is not in
        ``FUNCTION_TABLE``.  Arity and type errors raised during validation
        propagate to the caller.
        """
        try:
            spec = self.FUNCTION_TABLE[function_name]
        except KeyError:
            raise exceptions.UnknownFunctionError(
                "Unknown function: %s()" % function_name)
        function = spec['function']
        signature = spec['signature']
        self._validate_arguments(resolved_args, signature, function_name)
        # The stored callable is invoked with ``self`` passed explicitly.
        return function(self, *resolved_args)

    def _validate_arguments(self, args, signature, function_name):
        """Check the argument count, then the argument types.

        When the last signature entry is marked ``variadic`` the signature
        length is a minimum; otherwise it must match exactly.
        """
        if signature and signature[-1].get('variadic'):
            if len(args) < len(signature):
                raise exceptions.VariadictArityError(
                    len(signature), len(args), function_name)
        elif len(args) != len(signature):
            raise exceptions.ArityError(
                len(signature), len(args), function_name)
        return self._type_check(args, signature, function_name)

    def _type_check(self, actual, signature, function_name):
        """Type check every argument against its signature entry.

        An entry with an empty ``types`` list accepts anything.  For a
        variadic function the last entry also governs every argument
        beyond the end of the signature, so trailing arguments are
        validated too instead of being passed through unchecked.
        """
        for spec, value in zip(signature, actual):
            allowed_types = spec['types']
            if allowed_types:
                self._type_check_single(value, allowed_types, function_name)
        if signature and signature[-1].get('variadic'):
            variadic_types = signature[-1]['types']
            if variadic_types:
                for value in actual[len(signature):]:
                    self._type_check_single(value, variadic_types,
                                            function_name)

    def _type_check_single(self, current, types, function_name):
        """Validate one argument, raising ``JMESPathTypeError`` on mismatch."""
        # Type checking involves checking the top level type,
        # and in the case of arrays, potentially checking the types
        # of each element.
        allowed_types, allowed_subtypes = self._get_allowed_pytypes(types)
        # We're not using isinstance() on purpose.
        # The type model for jmespath does not map
        # 1-1 with python types (booleans are considered
        # integers in python for example).
        actual_typename = type(current).__name__
        if actual_typename not in allowed_types:
            raise exceptions.JMESPathTypeError(
                function_name, current,
                self._convert_to_jmespath_type(actual_typename), types)
        # If we're dealing with a list type, we can have
        # additional restrictions on the type of the list
        # elements (for example a function can require a
        # list of numbers or a list of strings).
        # Arrays are the only types that can have subtypes.
        if allowed_subtypes:
            self._subtype_check(current, allowed_subtypes,
                                types, function_name)

    def _get_allowed_pytypes(self, types):
        """Translate JMESPath type names into Python type names.

        A name of the form ``"<type>-<subtype>"`` (e.g. ``"array-number"``)
        contributes ``<type>`` to the allowed top-level types and
        ``<subtype>`` to the allowed element types.

        Returns ``(allowed_types, allowed_subtypes)``: a flat list of
        Python type names, and a list holding one tuple of Python type
        names per declared subtype.
        """
        allowed_types = []
        allowed_subtypes = []
        for t in types:
            type_, separator, subtype = t.partition('-')
            if separator:
                allowed_subtypes.append(REVERSE_TYPES_MAP[subtype])
            allowed_types.extend(REVERSE_TYPES_MAP[type_])
        return allowed_types, allowed_subtypes

    def _subtype_check(self, current, allowed_subtypes, types, function_name):
        """Validate the element types of the array ``current``.

        With a single allowed subtype every element must match it.  With
        several, the first element selects which subtype the remaining
        elements must share; an empty array is accepted as is.  The type
        reported in ``JMESPathTypeError`` is the JMESPath type name, the
        same as for top-level type errors.
        """
        if not allowed_subtypes:
            return
        if len(allowed_subtypes) == 1:
            # The easy case, we know up front what type
            # we need to validate.
            allowed = allowed_subtypes[0]
        elif current:
            # Dynamic type validation.  Based on the first
            # type we see, we validate that the remaining types
            # match.
            first = type(current[0]).__name__
            for subtypes in allowed_subtypes:
                if first in subtypes:
                    allowed = subtypes
                    break
            else:
                raise exceptions.JMESPathTypeError(
                    function_name, current[0],
                    self._convert_to_jmespath_type(first), types)
        else:
            return
        for element in current:
            actual_typename = type(element).__name__
            if actual_typename not in allowed:
                raise exceptions.JMESPathTypeError(
                    function_name, element,
                    self._convert_to_jmespath_type(actual_typename), types)

    @signature({'types': ['number']})
    def _func_abs(self, arg):
        """Return the absolute value of a number."""
        return abs(arg)

    @signature({'types': ['array-number']})
    def _func_avg(self, arg):
        """Return the mean of an array of numbers, or None when it is empty."""
        if arg:
            return sum(arg) / float(len(arg))
        else:
            return None

    @signature({'types': [], 'variadic': True})
    def _func_not_null(self, *arguments):
        """Return the first argument that is not None (None if there is none)."""
        for argument in arguments:
            if argument is not None:
                return argument

    @signature({'types': []})
    def _func_to_array(self, arg):
        """Return ``arg`` unchanged if it is a list, else wrap it in a list."""
        if isinstance(arg, list):
            return arg
        else:
            return [arg]

    @signature({'types': []})
    def _func_to_string(self, arg):
        """Return strings unchanged; serialise anything else as compact JSON."""
        if isinstance(arg, STRING_TYPE):
            return arg
        else:
            return json.dumps(arg, separators=(',', ':'),
                              default=str)

    @signature({'types': []})
    def _func_to_number(self, arg):
        """Convert ``arg`` to a number, returning None when that is impossible.

        Lists, dicts, booleans and None give None, and numbers are returned
        unchanged.  Anything else is tried with ``int()`` and then
        ``float()``.  A value those constructors reject, whether with
        ``ValueError`` (unparsable text) or ``TypeError`` (unsupported
        type), also gives None instead of leaking a raw Python exception.
        """
        if arg is None or isinstance(arg, (list, dict, bool)):
            return None
        if isinstance(arg, (int, float)):
            return arg
        for convert in (int, float):
            try:
                return convert(arg)
            except (TypeError, ValueError):
                continue
        return None

    @signature({'types': ['array', 'string']}, {'types': []})
    def _func_contains(self, subject, search):
        """Return whether ``search`` is an element or substring of ``subject``."""
        # Python's ``in`` raises TypeError for a non-string searched in a
        # string; such a value can never be a substring, so report False.
        if isinstance(subject, STRING_TYPE) and \
                not isinstance(search, STRING_TYPE):
            return False
        return search in subject

    @signature({'types': ['string', 'array', 'object']})
    def _func_length(self, arg):
        """Return the length of a string, array or object."""
        return len(arg)

    @signature({'types': ['string']}, {'types': ['string']})
    def _func_ends_with(self, search, suffix):
        """Return whether the string ``search`` ends with ``suffix``."""
        return search.endswith(suffix)

    @signature({'types': ['string']}, {'types': ['string']})
    def _func_starts_with(self, search, suffix):
        """Return whether the string ``search`` starts with ``suffix``."""
        return search.startswith(suffix)

    @signature({'types': ['array', 'string']})
    def _func_reverse(self, arg):
        """Return a reversed copy of a string or array."""
        if isinstance(arg, STRING_TYPE):
            return arg[::-1]
        else:
            return list(reversed(arg))

    @signature({"types": ['number']})
    def _func_ceil(self, arg):
        """Return ``math.ceil`` of a number."""
        return math.ceil(arg)

    @signature({"types": ['number']})
    def _func_floor(self, arg):
        """Return ``math.floor`` of a number."""
        return math.floor(arg)

    @signature({"types": ['string']}, {"types": ['array-string']})
    def _func_join(self, separator, array):
        """Join an array of strings with ``separator``."""
        return separator.join(array)

    @signature({'types': ['expref']}, {'types': ['array']})
    def _func_map(self, expref, arg):
        """Evaluate ``expref`` against each element and collect the results."""
        return [expref.visit(expref.expression, element) for element in arg]

    @signature({"types": ['array-number', 'array-string']})
    def _func_max(self, arg):
        """Return the largest element, or None for an empty array."""
        if arg:
            return max(arg)
        else:
            return None

    @signature({"types": ["object"], "variadic": True})
    def _func_merge(self, *arguments):
        """Merge objects left to right into a new dict; later keys win."""
        merged = {}
        for arg in arguments:
            merged.update(arg)
        return merged

    @signature({"types": ['array-number', 'array-string']})
    def _func_min(self, arg):
        """Return the smallest element, or None for an empty array."""
        if arg:
            return min(arg)
        else:
            return None

    @signature({"types": ['array-string', 'array-number']})
    def _func_sort(self, arg):
        """Return a new sorted list of the array's elements."""
        return sorted(arg)

    @signature({"types": ['array-number']})
    def _func_sum(self, arg):
        """Return the sum of an array of numbers."""
        return sum(arg)

    @signature({"types": ['object']})
    def _func_keys(self, arg):
        """Return the object's keys as a list."""
        # To be consistent with .values()
        # should we also return the indices of a list?
        return list(arg.keys())

    @signature({"types": ['object']})
    def _func_values(self, arg):
        """Return the object's values as a list."""
        return list(arg.values())

    @signature({'types': []})
    def _func_type(self, arg):
        """Return the JMESPath type name of ``arg``.

        Values matching none of the checks below fall through and give None.
        """
        if isinstance(arg, STRING_TYPE):
            return "string"
        # bool must be tested before the number check: bool is an int subclass.
        elif isinstance(arg, bool):
            return "boolean"
        elif isinstance(arg, list):
            return "array"
        elif isinstance(arg, dict):
            return "object"
        elif isinstance(arg, (float, int)):
            return "number"
        elif arg is None:
            return "null"

    @signature({'types': ['array']}, {'types': ['expref']})
    def _func_sort_by(self, array, expref):
        """Return ``array`` sorted by the value ``expref`` yields per element.

        An empty array is returned as is.  Raises ``JMESPathTypeError``
        when the key of the first element is neither a number nor a
        string, or when a later key has a different type from the first.
        """
        if not array:
            return array
        # sort_by allows for the expref to be either a number or
        # a string, so we have some special logic to handle this.
        # We evaluate the first array element and verify that it's
        # either a string or a number.  We then create a key function
        # that validates that type, which requires that remaining array
        # elements resolve to the same type as the first element.
        required_type = self._convert_to_jmespath_type(
            type(expref.visit(expref.expression, array[0])).__name__)
        if required_type not in ['number', 'string']:
            raise exceptions.JMESPathTypeError(
                'sort_by', array[0], required_type, ['string', 'number'])
        keyfunc = self._create_key_func(expref,
                                        [required_type],
                                        'sort_by')
        return sorted(array, key=keyfunc)

    @signature({'types': ['array']}, {'types': ['expref']})
    def _func_min_by(self, array, expref):
        """Return the element with the smallest ``expref`` key, or None if empty."""
        keyfunc = self._create_key_func(expref,
                                        ['number', 'string'],
                                        'min_by')
        if array:
            return min(array, key=keyfunc)
        else:
            return None

    @signature({'types': ['array']}, {'types': ['expref']})
    def _func_max_by(self, array, expref):
        """Return the element with the largest ``expref`` key, or None if empty."""
        keyfunc = self._create_key_func(expref,
                                        ['number', 'string'],
                                        'max_by')
        if array:
            return max(array, key=keyfunc)
        else:
            return None

    def _create_key_func(self, expref, allowed_types, function_name):
        """Build a key function that evaluates ``expref`` and checks its type.

        The returned callable raises ``JMESPathTypeError`` when the
        evaluated key's JMESPath type is not in ``allowed_types``.
        """
        def keyfunc(x):
            result = expref.visit(expref.expression, x)
            actual_typename = type(result).__name__
            jmespath_type = self._convert_to_jmespath_type(actual_typename)
            # allowed_types is in term of jmespath types, not python types.
            if jmespath_type not in allowed_types:
                raise exceptions.JMESPathTypeError(
                    function_name, result, jmespath_type, allowed_types)
            return result
        return keyfunc

    def _convert_to_jmespath_type(self, pyobject):
        """Map a Python type name to its JMESPath name, or ``'unknown'``."""
        return TYPES_MAP.get(pyobject, 'unknown')