Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/rfc3986/validators.py: 83%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

132 statements  

1# Copyright (c) 2017 Ian Stapleton Cordasco 

2# Licensed under the Apache License, Version 2.0 (the "License"); 

3# you may not use this file except in compliance with the License. 

4# You may obtain a copy of the License at 

5# 

6# http://www.apache.org/licenses/LICENSE-2.0 

7# 

8# Unless required by applicable law or agreed to in writing, software 

9# distributed under the License is distributed on an "AS IS" BASIS, 

10# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

11# implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Module containing the validation logic for rfc3986.""" 

15import typing as t 

16 

17from . import exceptions 

18from . import misc 

19from . import normalizers 

20from . import uri 

21from ._typing_compat import Self as _Self 

22 

23 

24class Validator: 

25 """Object used to configure validation of all objects in rfc3986. 

26 

27 .. versionadded:: 1.0 

28 

29 Example usage:: 

30 

31 >>> from rfc3986 import api, validators 

32 >>> uri = api.uri_reference('https://github.com/') 

33 >>> validator = validators.Validator().require_presence_of( 

34 ... 'scheme', 'host', 'path', 

35 ... ).allow_schemes( 

36 ... 'http', 'https', 

37 ... ).allow_hosts( 

38 ... '127.0.0.1', 'github.com', 

39 ... ) 

40 >>> validator.validate(uri) 

41 >>> invalid_uri = rfc3986.uri_reference('imap://mail.google.com') 

42 >>> validator.validate(invalid_uri) 

43 Traceback (most recent call last): 

44 ... 

45 rfc3986.exceptions.MissingComponentError: ('path was required but 

46 missing', URIReference(scheme=u'imap', authority=u'mail.google.com', 

47 path=None, query=None, fragment=None), ['path']) 

48 

49 """ 

50 

51 COMPONENT_NAMES = frozenset( 

52 ["scheme", "userinfo", "host", "port", "path", "query", "fragment"] 

53 ) 

54 

55 def __init__(self) -> None: 

56 """Initialize our default validations.""" 

57 self.allowed_schemes: t.Set[str] = set() 

58 self.allowed_hosts: t.Set[str] = set() 

59 self.allowed_ports: t.Set[str] = set() 

60 self.allow_password: bool = True 

61 self.required_components: t.Dict[str, bool] = { 

62 "scheme": False, 

63 "userinfo": False, 

64 "host": False, 

65 "port": False, 

66 "path": False, 

67 "query": False, 

68 "fragment": False, 

69 } 

70 self.validated_components: t.Dict[str, bool] = ( 

71 self.required_components.copy() 

72 ) 

73 

74 def allow_schemes(self, *schemes: str) -> _Self: 

75 """Require the scheme to be one of the provided schemes. 

76 

77 .. versionadded:: 1.0 

78 

79 :param schemes: 

80 Schemes, without ``://`` that are allowed. 

81 :returns: 

82 The validator instance. 

83 :rtype: 

84 Validator 

85 """ 

86 for scheme in schemes: 

87 self.allowed_schemes.add(normalizers.normalize_scheme(scheme)) 

88 return self 

89 

90 def allow_hosts(self, *hosts: str) -> _Self: 

91 """Require the host to be one of the provided hosts. 

92 

93 .. versionadded:: 1.0 

94 

95 :param hosts: 

96 Hosts that are allowed. 

97 :returns: 

98 The validator instance. 

99 :rtype: 

100 Validator 

101 """ 

102 for host in hosts: 

103 self.allowed_hosts.add(normalizers.normalize_host(host)) 

104 return self 

105 

106 def allow_ports(self, *ports: str) -> _Self: 

107 """Require the port to be one of the provided ports. 

108 

109 .. versionadded:: 1.0 

110 

111 :param ports: 

112 Ports that are allowed. 

113 :returns: 

114 The validator instance. 

115 :rtype: 

116 Validator 

117 """ 

118 for port in ports: 

119 port_int = int(port, base=10) 

120 if 0 <= port_int <= 65535: 

121 self.allowed_ports.add(port) 

122 return self 

123 

124 def allow_use_of_password(self) -> _Self: 

125 """Allow passwords to be present in the URI. 

126 

127 .. versionadded:: 1.0 

128 

129 :returns: 

130 The validator instance. 

131 :rtype: 

132 Validator 

133 """ 

134 self.allow_password = True 

135 return self 

136 

