Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/urllib3/util/ssl_.py: 45%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

181 statements  

1from __future__ import annotations 

2 

3import hashlib 

4import hmac 

5import os 

6import socket 

7import sys 

8import typing 

9import warnings 

10from binascii import unhexlify 

11 

12from ..exceptions import ProxySchemeUnsupported, SSLError 

13from .url import _BRACELESS_IPV6_ADDRZ_RE, _IPV4_RE 

14 

# Placeholders used when the ``ssl`` module cannot be imported; the import
# block further down rebinds them when ``ssl`` is available.
SSLContext = None
SSLTransport = None
HAS_NEVER_CHECK_COMMON_NAME = False
# Consulted by create_urllib3_context() before enabling 'check_hostname'.
# NOTE(review): presumably switched on by the pyOpenSSL integration, which is
# not visible in this file -- confirm.
IS_PYOPENSSL = False
# Protocols offered through ALPN by ssl_wrap_socket().
ALPN_PROTOCOLS = ["http/1.1"]

# Shape of 'sys.version_info' / 'sys.pypy_version_info'.
_TYPE_VERSION_INFO = tuple[int, int, int, str, int]

# Keyed by the number of hex digits in a fingerprint; the value is the hashlib
# constructor producing a digest of that size, or None when this interpreter's
# hashlib does not expose the algorithm.
HASHFUNC_MAP = {
    32: getattr(hashlib, "md5", None),
    40: getattr(hashlib, "sha1", None),
    64: getattr(hashlib, "sha256", None),
}

28 

29 

30def _is_bpo_43522_fixed( 

31 implementation_name: str, 

32 version_info: _TYPE_VERSION_INFO, 

33 pypy_version_info: _TYPE_VERSION_INFO | None, 

34) -> bool: 

35 """Return True for CPython 3.9.3+ or 3.10+ and PyPy 7.3.8+ where 

36 setting SSLContext.hostname_checks_common_name to False works. 

37 

38 Outside of CPython and PyPy we don't know which implementations work 

39 or not so we conservatively use our hostname matching as we know that works 

40 on all implementations. 

41 

42 https://github.com/urllib3/urllib3/issues/2192#issuecomment-821832963 

43 https://foss.heptapod.net/pypy/pypy/-/issues/3539 

44 """ 

45 if implementation_name == "pypy": 

46 # https://foss.heptapod.net/pypy/pypy/-/issues/3129 

47 return pypy_version_info >= (7, 3, 8) # type: ignore[operator] 

48 elif implementation_name == "cpython": 

49 major_minor = version_info[:2] 

50 micro = version_info[2] 

51 return (major_minor == (3, 9) and micro >= 3) or major_minor >= (3, 10) 

52 else: # Defensive: 

53 return False 

54 

55 

56def _is_has_never_check_common_name_reliable( 

57 openssl_version: str, 

58 openssl_version_number: int, 

59 implementation_name: str, 

60 version_info: _TYPE_VERSION_INFO, 

61 pypy_version_info: _TYPE_VERSION_INFO | None, 

62) -> bool: 

63 # As of May 2023, all released versions of LibreSSL fail to reject certificates with 

64 # only common names, see https://github.com/urllib3/urllib3/pull/3024 

65 is_openssl = openssl_version.startswith("OpenSSL ") 

66 # Before fixing OpenSSL issue #14579, the SSL_new() API was not copying hostflags 

67 # like X509_CHECK_FLAG_NEVER_CHECK_SUBJECT, which tripped up CPython. 

68 # https://github.com/openssl/openssl/issues/14579 

69 # This was released in OpenSSL 1.1.1l+ (>=0x101010cf) 

70 is_openssl_issue_14579_fixed = openssl_version_number >= 0x101010CF 

71 

72 return is_openssl and ( 

73 is_openssl_issue_14579_fixed 

74 or _is_bpo_43522_fixed(implementation_name, version_info, pypy_version_info) 

75 ) 

76 

77 

