Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/referencing/_core.py: 55%

250 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:44 +0000

1from __future__ import annotations 

2 

3from collections.abc import Iterable, Iterator, Sequence 

4from typing import Any, Callable, ClassVar, Generic, Protocol, TypeVar 

5from urllib.parse import unquote, urldefrag, urljoin 

6 

7from attrs import evolve, field 

8from rpds import HashTrieMap, HashTrieSet, List 

9 

10from referencing import exceptions 

11from referencing._attrs import frozen 

12from referencing.typing import URI, Anchor as AnchorType, D, Mapping, Retrieve 

13 

#: A shared, empty persistent set of URIs -- used as the "nothing left to
#: crawl" value for registries.
EMPTY_UNCRAWLED: HashTrieSet[URI] = HashTrieSet()

#: A shared, empty persistent list of URIs (by its name, presumably meant as
#: the initial chain of previously visited base URIs of a resolver).
EMPTY_PREVIOUS_RESOLVERS: List[URI] = List()

16 

17 

class _MaybeInSubresource(Protocol[D]):
    """
    The call signature of `Specification.maybe_in_subresource` callables.

    Such a callable is handed the JSON pointer segments traversed so far,
    the resolver currently in use, and the (candidate) subresource found at
    that location, and returns the resolver to continue with.
    """

    def __call__(
        self,
        segments: Sequence[int | str],
        resolver: Resolver[D],
        subresource: Resource[D],
    ) -> Resolver[D]:
        """
        Return the resolver to use after traversing ``segments``.
        """

26 

27 

@frozen
class Specification(Generic[D]):
    """
    A specification which defines referencing behavior.

    Each attribute is a hook called by `Resource` so that referencing
    behavior can vary across JSON Schema specification versions, etc.
    """

    #: A short human-readable name for the specification, used for debugging.
    name: str

    #: Find the ID of a given document (or `None` if it has none).
    id_of: Callable[[D], URI | None]

    #: Retrieve the subresources of the given document (without traversing into
    #: the subresources themselves).
    subresources_of: Callable[[D], Iterable[D]]

    #: While resolving a JSON pointer, conditionally enter a subresource
    #: (if e.g. we have just entered a keyword whose value is a subresource)
    maybe_in_subresource: _MaybeInSubresource[D]

    #: Retrieve the anchors contained in the given document.  The callable
    #: receives this specification as its first argument; it is passed to
    #: the constructor under the name ``anchors_in``.
    _anchors_in: Callable[
        [Specification[D], D],
        Iterable[AnchorType[D]],
    ] = field(alias="anchors_in")

    #: An opaque specification where resources have no subresources
    #: nor internal identifiers.
    OPAQUE: ClassVar[Specification[Any]]

    def __repr__(self) -> str:
        """
        Show only the specification's name.
        """
        return "<Specification name={!r}>".format(self.name)

    def anchors_in(self, contents: D):
        """
        Retrieve the anchors contained in the given document.
        """
        find_anchors = self._anchors_in
        return find_anchors(self, contents)

    def create_resource(self, contents: D) -> Resource[D]:
        """
        Create a resource which is interpreted using this specification.
        """
        return Resource(specification=self, contents=contents)

75 

76 

# The opaque specification: documents have no ID, no subresources and no
# anchors, and pointer traversal always keeps the resolver it was given.
Specification.OPAQUE = Specification(
    name="opaque",
    anchors_in=lambda specification, contents: [],
    subresources_of=lambda contents: [],
    id_of=lambda contents: None,
    maybe_in_subresource=lambda segments, resolver, subresource: resolver,
)

84 

85 

