Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/urllib3/util/ssl_.py: 30%

174 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1from __future__ import annotations 

2 

3import hmac 

4import os 

5import socket 

6import sys 

7import typing 

8import warnings 

9from binascii import unhexlify 

10from hashlib import md5, sha1, sha256 

11 

12from ..exceptions import ProxySchemeUnsupported, SSLError 

13from .url import _BRACELESS_IPV6_ADDRZ_RE, _IPV4_RE 

14 

# Defaults used when the 'ssl' module cannot be imported. The guarded import
# later in this module rebinds SSLContext, SSLTransport and
# HAS_NEVER_CHECK_COMMON_NAME when 'ssl' is available.
SSLContext = None
SSLTransport = None
HAS_NEVER_CHECK_COMMON_NAME = False
IS_PYOPENSSL = False
IS_SECURETRANSPORT = False
# Protocols handed to 'SSLContext.set_alpn_protocols()' by ssl_wrap_socket().
ALPN_PROTOCOLS = ["http/1.1"]

# Tuple type used to annotate version-info arguments such as 'sys.version_info'.
_TYPE_VERSION_INFO = typing.Tuple[int, int, int, str, int]

# Maps the length of a hex-encoded digest to a possible hash function
# producing this digest
HASHFUNC_MAP = {32: md5, 40: sha1, 64: sha256}

26 

27 

def _is_bpo_43522_fixed(
    implementation_name: str, version_info: _TYPE_VERSION_INFO
) -> bool:
    """Return True for CPython 3.8.9+, 3.9.3+ or 3.10+ where setting
    SSLContext.hostname_checks_common_name to False works.

    PyPy 7.3.7 doesn't work as it doesn't ship with OpenSSL 1.1.1l+
    so we're waiting for a version of PyPy that works before
    allowing this function to return 'True'.

    Outside of CPython and PyPy we don't know which implementations work
    or not so we conservatively use our hostname matching as we know that works
    on all implementations.

    https://github.com/urllib3/urllib3/issues/2192#issuecomment-821832963
    https://foss.heptapod.net/pypy/pypy/-/issues/3539#

    :param implementation_name:
        Interpreter implementation name; anything other than ``"cpython"``
        yields False.
    :param version_info:
        Version tuple whose first three items are (major, minor, micro).
    """
    if implementation_name != "cpython":
        return False

    major_minor = version_info[:2]
    micro = version_info[2]
    # The fix landed in specific micro releases of 3.8 and 3.9, and in every
    # release from 3.10 onwards.
    return (
        (major_minor == (3, 8) and micro >= 9)
        or (major_minor == (3, 9) and micro >= 3)
        or major_minor >= (3, 10)
    )

55 

56 

def _is_has_never_check_common_name_reliable(
    openssl_version_number: int,
    implementation_name: str,
    version_info: _TYPE_VERSION_INFO,
) -> bool:
    """Return True when disabling common-name checks is known to work.

    That is the case when either the OpenSSL fix or the interpreter fix
    (see ``_is_bpo_43522_fixed``) is present.

    :param openssl_version_number:
        OpenSSL version encoded as an integer; compared against
        ``0x101010CF`` (OpenSSL 1.1.1l).
    :param implementation_name:
        Interpreter implementation name, forwarded to ``_is_bpo_43522_fixed``.
    :param version_info:
        Interpreter version tuple, forwarded to ``_is_bpo_43522_fixed``.
    """
    # Before fixing OpenSSL issue #14579, the SSL_new() API was not copying hostflags
    # like X509_CHECK_FLAG_NEVER_CHECK_SUBJECT, which tripped up CPython.
    # https://github.com/openssl/openssl/issues/14579
    # This was released in OpenSSL 1.1.1l+ (>=0x101010cf)
    is_openssl_issue_14579_fixed = openssl_version_number >= 0x101010CF

    return is_openssl_issue_14579_fixed or _is_bpo_43522_fixed(
        implementation_name, version_info
    )

71 

72 

