Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/rfc3986/validators.py: 83%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

132 statements  

1# Copyright (c) 2017 Ian Stapleton Cordasco 

2# Licensed under the Apache License, Version 2.0 (the "License"); 

3# you may not use this file except in compliance with the License. 

4# You may obtain a copy of the License at 

5# 

6# http://www.apache.org/licenses/LICENSE-2.0 

7# 

8# Unless required by applicable law or agreed to in writing, software 

9# distributed under the License is distributed on an "AS IS" BASIS, 

10# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

11# implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Module containing the validation logic for rfc3986.""" 

15 

16import typing as t 

17 

18from . import exceptions 

19from . import misc 

20from . import normalizers 

21from . import uri 

22from ._typing_compat import Self as _Self 

23 

24 

25class Validator: 

26 """Object used to configure validation of all objects in rfc3986. 

27 

28 .. versionadded:: 1.0 

29 

30 Example usage:: 

31 

32 >>> from rfc3986 import api, validators 

33 >>> uri = api.uri_reference('https://github.com/') 

34 >>> validator = validators.Validator().require_presence_of( 

35 ... 'scheme', 'host', 'path', 

36 ... ).allow_schemes( 

37 ... 'http', 'https', 

38 ... ).allow_hosts( 

39 ... '127.0.0.1', 'github.com', 

40 ... ) 

41 >>> validator.validate(uri) 

42 >>> invalid_uri = rfc3986.uri_reference('imap://mail.google.com') 

43 >>> validator.validate(invalid_uri) 

44 Traceback (most recent call last): 

45 ... 

46 rfc3986.exceptions.MissingComponentError: ('path was required but 

47 missing', URIReference(scheme=u'imap', authority=u'mail.google.com', 

48 path=None, query=None, fragment=None), ['path']) 

49 

50 """ 

51 

52 COMPONENT_NAMES = frozenset( 

53 ["scheme", "userinfo", "host", "port", "path", "query", "fragment"] 

54 ) 

55 

56 def __init__(self) -> None: 

57 """Initialize our default validations.""" 

58 self.allowed_schemes: t.Set[str] = set() 

59 self.allowed_hosts: t.Set[str] = set() 

60 self.allowed_ports: t.Set[str] = set() 

61 self.allow_password: bool = True 

62 self.required_components: t.Dict[str, bool] = { 

63 "scheme": False, 

64 "userinfo": False, 

65 "host": False, 

66 "port": False, 

67 "path": False, 

68 "query": False, 

69 "fragment": False, 

70 } 

71 self.validated_components: t.Dict[str, bool] = ( 

72 self.required_components.copy() 

73 ) 

74 

75 def allow_schemes(self, *schemes: str) -> _Self: 

76 """Require the scheme to be one of the provided schemes. 

77 

78 .. versionadded:: 1.0 

79 

80 :param schemes: 

81 Schemes, without ``://`` that are allowed. 

82 :returns: 

83 The validator instance. 

84 :rtype: 

85 Validator 

86 """ 

87 for scheme in schemes: 

88 self.allowed_schemes.add(normalizers.normalize_scheme(scheme)) 

89 return self 

90 

91 def allow_hosts(self, *hosts: str) -> _Self: 

92 """Require the host to be one of the provided hosts. 

93 

94 .. versionadded:: 1.0 

95 

96 :param hosts: 

97 Hosts that are allowed. 

98 :returns: 

99 The validator instance. 

100 :rtype: 

101 Validator 

102 """ 

103 for host in hosts: 

104 self.allowed_hosts.add(normalizers.normalize_host(host)) 

105 return self 

106 

107 def allow_ports(self, *ports: str) -> _Self: 

108 """Require the port to be one of the provided ports. 

109 

110 .. versionadded:: 1.0 

111 

112 :param ports: 

113 Ports that are allowed. 

114 :returns: 

115 The validator instance. 

116 :rtype: 

117 Validator 

118 """ 

