Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/connexion/decorators/parameter.py: 24%

194 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1""" 

2This module defines a decorator to convert request parameters to arguments for the view function. 

3""" 

4import abc 

5import asyncio 

6import builtins 

7import functools 

8import inspect 

9import keyword 

10import logging 

11import re 

12import typing as t 

13from copy import copy, deepcopy 

14 

15import inflection 

16 

17from connexion.context import context, operation 

18from connexion.frameworks.abstract import Framework 

19from connexion.http_facts import FORM_CONTENT_TYPES 

20from connexion.lifecycle import ASGIRequest, WSGIRequest 

21from connexion.operations import AbstractOperation, Swagger2Operation 

22from connexion.utils import deep_merge, is_null, is_nullable, make_type 

23 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Name of the handler argument through which prep_kwargs passes the whole
# request context object when the view function declares it.
CONTEXT_NAME = "context_"

27 

28 

class BaseParameterDecorator(abc.ABC):
    """
    Base class for decorators that convert a request into keyword arguments
    for a view function.

    The class derives from ``abc.ABC`` so that the ``@abc.abstractmethod``
    marker on ``__call__`` is actually enforced; without an ABC metaclass the
    marker is silently ignored and the base class could be instantiated.
    """

    def __init__(
        self,
        *,
        framework: t.Type[Framework],
        pythonic_params: bool = False,
    ) -> None:
        """
        :param framework: The framework class stored on the decorator.
        :param pythonic_params: When true, parameter names are normalised with
            ``pythonic``; otherwise they are normalised with ``sanitized``.
        """
        self.framework = framework
        self.sanitize_fn = pythonic if pythonic_params else sanitized

    def _maybe_get_body(
        self,
        request: t.Union[WSGIRequest, ASGIRequest],
        *,
        arguments: t.List[str],
        has_kwargs: bool,
    ) -> t.Any:
        """
        Return ``request.get_body()`` when the body is needed, else ``None``.

        :param request: The incoming request.
        :param arguments: Names of the view function's named parameters.
        :param has_kwargs: Whether the view function accepts ``**kwargs``.
        """
        # NOTE(review): the body name is resolved from ``request.content_type``
        # while the form check below uses ``request.mimetype`` - confirm that
        # this difference is intended.
        body_name = self.sanitize_fn(operation.body_name(request.content_type))
        # Pass form contents separately for Swagger2 for backward compatibility
        # with Connexion 2. Checking for body_name is not enough.
        if (body_name in arguments or has_kwargs) or (
            request.mimetype in FORM_CONTENT_TYPES
            and isinstance(operation, Swagger2Operation)
        ):
            return request.get_body()
        return None

    @abc.abstractmethod
    def __call__(self, function: t.Callable) -> t.Callable:
        """Wrap *function*; concrete subclasses must implement this."""
        raise NotImplementedError

60 

61 

class SyncParameterDecorator(BaseParameterDecorator):
    """Parameter decorator producing a synchronous wrapper around the view function."""

    def __call__(self, function: t.Callable) -> t.Callable:
        """
        Wrap *function* so that it is called with keyword arguments built
        from the request.

        :param function: The view function to wrap.
        :return: A wrapper taking the request as its only argument.
        """
        # The signature is read from the innermost wrapped function.
        arguments, has_kwargs = inspect_function_arguments(
            unwrap_decorators(function)
        )

        @functools.wraps(function)
        def wrapper(request: WSGIRequest) -> t.Any:
            body = self._maybe_get_body(
                request, arguments=arguments, has_kwargs=has_kwargs
            )
            call_kwargs = prep_kwargs(
                request,
                request_body=body,
                files=request.files(),
                arguments=arguments,
                has_kwargs=has_kwargs,
                sanitize=self.sanitize_fn,
            )
            return function(**call_kwargs)

        return wrapper

85 

86 

class AsyncParameterDecorator(BaseParameterDecorator):
    """Parameter decorator producing a coroutine wrapper around the view function."""

    def __call__(self, function: t.Callable) -> t.Callable:
        """
        Wrap *function* so that it is awaited with keyword arguments built
        from the request.

        :param function: The view function to wrap; its result is awaited.
        :return: An async wrapper taking the request as its only argument.
        """
        # The signature is read from the innermost wrapped function.
        arguments, has_kwargs = inspect_function_arguments(
            unwrap_decorators(function)
        )

        @functools.wraps(function)
        async def wrapper(request: ASGIRequest) -> t.Any:
            body = self._maybe_get_body(
                request, arguments=arguments, has_kwargs=has_kwargs
            )

            # The body may be a coroutine (possibly nested); await until it is not.
            while asyncio.iscoroutine(body):
                body = await body

            uploaded_files = await request.files()
            call_kwargs = prep_kwargs(
                request,
                request_body=body,
                files=uploaded_files,
                arguments=arguments,
                has_kwargs=has_kwargs,
                sanitize=self.sanitize_fn,
            )

            return await function(**call_kwargs)

        return wrapper

