1# Copyright (c) 2017 Ian Stapleton Cordasco
2# Licensed under the Apache License, Version 2.0 (the "License");
3# you may not use this file except in compliance with the License.
4# You may obtain a copy of the License at
5#
6# http://www.apache.org/licenses/LICENSE-2.0
7#
8# Unless required by applicable law or agreed to in writing, software
9# distributed under the License is distributed on an "AS IS" BASIS,
10# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
11# implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Module containing the validation logic for rfc3986."""
15import typing as t
16
17from . import exceptions
18from . import misc
19from . import normalizers
20from . import uri
21from ._typing_compat import Self as _Self
22
23
24class Validator:
25 """Object used to configure validation of all objects in rfc3986.
26
27 .. versionadded:: 1.0
28
29 Example usage::
30
31 >>> from rfc3986 import api, validators
32 >>> uri = api.uri_reference('https://github.com/')
33 >>> validator = validators.Validator().require_presence_of(
34 ... 'scheme', 'host', 'path',
35 ... ).allow_schemes(
36 ... 'http', 'https',
37 ... ).allow_hosts(
38 ... '127.0.0.1', 'github.com',
39 ... )
40 >>> validator.validate(uri)
41 >>> invalid_uri = rfc3986.uri_reference('imap://mail.google.com')
42 >>> validator.validate(invalid_uri)
43 Traceback (most recent call last):
44 ...
45 rfc3986.exceptions.MissingComponentError: ('path was required but
46 missing', URIReference(scheme=u'imap', authority=u'mail.google.com',
47 path=None, query=None, fragment=None), ['path'])
48
49 """
50
51 COMPONENT_NAMES = frozenset(
52 ["scheme", "userinfo", "host", "port", "path", "query", "fragment"]
53 )
54
55 def __init__(self) -> None:
56 """Initialize our default validations."""
57 self.allowed_schemes: t.Set[str] = set()
58 self.allowed_hosts: t.Set[str] = set()
59 self.allowed_ports: t.Set[str] = set()
60 self.allow_password: bool = True
61 self.required_components: t.Dict[str, bool] = {
62 "scheme": False,
63 "userinfo": False,
64 "host": False,
65 "port": False,
66 "path": False,
67 "query": False,
68 "fragment": False,
69 }
70 self.validated_components: t.Dict[str, bool] = (
71 self.required_components.copy()
72 )
73
74 def allow_schemes(self, *schemes: str) -> _Self:
75 """Require the scheme to be one of the provided schemes.
76
77 .. versionadded:: 1.0
78
79 :param schemes:
80 Schemes, without ``://`` that are allowed.
81 :returns:
82 The validator instance.
83 :rtype:
84 Validator
85 """
86 for scheme in schemes:
87 self.allowed_schemes.add(normalizers.normalize_scheme(scheme))
88 return self
89
90 def allow_hosts(self, *hosts: str) -> _Self:
91 """Require the host to be one of the provided hosts.
92
93 .. versionadded:: 1.0
94
95 :param hosts:
96 Hosts that are allowed.
97 :returns:
98 The validator instance.
99 :rtype:
100 Validator
101 """
102 for host in hosts:
103 self.allowed_hosts.add(normalizers.normalize_host(host))
104 return self
105
106 def allow_ports(self, *ports: str) -> _Self:
107 """Require the port to be one of the provided ports.
108
109 .. versionadded:: 1.0
110
111 :param ports:
112 Ports that are allowed.
113 :returns:
114 The validator instance.
115 :rtype:
116 Validator
117 """
118 for port in ports:
119 port_int = int(port, base=10)
120 if 0 <= port_int <= 65535:
121 self.allowed_ports.add(port)
122 return self
123
124 def allow_use_of_password(self) -> _Self:
125 """Allow passwords to be present in the URI.
126
127 .. versionadded:: 1.0
128
129 :returns:
130 The validator instance.
131 :rtype:
132 Validator
133 """
134 self.allow_password = True
135 return self
136
137 def forbid_use_of_password(self) -> _Self:
138 """Prevent passwords from being included in the URI.
139
140 .. versionadded:: 1.0
141
142 :returns:
143 The validator instance.
144 :rtype:
145 Validator
146 """
147 self.allow_password = False
148 return self
149
150 def check_validity_of(self, *components: str) -> _Self:
151 """Check the validity of the components provided.
152
153 This can be specified repeatedly.
154
155 .. versionadded:: 1.1
156
157 :param components:
158 Names of components from :attr:`Validator.COMPONENT_NAMES`.
159 :returns:
160 The validator instance.
161 :rtype:
162 Validator
163 """
164 components = tuple(c.lower() for c in components)
165 for component in components:
166 if component not in self.COMPONENT_NAMES:
167 raise ValueError(f'"{component}" is not a valid component')
168 self.validated_components.update(
169 {component: True for component in components}
170 )
171 return self
172
173 def require_presence_of(self, *components: str) -> _Self:
174 """Require the components provided.
175
176 This can be specified repeatedly.
177
178 .. versionadded:: 1.0
179
180 :param components:
181 Names of components from :attr:`Validator.COMPONENT_NAMES`.
182 :returns:
183 The validator instance.
184 :rtype:
185 Validator
186 """
187 components = tuple(c.lower() for c in components)
188 for component in components:
189 if component not in self.COMPONENT_NAMES:
190 raise ValueError(f'"{component}" is not a valid component')
191 self.required_components.update(
192 {component: True for component in components}
193 )
194 return self
195
196 def validate(self, uri: "uri.URIReference") -> None:
197 """Check a URI for conditions specified on this validator.
198
199 .. versionadded:: 1.0
200
201 :param uri:
202 Parsed URI to validate.
203 :type uri:
204 rfc3986.uri.URIReference
205 :raises MissingComponentError:
206 When a required component is missing.
207 :raises UnpermittedComponentError:
208 When a component is not one of those allowed.
209 :raises PasswordForbidden:
210 When a password is present in the userinfo component but is
211 not permitted by configuration.
212 :raises InvalidComponentsError:
213 When a component was found to be invalid.
214 """
215 if not self.allow_password:
216 check_password(uri)
217
218 required_components = [
219 component
220 for component, required in self.required_components.items()
221 if required
222 ]
223 validated_components = [
224 component
225 for component, required in self.validated_components.items()
226 if required
227 ]
228 if required_components:
229 ensure_required_components_exist(uri, required_components)
230 if validated_components:
231 ensure_components_are_valid(uri, validated_components)
232
233 ensure_one_of(self.allowed_schemes, uri, "scheme")
234 ensure_one_of(self.allowed_hosts, uri, "host")
235 ensure_one_of(self.allowed_ports, uri, "port")
236
237
def check_password(uri: "uri.URIReference") -> None:
    """Raise ``PasswordForbidden`` if the userinfo carries a password.

    A password is considered present when the userinfo contains a ``:``
    separator. A missing or empty userinfo never raises.

    :param uri:
        Parsed URI whose ``userinfo`` attribute is inspected.
    :raises PasswordForbidden:
        When the userinfo contains a ``:``.
    """
    userinfo = uri.userinfo
    if userinfo and ":" in userinfo:
        raise exceptions.PasswordForbidden(uri)
