Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/connexion/json_schema.py: 42%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

83 statements  

1""" 

2Module containing all code related to json schema validation. 

3""" 

4 

5import contextlib 

6import io 

7import os 

8import typing as t 

9import urllib.parse 

10import urllib.request 

11from collections.abc import Mapping 

12from copy import deepcopy 

13 

14import requests 

15import yaml 

16from jsonschema import Draft4Validator, RefResolver 

17from jsonschema.exceptions import RefResolutionError, ValidationError # noqa 

18from jsonschema.validators import extend 

19 

20from .utils import deep_get 

21 

22 

class ExtendedSafeLoader(yaml.SafeLoader):
    """YAML safe loader whose mappings always have string keys.

    Every key of every constructed mapping is passed through ``str`` so the
    loaded document only contains keys that are valid in JSON.
    """

    def __init__(self, stream):
        # Keep a handle on the stock mapping constructor, then shadow it on the
        # instance so that ``self.construct_mapping`` reaches the key-coercing
        # wrapper below. Both assignments happen before the base initialiser.
        self.original_construct_mapping = self.construct_mapping
        self.construct_mapping = self.extended_construct_mapping
        super().__init__(stream)

    def extended_construct_mapping(self, node, deep=False):
        """Build the mapping for *node* and return a copy with ``str`` keys."""
        mapping = self.original_construct_mapping(node, deep)
        return {str(name): value for name, value in mapping.items()}

34 

35 

class FileHandler:
    """Handler to resolve file refs."""

    def __call__(self, uri):
        """Load and return the YAML/JSON document that *uri* points to.

        The file is opened in binary mode so that the YAML reader determines
        the encoding itself instead of the text being decoded with the
        platform's locale encoding.

        :param uri: A ``file`` URI or a plain filesystem path.
        :return: The parsed document.
        """
        filepath = self._uri_to_path(uri)
        with open(filepath, "rb") as fh:
            return yaml.load(fh, ExtendedSafeLoader)

    @staticmethod
    def _uri_to_path(uri):
        """Translate *uri* into an absolute filesystem path."""
        parsed = urllib.parse.urlparse(uri)
        # Prefix built from the URI's network location:
        # <sep><sep><netloc><sep>.
        host = "{0}{0}{mnt}{0}".format(os.path.sep, mnt=parsed.netloc)
        return os.path.abspath(
            os.path.join(host, urllib.request.url2pathname(parsed.path))
        )

51 

52 

class URLHandler:
    """Handler to resolve url refs."""

    def __init__(self, timeout=30):
        """
        :param timeout: Seconds to wait for the server, passed unchanged to
            ``requests.get``. ``None`` disables the timeout.
        """
        self.timeout = timeout

    def __call__(self, uri):
        """Fetch *uri* and return its body parsed as YAML/JSON.

        :param uri: The http(s) URL to download.
        :return: The parsed document.
        :raises requests.RequestException: if the request fails, times out
            or the server answers with an error status.
        """
        # Without a timeout an unresponsive server would block forever.
        response = requests.get(uri, timeout=self.timeout)
        response.raise_for_status()

        data = io.StringIO(response.text)
        with contextlib.closing(data) as fh:
            return yaml.load(fh, ExtendedSafeLoader)

63 

64 

# URI scheme -> callable that loads the referenced document. The empty scheme
# covers references without any scheme. Every key gets its own handler
# instance.
handlers = {
    **{scheme: URLHandler() for scheme in ("http", "https")},
    **{scheme: FileHandler() for scheme in ("file", "")},
}

71 

72 