# Everything in this block exists for static type checkers only: the names are
# referenced from annotations, and ``typing.TYPE_CHECKING`` is False at runtime.
if typing.TYPE_CHECKING:
    from ssl import VerifyMode
    from typing import TypedDict

    from .ssltransport import SSLTransport as SSLTransportType

    class _TYPE_PEER_CERT_RET_DICT(TypedDict, total=False):
        """Dictionary form of a peer certificate.

        ``total=False`` makes every key optional.
        NOTE(review): presumably mirrors the dict returned by
        ``ssl.SSLSocket.getpeercert()`` -- confirm against callers.
        """

        # Sequence of (kind, value) string pairs.
        subjectAltName: tuple[tuple[str, str], ...]
        # Sequence of groups, each a sequence of (name, value) string pairs.
        subject: tuple[tuple[tuple[str, str], ...], ...]
        serialNumber: str

88 

89 

# Mapping from 'ssl.PROTOCOL_TLSX' to 'TLSVersion.X'
# Starts out empty and is only populated below when 'ssl' can be imported.
_SSL_VERSION_TO_TLS_VERSION: dict[int, int] = {}

# Import the 'ssl' names this module relies on. If anything inside the 'try'
# raises ImportError (including the '.ssltransport' import at its end), fall
# back to hard-coded numeric values for the option/protocol/verify constants.
try:  # Do we have ssl at all?
    import ssl
    from ssl import (  # type: ignore[assignment]
        CERT_REQUIRED,
        HAS_NEVER_CHECK_COMMON_NAME,
        OP_NO_COMPRESSION,
        OP_NO_TICKET,
        OPENSSL_VERSION,
        OPENSSL_VERSION_NUMBER,
        PROTOCOL_TLS,
        PROTOCOL_TLS_CLIENT,
        VERIFY_X509_STRICT,
        OP_NO_SSLv2,
        OP_NO_SSLv3,
        SSLContext,
        TLSVersion,
    )

    # Alias kept under the older constant name.
    PROTOCOL_SSLv23 = PROTOCOL_TLS

    # Needed for Python 3.9 which does not define this
    VERIFY_X509_PARTIAL_CHAIN = getattr(ssl, "VERIFY_X509_PARTIAL_CHAIN", 0x80000)

    # Setting SSLContext.hostname_checks_common_name = False didn't work before CPython
    # 3.9.3, and 3.10 (but OK on PyPy) or OpenSSL 1.1.1l+
    # When the running combination is not known to be reliable, override the
    # flag imported from 'ssl' so the rest of the module sees False.
    if HAS_NEVER_CHECK_COMMON_NAME and not _is_has_never_check_common_name_reliable(
        OPENSSL_VERSION,
        OPENSSL_VERSION_NUMBER,
        sys.implementation.name,
        sys.version_info,
        sys.pypy_version_info if sys.implementation.name == "pypy" else None,  # type: ignore[attr-defined]
    ):  # Defensive: for Python < 3.9.3
        HAS_NEVER_CHECK_COMMON_NAME = False

    # Need to be careful here in case old TLS versions get
    # removed in future 'ssl' module implementations.
    for attr in ("TLSv1", "TLSv1_1", "TLSv1_2"):
        try:
            _SSL_VERSION_TO_TLS_VERSION[getattr(ssl, f"PROTOCOL_{attr}")] = getattr(
                TLSVersion, attr
            )
        except AttributeError:  # Defensive:
            # Either the PROTOCOL_* constant or the TLSVersion member is
            # missing; leave that protocol out of the mapping.
            continue

    from .ssltransport import SSLTransport  # type: ignore[assignment]
except ImportError:
    # Numeric stand-ins so module-level names still exist without 'ssl'.
    OP_NO_COMPRESSION = 0x20000  # type: ignore[assignment]
    OP_NO_TICKET = 0x4000  # type: ignore[assignment]
    OP_NO_SSLv2 = 0x1000000  # type: ignore[assignment]
    OP_NO_SSLv3 = 0x2000000  # type: ignore[assignment]
    PROTOCOL_SSLv23 = PROTOCOL_TLS = 2  # type: ignore[assignment]
    PROTOCOL_TLS_CLIENT = 16  # type: ignore[assignment]
    VERIFY_X509_PARTIAL_CHAIN = 0x80000
    VERIFY_X509_STRICT = 0x20  # type: ignore[assignment]

147 

148 

