Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/connexion/decorators/parameter.py: 24%
194 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
1"""
2This module defines a decorator to convert request parameters to arguments for the view function.
3"""
4import abc
5import asyncio
6import builtins
7import functools
8import inspect
9import keyword
10import logging
11import re
12import typing as t
13from copy import copy, deepcopy
15import inflection
17from connexion.context import context, operation
18from connexion.frameworks.abstract import Framework
19from connexion.http_facts import FORM_CONTENT_TYPES
20from connexion.lifecycle import ASGIRequest, WSGIRequest
21from connexion.operations import AbstractOperation, Swagger2Operation
22from connexion.utils import deep_merge, is_null, is_nullable, make_type
# Module-level logger; the helpers in this module use it for debug/error messages.
24logger = logging.getLogger(__name__)
# Name of the handler argument that, when declared by the view function, makes
# prep_kwargs pass the request context object under that name.
26CONTEXT_NAME = "context_"
class BaseParameterDecorator(abc.ABC):
    """Common base for decorators that call a view function with request-derived kwargs.

    Deriving from ``abc.ABC`` makes the ``@abc.abstractmethod`` marker on
    ``__call__`` effective: only subclasses that implement ``__call__`` can be
    instantiated.
    """

    def __init__(
        self,
        *,
        framework: t.Type[Framework],
        pythonic_params: bool = False,
    ) -> None:
        """
        :param framework: Framework class, stored on the instance as ``framework``.
        :param pythonic_params: When true, parameter names are converted with
            ``pythonic``; otherwise they are only cleaned with ``sanitized``.
        """
        self.framework = framework
        self.sanitize_fn = pythonic if pythonic_params else sanitized

    def _maybe_get_body(
        self,
        request: t.Union[WSGIRequest, ASGIRequest],
        *,
        arguments: t.List[str],
        has_kwargs: bool,
    ) -> t.Any:
        """Return ``request.get_body()`` if the body is needed, else ``None``.

        :param request: The incoming request.
        :param arguments: Names of the arguments the view function declares.
        :param has_kwargs: Whether the view function accepts ``**kwargs``.
        """
        body_name = self.sanitize_fn(operation.body_name(request.content_type))
        wants_body = body_name in arguments or has_kwargs
        # Form contents are passed as separate arguments for Swagger 2 (backward
        # compatibility with Connexion 2), so checking for body_name alone is
        # not enough to decide whether the body has to be read.
        swagger2_form = request.mimetype in FORM_CONTENT_TYPES and isinstance(
            operation, Swagger2Operation
        )
        if wants_body or swagger2_form:
            return request.get_body()
        return None

    @abc.abstractmethod
    def __call__(self, function: t.Callable) -> t.Callable:
        """Wrap ``function``; concrete subclasses must implement this."""
        raise NotImplementedError
class SyncParameterDecorator(BaseParameterDecorator):
    """Parameter decorator for view functions that are called without ``await``."""

    def __call__(self, function: t.Callable) -> t.Callable:
        """Return a wrapper that calls ``function`` with kwargs built from the request."""
        # Inspect the innermost function so that other decorators applied to the
        # view do not hide its real signature.
        handler_args, accepts_kwargs = inspect_function_arguments(
            unwrap_decorators(function)
        )

        @functools.wraps(function)
        def wrapper(request: WSGIRequest) -> t.Any:
            body = self._maybe_get_body(
                request, arguments=handler_args, has_kwargs=accepts_kwargs
            )
            uploads = request.files()
            call_kwargs = prep_kwargs(
                request,
                request_body=body,
                files=uploads,
                arguments=handler_args,
                has_kwargs=accepts_kwargs,
                sanitize=self.sanitize_fn,
            )
            return function(**call_kwargs)

        return wrapper
class AsyncParameterDecorator(BaseParameterDecorator):
    """Parameter decorator for view functions whose result is awaited."""

    def __call__(self, function: t.Callable) -> t.Callable:
        """Return an async wrapper that calls ``function`` with kwargs built from the request."""
        # Inspect the innermost function so that other decorators applied to the
        # view do not hide its real signature.
        handler_args, accepts_kwargs = inspect_function_arguments(
            unwrap_decorators(function)
        )

        @functools.wraps(function)
        async def wrapper(request: ASGIRequest) -> t.Any:
            body = self._maybe_get_body(
                request, arguments=handler_args, has_kwargs=accepts_kwargs
            )
            # The body accessor may hand back a coroutine; keep awaiting until
            # a plain value is obtained.
            while asyncio.iscoroutine(body):
                body = await body

            uploads = await request.files()
            call_kwargs = prep_kwargs(
                request,
                request_body=body,
                files=uploads,
                arguments=handler_args,
                has_kwargs=accepts_kwargs,
                sanitize=self.sanitize_fn,
            )
            return await function(**call_kwargs)

        return wrapper
