Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/urllib3/util/ssl_.py: 31%

177 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:20 +0000

1from __future__ import annotations 

2 

3import hmac 

4import os 

5import socket 

6import sys 

7import typing 

8import warnings 

9from binascii import unhexlify 

10from hashlib import md5, sha1, sha256 

11 

12from ..exceptions import ProxySchemeUnsupported, SSLError 

13from .url import _BRACELESS_IPV6_ADDRZ_RE, _IPV4_RE 

14 

# Placeholders used when the 'ssl' module cannot be imported. SSLContext,
# SSLTransport and HAS_NEVER_CHECK_COMMON_NAME are rebound by the
# 'try: import ssl' block further down in this module when the import works.
SSLContext = None
SSLTransport = None
HAS_NEVER_CHECK_COMMON_NAME = False
# Not reassigned anywhere in this module; create_urllib3_context() reads
# IS_PYOPENSSL to decide whether the context does its own hostname checking.
IS_PYOPENSSL = False
IS_SECURETRANSPORT = False
# Protocol list handed to SSLContext.set_alpn_protocols() by ssl_wrap_socket().
ALPN_PROTOCOLS = ["http/1.1"]

# Five-field version tuple; 'sys.version_info' is passed where this type is expected.
_TYPE_VERSION_INFO = typing.Tuple[int, int, int, str, int]

# Maps the length of a digest (counted in hex digits, as assert_fingerprint()
# measures it) to a possible hash function producing this digest
HASHFUNC_MAP = {32: md5, 40: sha1, 64: sha256}

26 

27 

28def _is_bpo_43522_fixed( 

29 implementation_name: str, 

30 version_info: _TYPE_VERSION_INFO, 

31 pypy_version_info: _TYPE_VERSION_INFO | None, 

32) -> bool: 

33 """Return True for CPython 3.8.9+, 3.9.3+ or 3.10+ and PyPy 7.3.8+ where 

34 setting SSLContext.hostname_checks_common_name to False works. 

35 

36 Outside of CPython and PyPy we don't know which implementations work 

37 or not so we conservatively use our hostname matching as we know that works 

38 on all implementations. 

39 

40 https://github.com/urllib3/urllib3/issues/2192#issuecomment-821832963 

41 https://foss.heptapod.net/pypy/pypy/-/issues/3539 

42 """ 

43 if implementation_name == "pypy": 

44 # https://foss.heptapod.net/pypy/pypy/-/issues/3129 

45 return pypy_version_info >= (7, 3, 8) and version_info >= (3, 8) # type: ignore[operator] 

46 elif implementation_name == "cpython": 

47 major_minor = version_info[:2] 

48 micro = version_info[2] 

49 return ( 

50 (major_minor == (3, 8) and micro >= 9) 

51 or (major_minor == (3, 9) and micro >= 3) 

52 or major_minor >= (3, 10) 

53 ) 

54 else: # Defensive: 

55 return False 

56 

57 

58def _is_has_never_check_common_name_reliable( 

59 openssl_version: str, 

60 openssl_version_number: int, 

61 implementation_name: str, 

62 version_info: _TYPE_VERSION_INFO, 

63 pypy_version_info: _TYPE_VERSION_INFO | None, 

64) -> bool: 

65 # As of May 2023, all released versions of LibreSSL fail to reject certificates with 

66 # only common names, see https://github.com/urllib3/urllib3/pull/3024 

67 is_openssl = openssl_version.startswith("OpenSSL ") 

68 # Before fixing OpenSSL issue #14579, the SSL_new() API was not copying hostflags 

69 # like X509_CHECK_FLAG_NEVER_CHECK_SUBJECT, which tripped up CPython. 

70 # https://github.com/openssl/openssl/issues/14579 

71 # This was released in OpenSSL 1.1.1l+ (>=0x101010cf) 

72 is_openssl_issue_14579_fixed = openssl_version_number >= 0x101010CF 

73 

74 return is_openssl and ( 

75 is_openssl_issue_14579_fixed 

76 or _is_bpo_43522_fixed(implementation_name, version_info, pypy_version_info) 

77 ) 

78 

79 

