Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/referencing/jsonschema.py: 75%

215 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1""" 

2Referencing implementations for JSON Schema specs (historic & current). 

3""" 

4 

5from __future__ import annotations 

6 

7from collections.abc import Sequence, Set 

8from typing import Any, Iterable, Union 

9 

10from referencing import Anchor, Registry, Resource, Specification, exceptions 

11from referencing._attrs import frozen 

12from referencing._core import _UNSET # type: ignore[reportPrivateUsage] 

13from referencing._core import _Unset # type: ignore[reportPrivateUsage] 

14from referencing._core import Resolved as _Resolved, Resolver as _Resolver 

15from referencing.typing import URI, Anchor as AnchorType, Mapping 

16 

#: A JSON Schema which is a JSON object
ObjectSchema = Mapping[str, Any]

#: A JSON Schema of any kind (either a boolean schema or an object schema)
Schema = Union[bool, ObjectSchema]

#: A JSON Schema Registry
SchemaRegistry = Registry[Schema]

#: The empty JSON Schema Registry
EMPTY_REGISTRY: SchemaRegistry = Registry()

28 

29 

@frozen
class UnknownDialect(Exception):
    """
    A dialect identifier was found for a dialect unknown by this library.

    If it's a custom ("unofficial") dialect, be sure you've registered it.
    """

    # The dialect identifier which could not be matched to a specification.
    uri: URI

39 

40 

def _dollar_id(contents: Schema) -> URI | None:
    """
    Return the ``$id`` of the given schema, or ``None`` if it has none.

    Boolean schemas carry no keywords, so they never have an identifier.
    """
    return None if isinstance(contents, bool) else contents.get("$id")

45 

46 

def _legacy_dollar_id(contents: Schema) -> URI | None:
    """
    Return the ``$id`` of a schema under the older ``$id`` rules.

    No identifier is reported for boolean schemas, for schemas which also
    contain ``$ref``, for schemas without ``$id``, or when the ``$id`` starts
    with ``#`` (such values are treated as anchors elsewhere in this module).
    """
    if isinstance(contents, bool):
        return None
    if "$ref" in contents:
        return None
    dollar_id = contents.get("$id")
    if dollar_id is None or dollar_id.startswith("#"):
        return None
    return dollar_id

53 

54 

def _legacy_id(contents: ObjectSchema) -> URI | None:
    """
    Return the ``id`` of a schema for drafts which use ``id`` (not ``$id``).

    Nothing is reported when the schema contains ``$ref``, has no ``id``, or
    has an ``id`` which starts with ``#``.
    """
    if "$ref" in contents:
        return None
    legacy_id = contents.get("id")
    if legacy_id is None or legacy_id.startswith("#"):
        return None
    return legacy_id

61 

62 

def _anchor(
    specification: Specification[Schema],
    contents: Schema,
) -> Iterable[AnchorType[Schema]]:
    """
    Yield the anchors declared directly on the given schema.

    An `Anchor` is produced for ``$anchor`` and then a `DynamicAnchor` for
    ``$dynamicAnchor``, each only if the keyword is present. Boolean schemas
    yield nothing.
    """
    if isinstance(contents, bool):
        return

    plain_name = contents.get("$anchor")
    if plain_name is not None:
        plain_resource = specification.create_resource(contents)
        yield Anchor(name=plain_name, resource=plain_resource)

    dynamic_name = contents.get("$dynamicAnchor")
    if dynamic_name is not None:
        dynamic_resource = specification.create_resource(contents)
        yield DynamicAnchor(name=dynamic_name, resource=dynamic_resource)

82 

83 

def _anchor_2019(
    specification: Specification[Schema],
    contents: Schema,
) -> Iterable[Anchor[Schema]]:
    """
    Return the anchor declared via ``$anchor`` on the given schema, if any.

    The result is a list containing zero or one `Anchor`; boolean schemas
    and schemas without ``$anchor`` produce an empty list.
    """
    name = None if isinstance(contents, bool) else contents.get("$anchor")
    if name is None:
        return []
    resource = specification.create_resource(contents)
    return [Anchor(name=name, resource=resource)]

