1from __future__ import annotations
2
3from collections.abc import Iterable, Iterator, Sequence
4from enum import Enum
5from typing import Any, Callable, ClassVar, Generic, Protocol, TypeVar
6from urllib.parse import unquote, urldefrag, urljoin
7
8from attrs import evolve, field
9from rpds import HashTrieMap, HashTrieSet, List
10
11from referencing import exceptions
12from referencing._attrs import frozen
13from referencing.typing import URI, Anchor as AnchorType, D, Mapping, Retrieve
14
#: The empty set of URIs. `Registry` uses it as the default for its set of
#: not-yet-crawled URIs, and resets to it once a crawl has completed.
EMPTY_UNCRAWLED: HashTrieSet[URI] = HashTrieSet()
#: An empty list of URIs.
#: NOTE(review): not referenced in the visible part of this module --
#: presumably the initial "previous resolvers" (dynamic scope) value; confirm.
EMPTY_PREVIOUS_RESOLVERS: List[URI] = List()
17
18
19class _Unset(Enum):
20 """
21 What sillyness...
22 """
23
24 SENTINEL = 1
25
26
27_UNSET = _Unset.SENTINEL
28
29
class _MaybeInSubresource(Protocol[D]):
    """
    The call signature expected of `Specification.maybe_in_subresource`.

    `Resource.pointer` invokes it (by keyword) after each JSON pointer
    segment it traverses, passing the segments walked so far, the current
    resolver and the value just reached wrapped as a resource.

    The callable returns the resolver to continue with. Returning a
    *different* resolver object is what signals to `Resource.pointer` that a
    subresource was entered.
    """

    def __call__(
        self,
        segments: Sequence[int | str],
        resolver: Resolver[D],
        subresource: Resource[D],
    ) -> Resolver[D]: ...
37
38
def _detect_or_error(contents: D) -> Specification[D]:
    """
    Detect the specification of ``contents`` from its ``$schema`` keyword.

    Raises:

        `CannotDetermineSpecification`

            if the contents are not a mapping, or their ``$schema`` value
            is missing or is not a string

    """
    # Anything which isn't a mapping is treated just like a mapping lacking
    # ``$schema``: in both cases there is no dialect ID to go on.
    dialect_id = (
        contents.get("$schema")  # type: ignore[reportUnknownMemberType]
        if isinstance(contents, Mapping)
        else None
    )
    if not isinstance(dialect_id, str):
        raise exceptions.CannotDetermineSpecification(contents)

    # Imported here rather than at module level (presumably to avoid an
    # import cycle with `referencing.jsonschema`).
    from referencing.jsonschema import specification_with

    return specification_with(dialect_id)
50
51
52def _detect_or_default(
53 default: Specification[D],
54) -> Callable[[D], Specification[D]]:
55 def _detect(contents: D) -> Specification[D]:
56 if not isinstance(contents, Mapping):
57 return default
58
59 jsonschema_dialect_id = contents.get("$schema") # type: ignore[reportUnknownMemberType]
60 if jsonschema_dialect_id is None:
61 return default
62
63 from referencing.jsonschema import specification_with
64
65 return specification_with(
66 jsonschema_dialect_id, # type: ignore[reportUnknownArgumentType]
67 default=default,
68 )
69
70 return _detect
71
72
class _SpecificationDetector:
    """
    Descriptor which picks a detection function based on how it is accessed.

    Accessed through an instance, it yields a detector which uses that
    instance as the fallback. Accessed through the class, it yields the
    detector which raises when the contents are unidentifiable.
    """

    def __get__(
        self,
        instance: Specification[D] | None,
        cls: type[Specification[D]],
    ) -> Callable[[D], Specification[D]]:
        if instance is not None:
            return _detect_or_default(instance)
        return _detect_or_error
