Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/referencing/_core.py: 77%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3from collections.abc import Callable, Iterable, Iterator, Sequence
4from enum import Enum
5from typing import Any, ClassVar, Generic, Protocol
6from urllib.parse import unquote, urldefrag, urljoin
8from attrs import evolve, field
9from rpds import HashTrieMap, HashTrieSet, List
11try:
12 from typing_extensions import TypeVar
13except ImportError: # pragma: no cover
14 from typing import TypeVar
16from referencing import exceptions
17from referencing._attrs import frozen
18from referencing.typing import URI, Anchor as AnchorType, D, Mapping, Retrieve
#: The empty set of not-yet-crawled URIs: the default for a new `Registry`,
#: and the value `Registry.crawl` resets the uncrawled set to.
EMPTY_UNCRAWLED: HashTrieSet[URI] = HashTrieSet()
#: An empty persistent list of URIs.
#: NOTE(review): presumably intended as the initial "previous" list of a
#: `Resolver`; it is not referenced anywhere in the visible code -- confirm.
EMPTY_PREVIOUS_RESOLVERS: List[URI] = List()
class _Unset(Enum):
    """
    What silliness...

    A single-member enum whose only member serves as a sentinel value.
    """

    SENTINEL = 1


#: Module-level shorthand for the lone sentinel member.
_UNSET = _Unset.SENTINEL
class _MaybeInSubresource(Protocol[D]):
    """
    The call signature of `Specification.maybe_in_subresource`.

    `Resource.pointer` calls it after each JSON pointer segment has been
    traversed, passing the segments seen so far, the current resolver and a
    resource wrapping the contents just reached. The returned resolver is
    the one used from that point on (it may simply be the one passed in).
    """

    def __call__(
        self,
        segments: Sequence[int | str],
        resolver: Resolver[D],
        subresource: Resource[D],
    ) -> Resolver[D]: ...
def _detect_or_error(contents: D) -> Specification[D]:
    """
    Detect the specification of ``contents`` from its ``$schema``, or fail.

    Raises:

        `exceptions.CannotDetermineSpecification`

            if the contents are not a mapping, or do not contain a
            string-valued ``$schema`` key

    """
    # Anything which is not a mapping is treated as having no dialect ID,
    # so a single check below covers both failure modes.
    dialect_id = (
        contents.get("$schema")  # type: ignore[reportUnknownMemberType]
        if isinstance(contents, Mapping)
        else None
    )
    if not isinstance(dialect_id, str):
        raise exceptions.CannotDetermineSpecification(contents)

    # Imported at call time rather than at module import time.
    from referencing.jsonschema import specification_with

    return specification_with(dialect_id)
57def _detect_or_default(
58 default: Specification[D],
59) -> Callable[[D], Specification[D]]:
60 def _detect(contents: D) -> Specification[D]:
61 if not isinstance(contents, Mapping):
62 return default
64 jsonschema_dialect_id = contents.get("$schema") # type: ignore[reportUnknownMemberType]
65 if jsonschema_dialect_id is None:
66 return default
68 from referencing.jsonschema import specification_with
70 return specification_with(
71 jsonschema_dialect_id, # type: ignore[reportUnknownArgumentType]
72 default=default,
73 )
75 return _detect
class _SpecificationDetector:
    """
    A descriptor providing `Specification.detect`.

    Accessed on a `Specification` instance it yields a detector which falls
    back to that instance; accessed on the class it yields a detector which
    raises when the specification cannot be determined.
    """

    def __get__(
        self,
        instance: Specification[D] | None,
        cls: type[Specification[D]],
    ) -> Callable[[D], Specification[D]]:
        if instance is not None:
            return _detect_or_default(instance)
        # Class-level access: there is no specification to fall back to.
        return _detect_or_error