99 

100 

def _legacy_anchor_in_dollar_id(
    specification: Specification[Schema],
    contents: Schema,
) -> Iterable[Anchor[Schema]]:
    """
    Return the anchor declared via a ``$id`` which starts with ``#``.

    The anchor's name is the ``$id`` with its leading ``#`` removed. Boolean
    schemas, and schemas whose ``$id`` is missing or does not start with
    ``#``, produce an empty list.
    """
    if isinstance(contents, bool):
        return []
    dollar_id = contents.get("$id", "")
    if dollar_id.startswith("#"):
        resource = specification.create_resource(contents)
        return [Anchor(name=dollar_id[1:], resource=resource)]
    return []

116 

117 

def _legacy_anchor_in_id(
    specification: Specification[ObjectSchema],
    contents: ObjectSchema,
) -> Iterable[Anchor[ObjectSchema]]:
    """
    Return the anchor declared via an ``id`` which starts with ``#``.

    The anchor's name is the ``id`` with its leading ``#`` removed. Schemas
    whose ``id`` is missing or does not start with ``#`` produce an empty
    list.
    """
    legacy_id = contents.get("id", "")
    if legacy_id.startswith("#"):
        resource = specification.create_resource(contents)
        return [Anchor(name=legacy_id[1:], resource=resource)]
    return []

131 

132 

def _subresources_of(
    in_value: Set[str] = frozenset(),
    in_subvalues: Set[str] = frozenset(),
    in_subarray: Set[str] = frozenset(),
):
    """
    Build a function which finds the subschemas of a JSON Schema.

    The three arguments name the keywords which hold subschemas: directly
    as their value (``in_value``), as the values of an object
    (``in_subvalues``), or as the elements of an array (``in_subarray``).
    """

    def subresources_of(contents: Schema) -> Iterable[ObjectSchema]:
        # Boolean schemas have no keywords and therefore no subschemas.
        if isinstance(contents, bool):
            return
        yield from (
            contents[keyword] for keyword in in_value if keyword in contents
        )
        yield from (
            subschema
            for keyword in in_subarray
            if keyword in contents
            for subschema in contents[keyword]
        )
        yield from (
            subschema
            for keyword in in_subvalues
            if keyword in contents
            for subschema in contents[keyword].values()
        )

    return subresources_of

159 

160 

def _subresources_of_with_crazy_items(
    in_value: Set[str] = frozenset(),
    in_subvalues: Set[str] = frozenset(),
    in_subarray: Set[str] = frozenset(),
):
    """
    Build a subschema finder for older drafts with a two-shaped ``items``.

    Beyond the given keyword sets, ``items`` is handled specially: it may be
    a single subschema or a sequence of subschemas.
    """

    def subresources_of(contents: Schema) -> Iterable[ObjectSchema]:
        # Boolean schemas have no keywords and therefore no subschemas.
        if isinstance(contents, bool):
            return
        yield from (
            contents[keyword] for keyword in in_value if keyword in contents
        )
        yield from (
            subschema
            for keyword in in_subarray
            if keyword in contents
            for subschema in contents[keyword]
        )
        yield from (
            subschema
            for keyword in in_subvalues
            if keyword in contents
            for subschema in contents[keyword].values()
        )

        items = contents.get("items")
        if items is None:
            return
        if isinstance(items, Sequence):
            # Array form: one subschema per position.
            yield from items
        else:
            yield items

    return subresources_of

191 

192 

