Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/jmespath/functions.py: 38%

228 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1import math 

2import json 

3 

4from jmespath import exceptions 

5from jmespath.compat import string_type as STRING_TYPE 

6from jmespath.compat import get_methods 

7 

8 

9# python types -> jmespath types 

10TYPES_MAP = { 

11 'bool': 'boolean', 

12 'list': 'array', 

13 'dict': 'object', 

14 'NoneType': 'null', 

15 'unicode': 'string', 

16 'str': 'string', 

17 'float': 'number', 

18 'int': 'number', 

19 'long': 'number', 

20 'OrderedDict': 'object', 

21 '_Projection': 'array', 

22 '_Expression': 'expref', 

23} 

24 

25 

26# jmespath types -> python types 

27REVERSE_TYPES_MAP = { 

28 'boolean': ('bool',), 

29 'array': ('list', '_Projection'), 

30 'object': ('dict', 'OrderedDict',), 

31 'null': ('NoneType',), 

32 'string': ('unicode', 'str'), 

33 'number': ('float', 'int', 'long'), 

34 'expref': ('_Expression',), 

35} 

36 

37 

38def signature(*arguments): 

39 def _record_signature(func): 

40 func.signature = arguments 

41 return func 

42 return _record_signature 

43 

44 

45class FunctionRegistry(type): 

46 def __init__(cls, name, bases, attrs): 

47 cls._populate_function_table() 

48 super(FunctionRegistry, cls).__init__(name, bases, attrs) 

49 

50 def _populate_function_table(cls): 

51 function_table = {} 

52 # Any method with a @signature decorator that also 

53 # starts with "_func_" is registered as a function. 

54 # _func_max_by -> max_by function. 

55 for name, method in get_methods(cls): 

56 if not name.startswith('_func_'): 

57 continue 

58 signature = getattr(method, 'signature', None) 

59 if signature is not None: 

60 function_table[name[6:]] = { 

61 'function': method, 

62 'signature': signature, 

63 } 

64 cls.FUNCTION_TABLE = function_table 

65 

66 

67class Functions(metaclass=FunctionRegistry): 

68 

69 FUNCTION_TABLE = { 

70 } 

71 

72 def call_function(self, function_name, resolved_args): 

73 try: 

74 spec = self.FUNCTION_TABLE[function_name] 

75 except KeyError: 

76 raise exceptions.UnknownFunctionError( 

77 "Unknown function: %s()" % function_name) 

78 function = spec['function'] 

79 signature = spec['signature'] 

80 self._validate_arguments(resolved_args, signature, function_name) 

81 return function(self, *resolved_args) 

82 

83 def _validate_arguments(self, args, signature, function_name): 

84 if signature and signature[-1].get('variadic'): 

85 if len(args) < len(signature): 

86 raise exceptions.VariadictArityError( 

87 len(signature), len(args), function_name) 

88 elif len(args) != len(signature): 

89 raise exceptions.ArityError( 

90 len(signature), len(args), function_name) 

91 return self._type_check(args, signature, function_name) 

92 

93 def _type_check(self, actual, signature, function_name): 

94 for i in range(len(signature)): 

95 allowed_types = signature[i]['types'] 

96 if allowed_types: 

97 self._type_check_single(actual[i], allowed_types, 

98 function_name) 

99 

100 def _type_check_single(self, current, types, function_name): 

101 # Type checking involves checking the top level type, 

102 # and in the case of arrays, potentially checking the types 

103 # of each element. 

104 allowed_types, allowed_subtypes = self._get_allowed_pytypes(types) 

105 # We're not using isinstance() on purpose. 

106 # The type model for jmespath does not map 

107 # 1-1 with python types (booleans are considered 

108 # integers in python for example). 

109 actual_typename = type(current).__name__ 

110 if actual_typename not in allowed_types: 

111 raise exceptions.JMESPathTypeError( 

112 function_name, current, 

113 self._convert_to_jmespath_type(actual_typename), types) 

114 # If we're dealing with a list type, we can have 

115 # additional restrictions on the type of the list 

116 # elements (for example a function can require a 

117 # list of numbers or a list of strings). 

118 # Arrays are the only types that can have subtypes. 

119 if allowed_subtypes: 