@frozen
class Specification(Generic[D]):
    """
    A specification which defines referencing behavior.

    The various methods of a `Specification` allow for varying referencing
    behavior across JSON Schema specification versions, etc.
    """

    #: A short human-readable name for the specification, used for debugging.
    name: str

    #: Find the ID of a given document.
    id_of: Callable[[D], URI | None]

    #: Retrieve the subresources of the given document (without traversing into
    #: the subresources themselves).
    subresources_of: Callable[[D], Iterable[D]]

    #: While resolving a JSON pointer, conditionally enter a subresource
    #: (if e.g. we have just entered a keyword whose value is a subresource)
    maybe_in_subresource: _MaybeInSubresource[D]

    #: Retrieve the anchors contained in the given document.
    #:
    #: Supplied to the initializer as ``anchors_in``. The callable receives
    #: the specification itself as its first argument (see the `anchors_in`
    #: method below, which passes ``self`` along).
    _anchors_in: Callable[
        [Specification[D], D],
        Iterable[AnchorType[D]],
    ] = field(alias="anchors_in")

    #: An opaque specification where resources have no subresources
    #: nor internal identifiers.
    OPAQUE: ClassVar[Specification[Any]]

    #: Attempt to discern which specification applies to the given contents.
    #:
    #: May be called either as an instance method or as a class method, with
    #: slightly different behavior in the following case:
    #:
    #: Recall that not all contents contains enough internal information about
    #: which specification it is written for -- the JSON Schema ``{}``,
    #: for instance, is valid under many different dialects and may be
    #: interpreted as any one of them.
    #:
    #: When this method is used as an instance method (i.e. called on a
    #: specific specification), that specification is used as the default
    #: if the given contents are unidentifiable.
    #:
    #: On the other hand when called as a class method, an error is raised.
    #:
    #: To reiterate, ``DRAFT202012.detect({})`` will return ``DRAFT202012``
    #: whereas the class method ``Specification.detect({})`` will raise an
    #: error.
    #:
    #: (Note that of course ``DRAFT202012.detect(...)`` may return some other
    #: specification when given a schema which *does* identify as being for
    #: another version).
    #:
    #: Raises:
    #:
    #:     `CannotDetermineSpecification`
    #:
    #:         if the given contents don't have any discernible
    #:         information which could be used to guess which
    #:         specification they identify as
    detect = _SpecificationDetector()

    def __repr__(self) -> str:
        return f"<Specification name={self.name!r}>"

    def anchors_in(self, contents: D) -> Iterable[AnchorType[D]]:
        """
        Retrieve the anchors contained in the given document.

        Delegates to the callable given as ``anchors_in`` at construction
        time, passing this specification as its first argument.
        """
        return self._anchors_in(self, contents)

    def create_resource(self, contents: D) -> Resource[D]:
        """
        Create a resource which is interpreted using this specification.
        """
        return Resource(contents=contents, specification=self)
