Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/connexion/validators/abstract.py: 27%
78 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
1"""
2This module defines a Validator interface with base functionality that can be subclassed
3for custom validators provided to the RequestValidationMiddleware.
4"""
5import copy
6import json
7import typing as t
9from starlette.datastructures import Headers, MutableHeaders
10from starlette.types import Receive, Scope, Send
12from connexion.exceptions import BadRequestProblem
15class AbstractRequestBodyValidator:
16 """
17 Validator interface with base functionality that can be subclassed for custom validators.
19 .. note: Validators load the whole body into memory, which can be a problem for large payloads.
20 """
22 MUTABLE_VALIDATION = False
23 """
24 Whether mutations to the body during validation should be transmitted via the receive channel.
25 Note that this does not apply to the substitution of a missing body with the default body, which always
26 updates the receive channel.
27 """
28 MAX_MESSAGE_LENGTH = 256000
29 """Maximum message length that will be sent via the receive channel for mutated bodies."""
31 def __init__(
32 self,
33 *,
34 schema: dict,
35 required: bool = False,
36 nullable: bool = False,
37 encoding: str,
38 strict_validation: bool,
39 **kwargs,
40 ):
41 """
42 :param schema: Schema of operation to validate
43 :param required: Whether RequestBody is required
44 :param nullable: Whether RequestBody is nullable
45 :param encoding: Encoding of body (passed via Content-Type header)
46 :param kwargs: Additional arguments for subclasses
47 :param strict_validation: Whether to allow parameters not defined in the spec
48 """
49 self._schema = schema
50 self._nullable = nullable
51 self._required = required
52 self._encoding = encoding
53 self._strict_validation = strict_validation
55 async def _parse(
56 self, stream: t.AsyncGenerator[bytes, None], scope: Scope
57 ) -> t.Any:
58 """Parse the incoming stream."""
60 def _validate(self, body: t.Any) -> t.Optional[dict]:
61 """
62 Validate the parsed body.
64 :raises: :class:`connexion.exceptions.BadRequestProblem`
65 """
67 def _insert_body(self, receive: Receive, *, body: t.Any, scope: Scope) -> Receive:
68 """
69 Insert messages transmitting the body at the start of the `receive` channel.
71 This method updates the provided `scope` in place with the right `Content-Length` header.
72 """
73 if body is None:
74 return receive
76 bytes_body = json.dumps(body).encode(self._encoding)
78 # Update the content-length header
79 new_scope = copy.deepcopy(scope)
80 headers = MutableHeaders(scope=new_scope)
81 headers["content-length"] = str(len(bytes_body))
83 # Wrap in new receive channel
84 messages = (
85 {
86 "type": "http.request",
87 "body": bytes_body[i : i + self.MAX_MESSAGE_LENGTH],
88 "more_body": i + self.MAX_MESSAGE_LENGTH < len(bytes_body),
89 }
90 for i in range(0, len(bytes_body), self.MAX_MESSAGE_LENGTH)
91 )
93 receive = self._insert_messages(receive, messages=messages)
95 return receive
97 @staticmethod
98 def _insert_messages(
99 receive: Receive, *, messages: t.Iterable[t.MutableMapping[str, t.Any]]
100 ) -> Receive:
101 """Insert messages at the start of the `receive` channel."""
103 async def receive_() -> t.MutableMapping[str, t.Any]:
104 for message in messages:
105 return message
106 return await receive()
108 return receive_
110 async def wrap_receive(self, receive: Receive, *, scope: Scope) -> Receive:
111 """
112 Wrap the provided `receive` channel with request body validation.
114 This method updates the provided `scope` in place with the right `Content-Length` header.
115 """
116 # Handle missing bodies
117 headers = Headers(scope=scope)
118 if not int(headers.get("content-length", 0)):
119 body = self._schema.get("default")
120 if body is None and self._required:
121 raise BadRequestProblem("RequestBody is required")
122 # The default body is encoded as a `receive` channel to mimic an incoming body
123 receive = self._insert_body(receive, body=body, scope=scope)
125 # The receive channel is converted to a stream for convenient access
126 messages = []
128 async def stream() -> t.AsyncGenerator[bytes, None]:
129 more_body = True
130 while more_body:
131 message = await receive()
132 messages.append(message)
133 more_body = message.get("more_body", False)
134 yield message.get("body", b"")
135 yield b""
137 # The body is parsed and validated
138 body = await self._parse(stream(), scope=scope)
139 if not (body is None and self._nullable):
140 self._validate(body)
142 # If MUTABLE_VALIDATION is enabled, include any changes made during validation in the messages to send
143 if self.MUTABLE_VALIDATION:
144 # Include changes made during validation
145 receive = self._insert_body(receive, body=body, scope=scope)
146 else:
147 # Serialize original messages
148 receive = self._insert_messages(receive, messages=messages)
150 return receive
153class AbstractResponseBodyValidator:
154 """
155 Validator interface with base functionality that can be subclassed for custom validators.
157 .. note: Validators load the whole body into memory, which can be a problem for large payloads.
158 """
160 def __init__(
161 self,
162 scope: Scope,
163 *,
164 schema: dict,
165 nullable: bool = False,
166 encoding: str,
167 ) -> None:
168 self._scope = scope
169 self._schema = schema
170 self._nullable = nullable
171 self._encoding = encoding
173 def _parse(self, stream: t.Generator[bytes, None, None]) -> t.Any:
174 """Parse the incoming stream."""
176 def _validate(self, body: t.Any) -> t.Optional[dict]:
177 """
178 Validate the body.
180 :raises: :class:`connexion.exceptions.NonConformingResponse`
181 """
183 def wrap_send(self, send: Send) -> Send:
184 """Wrap the provided send channel with response body validation"""
186 messages = []
188 async def send_(message: t.MutableMapping[str, t.Any]) -> None:
189 messages.append(message)
191 if message["type"] == "http.response.start" or message.get(
192 "more_body", False
193 ):
194 return
196 stream = (message.get("body", b"") for message in messages)
197 body = self._parse(stream)
199 if not (body is None and self._nullable):
200 self._validate(body)
202 while messages:
203 await send(messages.pop(0))
205 return send_