119 for port in ports: 

120 port_int = int(port, base=10) 

121 if 0 <= port_int <= 65535: 

122 self.allowed_ports.add(port) 

123 return self 

124 

125 def allow_use_of_password(self) -> _Self: 

126 """Allow passwords to be present in the URI. 

127 

128 .. versionadded:: 1.0 

129 

130 :returns: 

131 The validator instance. 

132 :rtype: 

133 Validator 

134 """ 

135 self.allow_password = True 

136 return self 

137 

138 def forbid_use_of_password(self) -> _Self: 

139 """Prevent passwords from being included in the URI. 

140 

141 .. versionadded:: 1.0 

142 

143 :returns: 

144 The validator instance. 

145 :rtype: 

146 Validator 

147 """ 

148 self.allow_password = False 

149 return self 

150 

151 def check_validity_of(self, *components: str) -> _Self: 

152 """Check the validity of the components provided. 

153 

154 This can be specified repeatedly. 

155 

156 .. versionadded:: 1.1 

157 

158 :param components: 

159 Names of components from :attr:`Validator.COMPONENT_NAMES`. 

160 :returns: 

161 The validator instance. 

162 :rtype: 

163 Validator 

164 """ 

165 components = tuple(c.lower() for c in components) 

166 for component in components: 

167 if component not in self.COMPONENT_NAMES: 

168 raise ValueError(f'"{component}" is not a valid component') 

169 self.validated_components.update( 

170 {component: True for component in components} 

171 ) 

172 return self 

173 

174 def require_presence_of(self, *components: str) -> _Self: 

175 """Require the components provided. 

176 

177 This can be specified repeatedly. 

178 

179 .. versionadded:: 1.0 

180 

181 :param components: 

182 Names of components from :attr:`Validator.COMPONENT_NAMES`. 

183 :returns: 

184 The validator instance. 

185 :rtype: 

186 Validator 

187 """ 

188 components = tuple(c.lower() for c in components) 

189 for component in components: 

190 if component not in self.COMPONENT_NAMES: 

191 raise ValueError(f'"{component}" is not a valid component') 

192 self.required_components.update( 

193 {component: True for component in components} 

194 ) 

195 return self 

196 

197 def validate(self, uri: "uri.URIReference") -> None: 

198 """Check a URI for conditions specified on this validator. 

199 

200 .. versionadded:: 1.0 

201 

202 :param uri: 

203 Parsed URI to validate. 

204 :type uri: 

205 rfc3986.uri.URIReference 

206 :raises MissingComponentError: 

207 When a required component is missing. 

208 :raises UnpermittedComponentError: 

209 When a component is not one of those allowed. 

210 :raises PasswordForbidden: 

211 When a password is present in the userinfo component but is 

212 not permitted by configuration. 

213 :raises InvalidComponentsError: 

214 When a component was found to be invalid. 

215 """ 

216 if not self.allow_password: 

217 check_password(uri) 

218 

219 required_components = [ 

220 component 

221 for component, required in self.required_components.items() 

222 if required 

223 ] 

224 validated_components = [ 

225 component 

226 for component, required in self.validated_components.items() 

227 if required 

228 ] 

229 if required_components: 

230 ensure_required_components_exist(uri, required_components) 

231 if validated_components: 

232 ensure_components_are_valid(uri, validated_components) 

233 

234 ensure_one_of(self.allowed_schemes, uri, "scheme") 

235 ensure_one_of(self.allowed_hosts, uri, "host") 

236 ensure_one_of(self.allowed_ports, uri, "port") 

237 

238 

239def check_password(uri: "uri.URIReference") -> None: 

240 """Assert that there is no password present in the uri.""" 

241 userinfo = uri.userinfo 

242 if not userinfo: 

243 return 

244 credentials = userinfo.split(":", 1) 

245 if len(credentials) <= 1: 

246 return 

247 raise exceptions.PasswordForbidden(uri) 