# The opaque specification: every document has no ID, no subresources and no
# anchors, and traversing a JSON pointer never switches to another resolver.
# It is assigned after the class body because it is itself an instance of
# the class being defined.
Specification.OPAQUE = Specification(
    name="opaque",
    id_of=lambda contents: None,
    subresources_of=lambda contents: [],
    anchors_in=lambda specification, contents: [],
    maybe_in_subresource=lambda segments, resolver, subresource: resolver,
)
@frozen
class Resource(Generic[D]):
    r"""
    A document (deserialized JSON) with a concrete interpretation under a spec.

    In other words, a Python object, along with an instance of `Specification`
    which describes how the document interacts with referencing -- both
    internally (how it refers to other `Resource`\ s) and externally (how it
    should be identified such that it is referenceable by other documents).
    """

    #: The document itself.
    contents: D
    #: The specification used to interpret the document (passed to the
    #: initializer as ``specification``).
    _specification: Specification[D] = field(alias="specification")

    @classmethod
    def from_contents(
        cls,
        contents: D,
        default_specification: (
            type[Specification[D]] | Specification[D]
        ) = Specification,
    ) -> Resource[D]:
        """
        Create a resource guessing which specification applies to the contents.

        ``default_specification`` may be a `Specification` instance, in
        which case it is used when the contents do not identify their own
        specification, or the `Specification` class itself (the default),
        in which case unidentifiable contents are an error.

        Raises:

            `CannotDetermineSpecification`

                if the given contents don't have any discernible
                information which could be used to guess which
                specification they identify as

        """
        specification = default_specification.detect(contents)
        return specification.create_resource(contents=contents)

    @classmethod
    def opaque(cls, contents: D) -> Resource[D]:
        """
        Create an opaque `Resource` -- i.e. one with opaque specification.

        See `Specification.OPAQUE` for details.
        """
        return Specification.OPAQUE.create_resource(contents=contents)

    def id(self) -> URI | None:
        """
        Retrieve this resource's (specification-specific) identifier.

        Any trailing ``#`` (an empty fragment) is stripped. Returns `None`
        if the specification finds no identifier in the contents.
        """
        identifier = self._specification.id_of(self.contents)
        if identifier is None:
            return None
        return identifier.rstrip("#")

    def subresources(self) -> Iterable[Resource[D]]:
        """
        Retrieve this resource's subresources.

        Each subresource's specification is detected from its own contents,
        falling back to this resource's specification.
        """
        return (
            Resource.from_contents(
                each,
                default_specification=self._specification,
            )
            for each in self._specification.subresources_of(self.contents)
        )

    def anchors(self) -> Iterable[AnchorType[D]]:
        """
        Retrieve this resource's (specification-specific) anchors.
        """
        return self._specification.anchors_in(self.contents)

    def pointer(self, pointer: str, resolver: Resolver[D]) -> Resolved[D]:
        """
        Resolve the given JSON pointer.

        The pointer is percent-decoded and then walked one segment at a
        time. After each segment the specification is given the chance to
        swap in a different resolver (e.g. when a subresource is entered).

        Raises:

            `exceptions.PointerToNowhere`

                if the pointer points to a location not present in the
                document (including a non-integer segment being used to
                index into an array)

        """
        if not pointer:
            return Resolved(contents=self.contents, resolver=resolver)

        contents = self.contents
        segments: list[int | str] = []
        for segment in unquote(pointer[1:]).split("/"):
            if isinstance(contents, Sequence):
                try:
                    segment = int(segment)
                except ValueError as value_error:
                    # A non-numeric segment cannot name an array element,
                    # so report it like any other missing location rather
                    # than leaking a bare ValueError to the caller.
                    error = exceptions.PointerToNowhere(
                        ref=pointer,
                        resource=self,
                    )
                    raise error from value_error
            else:
                # Undo JSON pointer escaping; "~1" has to be handled before
                # "~0" so that "~01" decodes to "~1" and not to "/".
                segment = segment.replace("~1", "/").replace("~0", "~")
            try:
                contents = contents[segment]  # type: ignore[reportUnknownArgumentType]
            except LookupError as lookup_error:
                error = exceptions.PointerToNowhere(ref=pointer, resource=self)
                raise error from lookup_error

            segments.append(segment)
            last = resolver
            resolver = self._specification.maybe_in_subresource(
                segments=segments,
                resolver=resolver,
                subresource=self._specification.create_resource(contents),
            )
            if resolver is not last:
                # A different resolver came back, so start collecting
                # segments afresh from this point.
                segments = []
        return Resolved(contents=contents, resolver=resolver)  # type: ignore[reportUnknownArgumentType]
def _fail_to_retrieve(uri: URI):
    """
    The default retrieval function of a `Registry`: always fail.

    `Registry.combine` compares retrieval functions against this function
    by identity to tell whether a registry has a custom one configured.

    Raises:

        `exceptions.NoSuchResource`

            unconditionally, for the given URI

    """
    raise exceptions.NoSuchResource(ref=uri)
