1import json
2import logging
3import typing as t
4
5import jsonschema
6from jsonschema import Draft4Validator, ValidationError
7from starlette.types import Scope
8
9from connexion.exceptions import BadRequestProblem, NonConformingResponseBody
10from connexion.json_schema import (
11 Draft4RequestValidator,
12 Draft4ResponseValidator,
13 format_error_with_path,
14)
15from connexion.validators import (
16 AbstractRequestBodyValidator,
17 AbstractResponseBodyValidator,
18)
19
20logger = logging.getLogger(__name__)
21
22
class JSONRequestBodyValidator(AbstractRequestBodyValidator):
    """Request body validator for json content types.

    The streamed request body is decoded with the configured encoding, parsed
    as JSON and validated against the operation schema using
    ``Draft4RequestValidator``.
    """

    def __init__(
        self,
        *,
        schema: dict,
        required=False,
        nullable=False,
        encoding: str,
        strict_validation: bool,
        **kwargs,
    ) -> None:
        """Forward the validator configuration to the abstract base class.

        :param schema: JSON schema the request body is validated against.
        :param required: Forwarded unchanged to the base class.
        :param nullable: Whether an empty (``None``) body is accepted by
            :meth:`_validate`.
        :param encoding: Codec name used to decode the raw body bytes.
        :param strict_validation: Forwarded unchanged to the base class.
        :param kwargs: Accepted for signature compatibility and ignored; they
            are NOT forwarded to the base class.
        """
        super().__init__(
            schema=schema,
            required=required,
            nullable=nullable,
            encoding=encoding,
            strict_validation=strict_validation,
        )

    @property
    def _validator(self):
        """Return a new ``Draft4RequestValidator`` bound to the schema.

        A fresh validator instance is built on every access.
        """
        return Draft4RequestValidator(
            self._schema, format_checker=Draft4Validator.FORMAT_CHECKER
        )

    async def _parse(
        self, stream: t.AsyncGenerator[bytes, None], scope: Scope
    ) -> t.Any:
        """Read the whole stream and parse it as JSON.

        :param stream: Async generator yielding the body in byte chunks.
        :param scope: ASGI scope of the request (unused here).
        :return: The parsed JSON value, or ``None`` when the body is empty.
        :raises BadRequestProblem: If the bytes cannot be decoded with the
            configured encoding or the text is not valid JSON.
        """
        bytes_body = b"".join([message async for message in stream])

        try:
            body = bytes_body.decode(self._encoding)
        except UnicodeDecodeError as e:
            # Bytes that are invalid for the declared encoding are a client
            # error, not a server failure.
            raise BadRequestProblem(detail=str(e)) from e

        if not body:
            return None

        try:
            return json.loads(body)
        except json.decoder.JSONDecodeError as e:
            raise BadRequestProblem(detail=str(e)) from e

    def _validate(self, body: t.Any) -> t.Optional[dict]:
        """Validate the parsed body against the schema.

        :param body: Parsed request body, ``None`` when the request was empty.
        :return: Whatever the underlying validator's ``validate`` returns.
        :raises BadRequestProblem: If the body is ``None`` while the validator
            is not nullable, or if schema validation fails.
        """
        if not self._nullable and body is None:
            raise BadRequestProblem("Request body must not be empty")
        try:
            return self._validator.validate(body)
        except ValidationError as exception:
            # Build the message once so the log line and the problem detail
            # cannot drift apart.
            message = f"{exception.message}{format_error_with_path(exception=exception)}"
            logger.info(
                "Validation error: %s",
                message,
                extra={"validator": "body"},
            )
            raise BadRequestProblem(detail=message) from exception
