Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/connexion/utils.py: 22%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

244 statements  

1""" 

2This module provides general utility functions used within Connexion. 

3""" 

4 

5import asyncio 

6import functools 

7import importlib 

8import inspect 

9import os 

10import pkgutil 

11import sys 

12import typing as t 

13 

14import yaml 

15from starlette.routing import compile_path 

16 

17from connexion.exceptions import TypeValidationError 

18 

19if t.TYPE_CHECKING: 

20 from connexion.middleware.main import API 

21 

22 

23def boolean(s): 

24 """ 

25 Convert JSON/Swagger boolean value to Python, raise ValueError otherwise 

26 

27 >>> boolean('true') 

28 True 

29 

30 >>> boolean('false') 

31 False 

32 """ 

33 if isinstance(s, bool): 

34 return s 

35 elif not hasattr(s, "lower"): 

36 raise ValueError("Invalid boolean value") 

37 elif s.lower() == "true": 

38 return True 

39 elif s.lower() == "false": 

40 return False 

41 else: 

42 raise ValueError("Invalid boolean value") 

43 

44 

45# https://github.com/swagger-api/swagger-spec/blob/master/versions/2.0.md#data-types 

46TYPE_MAP: t.Dict[str, t.Any] = { 

47 "integer": int, 

48 "number": float, 

49 "string": str, 

50 "boolean": boolean, 

51 "array": list, 

52 "object": dict, 

53 "file": lambda x: x, # Don't cast files 

54} # map of swagger types to python types 

55 

56 

57def make_type(value: t.Any, type_: str, format_: t.Optional[str]) -> t.Any: 

58 """Cast a value to the type defined in the specification.""" 

59 # In OpenAPI, files are represented with string type and binary format 

60 if type_ == "string" and format_ == "binary": 

61 type_ = "file" 

62 

63 type_func = TYPE_MAP[type_] 

64 return type_func(value) 

65 

66 

67def deep_merge(a, b): 

68 """merges b into a 

69 in case of conflict the value from b is used 

70 """ 

71 for key in b: 

72 if key in a: 

73 if isinstance(a[key], dict) and isinstance(b[key], dict): 

74 deep_merge(a[key], b[key]) 

75 elif a[key] == b[key]: 

76 pass 

77 else: 

78 # b overwrites a 

79 a[key] = b[key] 

80 else: 

81 a[key] = b[key] 

82 return a 

83 

84 

85def deep_getattr(obj, attr): 

86 """ 

87 Recurses through an attribute chain to get the ultimate value. 

88 """ 

89 

90 attrs = attr.split(".") 

91 

92 return functools.reduce(getattr, attrs, obj) 

93 

94 

95def deep_get(obj, keys): 

96 """ 

97 Recurses through a nested object get a leaf value. 

98 

99 There are cases where the use of inheritance or polymorphism-- the use of allOf or 

100 oneOf keywords-- will cause the obj to be a list. In this case the keys will 

101 contain one or more strings containing integers. 

102 

103 :type obj: list or dict 

104 :type keys: list of strings 

105 """ 

106 if not keys: 

107 return obj 

108 

109 if isinstance(obj, list): 

110 return deep_get(obj[int(keys[0])], keys[1:]) 

111 else: 

112 return deep_get(obj[keys[0]], keys[1:]) 

113 

114 

115def get_function_from_name(function_name): 

116 """ 

117 Tries to get function by fully qualified name (e.g. "mymodule.myobj.myfunc") 

118 

119 :type function_name: str 

120 """ 

121 if function_name is None: 

122 raise ValueError("Empty function name") 

123 

124 if "." in function_name: 

125 module_name, attr_path = function_name.rsplit(".", 1) 

126 else: 

127 module_name = "" 

128 attr_path = function_name 

129 

130 module = None 

131 last_import_error = None 

132 

133 while not module: 

134 try: 

135 module = importlib.import_module(module_name) 

136 except ImportError as import_error: 

137 last_import_error = import_error 

138 if "." in module_name: 

139 module_name, attr_path1 = module_name.rsplit(".", 1) 

140 attr_path = f"{attr_path1}.{attr_path}" 

141 else: 

142 raise 

143 try: 

144 function = deep_getattr(module, attr_path) 

145 except AttributeError: 

146 if last_import_error: 

147 raise last_import_error 

148 else: 

149 raise 

150 return function 

151 

152 

153def is_json_mimetype(mimetype): 

154 """ 

155 :type mimetype: str 

156 :rtype: bool 

157 """ 

