Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/rfc3986/validators.py: 83%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (c) 2017 Ian Stapleton Cordasco
2# Licensed under the Apache License, Version 2.0 (the "License");
3# you may not use this file except in compliance with the License.
4# You may obtain a copy of the License at
5#
6# http://www.apache.org/licenses/LICENSE-2.0
7#
8# Unless required by applicable law or agreed to in writing, software
9# distributed under the License is distributed on an "AS IS" BASIS,
10# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
11# implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Module containing the validation logic for rfc3986."""
16import typing as t
18from . import exceptions
19from . import misc
20from . import normalizers
21from . import uri
22from ._typing_compat import Self as _Self
25class Validator:
26 """Object used to configure validation of all objects in rfc3986.
28 .. versionadded:: 1.0
30 Example usage::
32 >>> from rfc3986 import api, validators
33 >>> uri = api.uri_reference('https://github.com/')
34 >>> validator = validators.Validator().require_presence_of(
35 ... 'scheme', 'host', 'path',
36 ... ).allow_schemes(
37 ... 'http', 'https',
38 ... ).allow_hosts(
39 ... '127.0.0.1', 'github.com',
40 ... )
41 >>> validator.validate(uri)
42 >>> invalid_uri = rfc3986.uri_reference('imap://mail.google.com')
43 >>> validator.validate(invalid_uri)
44 Traceback (most recent call last):
45 ...
46 rfc3986.exceptions.MissingComponentError: ('path was required but
47 missing', URIReference(scheme=u'imap', authority=u'mail.google.com',
48 path=None, query=None, fragment=None), ['path'])
50 """
52 COMPONENT_NAMES = frozenset(
53 ["scheme", "userinfo", "host", "port", "path", "query", "fragment"]
54 )
56 def __init__(self) -> None:
57 """Initialize our default validations."""
58 self.allowed_schemes: t.Set[str] = set()
59 self.allowed_hosts: t.Set[str] = set()
60 self.allowed_ports: t.Set[str] = set()
61 self.allow_password: bool = True
62 self.required_components: t.Dict[str, bool] = {
63 "scheme": False,
64 "userinfo": False,
65 "host": False,
66 "port": False,
67 "path": False,
68 "query": False,
69 "fragment": False,
70 }
71 self.validated_components: t.Dict[str, bool] = (
72 self.required_components.copy()
73 )
75 def allow_schemes(self, *schemes: str) -> _Self:
76 """Require the scheme to be one of the provided schemes.
78 .. versionadded:: 1.0
80 :param schemes:
81 Schemes, without ``://`` that are allowed.
82 :returns:
83 The validator instance.
84 :rtype:
85 Validator
86 """
87 for scheme in schemes:
88 self.allowed_schemes.add(normalizers.normalize_scheme(scheme))
89 return self
91 def allow_hosts(self, *hosts: str) -> _Self:
92 """Require the host to be one of the provided hosts.
94 .. versionadded:: 1.0
96 :param hosts:
97 Hosts that are allowed.
98 :returns:
99 The validator instance.
100 :rtype:
101 Validator
102 """
103 for host in hosts:
104 self.allowed_hosts.add(normalizers.normalize_host(host))
105 return self
107 def allow_ports(self, *ports: str) -> _Self:
108 """Require the port to be one of the provided ports.
110 .. versionadded:: 1.0
112 :param ports:
113 Ports that are allowed.
114 :returns:
115 The validator instance.
116 :rtype:
117 Validator
118 """
119 for port in ports:
120 port_int = int(port, base=10)
121 if 0 <= port_int <= 65535:
122 self.allowed_ports.add(port)
123 return self
125 def allow_use_of_password(self) -> _Self:
126 """Allow passwords to be present in the URI.
128 .. versionadded:: 1.0
130 :returns:
131 The validator instance.
132 :rtype:
133 Validator
134 """
135 self.allow_password = True
136 return self
138 def forbid_use_of_password(self) -> _Self:
139 """Prevent passwords from being included in the URI.
141 .. versionadded:: 1.0
143 :returns:
144 The validator instance.
145 :rtype:
146 Validator
147 """
148 self.allow_password = False
149 return self
151 def check_validity_of(self, *components: str) -> _Self:
152 """Check the validity of the components provided.
154 This can be specified repeatedly.
156 .. versionadded:: 1.1
158 :param components:
159 Names of components from :attr:`Validator.COMPONENT_NAMES`.
160 :returns:
161 The validator instance.
162 :rtype:
163 Validator
164 """
165 components = tuple(c.lower() for c in components)
166 for component in components:
167 if component not in self.COMPONENT_NAMES:
168 raise ValueError(f'"{component}" is not a valid component')
169 self.validated_components.update(
170 {component: True for component in components}
171 )
172 return self
174 def require_presence_of(self, *components: str) -> _Self:
175 """Require the components provided.
177 This can be specified repeatedly.
179 .. versionadded:: 1.0
181 :param components:
182 Names of components from :attr:`Validator.COMPONENT_NAMES`.
183 :returns:
184 The validator instance.
185 :rtype:
186 Validator
187 """
188 components = tuple(c.lower() for c in components)
189 for component in components:
190 if component not in self.COMPONENT_NAMES:
191 raise ValueError(f'"{component}" is not a valid component')
192 self.required_components.update(
193 {component: True for component in components}
194 )
195 return self
197 def validate(self, uri: "uri.URIReference") -> None:
198 """Check a URI for conditions specified on this validator.
200 .. versionadded:: 1.0
202 :param uri:
203 Parsed URI to validate.
204 :type uri:
205 rfc3986.uri.URIReference
206 :raises MissingComponentError:
207 When a required component is missing.
208 :raises UnpermittedComponentError:
209 When a component is not one of those allowed.
210 :raises PasswordForbidden:
211 When a password is present in the userinfo component but is
212 not permitted by configuration.
213 :raises InvalidComponentsError:
214 When a component was found to be invalid.
215 """
216 if not self.allow_password:
217 check_password(uri)
219 required_components = [
220 component
221 for component, required in self.required_components.items()
222 if required
223 ]
224 validated_components = [
225 component
226 for component, required in self.validated_components.items()
227 if required
228 ]
229 if required_components:
230 ensure_required_components_exist(uri, required_components)
231 if validated_components:
232 ensure_components_are_valid(uri, validated_components)
234 ensure_one_of(self.allowed_schemes, uri, "scheme")
235 ensure_one_of(self.allowed_hosts, uri, "host")
236 ensure_one_of(self.allowed_ports, uri, "port")
def check_password(uri: "uri.URIReference") -> None:
    """Raise ``PasswordForbidden`` if the userinfo contains a password.

    A password is considered present when the userinfo component holds a
    ``:`` separator; a missing or empty userinfo is accepted.
    """
    userinfo = uri.userinfo
    if userinfo and ":" in userinfo:
        raise exceptions.PasswordForbidden(uri)
