1from __future__ import annotations
2
3from collections.abc import Iterable, Iterator, Sequence
4from enum import Enum
5from typing import Any, Callable, ClassVar, Generic, Protocol
6from urllib.parse import unquote, urldefrag, urljoin
7
8from attrs import evolve, field
9from rpds import HashTrieMap, HashTrieSet, List
10
11try:
12 from typing_extensions import TypeVar
13except ImportError: # pragma: no cover
14 from typing import TypeVar
15
16from referencing import exceptions
17from referencing._attrs import frozen
18from referencing.typing import URI, Anchor as AnchorType, D, Mapping, Retrieve
19
#: The empty set of not-yet-crawled URIs. It is the default for a new
#: `Registry` and is what `Registry.crawl` resets the uncrawled set to.
EMPTY_UNCRAWLED: HashTrieSet[URI] = HashTrieSet()
#: An empty persistent list of URIs.
#: NOTE(review): presumably the empty dynamic scope of a `Resolver`, though
#: `Resolver` currently builds its own ``List()`` default -- confirm.
EMPTY_PREVIOUS_RESOLVERS: List[URI] = List()
22
23
class _Unset(Enum):
    """
    A single-member enumeration providing a typed sentinel value.

    NOTE(review): presumably used to tell "no value given" apart from
    `None`; nothing in the visible part of this module refers to it.
    """

    SENTINEL = 1


#: The one and only sentinel member of `_Unset`.
_UNSET = _Unset.SENTINEL
33
34
class _MaybeInSubresource(Protocol[D]):
    """
    The call signature of a `Specification.maybe_in_subresource` callable.

    `Resource.pointer` invokes it (by keyword) after every JSON pointer
    segment it traverses, passing:

    * ``segments``: the segments traversed since the resolver last changed
    * ``resolver``: the resolver currently in use
    * ``subresource``: a resource wrapping the contents just reached

    The return value is the resolver to continue with -- either the same
    one, or a different one if a subresource has been entered.
    """

    def __call__(
        self,
        segments: Sequence[int | str],
        resolver: Resolver[D],
        subresource: Resource[D],
    ) -> Resolver[D]: ...
42
43
def _detect_or_error(contents: D) -> Specification[D]:
    """
    Detect the specification of the given contents, or raise trying.

    Contents are only identifiable when they are a mapping whose
    ``"$schema"`` value is a string; that string is then handed to
    `referencing.jsonschema.specification_with`.

    Raises:

        `exceptions.CannotDetermineSpecification`

            if the contents are not a mapping, or carry no string
            ``"$schema"`` value

    """
    dialect_id = (
        contents.get("$schema")  # type: ignore[reportUnknownMemberType]
        if isinstance(contents, Mapping)
        else None
    )
    if not isinstance(dialect_id, str):
        raise exceptions.CannotDetermineSpecification(contents)

    # Imported at call time rather than at module import time
    # (presumably to sidestep a circular import -- confirm).
    from referencing.jsonschema import specification_with

    return specification_with(dialect_id)
55
56
def _detect_or_default(
    default: Specification[D],
) -> Callable[[D], Specification[D]]:
    """
    Build a detection function which falls back to the given specification.

    The returned callable takes some contents and returns ``default``
    whenever the contents cannot identify their own specification: when
    they are not a mapping, or when their ``"$schema"`` value is missing
    or is not a string. Otherwise the ``"$schema"`` value is handed to
    `referencing.jsonschema.specification_with`, along with ``default``.
    """

    def _detect(contents: D) -> Specification[D]:
        if not isinstance(contents, Mapping):
            return default

        jsonschema_dialect_id = contents.get("$schema")  # type: ignore[reportUnknownMemberType]
        # Like `_detect_or_error`, only a string can name a dialect. A
        # missing or non-string value is treated as unidentifiable rather
        # than being forwarded to the dialect lookup.
        if not isinstance(jsonschema_dialect_id, str):
            return default

        # Imported at call time rather than at module import time
        # (presumably to sidestep a circular import -- confirm).
        from referencing.jsonschema import specification_with

        return specification_with(jsonschema_dialect_id, default=default)

    return _detect
