Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/referencing/_core.py: 55%
250 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:44 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:44 +0000
1from __future__ import annotations
3from collections.abc import Iterable, Iterator, Sequence
4from typing import Any, Callable, ClassVar, Generic, Protocol, TypeVar
5from urllib.parse import unquote, urldefrag, urljoin
7from attrs import evolve, field
8from rpds import HashTrieMap, HashTrieSet, List
10from referencing import exceptions
11from referencing._attrs import frozen
12from referencing.typing import URI, Anchor as AnchorType, D, Mapping, Retrieve
#: An empty set of URIs, used as the default value of `Registry._uncrawled`
#: and as its value in the registry returned by `Registry.crawl`.
EMPTY_UNCRAWLED: HashTrieSet[URI] = HashTrieSet()
#: An empty list of URIs.
#: NOTE(review): presumably the initial dynamic scope of a `Resolver`
#: (its element type matches `Resolver._previous`) -- confirm.
EMPTY_PREVIOUS_RESOLVERS: List[URI] = List()
class _MaybeInSubresource(Protocol[D]):
    """
    The call signature expected of `Specification.maybe_in_subresource`.

    A conforming callable is handed the JSON pointer segments accumulated by
    the caller, the resolver currently in use and a candidate subresource,
    and it returns the resolver which should be used from then on.
    """

    def __call__(
        self,
        segments: Sequence[int | str],
        resolver: Resolver[D],
        subresource: Resource[D],
    ) -> Resolver[D]:
        ...
@frozen
class Specification(Generic[D]):
    """
    A description of how referencing behaves for one kind of document.

    Every field supplies one piece of specification-dependent behavior, so
    that e.g. different JSON Schema versions can each be given their own
    rules for identifiers, subresources and anchors.
    """

    # NOTE: the field order below is part of the interface (attrs derives
    # ``__init__`` from it), so it must not be rearranged.

    #: A short human-readable name for the specification, used for debugging.
    name: str

    #: Find the ID of a given document.
    id_of: Callable[[D], URI | None]

    #: Retrieve the subresources of the given document (without traversing into
    #: the subresources themselves).
    subresources_of: Callable[[D], Iterable[D]]

    #: While resolving a JSON pointer, conditionally enter a subresource
    #: (if e.g. we have just entered a keyword whose value is a subresource)
    maybe_in_subresource: _MaybeInSubresource[D]

    #: Retrieve the anchors contained in the given document.
    #: Stored privately (but accepted as ``anchors_in=``) so that the public
    #: `anchors_in` method can pass this specification in as first argument.
    _anchors_in: Callable[
        [Specification[D], D],
        Iterable[AnchorType[D]],
    ] = field(alias="anchors_in")

    #: An opaque specification where resources have no subresources
    #: nor internal identifiers.
    OPAQUE: ClassVar[Specification[Any]]

    def __repr__(self) -> str:
        # Only the name is shown; the remaining fields are callables.
        return f"<Specification name={self.name!r}>"

    def anchors_in(self, contents: D):
        """
        Find the anchors within the given document.

        Calls the callable supplied as ``anchors_in`` with this specification
        and the contents, and returns whatever it produces.
        """
        find_anchors = self._anchors_in
        return find_anchors(self, contents)

    def create_resource(self, contents: D) -> Resource[D]:
        """
        Wrap the given contents in a `Resource` governed by this specification.
        """
        return Resource(specification=self, contents=contents)
