Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/referencing/jsonschema.py: 80%

212 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:30 +0000

1""" 

2Referencing implementations for JSON Schema specs (historic & current). 

3""" 

4 

5from __future__ import annotations 

6 

7from collections.abc import Sequence, Set 

8from typing import Any, Iterable, Union 

9 

10from referencing import Anchor, Registry, Resource, Specification, exceptions 

11from referencing._attrs import frozen 

12from referencing._core import Resolved as _Resolved, Resolver as _Resolver 

13from referencing.typing import URI, Anchor as AnchorType, Mapping 

14 

15#: A JSON Schema which is a JSON object 

16ObjectSchema = Mapping[str, Any] 

17 

18#: A JSON Schema of any kind 

19Schema = Union[bool, ObjectSchema] 

20 

21#: A JSON Schema Registry 

22SchemaRegistry = Registry[Schema] 

23 

24 

25@frozen 

26class UnknownDialect(Exception): 

27 """ 

28 A dialect identifier was found for a dialect unknown by this library. 

29 

30 If it's a custom ("unofficial") dialect, be sure you've registered it. 

31 """ 

32 

33 uri: URI 

34 

35 

36def _dollar_id(contents: Schema) -> URI | None: 

37 if isinstance(contents, bool): 

38 return 

39 return contents.get("$id") 

40 

41 

42def _legacy_dollar_id(contents: Schema) -> URI | None: 

43 if isinstance(contents, bool) or "$ref" in contents: 

44 return 

45 id = contents.get("$id") 

46 if id is not None and not id.startswith("#"): 

47 return id 

48 

49 

50def _legacy_id(contents: ObjectSchema) -> URI | None: 

51 if "$ref" in contents: 

52 return 

53 id = contents.get("id") 

54 if id is not None and not id.startswith("#"): 

55 return id 

56 

57 

58def _anchor( 

59 specification: Specification[Schema], 

60 contents: Schema, 

61) -> Iterable[AnchorType[Schema]]: 

62 if isinstance(contents, bool): 

63 return 

64 anchor = contents.get("$anchor") 

65 if anchor is not None: 

66 yield Anchor( 

67 name=anchor, 

68 resource=specification.create_resource(contents), 

69 ) 

70 

71 dynamic_anchor = contents.get("$dynamicAnchor") 

72 if dynamic_anchor is not None: 

73 yield DynamicAnchor( 

74 name=dynamic_anchor, 

75 resource=specification.create_resource(contents), 

76 ) 

77 

78 

79def _anchor_2019( 

80 specification: Specification[Schema], 

81 contents: Schema, 

82) -> Iterable[Anchor[Schema]]: 

83 if isinstance(contents, bool): 

84 return [] 

85 anchor = contents.get("$anchor") 

86 if anchor is None: 

87 return [] 

88 return [ 

89 Anchor( 

90 name=anchor, 

91 resource=specification.create_resource(contents), 

92 ), 

93 ] 

94 

95 

96def _legacy_anchor_in_dollar_id( 

97 specification: Specification[Schema], 

98 contents: Schema, 

99) -> Iterable[Anchor[Schema]]: 

100 if isinstance(contents, bool): 

101 return [] 

102 id = contents.get("$id", "") 

103 if not id.startswith("#"): 

104 return [] 

105 return [ 

106 Anchor( 

107 name=id[1:], 

108 resource=specification.create_resource(contents), 

109 ), 

110 ] 

111 

112 

113def _legacy_anchor_in_id( 

114 specification: Specification[ObjectSchema], 

115 contents: ObjectSchema, 

116) -> Iterable[Anchor[ObjectSchema]]: 

117 id = contents.get("id", "") 

118 if not id.startswith("#"): 

119 return [] 

120 return [ 

121 Anchor( 

122 name=id[1:], 

123 resource=specification.create_resource(contents), 

124 ), 

125 ] 

126 

127 

128def _subresources_of( 

129 in_value: Set[str] = frozenset(), 

130 in_subvalues: Set[str] = frozenset(), 

131 in_subarray: Set[str] = frozenset(), 

132): 