@frozen
class Registry(Mapping[URI, Resource[D]]):
    r"""
    A registry of `Resource`\ s, each identified by their canonical URIs.

    Registries store a collection of in-memory resources, and optionally
    enable additional resources which may be stored elsewhere (e.g. in a
    database, a separate set of files, over the network, etc.).

    They also lazily walk their known resources, looking for subresources
    within them. In other words, subresources contained within any added
    resources will be retrievable via their own IDs (though this discovery of
    subresources will be delayed until necessary).

    Registries are immutable, and their methods return new instances of the
    registry with the additional resources added to them.

    The ``retrieve`` argument can be used to configure retrieval of resources
    dynamically, either over the network, from a database, or the like.
    Pass it a callable which will be called if any URI not present in the
    registry is accessed. It must either return a `Resource` or else raise a
    `NoSuchResource` exception indicating that the resource does not exist
    even according to the retrieval logic.
    """

    #: Every known resource, keyed by URI (passed in as ``resources``).
    _resources: HashTrieMap[URI, Resource[D]] = field(
        default=HashTrieMap(),
        converter=HashTrieMap.convert,  # type: ignore[reportGeneralTypeIssues]
        alias="resources",
    )
    #: Anchors discovered while crawling, keyed by ``(URI, anchor name)``.
    _anchors: HashTrieMap[tuple[URI, str], AnchorType[D]] = HashTrieMap()
    #: URIs which have been added but whose resources are not yet crawled.
    _uncrawled: HashTrieSet[URI] = EMPTY_UNCRAWLED
    #: Called for URIs which are unknown even after crawling (passed in as
    #: ``retrieve``); the default always raises `NoSuchResource`.
    _retrieve: Retrieve[D] = field(default=_fail_to_retrieve, alias="retrieve")

    def __getitem__(self, uri: URI) -> Resource[D]:
        """
        Return the (already crawled) `Resource` identified by the given URI.

        No crawling or retrieval is attempted: only resources which are
        already stored are found. A trailing ``#`` on the URI is ignored.

        Raises:

            `exceptions.NoSuchResource`

                if no resource is stored under the URI

        """
        try:
            return self._resources[uri.rstrip("#")]
        except KeyError:
            raise exceptions.NoSuchResource(ref=uri) from None

    def __iter__(self) -> Iterator[URI]:
        """
        Iterate over all URIs stored in the registry.

        This includes URIs which were added but have not been crawled yet.
        """
        return iter(self._resources)

    def __len__(self) -> int:
        """
        Count the total number of resources stored in this registry.

        This includes resources which were added but not yet crawled.
        """
        return len(self._resources)

    def __rmatmul__(
        self,
        new: Resource[D] | Iterable[Resource[D]],
    ) -> Registry[D]:
        """
        Create a new registry with resource(s) added using their internal IDs.

        Resources must have internal IDs (e.g. the :kw:`$id` keyword in
        modern JSON Schema versions), otherwise an error will be raised.

        Both a single resource as well as an iterable of resources works, i.e.:

            * ``resource @ registry`` or

            * ``[iterable, of, multiple, resources] @ registry``

        which -- again, assuming the resources have internal IDs -- is
        equivalent to calling `Registry.with_resources` as such:

        .. code:: python

            registry.with_resources(
                (resource.id(), resource) for resource in new_resources
            )

        Raises:

            `NoInternalID`

                if the resource(s) in fact do not have IDs

        """
        if isinstance(new, Resource):
            new = (new,)

        resources = self._resources
        uncrawled = self._uncrawled
        for resource in new:
            id = resource.id()
            if id is None:
                raise exceptions.NoInternalID(resource=resource)
            uncrawled = uncrawled.insert(id)
            resources = resources.insert(id, resource)
        return evolve(self, resources=resources, uncrawled=uncrawled)

    def __repr__(self) -> str:
        size = len(self)
        pluralized = "resource" if size == 1 else "resources"
        if self._uncrawled:
            uncrawled = len(self._uncrawled)
            if uncrawled == size:
                # Everything is uncrawled, so there is no need for a count.
                summary = f"uncrawled {pluralized}"
            else:
                summary = f"{pluralized}, {uncrawled} uncrawled"
        else:
            summary = f"{pluralized}"
        return f"<Registry ({size} {summary})>"

    def get_or_retrieve(self, uri: URI) -> Retrieved[D, Resource[D]]:
        """
        Get a resource from the registry, crawling or retrieving if necessary.

        May involve crawling to find the given URI if it is not already known,
        so the returned object is a `Retrieved` object which contains both the
        resource value as well as the registry which ultimately contained it.

        Raises:

            `exceptions.NoSuchResource` or
            `exceptions.CannotDetermineSpecification`

                if the retrieval function raises one of them (they are
                propagated unchanged)

            `exceptions.Unretrievable`

                if the retrieval function raises any other exception

        """
        # Cheapest first: the resource may already be stored.
        resource = self._resources.get(uri)
        if resource is not None:
            return Retrieved(registry=self, value=resource)

        # Otherwise it may be a subresource which crawling will discover.
        registry = self.crawl()
        resource = registry._resources.get(uri)
        if resource is not None:
            return Retrieved(registry=registry, value=resource)

        # Finally, fall back to the configured retrieval function.
        try:
            resource = registry._retrieve(uri)
        except (
            exceptions.CannotDetermineSpecification,
            exceptions.NoSuchResource,
        ):
            raise
        except Exception as error:
            raise exceptions.Unretrievable(ref=uri) from error
        else:
            registry = registry.with_resource(uri, resource)
            return Retrieved(registry=registry, value=resource)

    def remove(self, uri: URI) -> Registry[D]:
        """
        Return a registry with the resource identified by a given URI removed.

        Anchors recorded under that same URI are dropped as well.

        Raises:

            `exceptions.NoSuchResource`

                if no resource is stored under the URI

        """
        if uri not in self._resources:
            raise exceptions.NoSuchResource(ref=uri)

        return evolve(
            self,
            resources=self._resources.remove(uri),
            uncrawled=self._uncrawled.discard(uri),
            anchors=HashTrieMap(
                (k, v) for k, v in self._anchors.items() if k[0] != uri
            ),
        )

    def anchor(self, uri: URI, name: str) -> Retrieved[D, AnchorType[D]]:
        """
        Retrieve a given anchor from the resource identified by the given URI.

        The anchor is looked for among the already known anchors, then
        again after crawling, and finally under the resource's own
        (canonical) ID if it has one. The returned `Retrieved` carries the
        registry in which the anchor was found.

        Raises:

            `exceptions.NoSuchResource`

                if the anchor is not found and no resource is stored under
                the URI in this (not yet crawled) registry

            `exceptions.InvalidAnchor`

                if the anchor is not found and its name contains a ``/``

            `exceptions.NoSuchAnchor`

                if the anchor is otherwise not found

        """
        value = self._anchors.get((uri, name))
        if value is not None:
            return Retrieved(value=value, registry=self)

        registry = self.crawl()
        value = registry._anchors.get((uri, name))
        if value is not None:
            return Retrieved(value=value, registry=registry)

        # The anchor may be recorded under the resource's own ID rather
        # than under the URI it was asked for with.
        resource = self[uri]
        canonical_uri = resource.id()
        if canonical_uri is not None:
            value = registry._anchors.get((canonical_uri, name))
            if value is not None:
                return Retrieved(value=value, registry=registry)

        if "/" in name:
            raise exceptions.InvalidAnchor(
                ref=uri,
                resource=resource,
                anchor=name,
            )
        raise exceptions.NoSuchAnchor(ref=uri, resource=resource, anchor=name)

    def contents(self, uri: URI) -> D:
        """
        Retrieve the (already crawled) contents identified by the given URI.

        Like item lookup on the registry, this neither crawls nor retrieves.
        """
        return self[uri].contents

    def crawl(self) -> Registry[D]:
        """
        Crawl all added resources, discovering subresources.

        Returns a new registry in which every resource reachable from the
        uncrawled ones is stored and nothing is left uncrawled.
        """
        resources = self._resources
        anchors = self._anchors
        # A work list of (base URI, resource) pairs still to be visited.
        uncrawled = [(uri, resources[uri]) for uri in self._uncrawled]
        while uncrawled:
            uri, resource = uncrawled.pop()

            id = resource.id()
            if id is not None:
                # The resource's own ID is resolved against the URI it was
                # reached from, and becomes the base for what is inside it.
                uri = urljoin(uri, id)
                resources = resources.insert(uri, resource)
            for each in resource.anchors():
                anchors = anchors.insert((uri, each.name), each)
            uncrawled.extend((uri, each) for each in resource.subresources())
        return evolve(
            self,
            resources=resources,
            anchors=anchors,
            uncrawled=EMPTY_UNCRAWLED,
        )

    def with_resource(self, uri: URI, resource: Resource[D]) -> Registry[D]:
        """
        Add the given `Resource` to the registry, without crawling it.
        """
        return self.with_resources([(uri, resource)])

    def with_resources(
        self,
        pairs: Iterable[tuple[URI, Resource[D]]],
    ) -> Registry[D]:
        r"""
        Add the given `Resource`\ s to the registry, without crawling them.

        Each URI is recorded as uncrawled so that a later crawl visits it.
        """
        resources = self._resources
        uncrawled = self._uncrawled
        for uri, resource in pairs:
            # Empty fragment URIs are equivalent to URIs without the fragment.
            # TODO: Is this true for non JSON Schema resources? Probably not.
            uri = uri.rstrip("#")
            uncrawled = uncrawled.insert(uri)
            resources = resources.insert(uri, resource)
        return evolve(self, resources=resources, uncrawled=uncrawled)

    def with_contents(
        self,
        pairs: Iterable[tuple[URI, D]],
        **kwargs: Any,
    ) -> Registry[D]:
        r"""
        Add the given contents to the registry, autodetecting when necessary.

        Any keyword arguments are passed on to `Resource.from_contents`.
        """
        return self.with_resources(
            (uri, Resource.from_contents(each, **kwargs))
            for uri, each in pairs
        )

    def combine(self, *registries: Registry[D]) -> Registry[D]:
        """
        Combine together one or more other registries, producing a unified one.

        Raises:

            `ValueError`

                if two of the registries involved have different
                (non-default) retrieval functions

        """
        if registries == (self,):
            return self
        resources = self._resources
        anchors = self._anchors
        uncrawled = self._uncrawled
        retrieve = self._retrieve
        for registry in registries:
            resources = resources.update(registry._resources)
            anchors = anchors.update(registry._anchors)
            uncrawled = uncrawled.update(registry._uncrawled)

            if registry._retrieve is not _fail_to_retrieve:
                # A custom retrieval function may replace the default one,
                # but must not clash with a different custom one seen so far.
                if registry._retrieve is not retrieve is not _fail_to_retrieve:
                    raise ValueError(  # noqa: TRY003
                        "Cannot combine registries with conflicting retrieval "
                        "functions.",
                    )
                retrieve = registry._retrieve
        return evolve(
            self,
            anchors=anchors,
            resources=resources,
            uncrawled=uncrawled,
            retrieve=retrieve,
        )

    def resolver(self, base_uri: URI = "") -> Resolver[D]:
        """
        Return a `Resolver` which resolves references against this registry.
        """
        return Resolver(base_uri=base_uri, registry=self)

    def resolver_with_root(self, resource: Resource[D]) -> Resolver[D]:
        """
        Return a `Resolver` with a specific root resource.

        The resource is added to the registry under its own ID (or under
        the empty URI if it has none), which also becomes the base URI.
        """
        uri = resource.id() or ""
        return Resolver(
            base_uri=uri,
            registry=self.with_resource(uri, resource),
        )
