Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/referencing/jsonschema.py: 75%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

214 statements  

1""" 

2Referencing implementations for JSON Schema specs (historic & current). 

3""" 

4 

5from __future__ import annotations 

6 

7from collections.abc import Sequence, Set 

8from typing import Any, Iterable, Union 

9 

10from referencing import Anchor, Registry, Resource, Specification, exceptions 

11from referencing._attrs import frozen 

12from referencing._core import ( 

13 _UNSET, # type: ignore[reportPrivateUsage] 

14 Resolved as _Resolved, 

15 Resolver as _Resolver, 

16 _Unset, # type: ignore[reportPrivateUsage] 

17) 

18from referencing.typing import URI, Anchor as AnchorType, Mapping 

19 

#: A JSON Schema which is a JSON object
ObjectSchema = Mapping[str, Any]

#: A JSON Schema of any kind
Schema = Union[bool, ObjectSchema]

#: A Resource whose contents are JSON Schemas
SchemaResource = Resource[Schema]

#: A JSON Schema Registry
SchemaRegistry = Registry[Schema]

#: The empty JSON Schema Registry
EMPTY_REGISTRY: SchemaRegistry = Registry()

34 

35 

@frozen
class UnknownDialect(Exception):
    """
    A dialect identifier was found for a dialect unknown by this library.

    If it's a custom ("unofficial") dialect, be sure you've registered it.
    """

    #: The dialect identifier which was not recognized
    uri: URI

45 

46 

def _dollar_id(contents: Schema) -> URI | None:
    """
    Return the value of the schema's ``$id`` keyword, if it has one.

    Boolean schemas carry no keywords, so they never have an identifier.
    """
    if isinstance(contents, bool):
        return None
    return contents.get("$id")

51 

52 

def _legacy_dollar_id(contents: Schema) -> URI | None:
    """
    Return the schema's ``$id``, unless it is ignored or fragment-only.

    No identifier is reported for boolean schemas, for schemas which
    contain ``$ref``, or when the ``$id`` value begins with ``#``.
    """
    if isinstance(contents, bool) or "$ref" in contents:
        return None
    identifier = contents.get("$id")
    if identifier is None or identifier.startswith("#"):
        return None
    return identifier

59 

60 

def _legacy_id(contents: ObjectSchema) -> URI | None:
    """
    Return the schema's ``id``, unless it is ignored or fragment-only.

    No identifier is reported for schemas which contain ``$ref``, or when
    the ``id`` value begins with ``#``.
    """
    if "$ref" in contents:
        return None
    identifier = contents.get("id")
    if identifier is None or identifier.startswith("#"):
        return None
    return identifier

67 

68 

def _anchor(
    specification: Specification[Schema],
    contents: Schema,
) -> Iterable[AnchorType[Schema]]:
    """
    Yield the anchors declared directly on the given schema.

    A plain `Anchor` is produced for ``$anchor`` and a `DynamicAnchor` for
    ``$dynamicAnchor``; both wrap a resource created from ``contents`` by
    the given specification. Boolean schemas produce nothing.
    """
    if isinstance(contents, bool):
        return
    anchor = contents.get("$anchor")
    if anchor is not None:
        yield Anchor(
            name=anchor,
            resource=specification.create_resource(contents),
        )

    dynamic_anchor = contents.get("$dynamicAnchor")
    if dynamic_anchor is not None:
        yield DynamicAnchor(
            name=dynamic_anchor,
            resource=specification.create_resource(contents),
        )

88 

89 

def _anchor_2019(
    specification: Specification[Schema],
    contents: Schema,
) -> Iterable[Anchor[Schema]]:
    """
    Return the anchor declared via ``$anchor`` on the given schema, if any.

    The result is a list holding at most one `Anchor`; it is empty for
    boolean schemas and for schemas without ``$anchor``.
    """
    if isinstance(contents, bool):
        return []
    anchor = contents.get("$anchor")
    if anchor is None:
        return []
    return [
        Anchor(
            name=anchor,
            resource=specification.create_resource(contents),
        ),
    ]

105 

106 