def _subresources_of_with_crazy_items_dependencies(
    in_value: Set[str] = frozenset(),
    in_subvalues: Set[str] = frozenset(),
    in_subarray: Set[str] = frozenset(),
):
    """
    Specifically handle older drafts where there are some funky keywords.

    Beyond the given keyword sets, two keywords are handled specially:

    * ``items``, which may be a single subschema or a sequence of them
    * ``dependencies``, each of whose values may be either a subschema or
      an array of property names (only the former are subschemas)
    """

    def subresources_of(contents: Schema) -> Iterable[ObjectSchema]:
        # Boolean schemas have no keywords and therefore no subschemas.
        if isinstance(contents, bool):
            return
        for each in in_value:
            if each in contents:
                yield contents[each]
        for each in in_subarray:
            if each in contents:
                yield from contents[each]
        for each in in_subvalues:
            if each in contents:
                yield from contents[each].values()

        items = contents.get("items")
        if items is not None:
            if isinstance(items, Sequence):
                yield from items
            else:
                yield items

        dependencies = contents.get("dependencies")
        if dependencies is not None:
            # Schema and property-name dependencies may be mixed within one
            # object, so each value is checked on its own rather than
            # guessing the kind of all of them from the first value.
            for value in dependencies.values():
                if isinstance(value, (bool, Mapping)):
                    yield value

    return subresources_of

230 

231 

def _subresources_of_with_crazy_aP_items_dependencies(
    in_value: Set[str] = frozenset(),
    in_subvalues: Set[str] = frozenset(),
    in_subarray: Set[str] = frozenset(),
):
    """
    Specifically handle even older drafts where there are some funky keywords.

    Beyond the given keyword sets, these keywords are handled specially:

    * ``items``, which may be a single subschema or a sequence of them
    * ``dependencies``, whose values are subschemas only when they are
      objects (other values name properties and are skipped)
    * ``additionalItems`` and ``additionalProperties``, which are
      subschemas only when they are objects (they may also be booleans)
    """

    def subresources_of(contents: ObjectSchema) -> Iterable[ObjectSchema]:
        for each in in_value:
            if each in contents:
                yield contents[each]
        for each in in_subarray:
            if each in contents:
                yield from contents[each]
        for each in in_subvalues:
            if each in contents:
                yield from contents[each].values()

        items = contents.get("items")
        if items is not None:
            if isinstance(items, Sequence):
                yield from items
            else:
                yield items

        dependencies = contents.get("dependencies")
        if dependencies is not None:
            # Schema and property-name dependencies may be mixed within one
            # object, so each value is checked on its own rather than
            # guessing the kind of all of them from the first value.
            for value in dependencies.values():
                if isinstance(value, Mapping):
                    yield value

        for each in "additionalItems", "additionalProperties":
            value = contents.get(each)
            if isinstance(value, Mapping):
                yield value

    return subresources_of

272 

273 

def _maybe_in_subresource(
    in_value: Set[str] = frozenset(),
    in_subvalues: Set[str] = frozenset(),
    in_subarray: Set[str] = frozenset(),
):
    """
    Build a function deciding whether a path of segments enters a subschema.

    The keyword sets have the same meaning as for the subschema finders:
    keywords holding a subschema directly, in an object's values, or in an
    array's elements.
    """
    in_child = in_subvalues | in_subarray

    def maybe_in_subresource(
        segments: Sequence[int | str],
        resolver: _Resolver[Any],
        subresource: Resource[Any],
    ) -> _Resolver[Any]:
        remaining = iter(segments)
        for segment in remaining:
            if segment in in_value:
                continue
            # Container keywords need a following segment (the property
            # name or array index), which is consumed here.
            if segment in in_child and next(remaining, None) is not None:
                continue
            return resolver
        return resolver.in_subresource(subresource)

    return maybe_in_subresource

295 

296 