120 self._subtype_check(current, allowed_subtypes, 

121 types, function_name) 

122 

123 def _get_allowed_pytypes(self, types): 

124 allowed_types = [] 

125 allowed_subtypes = [] 

126 for t in types: 

127 type_ = t.split('-', 1) 

128 if len(type_) == 2: 

129 type_, subtype = type_ 

130 allowed_subtypes.append(REVERSE_TYPES_MAP[subtype]) 

131 else: 

132 type_ = type_[0] 

133 allowed_types.extend(REVERSE_TYPES_MAP[type_]) 

134 return allowed_types, allowed_subtypes 

135 

136 def _subtype_check(self, current, allowed_subtypes, types, function_name): 

137 if len(allowed_subtypes) == 1: 

138 # The easy case, we know up front what type 

139 # we need to validate. 

140 allowed_subtypes = allowed_subtypes[0] 

141 for element in current: 

142 actual_typename = type(element).__name__ 

143 if actual_typename not in allowed_subtypes: 

144 raise exceptions.JMESPathTypeError( 

145 function_name, element, actual_typename, types) 

146 elif len(allowed_subtypes) > 1 and current: 

147 # Dynamic type validation. Based on the first 

148 # type we see, we validate that the remaining types 

149 # match. 

150 first = type(current[0]).__name__ 

151 for subtypes in allowed_subtypes: 

152 if first in subtypes: 

153 allowed = subtypes 

154 break 

155 else: 

156 raise exceptions.JMESPathTypeError( 

157 function_name, current[0], first, types) 

158 for element in current: 

159 actual_typename = type(element).__name__ 

160 if actual_typename not in allowed: 

161 raise exceptions.JMESPathTypeError( 

162 function_name, element, actual_typename, types) 

163 

164 @signature({'types': ['number']}) 

165 def _func_abs(self, arg): 

166 return abs(arg) 

167 

168 @signature({'types': ['array-number']}) 

169 def _func_avg(self, arg): 

170 if arg: 

171 return sum(arg) / float(len(arg)) 

172 else: 

173 return None 

174 

175 @signature({'types': [], 'variadic': True}) 

176 def _func_not_null(self, *arguments): 

177 for argument in arguments: 

178 if argument is not None: 

179 return argument 

180 

181 @signature({'types': []}) 

182 def _func_to_array(self, arg): 

183 if isinstance(arg, list): 

184 return arg 

185 else: 

186 return [arg] 

187 

188 @signature({'types': []}) 

189 def _func_to_string(self, arg): 

190 if isinstance(arg, STRING_TYPE): 

191 return arg 

192 else: 

193 return json.dumps(arg, separators=(',', ':'), 

194 default=str) 

195 

196 @signature({'types': []}) 

197 def _func_to_number(self, arg): 

198 if isinstance(arg, (list, dict, bool)): 

199 return None 

200 elif arg is None: 

201 return None 

202 elif isinstance(arg, (int, float)): 

203 return arg 

204 else: 

205 try: 

206 return int(arg) 

207 except ValueError: 

208 try: 

209 return float(arg) 

210 except ValueError: 

211 return None 

212 

213 @signature({'types': ['array', 'string']}, {'types': []}) 

214 def _func_contains(self, subject, search): 

215 return search in subject 

216 

217 @signature({'types': ['string', 'array', 'object']}) 

218 def _func_length(self, arg): 

219 return len(arg) 

220 

221 @signature({'types': ['string']}, {'types': ['string']}) 

222 def _func_ends_with(self, search, suffix): 

223 return search.endswith(suffix) 

224 

225 @signature({'types': ['string']}, {'types': ['string']}) 

226 def _func_starts_with(self, search, suffix): 

227 return search.startswith(suffix) 

228 

229 @signature({'types': ['array', 'string']}) 

230 def _func_reverse(self, arg): 

231 if isinstance(arg, STRING_TYPE): 

232 return arg[::-1] 

233 else: 

234 return list(reversed(arg)) 

235 

236 @signature({"types": ['number']}) 

237 def _func_ceil(self, arg): 

238 return math.ceil(arg) 

239 

240 @signature({"types": ['number']}) 

241 def _func_floor(self, arg): 

242 return math.floor(arg) 

243 

