Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/rfc3986/validators.py: 23%

129 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1# -*- coding: utf-8 -*- 

2# Copyright (c) 2017 Ian Stapleton Cordasco 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

12# implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15"""Module containing the validation logic for rfc3986.""" 

16from . import exceptions 

17from . import misc 

18from . import normalizers 

19 

20 

class Validator(object):
    """Object used to configure validation of all objects in rfc3986.

    Every configuration method returns the validator itself so calls can be
    chained; :meth:`validate` then applies the accumulated rules to a URI.

    .. versionadded:: 1.0

    Example usage::

         >>> from rfc3986 import api, validators
         >>> uri = api.uri_reference('https://github.com/')
         >>> validator = validators.Validator().require_presence_of(
         ...    'scheme', 'host', 'path',
         ... ).allow_schemes(
         ...    'http', 'https',
         ... ).allow_hosts(
         ...    '127.0.0.1', 'github.com',
         ... )
         >>> validator.validate(uri)
         >>> invalid_uri = api.uri_reference('imap://mail.google.com')
         >>> validator.validate(invalid_uri)
         Traceback (most recent call last):
         ...
         rfc3986.exceptions.MissingComponentError: ('path was required but
         missing', URIReference(scheme=u'imap', authority=u'mail.google.com',
         path=None, query=None, fragment=None), ['path'])

    """

    # Names accepted by ``require_presence_of`` and ``check_validity_of``.
    COMPONENT_NAMES = frozenset(
        ["scheme", "userinfo", "host", "port", "path", "query", "fragment"]
    )

    def __init__(self):
        """Initialize our default validations."""
        # An empty set means "no restriction" for that attribute.
        self.allowed_schemes = set()
        self.allowed_hosts = set()
        self.allowed_ports = set()
        self.allow_password = True
        # Maps component name -> whether the component must be present.
        self.required_components = {
            "scheme": False,
            "userinfo": False,
            "host": False,
            "port": False,
            "path": False,
            "query": False,
            "fragment": False,
        }
        # Maps component name -> whether the component must be validated.
        self.validated_components = self.required_components.copy()

    def allow_schemes(self, *schemes):
        """Require the scheme to be one of the provided schemes.

        .. versionadded:: 1.0

        :param schemes:
            Schemes, without ``://`` that are allowed.
        :returns:
            The validator instance.
        :rtype:
            Validator
        """
        for scheme in schemes:
            self.allowed_schemes.add(normalizers.normalize_scheme(scheme))
        return self

    def allow_hosts(self, *hosts):
        """Require the host to be one of the provided hosts.

        .. versionadded:: 1.0

        :param hosts:
            Hosts that are allowed.
        :returns:
            The validator instance.
        :rtype:
            Validator
        """
        for host in hosts:
            self.allowed_hosts.add(normalizers.normalize_host(host))
        return self

    def allow_ports(self, *ports):
        """Require the port to be one of the provided ports.

        Ports may be given as decimal strings or as integers. Values outside
        the range 0-65535 are silently skipped.

        .. versionadded:: 1.0

        :param ports:
            Ports that are allowed.
        :returns:
            The validator instance.
        :rtype:
            Validator
        :raises ValueError:
            When a port cannot be parsed as a base-10 integer.
        """
        for port in ports:
            # ``int(value, base=10)`` rejects non-text input with a
            # TypeError, so integer ports are converted to text first.
            # Ports are stored as text, exactly as string input always was.
            # NOTE(review): presumably this matches the textual port exposed
            # by parsed URIs -- confirm against the URI classes.
            port_text = str(port) if isinstance(port, int) else port
            if 0 <= int(port_text, base=10) <= 65535:
                self.allowed_ports.add(port_text)
        return self

    def allow_use_of_password(self):
        """Allow passwords to be present in the URI.

        .. versionadded:: 1.0

        :returns:
            The validator instance.
        :rtype:
            Validator
        """
        self.allow_password = True
        return self

    def forbid_use_of_password(self):
        """Prevent passwords from being included in the URI.

        .. versionadded:: 1.0

        :returns:
            The validator instance.
        :rtype:
            Validator
        """
        self.allow_password = False
        return self

    def check_validity_of(self, *components):
        """Check the validity of the components provided.

        This can be specified repeatedly.

        .. versionadded:: 1.1

        :param components:
            Names of components from :attr:`Validator.COMPONENT_NAMES`.
            Names are matched case-insensitively.
        :returns:
            The validator instance.
        :rtype:
            Validator
        :raises ValueError:
            When a name is not one of :attr:`Validator.COMPONENT_NAMES`.
        """
        components = [c.lower() for c in components]
        # Validate every name before mutating state so a bad name leaves the
        # validator unchanged.
        for component in components:
            if component not in self.COMPONENT_NAMES:
                raise ValueError(
                    '"{}" is not a valid component'.format(component)
                )
        self.validated_components.update(
            {component: True for component in components}
        )
        return self

    def require_presence_of(self, *components):
        """Require the components provided.

        This can be specified repeatedly.

        .. versionadded:: 1.0

        :param components:
            Names of components from :attr:`Validator.COMPONENT_NAMES`.
            Names are matched case-insensitively.
        :returns:
            The validator instance.
        :rtype:
            Validator
        :raises ValueError:
            When a name is not one of :attr:`Validator.COMPONENT_NAMES`.
        """
        components = [c.lower() for c in components]
        # Validate every name before mutating state so a bad name leaves the
        # validator unchanged.
        for component in components:
            if component not in self.COMPONENT_NAMES:
                raise ValueError(
                    '"{}" is not a valid component'.format(component)
                )
        self.required_components.update(
            {component: True for component in components}
        )
        return self

    def validate(self, uri):
        """Check a URI for conditions specified on this validator.

        Checks run in this order: password policy, required components,
        component validity, then the allowed schemes, hosts and ports.

        .. versionadded:: 1.0

        :param uri:
            Parsed URI to validate.
        :type uri:
            rfc3986.uri.URIReference
        :raises MissingComponentError:
            When a required component is missing.
        :raises UnpermittedComponentError:
            When a component is not one of those allowed.
        :raises PasswordForbidden:
            When a password is present in the userinfo component but is
            not permitted by configuration.
        :raises InvalidComponentsError:
            When a component was found to be invalid.
        """
        if not self.allow_password:
            check_password(uri)

        required_components = [
            component
            for component, required in self.required_components.items()
            if required
        ]
        validated_components = [
            component
            for component, checked in self.validated_components.items()
            if checked
        ]
        if required_components:
            ensure_required_components_exist(uri, required_components)
        if validated_components:
            ensure_components_are_valid(uri, validated_components)

        ensure_one_of(self.allowed_schemes, uri, "scheme")
        ensure_one_of(self.allowed_hosts, uri, "host")
        ensure_one_of(self.allowed_ports, uri, "port")