# Names below exist only for static type checkers; they are never bound at
# runtime, so they may only appear inside annotations or string forward
# references elsewhere in this module.
if typing.TYPE_CHECKING:
    from ssl import VerifyMode

    from typing_extensions import Literal, TypedDict

    from .ssltransport import SSLTransport as SSLTransportType

    # Dictionary form of a peer certificate; total=False makes every key optional.
    # NOTE(review): presumably mirrors the dict returned by
    # SSLSocket.getpeercert() -- confirm against callers.
    class _TYPE_PEER_CERT_RET_DICT(TypedDict, total=False):
        subjectAltName: tuple[tuple[str, str], ...]
        subject: tuple[tuple[tuple[str, str], ...], ...]
        serialNumber: str

91 

92 

# Mapping from 'ssl.PROTOCOL_TLSX' to 'TLSVersion.X'.
# Filled in below when 'ssl' is importable; stays empty otherwise.
_SSL_VERSION_TO_TLS_VERSION: dict[int, int] = {}

try:  # Do we have ssl at all?
    import ssl
    from ssl import (  # type: ignore[assignment]
        CERT_REQUIRED,
        HAS_NEVER_CHECK_COMMON_NAME,
        OP_NO_COMPRESSION,
        OP_NO_TICKET,
        OPENSSL_VERSION,
        OPENSSL_VERSION_NUMBER,
        PROTOCOL_TLS,
        PROTOCOL_TLS_CLIENT,
        OP_NO_SSLv2,
        OP_NO_SSLv3,
        SSLContext,
        TLSVersion,
    )

    # Legacy alias kept alongside PROTOCOL_TLS.
    PROTOCOL_SSLv23 = PROTOCOL_TLS

    # Setting SSLContext.hostname_checks_common_name = False didn't work before CPython
    # 3.8.9, 3.9.3, and 3.10 (but OK on PyPy) or OpenSSL 1.1.1l+
    # When the helper reports the flag as unreliable, override what 'ssl'
    # advertised so the rest of the package sees it as unavailable.
    if HAS_NEVER_CHECK_COMMON_NAME and not _is_has_never_check_common_name_reliable(
        OPENSSL_VERSION,
        OPENSSL_VERSION_NUMBER,
        sys.implementation.name,
        sys.version_info,
        sys.pypy_version_info if sys.implementation.name == "pypy" else None,  # type: ignore[attr-defined]
    ):
        HAS_NEVER_CHECK_COMMON_NAME = False

    # Need to be careful here in case old TLS versions get
    # removed in future 'ssl' module implementations.
    for attr in ("TLSv1", "TLSv1_1", "TLSv1_2"):
        try:
            _SSL_VERSION_TO_TLS_VERSION[getattr(ssl, f"PROTOCOL_{attr}")] = getattr(
                TLSVersion, attr
            )
        except AttributeError:  # Defensive: skip versions this 'ssl' build lacks
            continue

    # Imported last, inside the try, so a failure here also lands in the
    # ImportError fallback below.
    from .ssltransport import SSLTransport  # type: ignore[assignment]
except ImportError:
    # Hard-coded stand-ins for the option/protocol constants so the names
    # still exist. Note that 'ssl', CERT_REQUIRED and TLSVersion get no
    # fallback here, and SSLContext keeps its earlier value of None.
    OP_NO_COMPRESSION = 0x20000  # type: ignore[assignment]
    OP_NO_TICKET = 0x4000  # type: ignore[assignment]
    OP_NO_SSLv2 = 0x1000000  # type: ignore[assignment]
    OP_NO_SSLv3 = 0x2000000  # type: ignore[assignment]
    PROTOCOL_SSLv23 = PROTOCOL_TLS = 2  # type: ignore[assignment]
    PROTOCOL_TLS_CLIENT = 16  # type: ignore[assignment]


# A peer certificate is either the dictionary form, raw bytes, or None.
# The TypedDict is referenced by string because it only exists under
# typing.TYPE_CHECKING.
_TYPE_PEER_CERT_RET = typing.Union["_TYPE_PEER_CERT_RET_DICT", bytes, None]

147 

148 

