1import abc
2import collections.abc
3import functools
4import logging
5import types
6import typing as t
7from enum import Enum
8
9from connexion import utils
10from connexion.context import operation
11from connexion.datastructures import NoContent
12from connexion.exceptions import NonConformingResponseHeaders
13from connexion.frameworks.abstract import Framework
14from connexion.lifecycle import ConnexionResponse
15
16logger = logging.getLogger(__name__)
17
18
class BaseResponseDecorator:
    """Base for decorators that convert a handler's return value into a framework response.

    Subclasses implement ``__call__`` to wrap the handler function and can use
    ``build_framework_response`` for return values that are not response objects yet.

    NOTE(review): the class does not derive from ``abc.ABC``, so ``@abc.abstractmethod``
    on ``__call__`` is not enforced at instantiation time; the ``NotImplementedError``
    is the effective guard.
    """

    def __init__(self, *, framework: "t.Type[Framework]", jsonifier):
        """
        :param framework: Framework class used to recognize and build responses.
        :param jsonifier: Object whose ``dumps`` method serializes JSON response bodies.
        """
        self.framework = framework
        self.jsonifier = jsonifier

    @abc.abstractmethod
    def __call__(self, function: t.Callable) -> t.Callable:
        """Wrap ``function``; must be implemented by subclasses."""
        raise NotImplementedError

    def build_framework_response(self, handler_response):
        """Build a framework response from a plain handler return value.

        :param handler_response: Data, or a tuple of the form ``(body,)``,
            ``(body, status)``, ``(body, headers)`` or ``(body, status, headers)``.

        :return: Whatever ``framework.build_response`` returns.
        """
        data, status_code, headers = self._unpack_handler_response(handler_response)
        content_type = self._infer_content_type(data, headers)
        # The body of a tuple may itself be a framework response; leave it untouched.
        if not self.framework.is_framework_response(data):
            data = self._serialize_data(data, content_type=content_type)
        # Inferred after serialization so that NoContent (serialized to None) yields 204.
        status_code = status_code or self._infer_status_code(data)
        headers = self._update_headers(headers, content_type=content_type)
        return self.framework.build_response(
            data, content_type=content_type, status_code=status_code, headers=headers
        )

    @staticmethod
    def _infer_content_type(data: t.Any, headers: dict) -> t.Optional[str]:
        """Infer the response content type from the returned data, headers and operation spec.

        :param data: Response data
        :param headers: Headers returned by the handler.

        :return: Inferred content type

        :raises: NonConformingResponseHeaders if the content type cannot be deduced, or if
            the content type set by the handler is not defined in the operation spec.
        """
        content_type = utils.extract_content_type(headers)

        # TODO: don't default
        # dict.fromkeys removes duplicates while keeping the order of the spec, so the
        # selection below is deterministic (a set would make it depend on hash order).
        produces = list(dict.fromkeys(operation.produces))
        if data is not None and not produces:
            produces = ["application/json"]

        if content_type:
            # The handler chose a content type explicitly; it must be one the spec allows.
            if content_type not in produces:
                raise NonConformingResponseHeaders(
                    f"Returned content type ({content_type}) is not defined in operation spec "
                    f"({operation.produces})."
                )
            return content_type

        if not produces:
            # Produces can be empty for empty responses
            return content_type

        if len(produces) == 1:
            return produces[0]

        # Several candidates: pick one based on the Python type of the data.
        wanted = None
        if isinstance(data, str):
            wanted = "text/plain"
        elif isinstance(
            data, (bytes, types.GeneratorType, collections.abc.Iterator)
        ):
            wanted = "application/octet-stream"

        if wanted is not None:
            # First entry in spec order that mentions the wanted mime type.
            content_type = next(
                (candidate for candidate in produces if wanted in candidate),
                content_type,
            )

        if content_type is None:
            raise NonConformingResponseHeaders(
                "Multiple response content types are defined in the operation spec, but "
                "the handler response did not specify which one to return."
            )

        return content_type

    def _serialize_data(self, data: t.Any, *, content_type: str) -> t.Any:
        """Serialize the data based on the content type.

        ``None`` and ``NoContent`` become ``None``; data for a JSON mime type is dumped
        with the jsonifier; anything else is returned unchanged.
        """
        if data is None or data is NoContent:
            return None
        # TODO: encode responses
        mime_type, _ = utils.split_content_type(content_type)
        if utils.is_json_mimetype(mime_type):
            return self.jsonifier.dumps(data)
        return data

    @staticmethod
    def _infer_status_code(data: t.Any) -> int:
        """Infer the status code from the returned data: 204 without data, 200 otherwise."""
        if data is None:
            return 204
        return 200

    @staticmethod
    def _update_headers(
        headers: t.Dict[str, str], *, content_type: t.Optional[str]
    ) -> t.Dict[str, str]:
        """Add a ``Content-Type`` header unless the headers already contain one.

        NOTE: ``headers`` is updated in place and the same object is returned.

        :param headers: Headers returned by the handler.
        :param content_type: Content type to set; nothing is added if it is empty.

        :return: The headers, including the content type if one was added.
        """
        # Check if Content-Type is in headers, taking into account case-insensitivity.
        # .items() is kept on purpose: it yields keys for any mapping-like headers type.
        if any(key.lower() == "content-type" for key, _ in headers.items()):
            return headers

        if content_type:
            headers["Content-Type"] = content_type
        return headers

    @staticmethod
    def _normalize_status_code(status_code: t.Any) -> t.Any:
        """Convert int subclasses and int-valued enums to their integer value.

        Any other value (including ``None``) is returned unchanged.
        """
        if isinstance(status_code, int):
            # Extra int call because of int subclasses such as http.HTTPStatus (IntEnum)
            return int(status_code)
        if isinstance(status_code, Enum) and isinstance(status_code.value, int):
            return status_code.value
        return status_code

    @staticmethod
    def _unpack_handler_response(
        handler_response: t.Union[str, bytes, dict, list, tuple]
    ) -> t.Tuple[t.Union[str, bytes, dict, list, None], t.Optional[int], dict]:
        """Unpack the handler response into data, status_code and headers.

        Status codes are normalized in the same way for two- and three-element tuples,
        and a ``None`` headers element is replaced by an empty dict.

        :param handler_response: The response returned from the handler function if it was not a
            response class.

        :return: A tuple of data, status_code and headers

        :raises: TypeError if the tuple is empty or has more than three elements.
        """
        data, status_code, headers = None, None, {}

        if not isinstance(handler_response, tuple):
            data = handler_response

        elif len(handler_response) == 1:
            (data,) = handler_response

        elif len(handler_response) == 2:
            data, status_code_or_headers = handler_response
            normalized = BaseResponseDecorator._normalize_status_code(
                status_code_or_headers
            )
            # Only an int (or int-valued enum) counts as a status; anything else is headers.
            if isinstance(normalized, int):
                status_code = normalized
            else:
                headers = status_code_or_headers

        elif len(handler_response) == 3:
            data, status_code, headers = handler_response
            status_code = BaseResponseDecorator._normalize_status_code(status_code)
            if headers is None:
                headers = {}

        else:
            raise TypeError(
                "The view function did not return a valid response tuple."
                " The tuple must have the form (body), (body, status, headers),"
                " (body, status), or (body, headers)."
            )

        return data, status_code, headers
