Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/connexion/spec.py: 47%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

218 statements  

1""" 

2This module defines Python interfaces for OpenAPI specifications. 

3""" 

4 

5import abc 

6import copy 

7import json 

8import os 

9import pathlib 

10import pkgutil 

11import typing as t 

12from collections.abc import Mapping 

13from urllib.parse import urlsplit 

14 

15import jinja2 

16import jsonschema 

17import yaml 

18from jsonschema import Draft4Validator 

19from jsonschema.validators import extend as extend_validator 

20 

21from .exceptions import InvalidSpecification 

22from .json_schema import NullableTypeValidator, URLHandler, resolve_refs 

23from .operations import AbstractOperation, OpenAPIOperation, Swagger2Operation 

24from .utils import deep_get 

25 

# Stock Draft 4 implementation of the "properties" keyword, kept at module level
# so create_spec_validator can delegate to it from its own "properties" hook.
validate_properties = Draft4Validator.VALIDATORS["properties"]

27 

28 

def create_spec_validator(spec: dict) -> Draft4Validator:
    """Build a validator class for checking an OpenAPI spec against the OpenAPI schema.

    The returned class is Draft4Validator with its ``properties`` keyword
    replaced: after the regular ``properties`` check passes, any dict being
    validated that carries a ``default`` key has that default validated
    against the dict itself, treated as a schema.

    :param spec: specification to validate
    """
    # Defaults are checked by a validator bound to the spec itself (using
    # NullableTypeValidator for the "type" keyword), not by the OpenAPI schema.
    defaults_validator_cls = extend_validator(
        Draft4Validator, {"type": NullableTypeValidator}
    )
    defaults_validator = defaults_validator_cls(spec)

    def validate_defaults(validator, properties, instance, schema):
        """Run the stock ``properties`` check, then validate ``instance["default"]``
        against ``instance`` when that check reported no errors.
        """
        subschema_failed = False
        for problem in validate_properties(validator, properties, instance, schema):
            subschema_failed = True
            yield problem

        # The default is only checked once its enclosing subschema is itself valid.
        if subschema_failed:
            return
        if not isinstance(instance, dict) or "default" not in instance:
            return

        bound_to_instance = defaults_validator.evolve(schema=instance)
        yield from bound_to_instance.iter_errors(instance["default"])

    return extend_validator(Draft4Validator, {"properties": validate_defaults})

61 

62 

# Message used when neither an "openapi" nor a "swagger" version can be read from a spec.
NO_SPEC_VERSION_ERR_MSG = "\n".join(
    (
        "Unable to get the spec version.",
        """You are missing either '"swagger": "2.0"' or '"openapi": "3.0.0"'""",
        "from the top level of your spec.",
    )
)

66 

67 

def canonical_base_path(base_path):
    """
    Strip every trailing "/" from the given "basePath" so that paths starting
    with "/" can be appended to the result directly.
    """
    without_trailing_slashes = base_path.rstrip("/")
    return without_trailing_slashes

73 

74 

