Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/rfc3986/validators.py: 69%

129 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:04 +0000

1# Copyright (c) 2017 Ian Stapleton Cordasco 

2# Licensed under the Apache License, Version 2.0 (the "License"); 

3# you may not use this file except in compliance with the License. 

4# You may obtain a copy of the License at 

5# 

6# http://www.apache.org/licenses/LICENSE-2.0 

7# 

8# Unless required by applicable law or agreed to in writing, software 

9# distributed under the License is distributed on an "AS IS" BASIS, 

10# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

11# implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Module containing the validation logic for rfc3986.""" 

15from . import exceptions 

16from . import misc 

17from . import normalizers 

18 

19 

20class Validator: 

21 """Object used to configure validation of all objects in rfc3986. 

22 

23 .. versionadded:: 1.0 

24 

25 Example usage:: 

26 

27 >>> from rfc3986 import api, validators 

28 >>> uri = api.uri_reference('https://github.com/') 

29 >>> validator = validators.Validator().require_presence_of( 

30 ... 'scheme', 'host', 'path', 

31 ... ).allow_schemes( 

32 ... 'http', 'https', 

33 ... ).allow_hosts( 

34 ... '127.0.0.1', 'github.com', 

35 ... ) 

36 >>> validator.validate(uri) 

37 >>> invalid_uri = rfc3986.uri_reference('imap://mail.google.com') 

38 >>> validator.validate(invalid_uri) 

39 Traceback (most recent call last): 

40 ... 

41 rfc3986.exceptions.MissingComponentError: ('path was required but 

42 missing', URIReference(scheme=u'imap', authority=u'mail.google.com', 

43 path=None, query=None, fragment=None), ['path']) 

44 

45 """ 

46 

47 COMPONENT_NAMES = frozenset( 

48 ["scheme", "userinfo", "host", "port", "path", "query", "fragment"] 

49 ) 

50 

51 def __init__(self): 

52 """Initialize our default validations.""" 

53 self.allowed_schemes = set() 

54 self.allowed_hosts = set() 

55 self.allowed_ports = set() 

56 self.allow_password = True 

57 self.required_components = { 

58 "scheme": False, 

59 "userinfo": False, 

60 "host": False, 

61 "port": False, 

62 "path": False, 

63 "query": False, 

64 "fragment": False, 

65 } 

66 self.validated_components = self.required_components.copy() 

67 

68 def allow_schemes(self, *schemes): 

69 """Require the scheme to be one of the provided schemes. 

70 

71 .. versionadded:: 1.0 

72 

73 :param schemes: 

74 Schemes, without ``://`` that are allowed. 

75 :returns: 

76 The validator instance. 

77 :rtype: 

78 Validator 

79 """ 

80 for scheme in schemes: 

81 self.allowed_schemes.add(normalizers.normalize_scheme(scheme)) 

82 return self 

83 

84 def allow_hosts(self, *hosts): 

85 """Require the host to be one of the provided hosts. 

86 

87 .. versionadded:: 1.0 

88 

89 :param hosts: 

90 Hosts that are allowed. 

91 :returns: 

92 The validator instance. 

93 :rtype: 

94 Validator 

95 """ 

96 for host in hosts: 

97 self.allowed_hosts.add(normalizers.normalize_host(host)) 

98 return self 

99 

100 def allow_ports(self, *ports): 

101 """Require the port to be one of the provided ports. 

102 

103 .. versionadded:: 1.0 

104 

105 :param ports: 

106 Ports that are allowed. 

107 :returns: 

108 The validator instance. 

109 :rtype: 

110 Validator 

111 """ 

112 for port in ports: 

113 port_int = int(port, base=10) 

114 if 0 <= port_int <= 65535: 

115 self.allowed_ports.add(port) 

116 return self 

117 

118 def allow_use_of_password(self): 

119 """Allow passwords to be present in the URI. 

120 

121 .. versionadded:: 1.0 

122 

123 :returns: 

124 The validator instance. 

125 :rtype: 

126 Validator 

127 """ 

128 self.allow_password = True 

129 return self 

130 

131 def forbid_use_of_password(self): 

132 """Prevent passwords from being included in the URI. 

133 

134 .. versionadded:: 1.0 

135 

136 :returns: 

137 The validator instance. 

138 :rtype: 

139 Validator 

140 """ 

141 self.allow_password = False 

142 return self 

143 

144 def check_validity_of(self, *components): 

145 """Check the validity of the components provided. 

146 

147 This can be specified repeatedly. 

148 

149 .. versionadded:: 1.1 

150 

151 :param components: 

152 Names of components from :attr:`Validator.COMPONENT_NAMES`. 

153 :returns: 

154 The validator instance. 

155 :rtype: 

156 Validator 

157 """ 