247
248
def ensure_one_of(
    allowed_values: t.Collection[object],
    uri: "uri.URIReference",
    attribute: str,
) -> None:
    """Verify that ``uri.<attribute>`` is among the allowed values.

    Nothing is checked when the attribute is ``None`` or when
    ``allowed_values`` is empty.

    :param allowed_values:
        Values the attribute may take; an empty collection allows anything.
    :param uri:
        Parsed URI to inspect.
    :param attribute:
        Name of the attribute on ``uri`` to check.
    :raises UnpermittedComponentError:
        When the attribute has a value that is not allowed.
    """
    actual = getattr(uri, attribute)
    if actual is None or not allowed_values:
        return
    if actual in allowed_values:
        return
    raise exceptions.UnpermittedComponentError(
        attribute,
        actual,
        allowed_values,
    )
262
263
def ensure_required_components_exist(
    uri: "uri.URIReference",
    required_components: t.Iterable[str],
) -> None:
    """Verify that every required component is present on the URI.

    A component counts as missing when its attribute on ``uri`` is ``None``.

    :param uri:
        Parsed URI to inspect.
    :param required_components:
        Names of the attributes that must not be ``None``.
    :raises MissingComponentError:
        When one or more components are missing; the names are passed in
        sorted order.
    """
    absent = [
        name for name in required_components if getattr(uri, name) is None
    ]
    absent.sort()
    if absent:
        raise exceptions.MissingComponentError(uri, *absent)
