Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/referencing/jsonschema.py: 75%
215 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1"""
2Referencing implementations for JSON Schema specs (historic & current).
3"""
5from __future__ import annotations
7from collections.abc import Sequence, Set
8from typing import Any, Iterable, Union
10from referencing import Anchor, Registry, Resource, Specification, exceptions
11from referencing._attrs import frozen
12from referencing._core import _UNSET # type: ignore[reportPrivateUsage]
13from referencing._core import _Unset # type: ignore[reportPrivateUsage]
14from referencing._core import Resolved as _Resolved, Resolver as _Resolver
15from referencing.typing import URI, Anchor as AnchorType, Mapping
17#: A JSON Schema which is a JSON object
18ObjectSchema = Mapping[str, Any]
20#: A JSON Schema of any kind
21Schema = Union[bool, ObjectSchema]
23#: A JSON Schema Registry
24SchemaRegistry = Registry[Schema]
26#: The empty JSON Schema Registry
27EMPTY_REGISTRY: SchemaRegistry = Registry()
30@frozen
31class UnknownDialect(Exception):
32 """
33 A dialect identifier was found for a dialect unknown by this library.
35 If it's a custom ("unofficial") dialect, be sure you've registered it.
36 """
38 uri: URI
41def _dollar_id(contents: Schema) -> URI | None:
42 if isinstance(contents, bool):
43 return
44 return contents.get("$id")
47def _legacy_dollar_id(contents: Schema) -> URI | None:
48 if isinstance(contents, bool) or "$ref" in contents:
49 return
50 id = contents.get("$id")
51 if id is not None and not id.startswith("#"):
52 return id
55def _legacy_id(contents: ObjectSchema) -> URI | None:
56 if "$ref" in contents:
57 return
58 id = contents.get("id")
59 if id is not None and not id.startswith("#"):
60 return id
63def _anchor(
64 specification: Specification[Schema],
65 contents: Schema,
66) -> Iterable[AnchorType[Schema]]:
67 if isinstance(contents, bool):
68 return
69 anchor = contents.get("$anchor")
70 if anchor is not None:
71 yield Anchor(
72 name=anchor,
73 resource=specification.create_resource(contents),
74 )
76 dynamic_anchor = contents.get("$dynamicAnchor")
77 if dynamic_anchor is not None:
78 yield DynamicAnchor(
79 name=dynamic_anchor,
80 resource=specification.create_resource(contents),
81 )
84def _anchor_2019(
85 specification: Specification[Schema],
86 contents: Schema,
87) -> Iterable[Anchor[Schema]]:
88 if isinstance(contents, bool):
89 return []
90 anchor = contents.get("$anchor")
91 if anchor is None:
92 return []
93 return [
94 Anchor(
95 name=anchor,
96 resource=specification.create_resource(contents),
97 ),
98 ]
101def _legacy_anchor_in_dollar_id(
102 specification: Specification[Schema],
103 contents: Schema,
104) -> Iterable[Anchor[Schema]]:
105 if isinstance(contents, bool):
106 return []
107 id = contents.get("$id", "")
108 if not id.startswith("#"):
109 return []
110 return [
111 Anchor(
112 name=id[1:],
113 resource=specification.create_resource(contents),
114 ),
115 ]
118def _legacy_anchor_in_id(
119 specification: Specification[ObjectSchema],
120 contents: ObjectSchema,
121) -> Iterable[Anchor[ObjectSchema]]:
122 id = contents.get("id", "")
123 if not id.startswith("#"):
124 return []
125 return [
126 Anchor(
127 name=id[1:],
128 resource=specification.create_resource(contents),
129 ),
130 ]
133def _subresources_of(
134 in_value: Set[str] = frozenset(),
135 in_subvalues: Set[str] = frozenset(),
136 in_subarray: Set[str] = frozenset(),
137):
138 """
139 Create a callable returning JSON Schema specification-style subschemas.
141 Relies on specifying the set of keywords containing subschemas in their
142 values, in a subobject's values, or in a subarray.
143 """
145 def subresources_of(contents: Schema) -> Iterable[ObjectSchema]:
146 if isinstance(contents, bool):
147 return
148 for each in in_value:
149 if each in contents:
150 yield contents[each]
151 for each in in_subarray:
152 if each in contents:
153 yield from contents[each]
154 for each in in_subvalues:
155 if each in contents:
156 yield from contents[each].values()
158 return subresources_of
161def _subresources_of_with_crazy_items(
162 in_value: Set[str] = frozenset(),
163 in_subvalues: Set[str] = frozenset(),
164 in_subarray: Set[str] = frozenset(),
165):
166 """
167 Specifically handle older drafts where there are some funky keywords.
168 """
170 def subresources_of(contents: Schema) -> Iterable[ObjectSchema]:
171 if isinstance(contents, bool):
172 return
173 for each in in_value:
174 if each in contents:
175 yield contents[each]
176 for each in in_subarray:
177 if each in contents:
178 yield from contents[each]
179 for each in in_subvalues:
180 if each in contents:
181 yield from contents[each].values()
183 items = contents.get("items")
184 if items is not None:
185 if isinstance(items, Sequence):
186 yield from items
187 else:
188 yield items
190 return subresources_of
193def _subresources_of_with_crazy_items_dependencies(
194 in_value: Set[str] = frozenset(),
195 in_subvalues: Set[str] = frozenset(),
196 in_subarray: Set[str] = frozenset(),
197):
198 """
199 Specifically handle older drafts where there are some funky keywords.
200 """
202 def subresources_of(contents: Schema) -> Iterable[ObjectSchema]:
203 if isinstance(contents, bool):
204 return
205 for each in in_value:
206 if each in contents:
207 yield contents[each]
208 for each in in_subarray:
209 if each in contents:
210 yield from contents[each]
211 for each in in_subvalues:
212 if each in contents:
213 yield from contents[each].values()
215 items = contents.get("items")
216 if items is not None:
217 if isinstance(items, Sequence):
218 yield from items
219 else:
220 yield items
221 dependencies = contents.get("dependencies")
222 if dependencies is not None:
223 values = iter(dependencies.values())
224 value = next(values, None)
225 if isinstance(value, Mapping):
226 yield value
227 yield from values
229 return subresources_of
232def _subresources_of_with_crazy_aP_items_dependencies(
233 in_value: Set[str] = frozenset(),
234 in_subvalues: Set[str] = frozenset(),
235 in_subarray: Set[str] = frozenset(),
236):
237 """
238 Specifically handle even older drafts where there are some funky keywords.
239 """
241 def subresources_of(contents: ObjectSchema) -> Iterable[ObjectSchema]:
242 for each in in_value:
243 if each in contents:
244 yield contents[each]
245 for each in in_subarray:
246 if each in contents:
247 yield from contents[each]
248 for each in in_subvalues:
249 if each in contents:
250 yield from contents[each].values()
252 items = contents.get("items")
253 if items is not None:
254 if isinstance(items, Sequence):
255 yield from items
256 else:
257 yield items
258 dependencies = contents.get("dependencies")
259 if dependencies is not None:
260 values = iter(dependencies.values())
261 value = next(values, None)
262 if isinstance(value, Mapping):
263 yield value
264 yield from values
266 for each in "additionalItems", "additionalProperties":
267 value = contents.get(each)
268 if isinstance(value, Mapping):
269 yield value
271 return subresources_of
274def _maybe_in_subresource(
275 in_value: Set[str] = frozenset(),
276 in_subvalues: Set[str] = frozenset(),
277 in_subarray: Set[str] = frozenset(),
278):
279 in_child = in_subvalues | in_subarray
281 def maybe_in_subresource(
282 segments: Sequence[int | str],
283 resolver: _Resolver[Any],
284 subresource: Resource[Any],
285 ) -> _Resolver[Any]:
286 _segments = iter(segments)
287 for segment in _segments:
288 if segment not in in_value and (
289 segment not in in_child or next(_segments, None) is None
290 ):
291 return resolver
292 return resolver.in_subresource(subresource)
294 return maybe_in_subresource
297def _maybe_in_subresource_crazy_items(
298 in_value: Set[str] = frozenset(),
299 in_subvalues: Set[str] = frozenset(),
300 in_subarray: Set[str] = frozenset(),
301):
302 in_child = in_subvalues | in_subarray
304 def maybe_in_subresource(
305 segments: Sequence[int | str],
306 resolver: _Resolver[Any],
307 subresource: Resource[Any],
308 ) -> _Resolver[Any]:
309 _segments = iter(segments)
310 for segment in _segments:
311 if segment == "items" and isinstance(
312 subresource.contents,
313 Mapping,
314 ):
315 return resolver.in_subresource(subresource)
316 if segment not in in_value and (
317 segment not in in_child or next(_segments, None) is None
318 ):
319 return resolver
320 return resolver.in_subresource(subresource)
322 return maybe_in_subresource
325def _maybe_in_subresource_crazy_items_dependencies(
326 in_value: Set[str] = frozenset(),
327 in_subvalues: Set[str] = frozenset(),
328 in_subarray: Set[str] = frozenset(),
329):
330 in_child = in_subvalues | in_subarray
332 def maybe_in_subresource(
333 segments: Sequence[int | str],
334 resolver: _Resolver[Any],
335 subresource: Resource[Any],
336 ) -> _Resolver[Any]:
337 _segments = iter(segments)
338 for segment in _segments:
339 if (
340 segment == "items" or segment == "dependencies"
341 ) and isinstance(subresource.contents, Mapping):
342 return resolver.in_subresource(subresource)
343 if segment not in in_value and (
344 segment not in in_child or next(_segments, None) is None
345 ):
346 return resolver
347 return resolver.in_subresource(subresource)
349 return maybe_in_subresource
352#: JSON Schema draft 2020-12
353DRAFT202012 = Specification(
354 name="draft2020-12",
355 id_of=_dollar_id,
356 subresources_of=_subresources_of(
357 in_value={
358 "additionalProperties",
359 "contains",
360 "contentSchema",
361 "else",
362 "if",
363 "items",
364 "not",
365 "propertyNames",
366 "then",
367 "unevaluatedItems",
368 "unevaluatedProperties",
369 },
370 in_subarray={"allOf", "anyOf", "oneOf", "prefixItems"},
371 in_subvalues={
372 "$defs",
373 "dependentSchemas",
374 "patternProperties",
375 "properties",
376 },
377 ),
378 anchors_in=_anchor,
379 maybe_in_subresource=_maybe_in_subresource(
380 in_value={
381 "additionalProperties",
382 "contains",
383 "contentSchema",
384 "else",
385 "if",
386 "items",
387 "not",
388 "propertyNames",
389 "then",
390 "unevaluatedItems",
391 "unevaluatedProperties",
392 },
393 in_subarray={"allOf", "anyOf", "oneOf", "prefixItems"},
394 in_subvalues={
395 "$defs",
396 "dependentSchemas",
397 "patternProperties",
398 "properties",
399 },
400 ),
401)
402#: JSON Schema draft 2019-09
403DRAFT201909 = Specification(
404 name="draft2019-09",
405 id_of=_dollar_id,
406 subresources_of=_subresources_of_with_crazy_items(
407 in_value={
408 "additionalItems",
409 "additionalProperties",
410 "contains",
411 "contentSchema",
412 "else",
413 "if",
414 "not",
415 "propertyNames",
416 "then",
417 "unevaluatedItems",
418 "unevaluatedProperties",
419 },
420 in_subarray={"allOf", "anyOf", "oneOf"},
421 in_subvalues={
422 "$defs",
423 "dependentSchemas",
424 "patternProperties",
425 "properties",
426 },
427 ),
428 anchors_in=_anchor_2019, # type: ignore[reportGeneralTypeIssues] TODO: check whether this is real
429 maybe_in_subresource=_maybe_in_subresource_crazy_items(
430 in_value={
431 "additionalItems",
432 "additionalProperties",
433 "contains",
434 "contentSchema",
435 "else",
436 "if",
437 "not",
438 "propertyNames",
439 "then",
440 "unevaluatedItems",
441 "unevaluatedProperties",
442 },
443 in_subarray={"allOf", "anyOf", "oneOf"},
444 in_subvalues={
445 "$defs",
446 "dependentSchemas",
447 "patternProperties",
448 "properties",
449 },
450 ),
451)
452#: JSON Schema draft 7
453DRAFT7 = Specification(
454 name="draft-07",
455 id_of=_legacy_dollar_id,
456 subresources_of=_subresources_of_with_crazy_items_dependencies(
457 in_value={
458 "additionalItems",
459 "additionalProperties",
460 "contains",
461 "else",
462 "if",
463 "not",
464 "propertyNames",
465 "then",
466 },
467 in_subarray={"allOf", "anyOf", "oneOf"},
468 in_subvalues={"definitions", "patternProperties", "properties"},
469 ),
470 anchors_in=_legacy_anchor_in_dollar_id, # type: ignore[reportGeneralTypeIssues] TODO: check whether this is real
471 maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
472 in_value={
473 "additionalItems",
474 "additionalProperties",
475 "contains",
476 "else",
477 "if",
478 "not",
479 "propertyNames",
480 "then",
481 },
482 in_subarray={"allOf", "anyOf", "oneOf"},
483 in_subvalues={"definitions", "patternProperties", "properties"},
484 ),
485)
486#: JSON Schema draft 6
487DRAFT6 = Specification(
488 name="draft-06",
489 id_of=_legacy_dollar_id,
490 subresources_of=_subresources_of_with_crazy_items_dependencies(
491 in_value={
492 "additionalItems",
493 "additionalProperties",
494 "contains",
495 "not",
496 "propertyNames",
497 },
498 in_subarray={"allOf", "anyOf", "oneOf"},
499 in_subvalues={"definitions", "patternProperties", "properties"},
500 ),
501 anchors_in=_legacy_anchor_in_dollar_id, # type: ignore[reportGeneralTypeIssues] TODO: check whether this is real
502 maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
503 in_value={
504 "additionalItems",
505 "additionalProperties",
506 "contains",
507 "not",
508 "propertyNames",
509 },
510 in_subarray={"allOf", "anyOf", "oneOf"},
511 in_subvalues={"definitions", "patternProperties", "properties"},
512 ),
513)
514#: JSON Schema draft 4
515DRAFT4 = Specification(
516 name="draft-04",
517 id_of=_legacy_id,
518 subresources_of=_subresources_of_with_crazy_aP_items_dependencies(
519 in_value={"not"},
520 in_subarray={"allOf", "anyOf", "oneOf"},
521 in_subvalues={"definitions", "patternProperties", "properties"},
522 ),
523 anchors_in=_legacy_anchor_in_id,
524 maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
525 in_value={"additionalItems", "additionalProperties", "not"},
526 in_subarray={"allOf", "anyOf", "oneOf"},
527 in_subvalues={"definitions", "patternProperties", "properties"},
528 ),
529)
530#: JSON Schema draft 3
531DRAFT3 = Specification(
532 name="draft-03",
533 id_of=_legacy_id,
534 subresources_of=_subresources_of_with_crazy_aP_items_dependencies(
535 in_subarray={"extends"},
536 in_subvalues={"definitions", "patternProperties", "properties"},
537 ),
538 anchors_in=_legacy_anchor_in_id,
539 maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
540 in_value={"additionalItems", "additionalProperties"},
541 in_subarray={"extends"},
542 in_subvalues={"definitions", "patternProperties", "properties"},
543 ),
544)
547_SPECIFICATIONS: Registry[Specification[Schema]] = Registry(
548 { # type: ignore[reportGeneralTypeIssues] # :/ internal vs external types
549 dialect_id: Resource.opaque(specification)
550 for dialect_id, specification in [
551 ("https://json-schema.org/draft/2020-12/schema", DRAFT202012),
552 ("https://json-schema.org/draft/2019-09/schema", DRAFT201909),
553 ("http://json-schema.org/draft-07/schema", DRAFT7),
554 ("http://json-schema.org/draft-06/schema", DRAFT6),
555 ("http://json-schema.org/draft-04/schema", DRAFT4),
556 ("http://json-schema.org/draft-03/schema", DRAFT3),
557 ]
558 },
559)
562def specification_with(
563 dialect_id: URI,
564 default: Specification[Any] | _Unset = _UNSET,
565) -> Specification[Any]:
566 """
567 Retrieve the `Specification` with the given dialect identifier.
569 Raises:
571 `UnknownDialect`
573 if the given ``dialect_id`` isn't known
574 """
575 resource = _SPECIFICATIONS.get(dialect_id.rstrip("#"))
576 if resource is not None:
577 return resource.contents
578 if default is _UNSET:
579 raise UnknownDialect(dialect_id)
580 return default
583@frozen
584class DynamicAnchor:
585 """
586 Dynamic anchors, introduced in draft 2020.
587 """
589 name: str
590 resource: Resource[Schema]
592 def resolve(self, resolver: _Resolver[Schema]) -> _Resolved[Schema]:
593 """
594 Resolve this anchor dynamically.
595 """
596 last = self.resource
597 for uri, registry in resolver.dynamic_scope():
598 try:
599 anchor = registry.anchor(uri, self.name).value
600 except exceptions.NoSuchAnchor:
601 continue
602 if isinstance(anchor, DynamicAnchor):
603 last = anchor.resource
604 return _Resolved(
605 contents=last.contents,
606 resolver=resolver.in_subresource(last),
607 )
610def lookup_recursive_ref(resolver: _Resolver[Schema]) -> _Resolved[Schema]:
611 """
612 Recursive references (via recursive anchors), present only in draft 2019.
614 As per the 2019 specification (§ 8.2.4.2.1), only the ``#`` recursive
615 reference is supported (and is therefore assumed to be the relevant
616 reference).
617 """
618 resolved = resolver.lookup("#")
619 if isinstance(resolved.contents, Mapping) and resolved.contents.get(
620 "$recursiveAnchor",
621 ):
622 for uri, _ in resolver.dynamic_scope():
623 next_resolved = resolver.lookup(uri)
624 if not isinstance(
625 next_resolved.contents,
626 Mapping,
627 ) or not next_resolved.contents.get("$recursiveAnchor"):
628 break
629 resolved = next_resolved
630 return resolved