Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/referencing/_core.py: 58%
271 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1from __future__ import annotations
3from collections.abc import Iterable, Iterator, Sequence
4from enum import Enum
5from typing import Any, Callable, ClassVar, Generic, Protocol, TypeVar
6from urllib.parse import unquote, urldefrag, urljoin
8from attrs import evolve, field
9from rpds import HashTrieMap, HashTrieSet, List
11from referencing import exceptions
12from referencing._attrs import frozen
13from referencing.typing import URI, Anchor as AnchorType, D, Mapping, Retrieve
#: A shared, empty set of URIs.  Used as the default "nothing left to crawl"
#: state of a `Registry` and as the value assigned after a full crawl.
EMPTY_UNCRAWLED: HashTrieSet[URI] = HashTrieSet()
#: A shared, empty list of URIs.
#: NOTE(review): not referenced in the visible part of this module --
#: presumably meant as the initial ``previous`` list of a `Resolver`; confirm.
EMPTY_PREVIOUS_RESOLVERS: List[URI] = List()
class _Unset(Enum):
    """
    A single-member enum whose only member is used as a sentinel value.

    What silliness...
    """

    SENTINEL = 1


#: The sentinel itself, for callers that need an "argument not given" marker.
_UNSET = _Unset.SENTINEL
class _MaybeInSubresource(Protocol[D]):
    """
    The call signature expected of `Specification.maybe_in_subresource`.

    Implementations are called while a JSON pointer is being walked, with
    the pointer segments traversed so far, the current resolver and the
    value just reached (wrapped as a resource).  They return the resolver
    which should be used from that point on -- either the same one or a
    new one.
    """

    def __call__(
        self,
        segments: Sequence[int | str],
        resolver: Resolver[D],
        subresource: Resource[D],
    ) -> Resolver[D]:
        ...
def _detect_or_error(contents: D) -> Specification[D]:
    """
    Detect the specification of some contents via their ``$schema`` keyword.

    Raises:

        `CannotDetermineSpecification`

            if the contents are not a mapping, or are a mapping without a
            ``$schema`` key (or with one whose value is `None`)
    """
    if not isinstance(contents, Mapping):
        raise exceptions.CannotDetermineSpecification(contents)

    jsonschema_dialect_id = contents.get("$schema")  # type: ignore[reportUnknownMemberType]
    if jsonschema_dialect_id is None:
        raise exceptions.CannotDetermineSpecification(contents)

    # Imported at call time rather than at module level
    # (presumably to avoid a circular import -- confirm).
    from referencing.jsonschema import specification_with

    return specification_with(
        jsonschema_dialect_id,  # type: ignore[reportUnknownArgumentType]
    )
def _detect_or_default(
    default: Specification[D],
) -> Callable[[D], Specification[D]]:
    """
    Build a detector which falls back to ``default`` instead of raising.

    The returned callable inspects the ``$schema`` keyword of the contents
    it is given.  When the contents are not a mapping, or have no
    ``$schema`` (or one whose value is `None`), ``default`` is returned;
    otherwise the dialect ID is looked up, again passing ``default`` along
    to the lookup.
    """

    def _detect(contents: D) -> Specification[D]:
        if not isinstance(contents, Mapping):
            return default

        jsonschema_dialect_id = contents.get("$schema")  # type: ignore[reportUnknownMemberType]
        if jsonschema_dialect_id is None:
            return default

        # Imported at call time rather than at module level
        # (presumably to avoid a circular import -- confirm).
        from referencing.jsonschema import specification_with

        return specification_with(
            jsonschema_dialect_id,  # type: ignore[reportUnknownArgumentType]
            default=default,
        )

    return _detect
class _SpecificationDetector:
    """
    A descriptor giving ``detect`` different class- and instance-level behavior.

    Accessed on the class (``instance is None``) it yields a detector which
    raises when no specification can be determined; accessed on an instance
    it yields a detector which falls back to that instance.
    """

    def __get__(
        self,
        instance: Specification[D] | None,
        cls: type[Specification[D]],
    ) -> Callable[[D], Specification[D]]:
        if instance is None:
            return _detect_or_error
        return _detect_or_default(instance)