# The opaque specification: documents have no ID, no subresources and no
# anchors, and walking a pointer never switches to a different resolver.
Specification.OPAQUE = Specification(
    name="opaque",
    anchors_in=lambda specification, contents: [],
    subresources_of=lambda contents: [],
    id_of=lambda contents: None,
    # Parameter names matter here: `Resource.pointer` calls this by keyword.
    maybe_in_subresource=lambda segments, resolver, subresource: resolver,
)
@frozen
class Resource(Generic[D]):
    r"""
    A document (deserialized JSON) with a concrete interpretation under a spec.

    In other words, a Python object, along with an instance of `Specification`
    which describes how the document interacts with referencing -- both
    internally (how it refers to other `Resource`\ s) and externally (how it
    should be identified such that it is referenceable by other documents).
    """

    #: The document itself.
    contents: D

    #: The specification used to interpret `contents`
    #: (passed to the initializer as ``specification=``).
    _specification: Specification[D] = field(alias="specification")

    @classmethod
    def from_contents(
        cls,
        contents: D,
        default_specification: Specification[D] = None,  # type: ignore[reportGeneralTypeIssues]  # noqa: E501
    ) -> Resource[D]:
        """
        Attempt to discern which specification applies to the given contents.

        If the contents are a mapping with a ``$schema`` key, its value is
        looked up as a JSON Schema dialect ID (falling back to the default
        specification); otherwise the default specification is used as-is.

        Raises:

            `CannotDetermineSpecification`

                if the given contents don't have any discernible
                information which could be used to guess which
                specification they identify as
        """
        specification = default_specification
        if isinstance(contents, Mapping):
            jsonschema_dialect_id = contents.get("$schema")  # type: ignore[reportUnknownMemberType]  # noqa: E501
            if jsonschema_dialect_id is not None:
                # Imported lazily, only once a dialect actually needs
                # looking up.
                from referencing.jsonschema import specification_with

                specification = specification_with(
                    jsonschema_dialect_id,  # type: ignore[reportUnknownArgumentType]  # noqa: E501
                    default=default_specification,
                )

        if specification is None:  # type: ignore[reportUnnecessaryComparison]
            raise exceptions.CannotDetermineSpecification(contents)
        return cls(contents=contents, specification=specification)  # type: ignore[reportUnknownArgumentType]  # noqa: E501

    @classmethod
    def opaque(cls, contents: D) -> Resource[D]:
        """
        Create an opaque `Resource` -- i.e. one with opaque specification.

        See `Specification.OPAQUE` for details.
        """
        return Specification.OPAQUE.create_resource(contents=contents)

    def id(self) -> URI | None:
        """
        Retrieve this resource's (specification-specific) identifier.

        Any trailing ``#`` is stripped from the identifier, so an ID with an
        empty fragment is reported without that fragment.
        Returns `None` if the specification finds no identifier.
        """
        identifier = self._specification.id_of(self.contents)
        if identifier is None:
            return None
        return identifier.rstrip("#")

    def subresources(self) -> Iterable[Resource[D]]:
        """
        Retrieve this resource's subresources.

        Each subresource is built via `from_contents`, using this resource's
        specification as the default. The result is a lazy generator.
        """
        return (
            Resource.from_contents(
                each,
                default_specification=self._specification,
            )
            for each in self._specification.subresources_of(self.contents)
        )

    def anchors(self) -> Iterable[AnchorType[D]]:
        """
        Retrieve this resource's (specification-specific) anchors.
        """
        return self._specification.anchors_in(self.contents)

    def pointer(self, pointer: str, resolver: Resolver[D]) -> Resolved[D]:
        """
        Resolve the given JSON pointer.

        The pointer is percent-decoded and then walked one segment at a time.
        After each step the specification is given the chance to switch to
        a resolver for a subresource which has just been entered.

        Raises:

            `exceptions.PointerToNowhere`

                if the pointer points to a location not present in the document
        """
        if not pointer:
            # The empty pointer refers to the whole document (RFC 6901).
            return Resolved(contents=self.contents, resolver=resolver)

        contents = self.contents
        segments: list[int | str] = []
        for segment in unquote(pointer[1:]).split("/"):
            if isinstance(contents, Sequence):
                try:
                    segment = int(segment)
                except ValueError as error:
                    # A non-numeric segment cannot index into a sequence.
                    raise exceptions.PointerToNowhere(
                        ref=pointer,
                        resource=self,
                    ) from error
            else:
                # Undo JSON pointer escaping; ``~1`` must be handled first so
                # that ``~01`` correctly decodes to ``~1``.
                segment = segment.replace("~1", "/").replace("~0", "~")
            try:
                contents = contents[segment]  # type: ignore[reportUnknownArgumentType]  # noqa: E501
            except (LookupError, TypeError) as error:
                # TypeError: the pointer continues past a value which cannot
                # be indexed at all (e.g. a number or ``None``).
                raise exceptions.PointerToNowhere(
                    ref=pointer,
                    resource=self,
                ) from error

            segments.append(segment)
            last = resolver
            resolver = self._specification.maybe_in_subresource(
                segments=segments,
                resolver=resolver,
                subresource=self._specification.create_resource(contents),  # type: ignore[reportUnknownArgumentType]  # noqa: E501
            )
            if resolver is not last:
                # A subresource was entered, so start collecting segments
                # afresh, relative to it.
                segments = []
        return Resolved(contents=contents, resolver=resolver)  # type: ignore[reportUnknownArgumentType]  # noqa: E501
