Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/referencing/_core.py: 36%
272 statements
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-22 06:29 +0000
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-22 06:29 +0000
1from __future__ import annotations
3from collections.abc import Iterable, Iterator, Sequence
4from enum import Enum
5from typing import Any, Callable, ClassVar, Generic, Protocol, TypeVar
6from urllib.parse import unquote, urldefrag, urljoin
8from attrs import evolve, field
9from rpds import HashTrieMap, HashTrieSet, List
11from referencing import exceptions
12from referencing._attrs import frozen
13from referencing.typing import URI, Anchor as AnchorType, D, Mapping, Retrieve
# Shared empty persistent set, used as the "nothing left to crawl" value
# for a registry (both as its default and after a crawl completes).
EMPTY_UNCRAWLED: HashTrieSet[URI] = HashTrieSet()

# Shared empty persistent list of previously visited base URIs.
EMPTY_PREVIOUS_RESOLVERS: List[URI] = List()
19class _Unset(Enum):
20 """
21 What sillyness...
22 """
24 SENTINEL = 1
27_UNSET = _Unset.SENTINEL
class _MaybeInSubresource(Protocol[D]):
    """
    The call signature expected of `Specification.maybe_in_subresource`.

    Implementations receive the pointer segments walked so far, the
    current resolver and the resource just reached, and return the
    resolver to continue with (possibly the same one).
    """

    def __call__(
        self,
        segments: Sequence[int | str],
        resolver: Resolver[D],
        subresource: Resource[D],
    ) -> Resolver[D]:
        ...
def _detect_or_error(contents: D) -> Specification[D]:
    """
    Detect the specification of ``contents`` from its ``$schema`` keyword.

    Raises:

        `CannotDetermineSpecification`

            if the contents are not a mapping, or their ``$schema``
            value is missing or is not a string
    """
    dialect_id = (
        contents.get("$schema")  # type: ignore[reportUnknownMemberType]
        if isinstance(contents, Mapping)
        else None
    )
    # A missing key yields ``None``, which is rejected by the same check
    # that rejects non-string dialect identifiers.
    if not isinstance(dialect_id, str):
        raise exceptions.CannotDetermineSpecification(contents)

    # Imported lazily, at call time rather than at module import time.
    from referencing.jsonschema import specification_with

    return specification_with(dialect_id)
53def _detect_or_default(
54 default: Specification[D],
55) -> Callable[[D], Specification[D]]:
56 def _detect(contents: D) -> Specification[D]:
57 if not isinstance(contents, Mapping):
58 return default
60 jsonschema_dialect_id = contents.get("$schema") # type: ignore[reportUnknownMemberType]
61 if jsonschema_dialect_id is None:
62 return default
64 from referencing.jsonschema import specification_with
66 return specification_with(
67 jsonschema_dialect_id, # type: ignore[reportUnknownArgumentType]
68 default=default,
69 )
71 return _detect
class _SpecificationDetector:
    """
    A descriptor providing ``detect`` on both the class and its instances.

    Accessed on the class it yields the erroring detector; accessed on an
    instance it yields a detector which defaults to that instance.
    """

    def __get__(
        self,
        instance: Specification[D] | None,
        cls: type[Specification[D]],
    ) -> Callable[[D], Specification[D]]:
        return (
            _detect_or_error
            if instance is None
            else _detect_or_default(instance)
        )
