1import json
2import logging
3import typing as t
4
5import jsonschema
6from jsonschema import Draft4Validator, ValidationError, draft4_format_checker
7from starlette.types import Scope
8
9from connexion.exceptions import BadRequestProblem, NonConformingResponseBody
10from connexion.json_schema import (
11 Draft4RequestValidator,
12 Draft4ResponseValidator,
13 format_error_with_path,
14)
15from connexion.validators import (
16 AbstractRequestBodyValidator,
17 AbstractResponseBodyValidator,
18)
19
# Module-level logger named after this module; used by the validators below
# to report schema validation failures.
logger = logging.getLogger(__name__)
21
22
class JSONRequestBodyValidator(AbstractRequestBodyValidator):
    """Request body validator for json content types."""

    def __init__(
        self,
        *,
        schema: dict,
        required=False,
        nullable=False,
        encoding: str,
        strict_validation: bool,
        **kwargs,
    ) -> None:
        """Forward the validation settings to the base validator.

        :param schema: JSON schema the request body is validated against.
        :param required: Forwarded unchanged to the base class.
        :param nullable: Forwarded unchanged to the base class.
        :param encoding: Forwarded unchanged to the base class.
        :param strict_validation: Forwarded unchanged to the base class.
        :param kwargs: Accepted so callers may pass extra keyword arguments;
            they are ignored and NOT forwarded to the base class.
        """
        super().__init__(
            schema=schema,
            required=required,
            nullable=nullable,
            encoding=encoding,
            strict_validation=strict_validation,
        )

    @property
    def _validator(self):
        """Return a new ``Draft4RequestValidator`` for ``self._schema``.

        A fresh validator is built on every access, with the Draft 4 format
        checker enabled.
        """
        return Draft4RequestValidator(
            self._schema, format_checker=draft4_format_checker
        )

    async def _parse(
        self, stream: t.AsyncGenerator[bytes, None], scope: Scope
    ) -> t.Any:
        """Read the whole request stream and decode it as JSON.

        :param stream: Async generator yielding the raw body chunks.
        :param scope: Not used by this implementation.
        :return: The deserialized JSON value, or ``None`` for an empty body.
        :raises BadRequestProblem: If the bytes cannot be decoded with
            ``self._encoding`` or the decoded text is not valid JSON.
        """
        bytes_body = b"".join([message async for message in stream])

        # Undecodable bytes are a client error, exactly like malformed JSON;
        # report them the same way instead of letting the error escape.
        try:
            body = bytes_body.decode(self._encoding)
        except UnicodeDecodeError as e:
            raise BadRequestProblem(detail=str(e)) from e

        if not body:
            return None

        try:
            return json.loads(body)
        except json.decoder.JSONDecodeError as e:
            raise BadRequestProblem(detail=str(e)) from e

    def _validate(self, body: t.Any) -> t.Optional[dict]:
        """Validate the parsed body against the schema.

        :param body: The value returned by :meth:`_parse`.
        :return: Whatever the underlying validator's ``validate`` returns.
        :raises BadRequestProblem: If the body is ``None`` while
            ``self._nullable`` is falsy, or if schema validation fails.
        """
        if not self._nullable and body is None:
            raise BadRequestProblem("Request body must not be empty")
        try:
            return self._validator.validate(body)
        except ValidationError as exception:
            # Suffix describing where in the document the error occurred.
            error_path_msg = format_error_with_path(exception=exception)
            logger.info(
                f"Validation error: {exception.message}{error_path_msg}",
                extra={"validator": "body"},
            )
            raise BadRequestProblem(
                detail=f"{exception.message}{error_path_msg}"
            ) from exception