def _fail_to_retrieve(uri: URI):
    """
    Refuse to retrieve anything (the default ``retrieve`` of a `Registry`).

    Raises:

        `exceptions.NoSuchResource`

            always, referring to the given URI
    """
    no_such_resource = exceptions.NoSuchResource(ref=uri)
    raise no_such_resource
@frozen
class Registry(Mapping[URI, Resource[D]]):
    r"""
    A registry of `Resource`\ s, each identified by their canonical URIs.

    Registries store a collection of in-memory resources, and optionally
    enable additional resources which may be stored elsewhere (e.g. in a
    database, a separate set of files, over the network, etc.).

    They also lazily walk their known resources, looking for subresources
    within them. In other words, subresources contained within any added
    resources will be retrievable via their own IDs (though this discovery of
    subresources will be delayed until necessary).

    Registries are immutable, and their methods return new instances of the
    registry with the additional resources added to them.

    The ``retrieve`` argument can be used to configure retrieval of resources
    dynamically, either over the network, from a database, or the like.
    Pass it a callable which will be called if any URI not present in the
    registry is accessed. It must either return a `Resource` or else raise a
    `NoSuchResource` exception indicating that the resource does not exist
    even according to the retrieval logic.
    """

    #: Every known resource, keyed by URI (initializer argument ``resources``).
    _resources: HashTrieMap[URI, Resource[D]] = field(
        default=HashTrieMap(),
        converter=HashTrieMap.convert,  # type: ignore[reportGeneralTypeIssues]  # noqa: E501
        alias="resources",
    )
    #: Anchors found while crawling, keyed by ``(resource URI, anchor name)``.
    _anchors: HashTrieMap[tuple[URI, str], AnchorType[D]] = HashTrieMap()  # type: ignore[reportGeneralTypeIssues]  # noqa: E501
    #: URIs of resources which were added but have not been crawled yet.
    _uncrawled: HashTrieSet[URI] = EMPTY_UNCRAWLED
    #: Called for URIs which are not found even after crawling.
    _retrieve: Retrieve[D] = field(default=_fail_to_retrieve, alias="retrieve")

    def __getitem__(self, uri: URI) -> Resource[D]:
        """
        Return the (already crawled) `Resource` identified by the given URI.

        Raises:

            `exceptions.NoSuchResource`

                if no resource is stored under the URI
        """
        try:
            # Empty fragment URIs are equivalent to URIs without the fragment.
            return self._resources[uri.rstrip("#")]
        except KeyError:
            raise exceptions.NoSuchResource(ref=uri) from None

    def __iter__(self) -> Iterator[URI]:
        """
        Iterate over all crawled URIs in the registry.
        """
        return iter(self._resources)

    def __len__(self) -> int:
        """
        Count the total number of fully crawled resources in this registry.
        """
        return len(self._resources)

    def __rmatmul__(
        self,
        new: Resource[D] | Iterable[Resource[D]],
    ) -> Registry[D]:
        """
        Create a new registry with resource(s) added using their internal IDs.

        Resources must have internal IDs (e.g. the :kw:`$id` keyword in
        modern JSON Schema versions), otherwise an error will be raised.

        Both a single resource as well as an iterable of resources works, i.e.:

            * ``resource @ registry`` or

            * ``[iterable, of, multiple, resources] @ registry``

        which -- again, assuming the resources have internal IDs -- is
        equivalent to calling `Registry.with_resources` as such:

        .. code:: python

            registry.with_resources(
                (resource.id(), resource) for resource in new_resources
            )

        Raises:

            `NoInternalID`

                if the resource(s) in fact do not have IDs
        """
        if isinstance(new, Resource):
            new = (new,)

        resources = self._resources
        uncrawled = self._uncrawled
        for resource in new:
            identifier = resource.id()
            if identifier is None:
                raise exceptions.NoInternalID(resource=resource)
            uncrawled = uncrawled.insert(identifier)
            resources = resources.insert(identifier, resource)
        return evolve(self, resources=resources, uncrawled=uncrawled)

    def __repr__(self) -> str:
        size = len(self)
        pluralized = "resource" if size == 1 else "resources"
        if self._uncrawled:
            uncrawled = len(self._uncrawled)
            if uncrawled == size:
                # Nothing has been crawled at all.
                summary = f"uncrawled {pluralized}"
            else:
                summary = f"{pluralized}, {uncrawled} uncrawled"
        else:
            summary = f"{pluralized}"
        return f"<Registry ({size} {summary})>"

    def get_or_retrieve(self, uri: URI) -> Retrieved[D, Resource[D]]:
        """
        Get a resource from the registry, crawling or retrieving if necessary.

        May involve crawling to find the given URI if it is not already known,
        so the returned object is a `Retrieved` object which contains both the
        resource value as well as the registry which ultimately contained it.

        Raises:

            `exceptions.NoSuchResource` or
            `exceptions.CannotDetermineSpecification`

                if the retrieval function raises them (they are propagated
                untouched)

            `exceptions.Unretrievable`

                if the retrieval function fails in any other way; the
                original exception is attached as the cause
        """
        resource = self._resources.get(uri)
        if resource is not None:
            return Retrieved(registry=self, value=resource)

        # Not directly known: it may be a not-yet-discovered subresource.
        registry = self.crawl()
        resource = registry._resources.get(uri)
        if resource is not None:
            return Retrieved(registry=registry, value=resource)

        try:
            resource = registry._retrieve(uri)
        except (
            exceptions.CannotDetermineSpecification,
            exceptions.NoSuchResource,
        ):
            raise
        except Exception as error:
            raise exceptions.Unretrievable(ref=uri) from error
        else:
            registry = registry.with_resource(uri, resource)
            return Retrieved(registry=registry, value=resource)

    def remove(self, uri: URI) -> Registry[D]:
        """
        Return a registry with the resource identified by a given URI removed.

        Anchors recorded for that URI are dropped as well.

        Raises:

            `exceptions.NoSuchResource`

                if the URI is not present in the registry
        """
        if uri not in self._resources:
            raise exceptions.NoSuchResource(ref=uri)

        return evolve(
            self,
            resources=self._resources.remove(uri),
            uncrawled=self._uncrawled.discard(uri),
            anchors=HashTrieMap(
                (k, v) for k, v in self._anchors.items() if k[0] != uri
            ),
        )

    def anchor(self, uri: URI, name: str) -> Retrieved[D, AnchorType[D]]:
        """
        Retrieve a given anchor from a resource.

        The anchor is looked for under the given URI, first as-is and then
        after crawling; failing that, it is looked for under the resource's
        own (canonical) ID.

        Raises:

            `exceptions.NoSuchResource`

                if there is no resource with the given URI, even after
                crawling

            `exceptions.InvalidAnchor`

                if the anchor is not found and its name contains a ``/``

            `exceptions.NoSuchAnchor`

                if the anchor is otherwise not found
        """
        value = self._anchors.get((uri, name))
        if value is not None:
            return Retrieved(value=value, registry=self)

        registry = self.crawl()
        value = registry._anchors.get((uri, name))
        if value is not None:
            return Retrieved(value=value, registry=registry)

        # Look in the crawled registry: the URI may belong to a subresource
        # which only the crawl above has discovered.
        resource = registry[uri]
        canonical_uri = resource.id()
        if canonical_uri is not None:
            value = registry._anchors.get((canonical_uri, name))
            if value is not None:
                return Retrieved(value=value, registry=registry)

        if "/" in name:
            raise exceptions.InvalidAnchor(
                ref=uri,
                resource=resource,
                anchor=name,
            )
        raise exceptions.NoSuchAnchor(ref=uri, resource=resource, anchor=name)

    def contents(self, uri: URI) -> D:
        """
        Retrieve the (already crawled) contents identified by the given URI.

        Raises:

            `exceptions.NoSuchResource`

                if no resource is stored under the URI
        """
        # Delegating to ``__getitem__`` keeps the empty-fragment handling and
        # the exception raised for unknown URIs identical between the two.
        return self[uri].contents

    def crawl(self) -> Registry[D]:
        """
        Crawl all added resources, discovering subresources.

        Returns a registry in which every discovered (sub)resource is stored
        under its URI, every anchor is recorded, and nothing is left
        uncrawled.
        """
        resources = self._resources
        anchors = self._anchors
        # A stack of (base URI, resource) pairs still to be visited.
        uncrawled = [(uri, resources[uri]) for uri in self._uncrawled]
        while uncrawled:
            uri, resource = uncrawled.pop()

            identifier = resource.id()
            if identifier is not None:
                # The ID may be relative to the URI the resource lives under.
                uri = urljoin(uri, identifier)
                resources = resources.insert(uri, resource)
            for each in resource.anchors():
                anchors = anchors.insert((uri, each.name), each)
            uncrawled.extend((uri, each) for each in resource.subresources())
        return evolve(
            self,
            resources=resources,
            anchors=anchors,
            uncrawled=EMPTY_UNCRAWLED,
        )

    def with_resource(self, uri: URI, resource: Resource[D]) -> Registry[D]:
        """
        Add the given `Resource` to the registry, without crawling it.
        """
        return self.with_resources([(uri, resource)])

    def with_resources(
        self,
        pairs: Iterable[tuple[URI, Resource[D]]],
    ) -> Registry[D]:
        r"""
        Add the given `Resource`\ s to the registry, without crawling them.
        """
        resources = self._resources
        uncrawled = self._uncrawled
        for uri, resource in pairs:
            # Empty fragment URIs are equivalent to URIs without the fragment.
            # TODO: Is this true for non JSON Schema resources? Probably not.
            uri = uri.rstrip("#")
            uncrawled = uncrawled.insert(uri)
            resources = resources.insert(uri, resource)
        return evolve(self, resources=resources, uncrawled=uncrawled)

    def with_contents(
        self,
        pairs: Iterable[tuple[URI, D]],
        **kwargs: Any,
    ) -> Registry[D]:
        r"""
        Add the given contents to the registry, autodetecting when necessary.

        Any keyword arguments are passed along to `Resource.from_contents`.
        """
        return self.with_resources(
            (uri, Resource.from_contents(each, **kwargs))
            for uri, each in pairs
        )

    def combine(self, *registries: Registry[D]) -> Registry[D]:
        """
        Combine together one or more other registries, producing a unified one.

        Raises:

            `ValueError`

                if the registries use conflicting retrieval functions
        """
        if registries == (self,):
            return self
        resources = self._resources
        anchors = self._anchors
        uncrawled = self._uncrawled
        retrieve = self._retrieve
        for registry in registries:
            resources = resources.update(registry._resources)  # type: ignore[reportUnknownMemberType]  # noqa: E501
            anchors = anchors.update(registry._anchors)  # type: ignore[reportUnknownMemberType]  # noqa: E501
            uncrawled = uncrawled.update(registry._uncrawled)

            if registry._retrieve is not _fail_to_retrieve:
                # Chained comparison: a conflict means this registry's
                # function differs from the one chosen so far *and* the one
                # chosen so far is not merely the failing default.
                if registry._retrieve is not retrieve is not _fail_to_retrieve:
                    raise ValueError(
                        "Cannot combine registries with conflicting retrieval "
                        "functions.",
                    )
                retrieve = registry._retrieve
        return evolve(
            self,
            anchors=anchors,
            resources=resources,
            uncrawled=uncrawled,
            retrieve=retrieve,
        )

    def resolver(self, base_uri: URI = "") -> Resolver[D]:
        """
        Return a `Resolver` which resolves references against this registry.
        """
        return Resolver(base_uri=base_uri, registry=self)

    def resolver_with_root(self, resource: Resource[D]) -> Resolver[D]:
        """
        Return a `Resolver` with a specific root resource.

        The resource is added to the registry under its own ID (or under the
        empty URI if it has none), which also becomes the resolver's base URI.
        """
        uri = resource.id() or ""
        return Resolver(
            base_uri=uri,
            registry=self.with_resource(uri, resource),
        )