@frozen
class Specification(Generic[D]):
    """
    A specification which defines referencing behavior.

    The various methods of a `Specification` allow for varying referencing
    behavior across JSON Schema specification versions, etc.
    """

    #: A short human-readable name for the specification, used for debugging.
    name: str

    #: Find the ID of a given document.
    id_of: Callable[[D], URI | None]

    #: Retrieve the subresources of the given document (without traversing into
    #: the subresources themselves).
    subresources_of: Callable[[D], Iterable[D]]

    #: While resolving a JSON pointer, conditionally enter a subresource
    #: (if e.g. we have just entered a keyword whose value is a subresource)
    maybe_in_subresource: _MaybeInSubresource[D]

    #: Retrieve the anchors contained in the given document.
    _anchors_in: Callable[
        [Specification[D], D],
        Iterable[AnchorType[D]],
    ] = field(alias="anchors_in")

    #: An opaque specification where resources have no subresources
    #: nor internal identifiers.
    OPAQUE: ClassVar[Specification[Any]]

    #: Attempt to discern which specification applies to the given contents.
    #:
    #: May be called either as an instance method or as a class method, with
    #: slightly different behavior in the following case:
    #:
    #: Recall that not all contents contains enough internal information about
    #: which specification it is written for -- the JSON Schema ``{}``,
    #: for instance, is valid under many different dialects and may be
    #: interpreted as any one of them.
    #:
    #: When this method is used as an instance method (i.e. called on a
    #: specific specification), that specification is used as the default
    #: if the given contents are unidentifiable.
    #:
    #: On the other hand when called as a class method, an error is raised.
    #:
    #: To reiterate, ``DRAFT202012.detect({})`` will return ``DRAFT202012``
    #: whereas the class method ``Specification.detect({})`` will raise an
    #: error.
    #:
    #: (Note that of course ``DRAFT202012.detect(...)`` may return some other
    #: specification when given a schema which *does* identify as being for
    #: another version).
    #:
    #: Raises:
    #:
    #:     `CannotDetermineSpecification`
    #:
    #:         if the given contents don't have any discernible
    #:         information which could be used to guess which
    #:         specification they identify as
    detect = _SpecificationDetector()

    def __repr__(self) -> str:
        name = self.name
        return f"<Specification name={name!r}>"

    def anchors_in(self, contents: D):
        """
        Retrieve the anchors contained in the given document.
        """
        # The stored callable takes the specification as its first argument.
        find_anchors = self._anchors_in
        return find_anchors(self, contents)

    def create_resource(self, contents: D) -> Resource[D]:
        """
        Create a resource which is interpreted using this specification.
        """
        return Resource(specification=self, contents=contents)


# Assigned after the class body because it needs a constructed instance.
Specification.OPAQUE = Specification(
    name="opaque",
    id_of=lambda contents: None,
    subresources_of=lambda contents: [],
    maybe_in_subresource=lambda segments, resolver, subresource: resolver,
    anchors_in=lambda specification, contents: [],
)
@frozen
class Resource(Generic[D]):
    r"""
    A document (deserialized JSON) with a concrete interpretation under a spec.

    In other words, a Python object, along with an instance of `Specification`
    which describes how the document interacts with referencing -- both
    internally (how it refers to other `Resource`\ s) and externally (how it
    should be identified such that it is referenceable by other documents).
    """

    contents: D
    _specification: Specification[D] = field(alias="specification")

    @classmethod
    def from_contents(
        cls,
        contents: D,
        default_specification: type[Specification[D]]
        | Specification[D] = Specification,
    ) -> Resource[D]:
        """
        Create a resource guessing which specification applies to the contents.

        Raises:

            `CannotDetermineSpecification`

                if the given contents don't have any discernible
                information which could be used to guess which
                specification they identify as
        """
        specification = default_specification.detect(contents)
        return specification.create_resource(contents=contents)

    @classmethod
    def opaque(cls, contents: D) -> Resource[D]:
        """
        Create an opaque `Resource` -- i.e. one with opaque specification.

        See `Specification.OPAQUE` for details.
        """
        return Specification.OPAQUE.create_resource(contents=contents)

    def id(self) -> URI | None:
        """
        Retrieve this resource's (specification-specific) identifier.

        Any trailing ``#`` characters are stripped from the identifier.
        Returns ``None`` if the specification finds no identifier.
        """
        resource_id = self._specification.id_of(self.contents)
        if resource_id is None:
            return None
        return resource_id.rstrip("#")

    def subresources(self) -> Iterable[Resource[D]]:
        """
        Retrieve this resource's subresources.

        Each subresource's specification is detected from its own contents,
        defaulting to this resource's specification.  The result is lazy.
        """
        return (
            Resource.from_contents(
                each,
                default_specification=self._specification,
            )
            for each in self._specification.subresources_of(self.contents)
        )

    def anchors(self) -> Iterable[AnchorType[D]]:
        """
        Retrieve this resource's (specification-specific) anchors.
        """
        return self._specification.anchors_in(self.contents)

    def pointer(self, pointer: str, resolver: Resolver[D]) -> Resolved[D]:
        """
        Resolve the given JSON pointer.

        An empty pointer refers to the whole document, so it resolves to
        this resource's contents with the given resolver unchanged.

        Raises:

            `exceptions.PointerToNowhere`

                if the pointer points to a location not present in the
                document (including a non-integer segment being used to
                index into a sequence)
        """
        if not pointer:
            return Resolved(contents=self.contents, resolver=resolver)

        contents = self.contents
        segments: list[int | str] = []
        for segment in unquote(pointer[1:]).split("/"):
            try:
                if isinstance(contents, Sequence):
                    # May raise ValueError for a non-numeric segment.
                    segment = int(segment)
                else:
                    # Undo JSON pointer escaping; "~1" must go before "~0".
                    segment = segment.replace("~1", "/").replace("~0", "~")
                contents = contents[segment]  # type: ignore[reportUnknownArgumentType]
            except (LookupError, ValueError) as lookup_error:
                error = exceptions.PointerToNowhere(ref=pointer, resource=self)
                raise error from lookup_error

            segments.append(segment)
            last = resolver
            resolver = self._specification.maybe_in_subresource(
                segments=segments,
                resolver=resolver,
                subresource=self._specification.create_resource(contents),
            )
            # A different resolver means a subresource was entered, so
            # segment tracking restarts relative to it.
            if resolver is not last:
                segments = []
        return Resolved(contents=contents, resolver=resolver)  # type: ignore[reportUnknownArgumentType]