76
77
class _SpecificationDetector:
    """
    A descriptor choosing a detection function based on how it is accessed.

    Accessed on a class, it produces the detector which raises for
    unidentifiable contents; accessed on an instance, it produces one
    which falls back to that instance instead.
    """

    def __get__(
        self,
        instance: Specification[D] | None,
        cls: type[Specification[D]],
    ) -> Callable[[D], Specification[D]]:
        if instance is not None:
            # Instance access: the instance itself is the fallback.
            return _detect_or_default(instance)
        # Class access: there is nothing to fall back to.
        return _detect_or_error
88
89
@frozen
class Specification(Generic[D]):
    """
    A specification which defines referencing behavior.

    The various methods of a `Specification` allow for varying referencing
    behavior across JSON Schema specification versions, etc.
    """

    #: A short human-readable name for the specification, used for debugging.
    name: str

    #: Find the ID of a given document (or `None` if it has none).
    id_of: Callable[[D], URI | None]

    #: Retrieve the subresources of the given document (without traversing into
    #: the subresources themselves).
    subresources_of: Callable[[D], Iterable[D]]

    #: While resolving a JSON pointer, conditionally enter a subresource
    #: (if e.g. we have just entered a keyword whose value is a subresource).
    maybe_in_subresource: _MaybeInSubresource[D]

    #: Retrieve the anchors contained in the given document.
    #:
    #: Passed as ``anchors_in`` when constructing a specification; the
    #: callable receives the specification itself along with the document.
    #: Use the `anchors_in` method to call it.
    _anchors_in: Callable[
        [Specification[D], D],
        Iterable[AnchorType[D]],
    ] = field(alias="anchors_in")

    #: An opaque specification where resources have no subresources
    #: nor internal identifiers.
    OPAQUE: ClassVar[Specification[Any]]

    #: Attempt to discern which specification applies to the given contents.
    #:
    #: May be called either as an instance method or as a class method, with
    #: slightly different behavior in the following case:
    #:
    #: Recall that not all contents contain enough internal information about
    #: which specification they are written for -- the JSON Schema ``{}``,
    #: for instance, is valid under many different dialects and may be
    #: interpreted as any one of them.
    #:
    #: When this method is used as an instance method (i.e. called on a
    #: specific specification), that specification is used as the default
    #: if the given contents are unidentifiable.
    #:
    #: On the other hand when called as a class method, an error is raised.
    #:
    #: To reiterate, ``DRAFT202012.detect({})`` will return ``DRAFT202012``
    #: whereas the class method ``Specification.detect({})`` will raise an
    #: error.
    #:
    #: (Note that of course ``DRAFT202012.detect(...)`` may return some other
    #: specification when given a schema which *does* identify as being for
    #: another version).
    #:
    #: Raises:
    #:
    #:     `CannotDetermineSpecification`
    #:
    #:         if the given contents don't have any discernible
    #:         information which could be used to guess which
    #:         specification they identify as
    detect = _SpecificationDetector()

    def __repr__(self) -> str:
        # Only the name is shown; the remaining fields are callables.
        return f"<Specification name={self.name!r}>"

    def anchors_in(self, contents: D) -> Iterable[AnchorType[D]]:
        """
        Retrieve the anchors contained in the given document.

        Delegates to the ``anchors_in`` callable this specification was
        created with, passing it this specification and the document.
        """
        return self._anchors_in(self, contents)

    def create_resource(self, contents: D) -> Resource[D]:
        """
        Create a resource which is interpreted using this specification.
        """
        return Resource(contents=contents, specification=self)
