1"""
2Referencing implementations for JSON Schema specs (historic & current).
3"""
4
5from __future__ import annotations
6
7from collections.abc import Sequence, Set
8from typing import Any, Iterable, Union
9
10from referencing import Anchor, Registry, Resource, Specification, exceptions
11from referencing._attrs import frozen
12from referencing._core import (
13 _UNSET, # type: ignore[reportPrivateUsage]
14 Resolved as _Resolved,
15 Resolver as _Resolver,
16 _Unset, # type: ignore[reportPrivateUsage]
17)
18from referencing.typing import URI, Anchor as AnchorType, Mapping
19
#: A JSON Schema which is a JSON object (a mapping keyed by strings)
ObjectSchema = Mapping[str, Any]

#: A JSON Schema of any kind: either a boolean or an object schema
Schema = Union[bool, ObjectSchema]

#: A Resource whose contents are JSON Schemas
SchemaResource = Resource[Schema]

#: A JSON Schema Registry (a `Registry` whose resources contain `Schema`\ s)
SchemaRegistry = Registry[Schema]

#: The empty JSON Schema Registry
EMPTY_REGISTRY: SchemaRegistry = Registry()
34
35
@frozen
class UnknownDialect(Exception):
    """
    A dialect identifier was found for a dialect unknown by this library.

    If it's a custom ("unofficial") dialect, be sure you've registered it.

    Raised by `specification_with` when no default is provided.
    """

    # The dialect identifier for which no specification is known.
    uri: URI
