Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/connexion/spec.py: 46%

197 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1""" 

2This module defines Python interfaces for OpenAPI specifications. 

3""" 

4 

5import abc 

6import copy 

7import json 

8import os 

9import pathlib 

10import pkgutil 

11from collections.abc import Mapping 

12from urllib.parse import urlsplit 

13 

14import jinja2 

15import jsonschema 

16import yaml 

17from jsonschema import Draft4Validator 

18from jsonschema.validators import extend as extend_validator 

19 

20from .exceptions import InvalidSpecification 

21from .json_schema import NullableTypeValidator, resolve_refs 

22from .operations import OpenAPIOperation, Swagger2Operation 

23from .utils import deep_get 

24 

# Draft 4 implementation of the "properties" keyword. It is kept under a module-level
# name so that validate_defaults (inside create_spec_validator) can delegate to it
# before running its additional check on "default" values.
validate_properties = Draft4Validator.VALIDATORS["properties"]

26 

27 

def create_spec_validator(spec: dict) -> Draft4Validator:
    """Build a validator class for checking an OpenAPI spec against the OpenAPI schema.

    The returned class is ``Draft4Validator`` with its ``properties`` keyword
    replaced, so that any ``default`` found in a validated instance is also
    checked against the instance it lives in.

    :param spec: specification to validate
    """
    # Defaults are validated against the spec itself rather than against the
    # OpenAPI schema, hence a second validator bound to ``spec``.
    instance_validator = extend_validator(
        Draft4Validator, {"type": NullableTypeValidator}
    )(spec)

    def validate_defaults(validator, properties, instance, schema):
        """Run the stock ``properties`` check, then validate ``instance["default"]``
        against ``instance`` when the stock check reported nothing.
        """
        found_errors = False
        for problem in validate_properties(validator, properties, instance, schema):
            found_errors = True
            yield problem

        # The default is only worth checking once the subschema itself is valid.
        if found_errors:
            return
        if not isinstance(instance, dict) or "default" not in instance:
            return

        default_checker = instance_validator.evolve(schema=instance)
        yield from default_checker.iter_errors(instance["default"])

    return extend_validator(Draft4Validator, {"properties": validate_defaults})

60 

61 

# Message carried by InvalidSpecification when Specification._get_spec_version cannot
# read an "openapi" or "swagger" version from the spec.
NO_SPEC_VERSION_ERR_MSG = """Unable to get the spec version.
You are missing either '"swagger": "2.0"' or '"openapi": "3.0.0"'
from the top level of your spec."""

65 

66 

def canonical_base_path(base_path):
    """
    Normalise a "basePath" by dropping every trailing "/", so that paths
    beginning with "/" can be appended to it directly.

    :param base_path: base path string to normalise
    :return: ``base_path`` without trailing slashes ("/" becomes "")
    """
    without_trailing_slashes = base_path.rstrip("/")
    return without_trailing_slashes

72 

73 

