Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/msgspec/_json_schema.py: 6%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

289 statements  

1from __future__ import annotations 

2 

3import re 

4import textwrap 

5from collections.abc import Iterable 

6from typing import Any, Callable, Optional 

7 

8from . import inspect as mi, to_builtins 

9 

10__all__ = ("schema", "schema_components") 

11 

12 

def schema(
    type: Any, *, schema_hook: Optional[Callable[[type], dict[str, Any]]] = None
) -> dict[str, Any]:
    """Build the JSON Schema describing a single type.

    Schemas of (potentially) shared components are not inlined; they are
    gathered under a top-level ``"$defs"`` key of the returned schema.

    To describe several types at once, or to decide yourself where the shared
    components are stored, use ``schema_components`` instead.

    Parameters
    ----------
    type : type
        The type to describe.
    schema_hook : callable, optional
        Callback used to produce schemas for custom types. It receives the
        custom type and should return that type's JSON schema as a dict.

    Returns
    -------
    schema : dict
        The JSON Schema for ``type``.

    See Also
    --------
    schema_components
    """
    schemas, shared = schema_components((type,), schema_hook=schema_hook)
    # Exactly one type went in, so exactly one schema must come back.
    (result,) = schemas
    if shared:
        result["$defs"] = shared
    return result

46 

47 

def schema_components(
    types: Iterable[Any],
    *,
    schema_hook: Optional[Callable[[type], dict[str, Any]]] = None,
    ref_template: str = "#/$defs/{name}",
) -> tuple[tuple[dict[str, Any], ...], dict[str, Any]]:
    """Build JSON Schemas for one or more types.

    Schemas of (potentially) shared components are pulled out of the
    individual schemas and handed back in a separate ``components`` mapping.

    Parameters
    ----------
    types : Iterable[type]
        The types to describe.
    schema_hook : callable, optional
        Callback used to produce schemas for custom types. It receives the
        custom type and should return that type's JSON schema as a dict.
    ref_template : str, optional
        Template for the generated ``"$ref"`` values, expanded as
        ``template.format(name=name)``. Override it when ``components`` will
        live somewhere other than a top-level ``"$defs"`` field, e.g.
        ``ref_template="#/components/{name}"`` for an OpenAPI document.

    Returns
    -------
    schemas : tuple[dict]
        One JSON Schema per entry of ``types``, in the same order.
    components : dict
        Name -> schema mapping of the shared components that ``schemas``
        refer to.

    See Also
    --------
    schema
    """
    infos = mi.multi_type_info(types)
    shared_types = _collect_component_types(infos)
    names = _build_name_map(shared_types)
    generator = _SchemaGenerator(names, schema_hook, ref_template)

    # Top-level schemas reference shared components by name ...
    schemas = tuple(map(generator.to_schema, infos))

    # ... while each component body is emitted in full (check_ref=False),
    # otherwise it would collapse into a reference to itself.
    components = {}
    for cls, info in shared_types.items():
        components[names[cls]] = generator.to_schema(info, False)

    return schemas, components

101 

102 

def _collect_component_types(type_infos: Iterable[mi.Type]) -> dict[Any, mi.Type]:
    """Walk the type trees and gather every "nameable" type.

    Nameable types are the ones extracted into the shared top-level
    components mapping: Struct, Dataclass, NamedTuple, TypedDict and Enum
    types. The result maps each such class to its type description, in the
    order the classes are first encountered.
    """
    record_kinds = (
        mi.StructType,
        mi.TypedDictType,
        mi.DataclassType,
        mi.NamedTupleType,
    )
    found: dict[Any, mi.Type] = {}

    def visit(node):
        # Work out which child types to descend into, then recurse once.
        if isinstance(node, record_kinds):
            if node.cls in found:
                # Already recorded; stopping here also ends recursive types.
                return
            found[node.cls] = node
            children = [f.type for f in node.fields]
        elif isinstance(node, mi.EnumType):
            found[node.cls] = node
            return
        elif isinstance(node, mi.Metadata):
            children = (node.type,)
        elif isinstance(node, mi.CollectionType):
            children = (node.item_type,)
        elif isinstance(node, mi.TupleType):
            children = node.item_types
        elif isinstance(node, mi.DictType):
            children = (node.key_type, node.value_type)
        elif isinstance(node, mi.UnionType):
            children = node.types
        else:
            # Any other type has nothing nameable beneath it.
            return

        for child in children:
            visit(child)

    for info in type_infos:
        visit(info)

    return found

140 

141 

142def _type_repr(obj): 

143 return obj.__name__ if isinstance(obj, type) else repr(obj) 