def ensure_one_of(
    allowed_values: t.Collection[object],
    uri: "uri.URIReference",
    attribute: str,
) -> None:
    """Raise unless the uri's attribute is among the allowed values.

    Nothing is raised when the attribute is ``None`` or when
    ``allowed_values`` is empty (an empty collection means "no restriction").
    """
    value = getattr(uri, attribute)
    if value is None or not allowed_values or value in allowed_values:
        return
    raise exceptions.UnpermittedComponentError(
        attribute,
        value,
        allowed_values,
    )
def ensure_required_components_exist(
    uri: "uri.URIReference",
    required_components: t.Iterable[str],
) -> None:
    """Raise ``MissingComponentError`` if a required component is ``None``.

    The names of all missing components are passed to the exception in
    sorted order.
    """
    absent = [
        name for name in required_components if getattr(uri, name) is None
    ]
    absent.sort()
    if absent:
        raise exceptions.MissingComponentError(uri, *absent)
def is_valid(
    value: t.Optional[str],
    matcher: t.Pattern[str],
    require: bool,
) -> bool:
    """Decide whether a value satisfies the provided matcher.

    :param str value:
        Value to validate; may be ``None``.
    :param matcher:
        Compiled regular expression to use to validate the value.
    :param require:
        Whether or not the value is required.
    :returns:
        For a ``None`` value, ``True`` only when it is not required;
        otherwise whether ``matcher`` matches the value.
    :rtype:
        bool
    """
    if value is None:
        # An absent value is acceptable only when it is optional.
        return not require
    return bool(matcher.match(value))
def authority_is_valid(
    authority: t.Optional[str],
    host: t.Optional[str] = None,
    require: bool = False,
) -> bool:
    """Decide whether the authority string is valid.

    The authority is checked first; the host is only examined when the
    authority passed and a host was supplied.

    :param str authority:
        The authority to validate.
    :param str host:
        (optional) The host portion of the authority to validate.
    :param bool require:
        (optional) Specify if authority must not be None.
    :returns:
        ``True`` if valid, ``False`` otherwise
    :rtype:
        bool
    """
    authority_ok = is_valid(authority, misc.SUBAUTHORITY_MATCHER, require)
    if not authority_ok or host is None:
        return authority_ok
    return host_is_valid(host, require)
def host_is_valid(host: t.Optional[str], require: bool = False) -> bool:
    """Decide whether the host string is valid.

    After the general host pattern matches, hosts that look like IPv4
    addresses get a per-octet range check, and hosts that look like IPv6
    literals must also match the pattern without an RFC 4007 zone.

    :param str host:
        The host to validate.
    :param bool require:
        (optional) Specify if host must not be None.
    :returns:
        ``True`` if valid, ``False`` otherwise
    :rtype:
        bool
    """
    validated = is_valid(host, misc.HOST_MATCHER, require)
    if not validated or host is None:
        # Nothing more to inspect: failed already, or there is no host.
        return validated
    if misc.IPv4_MATCHER.match(host):
        return valid_ipv4_host_address(host)
    if misc.IPv6_MATCHER.match(host):
        return misc.IPv6_NO_RFC4007_MATCHER.match(host) is not None
    return validated