149_TYPE_PEER_CERT_RET = typing.Union["_TYPE_PEER_CERT_RET_DICT", bytes, None] 

150 

151 

def assert_fingerprint(cert: bytes | None, fingerprint: str) -> None:
    """
    Verify that a certificate hashes to the expected fingerprint.

    The hash function is picked from ``HASHFUNC_MAP`` by the number of hex
    digits in ``fingerprint`` once colons are removed.

    :param cert:
        Certificate as bytes object.
    :param fingerprint:
        Fingerprint as string of hexdigits, can be interspersed by colons.
    :raises SSLError:
        If there is no certificate, the fingerprint length is unknown, the
        matching hash function is unavailable, or the digests differ.
    """
    if cert is None:
        raise SSLError("No certificate for the peer.")

    # Normalise "AA:BB:..." into a plain lower-case hex string.
    fingerprint = fingerprint.replace(":", "").lower()
    digest_length = len(fingerprint)
    if digest_length not in HASHFUNC_MAP:
        raise SSLError(f"Fingerprint of invalid length: {fingerprint}")

    # A known length may still map to None when hashlib lacks the algorithm.
    hashfunc = HASHFUNC_MAP[digest_length]
    if hashfunc is None:
        raise SSLError(
            f"Hash function implementation unavailable for fingerprint length: {digest_length}"
        )

    fingerprint_bytes = unhexlify(fingerprint.encode())
    cert_digest = hashfunc(cert).digest()

    # Constant-time comparison of the computed and expected digests.
    if hmac.compare_digest(cert_digest, fingerprint_bytes):
        return

    raise SSLError(
        f'Fingerprints did not match. Expected "{fingerprint}", got "{cert_digest.hex()}"'
    )

184 

185 

def resolve_cert_reqs(candidate: None | int | str) -> VerifyMode:
    """
    Turn ``candidate`` into a certificate-requirements constant.

    ``None`` yields :data:`ssl.CERT_REQUIRED`. A string is looked up as an
    attribute of the :mod:`ssl` module, first verbatim and then with a
    ``CERT_`` prefix, so ``"REQUIRED"`` works as well as ``"CERT_REQUIRED"``.
    Any other value is assumed to already be the numeric constant and is
    returned unchanged.

    :raises AttributeError:
        If a string names no attribute of :mod:`ssl`, with or without the prefix.
    """
    if candidate is None:
        return CERT_REQUIRED

    if not isinstance(candidate, str):
        return candidate  # type: ignore[return-value]

    resolved = getattr(ssl, candidate, None)
    if resolved is not None:
        return resolved  # type: ignore[no-any-return]
    return getattr(ssl, "CERT_" + candidate)  # type: ignore[no-any-return]

207 

208 

def resolve_ssl_version(candidate: None | int | str) -> int:
    """
    Turn ``candidate`` into a protocol constant, like :func:`resolve_cert_reqs`.

    ``None`` yields ``PROTOCOL_TLS``. A string is looked up as an attribute of
    the :mod:`ssl` module, first verbatim and then with a ``PROTOCOL_``
    prefix. Any other value is returned unchanged.

    :raises AttributeError:
        If a string names no attribute of :mod:`ssl`, with or without the prefix.
    """
    if candidate is None:
        return PROTOCOL_TLS

    if not isinstance(candidate, str):
        return candidate

    resolved = getattr(ssl, candidate, None)
    if resolved is None:
        resolved = getattr(ssl, "PROTOCOL_" + candidate)
    return typing.cast(int, resolved)

223 

224 