133 """ 

134 Create a callable returning JSON Schema specification-style subschemas. 

135 

136 Relies on specifying the set of keywords containing subschemas in their 

137 values, in a subobject's values, or in a subarray. 

138 """ 

139 

140 def subresources_of(contents: Schema) -> Iterable[ObjectSchema]: 

141 if isinstance(contents, bool): 

142 return 

143 for each in in_value: 

144 if each in contents: 

145 yield contents[each] 

146 for each in in_subarray: 

147 if each in contents: 

148 yield from contents[each] 

149 for each in in_subvalues: 

150 if each in contents: 

151 yield from contents[each].values() 

152 

153 return subresources_of 

154 

155 

156def _subresources_of_with_crazy_items( 

157 in_value: Set[str] = frozenset(), 

158 in_subvalues: Set[str] = frozenset(), 

159 in_subarray: Set[str] = frozenset(), 

160): 

161 """ 

162 Specifically handle older drafts where there are some funky keywords. 

163 """ 

164 

165 def subresources_of(contents: Schema) -> Iterable[ObjectSchema]: 

166 if isinstance(contents, bool): 

167 return 

168 for each in in_value: 

169 if each in contents: 

170 yield contents[each] 

171 for each in in_subarray: 

172 if each in contents: 

173 yield from contents[each] 

174 for each in in_subvalues: 

175 if each in contents: 

176 yield from contents[each].values() 

177 

178 items = contents.get("items") 

179 if items is not None: 

180 if isinstance(items, Sequence): 

181 yield from items 

182 else: 

183 yield items 

184 

185 return subresources_of 

186 

187 

188def _subresources_of_with_crazy_items_dependencies( 

189 in_value: Set[str] = frozenset(), 

190 in_subvalues: Set[str] = frozenset(), 

191 in_subarray: Set[str] = frozenset(), 

192): 

193 """ 

194 Specifically handle older drafts where there are some funky keywords. 

195 """ 

196 

197 def subresources_of(contents: Schema) -> Iterable[ObjectSchema]: 

198 if isinstance(contents, bool): 

199 return 

200 for each in in_value: 

201 if each in contents: 

202 yield contents[each] 

203 for each in in_subarray: 

204 if each in contents: 

205 yield from contents[each] 

206 for each in in_subvalues: 

207 if each in contents: 

208 yield from contents[each].values() 

209 

210 items = contents.get("items") 

211 if items is not None: 

212 if isinstance(items, Sequence): 

213 yield from items 

214 else: 

215 yield items 

216 dependencies = contents.get("dependencies") 

217 if dependencies is not None: 

218 values = iter(dependencies.values()) 

219 value = next(values, None) 

220 if isinstance(value, Mapping): 

221 yield value 

222 yield from values 

223 

224 return subresources_of 

225 

226 

227def _subresources_of_with_crazy_aP_items_dependencies( 

228 in_value: Set[str] = frozenset(), 

229 in_subvalues: Set[str] = frozenset(), 

230 in_subarray: Set[str] = frozenset(), 

231): 

232 """ 

233 Specifically handle even older drafts where there are some funky keywords. 

234 """ 

235 

236 def subresources_of(contents: ObjectSchema) -> Iterable[ObjectSchema]: 

237 for each in in_value: 

238 if each in contents: 

239 yield contents[each] 

240 for each in in_subarray: 

241 if each in contents: 

242 yield from contents[each] 

243 for each in in_subvalues: 

244 if each in contents: 

245 yield from contents[each].values() 

246 

247 items = contents.get("items") 

248 if items is not None: 

249 if isinstance(items, Sequence): 

250 yield from items 

251 else: 

252 yield items 

253 dependencies = contents.get("dependencies") 

254 if dependencies is not None: 

255 values = iter(dependencies.values()) 

256 value = next(values, None) 

257 if isinstance(value, Mapping): 

258 yield value 

259 yield from values 

260 

261 for each in "additionalItems", "additionalProperties": 

262 value = contents.get(each) 

263 if isinstance(value, Mapping): 

264 yield value 

265 

266 return subresources_of 

267 

268 

