Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/rfc3986/validators.py: 83%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (c) 2017 Ian Stapleton Cordasco
2# Licensed under the Apache License, Version 2.0 (the "License");
3# you may not use this file except in compliance with the License.
4# You may obtain a copy of the License at
5#
6# http://www.apache.org/licenses/LICENSE-2.0
7#
8# Unless required by applicable law or agreed to in writing, software
9# distributed under the License is distributed on an "AS IS" BASIS,
10# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
11# implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Module containing the validation logic for rfc3986."""
15import typing as t
17from . import exceptions
18from . import misc
19from . import normalizers
20from . import uri
21from ._typing_compat import Self as _Self
24class Validator:
25 """Object used to configure validation of all objects in rfc3986.
27 .. versionadded:: 1.0
29 Example usage::
31 >>> from rfc3986 import api, validators
32 >>> uri = api.uri_reference('https://github.com/')
33 >>> validator = validators.Validator().require_presence_of(
34 ... 'scheme', 'host', 'path',
35 ... ).allow_schemes(
36 ... 'http', 'https',
37 ... ).allow_hosts(
38 ... '127.0.0.1', 'github.com',
39 ... )
40 >>> validator.validate(uri)
41 >>> invalid_uri = rfc3986.uri_reference('imap://mail.google.com')
42 >>> validator.validate(invalid_uri)
43 Traceback (most recent call last):
44 ...
45 rfc3986.exceptions.MissingComponentError: ('path was required but
46 missing', URIReference(scheme=u'imap', authority=u'mail.google.com',
47 path=None, query=None, fragment=None), ['path'])
49 """
51 COMPONENT_NAMES = frozenset(
52 ["scheme", "userinfo", "host", "port", "path", "query", "fragment"]
53 )
55 def __init__(self) -> None:
56 """Initialize our default validations."""
57 self.allowed_schemes: t.Set[str] = set()
58 self.allowed_hosts: t.Set[str] = set()
59 self.allowed_ports: t.Set[str] = set()
60 self.allow_password: bool = True
61 self.required_components: t.Dict[str, bool] = {
62 "scheme": False,
63 "userinfo": False,
64 "host": False,
65 "port": False,
66 "path": False,
67 "query": False,
68 "fragment": False,
69 }
70 self.validated_components: t.Dict[str, bool] = (
71 self.required_components.copy()
72 )
74 def allow_schemes(self, *schemes: str) -> _Self:
75 """Require the scheme to be one of the provided schemes.
77 .. versionadded:: 1.0
79 :param schemes:
80 Schemes, without ``://`` that are allowed.
81 :returns:
82 The validator instance.
83 :rtype:
84 Validator
85 """
86 for scheme in schemes:
87 self.allowed_schemes.add(normalizers.normalize_scheme(scheme))
88 return self
90 def allow_hosts(self, *hosts: str) -> _Self:
91 """Require the host to be one of the provided hosts.
93 .. versionadded:: 1.0
95 :param hosts:
96 Hosts that are allowed.
97 :returns:
98 The validator instance.
99 :rtype:
100 Validator
101 """
102 for host in hosts:
103 self.allowed_hosts.add(normalizers.normalize_host(host))
104 return self
106 def allow_ports(self, *ports: str) -> _Self:
107 """Require the port to be one of the provided ports.
109 .. versionadded:: 1.0
111 :param ports:
112 Ports that are allowed.
113 :returns:
114 The validator instance.
115 :rtype:
116 Validator
117 """
118 for port in ports:
119 port_int = int(port, base=10)
120 if 0 <= port_int <= 65535:
121 self.allowed_ports.add(port)
122 return self
124 def allow_use_of_password(self) -> _Self:
125 """Allow passwords to be present in the URI.
127 .. versionadded:: 1.0
129 :returns:
130 The validator instance.
131 :rtype:
132 Validator
133 """
134 self.allow_password = True
135 return self
137 def forbid_use_of_password(self) -> _Self:
138 """Prevent passwords from being included in the URI.
140 .. versionadded:: 1.0
142 :returns:
143 The validator instance.
144 :rtype:
145 Validator
146 """
147 self.allow_password = False
148 return self
150 def check_validity_of(self, *components: str) -> _Self:
151 """Check the validity of the components provided.
153 This can be specified repeatedly.
155 .. versionadded:: 1.1
157 :param components:
158 Names of components from :attr:`Validator.COMPONENT_NAMES`.
159 :returns:
160 The validator instance.
161 :rtype:
162 Validator
163 """
164 components = tuple(c.lower() for c in components)
165 for component in components:
166 if component not in self.COMPONENT_NAMES:
167 raise ValueError(f'"{component}" is not a valid component')
168 self.validated_components.update(
169 {component: True for component in components}
170 )
171 return self
173 def require_presence_of(self, *components: str) -> _Self:
174 """Require the components provided.
176 This can be specified repeatedly.
178 .. versionadded:: 1.0
180 :param components:
181 Names of components from :attr:`Validator.COMPONENT_NAMES`.
182 :returns:
183 The validator instance.
184 :rtype:
185 Validator
186 """
187 components = tuple(c.lower() for c in components)
188 for component in components:
189 if component not in self.COMPONENT_NAMES:
190 raise ValueError(f'"{component}" is not a valid component')
191 self.required_components.update(
192 {component: True for component in components}
193 )
194 return self
196 def validate(self, uri: "uri.URIReference") -> None:
197 """Check a URI for conditions specified on this validator.
199 .. versionadded:: 1.0
201 :param uri:
202 Parsed URI to validate.
203 :type uri:
204 rfc3986.uri.URIReference
205 :raises MissingComponentError:
206 When a required component is missing.
207 :raises UnpermittedComponentError:
208 When a component is not one of those allowed.
209 :raises PasswordForbidden:
210 When a password is present in the userinfo component but is
211 not permitted by configuration.
212 :raises InvalidComponentsError:
213 When a component was found to be invalid.
214 """
215 if not self.allow_password:
216 check_password(uri)
218 required_components = [
219 component
220 for component, required in self.required_components.items()
221 if required
222 ]
223 validated_components = [
224 component
225 for component, required in self.validated_components.items()
226 if required
227 ]
228 if required_components:
229 ensure_required_components_exist(uri, required_components)
230 if validated_components:
231 ensure_components_are_valid(uri, validated_components)
233 ensure_one_of(self.allowed_schemes, uri, "scheme")
234 ensure_one_of(self.allowed_hosts, uri, "host")
235 ensure_one_of(self.allowed_ports, uri, "port")
238def check_password(uri: "uri.URIReference") -> None:
239 """Assert that there is no password present in the uri."""
240 userinfo = uri.userinfo
241 if not userinfo:
242 return
243 credentials = userinfo.split(":", 1)
244 if len(credentials) <= 1:
245 return
246 raise exceptions.PasswordForbidden(uri)
249def ensure_one_of(
250 allowed_values: t.Collection[object],
251 uri: "uri.URIReference",
252 attribute: str,
253) -> None:
254 """Assert that the uri's attribute is one of the allowed values."""
255 value = getattr(uri, attribute)
256 if value is not None and allowed_values and value not in allowed_values:
257 raise exceptions.UnpermittedComponentError(
258 attribute,
259 value,
260 allowed_values,
261 )
264def ensure_required_components_exist(
265 uri: "uri.URIReference",
266 required_components: t.Iterable[str],
267) -> None:
268 """Assert that all required components are present in the URI."""
269 missing_components = sorted(
270 component
271 for component in required_components
272 if getattr(uri, component) is None
273 )
274 if missing_components:
275 raise exceptions.MissingComponentError(uri, *missing_components)
278def is_valid(
279 value: t.Optional[str],
280 matcher: t.Pattern[str],
281 require: bool,
282) -> bool:
283 """Determine if a value is valid based on the provided matcher.
285 :param str value:
286 Value to validate.
287 :param matcher:
288 Compiled regular expression to use to validate the value.
289 :param require:
290 Whether or not the value is required.
291 """
292 if require:
293 return value is not None and bool(matcher.match(value))
295 # require is False and value is not None
296 return value is None or bool(matcher.match(value))
299def authority_is_valid(
300 authority: t.Optional[str],
301 host: t.Optional[str] = None,
302 require: bool = False,
303) -> bool:
304 """Determine if the authority string is valid.
306 :param str authority:
307 The authority to validate.
308 :param str host:
309 (optional) The host portion of the authority to validate.
310 :param bool require:
311 (optional) Specify if authority must not be None.
312 :returns:
313 ``True`` if valid, ``False`` otherwise
314 :rtype:
315 bool
316 """
317 validated = is_valid(authority, misc.SUBAUTHORITY_MATCHER, require)
318 if validated and host is not None:
319 return host_is_valid(host, require)
320 return validated
323def host_is_valid(host: t.Optional[str], require: bool = False) -> bool:
324 """Determine if the host string is valid.
326 :param str host:
327 The host to validate.
328 :param bool require:
329 (optional) Specify if host must not be None.
330 :returns:
331 ``True`` if valid, ``False`` otherwise
332 :rtype:
333 bool
334 """
335 validated = is_valid(host, misc.HOST_MATCHER, require)
336 if validated and host is not None and misc.IPv4_MATCHER.match(host):
337 return valid_ipv4_host_address(host)
338 elif validated and host is not None and misc.IPv6_MATCHER.match(host):
339 return misc.IPv6_NO_RFC4007_MATCHER.match(host) is not None
340 return validated
343def scheme_is_valid(scheme: t.Optional[str], require: bool = False) -> bool:
344 """Determine if the scheme is valid.
346 :param str scheme:
347 The scheme string to validate.
348 :param bool require:
349 (optional) Set to ``True`` to require the presence of a scheme.
350 :returns:
351 ``True`` if the scheme is valid. ``False`` otherwise.
352 :rtype:
353 bool
354 """
355 return is_valid(scheme, misc.SCHEME_MATCHER, require)
358def path_is_valid(path: t.Optional[str], require: bool = False) -> bool:
359 """Determine if the path component is valid.
361 :param str path:
362 The path string to validate.
363 :param bool require:
364 (optional) Set to ``True`` to require the presence of a path.
365 :returns:
366 ``True`` if the path is valid. ``False`` otherwise.
367 :rtype:
368 bool
369 """
370 return is_valid(path, misc.PATH_MATCHER, require)
373def query_is_valid(query: t.Optional[str], require: bool = False) -> bool:
374 """Determine if the query component is valid.
376 :param str query:
377 The query string to validate.
378 :param bool require:
379 (optional) Set to ``True`` to require the presence of a query.
380 :returns:
381 ``True`` if the query is valid. ``False`` otherwise.
382 :rtype:
383 bool
384 """
385 return is_valid(query, misc.QUERY_MATCHER, require)
388def fragment_is_valid(
389 fragment: t.Optional[str],
390 require: bool = False,
391) -> bool:
392 """Determine if the fragment component is valid.
394 :param str fragment:
395 The fragment string to validate.
396 :param bool require:
397 (optional) Set to ``True`` to require the presence of a fragment.
398 :returns:
399 ``True`` if the fragment is valid. ``False`` otherwise.
400 :rtype:
401 bool
402 """
403 return is_valid(fragment, misc.FRAGMENT_MATCHER, require)
406def valid_ipv4_host_address(host: str) -> bool:
407 """Determine if the given host is a valid IPv4 address."""
408 # If the host exists, and it might be IPv4, check each byte in the
409 # address.
410 return all([0 <= int(byte, base=10) <= 255 for byte in host.split(".")])
413_COMPONENT_VALIDATORS = {
414 "scheme": scheme_is_valid,
415 "path": path_is_valid,
416 "query": query_is_valid,
417 "fragment": fragment_is_valid,
418}
420_SUBAUTHORITY_VALIDATORS = {"userinfo", "host", "port"}
423def subauthority_component_is_valid(
424 uri: "uri.URIReference",
425 component: str,
426) -> bool:
427 """Determine if the userinfo, host, and port are valid."""
428 try:
429 subauthority_dict = uri.authority_info()
430 except exceptions.InvalidAuthority:
431 return False
433 # If we can parse the authority into sub-components and we're not
434 # validating the port, we can assume it's valid.
435 if component == "host":
436 return host_is_valid(subauthority_dict["host"])
437 elif component != "port":
438 return True
440 port = subauthority_dict["port"]
442 if port is None:
443 return True
445 # We know it has to have fewer than 6 digits if it exists.
446 if not (port.isdigit() and len(port) < 6): # pragma: no cover
447 # This branch can only execute when this function is called directly
448 # with a URI reference manually constructed with an invalid port.
449 # Such a use case is unsupported, since this function isn't part of
450 # the public API.
451 return False
453 return 0 <= int(port) <= 65535
456def ensure_components_are_valid(
457 uri: "uri.URIReference",
458 validated_components: t.List[str],
459) -> None:
460 """Assert that all components are valid in the URI."""
461 invalid_components: set[str] = set()
462 for component in validated_components:
463 if component in _SUBAUTHORITY_VALIDATORS:
464 if not subauthority_component_is_valid(uri, component):
465 invalid_components.add(component)
466 # Python's peephole optimizer means that while this continue *is*
467 # actually executed, coverage.py cannot detect that. See also,
468 # https://bitbucket.org/ned/coveragepy/issues/198/continue-marked-as-not-covered
469 continue # nocov: Python 2.7, 3.3, 3.4
471 validator = _COMPONENT_VALIDATORS[component]
472 if not validator(getattr(uri, component)):
473 invalid_components.add(component)
475 if invalid_components:
476 raise exceptions.InvalidComponentsError(uri, *invalid_components)