137 def forbid_use_of_password(self) -> _Self: 

138 """Prevent passwords from being included in the URI. 

139 

140 .. versionadded:: 1.0 

141 

142 :returns: 

143 The validator instance. 

144 :rtype: 

145 Validator 

146 """ 

147 self.allow_password = False 

148 return self 

149 

150 def check_validity_of(self, *components: str) -> _Self: 

151 """Check the validity of the components provided. 

152 

153 This can be specified repeatedly. 

154 

155 .. versionadded:: 1.1 

156 

157 :param components: 

158 Names of components from :attr:`Validator.COMPONENT_NAMES`. 

159 :returns: 

160 The validator instance. 

161 :rtype: 

162 Validator 

163 """ 

164 components = tuple(c.lower() for c in components) 

165 for component in components: 

166 if component not in self.COMPONENT_NAMES: 

167 raise ValueError(f'"{component}" is not a valid component') 

168 self.validated_components.update( 

169 {component: True for component in components} 

170 ) 

171 return self 

172 

173 def require_presence_of(self, *components: str) -> _Self: 

174 """Require the components provided. 

175 

176 This can be specified repeatedly. 

177 

178 .. versionadded:: 1.0 

179 

180 :param components: 

181 Names of components from :attr:`Validator.COMPONENT_NAMES`. 

182 :returns: 

183 The validator instance. 

184 :rtype: 

185 Validator 

186 """ 

187 components = tuple(c.lower() for c in components) 

188 for component in components: 

189 if component not in self.COMPONENT_NAMES: 

190 raise ValueError(f'"{component}" is not a valid component') 

191 self.required_components.update( 

192 {component: True for component in components} 

193 ) 

194 return self 

195 

196 def validate(self, uri: "uri.URIReference") -> None: 

197 """Check a URI for conditions specified on this validator. 

198 

199 .. versionadded:: 1.0 

200 

201 :param uri: 

202 Parsed URI to validate. 

203 :type uri: 

204 rfc3986.uri.URIReference 

205 :raises MissingComponentError: 

206 When a required component is missing. 

207 :raises UnpermittedComponentError: 

208 When a component is not one of those allowed. 

209 :raises PasswordForbidden: 

210 When a password is present in the userinfo component but is 

211 not permitted by configuration. 

212 :raises InvalidComponentsError: 

213 When a component was found to be invalid. 

214 """ 

215 if not self.allow_password: 

216 check_password(uri) 

217 

218 required_components = [ 

219 component 

220 for component, required in self.required_components.items() 

221 if required 

222 ] 

223 validated_components = [ 

224 component 

225 for component, required in self.validated_components.items() 

226 if required 

227 ] 

228 if required_components: 

229 ensure_required_components_exist(uri, required_components) 

230 if validated_components: 

231 ensure_components_are_valid(uri, validated_components) 

232 

233 ensure_one_of(self.allowed_schemes, uri, "scheme") 

234 ensure_one_of(self.allowed_hosts, uri, "host") 

235 ensure_one_of(self.allowed_ports, uri, "port") 

236 

237 

def check_password(uri: "uri.URIReference") -> None:
    """Raise if the URI's userinfo component contains a password.

    A password is considered present as soon as the userinfo contains a
    ``:`` separator, even when nothing follows it.

    :raises PasswordForbidden:
        When the userinfo contains a ``:``.
    """
    userinfo = uri.userinfo
    # Missing/empty userinfo, or a bare user name, carries no password.
    if userinfo and ":" in userinfo:
        raise exceptions.PasswordForbidden(uri)

247 

248 

def ensure_one_of(
    allowed_values: t.Collection[object],
    uri: "uri.URIReference",
    attribute: str,
) -> None:
    """Raise unless the URI's attribute is acceptable.

    The attribute passes when it is ``None``, when ``allowed_values`` is
    empty (meaning no restriction was configured), or when its value is a
    member of ``allowed_values``.

    :raises UnpermittedComponentError:
        When the attribute has a value that is not in a non-empty
        ``allowed_values``.
    """
    actual = getattr(uri, attribute)
    if actual is None or not allowed_values or actual in allowed_values:
        return
    raise exceptions.UnpermittedComponentError(
        attribute,
        actual,
        allowed_values,
    )

262 

263 