#: An anchor or resource.
#:
#: A constrained type variable: either an anchor or a `Resource`. It is the
#: type of the value carried by a `Retrieved`.
AnchorOrResource = TypeVar("AnchorOrResource", AnchorType[Any], Resource[Any])
@frozen
class Retrieved(Generic[D, AnchorOrResource]):
    """
    The outcome of retrieving something from a `Registry`.

    Bundles the retrieved value with the registry which produced it, since
    retrieval may have had to create a new registry along the way.
    """

    #: The anchor or resource which was retrieved.
    value: AnchorOrResource

    #: The registry which the value was retrieved from.
    registry: Registry[D]
@frozen
class Resolved(Generic[D]):
    """
    The outcome of resolving a reference with a `Resolver`.
    """

    #: The contents which the reference pointed to.
    contents: D

    #: The resolver to use for any further references within `contents`.
    resolver: Resolver[D]
@frozen
class Resolver(Generic[D]):
    """
    A reference resolver.

    Resolvers help resolve references (including relative ones) by
    pairing a fixed base URI with a `Registry`.

    This object, under normal circumstances, is expected to be used by
    *implementers of libraries* built on top of `referencing` (e.g. JSON Schema
    implementations or other libraries resolving JSON references),
    not directly by end-users populating registries or while writing
    schemas or other resources.

    References are resolved against the base URI, and the combined URI
    is then looked up within the registry.

    The process of resolving a reference may itself involve calculating
    a *new* base URI for future reference resolution (e.g. if an
    intermediate resource sets a new base URI), or may involve encountering
    additional subresources and adding them to a new registry.
    """

    #: The URI which references are resolved against.
    _base_uri: str = field(alias="base_uri")
    #: The registry in which resolved URIs are looked up.
    _registry: Registry[D] = field(alias="registry")
    #: Base URIs of the resolvers this one was evolved from, newest first.
    _previous: List[URI] = field(
        default=EMPTY_PREVIOUS_RESOLVERS,
        repr=False,
        alias="previous",
    )

    def lookup(self, ref: URI) -> Resolved[D]:
        """
        Resolve the given reference to the resource it points to.

        Raises:

            `exceptions.Unresolvable`

                or a subclass thereof (see below) if the reference isn't
                resolvable

            `exceptions.NoSuchAnchor`

                if the reference is to a URI where a resource exists but
                contains a plain name fragment which does not exist within
                the resource

            `exceptions.PointerToNowhere`

                if the reference is to a URI where a resource exists but
                contains a JSON pointer to a location within the resource
                that does not exist
        """
        if ref.startswith("#"):
            # A fragment-only reference stays within the current base URI.
            uri, fragment = self._base_uri, ref[1:]
        else:
            uri, fragment = urldefrag(urljoin(self._base_uri, ref))
        try:
            retrieved = self._registry.get_or_retrieve(uri)
        except exceptions.NoSuchResource:
            raise exceptions.Unresolvable(ref=ref) from None
        except exceptions.Unretrievable as error:
            # Keep the retrieval failure attached as the cause.
            raise exceptions.Unresolvable(ref=ref) from error

        if fragment.startswith("/"):
            # The fragment is a JSON pointer.
            resolver = self._evolve(registry=retrieved.registry, base_uri=uri)
            return retrieved.value.pointer(pointer=fragment, resolver=resolver)

        if fragment:
            # The fragment is a plain name, i.e. an anchor. Looking it up may
            # crawl, so the resolver is built from the *resulting* registry.
            retrieved = retrieved.registry.anchor(uri, fragment)
            resolver = self._evolve(registry=retrieved.registry, base_uri=uri)
            return retrieved.value.resolve(resolver=resolver)

        resolver = self._evolve(registry=retrieved.registry, base_uri=uri)
        return Resolved(contents=retrieved.value.contents, resolver=resolver)

    def in_subresource(self, subresource: Resource[D]) -> Resolver[D]:
        """
        Create a resolver for a subresource (which may have a new base URI).

        If the subresource has no ID, this resolver is returned unchanged.
        """
        identifier = subresource.id()
        if identifier is None:
            return self
        return evolve(self, base_uri=urljoin(self._base_uri, identifier))

    def dynamic_scope(self) -> Iterable[tuple[URI, Registry[D]]]:
        """
        In specs with such a notion, return the URIs in the dynamic scope.

        Each previously used base URI is yielded, paired with this resolver's
        registry.
        """
        for uri in self._previous:
            yield uri, self._registry

    def _evolve(self, base_uri: str, **kwargs: Any) -> Resolver[D]:
        """
        Evolve, appending to the dynamic scope.

        The current base URI is recorded as a previous one unless it is
        empty, or unless it is unchanged and something has already been
        recorded.
        """
        previous = self._previous
        if self._base_uri and (not previous or base_uri != self._base_uri):
            previous = previous.push_front(self._base_uri)
        return evolve(self, base_uri=base_uri, previous=previous, **kwargs)
@frozen
class Anchor(Generic[D]):
    """
    A plain anchor: a name which refers to a `Resource`.
    """

    #: The name of the anchor.
    name: str

    #: The resource which the anchor refers to.
    resource: Resource[D]

    def resolve(self, resolver: Resolver[D]):
        """
        Resolve this anchor to the contents of its resource.

        The given resolver is passed through unchanged in the result.
        """
        target = self.resource
        return Resolved(resolver=resolver, contents=target.contents)