def create_urllib3_context(
    ssl_version: int | None = None,
    cert_reqs: int | None = None,
    options: int | None = None,
    ciphers: str | None = None,
    ssl_minimum_version: int | None = None,
    ssl_maximum_version: int | None = None,
    verify_flags: int | None = None,
) -> ssl.SSLContext:
    """Build an :class:`ssl.SSLContext` carrying urllib3's default settings.

    :param ssl_version:
        Deprecated way to pin one exact protocol; use 'ssl_minimum_version'
        instead. Cannot be combined with 'ssl_minimum_version' or
        'ssl_maximum_version'. ``PROTOCOL_TLS`` and ``PROTOCOL_TLS_CLIENT``
        are treated the same as not passing it.
    :param ssl_minimum_version:
        The minimum version of TLS to be used. Use the 'ssl.TLSVersion' enum
        for specifying the value. Defaults to TLS 1.2.
    :param ssl_maximum_version:
        The maximum version of TLS to be used. Use the 'ssl.TLSVersion' enum
        for specifying the value. Not recommended to set to anything other
        than 'ssl.TLSVersion.MAXIMUM_SUPPORTED', which is what the context
        keeps when this is left as ``None``.
    :param cert_reqs:
        Whether to require the certificate verification. This defaults to
        ``ssl.CERT_REQUIRED``.
    :param options:
        Specific OpenSSL options, OR-ed into the context's existing options.
        These default to ``ssl.OP_NO_SSLv2``, ``ssl.OP_NO_SSLv3``,
        ``ssl.OP_NO_COMPRESSION``, and ``ssl.OP_NO_TICKET``.
    :param ciphers:
        Cipher string handed to ``SSLContext.set_ciphers``. When falsy the
        context's own cipher selection is left untouched.
    :param verify_flags:
        The flags for certificate verification operations, OR-ed into the
        context's existing flags. These default to
        ``ssl.VERIFY_X509_PARTIAL_CHAIN`` and ``ssl.VERIFY_X509_STRICT`` for
        Python 3.13+, and to no extra flags otherwise.
    :returns:
        Constructed SSLContext object with specified options
    :rtype: SSLContext
    :raises TypeError:
        If the ``ssl`` module could not be imported.
    :raises ValueError:
        If an exact 'ssl_version' is combined with a minimum/maximum version.
    """
    if SSLContext is None:
        raise TypeError("Can't create an SSLContext object without an ssl module")

    # Any value besides the generic protocols means 'ssl_version' was given as
    # an exact protocol.
    if ssl_version not in (None, PROTOCOL_TLS, PROTOCOL_TLS_CLIENT):
        # An exact protocol plus explicit bounds would conflict, so reject it.
        if not (ssl_minimum_version is None and ssl_maximum_version is None):
            raise ValueError(
                "Can't specify both 'ssl_version' and either "
                "'ssl_minimum_version' or 'ssl_maximum_version'"
            )

        # Express the deprecated exact protocol as a min/max pair. Protocols
        # missing from the mapping widen to the full supported range.
        ssl_minimum_version = _SSL_VERSION_TO_TLS_VERSION.get(
            ssl_version, TLSVersion.MINIMUM_SUPPORTED
        )
        ssl_maximum_version = _SSL_VERSION_TO_TLS_VERSION.get(
            ssl_version, TLSVersion.MAXIMUM_SUPPORTED
        )

        # The message only advertises 'ssl_minimum_version': best practice is
        # to set the minimum and leave the maximum at its default value.
        warnings.warn(
            "'ssl_version' option is deprecated and will be "
            "removed in urllib3 v2.6.0. Instead use 'ssl_minimum_version'",
            category=DeprecationWarning,
            stacklevel=2,
        )

    # PROTOCOL_TLS is deprecated in Python 3.10 so we always use PROTOCOL_TLS_CLIENT
    context = SSLContext(PROTOCOL_TLS_CLIENT)

    # Python <3.10 defaults to 'MINIMUM_SUPPORTED', so TLSv1.2 is set
    # explicitly whenever the caller gave no minimum.
    context.minimum_version = (
        TLSVersion.TLSv1_2 if ssl_minimum_version is None else ssl_minimum_version
    )
    if ssl_maximum_version is not None:
        context.maximum_version = ssl_maximum_version

    # Only touch the cipher list when the caller supplied one.
    if ciphers:
        context.set_ciphers(ciphers)

    # Resolved here rather than in the signature because the ssl module may be
    # missing at import time.
    if cert_reqs is None:
        cert_reqs = ssl.CERT_REQUIRED

    if options is None:
        # - SSLv2 is easily broken and is considered harmful and dangerous.
        # - SSLv3 has several problems and is now dangerous.
        # - Compression is disabled to prevent CRIME attacks for OpenSSL 1.0+
        #   (issue #309).
        # - TLSv1.2 only: unless set explicitly, do not request tickets. This
        #   may save some bandwidth on wire, and although the ticket is
        #   encrypted, there is a risk associated with it being on wire if the
        #   server is not rotating its ticketing keys properly.
        options = OP_NO_SSLv2 | OP_NO_SSLv3 | OP_NO_COMPRESSION | OP_NO_TICKET
    context.options |= options

    if verify_flags is None:
        # In Python 3.13+ ssl.create_default_context() sets
        # VERIFY_X509_PARTIAL_CHAIN and VERIFY_X509_STRICT so we do the same.
        if sys.version_info >= (3, 13):
            verify_flags = VERIFY_X509_PARTIAL_CHAIN | VERIFY_X509_STRICT
        else:
            verify_flags = 0
    context.verify_flags |= verify_flags

    # Enable post-handshake authentication for TLS 1.3, see GH #1634. PHA is
    # necessary for conditional client cert authentication with TLS 1.3.
    # The attribute is None for OpenSSL <= 1.1.0 or does not exist when using
    # an SSLContext created by pyOpenSSL.
    if getattr(context, "post_handshake_auth", None) is not None:
        context.post_handshake_auth = True

    # The assignment order in each branch matters: SSLContext refuses to be in
    # a state with check_hostname=True and verify_mode=NONE/OPTIONAL.
    # 'check_hostname' is always False for pyOpenSSL, where our own
    # 'ssl.match_hostname()' implementation is relied on instead.
    if cert_reqs == ssl.CERT_REQUIRED and not IS_PYOPENSSL:
        context.verify_mode = cert_reqs
        context.check_hostname = True
    else:
        context.check_hostname = False
        context.verify_mode = cert_reqs

    try:
        context.hostname_checks_common_name = False
    except AttributeError:  # Defensive: for CPython < 3.9.3; for PyPy < 7.3.8
        pass

    # Honour the SSLKEYLOGFILE environment variable when it is set and non-empty.
    keylog_path = os.environ.get("SSLKEYLOGFILE")
    if keylog_path:
        context.keylog_filename = keylog_path

    return context