248 

249 

250def ensure_one_of( 

251 allowed_values: t.Collection[object], 

252 uri: "uri.URIReference", 

253 attribute: str, 

254) -> None: 

255 """Assert that the uri's attribute is one of the allowed values.""" 

256 value = getattr(uri, attribute) 

257 if value is not None and allowed_values and value not in allowed_values: 

258 raise exceptions.UnpermittedComponentError( 

259 attribute, 

260 value, 

261 allowed_values, 

262 ) 

263 

264 

265def ensure_required_components_exist( 

266 uri: "uri.URIReference", 

267 required_components: t.Iterable[str], 

268) -> None: 

269 """Assert that all required components are present in the URI.""" 

270 missing_components = sorted( 

271 component 

272 for component in required_components 

273 if getattr(uri, component) is None 

274 ) 

275 if missing_components: 

276 raise exceptions.MissingComponentError(uri, *missing_components) 

277 

278 

279def is_valid( 

280 value: t.Optional[str], 

281 matcher: t.Pattern[str], 

282 require: bool, 

283) -> bool: 

284 """Determine if a value is valid based on the provided matcher. 

285 

286 :param str value: 

287 Value to validate. 

288 :param matcher: 

289 Compiled regular expression to use to validate the value. 

290 :param require: 

291 Whether or not the value is required. 

292 """ 

293 if require: 

294 return value is not None and bool(matcher.match(value)) 

295 

296 # require is False and value is not None 

297 return value is None or bool(matcher.match(value)) 

298 

299 

300def authority_is_valid( 

301 authority: t.Optional[str], 

302 host: t.Optional[str] = None, 

303 require: bool = False, 

304) -> bool: 

305 """Determine if the authority string is valid. 

306 

307 :param str authority: 

308 The authority to validate. 

309 :param str host: 

310 (optional) The host portion of the authority to validate. 

311 :param bool require: 

312 (optional) Specify if authority must not be None. 

313 :returns: 

314 ``True`` if valid, ``False`` otherwise 

315 :rtype: 

316 bool 

317 """ 

318 validated = is_valid(authority, misc.SUBAUTHORITY_MATCHER, require) 

319 if validated and host is not None: 

320 return host_is_valid(host, require) 

321 return validated 

322 

323 

324def host_is_valid(host: t.Optional[str], require: bool = False) -> bool: 

325 """Determine if the host string is valid. 

326 

327 :param str host: 

328 The host to validate. 

329 :param bool require: 

330 (optional) Specify if host must not be None. 

331 :returns: 

332 ``True`` if valid, ``False`` otherwise 

333 :rtype: 

334 bool 

335 """ 

336 validated = is_valid(host, misc.HOST_MATCHER, require) 

337 if validated and host is not None and misc.IPv4_MATCHER.match(host): 

338 return valid_ipv4_host_address(host) 

339 elif validated and host is not None and misc.IPv6_MATCHER.match(host): 

340 return misc.IPv6_NO_RFC4007_MATCHER.match(host) is not None 

341 return validated 

342 

343 

344def scheme_is_valid(scheme: t.Optional[str], require: bool = False) -> bool: 

345 """Determine if the scheme is valid. 

346 

347 :param str scheme: 

348 The scheme string to validate. 

349 :param bool require: 

350 (optional) Set to ``True`` to require the presence of a scheme. 

351 :returns: 

352 ``True`` if the scheme is valid. ``False`` otherwise. 

353 :rtype: 

354 bool 

355 """ 

356 return is_valid(scheme, misc.SCHEME_MATCHER, require) 

357 

358 

359def path_is_valid(path: t.Optional[str], require: bool = False) -> bool: 

360 """Determine if the path component is valid. 

361 

362 :param str path: 

363 The path string to validate. 

364 :param bool require: 

365 (optional) Set to ``True`` to require the presence of a path. 

366 :returns: 

367 ``True`` if the path is valid. ``False`` otherwise. 

368 :rtype: 

369 bool 

370 """ 