@frozen
class Resource(Generic[D]):
    r"""
    A document (deserialized JSON) with a concrete interpretation under a spec.

    In other words, a Python object, along with an instance of `Specification`
    which describes how the document interacts with referencing -- both
    internally (how it refers to other `Resource`\ s) and externally (how it
    should be identified such that it is referenceable by other documents).
    """

    #: The document itself.
    contents: D
    #: The specification used to interpret `contents` (passed to the
    #: constructor as ``specification``).
    _specification: Specification[D] = field(alias="specification")

    @classmethod
    def from_contents(
        cls,
        contents: D,
        default_specification: Specification[D] = None,  # type: ignore[reportGeneralTypeIssues]  # noqa: E501
    ) -> Resource[D]:
        """
        Attempt to discern which specification applies to the given contents.

        If the contents are a mapping with a non-``None`` ``"$schema"`` key,
        its value is looked up as a JSON Schema dialect ID (with
        ``default_specification`` given to that lookup as its default).
        Otherwise ``default_specification`` is used as is.

        Raises:

            `CannotDetermineSpecification`

                if the given contents don't have any discernible
                information which could be used to guess which
                specification they identify as
        """
        specification = default_specification
        if isinstance(contents, Mapping):
            jsonschema_dialect_id = contents.get("$schema")  # type: ignore[reportUnknownMemberType]  # noqa: E501
            if jsonschema_dialect_id is not None:
                # Imported here rather than at module level -- presumably to
                # avoid a circular import with `referencing.jsonschema`.
                from referencing.jsonschema import specification_with

                specification = specification_with(
                    jsonschema_dialect_id,  # type: ignore[reportUnknownArgumentType]  # noqa: E501
                    default=default_specification,
                )

        if specification is None:  # type: ignore[reportUnnecessaryComparison]
            raise exceptions.CannotDetermineSpecification(contents)
        return cls(contents=contents, specification=specification)  # type: ignore[reportUnknownArgumentType]  # noqa: E501

    @classmethod
    def opaque(cls, contents: D) -> Resource[D]:
        """
        Create an opaque `Resource` -- i.e. one with opaque specification.

        See `Specification.OPAQUE` for details.
        """
        return Specification.OPAQUE.create_resource(contents=contents)

    def id(self) -> URI | None:
        """
        Retrieve this resource's (specification-specific) identifier.

        Any trailing ``#`` (an empty fragment) is stripped.  Returns `None`
        if the specification finds no identifier in the contents.
        """
        identifier = self._specification.id_of(self.contents)
        if identifier is None:
            return None
        return identifier.rstrip("#")

    def subresources(self) -> Iterable[Resource[D]]:
        """
        Retrieve this resource's subresources.

        Each subresource is built with `Resource.from_contents`, falling
        back to this resource's own specification.
        """
        return (
            Resource.from_contents(
                each,
                default_specification=self._specification,
            )
            for each in self._specification.subresources_of(self.contents)
        )

    def anchors(self) -> Iterable[AnchorType[D]]:
        """
        Retrieve this resource's (specification-specific) anchors.
        """
        return self._specification.anchors_in(self.contents)

    def pointer(self, pointer: str, resolver: Resolver[D]) -> Resolved[D]:
        """
        Resolve the given JSON pointer.

        The pointer is percent-decoded and then split on ``/``.  Segments
        applied to a sequence are converted to integers; segments applied to
        anything else have the ``~1`` and ``~0`` escapes undone.  After each
        segment the specification is given the chance to switch to a new
        resolver (e.g. because a subresource with its own base URI was
        entered).

        Raises:

            `exceptions.PointerToNowhere`

                if the pointer points to a location not present in the document
        """
        if not pointer:
            # Per RFC 6901 the empty pointer refers to the whole document
            # (whereas "/" refers to the member whose key is "").
            return Resolved(contents=self.contents, resolver=resolver)

        contents = self.contents
        segments: list[int | str] = []
        for segment in unquote(pointer[1:]).split("/"):
            try:
                if isinstance(contents, Sequence):
                    # A non-integer token cannot identify an array element.
                    segment = int(segment)
                else:
                    segment = segment.replace("~1", "/").replace("~0", "~")
                contents = contents[segment]  # type: ignore[reportUnknownArgumentType]  # noqa: E501
            except (LookupError, TypeError, ValueError) as error:
                # LookupError: missing key / index out of range.
                # ValueError:  non-integer token used against a sequence.
                # TypeError:   tried to descend into a non-container value.
                raise exceptions.PointerToNowhere(
                    ref=pointer,
                    resource=self,
                ) from error

            segments.append(segment)
            last = resolver
            resolver = self._specification.maybe_in_subresource(
                segments=segments,
                resolver=resolver,
                subresource=self._specification.create_resource(contents),  # type: ignore[reportUnknownArgumentType]  # noqa: E501
            )
            if resolver is not last:
                # A new resolver was returned, so start collecting segments
                # afresh from this point on.
                segments = []
        return Resolved(contents=contents, resolver=resolver)  # type: ignore[reportUnknownArgumentType]  # noqa: E501

200 

201 