def prep_kwargs(
    request: t.Union[WSGIRequest, ASGIRequest],
    *,
    request_body: t.Any,
    files: t.Dict[str, t.Any],
    arguments: t.List[str],
    has_kwargs: bool,
    sanitize: t.Callable,
) -> dict:
    """Build the keyword arguments with which the view function is called.

    :param request: The incoming request (path/query params and mimetype are read).
    :param request_body: Already-resolved request body, or ``None``.
    :param files: Uploaded files keyed by name.
    :param arguments: Names of the arguments the view function declares.
    :param has_kwargs: Whether the view function accepts ``**kwargs``.
    :param sanitize: Callable that maps a parameter name to an argument name.
    """
    raw_arguments = get_arguments(
        operation,
        path_params=request.path_params,
        query_params=request.query_params,
        body=request_body,
        files=files,
        arguments=arguments,
        has_kwargs=has_kwargs,
        sanitize=sanitize,
        content_type=request.mimetype,
    )

    # optionally convert parameter variable names to un-shadowed, snake_case form
    prepared = {}
    for raw_name, raw_value in raw_arguments.items():
        prepared[sanitize(raw_name)] = raw_value

    # add context info (e.g. from security decorator)
    for ctx_key, ctx_value in context.items():
        if not has_kwargs and ctx_key not in arguments:
            logger.debug("Context parameter '%s' not in function arguments", ctx_key)
            continue
        prepared[ctx_key] = ctx_value

    # attempt to provide the request context to the function
    if CONTEXT_NAME in arguments:
        prepared[CONTEXT_NAME] = context

    return prepared
def unwrap_decorators(function: t.Callable) -> t.Callable:
    """Follow the ``__wrapped__`` chain and return the innermost function."""
    innermost = function
    while True:
        try:
            innermost = innermost.__wrapped__  # type: ignore
        except AttributeError:
            # No further wrapper layer: this is the original function.
            return innermost
def inspect_function_arguments(function: t.Callable) -> t.Tuple[t.List[str], bool]:
    """
    Inspect the signature of ``function``.

    :return: A tuple of the named parameters (everything except ``*args`` and
        ``**kwargs``) and a flag telling whether ``**kwargs`` is accepted.
    """
    names: t.List[str] = []
    accepts_var_keyword = False
    for param in inspect.signature(function).parameters.values():
        if param.kind == inspect.Parameter.VAR_KEYWORD:
            accepts_var_keyword = True
        elif param.kind != inspect.Parameter.VAR_POSITIONAL:
            names.append(param.name)
    return names, accepts_var_keyword
def snake_and_shadow(name: str) -> str:
    """
    Convert ``name`` to snake_case via ``inflection.underscore`` and append an
    underscore when the result is a Python keyword or the name of a built-in.

    :param name: The parameter name
    """
    candidate = inflection.underscore(name)
    shadows_python_name = keyword.iskeyword(candidate) or candidate in builtins.__dict__
    return f"{candidate}_" if shadows_python_name else candidate
def sanitized(name: str) -> str:
    """Reduce ``name`` to characters usable in a Python identifier.

    Falsy input (``None`` or ``""``) is returned unchanged.
    """
    if not name:
        return name
    # An opening bracket that is not immediately closed becomes an underscore.
    with_underscores = re.sub(r"\[(?!])", "_", name)
    # Drop every character that is not a letter, digit or underscore.
    identifier_chars = re.sub("[^0-9a-zA-Z_]", "", with_underscores)
    # Strip leading characters that cannot start an identifier.
    return re.sub("^[^a-zA-Z_]+", "", identifier_chars)
def pythonic(name: str) -> str:
    """Apply ``snake_and_shadow`` to a non-empty ``name`` and then ``sanitized``."""
    if name:
        name = snake_and_shadow(name)
    return sanitized(name)
def get_arguments(
    operation: AbstractOperation,
    *,
    path_params: dict,
    query_params: dict,
    body: t.Any,
    files: dict,
    arguments: t.List[str],
    has_kwargs: bool,
    sanitize: t.Callable,
    content_type: str,
) -> t.Dict[str, t.Any]:
    """
    Collect the arguments for the handler function.

    Path arguments are gathered first, then query arguments; for PATCH, POST
    and PUT operations the body and file arguments are added afterwards.
    Later sources overwrite earlier ones on name clashes.
    """
    collected: t.Dict[str, t.Any] = {}

    path_arguments = _get_path_arguments(
        path_params, operation=operation, sanitize=sanitize
    )
    collected.update(path_arguments)

    query_arguments = _get_query_arguments(
        query_params,
        operation=operation,
        arguments=arguments,
        has_kwargs=has_kwargs,
        sanitize=sanitize,
    )
    collected.update(query_arguments)

    if operation.method.upper() not in ("PATCH", "POST", "PUT"):
        return collected

    body_arguments = _get_body_argument(
        body,
        operation=operation,
        arguments=arguments,
        has_kwargs=has_kwargs,
        sanitize=sanitize,
        content_type=content_type,
    )
    collected.update(body_arguments)
    collected.update(_get_file_arguments(files, arguments, has_kwargs))
    return collected
