Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/connexion/spec.py: 47%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

214 statements  

1""" 

2This module defines Python interfaces for OpenAPI specifications. 

3""" 

4 

5import abc 

6import copy 

7import json 

8import os 

9import pathlib 

10import pkgutil 

11import typing as t 

12from collections.abc import Mapping 

13from urllib.parse import urlsplit 

14 

15import jinja2 

16import jsonschema 

17import yaml 

18from jsonschema import Draft4Validator 

19from jsonschema.validators import extend as extend_validator 

20 

21from .exceptions import InvalidSpecification 

22from .json_schema import NullableTypeValidator, URLHandler, resolve_refs 

23from .operations import AbstractOperation, OpenAPIOperation, Swagger2Operation 

24from .utils import deep_get 

25 

26validate_properties = Draft4Validator.VALIDATORS["properties"] 

27 

28 

29def create_spec_validator(spec: dict) -> Draft4Validator: 

30 """Create a Validator to validate an OpenAPI spec against the OpenAPI schema. 

31 

32 :param spec: specification to validate 

33 """ 

34 # Create an instance validator, which validates defaults against the spec itself instead of 

35 # against the OpenAPI schema. 

36 InstanceValidator = extend_validator( 

37 Draft4Validator, {"type": NullableTypeValidator} 

38 ) 

39 instance_validator = InstanceValidator(spec) 

40 

41 def validate_defaults(validator, properties, instance, schema): 

42 """Validation function to validate the `properties` subschema, enforcing each default 

43 value validates against the schema in which it resides. 

44 """ 

45 valid = True 

46 for error in validate_properties(validator, properties, instance, schema): 

47 valid = False 

48 yield error 

49 

50 # Validate default only when the subschema has validated successfully 

51 if not valid: 

52 return 

53 if isinstance(instance, dict) and "default" in instance: 

54 for error in instance_validator.evolve(schema=instance).iter_errors( 

55 instance["default"] 

56 ): 

57 yield error 

58 

59 SpecValidator = extend_validator(Draft4Validator, {"properties": validate_defaults}) 

60 return SpecValidator 

61 

62 

63NO_SPEC_VERSION_ERR_MSG = """Unable to get the spec version. 

64You are missing either '"swagger": "2.0"' or '"openapi": "3.0.0"' 

65from the top level of your spec.""" 

66 

67 

68def canonical_base_path(base_path): 

69 """ 

70 Make given "basePath" a canonical base URL which can be prepended to paths starting with "/". 

71 """ 

72 return base_path.rstrip("/") 

73 

74 

75class Specification(Mapping): 

76 

77 operation_cls: t.Type[AbstractOperation] 

78 

79 def __init__(self, raw_spec, *, base_uri=""): 

80 self._raw_spec = copy.deepcopy(raw_spec) 

81 self._set_defaults(raw_spec) 

82 self._validate_spec(raw_spec) 

83 self._spec = resolve_refs(raw_spec, base_uri=base_uri) 

84 

85 @classmethod 

86 @abc.abstractmethod 

87 def _set_defaults(cls, spec): 

88 """set some default values in the spec""" 

89 

90 @classmethod 

91 def _validate_spec(cls, spec): 

92 """validate spec against schema""" 

93 try: 

94 OpenApiValidator = create_spec_validator(spec) 

95 validator = OpenApiValidator(cls.openapi_schema) 

96 validator.validate(spec) 

97 except jsonschema.exceptions.ValidationError as e: 

98 raise InvalidSpecification.create_from(e) 

99 

100 def get_path_params(self, path): 

101 return deep_get(self._spec, ["paths", path]).get("parameters", []) 

102 

103 def get_operation(self, path, method): 

104 return deep_get(self._spec, ["paths", path, method]) 

105 

106 @property 

107 def raw(self): 

108 return self._raw_spec 

109 

110 @property 

111 def version(self): 

112 return self._get_spec_version(self._spec) 

113 

114 @property 

115 def security(self): 

116 return self._spec.get("security") 

117 

118 @property 

119 @abc.abstractmethod 

120 def security_schemes(self): 

121 raise NotImplementedError 

122 

123 def __getitem__(self, k): 

124 return self._spec[k] 

125 

126 def __iter__(self): 

127 return self._spec.__iter__() 

128 

129 def __len__(self): 

130 return self._spec.__len__() 

131 

132 @staticmethod 

133 def _load_spec_from_file(arguments, specification): 

134 """ 

135 Loads a YAML specification file, optionally rendering it with Jinja2. 

136 

137 :param arguments: passed to Jinja2 renderer 

138 :param specification: path to specification 

139 """ 

140 arguments = arguments or {} 

141 

142 with specification.open(mode="rb") as openapi_yaml: 

143 contents = openapi_yaml.read() 

144 try: 

145 openapi_template = contents.decode() 

146 except UnicodeDecodeError: 

147 openapi_template = contents.decode("utf-8", "replace") 

148 

149 openapi_string = jinja2.Template(openapi_template).render(**arguments) 

150 return yaml.safe_load(openapi_string) 

151 

152 @classmethod 

153 def from_file(cls, spec, *, arguments=None, base_uri=""): 

154 """ 

155 Takes in a path to a YAML file, and returns a Specification 

156 """ 

157 specification_path = pathlib.Path(spec) 

158 spec = cls._load_spec_from_file(arguments, specification_path) 

159 return cls.from_dict(spec, base_uri=base_uri) 

160 

161 @classmethod 

162 def from_url(cls, spec, *, base_uri=""): 

163 """ 

164 Takes in a path to a YAML file, and returns a Specification 

165 """ 