83
84
@frozen
class Specification(Generic[D]):
    """
    A specification, which determines how referencing behaves.

    Referencing behavior differs between e.g. versions of JSON Schema; the
    callables held by a `Specification` are where those differences live.
    """

    #: A short, human-readable name for the specification (for debugging).
    name: str

    #: Compute the ID of a given document, or `None` if it has none.
    id_of: Callable[[D], URI | None]

    #: Retrieve the direct subresources of the given document, without
    #: descending into those subresources themselves.
    subresources_of: Callable[[D], Iterable[D]]

    #: Called while a JSON pointer is being resolved, giving the
    #: specification the chance to enter a subresource (e.g. when the
    #: keyword just traversed holds one as its value).
    maybe_in_subresource: _MaybeInSubresource[D]

    #: Retrieve the anchors contained in the given document.
    _anchors_in: Callable[
        [Specification[D], D],
        Iterable[AnchorType[D]],
    ] = field(alias="anchors_in")

    #: An opaque specification, under which resources have neither
    #: subresources nor internal identifiers.
    OPAQUE: ClassVar[Specification[Any]]

    #: Attempt to work out which specification applies to given contents.
    #:
    #: Usable both on an instance and on the class itself, and the two
    #: differ in how they treat contents which cannot be identified.
    #:
    #: Not every document says which specification it was written for --
    #: the JSON Schema ``{}``, for example, is valid under many dialects
    #: and could be interpreted as any of them.
    #:
    #: Called on an instance (i.e. on one specific specification), that
    #: specification serves as the default for unidentifiable contents.
    #:
    #: Called on the class, unidentifiable contents are an error.
    #:
    #: Concretely, ``DRAFT202012.detect({})`` returns ``DRAFT202012``,
    #: while ``Specification.detect({})`` raises.
    #:
    #: (``DRAFT202012.detect(...)`` can of course still return some other
    #: specification if the schema it is given *does* identify itself as
    #: belonging to a different version.)
    #:
    #: Raises:
    #:
    #:     `CannotDetermineSpecification`
    #:
    #:         if the given contents don't have any discernible
    #:         information which could be used to guess which
    #:         specification they identify as
    detect = _SpecificationDetector()

    def __repr__(self) -> str:
        return f"<Specification name={self.name!r}>"

    def anchors_in(self, contents: D):
        """
        Retrieve the anchors which the given document contains.
        """
        # The stored callable also receives this specification itself.
        find_anchors = self._anchors_in
        return find_anchors(self, contents)

    def create_resource(self, contents: D) -> Resource[D]:
        """
        Wrap the contents in a `Resource` interpreted by this specification.
        """
        return Resource(specification=self, contents=contents)