235 

236 

def check_password(uri):
    """Assert that there is no password present in the uri.

    :param uri:
        Object exposing a ``userinfo`` attribute.
    :raises PasswordForbidden:
        When the userinfo contains a ``:`` separator, i.e. something follows
        the user name.
    """
    userinfo = uri.userinfo
    # A missing or empty userinfo cannot carry a password; otherwise the
    # first ":" separates the user name from the password.
    if userinfo and len(userinfo.split(":", 1)) > 1:
        raise exceptions.PasswordForbidden(uri)

246 

247 

def ensure_one_of(allowed_values, uri, attribute):
    """Assert that the uri's attribute is one of the allowed values.

    The check is skipped when the attribute is ``None`` or when
    ``allowed_values`` is empty.

    :param allowed_values:
        Collection of permitted values.
    :param uri:
        Object to read ``attribute`` from.
    :param str attribute:
        Name of the attribute to check.
    :raises UnpermittedComponentError:
        When the attribute's value is not in ``allowed_values``.
    """
    actual = getattr(uri, attribute)
    if actual is None or not allowed_values:
        return
    if actual in allowed_values:
        return
    raise exceptions.UnpermittedComponentError(
        attribute,
        actual,
        allowed_values,
    )

257 

258 

def ensure_required_components_exist(uri, required_components):
    """Assert that all required components are present in the URI.

    :param uri:
        Object whose attributes are named after the components.
    :param required_components:
        Iterable of component names that must not be ``None`` on ``uri``.
    :raises MissingComponentError:
        When at least one required component is ``None``; the missing names
        are passed in sorted order.
    """
    absent = []
    for name in required_components:
        if getattr(uri, name) is None:
            absent.append(name)
    absent.sort()
    if absent:
        raise exceptions.MissingComponentError(uri, *absent)

270 

271 

def is_valid(value, matcher, require):
    """Determine if a value is valid based on the provided matcher.

    :param str value:
        Value to validate. May be ``None`` when the component is absent.
    :param matcher:
        Compiled regular expression to use to validate the value.
    :param require:
        Whether or not the value is required.
    :returns:
        ``True`` if the value is valid, ``False`` otherwise. An absent
        (``None``) value is valid only when it is not required.
    :rtype:
        bool
    """
    if value is None:
        # Absence is acceptable exactly when the value is optional.
        return not require

    # Convert the match object (or ``None``) into a real boolean so callers
    # get the ``bool`` their documentation promises.
    return matcher.match(value) is not None

287 

288 

def authority_is_valid(authority, host=None, require=False):
    """Determine if the authority string is valid.

    The authority is checked first; the host is only checked when the
    authority passed and a host was supplied.

    :param str authority:
        The authority to validate.
    :param str host:
        (optional) The host portion of the authority to validate.
    :param bool require:
        (optional) Specify if authority must not be None.
    :returns:
        A truthy value if valid, a falsy value otherwise.
    """
    authority_ok = is_valid(authority, misc.SUBAUTHORITY_MATCHER, require)
    if host is None or not authority_ok:
        return authority_ok
    return host_is_valid(host, require)

307 

308 