372 

373 

@typing.overload
def ssl_wrap_socket(
    sock: socket.socket,
    keyfile: str | None = ...,
    certfile: str | None = ...,
    cert_reqs: int | None = ...,
    ca_certs: str | None = ...,
    server_hostname: str | None = ...,
    ssl_version: int | None = ...,
    ciphers: str | None = ...,
    ssl_context: ssl.SSLContext | None = ...,
    ca_cert_dir: str | None = ...,
    key_password: str | None = ...,
    ca_cert_data: None | str | bytes = ...,
    tls_in_tls: typing.Literal[False] = ...,
) -> ssl.SSLSocket: ...


@typing.overload
def ssl_wrap_socket(
    sock: socket.socket,
    keyfile: str | None = ...,
    certfile: str | None = ...,
    cert_reqs: int | None = ...,
    ca_certs: str | None = ...,
    server_hostname: str | None = ...,
    ssl_version: int | None = ...,
    ciphers: str | None = ...,
    ssl_context: ssl.SSLContext | None = ...,
    ca_cert_dir: str | None = ...,
    key_password: str | None = ...,
    ca_cert_data: None | str | bytes = ...,
    tls_in_tls: bool = ...,
) -> ssl.SSLSocket | SSLTransportType: ...


def ssl_wrap_socket(
    sock: socket.socket,
    keyfile: str | None = None,
    certfile: str | None = None,
    cert_reqs: int | None = None,
    ca_certs: str | None = None,
    server_hostname: str | None = None,
    ssl_version: int | None = None,
    ciphers: str | None = None,
    ssl_context: ssl.SSLContext | None = None,
    ca_cert_dir: str | None = None,
    key_password: str | None = None,
    ca_cert_data: None | str | bytes = None,
    tls_in_tls: bool = False,
) -> ssl.SSLSocket | SSLTransportType:
    """
    Wrap ``sock`` in TLS, configuring the context from the given arguments.

    All arguments except for server_hostname, ssl_context, tls_in_tls,
    ca_cert_data and ca_cert_dir have the same meaning as they do when using
    :func:`ssl.create_default_context`, :meth:`ssl.SSLContext.load_cert_chain`,
    :meth:`ssl.SSLContext.set_ciphers` and :meth:`ssl.SSLContext.wrap_socket`.

    :param server_hostname:
        When SNI is supported, the expected hostname of the certificate
    :param ssl_context:
        A pre-made :class:`SSLContext` object. If none is provided, one will
        be created using :func:`create_urllib3_context`.
    :param ciphers:
        A string of ciphers we wish the client to support.
    :param ca_cert_dir:
        A directory containing CA certificates in multiple separate files, as
        supported by OpenSSL's -CApath flag or the capath argument to
        SSLContext.load_verify_locations().
    :param key_password:
        Optional password if the keyfile is encrypted.
    :param ca_cert_data:
        Optional string containing CA certificates in PEM format suitable for
        passing as the cadata parameter to SSLContext.load_verify_locations()
    :param tls_in_tls:
        Use SSLTransport to wrap the existing socket.
    :raises SSLError:
        If loading the CA locations fails with ``OSError``, or if the key file
        is encrypted and no ``key_password`` was supplied.
    """
    context_was_supplied = ssl_context is not None
    if context_was_supplied:
        context = ssl_context
    else:
        # Note: This branch of code and all the variables in it are only used
        # in tests. We should consider deprecating and removing this code.
        context = create_urllib3_context(ssl_version, cert_reqs, ciphers=ciphers)

    if any((ca_certs, ca_cert_dir, ca_cert_data)):
        try:
            context.load_verify_locations(ca_certs, ca_cert_dir, ca_cert_data)
        except OSError as e:
            raise SSLError(e) from e
    elif not context_was_supplied and hasattr(context, "load_default_certs"):
        # try to load OS default certs; works well on Windows.
        context.load_default_certs()

    # An encrypted key without a password would make OpenSSL prompt for the
    # passphrase on the terminal; detect that and fail with an error instead.
    if keyfile and key_password is None and _is_key_file_encrypted(keyfile):
        raise SSLError("Client private key is encrypted, password is required")

    if certfile:
        # The password is passed positionally, and only when there is one.
        chain_args = (
            (certfile, keyfile)
            if key_password is None
            else (certfile, keyfile, key_password)
        )
        context.load_cert_chain(*chain_args)

    context.set_alpn_protocols(ALPN_PROTOCOLS)

    return _ssl_wrap_socket_impl(sock, context, tls_in_tls, server_hostname)