144 

145 

146def _get_class_name(cls: Any) -> str: 

147 if hasattr(cls, "__origin__"): 

148 name = cls.__origin__.__name__ 

149 args = ", ".join(_type_repr(a) for a in cls.__args__) 

150 return f"{name}[{args}]" 

151 return cls.__name__ 

152 

153 

def _get_doc(t: mi.Type) -> str:
    """Return the user-written docstring of ``t.cls``, or ``""`` if none.

    The docstring is dedented and stripped of leading/trailing newlines.
    Two docstrings are treated as absent: ``"An enumeration."`` on enum
    types, and a ``ClassName(...)``-shaped docstring on NamedTuple and
    dataclass types.
    """
    assert hasattr(t, "cls")
    # For parametrized generics, read the docstring off the origin class.
    owner = getattr(t.cls, "__origin__", t.cls)
    raw = getattr(owner, "__doc__", "")
    if not raw:
        return ""

    text = textwrap.dedent(raw).strip("\r\n")

    if isinstance(t, mi.EnumType):
        autogenerated = text == "An enumeration."
    elif isinstance(t, (mi.NamedTupleType, mi.DataclassType)):
        autogenerated = text.startswith(f"{owner.__name__}(") and text.endswith(")")
    else:
        autogenerated = False

    return "" if autogenerated else text

168 

169 

def _build_name_map(component_types: dict[Any, mi.Type]) -> dict[Any, str]:
    """Assign a generated name to every nameable subcomponent.

    A component is normally named after its normalized class name. When two
    components would end up with the same name, each of them is instead
    named after its normalized full import path (module + qualified name).
    """

    def sanitize(text):
        # Anything outside letters, digits, ".", "-" and "_" becomes "_".
        return re.sub(r"[^a-zA-Z0-9.\-_]", "_", text)

    def qualified(cls):
        return sanitize(f"{cls.__module__}.{cls.__qualname__}")

    by_name: dict[str, Any] = {}
    clashing = set()

    for cls in component_types:
        short = sanitize(_get_class_name(cls))
        if short in by_name:
            # First clash on this short name: move the earlier holder to its
            # qualified name and remember that the short name is unusable.
            earlier = by_name.pop(short)
            clashing.add(short)
            by_name[qualified(earlier)] = earlier
        key = qualified(cls) if short in clashing else short
        by_name[key] = cls

    return {cls: name for name, cls in by_name.items()}

198 

199 