@frozen
class Specification(Generic[D]):
    """
    A specification which defines referencing behavior.

    The various methods of a `Specification` allow for varying referencing
    behavior across JSON Schema specification versions, etc.
    """

    #: A short human-readable name for the specification, used for debugging.
    name: str

    #: Find the ID of a given document.
    id_of: Callable[[D], URI | None]

    #: Retrieve the subresources of the given document (without traversing into
    #: the subresources themselves).
    subresources_of: Callable[[D], Iterable[D]]

    #: While resolving a JSON pointer, conditionally enter a subresource
    #: (if e.g. we have just entered a keyword whose value is a subresource)
    maybe_in_subresource: _MaybeInSubresource[D]

    #: Retrieve the anchors contained in the given document.
    _anchors_in: Callable[
        [Specification[D], D],
        Iterable[AnchorType[D]],
    ] = field(alias="anchors_in")

    #: An opaque specification where resources have no subresources
    #: nor internal identifiers.
    OPAQUE: ClassVar[Specification[Any]]

    #: Attempt to discern which specification applies to the given contents.
    #:
    #: May be called either as an instance method or as a class method, with
    #: slightly different behavior in the following case:
    #:
    #: Recall that not all contents contains enough internal information about
    #: which specification it is written for -- the JSON Schema ``{}``,
    #: for instance, is valid under many different dialects and may be
    #: interpreted as any one of them.
    #:
    #: When this method is used as an instance method (i.e. called on a
    #: specific specification), that specification is used as the default
    #: if the given contents are unidentifiable.
    #:
    #: On the other hand when called as a class method, an error is raised.
    #:
    #: To reiterate, ``DRAFT202012.detect({})`` will return ``DRAFT202012``
    #: whereas the class method ``Specification.detect({})`` will raise an
    #: error.
    #:
    #: (Note that of course ``DRAFT202012.detect(...)`` may return some other
    #: specification when given a schema which *does* identify as being for
    #: another version).
    #:
    #: Raises:
    #:
    #:     `CannotDetermineSpecification`
    #:
    #:         if the given contents don't have any discernible
    #:         information which could be used to guess which
    #:         specification they identify as
    detect = _SpecificationDetector()

    def __repr__(self) -> str:
        return f"<Specification name={self.name!r}>"

    def anchors_in(self, contents: D) -> Iterable[AnchorType[D]]:
        """
        Retrieve the anchors contained in the given document.

        The underlying callable is handed this specification as its first
        argument, followed by the document.
        """
        return self._anchors_in(self, contents)

    def create_resource(self, contents: D) -> Resource[D]:
        """
        Create a resource which is interpreted using this specification.
        """
        return Resource(contents=contents, specification=self)