158 components = [c.lower() for c in components] 

159 for component in components: 

160 if component not in self.COMPONENT_NAMES: 

161 raise ValueError(f'"{component}" is not a valid component') 

162 self.validated_components.update( 

163 {component: True for component in components} 

164 ) 

165 return self 

166 

167 def require_presence_of(self, *components): 

168 """Require the components provided. 

169 

170 This can be specified repeatedly. 

171 

172 .. versionadded:: 1.0 

173 

174 :param components: 

175 Names of components from :attr:`Validator.COMPONENT_NAMES`. 

176 :returns: 

177 The validator instance. 

178 :rtype: 

179 Validator 

180 """ 

181 components = [c.lower() for c in components] 

182 for component in components: 

183 if component not in self.COMPONENT_NAMES: 

184 raise ValueError(f'"{component}" is not a valid component') 

185 self.required_components.update( 

186 {component: True for component in components} 

187 ) 

188 return self 

189 

190 def validate(self, uri): 

191 """Check a URI for conditions specified on this validator. 

192 

193 .. versionadded:: 1.0 

194 

195 :param uri: 

196 Parsed URI to validate. 

197 :type uri: 

198 rfc3986.uri.URIReference 

199 :raises MissingComponentError: 

200 When a required component is missing. 

201 :raises UnpermittedComponentError: 

202 When a component is not one of those allowed. 

203 :raises PasswordForbidden: 

204 When a password is present in the userinfo component but is 

205 not permitted by configuration. 

206 :raises InvalidComponentsError: 

207 When a component was found to be invalid. 

208 """ 

209 if not self.allow_password: 

210 check_password(uri) 

211 

212 required_components = [ 

213 component 

214 for component, required in self.required_components.items() 

215 if required 

216 ] 

217 validated_components = [ 

218 component 

219 for component, required in self.validated_components.items() 

220 if required 

221 ] 

222 if required_components: 

223 ensure_required_components_exist(uri, required_components) 

224 if validated_components: 

225 ensure_components_are_valid(uri, validated_components) 

226 

227 ensure_one_of(self.allowed_schemes, uri, "scheme") 

228 ensure_one_of(self.allowed_hosts, uri, "host") 

229 ensure_one_of(self.allowed_ports, uri, "port") 

230 

231 

232def check_password(uri): 

233 """Assert that there is no password present in the uri.""" 

234 userinfo = uri.userinfo 

235 if not userinfo: 

236 return 

237 credentials = userinfo.split(":", 1) 

238 if len(credentials) <= 1: 

239 return 

240 raise exceptions.PasswordForbidden(uri) 

241 

242 

243def ensure_one_of(allowed_values, uri, attribute): 

244 """Assert that the uri's attribute is one of the allowed values.""" 

245 value = getattr(uri, attribute) 

246 if value is not None and allowed_values and value not in allowed_values: 

247 raise exceptions.UnpermittedComponentError( 

248 attribute, 

249 value, 

250 allowed_values, 

251 ) 

252 

253 

254def ensure_required_components_exist(uri, required_components): 

255 """Assert that all required components are present in the URI.""" 

256 missing_components = sorted( 

257 component 

258 for component in required_components 

259 if getattr(uri, component) is None 

260 ) 

261 if missing_components: 

262 raise exceptions.MissingComponentError(uri, *missing_components) 

263 

264 

265def is_valid(value, matcher, require): 

266 """Determine if a value is valid based on the provided matcher. 

267 

268 :param str value: 

269 Value to validate. 

270 :param matcher: 

271 Compiled regular expression to use to validate the value. 

272 :param require: 

273 Whether or not the value is required. 

274 """ 

275 if require: 

276 return value is not None and matcher.match(value) 

277 

278 # require is False and value is not None 

279 return value is None or matcher.match(value) 

280 

281 

282def authority_is_valid(authority, host=None, require=False): 

283 """Determine if the authority string is valid. 

284 

285 :param str authority: 

286 The authority to validate. 

287 :param str host: 

288 (optional) The host portion of the authority to validate. 

289 :param bool require: 

290 (optional) Specify if authority must not be None. 

291 :returns: 

292 ``True`` if valid, ``False`` otherwise 

293 :rtype: 

294 bool 

295 """ 

296 validated = is_valid(authority, misc.SUBAUTHORITY_MATCHER, require) 

297 if validated and host is not None: 

298 return host_is_valid(host, require) 

299 return validated 

300 

301 

302def host_is_valid(host, require=False): 

303 """Determine if the host string is valid. 

304 

305 :param str host: 

306 The host to validate. 

307 :param bool require: 

308 (optional) Specify if host must not be None. 

309 :returns: 

310 ``True`` if valid, ``False`` otherwise 

311 :rtype: 

312 bool 

313 """ 