class Specification(Mapping):
    """Read-only mapping over a validated API specification.

    On construction the input is deep-copied (exposed as :attr:`raw`), passed to
    the subclass hook ``_set_defaults``, validated against ``cls.openapi_schema``
    and finally handed to ``resolve_refs``; the result of that call backs the
    mapping interface (``__getitem__``, ``__iter__``, ``__len__``).
    """

    def __init__(self, raw_spec, *, base_uri=""):
        """
        :param raw_spec: specification dictionary; ``_set_defaults`` is applied to
            this object directly, after the pristine copy has been taken
        :param base_uri: forwarded to ``resolve_refs``
        :raises InvalidSpecification: if the spec does not validate
        """
        # Snapshot first so that `raw` does not contain the defaults added below.
        self._raw_spec = copy.deepcopy(raw_spec)
        self._set_defaults(raw_spec)
        self._validate_spec(raw_spec)
        self._spec = resolve_refs(raw_spec, base_uri=base_uri)

    @classmethod
    @abc.abstractmethod
    def _set_defaults(cls, spec):
        """set some default values in the spec"""

    @classmethod
    def _validate_spec(cls, spec):
        """Validate ``spec`` against ``cls.openapi_schema``.

        :param spec: specification dictionary to validate
        :raises InvalidSpecification: built from the underlying jsonschema
            ``ValidationError``
        """
        try:
            OpenApiValidator = create_spec_validator(spec)
            validator = OpenApiValidator(cls.openapi_schema)
            validator.validate(spec)
        except jsonschema.exceptions.ValidationError as e:
            raise InvalidSpecification.create_from(e)

    def get_path_params(self, path):
        """Return the ``parameters`` entry of ``paths[path]``, or ``[]`` if absent."""
        return deep_get(self._spec, ["paths", path]).get("parameters", [])

    def get_operation(self, path, method):
        """Return the entry stored under ``paths[path][method]``."""
        return deep_get(self._spec, ["paths", path, method])

    @property
    def raw(self):
        """Deep copy of the input spec, taken before defaults were applied."""
        return self._raw_spec

    @property
    def version(self):
        """Spec version as a tuple of ints, e.g. ``(3, 0, 0)``."""
        return self._get_spec_version(self._spec)

    @property
    def security(self):
        """Top-level ``security`` entry of the spec, or ``None`` if missing."""
        return self._spec.get("security")

    @property
    @abc.abstractmethod
    def security_schemes(self):
        raise NotImplementedError

    def __getitem__(self, k):
        return self._spec[k]

    def __iter__(self):
        return self._spec.__iter__()

    def __len__(self):
        return self._spec.__len__()

    @staticmethod
    def _load_spec_from_file(arguments, specification):
        """
        Loads a YAML specification file, optionally rendering it with Jinja2.

        :param arguments: passed to Jinja2 renderer
        :param specification: path to specification
        :return: the object produced by ``yaml.safe_load`` on the rendered text
        """
        arguments = arguments or {}

        with specification.open(mode="rb") as openapi_yaml:
            contents = openapi_yaml.read()
            try:
                openapi_template = contents.decode()
            except UnicodeDecodeError:
                # Best effort: keep going with undecodable bytes replaced.
                openapi_template = contents.decode("utf-8", "replace")

            openapi_string = jinja2.Template(openapi_template).render(**arguments)
            return yaml.safe_load(openapi_string)

    @classmethod
    def from_file(cls, spec, *, arguments=None, base_uri=""):
        """
        Takes in a path to a YAML file, and returns a Specification

        :param spec: path to the YAML file
        :param arguments: passed to the Jinja2 renderer
        :param base_uri: forwarded to :meth:`from_dict`
        """
        specification_path = pathlib.Path(spec)
        spec = cls._load_spec_from_file(arguments, specification_path)
        return cls.from_dict(spec, base_uri=base_uri)

    @staticmethod
    def _get_spec_version(spec):
        """Read the ``openapi`` (or, failing that, ``swagger``) version of ``spec``.

        :param spec: specification mapping
        :return: the version string split on "." with every part converted to int
        :raises InvalidSpecification: if ``spec`` has no ``get`` method, carries
            neither key, or the version value cannot be turned into a tuple of ints
        """
        try:
            version_string = spec.get("openapi") or spec.get("swagger")
        except AttributeError:
            raise InvalidSpecification(NO_SPEC_VERSION_ERR_MSG)
        if version_string is None:
            raise InvalidSpecification(NO_SPEC_VERSION_ERR_MSG)
        try:
            version_tuple = tuple(map(int, version_string.split(".")))
        except (AttributeError, TypeError, ValueError):
            # ValueError: a component is not numeric (e.g. "3.x").
            # AttributeError: the value is not a string (e.g. YAML `swagger: 2.0`
            # without quotes is loaded as a float and has no `split`).
            err = (
                "Unable to convert version string to semantic version tuple: "
                "{version_string}."
            )
            err = err.format(version_string=version_string)
            raise InvalidSpecification(err)
        return version_tuple

    @classmethod
    def from_dict(cls, spec, *, base_uri=""):
        """
        Takes in a dictionary, and returns a Specification

        A spec whose version compares lower than ``(3, 0, 0)`` yields a
        ``Swagger2Specification``; any other version yields an
        ``OpenAPISpecification``.

        :param spec: specification dictionary
        :param base_uri: forwarded to the specification constructor
        """

        def enforce_string_keys(obj):
            # YAML supports integer keys, but JSON does not
            if isinstance(obj, dict):
                return {str(k): enforce_string_keys(v) for k, v in obj.items()}
            return obj

        spec = enforce_string_keys(spec)
        version = cls._get_spec_version(spec)
        if version < (3, 0, 0):
            return Swagger2Specification(spec, base_uri=base_uri)
        return OpenAPISpecification(spec, base_uri=base_uri)

    def clone(self):
        """Return a new instance of the same class built from a deep copy of the
        current mapping contents."""
        return type(self)(copy.deepcopy(self._spec))

    @classmethod
    def load(cls, spec, *, arguments=None):
        """Build a Specification from either a dictionary or a file path.

        :param spec: a ``dict`` (handed to :meth:`from_dict`) or a path to a YAML
            file (handed to :meth:`from_file`)
        :param arguments: passed to the Jinja2 renderer when loading from a file
        """
        if not isinstance(spec, dict):
            # The directory containing the file, with a trailing separator.
            base_uri = f"{pathlib.Path(spec).parent}{os.sep}"
            return cls.from_file(spec, arguments=arguments, base_uri=base_uri)
        return cls.from_dict(spec)

    def with_base_path(self, base_path):
        """Return a clone of this specification with ``base_path`` assigned."""
        new_spec = self.clone()
        new_spec.base_path = base_path
        return new_spec