269def _maybe_in_subresource( 

270 in_value: Set[str] = frozenset(), 

271 in_subvalues: Set[str] = frozenset(), 

272 in_subarray: Set[str] = frozenset(), 

273): 

274 in_child = in_subvalues | in_subarray 

275 

276 def maybe_in_subresource( 

277 segments: Sequence[int | str], 

278 resolver: _Resolver[Any], 

279 subresource: Resource[Any], 

280 ) -> _Resolver[Any]: 

281 _segments = iter(segments) 

282 for segment in _segments: 

283 if segment not in in_value and ( 

284 segment not in in_child or next(_segments, None) is None 

285 ): 

286 return resolver 

287 return resolver.in_subresource(subresource) 

288 

289 return maybe_in_subresource 

290 

291 

292def _maybe_in_subresource_crazy_items( 

293 in_value: Set[str] = frozenset(), 

294 in_subvalues: Set[str] = frozenset(), 

295 in_subarray: Set[str] = frozenset(), 

296): 

297 in_child = in_subvalues | in_subarray 

298 

299 def maybe_in_subresource( 

300 segments: Sequence[int | str], 

301 resolver: _Resolver[Any], 

302 subresource: Resource[Any], 

303 ) -> _Resolver[Any]: 

304 _segments = iter(segments) 

305 for segment in _segments: 

306 if segment == "items" and isinstance( 

307 subresource.contents, 

308 Mapping, 

309 ): 

310 return resolver.in_subresource(subresource) 

311 if segment not in in_value and ( 

312 segment not in in_child or next(_segments, None) is None 

313 ): 

314 return resolver 

315 return resolver.in_subresource(subresource) 

316 

317 return maybe_in_subresource 

318 

319 

320def _maybe_in_subresource_crazy_items_dependencies( 

321 in_value: Set[str] = frozenset(), 

322 in_subvalues: Set[str] = frozenset(), 

323 in_subarray: Set[str] = frozenset(), 

324): 

325 in_child = in_subvalues | in_subarray 

326 

327 def maybe_in_subresource( 

328 segments: Sequence[int | str], 

329 resolver: _Resolver[Any], 

330 subresource: Resource[Any], 

331 ) -> _Resolver[Any]: 

332 _segments = iter(segments) 

333 for segment in _segments: 

334 if ( 

335 segment == "items" or segment == "dependencies" 

336 ) and isinstance(subresource.contents, Mapping): 

337 return resolver.in_subresource(subresource) 

338 if segment not in in_value and ( 

339 segment not in in_child or next(_segments, None) is None 

340 ): 

341 return resolver 

342 return resolver.in_subresource(subresource) 

343 

344 return maybe_in_subresource 

345 

346 

347#: JSON Schema draft 2020-12 

348DRAFT202012 = Specification( 

349 name="draft2020-12", 

350 id_of=_dollar_id, 

351 subresources_of=_subresources_of( 

352 in_value={ 

353 "additionalProperties", 

354 "contains", 

355 "contentSchema", 

356 "else", 

357 "if", 

358 "items", 

359 "not", 

360 "propertyNames", 

361 "then", 

362 "unevaluatedItems", 

363 "unevaluatedProperties", 

364 }, 

365 in_subarray={"allOf", "anyOf", "oneOf", "prefixItems"}, 

366 in_subvalues={ 

367 "$defs", 

368 "dependentSchemas", 

369 "patternProperties", 

370 "properties", 

371 }, 

372 ), 

373 anchors_in=_anchor, 

374 maybe_in_subresource=_maybe_in_subresource( 

375 in_value={ 

376 "additionalProperties", 

377 "contains", 

378 "contentSchema", 

379 "else", 

380 "if", 

381 "items", 

382 "not", 

383 "propertyNames", 

384 "then", 

385 "unevaluatedItems", 

386 "unevaluatedProperties", 

387 }, 

388 in_subarray={"allOf", "anyOf", "oneOf", "prefixItems"}, 

389 in_subvalues={ 

390 "$defs", 

391 "dependentSchemas", 

392 "patternProperties", 

393 "properties", 

394 }, 

395 ), 

396) 

