1"""
2Module containing all code related to json schema validation.
3"""
4
5import contextlib
6import io
7import os
8import typing as t
9import urllib.parse
10import urllib.request
11from collections.abc import Mapping
12from copy import deepcopy
13
14import requests
15import yaml
16from jsonschema import Draft4Validator, RefResolver
17from jsonschema.exceptions import RefResolutionError, ValidationError # noqa
18from jsonschema.validators import extend
19
20from .utils import deep_get
21
22
23class ExtendedSafeLoader(yaml.SafeLoader):
24 """Extends the yaml SafeLoader to coerce all keys to string so the result is valid json."""
25
26 def __init__(self, stream):
27 self.original_construct_mapping = self.construct_mapping
28 self.construct_mapping = self.extended_construct_mapping
29 super().__init__(stream)
30
31 def extended_construct_mapping(self, node, deep=False):
32 data = self.original_construct_mapping(node, deep)
33 return {str(key): data[key] for key in data}
34
35
36class FileHandler:
37 """Handler to resolve file refs."""
38
39 def __call__(self, uri):
40 filepath = self._uri_to_path(uri)
41 with open(filepath) as fh:
42 return yaml.load(fh, ExtendedSafeLoader)
43
44 @staticmethod
45 def _uri_to_path(uri):
46 parsed = urllib.parse.urlparse(uri)
47 host = "{0}{0}{mnt}{0}".format(os.path.sep, mnt=parsed.netloc)
48 return os.path.abspath(
49 os.path.join(host, urllib.request.url2pathname(parsed.path))
50 )
51
52
53class URLHandler:
54 """Handler to resolve url refs."""
55
56 def __call__(self, uri):
57 response = requests.get(uri)
58 response.raise_for_status()
59
60 data = io.StringIO(response.text)
61 with contextlib.closing(data) as fh:
62 return yaml.load(fh, ExtendedSafeLoader)
63
64
65handlers = {
66 "http": URLHandler(),
67 "https": URLHandler(),
68 "file": FileHandler(),
69 "": FileHandler(),
70}
71
72
73def resolve_refs(spec, store=None, base_uri=""):
74 """
75 Resolve JSON references like {"$ref": <some URI>} in a spec.
76 Optionally takes a store, which is a mapping from reference URLs to a
77 dereferenced objects. Prepopulating the store can avoid network calls.
78 """
79 spec = deepcopy(spec)
80 store = store or {}
81 resolver = RefResolver(base_uri, spec, store, handlers=handlers)
82
83 def _do_resolve(node):
84 if isinstance(node, Mapping) and "$ref" in node:
85 path = node["$ref"][2:].split("/")
86 try:
87 # resolve known references
88 retrieved = deep_get(spec, path)
89 node.update(retrieved)
90 if isinstance(retrieved, Mapping) and "$ref" in retrieved:
91 node = _do_resolve(node)
92 node.pop("$ref", None)
93 return node
94 except KeyError:
95 # resolve external references
96 with resolver.resolving(node["$ref"]) as resolved:
97 return _do_resolve(resolved)
98 elif isinstance(node, Mapping):
99 for k, v in node.items():
100 node[k] = _do_resolve(v)
101 elif isinstance(node, (list, tuple)):
102 for i, _ in enumerate(node):
103 node[i] = _do_resolve(node[i])
104 return node
105
106 res = _do_resolve(spec)
107 return res
108
109
110def format_error_with_path(exception: ValidationError) -> str:
111 """Format a `ValidationError` with path to error."""
112 error_path = ".".join(str(item) for item in exception.path)
113 error_path_msg = f" - '{error_path}'" if error_path else ""
114 return error_path_msg
115
116
117def allow_nullable(validation_fn: t.Callable) -> t.Callable:
118 """Extend an existing validation function, so it allows nullable values to be null."""
119
120 def nullable_validation_fn(validator, to_validate, instance, schema):
121 if instance is None and (
122 schema.get("x-nullable") is True or schema.get("nullable")
123 ):
124 return
125
126 yield from validation_fn(validator, to_validate, instance, schema)
127
128 return nullable_validation_fn
129
130
131def validate_writeOnly(validator, wo, instance, schema):
132 yield ValidationError("Property is write-only")
133
134
135NullableTypeValidator = allow_nullable(Draft4Validator.VALIDATORS["type"])
136NullableEnumValidator = allow_nullable(Draft4Validator.VALIDATORS["enum"])
137
138Draft4RequestValidator = extend(
139 Draft4Validator,
140 {
141 "type": NullableTypeValidator,
142 "enum": NullableEnumValidator,
143 },
144)
145
146Draft4ResponseValidator = extend(
147 Draft4Validator,
148 {
149 "type": NullableTypeValidator,
150 "enum": NullableEnumValidator,
151 "writeOnly": validate_writeOnly,
152 "x-writeOnly": validate_writeOnly,
153 },
154)