276
277
def is_valid(
    value: t.Optional[str],
    matcher: t.Pattern[str],
    require: bool,
) -> bool:
    """Determine if a value is valid based on the provided matcher.

    :param str value:
        Value to validate; may be ``None``.
    :param matcher:
        Compiled regular expression to use to validate the value.
    :param require:
        Whether or not the value is required.
    :returns:
        For a ``None`` value, ``True`` only when the value is not required.
        Otherwise whether ``matcher.match`` succeeds on the value.
    :rtype:
        bool
    """
    if value is None:
        # An absent value is acceptable only when it is optional.
        return not require
    return bool(matcher.match(value))
297
298
def authority_is_valid(
    authority: t.Optional[str],
    host: t.Optional[str] = None,
    require: bool = False,
) -> bool:
    """Determine if the authority string is valid.

    The authority is checked first; the host is only examined when the
    authority passed and a host was supplied.

    :param str authority:
        The authority to validate.
    :param str host:
        (optional) The host portion of the authority to validate.
    :param bool require:
        (optional) Specify if authority must not be None.
    :returns:
        ``True`` if valid, ``False`` otherwise
    :rtype:
        bool
    """
    if not is_valid(authority, misc.SUBAUTHORITY_MATCHER, require):
        return False
    if host is None:
        return True
    return host_is_valid(host, require)
321
322
def host_is_valid(host: t.Optional[str], require: bool = False) -> bool:
    """Determine if the host string is valid.

    Hosts that match the IPv4 or IPv6 matcher get an extra check on top of
    the general host pattern.

    :param str host:
        The host to validate.
    :param bool require:
        (optional) Specify if host must not be None.
    :returns:
        ``True`` if valid, ``False`` otherwise
    :rtype:
        bool
    """
    matches_host_pattern = is_valid(host, misc.HOST_MATCHER, require)
    if not matches_host_pattern or host is None:
        return matches_host_pattern

    if misc.IPv4_MATCHER.match(host):
        return valid_ipv4_host_address(host)
    if misc.IPv6_MATCHER.match(host):
        return misc.IPv6_NO_RFC4007_MATCHER.match(host) is not None
    return matches_host_pattern
341
342
def scheme_is_valid(scheme: t.Optional[str], require: bool = False) -> bool:
    """Check a scheme against the scheme matcher.

    :param str scheme:
        The scheme string to validate.
    :param bool require:
        (optional) When ``True``, a ``None`` scheme is treated as invalid.
    :returns:
        ``True`` if the scheme is valid. ``False`` otherwise.
    :rtype:
        bool
    """
    return is_valid(scheme, matcher=misc.SCHEME_MATCHER, require=require)
356
357
def path_is_valid(path: t.Optional[str], require: bool = False) -> bool:
    """Check a path component against the path matcher.

    :param str path:
        The path string to validate.
    :param bool require:
        (optional) When ``True``, a ``None`` path is treated as invalid.
    :returns:
        ``True`` if the path is valid. ``False`` otherwise.
    :rtype:
        bool
    """
    return is_valid(path, matcher=misc.PATH_MATCHER, require=require)
371
372
def query_is_valid(query: t.Optional[str], require: bool = False) -> bool:
    """Check a query component against the query matcher.

    :param str query:
        The query string to validate.
    :param bool require:
        (optional) When ``True``, a ``None`` query is treated as invalid.
    :returns:
        ``True`` if the query is valid. ``False`` otherwise.
    :rtype:
        bool
    """
    return is_valid(query, matcher=misc.QUERY_MATCHER, require=require)