166 spec = URLHandler()(spec) 

167 return cls.from_dict(spec, base_uri=base_uri) 

168 

169 @staticmethod 

170 def _get_spec_version(spec): 

171 try: 

172 version_string = spec.get("openapi") or spec.get("swagger") 

173 except AttributeError: 

174 raise InvalidSpecification(NO_SPEC_VERSION_ERR_MSG) 

175 if version_string is None: 

176 raise InvalidSpecification(NO_SPEC_VERSION_ERR_MSG) 

177 try: 

178 version_tuple = tuple(map(int, version_string.split("."))) 

179 except TypeError: 

180 err = ( 

181 "Unable to convert version string to semantic version tuple: " 

182 "{version_string}." 

183 ) 

184 err = err.format(version_string=version_string) 

185 raise InvalidSpecification(err) 

186 return version_tuple 

187 

188 @classmethod 

189 def from_dict(cls, spec, *, base_uri=""): 

190 """ 

191 Takes in a dictionary, and returns a Specification 

192 """ 

193 

194 def enforce_string_keys(obj): 

195 # YAML supports integer keys, but JSON does not 

196 if isinstance(obj, dict): 

197 return {str(k): enforce_string_keys(v) for k, v in obj.items()} 

198 return obj 

199 

200 spec = enforce_string_keys(spec) 

201 version = cls._get_spec_version(spec) 

202 if version < (3, 0, 0): 

203 return Swagger2Specification(spec, base_uri=base_uri) 

204 return OpenAPISpecification(spec, base_uri=base_uri) 

205 

206 def clone(self): 

207 return type(self)(copy.deepcopy(self._spec)) 

208 

209 @classmethod 

210 def load(cls, spec, *, arguments=None): 

211 if isinstance(spec, str) and ( 

212 spec.startswith("http://") or spec.startswith("https://") 

213 ): 

214 return cls.from_url(spec) 

215 if not isinstance(spec, dict): 

216 base_uri = f"{pathlib.Path(spec).parent}{os.sep}" 

217 return cls.from_file(spec, arguments=arguments, base_uri=base_uri) 

218 return cls.from_dict(spec) 

219 

220 def with_base_path(self, base_path): 

221 new_spec = self.clone() 

222 new_spec.base_path = base_path 

223 return new_spec 

224 

225 @property 

226 @abc.abstractmethod 

227 def base_path(self): 

228 pass 

229 

230 @base_path.setter 

231 @abc.abstractmethod 

232 def base_path(self, base_path): 

233 pass 

234 

235 

236class Swagger2Specification(Specification): 

237 """Python interface for a Swagger 2 specification.""" 

238 

239 yaml_name = "swagger.yaml" 

240 operation_cls = Swagger2Operation 

241 

242 openapi_schema = json.loads( 

243 pkgutil.get_data("connexion", "resources/schemas/v2.0/schema.json") # type: ignore 

244 ) 

245 

246 @classmethod 

247 def _set_defaults(cls, spec): 

248 spec.setdefault("produces", []) 

249 spec.setdefault("consumes", ["application/json"]) 

250 spec.setdefault("definitions", {}) 

251 spec.setdefault("parameters", {}) 

252 spec.setdefault("responses", {}) 

253 

254 @property 

255 def produces(self): 

256 return self._spec["produces"] 

257 

258 @property 

259 def consumes(self): 

260 return self._spec["consumes"] 

261 

262 @property 

263 def definitions(self): 

264 return self._spec["definitions"] 

265 

266 @property 

267 def parameter_definitions(self): 

268 return self._spec["parameters"] 

269 

270 @property 

271 def response_definitions(self): 

272 return self._spec["responses"] 

273 

274 @property 

275 def security_schemes(self): 

276 return self._spec.get("securityDefinitions", {}) 

277 

278 @property 

279 def base_path(self): 

280 return canonical_base_path(self._spec.get("basePath", "")) 

281 

282 @base_path.setter 

283 def base_path(self, base_path): 

284 base_path = canonical_base_path(base_path) 

285 self._raw_spec["basePath"] = base_path 

286 self._spec["basePath"] = base_path 

287 

288 

289class OpenAPISpecification(Specification): 

290 """Python interface for an OpenAPI 3 specification.""" 

291 

292 yaml_name = "openapi.yaml" 

293 operation_cls = OpenAPIOperation 

294 

295 openapi_schema = json.loads( 

296 pkgutil.get_data("connexion", "resources/schemas/v3.0/schema.json") # type: ignore 

297 ) 

298 

299 @classmethod 

300 def _set_defaults(cls, spec): 

301 spec.setdefault("components", {}) 

302 

303 @property 

304 def security_schemes(self): 

305 return self._spec["components"].get("securitySchemes", {}) 

306 

307 @property 

308 def components(self): 

309 return self._spec["components"] 

310 

311 @property 

312 def base_path(self): 

313 servers = self._spec.get("servers", []) 

314 try: 

315 # assume we're the first server in list 

316 server = copy.deepcopy(servers[0]) 

317 server_vars = server.pop("variables", {}) 

318 server["url"] = server["url"].format( 

319 **{k: v["default"] for k, v in server_vars.items()} 

320 ) 

321 base_path = urlsplit(server["url"]).path 

322 except IndexError: 

323 base_path = "" 

324 return canonical_base_path(base_path) 

325 

326 @base_path.setter 

327 def base_path(self, base_path): 

328 base_path = canonical_base_path(base_path) 

329 user_servers = [{"url": base_path}] 

330 self._raw_spec["servers"] = user_servers 

331 self._spec["servers"] = user_servers