Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/referencing/jsonschema.py: 28%

215 statements  

« prev     ^ index     » next       coverage.py v7.4.0, created at 2024-01-22 06:29 +0000

1""" 

2Referencing implementations for JSON Schema specs (historic & current). 

3""" 

4 

5from __future__ import annotations 

6 

7from collections.abc import Sequence, Set 

8from typing import Any, Iterable, Union 

9 

10from referencing import Anchor, Registry, Resource, Specification, exceptions 

11from referencing._attrs import frozen 

12from referencing._core import _UNSET # type: ignore[reportPrivateUsage] 

13from referencing._core import _Unset # type: ignore[reportPrivateUsage] 

14from referencing._core import Resolved as _Resolved, Resolver as _Resolver 

15from referencing.typing import URI, Anchor as AnchorType, Mapping 

16 

#: A JSON Schema which is a JSON object
ObjectSchema = Mapping[str, Any]

#: A JSON Schema of any kind
Schema = Union[bool, ObjectSchema]

#: A JSON Schema Registry
SchemaRegistry = Registry[Schema]

#: The empty JSON Schema Registry
EMPTY_REGISTRY: SchemaRegistry = Registry()

28 

29 

@frozen
class UnknownDialect(Exception):
    """
    A dialect identifier was found for a dialect unknown by this library.

    If it's a custom ("unofficial") dialect, be sure you've registered it.
    """

    #: The dialect identifier which was not recognized.
    uri: URI

39 

40 

41def _dollar_id(contents: Schema) -> URI | None: 

42 if isinstance(contents, bool): 

43 return 

44 return contents.get("$id") 

45 

46 

47def _legacy_dollar_id(contents: Schema) -> URI | None: 

48 if isinstance(contents, bool) or "$ref" in contents: 

49 return 

50 id = contents.get("$id") 

51 if id is not None and not id.startswith("#"): 

52 return id 

53 

54 

55def _legacy_id(contents: ObjectSchema) -> URI | None: 

56 if "$ref" in contents: 

57 return 

58 id = contents.get("id") 

59 if id is not None and not id.startswith("#"): 

60 return id 

61 

62 

63def _anchor( 

64 specification: Specification[Schema], 

65 contents: Schema, 

66) -> Iterable[AnchorType[Schema]]: 

67 if isinstance(contents, bool): 

68 return 

69 anchor = contents.get("$anchor") 

70 if anchor is not None: 

71 yield Anchor( 

72 name=anchor, 

73 resource=specification.create_resource(contents), 

74 ) 

75 

76 dynamic_anchor = contents.get("$dynamicAnchor") 

77 if dynamic_anchor is not None: 

78 yield DynamicAnchor( 

79 name=dynamic_anchor, 

80 resource=specification.create_resource(contents), 

81 ) 

82 

83 

84def _anchor_2019( 

85 specification: Specification[Schema], 

86 contents: Schema, 

87) -> Iterable[Anchor[Schema]]: 

88 if isinstance(contents, bool): 

89 return [] 

90 anchor = contents.get("$anchor") 

91 if anchor is None: 

92 return [] 

93 return [ 

94 Anchor( 

95 name=anchor, 

96 resource=specification.create_resource(contents), 

97 ), 

98 ] 

99 

100 

101def _legacy_anchor_in_dollar_id( 

102 specification: Specification[Schema], 

103 contents: Schema, 

104) -> Iterable[Anchor[Schema]]: 

105 if isinstance(contents, bool): 

106 return [] 

107 id = contents.get("$id", "") 

108 if not id.startswith("#"): 

109 return [] 

110 return [ 

111 Anchor( 

112 name=id[1:], 

113 resource=specification.create_resource(contents), 

114 ), 

115 ] 

116 

117 

118def _legacy_anchor_in_id( 

119 specification: Specification[ObjectSchema], 

120 contents: ObjectSchema, 

121) -> Iterable[Anchor[ObjectSchema]]: 

122 id = contents.get("id", "") 