def ensure_required_components_exist(
    uri: "uri.URIReference",
    required_components: t.Iterable[str],
) -> None:
    """Raise if any of the required components is ``None`` on the URI.

    :raises MissingComponentError:
        When at least one component is missing; the missing names are
        passed to the exception in sorted order.
    """
    absent = [
        name for name in required_components if getattr(uri, name) is None
    ]
    if not absent:
        return
    absent.sort()
    raise exceptions.MissingComponentError(uri, *absent)

276 

277 

def is_valid(
    value: t.Optional[str],
    matcher: t.Pattern[str],
    require: bool,
) -> bool:
    """Check a possibly missing value against a compiled pattern.

    :param str value:
        Value to validate; may be ``None`` when the component is absent.
    :param matcher:
        Compiled regular expression to use to validate the value.
    :param require:
        Whether or not the value is required.
    :returns:
        For a missing value, ``True`` only when it is not required;
        otherwise whether ``matcher.match`` accepts the value.
    """
    if value is None:
        # Absence is acceptable exactly when the value is optional.
        return not require
    return bool(matcher.match(value))

297 

298 

def authority_is_valid(
    authority: t.Optional[str],
    host: t.Optional[str] = None,
    require: bool = False,
) -> bool:
    """Check the authority string and, optionally, its host.

    The authority is matched against ``misc.SUBAUTHORITY_MATCHER`` first;
    only when that succeeds and a host was supplied is the host checked
    with :func:`host_is_valid`.

    :param str authority:
        The authority to validate.
    :param str host:
        (optional) The host portion of the authority to validate.
    :param bool require:
        (optional) Specify if authority must not be None.
    :returns:
        ``True`` if valid, ``False`` otherwise
    :rtype:
        bool
    """
    if not is_valid(authority, misc.SUBAUTHORITY_MATCHER, require):
        return False
    if host is None:
        return True
    return host_is_valid(host, require)

321 

322 

def host_is_valid(host: t.Optional[str], require: bool = False) -> bool:
    """Check that the host string is well formed.

    The host must match ``misc.HOST_MATCHER``. A host that looks like an
    IPv4 address additionally has each octet range-checked, and a host
    that looks like an IPv6 address must also match
    ``misc.IPv6_NO_RFC4007_MATCHER`` (presumably rejecting RFC 4007 zone
    identifiers; confirm against ``misc``).

    :param str host:
        The host to validate.
    :param bool require:
        (optional) Specify if host must not be None.
    :returns:
        ``True`` if valid, ``False`` otherwise
    :rtype:
        bool
    """
    if not is_valid(host, misc.HOST_MATCHER, require):
        return False
    if host is None:
        # An absent host only gets here when it was not required.
        return True
    if misc.IPv4_MATCHER.match(host):
        return valid_ipv4_host_address(host)
    if misc.IPv6_MATCHER.match(host):
        return misc.IPv6_NO_RFC4007_MATCHER.match(host) is not None
    return True

341 

342 

def scheme_is_valid(scheme: t.Optional[str], require: bool = False) -> bool:
    """Determine if the scheme is valid.

    The check is delegated to :func:`is_valid` with
    ``misc.SCHEME_MATCHER``.

    :param str scheme:
        The scheme string to validate. ``None`` is accepted only when
        ``require`` is ``False``.
    :param bool require:
        (optional) Set to ``True`` to require the presence of a scheme.
    :returns:
        ``True`` if the scheme is valid. ``False`` otherwise.
    :rtype:
        bool
    """
    return is_valid(scheme, misc.SCHEME_MATCHER, require)

356 

357 

def path_is_valid(path: t.Optional[str], require: bool = False) -> bool:
    """Determine if the path component is valid.

    The check is delegated to :func:`is_valid` with ``misc.PATH_MATCHER``.

    :param str path:
        The path string to validate. ``None`` is accepted only when
        ``require`` is ``False``.
    :param bool require:
        (optional) Set to ``True`` to require the presence of a path.
    :returns:
        ``True`` if the path is valid. ``False`` otherwise.
    :rtype:
        bool
    """
    return is_valid(path, misc.PATH_MATCHER, require)

371 

372 

def query_is_valid(query: t.Optional[str], require: bool = False) -> bool:
    """Determine if the query component is valid.

    The check is delegated to :func:`is_valid` with
    ``misc.QUERY_MATCHER``.

    :param str query:
        The query string to validate. ``None`` is accepted only when
        ``require`` is ``False``.
    :param bool require:
        (optional) Set to ``True`` to require the presence of a query.
    :returns:
        ``True`` if the query is valid. ``False`` otherwise.
    :rtype:
        bool
    """
    return is_valid(query, misc.QUERY_MATCHER, require)