def _fail_to_retrieve(uri: URI):
    """
    Default retrieval function: always fails with `NoSuchResource`.
    """
    not_found = exceptions.NoSuchResource(ref=uri)
    raise not_found
@frozen
class Registry(Mapping[URI, Resource[D]]):
    r"""
    A registry of `Resource`\ s, each identified by their canonical URIs.

    Registries store a collection of in-memory resources, and optionally
    enable additional resources which may be stored elsewhere (e.g. in a
    database, a separate set of files, over the network, etc.).

    They also lazily walk their known resources, looking for subresources
    within them. In other words, subresources contained within any added
    resources will be retrievable via their own IDs (though this discovery of
    subresources will be delayed until necessary).

    Registries are immutable, and their methods return new instances of the
    registry with the additional resources added to them.

    The ``retrieve`` argument can be used to configure retrieval of resources
    dynamically, either over the network, from a database, or the like.
    Pass it a callable which will be called if any URI not present in the
    registry is accessed. It must either return a `Resource` or else raise a
    `NoSuchResource` exception indicating that the resource does not exist
    even according to the retrieval logic.
    """

    _resources: HashTrieMap[URI, Resource[D]] = field(
        default=HashTrieMap(),
        converter=HashTrieMap.convert,  # type: ignore[reportGeneralTypeIssues]
        alias="resources",
    )
    _anchors: HashTrieMap[tuple[URI, str], AnchorType[D]] = HashTrieMap()
    _uncrawled: HashTrieSet[URI] = EMPTY_UNCRAWLED
    _retrieve: Retrieve[D] = field(default=_fail_to_retrieve, alias="retrieve")

    def __getitem__(self, uri: URI) -> Resource[D]:
        """
        Return the (already crawled) `Resource` identified by the given URI.

        Raises:

            `NoSuchResource`

                if no resource is stored under the given URI
        """
        try:
            return self._resources[uri.rstrip("#")]
        except KeyError:
            raise exceptions.NoSuchResource(ref=uri) from None

    def __iter__(self) -> Iterator[URI]:
        """
        Iterate over all crawled URIs in the registry.
        """
        return iter(self._resources)

    def __len__(self) -> int:
        """
        Count the total number of fully crawled resources in this registry.
        """
        return len(self._resources)

    def __rmatmul__(
        self,
        new: Resource[D] | Iterable[Resource[D]],
    ) -> Registry[D]:
        """
        Create a new registry with resource(s) added using their internal IDs.

        Resources must have internal IDs (e.g. the :kw:`$id` keyword in
        modern JSON Schema versions), otherwise an error will be raised.

        Both a single resource as well as an iterable of resources works, i.e.:

            * ``resource @ registry`` or

            * ``[iterable, of, multiple, resources] @ registry``

        which -- again, assuming the resources have internal IDs -- is
        equivalent to calling `Registry.with_resources` as such:

        .. code:: python

            registry.with_resources(
                (resource.id(), resource) for resource in new_resources
            )

        Raises:

            `NoInternalID`

                if the resource(s) in fact do not have IDs
        """
        if isinstance(new, Resource):
            new = (new,)

        resources = self._resources
        uncrawled = self._uncrawled
        for resource in new:
            resource_id = resource.id()
            if resource_id is None:
                raise exceptions.NoInternalID(resource=resource)
            uncrawled = uncrawled.insert(resource_id)
            resources = resources.insert(resource_id, resource)
        return evolve(self, resources=resources, uncrawled=uncrawled)

    def __repr__(self) -> str:
        size = len(self)
        pluralized = "resource" if size == 1 else "resources"
        if self._uncrawled:
            uncrawled = len(self._uncrawled)
            if uncrawled == size:
                summary = f"uncrawled {pluralized}"
            else:
                summary = f"{pluralized}, {uncrawled} uncrawled"
        else:
            summary = f"{pluralized}"
        return f"<Registry ({size} {summary})>"

    def get_or_retrieve(self, uri: URI) -> Retrieved[D, Resource[D]]:
        """
        Get a resource from the registry, crawling or retrieving if necessary.

        May involve crawling to find the given URI if it is not already known,
        so the returned object is a `Retrieved` object which contains both the
        resource value as well as the registry which ultimately contained it.

        Raises:

            `NoSuchResource` or `CannotDetermineSpecification`

                propagated unchanged from the retrieval function

            `Unretrievable`

                if the retrieval function raises any other exception
        """
        resource = self._resources.get(uri)
        if resource is not None:
            return Retrieved(registry=self, value=resource)

        # Not known directly: it may be a not-yet-discovered subresource.
        registry = self.crawl()
        resource = registry._resources.get(uri)
        if resource is not None:
            return Retrieved(registry=registry, value=resource)

        try:
            resource = registry._retrieve(uri)
        except (
            exceptions.CannotDetermineSpecification,
            exceptions.NoSuchResource,
        ):
            raise
        except Exception as error:  # noqa: BLE001
            raise exceptions.Unretrievable(ref=uri) from error
        else:
            registry = registry.with_resource(uri, resource)
            return Retrieved(registry=registry, value=resource)

    def remove(self, uri: URI) -> Registry[D]:
        """
        Return a registry with the resource identified by a given URI removed.

        Anchors recorded under the same URI are removed as well.

        Raises:

            `NoSuchResource`

                if the URI is not present in the registry
        """
        if uri not in self._resources:
            raise exceptions.NoSuchResource(ref=uri)

        return evolve(
            self,
            resources=self._resources.remove(uri),
            uncrawled=self._uncrawled.discard(uri),
            anchors=HashTrieMap(
                (k, v) for k, v in self._anchors.items() if k[0] != uri
            ),
        )

    def anchor(self, uri: URI, name: str) -> Retrieved[D, AnchorType[D]]:
        """
        Retrieve a given anchor from a resource which must already be crawled.

        Raises:

            `NoSuchResource`

                if no resource is stored under the given URI

            `InvalidAnchor`

                if the anchor is not found and its name contains a ``/``

            `NoSuchAnchor`

                if the anchor is otherwise not found
        """
        value = self._anchors.get((uri, name))
        if value is not None:
            return Retrieved(value=value, registry=self)

        registry = self.crawl()
        value = registry._anchors.get((uri, name))
        if value is not None:
            return Retrieved(value=value, registry=registry)

        # The anchor may be recorded under the resource's own ID instead of
        # the URI it was looked up by.
        resource = self[uri]
        canonical_uri = resource.id()
        if canonical_uri is not None:
            value = registry._anchors.get((canonical_uri, name))
            if value is not None:
                return Retrieved(value=value, registry=registry)

        if "/" in name:
            raise exceptions.InvalidAnchor(
                ref=uri,
                resource=resource,
                anchor=name,
            )
        raise exceptions.NoSuchAnchor(ref=uri, resource=resource, anchor=name)

    def contents(self, uri: URI) -> D:
        """
        Retrieve the (already crawled) contents identified by the given URI.

        Raises:

            `NoSuchResource`

                if no resource is stored under the given URI
        """
        # Delegating to ``__getitem__`` keeps the empty-fragment handling
        # and the raised exception type consistent with ``registry[uri]``.
        return self[uri].contents

    def crawl(self) -> Registry[D]:
        """
        Crawl all added resources, discovering subresources.

        Returns a registry in which nothing remains marked as uncrawled.
        """
        resources = self._resources
        anchors = self._anchors
        uncrawled = [(uri, resources[uri]) for uri in self._uncrawled]
        while uncrawled:
            uri, resource = uncrawled.pop()

            resource_id = resource.id()
            if resource_id is not None:
                # An internal ID is resolved against the URI the resource
                # was reached from, and becomes the base for its children.
                uri = urljoin(uri, resource_id)
                resources = resources.insert(uri, resource)
            for each in resource.anchors():
                anchors = anchors.insert((uri, each.name), each)
            uncrawled.extend((uri, each) for each in resource.subresources())
        return evolve(
            self,
            resources=resources,
            anchors=anchors,
            uncrawled=EMPTY_UNCRAWLED,
        )

    def with_resource(self, uri: URI, resource: Resource[D]) -> Registry[D]:
        """
        Add the given `Resource` to the registry, without crawling it.
        """
        return self.with_resources([(uri, resource)])

    def with_resources(
        self,
        pairs: Iterable[tuple[URI, Resource[D]]],
    ) -> Registry[D]:
        r"""
        Add the given `Resource`\ s to the registry, without crawling them.
        """
        resources = self._resources
        uncrawled = self._uncrawled
        for uri, resource in pairs:
            # Empty fragment URIs are equivalent to URIs without the fragment.
            # TODO: Is this true for non JSON Schema resources? Probably not.
            uri = uri.rstrip("#")
            uncrawled = uncrawled.insert(uri)
            resources = resources.insert(uri, resource)
        return evolve(self, resources=resources, uncrawled=uncrawled)

    def with_contents(
        self,
        pairs: Iterable[tuple[URI, D]],
        **kwargs: Any,
    ) -> Registry[D]:
        r"""
        Add the given contents to the registry, autodetecting when necessary.

        Extra keyword arguments are passed to `Resource.from_contents`.
        """
        return self.with_resources(
            (uri, Resource.from_contents(each, **kwargs))
            for uri, each in pairs
        )

    def combine(self, *registries: Registry[D]) -> Registry[D]:
        """
        Combine together one or more other registries, producing a unified one.

        Raises:

            `ValueError`

                if two of the registries involved have different
                non-default retrieval functions
        """
        if registries == (self,):
            return self
        resources = self._resources
        anchors = self._anchors
        uncrawled = self._uncrawled
        retrieve = self._retrieve
        for registry in registries:
            resources = resources.update(registry._resources)  # type: ignore[reportUnknownMemberType]
            anchors = anchors.update(registry._anchors)  # type: ignore[reportUnknownMemberType]
            uncrawled = uncrawled.update(registry._uncrawled)

            if registry._retrieve is not _fail_to_retrieve:
                # Chained comparison: the incoming function differs from the
                # one chosen so far AND the one chosen so far is not the
                # default -- i.e. two distinct custom functions collide.
                if registry._retrieve is not retrieve is not _fail_to_retrieve:
                    raise ValueError(  # noqa: TRY003
                        "Cannot combine registries with conflicting retrieval "
                        "functions.",
                    )
                retrieve = registry._retrieve
        return evolve(
            self,
            anchors=anchors,
            resources=resources,
            uncrawled=uncrawled,
            retrieve=retrieve,
        )

    def resolver(self, base_uri: URI = "") -> Resolver[D]:
        """
        Return a `Resolver` which resolves references against this registry.
        """
        return Resolver(base_uri=base_uri, registry=self)

    def resolver_with_root(self, resource: Resource[D]) -> Resolver[D]:
        """
        Return a `Resolver` with a specific root resource.

        The resource is added to the resolver's registry under its own ID,
        or under the empty URI if it has none.
        """
        uri = resource.id() or ""
        return Resolver(
            base_uri=uri,
            registry=self.with_resource(uri, resource),
        )