123 if not id.startswith("#"): 

124 return [] 

125 return [ 

126 Anchor( 

127 name=id[1:], 

128 resource=specification.create_resource(contents), 

129 ), 

130 ] 

131 

132 

133def _subresources_of( 

134 in_value: Set[str] = frozenset(), 

135 in_subvalues: Set[str] = frozenset(), 

136 in_subarray: Set[str] = frozenset(), 

137): 

138 """ 

139 Create a callable returning JSON Schema specification-style subschemas. 

140 

141 Relies on specifying the set of keywords containing subschemas in their 

142 values, in a subobject's values, or in a subarray. 

143 """ 

144 

145 def subresources_of(contents: Schema) -> Iterable[ObjectSchema]: 

146 if isinstance(contents, bool): 

147 return 

148 for each in in_value: 

149 if each in contents: 

150 yield contents[each] 

151 for each in in_subarray: 

152 if each in contents: 

153 yield from contents[each] 

154 for each in in_subvalues: 

155 if each in contents: 

156 yield from contents[each].values() 

157 

158 return subresources_of 

159 

160 

161def _subresources_of_with_crazy_items( 

162 in_value: Set[str] = frozenset(), 

163 in_subvalues: Set[str] = frozenset(), 

164 in_subarray: Set[str] = frozenset(), 

165): 

166 """ 

167 Specifically handle older drafts where there are some funky keywords. 

168 """ 

169 

170 def subresources_of(contents: Schema) -> Iterable[ObjectSchema]: 

171 if isinstance(contents, bool): 

172 return 

173 for each in in_value: 

174 if each in contents: 

175 yield contents[each] 

176 for each in in_subarray: 

177 if each in contents: 

178 yield from contents[each] 

179 for each in in_subvalues: 

180 if each in contents: 

181 yield from contents[each].values() 

182 

183 items = contents.get("items") 

184 if items is not None: 

185 if isinstance(items, Sequence): 

186 yield from items 

187 else: 

188 yield items 

189 

190 return subresources_of 

191 

192 

193def _subresources_of_with_crazy_items_dependencies( 

194 in_value: Set[str] = frozenset(), 

195 in_subvalues: Set[str] = frozenset(), 

196 in_subarray: Set[str] = frozenset(), 

197): 

198 """ 

199 Specifically handle older drafts where there are some funky keywords. 

200 """ 

201 

202 def subresources_of(contents: Schema) -> Iterable[ObjectSchema]: 

203 if isinstance(contents, bool): 

204 return 

205 for each in in_value: 

206 if each in contents: 

207 yield contents[each] 

208 for each in in_subarray: 

209 if each in contents: 

210 yield from contents[each] 

211 for each in in_subvalues: 

212 if each in contents: 

213 yield from contents[each].values() 

214 

215 items = contents.get("items") 

216 if items is not None: 

217 if isinstance(items, Sequence): 

218 yield from items 

219 else: 

220 yield items 

221 dependencies = contents.get("dependencies") 

222 if dependencies is not None: 

223 values = iter(dependencies.values()) 

224 value = next(values, None) 

225 if isinstance(value, Mapping): 

226 yield value 

227 yield from values 

228 

229 return subresources_of 

230 

231 

232def _subresources_of_with_crazy_aP_items_dependencies( 

233 in_value: Set[str] = frozenset(), 

234 in_subvalues: Set[str] = frozenset(), 

235 in_subarray: Set[str] = frozenset(), 

236): 

237 """ 

238 Specifically handle even older drafts where there are some funky keywords. 

239 """ 

240 

241 def subresources_of(contents: ObjectSchema) -> Iterable[ObjectSchema]: 

242 for each in in_value: 

243 if each in contents: 

244 yield contents[each] 

245 for each in in_subarray: 

246 if each in contents: 

247 yield from contents[each] 

248 for each in in_subvalues: 

249 if each in contents: 

250 yield from contents[each].values() 

251 

252 items = contents.get("items") 

253 if items is not None: 