76
77
class DefaultsJSONRequestBodyValidator(JSONRequestBodyValidator):
    """Request body validator for json content types which fills in default values. This Validator
    intercepts the body, makes changes to it, and replays it for the next ASGI application.
    """

    # This validator might mutate the body.
    MUTABLE_VALIDATION = True

    @property
    def _validator(self):
        """Return a default-filling ``Draft4RequestValidator`` for the schema.

        The extended validator class and the instance are rebuilt on every
        access.
        """
        validator_cls = self.extend_with_set_default(Draft4RequestValidator)
        return validator_cls(
            self._schema, format_checker=Draft4Validator.FORMAT_CHECKER
        )

    # via https://python-jsonschema.readthedocs.io/
    @staticmethod
    def extend_with_set_default(validator_class):
        """Extend ``validator_class`` so ``properties`` also injects defaults.

        :param validator_class: jsonschema validator class to extend; it must
            expose a ``"properties"`` entry in ``VALIDATORS``.
        :return: A new validator class whose ``properties`` keyword first
            writes missing defaults into object instances and then runs the
            original ``properties`` validation.
        """
        # Imported locally so this block does not depend on the module header.
        import copy

        validate_properties = validator_class.VALIDATORS["properties"]

        def set_defaults(validator, properties, instance, schema):
            # Only objects can receive defaults; for any other instance type
            # (list, string, ...) fall through so the regular keywords report
            # the mismatch instead of failing with AttributeError here.
            if validator.is_type(instance, "object"):
                for name, subschema in properties.items():
                    if not isinstance(subschema, dict) or "default" not in subschema:
                        continue
                    if name not in instance:
                        # Copy so the body never aliases the schema's default;
                        # mutating the body must not alter the schema.
                        instance[name] = copy.deepcopy(subschema["default"])

            yield from validate_properties(validator, properties, instance, schema)

        return jsonschema.validators.extend(
            validator_class, {"properties": set_defaults}
        )
108
109
class JSONResponseBodyValidator(AbstractResponseBodyValidator):
    """Response body validator for json content types.

    The response body is decoded with the configured encoding, parsed as JSON
    and validated against the schema using ``Draft4ResponseValidator``.
    """

    @property
    def validator(self) -> Draft4Validator:
        """Return a new ``Draft4ResponseValidator`` bound to the schema.

        A fresh validator instance is built on every access.
        """
        return Draft4ResponseValidator(
            self._schema, format_checker=Draft4Validator.FORMAT_CHECKER
        )

    def _parse(self, stream: t.Generator[bytes, None, None]) -> t.Any:
        """Join the byte chunks of the response and parse them as JSON.

        :param stream: Generator yielding the response body in byte chunks.
        :return: The parsed JSON value, or ``None`` when the body is empty.
        :raises NonConformingResponseBody: If the bytes cannot be decoded with
            the configured encoding or the text is not valid JSON.
        """
        try:
            body = b"".join(stream).decode(self._encoding)
        except UnicodeDecodeError as e:
            # An undecodable body is reported the same way as unparsable JSON
            # rather than leaking a raw decoding error.
            raise NonConformingResponseBody(str(e)) from e

        if not body:
            return None

        try:
            return json.loads(body)
        except json.decoder.JSONDecodeError as e:
            raise NonConformingResponseBody(str(e)) from e

    def _validate(self, body: dict):
        """Validate the parsed response body against the schema.

        :param body: Parsed response body.
        :raises NonConformingResponseBody: If schema validation fails.
        """
        try:
            self.validator.validate(body)
        except ValidationError as exception:
            # Build the message once so the log line and the problem detail
            # cannot drift apart.
            message = f"{exception.message}{format_error_with_path(exception=exception)}"
            logger.warning(
                "Validation error: %s",
                message,
                extra={"validator": "body"},
            )
            raise NonConformingResponseBody(
                detail=f"Response body does not conform to specification. {message}"
            ) from exception
142
143
class TextResponseBodyValidator(JSONResponseBodyValidator):
    """Response body validator that accepts JSON or plain text bodies.

    Parsing tries JSON first; a body that is not valid JSON is passed on to
    validation as the decoded string itself.
    """

    def _parse(self, stream: t.Generator[bytes, None, None]) -> str:  # type: ignore
        """Decode the response body and return it parsed as JSON if possible.

        :param stream: Generator yielding the response body in byte chunks.
        :return: The value produced by ``json.loads`` when the text is valid
            JSON, otherwise the decoded text unchanged.
        """
        raw = b"".join(stream)
        text = raw.decode(self._encoding)

        try:
            parsed = json.loads(text)
        except json.JSONDecodeError:
            # Not JSON: hand the plain text to validation as-is.
            return text
        return parsed