Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/connexion/utils.py: 22%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2This module provides general utility functions used within Connexion.
3"""
5import asyncio
6import functools
7import importlib
8import inspect
9import os
10import pkgutil
11import sys
12import typing as t
14import yaml
15from starlette.routing import compile_path
17from connexion.exceptions import TypeValidationError
19if t.TYPE_CHECKING:
20 from connexion.middleware.main import API
23def boolean(s):
24 """
25 Convert JSON/Swagger boolean value to Python, raise ValueError otherwise
27 >>> boolean('true')
28 True
30 >>> boolean('false')
31 False
32 """
33 if isinstance(s, bool):
34 return s
35 elif not hasattr(s, "lower"):
36 raise ValueError("Invalid boolean value")
37 elif s.lower() == "true":
38 return True
39 elif s.lower() == "false":
40 return False
41 else:
42 raise ValueError("Invalid boolean value")
45# https://github.com/swagger-api/swagger-spec/blob/master/versions/2.0.md#data-types
46TYPE_MAP: t.Dict[str, t.Any] = {
47 "integer": int,
48 "number": float,
49 "string": str,
50 "boolean": boolean,
51 "array": list,
52 "object": dict,
53 "file": lambda x: x, # Don't cast files
54} # map of swagger types to python types
57def make_type(value: t.Any, type_: str, format_: t.Optional[str]) -> t.Any:
58 """Cast a value to the type defined in the specification."""
59 # In OpenAPI, files are represented with string type and binary format
60 if type_ == "string" and format_ == "binary":
61 type_ = "file"
63 type_func = TYPE_MAP[type_]
64 return type_func(value)
67def deep_merge(a, b):
68 """merges b into a
69 in case of conflict the value from b is used
70 """
71 for key in b:
72 if key in a:
73 if isinstance(a[key], dict) and isinstance(b[key], dict):
74 deep_merge(a[key], b[key])
75 elif a[key] == b[key]:
76 pass
77 else:
78 # b overwrites a
79 a[key] = b[key]
80 else:
81 a[key] = b[key]
82 return a
85def deep_getattr(obj, attr):
86 """
87 Recurses through an attribute chain to get the ultimate value.
88 """
90 attrs = attr.split(".")
92 return functools.reduce(getattr, attrs, obj)
95def deep_get(obj, keys):
96 """
97 Recurses through a nested object get a leaf value.
99 There are cases where the use of inheritance or polymorphism-- the use of allOf or
100 oneOf keywords-- will cause the obj to be a list. In this case the keys will
101 contain one or more strings containing integers.
103 :type obj: list or dict
104 :type keys: list of strings
105 """
106 if not keys:
107 return obj
109 if isinstance(obj, list):
110 return deep_get(obj[int(keys[0])], keys[1:])
111 else:
112 return deep_get(obj[keys[0]], keys[1:])
115def get_function_from_name(function_name):
116 """
117 Tries to get function by fully qualified name (e.g. "mymodule.myobj.myfunc")
119 :type function_name: str
120 """
121 if function_name is None:
122 raise ValueError("Empty function name")
124 if "." in function_name:
125 module_name, attr_path = function_name.rsplit(".", 1)
126 else:
127 module_name = ""
128 attr_path = function_name
130 module = None
131 last_import_error = None
133 while not module:
134 try:
135 module = importlib.import_module(module_name)
136 except ImportError as import_error:
137 last_import_error = import_error
138 if "." in module_name:
139 module_name, attr_path1 = module_name.rsplit(".", 1)
140 attr_path = f"{attr_path1}.{attr_path}"
141 else:
142 raise
143 try:
144 function = deep_getattr(module, attr_path)
145 except AttributeError:
146 if last_import_error:
147 raise last_import_error
148 else:
149 raise
150 return function
153def is_json_mimetype(mimetype):
154 """
155 :type mimetype: str
156 :rtype: bool
157 """
158 if mimetype is None:
159 return False
161 maintype, subtype = mimetype.split("/") # type: str, str
162 if ";" in subtype:
163 subtype, parameter = subtype.split(";", maxsplit=1)
164 return maintype == "application" and (
165 subtype == "json" or subtype.endswith("+json")
166 )
169def all_json(mimetypes):
170 """
171 Returns True if all mimetypes are serialized with json
173 :type mimetypes: list
174 :rtype: bool
176 >>> all_json(['application/json'])
177 True
178 >>> all_json(['application/x.custom+json'])
179 True
180 >>> all_json([])
181 True
182 >>> all_json(['application/xml'])
183 False
184 >>> all_json(['text/json'])
185 False
186 >>> all_json(['application/json', 'other/type'])
187 False
188 >>> all_json(['application/json', 'application/x.custom+json'])
189 True
190 """
191 return all(is_json_mimetype(mimetype) for mimetype in mimetypes)
194def is_nullable(param_def):
195 return param_def.get("schema", param_def).get("nullable", False) or param_def.get(
196 "x-nullable", False
197 ) # swagger2
200def is_null(value):
201 if hasattr(value, "strip") and value.strip() in ["null", "None"]:
202 return True
204 if value is None:
205 return True
207 return False
210def has_coroutine(function, api=None):
211 """
212 Checks if function is a coroutine.
213 If ``function`` is a decorator (has a ``__wrapped__`` attribute)
214 this function will also look at the wrapped function.
215 """
217 def iscorofunc(func):
218 iscorofunc = asyncio.iscoroutinefunction(func)
219 while not iscorofunc and hasattr(func, "__wrapped__"):
220 func = func.__wrapped__
221 iscorofunc = asyncio.iscoroutinefunction(func)
222 return iscorofunc
224 if api is None:
225 return iscorofunc(function)
227 else:
228 return any(iscorofunc(func) for func in (function, api.get_response))
231def yamldumper(openapi):
232 """
233 Returns a nicely-formatted yaml spec.
234 :param openapi: a spec dictionary.
235 :return: a nicely-formatted, serialized yaml spec.
236 """
238 def should_use_block(value):
239 char_list = (
240 "\u000a" # line feed
241 "\u000d" # carriage return
242 "\u001c" # file separator
243 "\u001d" # group separator
244 "\u001e" # record separator
245 "\u0085" # next line
246 "\u2028" # line separator
247 "\u2029" # paragraph separator
248 )
249 for c in char_list:
250 if c in value:
251 return True
252 return False
254 def my_represent_scalar(self, tag, value, style=None):
255 if should_use_block(value):
256 style = "|"
257 else:
258 style = self.default_style
260 node = yaml.representer.ScalarNode(tag, value, style=style)
261 if self.alias_key is not None:
262 self.represented_objects[self.alias_key] = node
263 return node
265 class NoAnchorDumper(yaml.dumper.SafeDumper):
266 """A yaml Dumper that does not replace duplicate entries
267 with yaml anchors.
268 """
270 def ignore_aliases(self, *args):
271 return True
273 # Dump long lines as "|".
274 yaml.representer.SafeRepresenter.represent_scalar = my_represent_scalar
276 return yaml.dump(openapi, allow_unicode=True, Dumper=NoAnchorDumper)
279def not_installed_error(exc, *, msg=None): # pragma: no cover
280 """Raises the ImportError when the module/object is actually called with a custom message."""
282 def _delayed_error(*args, **kwargs):
283 if msg is not None:
284 raise type(exc)(msg).with_traceback(exc.__traceback__)
285 raise exc
287 return _delayed_error
290def extract_content_type(
291 headers: t.Union[t.List[t.Tuple[bytes, bytes]], t.Dict[str, str]]
292) -> t.Optional[str]:
293 """Extract the mime type and encoding from the content type headers.
295 :param headers: Headers from ASGI scope
297 :return: The content type if available in headers, otherwise None
298 """
299 content_type: t.Optional[str] = None
301 header_pairs_type = t.Collection[t.Tuple[t.Union[str, bytes], t.Union[str, bytes]]]
302 header_pairs: header_pairs_type = headers.items() if isinstance(headers, dict) else headers # type: ignore
303 for key, value in header_pairs:
304 # Headers can always be decoded using latin-1:
305 # https://stackoverflow.com/a/27357138/4098821
306 if isinstance(key, bytes):
307 decoded_key: str = key.decode("latin-1")
308 else:
309 decoded_key = key
311 if decoded_key.lower() == "content-type":
312 if isinstance(value, bytes):
313 content_type = value.decode("latin-1")
314 else:
315 content_type = value
316 break
318 return content_type
321def split_content_type(
322 content_type: t.Optional[str],
323) -> t.Tuple[t.Optional[str], t.Optional[str]]:
324 """Split the content type in mime_type and encoding. Other parameters are ignored."""
325 mime_type, encoding = None, None
327 if content_type is None:
328 return mime_type, encoding
330 # Check for parameters
331 if ";" in content_type:
332 mime_type, parameters = content_type.split(";", maxsplit=1)
334 # Find parameter describing the charset
335 prefix = "charset="
336 for parameter in parameters.split(";"):
337 if parameter.startswith(prefix):
338 encoding = parameter[len(prefix) :]
339 else:
340 mime_type = content_type
341 return mime_type, encoding
344def coerce_type(param, value, parameter_type, parameter_name=None):
345 # TODO: clean up
346 TYPE_MAP = {"integer": int, "number": float, "boolean": boolean, "object": dict}
348 def make_type(value, type_literal):
349 type_func = TYPE_MAP.get(type_literal)
350 return type_func(value)
352 param_schema = param.get("schema", param)
353 if is_nullable(param_schema) and is_null(value):
354 return None
356 param_type = param_schema.get("type")
357 parameter_name = parameter_name if parameter_name else param.get("name")
358 if param_type == "array":
359 converted_params = []
360 if parameter_type == "header":
361 value = value.split(",")
362 for v in value:
363 try:
364 converted = make_type(v, param_schema["items"]["type"])
365 except (ValueError, TypeError):
366 converted = v
367 converted_params.append(converted)
368 return converted_params
369 elif param_type == "object":
370 if param_schema.get("properties"):
372 def cast_leaves(d, schema):
373 if type(d) is not dict:
374 try:
375 return make_type(d, schema["type"])
376 except (ValueError, TypeError):
377 return d
378 for k, v in d.items():
379 if k in schema["properties"]:
380 d[k] = cast_leaves(v, schema["properties"][k])
381 return d
383 return cast_leaves(value, param_schema)
384 return value
385 else:
386 try:
387 return make_type(value, param_type)
388 except ValueError:
389 raise TypeValidationError(param_type, parameter_type, parameter_name)
390 except TypeError:
391 return value
394def get_root_path(import_name: str) -> str:
395 """Copied from Flask:
396 https://github.com/pallets/flask/blob/836866dc19218832cf02f8b04911060ac92bfc0b/src/flask/helpers.py#L595
398 Find the root path of a package, or the path that contains a
399 module. If it cannot be found, returns the current working
400 directory.
401 """
402 # Module already imported and has a file attribute. Use that first.
403 mod = sys.modules.get(import_name)
405 if mod is not None and hasattr(mod, "__file__") and mod.__file__ is not None:
406 return os.path.dirname(os.path.abspath(mod.__file__))
408 # Next attempt: check the loader.
409 loader = pkgutil.get_loader(import_name)
411 # Loader does not exist or we're referring to an unloaded main
412 # module or a main module without path (interactive sessions), go
413 # with the current working directory.
414 if loader is None or import_name == "__main__":
415 return os.getcwd()
417 if hasattr(loader, "get_filename"):
418 filepath = loader.get_filename(import_name) # type: ignore
419 else:
420 # Fall back to imports.
421 __import__(import_name)
422 mod = sys.modules[import_name]
423 filepath = getattr(mod, "__file__", None)
425 # If we don't have a file path it might be because it is a
426 # namespace package. In this case pick the root path from the
427 # first module that is contained in the package.
428 if filepath is None:
429 raise RuntimeError(
430 "No root path can be found for the provided module"
431 f" {import_name!r}. This can happen because the module"
432 " came from an import hook that does not provide file"
433 " name information or because it's a namespace package."
434 " In this case the root path needs to be explicitly"
435 " provided."
436 )
438 # filepath is import_name.py for a module, or __init__.py for a package.
439 return os.path.dirname(os.path.abspath(filepath))
442def inspect_function_arguments(function: t.Callable) -> t.Tuple[t.List[str], bool]:
443 """
444 Returns the list of variables names of a function and if it
445 accepts keyword arguments.
446 """
447 parameters = inspect.signature(function).parameters
448 bound_arguments = [
449 name
450 for name, p in parameters.items()
451 if p.kind not in (p.VAR_POSITIONAL, p.VAR_KEYWORD)
452 ]
453 has_kwargs = any(p.kind == p.VAR_KEYWORD for p in parameters.values())
454 return list(bound_arguments), has_kwargs
457T = t.TypeVar("T")
460@t.overload
461def sort_routes(routes: t.List[str], *, key: None = None) -> t.List[str]:
462 ...
465@t.overload
466def sort_routes(routes: t.List[T], *, key: t.Callable[[T], str]) -> t.List[T]:
467 ...
470def sort_routes(routes, *, key=None):
471 """Sorts a list of routes from most specific to least specific.
473 See Starlette routing documentation and implementation as this function
474 is aimed to sort according to that logic.
475 - https://www.starlette.io/routing/#route-priority
477 The only difference is that a `path` component is appended to each route
478 such that `/` is less specific than `/basepath` while they are technically
479 not comparable.
480 This is because it is also done by the `Mount` class internally:
481 https://github.com/encode/starlette/blob/1c1043ca0ab7126419948b27f9d0a78270fd74e6/starlette/routing.py#L388
483 For example, from most to least specific:
484 - /users/me
485 - /users/{username}/projects/{project}
486 - /users/{username}
488 :param routes: List of routes to sort
489 :param key: Function to extract the path from a route if it is not a string
491 :return: List of routes sorted from most specific to least specific
492 """
494 class SortableRoute:
495 def __init__(self, path: str) -> None:
496 self.path = path.rstrip("/")
497 if not self.path.endswith("/{path:path}"):
498 self.path += "/{path:path}"
499 self.path_regex, _, _ = compile_path(self.path)
501 def __lt__(self, other: "SortableRoute") -> bool:
502 return bool(other.path_regex.match(self.path))
504 return sorted(routes, key=lambda r: SortableRoute(key(r) if key else r))
507def sort_apis_by_basepath(apis: t.List["API"]) -> t.List["API"]:
508 """Sorts a list of APIs by basepath.
510 :param apis: List of APIs to sort
512 :return: List of APIs sorted by basepath
513 """
514 return sort_routes(apis, key=lambda api: api.base_path or "/")
517def build_example_from_schema(schema):
518 if "example" in schema:
519 return schema["example"]
521 if "properties" in schema:
522 # Recurse if schema is an object
523 return {
524 key: build_example_from_schema(value)
525 for (key, value) in schema["properties"].items()
526 }
528 if "items" in schema:
529 # Recurse if schema is an array
530 min_item_count = schema.get("minItems", 0)
531 max_item_count = schema.get("maxItems")
533 if max_item_count is None or max_item_count >= min_item_count + 1:
534 item_count = min_item_count + 1
535 else:
536 item_count = min_item_count
538 return [build_example_from_schema(schema["items"]) for n in range(item_count)]
540 try:
541 from jsf import JSF
542 except ImportError:
543 return None
545 faker = JSF(schema)
546 return faker.generate()