class Specification(Mapping):
    """Read-only mapping over a validated API specification.

    Item access, iteration and ``len()`` delegate to the spec returned by
    ``resolve_refs``; the document as it was passed in stays available through
    :attr:`raw`. Subclasses provide ``operation_cls``, ``openapi_schema``,
    ``_set_defaults``, ``security_schemes`` and ``base_path``.
    """

    operation_cls: t.Type[AbstractOperation]

    def __init__(self, raw_spec, *, base_uri=""):
        """
        :param raw_spec: specification dict; defaults are set on it in place
        :param base_uri: forwarded to ``resolve_refs`` and reused by :meth:`clone`
        """
        # Snapshot first, so `raw` does not contain the injected defaults.
        self._raw_spec = copy.deepcopy(raw_spec)
        self._set_defaults(raw_spec)
        self._validate_spec(raw_spec)
        self._spec = resolve_refs(raw_spec, base_uri=base_uri)
        self._base_uri = base_uri

    @classmethod
    @abc.abstractmethod
    def _set_defaults(cls, spec):
        """set some default values in the spec"""

    @classmethod
    def _validate_spec(cls, spec):
        """Validate ``spec`` against ``cls.openapi_schema``.

        :raises InvalidSpecification: if the spec does not validate
        """
        try:
            OpenApiValidator = create_spec_validator(spec)
            validator = OpenApiValidator(cls.openapi_schema)
            validator.validate(spec)
        except jsonschema.exceptions.ValidationError as e:
            # Chain explicitly so the original validation error stays attached.
            raise InvalidSpecification.create_from(e) from e

    def get_path_params(self, path):
        """Return the ``parameters`` declared on the path item ``path`` (``[]`` if none)."""
        return deep_get(self._spec, ["paths", path]).get("parameters", [])

    def get_operation(self, path, method):
        """Return the operation stored under ``paths -> path -> method``."""
        return deep_get(self._spec, ["paths", path, method])

    @property
    def raw(self):
        """Deep copy of the spec taken before defaults were set."""
        return self._raw_spec

    @property
    def spec(self):
        """The spec as returned by ``resolve_refs``."""
        return self._spec

    @property
    def version(self):
        """Spec version as a tuple of ints, e.g. ``(3, 0, 0)``."""
        return self._get_spec_version(self._spec)

    @property
    def security(self):
        """Top-level ``security`` entry of the spec, or ``None`` when absent."""
        return self._spec.get("security")

    @property
    @abc.abstractmethod
    def security_schemes(self):
        raise NotImplementedError

    def __getitem__(self, k):
        return self._spec[k]

    def __iter__(self):
        return self._spec.__iter__()

    def __len__(self):
        return self._spec.__len__()

    @staticmethod
    def _load_spec_from_file(arguments, specification):
        """
        Loads a YAML specification file, optionally rendering it with Jinja2.

        :param arguments: passed to Jinja2 renderer
        :param specification: path to specification
        """
        arguments = arguments or {}

        with specification.open(mode="rb") as openapi_yaml:
            contents = openapi_yaml.read()
            try:
                openapi_template = contents.decode()
            except UnicodeDecodeError:
                # Best effort: keep going with undecodable bytes replaced.
                openapi_template = contents.decode("utf-8", "replace")

        openapi_string = jinja2.Template(openapi_template).render(**arguments)
        return yaml.safe_load(openapi_string)

    @classmethod
    def from_file(cls, spec, *, arguments=None, base_uri=""):
        """
        Takes in a path to a YAML file, and returns a Specification
        """
        specification_path = pathlib.Path(spec)
        spec = cls._load_spec_from_file(arguments, specification_path)
        return cls.from_dict(spec, base_uri=base_uri)

    @classmethod
    def from_url(cls, spec, *, base_uri=""):
        """
        Takes in a URL, loads the document behind it with ``URLHandler``, and
        returns a Specification
        """
        spec = URLHandler()(spec)
        return cls.from_dict(spec, base_uri=base_uri)

    @staticmethod
    def _get_spec_version(spec):
        """Read the ``openapi`` / ``swagger`` version of ``spec`` as a tuple of ints.

        :raises InvalidSpecification: if ``spec`` has no ``get`` method, carries
            neither version key, or the version cannot be parsed as
            dot-separated integers
        """
        try:
            version_string = spec.get("openapi") or spec.get("swagger")
        except AttributeError:
            raise InvalidSpecification(NO_SPEC_VERSION_ERR_MSG)
        if version_string is None:
            raise InvalidSpecification(NO_SPEC_VERSION_ERR_MSG)
        try:
            version_tuple = tuple(map(int, version_string.split(".")))
        except (AttributeError, TypeError, ValueError) as exc:
            # AttributeError: the version is not a string (e.g. an unquoted YAML
            # `2.0` is loaded as a float and has no `.split`).
            # ValueError: a component is not an integer (e.g. "3.0.x").
            err = (
                "Unable to convert version string to semantic version tuple: "
                f"{version_string}."
            )
            raise InvalidSpecification(err) from exc
        return version_tuple

    @classmethod
    def from_dict(cls, spec, *, base_uri=""):
        """
        Takes in a dictionary, and returns a Specification

        Versions below 3.0.0 yield a Swagger2Specification, everything else an
        OpenAPISpecification.
        """

        def enforce_string_keys(obj):
            # YAML supports integer keys, but JSON does not
            # NOTE(review): only dict values are visited; dicts nested inside
            # lists keep their keys as they are.
            if isinstance(obj, dict):
                return {str(k): enforce_string_keys(v) for k, v in obj.items()}
            return obj

        spec = enforce_string_keys(spec)
        version = cls._get_spec_version(spec)
        if version < (3, 0, 0):
            return Swagger2Specification(spec, base_uri=base_uri)
        return OpenAPISpecification(spec, base_uri=base_uri)

    def clone(self):
        """Build a new instance of the same class from a deep copy of the raw spec."""
        return type(self)(copy.deepcopy(self._raw_spec), base_uri=self._base_uri)

    @classmethod
    def load(cls, spec, *, arguments=None):
        """Load a specification from a URL, a file path or a dict.

        :param spec: ``http://`` / ``https://`` URL string, path to a spec file,
            or an already loaded dict
        :param arguments: passed to the Jinja2 renderer; only used for files
        """
        if isinstance(spec, str) and spec.startswith(("http://", "https://")):
            return cls.from_url(spec)
        if not isinstance(spec, dict):
            # The directory holding the spec file becomes the base URI.
            base_uri = f"{pathlib.Path(spec).parent}{os.sep}"
            return cls.from_file(spec, arguments=arguments, base_uri=base_uri)
        return cls.from_dict(spec)

    def with_base_path(self, base_path):
        """Return a clone of this specification with ``base_path`` applied."""
        new_spec = self.clone()
        new_spec.base_path = base_path
        return new_spec

    @property
    @abc.abstractmethod
    def base_path(self):
        pass

    @base_path.setter
    @abc.abstractmethod
    def base_path(self, base_path):
        pass

