Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/connexion/utils.py: 22%
187 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
1"""
2This module provides general utility functions used within Connexion.
3"""
5import asyncio
6import functools
7import importlib
8import os
9import pkgutil
10import sys
11import typing as t
13import yaml
15from connexion.exceptions import TypeValidationError
18def boolean(s):
19 """
20 Convert JSON/Swagger boolean value to Python, raise ValueError otherwise
22 >>> boolean('true')
23 True
25 >>> boolean('false')
26 False
27 """
28 if isinstance(s, bool):
29 return s
30 elif not hasattr(s, "lower"):
31 raise ValueError("Invalid boolean value")
32 elif s.lower() == "true":
33 return True
34 elif s.lower() == "false":
35 return False
36 else:
37 raise ValueError("Invalid boolean value")
40# https://github.com/swagger-api/swagger-spec/blob/master/versions/2.0.md#data-types
41TYPE_MAP: t.Dict[str, t.Any] = {
42 "integer": int,
43 "number": float,
44 "string": str,
45 "boolean": boolean,
46 "array": list,
47 "object": dict,
48 "file": lambda x: x, # Don't cast files
49} # map of swagger types to python types
52def make_type(value: t.Any, type_: str, format_: t.Optional[str]) -> t.Any:
53 """Cast a value to the type defined in the specification."""
54 # In OpenAPI, files are represented with string type and binary format
55 if type_ == "string" and format_ == "binary":
56 type_ = "file"
58 type_func = TYPE_MAP[type_]
59 return type_func(value)
62def deep_merge(a, b):
63 """merges b into a
64 in case of conflict the value from b is used
65 """
66 for key in b:
67 if key in a:
68 if isinstance(a[key], dict) and isinstance(b[key], dict):
69 deep_merge(a[key], b[key])
70 elif a[key] == b[key]:
71 pass
72 else:
73 # b overwrites a
74 a[key] = b[key]
75 else:
76 a[key] = b[key]
77 return a
80def deep_getattr(obj, attr):
81 """
82 Recurses through an attribute chain to get the ultimate value.
83 """
85 attrs = attr.split(".")
87 return functools.reduce(getattr, attrs, obj)
90def deep_get(obj, keys):
91 """
92 Recurses through a nested object get a leaf value.
94 There are cases where the use of inheritance or polymorphism-- the use of allOf or
95 oneOf keywords-- will cause the obj to be a list. In this case the keys will
96 contain one or more strings containing integers.
98 :type obj: list or dict
99 :type keys: list of strings
100 """
101 if not keys:
102 return obj
104 if isinstance(obj, list):
105 return deep_get(obj[int(keys[0])], keys[1:])
106 else:
107 return deep_get(obj[keys[0]], keys[1:])
110def get_function_from_name(function_name):
111 """
112 Tries to get function by fully qualified name (e.g. "mymodule.myobj.myfunc")
114 :type function_name: str
115 """
116 if function_name is None:
117 raise ValueError("Empty function name")
119 if "." in function_name:
120 module_name, attr_path = function_name.rsplit(".", 1)
121 else:
122 module_name = ""
123 attr_path = function_name
125 module = None
126 last_import_error = None
128 while not module:
129 try:
130 module = importlib.import_module(module_name)
131 except ImportError as import_error:
132 last_import_error = import_error
133 if "." in module_name:
134 module_name, attr_path1 = module_name.rsplit(".", 1)
135 attr_path = f"{attr_path1}.{attr_path}"
136 else:
137 raise
138 try:
139 function = deep_getattr(module, attr_path)
140 except AttributeError:
141 if last_import_error:
142 raise last_import_error
143 else:
144 raise
145 return function
148def is_json_mimetype(mimetype):
149 """
150 :type mimetype: str
151 :rtype: bool
152 """
153 if mimetype is None:
154 return False
156 maintype, subtype = mimetype.split("/") # type: str, str
157 if ";" in subtype:
158 subtype, parameter = subtype.split(";", maxsplit=1)
159 return maintype == "application" and (
160 subtype == "json" or subtype.endswith("+json")
161 )
164def all_json(mimetypes):
165 """
166 Returns True if all mimetypes are serialized with json
168 :type mimetypes: list
169 :rtype: bool
171 >>> all_json(['application/json'])
172 True
173 >>> all_json(['application/x.custom+json'])
174 True
175 >>> all_json([])
176 True
177 >>> all_json(['application/xml'])
178 False
179 >>> all_json(['text/json'])
180 False
181 >>> all_json(['application/json', 'other/type'])
182 False
183 >>> all_json(['application/json', 'application/x.custom+json'])
184 True
185 """
186 return all(is_json_mimetype(mimetype) for mimetype in mimetypes)
189def is_nullable(param_def):
190 return param_def.get("schema", param_def).get("nullable", False) or param_def.get(
191 "x-nullable", False
192 ) # swagger2
195def is_null(value):
196 if hasattr(value, "strip") and value.strip() in ["null", "None"]:
197 return True
199 if value is None:
200 return True
202 return False
205def has_coroutine(function, api=None):
206 """
207 Checks if function is a coroutine.
208 If ``function`` is a decorator (has a ``__wrapped__`` attribute)
209 this function will also look at the wrapped function.
210 """
212 def iscorofunc(func):
213 iscorofunc = asyncio.iscoroutinefunction(func)
214 while not iscorofunc and hasattr(func, "__wrapped__"):
215 func = func.__wrapped__
216 iscorofunc = asyncio.iscoroutinefunction(func)
217 return iscorofunc
219 if api is None:
220 return iscorofunc(function)
222 else:
223 return any(iscorofunc(func) for func in (function, api.get_response))
226def yamldumper(openapi):
227 """
228 Returns a nicely-formatted yaml spec.
229 :param openapi: a spec dictionary.
230 :return: a nicely-formatted, serialized yaml spec.
231 """
233 def should_use_block(value):
234 char_list = (
235 "\u000a" # line feed
236 "\u000d" # carriage return
237 "\u001c" # file separator
238 "\u001d" # group separator
239 "\u001e" # record separator
240 "\u0085" # next line
241 "\u2028" # line separator
242 "\u2029" # paragraph separator
243 )
244 for c in char_list:
245 if c in value:
246 return True
247 return False
249 def my_represent_scalar(self, tag, value, style=None):
250 if should_use_block(value):
251 style = "|"
252 else:
253 style = self.default_style
255 node = yaml.representer.ScalarNode(tag, value, style=style)
256 if self.alias_key is not None:
257 self.represented_objects[self.alias_key] = node
258 return node
260 class NoAnchorDumper(yaml.dumper.SafeDumper):
261 """A yaml Dumper that does not replace duplicate entries
262 with yaml anchors.
263 """
265 def ignore_aliases(self, *args):
266 return True
268 # Dump long lines as "|".
269 yaml.representer.SafeRepresenter.represent_scalar = my_represent_scalar
271 return yaml.dump(openapi, allow_unicode=True, Dumper=NoAnchorDumper)
274def not_installed_error(exc, *, msg=None): # pragma: no cover
275 """Raises the ImportError when the module/object is actually called with a custom message."""
277 def _delayed_error(*args, **kwargs):
278 if msg is not None:
279 raise type(exc)(msg).with_traceback(exc.__traceback__)
280 raise exc
282 return _delayed_error
285def extract_content_type(
286 headers: t.List[t.Tuple[bytes, bytes]]
287) -> t.Tuple[t.Optional[str], t.Optional[str]]:
288 """Extract the mime type and encoding from the content type headers.
290 :param headers: Headers from ASGI scope
292 :return: A tuple of mime type, encoding
293 """
294 mime_type, encoding = None, None
295 for key, value in headers:
296 # Headers can always be decoded using latin-1:
297 # https://stackoverflow.com/a/27357138/4098821
298 decoded_key = key.decode("latin-1")
299 if decoded_key.lower() == "content-type":
300 content_type = value.decode("latin-1")
301 if ";" in content_type:
302 mime_type, parameters = content_type.split(";", maxsplit=1)
304 prefix = "charset="
305 for parameter in parameters.split(";"):
306 if parameter.startswith(prefix):
307 encoding = parameter[len(prefix) :]
308 else:
309 mime_type = content_type
310 break
311 return mime_type, encoding
314def coerce_type(param, value, parameter_type, parameter_name=None):
315 # TODO: clean up
316 TYPE_MAP = {"integer": int, "number": float, "boolean": boolean, "object": dict}
318 def make_type(value, type_literal):
319 type_func = TYPE_MAP.get(type_literal)
320 return type_func(value)
322 param_schema = param.get("schema", param)
323 if is_nullable(param_schema) and is_null(value):
324 return None
326 param_type = param_schema.get("type")
327 parameter_name = parameter_name if parameter_name else param.get("name")
328 if param_type == "array":
329 converted_params = []
330 if parameter_type == "header":
331 value = value.split(",")
332 for v in value:
333 try:
334 converted = make_type(v, param_schema["items"]["type"])
335 except (ValueError, TypeError):
336 converted = v
337 converted_params.append(converted)
338 return converted_params
339 elif param_type == "object":
340 if param_schema.get("properties"):
342 def cast_leaves(d, schema):
343 if type(d) is not dict:
344 try:
345 return make_type(d, schema["type"])
346 except (ValueError, TypeError):
347 return d
348 for k, v in d.items():
349 if k in schema["properties"]:
350 d[k] = cast_leaves(v, schema["properties"][k])
351 return d
353 return cast_leaves(value, param_schema)
354 return value
355 else:
356 try:
357 return make_type(value, param_type)
358 except ValueError:
359 raise TypeValidationError(param_type, parameter_type, parameter_name)
360 except TypeError:
361 return value
364def get_root_path(import_name: str) -> str:
365 """Copied from Flask:
366 https://github.com/pallets/flask/blob/836866dc19218832cf02f8b04911060ac92bfc0b/src/flask/helpers.py#L595
368 Find the root path of a package, or the path that contains a
369 module. If it cannot be found, returns the current working
370 directory.
371 """
372 # Module already imported and has a file attribute. Use that first.
373 mod = sys.modules.get(import_name)
375 if mod is not None and hasattr(mod, "__file__") and mod.__file__ is not None:
376 return os.path.dirname(os.path.abspath(mod.__file__))
378 # Next attempt: check the loader.
379 loader = pkgutil.get_loader(import_name)
381 # Loader does not exist or we're referring to an unloaded main
382 # module or a main module without path (interactive sessions), go
383 # with the current working directory.
384 if loader is None or import_name == "__main__":
385 return os.getcwd()
387 if hasattr(loader, "get_filename"):
388 filepath = loader.get_filename(import_name) # type: ignore
389 else:
390 # Fall back to imports.
391 __import__(import_name)
392 mod = sys.modules[import_name]
393 filepath = getattr(mod, "__file__", None)
395 # If we don't have a file path it might be because it is a
396 # namespace package. In this case pick the root path from the
397 # first module that is contained in the package.
398 if filepath is None:
399 raise RuntimeError(
400 "No root path can be found for the provided module"
401 f" {import_name!r}. This can happen because the module"
402 " came from an import hook that does not provide file"
403 " name information or because it's a namespace package."
404 " In this case the root path needs to be explicitly"
405 " provided."
406 )
408 # filepath is import_name.py for a module, or __init__.py for a package.
409 return os.path.dirname(os.path.abspath(filepath))