Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/msgspec/_json_schema.py: 6%
Shortcuts on this page:
  r, m, x   toggle line displays
  j, k      next/prev highlighted chunk
  0 (zero)  top of page
  1 (one)   first highlighted chunk
1from __future__ import annotations
3import re
4import textwrap
5from collections.abc import Iterable
6from typing import Any, Callable, Optional
8from . import inspect as mi, to_builtins
10__all__ = ("schema", "schema_components")
def schema(
    type: Any,
    *,
    schema_hook: Optional[Callable[[type], dict[str, Any]]] = None,
) -> dict[str, Any]:
    """Build the JSON Schema describing a single type.

    Schemas for (potentially) shared components are pulled out of the main
    schema and stored under a top-level ``"$defs"`` key.

    Reach for ``schema_components`` instead when you need schemas for several
    types at once, or want more control over the generated output.

    Parameters
    ----------
    type : type
        The type to generate the schema for.
    schema_hook : callable, optional
        Optional callback used to produce JSON schemas for custom types. It
        is called with the custom type and should return a dict
        representation of that type's JSON schema.

    Returns
    -------
    schema : dict
        The generated JSON Schema.

    See Also
    --------
    schema_components
    """
    schemas, shared_defs = schema_components((type,), schema_hook=schema_hook)
    # Exactly one type went in, so exactly one schema must come back.
    (result,) = schemas
    if shared_defs:
        result["$defs"] = shared_defs
    return result
def schema_components(
    types: Iterable[Any],
    *,
    schema_hook: Optional[Callable[[type], dict[str, Any]]] = None,
    ref_template: str = "#/$defs/{name}",
) -> tuple[tuple[dict[str, Any], ...], dict[str, Any]]:
    """Build JSON Schemas for one or more types.

    Schemas for (potentially) shared components are not inlined; they are
    returned separately in a ``components`` dict and referenced via ``"$ref"``.

    Parameters
    ----------
    types : Iterable[type]
        An iterable of one or more types to generate schemas for.
    schema_hook : callable, optional
        Optional callback used to produce JSON schemas for custom types. It
        is called with the custom type and should return a dict
        representation of that type's JSON schema.
    ref_template : str, optional
        Template used to build ``"$ref"`` values. It is formatted as
        ``template.format(name=name)`` with the component name. Handy when
        the ``components`` mapping will live somewhere other than a top-level
        ``"$defs"`` field, e.g. ``ref_template="#/components/{name}"`` when
        producing an OpenAPI schema.

    Returns
    -------
    schemas : tuple[dict]
        A tuple of JSON Schemas, one per entry in ``types``.
    components : dict
        A mapping of name to schema for any shared components used by
        ``schemas``.

    See Also
    --------
    schema
    """
    infos = mi.multi_type_info(types)
    nameable = _collect_component_types(infos)
    names = _build_name_map(nameable)
    generator = _SchemaGenerator(names, schema_hook, ref_template)

    top_level = tuple(map(generator.to_schema, infos))

    shared: dict[str, Any] = {}
    for cls, info in nameable.items():
        key = names[cls]
        # check_ref=False: emit the component's full body, not a "$ref" to itself.
        shared[key] = generator.to_schema(info, False)

    return top_level, shared