def _maybe_in_subresource_crazy_items(
    in_value: Set[str] = frozenset(),
    in_subvalues: Set[str] = frozenset(),
    in_subarray: Set[str] = frozenset(),
):
    """
    Like `_maybe_in_subresource`, with special handling of ``items``.

    An ``items`` segment immediately enters the subresource whenever the
    subresource's contents are a mapping.
    """
    in_child = in_subvalues | in_subarray

    def maybe_in_subresource(
        segments: Sequence[int | str],
        resolver: _Resolver[Any],
        subresource: Resource[Any],
    ) -> _Resolver[Any]:
        remaining = iter(segments)
        for segment in remaining:
            if segment == "items" and isinstance(subresource.contents, Mapping):
                return resolver.in_subresource(subresource)
            if segment in in_value:
                continue
            # Container keywords need a following segment (the property
            # name or array index), which is consumed here.
            if segment in in_child and next(remaining, None) is not None:
                continue
            return resolver
        return resolver.in_subresource(subresource)

    return maybe_in_subresource

323 

324 

def _maybe_in_subresource_crazy_items_dependencies(
    in_value: Set[str] = frozenset(),
    in_subvalues: Set[str] = frozenset(),
    in_subarray: Set[str] = frozenset(),
):
    """
    Like `_maybe_in_subresource`, with special ``items`` and ``dependencies``.

    An ``items`` or ``dependencies`` segment immediately enters the
    subresource whenever the subresource's contents are a mapping.
    """
    in_child = in_subvalues | in_subarray

    def maybe_in_subresource(
        segments: Sequence[int | str],
        resolver: _Resolver[Any],
        subresource: Resource[Any],
    ) -> _Resolver[Any]:
        remaining = iter(segments)
        for segment in remaining:
            if segment in ("items", "dependencies") and isinstance(
                subresource.contents,
                Mapping,
            ):
                return resolver.in_subresource(subresource)
            if segment in in_value:
                continue
            # Container keywords need a following segment (the property
            # name or array index), which is consumed here.
            if segment in in_child and next(remaining, None) is not None:
                continue
            return resolver
        return resolver.in_subresource(subresource)

    return maybe_in_subresource

350 

351 

#: JSON Schema draft 2020-12
DRAFT202012 = Specification(
    name="draft2020-12",
    id_of=_dollar_id,
    subresources_of=_subresources_of(
        # Keywords whose value is itself a subschema.
        in_value={
            "additionalProperties",
            "contains",
            "contentSchema",
            "else",
            "if",
            "items",
            "not",
            "propertyNames",
            "then",
            "unevaluatedItems",
            "unevaluatedProperties",
        },
        # Keywords whose value is an array of subschemas.
        in_subarray={"allOf", "anyOf", "oneOf", "prefixItems"},
        # Keywords whose value is an object whose values are subschemas.
        in_subvalues={
            "$defs",
            "dependentSchemas",
            "patternProperties",
            "properties",
        },
    ),
    anchors_in=_anchor,
    # The same three keyword sets as for ``subresources_of`` above.
    maybe_in_subresource=_maybe_in_subresource(
        in_value={
            "additionalProperties",
            "contains",
            "contentSchema",
            "else",
            "if",
            "items",
            "not",
            "propertyNames",
            "then",
            "unevaluatedItems",
            "unevaluatedProperties",
        },
        in_subarray={"allOf", "anyOf", "oneOf", "prefixItems"},
        in_subvalues={
            "$defs",
            "dependentSchemas",
            "patternProperties",
            "properties",
        },
    ),
)

#: JSON Schema draft 2019-09
DRAFT201909 = Specification(
    name="draft2019-09",
    id_of=_dollar_id,
    # ``items`` is absent from the sets below: the ``crazy_items`` helper
    # handles it itself, as either a single subschema or a sequence of them.
    subresources_of=_subresources_of_with_crazy_items(
        # Keywords whose value is itself a subschema.
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "contentSchema",
            "else",
            "if",
            "not",
            "propertyNames",
            "then",
            "unevaluatedItems",
            "unevaluatedProperties",
        },
        # Keywords whose value is an array of subschemas.
        in_subarray={"allOf", "anyOf", "oneOf"},
        # Keywords whose value is an object whose values are subschemas.
        in_subvalues={
            "$defs",
            "dependentSchemas",
            "patternProperties",
            "properties",
        },
    ),
    anchors_in=_anchor_2019,  # type: ignore[reportGeneralTypeIssues] TODO: check whether this is real
    # The same three keyword sets as for ``subresources_of`` above.
    maybe_in_subresource=_maybe_in_subresource_crazy_items(
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "contentSchema",
            "else",
            "if",
            "not",
            "propertyNames",
            "then",
            "unevaluatedItems",
            "unevaluatedProperties",
        },
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={
            "$defs",
            "dependentSchemas",
            "patternProperties",
            "properties",
        },
    ),
)