def _fail_to_retrieve(uri: URI):
    """
    The default retrieval function: every URI is reported as nonexistent.

    Raises:

        `exceptions.NoSuchResource`

            always
    """
    error = exceptions.NoSuchResource(ref=uri)
    raise error

204 

205 

@frozen
class Registry(Mapping[URI, Resource[D]]):
    r"""
    A registry of `Resource`\ s, each identified by their canonical URIs.

    Registries store a collection of in-memory resources, and optionally
    enable additional resources which may be stored elsewhere (e.g. in a
    database, a separate set of files, over the network, etc.).

    They also lazily walk their known resources, looking for subresources
    within them. In other words, subresources contained within any added
    resources will be retrievable via their own IDs (though this discovery of
    subresources will be delayed until necessary).

    Registries are immutable, and their methods return new instances of the
    registry with the additional resources added to them.

    The ``retrieve`` argument can be used to configure retrieval of resources
    dynamically, either over the network, from a database, or the like.
    Pass it a callable which will be called if any URI not present in the
    registry is accessed. It must either return a `Resource` or else raise a
    `NoSuchResource` exception indicating that the resource does not exist
    even according to the retrieval logic.
    """

    #: The resources known so far, keyed by URI.  The constructor argument
    #: (``resources``) is passed through `HashTrieMap.convert`.
    _resources: HashTrieMap[URI, Resource[D]] = field(
        default=HashTrieMap(),
        converter=HashTrieMap.convert,  # type: ignore[reportGeneralTypeIssues]  # noqa: E501
        alias="resources",
    )
    #: Anchors discovered by crawling, keyed by ``(URI, anchor name)``.
    _anchors: HashTrieMap[tuple[URI, str], AnchorType[D]] = HashTrieMap()  # type: ignore[reportGeneralTypeIssues]  # noqa: E501
    #: URIs of resources which have been added but not yet crawled.
    _uncrawled: HashTrieSet[URI] = EMPTY_UNCRAWLED
    #: Called with a URI when it is found neither directly nor by crawling.
    _retrieve: Retrieve[D] = field(default=_fail_to_retrieve, alias="retrieve")

    def __getitem__(self, uri: URI) -> Resource[D]:
        """
        Return the (already crawled) `Resource` identified by the given URI.

        Raises:

            `exceptions.NoSuchResource`

                if the URI (ignoring any trailing ``#``) is not present
        """
        try:
            return self._resources[uri.rstrip("#")]
        except KeyError:
            # The KeyError carries no extra information; hide it.
            raise exceptions.NoSuchResource(ref=uri) from None

    def __iter__(self) -> Iterator[URI]:
        """
        Iterate over all crawled URIs in the registry.
        """
        return iter(self._resources)

    def __len__(self) -> int:
        """
        Count the total number of fully crawled resources in this registry.
        """
        return len(self._resources)

    def __rmatmul__(
        self,
        new: Resource[D] | Iterable[Resource[D]],
    ) -> Registry[D]:
        """
        Create a new registry with resource(s) added using their internal IDs.

        Resources must have internal IDs (e.g. the :kw:`$id` keyword in
        modern JSON Schema versions), otherwise an error will be raised.

        Both a single resource as well as an iterable of resources works, i.e.:

            * ``resource @ registry`` or

            * ``[iterable, of, multiple, resources] @ registry``

        which -- again, assuming the resources have internal IDs -- is
        equivalent to calling `Registry.with_resources` as such:

        .. code:: python

            registry.with_resources(
                (resource.id(), resource) for resource in new_resources
            )

        Raises:

            `NoInternalID`

                if the resource(s) in fact do not have IDs
        """
        if isinstance(new, Resource):
            new = (new,)

        resources = self._resources
        uncrawled = self._uncrawled
        for resource in new:
            internal_id = resource.id()
            if internal_id is None:
                raise exceptions.NoInternalID(resource=resource)
            uncrawled = uncrawled.insert(internal_id)
            resources = resources.insert(internal_id, resource)
        return evolve(self, resources=resources, uncrawled=uncrawled)

    def __repr__(self) -> str:
        """
        Summarize how many resources are present and how many are uncrawled.
        """
        size = len(self)
        pluralized = "resource" if size == 1 else "resources"
        if self._uncrawled:
            uncrawled = len(self._uncrawled)
            if uncrawled == size:
                summary = f"uncrawled {pluralized}"
            else:
                summary = f"{pluralized}, {uncrawled} uncrawled"
        else:
            summary = f"{pluralized}"
        return f"<Registry ({size} {summary})>"

    def get_or_retrieve(self, uri: URI) -> Retrieved[D, Resource[D]]:
        """
        Get a resource from the registry, crawling or retrieving if necessary.

        May involve crawling to find the given URI if it is not already known,
        so the returned object is a `Retrieved` object which contains both the
        resource value as well as the registry which ultimately contained it.

        Raises:

            `exceptions.NoSuchResource` or
            `exceptions.CannotDetermineSpecification`

                if the retrieval function raises them (they propagate
                unchanged)

            `exceptions.Unretrievable`

                if the retrieval function raises any other exception
        """
        resource = self._resources.get(uri)
        if resource is not None:
            return Retrieved(registry=self, value=resource)

        # Not known directly -- it may be a not-yet-discovered subresource.
        registry = self.crawl()
        resource = registry._resources.get(uri)
        if resource is not None:
            return Retrieved(registry=registry, value=resource)

        try:
            resource = registry._retrieve(uri)
        except (
            exceptions.CannotDetermineSpecification,
            exceptions.NoSuchResource,
        ):
            raise
        except Exception as error:
            # Keep the underlying failure attached as the explicit cause.
            raise exceptions.Unretrievable(ref=uri) from error
        else:
            registry = registry.with_resource(uri, resource)
            return Retrieved(registry=registry, value=resource)

    def remove(self, uri: URI) -> Registry[D]:
        """
        Return a registry with the resource identified by a given URI removed.

        Anchors recorded under that URI are dropped as well.

        Raises:

            `exceptions.NoSuchResource`

                if the URI is not present in the registry
        """
        if uri not in self._resources:
            raise exceptions.NoSuchResource(ref=uri)

        return evolve(
            self,
            resources=self._resources.remove(uri),
            uncrawled=self._uncrawled.discard(uri),
            anchors=HashTrieMap(
                (k, v) for k, v in self._anchors.items() if k[0] != uri
            ),
        )

    def anchor(self, uri: URI, name: str) -> Retrieved[D, AnchorType[D]]:
        """
        Retrieve a given anchor from a resource which must already be crawled.

        The anchor is looked for under the given URI (crawling first if it
        is not found straight away) and then under the resource's own
        (canonical) ID.

        Raises:

            `exceptions.NoSuchResource`

                if no resource is known under the given URI

            `exceptions.InvalidAnchor`

                if the anchor is not found and its name contains a ``/``

            `exceptions.NoSuchAnchor`

                if the anchor is otherwise not found
        """
        value = self._anchors.get((uri, name))
        if value is not None:
            return Retrieved(value=value, registry=self)

        registry = self.crawl()
        value = registry._anchors.get((uri, name))
        if value is not None:
            return Retrieved(value=value, registry=registry)

        # Look the resource up in the *crawled* registry: that is where the
        # anchors were searched, and the resource may only exist there (if
        # it is a subresource discovered by the crawl above).
        resource = registry[uri]
        canonical_uri = resource.id()
        if canonical_uri is not None:
            value = registry._anchors.get((canonical_uri, name))
            if value is not None:
                return Retrieved(value=value, registry=registry)

        if "/" in name:
            raise exceptions.InvalidAnchor(
                ref=uri,
                resource=resource,
                anchor=name,
            )
        raise exceptions.NoSuchAnchor(ref=uri, resource=resource, anchor=name)

    def contents(self, uri: URI) -> D:
        """
        Retrieve the (already crawled) contents identified by the given URI.

        Raises:

            `exceptions.NoSuchResource`

                if the URI (ignoring any trailing ``#``) is not present
        """
        # Empty fragment URIs are equivalent to URIs without the fragment
        # (handled by `__getitem__`).
        # TODO: Is this true for non JSON Schema resources? Probably not.
        return self[uri].contents

    def crawl(self) -> Registry[D]:
        """
        Crawl all added resources, discovering subresources.

        Returns a new registry in which every discovered (sub)resource with
        an ID is stored under that ID (joined against the URI of whatever
        contains it), all discovered anchors are recorded, and nothing is
        left uncrawled.
        """
        resources = self._resources
        anchors = self._anchors
        uncrawled = [(uri, resources[uri]) for uri in self._uncrawled]
        while uncrawled:
            uri, resource = uncrawled.pop()

            internal_id = resource.id()
            if internal_id is not None:
                # The ID may be relative to the URI of the enclosing resource.
                uri = urljoin(uri, internal_id)
                resources = resources.insert(uri, resource)
            for each in resource.anchors():
                anchors = anchors.insert((uri, each.name), each)
            uncrawled.extend((uri, each) for each in resource.subresources())
        return evolve(
            self,
            resources=resources,
            anchors=anchors,
            uncrawled=EMPTY_UNCRAWLED,
        )

    def with_resource(self, uri: URI, resource: Resource[D]) -> Registry[D]:
        """
        Add the given `Resource` to the registry, without crawling it.
        """
        return self.with_resources([(uri, resource)])

    def with_resources(
        self,
        pairs: Iterable[tuple[URI, Resource[D]]],
    ) -> Registry[D]:
        r"""
        Add the given `Resource`\ s to the registry, without crawling them.
        """
        resources = self._resources
        uncrawled = self._uncrawled
        for uri, resource in pairs:
            # Empty fragment URIs are equivalent to URIs without the fragment.
            # TODO: Is this true for non JSON Schema resources? Probably not.
            uri = uri.rstrip("#")
            uncrawled = uncrawled.insert(uri)
            resources = resources.insert(uri, resource)
        return evolve(self, resources=resources, uncrawled=uncrawled)

    def with_contents(
        self,
        pairs: Iterable[tuple[URI, D]],
        **kwargs: Any,
    ) -> Registry[D]:
        r"""
        Add the given contents to the registry, autodetecting when necessary.

        Any keyword arguments are forwarded to `Resource.from_contents`.
        """
        return self.with_resources(
            (uri, Resource.from_contents(each, **kwargs))
            for uri, each in pairs
        )

    def combine(self, *registries: Registry[D]) -> Registry[D]:
        """
        Combine together one or more other registries, producing a unified one.

        Raises:

            `ValueError`

                if two of the registries involved have different
                (non-default) retrieval functions
        """
        if registries == (self,):
            return self
        resources = self._resources
        anchors = self._anchors
        uncrawled = self._uncrawled
        retrieve = self._retrieve
        for registry in registries:
            resources = resources.update(registry._resources)  # type: ignore[reportUnknownMemberType]  # noqa: E501
            anchors = anchors.update(registry._anchors)  # type: ignore[reportUnknownMemberType]  # noqa: E501
            uncrawled = uncrawled.update(registry._uncrawled)

            if registry._retrieve is not _fail_to_retrieve:
                # Chained comparison: this registry's function differs from
                # the one chosen so far *and* the one chosen so far is not
                # the default.
                if registry._retrieve is not retrieve is not _fail_to_retrieve:
                    raise ValueError(
                        "Cannot combine registries with conflicting retrieval "
                        "functions.",
                    )
                retrieve = registry._retrieve
        return evolve(
            self,
            anchors=anchors,
            resources=resources,
            uncrawled=uncrawled,
            retrieve=retrieve,
        )

    def resolver(self, base_uri: URI = "") -> Resolver[D]:
        """
        Return a `Resolver` which resolves references against this registry.
        """
        return Resolver(base_uri=base_uri, registry=self)

    def resolver_with_root(self, resource: Resource[D]) -> Resolver[D]:
        """
        Return a `Resolver` with a specific root resource.

        The resource is added to the registry under its own ID (or under
        the empty URI if it has none), which also becomes the base URI.
        """
        uri = resource.id() or ""
        return Resolver(
            base_uri=uri,
            registry=self.with_resource(uri, resource),
        )