103def _collect_component_types(type_infos: Iterable[mi.Type]) -> dict[Any, mi.Type]:
104 """Find all types in the type tree that are "nameable" and worthy of being
105 extracted out into a shared top-level components mapping.
107 Currently this looks for Struct, Dataclass, NamedTuple, TypedDict, and Enum
108 types.
109 """
110 components = {}
112 def collect(t):
113 if isinstance(
114 t, (mi.StructType, mi.TypedDictType, mi.DataclassType, mi.NamedTupleType)
115 ):
116 if t.cls not in components:
117 components[t.cls] = t
118 for f in t.fields:
119 collect(f.type)
120 elif isinstance(t, mi.EnumType):
121 components[t.cls] = t
122 elif isinstance(t, mi.Metadata):
123 collect(t.type)
124 elif isinstance(t, mi.CollectionType):
125 collect(t.item_type)
126 elif isinstance(t, mi.TupleType):
127 for st in t.item_types:
128 collect(st)
129 elif isinstance(t, mi.DictType):
130 collect(t.key_type)
131 collect(t.value_type)
132 elif isinstance(t, mi.UnionType):
133 for st in t.types:
134 collect(st)
136 for t in type_infos:
137 collect(t)
139 return components
142def _type_repr(obj):
143 return obj.__name__ if isinstance(obj, type) else repr(obj)
146def _get_class_name(cls: Any) -> str:
147 if hasattr(cls, "__origin__"):
148 name = cls.__origin__.__name__
149 args = ", ".join(_type_repr(a) for a in cls.__args__)
150 return f"{name}[{args}]"
151 return cls.__name__
154def _get_doc(t: mi.Type) -> str:
155 assert hasattr(t, "cls")
156 cls = getattr(t.cls, "__origin__", t.cls)
157 doc = getattr(cls, "__doc__", "")
158 if not doc:
159 return ""
160 doc = textwrap.dedent(doc).strip("\r\n")
161 if isinstance(t, mi.EnumType):
162 if doc == "An enumeration.":
163 return ""
164 elif isinstance(t, (mi.NamedTupleType, mi.DataclassType)):
165 if doc.startswith(f"{cls.__name__}(") and doc.endswith(")"):
166 return ""
167 return doc
def _build_name_map(component_types: dict[Any, mi.Type]) -> dict[Any, str]:
    """Assign a generated name to every nameable subcomponent.

    The name is normally a normalized version of the class name. When two
    classes normalize to the same name, every class sharing that name is
    keyed by its normalized full import path (``module.qualname``) instead.
    """
    unsafe_chars = re.compile(r"[^a-zA-Z0-9.\-_]")

    def sanitize(name):
        return unsafe_chars.sub("_", name)

    def qualified(cls):
        return sanitize(f"{cls.__module__}.{cls.__qualname__}")

    clashing = set()
    by_name: dict[str, Any] = {}

    for cls in component_types:
        short = sanitize(_get_class_name(cls))
        if short in by_name:
            # First clash on this name: move the earlier class to its full path.
            previous = by_name.pop(short)
            clashing.add(short)
            by_name[qualified(previous)] = previous
        key = qualified(cls) if short in clashing else short
        by_name[key] = cls

    return {cls: name for name, cls in by_name.items()}