def assert_fingerprint(cert: bytes | None, fingerprint: str) -> None:
    """
    Verify that a certificate hashes to the expected fingerprint.

    :param cert:
        Certificate as bytes object, or None if the peer sent none.
    :param fingerprint:
        Fingerprint as string of hexdigits, can be interspersed by colons.
        Its length (after removing colons) selects the hash function via
        ``HASHFUNC_MAP``.
    :raises SSLError:
        If there is no certificate, the fingerprint length matches no known
        hash function, or the digests differ.
    """
    if cert is None:
        raise SSLError("No certificate for the peer.")

    normalized = fingerprint.replace(":", "").lower()
    hashfunc = HASHFUNC_MAP.get(len(normalized))
    if not hashfunc:
        raise SSLError(f"Fingerprint of invalid length: {normalized}")

    # unhexlify() works on bytes, so encode the hex string first.
    expected_digest = unhexlify(normalized.encode())
    actual_digest = hashfunc(cert).digest()

    # Constant-time comparison; a match means there is nothing left to do.
    if hmac.compare_digest(actual_digest, expected_digest):
        return

    raise SSLError(
        f'Fingerprints did not match. Expected "{normalized}", got "{actual_digest.hex()}"'
    )

177 

178 

def resolve_cert_reqs(candidate: None | int | str) -> VerifyMode:
    """
    Turn ``candidate`` into a certificate-requirement constant.

    * ``None`` resolves to :data:`ssl.CERT_REQUIRED`.
    * A string is looked up as an attribute of the :mod:`ssl` module, first
      as given and then with a ``CERT_`` prefix (so ``REQUIRED`` works as
      well as ``CERT_REQUIRED``). An unknown name raises ``AttributeError``.
    * Anything else is assumed to already be the numeric constant and is
      returned unchanged.
    """
    if candidate is None:
        return CERT_REQUIRED

    if not isinstance(candidate, str):
        return candidate  # type: ignore[return-value]

    resolved = getattr(ssl, candidate, None)
    if resolved is None:
        # Retry with the abbreviation expanded to the full constant name.
        resolved = getattr(ssl, "CERT_" + candidate)
    return resolved  # type: ignore[no-any-return]

200 

201 

def resolve_ssl_version(candidate: None | int | str) -> int:
    """
    Turn ``candidate`` into a protocol constant, like resolve_cert_reqs.

    ``None`` resolves to ``PROTOCOL_TLS``. A string is looked up on the
    :mod:`ssl` module, first as given and then with a ``PROTOCOL_`` prefix;
    an unknown name raises ``AttributeError``. Any other value is returned
    unchanged.
    """
    if candidate is None:
        return PROTOCOL_TLS

    if not isinstance(candidate, str):
        return candidate

    resolved = getattr(ssl, candidate, None)
    if resolved is None:
        # Retry with the abbreviation expanded to the full constant name.
        resolved = getattr(ssl, "PROTOCOL_" + candidate)
    return typing.cast(int, resolved)

216 

217 

