Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/connexion/utils.py: 22%

187 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1""" 

2This module provides general utility functions used within Connexion. 

3""" 

4 

5import asyncio 

6import functools 

7import importlib 

8import os 

9import pkgutil 

10import sys 

11import typing as t 

12 

13import yaml 

14 

15from connexion.exceptions import TypeValidationError 

16 

17 

def boolean(s):
    """
    Convert a JSON/Swagger boolean value to its Python equivalent.

    Real ``bool`` values are returned unchanged. Anything exposing a
    ``lower`` method is lower-cased and compared against ``"true"`` and
    ``"false"``.

    >>> boolean('true')
    True

    >>> boolean('false')
    False

    :raises ValueError: if the value is neither a bool nor one of the two
        accepted strings
    """
    if isinstance(s, bool):
        return s

    if hasattr(s, "lower"):
        lowered = s.lower()
        if lowered == "true":
            return True
        if lowered == "false":
            return False

    raise ValueError("Invalid boolean value")

38 

39 

# Map of swagger types to the callables that cast a raw value to the matching
# Python type. See
# https://github.com/swagger-api/swagger-spec/blob/master/versions/2.0.md#data-types
TYPE_MAP: t.Dict[str, t.Any] = {
    "integer": int,
    "number": float,
    "string": str,
    "boolean": boolean,
    "array": list,
    "object": dict,
    "file": lambda x: x,  # Don't cast files
}


def make_type(value: t.Any, type_: str, format_: t.Optional[str]) -> t.Any:
    """Cast a value to the type defined in the specification.

    :param value: the raw value to cast
    :param type_: the type name from the specification; must be a key of
        ``TYPE_MAP``, otherwise a ``KeyError`` is raised
    :param format_: the format from the specification, if any
    :return: ``value`` passed through the caster registered for the type
    """
    # In OpenAPI, files are represented with string type and binary format
    is_file = type_ == "string" and format_ == "binary"
    caster = TYPE_MAP["file" if is_file else type_]
    return caster(value)

60 

61 

def deep_merge(a, b):
    """Recursively merge ``b`` into ``a`` in place and return ``a``.

    Nested dicts present on both sides are merged key by key. For any other
    conflict the value from ``b`` wins; values that already compare equal are
    left untouched in ``a``.
    """
    for key in b:
        incoming = b[key]

        if key not in a:
            a[key] = incoming
            continue

        current = a[key]
        if isinstance(current, dict) and isinstance(incoming, dict):
            deep_merge(current, incoming)
        elif not (current == incoming):
            # b overwrites a
            a[key] = incoming

    return a

78 

79 

def deep_getattr(obj, attr):
    """
    Follow a dotted attribute chain starting at ``obj`` and return the value
    found at the end of it.

    :param obj: the object to start from
    :param attr: dotted attribute path, e.g. ``"a.b.c"``
    :raises AttributeError: if any attribute along the chain is missing
    """
    target = obj
    for name in attr.split("."):
        target = getattr(target, name)
    return target

88 

89 

def deep_get(obj, keys):
    """
    Walk down a nested object, one key per level, and return the leaf value.

    There are cases where the use of inheritance or polymorphism-- the use of
    allOf or oneOf keywords-- will cause a level to be a list. In that case
    the corresponding key is a string containing an integer, which is
    converted to a list index.

    :type obj: list or dict
    :type keys: list of strings
    """
    current = obj
    for key in keys:
        if isinstance(current, list):
            current = current[int(key)]
        else:
            current = current[key]
    return current

108 

109 