def scheme_is_valid(scheme: t.Optional[str], require: bool = False) -> bool:
    """Decide whether the scheme component is valid.

    :param str scheme:
        The scheme string to validate.
    :param bool require:
        (optional) Set to ``True`` to require the presence of a scheme.
    :returns:
        ``True`` if the scheme is valid. ``False`` otherwise.
    :rtype:
        bool
    """
    outcome = is_valid(scheme, matcher=misc.SCHEME_MATCHER, require=require)
    return outcome
def path_is_valid(path: t.Optional[str], require: bool = False) -> bool:
    """Decide whether the path component is valid.

    :param str path:
        The path string to validate.
    :param bool require:
        (optional) Set to ``True`` to require the presence of a path.
    :returns:
        ``True`` if the path is valid. ``False`` otherwise.
    :rtype:
        bool
    """
    outcome = is_valid(path, matcher=misc.PATH_MATCHER, require=require)
    return outcome
def query_is_valid(query: t.Optional[str], require: bool = False) -> bool:
    """Decide whether the query component is valid.

    :param str query:
        The query string to validate.
    :param bool require:
        (optional) Set to ``True`` to require the presence of a query.
    :returns:
        ``True`` if the query is valid. ``False`` otherwise.
    :rtype:
        bool
    """
    outcome = is_valid(query, matcher=misc.QUERY_MATCHER, require=require)
    return outcome
def fragment_is_valid(
    fragment: t.Optional[str],
    require: bool = False,
) -> bool:
    """Decide whether the fragment component is valid.

    :param str fragment:
        The fragment string to validate.
    :param bool require:
        (optional) Set to ``True`` to require the presence of a fragment.
    :returns:
        ``True`` if the fragment is valid. ``False`` otherwise.
    :rtype:
        bool
    """
    outcome = is_valid(
        fragment,
        matcher=misc.FRAGMENT_MATCHER,
        require=require,
    )
    return outcome
def valid_ipv4_host_address(host: str) -> bool:
    """Determine if the given host is a valid dotted-quad IPv4 address.

    :param str host:
        Candidate host, expected in ``a.b.c.d`` form.
    :returns:
        ``True`` when the host consists of exactly four ``.``-separated
        groups of ASCII decimal digits, each with a value of at most 255.
        ``False`` otherwise; malformed input never raises.
    :rtype:
        bool
    """
    octets = host.split(".")
    if len(octets) != 4:
        return False
    # The digit checks guard int(): signs, whitespace, underscores and
    # non-ASCII digits are rejected instead of raising or being coerced.
    return all(
        octet.isascii() and octet.isdigit() and int(octet, base=10) <= 255
        for octet in octets
    )
# Component name -> validity predicate, for the components that can be
# checked on their own (looked up by ensure_components_are_valid).
_COMPONENT_VALIDATORS = {
    "scheme": scheme_is_valid,
    "path": path_is_valid,
    "query": query_is_valid,
    "fragment": fragment_is_valid,
}

# Components that live inside the authority; these are routed to
# subauthority_component_is_valid instead of the table above.
_SUBAUTHORITY_VALIDATORS = {"userinfo", "host", "port"}
def subauthority_component_is_valid(
    uri: "uri.URIReference",
    component: str,
) -> bool:
    """Decide whether the userinfo, host, or port of a URI is valid.

    :param uri:
        Parsed URI whose authority is inspected.
    :param str component:
        Name of the authority sub-component to check.
    :returns:
        ``False`` when the authority cannot be parsed.  For ``"host"`` the
        result of :func:`host_is_valid`; for ``"port"`` whether the port is
        absent or a decimal number in ``0..65535``; ``True`` for any other
        component once the authority parsed.
    :rtype:
        bool
    """
    try:
        parts = uri.authority_info()
    except exceptions.InvalidAuthority:
        return False

    if component == "host":
        return host_is_valid(parts["host"])
    if component != "port":
        # The authority parsed, so every other sub-component is accepted.
        return True

    port = parts["port"]
    if port is None:
        return True

    # A port that exists must be all digits and shorter than six characters.
    if not port.isdigit() or len(port) >= 6:  # pragma: no cover
        # Only reachable when this function is called directly with a URI
        # reference that was manually constructed with an invalid port.
        # That use case is unsupported, since this function is not part of
        # the public API.
        return False

    return 0 <= int(port) <= 65535
def ensure_components_are_valid(
    uri: "uri.URIReference",
    validated_components: t.List[str],
) -> None:
    """Raise ``InvalidComponentsError`` if any listed component is invalid.

    Every component is checked before raising, so the exception receives
    all of the invalid component names at once.
    """
    failures: set[str] = set()
    for name in validated_components:
        if name in _SUBAUTHORITY_VALIDATORS:
            # userinfo/host/port are judged via the parsed authority.
            passed = subauthority_component_is_valid(uri, name)
        else:
            passed = _COMPONENT_VALIDATORS[name](getattr(uri, name))
        if not passed:
            failures.add(name)

    if failures:
        raise exceptions.InvalidComponentsError(uri, *failures)