Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/connexion/utils.py: 22%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

255 statements  

1""" 

2This module provides general utility functions used within Connexion. 

3""" 

4 

5import asyncio 

6import functools 

7import importlib 

8import inspect 

9import os 

10import sys 

11import typing 

12import typing as t 

13 

14import yaml 

15from starlette.routing import compile_path 

16 

17from connexion.exceptions import TypeValidationError 

18 

19if t.TYPE_CHECKING: 

20 from connexion.middleware.main import API 

21 

22 

def boolean(s):
    """
    Convert a JSON/Swagger boolean value to a Python ``bool``.

    Real booleans are passed through untouched. Anything exposing a
    ``lower`` method is compared case-insensitively with ``"true"`` and
    ``"false"``. Every other input raises ``ValueError``.

    >>> boolean('true')
    True

    >>> boolean('false')
    False
    """
    if isinstance(s, bool):
        return s

    if hasattr(s, "lower"):
        lowered = s.lower()
        if lowered == "true":
            return True
        if lowered == "false":
            return False

    # Neither a bool nor a recognised textual spelling.
    raise ValueError("Invalid boolean value")

43 

44 

# Swagger 2.0 data types:
# https://github.com/swagger-api/swagger-spec/blob/master/versions/2.0.md#data-types
# Each entry maps a swagger type name to the callable used to cast a raw
# value to the matching python type.
TYPE_MAP: t.Dict[str, t.Any] = dict(
    integer=int,
    number=float,
    string=str,
    boolean=boolean,
    array=list,
    object=dict,
    file=lambda x: x,  # Don't cast files
)  # map of swagger types to python types

55 

56 

def make_type(value: t.Any, type_: str, format_: t.Optional[str]) -> t.Any:
    """Cast a value to the type defined in the specification.

    :param value: The raw value to convert.
    :param type_: The type name from the specification; it must be a key of
        ``TYPE_MAP``, otherwise a ``KeyError`` is raised.
    :param format_: The format from the specification, if any.

    :return: The result of applying the ``TYPE_MAP`` caster to ``value``.
    """
    # In OpenAPI, files are represented with string type and binary format
    is_file = type_ == "string" and format_ == "binary"
    caster = TYPE_MAP["file" if is_file else type_]
    return caster(value)

65 

66 

def deep_merge(a, b):
    """Merge ``b`` into ``a`` in place and return ``a``.

    When both sides hold a dict under the same key the merge recurses.
    Otherwise a differing value from ``b`` replaces the one in ``a``, while
    an equal value leaves the existing entry of ``a`` untouched.
    """
    for key in b:
        if key not in a:
            a[key] = b[key]
            continue

        current, incoming = a[key], b[key]
        if isinstance(current, dict) and isinstance(incoming, dict):
            deep_merge(current, incoming)
        elif not current == incoming:
            # b overwrites a
            a[key] = incoming
    return a

83 

84 

def deep_getattr(obj, attr):
    """
    Follow a dotted attribute path starting at ``obj``.

    ``attr`` is split on ``"."`` and each piece is looked up with
    ``getattr`` on the result of the previous lookup; the final value is
    returned. A missing attribute raises ``AttributeError``.
    """
    current = obj
    for name in attr.split("."):
        current = getattr(current, name)
    return current

93 

94 

def deep_get(obj, keys):
    """
    Walk through a nested object to get a leaf value.

    There are cases where the use of inheritance or polymorphism-- the use of
    allOf or oneOf keywords-- will cause the obj to be a list. In this case
    the keys will contain one or more strings containing integers, which are
    converted with ``int`` before indexing.

    :type obj: list or dict
    :type keys: list of strings
    """
    if not keys:
        return obj

    node = obj
    for key in keys:
        # Lists are indexed by position, everything else by the key itself.
        node = node[int(key)] if isinstance(node, list) else node[key]
    return node

113 

114 