200class _SchemaGenerator:
201 def __init__(
202 self,
203 name_map: dict[Any, str],
204 schema_hook: Optional[Callable[[type], dict[str, Any]]] = None,
205 ref_template: str = "#/$defs/{name}",
206 ):
207 self.name_map = name_map
208 self.schema_hook = schema_hook
209 self.ref_template = ref_template
211 def to_schema(self, t: mi.Type, check_ref: bool = True) -> dict[str, Any]:
212 """Converts a Type to a json-schema."""
213 schema: dict[str, Any] = {}
215 while isinstance(t, mi.Metadata):
216 schema = mi._merge_json(schema, t.extra_json_schema)
217 t = t.type
219 if check_ref and hasattr(t, "cls"):
220 if name := self.name_map.get(t.cls):
221 schema["$ref"] = self.ref_template.format(name=name)
222 return schema
224 if isinstance(t, (mi.AnyType, mi.RawType)):
225 pass
226 elif isinstance(t, mi.NoneType):
227 schema["type"] = "null"
228 elif isinstance(t, mi.BoolType):
229 schema["type"] = "boolean"
230 elif isinstance(t, (mi.IntType, mi.FloatType)):
231 schema["type"] = "integer" if isinstance(t, mi.IntType) else "number"
232 if t.ge is not None:
233 schema["minimum"] = t.ge
234 if t.gt is not None:
235 schema["exclusiveMinimum"] = t.gt
236 if t.le is not None:
237 schema["maximum"] = t.le
238 if t.lt is not None:
239 schema["exclusiveMaximum"] = t.lt
240 if t.multiple_of is not None:
241 schema["multipleOf"] = t.multiple_of
242 elif isinstance(t, mi.StrType):
243 schema["type"] = "string"
244 if t.max_length is not None:
245 schema["maxLength"] = t.max_length
246 if t.min_length is not None:
247 schema["minLength"] = t.min_length
248 if t.pattern is not None:
249 schema["pattern"] = t.pattern
250 elif isinstance(t, (mi.BytesType, mi.ByteArrayType, mi.MemoryViewType)):
251 schema["type"] = "string"
252 schema["contentEncoding"] = "base64"
253 if t.max_length is not None:
254 schema["maxLength"] = 4 * ((t.max_length + 2) // 3)
255 if t.min_length is not None:
256 schema["minLength"] = 4 * ((t.min_length + 2) // 3)
257 elif isinstance(t, mi.DateTimeType):
258 schema["type"] = "string"
259 if t.tz is True:
260 schema["format"] = "date-time"
261 elif isinstance(t, mi.TimeType):
262 schema["type"] = "string"
263 if t.tz is True:
264 schema["format"] = "time"
265 elif t.tz is False:
266 schema["format"] = "partial-time"
267 elif isinstance(t, mi.DateType):
268 schema["type"] = "string"
269 schema["format"] = "date"
270 elif isinstance(t, mi.TimeDeltaType):
271 schema["type"] = "string"
272 schema["format"] = "duration"
273 elif isinstance(t, mi.UUIDType):
274 schema["type"] = "string"
275 schema["format"] = "uuid"
276 elif isinstance(t, mi.DecimalType):
277 schema["type"] = "string"
278 schema["format"] = "decimal"
279 elif isinstance(t, mi.CollectionType):
280 schema["type"] = "array"
281 if not isinstance(t.item_type, mi.AnyType):
282 schema["items"] = self.to_schema(t.item_type)
283 if t.max_length is not None:
284 schema["maxItems"] = t.max_length
285 if t.min_length is not None:
286 schema["minItems"] = t.min_length
287 elif isinstance(t, mi.TupleType):
288 schema["type"] = "array"
289 schema["minItems"] = schema["maxItems"] = len(t.item_types)
290 if t.item_types:
291 schema["prefixItems"] = [self.to_schema(i) for i in t.item_types]
292 schema["items"] = False
293 elif isinstance(t, mi.DictType):
294 schema["type"] = "object"
295 # If there are restrictions on the keys, specify them as propertyNames
296 if isinstance(key_type := t.key_type, mi.StrType):
297 property_names: dict[str, Any] = {}
298 if key_type.min_length is not None:
299 property_names["minLength"] = key_type.min_length
300 if key_type.max_length is not None:
301 property_names["maxLength"] = key_type.max_length
302 if key_type.pattern is not None:
303 property_names["pattern"] = key_type.pattern
304 if property_names:
305 schema["propertyNames"] = property_names
306 if not isinstance(t.value_type, mi.AnyType):
307 schema["additionalProperties"] = self.to_schema(t.value_type)
308 if t.max_length is not None:
309 schema["maxProperties"] = t.max_length
310 if t.min_length is not None:
311 schema["minProperties"] = t.min_length
312 elif isinstance(t, mi.UnionType):
313 structs = {}
314 other = []
315 tag_field = None
316 for subtype in t.types:
317 real_type = subtype
318 while isinstance(real_type, mi.Metadata):
319 real_type = real_type.type
320 if isinstance(real_type, mi.StructType) and not real_type.array_like:
321 tag_field = real_type.tag_field
322 structs[real_type.tag] = real_type
323 else:
324 other.append(subtype)
326 options = [self.to_schema(a) for a in other]
328 if len(structs) >= 2:
329 mapping = {
330 k: self.ref_template.format(name=self.name_map[v.cls])
331 for k, v in structs.items()
332 }
333 struct_schema = {
334 "anyOf": [self.to_schema(v) for v in structs.values()],
335 "discriminator": {"propertyName": tag_field, "mapping": mapping},
336 }
337 if options:
338 options.append(struct_schema)
339 schema["anyOf"] = options
340 else:
341 schema.update(struct_schema)
342 elif len(structs) == 1:
343 _, subtype = structs.popitem()
344 options.append(self.to_schema(subtype))
345 schema["anyOf"] = options
346 else:
347 schema["anyOf"] = options
348 elif isinstance(t, mi.LiteralType):
349 schema["enum"] = sorted(t.values)
350 elif isinstance(t, mi.EnumType):
351 schema.setdefault("title", t.cls.__name__)
352 if doc := _get_doc(t):
353 schema.setdefault("description", doc)
354 schema["enum"] = sorted(e.value for e in t.cls)
355 elif isinstance(t, mi.StructType):
356 schema.setdefault("title", _get_class_name(t.cls))
357 if doc := _get_doc(t):
358 schema.setdefault("description", doc)
359 required = []
360 names = []
361 fields = []
363 if t.tag_field is not None:
364 required.append(t.tag_field)
365 names.append(t.tag_field)
366 fields.append({"enum": [t.tag]})
368 for field in t.fields:
369 field_schema = self.to_schema(field.type)
370 if field.required:
371 required.append(field.encode_name)
372 elif field.default is not mi.NODEFAULT:
373 field_schema["default"] = to_builtins(field.default, str_keys=True)
374 elif field.default_factory in (list, dict, set, bytearray):
375 field_schema["default"] = field.default_factory()
376 names.append(field.encode_name)
377 fields.append(field_schema)
379 if t.array_like:
380 n_trailing_defaults = 0
381 for n_trailing_defaults, f in enumerate(reversed(t.fields)):
382 if f.required:
383 break
384 schema["type"] = "array"
385 schema["prefixItems"] = fields
386 schema["minItems"] = len(fields) - n_trailing_defaults
387 if t.forbid_unknown_fields:
388 schema["maxItems"] = len(fields)
389 else:
390 schema["type"] = "object"
391 schema["properties"] = dict(zip(names, fields))
392 schema["required"] = required
393 if t.forbid_unknown_fields:
394 schema["additionalProperties"] = False
395 elif isinstance(t, (mi.TypedDictType, mi.DataclassType, mi.NamedTupleType)):
396 schema.setdefault("title", _get_class_name(t.cls))
397 if doc := _get_doc(t):
398 schema.setdefault("description", doc)
399 names = []
400 fields = []
401 required = []
402 for field in t.fields:
403 field_schema = self.to_schema(field.type)
404 if field.required:
405 required.append(field.encode_name)
406 elif field.default is not mi.NODEFAULT:
407 field_schema["default"] = to_builtins(field.default, str_keys=True)
408 names.append(field.encode_name)
409 fields.append(field_schema)
410 if isinstance(t, mi.NamedTupleType):
411 schema["type"] = "array"
412 schema["prefixItems"] = fields
413 schema["minItems"] = len(required)
414 schema["maxItems"] = len(fields)
415 else:
416 schema["type"] = "object"
417 schema["properties"] = dict(zip(names, fields))
418 schema["required"] = required
419 elif isinstance(t, mi.ExtType):
420 raise TypeError("json-schema doesn't support msgpack Ext types")
421 elif isinstance(t, mi.CustomType):
422 if self.schema_hook:
423 try:
424 schema = mi._merge_json(self.schema_hook(t.cls), schema)
425 except NotImplementedError:
426 pass
427 if not schema:
428 raise TypeError(
429 "Generating JSON schema for custom types requires either:\n"
430 "- specifying a `schema_hook`\n"
431 "- annotating the type with `Meta(extra_json_schema=...)`\n"
432 "\n"
433 f"type {t.cls!r} is not supported"
434 )
435 else:
436 # This should be unreachable
437 raise TypeError(f"json-schema doesn't support type {t!r}")
439 return schema