161
162
class SyncResponseDecorator(BaseResponseDecorator):
    """Response decorator for handler functions that are called synchronously."""

    def __call__(self, function: t.Callable) -> t.Callable:
        @functools.wraps(function)
        def wrapper(*args, **kwargs):
            """
            Call the handler and return its result as a framework response.
            The result may be a framework response, a ConnexionResponse, a tuple or a
            plain object.
            """
            result = function(*args, **kwargs)

            # Already a native framework response: hand it back untouched.
            if self.framework.is_framework_response(result):
                return result

            # Connexion's own response type has a dedicated conversion.
            if isinstance(result, ConnexionResponse):
                return self.framework.connexion_to_framework_response(result)

            # Anything else (tuple or plain data) is unpacked and serialized.
            return self.build_framework_response(result)

        return wrapper
181
182
class AsyncResponseDecorator(BaseResponseDecorator):
    """Response decorator for handler functions whose result must be awaited."""

    def __call__(self, function: t.Callable) -> t.Callable:
        @functools.wraps(function)
        async def wrapper(*args, **kwargs):
            """
            Await the handler and return its result as a framework response.
            The result may be a framework response, a ConnexionResponse, a tuple or a
            plain object.
            """
            result = await function(*args, **kwargs)

            # Already a native framework response: hand it back untouched.
            if self.framework.is_framework_response(result):
                return result

            # Connexion's own response type has a dedicated conversion.
            if isinstance(result, ConnexionResponse):
                return self.framework.connexion_to_framework_response(result)

            # Anything else (tuple or plain data) is unpacked and serialized.
            return self.build_framework_response(result)

        return wrapper
201
202
class NoResponseDecorator(BaseResponseDecorator):
    """Pass-through decorator: the handler's return value is not serialized or converted."""

    def __call__(self, function: t.Callable) -> t.Callable:
        # The wrapper accepts exactly one argument (the request) and forwards it as-is.
        passthrough = lambda request: function(request)  # noqa: E731
        return passthrough