170
171
# The opaque specification looks inside nothing: documents have no ID, no
# subresources and no anchors, and traversing a JSON pointer never changes
# the resolver. It is assigned after the class body because it is itself an
# instance of the class.
Specification.OPAQUE = Specification(
    name="opaque",
    id_of=lambda contents: None,
    subresources_of=lambda contents: [],
    anchors_in=lambda specification, contents: [],
    maybe_in_subresource=lambda segments, resolver, subresource: resolver,
)
179
180
@frozen
class Resource(Generic[D]):
    r"""
    A document (deserialized JSON) with a concrete interpretation under a spec.

    In other words, a Python object, along with an instance of `Specification`
    which describes how the document interacts with referencing -- both
    internally (how it refers to other `Resource`\ s) and externally (how it
    should be identified such that it is referenceable by other documents).
    """

    #: The (deserialized) document itself.
    contents: D
    #: The specification used to interpret the contents, passed as
    #: ``specification`` when constructing a resource.
    _specification: Specification[D] = field(alias="specification")

    @classmethod
    def from_contents(
        cls,
        contents: D,
        default_specification: (
            type[Specification[D]] | Specification[D]
        ) = Specification,
    ) -> Resource[D]:
        """
        Create a resource guessing which specification applies to the contents.

        ``default_specification`` may be a `Specification` instance, which
        is then used whenever the contents do not identify their own
        specification, or the `Specification` class itself (the default),
        in which case unidentifiable contents are an error.

        Raises:

            `CannotDetermineSpecification`

                if the given contents don't have any discernible
                information which could be used to guess which
                specification they identify as

        """
        specification = default_specification.detect(contents)
        return specification.create_resource(contents=contents)

    @classmethod
    def opaque(cls, contents: D) -> Resource[D]:
        """
        Create an opaque `Resource` -- i.e. one with opaque specification.

        See `Specification.OPAQUE` for details.
        """
        return Specification.OPAQUE.create_resource(contents=contents)

    def id(self) -> URI | None:
        """
        Retrieve this resource's (specification-specific) identifier.

        Any trailing ``#`` (i.e. an empty fragment) is stripped from the
        identifier. `None` is returned if the resource has no identifier.
        """
        id = self._specification.id_of(self.contents)
        if id is None:
            return None
        return id.rstrip("#")

    def subresources(self) -> Iterable[Resource[D]]:
        """
        Retrieve this resource's subresources.

        Each subresource's specification is detected from its own contents,
        falling back to this resource's specification. The result is lazy:
        nothing is computed until it is iterated.
        """
        return (
            Resource.from_contents(
                each,
                default_specification=self._specification,
            )
            for each in self._specification.subresources_of(self.contents)
        )

    def anchors(self) -> Iterable[AnchorType[D]]:
        """
        Retrieve this resource's (specification-specific) anchors.
        """
        return self._specification.anchors_in(self.contents)

    def pointer(self, pointer: str, resolver: Resolver[D]) -> Resolved[D]:
        """
        Resolve the given JSON pointer.

        The pointer is percent-decoded and split on ``/`` (its first
        character is skipped, so a non-empty pointer is expected to start
        with ``/``). Segments applied to a sequence are converted to integer
        indices; all other segments have the ``~1`` and ``~0`` escapes
        replaced by ``/`` and ``~`` respectively.

        After each segment the specification is given the chance to enter a
        subresource, which may replace the resolver being carried along.

        Raises:

            `exceptions.PointerToNowhere`

                if the pointer points to a location not present in the
                document -- including when a segment addressing a sequence
                is not an integer, or when the pointer descends into a
                value which cannot be indexed at all

        """
        if not pointer:
            return Resolved(contents=self.contents, resolver=resolver)

        contents = self.contents
        segments: list[int | str] = []
        for segment in unquote(pointer[1:]).split("/"):
            try:
                if isinstance(contents, Sequence):
                    # A non-integer segment raises ValueError here.
                    segment = int(segment)
                else:
                    # ``~1`` is replaced before ``~0`` so that ``~01``
                    # decodes to ``~1`` rather than to ``/``.
                    segment = segment.replace("~1", "/").replace("~0", "~")
                # Indexing a non-container (e.g. a number or `None`)
                # raises TypeError here.
                contents = contents[segment]  # type: ignore[reportUnknownArgumentType]
            except (LookupError, TypeError, ValueError) as lookup_error:
                error = exceptions.PointerToNowhere(ref=pointer, resource=self)
                raise error from lookup_error

            segments.append(segment)
            last = resolver
            resolver = self._specification.maybe_in_subresource(
                segments=segments,
                resolver=resolver,
                subresource=self._specification.create_resource(contents),
            )
            if resolver is not last:
                # A subresource was entered: later segments are relative
                # to it, so start collecting them afresh.
                segments = []
        return Resolved(contents=contents, resolver=resolver)  # type: ignore[reportUnknownArgumentType]
291
292
def _fail_to_retrieve(uri: URI):
    """
    The default `Registry` retrieval function, which never finds anything.

    `Registry.combine` compares retrieval functions against this one by
    identity in order to recognize registries with no custom retrieval.

    Raises:

        `exceptions.NoSuchResource`

            always

    """
    raise exceptions.NoSuchResource(ref=uri)