class _SchemaGenerator:
    """Translate ``msgspec.inspect`` type descriptions into JSON Schema dicts.

    Parameters
    ----------
    name_map : dict
        Mapping from a component class to the name it is published under.
        A type whose ``cls`` has a name here is emitted as a ``"$ref"``
        instead of being inlined.
    schema_hook : callable, optional
        Callback used for custom types. It is called with the custom class
        and should return a schema dict; raising ``NotImplementedError``
        from the hook is tolerated and treated as "no schema provided".
    ref_template : str, optional
        Template expanded as ``ref_template.format(name=name)`` to build
        ``"$ref"`` values.
    """

    def __init__(
        self,
        name_map: dict[Any, str],
        schema_hook: Optional[Callable[[type], dict[str, Any]]] = None,
        ref_template: str = "#/$defs/{name}",
    ):
        self.name_map = name_map
        self.schema_hook = schema_hook
        self.ref_template = ref_template

    def to_schema(self, t: mi.Type, check_ref: bool = True) -> dict[str, Any]:
        """Convert a type description to a JSON Schema.

        Parameters
        ----------
        t : mi.Type
            The type description to convert. Any ``Metadata`` wrappers are
            peeled off and their ``extra_json_schema`` merged into the
            result.
        check_ref : bool, optional
            If True (the default), a type whose class appears in
            ``name_map`` is returned as a ``"$ref"`` only. Pass False to get
            the full definition of such a type.

        Returns
        -------
        schema : dict
            The generated JSON Schema.

        Raises
        ------
        TypeError
            For msgpack ``Ext`` types, for custom types that end up with an
            empty schema (no usable ``schema_hook`` result and no extra
            schema from ``Metadata``), and for unrecognized type
            descriptions.
        """
        schema: dict[str, Any] = {}

        # Unwrap Metadata layers, accumulating their extra schema entries.
        while isinstance(t, mi.Metadata):
            schema = mi._merge_json(schema, t.extra_json_schema)
            t = t.type

        # Named components are referenced rather than inlined.
        if check_ref and hasattr(t, "cls"):
            if name := self.name_map.get(t.cls):
                schema["$ref"] = self.ref_template.format(name=name)
                return schema

        if isinstance(t, (mi.AnyType, mi.RawType)):
            # No constraints: the (possibly empty) schema accepts anything.
            pass
        elif isinstance(t, mi.NoneType):
            schema["type"] = "null"
        elif isinstance(t, mi.BoolType):
            schema["type"] = "boolean"
        elif isinstance(t, (mi.IntType, mi.FloatType)):
            schema["type"] = "integer" if isinstance(t, mi.IntType) else "number"
            if t.ge is not None:
                schema["minimum"] = t.ge
            if t.gt is not None:
                schema["exclusiveMinimum"] = t.gt
            if t.le is not None:
                schema["maximum"] = t.le
            if t.lt is not None:
                schema["exclusiveMaximum"] = t.lt
            if t.multiple_of is not None:
                schema["multipleOf"] = t.multiple_of
        elif isinstance(t, mi.StrType):
            schema["type"] = "string"
            if t.max_length is not None:
                schema["maxLength"] = t.max_length
            if t.min_length is not None:
                schema["minLength"] = t.min_length
            if t.pattern is not None:
                schema["pattern"] = t.pattern
        elif isinstance(t, (mi.BytesType, mi.ByteArrayType, mi.MemoryViewType)):
            schema["type"] = "string"
            schema["contentEncoding"] = "base64"
            # Length limits are given in bytes; convert them to the length
            # of the padded base64 text (4 characters per 3-byte group).
            if t.max_length is not None:
                schema["maxLength"] = 4 * ((t.max_length + 2) // 3)
            if t.min_length is not None:
                schema["minLength"] = 4 * ((t.min_length + 2) // 3)
        elif isinstance(t, mi.DateTimeType):
            schema["type"] = "string"
            # A format is only emitted when tz is exactly True.
            if t.tz is True:
                schema["format"] = "date-time"
        elif isinstance(t, mi.TimeType):
            schema["type"] = "string"
            if t.tz is True:
                schema["format"] = "time"
            elif t.tz is False:
                schema["format"] = "partial-time"
        elif isinstance(t, mi.DateType):
            schema["type"] = "string"
            schema["format"] = "date"
        elif isinstance(t, mi.TimeDeltaType):
            schema["type"] = "string"
            schema["format"] = "duration"
        elif isinstance(t, mi.UUIDType):
            schema["type"] = "string"
            schema["format"] = "uuid"
        elif isinstance(t, mi.DecimalType):
            schema["type"] = "string"
            schema["format"] = "decimal"
        elif isinstance(t, mi.CollectionType):
            schema["type"] = "array"
            if not isinstance(t.item_type, mi.AnyType):
                schema["items"] = self.to_schema(t.item_type)
            if t.max_length is not None:
                schema["maxItems"] = t.max_length
            if t.min_length is not None:
                schema["minItems"] = t.min_length
        elif isinstance(t, mi.TupleType):
            schema["type"] = "array"
            schema["minItems"] = schema["maxItems"] = len(t.item_types)
            if t.item_types:
                schema["prefixItems"] = [self.to_schema(i) for i in t.item_types]
                schema["items"] = False
        elif isinstance(t, mi.DictType):
            schema["type"] = "object"
            # If there are restrictions on the keys, specify them as propertyNames
            if isinstance(key_type := t.key_type, mi.StrType):
                property_names: dict[str, Any] = {}
                if key_type.min_length is not None:
                    property_names["minLength"] = key_type.min_length
                if key_type.max_length is not None:
                    property_names["maxLength"] = key_type.max_length
                if key_type.pattern is not None:
                    property_names["pattern"] = key_type.pattern
                if property_names:
                    schema["propertyNames"] = property_names
            if not isinstance(t.value_type, mi.AnyType):
                schema["additionalProperties"] = self.to_schema(t.value_type)
            if t.max_length is not None:
                schema["maxProperties"] = t.max_length
            if t.min_length is not None:
                schema["minProperties"] = t.min_length
        elif isinstance(t, mi.UnionType):
            # Split the members into object-like structs (keyed by tag, so
            # they can share one discriminator) and everything else.
            structs = {}
            other = []
            tag_field = None
            for subtype in t.types:
                real_type = subtype
                while isinstance(real_type, mi.Metadata):
                    real_type = real_type.type
                if isinstance(real_type, mi.StructType) and not real_type.array_like:
                    tag_field = real_type.tag_field
                    structs[real_type.tag] = real_type
                else:
                    other.append(subtype)

            options = [self.to_schema(a) for a in other]

            if len(structs) >= 2:
                mapping = {
                    k: self.ref_template.format(name=self.name_map[v.cls])
                    for k, v in structs.items()
                }
                struct_schema = {
                    "anyOf": [self.to_schema(v) for v in structs.values()],
                    "discriminator": {"propertyName": tag_field, "mapping": mapping},
                }
                if options:
                    # Nest the discriminated group as one more alternative.
                    options.append(struct_schema)
                    schema["anyOf"] = options
                else:
                    schema.update(struct_schema)
            elif len(structs) == 1:
                # A lone struct needs no discriminator.
                _, subtype = structs.popitem()
                options.append(self.to_schema(subtype))
                schema["anyOf"] = options
            else:
                schema["anyOf"] = options
        elif isinstance(t, mi.LiteralType):
            schema["enum"] = sorted(t.values)
        elif isinstance(t, mi.EnumType):
            schema.setdefault("title", t.cls.__name__)
            if doc := _get_doc(t):
                schema.setdefault("description", doc)
            schema["enum"] = sorted(e.value for e in t.cls)
        elif isinstance(t, mi.StructType):
            schema.setdefault("title", _get_class_name(t.cls))
            if doc := _get_doc(t):
                schema.setdefault("description", doc)
            required = []
            names = []
            fields = []

            # The tag, when present, is emitted as a leading required field
            # that only admits the struct's own tag value.
            if t.tag_field is not None:
                required.append(t.tag_field)
                names.append(t.tag_field)
                fields.append({"enum": [t.tag]})

            for field in t.fields:
                field_schema = self.to_schema(field.type)
                if field.required:
                    required.append(field.encode_name)
                elif field.default is not mi.NODEFAULT:
                    field_schema["default"] = to_builtins(field.default, str_keys=True)
                elif field.default_factory in (list, dict, set, bytearray):
                    # NOTE(review): the factory result is stored as-is, so
                    # ``set``/``bytearray`` factories put a non-JSON-native
                    # object in the schema - confirm this is intended.
                    field_schema["default"] = field.default_factory()
                names.append(field.encode_name)
                fields.append(field_schema)

            if t.array_like:
                # Count the optional fields at the end of the struct; those
                # positions may be omitted from the array. This is counted
                # explicitly rather than taken from an ``enumerate`` index,
                # which would be one short whenever no field is required and
                # the loop runs to completion without hitting ``break``.
                n_trailing_defaults = 0
                for f in reversed(t.fields):
                    if f.required:
                        break
                    n_trailing_defaults += 1
                schema["type"] = "array"
                schema["prefixItems"] = fields
                schema["minItems"] = len(fields) - n_trailing_defaults
                if t.forbid_unknown_fields:
                    schema["maxItems"] = len(fields)
            else:
                schema["type"] = "object"
                schema["properties"] = dict(zip(names, fields))
                schema["required"] = required
                if t.forbid_unknown_fields:
                    schema["additionalProperties"] = False
        elif isinstance(t, (mi.TypedDictType, mi.DataclassType, mi.NamedTupleType)):
            schema.setdefault("title", _get_class_name(t.cls))
            if doc := _get_doc(t):
                schema.setdefault("description", doc)
            names = []
            fields = []
            required = []
            for field in t.fields:
                field_schema = self.to_schema(field.type)
                if field.required:
                    required.append(field.encode_name)
                elif field.default is not mi.NODEFAULT:
                    field_schema["default"] = to_builtins(field.default, str_keys=True)
                names.append(field.encode_name)
                fields.append(field_schema)
            if isinstance(t, mi.NamedTupleType):
                schema["type"] = "array"
                schema["prefixItems"] = fields
                schema["minItems"] = len(required)
                schema["maxItems"] = len(fields)
            else:
                schema["type"] = "object"
                schema["properties"] = dict(zip(names, fields))
                schema["required"] = required
        elif isinstance(t, mi.ExtType):
            raise TypeError("json-schema doesn't support msgpack Ext types")
        elif isinstance(t, mi.CustomType):
            if self.schema_hook:
                try:
                    schema = mi._merge_json(self.schema_hook(t.cls), schema)
                except NotImplementedError:
                    # The hook declined this type; fall through to the
                    # emptiness check below.
                    pass
            if not schema:
                raise TypeError(
                    "Generating JSON schema for custom types requires either:\n"
                    "- specifying a `schema_hook`\n"
                    "- annotating the type with `Meta(extra_json_schema=...)`\n"
                    "\n"
                    f"type {t.cls!r} is not supported"
                )
        else:
            # This should be unreachable
            raise TypeError(f"json-schema doesn't support type {t!r}")

        return schema