482 

483 

def is_ipaddress(hostname: str | bytes) -> bool:
    """Report whether ``hostname`` is an IPv4 or IPv6 address literal.

    IPv6 addresses carrying a Zone ID are recognised as well.

    :param str hostname: Hostname to examine.
    :return: True if the hostname is an IP address, False otherwise.
    """
    # IDN A-label bytes are ASCII compatible, so ASCII decoding is enough.
    text = hostname.decode("ascii") if isinstance(hostname, bytes) else hostname

    if _IPV4_RE.match(text):
        return True
    return _BRACELESS_IPV6_ADDRZ_RE.match(text) is not None

495 

496 

497def _is_key_file_encrypted(key_file: str) -> bool: 

498 """Detects if a key file is encrypted or not.""" 

499 with open(key_file) as f: 

500 for line in f: 

501 # Look for Proc-Type: 4,ENCRYPTED 

502 if "ENCRYPTED" in line: 

503 return True 

504 

505 return False 

506 

507 

508def _ssl_wrap_socket_impl( 

509 sock: socket.socket, 

510 ssl_context: ssl.SSLContext, 

511 tls_in_tls: bool, 

512 server_hostname: str | None = None, 

513) -> ssl.SSLSocket | SSLTransportType: 

514 if tls_in_tls: 

515 if not SSLTransport: 

516 # Import error, ssl is not available. 

517 raise ProxySchemeUnsupported( 

518 "TLS in TLS requires support for the 'ssl' module" 

519 ) 

520 

521 SSLTransport._validate_ssl_context_for_tls_in_tls(ssl_context) 

522 return SSLTransport(sock, ssl_context, server_hostname) 

523 

524 return ssl_context.wrap_socket(sock, server_hostname=server_hostname)