45
46
47def _dollar_id(contents: Schema) -> URI | None:
48 if isinstance(contents, bool):
49 return
50 return contents.get("$id")
51
52
53def _legacy_dollar_id(contents: Schema) -> URI | None:
54 if isinstance(contents, bool) or "$ref" in contents:
55 return
56 id = contents.get("$id")
57 if id is not None and not id.startswith("#"):
58 return id
59
60
61def _legacy_id(contents: ObjectSchema) -> URI | None:
62 if "$ref" in contents:
63 return
64 id = contents.get("id")
65 if id is not None and not id.startswith("#"):
66 return id
67
68
69def _anchor(
70 specification: Specification[Schema],
71 contents: Schema,
72) -> Iterable[AnchorType[Schema]]:
73 if isinstance(contents, bool):
74 return
75 anchor = contents.get("$anchor")
76 if anchor is not None:
77 yield Anchor(
78 name=anchor,
79 resource=specification.create_resource(contents),
80 )
81
82 dynamic_anchor = contents.get("$dynamicAnchor")
83 if dynamic_anchor is not None:
84 yield DynamicAnchor(
85 name=dynamic_anchor,
86 resource=specification.create_resource(contents),
87 )
88
89
90def _anchor_2019(
91 specification: Specification[Schema],
92 contents: Schema,
93) -> Iterable[Anchor[Schema]]:
94 if isinstance(contents, bool):
95 return []
96 anchor = contents.get("$anchor")
97 if anchor is None:
98 return []
99 return [
100 Anchor(
101 name=anchor,
102 resource=specification.create_resource(contents),
103 ),
104 ]
105
106
107def _legacy_anchor_in_dollar_id(
108 specification: Specification[Schema],
109 contents: Schema,
110) -> Iterable[Anchor[Schema]]:
111 if isinstance(contents, bool):
112 return []
113 id = contents.get("$id", "")
114 if not id.startswith("#"):
115 return []
116 return [
117 Anchor(
118 name=id[1:],
119 resource=specification.create_resource(contents),
120 ),
121 ]
122
123
124def _legacy_anchor_in_id(
125 specification: Specification[ObjectSchema],
126 contents: ObjectSchema,
127) -> Iterable[Anchor[ObjectSchema]]:
128 id = contents.get("id", "")
129 if not id.startswith("#"):
130 return []
131 return [
132 Anchor(
133 name=id[1:],
134 resource=specification.create_resource(contents),
135 ),
136 ]
137
138
139def _subresources_of(
140 in_value: Set[str] = frozenset(),
141 in_subvalues: Set[str] = frozenset(),
142 in_subarray: Set[str] = frozenset(),
143):
144 """
145 Create a callable returning JSON Schema specification-style subschemas.
146
147 Relies on specifying the set of keywords containing subschemas in their
148 values, in a subobject's values, or in a subarray.
149 """
150
151 def subresources_of(contents: Schema) -> Iterable[ObjectSchema]:
152 if isinstance(contents, bool):
153 return
154 for each in in_value:
155 if each in contents:
156 yield contents[each]
157 for each in in_subarray:
158 if each in contents:
159 yield from contents[each]
160 for each in in_subvalues:
161 if each in contents:
162 yield from contents[each].values()
163
164 return subresources_of
165
166
167def _subresources_of_with_crazy_items(
168 in_value: Set[str] = frozenset(),
169 in_subvalues: Set[str] = frozenset(),
170 in_subarray: Set[str] = frozenset(),
171):
172 """
173 Specifically handle older drafts where there are some funky keywords.
174 """
175
176 def subresources_of(contents: Schema) -> Iterable[ObjectSchema]:
177 if isinstance(contents, bool):
178 return
179 for each in in_value:
180 if each in contents:
181 yield contents[each]
182 for each in in_subarray:
183 if each in contents:
184 yield from contents[each]
185 for each in in_subvalues:
186 if each in contents:
187 yield from contents[each].values()
188
189 items = contents.get("items")
190 if items is not None:
191 if isinstance(items, Sequence):
192 yield from items
193 else:
194 yield items
195
196 return subresources_of
197
198
199def _subresources_of_with_crazy_items_dependencies(
200 in_value: Set[str] = frozenset(),
201 in_subvalues: Set[str] = frozenset(),
202 in_subarray: Set[str] = frozenset(),
203):
204 """
205 Specifically handle older drafts where there are some funky keywords.
206 """
207
208 def subresources_of(contents: Schema) -> Iterable[ObjectSchema]:
209 if isinstance(contents, bool):
210 return
211 for each in in_value:
212 if each in contents:
213 yield contents[each]
214 for each in in_subarray:
215 if each in contents:
216 yield from contents[each]
217 for each in in_subvalues:
218 if each in contents:
219 yield from contents[each].values()
220
221 items = contents.get("items")
222 if items is not None:
223 if isinstance(items, Sequence):
224 yield from items
225 else:
226 yield items
227 dependencies = contents.get("dependencies")
228 if dependencies is not None:
229 values = iter(dependencies.values())
230 value = next(values, None)
231 if isinstance(value, Mapping):
232 yield value
233 yield from values
234
235 return subresources_of
236
237
238def _subresources_of_with_crazy_aP_items_dependencies(
239 in_value: Set[str] = frozenset(),
240 in_subvalues: Set[str] = frozenset(),
241 in_subarray: Set[str] = frozenset(),
242):
243 """
244 Specifically handle even older drafts where there are some funky keywords.
245 """
246
247 def subresources_of(contents: ObjectSchema) -> Iterable[ObjectSchema]:
248 for each in in_value:
249 if each in contents:
250 yield contents[each]
251 for each in in_subarray:
252 if each in contents:
253 yield from contents[each]
254 for each in in_subvalues:
255 if each in contents:
256 yield from contents[each].values()
257
258 items = contents.get("items")
259 if items is not None:
260 if isinstance(items, Sequence):
261 yield from items
262 else:
263 yield items
264 dependencies = contents.get("dependencies")
265 if dependencies is not None:
266 values = iter(dependencies.values())
267 value = next(values, None)
268 if isinstance(value, Mapping):
269 yield value
270 yield from values
271
272 for each in "additionalItems", "additionalProperties":
273 value = contents.get(each)
274 if isinstance(value, Mapping):
275 yield value
276
277 return subresources_of
278
279
280def _maybe_in_subresource(
281 in_value: Set[str] = frozenset(),
282 in_subvalues: Set[str] = frozenset(),
283 in_subarray: Set[str] = frozenset(),
284):
285 in_child = in_subvalues | in_subarray
286
287 def maybe_in_subresource(
288 segments: Sequence[int | str],
289 resolver: _Resolver[Any],
290 subresource: Resource[Any],
291 ) -> _Resolver[Any]:
292 _segments = iter(segments)
293 for segment in _segments:
294 if segment not in in_value and (
295 segment not in in_child or next(_segments, None) is None
296 ):
297 return resolver
298 return resolver.in_subresource(subresource)
299
300 return maybe_in_subresource
301
302
303def _maybe_in_subresource_crazy_items(
304 in_value: Set[str] = frozenset(),
305 in_subvalues: Set[str] = frozenset(),
306 in_subarray: Set[str] = frozenset(),
307):
308 in_child = in_subvalues | in_subarray
309
310 def maybe_in_subresource(
311 segments: Sequence[int | str],
312 resolver: _Resolver[Any],
313 subresource: Resource[Any],
314 ) -> _Resolver[Any]:
315 _segments = iter(segments)
316 for segment in _segments:
317 if segment == "items" and isinstance(
318 subresource.contents,
319 Mapping,
320 ):
321 return resolver.in_subresource(subresource)
322 if segment not in in_value and (
323 segment not in in_child or next(_segments, None) is None
324 ):
325 return resolver
326 return resolver.in_subresource(subresource)
327
328 return maybe_in_subresource
329
330
331def _maybe_in_subresource_crazy_items_dependencies(
332 in_value: Set[str] = frozenset(),
333 in_subvalues: Set[str] = frozenset(),
334 in_subarray: Set[str] = frozenset(),
335):
336 in_child = in_subvalues | in_subarray
337
338 def maybe_in_subresource(
339 segments: Sequence[int | str],
340 resolver: _Resolver[Any],
341 subresource: Resource[Any],
342 ) -> _Resolver[Any]:
343 _segments = iter(segments)
344 for segment in _segments:
345 if segment in {"items", "dependencies"} and isinstance(
346 subresource.contents,
347 Mapping,
348 ):
349 return resolver.in_subresource(subresource)
350 if segment not in in_value and (
351 segment not in in_child or next(_segments, None) is None
352 ):
353 return resolver
354 return resolver.in_subresource(subresource)
355
356 return maybe_in_subresource
357
358
#: JSON Schema draft 2020-12
#:
#: Schemas are identified via ``$id``, and declare anchors via ``$anchor``
#: and ``$dynamicAnchor``.
DRAFT202012 = Specification(
    name="draft2020-12",
    id_of=_dollar_id,
    subresources_of=_subresources_of(
        # Keywords whose value is itself a subschema.
        in_value={
            "additionalProperties",
            "contains",
            "contentSchema",
            "else",
            "if",
            "items",
            "not",
            "propertyNames",
            "then",
            "unevaluatedItems",
            "unevaluatedProperties",
        },
        # Keywords whose value is an array of subschemas.
        in_subarray={"allOf", "anyOf", "oneOf", "prefixItems"},
        # Keywords whose value is an object whose values are subschemas.
        in_subvalues={
            "$defs",
            "definitions",
            "dependentSchemas",
            "patternProperties",
            "properties",
        },
    ),
    anchors_in=_anchor,
    # The same keyword sets as above, here used to decide whether a sequence
    # of path segments leads into a subresource.
    maybe_in_subresource=_maybe_in_subresource(
        in_value={
            "additionalProperties",
            "contains",
            "contentSchema",
            "else",
            "if",
            "items",
            "not",
            "propertyNames",
            "then",
            "unevaluatedItems",
            "unevaluatedProperties",
        },
        in_subarray={"allOf", "anyOf", "oneOf", "prefixItems"},
        in_subvalues={
            "$defs",
            "definitions",
            "dependentSchemas",
            "patternProperties",
            "properties",
        },
    ),
)
#: JSON Schema draft 2019-09
#:
#: Schemas are identified via ``$id``, and declare anchors via ``$anchor``.
DRAFT201909 = Specification(
    name="draft2019-09",
    id_of=_dollar_id,
    # ``items`` is absent from the sets below as the "crazy items" helpers
    # handle it themselves (it may be one subschema or an array of them).
    subresources_of=_subresources_of_with_crazy_items(
        # Keywords whose value is itself a subschema.
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "contentSchema",
            "else",
            "if",
            "not",
            "propertyNames",
            "then",
            "unevaluatedItems",
            "unevaluatedProperties",
        },
        # Keywords whose value is an array of subschemas.
        in_subarray={"allOf", "anyOf", "oneOf"},
        # Keywords whose value is an object whose values are subschemas.
        in_subvalues={
            "$defs",
            "definitions",
            "dependentSchemas",
            "patternProperties",
            "properties",
        },
    ),
    anchors_in=_anchor_2019,  # type: ignore[reportGeneralTypeIssues]  # TODO: check whether this is real
    # The same keyword sets as above, here used to decide whether a sequence
    # of path segments leads into a subresource.
    maybe_in_subresource=_maybe_in_subresource_crazy_items(
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "contentSchema",
            "else",
            "if",
            "not",
            "propertyNames",
            "then",
            "unevaluatedItems",
            "unevaluatedProperties",
        },
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={
            "$defs",
            "definitions",
            "dependentSchemas",
            "patternProperties",
            "properties",
        },
    ),
)
#: JSON Schema draft 7
#:
#: Schemas are identified via ``$id`` (unless it starts with ``#``, in which
#: case it declares an anchor, or unless the schema contains ``$ref``).
DRAFT7 = Specification(
    name="draft-07",
    id_of=_legacy_dollar_id,
    # ``items`` and ``dependencies`` are absent from the sets below as the
    # "crazy items dependencies" helpers handle them themselves.
    subresources_of=_subresources_of_with_crazy_items_dependencies(
        # Keywords whose value is itself a subschema.
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "else",
            "if",
            "not",
            "propertyNames",
            "then",
        },
        # Keywords whose value is an array of subschemas.
        in_subarray={"allOf", "anyOf", "oneOf"},
        # Keywords whose value is an object whose values are subschemas.
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
    anchors_in=_legacy_anchor_in_dollar_id,  # type: ignore[reportGeneralTypeIssues]  # TODO: check whether this is real
    # The same keyword sets as above, here used to decide whether a sequence
    # of path segments leads into a subresource.
    maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "else",
            "if",
            "not",
            "propertyNames",
            "then",
        },
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
)
#: JSON Schema draft 6
#:
#: Schemas are identified via ``$id`` (unless it starts with ``#``, in which
#: case it declares an anchor, or unless the schema contains ``$ref``).
DRAFT6 = Specification(
    name="draft-06",
    id_of=_legacy_dollar_id,
    # ``items`` and ``dependencies`` are absent from the sets below as the
    # "crazy items dependencies" helpers handle them themselves.
    subresources_of=_subresources_of_with_crazy_items_dependencies(
        # Keywords whose value is itself a subschema.
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "not",
            "propertyNames",
        },
        # Keywords whose value is an array of subschemas.
        in_subarray={"allOf", "anyOf", "oneOf"},
        # Keywords whose value is an object whose values are subschemas.
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
    anchors_in=_legacy_anchor_in_dollar_id,  # type: ignore[reportGeneralTypeIssues]  # TODO: check whether this is real
    # The same keyword sets as above, here used to decide whether a sequence
    # of path segments leads into a subresource.
    maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "not",
            "propertyNames",
        },
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
)
#: JSON Schema draft 4
#:
#: Schemas are identified via ``id`` (unless it starts with ``#``, in which
#: case it declares an anchor, or unless the schema contains ``$ref``).
DRAFT4 = Specification(
    name="draft-04",
    id_of=_legacy_id,
    # ``items``, ``dependencies``, ``additionalItems`` and
    # ``additionalProperties`` are handled by the helper itself, hence their
    # absence from ``in_value`` here.
    subresources_of=_subresources_of_with_crazy_aP_items_dependencies(
        in_value={"not"},
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
    anchors_in=_legacy_anchor_in_id,
    # Unlike above, the ``additional*`` keywords are listed explicitly here,
    # as this helper only special-cases ``items`` and ``dependencies``.
    maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
        in_value={"additionalItems", "additionalProperties", "not"},
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
)
#: JSON Schema draft 3
#:
#: Schemas are identified via ``id`` (unless it starts with ``#``, in which
#: case it declares an anchor, or unless the schema contains ``$ref``).
DRAFT3 = Specification(
    name="draft-03",
    id_of=_legacy_id,
    # ``items``, ``dependencies``, ``additionalItems`` and
    # ``additionalProperties`` are handled by the helper itself; no
    # ``in_value`` keywords are configured for this draft.
    subresources_of=_subresources_of_with_crazy_aP_items_dependencies(
        in_subarray={"extends"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
    anchors_in=_legacy_anchor_in_id,
    # Unlike above, the ``additional*`` keywords are listed explicitly here,
    # as this helper only special-cases ``items`` and ``dependencies``.
    maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
        in_value={"additionalItems", "additionalProperties"},
        in_subarray={"extends"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
)
556
557
# All dialects known to this module, keyed by dialect identifier.
#
# Each specification is wrapped via `Resource.opaque` so that it can be
# stored in a `Registry`. The identifiers carry no trailing ``#``;
# `specification_with` strips one before looking a dialect up here.
_SPECIFICATIONS: Registry[Specification[Schema]] = Registry(
    {  # type: ignore[reportGeneralTypeIssues]  # :/ internal vs external types
        dialect_id: Resource.opaque(specification)
        for dialect_id, specification in [
            ("https://json-schema.org/draft/2020-12/schema", DRAFT202012),
            ("https://json-schema.org/draft/2019-09/schema", DRAFT201909),
            ("http://json-schema.org/draft-07/schema", DRAFT7),
            ("http://json-schema.org/draft-06/schema", DRAFT6),
            ("http://json-schema.org/draft-04/schema", DRAFT4),
            ("http://json-schema.org/draft-03/schema", DRAFT3),
        ]
    },
)
571
572
def specification_with(
    dialect_id: URI,
    default: Specification[Any] | _Unset = _UNSET,
) -> Specification[Any]:
    """
    Look up the `Specification` corresponding to a dialect identifier.

    Any trailing ``#`` characters on ``dialect_id`` are ignored for the
    lookup. When the dialect is unknown and a ``default`` was provided, the
    default is returned instead.

    Raises:

        `UnknownDialect`

            if the given ``dialect_id`` isn't known and no ``default``
            was provided

    """
    known = _SPECIFICATIONS.get(dialect_id.rstrip("#"))
    if known is None:
        if default is _UNSET:
            raise UnknownDialect(dialect_id)
        return default
    return known.contents
593
594
@frozen
class DynamicAnchor:
    """
    A dynamic anchor (``$dynamicAnchor``), as introduced in draft 2020.
    """

    # The anchor's name.
    name: str
    # The resource in which the anchor was declared.
    resource: SchemaResource

    def resolve(self, resolver: _Resolver[Schema]) -> _Resolved[Schema]:
        """
        Resolve this anchor, taking the resolver's dynamic scope into account.

        Each entry of the dynamic scope is checked for an anchor with this
        anchor's name. The last entry in which that anchor is itself dynamic
        determines the result; if there is none, this anchor's own resource
        is used.
        """
        target = self.resource
        for uri, registry in resolver.dynamic_scope():
            try:
                candidate = registry.anchor(uri, self.name).value
            except exceptions.NoSuchAnchor:
                # This scope entry doesn't declare the anchor at all.
                pass
            else:
                if isinstance(candidate, DynamicAnchor):
                    target = candidate.resource
        return _Resolved(
            contents=target.contents,
            resolver=resolver.in_subresource(target),
        )
620
621
def lookup_recursive_ref(resolver: _Resolver[Schema]) -> _Resolved[Schema]:
    """
    Resolve a recursive reference, as present only in draft 2019.

    As per the 2019 specification (§ 8.2.4.2.1), only the ``#`` recursive
    reference is supported (and is therefore assumed to be the relevant
    reference).

    If the schema found at ``#`` doesn't set a truthy ``$recursiveAnchor``,
    it is the result. Otherwise the resolver's dynamic scope is walked for
    as long as the schemas found there set one too, and the last such
    schema is the result.
    """

    def is_recursive_anchor(candidate: _Resolved[Schema]) -> bool:
        contents = candidate.contents
        return isinstance(contents, Mapping) and bool(
            contents.get("$recursiveAnchor"),
        )

    current = resolver.lookup("#")
    if not is_recursive_anchor(current):
        return current

    for uri, _ in resolver.dynamic_scope():
        candidate = resolver.lookup(uri)
        if not is_recursive_anchor(candidate):
            break
        current = candidate
    return current