1""" 
    2Referencing implementations for JSON Schema specs (historic & current). 
    3""" 
    4 
    5from __future__ import annotations 
    6 
    7from collections.abc import Sequence, Set 
    8from typing import Any, Iterable, Union 
    9 
    10from referencing import Anchor, Registry, Resource, Specification, exceptions 
    11from referencing._attrs import frozen 
    12from referencing._core import ( 
    13    _UNSET,  # type: ignore[reportPrivateUsage] 
    14    Resolved as _Resolved, 
    15    Resolver as _Resolver, 
    16    _Unset,  # type: ignore[reportPrivateUsage] 
    17) 
    18from referencing.typing import URI, Anchor as AnchorType, Mapping 
    19 
    20#: A JSON Schema which is a JSON object 
    21ObjectSchema = Mapping[str, Any] 
    22 
    23#: A JSON Schema of any kind 
    24Schema = Union[bool, ObjectSchema] 
    25 
    26#: A Resource whose contents are JSON Schemas 
    27SchemaResource = Resource[Schema] 
    28 
    29#: A JSON Schema Registry 
    30SchemaRegistry = Registry[Schema] 
    31 
    32#: The empty JSON Schema Registry 
    33EMPTY_REGISTRY: SchemaRegistry = Registry() 
    34 
    35 
    36@frozen 
    37class UnknownDialect(Exception): 
    38    """ 
    39    A dialect identifier was found for a dialect unknown by this library. 
    40 
    41    If it's a custom ("unofficial") dialect, be sure you've registered it. 
    42    """ 
    43 
    44    uri: URI 
    45 
    46 
    47def _dollar_id(contents: Schema) -> URI | None: 
    48    if isinstance(contents, bool): 
    49        return 
    50    return contents.get("$id") 
    51 
    52 
    53def _legacy_dollar_id(contents: Schema) -> URI | None: 
    54    if isinstance(contents, bool) or "$ref" in contents: 
    55        return 
    56    id = contents.get("$id") 
    57    if id is not None and not id.startswith("#"): 
    58        return id 
    59 
    60 
    61def _legacy_id(contents: ObjectSchema) -> URI | None: 
    62    if "$ref" in contents: 
    63        return 
    64    id = contents.get("id") 
    65    if id is not None and not id.startswith("#"): 
    66        return id 
    67 
    68 
    69def _anchor( 
    70    specification: Specification[Schema], 
    71    contents: Schema, 
    72) -> Iterable[AnchorType[Schema]]: 
    73    if isinstance(contents, bool): 
    74        return 
    75    anchor = contents.get("$anchor") 
    76    if anchor is not None: 
    77        yield Anchor( 
    78            name=anchor, 
    79            resource=specification.create_resource(contents), 
    80        ) 
    81 
    82    dynamic_anchor = contents.get("$dynamicAnchor") 
    83    if dynamic_anchor is not None: 
    84        yield DynamicAnchor( 
    85            name=dynamic_anchor, 
    86            resource=specification.create_resource(contents), 
    87        ) 
    88 
    89 
    90def _anchor_2019( 
    91    specification: Specification[Schema], 
    92    contents: Schema, 
    93) -> Iterable[Anchor[Schema]]: 
    94    if isinstance(contents, bool): 
    95        return [] 
    96    anchor = contents.get("$anchor") 
    97    if anchor is None: 
    98        return [] 
    99    return [ 
    100        Anchor( 
    101            name=anchor, 
    102            resource=specification.create_resource(contents), 
    103        ), 
    104    ] 
    105 
    106 
    107def _legacy_anchor_in_dollar_id( 
    108    specification: Specification[Schema], 
    109    contents: Schema, 
    110) -> Iterable[Anchor[Schema]]: 
    111    if isinstance(contents, bool): 
    112        return [] 
    113    id = contents.get("$id", "") 
    114    if not id.startswith("#"): 
    115        return [] 
    116    return [ 
    117        Anchor( 
    118            name=id[1:], 
    119            resource=specification.create_resource(contents), 
    120        ), 
    121    ] 
    122 
    123 
    124def _legacy_anchor_in_id( 
    125    specification: Specification[ObjectSchema], 
    126    contents: ObjectSchema, 
    127) -> Iterable[Anchor[ObjectSchema]]: 
    128    id = contents.get("id", "") 
    129    if not id.startswith("#"): 
    130        return [] 
    131    return [ 
    132        Anchor( 
    133            name=id[1:], 
    134            resource=specification.create_resource(contents), 
    135        ), 
    136    ] 
    137 
    138 
    139def _subresources_of( 
    140    in_value: Set[str] = frozenset(), 
    141    in_subvalues: Set[str] = frozenset(), 
    142    in_subarray: Set[str] = frozenset(), 
    143): 
    144    """ 
    145    Create a callable returning JSON Schema specification-style subschemas. 
    146 
    147    Relies on specifying the set of keywords containing subschemas in their 
    148    values, in a subobject's values, or in a subarray. 
    149    """ 
    150 
    151    def subresources_of(contents: Schema) -> Iterable[ObjectSchema]: 
    152        if isinstance(contents, bool): 
    153            return 
    154        for each in in_value: 
    155            if each in contents: 
    156                yield contents[each] 
    157        for each in in_subarray: 
    158            if each in contents: 
    159                yield from contents[each] 
    160        for each in in_subvalues: 
    161            if each in contents: 
    162                yield from contents[each].values() 
    163 
    164    return subresources_of 
    165 
    166 
    167def _subresources_of_with_crazy_items( 
    168    in_value: Set[str] = frozenset(), 
    169    in_subvalues: Set[str] = frozenset(), 
    170    in_subarray: Set[str] = frozenset(), 
    171): 
    172    """ 
    173    Specifically handle older drafts where there are some funky keywords. 
    174    """ 
    175 
    176    def subresources_of(contents: Schema) -> Iterable[ObjectSchema]: 
    177        if isinstance(contents, bool): 
    178            return 
    179        for each in in_value: 
    180            if each in contents: 
    181                yield contents[each] 
    182        for each in in_subarray: 
    183            if each in contents: 
    184                yield from contents[each] 
    185        for each in in_subvalues: 
    186            if each in contents: 
    187                yield from contents[each].values() 
    188 
    189        items = contents.get("items") 
    190        if items is not None: 
    191            if isinstance(items, Sequence): 
    192                yield from items 
    193            else: 
    194                yield items 
    195 
    196    return subresources_of 
    197 
    198 
    199def _subresources_of_with_crazy_items_dependencies( 
    200    in_value: Set[str] = frozenset(), 
    201    in_subvalues: Set[str] = frozenset(), 
    202    in_subarray: Set[str] = frozenset(), 
    203): 
    204    """ 
    205    Specifically handle older drafts where there are some funky keywords. 
    206    """ 
    207 
    208    def subresources_of(contents: Schema) -> Iterable[ObjectSchema]: 
    209        if isinstance(contents, bool): 
    210            return 
    211        for each in in_value: 
    212            if each in contents: 
    213                yield contents[each] 
    214        for each in in_subarray: 
    215            if each in contents: 
    216                yield from contents[each] 
    217        for each in in_subvalues: 
    218            if each in contents: 
    219                yield from contents[each].values() 
    220 
    221        items = contents.get("items") 
    222        if items is not None: 
    223            if isinstance(items, Sequence): 
    224                yield from items 
    225            else: 
    226                yield items 
    227        dependencies = contents.get("dependencies") 
    228        if dependencies is not None: 
    229            values = iter(dependencies.values()) 
    230            value = next(values, None) 
    231            if isinstance(value, Mapping): 
    232                yield value 
    233                yield from values 
    234 
    235    return subresources_of 
    236 
    237 
    238def _subresources_of_with_crazy_aP_items_dependencies( 
    239    in_value: Set[str] = frozenset(), 
    240    in_subvalues: Set[str] = frozenset(), 
    241    in_subarray: Set[str] = frozenset(), 
    242): 
    243    """ 
    244    Specifically handle even older drafts where there are some funky keywords. 
    245    """ 
    246 
    247    def subresources_of(contents: ObjectSchema) -> Iterable[ObjectSchema]: 
    248        for each in in_value: 
    249            if each in contents: 
    250                yield contents[each] 
    251        for each in in_subarray: 
    252            if each in contents: 
    253                yield from contents[each] 
    254        for each in in_subvalues: 
    255            if each in contents: 
    256                yield from contents[each].values() 
    257 
    258        items = contents.get("items") 
    259        if items is not None: 
    260            if isinstance(items, Sequence): 
    261                yield from items 
    262            else: 
    263                yield items 
    264        dependencies = contents.get("dependencies") 
    265        if dependencies is not None: 
    266            values = iter(dependencies.values()) 
    267            value = next(values, None) 
    268            if isinstance(value, Mapping): 
    269                yield value 
    270                yield from values 
    271 
    272        for each in "additionalItems", "additionalProperties": 
    273            value = contents.get(each) 
    274            if isinstance(value, Mapping): 
    275                yield value 
    276 
    277    return subresources_of 
    278 
    279 
    280def _maybe_in_subresource( 
    281    in_value: Set[str] = frozenset(), 
    282    in_subvalues: Set[str] = frozenset(), 
    283    in_subarray: Set[str] = frozenset(), 
    284): 
    285    in_child = in_subvalues | in_subarray 
    286 
    287    def maybe_in_subresource( 
    288        segments: Sequence[int | str], 
    289        resolver: _Resolver[Any], 
    290        subresource: Resource[Any], 
    291    ) -> _Resolver[Any]: 
    292        _segments = iter(segments) 
    293        for segment in _segments: 
    294            if segment not in in_value and ( 
    295                segment not in in_child or next(_segments, None) is None 
    296            ): 
    297                return resolver 
    298        return resolver.in_subresource(subresource) 
    299 
    300    return maybe_in_subresource 
    301 
    302 
    303def _maybe_in_subresource_crazy_items( 
    304    in_value: Set[str] = frozenset(), 
    305    in_subvalues: Set[str] = frozenset(), 
    306    in_subarray: Set[str] = frozenset(), 
    307): 
    308    in_child = in_subvalues | in_subarray 
    309 
    310    def maybe_in_subresource( 
    311        segments: Sequence[int | str], 
    312        resolver: _Resolver[Any], 
    313        subresource: Resource[Any], 
    314    ) -> _Resolver[Any]: 
    315        _segments = iter(segments) 
    316        for segment in _segments: 
    317            if segment == "items" and isinstance( 
    318                subresource.contents, 
    319                Mapping, 
    320            ): 
    321                return resolver.in_subresource(subresource) 
    322            if segment not in in_value and ( 
    323                segment not in in_child or next(_segments, None) is None 
    324            ): 
    325                return resolver 
    326        return resolver.in_subresource(subresource) 
    327 
    328    return maybe_in_subresource 
    329 
    330 
    331def _maybe_in_subresource_crazy_items_dependencies( 
    332    in_value: Set[str] = frozenset(), 
    333    in_subvalues: Set[str] = frozenset(), 
    334    in_subarray: Set[str] = frozenset(), 
    335): 
    336    in_child = in_subvalues | in_subarray 
    337 
    338    def maybe_in_subresource( 
    339        segments: Sequence[int | str], 
    340        resolver: _Resolver[Any], 
    341        subresource: Resource[Any], 
    342    ) -> _Resolver[Any]: 
    343        _segments = iter(segments) 
    344        for segment in _segments: 
    345            if segment in {"items", "dependencies"} and isinstance( 
    346                subresource.contents, 
    347                Mapping, 
    348            ): 
    349                return resolver.in_subresource(subresource) 
    350            if segment not in in_value and ( 
    351                segment not in in_child or next(_segments, None) is None 
    352            ): 
    353                return resolver 
    354        return resolver.in_subresource(subresource) 
    355 
    356    return maybe_in_subresource 
    357 
    358 
    359#: JSON Schema draft 2020-12 
    360DRAFT202012 = Specification( 
    361    name="draft2020-12", 
    362    id_of=_dollar_id, 
    363    subresources_of=_subresources_of( 
    364        in_value={ 
    365            "additionalProperties", 
    366            "contains", 
    367            "contentSchema", 
    368            "else", 
    369            "if", 
    370            "items", 
    371            "not", 
    372            "propertyNames", 
    373            "then", 
    374            "unevaluatedItems", 
    375            "unevaluatedProperties", 
    376        }, 
    377        in_subarray={"allOf", "anyOf", "oneOf", "prefixItems"}, 
    378        in_subvalues={ 
    379            "$defs", 
    380            "definitions", 
    381            "dependentSchemas", 
    382            "patternProperties", 
    383            "properties", 
    384        }, 
    385    ), 
    386    anchors_in=_anchor, 
    387    maybe_in_subresource=_maybe_in_subresource( 
    388        in_value={ 
    389            "additionalProperties", 
    390            "contains", 
    391            "contentSchema", 
    392            "else", 
    393            "if", 
    394            "items", 
    395            "not", 
    396            "propertyNames", 
    397            "then", 
    398            "unevaluatedItems", 
    399            "unevaluatedProperties", 
    400        }, 
    401        in_subarray={"allOf", "anyOf", "oneOf", "prefixItems"}, 
    402        in_subvalues={ 
    403            "$defs", 
    404            "definitions", 
    405            "dependentSchemas", 
    406            "patternProperties", 
    407            "properties", 
    408        }, 
    409    ), 
    410) 
    411#: JSON Schema draft 2019-09 
    412DRAFT201909 = Specification( 
    413    name="draft2019-09", 
    414    id_of=_dollar_id, 
    415    subresources_of=_subresources_of_with_crazy_items( 
    416        in_value={ 
    417            "additionalItems", 
    418            "additionalProperties", 
    419            "contains", 
    420            "contentSchema", 
    421            "else", 
    422            "if", 
    423            "not", 
    424            "propertyNames", 
    425            "then", 
    426            "unevaluatedItems", 
    427            "unevaluatedProperties", 
    428        }, 
    429        in_subarray={"allOf", "anyOf", "oneOf"}, 
    430        in_subvalues={ 
    431            "$defs", 
    432            "definitions", 
    433            "dependentSchemas", 
    434            "patternProperties", 
    435            "properties", 
    436        }, 
    437    ), 
    438    anchors_in=_anchor_2019,  # type: ignore[reportGeneralTypeIssues]  # TODO: check whether this is real 
    439    maybe_in_subresource=_maybe_in_subresource_crazy_items( 
    440        in_value={ 
    441            "additionalItems", 
    442            "additionalProperties", 
    443            "contains", 
    444            "contentSchema", 
    445            "else", 
    446            "if", 
    447            "not", 
    448            "propertyNames", 
    449            "then", 
    450            "unevaluatedItems", 
    451            "unevaluatedProperties", 
    452        }, 
    453        in_subarray={"allOf", "anyOf", "oneOf"}, 
    454        in_subvalues={ 
    455            "$defs", 
    456            "definitions", 
    457            "dependentSchemas", 
    458            "patternProperties", 
    459            "properties", 
    460        }, 
    461    ), 
    462) 
    463#: JSON Schema draft 7 
    464DRAFT7 = Specification( 
    465    name="draft-07", 
    466    id_of=_legacy_dollar_id, 
    467    subresources_of=_subresources_of_with_crazy_items_dependencies( 
    468        in_value={ 
    469            "additionalItems", 
    470            "additionalProperties", 
    471            "contains", 
    472            "else", 
    473            "if", 
    474            "not", 
    475            "propertyNames", 
    476            "then", 
    477        }, 
    478        in_subarray={"allOf", "anyOf", "oneOf"}, 
    479        in_subvalues={"definitions", "patternProperties", "properties"}, 
    480    ), 
    481    anchors_in=_legacy_anchor_in_dollar_id,  # type: ignore[reportGeneralTypeIssues]  # TODO: check whether this is real 
    482    maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies( 
    483        in_value={ 
    484            "additionalItems", 
    485            "additionalProperties", 
    486            "contains", 
    487            "else", 
    488            "if", 
    489            "not", 
    490            "propertyNames", 
    491            "then", 
    492        }, 
    493        in_subarray={"allOf", "anyOf", "oneOf"}, 
    494        in_subvalues={"definitions", "patternProperties", "properties"}, 
    495    ), 
    496) 
    497#: JSON Schema draft 6 
    498DRAFT6 = Specification( 
    499    name="draft-06", 
    500    id_of=_legacy_dollar_id, 
    501    subresources_of=_subresources_of_with_crazy_items_dependencies( 
    502        in_value={ 
    503            "additionalItems", 
    504            "additionalProperties", 
    505            "contains", 
    506            "not", 
    507            "propertyNames", 
    508        }, 
    509        in_subarray={"allOf", "anyOf", "oneOf"}, 
    510        in_subvalues={"definitions", "patternProperties", "properties"}, 
    511    ), 
    512    anchors_in=_legacy_anchor_in_dollar_id,  # type: ignore[reportGeneralTypeIssues]  # TODO: check whether this is real 
    513    maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies( 
    514        in_value={ 
    515            "additionalItems", 
    516            "additionalProperties", 
    517            "contains", 
    518            "not", 
    519            "propertyNames", 
    520        }, 
    521        in_subarray={"allOf", "anyOf", "oneOf"}, 
    522        in_subvalues={"definitions", "patternProperties", "properties"}, 
    523    ), 
    524) 
    525#: JSON Schema draft 4 
    526DRAFT4 = Specification( 
    527    name="draft-04", 
    528    id_of=_legacy_id, 
    529    subresources_of=_subresources_of_with_crazy_aP_items_dependencies( 
    530        in_value={"not"}, 
    531        in_subarray={"allOf", "anyOf", "oneOf"}, 
    532        in_subvalues={"definitions", "patternProperties", "properties"}, 
    533    ), 
    534    anchors_in=_legacy_anchor_in_id, 
    535    maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies( 
    536        in_value={"additionalItems", "additionalProperties", "not"}, 
    537        in_subarray={"allOf", "anyOf", "oneOf"}, 
    538        in_subvalues={"definitions", "patternProperties", "properties"}, 
    539    ), 
    540) 
    541#: JSON Schema draft 3 
    542DRAFT3 = Specification( 
    543    name="draft-03", 
    544    id_of=_legacy_id, 
    545    subresources_of=_subresources_of_with_crazy_aP_items_dependencies( 
    546        in_subarray={"extends"}, 
    547        in_subvalues={"definitions", "patternProperties", "properties"}, 
    548    ), 
    549    anchors_in=_legacy_anchor_in_id, 
    550    maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies( 
    551        in_value={"additionalItems", "additionalProperties"}, 
    552        in_subarray={"extends"}, 
    553        in_subvalues={"definitions", "patternProperties", "properties"}, 
    554    ), 
    555) 
    556 
    557 
    558_SPECIFICATIONS: Registry[Specification[Schema]] = Registry( 
    559    {  # type: ignore[reportGeneralTypeIssues]  # :/ internal vs external types 
    560        dialect_id: Resource.opaque(specification) 
    561        for dialect_id, specification in [ 
    562            ("https://json-schema.org/draft/2020-12/schema", DRAFT202012), 
    563            ("https://json-schema.org/draft/2019-09/schema", DRAFT201909), 
    564            ("http://json-schema.org/draft-07/schema", DRAFT7), 
    565            ("http://json-schema.org/draft-06/schema", DRAFT6), 
    566            ("http://json-schema.org/draft-04/schema", DRAFT4), 
    567            ("http://json-schema.org/draft-03/schema", DRAFT3), 
    568        ] 
    569    }, 
    570) 
    571 
    572 
    573def specification_with( 
    574    dialect_id: URI, 
    575    default: Specification[Any] | _Unset = _UNSET, 
    576) -> Specification[Any]: 
    577    """ 
    578    Retrieve the `Specification` with the given dialect identifier. 
    579 
    580    Raises: 
    581 
    582        `UnknownDialect` 
    583 
    584            if the given ``dialect_id`` isn't known 
    585 
    586    """ 
    587    resource = _SPECIFICATIONS.get(dialect_id.rstrip("#")) 
    588    if resource is not None: 
    589        return resource.contents 
    590    if default is _UNSET: 
    591        raise UnknownDialect(dialect_id) 
    592    return default 
    593 
    594 
    595@frozen 
    596class DynamicAnchor: 
    597    """ 
    598    Dynamic anchors, introduced in draft 2020. 
    599    """ 
    600 
    601    name: str 
    602    resource: SchemaResource 
    603 
    604    def resolve(self, resolver: _Resolver[Schema]) -> _Resolved[Schema]: 
    605        """ 
    606        Resolve this anchor dynamically. 
    607        """ 
    608        last = self.resource 
    609        for uri, registry in resolver.dynamic_scope(): 
    610            try: 
    611                anchor = registry.anchor(uri, self.name).value 
    612            except exceptions.NoSuchAnchor: 
    613                continue 
    614            if isinstance(anchor, DynamicAnchor): 
    615                last = anchor.resource 
    616        return _Resolved( 
    617            contents=last.contents, 
    618            resolver=resolver.in_subresource(last), 
    619        ) 
    620 
    621 
    622def lookup_recursive_ref(resolver: _Resolver[Schema]) -> _Resolved[Schema]: 
    623    """ 
    624    Recursive references (via recursive anchors), present only in draft 2019. 
    625 
    626    As per the 2019 specification (§ 8.2.4.2.1), only the ``#`` recursive 
    627    reference is supported (and is therefore assumed to be the relevant 
    628    reference). 
    629    """ 
    630    resolved = resolver.lookup("#") 
    631    if isinstance(resolved.contents, Mapping) and resolved.contents.get( 
    632        "$recursiveAnchor", 
    633    ): 
    634        for uri, _ in resolver.dynamic_scope(): 
    635            next_resolved = resolver.lookup(uri) 
    636            if not isinstance( 
    637                next_resolved.contents, 
    638                Mapping, 
    639            ) or not next_resolved.contents.get("$recursiveAnchor"): 
    640                break 
    641            resolved = next_resolved 
    642    return resolved