def get_function_from_name(function_name):
    """
    Tries to get function by fully qualified name (e.g. "mymodule.myobj.myfunc")

    The part before the last dot is imported as a module. Each time that
    import fails, the last component of the module name is moved to the
    front of the attribute path and the shorter name is tried. The
    attribute path is then resolved on the imported module.

    :type function_name: str
    """
    if function_name is None:
        raise ValueError("Empty function name")

    # No dot at all leaves module_name empty and the whole name as attribute.
    module_name, _, attr_path = function_name.rpartition(".")

    module = None
    last_import_error = None

    while not module:
        try:
            module = importlib.import_module(module_name)
        except ImportError as import_error:
            last_import_error = import_error
            parent, separator, tail = module_name.rpartition(".")
            if not separator:
                # Nothing left to strip off: give up with the import error.
                raise
            module_name = parent
            attr_path = f"{tail}.{attr_path}"

    try:
        return deep_getattr(module, attr_path)
    except AttributeError:
        # Prefer reporting an earlier import failure over the attribute error.
        if last_import_error:
            raise last_import_error
        raise

151 

152 

def is_json_mimetype(mimetype):
    """
    Tell whether a mimetype denotes JSON content.

    A mimetype counts as JSON when its main type is ``application`` and its
    subtype is ``json`` or ends with ``+json``. Parameters after the first
    ``;`` are ignored, and whitespace around the type and subtype is
    stripped. ``None`` and values without a ``/`` yield ``False``.

    :type mimetype: str
    :rtype: bool
    """
    if mimetype is None:
        return False

    # Drop the parameters first: they may legitimately contain "/"
    # (e.g. profile="http://example.com/schema"), which must not take part
    # in the type/subtype split.
    essence = mimetype.split(";", maxsplit=1)[0]

    maintype, slash, subtype = essence.partition("/")
    if not slash:
        # Not of the form "type/subtype".
        return False

    maintype = maintype.strip()
    subtype = subtype.strip()
    return maintype == "application" and (
        subtype == "json" or subtype.endswith("+json")
    )

167 

168 

def all_json(mimetypes):
    """
    Returns True if all mimetypes are serialized with json

    Each entry is checked with ``is_json_mimetype``; an empty collection
    yields True.

    :type mimetypes: list
    :rtype: bool

    >>> all_json(['application/json'])
    True
    >>> all_json(['application/x.custom+json'])
    True
    >>> all_json([])
    True
    >>> all_json(['application/xml'])
    False
    >>> all_json(['text/json'])
    False
    >>> all_json(['application/json', 'other/type'])
    False
    >>> all_json(['application/json', 'application/x.custom+json'])
    True
    """
    for mimetype in mimetypes:
        if not is_json_mimetype(mimetype):
            return False
    return True

192 

193 

def is_nullable(param_def):
    """Return the nullable flag of a parameter definition.

    ``nullable`` is read from the nested ``schema`` when there is one,
    otherwise from ``param_def`` itself. If that value is falsy, the
    ``x-nullable`` entry of ``param_def`` is returned instead, with
    ``False`` as the default.
    """
    schema = param_def.get("schema", param_def)
    nullable = schema.get("nullable", False)
    if nullable:
        return nullable
    return param_def.get("x-nullable", False)  # swagger2

198 

199 

def is_null(value):
    """Tell whether ``value`` stands for a null.

    True for ``None`` itself and for anything with a ``strip`` method whose
    stripped form is ``"null"`` or ``"None"``; False otherwise.
    """
    if value is None:
        return True

    return hasattr(value, "strip") and value.strip() in ("null", "None")

208 

209 

def has_coroutine(function, api=None):
    """
    Checks if function is a coroutine.
    If ``function`` is a decorator (has a ``__wrapped__`` attribute)
    this function will also look at the wrapped function.

    When ``api`` is given, its ``get_response`` attribute is examined the
    same way and the result is True if either one is a coroutine function.
    """

    def _is_coro_or_wraps_one(candidate):
        # Follow the __wrapped__ chain until a coroutine function is found
        # or there is nothing left to unwrap.
        while True:
            if asyncio.iscoroutinefunction(candidate):
                return True
            if not hasattr(candidate, "__wrapped__"):
                return False
            candidate = candidate.__wrapped__

    if api is None:
        candidates = (function,)
    else:
        candidates = (function, api.get_response)

    return any(_is_coro_or_wraps_one(candidate) for candidate in candidates)

229 

230 