#: JSON Schema draft 7
DRAFT7 = Specification(
    name="draft-07",
    id_of=_legacy_dollar_id,
    # ``items`` and ``dependencies`` are absent from the sets below: the
    # ``crazy_items_dependencies`` helper handles both itself.
    subresources_of=_subresources_of_with_crazy_items_dependencies(
        # Keywords whose value is itself a subschema.
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "else",
            "if",
            "not",
            "propertyNames",
            "then",
        },
        # Keywords whose value is an array of subschemas.
        in_subarray={"allOf", "anyOf", "oneOf"},
        # Keywords whose value is an object whose values are subschemas.
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
    anchors_in=_legacy_anchor_in_dollar_id,  # type: ignore[reportGeneralTypeIssues] TODO: check whether this is real
    # The same three keyword sets as for ``subresources_of`` above.
    maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "else",
            "if",
            "not",
            "propertyNames",
            "then",
        },
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
)

#: JSON Schema draft 6
DRAFT6 = Specification(
    name="draft-06",
    id_of=_legacy_dollar_id,
    # ``items`` and ``dependencies`` are absent from the sets below: the
    # ``crazy_items_dependencies`` helper handles both itself.
    subresources_of=_subresources_of_with_crazy_items_dependencies(
        # Keywords whose value is itself a subschema.
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "not",
            "propertyNames",
        },
        # Keywords whose value is an array of subschemas.
        in_subarray={"allOf", "anyOf", "oneOf"},
        # Keywords whose value is an object whose values are subschemas.
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
    anchors_in=_legacy_anchor_in_dollar_id,  # type: ignore[reportGeneralTypeIssues] TODO: check whether this is real
    # The same three keyword sets as for ``subresources_of`` above.
    maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "not",
            "propertyNames",
        },
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
)