# Names needed only by annotations. Nothing in this branch runs at runtime,
# so none of these imports are required for the module to load.
if typing.TYPE_CHECKING:
    from ssl import VerifyMode

    from typing_extensions import Literal, TypedDict

    from .ssltransport import SSLTransport as SSLTransportType

    # Typed view of a peer-certificate dictionary; 'total=False' makes every
    # key optional.
    class _TYPE_PEER_CERT_RET_DICT(TypedDict, total=False):
        subjectAltName: tuple[tuple[str, str], ...]
        subject: tuple[tuple[tuple[str, str], ...], ...]
        serialNumber: str

84 

85 

# Mapping from 'ssl.PROTOCOL_TLSX' to 'TLSVersion.X'
_SSL_VERSION_TO_TLS_VERSION: dict[int, int] = {}

try:  # Do we have ssl at all?
    import ssl
    from ssl import (  # type: ignore[assignment]
        CERT_REQUIRED,
        HAS_NEVER_CHECK_COMMON_NAME,
        OP_NO_COMPRESSION,
        OP_NO_TICKET,
        OPENSSL_VERSION_NUMBER,
        PROTOCOL_TLS,
        PROTOCOL_TLS_CLIENT,
        OP_NO_SSLv2,
        OP_NO_SSLv3,
        SSLContext,
        TLSVersion,
    )

    PROTOCOL_SSLv23 = PROTOCOL_TLS

    # Setting SSLContext.hostname_checks_common_name = False didn't work before CPython
    # 3.8.9, 3.9.3, and 3.10 (but OK on PyPy) or OpenSSL 1.1.1l+
    if HAS_NEVER_CHECK_COMMON_NAME and not _is_has_never_check_common_name_reliable(
        OPENSSL_VERSION_NUMBER,
        sys.implementation.name,
        sys.version_info,
    ):
        HAS_NEVER_CHECK_COMMON_NAME = False

    # Need to be careful here in case old TLS versions get
    # removed in future 'ssl' module implementations.
    for attr in ("TLSv1", "TLSv1_1", "TLSv1_2"):
        try:
            _SSL_VERSION_TO_TLS_VERSION[getattr(ssl, f"PROTOCOL_{attr}")] = getattr(
                TLSVersion, attr
            )
        except AttributeError:  # Defensive:
            continue

    from .ssltransport import SSLTransport  # type: ignore[assignment]
except ImportError:
    # Without 'ssl', fall back to hard-coded integer values so that these
    # names can still be referenced elsewhere in the module.
    OP_NO_COMPRESSION = 0x20000  # type: ignore[assignment]
    OP_NO_TICKET = 0x4000  # type: ignore[assignment]
    OP_NO_SSLv2 = 0x1000000  # type: ignore[assignment]
    OP_NO_SSLv3 = 0x2000000  # type: ignore[assignment]
    PROTOCOL_SSLv23 = PROTOCOL_TLS = 2  # type: ignore[assignment]
    PROTOCOL_TLS_CLIENT = 16  # type: ignore[assignment]


# The dict alternative is given as a string because
# '_TYPE_PEER_CERT_RET_DICT' only exists while type checking.
_TYPE_PEER_CERT_RET = typing.Union["_TYPE_PEER_CERT_RET_DICT", bytes, None]

137 

138 

def assert_fingerprint(cert: bytes | None, fingerprint: str) -> None:
    """
    Checks if given fingerprint matches the supplied certificate.

    :param cert:
        Certificate as bytes object.
    :param fingerprint:
        Fingerprint as string of hexdigits, can be interspersed by colons.
    :raises SSLError:
        If ``cert`` is None, if the fingerprint length does not correspond to
        a hash function in ``HASHFUNC_MAP``, or if the digests differ.
    """

    if cert is None:
        raise SSLError("No certificate for the peer.")

    # Normalise "AA:BB:..." into a plain lowercase hex string.
    fingerprint = fingerprint.replace(":", "").lower()
    digest_length = len(fingerprint)
    # The hash algorithm is inferred from the length of the hex string.
    hashfunc = HASHFUNC_MAP.get(digest_length)
    if not hashfunc:
        raise SSLError(f"Fingerprint of invalid length: {fingerprint}")

    # Turn the hex string into raw bytes so it can be compared to the digest.
    fingerprint_bytes = unhexlify(fingerprint.encode())

    cert_digest = hashfunc(cert).digest()

    if not hmac.compare_digest(cert_digest, fingerprint_bytes):
        raise SSLError(
            f'Fingerprints did not match. Expected "{fingerprint}", got "{cert_digest.hex()}"'
        )