#: An anchor or resource.
AnchorOrResource = TypeVar("AnchorOrResource", AnchorType[Any], Resource[Any])


@frozen
class Retrieved(Generic[D, AnchorOrResource]):
    """
    A value retrieved from a `Registry`.

    Pairs the retrieved value with the registry it was found in, which
    may be a newer registry than the one the lookup started from.
    """

    #: The anchor or resource which was retrieved.
    value: AnchorOrResource

    #: The registry which ultimately contained the value.
    registry: Registry[D]
@frozen
class Resolved(Generic[D]):
    """
    A reference resolved to its contents by a `Resolver`.
    """

    #: The contents the reference pointed at.
    contents: D

    #: The resolver to use for any further resolution from these contents.
    resolver: Resolver[D]
@frozen
class Resolver(Generic[D]):
    """
    A reference resolver.

    Resolvers help resolve references (including relative ones) by
    pairing a fixed base URI with a `Registry`.

    This object, under normal circumstances, is expected to be used by
    *implementers of libraries* built on top of `referencing` (e.g. JSON Schema
    implementations or other libraries resolving JSON references),
    not directly by end-users populating registries or while writing
    schemas or other resources.

    References are resolved against the base URI, and the combined URI
    is then looked up within the registry.

    The process of resolving a reference may itself involve calculating
    a *new* base URI for future reference resolution (e.g. if an
    intermediate resource sets a new base URI), or may involve encountering
    additional subresources and adding them to a new registry.
    """

    _base_uri: URI = field(alias="base_uri")
    _registry: Registry[D] = field(alias="registry")
    _previous: List[URI] = field(default=List(), repr=False, alias="previous")

    def lookup(self, ref: URI) -> Resolved[D]:
        """
        Resolve the given reference to the resource it points to.

        Raises:

            `exceptions.Unresolvable`

                or a subclass thereof (see below) if the reference isn't
                resolvable

            `exceptions.NoSuchAnchor`

                if the reference is to a URI where a resource exists but
                contains a plain name fragment which does not exist within
                the resource

            `exceptions.PointerToNowhere`

                if the reference is to a URI where a resource exists but
                contains a JSON pointer to a location within the resource
                that does not exist
        """
        # A fragment-only reference stays within the current base URI.
        if ref.startswith("#"):
            uri = self._base_uri
            fragment = ref[1:]
        else:
            uri, fragment = urldefrag(urljoin(self._base_uri, ref))

        try:
            found = self._registry.get_or_retrieve(uri)
        except exceptions.NoSuchResource:
            raise exceptions.Unresolvable(ref=ref) from None
        except exceptions.Unretrievable as error:
            raise exceptions.Unresolvable(ref=ref) from error

        # No fragment at all: the reference is to the whole resource.
        if not fragment:
            whole = self._evolve(registry=found.registry, base_uri=uri)
            return Resolved(contents=found.value.contents, resolver=whole)

        # A fragment starting with a slash is a JSON pointer.
        if fragment.startswith("/"):
            inner = self._evolve(registry=found.registry, base_uri=uri)
            return found.value.pointer(pointer=fragment, resolver=inner)

        # Anything else is a plain-name anchor; looking it up may itself
        # produce a newer registry, which the new resolver must carry.
        anchored = found.registry.anchor(uri, fragment)
        at_anchor = self._evolve(registry=anchored.registry, base_uri=uri)
        return anchored.value.resolve(resolver=at_anchor)

    def in_subresource(self, subresource: Resource[D]) -> Resolver[D]:
        """
        Create a resolver for a subresource (which may have a new base URI).
        """
        subresource_id = subresource.id()
        if subresource_id is not None:
            new_base = urljoin(self._base_uri, subresource_id)
            return evolve(self, base_uri=new_base)
        return self

    def dynamic_scope(self) -> Iterable[tuple[URI, Registry[D]]]:
        """
        In specs with such a notion, return the URIs in the dynamic scope.
        """
        registry = self._registry
        for uri in self._previous:
            yield uri, registry

    def _evolve(self, base_uri: URI, **kwargs: Any):
        """
        Evolve, appending to the dynamic scope.
        """
        current = self._base_uri
        scope = self._previous
        # The current base URI is recorded when it is non-empty and either
        # nothing has been recorded yet or the base URI is changing.
        changing = not scope or base_uri != current
        if current and changing:
            scope = scope.push_front(current)
        return evolve(self, base_uri=base_uri, previous=scope, **kwargs)
@frozen
class Anchor(Generic[D]):
    """
    A simple anchor in a `Resource`.
    """

    #: The name by which the anchor is referenced.
    name: str

    #: The resource the anchor points at.
    resource: Resource[D]

    def resolve(self, resolver: Resolver[D]):
        """
        Return the resource for this anchor.
        """
        target = self.resource.contents
        return Resolved(contents=target, resolver=resolver)