# Attached after the class body because the value is itself an instance of
# the class being defined.
Specification.OPAQUE = Specification(
    name="opaque",
    # Opaque documents never identify themselves ...
    id_of=lambda contents: None,
    # ... contain no subresources ...
    subresources_of=lambda contents: [],
    # ... define no anchors ...
    anchors_in=lambda specification, contents: [],
    # ... and never cause the resolver to change while walking a pointer.
    maybe_in_subresource=lambda segments, resolver, subresource: resolver,
)
@frozen
class Resource(Generic[D]):
    r"""
    A document (deserialized JSON) with a concrete interpretation under a spec.

    In other words, a Python object, along with an instance of `Specification`
    which describes how the document interacts with referencing -- both
    internally (how it refers to other `Resource`\ s) and externally (how it
    should be identified such that it is referenceable by other documents).
    """

    #: The document itself.
    contents: D
    #: The specification under which the document is interpreted.
    _specification: Specification[D] = field(alias="specification")

    @classmethod
    def from_contents(
        cls,
        contents: D,
        default_specification: type[Specification[D]]
        | Specification[D] = Specification,
    ) -> Resource[D]:
        """
        Create a resource guessing which specification applies to the contents.

        ``default_specification`` may be the `Specification` class itself
        (the default), in which case undetectable contents are an error, or
        a specification instance, which is then used as the fallback.

        Raises:

            `CannotDetermineSpecification`

                if the given contents don't have any discernible
                information which could be used to guess which
                specification they identify as
        """
        specification = default_specification.detect(contents)
        return specification.create_resource(contents=contents)

    @classmethod
    def opaque(cls, contents: D) -> Resource[D]:
        """
        Create an opaque `Resource` -- i.e. one with opaque specification.

        See `Specification.OPAQUE` for details.
        """
        return Specification.OPAQUE.create_resource(contents=contents)

    def id(self) -> URI | None:
        """
        Retrieve this resource's (specification-specific) identifier.

        Any trailing ``#`` (i.e. an empty fragment) is stripped.  `None` is
        returned when the specification finds no identifier.
        """
        identifier = self._specification.id_of(self.contents)
        if identifier is None:
            return None
        return identifier.rstrip("#")

    def subresources(self) -> Iterable[Resource[D]]:
        """
        Retrieve this resource's subresources.

        Each subresource has its specification detected from its own
        contents, falling back to this resource's specification.
        """
        return (
            Resource.from_contents(
                each,
                default_specification=self._specification,
            )
            for each in self._specification.subresources_of(self.contents)
        )

    def anchors(self) -> Iterable[AnchorType[D]]:
        """
        Retrieve this resource's (specification-specific) anchors.
        """
        return self._specification.anchors_in(self.contents)

    def pointer(self, pointer: str, resolver: Resolver[D]) -> Resolved[D]:
        """
        Resolve the given JSON pointer.

        The empty pointer refers to the whole document.  Otherwise the
        pointer is percent-decoded, split into segments, and each segment
        is looked up in turn; after every step the specification is given
        the chance to swap in a new resolver (e.g. because a subresource
        with its own base URI was entered).

        Raises:

            `exceptions.PointerToNowhere`

                if the pointer points to a location not present in the document
        """
        # Per RFC 6901 the empty string points at the document itself;
        # without this guard it would be treated as a lookup of the key "".
        if not pointer:
            return Resolved(contents=self.contents, resolver=resolver)

        contents = self.contents
        segments: list[int | str] = []
        for segment in unquote(pointer[1:]).split("/"):
            if isinstance(contents, Sequence):
                # Sequences are indexed by position.
                segment = int(segment)
            else:
                # Undo JSON pointer escaping ("~1" first, then "~0").
                segment = segment.replace("~1", "/").replace("~0", "~")
            try:
                contents = contents[segment]  # type: ignore[reportUnknownArgumentType]
            except LookupError as lookup_error:
                error = exceptions.PointerToNowhere(ref=pointer, resource=self)
                raise error from lookup_error

            segments.append(segment)
            last = resolver
            resolver = self._specification.maybe_in_subresource(
                segments=segments,
                resolver=resolver,
                subresource=self._specification.create_resource(contents),
            )
            if resolver is not last:
                # A new resolver was returned, so later segments are
                # tracked relative to the point just reached.
                segments = []
        return Resolved(contents=contents, resolver=resolver)  # type: ignore[reportUnknownArgumentType]
def _fail_to_retrieve(uri: URI):
    """
    The default retrieval function: unconditionally fail.

    Raises:

        `exceptions.NoSuchResource`

            always, for whichever URI is given
    """
    raise exceptions.NoSuchResource(ref=uri)