254 if isinstance(items, Sequence): 

255 yield from items 

256 else: 

257 yield items 

258 dependencies = contents.get("dependencies") 

259 if dependencies is not None: 

260 values = iter(dependencies.values()) 

261 value = next(values, None) 

262 if isinstance(value, Mapping): 

263 yield value 

264 yield from values 

265 

266 for each in "additionalItems", "additionalProperties": 

267 value = contents.get(each) 

268 if isinstance(value, Mapping): 

269 yield value 

270 

271 return subresources_of 

272 

273 

274def _maybe_in_subresource( 

275 in_value: Set[str] = frozenset(), 

276 in_subvalues: Set[str] = frozenset(), 

277 in_subarray: Set[str] = frozenset(), 

278): 

279 in_child = in_subvalues | in_subarray 

280 

281 def maybe_in_subresource( 

282 segments: Sequence[int | str], 

283 resolver: _Resolver[Any], 

284 subresource: Resource[Any], 

285 ) -> _Resolver[Any]: 

286 _segments = iter(segments) 

287 for segment in _segments: 

288 if segment not in in_value and ( 

289 segment not in in_child or next(_segments, None) is None 

290 ): 

291 return resolver 

292 return resolver.in_subresource(subresource) 

293 

294 return maybe_in_subresource 

295 

296 

297def _maybe_in_subresource_crazy_items( 

298 in_value: Set[str] = frozenset(), 

299 in_subvalues: Set[str] = frozenset(), 

300 in_subarray: Set[str] = frozenset(), 

301): 

302 in_child = in_subvalues | in_subarray 

303 

304 def maybe_in_subresource( 

305 segments: Sequence[int | str], 

306 resolver: _Resolver[Any], 

307 subresource: Resource[Any], 

308 ) -> _Resolver[Any]: 

309 _segments = iter(segments) 

310 for segment in _segments: 

311 if segment == "items" and isinstance( 

312 subresource.contents, 

313 Mapping, 

314 ): 

315 return resolver.in_subresource(subresource) 

316 if segment not in in_value and ( 

317 segment not in in_child or next(_segments, None) is None 

318 ): 

319 return resolver 

320 return resolver.in_subresource(subresource) 

321 

322 return maybe_in_subresource 

323 

324 

325def _maybe_in_subresource_crazy_items_dependencies( 

326 in_value: Set[str] = frozenset(), 

327 in_subvalues: Set[str] = frozenset(), 

328 in_subarray: Set[str] = frozenset(), 

329): 

330 in_child = in_subvalues | in_subarray 

331 

332 def maybe_in_subresource( 

333 segments: Sequence[int | str], 

334 resolver: _Resolver[Any], 

335 subresource: Resource[Any], 

336 ) -> _Resolver[Any]: 

337 _segments = iter(segments) 

338 for segment in _segments: 

339 if segment in {"items", "dependencies"} and isinstance( 

340 subresource.contents, 

341 Mapping, 

342 ): 

343 return resolver.in_subresource(subresource) 

344 if segment not in in_value and ( 

345 segment not in in_child or next(_segments, None) is None 

346 ): 

347 return resolver 

348 return resolver.in_subresource(subresource) 

349 

350 return maybe_in_subresource 

351 

352 

#: JSON Schema draft 2020-12
DRAFT202012 = Specification(
    name="draft2020-12",
    id_of=_dollar_id,
    # The same keyword sets drive both subresource discovery and the
    # decision of whether a pointer walk has entered a subresource.
    subresources_of=_subresources_of(
        in_value={
            "additionalProperties",
            "contains",
            "contentSchema",
            "else",
            "if",
            "items",
            "not",
            "propertyNames",
            "then",
            "unevaluatedItems",
            "unevaluatedProperties",
        },
        in_subarray={"allOf", "anyOf", "oneOf", "prefixItems"},
        in_subvalues={
            "$defs",
            "dependentSchemas",
            "patternProperties",
            "properties",
        },
    ),
    anchors_in=_anchor,
    maybe_in_subresource=_maybe_in_subresource(
        in_value={
            "additionalProperties",
            "contains",
            "contentSchema",
            "else",
            "if",
            "items",
            "not",
            "propertyNames",
            "then",
            "unevaluatedItems",
            "unevaluatedProperties",
        },
        in_subarray={"allOf", "anyOf", "oneOf", "prefixItems"},
        in_subvalues={
            "$defs",
            "dependentSchemas",
            "patternProperties",
            "properties",
        },
    ),
)

