Coverage for /pythoncovmergedfiles/medio/medio/src/jsonschema/jsonschema/_utils.py: 18%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

189 statements  

1from collections.abc import Mapping, MutableMapping, Sequence 

2from urllib.parse import urlsplit 

3import re 

4 

# Private marker objects shared by every (possibly recursive) `_uniq_key`
# call. `_TRUE` / `_FALSE` are what `_uniq_key` substitutes for the booleans
# via `unbool`, and `_UNSUPPORTED` is what it returns when an element has no
# canonical hashable key. Keeping them at module level (instead of relying on
# function-default sentinels) makes that sharing explicit.
_TRUE, _FALSE, _UNSUPPORTED = (object() for _ in range(3))

11 

12 

class URIDict(MutableMapping):
    """
    Dictionary which uses normalized URIs as keys.

    Every key is passed through `normalize` before it reaches the underlying
    ``store`` dict, both for keys given to the constructor and for later
    item access, assignment and deletion.
    """

    def normalize(self, uri):
        """
        Return the canonical spelling of ``uri`` used as the storage key.
        """
        return urlsplit(uri).geturl()

    def __init__(self, *args, **kwargs):
        self.store = dict()
        # Go through `MutableMapping.update` (and therefore `__setitem__`)
        # rather than `self.store.update`, so that initial keys are
        # normalized as well. Otherwise an entry supplied here under a
        # non-canonical spelling could never be found again.
        self.update(*args, **kwargs)

    def __getitem__(self, uri):
        return self.store[self.normalize(uri)]

    def __setitem__(self, uri, value):
        self.store[self.normalize(uri)] = value

    def __delitem__(self, uri):
        del self.store[self.normalize(uri)]

    def __iter__(self):
        # Yields the normalized keys as stored.
        return iter(self.store)

    def __len__(self):  # pragma: no cover -- untested, but to be removed
        return len(self.store)

    def __repr__(self):  # pragma: no cover -- untested, but to be removed
        return repr(self.store)

42 

43 

class Unset:
    """
    Marker type standing for an attribute that has not been set yet, or for
    a default parameter that the caller did not provide.
    """

    def __repr__(self):  # pragma: no cover
        # Fixed text: instances carry no state worth showing.
        return "<unset>"

51 

52 

def format_as_index(container, indices):
    """
    Render ``container`` followed by one subscript per entry of ``indices``.

    For example, container ``bar`` with indices ``[1, 2, "foo"]`` gives
    ``bar[1][2]['foo']`` (each index is shown using its ``repr``).

    Arguments:

        container (str):

            A word to use for the thing being indexed

        indices (sequence):

            The indices to render, in order. When empty, ``container`` is
            returned unchanged.

    """
    if indices:
        subscripts = "][".join(map(repr, indices))
        return f"{container}[{subscripts}]"
    return container

73 

74 

def find_additional_properties(instance, schema):
    """
    Yield the properties of ``instance`` that count as additional.

    A property is skipped when it is listed under ``properties`` in the
    schema, or when it matches (via `re.search`) any of the
    ``patternProperties`` patterns. Everything else is yielded, in the
    iteration order of ``instance``.

    Assumes ``instance`` is dict-like already.
    """
    declared = schema.get("properties", {})
    # All patterns are combined into one alternation so each property needs
    # a single search.
    combined_pattern = "|".join(schema.get("patternProperties", {}))

    for name in instance:
        if name in declared:
            continue
        if combined_pattern and re.search(combined_pattern, name):
            continue
        yield name

91 

92 

def extras_msg(extras):
    """
    Build the pieces of an error message about extra items or properties.

    Returns a 2-tuple: the ``repr`` of every extra joined with ``", "``, and
    the verb agreeing with their count (``"was"`` for exactly one extra,
    ``"were"`` otherwise).
    """
    verb = "were" if len(extras) != 1 else "was"
    listing = ", ".join(map(repr, extras))
    return listing, verb

99 

100 

def ensure_list(thing):
    """
    Return ``[thing]`` when ``thing`` is a single str.

    Any other value is handed back unchanged (the same object).
    """
    return [thing] if isinstance(thing, str) else thing

110 

111 

def _mapping_equal(one, two):
    """
    Compare two mappings using the semantics of `equal` for their values.

    They are equal when they have the same number of entries and every key
    of ``one`` is present in ``two`` with an `equal` value.
    """
    if len(one) != len(two):
        return False
    for key, value in one.items():
        if key not in two or not equal(value, two[key]):
            return False
    return True

122 

123 

def _sequence_equal(one, two):
    """
    Compare two sequences using the semantics of `equal` for their items.

    They are equal when they have the same length and items at the same
    position are pairwise `equal`.
    """
    if len(one) != len(two):
        return False
    for left, right in zip(one, two):
        if not equal(left, right):
            return False
    return True

131 

132 