def create_urllib3_context(
    ssl_version: int | None = None,
    cert_reqs: int | None = None,
    options: int | None = None,
    ciphers: str | None = None,
    ssl_minimum_version: int | None = None,
    ssl_maximum_version: int | None = None,
) -> ssl.SSLContext:
    """Creates and configures an :class:`ssl.SSLContext` instance for use with urllib3.

    The context itself is always created with ``PROTOCOL_TLS_CLIENT``; the
    version arguments only influence its minimum/maximum TLS version.

    :param ssl_version:
        The desired protocol version to use. ``None``, ``PROTOCOL_TLS`` and
        ``PROTOCOL_TLS_CLIENT`` leave the version range untouched. Any other
        value is translated to a minimum/maximum version pair and triggers a
        ``DeprecationWarning``.

        This parameter is deprecated instead use 'ssl_minimum_version'.
    :param ssl_minimum_version:
        The minimum version of TLS to be used. Use the 'ssl.TLSVersion' enum for specifying the value.
        When left as ``None`` the minimum is set to TLS 1.2.
    :param ssl_maximum_version:
        The maximum version of TLS to be used. Use the 'ssl.TLSVersion' enum for specifying the value.
        Not recommended to set to anything other than 'ssl.TLSVersion.MAXIMUM_SUPPORTED'.
        When left as ``None`` the context's own maximum is not changed.
    :param cert_reqs:
        Whether to require the certificate verification. This defaults to
        ``ssl.CERT_REQUIRED``.
    :param options:
        Specific OpenSSL options. These default to ``ssl.OP_NO_SSLv2``,
        ``ssl.OP_NO_SSLv3``, ``ssl.OP_NO_COMPRESSION``, and ``ssl.OP_NO_TICKET``.
        They are OR-ed into the options the new context already has.
    :param ciphers:
        Which cipher suites to allow the server to select. When empty or
        ``None``, ``set_ciphers`` is not called and the context keeps whatever
        cipher configuration it was created with.
    :returns:
        Constructed SSLContext object with specified options
    :rtype: SSLContext
    :raises TypeError:
        If the ``ssl`` module could not be imported (``SSLContext`` is None).
    :raises ValueError:
        If an exact ``ssl_version`` is combined with ``ssl_minimum_version``
        or ``ssl_maximum_version``.
    """
    if SSLContext is None:
        raise TypeError("Can't create an SSLContext object without an ssl module")

    # This means 'ssl_version' was specified as an exact value.
    if ssl_version not in (None, PROTOCOL_TLS, PROTOCOL_TLS_CLIENT):
        # Disallow setting 'ssl_version' and 'ssl_minimum|maximum_version'
        # to avoid conflicts.
        if ssl_minimum_version is not None or ssl_maximum_version is not None:
            raise ValueError(
                "Can't specify both 'ssl_version' and either "
                "'ssl_minimum_version' or 'ssl_maximum_version'"
            )

        # 'ssl_version' is deprecated and will be removed in the future.
        else:
            # Use 'ssl_minimum_version' and 'ssl_maximum_version' instead.
            # A protocol constant with no entry in the mapping falls back to
            # the widest supported range.
            ssl_minimum_version = _SSL_VERSION_TO_TLS_VERSION.get(
                ssl_version, TLSVersion.MINIMUM_SUPPORTED
            )
            ssl_maximum_version = _SSL_VERSION_TO_TLS_VERSION.get(
                ssl_version, TLSVersion.MAXIMUM_SUPPORTED
            )

            # This warning message is pushing users to use 'ssl_minimum_version'
            # instead of both min/max. Best practice is to only set the minimum version and
            # keep the maximum version at its default value: 'TLSVersion.MAXIMUM_SUPPORTED'
            warnings.warn(
                "'ssl_version' option is deprecated and will be "
                "removed in urllib3 v2.1.0. Instead use 'ssl_minimum_version'",
                category=DeprecationWarning,
                stacklevel=2,
            )

    # PROTOCOL_TLS is deprecated in Python 3.10 so we always use PROTOCOL_TLS_CLIENT
    context = SSLContext(PROTOCOL_TLS_CLIENT)

    if ssl_minimum_version is not None:
        context.minimum_version = ssl_minimum_version
    else:  # Python <3.10 defaults to 'MINIMUM_SUPPORTED' so explicitly set TLSv1.2 here
        context.minimum_version = TLSVersion.TLSv1_2

    if ssl_maximum_version is not None:
        context.maximum_version = ssl_maximum_version

    # Only touch the cipher list when the caller supplied one; otherwise the
    # context keeps the configuration it was created with.
    if ciphers:
        context.set_ciphers(ciphers)

    # Setting the default here, as we may have no ssl module on import
    cert_reqs = ssl.CERT_REQUIRED if cert_reqs is None else cert_reqs

    if options is None:
        options = 0
        # SSLv2 is easily broken and is considered harmful and dangerous
        options |= OP_NO_SSLv2
        # SSLv3 has several problems and is now dangerous
        options |= OP_NO_SSLv3
        # Disable compression to prevent CRIME attacks for OpenSSL 1.0+
        # (issue #309)
        options |= OP_NO_COMPRESSION
        # TLSv1.2 only. Unless set explicitly, do not request tickets.
        # This may save some bandwidth on wire, and although the ticket is encrypted,
        # there is a risk associated with it being on wire,
        # if the server is not rotating its ticketing keys properly.
        options |= OP_NO_TICKET

    # OR-ed in, so options the context already had enabled stay enabled.
    context.options |= options

    # Enable post-handshake authentication for TLS 1.3, see GH #1634. PHA is
    # necessary for conditional client cert authentication with TLS 1.3.
    # The attribute is None for OpenSSL <= 1.1.0 or does not exist in older
    # versions of Python. We only enable on Python 3.7.4+ or if certificate
    # verification is enabled to work around Python issue #37428
    # See: https://bugs.python.org/issue37428
    if (cert_reqs == ssl.CERT_REQUIRED or sys.version_info >= (3, 7, 4)) and getattr(
        context, "post_handshake_auth", None
    ) is not None:
        context.post_handshake_auth = True

    # The order of the below lines setting verify_mode and check_hostname
    # matter due to safe-guards SSLContext has to prevent an SSLContext with
    # check_hostname=True, verify_mode=NONE/OPTIONAL.
    # We always set 'check_hostname=False' for pyOpenSSL so we rely on our own
    # 'ssl.match_hostname()' implementation.
    if cert_reqs == ssl.CERT_REQUIRED and not IS_PYOPENSSL:
        context.verify_mode = cert_reqs
        context.check_hostname = True
    else:
        context.check_hostname = False
        context.verify_mode = cert_reqs

    # Best effort: contexts that do not allow setting this attribute are
    # left as they are.
    try:
        context.hostname_checks_common_name = False
    except AttributeError:  # Defensive: for CPython < 3.8.9 and 3.9.3; for PyPy < 7.3.8
        pass

    # Enable logging of TLS session keys via defacto standard environment variable
    # 'SSLKEYLOGFILE', if the feature is available (Python 3.8+). Skip empty values.
    if hasattr(context, "keylog_filename"):
        sslkeylogfile = os.environ.get("SSLKEYLOGFILE")
        if sslkeylogfile:
            context.keylog_filename = sslkeylogfile

    return context