165
166
# The opaque specification: no ID, no subresources, no anchors, and pointer
# resolution always continues with the resolver it was given.
Specification.OPAQUE = Specification(
    name="opaque",
    id_of=lambda contents: None,
    subresources_of=lambda contents: [],
    maybe_in_subresource=lambda segments, resolver, subresource: resolver,
    anchors_in=lambda specification, contents: [],
)
174
175
@frozen
class Resource(Generic[D]):
    r"""
    A document (deserialized JSON) with a concrete interpretation under a spec.

    In other words, a Python object, along with an instance of `Specification`
    which describes how the document interacts with referencing -- both
    internally (how it refers to other `Resource`\ s) and externally (how it
    should be identified such that it is referenceable by other documents).
    """

    #: The document itself.
    contents: D
    #: The specification under which the document is interpreted.
    _specification: Specification[D] = field(alias="specification")

    @classmethod
    def from_contents(
        cls,
        contents: D,
        default_specification: (
            type[Specification[D]] | Specification[D]
        ) = Specification,
    ) -> Resource[D]:
        """
        Create a resource guessing which specification applies to the contents.

        Detection is delegated to ``default_specification.detect``: pass a
        `Specification` *instance* to have it used as the fallback for
        unidentifiable contents, or leave the default (the `Specification`
        class itself) to have such contents raise.

        Raises:

            `CannotDetermineSpecification`

                if the given contents don't have any discernible
                information which could be used to guess which
                specification they identify as

        """
        specification = default_specification.detect(contents)
        return specification.create_resource(contents=contents)

    @classmethod
    def opaque(cls, contents: D) -> Resource[D]:
        """
        Create an opaque `Resource` -- i.e. one with opaque specification.

        See `Specification.OPAQUE` for details.
        """
        return Specification.OPAQUE.create_resource(contents=contents)

    def id(self) -> URI | None:
        """
        Retrieve this resource's (specification-specific) identifier.

        Trailing ``#`` characters (an empty fragment) are stripped from the
        identifier. `None` is returned if the resource has no identifier.
        """
        identifier = self._specification.id_of(self.contents)
        if identifier is None:
            return None
        return identifier.rstrip("#")

    def subresources(self) -> Iterable[Resource[D]]:
        """
        Retrieve this resource's subresources.

        Each one has its specification detected afresh, with this resource's
        own specification as the default.
        """
        return (
            Resource.from_contents(
                each,
                default_specification=self._specification,
            )
            for each in self._specification.subresources_of(self.contents)
        )

    def anchors(self) -> Iterable[AnchorType[D]]:
        """
        Retrieve this resource's (specification-specific) anchors.
        """
        return self._specification.anchors_in(self.contents)

    def pointer(self, pointer: str, resolver: Resolver[D]) -> Resolved[D]:
        """
        Resolve the given JSON pointer.

        The pointer is percent-decoded and then walked one segment at a
        time. After each segment the specification is given the chance to
        enter a subresource, which may replace the resolver in use.

        Raises:

            `exceptions.PointerToNowhere`

                if the pointer points to a location not present in the
                document -- a missing key, an out of range index, an index
                into a sequence which is not an integer, or an attempt to
                descend into a value which cannot be indexed at all

        """
        if not pointer:
            return Resolved(contents=self.contents, resolver=resolver)

        contents = self.contents
        segments: list[int | str] = []
        for segment in unquote(pointer[1:]).split("/"):
            try:
                if isinstance(contents, Sequence):
                    # Sequences are indexed numerically; a non-integer
                    # segment raises ValueError, which is translated below.
                    segment = int(segment)
                else:
                    # Undo JSON pointer escaping: ``~1`` first, then ``~0``.
                    segment = segment.replace("~1", "/").replace("~0", "~")
                contents = contents[segment]  # type: ignore[reportUnknownArgumentType]
            except (LookupError, TypeError, ValueError) as lookup_error:
                # TypeError covers values which aren't subscriptable (e.g.
                # a number or `None` part way along the pointer).
                error = exceptions.PointerToNowhere(ref=pointer, resource=self)
                raise error from lookup_error

            segments.append(segment)
            last = resolver
            resolver = self._specification.maybe_in_subresource(
                segments=segments,
                resolver=resolver,
                subresource=self._specification.create_resource(contents),
            )
            # A different resolver means a subresource was entered, so the
            # segments walked so far start over relative to it.
            if resolver is not last:
                segments = []
        return Resolved(contents=contents, resolver=resolver)  # type: ignore[reportUnknownArgumentType]
286
287
def _fail_to_retrieve(uri: URI):
    """
    The default retrieval function for a `Registry`: always fail.

    Raises:

        `exceptions.NoSuchResource`

            unconditionally, for whichever URI was asked for

    """
    error = exceptions.NoSuchResource(ref=uri)
    raise error