def equal(one, two):
    """
    Compare two things while evading some Python type hierarchy semantics.

    Specifically, as needed for JSON Schema, `bool` inheriting from `int`
    is evaded (so ``True`` is not equal to ``1``), and the comparison
    recurses into sequences and mappings to apply the same rule to their
    contents.
    """
    if one is two:
        return True

    both = (one, two)
    # Strings are sequences too, but are compared as plain values.
    if any(isinstance(each, str) for each in both):
        return one == two
    if all(isinstance(each, Sequence) for each in both):
        return _sequence_equal(one, two)
    if all(isinstance(each, Mapping) for each in both):
        return _mapping_equal(one, two)
    return unbool(one) == unbool(two)

149 

150 

def unbool(element, true=object(), false=object()):
    """
    A hack to make True and 1 and False and 0 unique for ``uniq``.

    ``True`` is replaced by ``true`` and ``False`` by ``false``; anything
    else (including ``1`` and ``0``) is returned untouched. The default
    stand-ins are created once, when the function is defined, so separate
    calls return the same objects.
    """
    for boolean, stand_in in ((True, true), (False, false)):
        if element is boolean:
            return stand_in
    return element

160 

161 

def _uniq_key(element):
    """
    Build a hashable key for ``element`` that is compatible with `equal`.

    The key is a tagged tuple -- ``("scalar", value)``,
    ``("sequence", (key, ...))`` or ``("mapping", frozenset(...))`` -- so
    that values of different shapes cannot collide.

    Returns `_UNSUPPORTED` when an element cannot be canonically hashed.
    """
    # A value that differs from itself (NaN) can never be deduplicated by
    # hashing. The self-comparison may also raise for some custom container
    # equality implementations (e.g. mappings containing unhashable keys);
    # both situations are left to the brute force fallback.
    try:
        if element != element:  # noqa: PLR0124 -- NaN detection
            return _UNSUPPORTED
    except TypeError:
        return _UNSUPPORTED

    marked = unbool(element, true=_TRUE, false=_FALSE)

    # Strings are sequences too, but are keyed as scalars further down.
    if isinstance(marked, Sequence) and not isinstance(marked, str):
        member_keys = []
        for member in marked:
            member_key = _uniq_key(member)
            if member_key is _UNSUPPORTED:
                return _UNSUPPORTED
            member_keys.append(member_key)
        return "sequence", tuple(member_keys)

    if isinstance(marked, Mapping):
        pairs = []
        for name, value in marked.items():
            value_key = _uniq_key(value)
            if value_key is _UNSUPPORTED:
                return _UNSUPPORTED
            pairs.append((name, value_key))
        try:
            return "mapping", frozenset(pairs)
        except TypeError:
            # Unhashable mapping key -- fall back to brute force.
            return _UNSUPPORTED

    try:
        hash(marked)
    except TypeError:
        return _UNSUPPORTED
    else:
        return "scalar", marked

210 

211 

def uniq(container):
    """
    Check if all of a container's elements are unique.

    Tries to use a structural hash compatible with `equal`, falling back to
    brute force when necessary.

    Elements for which `_uniq_key` yields a key are deduplicated through a
    set. Elements it cannot key are compared with `equal` against every
    element seen so far, and every later keyed element is in turn compared
    against them. That way a duplicate pair made of one hashable and one
    unhashable value (for instance a ``frozenset`` and an equal ``set``) is
    still detected.

    Returns ``True`` when no two elements are `equal`, ``False`` otherwise.
    """
    seen_keys = set()
    keyed = []  # elements represented in `seen_keys`
    unsupported = []  # elements without a canonical key

    for element in container:
        key = _uniq_key(element)

        if key is _UNSUPPORTED:
            # No key: brute force against everything seen so far.
            if any(equal(previous, element) for previous in unsupported):
                return False
            if any(equal(previous, element) for previous in keyed):
                return False
            unsupported.append(element)
            continue

        if key in seen_keys:
            return False

        # The key set cannot know about un-keyable elements, so compare
        # against them explicitly (a no-op when there are none).
        if any(equal(previous, element) for previous in unsupported):
            return False

        seen_keys.add(key)
        keyed.append(element)

    return True

238 

239 