#: An anchor or resource.
#:
#: Constrained to exactly those two types, and defaulting to a resource
#: when left unspecified (as in ``Retrieved[D]``).
AnchorOrResource = TypeVar(
    "AnchorOrResource",
    AnchorType[Any],
    Resource[Any],
    default=Resource[Any],
)
@frozen
class Retrieved(Generic[D, AnchorOrResource]):
    """
    A value retrieved from a `Registry`.

    The registry is returned alongside the value because looking the value
    up may have produced a new registry (e.g. a crawled one).
    """

    #: The anchor or resource which was retrieved.
    value: AnchorOrResource
    #: The registry in which the value was ultimately found.
    registry: Registry[D]
@frozen
class Resolved(Generic[D]):
    """
    A reference resolved to its contents by a `Resolver`.
    """

    #: The contents which the reference pointed to.
    contents: D
    #: The resolver which resulted from resolving the reference.
    resolver: Resolver[D]
@frozen
class Resolver(Generic[D]):
    """
    A reference resolver.

    Resolvers help resolve references (including relative ones) by
    pairing a fixed base URI with a `Registry`.

    This object, under normal circumstances, is expected to be used by
    *implementers of libraries* built on top of `referencing` (e.g. JSON Schema
    implementations or other libraries resolving JSON references),
    not directly by end-users populating registries or while writing
    schemas or other resources.

    References are resolved against the base URI, and the combined URI
    is then looked up within the registry.

    The process of resolving a reference may itself involve calculating
    a *new* base URI for future reference resolution (e.g. if an
    intermediate resource sets a new base URI), or may involve encountering
    additional subresources and adding them to a new registry.
    """

    #: The URI against which references are resolved (``base_uri``).
    _base_uri: URI = field(alias="base_uri")
    #: The registry in which resolved URIs are looked up (``registry``).
    _registry: Registry[D] = field(alias="registry")
    #: Base URIs this resolver has previously had, most recent first
    #: (``previous``); see `dynamic_scope`.
    _previous: List[URI] = field(default=List(), repr=False, alias="previous")

    def lookup(self, ref: URI) -> Resolved[D]:
        """
        Resolve the given reference to the resource it points to.

        Raises:

            `exceptions.Unresolvable`

                or a subclass thereof (see below) if the reference isn't
                resolvable

            `exceptions.NoSuchAnchor`

                if the reference is to a URI where a resource exists but
                contains a plain name fragment which does not exist within
                the resource

            `exceptions.PointerToNowhere`

                if the reference is to a URI where a resource exists but
                contains a JSON pointer to a location within the resource
                that does not exist

        """
        if ref.startswith("#"):
            # A fragment-only reference points into the current resource.
            uri, fragment = self._base_uri, ref[1:]
        else:
            uri, fragment = urldefrag(urljoin(self._base_uri, ref))
        try:
            retrieved = self._registry.get_or_retrieve(uri)
        except exceptions.NoSuchResource:
            raise exceptions.Unresolvable(ref=ref) from None
        except exceptions.Unretrievable as error:
            raise exceptions.Unresolvable(ref=ref) from error

        # A fragment starting with a slash is a JSON pointer ...
        if fragment.startswith("/"):
            resolver = self._evolve(registry=retrieved.registry, base_uri=uri)
            return retrieved.value.pointer(pointer=fragment, resolver=resolver)

        # ... and any other non-empty fragment is the name of an anchor.
        if fragment:
            retrieved = retrieved.registry.anchor(uri, fragment)
            resolver = self._evolve(registry=retrieved.registry, base_uri=uri)
            return retrieved.value.resolve(resolver=resolver)

        resolver = self._evolve(registry=retrieved.registry, base_uri=uri)
        return Resolved(contents=retrieved.value.contents, resolver=resolver)

    def in_subresource(self, subresource: Resource[D]) -> Resolver[D]:
        """
        Create a resolver for a subresource (which may have a new base URI).

        If the subresource has no ID, this same resolver is returned.
        """
        id = subresource.id()
        if id is None:
            return self
        return evolve(self, base_uri=urljoin(self._base_uri, id))

    def dynamic_scope(self) -> Iterable[tuple[URI, Registry[D]]]:
        """
        In specs with such a notion, return the URIs in the dynamic scope.

        Each previously recorded base URI is yielded together with this
        resolver's current registry.
        """
        for uri in self._previous:
            yield uri, self._registry

    def _evolve(self, base_uri: URI, **kwargs: Any) -> Resolver[D]:
        """
        Evolve, appending to the dynamic scope.
        """
        previous = self._previous
        # Record the current base URI only if it is non-empty, and either
        # nothing has been recorded yet or the base URI is really changing.
        if self._base_uri and (not previous or base_uri != self._base_uri):
            previous = previous.push_front(self._base_uri)
        return evolve(self, base_uri=base_uri, previous=previous, **kwargs)
@frozen
class Anchor(Generic[D]):
    """
    A plain anchor: a name attached to a `Resource`.
    """

    #: The name by which the anchor is referred to.
    name: str
    #: The resource which the anchor identifies.
    resource: Resource[D]

    def resolve(self, resolver: Resolver[D]):
        """
        Resolve this anchor to the contents of its resource.

        The given resolver is passed through unchanged.
        """
        anchored = self.resource
        return Resolved(resolver=resolver, contents=anchored.contents)