def yamldumper(openapi):
    """
    Returns a nicely-formatted yaml spec.

    Scalars containing one of the line-break/separator characters listed
    below are written in literal block style (``|``); all other scalars use
    the dumper's default style. Repeated objects are written out in full
    instead of being replaced by yaml anchors.

    :param openapi: a spec dictionary.
    :return: a nicely-formatted, serialized yaml spec.
    """
    line_break_chars = (
        "\u000a"  # line feed
        "\u000d"  # carriage return
        "\u001c"  # file separator
        "\u001d"  # group separator
        "\u001e"  # record separator
        "\u0085"  # next line
        "\u2028"  # line separator
        "\u2029"  # paragraph separator
    )

    def should_use_block(value):
        return any(c in value for c in line_break_chars)

    class NoAnchorDumper(yaml.dumper.SafeDumper):
        """A yaml Dumper that does not replace duplicate entries
        with yaml anchors and dumps multi-line strings as "|" blocks.
        """

        def ignore_aliases(self, *args):
            return True

        def represent_scalar(self, tag, value, style=None):
            # Defined on this dumper only, so that other users of PyYAML's
            # SafeRepresenter in the same process keep the stock behaviour.
            # The incoming ``style`` is replaced in both branches.
            if should_use_block(value):
                style = "|"
            else:
                style = self.default_style

            node = yaml.representer.ScalarNode(tag, value, style=style)
            if self.alias_key is not None:
                self.represented_objects[self.alias_key] = node
            return node

    return yaml.dump(openapi, allow_unicode=True, Dumper=NoAnchorDumper)

277 

278 

def not_installed_error(exc, *, msg=None):  # pragma: no cover
    """Build a callable that raises ``exc`` only when it is actually called.

    The returned callable accepts and ignores any arguments. Without ``msg``
    it re-raises ``exc`` itself; with ``msg`` it raises a new exception of
    the same type carrying ``msg`` and the traceback of ``exc``.
    """

    def _delayed_error(*args, **kwargs):
        if msg is None:
            raise exc
        raise type(exc)(msg).with_traceback(exc.__traceback__)

    return _delayed_error

288 

289 

def extract_content_type(
    headers: t.Union[t.List[t.Tuple[bytes, bytes]], t.Dict[str, str]]
) -> t.Optional[str]:
    """Return the value of the first content-type header.

    The header name is matched case-insensitively. Names and values given
    as bytes are decoded as latin-1; the value is returned as is, without
    being split into mime type and parameters.

    :param headers: Headers from ASGI scope

    :return: The content type if available in headers, otherwise None
    """
    pairs = headers.items() if isinstance(headers, dict) else headers

    for raw_name, raw_value in pairs:
        # Headers can always be decoded using latin-1:
        # https://stackoverflow.com/a/27357138/4098821
        name = raw_name.decode("latin-1") if isinstance(raw_name, bytes) else raw_name
        if name.lower() != "content-type":
            continue

        if isinstance(raw_value, bytes):
            return raw_value.decode("latin-1")
        return raw_value

    return None

319 

320 

def split_content_type(
    content_type: t.Optional[str],
) -> t.Tuple[t.Optional[str], t.Optional[str]]:
    """Split the content type in mime_type and encoding. Other parameters are ignored.

    Whitespace around the mime type and around each parameter is ignored,
    the ``charset`` parameter name is matched case-insensitively and
    double quotes surrounding its value are removed. If ``charset`` occurs
    more than once, the last occurrence wins.

    :param content_type: The raw content type header value, or None.

    :return: A ``(mime_type, encoding)`` tuple; both are None when
        ``content_type`` is None, and ``encoding`` is None when no charset
        parameter is present.
    """
    if content_type is None:
        return None, None

    mime_type, _, parameters = content_type.partition(";")
    mime_type = mime_type.strip()

    # Find parameter describing the charset
    encoding = None
    for parameter in parameters.split(";"):
        name, separator, value = parameter.partition("=")
        # Parameters are usually written with a space after the ";"
        # ("text/html; charset=utf-8"), so compare the stripped name.
        if separator and name.strip().lower() == "charset":
            encoding = value.strip().strip('"')

    return mime_type, encoding

342 

343 