167 

168 

def resolve_cert_reqs(candidate: None | int | str) -> VerifyMode:
    """
    Resolves the argument to a numeric constant, which can be passed to
    the wrap_socket function/method from the ssl module.
    Defaults to :data:`ssl.CERT_REQUIRED`.
    If given a string it is assumed to be the name of the constant in the
    :mod:`ssl` module or its abbreviation.
    (So you can specify `REQUIRED` instead of `CERT_REQUIRED`.)
    If it's neither `None` nor a string we assume it is already the numeric
    constant which can directly be passed to wrap_socket.

    :raises AttributeError:
        If a string is given and neither it nor its ``CERT_``-prefixed form
        names an attribute of the :mod:`ssl` module.
    """
    if candidate is None:
        return CERT_REQUIRED

    if isinstance(candidate, str):
        # Try the name as given first, then fall back to the abbreviation.
        res = getattr(ssl, candidate, None)
        if res is None:
            res = getattr(ssl, "CERT_" + candidate)
        return res  # type: ignore[no-any-return]

    return candidate  # type: ignore[return-value]

190 

191 

def resolve_ssl_version(candidate: None | int | str) -> int:
    """
    Resolves the argument to a numeric protocol constant, in the same way
    as resolve_cert_reqs does for certificate requirements.
    Defaults to ``PROTOCOL_TLS``.
    If given a string it is assumed to be the name of the constant in the
    :mod:`ssl` module or its abbreviation.
    (So you can specify `TLSv1_2` instead of `PROTOCOL_TLSv1_2`.)
    If it's neither `None` nor a string it is returned unchanged.

    :raises AttributeError:
        If a string is given and neither it nor its ``PROTOCOL_``-prefixed
        form names an attribute of the :mod:`ssl` module.
    """
    if candidate is None:
        return PROTOCOL_TLS

    if isinstance(candidate, str):
        # Try the name as given first, then fall back to the abbreviation.
        res = getattr(ssl, candidate, None)
        if res is None:
            res = getattr(ssl, "PROTOCOL_" + candidate)
        return typing.cast(int, res)

    return candidate

206 

207 