def _get_path_arguments(
    path_params: dict, *, operation: AbstractOperation, sanitize: t.Callable
) -> dict:
    """
    Build handler function arguments from the path parameters.

    Values with a path definition in the operation are cast according to that
    definition; all others are passed through unchanged.
    """
    definitions = {}
    for parameter in operation.parameters:
        if parameter["in"] == "path":
            definitions[parameter["name"]] = parameter

    converted = {}
    for raw_name, raw_value in path_params.items():
        target_name = sanitize(raw_name)
        if raw_name not in definitions:
            # Assume path params mechanism used for injection
            converted[target_name] = raw_value
            continue
        converted[target_name] = _get_val_from_param(raw_value, definitions[raw_name])
    return converted
def _get_val_from_param(value: t.Any, param_definitions: t.Dict[str, dict]) -> t.Any:
    """Cast ``value`` to the type given by its definition in the specification.

    The schema is taken from the ``"schema"`` key when present, otherwise the
    definition itself is used as the schema.
    """
    schema = param_definitions.get("schema", param_definitions)

    if is_nullable(schema) and is_null(value):
        return None

    if schema["type"] != "array":
        scalar_type = schema["type"]
        scalar_format = schema.get("format")
        return make_type(value, scalar_type, scalar_format)

    # Arrays: every element is cast with the type/format of the item schema.
    item_type = schema["items"]["type"]
    item_format = schema["items"].get("format")
    return [make_type(element, item_type, item_format) for element in value]
def _get_query_arguments(
    query_params: dict,
    *,
    operation: AbstractOperation,
    arguments: t.List[str],
    has_kwargs: bool,
    sanitize: t.Callable,
) -> dict:
    """
    Build handler function arguments from the query parameters.

    Defaults from the query parameter definitions are merged with the values
    that were actually sent before the result is cast and filtered.
    """
    definitions = {}
    for parameter in operation.parameters:
        if parameter["in"] == "query":
            definitions[parameter["name"]] = parameter

    defaults = _get_query_defaults(definitions)
    # Merge into a deep copy so the computed defaults are not modified.
    merged = deep_merge(deepcopy(defaults), query_params)
    return _query_args_helper(definitions, merged, arguments, has_kwargs, sanitize)
305def _get_query_defaults(query_definitions: t.Dict[str, dict]) -> t.Dict[str, t.Any]:
306 """Get the default values for the query parameter from the parameter definition."""
307 defaults = {}
308 for k, v in query_definitions.items():
309 try:
310 if "default" in v:
311 defaults[k] = v["default"]
312 elif v["schema"]["type"] == "object":
313 defaults[k] = _get_default_obj(v["schema"])
314 else:
315 defaults[k] = v["schema"]["default"]
316 except KeyError:
317 pass
318 return defaults
321def _get_default_obj(schema: dict) -> dict:
322 try:
323 return deepcopy(schema["default"])
324 except KeyError:
325 properties = schema.get("properties", {})
326 return _build_default_obj_recursive(properties, {})
329def _build_default_obj_recursive(properties: dict, default_object: dict) -> dict:
330 """takes disparate and nested default keys, and builds up a default object"""
331 for name, property_ in properties.items():
332 if "default" in property_ and name not in default_object:
333 default_object[name] = copy(property_["default"])
334 elif property_.get("type") == "object" and "properties" in property_:
335 default_object.setdefault(name, {})
336 default_object[name] = _build_default_obj_recursive(
337 property_["properties"], default_object[name]
338 )
339 return default_object
def _query_args_helper(
    query_definitions: dict,
    query_arguments: dict,
    function_arguments: t.List[str],
    has_kwargs: bool,
    sanitize: t.Callable,
) -> dict:
    """Cast the query arguments the handler can accept, keyed by sanitized name.

    Arguments the handler does not declare (and cannot take through
    ``**kwargs``) are skipped, as are arguments without a definition.
    """
    converted = {}
    for raw_key, raw_value in query_arguments.items():
        clean_key = sanitize(raw_key)

        if not (has_kwargs or clean_key in function_arguments):
            logger.debug(
                "Query Parameter '%s' (sanitized: '%s') not in function arguments",
                raw_key,
                clean_key,
            )
            continue

        logger.debug(
            "Query Parameter '%s' (sanitized: '%s') in function arguments",
            raw_key,
            clean_key,
        )
        if raw_key not in query_definitions:  # pragma: no cover
            logger.error(
                "Function argument '%s' (non-sanitized: %s) not defined in specification",
                clean_key,
                raw_key,
            )
            continue

        definition = query_definitions[raw_key]
        logger.debug("%s is a %s", raw_key, definition)
        converted[clean_key] = _get_val_from_param(raw_value, definition)
    return converted