358 

359 

@typing.overload
def ssl_wrap_socket(
    sock: socket.socket,
    keyfile: str | None = ...,
    certfile: str | None = ...,
    cert_reqs: int | None = ...,
    ca_certs: str | None = ...,
    server_hostname: str | None = ...,
    ssl_version: int | None = ...,
    ciphers: str | None = ...,
    ssl_context: ssl.SSLContext | None = ...,
    ca_cert_dir: str | None = ...,
    key_password: str | None = ...,
    ca_cert_data: None | str | bytes = ...,
    tls_in_tls: Literal[False] = ...,
) -> ssl.SSLSocket:
    ...


@typing.overload
def ssl_wrap_socket(
    sock: socket.socket,
    keyfile: str | None = ...,
    certfile: str | None = ...,
    cert_reqs: int | None = ...,
    ca_certs: str | None = ...,
    server_hostname: str | None = ...,
    ssl_version: int | None = ...,
    ciphers: str | None = ...,
    ssl_context: ssl.SSLContext | None = ...,
    ca_cert_dir: str | None = ...,
    key_password: str | None = ...,
    ca_cert_data: None | str | bytes = ...,
    tls_in_tls: bool = ...,
) -> ssl.SSLSocket | SSLTransportType:
    ...


def ssl_wrap_socket(
    sock: socket.socket,
    keyfile: str | None = None,
    certfile: str | None = None,
    cert_reqs: int | None = None,
    ca_certs: str | None = None,
    server_hostname: str | None = None,
    ssl_version: int | None = None,
    ciphers: str | None = None,
    ssl_context: ssl.SSLContext | None = None,
    ca_cert_dir: str | None = None,
    key_password: str | None = None,
    ca_cert_data: None | str | bytes = None,
    tls_in_tls: bool = False,
) -> ssl.SSLSocket | SSLTransportType:
    """
    Wrap ``sock`` in TLS, building and configuring a context as needed.

    :param sock:
        The socket to wrap.
    :param keyfile:
        Path to the client private key, passed to ``load_cert_chain``.
    :param certfile:
        Path to the client certificate; the chain is only loaded when set.
    :param cert_reqs:
        Only used when no ``ssl_context`` is given; forwarded to
        :func:`create_urllib3_context`.
    :param ca_certs:
        CA bundle file, passed to ``load_verify_locations``.
    :param server_hostname:
        When SNI is supported, the expected hostname of the certificate
    :param ssl_version:
        Only used when no ``ssl_context`` is given; forwarded to
        :func:`create_urllib3_context`.
    :param ciphers:
        A string of ciphers we wish the client to support. Only used when no
        ``ssl_context`` is given.
    :param ssl_context:
        A pre-made :class:`SSLContext` object. If none is provided, one will
        be created using :func:`create_urllib3_context`.
    :param ca_cert_dir:
        A directory containing CA certificates in multiple separate files, as
        supported by OpenSSL's -CApath flag or the capath argument to
        SSLContext.load_verify_locations().
    :param key_password:
        Optional password if the keyfile is encrypted.
    :param ca_cert_data:
        Optional string containing CA certificates in PEM format suitable for
        passing as the cadata parameter to SSLContext.load_verify_locations()
    :param tls_in_tls:
        Use SSLTransport to wrap the existing socket.
    :raises SSLError:
        If loading the CA locations fails with ``OSError``, or the key file
        is encrypted and no password was supplied.
    """
    made_context_here = ssl_context is None
    # Note: Building the context here (and the arguments feeding it) is only
    # used in tests. We should consider deprecating and removing this code.
    context = (
        create_urllib3_context(ssl_version, cert_reqs, ciphers=ciphers)
        if made_context_here
        else ssl_context
    )

    if ca_certs or ca_cert_dir or ca_cert_data:
        try:
            context.load_verify_locations(ca_certs, ca_cert_dir, ca_cert_data)
        except OSError as exc:
            raise SSLError(exc) from exc
    elif made_context_here and hasattr(context, "load_default_certs"):
        # try to load OS default certs; works well on Windows.
        context.load_default_certs()

    # Attempt to detect if we get the goofy behavior of the
    # keyfile being encrypted and OpenSSL asking for the
    # passphrase via the terminal and instead error out.
    password_missing = keyfile and key_password is None
    if password_missing and _is_key_file_encrypted(keyfile):
        raise SSLError("Client private key is encrypted, password is required")

    if certfile:
        # Pass the password positionally only when one was actually given.
        chain_args = [certfile, keyfile]
        if key_password is not None:
            chain_args.append(key_password)
        context.load_cert_chain(*chain_args)

    try:
        context.set_alpn_protocols(ALPN_PROTOCOLS)
    except NotImplementedError:  # Defensive: in CI, we always have set_alpn_protocols
        pass

    return _ssl_wrap_socket_impl(sock, context, tls_in_tls, server_hostname)