def host_is_valid(host, require=False):
    """Determine if the host string is valid.

    :param str host:
        The host to validate.
    :param bool require:
        (optional) Specify if host must not be None.
    :returns:
        A truthy value if valid, a falsy value otherwise.
    """
    validated = is_valid(host, misc.HOST_MATCHER, require)
    # Nothing more to inspect when the basic check failed or no host exists.
    if not validated or host is None:
        return validated

    if misc.IPv4_MATCHER.match(host):
        # Looks like a dotted-quad address: every byte must be in range.
        return valid_ipv4_host_address(host)
    if misc.IPv6_MATCHER.match(host):
        # IPv6 hosts must additionally match the stricter
        # IPv6_NO_RFC4007_MATCHER pattern.
        return misc.IPv6_NO_RFC4007_MATCHER.match(host) is not None
    return validated

327 

328 

def scheme_is_valid(scheme, require=False):
    """Check a scheme against the scheme pattern.

    :param str scheme:
        The scheme string to validate.
    :param bool require:
        (optional) Set to ``True`` to require the presence of a scheme.
    :returns:
        A truthy value if the scheme is valid, a falsy value otherwise.
    """
    scheme_pattern = misc.SCHEME_MATCHER
    return is_valid(scheme, scheme_pattern, require)

342 

343 

def path_is_valid(path, require=False):
    """Check a path component against the path pattern.

    :param str path:
        The path string to validate.
    :param bool require:
        (optional) Set to ``True`` to require the presence of a path.
    :returns:
        A truthy value if the path is valid, a falsy value otherwise.
    """
    path_pattern = misc.PATH_MATCHER
    return is_valid(path, path_pattern, require)

357 

358 

def query_is_valid(query, require=False):
    """Check a query component against the query pattern.

    :param str query:
        The query string to validate.
    :param bool require:
        (optional) Set to ``True`` to require the presence of a query.
    :returns:
        A truthy value if the query is valid, a falsy value otherwise.
    """
    query_pattern = misc.QUERY_MATCHER
    return is_valid(query, query_pattern, require)

372 

373 

def fragment_is_valid(fragment, require=False):
    """Check a fragment component against the fragment pattern.

    :param str fragment:
        The fragment string to validate.
    :param bool require:
        (optional) Set to ``True`` to require the presence of a fragment.
    :returns:
        A truthy value if the fragment is valid, a falsy value otherwise.
    """
    fragment_pattern = misc.FRAGMENT_MATCHER
    return is_valid(fragment, fragment_pattern, require)

387 

388 

def valid_ipv4_host_address(host):
    """Determine if the given host is a valid IPv4 address.

    :param str host:
        Dot-separated host string whose parts are decimal numbers.
    :returns:
        ``True`` if every dot-separated part is within 0-255.
    :rtype:
        bool
    """
    # Convert every part up front so a non-numeric part always raises,
    # regardless of where an out-of-range part appears.
    octets = [int(part, base=10) for part in host.split(".")]
    for octet in octets:
        if octet < 0 or octet > 255:
            return False
    return True

394 

395 

# Components that are validated directly from the attribute of the same name.
_COMPONENT_VALIDATORS = {
    "scheme": scheme_is_valid,
    "path": path_is_valid,
    "query": query_is_valid,
    "fragment": fragment_is_valid,
}

# Components that live inside the authority and are validated through it.
_SUBAUTHORITY_VALIDATORS = {"userinfo", "host", "port"}

404 

405 

def subauthority_component_is_valid(uri, component):
    """Determine if the userinfo, host, and port are valid.

    :param uri:
        Object providing ``authority_info()``, which returns a mapping with
        at least ``"host"`` and ``"port"`` keys.
    :param str component:
        Name of the authority sub-component to check.
    :returns:
        ``False`` when the authority cannot be parsed; the host check result
        for ``"host"``; whether the port is within 0-65535 for ``"port"``
        (``True`` when ``int()`` raises ``TypeError``, e.g. port is ``None``);
        ``True`` for any other component.
    """
    try:
        parts = uri.authority_info()
    except exceptions.InvalidAuthority:
        return False

    if component == "host":
        return host_is_valid(parts["host"])

    # Anything other than the port is accepted once the authority parsed.
    if component != "port":
        return True

    try:
        port_number = int(parts["port"])
    except TypeError:
        # A port that was not provided is None, and int(None) raises a
        # TypeError; an absent port is considered valid.
        return True
    else:
        return 0 <= port_number <= 65535

428 

429 

def ensure_components_are_valid(uri, validated_components):
    """Assert that all components are valid in the URI.

    :param uri:
        Parsed URI whose components are checked.
    :param validated_components:
        Iterable of component names to validate.
    :raises InvalidComponentsError:
        When at least one of the named components is invalid.
    """
    failed = set()
    for name in validated_components:
        if name in _SUBAUTHORITY_VALIDATORS:
            # userinfo, host and port are checked through the authority.
            component_ok = subauthority_component_is_valid(uri, name)
        else:
            check = _COMPONENT_VALIDATORS[name]
            component_ok = check(getattr(uri, name))

        if not component_ok:
            failed.add(name)

    if failed:
        raise exceptions.InvalidComponentsError(uri, *failed)