386 

387 

def fragment_is_valid(
    fragment: t.Optional[str],
    require: bool = False,
) -> bool:
    """Determine if the fragment component is valid.

    The check is delegated to :func:`is_valid` with
    ``misc.FRAGMENT_MATCHER``.

    :param str fragment:
        The fragment string to validate. ``None`` is accepted only when
        ``require`` is ``False``.
    :param bool require:
        (optional) Set to ``True`` to require the presence of a fragment.
    :returns:
        ``True`` if the fragment is valid. ``False`` otherwise.
    :rtype:
        bool
    """
    return is_valid(fragment, misc.FRAGMENT_MATCHER, require)

404 

405 

def valid_ipv4_host_address(host: str) -> bool:
    """Determine if every dot-separated octet of the host is in 0..255.

    Only the octet values are checked, not how many octets there are; the
    caller is expected to have matched the overall IPv4 shape already.

    An octet must consist solely of ASCII decimal digits. ``int()`` on its
    own is more lenient (it accepts signs, underscores, surrounding
    whitespace and non-ASCII digits), so the digits are checked explicitly
    and a malformed octet yields ``False`` instead of a ``ValueError``.

    :param str host:
        Host string that might be an IPv4 address.
    :returns:
        ``True`` if every octet is a decimal number no greater than 255.
    :rtype:
        bool
    """
    # A generator lets all() stop at the first bad octet.
    return all(
        octet.isascii() and octet.isdigit() and int(octet, base=10) <= 255
        for octet in host.split(".")
    )

411 

412 

# Validators for components that are checked directly against the
# component's string value; ensure_components_are_valid() looks the
# function up here and calls it with the attribute read from the URI.
_COMPONENT_VALIDATORS = {
    "scheme": scheme_is_valid,
    "path": path_is_valid,
    "query": query_is_valid,
    "fragment": fragment_is_valid,
}

# Components that are part of the authority. These are not in the table
# above; they are checked by subauthority_component_is_valid() instead.
_SUBAUTHORITY_VALIDATORS = {"userinfo", "host", "port"}

421 

422 

def subauthority_component_is_valid(
    uri: "uri.URIReference",
    component: str,
) -> bool:
    """Check one authority sub-component (userinfo, host or port).

    The authority is parsed with ``uri.authority_info()``; a parse failure
    makes every sub-component invalid. Beyond that, the host is checked
    with :func:`host_is_valid`, the port must be absent or a number in
    ``0..65535``, and any other component is accepted as soon as the
    authority parses.

    :param uri:
        Parsed URI whose authority is inspected.
    :param str component:
        Name of the sub-component to check.
    :returns:
        ``True`` if the sub-component is valid, ``False`` otherwise.
    :rtype:
        bool
    """
    try:
        parts = uri.authority_info()
    except exceptions.InvalidAuthority:
        return False

    if component == "host":
        return host_is_valid(parts["host"])
    if component != "port":
        # Successfully parsing the authority is all that is required for
        # the remaining sub-components.
        return True

    port = parts["port"]
    if port is None:
        return True

    # A port can have at most five digits.
    if len(port) >= 6 or not port.isdigit():  # pragma: no cover
        # Only reachable when this function is called directly with a
        # manually constructed URI reference carrying an invalid port,
        # which is unsupported because this function is not public API.
        return False

    return 0 <= int(port) <= 65535

454 

455 

def ensure_components_are_valid(
    uri: "uri.URIReference",
    validated_components: t.List[str],
) -> None:
    """Raise if any of the listed components of the URI is invalid.

    Authority sub-components (userinfo, host, port) are checked through
    :func:`subauthority_component_is_valid`; every other component is
    checked with its entry in ``_COMPONENT_VALIDATORS``. All components
    are examined before anything is raised.

    :raises InvalidComponentsError:
        When at least one component is invalid; the exception receives
        the names of all invalid components.
    """
    failed: set[str] = set()
    for name in validated_components:
        if name in _SUBAUTHORITY_VALIDATORS:
            passed = subauthority_component_is_valid(uri, name)
        else:
            passed = _COMPONENT_VALIDATORS[name](getattr(uri, name))
        if not passed:
            failed.add(name)

    if failed:
        raise exceptions.InvalidComponentsError(uri, *failed)