def coerce_type(param, value, parameter_type, parameter_name=None):
    """Cast a raw parameter value according to its schema.

    The schema is ``param["schema"]`` when present, otherwise ``param``
    itself.

    - A nullable schema combined with a null value yields ``None``.
    - ``array``: every item is cast to the type of the ``items`` schema;
      header values are split on ``","`` first. Items that cannot be cast,
      or whose schema declares no castable type, are kept unchanged.
    - ``object``: when the schema has ``properties``, the leaves of the
      value are cast in place according to the matching property schemas;
      leaves that cannot be cast are kept unchanged.
    - anything else: the value is cast to the schema type.

    :param param: The parameter definition.
    :param value: The raw value to coerce.
    :param parameter_type: Where the parameter comes from (e.g. "header");
        also passed on to ``TypeValidationError``.
    :param parameter_name: Name used in the error; defaults to
        ``param["name"]``.

    :return: The coerced value.
    :raises TypeValidationError: when casting a non-array, non-object value
        raises ``ValueError``.
    """
    # TODO: clean up
    casters = {"integer": int, "number": float, "boolean": boolean, "object": dict}

    def make_type(raw, type_literal):
        caster = casters.get(type_literal)
        if caster is None:
            # No cast is defined for this type (e.g. "string", or a schema
            # without "type"). Callers treat TypeError as "keep the value".
            raise TypeError(f"No cast defined for type {type_literal!r}")
        return caster(raw)

    param_schema = param.get("schema", param)
    if is_nullable(param_schema) and is_null(value):
        return None

    param_type = param_schema.get("type")
    parameter_name = parameter_name if parameter_name else param.get("name")
    if param_type == "array":
        if parameter_type == "header":
            value = value.split(",")

        # "items" may be absent or may not declare a "type" (e.g. when it
        # only holds oneOf/anyOf); such items are passed through unchanged
        # instead of failing with a KeyError.
        items_schema = param_schema.get("items")
        item_type = items_schema.get("type") if isinstance(items_schema, dict) else None

        converted_params = []
        for v in value:
            try:
                converted = make_type(v, item_type)
            except (ValueError, TypeError):
                converted = v
            converted_params.append(converted)
        return converted_params
    elif param_type == "object":
        if param_schema.get("properties"):

            def cast_leaves(d, schema):
                if type(d) is not dict:
                    try:
                        return make_type(d, schema.get("type"))
                    except (ValueError, TypeError):
                        return d
                # A nested object schema does not have to list properties.
                properties = schema.get("properties") or {}
                for k, v in d.items():
                    if k in properties:
                        d[k] = cast_leaves(v, properties[k])
                return d

            return cast_leaves(value, param_schema)
        return value
    else:
        try:
            return make_type(value, param_type)
        except ValueError:
            raise TypeValidationError(param_type, parameter_type, parameter_name)
        except TypeError:
            return value

392 

393 

@typing.no_type_check
def get_root_path(import_name: str) -> str:
    """Copied from Flask:
    https://github.com/pallets/flask/blob/836866dc19218832cf02f8b04911060ac92bfc0b/src/flask/helpers.py#L595

    Find the root path of a package, or the path that contains a
    module. If it cannot be found, returns the current working
    directory.

    :param import_name: Name of the module or package to locate.

    :return: Absolute path of the directory containing the module file.
    :raises RuntimeError: when a loader exists for the module but no file
        path can be determined from it.

    :meta private:
    """
    # ``import importlib`` alone does not guarantee that the ``util``
    # submodule is loaded, so import what is needed explicitly.
    from importlib.util import find_spec

    # Module already imported and has a file attribute. Use that first.
    mod = sys.modules.get(import_name)

    if mod is not None and hasattr(mod, "__file__") and mod.__file__ is not None:
        return os.path.dirname(os.path.abspath(mod.__file__))

    # Next attempt: check the loader.
    try:
        spec = find_spec(import_name)

        if spec is None:
            raise ValueError
    except (ImportError, ValueError):
        loader = None
    else:
        loader = spec.loader

    # Loader does not exist or we're referring to an unloaded main
    # module or a main module without path (interactive sessions), go
    # with the current working directory.
    if loader is None:
        return os.getcwd()

    if hasattr(loader, "get_filename"):
        filepath = loader.get_filename(import_name)  # pyright: ignore
    else:
        # Fall back to imports.
        __import__(import_name)
        mod = sys.modules[import_name]
        filepath = getattr(mod, "__file__", None)

        # If we don't have a file path it might be because it is a
        # namespace package or the module came from an import hook that
        # provides no file name. No root path can be derived then, so the
        # caller has to provide it explicitly.
        if filepath is None:
            raise RuntimeError(
                "No root path can be found for the provided module"
                f" {import_name!r}. This can happen because the module"
                " came from an import hook that does not provide file"
                " name information or because it's a namespace package."
                " In this case the root path needs to be explicitly"
                " provided."
            )

    # filepath is import_name.py for a module, or __init__.py for a package.
    return os.path.dirname(os.path.abspath(filepath))  # type: ignore[no-any-return]