295
296
@frozen
class Registry(Mapping[URI, Resource[D]]):
    r"""
    A registry of `Resource`\ s, each identified by their canonical URIs.

    Registries store a collection of in-memory resources, and optionally
    enable additional resources which may be stored elsewhere (e.g. in a
    database, a separate set of files, over the network, etc.).

    They also lazily walk their known resources, looking for subresources
    within them. In other words, subresources contained within any added
    resources will be retrievable via their own IDs (though this discovery of
    subresources will be delayed until necessary).

    Registries are immutable, and their methods return new instances of the
    registry with the additional resources added to them.

    The ``retrieve`` argument can be used to configure retrieval of resources
    dynamically, either over the network, from a database, or the like.
    Pass it a callable which will be called if any URI not present in the
    registry is accessed. It must either return a `Resource` or else raise a
    `NoSuchResource` exception indicating that the resource does not exist
    even according to the retrieval logic.
    """

    #: All known resources by URI, whether or not they have been crawled.
    _resources: HashTrieMap[URI, Resource[D]] = field(
        default=HashTrieMap(),
        converter=HashTrieMap.convert,  # type: ignore[reportGeneralTypeIssues]
        alias="resources",
    )
    #: Anchors discovered while crawling, keyed by ``(uri, anchor name)``.
    _anchors: HashTrieMap[tuple[URI, str], AnchorType[D]] = HashTrieMap()
    #: URIs of resources which were added but not yet crawled.
    _uncrawled: HashTrieSet[URI] = EMPTY_UNCRAWLED
    #: Called by `get_or_retrieve` for URIs which are not otherwise found.
    _retrieve: Retrieve[D] = field(default=_fail_to_retrieve, alias="retrieve")

    def __getitem__(self, uri: URI) -> Resource[D]:
        """
        Return the `Resource` stored under the given URI.

        No crawling or retrieval is performed; only resources which are
        already present in the registry are found. A trailing ``#`` on the
        URI is ignored.

        Raises:

            `exceptions.NoSuchResource`

                if no resource is stored under the URI

        """
        try:
            return self._resources[uri.rstrip("#")]
        except KeyError:
            raise exceptions.NoSuchResource(ref=uri) from None

    def __iter__(self) -> Iterator[URI]:
        """
        Iterate over the URIs of all resources currently in the registry.
        """
        return iter(self._resources)

    def __len__(self) -> int:
        """
        Count the resources currently in the registry.

        Resources which were added but not yet crawled are included;
        subresources which crawling has yet to discover are not.
        """
        return len(self._resources)

    def __rmatmul__(
        self,
        new: Resource[D] | Iterable[Resource[D]],
    ) -> Registry[D]:
        """
        Create a new registry with resource(s) added using their internal IDs.

        Resources must have internal IDs (e.g. the :kw:`$id` keyword in
        modern JSON Schema versions), otherwise an error will be raised.

        Both a single resource as well as an iterable of resources works, i.e.:

            * ``resource @ registry`` or

            * ``[iterable, of, multiple, resources] @ registry``

        which -- again, assuming the resources have internal IDs -- is
        equivalent to calling `Registry.with_resources` as such:

        .. code:: python

            registry.with_resources(
                (resource.id(), resource) for resource in new_resources
            )

        Raises:

            `NoInternalID`

                if the resource(s) in fact do not have IDs

        """
        if isinstance(new, Resource):
            new = (new,)

        resources = self._resources
        uncrawled = self._uncrawled
        for resource in new:
            id = resource.id()
            if id is None:
                raise exceptions.NoInternalID(resource=resource)
            uncrawled = uncrawled.insert(id)
            resources = resources.insert(id, resource)
        return evolve(self, resources=resources, uncrawled=uncrawled)

    def __repr__(self) -> str:
        size = len(self)
        pluralized = "resource" if size == 1 else "resources"
        if self._uncrawled:
            uncrawled = len(self._uncrawled)
            if uncrawled == size:
                # Everything is uncrawled, e.g. "3 uncrawled resources".
                summary = f"uncrawled {pluralized}"
            else:
                summary = f"{pluralized}, {uncrawled} uncrawled"
        else:
            summary = f"{pluralized}"
        return f"<Registry ({size} {summary})>"

    def get_or_retrieve(self, uri: URI) -> Retrieved[D, Resource[D]]:
        """
        Get a resource from the registry, crawling or retrieving if necessary.

        May involve crawling to find the given URI if it is not already known,
        so the returned object is a `Retrieved` object which contains both the
        resource value as well as the registry which ultimately contained it.

        The URI is looked up as is, then again after crawling, and only then
        is the registry's retrieval function called. A retrieved resource is
        added to the returned registry.

        Raises:

            `exceptions.NoSuchResource` or
            `exceptions.CannotDetermineSpecification`

                if the retrieval function raises them

            `exceptions.Unretrievable`

                if the retrieval function raises any other exception

        """
        resource = self._resources.get(uri)
        if resource is not None:
            return Retrieved(registry=self, value=resource)

        registry = self.crawl()
        resource = registry._resources.get(uri)
        if resource is not None:
            return Retrieved(registry=registry, value=resource)

        try:
            resource = registry._retrieve(uri)
        except (
            exceptions.CannotDetermineSpecification,
            exceptions.NoSuchResource,
        ):
            raise
        except Exception as error:
            raise exceptions.Unretrievable(ref=uri) from error
        else:
            registry = registry.with_resource(uri, resource)
            return Retrieved(registry=registry, value=resource)

    def remove(self, uri: URI) -> Registry[D]:
        """
        Return a registry with the resource identified by a given URI removed.

        Anchors which were registered under the URI are removed as well.

        Raises:

            `exceptions.NoSuchResource`

                if no resource is stored under the URI

        """
        if uri not in self._resources:
            raise exceptions.NoSuchResource(ref=uri)

        return evolve(
            self,
            resources=self._resources.remove(uri),
            uncrawled=self._uncrawled.discard(uri),
            anchors=HashTrieMap(
                (k, v) for k, v in self._anchors.items() if k[0] != uri
            ),
        )

    def anchor(self, uri: URI, name: str) -> Retrieved[D, AnchorType[D]]:
        """
        Retrieve a given anchor from a resource, crawling if necessary.

        The anchor is looked up under the given URI, first as is and then
        again after crawling. If it is still not found, it is looked up
        under the canonical ID of the resource stored at the URI.

        Raises:

            `exceptions.NoSuchResource`

                if there is no resource at the URI even after crawling

            `exceptions.InvalidAnchor`

                if the anchor is not found and its name contains a ``/``

            `exceptions.NoSuchAnchor`

                if the anchor is otherwise not found

        """
        value = self._anchors.get((uri, name))
        if value is not None:
            return Retrieved(value=value, registry=self)

        registry = self.crawl()
        value = registry._anchors.get((uri, name))
        if value is not None:
            return Retrieved(value=value, registry=registry)

        # Use the crawled registry here: the URI may identify a subresource
        # which only crawling discovered, and which is therefore not
        # (yet) present in this registry.
        resource = registry[uri]
        canonical_uri = resource.id()
        if canonical_uri is not None:
            value = registry._anchors.get((canonical_uri, name))
            if value is not None:
                return Retrieved(value=value, registry=registry)

        if "/" in name:
            raise exceptions.InvalidAnchor(
                ref=uri,
                resource=resource,
                anchor=name,
            )
        raise exceptions.NoSuchAnchor(ref=uri, resource=resource, anchor=name)

    def contents(self, uri: URI) -> D:
        """
        Retrieve the contents of the resource stored under the given URI.

        As with item access, no crawling or retrieval is performed.
        """
        return self[uri].contents

    def crawl(self) -> Registry[D]:
        """
        Crawl all added resources, discovering subresources.

        Returns a registry in which every resource with an ID is also
        stored under that ID (resolved against the URI it was found
        beneath), all anchors are recorded, and nothing is left uncrawled.
        """
        resources = self._resources
        anchors = self._anchors
        uncrawled = [(uri, resources[uri]) for uri in self._uncrawled]
        while uncrawled:
            uri, resource = uncrawled.pop()

            id = resource.id()
            if id is not None:
                # IDs may be relative to the URI of the enclosing resource.
                uri = urljoin(uri, id)
                resources = resources.insert(uri, resource)
            for each in resource.anchors():
                anchors = anchors.insert((uri, each.name), each)
            # Subresources are crawled relative to this resource's URI.
            uncrawled.extend((uri, each) for each in resource.subresources())
        return evolve(
            self,
            resources=resources,
            anchors=anchors,
            uncrawled=EMPTY_UNCRAWLED,
        )

    def with_resource(self, uri: URI, resource: Resource[D]) -> Registry[D]:
        """
        Add the given `Resource` to the registry, without crawling it.
        """
        return self.with_resources([(uri, resource)])

    def with_resources(
        self,
        pairs: Iterable[tuple[URI, Resource[D]]],
    ) -> Registry[D]:
        r"""
        Add the given `Resource`\ s to the registry, without crawling them.

        Each resource is stored under its URI with any trailing ``#``
        stripped, and is marked as uncrawled.
        """
        resources = self._resources
        uncrawled = self._uncrawled
        for uri, resource in pairs:
            # Empty fragment URIs are equivalent to URIs without the fragment.
            # TODO: Is this true for non JSON Schema resources? Probably not.
            uri = uri.rstrip("#")
            uncrawled = uncrawled.insert(uri)
            resources = resources.insert(uri, resource)
        return evolve(self, resources=resources, uncrawled=uncrawled)

    def with_contents(
        self,
        pairs: Iterable[tuple[URI, D]],
        **kwargs: Any,
    ) -> Registry[D]:
        r"""
        Add the given contents to the registry, autodetecting when necessary.

        Any keyword arguments are passed along to `Resource.from_contents`.
        """
        return self.with_resources(
            (uri, Resource.from_contents(each, **kwargs))
            for uri, each in pairs
        )

    def combine(self, *registries: Registry[D]) -> Registry[D]:
        """
        Combine together one or more other registries, producing a unified one.

        The resources, anchors and uncrawled URIs of all given registries
        are merged into those of this one. At most one distinct custom
        retrieval function may be in use across all of the registries.

        Raises:

            `ValueError`

                if the registries use conflicting retrieval functions

        """
        if registries == (self,):
            return self
        resources = self._resources
        anchors = self._anchors
        uncrawled = self._uncrawled
        retrieve = self._retrieve
        for registry in registries:
            resources = resources.update(registry._resources)
            anchors = anchors.update(registry._anchors)
            uncrawled = uncrawled.update(registry._uncrawled)

            if registry._retrieve is not _fail_to_retrieve:  # type: ignore[reportUnnecessaryComparison] ???
                # A chained comparison: conflict only if a custom function
                # was already chosen *and* this one is a different one.
                if registry._retrieve is not retrieve is not _fail_to_retrieve:  # type: ignore[reportUnnecessaryComparison] ???
                    raise ValueError(  # noqa: TRY003
                        "Cannot combine registries with conflicting retrieval "
                        "functions.",
                    )
                retrieve = registry._retrieve
        return evolve(
            self,
            anchors=anchors,
            resources=resources,
            uncrawled=uncrawled,
            retrieve=retrieve,
        )

    def resolver(self, base_uri: URI = "") -> Resolver[D]:
        """
        Return a `Resolver` which resolves references against this registry.
        """
        return Resolver(base_uri=base_uri, registry=self)

    def resolver_with_root(self, resource: Resource[D]) -> Resolver[D]:
        """
        Return a `Resolver` with a specific root resource.

        The resource is added to the resolver's registry under its own ID,
        or under the empty URI if it has none; that same URI becomes the
        resolver's base URI.
        """
        uri = resource.id() or ""
        return Resolver(
            base_uri=uri,
            registry=self.with_resource(uri, resource),
        )