#: JSON Schema draft 4
DRAFT4 = Specification(
    name="draft-04",
    id_of=_legacy_id,
    # ``additionalItems`` and ``additionalProperties`` are not listed in
    # ``in_value`` here: the ``crazy_aP`` helper yields them itself, and only
    # when they are objects. It likewise handles ``items`` and
    # ``dependencies``.
    subresources_of=_subresources_of_with_crazy_aP_items_dependencies(
        in_value={"not"},
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
    anchors_in=_legacy_anchor_in_id,
    # Here ``additionalItems`` and ``additionalProperties`` are listed
    # explicitly, unlike for ``subresources_of`` above.
    maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
        in_value={"additionalItems", "additionalProperties", "not"},
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
)

#: JSON Schema draft 3
DRAFT3 = Specification(
    name="draft-03",
    id_of=_legacy_id,
    # ``additionalItems`` and ``additionalProperties`` are not listed here:
    # the ``crazy_aP`` helper yields them itself, and only when they are
    # objects. It likewise handles ``items`` and ``dependencies``.
    subresources_of=_subresources_of_with_crazy_aP_items_dependencies(
        in_subarray={"extends"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
    anchors_in=_legacy_anchor_in_id,
    # Here ``additionalItems`` and ``additionalProperties`` are listed
    # explicitly, unlike for ``subresources_of`` above.
    maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
        in_value={"additionalItems", "additionalProperties"},
        in_subarray={"extends"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
)

545 

546 

# The known specifications, keyed by dialect identifier. The identifiers are
# stored without a trailing "#"; `specification_with` strips any trailing "#"
# from its argument before looking it up here. Each `Specification` is
# wrapped with `Resource.opaque` before being placed in the registry.
_SPECIFICATIONS: Registry[Specification[Schema]] = Registry(
    {  # type: ignore[reportGeneralTypeIssues] # :/ internal vs external types
        dialect_id: Resource.opaque(specification)
        for dialect_id, specification in [
            ("https://json-schema.org/draft/2020-12/schema", DRAFT202012),
            ("https://json-schema.org/draft/2019-09/schema", DRAFT201909),
            ("http://json-schema.org/draft-07/schema", DRAFT7),
            ("http://json-schema.org/draft-06/schema", DRAFT6),
            ("http://json-schema.org/draft-04/schema", DRAFT4),
            ("http://json-schema.org/draft-03/schema", DRAFT3),
        ]
    },
)

560 

561 

def specification_with(
    dialect_id: URI,
    default: Specification[Any] | _Unset = _UNSET,
) -> Specification[Any]:
    """
    Look up the `Specification` registered for a dialect identifier.

    Any trailing ``#`` characters on ``dialect_id`` are ignored for the
    lookup. If the dialect is unknown and a ``default`` was given, the
    default is returned instead.

    Raises:

        `UnknownDialect`

            if the given ``dialect_id`` isn't known and no ``default``
            was provided
    """
    resource = _SPECIFICATIONS.get(dialect_id.rstrip("#"))
    if resource is None:
        if default is _UNSET:
            # The error carries the identifier exactly as it was given.
            raise UnknownDialect(dialect_id)
        return default
    return resource.contents

581 

582 

@frozen
class DynamicAnchor:
    """
    Dynamic anchors, introduced in draft 2020.
    """

    # The anchor's name (the value of ``$dynamicAnchor``).
    name: str
    # The resource on which the anchor was declared.
    resource: Resource[Schema]

    def resolve(self, resolver: _Resolver[Schema]) -> _Resolved[Schema]:
        """
        Resolve this anchor dynamically.

        Every entry of the resolver's dynamic scope is checked for an anchor
        with this anchor's name. The resource of the last entry which has
        such a *dynamic* anchor is used; if none does, this anchor's own
        resource is.
        """
        target = self.resource
        for uri, registry in resolver.dynamic_scope():
            try:
                candidate = registry.anchor(uri, self.name).value
            except exceptions.NoSuchAnchor:
                # This scope entry declares no anchor of this name.
                pass
            else:
                if isinstance(candidate, DynamicAnchor):
                    target = candidate.resource
        return _Resolved(
            contents=target.contents,
            resolver=resolver.in_subresource(target),
        )

608 

609 

def lookup_recursive_ref(resolver: _Resolver[Schema]) -> _Resolved[Schema]:
    """
    Recursive references (via recursive anchors), present only in draft 2019.

    As per the 2019 specification (§ 8.2.4.2.1), only the ``#`` recursive
    reference is supported (and is therefore assumed to be the relevant
    reference).

    The result of looking up ``#`` is returned unless it has a truthy
    ``$recursiveAnchor``. In that case the dynamic scope is walked for as
    long as each entry also has a truthy ``$recursiveAnchor``, and the last
    such entry is returned.
    """
    resolved = resolver.lookup("#")
    starts_recursive = isinstance(
        resolved.contents,
        Mapping,
    ) and resolved.contents.get("$recursiveAnchor")
    if not starts_recursive:
        return resolved

    for uri, _ in resolver.dynamic_scope():
        candidate = resolver.lookup(uri)
        if not (
            isinstance(candidate.contents, Mapping)
            and candidate.contents.get("$recursiveAnchor")
        ):
            # The chain of recursive anchors ends at the first entry
            # without one.
            break
        resolved = candidate
    return resolved