Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/connexion/spec.py: 47%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2This module defines Python interfaces for OpenAPI specifications.
3"""
5import abc
6import copy
7import json
8import os
9import pathlib
10import pkgutil
11import typing as t
12from collections.abc import Mapping
13from urllib.parse import urlsplit
15import jinja2
16import jsonschema
17import yaml
18from jsonschema import Draft4Validator
19from jsonschema.validators import extend as extend_validator
21from .exceptions import InvalidSpecification
22from .json_schema import NullableTypeValidator, URLHandler, resolve_refs
23from .operations import AbstractOperation, OpenAPIOperation, Swagger2Operation
24from .utils import deep_get
# Stock Draft 4 implementation of the "properties" keyword; create_spec_validator()
# wraps it so that `default` values can be checked after the regular validation.
validate_properties = Draft4Validator.VALIDATORS["properties"]
def create_spec_validator(spec: dict) -> Draft4Validator:
    """Build a validator class for checking an OpenAPI spec against the OpenAPI schema.

    On top of the regular Draft 4 checks, the returned validator class verifies
    that every ``default`` value found in the spec validates against the
    schema in which it resides.

    :param spec: specification to validate
    """
    # Defaults are validated against the spec itself rather than against the
    # OpenAPI schema, hence a dedicated validator instance bound to `spec`.
    nullable_aware_cls = extend_validator(
        Draft4Validator, {"type": NullableTypeValidator}
    )
    defaults_checker = nullable_aware_cls(spec)

    def validate_defaults(validator, properties, instance, schema):
        """Run the stock `properties` validation, then check the `default` value
        of the instance against the instance itself used as a schema.
        """
        found_error = False
        for problem in validate_properties(validator, properties, instance, schema):
            found_error = True
            yield problem

        # The default is only checked once the subschema itself validated cleanly.
        if found_error:
            return
        if not (isinstance(instance, dict) and "default" in instance):
            return

        subschema_validator = defaults_checker.evolve(schema=instance)
        yield from subschema_validator.iter_errors(instance["default"])

    return extend_validator(Draft4Validator, {"properties": validate_defaults})
# Error text used when neither a "swagger" nor an "openapi" version can be read
# from the top level of a specification.
NO_SPEC_VERSION_ERR_MSG = (
    "Unable to get the spec version.\n"
    "You are missing either '\"swagger\": \"2.0\"' or '\"openapi\": \"3.0.0\"'\n"
    "from the top level of your spec."
)
def canonical_base_path(base_path):
    """
    Normalise a "basePath" by dropping all trailing slashes, so that the result
    can be prepended to paths that start with "/".
    """
    without_trailing_slashes = base_path.rstrip("/")
    return without_trailing_slashes
class Specification(Mapping):
    """Abstract, read-only mapping over a validated API specification.

    The mapping protocol (``[]``, iteration, ``len``) is served from the
    specification after defaults were applied and references were resolved.
    Concrete subclasses supply ``openapi_schema``, ``_set_defaults``,
    ``security_schemes`` and the ``base_path`` property.
    """

    # Operation class associated with this flavour of specification; set by subclasses.
    operation_cls: t.Type[AbstractOperation]

    def __init__(self, raw_spec, *, base_uri=""):
        """
        :param raw_spec: the specification as a dictionary
        :param base_uri: base URI handed to ``resolve_refs``
        """
        # Snapshot the input before defaults are applied; `clone()` rebuilds from it.
        self._raw_spec = copy.deepcopy(raw_spec)
        # Defaults are set on the passed-in object itself, then it is validated.
        self._set_defaults(raw_spec)
        self._validate_spec(raw_spec)
        self._spec = resolve_refs(raw_spec, base_uri=base_uri)
        self._base_uri = base_uri

    @classmethod
    @abc.abstractmethod
    def _set_defaults(cls, spec):
        """set some default values in the spec"""

    @classmethod
    def _validate_spec(cls, spec):
        """Validate spec against the schema in ``cls.openapi_schema``.

        :raises InvalidSpecification: when jsonschema reports a validation error
        """
        try:
            OpenApiValidator = create_spec_validator(spec)
            validator = OpenApiValidator(cls.openapi_schema)
            validator.validate(spec)
        except jsonschema.exceptions.ValidationError as e:
            raise InvalidSpecification.create_from(e) from e

    def get_path_params(self, path):
        """Return the ``parameters`` declared on the given path item (``[]`` if none)."""
        return deep_get(self._spec, ["paths", path]).get("parameters", [])

    def get_operation(self, path, method):
        """Return the operation stored under ``paths -> path -> method``."""
        return deep_get(self._spec, ["paths", path, method])

    @property
    def raw(self):
        """The deep copy of the specification taken before defaults were applied."""
        return self._raw_spec

    @property
    def spec(self):
        """The specification as returned by ``resolve_refs``."""
        return self._spec

    @property
    def version(self):
        """The specification version as a tuple of integers."""
        return self._get_spec_version(self._spec)

    @property
    def security(self):
        """The top-level ``security`` entry, or ``None`` when absent."""
        return self._spec.get("security")

    @property
    @abc.abstractmethod
    def security_schemes(self):
        raise NotImplementedError

    def __getitem__(self, k):
        return self._spec[k]

    def __iter__(self):
        return self._spec.__iter__()

    def __len__(self):
        return self._spec.__len__()

    @staticmethod
    def _load_spec_from_file(arguments, specification):
        """
        Loads a YAML specification file, optionally rendering it with Jinja2.

        :param arguments: passed to Jinja2 renderer
        :param specification: path to specification
        """
        arguments = arguments or {}

        with specification.open(mode="rb") as openapi_yaml:
            contents = openapi_yaml.read()
            try:
                openapi_template = contents.decode()
            except UnicodeDecodeError:
                # Best effort: keep going with undecodable bytes replaced.
                openapi_template = contents.decode("utf-8", "replace")

            openapi_string = jinja2.Template(openapi_template).render(**arguments)
            return yaml.safe_load(openapi_string)

    @classmethod
    def from_file(cls, spec, *, arguments=None, base_uri=""):
        """
        Takes in a path to a YAML file, and returns a Specification
        """
        specification_path = pathlib.Path(spec)
        spec = cls._load_spec_from_file(arguments, specification_path)
        return cls.from_dict(spec, base_uri=base_uri)

    @classmethod
    def from_url(cls, spec, *, base_uri=""):
        """
        Takes in a URL, loads the specification from it through ``URLHandler``,
        and returns a Specification
        """
        spec = URLHandler()(spec)
        return cls.from_dict(spec, base_uri=base_uri)

    @staticmethod
    def _get_spec_version(spec):
        """Return the spec version as a tuple of ints, e.g. ``(3, 0, 0)``.

        The version is read from the ``openapi`` key, falling back to ``swagger``.

        :raises InvalidSpecification: when no version can be read from ``spec``
            or when the version cannot be converted to a tuple of integers
        """
        try:
            version_string = spec.get("openapi") or spec.get("swagger")
        except AttributeError as exc:
            raise InvalidSpecification(NO_SPEC_VERSION_ERR_MSG) from exc
        if version_string is None:
            raise InvalidSpecification(NO_SPEC_VERSION_ERR_MSG)
        try:
            version_tuple = tuple(map(int, version_string.split(".")))
        except (AttributeError, TypeError, ValueError) as exc:
            # AttributeError: the version is not a string (e.g. an unquoted YAML
            # `2.0` is loaded as a float and has no `.split`).
            # ValueError: a component is not an integer (e.g. "3.0.x").
            err = (
                "Unable to convert version string to semantic version tuple: "
                f"{version_string}."
            )
            raise InvalidSpecification(err) from exc
        return version_tuple

    @classmethod
    def from_dict(cls, spec, *, base_uri=""):
        """
        Takes in a dictionary, and returns a Specification

        Versions below 3.0.0 yield a Swagger2Specification, anything else an
        OpenAPISpecification.
        """

        def enforce_string_keys(obj):
            # YAML supports integer keys, but JSON does not
            if isinstance(obj, dict):
                return {str(k): enforce_string_keys(v) for k, v in obj.items()}
            return obj

        spec = enforce_string_keys(spec)
        version = cls._get_spec_version(spec)
        if version < (3, 0, 0):
            return Swagger2Specification(spec, base_uri=base_uri)
        return OpenAPISpecification(spec, base_uri=base_uri)

    def clone(self):
        """Build a new instance of the same class from a deep copy of the raw spec."""
        return type(self)(copy.deepcopy(self._raw_spec), base_uri=self._base_uri)

    @classmethod
    def load(cls, spec, *, arguments=None):
        """Create a Specification from a URL, a file path or a dictionary.

        :param spec: an ``http://`` / ``https://`` URL string, a dictionary, or
            anything else, which is treated as a path to a specification file
        :param arguments: passed to the Jinja2 renderer; only used for files
        """
        if isinstance(spec, str) and spec.startswith(("http://", "https://")):
            return cls.from_url(spec)
        if not isinstance(spec, dict):
            # The directory containing the file, with a trailing separator,
            # becomes the base URI.
            base_uri = f"{pathlib.Path(spec).parent}{os.sep}"
            return cls.from_file(spec, arguments=arguments, base_uri=base_uri)
        return cls.from_dict(spec)

    def with_base_path(self, base_path):
        """Return a clone of this specification with ``base_path`` set on it."""
        new_spec = self.clone()
        new_spec.base_path = base_path
        return new_spec

    @property
    @abc.abstractmethod
    def base_path(self):
        pass

    @base_path.setter
    @abc.abstractmethod
    def base_path(self, base_path):
        pass
class Swagger2Specification(Specification):
    """Specification implementation for Swagger 2 documents."""

    yaml_name = "swagger.yaml"
    operation_cls = Swagger2Operation

    # Schema the specification is validated against, loaded from package data.
    openapi_schema = json.loads(
        pkgutil.get_data("connexion", "resources/schemas/v2.0/schema.json")  # type: ignore
    )

    @classmethod
    def _set_defaults(cls, spec):
        """Make sure the top-level sections read by the accessors below exist."""
        top_level_defaults = (
            ("produces", []),
            ("consumes", ["application/json"]),
            ("definitions", {}),
            ("parameters", {}),
            ("responses", {}),
        )
        for section, fallback in top_level_defaults:
            spec.setdefault(section, fallback)

    @property
    def produces(self):
        """Top-level ``produces`` entry."""
        return self._spec["produces"]

    @property
    def consumes(self):
        """Top-level ``consumes`` entry."""
        return self._spec["consumes"]

    @property
    def definitions(self):
        """Top-level ``definitions`` entry."""
        return self._spec["definitions"]

    @property
    def parameter_definitions(self):
        """Top-level ``parameters`` entry."""
        return self._spec["parameters"]

    @property
    def response_definitions(self):
        """Top-level ``responses`` entry."""
        return self._spec["responses"]

    @property
    def security_schemes(self):
        """Top-level ``securityDefinitions`` entry, or an empty dict when absent."""
        return self._spec.get("securityDefinitions", {})

    @property
    def base_path(self):
        """The ``basePath`` of the spec without trailing slashes ("" when absent)."""
        declared = self._spec.get("basePath", "")
        return canonical_base_path(declared)

    @base_path.setter
    def base_path(self, base_path):
        # Keep the raw and the processed document in sync.
        normalized = canonical_base_path(base_path)
        for document in (self._raw_spec, self._spec):
            document["basePath"] = normalized
class OpenAPISpecification(Specification):
    """Specification implementation for OpenAPI 3 documents."""

    yaml_name = "openapi.yaml"
    operation_cls = OpenAPIOperation

    # Schema the specification is validated against, loaded from package data.
    openapi_schema = json.loads(
        pkgutil.get_data("connexion", "resources/schemas/v3.0/schema.json")  # type: ignore
    )

    @classmethod
    def _set_defaults(cls, spec):
        """Make sure the ``components`` section read by the accessors below exists."""
        spec.setdefault("components", {})

    @property
    def security_schemes(self):
        """``components -> securitySchemes``, or an empty dict when absent."""
        components = self._spec["components"]
        return components.get("securitySchemes", {})

    @property
    def components(self):
        """Top-level ``components`` entry."""
        return self._spec["components"]

    @property
    def base_path(self):
        """Path component of the first server URL, without trailing slashes.

        Server variables are substituted with their ``default`` values first.
        Yields "" when the lookup raises ``IndexError`` (e.g. no servers listed).
        """
        declared_servers = self._spec.get("servers", [])
        try:
            # Only the first listed server is considered; work on a copy of it.
            first_server = copy.deepcopy(declared_servers[0])
            variables = first_server.pop("variables", {})
            first_server["url"] = first_server["url"].format(
                **{name: definition["default"] for name, definition in variables.items()}
            )
            path_part = urlsplit(first_server["url"]).path
        except IndexError:
            path_part = ""
        return canonical_base_path(path_part)

    @base_path.setter
    def base_path(self, base_path):
        # A single server list object is stored on both the raw and the
        # processed document.
        replacement_servers = [{"url": canonical_base_path(base_path)}]
        for document in (self._raw_spec, self._spec):
            document["servers"] = replacement_servers