595
596
#: An anchor or resource.
#:
#: A type variable constrained to exactly those two types, which defaults
#: to a resource when it is left unspecified.
AnchorOrResource = TypeVar(
    "AnchorOrResource",
    AnchorType[Any],
    Resource[Any],
    default=Resource[Any],
)
604
605
@frozen
class Retrieved(Generic[D, AnchorOrResource]):
    """
    A value retrieved from a `Registry`.

    Retrieval may crawl or otherwise extend a registry, and registries are
    immutable, so the registry which ended up containing the value is
    returned alongside it.
    """

    #: The anchor or resource which was retrieved.
    value: AnchorOrResource
    #: The registry which ultimately contained the value.
    registry: Registry[D]
614
615
@frozen
class Resolved(Generic[D]):
    """
    A reference resolved to its contents by a `Resolver`.
    """

    #: The contents which the reference pointed to.
    contents: D
    #: The resolver produced while resolving the reference.
    #: NOTE(review): presumably the one to use for any further references
    #: found within the contents -- confirm against callers.
    resolver: Resolver[D]
624
625
@frozen
class Resolver(Generic[D]):
    """
    A reference resolver.

    A resolver pairs a fixed base URI with a `Registry`, which is what
    makes it possible to resolve references, relative ones included.

    Under normal circumstances this object is meant for *implementers of
    libraries* built on top of `referencing` (e.g. JSON Schema
    implementations or other libraries resolving JSON references), rather
    than for end-users who populate registries or write schemas or other
    resources.

    A reference is first resolved against the base URI, and the resulting
    URI is then looked up within the registry.

    Resolving a reference may itself produce a *new* base URI for later
    resolution (e.g. when an intermediate resource sets one), or may turn
    up additional subresources which end up in a new registry.
    """

    _base_uri: URI = field(alias="base_uri")
    _registry: Registry[D] = field(alias="registry")
    _previous: List[URI] = field(default=List(), repr=False, alias="previous")

    def lookup(self, ref: URI) -> Resolved[D]:
        """
        Resolve the given reference to the resource it points to.

        Raises:

            `exceptions.Unresolvable`

                or a subclass thereof (see below) if the reference isn't
                resolvable

            `exceptions.NoSuchAnchor`

                if the reference is to a URI where a resource exists but
                contains a plain name fragment which does not exist within
                the resource

            `exceptions.PointerToNowhere`

                if the reference is to a URI where a resource exists but
                contains a JSON pointer to a location within the resource
                that does not exist

        """
        if ref.startswith("#"):
            # Fragment-only references stay within the current base URI.
            uri = self._base_uri
            fragment = ref[1:]
        else:
            absolute = urljoin(self._base_uri, ref)
            uri, fragment = urldefrag(absolute)

        try:
            retrieved = self._registry.get_or_retrieve(uri)
        except exceptions.NoSuchResource:
            raise exceptions.Unresolvable(ref=ref) from None
        except exceptions.Unretrievable as error:
            raise exceptions.Unresolvable(ref=ref) from error

        is_pointer = fragment.startswith("/")
        if fragment and not is_pointer:
            # A plain name fragment: find the anchor before evolving, since
            # doing so may itself produce a newer registry.
            retrieved = retrieved.registry.anchor(uri, fragment)

        resolver = self._evolve(registry=retrieved.registry, base_uri=uri)
        if is_pointer:
            return retrieved.value.pointer(pointer=fragment, resolver=resolver)
        if fragment:
            return retrieved.value.resolve(resolver=resolver)
        return Resolved(contents=retrieved.value.contents, resolver=resolver)

    def in_subresource(self, subresource: Resource[D]) -> Resolver[D]:
        """
        Create a resolver for a subresource (which may have a new base URI).

        A subresource without an ID leaves the resolver unchanged.
        """
        subresource_id = subresource.id()
        if subresource_id is not None:
            new_base = urljoin(self._base_uri, subresource_id)
            return evolve(self, base_uri=new_base)
        return self

    def dynamic_scope(self) -> Iterable[tuple[URI, Registry[D]]]:
        """
        In specs with such a notion, return the URIs in the dynamic scope.

        Each URI is paired with this resolver's registry.
        """
        registry = self._registry
        yield from ((uri, registry) for uri in self._previous)

    def _evolve(self, base_uri: URI, **kwargs: Any):
        """
        Evolve, appending to the dynamic scope.

        The current base URI is recorded only if it is non-empty, and only
        if the scope is still empty or the base URI is actually changing.
        """
        current = self._base_uri
        scope = self._previous
        if current and (not scope or base_uri != current):
            scope = scope.push_front(current)
        return evolve(self, base_uri=base_uri, previous=scope, **kwargs)
724
725
@frozen
class Anchor(Generic[D]):
    """
    A plain, named anchor located within a `Resource`.
    """

    name: str
    resource: Resource[D]

    def resolve(self, resolver: Resolver[D]):
        """
        Resolve this anchor to the contents of the resource it marks.

        The given resolver is passed through unchanged.
        """
        target = self.resource
        return Resolved(resolver=resolver, contents=target.contents)