def _get_body_argument(
    body: t.Any,
    *,
    operation: AbstractOperation,
    arguments: t.List[str],
    has_kwargs: bool,
    sanitize: t.Callable,
    content_type: str,
) -> dict:
    """Build the handler arguments that come from the request body.

    For non-form content the body is passed under the sanitized body name. Form
    content of a Swagger 2 operation is unpacked into one argument per field.
    """
    if not arguments and not has_kwargs:
        # The handler takes no arguments at all, so there is nothing to pass.
        return {}

    body_name = sanitize(operation.body_name(content_type))

    if content_type not in FORM_CONTENT_TYPES:
        result = _get_body_argument_json(
            body, operation=operation, content_type=content_type
        )
    else:
        result = _get_body_argument_form(
            body, operation=operation, content_type=content_type
        )

        # Unpack form values for Swagger for compatibility with Connexion 2 behavior
        if isinstance(operation, Swagger2Operation):
            if has_kwargs:
                return result
            return {
                sanitize(name): value
                for name, value in result.items()
                if sanitize(name) in arguments
            }

    if has_kwargs or body_name in arguments:
        return {body_name: result}

    return {}
def _get_body_argument_json(
    body: t.Any, *, operation: AbstractOperation, content_type: str
) -> t.Any:
    """Return the value to pass for a non-form body.

    A null body that the schema allows yields ``None``; a missing body yields a
    deep copy of the schema default (``{}`` when there is none).
    """
    # if the body came in null, and the schema says it can be null, we decide
    # to include no value for the body argument, rather than the default body
    accepted_null = is_nullable(operation.body_schema(content_type)) and is_null(body)
    if accepted_null:
        return None

    if body is not None:
        return body

    schema_default = operation.body_schema(content_type).get("default", {})
    return deepcopy(schema_default)
def _get_body_argument_form(
    body: dict, *, operation: AbstractOperation, content_type: str
) -> dict:
    """Return the typed form values for a form-encoded body.

    The schema default is merged with the submitted values, which are then
    cast according to the schema's ``properties`` / ``additionalProperties``.

    :param body: Submitted form values; ``None`` or empty is treated as ``{}``.
    :param operation: Operation whose body schema is consulted.
    :param content_type: Content type used to select the body schema.
    :return: Dict of form field names to typed values, or ``{}`` when the
        schema declares no properties and forbids additional ones.
    """
    # Read everything from the schema of the request's content type. The
    # additionalProperties lookup used to call body_schema() without it and
    # could therefore consult a different schema than the other two lookups.
    schema = operation.body_schema(content_type)

    # now determine the actual value for the body (whether it came in or is default)
    default_body = schema.get("default", {})
    body_props = {
        prop_name: {"schema": prop_schema}
        for prop_name, prop_schema in schema.get("properties", {}).items()
    }

    # by OpenAPI specification `additionalProperties` defaults to `true`
    # see: https://github.com/OAI/OpenAPI-Specification/blame/3.0.2/versions/3.0.2.md#L2305
    additional_props = schema.get("additionalProperties", True)

    # Work on a copy so the schema's default object is never mutated.
    body_arg = deepcopy(default_body)
    body_arg.update(body or {})

    if body_props or additional_props:
        return _get_typed_body_values(body_arg, body_props, additional_props)

    return {}
def _get_typed_body_values(body_arg, body_props, additional_props):
    """
    Build a new dictionary from ``body_arg`` whose values are cast
    to the types defined in the provided schemas.

    :param body_arg: dict of raw body values
    :param body_props: dict mapping property names to ``{"schema": ...}`` definitions
    :param additional_props: dict (schema for undeclared properties) or bool
    :rtype: dict
    """
    if isinstance(additional_props, dict):
        additional_props_defn = {"schema": additional_props}
    else:
        additional_props_defn = None

    typed = {}
    for key, value in body_arg.items():
        try:
            typed[key] = _get_val_from_param(value, body_props[key])
        except KeyError:  # pragma: no cover
            if not additional_props:
                logger.error(f"Body property '{key}' not defined in body schema")
                continue
            # Additional properties are allowed: cast with their schema when
            # one is given, otherwise keep the raw value.
            typed[key] = (
                value
                if additional_props_defn is None
                else _get_val_from_param(value, additional_props_defn)
            )

    return typed
489def _get_file_arguments(files, arguments, has_kwargs=False):
490 return {k: v for k, v in files.items() if k in arguments or has_kwargs}