158 if mimetype is None: 

159 return False 

160 

161 maintype, subtype = mimetype.split("/") # type: str, str 

162 if ";" in subtype: 

163 subtype, parameter = subtype.split(";", maxsplit=1) 

164 return maintype == "application" and ( 

165 subtype == "json" or subtype.endswith("+json") 

166 ) 

167 

168 

169def all_json(mimetypes): 

170 """ 

171 Returns True if all mimetypes are serialized with json 

172 

173 :type mimetypes: list 

174 :rtype: bool 

175 

176 >>> all_json(['application/json']) 

177 True 

178 >>> all_json(['application/x.custom+json']) 

179 True 

180 >>> all_json([]) 

181 True 

182 >>> all_json(['application/xml']) 

183 False 

184 >>> all_json(['text/json']) 

185 False 

186 >>> all_json(['application/json', 'other/type']) 

187 False 

188 >>> all_json(['application/json', 'application/x.custom+json']) 

189 True 

190 """ 

191 return all(is_json_mimetype(mimetype) for mimetype in mimetypes) 

192 

193 

194def is_nullable(param_def): 

195 return param_def.get("schema", param_def).get("nullable", False) or param_def.get( 

196 "x-nullable", False 

197 ) # swagger2 

198 

199 

200def is_null(value): 

201 if hasattr(value, "strip") and value.strip() in ["null", "None"]: 

202 return True 

203 

204 if value is None: 

205 return True 

206 

207 return False 

208 

209 

210def has_coroutine(function, api=None): 

211 """ 

212 Checks if function is a coroutine. 

213 If ``function`` is a decorator (has a ``__wrapped__`` attribute) 

214 this function will also look at the wrapped function. 

215 """ 

216 

217 def iscorofunc(func): 

218 iscorofunc = asyncio.iscoroutinefunction(func) 

219 while not iscorofunc and hasattr(func, "__wrapped__"): 

220 func = func.__wrapped__ 

221 iscorofunc = asyncio.iscoroutinefunction(func) 

222 return iscorofunc 

223 

224 if api is None: 

225 return iscorofunc(function) 

226 

227 else: 

228 return any(iscorofunc(func) for func in (function, api.get_response)) 

229 

230 

231def yamldumper(openapi): 

232 """ 

233 Returns a nicely-formatted yaml spec. 

234 :param openapi: a spec dictionary. 

235 :return: a nicely-formatted, serialized yaml spec. 

236 """ 

237 

238 def should_use_block(value): 

239 char_list = ( 

240 "\u000a" # line feed 

241 "\u000d" # carriage return 

242 "\u001c" # file separator 

243 "\u001d" # group separator 

244 "\u001e" # record separator 

245 "\u0085" # next line 

246 "\u2028" # line separator 

247 "\u2029" # paragraph separator 

248 ) 

249 for c in char_list: 

250 if c in value: 

251 return True 

252 return False 

253 

254 def my_represent_scalar(self, tag, value, style=None): 

255 if should_use_block(value): 

256 style = "|" 

257 else: 

258 style = self.default_style 

259 

260 node = yaml.representer.ScalarNode(tag, value, style=style) 

261 if self.alias_key is not None: 

262 self.represented_objects[self.alias_key] = node 

263 return node 

264 

265 class NoAnchorDumper(yaml.dumper.SafeDumper): 

266 """A yaml Dumper that does not replace duplicate entries 

267 with yaml anchors. 

268 """ 

269 

270 def ignore_aliases(self, *args): 

271 return True 

272 

273 # Dump long lines as "|". 

274 yaml.representer.SafeRepresenter.represent_scalar = my_represent_scalar 

275 

276 return yaml.dump(openapi, allow_unicode=True, Dumper=NoAnchorDumper) 

277 

278 

279def not_installed_error(exc, *, msg=None): # pragma: no cover 

280 """Raises the ImportError when the module/object is actually called with a custom message.""" 

281 

282 def _delayed_error(*args, **kwargs): 

283 if msg is not None: 

284 raise type(exc)(msg).with_traceback(exc.__traceback__) 

285 raise exc 

286 

287 return _delayed_error 

288 

289 

290def extract_content_type( 

291 headers: t.Union[t.List[t.Tuple[bytes, bytes]], t.Dict[str, str]] 

292) -> t.Optional[str]: 

293 """Extract the mime type and encoding from the content type headers. 

294 

295 :param headers: Headers from ASGI scope 

296 

297 :return: The content type if available in headers, otherwise None 

298 """ 

