Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/referencing/jsonschema.py: 80%
212 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:30 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:30 +0000
1"""
2Referencing implementations for JSON Schema specs (historic & current).
3"""
5from __future__ import annotations
7from collections.abc import Sequence, Set
8from typing import Any, Iterable, Union
10from referencing import Anchor, Registry, Resource, Specification, exceptions
11from referencing._attrs import frozen
12from referencing._core import Resolved as _Resolved, Resolver as _Resolver
13from referencing.typing import URI, Anchor as AnchorType, Mapping
#: A JSON Schema which is a JSON object (a mapping with string keys)
ObjectSchema = Mapping[str, Any]

#: A JSON Schema of any kind: either a boolean or an object schema
Schema = Union[bool, ObjectSchema]

#: A JSON Schema Registry, i.e. a `Registry` parametrized over `Schema`
SchemaRegistry = Registry[Schema]
@frozen
class UnknownDialect(Exception):
    """
    A dialect identifier was found for a dialect unknown by this library.

    If it's a custom ("unofficial") dialect, be sure you've registered it.
    """

    #: The dialect identifier which could not be matched to a specification
    uri: URI
36def _dollar_id(contents: Schema) -> URI | None:
37 if isinstance(contents, bool):
38 return
39 return contents.get("$id")
42def _legacy_dollar_id(contents: Schema) -> URI | None:
43 if isinstance(contents, bool) or "$ref" in contents:
44 return
45 id = contents.get("$id")
46 if id is not None and not id.startswith("#"):
47 return id
50def _legacy_id(contents: ObjectSchema) -> URI | None:
51 if "$ref" in contents:
52 return
53 id = contents.get("id")
54 if id is not None and not id.startswith("#"):
55 return id
58def _anchor(
59 specification: Specification[Schema],
60 contents: Schema,
61) -> Iterable[AnchorType[Schema]]:
62 if isinstance(contents, bool):
63 return
64 anchor = contents.get("$anchor")
65 if anchor is not None:
66 yield Anchor(
67 name=anchor,
68 resource=specification.create_resource(contents),
69 )
71 dynamic_anchor = contents.get("$dynamicAnchor")
72 if dynamic_anchor is not None:
73 yield DynamicAnchor(
74 name=dynamic_anchor,
75 resource=specification.create_resource(contents),
76 )
79def _anchor_2019(
80 specification: Specification[Schema],
81 contents: Schema,
82) -> Iterable[Anchor[Schema]]:
83 if isinstance(contents, bool):
84 return []
85 anchor = contents.get("$anchor")
86 if anchor is None:
87 return []
88 return [
89 Anchor(
90 name=anchor,
91 resource=specification.create_resource(contents),
92 ),
93 ]
96def _legacy_anchor_in_dollar_id(
97 specification: Specification[Schema],
98 contents: Schema,
99) -> Iterable[Anchor[Schema]]:
100 if isinstance(contents, bool):
101 return []
102 id = contents.get("$id", "")
103 if not id.startswith("#"):
104 return []
105 return [
106 Anchor(
107 name=id[1:],
108 resource=specification.create_resource(contents),
109 ),
110 ]
113def _legacy_anchor_in_id(
114 specification: Specification[ObjectSchema],
115 contents: ObjectSchema,
116) -> Iterable[Anchor[ObjectSchema]]:
117 id = contents.get("id", "")
118 if not id.startswith("#"):
119 return []
120 return [
121 Anchor(
122 name=id[1:],
123 resource=specification.create_resource(contents),
124 ),
125 ]
128def _subresources_of(
129 in_value: Set[str] = frozenset(),
130 in_subvalues: Set[str] = frozenset(),
131 in_subarray: Set[str] = frozenset(),
132):
133 """
134 Create a callable returning JSON Schema specification-style subschemas.
136 Relies on specifying the set of keywords containing subschemas in their
137 values, in a subobject's values, or in a subarray.
138 """
140 def subresources_of(contents: Schema) -> Iterable[ObjectSchema]:
141 if isinstance(contents, bool):
142 return
143 for each in in_value:
144 if each in contents:
145 yield contents[each]
146 for each in in_subarray:
147 if each in contents:
148 yield from contents[each]
149 for each in in_subvalues:
150 if each in contents:
151 yield from contents[each].values()
153 return subresources_of
156def _subresources_of_with_crazy_items(
157 in_value: Set[str] = frozenset(),
158 in_subvalues: Set[str] = frozenset(),
159 in_subarray: Set[str] = frozenset(),
160):
161 """
162 Specifically handle older drafts where there are some funky keywords.
163 """
165 def subresources_of(contents: Schema) -> Iterable[ObjectSchema]:
166 if isinstance(contents, bool):
167 return
168 for each in in_value:
169 if each in contents:
170 yield contents[each]
171 for each in in_subarray:
172 if each in contents:
173 yield from contents[each]
174 for each in in_subvalues:
175 if each in contents:
176 yield from contents[each].values()
178 items = contents.get("items")
179 if items is not None:
180 if isinstance(items, Sequence):
181 yield from items
182 else:
183 yield items
185 return subresources_of
188def _subresources_of_with_crazy_items_dependencies(
189 in_value: Set[str] = frozenset(),
190 in_subvalues: Set[str] = frozenset(),
191 in_subarray: Set[str] = frozenset(),
192):
193 """
194 Specifically handle older drafts where there are some funky keywords.
195 """
197 def subresources_of(contents: Schema) -> Iterable[ObjectSchema]:
198 if isinstance(contents, bool):
199 return
200 for each in in_value:
201 if each in contents:
202 yield contents[each]
203 for each in in_subarray:
204 if each in contents:
205 yield from contents[each]
206 for each in in_subvalues:
207 if each in contents:
208 yield from contents[each].values()
210 items = contents.get("items")
211 if items is not None:
212 if isinstance(items, Sequence):
213 yield from items
214 else:
215 yield items
216 dependencies = contents.get("dependencies")
217 if dependencies is not None:
218 values = iter(dependencies.values())
219 value = next(values, None)
220 if isinstance(value, Mapping):
221 yield value
222 yield from values
224 return subresources_of
227def _subresources_of_with_crazy_aP_items_dependencies(
228 in_value: Set[str] = frozenset(),
229 in_subvalues: Set[str] = frozenset(),
230 in_subarray: Set[str] = frozenset(),
231):
232 """
233 Specifically handle even older drafts where there are some funky keywords.
234 """
236 def subresources_of(contents: ObjectSchema) -> Iterable[ObjectSchema]:
237 for each in in_value:
238 if each in contents:
239 yield contents[each]
240 for each in in_subarray:
241 if each in contents:
242 yield from contents[each]
243 for each in in_subvalues:
244 if each in contents:
245 yield from contents[each].values()
247 items = contents.get("items")
248 if items is not None:
249 if isinstance(items, Sequence):
250 yield from items
251 else:
252 yield items
253 dependencies = contents.get("dependencies")
254 if dependencies is not None:
255 values = iter(dependencies.values())
256 value = next(values, None)
257 if isinstance(value, Mapping):
258 yield value
259 yield from values
261 for each in "additionalItems", "additionalProperties":
262 value = contents.get(each)
263 if isinstance(value, Mapping):
264 yield value
266 return subresources_of
269def _maybe_in_subresource(
270 in_value: Set[str] = frozenset(),
271 in_subvalues: Set[str] = frozenset(),
272 in_subarray: Set[str] = frozenset(),
273):
274 in_child = in_subvalues | in_subarray
276 def maybe_in_subresource(
277 segments: Sequence[int | str],
278 resolver: _Resolver[Any],
279 subresource: Resource[Any],
280 ) -> _Resolver[Any]:
281 _segments = iter(segments)
282 for segment in _segments:
283 if segment not in in_value and (
284 segment not in in_child or next(_segments, None) is None
285 ):
286 return resolver
287 return resolver.in_subresource(subresource)
289 return maybe_in_subresource
292def _maybe_in_subresource_crazy_items(
293 in_value: Set[str] = frozenset(),
294 in_subvalues: Set[str] = frozenset(),
295 in_subarray: Set[str] = frozenset(),
296):
297 in_child = in_subvalues | in_subarray
299 def maybe_in_subresource(
300 segments: Sequence[int | str],
301 resolver: _Resolver[Any],
302 subresource: Resource[Any],
303 ) -> _Resolver[Any]:
304 _segments = iter(segments)
305 for segment in _segments:
306 if segment == "items" and isinstance(
307 subresource.contents,
308 Mapping,
309 ):
310 return resolver.in_subresource(subresource)
311 if segment not in in_value and (
312 segment not in in_child or next(_segments, None) is None
313 ):
314 return resolver
315 return resolver.in_subresource(subresource)
317 return maybe_in_subresource
320def _maybe_in_subresource_crazy_items_dependencies(
321 in_value: Set[str] = frozenset(),
322 in_subvalues: Set[str] = frozenset(),
323 in_subarray: Set[str] = frozenset(),
324):
325 in_child = in_subvalues | in_subarray
327 def maybe_in_subresource(
328 segments: Sequence[int | str],
329 resolver: _Resolver[Any],
330 subresource: Resource[Any],
331 ) -> _Resolver[Any]:
332 _segments = iter(segments)
333 for segment in _segments:
334 if (
335 segment == "items" or segment == "dependencies"
336 ) and isinstance(subresource.contents, Mapping):
337 return resolver.in_subresource(subresource)
338 if segment not in in_value and (
339 segment not in in_child or next(_segments, None) is None
340 ):
341 return resolver
342 return resolver.in_subresource(subresource)
344 return maybe_in_subresource
#: JSON Schema draft 2020-12
DRAFT202012 = Specification(
    name="draft2020-12",
    id_of=_dollar_id,
    anchors_in=_anchor,
    # Keywords are grouped by where their subschemas live: directly in the
    # keyword's value, in an array, or in the values of a sub-object.
    subresources_of=_subresources_of(
        in_value={
            "additionalProperties",
            "contains",
            "contentSchema",
            "else",
            "if",
            "items",
            "not",
            "propertyNames",
            "then",
            "unevaluatedItems",
            "unevaluatedProperties",
        },
        in_subarray={"allOf", "anyOf", "oneOf", "prefixItems"},
        in_subvalues={
            "$defs",
            "dependentSchemas",
            "patternProperties",
            "properties",
        },
    ),
    # The same grouping of keywords, for the subresource check.
    maybe_in_subresource=_maybe_in_subresource(
        in_value={
            "additionalProperties",
            "contains",
            "contentSchema",
            "else",
            "if",
            "items",
            "not",
            "propertyNames",
            "then",
            "unevaluatedItems",
            "unevaluatedProperties",
        },
        in_subarray={"allOf", "anyOf", "oneOf", "prefixItems"},
        in_subvalues={
            "$defs",
            "dependentSchemas",
            "patternProperties",
            "properties",
        },
    ),
)
#: JSON Schema draft 2019-09
DRAFT201909 = Specification(
    name="draft2019-09",
    id_of=_dollar_id,
    anchors_in=_anchor_2019,
    # ``items`` is absent from these sets since the "crazy items" helpers
    # handle it themselves.
    subresources_of=_subresources_of_with_crazy_items(
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "contentSchema",
            "else",
            "if",
            "not",
            "propertyNames",
            "then",
            "unevaluatedItems",
            "unevaluatedProperties",
        },
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={
            "$defs",
            "dependentSchemas",
            "patternProperties",
            "properties",
        },
    ),
    # The same grouping of keywords, for the subresource check.
    maybe_in_subresource=_maybe_in_subresource_crazy_items(
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "contentSchema",
            "else",
            "if",
            "not",
            "propertyNames",
            "then",
            "unevaluatedItems",
            "unevaluatedProperties",
        },
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={
            "$defs",
            "dependentSchemas",
            "patternProperties",
            "properties",
        },
    ),
)
#: JSON Schema draft 7
DRAFT7 = Specification(
    name="draft-07",
    id_of=_legacy_dollar_id,
    anchors_in=_legacy_anchor_in_dollar_id,
    # ``items`` and ``dependencies`` are absent from these sets since the
    # helpers used here handle them themselves.
    subresources_of=_subresources_of_with_crazy_items_dependencies(
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "else",
            "if",
            "not",
            "propertyNames",
            "then",
        },
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
    # The same grouping of keywords, for the subresource check.
    maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "else",
            "if",
            "not",
            "propertyNames",
            "then",
        },
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
)
#: JSON Schema draft 6
DRAFT6 = Specification(
    name="draft-06",
    id_of=_legacy_dollar_id,
    anchors_in=_legacy_anchor_in_dollar_id,
    # ``items`` and ``dependencies`` are absent from these sets since the
    # helpers used here handle them themselves.
    subresources_of=_subresources_of_with_crazy_items_dependencies(
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "not",
            "propertyNames",
        },
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
    # The same grouping of keywords, for the subresource check.
    maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "not",
            "propertyNames",
        },
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
)
#: JSON Schema draft 4
DRAFT4 = Specification(
    name="draft-04",
    id_of=_legacy_id,
    anchors_in=_legacy_anchor_in_id,
    # ``additionalItems`` / ``additionalProperties`` are not listed for
    # ``subresources_of`` since the helper used here handles them itself.
    subresources_of=_subresources_of_with_crazy_aP_items_dependencies(
        in_value={"not"},
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
    maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
        in_value={"additionalItems", "additionalProperties", "not"},
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
)
#: JSON Schema draft 3
DRAFT3 = Specification(
    name="draft-03",
    id_of=_legacy_id,
    anchors_in=_legacy_anchor_in_id,
    # ``additionalItems`` / ``additionalProperties`` are not listed for
    # ``subresources_of`` since the helper used here handles them itself.
    subresources_of=_subresources_of_with_crazy_aP_items_dependencies(
        in_subarray={"extends"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
    maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
        in_value={"additionalItems", "additionalProperties"},
        in_subarray={"extends"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
)
# Maps each known dialect identifier (its meta-schema URI, without a
# trailing "#") to an opaque resource wrapping the matching specification.
_SPECIFICATIONS: Registry[Specification[Schema]] = Registry(
    {  # type: ignore[reportGeneralTypeIssues]  # :/ internal vs external types
        uri: Resource.opaque(spec)
        for uri, spec in (
            ("https://json-schema.org/draft/2020-12/schema", DRAFT202012),
            ("https://json-schema.org/draft/2019-09/schema", DRAFT201909),
            ("http://json-schema.org/draft-07/schema", DRAFT7),
            ("http://json-schema.org/draft-06/schema", DRAFT6),
            ("http://json-schema.org/draft-04/schema", DRAFT4),
            ("http://json-schema.org/draft-03/schema", DRAFT3),
        )
    },
)
def specification_with(
    dialect_id: URI,
    default: Specification[Any] = None,  # type: ignore[reportGeneralTypeIssues]  # noqa: E501
) -> Specification[Any]:
    """
    Look up the `Specification` registered for a dialect identifier.

    Any trailing ``#`` characters on ``dialect_id`` are ignored during the
    lookup. If the dialect is not known, ``default`` is returned instead,
    when one was given.

    Raises:

        `UnknownDialect`

            if the given ``dialect_id`` isn't known and no ``default``
            was provided (or it was `None`)
    """
    known = _SPECIFICATIONS.get(dialect_id.rstrip("#"))
    if known is None:
        if default is None:  # type: ignore[reportUnnecessaryComparison]
            raise UnknownDialect(dialect_id)
        return default
    return known.contents
@frozen
class DynamicAnchor:
    """
    A ``$dynamicAnchor``, as introduced in draft 2020.
    """

    #: The name given to the anchor
    name: str
    #: The resource on which the anchor was declared
    resource: Resource[Schema]

    def resolve(self, resolver: _Resolver[Schema]) -> _Resolved[Schema]:
        """
        Resolve this anchor against the resolver's dynamic scope.

        Every entry of the dynamic scope is asked for an anchor with this
        anchor's name. The resource of the last entry which has a dynamic
        anchor by that name is used; if no entry has one, this anchor's
        own resource is used.
        """
        target = self.resource
        for uri, registry in resolver.dynamic_scope():
            try:
                found = registry.anchor(uri, self.name).value
            except exceptions.NoSuchAnchor:
                # This scope entry has no anchor by that name.
                pass
            else:
                if isinstance(found, DynamicAnchor):
                    target = found.resource
        return _Resolved(
            contents=target.contents,
            resolver=resolver.in_subresource(target),
        )
def lookup_recursive_ref(resolver: _Resolver[Schema]) -> _Resolved[Schema]:
    """
    Resolve a recursive reference, a feature present only in draft 2019.

    As per the 2019 specification (§ 8.2.4.2.1), only the ``#`` recursive
    reference is supported (and is therefore assumed to be the relevant
    reference).

    The ``#`` reference is looked up first. If its target has a truthy
    ``$recursiveAnchor``, the dynamic scope is then walked for as long as
    its entries also have one, and the last such entry is returned.
    """

    def has_recursive_anchor(candidate: _Resolved[Schema]) -> bool:
        contents = candidate.contents
        return isinstance(contents, Mapping) and bool(
            contents.get("$recursiveAnchor"),
        )

    current = resolver.lookup("#")
    if not has_recursive_anchor(current):
        return current

    for uri, _ in resolver.dynamic_scope():
        candidate = resolver.lookup(uri)
        if not has_recursive_anchor(candidate):
            # The chain of recursive anchors ends here.
            break
        current = candidate
    return current