def _legacy_anchor_in_dollar_id(
    specification: Specification[Schema],
    contents: Schema,
) -> Iterable[Anchor[Schema]]:
    """
    Return the anchor declared via a ``#``-prefixed ``$id``, if any.

    The anchor's name is the ``$id`` value with its leading ``#`` removed.
    The result is empty for boolean schemas and for schemas whose ``$id``
    is missing or does not begin with ``#``.
    """
    if isinstance(contents, bool):
        return []
    identifier = contents.get("$id", "")
    if not identifier.startswith("#"):
        return []
    return [
        Anchor(
            name=identifier[1:],
            resource=specification.create_resource(contents),
        ),
    ]

122 

123 

def _legacy_anchor_in_id(
    specification: Specification[ObjectSchema],
    contents: ObjectSchema,
) -> Iterable[Anchor[ObjectSchema]]:
    """
    Return the anchor declared via a ``#``-prefixed ``id``, if any.

    The anchor's name is the ``id`` value with its leading ``#`` removed.
    The result is empty when ``id`` is missing or does not begin with
    ``#``.
    """
    identifier = contents.get("id", "")
    if not identifier.startswith("#"):
        return []
    return [
        Anchor(
            name=identifier[1:],
            resource=specification.create_resource(contents),
        ),
    ]

137 

138 

def _subresources_of(
    in_value: Set[str] = frozenset(),
    in_subvalues: Set[str] = frozenset(),
    in_subarray: Set[str] = frozenset(),
):
    """
    Create a callable returning JSON Schema specification-style subschemas.

    Relies on specifying the set of keywords containing subschemas in their
    values, in a subobject's values, or in a subarray.
    """

    def subresources_of(contents: Schema) -> Iterable[ObjectSchema]:
        # Boolean schemas have no keywords and hence no subschemas.
        if isinstance(contents, bool):
            return
        # Keywords whose value is itself a subschema.
        for each in in_value:
            if each in contents:
                yield contents[each]
        # Keywords whose value is an array of subschemas.
        for each in in_subarray:
            if each in contents:
                yield from contents[each]
        # Keywords whose value is an object with subschemas as its values.
        for each in in_subvalues:
            if each in contents:
                yield from contents[each].values()

    return subresources_of

165 

166 

def _subresources_of_with_crazy_items(
    in_value: Set[str] = frozenset(),
    in_subvalues: Set[str] = frozenset(),
    in_subarray: Set[str] = frozenset(),
):
    """
    Specifically handle older drafts where there are some funky keywords.

    In addition to the given keyword sets, ``items`` is handled specially:
    its value is either an array of subschemas or a single subschema.
    """

    def subresources_of(contents: Schema) -> Iterable[ObjectSchema]:
        # Boolean schemas have no keywords and hence no subschemas.
        if isinstance(contents, bool):
            return
        # Keywords whose value is itself a subschema.
        for each in in_value:
            if each in contents:
                yield contents[each]
        # Keywords whose value is an array of subschemas.
        for each in in_subarray:
            if each in contents:
                yield from contents[each]
        # Keywords whose value is an object with subschemas as its values.
        for each in in_subvalues:
            if each in contents:
                yield from contents[each].values()

        items = contents.get("items")
        if items is not None:
            if isinstance(items, Sequence):
                yield from items
            else:
                yield items

    return subresources_of

197 

198 

def _subresources_of_with_crazy_items_dependencies(
    in_value: Set[str] = frozenset(),
    in_subvalues: Set[str] = frozenset(),
    in_subarray: Set[str] = frozenset(),
):
    """
    Specifically handle older drafts where there are some funky keywords.

    In addition to the given keyword sets, two keywords are handled
    specially: ``items`` (an array of subschemas or a single subschema) and
    ``dependencies`` (an object whose values may or may not be subschemas).
    """

    def subresources_of(contents: Schema) -> Iterable[ObjectSchema]:
        # Boolean schemas have no keywords and hence no subschemas.
        if isinstance(contents, bool):
            return
        # Keywords whose value is itself a subschema.
        for each in in_value:
            if each in contents:
                yield contents[each]
        # Keywords whose value is an array of subschemas.
        for each in in_subarray:
            if each in contents:
                yield from contents[each]
        # Keywords whose value is an object with subschemas as its values.
        for each in in_subvalues:
            if each in contents:
                yield from contents[each].values()

        items = contents.get("items")
        if items is not None:
            if isinstance(items, Sequence):
                yield from items
            else:
                yield items
        dependencies = contents.get("dependencies")
        if dependencies is not None:
            # Each dependency is checked on its own: one ``dependencies``
            # object may mix object subschemas with non-schema values
            # (e.g. arrays of property names), and only the former are
            # subresources.
            for value in dependencies.values():
                if isinstance(value, Mapping):
                    yield value

    return subresources_of