471 

472 

def is_ipaddress(hostname: str | bytes) -> bool:
    """Report whether ``hostname`` is an IPv4 or IPv6 address literal.

    IPv6 addresses carrying a Zone ID are recognised too.

    :param hostname: Hostname to examine; bytes are decoded as ASCII first.
    :return: True if the hostname is an IP address, False otherwise.
    """
    # IDN A-label bytes are ASCII compatible.
    text = hostname.decode("ascii") if isinstance(hostname, bytes) else hostname

    if _IPV4_RE.match(text):
        return True
    return bool(_BRACELESS_IPV6_ADDRZ_RE.match(text))

484 

485 

486def _is_key_file_encrypted(key_file: str) -> bool: 

487 """Detects if a key file is encrypted or not.""" 

488 with open(key_file) as f: 

489 for line in f: 

490 # Look for Proc-Type: 4,ENCRYPTED 

491 if "ENCRYPTED" in line: 

492 return True 

493 

494 return False 

495 

496 

497def _ssl_wrap_socket_impl( 

498 sock: socket.socket, 

499 ssl_context: ssl.SSLContext, 

500 tls_in_tls: bool, 

501 server_hostname: str | None = None, 

502) -> ssl.SSLSocket | SSLTransportType: 

503 if tls_in_tls: 

504 if not SSLTransport: 

505 # Import error, ssl is not available. 

506 raise ProxySchemeUnsupported( 

507 "TLS in TLS requires support for the 'ssl' module" 

508 ) 

509 

510 SSLTransport._validate_ssl_context_for_tls_in_tls(ssl_context) 

511 return SSLTransport(sock, ssl_context, server_hostname) 

512 

513 return ssl_context.wrap_socket(sock, server_hostname=server_hostname)