Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/referencing/jsonschema.py: 28%
215 statements
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-22 06:29 +0000
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-22 06:29 +0000
1"""
2Referencing implementations for JSON Schema specs (historic & current).
3"""
5from __future__ import annotations
7from collections.abc import Sequence, Set
8from typing import Any, Iterable, Union
10from referencing import Anchor, Registry, Resource, Specification, exceptions
11from referencing._attrs import frozen
12from referencing._core import _UNSET # type: ignore[reportPrivateUsage]
13from referencing._core import _Unset # type: ignore[reportPrivateUsage]
14from referencing._core import Resolved as _Resolved, Resolver as _Resolver
15from referencing.typing import URI, Anchor as AnchorType, Mapping
17#: A JSON Schema which is a JSON object
18ObjectSchema = Mapping[str, Any]
20#: A JSON Schema of any kind
21Schema = Union[bool, ObjectSchema]
23#: A JSON Schema Registry
24SchemaRegistry = Registry[Schema]
26#: The empty JSON Schema Registry
27EMPTY_REGISTRY: SchemaRegistry = Registry()
30@frozen
31class UnknownDialect(Exception):
32 """
33 A dialect identifier was found for a dialect unknown by this library.
35 If it's a custom ("unofficial") dialect, be sure you've registered it.
36 """
38 uri: URI
41def _dollar_id(contents: Schema) -> URI | None:
42 if isinstance(contents, bool):
43 return
44 return contents.get("$id")
47def _legacy_dollar_id(contents: Schema) -> URI | None:
48 if isinstance(contents, bool) or "$ref" in contents:
49 return
50 id = contents.get("$id")
51 if id is not None and not id.startswith("#"):
52 return id
55def _legacy_id(contents: ObjectSchema) -> URI | None:
56 if "$ref" in contents:
57 return
58 id = contents.get("id")
59 if id is not None and not id.startswith("#"):
60 return id
63def _anchor(
64 specification: Specification[Schema],
65 contents: Schema,
66) -> Iterable[AnchorType[Schema]]:
67 if isinstance(contents, bool):
68 return
69 anchor = contents.get("$anchor")
70 if anchor is not None:
71 yield Anchor(
72 name=anchor,
73 resource=specification.create_resource(contents),
74 )
76 dynamic_anchor = contents.get("$dynamicAnchor")
77 if dynamic_anchor is not None:
78 yield DynamicAnchor(
79 name=dynamic_anchor,
80 resource=specification.create_resource(contents),
81 )
84def _anchor_2019(
85 specification: Specification[Schema],
86 contents: Schema,
87) -> Iterable[Anchor[Schema]]:
88 if isinstance(contents, bool):
89 return []
90 anchor = contents.get("$anchor")
91 if anchor is None:
92 return []
93 return [
94 Anchor(
95 name=anchor,
96 resource=specification.create_resource(contents),
97 ),
98 ]
101def _legacy_anchor_in_dollar_id(
102 specification: Specification[Schema],
103 contents: Schema,
104) -> Iterable[Anchor[Schema]]:
105 if isinstance(contents, bool):
106 return []
107 id = contents.get("$id", "")
108 if not id.startswith("#"):
109 return []
110 return [
111 Anchor(
112 name=id[1:],
113 resource=specification.create_resource(contents),
114 ),
115 ]
118def _legacy_anchor_in_id(
119 specification: Specification[ObjectSchema],
120 contents: ObjectSchema,
121) -> Iterable[Anchor[ObjectSchema]]:
122 id = contents.get("id", "")
123 if not id.startswith("#"):
124 return []
125 return [
126 Anchor(
127 name=id[1:],
128 resource=specification.create_resource(contents),
129 ),
130 ]
133def _subresources_of(
134 in_value: Set[str] = frozenset(),
135 in_subvalues: Set[str] = frozenset(),
136 in_subarray: Set[str] = frozenset(),
137):
138 """
139 Create a callable returning JSON Schema specification-style subschemas.
141 Relies on specifying the set of keywords containing subschemas in their
142 values, in a subobject's values, or in a subarray.
143 """
145 def subresources_of(contents: Schema) -> Iterable[ObjectSchema]:
146 if isinstance(contents, bool):
147 return
148 for each in in_value:
149 if each in contents:
150 yield contents[each]
151 for each in in_subarray:
152 if each in contents:
153 yield from contents[each]
154 for each in in_subvalues:
155 if each in contents:
156 yield from contents[each].values()
158 return subresources_of
161def _subresources_of_with_crazy_items(
162 in_value: Set[str] = frozenset(),
163 in_subvalues: Set[str] = frozenset(),
164 in_subarray: Set[str] = frozenset(),
165):
166 """
167 Specifically handle older drafts where there are some funky keywords.
168 """
170 def subresources_of(contents: Schema) -> Iterable[ObjectSchema]:
171 if isinstance(contents, bool):
172 return
173 for each in in_value:
174 if each in contents:
175 yield contents[each]
176 for each in in_subarray:
177 if each in contents:
178 yield from contents[each]
179 for each in in_subvalues:
180 if each in contents:
181 yield from contents[each].values()
183 items = contents.get("items")
184 if items is not None:
185 if isinstance(items, Sequence):
186 yield from items
187 else:
188 yield items
190 return subresources_of
193def _subresources_of_with_crazy_items_dependencies(
194 in_value: Set[str] = frozenset(),
195 in_subvalues: Set[str] = frozenset(),
196 in_subarray: Set[str] = frozenset(),
197):
198 """
199 Specifically handle older drafts where there are some funky keywords.
200 """
202 def subresources_of(contents: Schema) -> Iterable[ObjectSchema]:
203 if isinstance(contents, bool):
204 return
205 for each in in_value:
206 if each in contents:
207 yield contents[each]
208 for each in in_subarray:
209 if each in contents:
210 yield from contents[each]
211 for each in in_subvalues:
212 if each in contents:
213 yield from contents[each].values()
215 items = contents.get("items")
216 if items is not None:
217 if isinstance(items, Sequence):
218 yield from items
219 else:
220 yield items
221 dependencies = contents.get("dependencies")
222 if dependencies is not None:
223 values = iter(dependencies.values())
224 value = next(values, None)
225 if isinstance(value, Mapping):
226 yield value
227 yield from values
229 return subresources_of
232def _subresources_of_with_crazy_aP_items_dependencies(
233 in_value: Set[str] = frozenset(),
234 in_subvalues: Set[str] = frozenset(),
235 in_subarray: Set[str] = frozenset(),
236):
237 """
238 Specifically handle even older drafts where there are some funky keywords.
239 """
241 def subresources_of(contents: ObjectSchema) -> Iterable[ObjectSchema]:
242 for each in in_value:
243 if each in contents:
244 yield contents[each]
245 for each in in_subarray:
246 if each in contents:
247 yield from contents[each]
248 for each in in_subvalues:
249 if each in contents:
250 yield from contents[each].values()
252 items = contents.get("items")
253 if items is not None:
254 if isinstance(items, Sequence):
255 yield from items
256 else:
257 yield items
258 dependencies = contents.get("dependencies")
259 if dependencies is not None:
260 values = iter(dependencies.values())
261 value = next(values, None)
262 if isinstance(value, Mapping):
263 yield value
264 yield from values
266 for each in "additionalItems", "additionalProperties":
267 value = contents.get(each)
268 if isinstance(value, Mapping):
269 yield value
271 return subresources_of
274def _maybe_in_subresource(
275 in_value: Set[str] = frozenset(),
276 in_subvalues: Set[str] = frozenset(),
277 in_subarray: Set[str] = frozenset(),
278):
279 in_child = in_subvalues | in_subarray
281 def maybe_in_subresource(
282 segments: Sequence[int | str],
283 resolver: _Resolver[Any],
284 subresource: Resource[Any],
285 ) -> _Resolver[Any]:
286 _segments = iter(segments)
287 for segment in _segments:
288 if segment not in in_value and (
289 segment not in in_child or next(_segments, None) is None
290 ):
291 return resolver
292 return resolver.in_subresource(subresource)
294 return maybe_in_subresource
297def _maybe_in_subresource_crazy_items(
298 in_value: Set[str] = frozenset(),
299 in_subvalues: Set[str] = frozenset(),
300 in_subarray: Set[str] = frozenset(),
301):
302 in_child = in_subvalues | in_subarray
304 def maybe_in_subresource(
305 segments: Sequence[int | str],
306 resolver: _Resolver[Any],
307 subresource: Resource[Any],
308 ) -> _Resolver[Any]:
309 _segments = iter(segments)
310 for segment in _segments:
311 if segment == "items" and isinstance(
312 subresource.contents,
313 Mapping,
314 ):
315 return resolver.in_subresource(subresource)
316 if segment not in in_value and (
317 segment not in in_child or next(_segments, None) is None
318 ):
319 return resolver
320 return resolver.in_subresource(subresource)
322 return maybe_in_subresource
325def _maybe_in_subresource_crazy_items_dependencies(
326 in_value: Set[str] = frozenset(),
327 in_subvalues: Set[str] = frozenset(),
328 in_subarray: Set[str] = frozenset(),
329):
330 in_child = in_subvalues | in_subarray
332 def maybe_in_subresource(
333 segments: Sequence[int | str],
334 resolver: _Resolver[Any],
335 subresource: Resource[Any],
336 ) -> _Resolver[Any]:
337 _segments = iter(segments)
338 for segment in _segments:
339 if segment in {"items", "dependencies"} and isinstance(
340 subresource.contents,
341 Mapping,
342 ):
343 return resolver.in_subresource(subresource)
344 if segment not in in_value and (
345 segment not in in_child or next(_segments, None) is None
346 ):
347 return resolver
348 return resolver.in_subresource(subresource)
350 return maybe_in_subresource
353#: JSON Schema draft 2020-12
354DRAFT202012 = Specification(
355 name="draft2020-12",
356 id_of=_dollar_id,
357 subresources_of=_subresources_of(
358 in_value={
359 "additionalProperties",
360 "contains",
361 "contentSchema",
362 "else",
363 "if",
364 "items",
365 "not",
366 "propertyNames",
367 "then",
368 "unevaluatedItems",
369 "unevaluatedProperties",
370 },
371 in_subarray={"allOf", "anyOf", "oneOf", "prefixItems"},
372 in_subvalues={
373 "$defs",
374 "dependentSchemas",
375 "patternProperties",
376 "properties",
377 },
378 ),
379 anchors_in=_anchor,
380 maybe_in_subresource=_maybe_in_subresource(
381 in_value={
382 "additionalProperties",
383 "contains",
384 "contentSchema",
385 "else",
386 "if",
387 "items",
388 "not",
389 "propertyNames",
390 "then",
391 "unevaluatedItems",
392 "unevaluatedProperties",
393 },
394 in_subarray={"allOf", "anyOf", "oneOf", "prefixItems"},
395 in_subvalues={
396 "$defs",
397 "dependentSchemas",
398 "patternProperties",
399 "properties",
400 },
401 ),
402)
403#: JSON Schema draft 2019-09
404DRAFT201909 = Specification(
405 name="draft2019-09",
406 id_of=_dollar_id,
407 subresources_of=_subresources_of_with_crazy_items(
408 in_value={
409 "additionalItems",
410 "additionalProperties",
411 "contains",
412 "contentSchema",
413 "else",
414 "if",
415 "not",
416 "propertyNames",
417 "then",
418 "unevaluatedItems",
419 "unevaluatedProperties",
420 },
421 in_subarray={"allOf", "anyOf", "oneOf"},
422 in_subvalues={
423 "$defs",
424 "dependentSchemas",
425 "patternProperties",
426 "properties",
427 },
428 ),
429 anchors_in=_anchor_2019, # type: ignore[reportGeneralTypeIssues] TODO: check whether this is real
430 maybe_in_subresource=_maybe_in_subresource_crazy_items(
431 in_value={
432 "additionalItems",
433 "additionalProperties",
434 "contains",
435 "contentSchema",
436 "else",
437 "if",
438 "not",
439 "propertyNames",
440 "then",
441 "unevaluatedItems",
442 "unevaluatedProperties",
443 },
444 in_subarray={"allOf", "anyOf", "oneOf"},
445 in_subvalues={
446 "$defs",
447 "dependentSchemas",
448 "patternProperties",
449 "properties",
450 },
451 ),
452)
453#: JSON Schema draft 7
454DRAFT7 = Specification(
455 name="draft-07",
456 id_of=_legacy_dollar_id,
457 subresources_of=_subresources_of_with_crazy_items_dependencies(
458 in_value={
459 "additionalItems",
460 "additionalProperties",
461 "contains",
462 "else",
463 "if",
464 "not",
465 "propertyNames",
466 "then",
467 },
468 in_subarray={"allOf", "anyOf", "oneOf"},
469 in_subvalues={"definitions", "patternProperties", "properties"},
470 ),
471 anchors_in=_legacy_anchor_in_dollar_id, # type: ignore[reportGeneralTypeIssues] TODO: check whether this is real
472 maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
473 in_value={
474 "additionalItems",
475 "additionalProperties",
476 "contains",
477 "else",
478 "if",
479 "not",
480 "propertyNames",
481 "then",
482 },
483 in_subarray={"allOf", "anyOf", "oneOf"},
484 in_subvalues={"definitions", "patternProperties", "properties"},
485 ),
486)
487#: JSON Schema draft 6
488DRAFT6 = Specification(
489 name="draft-06",
490 id_of=_legacy_dollar_id,
491 subresources_of=_subresources_of_with_crazy_items_dependencies(
492 in_value={
493 "additionalItems",
494 "additionalProperties",
495 "contains",
496 "not",
497 "propertyNames",
498 },
499 in_subarray={"allOf", "anyOf", "oneOf"},
500 in_subvalues={"definitions", "patternProperties", "properties"},
501 ),
502 anchors_in=_legacy_anchor_in_dollar_id, # type: ignore[reportGeneralTypeIssues] TODO: check whether this is real
503 maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
504 in_value={
505 "additionalItems",
506 "additionalProperties",
507 "contains",
508 "not",
509 "propertyNames",
510 },
511 in_subarray={"allOf", "anyOf", "oneOf"},
512 in_subvalues={"definitions", "patternProperties", "properties"},
513 ),
514)
515#: JSON Schema draft 4
516DRAFT4 = Specification(
517 name="draft-04",
518 id_of=_legacy_id,
519 subresources_of=_subresources_of_with_crazy_aP_items_dependencies(
520 in_value={"not"},
521 in_subarray={"allOf", "anyOf", "oneOf"},
522 in_subvalues={"definitions", "patternProperties", "properties"},
523 ),
524 anchors_in=_legacy_anchor_in_id,
525 maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
526 in_value={"additionalItems", "additionalProperties", "not"},
527 in_subarray={"allOf", "anyOf", "oneOf"},
528 in_subvalues={"definitions", "patternProperties", "properties"},
529 ),
530)
531#: JSON Schema draft 3
532DRAFT3 = Specification(
533 name="draft-03",
534 id_of=_legacy_id,
535 subresources_of=_subresources_of_with_crazy_aP_items_dependencies(
536 in_subarray={"extends"},
537 in_subvalues={"definitions", "patternProperties", "properties"},
538 ),
539 anchors_in=_legacy_anchor_in_id,
540 maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
541 in_value={"additionalItems", "additionalProperties"},
542 in_subarray={"extends"},
543 in_subvalues={"definitions", "patternProperties", "properties"},
544 ),
545)
548_SPECIFICATIONS: Registry[Specification[Schema]] = Registry(
549 { # type: ignore[reportGeneralTypeIssues] # :/ internal vs external types
550 dialect_id: Resource.opaque(specification)
551 for dialect_id, specification in [
552 ("https://json-schema.org/draft/2020-12/schema", DRAFT202012),
553 ("https://json-schema.org/draft/2019-09/schema", DRAFT201909),
554 ("http://json-schema.org/draft-07/schema", DRAFT7),
555 ("http://json-schema.org/draft-06/schema", DRAFT6),
556 ("http://json-schema.org/draft-04/schema", DRAFT4),
557 ("http://json-schema.org/draft-03/schema", DRAFT3),
558 ]
559 },
560)
563def specification_with(
564 dialect_id: URI,
565 default: Specification[Any] | _Unset = _UNSET,
566) -> Specification[Any]:
567 """
568 Retrieve the `Specification` with the given dialect identifier.
570 Raises:
572 `UnknownDialect`
574 if the given ``dialect_id`` isn't known
575 """
576 resource = _SPECIFICATIONS.get(dialect_id.rstrip("#"))
577 if resource is not None:
578 return resource.contents
579 if default is _UNSET:
580 raise UnknownDialect(dialect_id)
581 return default
584@frozen
585class DynamicAnchor:
586 """
587 Dynamic anchors, introduced in draft 2020.
588 """
590 name: str
591 resource: Resource[Schema]
593 def resolve(self, resolver: _Resolver[Schema]) -> _Resolved[Schema]:
594 """
595 Resolve this anchor dynamically.
596 """
597 last = self.resource
598 for uri, registry in resolver.dynamic_scope():
599 try:
600 anchor = registry.anchor(uri, self.name).value
601 except exceptions.NoSuchAnchor:
602 continue
603 if isinstance(anchor, DynamicAnchor):
604 last = anchor.resource
605 return _Resolved(
606 contents=last.contents,
607 resolver=resolver.in_subresource(last),
608 )
611def lookup_recursive_ref(resolver: _Resolver[Schema]) -> _Resolved[Schema]:
612 """
613 Recursive references (via recursive anchors), present only in draft 2019.
615 As per the 2019 specification (§ 8.2.4.2.1), only the ``#`` recursive
616 reference is supported (and is therefore assumed to be the relevant
617 reference).
618 """
619 resolved = resolver.lookup("#")
620 if isinstance(resolved.contents, Mapping) and resolved.contents.get(
621 "$recursiveAnchor",
622 ):
623 for uri, _ in resolver.dynamic_scope():
624 next_resolved = resolver.lookup(uri)
625 if not isinstance(
626 next_resolved.contents,
627 Mapping,
628 ) or not next_resolved.contents.get("$recursiveAnchor"):
629 break
630 resolved = next_resolved
631 return resolved