290
291
@frozen
class Registry(Mapping[URI, Resource[D]]):
    r"""
    A registry of `Resource`\ s, each identified by their canonical URIs.

    Registries store a collection of in-memory resources, and optionally
    enable additional resources which may be stored elsewhere (e.g. in a
    database, a separate set of files, over the network, etc.).

    They also lazily walk their known resources, looking for subresources
    within them. In other words, subresources contained within any added
    resources will be retrievable via their own IDs (though this discovery of
    subresources will be delayed until necessary).

    Registries are immutable, and their methods return new instances of the
    registry with the additional resources added to them.

    The ``retrieve`` argument can be used to configure retrieval of resources
    dynamically, either over the network, from a database, or the like.
    Pass it a callable which will be called if any URI not present in the
    registry is accessed. It must either return a `Resource` or else raise a
    `NoSuchResource` exception indicating that the resource does not exist
    even according to the retrieval logic.
    """

    #: Every resource known to the registry, keyed by URI.
    _resources: HashTrieMap[URI, Resource[D]] = field(
        default=HashTrieMap(),
        converter=HashTrieMap.convert,  # type: ignore[reportGeneralTypeIssues]
        alias="resources",
    )
    #: Anchors found while crawling, keyed by ``(resource URI, anchor name)``.
    _anchors: HashTrieMap[tuple[URI, str], AnchorType[D]] = HashTrieMap()
    #: URIs of resources which were added but have not yet been crawled.
    _uncrawled: HashTrieSet[URI] = EMPTY_UNCRAWLED
    #: Called by `get_or_retrieve` for URIs the registry does not contain.
    _retrieve: Retrieve[D] = field(default=_fail_to_retrieve, alias="retrieve")

    def __getitem__(self, uri: URI) -> Resource[D]:
        """
        Return the `Resource` stored under the given URI.

        No crawling or retrieval is performed; only resources which are
        already present are found. Trailing ``#`` characters are ignored.

        Raises:

            `exceptions.NoSuchResource`

                if nothing is stored under the URI

        """
        try:
            return self._resources[uri.rstrip("#")]
        except KeyError:
            raise exceptions.NoSuchResource(ref=uri) from None

    def __iter__(self) -> Iterator[URI]:
        """
        Iterate over the URIs of all resources stored in the registry.
        """
        return iter(self._resources)

    def __len__(self) -> int:
        """
        Count the resources stored in this registry, crawled or not.
        """
        return len(self._resources)

    def __rmatmul__(
        self,
        new: Resource[D] | Iterable[Resource[D]],
    ) -> Registry[D]:
        """
        Create a new registry with resource(s) added using their internal IDs.

        Resources must have internal IDs (e.g. the :kw:`$id` keyword in
        modern JSON Schema versions), otherwise an error will be raised.

        Both a single resource as well as an iterable of resources works, i.e.:

            * ``resource @ registry`` or

            * ``[iterable, of, multiple, resources] @ registry``

        which -- again, assuming the resources have internal IDs -- is
        equivalent to calling `Registry.with_resources` as such:

        .. code:: python

            registry.with_resources(
                (resource.id(), resource) for resource in new_resources
            )

        Raises:

            `NoInternalID`

                if the resource(s) in fact do not have IDs

        """
        if isinstance(new, Resource):
            new = (new,)

        resources = self._resources
        uncrawled = self._uncrawled
        for resource in new:
            identifier = resource.id()
            if identifier is None:
                raise exceptions.NoInternalID(resource=resource)
            uncrawled = uncrawled.insert(identifier)
            resources = resources.insert(identifier, resource)
        return evolve(self, resources=resources, uncrawled=uncrawled)

    def __repr__(self) -> str:
        size = len(self)
        pluralized = "resource" if size == 1 else "resources"
        if self._uncrawled:
            uncrawled = len(self._uncrawled)
            if uncrawled == size:
                summary = f"uncrawled {pluralized}"
            else:
                summary = f"{pluralized}, {uncrawled} uncrawled"
        else:
            summary = f"{pluralized}"
        return f"<Registry ({size} {summary})>"

    def get_or_retrieve(self, uri: URI) -> Retrieved[D, Resource[D]]:
        """
        Get a resource from the registry, crawling or retrieving if necessary.

        May involve crawling to find the given URI if it is not already known,
        so the returned object is a `Retrieved` object which contains both the
        resource value as well as the registry which ultimately contained it.

        Raises:

            `exceptions.NoSuchResource` or
            `exceptions.CannotDetermineSpecification`

                if raised by the registry's retrieval function; these are
                propagated unchanged

            `exceptions.Unretrievable`

                if the retrieval function fails with any other exception

        """
        resource = self._resources.get(uri)
        if resource is not None:
            return Retrieved(registry=self, value=resource)

        # Not known directly: it may be a subresource awaiting discovery.
        registry = self.crawl()
        resource = registry._resources.get(uri)
        if resource is not None:
            return Retrieved(registry=registry, value=resource)

        try:
            resource = registry._retrieve(uri)
        except (
            exceptions.CannotDetermineSpecification,
            exceptions.NoSuchResource,
        ):
            raise
        except Exception as error:
            raise exceptions.Unretrievable(ref=uri) from error
        else:
            registry = registry.with_resource(uri, resource)
            return Retrieved(registry=registry, value=resource)

    def remove(self, uri: URI) -> Registry[D]:
        """
        Return a registry with the resource identified by a given URI removed.

        Anchors recorded for that URI are dropped along with it.

        Raises:

            `exceptions.NoSuchResource`

                if nothing is stored under the URI

        """
        if uri not in self._resources:
            raise exceptions.NoSuchResource(ref=uri)

        return evolve(
            self,
            resources=self._resources.remove(uri),
            uncrawled=self._uncrawled.discard(uri),
            anchors=HashTrieMap(
                (k, v) for k, v in self._anchors.items() if k[0] != uri
            ),
        )

    def anchor(self, uri: URI, name: str) -> Retrieved[D, AnchorType[D]]:
        """
        Retrieve a given anchor from the resource stored under the given URI.

        The registry is crawled if the anchor is not already known, so the
        result is a `Retrieved` carrying the registry the anchor was found in.

        Raises:

            `exceptions.NoSuchResource`

                if no resource is known under the URI, even after crawling

            `exceptions.InvalidAnchor`

                if the anchor is not found and its name contains a ``/``

            `exceptions.NoSuchAnchor`

                if the resource exists but has no anchor with this name

        """
        value = self._anchors.get((uri, name))
        if value is not None:
            return Retrieved(value=value, registry=self)

        registry = self.crawl()
        value = registry._anchors.get((uri, name))
        if value is not None:
            return Retrieved(value=value, registry=registry)

        # Look the resource up in the *crawled* registry: the URI may belong
        # to a subresource which only crawling discovered, and which the
        # uncrawled registry therefore does not contain yet.
        resource = registry[uri]
        canonical_uri = resource.id()
        if canonical_uri is not None:
            # The resource may be stored under a URI other than its own ID,
            # while its anchors are recorded against that ID.
            value = registry._anchors.get((canonical_uri, name))
            if value is not None:
                return Retrieved(value=value, registry=registry)

        if "/" in name:
            raise exceptions.InvalidAnchor(
                ref=uri,
                resource=resource,
                anchor=name,
            )
        raise exceptions.NoSuchAnchor(ref=uri, resource=resource, anchor=name)

    def contents(self, uri: URI) -> D:
        """
        Retrieve the contents of the resource stored under the given URI.

        Like item access, this neither crawls nor retrieves.
        """
        return self[uri].contents

    def crawl(self) -> Registry[D]:
        """
        Crawl all added resources, discovering subresources.

        Returns a new registry in which every discovered resource and anchor
        is recorded and nothing remains uncrawled.
        """
        resources = self._resources
        anchors = self._anchors
        uncrawled = [(uri, resources[uri]) for uri in self._uncrawled]
        while uncrawled:
            uri, resource = uncrawled.pop()

            identifier = resource.id()
            if identifier is not None:
                # A resource with its own ID is (also) stored under that ID,
                # resolved against the URI it was reached from.
                uri = urljoin(uri, identifier)
                resources = resources.insert(uri, resource)
            for each in resource.anchors():
                anchors = anchors.insert((uri, each.name), each)
            uncrawled.extend((uri, each) for each in resource.subresources())
        return evolve(
            self,
            resources=resources,
            anchors=anchors,
            uncrawled=EMPTY_UNCRAWLED,
        )

    def with_resource(self, uri: URI, resource: Resource[D]) -> Registry[D]:
        """
        Add the given `Resource` to the registry, without crawling it.
        """
        return self.with_resources([(uri, resource)])

    def with_resources(
        self,
        pairs: Iterable[tuple[URI, Resource[D]]],
    ) -> Registry[D]:
        r"""
        Add the given `Resource`\ s to the registry, without crawling them.
        """
        resources = self._resources
        uncrawled = self._uncrawled
        for uri, resource in pairs:
            # Empty fragment URIs are equivalent to URIs without the fragment.
            # TODO: Is this true for non JSON Schema resources? Probably not.
            uri = uri.rstrip("#")
            uncrawled = uncrawled.insert(uri)
            resources = resources.insert(uri, resource)
        return evolve(self, resources=resources, uncrawled=uncrawled)

    def with_contents(
        self,
        pairs: Iterable[tuple[URI, D]],
        **kwargs: Any,
    ) -> Registry[D]:
        r"""
        Add the given contents to the registry, autodetecting when necessary.

        Any keyword arguments are passed on to `Resource.from_contents`.
        """
        return self.with_resources(
            (uri, Resource.from_contents(each, **kwargs))
            for uri, each in pairs
        )

    def combine(self, *registries: Registry[D]) -> Registry[D]:
        """
        Combine together one or more other registries, producing a unified one.

        Raises:

            `ValueError`

                if two of the registries involved have different
                (non-default) retrieval functions

        """
        if registries == (self,):
            return self
        resources = self._resources
        anchors = self._anchors
        uncrawled = self._uncrawled
        retrieve = self._retrieve
        for registry in registries:
            resources = resources.update(registry._resources)
            anchors = anchors.update(registry._anchors)
            uncrawled = uncrawled.update(registry._uncrawled)

            theirs = registry._retrieve
            if theirs is _fail_to_retrieve:
                # The default retrieval function never overrides another.
                continue
            if retrieve is not _fail_to_retrieve and theirs is not retrieve:
                raise ValueError(  # noqa: TRY003
                    "Cannot combine registries with conflicting retrieval "
                    "functions.",
                )
            retrieve = theirs
        return evolve(
            self,
            anchors=anchors,
            resources=resources,
            uncrawled=uncrawled,
            retrieve=retrieve,
        )

    def resolver(self, base_uri: URI = "") -> Resolver[D]:
        """
        Return a `Resolver` which resolves references against this registry.
        """
        return Resolver(base_uri=base_uri, registry=self)

    def resolver_with_root(self, resource: Resource[D]) -> Resolver[D]:
        """
        Return a `Resolver` with a specific root resource.

        The resource is added to the registry under its own ID (or under the
        empty URI if it has none), and that URI becomes the base URI.
        """
        uri = resource.id() or ""
        return Resolver(
            base_uri=uri,
            registry=self.with_resource(uri, resource),
        )