@frozen
class Registry(Mapping[URI, Resource[D]]):
    r"""
    A registry of `Resource`\ s, each identified by their canonical URIs.

    Registries store a collection of in-memory resources, and optionally
    enable additional resources which may be stored elsewhere (e.g. in a
    database, a separate set of files, over the network, etc.).

    They also lazily walk their known resources, looking for subresources
    within them. In other words, subresources contained within any added
    resources will be retrievable via their own IDs (though this discovery of
    subresources will be delayed until necessary).

    Registries are immutable, and their methods return new instances of the
    registry with the additional resources added to them.

    The ``retrieve`` argument can be used to configure retrieval of resources
    dynamically, either over the network, from a database, or the like.
    Pass it a callable which will be called if any URI not present in the
    registry is accessed. It must either return a `Resource` or else raise a
    `NoSuchResource` exception indicating that the resource does not exist
    even according to the retrieval logic.
    """

    #: Every known resource, keyed by URI.
    _resources: HashTrieMap[URI, Resource[D]] = field(
        default=HashTrieMap(),
        converter=HashTrieMap.convert,  # type: ignore[reportGeneralTypeIssues]
        alias="resources",
    )
    #: Anchors discovered while crawling, keyed by ``(uri, anchor name)``.
    _anchors: HashTrieMap[tuple[URI, str], AnchorType[D]] = HashTrieMap()
    #: URIs of resources which were added but not yet crawled.
    _uncrawled: HashTrieSet[URI] = EMPTY_UNCRAWLED
    #: Called for URIs which are still unknown after crawling.
    _retrieve: Retrieve[D] = field(default=_fail_to_retrieve, alias="retrieve")

    def __getitem__(self, uri: URI) -> Resource[D]:
        """
        Return the (already crawled) `Resource` identified by the given URI.

        A trailing empty fragment (``#``) on the URI is ignored.

        Raises:

            `exceptions.NoSuchResource`

                if no resource is stored under the URI
        """
        try:
            return self._resources[uri.rstrip("#")]
        except KeyError:
            # The `KeyError` is an implementation detail of the storage.
            raise exceptions.NoSuchResource(ref=uri) from None

    def __iter__(self) -> Iterator[URI]:
        """
        Iterate over all crawled URIs in the registry.
        """
        return iter(self._resources)

    def __len__(self) -> int:
        """
        Count the total number of fully crawled resources in this registry.
        """
        return len(self._resources)

    def __rmatmul__(
        self,
        new: Resource[D] | Iterable[Resource[D]],
    ) -> Registry[D]:
        """
        Create a new registry with resource(s) added using their internal IDs.

        Resources must have internal IDs (e.g. the :kw:`$id` keyword in
        modern JSON Schema versions), otherwise an error will be raised.

        Both a single resource as well as an iterable of resources works, i.e.:

            * ``resource @ registry`` or

            * ``[iterable, of, multiple, resources] @ registry``

        which -- again, assuming the resources have internal IDs -- is
        equivalent to calling `Registry.with_resources` as such:

        .. code:: python

            registry.with_resources(
                (resource.id(), resource) for resource in new_resources
            )

        Raises:

            `NoInternalID`

                if the resource(s) in fact do not have IDs
        """
        if isinstance(new, Resource):
            new = (new,)

        resources = self._resources
        uncrawled = self._uncrawled
        for resource in new:
            id = resource.id()
            if id is None:
                raise exceptions.NoInternalID(resource=resource)
            uncrawled = uncrawled.insert(id)
            resources = resources.insert(id, resource)
        return evolve(self, resources=resources, uncrawled=uncrawled)

    def __repr__(self) -> str:
        size = len(self)
        pluralized = "resource" if size == 1 else "resources"
        if self._uncrawled:
            uncrawled = len(self._uncrawled)
            if uncrawled == size:
                # Nothing has been crawled at all.
                summary = f"uncrawled {pluralized}"
            else:
                summary = f"{pluralized}, {uncrawled} uncrawled"
        else:
            summary = f"{pluralized}"
        return f"<Registry ({size} {summary})>"

    def get_or_retrieve(self, uri: URI) -> Retrieved[D, Resource[D]]:
        """
        Get a resource from the registry, crawling or retrieving if necessary.

        May involve crawling to find the given URI if it is not already known,
        so the returned object is a `Retrieved` object which contains both the
        resource value as well as the registry which ultimately contained it.

        Raises:

            `exceptions.NoSuchResource`, `CannotDetermineSpecification`

                propagated unchanged from the retrieval function

            `exceptions.Unretrievable`

                if the retrieval function raised any other exception
        """
        # 1. Already known?
        resource = self._resources.get(uri)
        if resource is not None:
            return Retrieved(registry=self, value=resource)

        # 2. Known once subresources have been discovered?
        registry = self.crawl()
        resource = registry._resources.get(uri)
        if resource is not None:
            return Retrieved(registry=registry, value=resource)

        # 3. Fall back to the configured retrieval function.
        try:
            resource = registry._retrieve(uri)
        except (
            exceptions.CannotDetermineSpecification,
            exceptions.NoSuchResource,
        ):
            raise
        except Exception as error:
            raise exceptions.Unretrievable(ref=uri) from error
        else:
            registry = registry.with_resource(uri, resource)
            return Retrieved(registry=registry, value=resource)

    def remove(self, uri: URI):
        """
        Return a registry with the resource identified by a given URI removed.

        Anchors recorded under that URI are dropped as well.

        Raises:

            `exceptions.NoSuchResource`

                if the URI is not present in the registry
        """
        if uri not in self._resources:
            raise exceptions.NoSuchResource(ref=uri)

        return evolve(
            self,
            resources=self._resources.remove(uri),
            uncrawled=self._uncrawled.discard(uri),
            anchors=HashTrieMap(
                (k, v) for k, v in self._anchors.items() if k[0] != uri
            ),
        )

    def anchor(self, uri: URI, name: str):
        """
        Retrieve a given anchor from the resource stored at the given URI.

        The registry is crawled if the anchor is not yet known, so the
        result is a `Retrieved` carrying the (possibly new) registry.

        Raises:

            `exceptions.NoSuchResource`

                if the anchor is unknown and so is the URI

            `exceptions.InvalidAnchor`

                if the anchor is unknown and its name contains a ``/``

            `exceptions.NoSuchAnchor`

                if the anchor is otherwise unknown
        """
        value = self._anchors.get((uri, name))
        if value is not None:
            return Retrieved(value=value, registry=self)

        registry = self.crawl()
        value = registry._anchors.get((uri, name))
        if value is not None:
            return Retrieved(value=value, registry=registry)

        # The resource may have been stored under a URI other than the one
        # it declares for itself; retry under its own identifier.
        resource = self[uri]
        canonical_uri = resource.id()
        if canonical_uri is not None:
            value = registry._anchors.get((canonical_uri, name))
            if value is not None:
                return Retrieved(value=value, registry=registry)

        if "/" in name:
            raise exceptions.InvalidAnchor(
                ref=uri,
                resource=resource,
                anchor=name,
            )
        raise exceptions.NoSuchAnchor(ref=uri, resource=resource, anchor=name)

    def contents(self, uri: URI) -> D:
        """
        Retrieve the (already crawled) contents identified by the given URI.

        Raises:

            `exceptions.NoSuchResource`

                if no resource is stored under the URI
        """
        # Delegating to `__getitem__` keeps the empty-fragment handling in
        # one place and reports a missing URI as `NoSuchResource` instead of
        # leaking the storage's bare `KeyError`.
        # NOTE(review): assumes `NoSuchResource` subclasses `KeyError`, so
        # callers catching `KeyError` keep working -- confirm.
        return self[uri].contents

    def crawl(self) -> Registry[D]:
        """
        Crawl all added resources, discovering subresources.
        """
        resources = self._resources
        anchors = self._anchors
        uncrawled = [(uri, resources[uri]) for uri in self._uncrawled]
        while uncrawled:
            uri, resource = uncrawled.pop()

            id = resource.id()
            if id is not None:
                # A resource with its own ID is (also) stored under that ID,
                # resolved against the URI it was reached from.
                uri = urljoin(uri, id)
                resources = resources.insert(uri, resource)
            for each in resource.anchors():
                anchors = anchors.insert((uri, each.name), each)
            uncrawled.extend((uri, each) for each in resource.subresources())
        return evolve(
            self,
            resources=resources,
            anchors=anchors,
            uncrawled=EMPTY_UNCRAWLED,
        )

    def with_resource(self, uri: URI, resource: Resource[D]):
        """
        Add the given `Resource` to the registry, without crawling it.
        """
        return self.with_resources([(uri, resource)])

    def with_resources(
        self,
        pairs: Iterable[tuple[URI, Resource[D]]],
    ) -> Registry[D]:
        r"""
        Add the given `Resource`\ s to the registry, without crawling them.
        """
        resources = self._resources
        uncrawled = self._uncrawled
        for uri, resource in pairs:
            # Empty fragment URIs are equivalent to URIs without the fragment.
            # TODO: Is this true for non JSON Schema resources? Probably not.
            uri = uri.rstrip("#")
            uncrawled = uncrawled.insert(uri)
            resources = resources.insert(uri, resource)
        return evolve(self, resources=resources, uncrawled=uncrawled)

    def with_contents(
        self,
        pairs: Iterable[tuple[URI, D]],
        **kwargs: Any,
    ) -> Registry[D]:
        r"""
        Add the given contents to the registry, autodetecting when necessary.

        Any keyword arguments are passed on to `Resource.from_contents`.
        """
        return self.with_resources(
            (uri, Resource.from_contents(each, **kwargs))
            for uri, each in pairs
        )

    def combine(self, *registries: Registry[D]) -> Registry[D]:
        """
        Combine together one or more other registries, producing a unified one.

        Raises:

            `ValueError`

                if two of the registries involved have different
                (non-default) retrieval functions
        """
        if registries == (self,):
            return self
        resources = self._resources
        anchors = self._anchors
        uncrawled = self._uncrawled
        retrieve = self._retrieve
        for registry in registries:
            resources = resources.update(registry._resources)  # type: ignore[reportUnknownMemberType]
            anchors = anchors.update(registry._anchors)  # type: ignore[reportUnknownMemberType]
            uncrawled = uncrawled.update(registry._uncrawled)

            if registry._retrieve is not _fail_to_retrieve:
                # Chained comparison: a conflict exists when this registry's
                # function differs from the one chosen so far *and* the one
                # chosen so far is itself not the default.
                if registry._retrieve is not retrieve is not _fail_to_retrieve:
                    raise ValueError(
                        "Cannot combine registries with conflicting retrieval "
                        "functions.",
                    )
                retrieve = registry._retrieve
        return evolve(
            self,
            anchors=anchors,
            resources=resources,
            uncrawled=uncrawled,
            retrieve=retrieve,
        )

    def resolver(self, base_uri: URI = "") -> Resolver[D]:
        """
        Return a `Resolver` which resolves references against this registry.
        """
        return Resolver(base_uri=base_uri, registry=self)

    def resolver_with_root(self, resource: Resource[D]) -> Resolver[D]:
        """
        Return a `Resolver` with a specific root resource.

        The resource is added to the registry under its own ID, or under
        the empty URI if it has none, and that URI becomes the base URI.
        """
        uri = resource.id() or ""
        return Resolver(
            base_uri=uri,
            registry=self.with_resource(uri, resource),
        )
