Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/connexion/spec.py: 47%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2This module defines Python interfaces for OpenAPI specifications.
3"""
5import abc
6import copy
7import json
8import os
9import pathlib
10import pkgutil
11import typing as t
12from collections.abc import Mapping
13from urllib.parse import urlsplit
15import jinja2
16import jsonschema
17import yaml
18from jsonschema import Draft4Validator
19from jsonschema.validators import extend as extend_validator
21from .exceptions import InvalidSpecification
22from .json_schema import NullableTypeValidator, URLHandler, resolve_refs
23from .operations import AbstractOperation, OpenAPIOperation, Swagger2Operation
24from .utils import deep_get
26validate_properties = Draft4Validator.VALIDATORS["properties"]
29def create_spec_validator(spec: dict) -> Draft4Validator:
30 """Create a Validator to validate an OpenAPI spec against the OpenAPI schema.
32 :param spec: specification to validate
33 """
34 # Create an instance validator, which validates defaults against the spec itself instead of
35 # against the OpenAPI schema.
36 InstanceValidator = extend_validator(
37 Draft4Validator, {"type": NullableTypeValidator}
38 )
39 instance_validator = InstanceValidator(spec)
41 def validate_defaults(validator, properties, instance, schema):
42 """Validation function to validate the `properties` subschema, enforcing each default
43 value validates against the schema in which it resides.
44 """
45 valid = True
46 for error in validate_properties(validator, properties, instance, schema):
47 valid = False
48 yield error
50 # Validate default only when the subschema has validated successfully
51 if not valid:
52 return
53 if isinstance(instance, dict) and "default" in instance:
54 for error in instance_validator.evolve(schema=instance).iter_errors(
55 instance["default"]
56 ):
57 yield error
59 SpecValidator = extend_validator(Draft4Validator, {"properties": validate_defaults})
60 return SpecValidator
63NO_SPEC_VERSION_ERR_MSG = """Unable to get the spec version.
64You are missing either '"swagger": "2.0"' or '"openapi": "3.0.0"'
65from the top level of your spec."""
68def canonical_base_path(base_path):
69 """
70 Make given "basePath" a canonical base URL which can be prepended to paths starting with "/".
71 """
72 return base_path.rstrip("/")
75class Specification(Mapping):
77 operation_cls: t.Type[AbstractOperation]
79 def __init__(self, raw_spec, *, base_uri=""):
80 self._raw_spec = copy.deepcopy(raw_spec)
81 self._set_defaults(raw_spec)
82 self._validate_spec(raw_spec)
83 self._spec = resolve_refs(raw_spec, base_uri=base_uri)
85 @classmethod
86 @abc.abstractmethod
87 def _set_defaults(cls, spec):
88 """set some default values in the spec"""
90 @classmethod
91 def _validate_spec(cls, spec):
92 """validate spec against schema"""
93 try:
94 OpenApiValidator = create_spec_validator(spec)
95 validator = OpenApiValidator(cls.openapi_schema)
96 validator.validate(spec)
97 except jsonschema.exceptions.ValidationError as e:
98 raise InvalidSpecification.create_from(e)
100 def get_path_params(self, path):
101 return deep_get(self._spec, ["paths", path]).get("parameters", [])
103 def get_operation(self, path, method):
104 return deep_get(self._spec, ["paths", path, method])
106 @property
107 def raw(self):
108 return self._raw_spec
110 @property
111 def version(self):
112 return self._get_spec_version(self._spec)
114 @property
115 def security(self):
116 return self._spec.get("security")
118 @property
119 @abc.abstractmethod
120 def security_schemes(self):
121 raise NotImplementedError
123 def __getitem__(self, k):
124 return self._spec[k]
126 def __iter__(self):
127 return self._spec.__iter__()
129 def __len__(self):
130 return self._spec.__len__()
132 @staticmethod
133 def _load_spec_from_file(arguments, specification):
134 """
135 Loads a YAML specification file, optionally rendering it with Jinja2.
137 :param arguments: passed to Jinja2 renderer
138 :param specification: path to specification
139 """
140 arguments = arguments or {}
142 with specification.open(mode="rb") as openapi_yaml:
143 contents = openapi_yaml.read()
144 try:
145 openapi_template = contents.decode()
146 except UnicodeDecodeError:
147 openapi_template = contents.decode("utf-8", "replace")
149 openapi_string = jinja2.Template(openapi_template).render(**arguments)
150 return yaml.safe_load(openapi_string)
152 @classmethod
153 def from_file(cls, spec, *, arguments=None, base_uri=""):
154 """
155 Takes in a path to a YAML file, and returns a Specification
156 """
157 specification_path = pathlib.Path(spec)
158 spec = cls._load_spec_from_file(arguments, specification_path)
159 return cls.from_dict(spec, base_uri=base_uri)
161 @classmethod
162 def from_url(cls, spec, *, base_uri=""):
163 """
164 Takes in a path to a YAML file, and returns a Specification
165 """
166 spec = URLHandler()(spec)
167 return cls.from_dict(spec, base_uri=base_uri)
169 @staticmethod
170 def _get_spec_version(spec):
171 try:
172 version_string = spec.get("openapi") or spec.get("swagger")
173 except AttributeError:
174 raise InvalidSpecification(NO_SPEC_VERSION_ERR_MSG)
175 if version_string is None:
176 raise InvalidSpecification(NO_SPEC_VERSION_ERR_MSG)
177 try:
178 version_tuple = tuple(map(int, version_string.split(".")))
179 except TypeError:
180 err = (
181 "Unable to convert version string to semantic version tuple: "
182 "{version_string}."
183 )
184 err = err.format(version_string=version_string)
185 raise InvalidSpecification(err)
186 return version_tuple
188 @classmethod
189 def from_dict(cls, spec, *, base_uri=""):
190 """
191 Takes in a dictionary, and returns a Specification
192 """
194 def enforce_string_keys(obj):
195 # YAML supports integer keys, but JSON does not
196 if isinstance(obj, dict):
197 return {str(k): enforce_string_keys(v) for k, v in obj.items()}
198 return obj
200 spec = enforce_string_keys(spec)
201 version = cls._get_spec_version(spec)
202 if version < (3, 0, 0):
203 return Swagger2Specification(spec, base_uri=base_uri)
204 return OpenAPISpecification(spec, base_uri=base_uri)
206 def clone(self):
207 return type(self)(copy.deepcopy(self._spec))
209 @classmethod
210 def load(cls, spec, *, arguments=None):
211 if isinstance(spec, str) and (
212 spec.startswith("http://") or spec.startswith("https://")
213 ):
214 return cls.from_url(spec)
215 if not isinstance(spec, dict):
216 base_uri = f"{pathlib.Path(spec).parent}{os.sep}"
217 return cls.from_file(spec, arguments=arguments, base_uri=base_uri)
218 return cls.from_dict(spec)
220 def with_base_path(self, base_path):
221 new_spec = self.clone()
222 new_spec.base_path = base_path
223 return new_spec
225 @property
226 @abc.abstractmethod
227 def base_path(self):
228 pass
230 @base_path.setter
231 @abc.abstractmethod
232 def base_path(self, base_path):
233 pass
236class Swagger2Specification(Specification):
237 """Python interface for a Swagger 2 specification."""
239 yaml_name = "swagger.yaml"
240 operation_cls = Swagger2Operation
242 openapi_schema = json.loads(
243 pkgutil.get_data("connexion", "resources/schemas/v2.0/schema.json") # type: ignore
244 )
246 @classmethod
247 def _set_defaults(cls, spec):
248 spec.setdefault("produces", [])
249 spec.setdefault("consumes", ["application/json"])
250 spec.setdefault("definitions", {})
251 spec.setdefault("parameters", {})
252 spec.setdefault("responses", {})
254 @property
255 def produces(self):
256 return self._spec["produces"]
258 @property
259 def consumes(self):
260 return self._spec["consumes"]
262 @property
263 def definitions(self):
264 return self._spec["definitions"]
266 @property
267 def parameter_definitions(self):
268 return self._spec["parameters"]
270 @property
271 def response_definitions(self):
272 return self._spec["responses"]
274 @property
275 def security_schemes(self):
276 return self._spec.get("securityDefinitions", {})
278 @property
279 def base_path(self):
280 return canonical_base_path(self._spec.get("basePath", ""))
282 @base_path.setter
283 def base_path(self, base_path):
284 base_path = canonical_base_path(base_path)
285 self._raw_spec["basePath"] = base_path
286 self._spec["basePath"] = base_path
289class OpenAPISpecification(Specification):
290 """Python interface for an OpenAPI 3 specification."""
292 yaml_name = "openapi.yaml"
293 operation_cls = OpenAPIOperation
295 openapi_schema = json.loads(
296 pkgutil.get_data("connexion", "resources/schemas/v3.0/schema.json") # type: ignore
297 )
299 @classmethod
300 def _set_defaults(cls, spec):
301 spec.setdefault("components", {})
303 @property
304 def security_schemes(self):
305 return self._spec["components"].get("securitySchemes", {})
307 @property
308 def components(self):
309 return self._spec["components"]
311 @property
312 def base_path(self):
313 servers = self._spec.get("servers", [])
314 try:
315 # assume we're the first server in list
316 server = copy.deepcopy(servers[0])
317 server_vars = server.pop("variables", {})
318 server["url"] = server["url"].format(
319 **{k: v["default"] for k, v in server_vars.items()}
320 )
321 base_path = urlsplit(server["url"]).path
322 except IndexError:
323 base_path = ""
324 return canonical_base_path(base_path)
326 @base_path.setter
327 def base_path(self, base_path):
328 base_path = canonical_base_path(base_path)
329 user_servers = [{"url": base_path}]
330 self._raw_spec["servers"] = user_servers
331 self._spec["servers"] = user_servers