208 

209 

class Swagger2Specification(Specification):
    """Specification subclass for Swagger 2 documents."""

    yaml_name = "swagger.yaml"
    operation_cls = Swagger2Operation

    openapi_schema = json.loads(
        pkgutil.get_data("connexion", "resources/schemas/v2.0/schema.json")  # type: ignore
    )

    @classmethod
    def _set_defaults(cls, spec):
        """Add the top-level keys the accessors below rely on, where missing."""
        # Built on every call so each spec receives its own list/dict objects.
        fallbacks = (
            ("produces", []),
            ("consumes", ["application/json"]),
            ("definitions", {}),
            ("parameters", {}),
            ("responses", {}),
        )
        for key, fallback in fallbacks:
            spec.setdefault(key, fallback)

    @property
    def produces(self):
        """Top-level ``produces`` entry."""
        return self._spec["produces"]

    @property
    def consumes(self):
        """Top-level ``consumes`` entry."""
        return self._spec["consumes"]

    @property
    def definitions(self):
        """Top-level ``definitions`` entry."""
        return self._spec["definitions"]

    @property
    def parameter_definitions(self):
        """Top-level ``parameters`` entry."""
        return self._spec["parameters"]

    @property
    def response_definitions(self):
        """Top-level ``responses`` entry."""
        return self._spec["responses"]

    @property
    def security_schemes(self):
        """``securityDefinitions`` entry, or an empty dict when absent."""
        return self._spec.get("securityDefinitions", {})

    @property
    def base_path(self):
        """``basePath`` entry (``""`` when absent) without trailing slashes."""
        configured = self._spec.get("basePath", "")
        return canonical_base_path(configured)

    @base_path.setter
    def base_path(self, base_path):
        normalized = canonical_base_path(base_path)
        # Keep the raw copy and the working spec in step.
        for document in (self._raw_spec, self._spec):
            document["basePath"] = normalized

261 

262 

class OpenAPISpecification(Specification):
    """Specification subclass for OpenAPI 3 documents."""

    yaml_name = "openapi.yaml"
    operation_cls = OpenAPIOperation

    openapi_schema = json.loads(
        pkgutil.get_data("connexion", "resources/schemas/v3.0/schema.json")  # type: ignore
    )

    @classmethod
    def _set_defaults(cls, spec):
        """Make sure a top-level ``components`` entry exists."""
        spec.setdefault("components", {})

    @property
    def security_schemes(self):
        """``components.securitySchemes``, or an empty dict when absent."""
        return self.components.get("securitySchemes", {})

    @property
    def components(self):
        """Top-level ``components`` entry."""
        return self._spec["components"]

    @property
    def base_path(self):
        """Path component of the first server URL, without trailing slashes.

        Server variables are substituted with their ``default`` values before the
        URL is split. An ``IndexError`` (for instance an empty ``servers`` list)
        results in ``""``.
        """
        servers = self._spec.get("servers", [])
        try:
            # assume we're the first server in list
            first_server = copy.deepcopy(servers[0])
            variables = first_server.pop("variables", {})
            substitutions = {name: var["default"] for name, var in variables.items()}
            first_server["url"] = first_server["url"].format(**substitutions)
            path = urlsplit(first_server["url"]).path
        except IndexError:
            path = ""
        return canonical_base_path(path)

    @base_path.setter
    def base_path(self, base_path):
        normalized = canonical_base_path(base_path)
        user_servers = [{"url": normalized}]
        # The same list object is stored in both documents.
        for document in (self._raw_spec, self._spec):
            document["servers"] = user_servers