371 return is_valid(path, misc.PATH_MATCHER, require) 

372 

373 

374def query_is_valid(query: t.Optional[str], require: bool = False) -> bool: 

375 """Determine if the query component is valid. 

376 

377 :param str query: 

378 The query string to validate. 

379 :param bool require: 

380 (optional) Set to ``True`` to require the presence of a query. 

381 :returns: 

382 ``True`` if the query is valid. ``False`` otherwise. 

383 :rtype: 

384 bool 

385 """ 

386 return is_valid(query, misc.QUERY_MATCHER, require) 

387 

388 

389def fragment_is_valid( 

390 fragment: t.Optional[str], 

391 require: bool = False, 

392) -> bool: 

393 """Determine if the fragment component is valid. 

394 

395 :param str fragment: 

396 The fragment string to validate. 

397 :param bool require: 

398 (optional) Set to ``True`` to require the presence of a fragment. 

399 :returns: 

400 ``True`` if the fragment is valid. ``False`` otherwise. 

401 :rtype: 

402 bool 

403 """ 

404 return is_valid(fragment, misc.FRAGMENT_MATCHER, require) 

405 

406 

407def valid_ipv4_host_address(host: str) -> bool: 

408 """Determine if the given host is a valid IPv4 address.""" 

409 # If the host exists, and it might be IPv4, check each byte in the 

410 # address. 

411 return all([0 <= int(byte, base=10) <= 255 for byte in host.split(".")]) 

412 

413 

414_COMPONENT_VALIDATORS = { 

415 "scheme": scheme_is_valid, 

416 "path": path_is_valid, 

417 "query": query_is_valid, 

418 "fragment": fragment_is_valid, 

419} 

420 

421_SUBAUTHORITY_VALIDATORS = {"userinfo", "host", "port"} 

422 

423 

424def subauthority_component_is_valid( 

425 uri: "uri.URIReference", 

426 component: str, 

427) -> bool: 

428 """Determine if the userinfo, host, and port are valid.""" 

429 try: 

430 subauthority_dict = uri.authority_info() 

431 except exceptions.InvalidAuthority: 

432 return False 

433 

434 # If we can parse the authority into sub-components and we're not 

435 # validating the port, we can assume it's valid. 

436 if component == "host": 

437 return host_is_valid(subauthority_dict["host"]) 

438 elif component != "port": 

439 return True 

440 

441 port = subauthority_dict["port"] 

442 

443 if port is None: 

444 return True 

445 

446 # We know it has to have fewer than 6 digits if it exists. 

447 if not (port.isdigit() and len(port) < 6): # pragma: no cover 

448 # This branch can only execute when this function is called directly 

449 # with a URI reference manually constructed with an invalid port. 

450 # Such a use case is unsupported, since this function isn't part of 

451 # the public API. 

452 return False 

453 

454 return 0 <= int(port) <= 65535 

455 

456 

457def ensure_components_are_valid( 

458 uri: "uri.URIReference", 

459 validated_components: t.List[str], 

460) -> None: 

461 """Assert that all components are valid in the URI.""" 

462 invalid_components: set[str] = set() 

463 for component in validated_components: 

464 if component in _SUBAUTHORITY_VALIDATORS: 

465 if not subauthority_component_is_valid(uri, component): 

466 invalid_components.add(component) 

467 # Python's peephole optimizer means that while this continue *is* 

468 # actually executed, coverage.py cannot detect that. See also, 

469 # https://bitbucket.org/ned/coveragepy/issues/198/continue-marked-as-not-covered 

470 continue # nocov: Python 2.7, 3.3, 3.4 

471 

472 validator = _COMPONENT_VALIDATORS[component] 

473 if not validator(getattr(uri, component)): 

474 invalid_components.add(component) 

475 

476 if invalid_components: 

477 raise exceptions.InvalidComponentsError(uri, *invalid_components)