#: JSON Schema draft 2019-09
DRAFT201909 = Specification(
    name="draft2019-09",
    id_of=_dollar_id,
    # ``items`` is absent from the keyword sets below because the
    # "crazy items" helpers handle it themselves.
    subresources_of=_subresources_of_with_crazy_items(
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "contentSchema",
            "else",
            "if",
            "not",
            "propertyNames",
            "then",
            "unevaluatedItems",
            "unevaluatedProperties",
        },
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={
            "$defs",
            "dependentSchemas",
            "patternProperties",
            "properties",
        },
    ),
    anchors_in=_anchor_2019,  # type: ignore[reportGeneralTypeIssues] TODO: check whether this is real
    maybe_in_subresource=_maybe_in_subresource_crazy_items(
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "contentSchema",
            "else",
            "if",
            "not",
            "propertyNames",
            "then",
            "unevaluatedItems",
            "unevaluatedProperties",
        },
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={
            "$defs",
            "dependentSchemas",
            "patternProperties",
            "properties",
        },
    ),
)

#: JSON Schema draft 7
DRAFT7 = Specification(
    name="draft-07",
    id_of=_legacy_dollar_id,
    # ``items`` and ``dependencies`` are absent from the keyword sets below
    # because the "crazy items dependencies" helpers handle them themselves.
    subresources_of=_subresources_of_with_crazy_items_dependencies(
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "else",
            "if",
            "not",
            "propertyNames",
            "then",
        },
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
    anchors_in=_legacy_anchor_in_dollar_id,  # type: ignore[reportGeneralTypeIssues] TODO: check whether this is real
    maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "else",
            "if",
            "not",
            "propertyNames",
            "then",
        },
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
)

#: JSON Schema draft 6
DRAFT6 = Specification(
    name="draft-06",
    id_of=_legacy_dollar_id,
    # ``items`` and ``dependencies`` are absent from the keyword sets below
    # because the "crazy items dependencies" helpers handle them themselves.
    subresources_of=_subresources_of_with_crazy_items_dependencies(
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "not",
            "propertyNames",
        },
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
    anchors_in=_legacy_anchor_in_dollar_id,  # type: ignore[reportGeneralTypeIssues] TODO: check whether this is real
    maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
        in_value={
            "additionalItems",
            "additionalProperties",
            "contains",
            "not",
            "propertyNames",
        },
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
)