def find_evaluated_item_indexes_by_schema(validator, instance, schema):
    """
    Collect the indexes of ``instance`` that get evaluated under ``schema``.

    Covers all keywords related to unevaluatedItems: items, prefixItems, if,
    then, else, contains, unevaluatedItems, allOf, oneOf, anyOf, as well as
    schemas reached through ``$ref`` / ``$dynamicRef``.

    The returned list is not deduplicated: an index may appear several
    times when more than one keyword evaluates it.
    """
    if validator.is_type(schema, "boolean"):
        return []

    # When ``items`` is present every index is reported and the remaining
    # keywords are not consulted.
    if "items" in schema:
        return list(range(len(instance)))

    found = []

    # Follow references first, evaluating the referenced schema with a
    # validator evolved onto the resolved contents and resolver.
    for reference_keyword in ("$ref", "$dynamicRef"):
        reference = schema.get(reference_keyword)
        if reference is None:
            continue
        resolved = validator._resolver.lookup(reference)
        referenced_validator = validator.evolve(
            schema=resolved.contents,
            _resolver=resolved.resolver,
        )
        found.extend(
            find_evaluated_item_indexes_by_schema(
                referenced_validator,
                instance,
                resolved.contents,
            ),
        )

    if "prefixItems" in schema:
        found.extend(range(len(schema["prefixItems"])))

    if "if" in schema:
        condition = schema["if"]
        if validator.evolve(schema=condition).is_valid(instance):
            found.extend(
                find_evaluated_item_indexes_by_schema(
                    validator, instance, condition,
                ),
            )
            if "then" in schema:
                found.extend(
                    find_evaluated_item_indexes_by_schema(
                        validator, instance, schema["then"],
                    ),
                )
        elif "else" in schema:
            found.extend(
                find_evaluated_item_indexes_by_schema(
                    validator, instance, schema["else"],
                ),
            )

    # For these keywords an index counts as evaluated when the item at that
    # index is valid under the keyword's subschema.
    for keyword in ("contains", "unevaluatedItems"):
        if keyword not in schema:
            continue
        for index, item in enumerate(instance):
            if validator.evolve(schema=schema[keyword]).is_valid(item):
                found.append(index)

    # Only subschemas that produce no error for the whole instance
    # contribute their evaluated indexes.
    for keyword in ("allOf", "oneOf", "anyOf"):
        if keyword not in schema:
            continue
        for subschema in schema[keyword]:
            first_error = next(validator.descend(instance, subschema), None)
            if first_error is None:
                found.extend(
                    find_evaluated_item_indexes_by_schema(
                        validator, instance, subschema,
                    ),
                )

    return found

315 

316 

def find_evaluated_property_keys_by_schema(validator, instance, schema):
    """
    Collect the keys of ``instance`` that get evaluated under ``schema``.

    Covers all keywords related to unevaluatedProperties: properties,
    additionalProperties, unevaluatedProperties, patternProperties,
    dependentSchemas, allOf, oneOf, anyOf, if, then, else, as well as
    schemas reached through ``$ref`` / ``$dynamicRef``.

    The returned list is not deduplicated: a key may appear several times
    when more than one keyword (or pattern) evaluates it.
    """
    if validator.is_type(schema, "boolean"):
        return []

    found = []

    # Follow references first, evaluating the referenced schema with a
    # validator evolved onto the resolved contents and resolver.
    for reference_keyword in ("$ref", "$dynamicRef"):
        reference = schema.get(reference_keyword)
        if reference is None:
            continue
        resolved = validator._resolver.lookup(reference)
        referenced_validator = validator.evolve(
            schema=resolved.contents,
            _resolver=resolved.resolver,
        )
        found.extend(
            find_evaluated_property_keys_by_schema(
                referenced_validator,
                instance,
                resolved.contents,
            ),
        )

    declared = schema.get("properties")
    if validator.is_type(declared, "object"):
        found.extend(declared.keys() & instance.keys())

    # A key counts as evaluated by these keywords when its value produces
    # no error under the keyword's subschema.
    for keyword in ("additionalProperties", "unevaluatedProperties"):
        subschema = schema.get(keyword)
        if subschema is None:
            continue
        for key, value in instance.items():
            if is_valid(validator.descend(value, subschema)):
                found.append(key)

    if "patternProperties" in schema:
        # One entry is recorded per matching pattern.
        found.extend(
            name
            for name in instance
            for pattern in schema["patternProperties"]
            if re.search(pattern, name)
        )

    if "dependentSchemas" in schema:
        for trigger, subschema in schema["dependentSchemas"].items():
            if trigger in instance:
                found.extend(
                    find_evaluated_property_keys_by_schema(
                        validator, instance, subschema,
                    ),
                )

    # Only subschemas that produce no error for the whole instance
    # contribute their evaluated keys.
    for keyword in ("allOf", "oneOf", "anyOf"):
        for subschema in schema.get(keyword, []):
            if is_valid(validator.descend(instance, subschema)):
                found.extend(
                    find_evaluated_property_keys_by_schema(
                        validator, instance, subschema,
                    ),
                )

    if "if" in schema:
        condition = schema["if"]
        if validator.evolve(schema=condition).is_valid(instance):
            found.extend(
                find_evaluated_property_keys_by_schema(
                    validator, instance, condition,
                ),
            )
            if "then" in schema:
                found.extend(
                    find_evaluated_property_keys_by_schema(
                        validator, instance, schema["then"],
                    ),
                )
        elif "else" in schema:
            found.extend(
                find_evaluated_property_keys_by_schema(
                    validator, instance, schema["else"],
                ),
            )

    return found

407 

408 

def is_valid(errs_it):
    """
    Whether there are no errors in the given iterator.

    At most one item is consumed from ``errs_it``.
    """
    first_error = next(errs_it, None)
    return first_error is None