590
591
#: Either an anchor or a resource: the two kinds of value which a
#: `Retrieved` can carry.
AnchorOrResource = TypeVar("AnchorOrResource", AnchorType[Any], Resource[Any])
594
595
@frozen
class Retrieved(Generic[D, AnchorOrResource]):
    """
    A value retrieved from a `Registry`.

    Looking a value up may involve crawling or retrieving, both of which
    produce a new registry, so the value is returned together with the
    registry in which it was ultimately found.
    """

    #: The anchor or resource which was looked up.
    value: AnchorOrResource
    #: The registry which ultimately contained the value.
    registry: Registry[D]
604
605
@frozen
class Resolved(Generic[D]):
    """
    A reference resolved to its contents by a `Resolver`.
    """

    #: The contents which the reference pointed to.
    contents: D
    #: The resolver produced while resolving the reference; use it to
    #: resolve any further references found within the contents.
    resolver: Resolver[D]
614
615
@frozen
class Resolver(Generic[D]):
    """
    A reference resolver.

    A resolver pairs a fixed base URI with a `Registry`, which is what lets
    it resolve references -- relative ones included.

    Under normal circumstances this object is meant for *implementers of
    libraries* built on top of `referencing` (JSON Schema implementations,
    say, or other libraries which resolve JSON references). End-users who
    are populating registries or writing schemas and other resources are
    not expected to use it directly.

    A reference is first resolved against the base URI; the resulting URI is
    then looked up in the registry.

    Resolving a reference can itself yield a *new* base URI to use for later
    resolution (e.g. when an intermediate resource sets one), and can turn
    up additional subresources which end up in a new registry.
    """

    _base_uri: URI = field(alias="base_uri")
    _registry: Registry[D] = field(alias="registry")
    _previous: List[URI] = field(default=List(), repr=False, alias="previous")

    def lookup(self, ref: URI) -> Resolved[D]:
        """
        Resolve the given reference to the resource it points to.

        Raises:

            `exceptions.Unresolvable`

                or a subclass thereof (see below) if the reference isn't
                resolvable

            `exceptions.NoSuchAnchor`

                if the reference is to a URI where a resource exists but
                contains a plain name fragment which does not exist within
                the resource

            `exceptions.PointerToNowhere`

                if the reference is to a URI where a resource exists but
                contains a JSON pointer to a location within the resource
                that does not exist

        """
        # A fragment-only reference targets the current base URI as it is.
        if ref.startswith("#"):
            uri, fragment = self._base_uri, ref[1:]
        else:
            uri, fragment = urldefrag(urljoin(self._base_uri, ref))

        try:
            retrieved = self._registry.get_or_retrieve(uri)
        except exceptions.NoSuchResource:
            raise exceptions.Unresolvable(ref=ref) from None
        except exceptions.Unretrievable as error:
            raise exceptions.Unresolvable(ref=ref) from error

        is_pointer = fragment.startswith("/")
        if fragment and not is_pointer:
            # A plain name fragment: swap the resource for the named anchor
            # (and pick up the registry the anchor lookup produced).
            retrieved = retrieved.registry.anchor(uri, fragment)

        resolver = self._evolve(registry=retrieved.registry, base_uri=uri)

        if is_pointer:
            return retrieved.value.pointer(pointer=fragment, resolver=resolver)
        if fragment:
            return retrieved.value.resolve(resolver=resolver)
        return Resolved(contents=retrieved.value.contents, resolver=resolver)

    def in_subresource(self, subresource: Resource[D]) -> Resolver[D]:
        """
        Create a resolver for a subresource (which may have a new base URI).

        A subresource without an ID leaves the resolver as it is.
        """
        subresource_id = subresource.id()
        if subresource_id is not None:
            new_base = urljoin(self._base_uri, subresource_id)
            return evolve(self, base_uri=new_base)
        return self

    def dynamic_scope(self) -> Iterable[tuple[URI, Registry[D]]]:
        """
        In specs with such a notion, return the URIs in the dynamic scope.

        Each URI is yielded paired with this resolver's registry.
        """
        registry = self._registry
        for uri in self._previous:
            yield uri, registry

    def _evolve(self, base_uri: URI, **kwargs: Any):
        """
        Evolve, appending to the dynamic scope.
        """
        current = self._base_uri
        scope = self._previous
        # The current base URI joins the scope when it is non-empty and
        # either the scope is still empty or the base URI is changing.
        if current and (not scope or base_uri != current):
            scope = scope.push_front(current)
        return evolve(self, base_uri=base_uri, previous=scope, **kwargs)
714
715
@frozen
class Anchor(Generic[D]):
    """
    A simple, named anchor pointing at a `Resource`.
    """

    name: str
    resource: Resource[D]

    def resolve(self, resolver: Resolver[D]):
        """
        Resolve this anchor to the contents of the resource it points at.

        The resolver which was passed in is handed back unchanged.
        """
        target = self.resource
        return Resolved(contents=target.contents, resolver=resolver)