113 

114 

def prep_kwargs(
    request: t.Union[WSGIRequest, ASGIRequest],
    *,
    request_body: t.Any,
    files: t.Dict[str, t.Any],
    arguments: t.List[str],
    has_kwargs: bool,
    sanitize: t.Callable,
) -> dict:
    """
    Build the keyword arguments passed to the view function.

    :param request: The incoming request; its path params, query params and
        mimetype are read.
    :param request_body: The already-resolved request body (may be ``None``).
    :param files: Uploaded files keyed by name.
    :param arguments: Names of the view function's named parameters.
    :param has_kwargs: Whether the view function accepts ``**kwargs``.
    :param sanitize: Callable used to normalise parameter names.
    :return: Mapping of argument name to value.
    """
    extracted = get_arguments(
        operation,
        path_params=request.path_params,
        query_params=request.query_params,
        body=request_body,
        files=files,
        arguments=arguments,
        has_kwargs=has_kwargs,
        sanitize=sanitize,
        content_type=request.mimetype,
    )

    # optionally convert parameter variable names to un-shadowed, snake_case form
    kwargs = {sanitize(name): value for name, value in extracted.items()}

    # add context info (e.g. from security decorator)
    for key, value in context.items():
        if not has_kwargs and key not in arguments:
            logger.debug("Context parameter '%s' not in function arguments", key)
            continue
        kwargs[key] = value

    # attempt to provide the request context to the function
    if CONTEXT_NAME in arguments:
        kwargs[CONTEXT_NAME] = context

    return kwargs

150 

151 

def unwrap_decorators(function: t.Callable) -> t.Callable:
    """
    Follow the ``__wrapped__`` chain and return the innermost function.

    :param function: A callable, possibly wrapped by decorators.
    :return: The first object in the chain without a ``__wrapped__`` attribute.
    """
    current = function
    while True:
        try:
            current = current.__wrapped__  # type: ignore
        except AttributeError:
            return current

157 

158 

def inspect_function_arguments(function: t.Callable) -> t.Tuple[t.List[str], bool]:
    """
    Describe the signature of *function*.

    :param function: The callable to inspect.
    :return: A tuple of (names of all parameters except ``*args`` and
        ``**kwargs``, whether a ``**kwargs`` parameter is present).
    """
    named_arguments = []
    accepts_kwargs = False
    for name, param in inspect.signature(function).parameters.items():
        if param.kind == param.VAR_KEYWORD:
            accepts_kwargs = True
        elif param.kind != param.VAR_POSITIONAL:
            named_arguments.append(name)
    return named_arguments, accepts_kwargs

172 

173 

def snake_and_shadow(name: str) -> str:
    """
    Convert the given name into Pythonic form.

    The name is first converted to snake_case. If the result is the name of a
    built-in or a Python keyword, an underscore is appended so that it does
    not shadow it.

    :param name: The parameter name
    """
    snake = inflection.underscore(name)
    shadows_python_name = keyword.iskeyword(snake) or snake in builtins.__dict__
    return f"{snake}_" if shadows_python_name else snake

184 

185 

def sanitized(name: str) -> str:
    """
    Reduce *name* to characters usable in a Python identifier.

    A ``[`` that is not immediately followed by ``]`` becomes ``_``, every
    character other than ASCII letters, digits and ``_`` is removed, and
    leading characters that are neither a letter nor ``_`` are stripped.
    A falsy *name* is returned unchanged.

    :param name: The parameter name
    """
    if not name:
        return name
    with_brackets_replaced = re.sub(r"\[(?!])", "_", name)
    identifier_chars_only = re.sub("[^0-9a-zA-Z_]", "", with_brackets_replaced)
    return re.sub("^[^a-zA-Z_]+", "", identifier_chars_only)

190 

191 

def pythonic(name: str) -> str:
    """
    Convert *name* with ``snake_and_shadow`` and then clean it with ``sanitized``.

    A falsy *name* skips the snake_case conversion and is passed to
    ``sanitized`` as is.

    :param name: The parameter name
    """
    if name:
        name = snake_and_shadow(name)
    return sanitized(name)

