1import json
2import logging
3import typing as t
4
5import jsonschema
6from jsonschema import Draft4Validator, ValidationError, draft4_format_checker
7from starlette.types import Scope
8
9from connexion.exceptions import BadRequestProblem, NonConformingResponseBody
10from connexion.json_schema import (
11 Draft4RequestValidator,
12 Draft4ResponseValidator,
13 format_error_with_path,
14)
15from connexion.validators import (
16 AbstractRequestBodyValidator,
17 AbstractResponseBodyValidator,
18)
19
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
21
22
class JSONRequestBodyValidator(AbstractRequestBodyValidator):
    """Request body validator for json content types."""

    def __init__(
        self,
        *,
        schema: dict,
        required=False,
        nullable=False,
        encoding: str,
        strict_validation: bool,
        **kwargs,
    ) -> None:
        """Forward the validator configuration to the base class.

        :param schema: JSON schema the request body is validated against.
        :param required: Forwarded unchanged to the base class.
        :param nullable: When false, ``_validate`` rejects a ``None`` body.
        :param encoding: Forwarded to the base class; ``_parse`` decodes the
            raw body using ``self._encoding``.
        :param strict_validation: Forwarded unchanged to the base class.
        :param kwargs: Accepted for call-site compatibility and ignored; they
            are not forwarded to the base class.
        """
        super().__init__(
            schema=schema,
            required=required,
            nullable=nullable,
            encoding=encoding,
            strict_validation=strict_validation,
        )

    @property
    def _validator(self):
        """Return a ``Draft4RequestValidator`` for ``self._schema``.

        A new validator instance is built on every access.
        """
        return Draft4RequestValidator(
            self._schema, format_checker=draft4_format_checker
        )

    async def _parse(
        self, stream: t.AsyncGenerator[bytes, None], scope: Scope
    ) -> t.Any:
        """Read the whole stream, decode it and deserialize it as JSON.

        :param stream: Async generator yielding the raw body chunks.
        :param scope: Connection scope; not used by this implementation.
        :return: The deserialized body, or ``None`` when the body is empty.
        :raises BadRequestProblem: If the body cannot be decoded with the
            configured encoding or is not valid JSON.
        """
        bytes_body = b"".join([message async for message in stream])

        # The bytes come from the client, so a decoding failure is reported
        # the same way as malformed JSON instead of escaping as an unhandled
        # UnicodeDecodeError.
        try:
            body = bytes_body.decode(self._encoding)
        except UnicodeDecodeError as e:
            raise BadRequestProblem(
                detail=f"Request body could not be decoded as {self._encoding}: {e}"
            ) from e

        if not body:
            return None

        try:
            return json.loads(body)
        except json.decoder.JSONDecodeError as e:
            raise BadRequestProblem(detail=str(e)) from e

    def _validate(self, body: t.Any) -> t.Optional[dict]:
        """Validate the parsed body against the schema.

        :param body: The parsed request body, ``None`` when it was empty.
        :return: Whatever the validator's ``validate`` call returns.
        :raises BadRequestProblem: If the body is ``None`` while the validator
            is not nullable, or if schema validation fails.
        """
        if not self._nullable and body is None:
            raise BadRequestProblem("Request body must not be empty")
        try:
            return self._validator.validate(body)
        except ValidationError as exception:
            error_path_msg = format_error_with_path(exception=exception)
            logger.error(
                f"Validation error: {exception.message}{error_path_msg}",
                extra={"validator": "body"},
            )
            raise BadRequestProblem(
                detail=f"{exception.message}{error_path_msg}"
            ) from exception
76
77
class DefaultsJSONRequestBodyValidator(JSONRequestBodyValidator):
    """Request body validator for json content types which fills in default values. This Validator
    intercepts the body, makes changes to it, and replays it for the next ASGI application."""

    #: This validator might mutate the body.
    MUTABLE_VALIDATION = True

    @property
    def _validator(self):
        """Return a request validator that also inserts schema defaults.

        The extended validator class and the instance are rebuilt on every
        access.
        """
        validator_cls = self.extend_with_set_default(Draft4RequestValidator)
        return validator_cls(self._schema, format_checker=draft4_format_checker)

    # via https://python-jsonschema.readthedocs.io/
    @staticmethod
    def extend_with_set_default(validator_class):
        """Extend ``validator_class`` so ``properties`` also fills in defaults.

        :param validator_class: jsonschema validator class to extend; its
            ``VALIDATORS["properties"]`` entry is wrapped.
        :return: A new validator class whose ``properties`` keyword inserts
            each missing property that declares a ``default`` into the
            instance before running the original ``properties`` validation.
        """
        # Function-scope import so this block does not depend on a change to
        # the module's import section.
        import copy

        validate_properties = validator_class.VALIDATORS["properties"]

        def set_defaults(validator, properties, instance, schema):
            # Only objects can receive defaults. For any other instance type
            # (list, string, ...) leave the instance untouched so regular
            # validation reports the mismatch instead of an AttributeError.
            if validator.is_type(instance, "object"):
                for name, subschema in properties.items():
                    if "default" in subschema and name not in instance:
                        # Copy the default so the schema's own value is never
                        # shared with, or mutated through, a request body.
                        instance[name] = copy.deepcopy(subschema["default"])

            yield from validate_properties(validator, properties, instance, schema)

        return jsonschema.validators.extend(
            validator_class, {"properties": set_defaults}
        )
105
106
class JSONResponseBodyValidator(AbstractResponseBodyValidator):
    """Validator that checks json response bodies against the schema."""

    @property
    def validator(self) -> Draft4Validator:
        """Return a ``Draft4ResponseValidator`` built for ``self._schema``."""
        return Draft4ResponseValidator(
            self._schema,
            format_checker=draft4_format_checker,
        )

    def _parse(self, stream: t.Generator[bytes, None, None]) -> t.Any:
        """Join and decode the streamed chunks, then deserialize them as JSON.

        :param stream: Generator yielding the raw response body chunks.
        :return: The deserialized body, or ``None`` when the body is empty.
        :raises NonConformingResponseBody: If the body is not valid JSON.
        """
        raw = b"".join(stream)
        text = raw.decode(self._encoding)

        if text:
            try:
                return json.loads(text)
            except json.decoder.JSONDecodeError as decode_error:
                raise NonConformingResponseBody(str(decode_error))

        # An empty body carries no JSON document at all.
        return None

    def _validate(self, body: dict):
        """Validate the parsed response body against the schema.

        :param body: The parsed response body.
        :raises NonConformingResponseBody: If schema validation fails; the
            failure is logged before the exception is raised.
        """
        try:
            self.validator.validate(body)
        except ValidationError as err:
            path_suffix = format_error_with_path(exception=err)
            logger.error(
                f"Validation error: {err.message}{path_suffix}",
                extra={"validator": "body"},
            )
            raise NonConformingResponseBody(
                detail=f"Response body does not conform to specification. {err.message}{path_suffix}"
            )
139
140
class TextResponseBodyValidator(JSONResponseBodyValidator):
    """Response body validator that also accepts bodies which are not valid JSON."""

    def _parse(self, stream: t.Generator[bytes, None, None]) -> str:  # type: ignore
        """Decode the streamed body and deserialize it as JSON when possible.

        :param stream: Generator yielding the raw response body chunks.
        :return: The deserialized JSON value, or the decoded text unchanged
            when it is not valid JSON.
        """
        text = b"".join(stream).decode(self._encoding)

        try:
            parsed = json.loads(text)
        except json.decoder.JSONDecodeError:
            # Not JSON: hand back the plain decoded text.
            return text
        return parsed