236 

237 

def _subresources_of_with_crazy_aP_items_dependencies(
    in_value: Set[str] = frozenset(),
    in_subvalues: Set[str] = frozenset(),
    in_subarray: Set[str] = frozenset(),
):
    """
    Specifically handle even older drafts where there are some funky keywords.

    In addition to the given keyword sets, these keywords are handled
    specially: ``items`` (an array of subschemas or a single subschema),
    ``dependencies`` (an object whose values may or may not be subschemas)
    and ``additionalItems`` / ``additionalProperties`` (subschemas only
    when their value is an object).
    """

    def subresources_of(contents: ObjectSchema) -> Iterable[ObjectSchema]:
        # Keywords whose value is itself a subschema.
        for each in in_value:
            if each in contents:
                yield contents[each]
        # Keywords whose value is an array of subschemas.
        for each in in_subarray:
            if each in contents:
                yield from contents[each]
        # Keywords whose value is an object with subschemas as its values.
        for each in in_subvalues:
            if each in contents:
                yield from contents[each].values()

        items = contents.get("items")
        if items is not None:
            if isinstance(items, Sequence):
                yield from items
            else:
                yield items
        dependencies = contents.get("dependencies")
        if dependencies is not None:
            # Each dependency is checked on its own: one ``dependencies``
            # object may mix object subschemas with non-schema values
            # (e.g. property names), and only the former are subresources.
            for value in dependencies.values():
                if isinstance(value, Mapping):
                    yield value

        # Non-object values (e.g. booleans) of these keywords are skipped.
        for each in "additionalItems", "additionalProperties":
            value = contents.get(each)
            if isinstance(value, Mapping):
                yield value

    return subresources_of

278 

279 

def _maybe_in_subresource(
    in_value: Set[str] = frozenset(),
    in_subvalues: Set[str] = frozenset(),
    in_subarray: Set[str] = frozenset(),
):
    """
    Create a callable which enters a subresource only via subschema paths.

    The returned callable walks the given path segments. If every step
    passes through a keyword known to contain subschemas, it returns
    ``resolver.in_subresource(subresource)``; otherwise it returns the
    resolver unchanged.
    """
    in_child = in_subvalues | in_subarray

    def maybe_in_subresource(
        segments: Sequence[int | str],
        resolver: _Resolver[Any],
        subresource: Resource[Any],
    ) -> _Resolver[Any]:
        _segments = iter(segments)
        for segment in _segments:
            # A keyword from ``in_child`` must be followed by one more
            # segment (the key or index inside it); that segment is consumed
            # here by ``next`` so the loop resumes after it.
            if segment not in in_value and (
                segment not in in_child or next(_segments, None) is None
            ):
                return resolver
        return resolver.in_subresource(subresource)

    return maybe_in_subresource

301 

302 

def _maybe_in_subresource_crazy_items(
    in_value: Set[str] = frozenset(),
    in_subvalues: Set[str] = frozenset(),
    in_subarray: Set[str] = frozenset(),
):
    """
    Create a callable which enters a subresource only via subschema paths.

    Behaves like a plain keyword-set walk, except that reaching an
    ``items`` segment enters the subresource immediately whenever the
    subresource's contents are a mapping.
    """
    in_child = in_subvalues | in_subarray

    def maybe_in_subresource(
        segments: Sequence[int | str],
        resolver: _Resolver[Any],
        subresource: Resource[Any],
    ) -> _Resolver[Any]:
        _segments = iter(segments)
        for segment in _segments:
            if segment == "items" and isinstance(
                subresource.contents,
                Mapping,
            ):
                return resolver.in_subresource(subresource)
            # A keyword from ``in_child`` must be followed by one more
            # segment (the key or index inside it); that segment is consumed
            # here by ``next`` so the loop resumes after it.
            if segment not in in_value and (
                segment not in in_child or next(_segments, None) is None
            ):
                return resolver
        return resolver.in_subresource(subresource)

    return maybe_in_subresource

329 

330 