195 

196 

def get_arguments(
    operation: AbstractOperation,
    *,
    path_params: dict,
    query_params: dict,
    body: t.Any,
    files: dict,
    arguments: t.List[str],
    has_kwargs: bool,
    sanitize: t.Callable,
    content_type: str,
) -> t.Dict[str, t.Any]:
    """
    Collect the arguments for the handler function.

    Values are gathered from the path, then the query string, then (for
    PATCH, POST and PUT operations only) the body and uploaded files; later
    sources overwrite earlier ones on a name clash.

    :param operation: The operation being handled.
    :param path_params: Path parameters of the request.
    :param query_params: Query parameters of the request.
    :param body: The request body, if any.
    :param files: Uploaded files keyed by name.
    :param arguments: Names of the handler's named parameters.
    :param has_kwargs: Whether the handler accepts ``**kwargs``.
    :param sanitize: Callable used to normalise parameter names.
    :param content_type: Content type used to look up the body definition.
    """
    path_arguments = _get_path_arguments(
        path_params, operation=operation, sanitize=sanitize
    )
    query_arguments = _get_query_arguments(
        query_params,
        operation=operation,
        arguments=arguments,
        has_kwargs=has_kwargs,
        sanitize=sanitize,
    )
    ret = {**path_arguments, **query_arguments}

    if operation.method.upper() in ("PATCH", "POST", "PUT"):
        body_arguments = _get_body_argument(
            body,
            operation=operation,
            arguments=arguments,
            has_kwargs=has_kwargs,
            sanitize=sanitize,
            content_type=content_type,
        )
        ret.update(body_arguments)
        ret.update(_get_file_arguments(files, arguments, has_kwargs))
    return ret

237 

238 

def _get_path_arguments(
    path_params: dict, *, operation: AbstractOperation, sanitize: t.Callable
) -> dict:
    """
    Extract handler function arguments from path parameters.

    :param path_params: Path parameter values keyed by their raw name.
    :param operation: The operation whose ``parameters`` list is consulted.
    :param sanitize: Callable used to normalise parameter names.
    :return: Values keyed by sanitized name; values of parameters declared
        with ``in: path`` are cast with ``_get_val_from_param``.
    """
    definitions = {
        definition["name"]: definition
        for definition in operation.parameters
        if definition["in"] == "path"
    }

    extracted = {}
    for raw_name, raw_value in path_params.items():
        key = sanitize(raw_name)
        if raw_name not in definitions:
            # Assume path params mechanism used for injection
            extracted[key] = raw_value
            continue
        extracted[key] = _get_val_from_param(raw_value, definitions[raw_name])
    return extracted

260 

261 

def _get_val_from_param(value: t.Any, param_definitions: t.Dict[str, dict]) -> t.Any:
    """
    Cast a value according to its definition in the specification.

    :param value: The raw value; for ``array`` types an iterable of parts.
    :param param_definitions: The parameter definition; its ``schema`` entry
        is used when present, otherwise the definition itself is the schema.
    :return: ``None`` for a null value of a nullable schema, otherwise the
        value (or each array item) converted with ``make_type``.
    :raises KeyError: If the schema has no ``type`` (or an array schema has
        no ``items`` / item ``type``).
    """
    schema = param_definitions.get("schema", param_definitions)

    if is_nullable(schema) and is_null(value):
        return None

    declared_type = schema["type"]
    if declared_type != "array":
        return make_type(value, declared_type, schema.get("format"))

    # Arrays are cast item by item using the type/format of ``items``.
    item_type = schema["items"]["type"]
    item_format = schema["items"].get("format")
    return [make_type(part, item_type, item_format) for part in value]

277 

278 

def _get_query_arguments(
    query_params: dict,
    *,
    operation: AbstractOperation,
    arguments: t.List[str],
    has_kwargs: bool,
    sanitize: t.Callable,
) -> dict:
    """
    Extract handler function arguments from the query parameters.

    :param query_params: Query parameter values from the request.
    :param operation: The operation whose ``parameters`` list is consulted.
    :param arguments: Names of the handler's named parameters.
    :param has_kwargs: Whether the handler accepts ``**kwargs``.
    :param sanitize: Callable used to normalise parameter names.
    """
    definitions = {
        definition["name"]: definition
        for definition in operation.parameters
        if definition["in"] == "query"
    }

    # Merge the request's values into a deep copy of the declared defaults.
    merged = deep_merge(deepcopy(_get_query_defaults(definitions)), query_params)

    return _query_args_helper(definitions, merged, arguments, has_kwargs, sanitize)