314 validated = is_valid(host, misc.HOST_MATCHER, require) 

315 if validated and host is not None and misc.IPv4_MATCHER.match(host): 

316 return valid_ipv4_host_address(host) 

317 elif validated and host is not None and misc.IPv6_MATCHER.match(host): 

318 return misc.IPv6_NO_RFC4007_MATCHER.match(host) is not None 

319 return validated 

320 

321 

322def scheme_is_valid(scheme, require=False): 

323 """Determine if the scheme is valid. 

324 

325 :param str scheme: 

326 The scheme string to validate. 

327 :param bool require: 

328 (optional) Set to ``True`` to require the presence of a scheme. 

329 :returns: 

330 ``True`` if the scheme is valid. ``False`` otherwise. 

331 :rtype: 

332 bool 

333 """ 

334 return is_valid(scheme, misc.SCHEME_MATCHER, require) 

335 

336 

337def path_is_valid(path, require=False): 

338 """Determine if the path component is valid. 

339 

340 :param str path: 

341 The path string to validate. 

342 :param bool require: 

343 (optional) Set to ``True`` to require the presence of a path. 

344 :returns: 

345 ``True`` if the path is valid. ``False`` otherwise. 

346 :rtype: 

347 bool 

348 """ 

349 return is_valid(path, misc.PATH_MATCHER, require) 

350 

351 

352def query_is_valid(query, require=False): 

353 """Determine if the query component is valid. 

354 

355 :param str query: 

356 The query string to validate. 

357 :param bool require: 

358 (optional) Set to ``True`` to require the presence of a query. 

359 :returns: 

360 ``True`` if the query is valid. ``False`` otherwise. 

361 :rtype: 

362 bool 

363 """ 

364 return is_valid(query, misc.QUERY_MATCHER, require) 

365 

366 

367def fragment_is_valid(fragment, require=False): 

368 """Determine if the fragment component is valid. 

369 

370 :param str fragment: 

371 The fragment string to validate. 

372 :param bool require: 

373 (optional) Set to ``True`` to require the presence of a fragment. 

374 :returns: 

375 ``True`` if the fragment is valid. ``False`` otherwise. 

376 :rtype: 

377 bool 

378 """ 

379 return is_valid(fragment, misc.FRAGMENT_MATCHER, require) 

380 

381 

382def valid_ipv4_host_address(host): 

383 """Determine if the given host is a valid IPv4 address.""" 

384 # If the host exists, and it might be IPv4, check each byte in the 

385 # address. 

386 return all([0 <= int(byte, base=10) <= 255 for byte in host.split(".")]) 

387 

388 

389_COMPONENT_VALIDATORS = { 

390 "scheme": scheme_is_valid, 

391 "path": path_is_valid, 

392 "query": query_is_valid, 

393 "fragment": fragment_is_valid, 

394} 

395 

396_SUBAUTHORITY_VALIDATORS = {"userinfo", "host", "port"} 

397 

398 

399def subauthority_component_is_valid(uri, component): 

400 """Determine if the userinfo, host, and port are valid.""" 

401 try: 

402 subauthority_dict = uri.authority_info() 

403 except exceptions.InvalidAuthority: 

404 return False 

405 

406 # If we can parse the authority into sub-components and we're not 

407 # validating the port, we can assume it's valid. 

408 if component == "host": 

409 return host_is_valid(subauthority_dict["host"]) 

410 elif component != "port": 

411 return True 

412 

413 try: 

414 port = int(subauthority_dict["port"]) 

415 except TypeError: 

416 # If the port wasn't provided it'll be None and int(None) raises a 

417 # TypeError 

418 return True 

419 

420 return 0 <= port <= 65535 

421 

422 

423def ensure_components_are_valid(uri, validated_components): 

424 """Assert that all components are valid in the URI.""" 

425 invalid_components = set() 

426 for component in validated_components: 

427 if component in _SUBAUTHORITY_VALIDATORS: 

428 if not subauthority_component_is_valid(uri, component): 

429 invalid_components.add(component) 

430 # Python's peephole optimizer means that while this continue *is* 

431 # actually executed, coverage.py cannot detect that. See also, 

432 # https://bitbucket.org/ned/coveragepy/issues/198/continue-marked-as-not-covered 

433 continue # nocov: Python 2.7, 3.3, 3.4 

434 

435 validator = _COMPONENT_VALIDATORS[component] 

436 if not validator(getattr(uri, component)): 

437 invalid_components.add(component) 

438 

439 if invalid_components: 

440 raise exceptions.InvalidComponentsError(uri, *invalid_components)