451 

452 

def inspect_function_arguments(function: t.Callable) -> t.Tuple[t.List[str], bool]:
    """
    Inspect the signature of ``function``.

    :return: A tuple of the parameter names, leaving out ``*args`` and
        ``**kwargs`` style parameters, and a flag telling whether the
        function accepts arbitrary keyword arguments.
    """
    names: t.List[str] = []
    has_kwargs = False

    for name, parameter in inspect.signature(function).parameters.items():
        if parameter.kind == parameter.VAR_KEYWORD:
            has_kwargs = True
        elif parameter.kind != parameter.VAR_POSITIONAL:
            names.append(name)

    return names, has_kwargs

466 

467 

T = t.TypeVar("T")


@t.overload
def sort_routes(routes: t.List[str], *, key: None = None) -> t.List[str]:
    ...


@t.overload
def sort_routes(routes: t.List[T], *, key: t.Callable[[T], str]) -> t.List[T]:
    ...


def sort_routes(routes, *, key=None):
    """Sorts a list of routes from most specific to least specific.

    See Starlette routing documentation and implementation as this function
    is aimed to sort according to that logic.
    - https://www.starlette.io/routing/#route-priority

    The only difference is that a `path` component is appended to each route
    such that `/` is less specific than `/basepath` while they are technically
    not comparable.
    This is because it is also done by the `Mount` class internally:
    https://github.com/encode/starlette/blob/1c1043ca0ab7126419948b27f9d0a78270fd74e6/starlette/routing.py#L388

    For example, from most to least specific:
    - /users/me
    - /users/{username}/projects/{project}
    - /users/{username}

    :param routes: List of routes to sort
    :param key: Function to extract the path from a route if it is not a string

    :return: List of routes sorted from most specific to least specific
    """

    class _RouteSortKey:
        """Sort key wrapping a route path and its compiled pattern."""

        def __init__(self, path: str) -> None:
            normalized = path.rstrip("/")
            if not normalized.endswith("/{path:path}"):
                normalized += "/{path:path}"
            self.path = normalized
            self.path_regex, _, _ = compile_path(normalized)

        def __lt__(self, other: "_RouteSortKey") -> bool:
            # A path matched by the other route's pattern sorts before it;
            # otherwise fall back to plain string comparison.
            if other.path_regex.match(self.path):
                return True
            return self.path < other.path

    def _sort_key(route):
        path = key(route) if key else route
        return _RouteSortKey(path)

    return sorted(routes, key=_sort_key)

516 

517 

def sort_apis_by_basepath(apis: t.List["API"]) -> t.List["API"]:
    """Sorts a list of APIs by basepath.

    The ordering is delegated to ``sort_routes``; an API whose
    ``base_path`` is falsy is sorted as ``"/"``.

    :param apis: List of APIs to sort

    :return: List of APIs sorted by basepath
    """

    def _base_path(api):
        return api.base_path or "/"

    return sort_routes(apis, key=_base_path)

526 

527 

def build_example_from_schema(schema):
    """Build an example value for a schema.

    The first matching rule applies:

    - an explicit ``example`` is returned as is;
    - a schema with ``properties`` yields a dict with an example per property;
    - a schema with ``items`` yields a list of examples holding one item
      more than ``minItems`` (default 0), unless ``maxItems`` forbids it,
      in which case it holds ``minItems`` items;
    - otherwise the optional ``jsf`` package generates a value. ``None`` is
      returned when ``jsf`` is not installed or raises ``TypeError``.
    """
    if "example" in schema:
        return schema["example"]

    if "properties" in schema:
        # Recurse if schema is an object
        example = {}
        for name, sub_schema in schema["properties"].items():
            example[name] = build_example_from_schema(sub_schema)
        return example

    if "items" in schema:
        # Recurse if schema is an array
        lower = schema.get("minItems", 0)
        upper = schema.get("maxItems")

        count = lower + 1
        if upper is not None and upper < count:
            count = lower

        return [build_example_from_schema(schema["items"]) for _ in range(count)]

    try:
        from jsf import JSF
    except ImportError:
        return None

    try:
        return JSF(schema).generate()
    except TypeError:
        return None