303 

304 

def _get_query_defaults(query_definitions: t.Dict[str, dict]) -> t.Dict[str, t.Any]:
    """
    Get the default values for the query parameters from their definitions.

    A top-level ``default`` wins; otherwise an ``object`` schema gets a
    default built by ``_get_default_obj`` and any other schema contributes
    its own ``default``. Definitions for which a needed key is missing are
    skipped.

    :param query_definitions: Query parameter definitions keyed by name.
    :return: Default values keyed by parameter name.
    """
    defaults = {}
    for name, definition in query_definitions.items():
        try:
            if "default" in definition:
                value = definition["default"]
            elif definition["schema"]["type"] == "object":
                value = _get_default_obj(definition["schema"])
            else:
                value = definition["schema"]["default"]
        except KeyError:
            # No usable default for this parameter.
            continue
        defaults[name] = value
    return defaults

319 

320 

def _get_default_obj(schema: dict) -> dict:
    """
    Return the default value for an object schema.

    :param schema: The object schema.
    :return: A deep copy of the schema's ``default`` when it has one,
        otherwise an object assembled from the defaults of its ``properties``.
    """
    try:
        default_obj = deepcopy(schema["default"])
    except KeyError:
        default_obj = _build_default_obj_recursive(schema.get("properties", {}), {})
    return default_obj

327 

328 

def _build_default_obj_recursive(properties: dict, default_object: dict) -> dict:
    """
    Take disparate and nested default keys, and build up a default object.

    :param properties: The ``properties`` mapping of an object schema.
    :param default_object: The object to fill; it is updated in place. Names
        already present in it are not overwritten by a property default.
    :return: The same ``default_object``, for convenience.
    """
    for name, property_ in properties.items():
        if "default" in property_ and name not in default_object:
            # Deep copy so that nested mutable defaults are never shared with
            # (and cannot be mutated through) the specification itself.
            default_object[name] = deepcopy(property_["default"])
        elif property_.get("type") == "object" and "properties" in property_:
            default_object.setdefault(name, {})
            default_object[name] = _build_default_obj_recursive(
                property_["properties"], default_object[name]
            )
    return default_object

340 

341 

def _query_args_helper(
    query_definitions: dict,
    query_arguments: dict,
    function_arguments: t.List[str],
    has_kwargs: bool,
    sanitize: t.Callable,
) -> dict:
    """
    Select and cast the query arguments the handler function can receive.

    :param query_definitions: Query parameter definitions keyed by raw name.
    :param query_arguments: Query values (defaults merged with the request).
    :param function_arguments: Names of the handler's named parameters.
    :param has_kwargs: Whether the handler accepts ``**kwargs``.
    :param sanitize: Callable used to normalise parameter names.
    :return: Cast values keyed by sanitized name.
    """
    result = {}
    for key, value in query_arguments.items():
        sanitized_key = sanitize(key)

        if not has_kwargs and sanitized_key not in function_arguments:
            logger.debug(
                "Query Parameter '%s' (sanitized: '%s') not in function arguments",
                key,
                sanitized_key,
            )
            continue

        logger.debug(
            "Query Parameter '%s' (sanitized: '%s') in function arguments",
            key,
            sanitized_key,
        )

        if key not in query_definitions:  # pragma: no cover
            # Values without a definition are reported and dropped.
            logger.error(
                "Function argument '%s' (non-sanitized: %s) not defined in specification",
                sanitized_key,
                key,
            )
            continue

        query_defn = query_definitions[key]
        logger.debug("%s is a %s", key, query_defn)
        result[sanitized_key] = _get_val_from_param(value, query_defn)
    return result

376 

377 

def _get_body_argument(
    body: t.Any,
    *,
    operation: AbstractOperation,
    arguments: t.List[str],
    has_kwargs: bool,
    sanitize: t.Callable,
    content_type: str,
) -> dict:
    """
    Extract the handler function argument(s) carried by the request body.

    :param body: The request body, if any.
    :param operation: The operation being handled.
    :param arguments: Names of the handler's named parameters.
    :param has_kwargs: Whether the handler accepts ``**kwargs``.
    :param sanitize: Callable used to normalise parameter names.
    :param content_type: Content type used to look up the body definition.
    :return: Either the body under its body name, the unpacked form fields
        (Swagger 2 form requests), or an empty dict when the handler cannot
        receive the body.
    """
    if not arguments and not has_kwargs:
        return {}

    body_name = sanitize(operation.body_name(content_type))

    if content_type in FORM_CONTENT_TYPES:
        result = _get_body_argument_form(
            body, operation=operation, content_type=content_type
        )

        # Unpack form values for Swagger for compatibility with Connexion 2 behavior
        if isinstance(operation, Swagger2Operation):
            if has_kwargs:
                return result
            unpacked = {}
            for field_name, field_value in result.items():
                clean_name = sanitize(field_name)
                if clean_name in arguments:
                    unpacked[clean_name] = field_value
            return unpacked
    else:
        result = _get_body_argument_json(
            body, operation=operation, content_type=content_type
        )

    if body_name in arguments or has_kwargs:
        return {body_name: result}

    return {}