def resolve_refs(spec, store=None, base_uri=""):
    """
    Resolve JSON references like {"$ref": <some URI>} in a spec.

    The work is done on a deep copy, so the ``spec`` passed in is not modified.
    Optionally takes a store, which is a mapping from reference URLs to
    dereferenced objects. Prepopulating the store can avoid network calls.

    :param spec: The (possibly nested) specification to dereference.
    :param store: Optional mapping handed to the ``RefResolver``.
    :param base_uri: Base URI handed to the ``RefResolver``.
    :return: The copied spec with references replaced by their targets.
    """
    spec = deepcopy(spec)
    store = store or {}
    resolver = RefResolver(base_uri, spec, store, handlers=handlers)

    def _do_resolve(node):
        if isinstance(node, Mapping) and "$ref" in node:
            # Drop the first two characters (presumably the "#/" prefix of a
            # local reference) and split the rest into path segments.
            path = node["$ref"][2:].split("/")
            try:
                # resolve known references
                retrieved = deep_get(spec, path)
                node.update(retrieved)
                if isinstance(retrieved, Mapping) and "$ref" in retrieved:
                    # The target was itself a reference: follow the chain.
                    node = _do_resolve(node)
                node.pop("$ref", None)
                return node
            except KeyError:
                # resolve external references
                with resolver.resolving(node["$ref"]) as resolved:
                    return _do_resolve(resolved)
        elif isinstance(node, Mapping):
            for k, v in node.items():
                node[k] = _do_resolve(v)
        elif isinstance(node, list):
            for i, item in enumerate(node):
                node[i] = _do_resolve(item)
        elif isinstance(node, tuple):
            # Tuples do not support item assignment, so build a new one.
            return tuple(_do_resolve(item) for item in node)
        return node

    res = _do_resolve(spec)
    return res

108 

109 

def format_error_with_path(exception: ValidationError) -> str:
    """Return the location of a `ValidationError` as a message suffix.

    The elements of ``exception.path`` are joined with dots and wrapped as
    `` - '<path>'``. An empty path produces an empty string.
    """
    dotted = ".".join(map(str, exception.path))
    if not dotted:
        return ""
    return f" - '{dotted}'"

115 

116 

def allow_nullable(validation_fn: t.Callable) -> t.Callable:
    """Wrap a validation function so nullable schemas accept ``None``.

    The wrapper yields no errors when the instance is ``None`` and the schema
    has ``x-nullable`` set to exactly ``True`` or a truthy ``nullable``. In
    every other case it delegates to ``validation_fn``.
    """

    def nullable_validation_fn(validator, to_validate, instance, schema):
        if instance is not None or (
            schema.get("x-nullable") is not True and not schema.get("nullable")
        ):
            yield from validation_fn(validator, to_validate, instance, schema)

    return nullable_validation_fn

129 

130 

def validate_writeOnly(validator, wo, instance, schema):
    """Report an error when a write-only property is present.

    :param validator: The validator running this check (unused).
    :param wo: Value of the ``writeOnly`` / ``x-writeOnly`` keyword.
    :param instance: The value being validated (unused).
    :param schema: The schema holding the keyword (unused).
    :return: Generator yielding one ``ValidationError`` when ``wo`` is truthy.
    """
    # A property explicitly declared ``writeOnly: false`` is not write-only
    # and must not be reported.
    if wo:
        yield ValidationError("Property is write-only")

133 

134 

# Draft 4 "type" and "enum" checks wrapped by ``allow_nullable``.
NullableTypeValidator = allow_nullable(Draft4Validator.VALIDATORS["type"])
NullableEnumValidator = allow_nullable(Draft4Validator.VALIDATORS["enum"])

# Keyword overrides shared by both validator classes below.
_NULLABLE_KEYWORDS = {
    "type": NullableTypeValidator,
    "enum": NullableEnumValidator,
}

# Draft 4 validator with the nullable-aware "type" and "enum" checks.
Draft4RequestValidator = extend(Draft4Validator, dict(_NULLABLE_KEYWORDS))

# Same overrides, plus ``validate_writeOnly`` for the "writeOnly" and
# "x-writeOnly" keywords.
Draft4ResponseValidator = extend(
    Draft4Validator,
    {
        **_NULLABLE_KEYWORDS,
        "writeOnly": validate_writeOnly,
        "x-writeOnly": validate_writeOnly,
    },
)