397#: JSON Schema draft 2019-09 

398DRAFT201909 = Specification( 

399 name="draft2019-09", 

400 id_of=_dollar_id, 

401 subresources_of=_subresources_of_with_crazy_items( 

402 in_value={ 

403 "additionalItems", 

404 "additionalProperties", 

405 "contains", 

406 "contentSchema", 

407 "else", 

408 "if", 

409 "not", 

410 "propertyNames", 

411 "then", 

412 "unevaluatedItems", 

413 "unevaluatedProperties", 

414 }, 

415 in_subarray={"allOf", "anyOf", "oneOf"}, 

416 in_subvalues={ 

417 "$defs", 

418 "dependentSchemas", 

419 "patternProperties", 

420 "properties", 

421 }, 

422 ), 

423 anchors_in=_anchor_2019, 

424 maybe_in_subresource=_maybe_in_subresource_crazy_items( 

425 in_value={ 

426 "additionalItems", 

427 "additionalProperties", 

428 "contains", 

429 "contentSchema", 

430 "else", 

431 "if", 

432 "not", 

433 "propertyNames", 

434 "then", 

435 "unevaluatedItems", 

436 "unevaluatedProperties", 

437 }, 

438 in_subarray={"allOf", "anyOf", "oneOf"}, 

439 in_subvalues={ 

440 "$defs", 

441 "dependentSchemas", 

442 "patternProperties", 

443 "properties", 

444 }, 

445 ), 

446) 

447#: JSON Schema draft 7 

448DRAFT7 = Specification( 

449 name="draft-07", 

450 id_of=_legacy_dollar_id, 

451 subresources_of=_subresources_of_with_crazy_items_dependencies( 

452 in_value={ 

453 "additionalItems", 

454 "additionalProperties", 

455 "contains", 

456 "else", 

457 "if", 

458 "not", 

459 "propertyNames", 

460 "then", 

461 }, 

462 in_subarray={"allOf", "anyOf", "oneOf"}, 

463 in_subvalues={"definitions", "patternProperties", "properties"}, 

464 ), 

465 anchors_in=_legacy_anchor_in_dollar_id, 

466 maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies( 

467 in_value={ 

468 "additionalItems", 

469 "additionalProperties", 

470 "contains", 

471 "else", 

472 "if", 

473 "not", 

474 "propertyNames", 

475 "then", 

476 }, 

477 in_subarray={"allOf", "anyOf", "oneOf"}, 

478 in_subvalues={"definitions", "patternProperties", "properties"}, 

479 ), 

480) 

481#: JSON Schema draft 6 

482DRAFT6 = Specification( 

483 name="draft-06", 

484 id_of=_legacy_dollar_id, 

485 subresources_of=_subresources_of_with_crazy_items_dependencies( 

486 in_value={ 

487 "additionalItems", 

488 "additionalProperties", 

489 "contains", 

490 "not", 

491 "propertyNames", 

492 }, 

493 in_subarray={"allOf", "anyOf", "oneOf"}, 

494 in_subvalues={"definitions", "patternProperties", "properties"}, 

495 ), 

496 anchors_in=_legacy_anchor_in_dollar_id, 

497 maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies( 

498 in_value={ 

499 "additionalItems", 

500 "additionalProperties", 

501 "contains", 

502 "not", 

503 "propertyNames", 

504 }, 

505 in_subarray={"allOf", "anyOf", "oneOf"}, 

506 in_subvalues={"definitions", "patternProperties", "properties"}, 

507 ), 

508) 

509#: JSON Schema draft 4 

510DRAFT4 = Specification( 

511 name="draft-04", 

512 id_of=_legacy_id, 

513 subresources_of=_subresources_of_with_crazy_aP_items_dependencies( 

514 in_value={"not"}, 

515 in_subarray={"allOf", "anyOf", "oneOf"}, 

516 in_subvalues={"definitions", "patternProperties", "properties"}, 

517 ), 

518 anchors_in=_legacy_anchor_in_id, 

519 maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies( 

520 in_value={"additionalItems", "additionalProperties", "not"}, 

521 in_subarray={"allOf", "anyOf", "oneOf"}, 

522 in_subvalues={"definitions", "patternProperties", "properties"}, 

523 ), 

524) 