418 

419 

def _get_body_argument_json(
    body: t.Any, *, operation: AbstractOperation, content_type: str
) -> t.Any:
    """
    Determine the value passed to the handler for a non-form body.

    :param body: The request body, if any.
    :param operation: The operation whose body schema is consulted.
    :param content_type: Content type used to look up the body schema.
    :return: ``None`` for a null body with a nullable schema, a deep copy of
        the schema ``default`` (or ``{}``) when no body was given, otherwise
        the body itself.
    """
    # if the body came in null, and the schema says it can be null, we decide
    # to include no value for the body argument, rather than the default body
    schema_is_nullable = is_nullable(operation.body_schema(content_type))
    if schema_is_nullable and is_null(body):
        return None

    if body is not None:
        return body

    fallback = operation.body_schema(content_type).get("default", {})
    return deepcopy(fallback)

433 

434 

def _get_body_argument_form(
    body: dict, *, operation: AbstractOperation, content_type: str
) -> dict:
    """
    Determine the typed form values passed to the handler.

    The schema ``default`` is deep-copied and updated with the submitted
    values; the result is then cast by ``_get_typed_body_values``.

    :param body: The submitted form values (may be ``None`` or empty).
    :param operation: The operation whose body schema is consulted.
    :param content_type: Content type used to look up the body schema.
    :return: The typed form values, or ``{}`` when the schema declares no
        properties and disallows additional properties.
    """
    # Read every schema attribute from the schema of THIS content type, so
    # that `additionalProperties` cannot come from a different media type.
    body_schema = operation.body_schema(content_type)

    # now determine the actual value for the body (whether it came in or is default)
    default_body = body_schema.get("default", {})
    body_props = {
        name: {"schema": prop_schema}
        for name, prop_schema in body_schema.get("properties", {}).items()
    }

    # by OpenAPI specification `additionalProperties` defaults to `true`
    # see: https://github.com/OAI/OpenAPI-Specification/blame/3.0.2/versions/3.0.2.md#L2305
    additional_props = body_schema.get("additionalProperties", True)

    body_arg = deepcopy(default_body)
    body_arg.update(body or {})

    if body_props or additional_props:
        return _get_typed_body_values(body_arg, body_props, additional_props)

    return {}

456 

457 

def _get_typed_body_values(body_arg, body_props, additional_props):
    """
    Return a copy of the provided body_arg dictionary
    whose values will have the appropriate types
    as defined in the provided schemas.

    Values whose property definition lookup (or cast) raises ``KeyError`` are
    treated as additional properties: they are dropped with an error log when
    additional properties are disallowed, cast with the additional-properties
    schema when one is given, and passed through unchanged otherwise.

    :type body_arg: type dict
    :type body_props: dict
    :type additional_props: dict|bool
    :rtype: dict
    """
    extra_defn = None
    if isinstance(additional_props, dict):
        extra_defn = {"schema": additional_props}

    typed = {}
    for prop_name, raw_value in body_arg.items():
        try:
            defn = body_props[prop_name]
            typed[prop_name] = _get_val_from_param(raw_value, defn)
        except KeyError:  # pragma: no cover
            if additional_props:
                typed[prop_name] = (
                    raw_value
                    if extra_defn is None
                    else _get_val_from_param(raw_value, extra_defn)
                )
            else:
                logger.error(f"Body property '{prop_name}' not defined in body schema")

    return typed

487 

488 

def _get_file_arguments(files, arguments, has_kwargs=False):
    """
    Select the uploaded files the handler function can receive.

    :param files: Uploaded files keyed by name.
    :param arguments: Names of the handler's named parameters.
    :param has_kwargs: Whether the handler accepts ``**kwargs``; when true
        every file is kept.
    :return: The files whose name is accepted by the handler.
    """
    selected = {}
    for field_name, upload in files.items():
        if has_kwargs or field_name in arguments:
            selected[field_name] = upload
    return selected