244 @signature({"types": ['string']}, {"types": ['array-string']}) 

245 def _func_join(self, separator, array): 

246 return separator.join(array) 

247 

248 @signature({'types': ['expref']}, {'types': ['array']}) 

249 def _func_map(self, expref, arg): 

250 result = [] 

251 for element in arg: 

252 result.append(expref.visit(expref.expression, element)) 

253 return result 

254 

255 @signature({"types": ['array-number', 'array-string']}) 

256 def _func_max(self, arg): 

257 if arg: 

258 return max(arg) 

259 else: 

260 return None 

261 

262 @signature({"types": ["object"], "variadic": True}) 

263 def _func_merge(self, *arguments): 

264 merged = {} 

265 for arg in arguments: 

266 merged.update(arg) 

267 return merged 

268 

269 @signature({"types": ['array-number', 'array-string']}) 

270 def _func_min(self, arg): 

271 if arg: 

272 return min(arg) 

273 else: 

274 return None 

275 

276 @signature({"types": ['array-string', 'array-number']}) 

277 def _func_sort(self, arg): 

278 return list(sorted(arg)) 

279 

280 @signature({"types": ['array-number']}) 

281 def _func_sum(self, arg): 

282 return sum(arg) 

283 

284 @signature({"types": ['object']}) 

285 def _func_keys(self, arg): 

286 # To be consistent with .values() 

287 # should we also return the indices of a list? 

288 return list(arg.keys()) 

289 

290 @signature({"types": ['object']}) 

291 def _func_values(self, arg): 

292 return list(arg.values()) 

293 

294 @signature({'types': []}) 

295 def _func_type(self, arg): 

296 if isinstance(arg, STRING_TYPE): 

297 return "string" 

298 elif isinstance(arg, bool): 

299 return "boolean" 

300 elif isinstance(arg, list): 

301 return "array" 

302 elif isinstance(arg, dict): 

303 return "object" 

304 elif isinstance(arg, (float, int)): 

305 return "number" 

306 elif arg is None: 

307 return "null" 

308 

309 @signature({'types': ['array']}, {'types': ['expref']}) 

310 def _func_sort_by(self, array, expref): 

311 if not array: 

312 return array 

313 # sort_by allows for the expref to be either a number of 

314 # a string, so we have some special logic to handle this. 

315 # We evaluate the first array element and verify that it's 

316 # either a string of a number. We then create a key function 

317 # that validates that type, which requires that remaining array 

318 # elements resolve to the same type as the first element. 

319 required_type = self._convert_to_jmespath_type( 

320 type(expref.visit(expref.expression, array[0])).__name__) 

321 if required_type not in ['number', 'string']: 

322 raise exceptions.JMESPathTypeError( 

323 'sort_by', array[0], required_type, ['string', 'number']) 

324 keyfunc = self._create_key_func(expref, 

325 [required_type], 

326 'sort_by') 

327 return list(sorted(array, key=keyfunc)) 

328 

329 @signature({'types': ['array']}, {'types': ['expref']}) 

330 def _func_min_by(self, array, expref): 

331 keyfunc = self._create_key_func(expref, 

332 ['number', 'string'], 

333 'min_by') 

334 if array: 

335 return min(array, key=keyfunc) 

336 else: 

337 return None 

338 

339 @signature({'types': ['array']}, {'types': ['expref']}) 

340 def _func_max_by(self, array, expref): 

341 keyfunc = self._create_key_func(expref, 

342 ['number', 'string'], 

343 'max_by') 

344 if array: 

345 return max(array, key=keyfunc) 

346 else: 

347 return None 

348 

349 def _create_key_func(self, expref, allowed_types, function_name): 

350 def keyfunc(x): 

351 result = expref.visit(expref.expression, x) 

352 actual_typename = type(result).__name__ 

353 jmespath_type = self._convert_to_jmespath_type(actual_typename) 

354 # allowed_types is in term of jmespath types, not python types. 

355 if jmespath_type not in allowed_types: 

356 raise exceptions.JMESPathTypeError( 

357 function_name, result, jmespath_type, allowed_types) 

358 return result 

359 return keyfunc 

360 

361 def _convert_to_jmespath_type(self, pyobject): 

362 return TYPES_MAP.get(pyobject, 'unknown')