239 

240 

class Swagger2Specification(Specification):
    """Specification subclass for Swagger 2 documents."""

    yaml_name = "swagger.yaml"
    operation_cls = Swagger2Operation

    # Schema Swagger 2 specs are validated against, parsed from the packaged resource.
    openapi_schema = json.loads(
        pkgutil.get_data("connexion", "resources/schemas/v2.0/schema.json")  # type: ignore
    )

    @classmethod
    def _set_defaults(cls, spec):
        """Fill in the top-level sections the properties below index directly."""
        # Rebuilt on every call so no default object is shared between specs.
        fallbacks = {
            "produces": [],
            "consumes": ["application/json"],
            "definitions": {},
            "parameters": {},
            "responses": {},
        }
        for section, fallback in fallbacks.items():
            spec.setdefault(section, fallback)

    @property
    def produces(self):
        """Top-level ``produces`` entry."""
        return self._spec["produces"]

    @property
    def consumes(self):
        """Top-level ``consumes`` entry."""
        return self._spec["consumes"]

    @property
    def definitions(self):
        """Top-level ``definitions`` entry."""
        return self._spec["definitions"]

    @property
    def parameter_definitions(self):
        """Top-level ``parameters`` entry."""
        return self._spec["parameters"]

    @property
    def response_definitions(self):
        """Top-level ``responses`` entry."""
        return self._spec["responses"]

    @property
    def security_schemes(self):
        """``securityDefinitions`` entry, or an empty dict when absent."""
        return self._spec.get("securityDefinitions", {})

    @property
    def base_path(self):
        """``basePath`` of the spec with trailing slashes removed ("" when absent)."""
        declared = self._spec.get("basePath", "")
        return canonical_base_path(declared)

    @base_path.setter
    def base_path(self, base_path):
        canonical = canonical_base_path(base_path)
        # Keep the raw and the resolved document in sync.
        for document in (self._raw_spec, self._spec):
            document["basePath"] = canonical

292 

293 

class OpenAPISpecification(Specification):
    """Specification subclass for OpenAPI 3 documents."""

    yaml_name = "openapi.yaml"
    operation_cls = OpenAPIOperation

    # Schema OpenAPI 3 specs are validated against, parsed from the packaged resource.
    openapi_schema = json.loads(
        pkgutil.get_data("connexion", "resources/schemas/v3.0/schema.json")  # type: ignore
    )

    @classmethod
    def _set_defaults(cls, spec):
        """Make sure ``components`` exists; the properties below index it directly."""
        spec.setdefault("components", {})

    @property
    def security_schemes(self):
        """``components.securitySchemes``, or an empty dict when absent."""
        return self._spec["components"].get("securitySchemes", {})

    @property
    def components(self):
        """The ``components`` section of the spec."""
        return self._spec["components"]

    @property
    def base_path(self):
        """Path component of the first server URL, without trailing slashes.

        Server variables are substituted with their ``default`` values before
        the URL is split. An IndexError anywhere in that process, including an
        empty ``servers`` list, yields "".
        """
        declared_servers = self._spec.get("servers", [])
        try:
            # Only the first listed server is considered.
            # Work on a copy so popping "variables" never touches the spec.
            primary = copy.deepcopy(declared_servers[0])
            variables = primary.pop("variables", {})
            substitutions = {name: meta["default"] for name, meta in variables.items()}
            resolved_url = primary["url"].format(**substitutions)
            path = urlsplit(resolved_url).path
        except IndexError:
            path = ""
        return canonical_base_path(path)

    @base_path.setter
    def base_path(self, base_path):
        canonical = canonical_base_path(base_path)
        user_servers = [{"url": canonical}]
        # Both documents receive the very same list object.
        for document in (self._raw_spec, self._spec):
            document["servers"] = user_servers

336 self._spec["servers"] = user_servers