#: An anchor or resource.
AnchorOrResource = TypeVar("AnchorOrResource", AnchorType[Any], Resource[Any])
@frozen
class Retrieved(Generic[D, AnchorOrResource]):
    """
    A value retrieved from a `Registry`.
    """

    #: The anchor or resource which was looked up.
    value: AnchorOrResource
    #: The registry which contained the value (possibly a newer registry than
    #: the one that was asked, if crawling or retrieval took place).
    registry: Registry[D]
@frozen
class Resolved(Generic[D]):
    """
    A reference resolved to its contents by a `Resolver`.
    """

    #: The contents the reference pointed to.
    contents: D
    #: The resolver to use for any references found within the contents.
    resolver: Resolver[D]
@frozen
class Resolver(Generic[D]):
    """
    A reference resolver.

    Resolvers help resolve references (including relative ones) by
    pairing a fixed base URI with a `Registry`.

    This object, under normal circumstances, is expected to be used by
    *implementers of libraries* built on top of `referencing` (e.g. JSON Schema
    implementations or other libraries resolving JSON references),
    not directly by end-users populating registries or while writing
    schemas or other resources.

    References are resolved against the base URI, and the combined URI
    is then looked up within the registry.

    The process of resolving a reference may itself involve calculating
    a *new* base URI for future reference resolution (e.g. if an
    intermediate resource sets a new base URI), or may involve encountering
    additional subresources and adding them to a new registry.
    """

    #: The URI against which references are resolved.
    _base_uri: URI = field(alias="base_uri")
    #: The registry in which resolved URIs are looked up.
    _registry: Registry[D] = field(alias="registry")
    #: Base URIs this resolver passed through earlier, most recent first.
    _previous: List[URI] = field(default=List(), repr=False, alias="previous")

    def lookup(self, ref: URI) -> Resolved[D]:
        """
        Resolve the given reference to the resource it points to.

        Raises:

            `exceptions.Unresolvable`

                or a subclass thereof (see below) if the reference isn't
                resolvable

            `exceptions.NoSuchAnchor`

                if the reference is to a URI where a resource exists but
                contains a plain name fragment which does not exist within
                the resource

            `exceptions.PointerToNowhere`

                if the reference is to a URI where a resource exists but
                contains a JSON pointer to a location within the resource
                that does not exist
        """
        if ref.startswith("#"):
            # A fragment-only reference points into the current resource.
            uri, fragment = self._base_uri, ref[1:]
        else:
            uri, fragment = urldefrag(urljoin(self._base_uri, ref))
        try:
            retrieved = self._registry.get_or_retrieve(uri)
        except exceptions.NoSuchResource:
            raise exceptions.Unresolvable(ref=ref) from None
        except exceptions.Unretrievable as error:
            raise exceptions.Unresolvable(ref=ref) from error

        # A fragment starting with "/" is a JSON pointer ...
        if fragment.startswith("/"):
            resolver = self._evolve(registry=retrieved.registry, base_uri=uri)
            return retrieved.value.pointer(pointer=fragment, resolver=resolver)

        # ... any other non-empty fragment is an anchor name ...
        if fragment:
            retrieved = retrieved.registry.anchor(uri, fragment)
            resolver = self._evolve(registry=retrieved.registry, base_uri=uri)
            return retrieved.value.resolve(resolver=resolver)

        # ... and no fragment at all means the whole resource.
        resolver = self._evolve(registry=retrieved.registry, base_uri=uri)
        return Resolved(contents=retrieved.value.contents, resolver=resolver)

    def in_subresource(self, subresource: Resource[D]) -> Resolver[D]:
        """
        Create a resolver for a subresource (which may have a new base URI).

        If the subresource has no ID, this same resolver is returned.
        """
        id = subresource.id()
        if id is None:
            return self
        return evolve(self, base_uri=urljoin(self._base_uri, id))

    def dynamic_scope(self) -> Iterable[tuple[URI, Registry[D]]]:
        """
        In specs with such a notion, return the URIs in the dynamic scope.

        Each previously visited base URI is yielded together with this
        resolver's registry.
        """
        for uri in self._previous:
            yield uri, self._registry

    def _evolve(self, base_uri: URI, **kwargs: Any):
        """
        Evolve, appending to the dynamic scope.

        The current base URI is pushed onto the front of the list of
        previous URIs when it is non-empty and either nothing has been
        recorded yet or the base URI is actually changing.
        """
        previous = self._previous
        if self._base_uri and (not previous or base_uri != self._base_uri):
            previous = previous.push_front(self._base_uri)
        return evolve(self, base_uri=base_uri, previous=previous, **kwargs)
@frozen
class Anchor(Generic[D]):
    """
    A simple anchor in a `Resource`.
    """

    #: The name by which the anchor is referenced.
    name: str
    #: The resource the anchor points at.
    resource: Resource[D]

    def resolve(self, resolver: Resolver[D]) -> Resolved[D]:
        """
        Return the resource for this anchor.

        The result pairs the anchored resource's contents with the given
        resolver, unchanged.
        """
        return Resolved(contents=self.resource.contents, resolver=resolver)