def create_urllib3_context(
    ssl_version: int | None = None,
    cert_reqs: int | None = None,
    options: int | None = None,
    ciphers: str | None = None,
    ssl_minimum_version: int | None = None,
    ssl_maximum_version: int | None = None,
) -> ssl.SSLContext:
    """Creates and configures an :class:`ssl.SSLContext` instance for use with urllib3.

    :param ssl_version:
        The desired protocol version to use. This will default to
        PROTOCOL_SSLv23 which will negotiate the highest protocol that both
        the server and your installation of OpenSSL support.

        This parameter is deprecated; use 'ssl_minimum_version' instead.
    :param ssl_minimum_version:
        The minimum version of TLS to be used. Use the 'ssl.TLSVersion' enum for specifying the value.
    :param ssl_maximum_version:
        The maximum version of TLS to be used. Use the 'ssl.TLSVersion' enum for specifying the value.
        Not recommended to set to anything other than 'ssl.TLSVersion.MAXIMUM_SUPPORTED' which is the
        default value.
    :param cert_reqs:
        Whether to require the certificate verification. This defaults to
        ``ssl.CERT_REQUIRED``.
    :param options:
        Specific OpenSSL options. These default to ``ssl.OP_NO_SSLv2``,
        ``ssl.OP_NO_SSLv3``, ``ssl.OP_NO_COMPRESSION``, and ``ssl.OP_NO_TICKET``.
    :param ciphers:
        Which cipher suites to allow the server to select. Defaults to either system configured
        ciphers if OpenSSL 1.1.1+, otherwise uses a secure default set of ciphers.
    :returns:
        Constructed SSLContext object with specified options
    :rtype: SSLContext
    :raises TypeError:
        If the 'ssl' module is unavailable.
    :raises ValueError:
        If an exact 'ssl_version' is combined with 'ssl_minimum_version'
        or 'ssl_maximum_version'.
    """
    if SSLContext is None:
        raise TypeError("Can't create an SSLContext object without an ssl module")

    # This means 'ssl_version' was specified as an exact value.
    if ssl_version not in (None, PROTOCOL_TLS, PROTOCOL_TLS_CLIENT):
        # Disallow setting 'ssl_version' and 'ssl_minimum|maximum_version'
        # to avoid conflicts.
        if ssl_minimum_version is not None or ssl_maximum_version is not None:
            raise ValueError(
                "Can't specify both 'ssl_version' and either "
                "'ssl_minimum_version' or 'ssl_maximum_version'"
            )

        # 'ssl_version' is deprecated and will be removed in the future.
        # Use 'ssl_minimum_version' and 'ssl_maximum_version' instead.
        ssl_minimum_version = _SSL_VERSION_TO_TLS_VERSION.get(
            ssl_version, TLSVersion.MINIMUM_SUPPORTED
        )
        ssl_maximum_version = _SSL_VERSION_TO_TLS_VERSION.get(
            ssl_version, TLSVersion.MAXIMUM_SUPPORTED
        )

        # This warning message is pushing users to use 'ssl_minimum_version'
        # instead of both min/max. Best practice is to only set the minimum version and
        # keep the maximum version to be its default value: 'TLSVersion.MAXIMUM_SUPPORTED'
        warnings.warn(
            "'ssl_version' option is deprecated and will be "
            "removed in urllib3 v2.1.0. Instead use 'ssl_minimum_version'",
            category=DeprecationWarning,
            stacklevel=2,
        )

    # PROTOCOL_TLS is deprecated in Python 3.10 so we always use PROTOCOL_TLS_CLIENT
    context = SSLContext(PROTOCOL_TLS_CLIENT)

    if ssl_minimum_version is not None:
        context.minimum_version = ssl_minimum_version
    else:  # Python <3.10 defaults to 'MINIMUM_SUPPORTED' so explicitly set TLSv1.2 here
        context.minimum_version = TLSVersion.TLSv1_2

    if ssl_maximum_version is not None:
        context.maximum_version = ssl_maximum_version

    # Unless we're given ciphers defer to either system ciphers in
    # the case of OpenSSL 1.1.1+ or use our own secure default ciphers.
    if ciphers:
        context.set_ciphers(ciphers)

    # Setting the default here, as we may have no ssl module on import
    cert_reqs = ssl.CERT_REQUIRED if cert_reqs is None else cert_reqs

    if options is None:
        options = 0
        # SSLv2 is easily broken and is considered harmful and dangerous
        options |= OP_NO_SSLv2
        # SSLv3 has several problems and is now dangerous
        options |= OP_NO_SSLv3
        # Disable compression to prevent CRIME attacks for OpenSSL 1.0+
        # (issue #309)
        options |= OP_NO_COMPRESSION
        # TLSv1.2 only. Unless set explicitly, do not request tickets.
        # This may save some bandwidth on wire, and although the ticket is encrypted,
        # there is a risk associated with it being on wire,
        # if the server is not rotating its ticketing keys properly.
        options |= OP_NO_TICKET

    context.options |= options

    # Enable post-handshake authentication for TLS 1.3, see GH #1634. PHA is
    # necessary for conditional client cert authentication with TLS 1.3.
    # The attribute is None for OpenSSL <= 1.1.0 or does not exist in older
    # versions of Python.  We only enable on Python 3.7.4+ or if certificate
    # verification is enabled to work around Python issue #37428
    # See: https://bugs.python.org/issue37428
    if (cert_reqs == ssl.CERT_REQUIRED or sys.version_info >= (3, 7, 4)) and getattr(
        context, "post_handshake_auth", None
    ) is not None:
        context.post_handshake_auth = True

    # The order of the below lines setting verify_mode and check_hostname
    # matter due to safe-guards SSLContext has to prevent an SSLContext with
    # check_hostname=True, verify_mode=NONE/OPTIONAL.
    # We always set 'check_hostname=False' for pyOpenSSL so we rely on our own
    # 'ssl.match_hostname()' implementation.
    if cert_reqs == ssl.CERT_REQUIRED and not IS_PYOPENSSL:
        context.verify_mode = cert_reqs
        context.check_hostname = True
    else:
        context.check_hostname = False
        context.verify_mode = cert_reqs

    # Best effort: not every context object exposes this attribute.
    try:
        context.hostname_checks_common_name = False
    except AttributeError:
        pass

    # Enable logging of TLS session keys via defacto standard environment variable
    # 'SSLKEYLOGFILE', if the feature is available (Python 3.8+). Skip empty values.
    if hasattr(context, "keylog_filename"):
        sslkeylogfile = os.environ.get("SSLKEYLOGFILE")
        if sslkeylogfile:
            context.keylog_filename = sslkeylogfile

    return context

