1"""
2This module defines a Swagger2Operation class, a Connexion operation specific for Swagger 2 specs.
3"""
4
5import logging
6import typing as t
7
8from connexion.exceptions import InvalidSpecification
9from connexion.operations.abstract import AbstractOperation
10from connexion.uri_parsing import Swagger2URIParser
11from connexion.utils import build_example_from_schema, deep_get
12
13logger = logging.getLogger("connexion.operations.swagger2")
14
15
16COLLECTION_FORMAT_MAPPING = {
17 "multi": {"style": "form", "explode": True},
18 "csv": {"style": "form", "explode": False},
19 "ssv": {"style": "spaceDelimited", "explode": False},
20 "pipes": {"style": "pipeDelimited", "explode": False},
21}
22
23
24class Swagger2Operation(AbstractOperation):
25
26 """
27 Exposes a Swagger 2.0 operation under the AbstractOperation interface.
28 The primary purpose of this class is to provide the `function()` method
29 to the API. A Swagger2Operation is plugged into the API with the provided
30 (path, method) pair. It resolves the handler function for this operation
31 with the provided resolver, and wraps the handler function with multiple
32 decorators that provide security, validation, serialization,
33 and deserialization.
34 """
35
36 def __init__(
37 self,
38 method,
39 path,
40 operation,
41 resolver,
42 app_produces,
43 app_consumes,
44 path_parameters=None,
45 app_security=None,
46 security_schemes=None,
47 definitions=None,
48 randomize_endpoint=None,
49 uri_parser_class=None,
50 ):
51 """
52 :param method: HTTP method
53 :type method: str
54 :param path: relative path to this operation
55 :type path: str
56 :param operation: swagger operation object
57 :type operation: dict
58 :param resolver: Callable that maps operationID to a function
59 :type resolver: resolver.Resolver
60 :param app_produces: list of content types the application can return by default
61 :type app_produces: list
62 :param app_consumes: list of content types the application consumes by default
63 :type app_consumes: list
64 :param path_parameters: Parameters defined in the path level
65 :type path_parameters: list
66 :param app_security: list of security rules the application uses by default
67 :type app_security: list
68 :param security_schemes: `Security Definitions Object
69 <https://github.com/swagger-api/swagger-spec/blob/master/versions/2.0.md#security-definitions-object>`_
70 :type security_schemes: dict
71 :param definitions: `Definitions Object
72 <https://github.com/swagger-api/swagger-spec/blob/master/versions/2.0.md#definitionsObject>`_
73 :type definitions: dict
74 :param randomize_endpoint: number of random characters to append to operation name
75 :type randomize_endpoint: integer
76 :param uri_parser_class: class to use for uri parsing
77 :type uri_parser_class: AbstractURIParser
78 """
79 uri_parser_class = uri_parser_class or Swagger2URIParser
80
81 self._router_controller = operation.get("x-swagger-router-controller")
82
83 super().__init__(
84 method=method,
85 path=path,
86 operation=operation,
87 resolver=resolver,
88 app_security=app_security,
89 security_schemes=security_schemes,
90 randomize_endpoint=randomize_endpoint,
91 uri_parser_class=uri_parser_class,
92 )
93
94 self._produces = operation.get("produces", app_produces)
95 self._consumes = operation.get("consumes", app_consumes)
96
97 self.definitions = definitions or {}
98
99 self._parameters = operation.get("parameters", [])
100 if path_parameters:
101 self._parameters += path_parameters
102
103 self._responses = operation.get("responses", {})
104
105 @classmethod
106 def from_spec(cls, spec, *args, path, method, resolver, **kwargs):
107 return cls(
108 method,
109 path,
110 spec.get_operation(path, method),
111 resolver=resolver,
112 path_parameters=spec.get_path_params(path),
113 app_produces=spec.produces,
114 app_consumes=spec.consumes,
115 app_security=spec.security,
116 security_schemes=spec.security_schemes,
117 definitions=spec.definitions,
118 *args,
119 **kwargs,
120 )
121
122 @property
123 def request_body(self) -> dict:
124 if not hasattr(self, "_request_body"):
125 body_params = []
126 form_params = []
127 for parameter in self.parameters:
128 if parameter["in"] == "body":
129 body_params.append(parameter)
130 elif parameter["in"] == "formData":
131 form_params.append(parameter)
132
133 if len(body_params) > 1:
134 raise InvalidSpecification(
135 f"{self.method} {self.path}: There can be one 'body' parameter at most"
136 )
137
138 if body_params and form_params:
139 raise InvalidSpecification(
140 f"{self.method} {self.path}: 'body' and 'formData' parameters are mutually exclusive"
141 )
142
143 if body_params:
144 self._request_body = self._transform_json(body_params[0])
145 elif form_params:
146 self._request_body = self._transform_form(form_params)
147 else:
148 self._request_body = {}
149
150 return self._request_body
151
152 @property
153 def parameters(self):
154 return self._parameters
155
156 @property
157 def consumes(self):
158 return self._consumes
159
160 @property
161 def produces(self):
162 return self._produces
163
164 def get_path_parameter_types(self):
165 types = {}
166 path_parameters = (p for p in self.parameters if p["in"] == "path")
167 for path_defn in path_parameters:
168 if path_defn.get("type") == "string" and path_defn.get("format") == "path":
169 # path is special case for type 'string'
170 path_type = "path"
171 else:
172 path_type = path_defn.get("type")
173 types[path_defn["name"]] = path_type
174 return types
175
176 def with_definitions(self, schema):
177 if "schema" in schema:
178 schema["schema"]["definitions"] = self.definitions
179 return schema
180
181 def response_schema(self, status_code=None, content_type=None):
182 response_definition = self.response_definition(status_code, content_type)
183 return self.with_definitions(response_definition.get("schema", {}))
184
185 def example_response(self, status_code=None, *args, **kwargs):
186 """
187 Returns example response from spec
188 """
189 # simply use the first/lowest status code, this is probably 200 or 201
190 status_code = status_code or sorted(self._responses.keys())[0]
191 examples_path = [str(status_code), "examples"]
192 schema_example_path = [str(status_code), "schema", "example"]
193 schema_path = [str(status_code), "schema"]
194
195 try:
196 status_code = int(status_code)
197 except ValueError:
198 status_code = 200
199 try:
200 return (
201 list(deep_get(self._responses, examples_path).values())[0],
202 status_code,
203 )
204 except KeyError:
205 pass
206 try:
207 return (deep_get(self._responses, schema_example_path), status_code)
208 except KeyError:
209 pass
210
211 try:
212 schema = deep_get(self._responses, schema_path)
213 except KeyError:
214 return ("No example response or response schema defined.", status_code)
215
216 return (build_example_from_schema(schema), status_code)
217
218 def body_name(self, content_type: str = None) -> str:
219 return self.body_definition(content_type).get("name", "body")
220
221 def body_schema(self, content_type: str = None) -> dict:
222 """
223 The body schema definition for this operation.
224 """
225 body_definition = self.body_definition(content_type)
226 return self.with_definitions(body_definition).get("schema", {})
227
228 def body_definition(self, content_type: str = None) -> dict:
229 """
230 The body complete definition for this operation.
231
232 **There can be one "body" parameter at most.**
233 """
234 return self.request_body
235
236 def _transform_json(self, body_parameter: dict) -> dict:
237 """Translate Swagger2 json parameters into OpenAPI 3 jsonschema spec."""
238 nullable = body_parameter.get("x-nullable")
239 if nullable is not None:
240 body_parameter["schema"]["nullable"] = nullable
241 return body_parameter
242
243 def _transform_form(self, form_parameters: t.List[dict]) -> dict:
244 """Translate Swagger2 form parameters into OpenAPI 3 jsonschema spec."""
245 properties = {}
246 defaults = {}
247 required = []
248 encoding = {}
249
250 for param in form_parameters:
251 prop = {}
252
253 if param["type"] == "file":
254 prop.update(
255 {
256 "type": "string",
257 "format": "binary",
258 }
259 )
260 else:
261 prop["type"] = param["type"]
262
263 format_ = param.get("format")
264 if format_ is not None:
265 prop["format"] = format_
266
267 default = param.get("default")
268 if default is not None:
269 prop["default"] = default
270 defaults[param["name"]] = default
271
272 nullable = param.get("x-nullable")
273 if nullable is not None:
274 prop["nullable"] = nullable
275
276 if param["type"] == "array":
277 prop["items"] = param.get("items", {})
278
279 collection_format = param.get("collectionFormat", "csv")
280 try:
281 encoding[param["name"]] = COLLECTION_FORMAT_MAPPING[
282 collection_format
283 ]
284 except KeyError:
285 raise InvalidSpecification(
286 f"The collection format ({collection_format}) is not supported by "
287 f"Connexion as it cannot be mapped to OpenAPI 3."
288 )
289
290 properties[param["name"]] = prop
291
292 if param.get("required", False):
293 required.append(param["name"])
294
295 definition: t.Dict[str, t.Any] = {
296 "schema": {
297 "type": "object",
298 "properties": properties,
299 "required": required,
300 }
301 }
302 if defaults:
303 definition["schema"]["default"] = defaults
304 if encoding:
305 definition["encoding"] = encoding
306
307 return definition