def get_function_from_name(function_name):
    """
    Resolve a fully qualified name (e.g. "mymodule.myobj.myfunc") to the
    object it refers to.

    The longest importable prefix of the name is imported as a module; the
    remaining components are then looked up as attributes on it.

    :type function_name: str
    :raises ValueError: if ``function_name`` is None
    :raises ImportError: if no prefix of the name can be imported, or if an
        import failed on the way and the attribute lookup fails afterwards
    :raises AttributeError: if the module imported at the first attempt but
        the attribute chain does not exist on it
    """
    if function_name is None:
        raise ValueError("Empty function name")

    # Initial guess: everything before the last dot is the module. A name
    # without any dot leaves the module name empty.
    module_name, _, attr_path = function_name.rpartition(".")

    module = None
    last_import_error = None

    while not module:
        try:
            module = importlib.import_module(module_name)
        except ImportError as import_error:
            last_import_error = import_error
            if "." not in module_name:
                raise
            # Move the last module component over to the attribute path and
            # retry with the shorter module name.
            module_name, trailing = module_name.rsplit(".", 1)
            attr_path = f"{trailing}.{attr_path}"

    try:
        return deep_getattr(module, attr_path)
    except AttributeError:
        # Prefer reporting the import failure seen while shortening the name.
        if last_import_error:
            raise last_import_error
        raise

146 

147 

def is_json_mimetype(mimetype):
    """
    Tell whether a mimetype denotes JSON content, i.e. ``application/json``
    or any ``application/*+json`` structured-syntax type.

    Parameters (everything after the first ``;``) are ignored, surrounding
    whitespace is stripped and the comparison is case-insensitive. Values
    that are not of the form ``type/subtype`` yield ``False`` instead of
    raising.

    :type mimetype: str
    :rtype: bool
    """
    if mimetype is None:
        return False

    # Drop the parameters before looking for "/": a parameter value may
    # itself contain slashes (e.g. a profile URL).
    essence = mimetype.split(";", maxsplit=1)[0].strip().lower()

    maintype, separator, subtype = essence.partition("/")
    if not separator or "/" in subtype:
        # Not of the form type/subtype.
        return False

    return maintype == "application" and (
        subtype == "json" or subtype.endswith("+json")
    )

162 

163 

def all_json(mimetypes):
    """
    Tell whether every one of the given mimetypes is a JSON mimetype.

    An empty collection yields True.

    :type mimetypes: list
    :rtype: bool

    >>> all_json(['application/json'])
    True
    >>> all_json(['application/x.custom+json'])
    True
    >>> all_json([])
    True
    >>> all_json(['application/xml'])
    False
    >>> all_json(['text/json'])
    False
    >>> all_json(['application/json', 'other/type'])
    False
    >>> all_json(['application/json', 'application/x.custom+json'])
    True
    """
    return all(map(is_json_mimetype, mimetypes))

187 

188 

def is_nullable(param_def):
    """
    Tell whether a parameter definition allows null values.

    The ``nullable`` flag is read from the nested ``schema`` when there is
    one, otherwise from the definition itself. If it is absent or falsy, the
    ``x-nullable`` extension on the definition is used instead.
    """
    schema = param_def.get("schema", param_def)
    nullable = schema.get("nullable", False)
    if nullable:
        return nullable
    return param_def.get("x-nullable", False)  # swagger2

193 

194 

def is_null(value):
    """
    Tell whether a value stands for null.

    That is the case for ``None`` itself and for anything with a ``strip``
    method whose stripped form is ``"null"`` or ``"None"``.
    """
    if value is None:
        return True
    return hasattr(value, "strip") and value.strip() in ("null", "None")

203 

204 

def has_coroutine(function, api=None):
    """
    Checks if function is a coroutine.
    If ``function`` is a decorator (has a ``__wrapped__`` attribute)
    this function will also look at the wrapped function.

    When ``api`` is given, its ``get_response`` attribute is checked the same
    way and the result is True if either of the two is a coroutine function.
    """

    def unwraps_to_coroutine(candidate):
        # Follow the __wrapped__ chain until a coroutine function shows up
        # or there is nothing left to unwrap.
        while True:
            if asyncio.iscoroutinefunction(candidate):
                return True
            if not hasattr(candidate, "__wrapped__"):
                return False
            candidate = candidate.__wrapped__

    if api is None:
        candidates = [function]
    else:
        candidates = [function, api.get_response]

    return any(unwraps_to_coroutine(candidate) for candidate in candidates)