def _maybe_in_subresource_crazy_items_dependencies(
    in_value: Set[str] = frozenset(),
    in_subvalues: Set[str] = frozenset(),
    in_subarray: Set[str] = frozenset(),
):
    """
    Create a callable which enters a subresource only via subschema paths.

    Behaves like a plain keyword-set walk, except that reaching an
    ``items`` or ``dependencies`` segment enters the subresource
    immediately whenever the subresource's contents are a mapping.
    """
    in_child = in_subvalues | in_subarray

    def maybe_in_subresource(
        segments: Sequence[int | str],
        resolver: _Resolver[Any],
        subresource: Resource[Any],
    ) -> _Resolver[Any]:
        _segments = iter(segments)
        for segment in _segments:
            if segment in {"items", "dependencies"} and isinstance(
                subresource.contents,
                Mapping,
            ):
                return resolver.in_subresource(subresource)
            # A keyword from ``in_child`` must be followed by one more
            # segment (the key or index inside it); that segment is consumed
            # here by ``next`` so the loop resumes after it.
            if segment not in in_value and (
                segment not in in_child or next(_segments, None) is None
            ):
                return resolver
        return resolver.in_subresource(subresource)

    return maybe_in_subresource

357 

358 

#: JSON Schema draft 2020-12
DRAFT202012 = Specification(
    name="draft2020-12",
    id_of=_dollar_id,
    subresources_of=_subresources_of(
        in_value={
            "additionalProperties",
            "contains",
            "contentSchema",
            "else",
            "if",
            "items",
            "not",
            "propertyNames",
            "then",
            "unevaluatedItems",
            "unevaluatedProperties",
        },
        in_subarray={"allOf", "anyOf", "oneOf", "prefixItems"},
        in_subvalues={
            "$defs",
            "definitions",
            "dependentSchemas",
            "patternProperties",
            "properties",
        },
    ),
    anchors_in=_anchor,
    maybe_in_subresource=_maybe_in_subresource(
        in_value={
            "additionalProperties",
            "contains",
            "contentSchema",
            "else",
            "if",
            "items",
            "not",
            "propertyNames",
            "then",
            "unevaluatedItems",
            "unevaluatedProperties",
        },
        in_subarray={"allOf", "anyOf", "oneOf", "prefixItems"},
        in_subvalues={
            "$defs",
            "definitions",
            "dependentSchemas",
            "patternProperties",
            "properties",
        },
    ),
)

#: JSON Schema draft 2019-09
DRAFT201909 = Specification(
    name="draft2019-09",
    id_of=_dollar_id,
    subresources_of=_subresources_of_with_crazy_items(
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "contentSchema",
            "else",
            "if",
            "not",
            "propertyNames",
            "then",
            "unevaluatedItems",
            "unevaluatedProperties",
        },
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={
            "$defs",
            "definitions",
            "dependentSchemas",
            "patternProperties",
            "properties",
        },
    ),
    anchors_in=_anchor_2019,  # type: ignore[reportGeneralTypeIssues]  # TODO: check whether this is real
    maybe_in_subresource=_maybe_in_subresource_crazy_items(
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "contentSchema",
            "else",
            "if",
            "not",
            "propertyNames",
            "then",
            "unevaluatedItems",
            "unevaluatedProperties",
        },
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={
            "$defs",
            "definitions",
            "dependentSchemas",
            "patternProperties",
            "properties",
        },
    ),
)

#: JSON Schema draft 7
DRAFT7 = Specification(
    name="draft-07",
    id_of=_legacy_dollar_id,
    subresources_of=_subresources_of_with_crazy_items_dependencies(
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "else",
            "if",
            "not",
            "propertyNames",
            "then",
        },
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
    anchors_in=_legacy_anchor_in_dollar_id,  # type: ignore[reportGeneralTypeIssues]  # TODO: check whether this is real
    maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "else",
            "if",
            "not",
            "propertyNames",
            "then",
        },
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
)

#: JSON Schema draft 6
DRAFT6 = Specification(
    name="draft-06",
    id_of=_legacy_dollar_id,
    subresources_of=_subresources_of_with_crazy_items_dependencies(
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "not",
            "propertyNames",
        },
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
    anchors_in=_legacy_anchor_in_dollar_id,  # type: ignore[reportGeneralTypeIssues]  # TODO: check whether this is real
    maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "not",
            "propertyNames",
        },
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
)