76
77
class DefaultsJSONRequestBodyValidator(JSONRequestBodyValidator):
    """Request body validator for json content types which fills in default values. This Validator
    intercepts the body, makes changes to it, and replays it for the next ASGI application.
    """

    MUTABLE_VALIDATION = True
    """This validator might mutate the body."""

    @property
    def _validator(self):
        """Return a new default-filling ``Draft4RequestValidator`` for ``self._schema``.

        The validator class is extended via :meth:`extend_with_set_default`
        on every access, with the Draft 4 format checker enabled.
        """
        validator_cls = self.extend_with_set_default(Draft4RequestValidator)
        return validator_cls(self._schema, format_checker=draft4_format_checker)

    # via https://python-jsonschema.readthedocs.io/
    @staticmethod
    def extend_with_set_default(validator_class):
        """Extend ``validator_class`` so that validating fills in defaults.

        The returned class overrides the ``properties`` keyword: before
        delegating to the original ``properties`` check, every property whose
        subschema declares a ``default`` and which is missing from the
        instance is added to the instance.

        :param validator_class: A jsonschema validator class exposing a
            ``VALIDATORS`` mapping with a ``"properties"`` entry.
        :return: The extended validator class.
        """
        # Function-scope import keeps this block self-contained.
        import copy

        validate_properties = validator_class.VALIDATORS["properties"]

        def set_defaults(validator, properties, instance, schema):
            # Only JSON objects can receive defaults. Other instance types
            # (list, str, number, ...) have no ``setdefault`` and must be left
            # for the regular keyword checks to reject.
            if isinstance(instance, dict):
                for name, subschema in properties.items():
                    if isinstance(subschema, dict) and "default" in subschema:
                        # Copy so later mutation of the body can never alter
                        # the default stored in the schema.
                        instance.setdefault(
                            name, copy.deepcopy(subschema["default"])
                        )

            yield from validate_properties(validator, properties, instance, schema)

        return jsonschema.validators.extend(
            validator_class, {"properties": set_defaults}
        )
106
107
class JSONResponseBodyValidator(AbstractResponseBodyValidator):
    """Response body validator for json content types."""

    @property
    def validator(self) -> Draft4Validator:
        """Return a new ``Draft4ResponseValidator`` for ``self._schema``.

        A fresh validator is built on every access, with the Draft 4 format
        checker enabled.
        """
        return Draft4ResponseValidator(
            self._schema,
            format_checker=draft4_format_checker,
        )

    def _parse(self, stream: t.Generator[bytes, None, None]) -> t.Any:
        """Join the response chunks, decode them and deserialize as JSON.

        :param stream: Generator yielding the raw body chunks.
        :return: The deserialized JSON value, or ``None`` for an empty body.
        :raises NonConformingResponseBody: If the text is not valid JSON.
        """
        raw = b"".join(stream)
        text = raw.decode(self._encoding)

        if text:
            try:
                return json.loads(text)
            except json.decoder.JSONDecodeError as e:
                raise NonConformingResponseBody(str(e))

        # Empty body: nothing to deserialize.
        return None

    def _validate(self, body: dict):
        """Validate the parsed response body against the schema.

        :param body: The value returned by :meth:`_parse`.
        :raises NonConformingResponseBody: If schema validation fails; the
            failure is also logged at warning level.
        """
        try:
            checker = self.validator
            checker.validate(body)
        except ValidationError as exception:
            # Suffix describing where in the document the error occurred.
            error_path_msg = format_error_with_path(exception=exception)
            log_extra = {"validator": "body"}
            logger.warning(
                f"Validation error: {exception.message}{error_path_msg}",
                extra=log_extra,
            )
            raise NonConformingResponseBody(
                detail=f"Response body does not conform to specification. {exception.message}{error_path_msg}"
            )
140
141
class TextResponseBodyValidator(JSONResponseBodyValidator):
    """Response body validator that falls back to the raw text.

    The body is deserialized as JSON when possible; otherwise the decoded
    text itself is returned.
    """

    def _parse(self, stream: t.Generator[bytes, None, None]) -> str:  # type: ignore
        """Join and decode the response chunks, parsing them as JSON if valid.

        :param stream: Generator yielding the raw body chunks.
        :return: The deserialized JSON value, or the decoded text when it is
            not valid JSON.
        """
        decoded = b"".join(stream).decode(self._encoding)

        try:
            parsed = json.loads(decoded)
        except json.decoder.JSONDecodeError:
            # Not JSON: hand back the plain text unchanged.
            return decoded
        else:
            return parsed