Coverage for /pythoncovmergedfiles/medio/medio/src/pydantic/pydantic/json_schema.py: 18%

664 statements  

coverage.py v7.2.3, created at 2023-04-27 07:38 +0000

1from __future__ import annotations as _annotations 

2 

3import inspect 

4import math 

5import re 

6import sys 

7import warnings 

8from dataclasses import is_dataclass 

9from enum import Enum 

10from typing import ( 

11 TYPE_CHECKING, 

12 Any, 

13 Callable, 

14 Counter, 

15 Dict, 

16 Iterable, 

17 List, 

18 NewType, 

19 Sequence, 

20 Tuple, 

21 Type, 

22 Union, 

23 cast, 

24) 

25from weakref import WeakKeyDictionary 

26 

27import pydantic_core 

28from typing_extensions import Literal 

29 

30from pydantic._internal._json_schema_shared import GenerateJsonSchemaHandler 

31 

32from ._internal import _core_metadata, _core_utils, _json_schema_shared, _typing_extra 

33from .errors import PydanticInvalidForJsonSchema, PydanticUserError 

34 

35if TYPE_CHECKING: 

36 from pydantic_core import CoreSchema, CoreSchemaType, core_schema 

37 

38 from . import ConfigDict 

39 from ._internal._dataclasses import PydanticDataclass 

40 from .main import BaseModel 

41 

42 

43JsonSchemaValue = _json_schema_shared.JsonSchemaValue 

44# Re-export GetJsonSchemaHandler 

45GetJsonSchemaHandler = _json_schema_shared.GetJsonSchemaHandler 

46 

47 

48def update_json_schema(schema: JsonSchemaValue, updates: dict[str, Any]) -> JsonSchemaValue: 

49 """ 

50 A convenience function useful for creating `js_modify_function` functions that just set values for some keys. 

51 

52 TODO: This is basically just a wrapper for dict.update that returns the dict. 

53 Would it be better to just make this a less-"domain-specific" utility function? 

54 """ 

55 schema.update(updates) 

56 return schema 
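# Illustrative usage (not part of the original file): the function just merges the
# overrides into the schema dict in place and returns it, e.g.
#
#     schema = {'type': 'string'}
#     update_json_schema(schema, {'title': 'Name', 'minLength': 1})
#     # -> {'type': 'string', 'title': 'Name', 'minLength': 1}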

57 

58 

59# These are "kind" labels that can be used to control warnings. See `GenerateJsonSchema.render_warning_message` 

60JsonSchemaWarningKind = Literal['skipped-choice', 'non-serializable-default'] 

61 

62 

63class PydanticJsonSchemaWarning(UserWarning): 

64 """ 

65 This class is used to emit warnings produced during JSON schema generation. 

66 See the `GenerateJsonSchema.emit_warning` and `GenerateJsonSchema.render_warning_message` 

67 methods for more details; these can be overridden to control warning behavior. 

68 """ 

69 

70 

71# ##### JSON Schema Generation ##### 

72DEFAULT_REF_TEMPLATE = '#/$defs/{model}' 

73 

74# There are three types of references relevant to building JSON schemas: 

75# 1. core_schema "ref" values; these are not exposed as part of the JSON schema 

76# * these might look like the fully qualified path of a model, its id, or something similar 

77CoreRef = NewType('CoreRef', str) 

78# 2. keys of the "definitions" object that will eventually go into the JSON schema 

79# * by default, these look like "MyModel", though may change in the presence of collisions 

80# * eventually, we may want to make it easier to modify the way these names are generated 

81DefsRef = NewType('DefsRef', str) 

82# 3. the values corresponding to the "$ref" key in the schema 

83# * By default, these look like "#/$defs/MyModel", as in {"$ref": "#/$defs/MyModel"} 

84JsonRef = NewType('JsonRef', str) 
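# Illustrative example (not part of the original file): for a hypothetical model
# `mymodule.MyModel`, the three flavors of reference might look like:
#
#     CoreRef('mymodule.MyModel:140210061878464')  # core_schema 'ref'; often carries an object id
#     DefsRef('MyModel')                           # key in the generated "$defs" object
#     JsonRef('#/$defs/MyModel')                   # value used with the '$ref' key in the JSON schema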

85 

86 

87class GenerateJsonSchema: 

88 # See https://json-schema.org/understanding-json-schema/reference/schema.html#id4 for more info about dialects 

89 schema_dialect = 'https://json-schema.org/draft/2020-12/schema' 

90 

91 # `self.render_warning_message` will do nothing if its argument `kind` is in `ignored_warning_kinds`; 

92 # this value can be modified on subclasses to easily control which warnings are emitted 

93 ignored_warning_kinds: set[JsonSchemaWarningKind] = {'skipped-choice'} 

94 

95 def __init__(self, by_alias: bool = True, ref_template: str = DEFAULT_REF_TEMPLATE): 

96 self.by_alias = by_alias 

97 self.ref_template = ref_template 

98 

99 self.core_to_json_refs: dict[CoreRef, JsonRef] = {} 

100 self.core_to_defs_refs: dict[CoreRef, DefsRef] = {} 

101 self.defs_to_core_refs: dict[DefsRef, CoreRef] = {} 

102 self.json_to_defs_refs: dict[JsonRef, DefsRef] = {} 

103 

104 self.definitions: dict[DefsRef, JsonSchemaValue] = {} 

105 

106 # When collisions are detected, we choose a non-colliding name 

107 # during generation, but we also track the colliding tag so that it 

108 # can be remapped for the first occurrence at the end of the process 

109 self.collisions: set[DefsRef] = set() 

110 self.defs_ref_fallbacks: dict[CoreRef, list[DefsRef]] = {} 

111 

112 self._schema_type_to_method = self.build_schema_type_to_method() 

113 

114 # This changes to True after generating a schema, to prevent issues caused by accidental re-use 

115 # of a single instance of a schema generator 

116 self._used = False 

117 

118 def build_schema_type_to_method(self) -> dict[CoreSchemaType, Callable[[CoreSchema], JsonSchemaValue]]: 

119 mapping: dict[CoreSchemaType, Callable[[CoreSchema], JsonSchemaValue]] = {} 

120 for key in _typing_extra.all_literal_values(pydantic_core.CoreSchemaType): # type: ignore[arg-type] 

121 method_name = f"{key.replace('-', '_')}_schema" 

122 try: 

123 mapping[key] = getattr(self, method_name) 

124 except AttributeError as e: 

125 raise TypeError( 

126 f'No method for generating JsonSchema for core_schema.type={key!r} ' 

127 f'(expected: {type(self).__name__}.{method_name})' 

128 ) from e 

129 return mapping 
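# Illustrative sketch (not part of the original file): methods are looked up purely by the
# naming convention '<core schema type with "-" replaced by "_">_schema', e.g.
# 'tagged-union' -> tagged_union_schema, so a subclass can customize a single core schema type:
#
#     class MyGenerateJsonSchema(GenerateJsonSchema):
#         def timedelta_schema(self, schema):
#             # hypothetical override: emit ISO 8601 duration strings instead
#             return {'type': 'string', 'format': 'duration'}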

130 

131 def generate_definitions(self, schemas: list[CoreSchema]) -> dict[DefsRef, JsonSchemaValue]: 

132 """ 

133 Given a list of core_schema, generate all JSON schema definitions, and return the generated definitions. 

134 """ 

135 if self._used: 

136 raise PydanticUserError( 

137 'This JSON schema generator has already been used to generate a JSON schema. ' 

138 f'You must create a new instance of {type(self).__name__} to generate a new JSON schema.', 

139 code='json-schema-already-used', 

140 ) 

141 for schema in schemas: 

142 self.generate_inner(schema) 

143 

144 self.resolve_collisions({}) 

145 

146 self._used = True 

147 return self.definitions 

148 

149 def generate(self, schema: CoreSchema) -> JsonSchemaValue: 

150 if self._used: 

151 raise PydanticUserError( 

152 'This JSON schema generator has already been used to generate a JSON schema. ' 

153 f'You must create a new instance of {type(self).__name__} to generate a new JSON schema.', 

154 code='json-schema-already-used', 

155 ) 

156 

157 json_schema = self.generate_inner(schema) 

158 json_ref_counts = self.get_json_ref_counts(json_schema) 

159 

160 # Remove the top-level $ref if present; note that the `generate_inner` method already ensures there are no sibling keys 

161 ref = json_schema.get('$ref') 

162 while ref is not None: # may need to unpack multiple levels 

163 ref = JsonRef(ref) 

164 ref_json_schema = self.get_schema_from_definitions(ref) 

165 if json_ref_counts[ref] > 1 or ref_json_schema is None: 

166 # Keep the ref, but use an allOf to remove the top level $ref 

167 json_schema = {'allOf': [{'$ref': ref}]} 

168 else: 

169 # "Unpack" the ref since this is the only reference 

170 json_schema = ref_json_schema.copy() # copy to prevent recursive dict reference 

171 json_ref_counts[ref] -= 1 

172 ref = json_schema.get('$ref') 

173 

174 # Remove any definitions that, thanks to $ref-substitution, are no longer present. 

175 # I think this should only _possibly_ apply to the root model, though I'm not 100% sure. 

176 # It might be safe to remove this logic, but I'm keeping it for now 

177 all_json_refs = list(self.json_to_defs_refs.keys()) 

178 for k in all_json_refs: 

179 if json_ref_counts[k] < 1: 

180 del self.definitions[self.json_to_defs_refs[k]] 

181 

182 json_schema = self.resolve_collisions(json_schema) 

183 if self.definitions: 

184 json_schema['$defs'] = self.definitions 

185 

186 # For now, we will not set the $schema key. However, if desired, this can be easily added by overriding 

187 # this method and adding the following line after a call to super().generate(schema): 

188 # json_schema['$schema'] = self.schema_dialect 

189 

190 self._used = True 

191 return json_schema 
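# Illustrative usage (not part of the original file), assuming a hypothetical model `Cat`:
#
#     from pydantic import BaseModel
#
#     class Cat(BaseModel):
#         name: str
#
#     schema = GenerateJsonSchema(by_alias=True).generate(Cat.__pydantic_core_schema__)
#     # roughly: {'title': 'Cat', 'type': 'object',
#     #           'properties': {'name': {'title': 'Name', 'type': 'string'}},
#     #           'required': ['name']}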

192 

193 def generate_inner(self, schema: _core_metadata.CoreSchemaOrField) -> JsonSchemaValue: 

194 # If a schema with the same CoreRef has been handled, just return a reference to it 

195 # Note that this assumes that it will _never_ be the case that the same CoreRef is used 

196 # on types that should have different JSON schemas 

197 if 'ref' in schema: 

198 core_ref = CoreRef(schema['ref']) # type: ignore[typeddict-item] 

199 if core_ref in self.core_to_json_refs: 

200 return {'$ref': self.core_to_json_refs[core_ref]} 

201 

202 # Generate the JSON schema, accounting for the json_schema_override and core_schema_override 

203 metadata_handler = _core_metadata.CoreMetadataHandler(schema) 

204 

205 def handler_func(schema_or_field: _core_metadata.CoreSchemaOrField) -> JsonSchemaValue: 

206 # Generate the core-schema-type-specific bits of the schema generation: 

207 if _core_utils.is_typed_dict_field(schema_or_field): 

208 json_schema = self.typed_dict_field_schema(schema_or_field) 

209 elif _core_utils.is_dataclass_field(schema_or_field): 

210 json_schema = self.dataclass_field_schema(schema_or_field) 

211 elif _core_utils.is_core_schema(schema_or_field): # Ideally we wouldn't need this redundant typeguard.. 

212 generate_for_schema_type = self._schema_type_to_method[schema_or_field['type']] 

213 json_schema = generate_for_schema_type(schema_or_field) 

214 else: 

215 raise TypeError(f'Unexpected schema type: schema={schema_or_field}') 

216 # Populate the definitions 

217 if 'ref' in schema: 

218 core_ref = CoreRef(schema['ref']) # type: ignore[typeddict-item] 

219 defs_ref, ref_json_schema = self.get_cache_defs_ref_schema(core_ref) 

220 self.definitions[defs_ref] = json_schema 

221 json_schema = ref_json_schema 

222 return json_schema 

223 

224 current_handler = GenerateJsonSchemaHandler(self, handler_func) 

225 

226 for js_modify_function in metadata_handler.metadata.get('pydantic_js_functions', ()): 

227 

228 def new_handler_func( 

229 schema_or_field: _core_metadata.CoreSchemaOrField, 

230 current_handler: _core_metadata.GetJsonSchemaHandler = current_handler, 

231 js_modify_function: _core_metadata.GetJsonSchemaFunction = js_modify_function, 

232 ) -> JsonSchemaValue: 

233 return js_modify_function(schema_or_field, current_handler) 

234 

235 current_handler = GenerateJsonSchemaHandler(self, new_handler_func) 

236 

237 return current_handler(schema) 

238 

239 # ### Schema generation methods 

240 def any_schema(self, schema: core_schema.AnySchema) -> JsonSchemaValue: 

241 return {} 

242 

243 def none_schema(self, schema: core_schema.NoneSchema) -> JsonSchemaValue: 

244 return {'type': 'null'} 

245 

246 def bool_schema(self, schema: core_schema.BoolSchema) -> JsonSchemaValue: 

247 return {'type': 'boolean'} 

248 

249 def int_schema(self, schema: core_schema.IntSchema) -> JsonSchemaValue: 

250 json_schema: dict[str, Any] = {'type': 'integer'} 

251 self.update_with_validations(json_schema, schema, self.ValidationsMapping.numeric) 

252 json_schema = {k: v for k, v in json_schema.items() if v not in {math.inf, -math.inf}} 

253 return json_schema 

254 

255 def float_schema(self, schema: core_schema.FloatSchema) -> JsonSchemaValue: 

256 json_schema: dict[str, Any] = {'type': 'number'} 

257 self.update_with_validations(json_schema, schema, self.ValidationsMapping.numeric) 

258 json_schema = {k: v for k, v in json_schema.items() if v not in {math.inf, -math.inf}} 

259 return json_schema 

260 

261 def str_schema(self, schema: core_schema.StringSchema) -> JsonSchemaValue: 

262 json_schema = {'type': 'string'} 

263 self.update_with_validations(json_schema, schema, self.ValidationsMapping.string) 

264 return json_schema 

265 

266 def bytes_schema(self, schema: core_schema.BytesSchema) -> JsonSchemaValue: 

267 json_schema = {'type': 'string', 'format': 'binary'} 

268 self.update_with_validations(json_schema, schema, self.ValidationsMapping.bytes) 

269 return json_schema 

270 

271 def date_schema(self, schema: core_schema.DateSchema) -> JsonSchemaValue: 

272 json_schema = {'type': 'string', 'format': 'date'} 

273 self.update_with_validations(json_schema, schema, self.ValidationsMapping.date) 

274 return json_schema 

275 

276 def time_schema(self, schema: core_schema.TimeSchema) -> JsonSchemaValue: 

277 return {'type': 'string', 'format': 'time'} 

278 

279 def datetime_schema(self, schema: core_schema.DatetimeSchema) -> JsonSchemaValue: 

280 return {'type': 'string', 'format': 'date-time'} 

281 

282 def timedelta_schema(self, schema: core_schema.TimedeltaSchema) -> JsonSchemaValue: 

283 # It's weird that this schema has 'type': 'number' but also specifies a 'format'. 

284 # Relevant issue: https://github.com/pydantic/pydantic/issues/5034 

285 # TODO: Probably should just change this to str (look at readme intro for speedate) 

286 return {'type': 'number', 'format': 'time-delta'} 

287 

288 def literal_schema(self, schema: core_schema.LiteralSchema) -> JsonSchemaValue: 

289 expected = [v.value if isinstance(v, Enum) else v for v in schema['expected']] 

290 

291 if len(expected) == 1: 

292 return {'const': expected[0]} 

293 else: 

294 return {'enum': expected} 
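# Illustrative examples (not part of the original file):
#     Literal['x']       -> {'const': 'x'}
#     Literal['x', 'y']  -> {'enum': ['x', 'y']}
# Enum members are unwrapped to their `.value` before being emitted.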

295 

296 def is_instance_schema(self, schema: core_schema.IsInstanceSchema) -> JsonSchemaValue: 

297 return self.handle_invalid_for_json_schema(schema, f'core_schema.IsInstanceSchema ({schema["cls"]})') 

298 

299 def is_subclass_schema(self, schema: core_schema.IsSubclassSchema) -> JsonSchemaValue: 

300 return {} # TODO: This was for compatibility with V1 -- is this the right thing to do? 

301 

302 def callable_schema(self, schema: core_schema.CallableSchema) -> JsonSchemaValue: 

303 return self.handle_invalid_for_json_schema(schema, 'core_schema.CallableSchema') 

304 

305 def list_schema(self, schema: core_schema.ListSchema) -> JsonSchemaValue: 

306 items_schema = {} if 'items_schema' not in schema else self.generate_inner(schema['items_schema']) 

307 json_schema = {'type': 'array', 'items': items_schema} 

308 self.update_with_validations(json_schema, schema, self.ValidationsMapping.array) 

309 return json_schema 

310 

311 def tuple_positional_schema(self, schema: core_schema.TuplePositionalSchema) -> JsonSchemaValue: 

312 json_schema: JsonSchemaValue = {'type': 'array'} 

313 json_schema['minItems'] = len(schema['items_schema']) 

314 prefixItems = [self.generate_inner(item) for item in schema['items_schema']] 

315 if prefixItems: 

316 json_schema['prefixItems'] = prefixItems 

317 if 'extra_schema' in schema: 

318 json_schema['items'] = self.generate_inner(schema['extra_schema']) 

319 else: 

320 json_schema['maxItems'] = len(schema['items_schema']) 

321 self.update_with_validations(json_schema, schema, self.ValidationsMapping.array) 

322 return json_schema 
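# Illustrative example (not part of the original file): a positional tuple such as
# tuple[int, str] produces roughly
#     {'type': 'array', 'prefixItems': [{'type': 'integer'}, {'type': 'string'}],
#      'minItems': 2, 'maxItems': 2}
# while the presence of an 'extra_schema' (a variadic tail) sets 'items' instead of 'maxItems'.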

323 

324 def tuple_variable_schema(self, schema: core_schema.TupleVariableSchema) -> JsonSchemaValue: 

325 json_schema: JsonSchemaValue = {'type': 'array', 'items': {}} 

326 if 'items_schema' in schema: 

327 json_schema['items'] = self.generate_inner(schema['items_schema']) 

328 self.update_with_validations(json_schema, schema, self.ValidationsMapping.array) 

329 return json_schema 

330 

331 def set_schema(self, schema: core_schema.SetSchema) -> JsonSchemaValue: 

332 return self._common_set_schema(schema) 

333 

334 def frozenset_schema(self, schema: core_schema.FrozenSetSchema) -> JsonSchemaValue: 

335 return self._common_set_schema(schema) 

336 

337 def _common_set_schema(self, schema: core_schema.SetSchema | core_schema.FrozenSetSchema) -> JsonSchemaValue: 

338 items_schema = {} if 'items_schema' not in schema else self.generate_inner(schema['items_schema']) 

339 json_schema = {'type': 'array', 'uniqueItems': True, 'items': items_schema} 

340 self.update_with_validations(json_schema, schema, self.ValidationsMapping.array) 

341 return json_schema 

342 

343 def generator_schema(self, schema: core_schema.GeneratorSchema) -> JsonSchemaValue: 

344 items_schema = {} if 'items_schema' not in schema else self.generate_inner(schema['items_schema']) 

345 json_schema = {'type': 'array', 'items': items_schema} 

346 self.update_with_validations(json_schema, schema, self.ValidationsMapping.array) 

347 return json_schema 

348 

349 def dict_schema(self, schema: core_schema.DictSchema) -> JsonSchemaValue: 

350 json_schema: JsonSchemaValue = {'type': 'object'} 

351 

352 keys_schema = self.generate_inner(schema['keys_schema']).copy() if 'keys_schema' in schema else {} 

353 keys_pattern = keys_schema.pop('pattern', None) 

354 

355 values_schema = self.generate_inner(schema['values_schema']).copy() if 'values_schema' in schema else {} 

356 values_schema.pop('title', None) # don't give a title to the additionalProperties 

357 if values_schema or keys_pattern is not None: # don't add additionalProperties if it's empty 

358 if keys_pattern is None: 

359 json_schema['additionalProperties'] = values_schema 

360 else: 

361 json_schema['patternProperties'] = {keys_pattern: values_schema} 

362 

363 self.update_with_validations(json_schema, schema, self.ValidationsMapping.object) 

364 return json_schema 
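# Illustrative example (not part of the original file): dict[str, int] produces roughly
#     {'type': 'object', 'additionalProperties': {'type': 'integer'}}
# and if the key schema carries a 'pattern' (e.g. a constrained string key), the value schema
# is emitted under 'patternProperties' keyed by that pattern instead.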

365 

366 def _function_schema( 

367 self, 

368 schema: _core_utils.AnyFunctionSchema, 

369 ) -> JsonSchemaValue: 

370 if _core_utils.is_function_with_inner_schema(schema): 

371 # I'm not sure if this might need to be different if the function's mode is 'before' 

372 return self.generate_inner(schema['schema']) 

373 # function-plain 

374 return self.handle_invalid_for_json_schema( 

375 schema, f'core_schema.PlainValidatorFunctionSchema ({schema["function"]})' 

376 ) 

377 

378 def function_before_schema(self, schema: core_schema.BeforeValidatorFunctionSchema) -> JsonSchemaValue: 

379 return self._function_schema(schema) 

380 

381 def function_after_schema(self, schema: core_schema.AfterValidatorFunctionSchema) -> JsonSchemaValue: 

382 return self._function_schema(schema) 

383 

384 def function_plain_schema(self, schema: core_schema.PlainValidatorFunctionSchema) -> JsonSchemaValue: 

385 return self._function_schema(schema) 

386 

387 def function_wrap_schema(self, schema: core_schema.WrapValidatorFunctionSchema) -> JsonSchemaValue: 

388 return self._function_schema(schema) 

389 

390 def default_schema(self, schema: core_schema.WithDefaultSchema) -> JsonSchemaValue: 

391 json_schema = self.generate_inner(schema['schema']) 

392 

393 if 'default' in schema: 

394 default = schema['default'] 

395 elif 'default_factory' in schema: 

396 default = schema['default_factory']() 

397 else: 

398 raise ValueError('`schema` has neither default nor default_factory') 

399 

400 try: 

401 encoded_default = self.encode_default(default) 

402 except pydantic_core.PydanticSerializationError: 

403 self.emit_warning( 

404 'non-serializable-default', 

405 f'Default value {default} is not JSON serializable; excluding default from JSON schema', 

406 ) 

407 # Return the inner schema, as though there was no default 

408 return json_schema 

409 

410 if '$ref' in json_schema: 

411 # Since reference schemas do not support child keys, we wrap the reference schema in a single-case allOf: 

412 return {'allOf': [json_schema], 'default': encoded_default} 

413 else: 

414 json_schema['default'] = encoded_default 

415 return json_schema 
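# Illustrative example (not part of the original file): an int field with default 3 produces
# roughly {'type': 'integer', 'default': 3}, while a field whose inner schema is a '$ref'
# gets wrapped so the default can sit beside it:
#     {'allOf': [{'$ref': '#/$defs/SubModel'}], 'default': {...}}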

416 

417 def nullable_schema(self, schema: core_schema.NullableSchema) -> JsonSchemaValue: 

418 null_schema = {'type': 'null'} 

419 inner_json_schema = self.generate_inner(schema['schema']) 

420 

421 if inner_json_schema == null_schema: 

422 return null_schema 

423 else: 

424 # Thanks to the equality check against `null_schema` above, I think 'oneOf' would also be valid here; 

425 # I'll use 'anyOf' for now, but it could be changed if it would work better with some external tooling 

426 return self.get_flattened_anyof([inner_json_schema, null_schema]) 
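# Illustrative example (not part of the original file): Optional[int] produces
#     {'anyOf': [{'type': 'integer'}, {'type': 'null'}]}
# while an inner schema that is already {'type': 'null'} collapses to a single {'type': 'null'}.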

427 

428 def union_schema(self, schema: core_schema.UnionSchema) -> JsonSchemaValue: 

429 generated: list[JsonSchemaValue] = [] 

430 

431 choices = schema['choices'] 

432 for s in choices: 

433 try: 

434 generated.append(self.generate_inner(s)) 

435 except PydanticInvalidForJsonSchema as exc: 

436 self.emit_warning('skipped-choice', exc.message) 

437 if len(generated) == 1: 

438 return generated[0] 

439 return self.get_flattened_anyof(generated) 

440 

441 def tagged_union_schema(self, schema: core_schema.TaggedUnionSchema) -> JsonSchemaValue: 

442 generated: dict[str, JsonSchemaValue] = {} 

443 for k, v in schema['choices'].items(): 

444 if not isinstance(v, (str, int)): 

445 try: 

446 # Use str(k) since keys must be strings for json; while not technically correct, 

447 # it's the closest that can be represented in valid JSON 

448 generated[str(k)] = self.generate_inner(v).copy() 

449 except PydanticInvalidForJsonSchema as exc: 

450 self.emit_warning('skipped-choice', exc.message) 

451 

452 # Populate the schema with any "indirect" references 

453 for k, v in schema['choices'].items(): 

454 if isinstance(v, (str, int)): 

455 while isinstance(schema['choices'][v], (str, int)): 

456 v = schema['choices'][v] 

457 assert isinstance(v, (int, str)) 

458 if str(v) in generated: 

459 # while it might seem unnecessary to check `if str(v) in generated`, a PydanticInvalidForJsonSchema 

460 # may have been raised above, which would mean that the schema we want to reference won't be present 

461 generated[str(k)] = generated[str(v)] 

462 

463 one_of_choices = _deduplicate_schemas(generated.values()) 

464 json_schema: JsonSchemaValue = {'oneOf': one_of_choices} 

465 

466 # This reflects the v1 behavior; TODO: we should make it possible to exclude OpenAPI stuff from the JSON schema 

467 openapi_discriminator = self._extract_discriminator(schema, one_of_choices) 

468 if openapi_discriminator is not None: 

469 json_schema['discriminator'] = { 

470 'propertyName': openapi_discriminator, 

471 'mapping': {k: v.get('$ref', v) for k, v in generated.items()}, 

472 } 

473 

474 return json_schema 
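# Illustrative example (not part of the original file): a union of hypothetical Cat/Dog models
# discriminated on a 'pet_type' field produces roughly
#     {'oneOf': [{'$ref': '#/$defs/Cat'}, {'$ref': '#/$defs/Dog'}],
#      'discriminator': {'propertyName': 'pet_type',
#                        'mapping': {'cat': '#/$defs/Cat', 'dog': '#/$defs/Dog'}}}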

475 

476 def _extract_discriminator( 

477 self, schema: core_schema.TaggedUnionSchema, one_of_choices: list[_JsonDict] 

478 ) -> str | None: 

479 """ 

480 Extract a compatible OpenAPI discriminator from the schema and one_of choices that end up in the final schema. 

481 """ 

482 openapi_discriminator: str | None = None 

483 if 'discriminator' not in schema: 

484 return None 

485 

486 if isinstance(schema['discriminator'], str): 

487 return schema['discriminator'] 

488 

489 if isinstance(schema['discriminator'], list): 

490 # If the discriminator is a single item list containing a string, that is equivalent to the string case 

491 if len(schema['discriminator']) == 1 and isinstance(schema['discriminator'][0], str): 

492 return schema['discriminator'][0] 

493 # When an alias is used that is different from the field name, the discriminator will be a list of single 

494 # str lists, one for the attribute and one for the actual alias. The logic here will work even if there is 

495 # more than one possible attribute, and looks for whether a single alias choice is present as a documented 

496 # property on all choices. If so, that property will be used as the OpenAPI discriminator. 

497 for alias_path in schema['discriminator']: 

498 if not isinstance(alias_path, list): 

499 break # this means that the discriminator is not a list of alias paths 

500 if len(alias_path) != 1: 

501 continue # this means that the "alias" does not represent a single field 

502 alias = alias_path[0] 

503 if not isinstance(alias, str): 

504 continue # this means that the "alias" does not represent a field 

505 alias_is_present_on_all_choices = True 

506 for choice in one_of_choices: 

507 while '$ref' in choice: 

508 assert isinstance(choice['$ref'], str) 

509 choice = self.get_schema_from_definitions(JsonRef(choice['$ref'])) or {} 

510 properties = choice.get('properties', {}) 

511 if not isinstance(properties, dict) or alias not in properties: 

512 alias_is_present_on_all_choices = False 

513 break 

514 if alias_is_present_on_all_choices: 

515 openapi_discriminator = alias 

516 break 

517 return openapi_discriminator 

518 

519 def chain_schema(self, schema: core_schema.ChainSchema) -> JsonSchemaValue: 

520 try: 

521 # Note: If we wanted to generate a schema for the _serialization_, we would want to use the _last_ step: 

522 return self.generate_inner(schema['steps'][0]) 

523 except IndexError as e: 

524 raise ValueError('Cannot generate a JsonSchema for a zero-step ChainSchema') from e 

525 

526 def lax_or_strict_schema(self, schema: core_schema.LaxOrStrictSchema) -> JsonSchemaValue: 

527 """ 

528 LaxOrStrict will use the strict branch for serialization internally, 

529 unless it was overridden here. 

530 """ 

531 # TODO: Need to read the default value off of model config or whatever 

532 use_strict = schema.get('strict', False) # TODO: replace this default False 

533 # If your JSON schema fails to generate, it is probably 

534 # because one of the following two branches failed. 

535 if use_strict: 

536 return self.generate_inner(schema['strict_schema']) 

537 else: 

538 return self.generate_inner(schema['lax_schema']) 

539 

540 def typed_dict_schema(self, schema: core_schema.TypedDictSchema) -> JsonSchemaValue: 

541 named_required_fields = [ 

542 (k, v['required'], v) for k, v in schema['fields'].items() # type: ignore # required is always populated 

543 ] 

544 return self._named_required_fields_schema(named_required_fields) 

545 

546 def _named_required_fields_schema( 

547 self, named_required_fields: Sequence[tuple[str, bool, core_schema.TypedDictField | core_schema.DataclassField]] 

548 ) -> JsonSchemaValue: 

549 properties: dict[str, JsonSchemaValue] = {} 

550 required_fields: list[str] = [] 

551 for name, required, field in named_required_fields: 

552 if self.by_alias: 

553 alias: Any = field.get('validation_alias', name) 

554 if isinstance(alias, str): 

555 name = alias 

556 elif isinstance(alias, list): 

557 alias = cast('list[str] | str', alias) 

558 for path in alias: 

559 if isinstance(path, list) and len(path) == 1 and isinstance(path[0], str): 

560 # Use the first valid single-item string path; the code that constructs the alias array 

561 # should ensure the first such item is what belongs in the JSON schema 

562 name = path[0] 

563 break 

564 field_json_schema = self.generate_inner(field).copy() 

565 if 'title' not in field_json_schema and self.field_title_should_be_set(field): 

566 title = self.get_title_from_name(name) 

567 field_json_schema['title'] = title 

568 field_json_schema = self.handle_ref_overrides(field_json_schema) 

569 properties[name] = field_json_schema 

570 if required: 

571 required_fields.append(name) 

572 

573 json_schema = {'type': 'object', 'properties': properties} 

574 if required_fields: 

575 json_schema['required'] = required_fields # type: ignore 

576 return json_schema 

577 

578 def typed_dict_field_schema(self, schema: core_schema.TypedDictField) -> JsonSchemaValue: 

579 return self.generate_inner(schema['schema']) 

580 

581 def dataclass_field_schema(self, schema: core_schema.DataclassField) -> JsonSchemaValue: 

582 return self.generate_inner(schema['schema']) 

583 

584 def model_schema(self, schema: core_schema.ModelSchema) -> JsonSchemaValue: 

585 # We do not use schema['model'].model_json_schema() because it could lead to inconsistent refs handling, etc. 

586 json_schema = self.generate_inner(schema['schema']) 

587 

588 if 'config' in schema: 

589 title = schema['config'].get('title') 

590 forbid_additional_properties = schema['config'].get('extra_fields_behavior') == 'forbid' 

591 json_schema = self._update_class_schema(json_schema, title, forbid_additional_properties) 

592 

593 return json_schema 

594 

595 def _update_class_schema( 

596 self, json_schema: JsonSchemaValue, title: str | None, forbid_additional_properties: bool 

597 ) -> JsonSchemaValue: 

598 if '$ref' in json_schema: 

599 schema_to_update = self.get_schema_from_definitions(JsonRef(json_schema['$ref'])) or json_schema 

600 else: 

601 schema_to_update = json_schema 

602 

603 if title is not None: 

604 # referenced_schema['title'] = title 

605 schema_to_update.setdefault('title', title) 

606 

607 if forbid_additional_properties: 

608 schema_to_update['additionalProperties'] = False 

609 

610 return json_schema 

611 

612 def dataclass_args_schema(self, schema: core_schema.DataclassArgsSchema) -> JsonSchemaValue: 

613 named_required_fields = [ 

614 (field['name'], field['schema']['type'] != 'default', field) for field in schema['fields'] 

615 ] 

616 return self._named_required_fields_schema(named_required_fields) 

617 

618 def dataclass_schema(self, schema: core_schema.DataclassSchema) -> JsonSchemaValue: 

619 # TODO: Better-share this logic with model_schema 

620 # I'd prefer to clean this up _after_ we rework the approach to customizing dataclass JSON schema though 

621 

622 json_schema = self.generate_inner(schema['schema']).copy() 

623 

624 cls = schema['cls'] 

625 config: ConfigDict = getattr(cls, '__pydantic_config__', cast('ConfigDict', {})) 

626 

627 title = config.get('title') or cls.__name__ 

628 forbid_additional_properties = config.get('extra') == 'forbid' 

629 json_schema = self._update_class_schema(json_schema, title, forbid_additional_properties) 

630 

631 # Dataclass-specific handling of description 

632 if is_dataclass(cls) and not hasattr(cls, '__pydantic_validator__'): 

633 # vanilla dataclass; don't use cls.__doc__ as it will contain the class signature by default 

634 description = None 

635 else: 

636 description = None if cls.__doc__ is None else inspect.cleandoc(cls.__doc__) 

637 if description: 

638 json_schema['description'] = description 

639 

640 return json_schema 

641 

642 def arguments_schema(self, schema: core_schema.ArgumentsSchema) -> JsonSchemaValue: 

643 metadata = _core_metadata.CoreMetadataHandler(schema).metadata 

644 prefer_positional = metadata.get('pydantic_js_prefer_positional_arguments') 

645 

646 arguments = schema['arguments_schema'] 

647 kw_only_arguments = [a for a in arguments if a.get('mode') == 'keyword_only'] 

648 kw_or_p_arguments = [a for a in arguments if a.get('mode') in {'positional_or_keyword', None}] 

649 p_only_arguments = [a for a in arguments if a.get('mode') == 'positional_only'] 

650 var_args_schema = schema.get('var_args_schema') 

651 var_kwargs_schema = schema.get('var_kwargs_schema') 

652 

653 if prefer_positional: 

654 positional_possible = not kw_only_arguments and not var_kwargs_schema 

655 if positional_possible: 

656 return self.p_arguments_schema(p_only_arguments + kw_or_p_arguments, var_args_schema) 

657 

658 keyword_possible = not p_only_arguments and not var_args_schema 

659 if keyword_possible: 

660 return self.kw_arguments_schema(kw_or_p_arguments + kw_only_arguments, var_kwargs_schema) 

661 

662 if not prefer_positional: 

663 positional_possible = not kw_only_arguments and not var_kwargs_schema 

664 if positional_possible: 

665 return self.p_arguments_schema(p_only_arguments + kw_or_p_arguments, var_args_schema) 

666 

667 raise PydanticInvalidForJsonSchema( 

668 'Unable to generate JSON schema for arguments validator with positional only and keyword only arguments' 

669 ) 

670 

671 def kw_arguments_schema( 

672 self, arguments: list[core_schema.ArgumentsParameter], var_kwargs_schema: CoreSchema | None 

673 ) -> JsonSchemaValue: 

674 properties: dict[str, JsonSchemaValue] = {} 

675 required: list[str] = [] 

676 for argument in arguments: 

677 name = self.get_argument_name(argument) 

678 argument_schema = self.generate_inner(argument['schema']).copy() 

679 argument_schema['title'] = self.get_title_from_name(name) 

680 properties[name] = argument_schema 

681 

682 if argument['schema']['type'] != 'default': 

683 # This assumes that if the argument has a default value, 

684 # the inner schema must be of type WithDefaultSchema. 

685 # I believe this is true, but I am not 100% sure 

686 required.append(name) 

687 

688 json_schema: JsonSchemaValue = {'type': 'object', 'properties': properties} 

689 if required: 

690 json_schema['required'] = required 

691 

692 if var_kwargs_schema: 

693 additional_properties_schema = self.generate_inner(var_kwargs_schema) 

694 if additional_properties_schema: 

695 json_schema['additionalProperties'] = additional_properties_schema 

696 else: 

697 json_schema['additionalProperties'] = False 

698 return json_schema 

699 

700 def p_arguments_schema( 

701 self, arguments: list[core_schema.ArgumentsParameter], var_args_schema: CoreSchema | None 

702 ) -> JsonSchemaValue: 

703 prefix_items: list[JsonSchemaValue] = [] 

704 min_items = 0 

705 

706 for argument in arguments: 

707 name = self.get_argument_name(argument) 

708 

709 argument_schema = self.generate_inner(argument['schema']).copy() 

710 argument_schema['title'] = self.get_title_from_name(name) 

711 prefix_items.append(argument_schema) 

712 

713 if argument['schema']['type'] != 'default': 

714 # This assumes that if the argument has a default value, 

715 # the inner schema must be of type WithDefaultSchema. 

716 # I believe this is true, but I am not 100% sure 

717 min_items += 1 

718 

719 json_schema: JsonSchemaValue = {'type': 'array', 'prefixItems': prefix_items} 

720 if min_items: 

721 json_schema['minItems'] = min_items 

722 

723 if var_args_schema: 

724 items_schema = self.generate_inner(var_args_schema) 

725 if items_schema: 

726 json_schema['items'] = items_schema 

727 else: 

728 json_schema['maxItems'] = len(prefix_items) 

729 

730 return json_schema 

731 

732 def get_argument_name(self, argument: core_schema.ArgumentsParameter) -> str: 

733 name = argument['name'] 

734 if self.by_alias: 

735 alias = argument.get('alias') 

736 if isinstance(alias, str): 

737 name = alias 

738 else: 

739 pass # might want to do something else? 

740 return name 

741 

742 def call_schema(self, schema: core_schema.CallSchema) -> JsonSchemaValue: 

743 return self.generate_inner(schema['arguments_schema']) 

744 

745 def custom_error_schema(self, schema: core_schema.CustomErrorSchema) -> JsonSchemaValue: 

746 return self.generate_inner(schema['schema']) 

747 

748 def json_schema(self, schema: core_schema.JsonSchema) -> JsonSchemaValue: 

749 # TODO: For v1 compatibility, we should probably be using `schema['schema']` to produce the schema. 

750 # This is a serialization vs. validation thing; see https://github.com/pydantic/pydantic/issues/5072 

751 # - 

752 # The behavior below is not currently consistent with the v1 behavior, so should probably be changed. 

753 # I think making it work like v1 should be as easy as handling schema['schema'] instead, with the note 

754 # that we'll need to make generics work with Json (there is a test for this in test_generics.py). 

755 return {'type': 'string', 'format': 'json-string'} 

756 

757 def url_schema(self, schema: core_schema.UrlSchema) -> JsonSchemaValue: 

758 json_schema = {'type': 'string', 'format': 'uri', 'minLength': 1} 

759 self.update_with_validations(json_schema, schema, self.ValidationsMapping.string) 

760 return json_schema 

761 

762 def multi_host_url_schema(self, schema: core_schema.MultiHostUrlSchema) -> JsonSchemaValue: 

763 # Note: 'multi-host-uri' is a custom/pydantic-specific format, not part of the JSON Schema spec 

764 json_schema = {'type': 'string', 'format': 'multi-host-uri', 'minLength': 1} 

765 self.update_with_validations(json_schema, schema, self.ValidationsMapping.string) 

766 return json_schema 

767 

768 def definitions_schema(self, schema: core_schema.DefinitionsSchema) -> JsonSchemaValue: 

769 for definition in schema['definitions']: 

770 self.generate_inner(definition) 

771 return self.generate_inner(schema['schema']) 

772 

773 def definition_ref_schema(self, schema: core_schema.DefinitionReferenceSchema) -> JsonSchemaValue: 

774 core_ref = CoreRef(schema['schema_ref']) 

775 _, ref_json_schema = self.get_cache_defs_ref_schema(core_ref) 

776 return ref_json_schema 

777 

778 # ### Utility methods 

779 

780 def get_title_from_name(self, name: str) -> str: 

781 return name.title().replace('_', ' ') 
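# Illustrative example (not part of the original file): get_title_from_name('created_at') -> 'Created At'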

782 

783 def field_title_should_be_set( 

784 self, schema: CoreSchema | core_schema.TypedDictField | core_schema.DataclassField 

785 ) -> bool: 

786 """ 

787 Returns true if a field with the given schema should have a title set based on the field name. 

788 

789 Intuitively, we want this to return true for schemas that wouldn't otherwise provide their own title 

790 (e.g., int, float, str), and false for those that would (e.g., BaseModel subclasses). 

791 """ 

792 if _core_utils.is_typed_dict_field(schema) or _core_utils.is_dataclass_field(schema): 

793 return self.field_title_should_be_set(schema['schema']) 

794 

795 elif _core_utils.is_core_schema(schema): 

796 if schema.get('ref'): # things with refs, such as models and enums, should not have titles set 

797 return False 

798 if schema['type'] in {'default', 'nullable', 'definitions'}: 

799 return self.field_title_should_be_set(schema['schema']) # type: ignore[typeddict-item] 

800 if _core_utils.is_function_with_inner_schema(schema): 

801 return self.field_title_should_be_set(schema['schema']) 

802 if schema['type'] == 'definition-ref': 

803 # Referenced schemas should not have titles set for the same reason 

804 # schemas with refs should not 

805 return False 

806 return True # anything else should have title set 

807 

808 else: 

809 raise PydanticInvalidForJsonSchema(f'Unexpected schema type: schema={schema}') 

810 

811 def normalize_name(self, name: str) -> str: 

812 return re.sub(r'[^a-zA-Z0-9.\-_]', '_', name).replace('.', '__') 
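# Illustrative examples (not part of the original file):
#     normalize_name('MyModel')       -> 'MyModel'
#     normalize_name('foo.Bar[int]')  -> 'foo__Bar_int_'
# (characters outside [a-zA-Z0-9.-_] become '_', then '.' becomes '__')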

813 

814 def get_defs_ref(self, core_ref: CoreRef) -> DefsRef: 

815 """ 

816 Override this method to change the way that definitions keys are generated from a core reference. 

817 """ 

818 # Split the core ref into "components"; generic origins and arguments are each separate components 

819 components = re.split(r'([\][,])', core_ref) 

820 # Remove IDs from each component 

821 components = [x.split(':')[0] for x in components] 

822 core_ref_no_id = ''.join(components) 

823 # Remove everything before the last period from each "component" 

824 components = [re.sub(r'(?:[^.[\]]+\.)+((?:[^.[\]]+))', r'\1', x) for x in components] 

825 short_ref = ''.join(components) 

826 

827 first_choice = DefsRef(self.normalize_name(short_ref)) # name 

828 second_choice = DefsRef(self.normalize_name(core_ref_no_id)) # module + qualname 

829 third_choice = DefsRef(self.normalize_name(core_ref)) # module + qualname + id 

830 

831 # It is important that the generated defs_ref values be such that at least one could not 

832 # be generated for any other core_ref. Currently, this should be the case because we include 

833 # the id of the source type in the core_ref, and therefore in the third_choice 

834 choices = [first_choice, second_choice, third_choice] 

835 self.defs_ref_fallbacks[core_ref] = choices[1:] 

836 

837 for choice in choices: 

838 if self.defs_to_core_refs.get(choice, core_ref) == core_ref: 

839 return choice 

840 else: 

841 self.collisions.add(choice) 

842 

843 return choices[-1] # should never get here if the final choice is guaranteed unique 
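# Illustrative example (not part of the original file): for a core_ref such as
# 'mymodule.MyModel:140210061878464', the candidate defs_refs would be roughly
#     'MyModel'                             (short name)
#     'mymodule__MyModel'                   (module + qualname)
#     'mymodule__MyModel_140210061878464'   (module + qualname + id)
# and the first candidate not already claimed by a different core_ref is used.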

844 

845 def resolve_collisions(self, json_schema: JsonSchemaValue) -> JsonSchemaValue: 

846 """ 

847 This function ensures that any defs_ref's that were involved in collisions 

848 (due to simplification of the core_ref) get updated, even if they were the 

849 first occurrence of the colliding defs_ref. 

850 

851 This is intended to prevent confusion where the type that gets the "shortened" 

852 ref depends on the order in which the types were visited. 

853 """ 

854 made_changes = True 

855 

856 # Note that because the defs ref choices eventually produce values that use the IDs and 

857 # should _never_ collide, it should not be possible for this while loop to run forever 

858 while made_changes: 

859 made_changes = False 

860 

861 for defs_ref, core_ref in self.defs_to_core_refs.items(): 

862 if defs_ref not in self.collisions: 

863 continue 

864 

865 for choice in self.defs_ref_fallbacks[core_ref]: 

866 if choice == defs_ref or choice in self.collisions: 

867 continue 

868 

869 if self.defs_to_core_refs.get(choice, core_ref) == core_ref: 

870 json_schema = self.change_defs_ref(defs_ref, choice, json_schema) 

871 made_changes = True 

872 break 

873 else: 

874 self.collisions.add(choice) 

875 

876 if made_changes: 

877 break 

878 

879 return json_schema 

880 

881 def change_defs_ref(self, old: DefsRef, new: DefsRef, json_schema: JsonSchemaValue) -> JsonSchemaValue: 

882 if new == old: 

883 return json_schema 

884 core_ref = self.defs_to_core_refs[old] 

885 old_json_ref = self.core_to_json_refs[core_ref] 

886 new_json_ref = JsonRef(self.ref_template.format(model=new)) 

887 

888 self.definitions[new] = self.definitions.pop(old) 

889 self.defs_to_core_refs[new] = self.defs_to_core_refs.pop(old) 

890 self.json_to_defs_refs[new_json_ref] = self.json_to_defs_refs.pop(old_json_ref) 

891 self.core_to_defs_refs[core_ref] = new 

892 self.core_to_json_refs[core_ref] = new_json_ref 

893 

894 def walk_replace_json_schema_ref(item: Any) -> Any: 

895 """ 

896 Recursively update the JSON schema to use the new defs_ref. 

897 """ 

898 if isinstance(item, list): 

899 return [walk_replace_json_schema_ref(item) for item in item] 

900 elif isinstance(item, dict): 

901 ref = item.get('$ref') 

902 if ref == old_json_ref: 

903 item['$ref'] = new_json_ref 

904 return {k: walk_replace_json_schema_ref(v) for k, v in item.items()} 

905 else: 

906 return item 

907 

908 return walk_replace_json_schema_ref(json_schema) 

909 

910 def get_cache_defs_ref_schema(self, core_ref: CoreRef) -> tuple[DefsRef, JsonSchemaValue]: 

911 """ 

912 This method wraps the get_defs_ref method with some cache-lookup/population logic, 

913 and returns both the produced defs_ref and the JSON schema that will refer to the right definition. 

914 """ 

915 maybe_defs_ref = self.core_to_defs_refs.get(core_ref) 

916 if maybe_defs_ref is not None: 

917 json_ref = self.core_to_json_refs[core_ref] 

918 return maybe_defs_ref, {'$ref': json_ref} 

919 

920 defs_ref = self.get_defs_ref(core_ref) 

921 

922 # populate the ref translation mappings 

923 self.core_to_defs_refs[core_ref] = defs_ref 

924 self.defs_to_core_refs[defs_ref] = core_ref 

925 

926 json_ref = JsonRef(self.ref_template.format(model=defs_ref)) 

927 self.core_to_json_refs[core_ref] = json_ref 

928 self.json_to_defs_refs[json_ref] = defs_ref 

929 ref_json_schema = {'$ref': json_ref} 

930 return defs_ref, ref_json_schema 

931 

932 def handle_ref_overrides(self, json_schema: JsonSchemaValue) -> JsonSchemaValue: 

933 """ 

934 It is not valid for a schema with a top-level $ref to have sibling keys. 

935 

936 During our own schema generation, we treat sibling keys as overrides to the referenced schema, 

937 but this is not how the official JSON schema spec works. 

938 

939 Because of this, we first remove any sibling keys that are redundant with the referenced schema, then if 

940 any remain, we transform the schema from a top-level '$ref' to use allOf to move the $ref out of the top level. 

941 (See bottom of https://swagger.io/docs/specification/using-ref/ for a reference about this behavior) 

942 """ 

943 if '$ref' in json_schema: 

944 # prevent modifications to the input; this copy may be safe to drop if there is significant overhead 

945 json_schema = json_schema.copy() 

946 

947 referenced_json_schema = self.get_schema_from_definitions(JsonRef(json_schema['$ref'])) 

948 if referenced_json_schema is None: 

949 # This can happen when building schemas for models with not-yet-defined references. 

950 # It may be a good idea to do a recursive pass at the end of the generation to remove 

951 # any redundant override keys. 

952 if len(json_schema) > 1: 

953 # Make it an allOf to at least resolve the sibling keys issue 

954 json_schema = json_schema.copy() 

955 json_schema.setdefault('allOf', []) 

956 json_schema['allOf'].append({'$ref': json_schema['$ref']}) 

957 del json_schema['$ref'] 

958 

959 return json_schema 

960 for k, v in list(json_schema.items()): 

961 if k == '$ref': 

962 continue 

963 if k in referenced_json_schema and referenced_json_schema[k] == v: 

964 del json_schema[k] # redundant key 

965 if len(json_schema) > 1: 

966 # There is a remaining "override" key, so we need to move $ref out of the top level 

967 json_ref = JsonRef(json_schema['$ref']) 

968 del json_schema['$ref'] 

969 assert 'allOf' not in json_schema # this should never happen, but just in case 

970 json_schema['allOf'] = [{'$ref': json_ref}] 

971 

972 return json_schema 
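# Illustrative example (not part of the original file): a sibling override such as
#     {'$ref': '#/$defs/Cat', 'description': 'A cat'}
# becomes
#     {'allOf': [{'$ref': '#/$defs/Cat'}], 'description': 'A cat'}
# while sibling keys whose values already match the referenced schema are simply dropped.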

973 

974 def get_schema_from_definitions(self, json_ref: JsonRef) -> JsonSchemaValue | None: 

975 return self.definitions.get(self.json_to_defs_refs[json_ref]) 

976 

977 def encode_default(self, dft: Any) -> Any: 

978 return pydantic_core.to_jsonable_python(dft) 

979 

980 def update_with_validations( 

981 self, json_schema: JsonSchemaValue, core_schema: CoreSchema, mapping: dict[str, str] 

982 ) -> None: 

983 """ 

984 Update the json_schema with the corresponding validations specified in the core_schema, 

985 using the provided mapping to translate keys in core_schema to the appropriate keys for a JSON schema. 

986 """ 

987 for core_key, json_schema_key in mapping.items(): 

988 if core_key in core_schema: 

989 json_schema[json_schema_key] = core_schema[core_key] # type: ignore[literal-required] 

990 

991 class ValidationsMapping: 

992 """ 

993 This class just contains mappings from core_schema attribute names to the corresponding 

994 JSON schema attribute names. While I suspect it is unlikely to be necessary, you can in 

995 principle override this class in a subclass of GenerateJsonSchema (by inheriting from 

996 GenerateJsonSchema.ValidationsMapping) to change these mappings. 

997 """ 

998 

999 numeric = { 

1000 'multiple_of': 'multipleOf', 

1001 'le': 'maximum', 

1002 'ge': 'minimum', 

1003 'lt': 'exclusiveMaximum', 

1004 'gt': 'exclusiveMinimum', 

1005 } 

1006 bytes = { 

1007 'min_length': 'minLength', 

1008 'max_length': 'maxLength', 

1009 } 

1010 string = { 

1011 'min_length': 'minLength', 

1012 'max_length': 'maxLength', 

1013 'pattern': 'pattern', 

1014 } 

1015 array = { 

1016 'min_length': 'minItems', 

1017 'max_length': 'maxItems', 

1018 } 

1019 object = { 

1020 'min_length': 'minProperties', 

1021 'max_length': 'maxProperties', 

1022 } 

1023 date = { 

1024 'le': 'maximum', 

1025 'ge': 'minimum', 

1026 'lt': 'exclusiveMaximum', 

1027 'gt': 'exclusiveMinimum', 

1028 } 

1029 

1030 def get_flattened_anyof(self, schemas: list[JsonSchemaValue]) -> JsonSchemaValue: 

1031 members = [] 

1032 for schema in schemas: 

1033 if len(schema) == 1 and 'anyOf' in schema: 

1034 members.extend(schema['anyOf']) 

1035 else: 

1036 members.append(schema) 

1037 members = _deduplicate_schemas(members) 

1038 if len(members) == 1: 

1039 return members[0] 

1040 return {'anyOf': members} 
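# Illustrative example (not part of the original file):
#     get_flattened_anyof([{'anyOf': [{'type': 'integer'}, {'type': 'string'}]}, {'type': 'null'}])
#     -> {'anyOf': [{'type': 'integer'}, {'type': 'string'}, {'type': 'null'}]}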

1041 

1042 def get_json_ref_counts(self, json_schema: JsonSchemaValue) -> dict[JsonRef, int]: 

1043 """ 

1044 Count how many times each value of the '$ref' key occurs anywhere in the json_schema 

1045 """ 

1046 json_refs: dict[JsonRef, int] = Counter() 

1047 

1048 def _add_json_refs(schema: Any) -> None: 

1049 if isinstance(schema, dict): 

1050 if '$ref' in schema: 

1051 json_ref = JsonRef(schema['$ref']) 

1052 already_visited = json_ref in json_refs 

1053 json_refs[json_ref] += 1 

1054 if already_visited: 

1055 return # prevent recursion on a definition that was already visited 

1056 _add_json_refs(self.definitions[self.json_to_defs_refs[json_ref]]) 

1057 for v in schema.values(): 

1058 _add_json_refs(v) 

1059 elif isinstance(schema, list): 

1060 for v in schema: 

1061 _add_json_refs(v) 

1062 

1063 _add_json_refs(json_schema) 

1064 return json_refs 

1065 

1066 def handle_invalid_for_json_schema( 

1067 self, schema: CoreSchema | core_schema.TypedDictField | core_schema.DataclassField, error_info: str 

1068 ) -> JsonSchemaValue: 

1069 if _core_metadata.CoreMetadataHandler(schema).metadata.get('pydantic_js_modify_function') is not None: 

1070 # Since there is a json schema modify function, assume that this type is meant to be handled, 

1071 # and the modify function will set all properties as appropriate 

1072 return {} 

1073 else: 

1074 raise PydanticInvalidForJsonSchema(f'Cannot generate a JsonSchema for {error_info}') 

1075 

1076 def emit_warning(self, kind: JsonSchemaWarningKind, detail: str) -> None: 

1077 """ 

1078 This method simply emits PydanticJsonSchemaWarnings based on handling in the `render_warning_message` method. 

1079 """ 

1080 message = self.render_warning_message(kind, detail) 

1081 if message is not None: 

1082 warnings.warn(message, PydanticJsonSchemaWarning) 

1083 

1084 def render_warning_message(self, kind: JsonSchemaWarningKind, detail: str) -> str | None: 

1085 """ 

1086 This method is responsible for ignoring warnings as desired, and for formatting the warning messages. 

1087 

1088 You can override the value of `ignored_warning_kinds` in a subclass of GenerateJsonSchema 

1089 to modify what warnings are generated. If you want more control, you can override this method; 

1090 just return None in situations where you don't want warnings to be emitted. 

1091 """ 

1092 if kind in self.ignored_warning_kinds: 

1093 return None 

1094 return f'{detail} [{kind}]' 

1095 

1096 

1097# ##### Start JSON Schema Generation Functions ##### 

1098# TODO: These should be moved to the pydantic.funcs module or whatever when appropriate. 

1099 

1100 

1101def models_json_schema( 

1102 models: Sequence[type[BaseModel] | type[PydanticDataclass]], 

1103 *, 

1104 by_alias: bool = True, 

1105 title: str | None = None, 

1106 description: str | None = None, 

1107 ref_template: str = DEFAULT_REF_TEMPLATE, 

1108 schema_generator: type[GenerateJsonSchema] = GenerateJsonSchema, 

1109) -> dict[str, Any]: 

1110 # TODO: Put this in the "methods" module once that is created? 

1111 instance = schema_generator(by_alias=by_alias, ref_template=ref_template) 

1112 definitions = instance.generate_definitions([x.__pydantic_core_schema__ for x in models]) 

1113 

1114 json_schema: dict[str, Any] = {} 

1115 if definitions: 

1116 json_schema['$defs'] = definitions 

1117 if title: 

1118 json_schema['title'] = title 

1119 if description: 

1120 json_schema['description'] = description 

1121 

1122 return json_schema 
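# Illustrative usage (not part of the original file), assuming hypothetical models Cat and Dog:
#
#     top_level = models_json_schema([Cat, Dog], title='Pets')
#     # roughly: {'$defs': {'Cat': {...}, 'Dog': {...}}, 'title': 'Pets'}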

1123 

1124 

1125# TODO: Consider removing this cache, as it already gets used pretty infrequently. 

1126 

1127if sys.version_info >= (3, 9): # Typing for weak dictionaries available at 3.9 

1128 _JsonSchemaCache = WeakKeyDictionary[Type[Any], Dict[Any, Any]] 

1129else: 

1130 _JsonSchemaCache = WeakKeyDictionary 

1131 

1132_JSON_SCHEMA_CACHE = _JsonSchemaCache() 

1133 

1134 

1135def model_json_schema( 

1136 cls: type[BaseModel] | type[PydanticDataclass], 

1137 by_alias: bool = True, 

1138 ref_template: str = DEFAULT_REF_TEMPLATE, 

1139 schema_generator: type[GenerateJsonSchema] = GenerateJsonSchema, 

1140) -> dict[str, Any]: 

1141 # TODO: Put this in the "methods" module once that is created 

1142 cls_json_schema_cache = _JSON_SCHEMA_CACHE.get(cls) 

1143 if cls_json_schema_cache is None: 

1144 _JSON_SCHEMA_CACHE[cls] = cls_json_schema_cache = {} 

1145 

1146 cached = cls_json_schema_cache.get((by_alias, ref_template, schema_generator)) 

1147 if cached is not None: 

1148 return cached 

1149 

1150 json_schema = schema_generator(by_alias=by_alias, ref_template=ref_template).generate(cls.__pydantic_core_schema__) 

1151 cls_json_schema_cache[(by_alias, ref_template, schema_generator)] = json_schema 

1152 

1153 return json_schema 
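# Illustrative usage (not part of the original file):
#
#     schema = model_json_schema(Cat, by_alias=True)
#
# Repeated calls with the same (by_alias, ref_template, schema_generator) arguments return the
# dict cached for that class in _JSON_SCHEMA_CACHE.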

1154 

1155 

1156# ##### End JSON Schema Generation Functions ##### 

1157 

1158 

1159_Json = Union[Dict[str, Any], List[Any], str, int, float, bool, None] 

1160_JsonDict = Dict[str, _Json] 

1161_HashableJson = Union[Tuple[Tuple[str, Any], ...], Tuple[Any, ...], str, int, float, bool, None] 

1162 

1163 

1164def _deduplicate_schemas(schemas: Iterable[_JsonDict]) -> list[_JsonDict]: 

1165 return list({_make_json_hashable(schema): schema for schema in schemas}.values()) 

1166 

1167 

1168def _make_json_hashable(value: _Json) -> _HashableJson: 

1169 if isinstance(value, dict): 

1170 return tuple(sorted((k, _make_json_hashable(v)) for k, v in value.items())) 

1171 elif isinstance(value, list): 

1172 return tuple(_make_json_hashable(v) for v in value) 

1173 else: 

1174 return value
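# Illustrative example (not part of the original file):
#     _deduplicate_schemas([{'type': 'integer'}, {'type': 'integer'}, {'type': 'string'}])
#     -> [{'type': 'integer'}, {'type': 'string'}]
# (schemas are converted to nested hashable tuples, with dict items sorted, so they can serve
# as dict keys; duplicates collapse while first-seen order is preserved)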