299 content_type: t.Optional[str] = None 

300 

301 header_pairs_type = t.Collection[t.Tuple[t.Union[str, bytes], t.Union[str, bytes]]] 

302 header_pairs: header_pairs_type = headers.items() if isinstance(headers, dict) else headers # type: ignore 

303 for key, value in header_pairs: 

304 # Headers can always be decoded using latin-1: 

305 # https://stackoverflow.com/a/27357138/4098821 

306 if isinstance(key, bytes): 

307 decoded_key: str = key.decode("latin-1") 

308 else: 

309 decoded_key = key 

310 

311 if decoded_key.lower() == "content-type": 

312 if isinstance(value, bytes): 

313 content_type = value.decode("latin-1") 

314 else: 

315 content_type = value 

316 break 

317 

318 return content_type 

319 

320 

321def split_content_type( 

322 content_type: t.Optional[str], 

323) -> t.Tuple[t.Optional[str], t.Optional[str]]: 

324 """Split the content type in mime_type and encoding. Other parameters are ignored.""" 

325 mime_type, encoding = None, None 

326 

327 if content_type is None: 

328 return mime_type, encoding 

329 

330 # Check for parameters 

331 if ";" in content_type: 

332 mime_type, parameters = content_type.split(";", maxsplit=1) 

333 

334 # Find parameter describing the charset 

335 prefix = "charset=" 

336 for parameter in parameters.split(";"): 

337 if parameter.startswith(prefix): 

338 encoding = parameter[len(prefix) :] 

339 else: 

340 mime_type = content_type 

341 return mime_type, encoding 

342 

343 

344def coerce_type(param, value, parameter_type, parameter_name=None): 

345 # TODO: clean up 

346 TYPE_MAP = {"integer": int, "number": float, "boolean": boolean, "object": dict} 

347 

348 def make_type(value, type_literal): 

349 type_func = TYPE_MAP.get(type_literal) 

350 return type_func(value) 

351 

352 param_schema = param.get("schema", param) 

353 if is_nullable(param_schema) and is_null(value): 

354 return None 

355 

356 param_type = param_schema.get("type") 

357 parameter_name = parameter_name if parameter_name else param.get("name") 

358 if param_type == "array": 

359 converted_params = [] 

360 if parameter_type == "header": 

361 value = value.split(",") 

362 for v in value: 

363 try: 

364 converted = make_type(v, param_schema["items"]["type"]) 

365 except (ValueError, TypeError): 

366 converted = v 

367 converted_params.append(converted) 

368 return converted_params 

369 elif param_type == "object": 

370 if param_schema.get("properties"): 

371 

372 def cast_leaves(d, schema): 

373 if type(d) is not dict: 

374 try: 

375 return make_type(d, schema["type"]) 

376 except (ValueError, TypeError): 

377 return d 

378 for k, v in d.items(): 

379 if k in schema["properties"]: 

380 d[k] = cast_leaves(v, schema["properties"][k]) 

381 return d 

382 

383 return cast_leaves(value, param_schema) 

384 return value 

385 else: 

386 try: 

387 return make_type(value, param_type) 

388 except ValueError: 

389 raise TypeValidationError(param_type, parameter_type, parameter_name) 

390 except TypeError: 

391 return value 

392 

393 

394def get_root_path(import_name: str) -> str: 

395 """Copied from Flask: 

396 https://github.com/pallets/flask/blob/836866dc19218832cf02f8b04911060ac92bfc0b/src/flask/helpers.py#L595 

397 

398 Find the root path of a package, or the path that contains a 

399 module. If it cannot be found, returns the current working 

400 directory. 

401 """ 

402 # Module already imported and has a file attribute. Use that first. 

403 mod = sys.modules.get(import_name) 

404 

405 if mod is not None and hasattr(mod, "__file__") and mod.__file__ is not None: 

406 return os.path.dirname(os.path.abspath(mod.__file__)) 

407 

408 # Next attempt: check the loader. 

409 loader = pkgutil.get_loader(import_name) 

410 

411 # Loader does not exist or we're referring to an unloaded main 

412 # module or a main module without path (interactive sessions), go 

413 # with the current working directory. 

414 if loader is None or import_name == "__main__": 

415 return os.getcwd() 

416 

417 if hasattr(loader, "get_filename"): 

418 filepath = loader.get_filename(import_name) # type: ignore 

419 else: 

420 # Fall back to imports. 