224 

225 

def yamldumper(openapi):
    """
    Returns a nicely-formatted yaml spec.

    Scalars containing one of the separator characters listed below are
    written in literal block style (``|``); duplicate entries are written out
    in full instead of being replaced by yaml anchors.

    The custom scalar representation is defined on the local dumper class, so
    calling this function does not alter how other code dumps yaml.

    :param openapi: a spec dictionary.
    :return: a nicely-formatted, serialized yaml spec.
    """
    block_chars = (
        "\u000a"  # line feed
        "\u000d"  # carriage return
        "\u001c"  # file separator
        "\u001d"  # group separator
        "\u001e"  # record separator
        "\u0085"  # next line
        "\u2028"  # line separator
        "\u2029"  # paragraph separator
    )

    class NoAnchorDumper(yaml.dumper.SafeDumper):
        """A yaml Dumper that does not replace duplicate entries
        with yaml anchors and dumps multi-line scalars as "|".
        """

        def ignore_aliases(self, *args):
            return True

        def represent_scalar(self, tag, value, style=None):
            # The requested ``style`` is deliberately overridden: block style
            # for values containing a separator character, the dumper default
            # for everything else.
            if any(char in value for char in block_chars):
                style = "|"
            else:
                style = self.default_style

            node = yaml.representer.ScalarNode(tag, value, style=style)
            if self.alias_key is not None:
                self.represented_objects[self.alias_key] = node
            return node

    return yaml.dump(openapi, allow_unicode=True, Dumper=NoAnchorDumper)

272 

273 

def not_installed_error(exc, *, msg=None):  # pragma: no cover
    """Build a stand-in callable that raises the given ImportError when called.

    This defers the failure to the moment the missing module/object is
    actually used.

    :param exc: the exception caught at import time
    :param msg: optional custom message; when given, a new exception of the
        same type as ``exc`` is raised with this message and the traceback of
        ``exc``
    :return: a callable accepting any arguments that always raises
    """

    def _delayed_error(*args, **kwargs):
        if msg is None:
            raise exc
        replacement = type(exc)(msg)
        raise replacement.with_traceback(exc.__traceback__)

    return _delayed_error

283 

284 

def extract_content_type(
    headers: t.List[t.Tuple[bytes, bytes]]
) -> t.Tuple[t.Optional[str], t.Optional[str]]:
    """Extract the mime type and encoding from the content type headers.

    Only the first ``Content-Type`` header is considered. Whitespace around
    the mime type and around each parameter is ignored, the ``charset``
    parameter name is matched case-insensitively and surrounding double
    quotes are removed from its value.

    :param headers: Headers from ASGI scope

    :return: A tuple of mime type, encoding; each is ``None`` when it is not
        present
    """
    mime_type, encoding = None, None
    for key, value in headers:
        # Headers can always be decoded using latin-1:
        # https://stackoverflow.com/a/27357138/4098821
        if key.decode("latin-1").lower() != "content-type":
            continue

        content_type = value.decode("latin-1")
        mime_type, _, parameters = content_type.partition(";")
        mime_type = mime_type.strip()

        for parameter in parameters.split(";"):
            name, separator, param_value = parameter.partition("=")
            # Parameters are usually written "; charset=utf-8", so the name
            # has to be stripped before it is compared.
            if separator and name.strip().lower() == "charset":
                encoding = param_value.strip().strip('"')
        break
    return mime_type, encoding

312 

313 