505 

506 

#: An anchor or resource -- the two kinds of value a `Retrieved` can hold.
AnchorOrResource = TypeVar(
    "AnchorOrResource",
    AnchorType[Any],
    Resource[Any],
)

509 

510 

@frozen
class Retrieved(Generic[D, AnchorOrResource]):
    """
    A value retrieved from a `Registry`.

    Pairs the value with the registry it was ultimately found in.
    """

    #: The anchor or resource which was retrieved.
    value: AnchorOrResource

    #: The registry which contained the value.
    registry: Registry[D]

519 

520 

@frozen
class Resolved(Generic[D]):
    """
    A reference resolved to its contents by a `Resolver`.
    """

    #: The contents the reference pointed at.
    contents: D

    #: The resolver to use for resolving any further references.
    resolver: Resolver[D]

529 

530 

@frozen
class Resolver(Generic[D]):
    """
    A reference resolver.

    Resolvers help resolve references (including relative ones) by
    pairing a fixed base URI with a `Registry`.

    This object, under normal circumstances, is expected to be used by
    *implementers of libraries* built on top of `referencing` (e.g. JSON Schema
    implementations or other libraries resolving JSON references),
    not directly by end-users populating registries or while writing
    schemas or other resources.

    References are resolved against the base URI, and the combined URI
    is then looked up within the registry.

    The process of resolving a reference may itself involve calculating
    a *new* base URI for future reference resolution (e.g. if an
    intermediate resource sets a new base URI), or may involve encountering
    additional subresources and adding them to a new registry.
    """

    #: The URI against which references are resolved.
    _base_uri: str = field(alias="base_uri")
    #: The registry in which resolved URIs are looked up.
    _registry: Registry[D] = field(alias="registry")
    #: Base URIs of the resolvers this one was evolved from, newest first.
    _previous: List[URI] = field(
        default=EMPTY_PREVIOUS_RESOLVERS,
        repr=False,
        alias="previous",
    )

    def lookup(self, ref: URI) -> Resolved[D]:
        """
        Resolve the given reference to the resource it points to.

        Raises:

            `exceptions.Unresolvable`

                or a subclass thereof (see below) if the reference isn't
                resolvable

            `exceptions.NoSuchAnchor`

                if the reference is to a URI where a resource exists but
                contains a plain name fragment which does not exist within
                the resource

            `exceptions.PointerToNowhere`

                if the reference is to a URI where a resource exists but
                contains a JSON pointer to a location within the resource
                that does not exist
        """
        if ref.startswith("#"):
            # A fragment-only reference stays within the current base URI.
            uri, fragment = self._base_uri, ref[1:]
        else:
            uri, fragment = urldefrag(urljoin(self._base_uri, ref))
        try:
            retrieved = self._registry.get_or_retrieve(uri)
        except exceptions.NoSuchResource:
            raise exceptions.Unresolvable(ref=ref) from None
        except exceptions.Unretrievable as error:
            # Unlike above, keep the retrieval failure as the explicit cause.
            raise exceptions.Unresolvable(ref=ref) from error

        if fragment.startswith("/"):
            # The fragment is a JSON pointer.
            resolver = self._evolve(registry=retrieved.registry, base_uri=uri)
            return retrieved.value.pointer(pointer=fragment, resolver=resolver)

        if fragment:
            # The fragment is a plain-name anchor.
            retrieved = retrieved.registry.anchor(uri, fragment)
            resolver = self._evolve(registry=retrieved.registry, base_uri=uri)
            return retrieved.value.resolve(resolver=resolver)

        resolver = self._evolve(registry=retrieved.registry, base_uri=uri)
        return Resolved(contents=retrieved.value.contents, resolver=resolver)

    def in_subresource(self, subresource: Resource[D]) -> Resolver[D]:
        """
        Create a resolver for a subresource (which may have a new base URI).

        If the subresource has no ID, this resolver is returned unchanged.
        """
        subresource_id = subresource.id()
        if subresource_id is None:
            return self
        return evolve(self, base_uri=urljoin(self._base_uri, subresource_id))

    def dynamic_scope(self) -> Iterable[tuple[URI, Registry[D]]]:
        """
        In specs with such a notion, return the URIs in the dynamic scope.

        Yields each previously visited base URI (most recent first), paired
        with this resolver's registry.
        """
        for uri in self._previous:
            yield uri, self._registry

    def _evolve(self, base_uri: str, **kwargs: Any) -> Resolver[D]:
        """
        Evolve, appending to the dynamic scope.

        The current base URI is recorded as "previous" if it is non-empty
        and either nothing has been recorded yet or the base URI is
        actually changing.
        """
        previous = self._previous
        if self._base_uri and (not previous or base_uri != self._base_uri):
            previous = previous.push_front(self._base_uri)
        return evolve(self, base_uri=base_uri, previous=previous, **kwargs)

628 

629 

@frozen
class Anchor(Generic[D]):
    """
    A simple anchor in a `Resource`.
    """

    #: The anchor's name.
    name: str

    #: The resource the anchor refers to.
    resource: Resource[D]

    def resolve(self, resolver: Resolver[D]):
        """
        Return the resource for this anchor.

        The result is a `Resolved` holding the anchored resource's contents
        together with the resolver that was passed in.
        """
        target = self.resource
        return Resolved(resolver=resolver, contents=target.contents)