386
387
def fragment_is_valid(
    fragment: t.Optional[str],
    require: bool = False,
) -> bool:
    """Check a fragment component against the fragment matcher.

    :param str fragment:
        The fragment string to validate.
    :param bool require:
        (optional) When ``True``, a ``None`` fragment is treated as invalid.
    :returns:
        ``True`` if the fragment is valid. ``False`` otherwise.
    :rtype:
        bool
    """
    return is_valid(fragment, matcher=misc.FRAGMENT_MATCHER, require=require)
404
405
def valid_ipv4_host_address(host: str) -> bool:
    """Determine if the given host is a valid IPv4 address.

    The host must consist of exactly four dot-separated octets, each made
    up solely of ASCII digits and having a value of at most 255. Malformed
    input yields ``False`` rather than an exception.

    :param str host:
        The candidate dotted-quad string.
    :returns:
        ``True`` if every octet is in range, ``False`` otherwise.
    :rtype:
        bool
    """
    octets = host.split(".")
    if len(octets) != 4:
        return False
    # ``int()`` alone is too lenient (it accepts signs, surrounding
    # whitespace and ``_`` separators) and raises on non-numeric text, so
    # require plain ASCII digits before converting.
    return all(
        octet.isascii() and octet.isdigit() and int(octet, base=10) <= 255
        for octet in octets
    )
411
412
# Components that are part of the authority. They are checked by
# ``subauthority_component_is_valid`` instead of a per-component function.
_SUBAUTHORITY_VALIDATORS = {"userinfo", "host", "port"}

# Maps every other component name to the function that validates it.
_COMPONENT_VALIDATORS = {
    "scheme": scheme_is_valid,
    "path": path_is_valid,
    "query": query_is_valid,
    "fragment": fragment_is_valid,
}
421
422
def subauthority_component_is_valid(
    uri: "uri.URIReference",
    component: str,
) -> bool:
    """Determine if the userinfo, host, and port are valid.

    :param uri:
        Parsed URI whose authority is split via ``authority_info()``.
    :param str component:
        Which part to check; ``"host"`` and ``"port"`` get dedicated
        checks, anything else only requires the authority to parse.
    :returns:
        ``False`` when the authority cannot be parsed or the selected part
        fails its check, ``True`` otherwise.
    :rtype:
        bool
    """
    try:
        parts = uri.authority_info()
    except exceptions.InvalidAuthority:
        return False

    if component == "host":
        return host_is_valid(parts["host"])

    # Once the authority has parsed, anything other than the host and the
    # port is accepted as-is.
    if component != "port":
        return True

    port = parts["port"]
    if port is None:
        return True

    # A port must be all digits and have fewer than 6 of them.
    if not port.isdigit() or len(port) >= 6:  # pragma: no cover
        # This branch can only execute when this function is called directly
        # with a URI reference manually constructed with an invalid port.
        # Such a use case is unsupported, since this function isn't part of
        # the public API.
        return False

    return 0 <= int(port) <= 65535
454
455
def ensure_components_are_valid(
    uri: "uri.URIReference",
    validated_components: t.List[str],
) -> None:
    """Verify that each listed component of the URI is valid.

    All components are checked before anything is raised, so the error
    reports every failing component at once.

    :param uri:
        Parsed URI to inspect.
    :param validated_components:
        Names of the components to check.
    :raises InvalidComponentsError:
        When one or more components are invalid.
    """
    failures: t.Set[str] = set()
    for name in validated_components:
        if name in _SUBAUTHORITY_VALIDATORS:
            # userinfo, host and port are validated via the parsed authority.
            passed = subauthority_component_is_valid(uri, name)
        else:
            passed = _COMPONENT_VALIDATORS[name](getattr(uri, name))
        if not passed:
            failures.add(name)

    if failures:
        raise exceptions.InvalidComponentsError(uri, *failures)