#: JSON Schema draft 4
DRAFT4 = Specification(
    name="draft-04",
    id_of=_legacy_id,
    # ``additionalItems`` / ``additionalProperties`` are not listed in
    # ``in_value`` here: the "aP" helper yields them itself, and only when
    # their value is a mapping.
    subresources_of=_subresources_of_with_crazy_aP_items_dependencies(
        in_value={"not"},
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
    anchors_in=_legacy_anchor_in_id,
    maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
        in_value={"additionalItems", "additionalProperties", "not"},
        in_subarray={"allOf", "anyOf", "oneOf"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
)

#: JSON Schema draft 3
DRAFT3 = Specification(
    name="draft-03",
    id_of=_legacy_id,
    # ``additionalItems`` / ``additionalProperties`` are not listed in
    # ``in_value`` here: the "aP" helper yields them itself, and only when
    # their value is a mapping.
    subresources_of=_subresources_of_with_crazy_aP_items_dependencies(
        in_subarray={"extends"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
    anchors_in=_legacy_anchor_in_id,
    maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies(
        in_value={"additionalItems", "additionalProperties"},
        in_subarray={"extends"},
        in_subvalues={"definitions", "patternProperties", "properties"},
    ),
)

546 

547 

# Maps each known dialect identifier (without any trailing "#") to its
# `Specification`, wrapped as an opaque resource so it can live in a registry.
_SPECIFICATIONS: Registry[Specification[Schema]] = Registry(
    {  # type: ignore[reportGeneralTypeIssues]  # :/ internal vs external types
        dialect_id: Resource.opaque(specification)
        for dialect_id, specification in [
            ("https://json-schema.org/draft/2020-12/schema", DRAFT202012),
            ("https://json-schema.org/draft/2019-09/schema", DRAFT201909),
            ("http://json-schema.org/draft-07/schema", DRAFT7),
            ("http://json-schema.org/draft-06/schema", DRAFT6),
            ("http://json-schema.org/draft-04/schema", DRAFT4),
            ("http://json-schema.org/draft-03/schema", DRAFT3),
        ]
    },
)

561 

562 

def specification_with(
    dialect_id: URI,
    default: Specification[Any] | _Unset = _UNSET,
) -> Specification[Any]:
    """
    Retrieve the `Specification` with the given dialect identifier.

    Any trailing ``#`` characters on the identifier are ignored.

    Arguments:

        dialect_id:

            the dialect identifier to look up

        default:

            a specification to return instead of raising when the dialect
            is not known

    Raises:

        `UnknownDialect`

            if the given ``dialect_id`` isn't known and no ``default`` was
            provided
    """
    resource = _SPECIFICATIONS.get(dialect_id.rstrip("#"))
    if resource is not None:
        return resource.contents
    if default is _UNSET:
        raise UnknownDialect(dialect_id)
    return default

582 

583 

@frozen
class DynamicAnchor:
    """
    Dynamic anchors, introduced in draft 2020.
    """

    #: The anchor's name (the value of ``$dynamicAnchor``).
    name: str
    #: The resource on which the anchor was declared.
    resource: Resource[Schema]

    def resolve(self, resolver: _Resolver[Schema]) -> _Resolved[Schema]:
        """
        Resolve this anchor dynamically.

        Every entry of the resolver's dynamic scope is examined in the order
        the resolver yields them. Whenever an entry has a *dynamic* anchor
        with this name, its resource replaces the current candidate, so the
        last match wins. If no entry matches, this anchor's own resource is
        used.
        """
        last = self.resource
        for uri, registry in resolver.dynamic_scope():
            try:
                anchor = registry.anchor(uri, self.name).value
            except exceptions.NoSuchAnchor:
                continue
            # A same-named but non-dynamic anchor does not participate.
            if isinstance(anchor, DynamicAnchor):
                last = anchor.resource
        return _Resolved(
            contents=last.contents,
            resolver=resolver.in_subresource(last),
        )

609 

610 

def lookup_recursive_ref(resolver: _Resolver[Schema]) -> _Resolved[Schema]:
    """
    Recursive references (via recursive anchors), present only in draft 2019.

    As per the 2019 specification (§ 8.2.4.2.1), only the ``#`` recursive
    reference is supported (and is therefore assumed to be the relevant
    reference).

    The ``#`` reference is resolved first. If what it points at has a truthy
    ``$recursiveAnchor``, the resolver's dynamic scope is then walked,
    adopting each entry in turn for as long as entries are objects with a
    truthy ``$recursiveAnchor``; the walk stops at the first entry which
    is not.
    """
    resolved = resolver.lookup("#")
    if isinstance(resolved.contents, Mapping) and resolved.contents.get(
        "$recursiveAnchor",
    ):
        for uri, _ in resolver.dynamic_scope():
            next_resolved = resolver.lookup(uri)
            if not isinstance(
                next_resolved.contents,
                Mapping,
            ) or not next_resolved.contents.get("$recursiveAnchor"):
                break
            resolved = next_resolved
    return resolved