421 __import__(import_name) 

422 mod = sys.modules[import_name] 

423 filepath = getattr(mod, "__file__", None) 

424 

425 # If we don't have a file path it might be because it is a 

426 # namespace package. In this case pick the root path from the 

427 # first module that is contained in the package. 

428 if filepath is None: 

429 raise RuntimeError( 

430 "No root path can be found for the provided module" 

431 f" {import_name!r}. This can happen because the module" 

432 " came from an import hook that does not provide file" 

433 " name information or because it's a namespace package." 

434 " In this case the root path needs to be explicitly" 

435 " provided." 

436 ) 

437 

438 # filepath is import_name.py for a module, or __init__.py for a package. 

439 return os.path.dirname(os.path.abspath(filepath)) 

440 

441 

442def inspect_function_arguments(function: t.Callable) -> t.Tuple[t.List[str], bool]: 

443 """ 

444 Returns the list of variables names of a function and if it 

445 accepts keyword arguments. 

446 """ 

447 parameters = inspect.signature(function).parameters 

448 bound_arguments = [ 

449 name 

450 for name, p in parameters.items() 

451 if p.kind not in (p.VAR_POSITIONAL, p.VAR_KEYWORD) 

452 ] 

453 has_kwargs = any(p.kind == p.VAR_KEYWORD for p in parameters.values()) 

454 return list(bound_arguments), has_kwargs 

455 

456 

457T = t.TypeVar("T") 

458 

459 

460@t.overload 

461def sort_routes(routes: t.List[str], *, key: None = None) -> t.List[str]: 

462 ... 

463 

464 

465@t.overload 

466def sort_routes(routes: t.List[T], *, key: t.Callable[[T], str]) -> t.List[T]: 

467 ... 

468 

469 

470def sort_routes(routes, *, key=None): 

471 """Sorts a list of routes from most specific to least specific. 

472 

473 See Starlette routing documentation and implementation as this function 

474 is aimed to sort according to that logic. 

475 - https://www.starlette.io/routing/#route-priority 

476 

477 The only difference is that a `path` component is appended to each route 

478 such that `/` is less specific than `/basepath` while they are technically 

479 not comparable. 

480 This is because it is also done by the `Mount` class internally: 

481 https://github.com/encode/starlette/blob/1c1043ca0ab7126419948b27f9d0a78270fd74e6/starlette/routing.py#L388 

482 

483 For example, from most to least specific: 

484 - /users/me 

485 - /users/{username}/projects/{project} 

486 - /users/{username} 

487 

488 :param routes: List of routes to sort 

489 :param key: Function to extract the path from a route if it is not a string 

490 

491 :return: List of routes sorted from most specific to least specific 

492 """ 

493 

494 class SortableRoute: 

495 def __init__(self, path: str) -> None: 

496 self.path = path.rstrip("/") 

497 if not self.path.endswith("/{path:path}"): 

498 self.path += "/{path:path}" 

499 self.path_regex, _, _ = compile_path(self.path) 

500 

501 def __lt__(self, other: "SortableRoute") -> bool: 

502 return bool(other.path_regex.match(self.path)) 

503 

504 return sorted(routes, key=lambda r: SortableRoute(key(r) if key else r)) 

505 

506 

507def sort_apis_by_basepath(apis: t.List["API"]) -> t.List["API"]: 

508 """Sorts a list of APIs by basepath. 

509 

510 :param apis: List of APIs to sort 

511 

512 :return: List of APIs sorted by basepath 

513 """ 

514 return sort_routes(apis, key=lambda api: api.base_path or "/") 

515 

516 

517def build_example_from_schema(schema): 

518 if "example" in schema: 

519 return schema["example"] 

520 

521 if "properties" in schema: 

522 # Recurse if schema is an object 

523 return { 

524 key: build_example_from_schema(value) 

525 for (key, value) in schema["properties"].items() 

526 } 

527 

528 if "items" in schema: 

529 # Recurse if schema is an array 

530 min_item_count = schema.get("minItems", 0) 

531 max_item_count = schema.get("maxItems") 

532 

533 if max_item_count is None or max_item_count >= min_item_count + 1: 

534 item_count = min_item_count + 1 

535 else: 

536 item_count = min_item_count 

537 

538 return [build_example_from_schema(schema["items"]) for n in range(item_count)] 

539 

540 try: 

541 from jsf import JSF 

542 except ImportError: 

543 return None 

544 

545 faker = JSF(schema) 

546 return faker.generate()