1"""
2This module defines a Swagger2Operation class, a Connexion operation specific for Swagger 2 specs.
3"""
4
5import logging
6import typing as t
7from http import HTTPStatus
8
9from connexion.datastructures import NoContent
10from connexion.exceptions import InvalidSpecification
11from connexion.operations.abstract import AbstractOperation
12from connexion.uri_parsing import Swagger2URIParser
13from connexion.utils import build_example_from_schema, deep_get
14
logger = logging.getLogger("connexion.operations.swagger2")


# Lookup table keyed by the ``collectionFormat`` of a Swagger 2 array parameter.
# Each value is the ``style`` / ``explode`` pair stored as that parameter's
# encoding when form parameters are translated for OpenAPI 3.
COLLECTION_FORMAT_MAPPING = {
    collection_format: {"style": style, "explode": explode}
    for collection_format, style, explode in (
        ("multi", "form", True),
        ("csv", "form", False),
        ("ssv", "spaceDelimited", False),
        ("pipes", "pipeDelimited", False),
    )
}
24
25
class Swagger2Operation(AbstractOperation):
    """
    Exposes a Swagger 2.0 operation under the AbstractOperation interface.
    The primary purpose of this class is to provide the `function()` method
    to the API. A Swagger2Operation is plugged into the API with the provided
    (path, method) pair. It resolves the handler function for this operation
    with the provided resolver, and wraps the handler function with multiple
    decorators that provide security, validation, serialization,
    and deserialization.
    """

    def __init__(
        self,
        method,
        path,
        operation,
        resolver,
        app_produces,
        app_consumes,
        path_parameters=None,
        app_security=None,
        security_schemes=None,
        definitions=None,
        randomize_endpoint=None,
        uri_parser_class=None,
    ):
        """
        :param method: HTTP method
        :type method: str
        :param path: relative path to this operation
        :type path: str
        :param operation: swagger operation object
        :type operation: dict
        :param resolver: Callable that maps operationID to a function
        :type resolver: resolver.Resolver
        :param app_produces: list of content types the application can return by default
        :type app_produces: list
        :param app_consumes: list of content types the application consumes by default
        :type app_consumes: list
        :param path_parameters: Parameters defined in the path level
        :type path_parameters: list
        :param app_security: list of security rules the application uses by default
        :type app_security: list
        :param security_schemes: `Security Definitions Object
            <https://github.com/swagger-api/swagger-spec/blob/master/versions/2.0.md#security-definitions-object>`_
        :type security_schemes: dict
        :param definitions: `Definitions Object
            <https://github.com/swagger-api/swagger-spec/blob/master/versions/2.0.md#definitionsObject>`_
        :type definitions: dict
        :param randomize_endpoint: number of random characters to append to operation name
        :type randomize_endpoint: integer
        :param uri_parser_class: class to use for uri parsing
        :type uri_parser_class: AbstractURIParser
        """
        uri_parser_class = uri_parser_class or Swagger2URIParser

        # Assigned before super().__init__(), as in the original ordering.
        # NOTE(review): presumably the base initializer reads this while
        # resolving the handler -- verify against AbstractOperation.
        self._router_controller = operation.get("x-swagger-router-controller")

        super().__init__(
            method=method,
            path=path,
            operation=operation,
            resolver=resolver,
            app_security=app_security,
            security_schemes=security_schemes,
            randomize_endpoint=randomize_endpoint,
            uri_parser_class=uri_parser_class,
        )

        # Operation-level content types win over the application defaults.
        self._produces = operation.get("produces", app_produces)
        self._consumes = operation.get("consumes", app_consumes)

        self.definitions = definitions or {}

        # Build a fresh list: extending the operation's own "parameters" list
        # in place would write the path-level parameters back into the spec,
        # duplicating them whenever the same operation object is used again.
        self._parameters = [
            *operation.get("parameters", []),
            *(path_parameters or []),
        ]

        self._responses = operation.get("responses", {})

    @classmethod
    def from_spec(cls, spec, *args, path, method, resolver, **kwargs):
        """
        Alternate constructor that pulls the operation and the application
        level defaults out of a specification object.

        :param spec: specification object; this method reads its
            ``get_operation``, ``get_path_params``, ``produces``, ``consumes``,
            ``security``, ``security_schemes`` and ``definitions`` members
        :param path: relative path to this operation
        :param method: HTTP method
        :param resolver: Callable that maps operationID to a function
        :param args: extra positional arguments forwarded to the constructor
            after ``method``, ``path`` and the operation object. With this
            class's ``__init__`` the next positional slot is ``resolver``,
            which is already passed by keyword, so extras are only usable as
            keyword arguments.
        :param kwargs: extra keyword arguments forwarded to the constructor
        :return: a new instance of ``cls``
        """
        return cls(
            method,
            path,
            spec.get_operation(path, method),
            *args,
            resolver=resolver,
            path_parameters=spec.get_path_params(path),
            app_produces=spec.produces,
            app_consumes=spec.consumes,
            app_security=spec.security,
            security_schemes=spec.security_schemes,
            definitions=spec.definitions,
            **kwargs,
        )

    @property
    def request_body(self) -> dict:
        """
        The request body definition, computed on first access and then cached
        on the instance as ``_request_body``.

        A single ``body`` parameter is passed through ``_transform_json``;
        ``formData`` parameters are merged by ``_transform_form``; with
        neither, the result is an empty dict.

        :raises InvalidSpecification: if there is more than one ``body``
            parameter, or if ``body`` and ``formData`` parameters are mixed
        """
        if not hasattr(self, "_request_body"):
            body_params = [p for p in self.parameters if p["in"] == "body"]
            form_params = [p for p in self.parameters if p["in"] == "formData"]

            if len(body_params) > 1:
                raise InvalidSpecification(
                    f"{self.method} {self.path}: There can be one 'body' parameter at most"
                )

            if body_params and form_params:
                raise InvalidSpecification(
                    f"{self.method} {self.path}: 'body' and 'formData' parameters are mutually exclusive"
                )

            if body_params:
                self._request_body = self._transform_json(body_params[0])
            elif form_params:
                self._request_body = self._transform_form(form_params)
            else:
                self._request_body = {}

        return self._request_body

    @property
    def parameters(self):
        """Operation-level parameters followed by the path-level ones."""
        return self._parameters

    @property
    def consumes(self):
        """Content types consumed: the operation's own, else the app default."""
        return self._consumes

    @property
    def produces(self):
        """Content types produced: the operation's own, else the app default."""
        return self._produces

    def get_path_parameter_types(self):
        """
        Map the name of every ``in: path`` parameter to its type.

        :return: dict of parameter name to the parameter's ``type`` value
            (``None`` when absent), except that a ``string`` parameter with
            ``format: path`` is reported as ``"path"``
        :rtype: dict
        """
        types = {}
        for path_defn in self.parameters:
            if path_defn["in"] != "path":
                continue
            if path_defn.get("type") == "string" and path_defn.get("format") == "path":
                # path is special case for type 'string'
                path_type = "path"
            else:
                path_type = path_defn.get("type")
            types[path_defn["name"]] = path_type
        return types

    def with_definitions(self, schema):
        """
        Attach this operation's definitions to a definition's schema.

        When ``schema`` has a ``"schema"`` key, that nested schema is modified
        in place: its ``"definitions"`` entry is set to ``self.definitions``.

        :param schema: a definition dict that may contain a ``"schema"`` key
        :return: the same ``schema`` object that was passed in
        """
        if "schema" in schema:
            schema["schema"]["definitions"] = self.definitions
        return schema

    def response_schema(self, status_code=None, content_type=None):
        """
        The schema of the response selected by ``self.response_definition``.

        :return: the response definition's ``"schema"`` entry, or ``{}`` when
            it has none
        """
        response_definition = self.response_definition(status_code, content_type)
        # NOTE(review): the inner schema (not the response definition) is
        # handed to with_definitions, so definitions are attached only if
        # that schema itself has a "schema" key -- confirm this is intended.
        return self.with_definitions(response_definition.get("schema", {}))

    def example_response(self, status_code=None, *args, **kwargs):
        """
        Returns example response from spec

        Looks, in this order, for the first entry of the response's
        ``examples``, the ``example`` of its schema, and finally an example
        generated from the schema by ``build_example_from_schema``.

        :param status_code: response to use; defaults to the first key of the
            responses in sorted order (``IndexError`` if there are none)
        :return: ``(example, status_code)`` where ``status_code`` is an int,
            200 when the key is not numeric. A 204 response yields
            ``NoContent``; when nothing is defined the example is an
            explanatory message string.
        :rtype: tuple
        """
        # simply use the first/lowest status code, this is probably 200 or 201
        status_code = status_code or sorted(self._responses.keys())[0]
        response_key = str(status_code)

        try:
            status_code = int(status_code)
        except ValueError:
            # Non-numeric keys (e.g. "default") are reported as 200.
            status_code = 200

        if status_code == HTTPStatus.NO_CONTENT:
            return NoContent, status_code

        try:
            examples = deep_get(self._responses, [response_key, "examples"])
        except KeyError:
            examples = None
        # An empty examples object carries nothing to return; fall through to
        # the schema-based fallbacks instead of failing on the first lookup.
        if examples:
            return (next(iter(examples.values())), status_code)

        try:
            return (
                deep_get(self._responses, [response_key, "schema", "example"]),
                status_code,
            )
        except KeyError:
            pass

        try:
            schema = deep_get(self._responses, [response_key, "schema"])
        except KeyError:
            return ("No example response or response schema defined.", status_code)

        return (build_example_from_schema(schema), status_code)

    def body_name(self, content_type: t.Optional[str] = None) -> str:
        """
        The ``name`` of the body definition, ``"body"`` when it has none.
        """
        return self.body_definition(content_type).get("name", "body")

    def body_schema(self, content_type: t.Optional[str] = None) -> dict:
        """
        The body schema definition for this operation.

        The definitions are attached to the schema (see ``with_definitions``);
        ``{}`` is returned when the body definition has no schema.
        """
        body_definition = self.body_definition(content_type)
        return self.with_definitions(body_definition).get("schema", {})

    def body_definition(self, content_type: t.Optional[str] = None) -> dict:
        """
        The body complete definition for this operation.

        **There can be one "body" parameter at most.**

        ``content_type`` is accepted but not used: the same ``request_body``
        is returned for every content type.
        """
        return self.request_body

    def _transform_json(self, body_parameter: dict) -> dict:
        """
        Translate Swagger2 json parameters into OpenAPI 3 jsonschema spec.

        When the parameter carries ``x-nullable``, the value is copied to
        ``nullable`` on the parameter's schema. The parameter is modified in
        place and returned.
        """
        nullable = body_parameter.get("x-nullable")
        if nullable is not None:
            body_parameter["schema"]["nullable"] = nullable
        return body_parameter

    def _transform_form(self, form_parameters: t.List[dict]) -> dict:
        """
        Translate Swagger2 form parameters into OpenAPI 3 jsonschema spec.

        :param form_parameters: the ``formData`` parameters of the operation
        :return: ``{"schema": {...}}`` describing an object with one property
            per parameter and the list of required names. Parameter defaults
            are collected under the schema's ``"default"`` key and the
            encodings of array parameters under a top-level ``"encoding"``
            key; both keys are present only when non-empty.
        :raises InvalidSpecification: if an array parameter uses a
            ``collectionFormat`` missing from ``COLLECTION_FORMAT_MAPPING``
        """
        properties = {}
        defaults = {}
        required = []
        encoding = {}

        for param in form_parameters:
            name = param["name"]
            param_type = param["type"]
            prop = {}

            if param_type == "file":
                prop.update(
                    {
                        "type": "string",
                        "format": "binary",
                    }
                )
            else:
                prop["type"] = param_type

            # An explicit format takes precedence, including over "binary".
            format_ = param.get("format")
            if format_ is not None:
                prop["format"] = format_

            default = param.get("default")
            if default is not None:
                prop["default"] = default
                defaults[name] = default

            nullable = param.get("x-nullable")
            if nullable is not None:
                prop["nullable"] = nullable

            if param_type == "array":
                prop["items"] = param.get("items", {})

                collection_format = param.get("collectionFormat", "csv")
                try:
                    # Copy the entry so each definition owns its encoding and
                    # cannot alter the shared module-level mapping.
                    encoding[name] = dict(COLLECTION_FORMAT_MAPPING[collection_format])
                except KeyError:
                    raise InvalidSpecification(
                        f"The collection format ({collection_format}) is not supported by "
                        f"Connexion as it cannot be mapped to OpenAPI 3."
                    ) from None

            properties[name] = prop

            if param.get("required", False):
                required.append(name)

        definition: t.Dict[str, t.Any] = {
            "schema": {
                "type": "object",
                "properties": properties,
                "required": required,
            }
        }
        if defaults:
            definition["schema"]["default"] = defaults
        if encoding:
            definition["encoding"] = encoding

        return definition