Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/connexion/spec.py: 46%
197 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
1"""
2This module defines Python interfaces for OpenAPI specifications.
3"""
5import abc
6import copy
7import json
8import os
9import pathlib
10import pkgutil
11from collections.abc import Mapping
12from urllib.parse import urlsplit
14import jinja2
15import jsonschema
16import yaml
17from jsonschema import Draft4Validator
18from jsonschema.validators import extend as extend_validator
20from .exceptions import InvalidSpecification
21from .json_schema import NullableTypeValidator, resolve_refs
22from .operations import OpenAPIOperation, Swagger2Operation
23from .utils import deep_get
25validate_properties = Draft4Validator.VALIDATORS["properties"]
28def create_spec_validator(spec: dict) -> Draft4Validator:
29 """Create a Validator to validate an OpenAPI spec against the OpenAPI schema.
31 :param spec: specification to validate
32 """
33 # Create an instance validator, which validates defaults against the spec itself instead of
34 # against the OpenAPI schema.
35 InstanceValidator = extend_validator(
36 Draft4Validator, {"type": NullableTypeValidator}
37 )
38 instance_validator = InstanceValidator(spec)
40 def validate_defaults(validator, properties, instance, schema):
41 """Validation function to validate the `properties` subschema, enforcing each default
42 value validates against the schema in which it resides.
43 """
44 valid = True
45 for error in validate_properties(validator, properties, instance, schema):
46 valid = False
47 yield error
49 # Validate default only when the subschema has validated successfully
50 if not valid:
51 return
52 if isinstance(instance, dict) and "default" in instance:
53 for error in instance_validator.evolve(schema=instance).iter_errors(
54 instance["default"]
55 ):
56 yield error
58 SpecValidator = extend_validator(Draft4Validator, {"properties": validate_defaults})
59 return SpecValidator
62NO_SPEC_VERSION_ERR_MSG = """Unable to get the spec version.
63You are missing either '"swagger": "2.0"' or '"openapi": "3.0.0"'
64from the top level of your spec."""
67def canonical_base_path(base_path):
68 """
69 Make given "basePath" a canonical base URL which can be prepended to paths starting with "/".
70 """
71 return base_path.rstrip("/")
74class Specification(Mapping):
75 def __init__(self, raw_spec, *, base_uri=""):
76 self._raw_spec = copy.deepcopy(raw_spec)
77 self._set_defaults(raw_spec)
78 self._validate_spec(raw_spec)
79 self._spec = resolve_refs(raw_spec, base_uri=base_uri)
81 @classmethod
82 @abc.abstractmethod
83 def _set_defaults(cls, spec):
84 """set some default values in the spec"""
86 @classmethod
87 def _validate_spec(cls, spec):
88 """validate spec against schema"""
89 try:
90 OpenApiValidator = create_spec_validator(spec)
91 validator = OpenApiValidator(cls.openapi_schema)
92 validator.validate(spec)
93 except jsonschema.exceptions.ValidationError as e:
94 raise InvalidSpecification.create_from(e)
96 def get_path_params(self, path):
97 return deep_get(self._spec, ["paths", path]).get("parameters", [])
99 def get_operation(self, path, method):
100 return deep_get(self._spec, ["paths", path, method])
102 @property
103 def raw(self):
104 return self._raw_spec
106 @property
107 def version(self):
108 return self._get_spec_version(self._spec)
110 @property
111 def security(self):
112 return self._spec.get("security")
114 @property
115 @abc.abstractmethod
116 def security_schemes(self):
117 raise NotImplementedError
119 def __getitem__(self, k):
120 return self._spec[k]
122 def __iter__(self):
123 return self._spec.__iter__()
125 def __len__(self):
126 return self._spec.__len__()
128 @staticmethod
129 def _load_spec_from_file(arguments, specification):
130 """
131 Loads a YAML specification file, optionally rendering it with Jinja2.
133 :param arguments: passed to Jinja2 renderer
134 :param specification: path to specification
135 """
136 arguments = arguments or {}
138 with specification.open(mode="rb") as openapi_yaml:
139 contents = openapi_yaml.read()
140 try:
141 openapi_template = contents.decode()
142 except UnicodeDecodeError:
143 openapi_template = contents.decode("utf-8", "replace")
145 openapi_string = jinja2.Template(openapi_template).render(**arguments)
146 return yaml.safe_load(openapi_string)
148 @classmethod
149 def from_file(cls, spec, *, arguments=None, base_uri=""):
150 """
151 Takes in a path to a YAML file, and returns a Specification
152 """
153 specification_path = pathlib.Path(spec)
154 spec = cls._load_spec_from_file(arguments, specification_path)
155 return cls.from_dict(spec, base_uri=base_uri)
157 @staticmethod
158 def _get_spec_version(spec):
159 try:
160 version_string = spec.get("openapi") or spec.get("swagger")
161 except AttributeError:
162 raise InvalidSpecification(NO_SPEC_VERSION_ERR_MSG)
163 if version_string is None:
164 raise InvalidSpecification(NO_SPEC_VERSION_ERR_MSG)
165 try:
166 version_tuple = tuple(map(int, version_string.split(".")))
167 except TypeError:
168 err = (
169 "Unable to convert version string to semantic version tuple: "
170 "{version_string}."
171 )
172 err = err.format(version_string=version_string)
173 raise InvalidSpecification(err)
174 return version_tuple
176 @classmethod
177 def from_dict(cls, spec, *, base_uri=""):
178 """
179 Takes in a dictionary, and returns a Specification
180 """
182 def enforce_string_keys(obj):
183 # YAML supports integer keys, but JSON does not
184 if isinstance(obj, dict):
185 return {str(k): enforce_string_keys(v) for k, v in obj.items()}
186 return obj
188 spec = enforce_string_keys(spec)
189 version = cls._get_spec_version(spec)
190 if version < (3, 0, 0):
191 return Swagger2Specification(spec, base_uri=base_uri)
192 return OpenAPISpecification(spec, base_uri=base_uri)
194 def clone(self):
195 return type(self)(copy.deepcopy(self._spec))
197 @classmethod
198 def load(cls, spec, *, arguments=None):
199 if not isinstance(spec, dict):
200 base_uri = f"{pathlib.Path(spec).parent}{os.sep}"
201 return cls.from_file(spec, arguments=arguments, base_uri=base_uri)
202 return cls.from_dict(spec)
204 def with_base_path(self, base_path):
205 new_spec = self.clone()
206 new_spec.base_path = base_path
207 return new_spec
210class Swagger2Specification(Specification):
211 """Python interface for a Swagger 2 specification."""
213 yaml_name = "swagger.yaml"
214 operation_cls = Swagger2Operation
216 openapi_schema = json.loads(
217 pkgutil.get_data("connexion", "resources/schemas/v2.0/schema.json") # type: ignore
218 )
220 @classmethod
221 def _set_defaults(cls, spec):
222 spec.setdefault("produces", [])
223 spec.setdefault("consumes", ["application/json"])
224 spec.setdefault("definitions", {})
225 spec.setdefault("parameters", {})
226 spec.setdefault("responses", {})
228 @property
229 def produces(self):
230 return self._spec["produces"]
232 @property
233 def consumes(self):
234 return self._spec["consumes"]
236 @property
237 def definitions(self):
238 return self._spec["definitions"]
240 @property
241 def parameter_definitions(self):
242 return self._spec["parameters"]
244 @property
245 def response_definitions(self):
246 return self._spec["responses"]
248 @property
249 def security_schemes(self):
250 return self._spec.get("securityDefinitions", {})
252 @property
253 def base_path(self):
254 return canonical_base_path(self._spec.get("basePath", ""))
256 @base_path.setter
257 def base_path(self, base_path):
258 base_path = canonical_base_path(base_path)
259 self._raw_spec["basePath"] = base_path
260 self._spec["basePath"] = base_path
263class OpenAPISpecification(Specification):
264 """Python interface for an OpenAPI 3 specification."""
266 yaml_name = "openapi.yaml"
267 operation_cls = OpenAPIOperation
269 openapi_schema = json.loads(
270 pkgutil.get_data("connexion", "resources/schemas/v3.0/schema.json") # type: ignore
271 )
273 @classmethod
274 def _set_defaults(cls, spec):
275 spec.setdefault("components", {})
277 @property
278 def security_schemes(self):
279 return self._spec["components"].get("securitySchemes", {})
281 @property
282 def components(self):
283 return self._spec["components"]
285 @property
286 def base_path(self):
287 servers = self._spec.get("servers", [])
288 try:
289 # assume we're the first server in list
290 server = copy.deepcopy(servers[0])
291 server_vars = server.pop("variables", {})
292 server["url"] = server["url"].format(
293 **{k: v["default"] for k, v in server_vars.items()}
294 )
295 base_path = urlsplit(server["url"]).path
296 except IndexError:
297 base_path = ""
298 return canonical_base_path(base_path)
300 @base_path.setter
301 def base_path(self, base_path):
302 base_path = canonical_base_path(base_path)
303 user_servers = [{"url": base_path}]
304 self._raw_spec["servers"] = user_servers
305 self._spec["servers"] = user_servers