525#: JSON Schema draft 3 

526DRAFT3 = Specification( 

527 name="draft-03", 

528 id_of=_legacy_id, 

529 subresources_of=_subresources_of_with_crazy_aP_items_dependencies( 

530 in_subarray={"extends"}, 

531 in_subvalues={"definitions", "patternProperties", "properties"}, 

532 ), 

533 anchors_in=_legacy_anchor_in_id, 

534 maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies( 

535 in_value={"additionalItems", "additionalProperties"}, 

536 in_subarray={"extends"}, 

537 in_subvalues={"definitions", "patternProperties", "properties"}, 

538 ), 

539) 

540 

541 

542_SPECIFICATIONS: Registry[Specification[Schema]] = Registry( 

543 { # type: ignore[reportGeneralTypeIssues] # :/ internal vs external types 

544 dialect_id: Resource.opaque(specification) 

545 for dialect_id, specification in [ 

546 ("https://json-schema.org/draft/2020-12/schema", DRAFT202012), 

547 ("https://json-schema.org/draft/2019-09/schema", DRAFT201909), 

548 ("http://json-schema.org/draft-07/schema", DRAFT7), 

549 ("http://json-schema.org/draft-06/schema", DRAFT6), 

550 ("http://json-schema.org/draft-04/schema", DRAFT4), 

551 ("http://json-schema.org/draft-03/schema", DRAFT3), 

552 ] 

553 }, 

554) 

555 

556 

557def specification_with( 

558 dialect_id: URI, 

559 default: Specification[Any] = None, # type: ignore[reportGeneralTypeIssues] # noqa: E501 

560) -> Specification[Any]: 

561 """ 

562 Retrieve the `Specification` with the given dialect identifier. 

563 

564 Raises: 

565 

566 `UnknownDialect` 

567 

568 if the given ``dialect_id`` isn't known 

569 """ 

570 resource = _SPECIFICATIONS.get(dialect_id.rstrip("#")) 

571 if resource is not None: 

572 return resource.contents 

573 if default is None: # type: ignore[reportUnnecessaryComparison] 

574 raise UnknownDialect(dialect_id) 

575 return default 

576 

577 

578@frozen 

579class DynamicAnchor: 

580 """ 

581 Dynamic anchors, introduced in draft 2020. 

582 """ 

583 

584 name: str 

585 resource: Resource[Schema] 

586 

587 def resolve(self, resolver: _Resolver[Schema]) -> _Resolved[Schema]: 

588 """ 

589 Resolve this anchor dynamically. 

590 """ 

591 last = self.resource 

592 for uri, registry in resolver.dynamic_scope(): 

593 try: 

594 anchor = registry.anchor(uri, self.name).value 

595 except exceptions.NoSuchAnchor: 

596 continue 

597 if isinstance(anchor, DynamicAnchor): 

598 last = anchor.resource 

599 return _Resolved( 

600 contents=last.contents, 

601 resolver=resolver.in_subresource(last), 

602 ) 

603 

604 

605def lookup_recursive_ref(resolver: _Resolver[Schema]) -> _Resolved[Schema]: 

606 """ 

607 Recursive references (via recursive anchors), present only in draft 2019. 

608 

609 As per the 2019 specification (§ 8.2.4.2.1), only the ``#`` recursive 

610 reference is supported (and is therefore assumed to be the relevant 

611 reference). 

612 """ 

613 resolved = resolver.lookup("#") 

614 if isinstance(resolved.contents, Mapping) and resolved.contents.get( 

615 "$recursiveAnchor", 

616 ): 

617 for uri, _ in resolver.dynamic_scope(): 

618 next_resolved = resolver.lookup(uri) 

619 if not isinstance( 

620 next_resolved.contents, 

621 Mapping, 

622 ) or not next_resolved.contents.get("$recursiveAnchor"): 

623 break 

624 resolved = next_resolved 

625 return resolved