def coerce_type(param, value, parameter_type, parameter_name=None):
    """Cast a parameter value to the type declared in its definition.

    :param param: the parameter definition; its ``schema`` entry is used when
        present, otherwise the definition itself serves as the schema
    :param value: the raw value to cast
    :param parameter_type: where the parameter comes from; ``"header"`` array
        values are split on commas, and the value is passed on to
        ``TypeValidationError``
    :param parameter_name: name used in the validation error; defaults to the
        ``name`` entry of ``param``
    :return: ``None`` for a null value of a nullable parameter, otherwise the
        cast value. Array items and object leaves that cannot be cast are
        left as they are.
    :raises TypeValidationError: if a non-array, non-object value is rejected
        by its caster with a ``ValueError``
    """
    # TODO: clean up
    casters = {"integer": int, "number": float, "boolean": boolean, "object": dict}

    def cast(raw, type_literal):
        # A type without a caster yields None here; calling it raises
        # TypeError, which every caller below treats as "leave unchanged".
        caster = casters.get(type_literal)
        return caster(raw)

    param_schema = param.get("schema", param)
    if is_nullable(param_schema) and is_null(value):
        return None

    param_type = param_schema.get("type")
    parameter_name = parameter_name or param.get("name")

    if param_type == "array":
        items = value.split(",") if parameter_type == "header" else value
        converted_params = []
        for item in items:
            try:
                item = cast(item, param_schema["items"]["type"])
            except (ValueError, TypeError):
                pass
            converted_params.append(item)
        return converted_params

    if param_type == "object":
        if not param_schema.get("properties"):
            return value

        def cast_leaves(node, schema):
            if type(node) is not dict:
                try:
                    return cast(node, schema["type"])
                except (ValueError, TypeError):
                    return node
            # Dict nodes are updated in place; keys without a matching
            # property in the schema are left untouched.
            for key, child in node.items():
                if key in schema["properties"]:
                    node[key] = cast_leaves(child, schema["properties"][key])
            return node

        return cast_leaves(value, param_schema)

    try:
        return cast(value, param_type)
    except ValueError:
        raise TypeValidationError(param_type, parameter_type, parameter_name)
    except TypeError:
        return value

362 

363 

def get_root_path(import_name: str) -> str:
    """Copied from Flask:
    https://github.com/pallets/flask/blob/836866dc19218832cf02f8b04911060ac92bfc0b/src/flask/helpers.py#L595

    Find the root path of a package, or the path that contains a
    module. If it cannot be found, returns the current working
    directory.

    The loader is looked up with ``importlib.util.find_spec`` rather than the
    deprecated ``pkgutil.get_loader``.

    :param import_name: the dotted name of the module or package
    :return: the absolute path of the directory containing the module file
    :raises RuntimeError: if the module is found but provides no file name,
        e.g. because it is a namespace package
    """
    # Imported locally so the ``util`` submodule is guaranteed to be loaded.
    import importlib.util

    # Module already imported and has a file attribute. Use that first.
    mod = sys.modules.get(import_name)

    if mod is not None and hasattr(mod, "__file__") and mod.__file__ is not None:
        return os.path.dirname(os.path.abspath(mod.__file__))

    # Next attempt: check the loader.
    try:
        spec = importlib.util.find_spec(import_name)
        if spec is None:
            raise ValueError
    except (ImportError, ValueError):
        # Unknown module, unimportable parent package, or a module without
        # a spec (ValueError from find_spec).
        loader = None
    else:
        loader = spec.loader

    # Loader does not exist or we're referring to an unloaded main
    # module or a main module without path (interactive sessions), go
    # with the current working directory.
    if loader is None or import_name == "__main__":
        return os.getcwd()

    if hasattr(loader, "get_filename"):
        filepath = loader.get_filename(import_name)  # type: ignore
    else:
        # Fall back to imports.
        __import__(import_name)
        mod = sys.modules[import_name]
        filepath = getattr(mod, "__file__", None)

        # If we don't have a file path it might be because it is a
        # namespace package. No root path can be derived in that case,
        # so the caller has to provide one explicitly.
        if filepath is None:
            raise RuntimeError(
                "No root path can be found for the provided module"
                f" {import_name!r}. This can happen because the module"
                " came from an import hook that does not provide file"
                " name information or because it's a namespace package."
                " In this case the root path needs to be explicitly"
                " provided."
            )

    # filepath is import_name.py for a module, or __init__.py for a package.
    return os.path.dirname(os.path.abspath(filepath))