Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/rfc3986/validators.py: 69%
129 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:04 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:04 +0000
1# Copyright (c) 2017 Ian Stapleton Cordasco
2# Licensed under the Apache License, Version 2.0 (the "License");
3# you may not use this file except in compliance with the License.
4# You may obtain a copy of the License at
5#
6# http://www.apache.org/licenses/LICENSE-2.0
7#
8# Unless required by applicable law or agreed to in writing, software
9# distributed under the License is distributed on an "AS IS" BASIS,
10# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
11# implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Module containing the validation logic for rfc3986."""
15from . import exceptions
16from . import misc
17from . import normalizers
20class Validator:
21 """Object used to configure validation of all objects in rfc3986.
23 .. versionadded:: 1.0
25 Example usage::
27 >>> from rfc3986 import api, validators
28 >>> uri = api.uri_reference('https://github.com/')
29 >>> validator = validators.Validator().require_presence_of(
30 ... 'scheme', 'host', 'path',
31 ... ).allow_schemes(
32 ... 'http', 'https',
33 ... ).allow_hosts(
34 ... '127.0.0.1', 'github.com',
35 ... )
36 >>> validator.validate(uri)
37 >>> invalid_uri = rfc3986.uri_reference('imap://mail.google.com')
38 >>> validator.validate(invalid_uri)
39 Traceback (most recent call last):
40 ...
41 rfc3986.exceptions.MissingComponentError: ('path was required but
42 missing', URIReference(scheme=u'imap', authority=u'mail.google.com',
43 path=None, query=None, fragment=None), ['path'])
45 """
47 COMPONENT_NAMES = frozenset(
48 ["scheme", "userinfo", "host", "port", "path", "query", "fragment"]
49 )
51 def __init__(self):
52 """Initialize our default validations."""
53 self.allowed_schemes = set()
54 self.allowed_hosts = set()
55 self.allowed_ports = set()
56 self.allow_password = True
57 self.required_components = {
58 "scheme": False,
59 "userinfo": False,
60 "host": False,
61 "port": False,
62 "path": False,
63 "query": False,
64 "fragment": False,
65 }
66 self.validated_components = self.required_components.copy()
68 def allow_schemes(self, *schemes):
69 """Require the scheme to be one of the provided schemes.
71 .. versionadded:: 1.0
73 :param schemes:
74 Schemes, without ``://`` that are allowed.
75 :returns:
76 The validator instance.
77 :rtype:
78 Validator
79 """
80 for scheme in schemes:
81 self.allowed_schemes.add(normalizers.normalize_scheme(scheme))
82 return self
84 def allow_hosts(self, *hosts):
85 """Require the host to be one of the provided hosts.
87 .. versionadded:: 1.0
89 :param hosts:
90 Hosts that are allowed.
91 :returns:
92 The validator instance.
93 :rtype:
94 Validator
95 """
96 for host in hosts:
97 self.allowed_hosts.add(normalizers.normalize_host(host))
98 return self
100 def allow_ports(self, *ports):
101 """Require the port to be one of the provided ports.
103 .. versionadded:: 1.0
105 :param ports:
106 Ports that are allowed.
107 :returns:
108 The validator instance.
109 :rtype:
110 Validator
111 """
112 for port in ports:
113 port_int = int(port, base=10)
114 if 0 <= port_int <= 65535:
115 self.allowed_ports.add(port)
116 return self
118 def allow_use_of_password(self):
119 """Allow passwords to be present in the URI.
121 .. versionadded:: 1.0
123 :returns:
124 The validator instance.
125 :rtype:
126 Validator
127 """
128 self.allow_password = True
129 return self
131 def forbid_use_of_password(self):
132 """Prevent passwords from being included in the URI.
134 .. versionadded:: 1.0
136 :returns:
137 The validator instance.
138 :rtype:
139 Validator
140 """
141 self.allow_password = False
142 return self
144 def check_validity_of(self, *components):
145 """Check the validity of the components provided.
147 This can be specified repeatedly.
149 .. versionadded:: 1.1
151 :param components:
152 Names of components from :attr:`Validator.COMPONENT_NAMES`.
153 :returns:
154 The validator instance.
155 :rtype:
156 Validator
157 """
158 components = [c.lower() for c in components]
159 for component in components:
160 if component not in self.COMPONENT_NAMES:
161 raise ValueError(f'"{component}" is not a valid component')
162 self.validated_components.update(
163 {component: True for component in components}
164 )
165 return self
167 def require_presence_of(self, *components):
168 """Require the components provided.
170 This can be specified repeatedly.
172 .. versionadded:: 1.0
174 :param components:
175 Names of components from :attr:`Validator.COMPONENT_NAMES`.
176 :returns:
177 The validator instance.
178 :rtype:
179 Validator
180 """
181 components = [c.lower() for c in components]
182 for component in components:
183 if component not in self.COMPONENT_NAMES:
184 raise ValueError(f'"{component}" is not a valid component')
185 self.required_components.update(
186 {component: True for component in components}
187 )
188 return self
190 def validate(self, uri):
191 """Check a URI for conditions specified on this validator.
193 .. versionadded:: 1.0
195 :param uri:
196 Parsed URI to validate.
197 :type uri:
198 rfc3986.uri.URIReference
199 :raises MissingComponentError:
200 When a required component is missing.
201 :raises UnpermittedComponentError:
202 When a component is not one of those allowed.
203 :raises PasswordForbidden:
204 When a password is present in the userinfo component but is
205 not permitted by configuration.
206 :raises InvalidComponentsError:
207 When a component was found to be invalid.
208 """
209 if not self.allow_password:
210 check_password(uri)
212 required_components = [
213 component
214 for component, required in self.required_components.items()
215 if required
216 ]
217 validated_components = [
218 component
219 for component, required in self.validated_components.items()
220 if required
221 ]
222 if required_components:
223 ensure_required_components_exist(uri, required_components)
224 if validated_components:
225 ensure_components_are_valid(uri, validated_components)
227 ensure_one_of(self.allowed_schemes, uri, "scheme")
228 ensure_one_of(self.allowed_hosts, uri, "host")
229 ensure_one_of(self.allowed_ports, uri, "port")
232def check_password(uri):
233 """Assert that there is no password present in the uri."""
234 userinfo = uri.userinfo
235 if not userinfo:
236 return
237 credentials = userinfo.split(":", 1)
238 if len(credentials) <= 1:
239 return
240 raise exceptions.PasswordForbidden(uri)
243def ensure_one_of(allowed_values, uri, attribute):
244 """Assert that the uri's attribute is one of the allowed values."""
245 value = getattr(uri, attribute)
246 if value is not None and allowed_values and value not in allowed_values:
247 raise exceptions.UnpermittedComponentError(
248 attribute,
249 value,
250 allowed_values,
251 )
254def ensure_required_components_exist(uri, required_components):
255 """Assert that all required components are present in the URI."""
256 missing_components = sorted(
257 component
258 for component in required_components
259 if getattr(uri, component) is None
260 )
261 if missing_components:
262 raise exceptions.MissingComponentError(uri, *missing_components)
265def is_valid(value, matcher, require):
266 """Determine if a value is valid based on the provided matcher.
268 :param str value:
269 Value to validate.
270 :param matcher:
271 Compiled regular expression to use to validate the value.
272 :param require:
273 Whether or not the value is required.
274 """
275 if require:
276 return value is not None and matcher.match(value)
278 # require is False and value is not None
279 return value is None or matcher.match(value)
282def authority_is_valid(authority, host=None, require=False):
283 """Determine if the authority string is valid.
285 :param str authority:
286 The authority to validate.
287 :param str host:
288 (optional) The host portion of the authority to validate.
289 :param bool require:
290 (optional) Specify if authority must not be None.
291 :returns:
292 ``True`` if valid, ``False`` otherwise
293 :rtype:
294 bool
295 """
296 validated = is_valid(authority, misc.SUBAUTHORITY_MATCHER, require)
297 if validated and host is not None:
298 return host_is_valid(host, require)
299 return validated
302def host_is_valid(host, require=False):
303 """Determine if the host string is valid.
305 :param str host:
306 The host to validate.
307 :param bool require:
308 (optional) Specify if host must not be None.
309 :returns:
310 ``True`` if valid, ``False`` otherwise
311 :rtype:
312 bool
313 """
314 validated = is_valid(host, misc.HOST_MATCHER, require)
315 if validated and host is not None and misc.IPv4_MATCHER.match(host):
316 return valid_ipv4_host_address(host)
317 elif validated and host is not None and misc.IPv6_MATCHER.match(host):
318 return misc.IPv6_NO_RFC4007_MATCHER.match(host) is not None
319 return validated
322def scheme_is_valid(scheme, require=False):
323 """Determine if the scheme is valid.
325 :param str scheme:
326 The scheme string to validate.
327 :param bool require:
328 (optional) Set to ``True`` to require the presence of a scheme.
329 :returns:
330 ``True`` if the scheme is valid. ``False`` otherwise.
331 :rtype:
332 bool
333 """
334 return is_valid(scheme, misc.SCHEME_MATCHER, require)
337def path_is_valid(path, require=False):
338 """Determine if the path component is valid.
340 :param str path:
341 The path string to validate.
342 :param bool require:
343 (optional) Set to ``True`` to require the presence of a path.
344 :returns:
345 ``True`` if the path is valid. ``False`` otherwise.
346 :rtype:
347 bool
348 """
349 return is_valid(path, misc.PATH_MATCHER, require)
352def query_is_valid(query, require=False):
353 """Determine if the query component is valid.
355 :param str query:
356 The query string to validate.
357 :param bool require:
358 (optional) Set to ``True`` to require the presence of a query.
359 :returns:
360 ``True`` if the query is valid. ``False`` otherwise.
361 :rtype:
362 bool
363 """
364 return is_valid(query, misc.QUERY_MATCHER, require)
367def fragment_is_valid(fragment, require=False):
368 """Determine if the fragment component is valid.
370 :param str fragment:
371 The fragment string to validate.
372 :param bool require:
373 (optional) Set to ``True`` to require the presence of a fragment.
374 :returns:
375 ``True`` if the fragment is valid. ``False`` otherwise.
376 :rtype:
377 bool
378 """
379 return is_valid(fragment, misc.FRAGMENT_MATCHER, require)
382def valid_ipv4_host_address(host):
383 """Determine if the given host is a valid IPv4 address."""
384 # If the host exists, and it might be IPv4, check each byte in the
385 # address.
386 return all([0 <= int(byte, base=10) <= 255 for byte in host.split(".")])
389_COMPONENT_VALIDATORS = {
390 "scheme": scheme_is_valid,
391 "path": path_is_valid,
392 "query": query_is_valid,
393 "fragment": fragment_is_valid,
394}
396_SUBAUTHORITY_VALIDATORS = {"userinfo", "host", "port"}
399def subauthority_component_is_valid(uri, component):
400 """Determine if the userinfo, host, and port are valid."""
401 try:
402 subauthority_dict = uri.authority_info()
403 except exceptions.InvalidAuthority:
404 return False
406 # If we can parse the authority into sub-components and we're not
407 # validating the port, we can assume it's valid.
408 if component == "host":
409 return host_is_valid(subauthority_dict["host"])
410 elif component != "port":
411 return True
413 try:
414 port = int(subauthority_dict["port"])
415 except TypeError:
416 # If the port wasn't provided it'll be None and int(None) raises a
417 # TypeError
418 return True
420 return 0 <= port <= 65535
423def ensure_components_are_valid(uri, validated_components):
424 """Assert that all components are valid in the URI."""
425 invalid_components = set()
426 for component in validated_components:
427 if component in _SUBAUTHORITY_VALIDATORS:
428 if not subauthority_component_is_valid(uri, component):
429 invalid_components.add(component)
430 # Python's peephole optimizer means that while this continue *is*
431 # actually executed, coverage.py cannot detect that. See also,
432 # https://bitbucket.org/ned/coveragepy/issues/198/continue-marked-as-not-covered
433 continue # nocov: Python 2.7, 3.3, 3.4
435 validator = _COMPONENT_VALIDATORS[component]
436 if not validator(getattr(uri, component)):
437 invalid_components.add(component)
439 if invalid_components:
440 raise exceptions.InvalidComponentsError(uri, *invalid_components)