#: JSON Schema draft 4
DRAFT4 = Specification(
    name="draft-04",
    id_of=_legacy_id,
    subresources_of=_subresources_of_with_crazy_aP_items_dependencies(
        in_value={"not"},
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
    anchors_in=_legacy_anchor_in_id,
    maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
        in_value={"additionalItems", "additionalProperties", "not"},
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
)

#: JSON Schema draft 3
DRAFT3 = Specification(
    name="draft-03",
    id_of=_legacy_id,
    subresources_of=_subresources_of_with_crazy_aP_items_dependencies(
        in_subarray={"extends"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
    anchors_in=_legacy_anchor_in_id,
    maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
        in_value={"additionalItems", "additionalProperties"},
        in_subarray={"extends"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
)

556 

557 

# The known specifications, each stored as an opaque resource under its
# dialect identifier.
_SPECIFICATIONS: Registry[Specification[Schema]] = Registry(
    {  # type: ignore[reportGeneralTypeIssues]  # :/ internal vs external types
        dialect_id: Resource.opaque(specification)
        for dialect_id, specification in [
            ("https://json-schema.org/draft/2020-12/schema", DRAFT202012),
            ("https://json-schema.org/draft/2019-09/schema", DRAFT201909),
            ("http://json-schema.org/draft-07/schema", DRAFT7),
            ("http://json-schema.org/draft-06/schema", DRAFT6),
            ("http://json-schema.org/draft-04/schema", DRAFT4),
            ("http://json-schema.org/draft-03/schema", DRAFT3),
        ]
    },
)

571 

572 

def specification_with(
    dialect_id: URI,
    default: Specification[Any] | _Unset = _UNSET,
) -> Specification[Any]:
    """
    Retrieve the `Specification` with the given dialect identifier.

    Any trailing ``#`` characters on ``dialect_id`` are ignored for the
    lookup. If the dialect isn't known and a ``default`` was provided, the
    default is returned instead.

    Raises:

        `UnknownDialect`

            if the given ``dialect_id`` isn't known and no ``default``
            was provided

    """
    resource = _SPECIFICATIONS.get(dialect_id.rstrip("#"))
    if resource is not None:
        return resource.contents
    if default is _UNSET:
        raise UnknownDialect(dialect_id)
    return default

593 

594 

@frozen
class DynamicAnchor:
    """
    Dynamic anchors, introduced in draft 2020.
    """

    #: The anchor's name
    name: str
    #: The resource on which the anchor was declared
    resource: SchemaResource

    def resolve(self, resolver: _Resolver[Schema]) -> _Resolved[Schema]:
        """
        Resolve this anchor dynamically.

        Each entry of the resolver's dynamic scope is searched for an
        anchor with this anchor's name. Entries without such an anchor are
        skipped, and matches which are not dynamic anchors are ignored. The
        last dynamic match found wins; with no match, this anchor's own
        resource is used.
        """
        last = self.resource
        for uri, registry in resolver.dynamic_scope():
            try:
                anchor = registry.anchor(uri, self.name).value
            except exceptions.NoSuchAnchor:
                continue
            if isinstance(anchor, DynamicAnchor):
                last = anchor.resource
        return _Resolved(
            contents=last.contents,
            resolver=resolver.in_subresource(last),
        )

620 

621 

def lookup_recursive_ref(resolver: _Resolver[Schema]) -> _Resolved[Schema]:
    """
    Recursive references (via recursive anchors), present only in draft 2019.

    As per the 2019 specification (§ 8.2.4.2.1), only the ``#`` recursive
    reference is supported (and is therefore assumed to be the relevant
    reference).

    The ``#`` reference is looked up first. If its target has a truthy
    ``$recursiveAnchor``, the resolver's dynamic scope is then walked, and
    each entry whose target also has a truthy ``$recursiveAnchor`` replaces
    the result. The walk stops at the first entry which does not.
    """
    resolved = resolver.lookup("#")
    if isinstance(resolved.contents, Mapping) and resolved.contents.get(
        "$recursiveAnchor",
    ):
        for uri, _ in resolver.dynamic_scope():
            next_resolved = resolver.lookup(uri)
            if not isinstance(
                next_resolved.contents,
                Mapping,
            ) or not next_resolved.contents.get("$recursiveAnchor"):
                break
            resolved = next_resolved
    return resolved