348 

349 

@typing.overload
def ssl_wrap_socket(
    sock: socket.socket,
    keyfile: str | None = ...,
    certfile: str | None = ...,
    cert_reqs: int | None = ...,
    ca_certs: str | None = ...,
    server_hostname: str | None = ...,
    ssl_version: int | None = ...,
    ciphers: str | None = ...,
    ssl_context: ssl.SSLContext | None = ...,
    ca_cert_dir: str | None = ...,
    key_password: str | None = ...,
    ca_cert_data: None | str | bytes = ...,
    tls_in_tls: Literal[False] = ...,
) -> ssl.SSLSocket:
    ...


@typing.overload
def ssl_wrap_socket(
    sock: socket.socket,
    keyfile: str | None = ...,
    certfile: str | None = ...,
    cert_reqs: int | None = ...,
    ca_certs: str | None = ...,
    server_hostname: str | None = ...,
    ssl_version: int | None = ...,
    ciphers: str | None = ...,
    ssl_context: ssl.SSLContext | None = ...,
    ca_cert_dir: str | None = ...,
    key_password: str | None = ...,
    ca_cert_data: None | str | bytes = ...,
    tls_in_tls: bool = ...,
) -> ssl.SSLSocket | SSLTransportType:
    ...


def ssl_wrap_socket(
    sock: socket.socket,
    keyfile: str | None = None,
    certfile: str | None = None,
    cert_reqs: int | None = None,
    ca_certs: str | None = None,
    server_hostname: str | None = None,
    ssl_version: int | None = None,
    ciphers: str | None = None,
    ssl_context: ssl.SSLContext | None = None,
    ca_cert_dir: str | None = None,
    key_password: str | None = None,
    ca_cert_data: None | str | bytes = None,
    tls_in_tls: bool = False,
) -> ssl.SSLSocket | SSLTransportType:
    """
    All arguments except for server_hostname, ssl_context, and ca_cert_dir have
    the same meaning as they do when using :func:`ssl.wrap_socket`.

    :param server_hostname:
        When SNI is supported, the expected hostname of the certificate
    :param ssl_context:
        A pre-made :class:`SSLContext` object. If none is provided, one will
        be created using :func:`create_urllib3_context`.
    :param ciphers:
        A string of ciphers we wish the client to support.
    :param ca_cert_dir:
        A directory containing CA certificates in multiple separate files, as
        supported by OpenSSL's -CApath flag or the capath argument to
        SSLContext.load_verify_locations().
    :param key_password:
        Optional password if the keyfile is encrypted.
    :param ca_cert_data:
        Optional string containing CA certificates in PEM format suitable for
        passing as the cadata parameter to SSLContext.load_verify_locations()
    :param tls_in_tls:
        Use SSLTransport to wrap the existing socket.
    :raises SSLError:
        If loading the CA locations fails with an ``OSError``, or if the
        key file is encrypted and no ``key_password`` was given.
    """
    context = ssl_context
    if context is None:
        # Note: This branch of code and all the variables in it are only used in tests.
        # We should consider deprecating and removing this code.
        context = create_urllib3_context(ssl_version, cert_reqs, ciphers=ciphers)

    if ca_certs or ca_cert_dir or ca_cert_data:
        try:
            context.load_verify_locations(ca_certs, ca_cert_dir, ca_cert_data)
        except OSError as e:
            raise SSLError(e) from e

    elif ssl_context is None and hasattr(context, "load_default_certs"):
        # try to load OS default certs; works well on Windows.
        context.load_default_certs()

    # Attempt to detect if we get the goofy behavior of the
    # keyfile being encrypted and OpenSSL asking for the
    # passphrase via the terminal and instead error out.
    if keyfile and key_password is None and _is_key_file_encrypted(keyfile):
        raise SSLError("Client private key is encrypted, password is required")

    if certfile:
        if key_password is None:
            context.load_cert_chain(certfile, keyfile)
        else:
            context.load_cert_chain(certfile, keyfile, key_password)

    try:
        context.set_alpn_protocols(ALPN_PROTOCOLS)
    except NotImplementedError:  # Defensive: in CI, we always have set_alpn_protocols
        pass

    ssl_sock = _ssl_wrap_socket_impl(sock, context, tls_in_tls, server_hostname)
    return ssl_sock

461 

462 

def is_ipaddress(hostname: str | bytes) -> bool:
    """Detects whether the hostname given is an IPv4 or IPv6 address.
    Also detects IPv6 addresses with Zone IDs.

    :param hostname: Hostname to examine, as ``str`` or ASCII ``bytes``.
    :return: True if the hostname is an IP address, False otherwise.
    """
    if isinstance(hostname, bytes):
        # IDN A-label bytes are ASCII compatible.
        hostname = hostname.decode("ascii")
    return bool(_IPV4_RE.match(hostname) or _BRACELESS_IPV6_ADDRZ_RE.match(hostname))

474 

475 

def _is_key_file_encrypted(key_file: str) -> bool:
    """Detects if a key file is encrypted or not.

    :param key_file: Path of the key file, opened in text mode.
    :return: True if any line contains the text ``ENCRYPTED``.
    """
    with open(key_file) as f:
        # Look for Proc-Type: 4,ENCRYPTED
        return any("ENCRYPTED" in line for line in f)

485 

486 

def _ssl_wrap_socket_impl(
    sock: socket.socket,
    ssl_context: ssl.SSLContext,
    tls_in_tls: bool,
    server_hostname: str | None = None,
) -> ssl.SSLSocket | SSLTransportType:
    """Wrap ``sock`` using ``ssl_context``.

    :param sock: Socket to wrap.
    :param ssl_context: Context used to perform the wrapping.
    :param tls_in_tls:
        When true, wrap with ``SSLTransport`` instead of calling
        ``ssl_context.wrap_socket()``.
    :param server_hostname: Hostname forwarded to the wrapping call.
    :raises ProxySchemeUnsupported:
        If ``tls_in_tls`` is requested but ``SSLTransport`` is unavailable.
    """
    if tls_in_tls:
        if not SSLTransport:
            # Import error, ssl is not available.
            raise ProxySchemeUnsupported(
                "TLS in TLS requires support for the 'ssl' module"
            )

        SSLTransport._validate_ssl_context_for_tls_in_tls(ssl_context)
        return SSLTransport(sock, ssl_context, server_hostname)

    return ssl_context.wrap_socket(sock, server_hostname=server_hostname)