Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/urllib3/util/ssl_.py: 26%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

170 statements  

1from __future__ import annotations 

2 

3import hashlib 

4import hmac 

5import os 

6import socket 

7import sys 

8import typing 

9import warnings 

10from binascii import unhexlify 

11 

12from ..exceptions import ProxySchemeUnsupported, SSLError 

13from .url import _BRACELESS_IPV6_ADDRZ_RE, _IPV4_RE 

14 

# Module-level defaults. SSLContext, SSLTransport and
# HAS_NEVER_CHECK_COMMON_NAME are rebound further down in this module when the
# ``ssl`` module can be imported.
SSLContext = None
SSLTransport = None
HAS_NEVER_CHECK_COMMON_NAME = False
IS_PYOPENSSL = False
ALPN_PROTOCOLS = ["http/1.1"]

_TYPE_VERSION_INFO = tuple[int, int, int, str, int]

# Keyed by the length of a hex digest string; the value is the hashlib
# constructor that yields a digest of that length, or None when hashlib does
# not expose that algorithm.
HASHFUNC_MAP = {
    32: getattr(hashlib, "md5", None),
    40: getattr(hashlib, "sha1", None),
    64: getattr(hashlib, "sha256", None),
}

28 

29 

30def _is_has_never_check_common_name_reliable( 

31 openssl_version: str, 

32) -> bool: 

33 # As of May 2023, all released versions of LibreSSL fail to reject certificates with 

34 # only common names, see https://github.com/urllib3/urllib3/pull/3024 

35 is_openssl = openssl_version.startswith("OpenSSL ") 

36 

37 return is_openssl 

38 

39 

# This branch is only taken by static type checkers: ``typing.TYPE_CHECKING``
# is False at runtime, so none of the names below are bound when the module
# actually executes.
if typing.TYPE_CHECKING:
    from ssl import VerifyMode
    from typing import TypedDict

    from .ssltransport import SSLTransport as SSLTransportType

    # Typed view of a peer-certificate dictionary. ``total=False`` means every
    # key is optional.
    # NOTE(review): presumably mirrors the dict form returned by
    # ``SSLSocket.getpeercert()`` -- confirm against callers.
    class _TYPE_PEER_CERT_RET_DICT(TypedDict, total=False):
        subjectAltName: tuple[tuple[str, str], ...]
        subject: tuple[tuple[tuple[str, str], ...], ...]
        serialNumber: str


# Mapping from 'ssl.PROTOCOL_TLSX' to 'TLSVersion.X'
# Starts out empty; entries are added further down once ``ssl`` has been
# imported successfully.
_SSL_VERSION_TO_TLS_VERSION: dict[int, int] = {}

54 

# Bind everything this module needs from ``ssl``. If any import inside the
# ``try`` body raises ImportError, the ``except`` branch binds plain integer
# stand-ins for the option / protocol / verify-flag names instead, so that
# later references to those names still resolve. ``SSLContext`` then keeps
# its module-level default of None.
try:  # Do we have ssl at all?
    import ssl
    from ssl import (  # type: ignore[assignment]
        CERT_REQUIRED,
        HAS_NEVER_CHECK_COMMON_NAME,
        OP_NO_COMPRESSION,
        OP_NO_TICKET,
        OPENSSL_VERSION,
        PROTOCOL_TLS,
        PROTOCOL_TLS_CLIENT,
        VERIFY_X509_PARTIAL_CHAIN,
        VERIFY_X509_STRICT,
        OP_NO_SSLv2,
        OP_NO_SSLv3,
        SSLContext,
        TLSVersion,
    )

    # Legacy alias: both names refer to the same protocol constant.
    PROTOCOL_SSLv23 = PROTOCOL_TLS

    # Setting SSLContext.hostname_checks_common_name = False didn't work with
    # LibreSSL, check details in the used function.
    if HAS_NEVER_CHECK_COMMON_NAME and not _is_has_never_check_common_name_reliable(
        OPENSSL_VERSION,
    ):  # Defensive:
        HAS_NEVER_CHECK_COMMON_NAME = False

    # Need to be careful here in case old TLS versions get
    # removed in future 'ssl' module implementations.
    for attr in ("TLSv1", "TLSv1_1", "TLSv1_2"):
        try:
            _SSL_VERSION_TO_TLS_VERSION[getattr(ssl, f"PROTOCOL_{attr}")] = getattr(
                TLSVersion, attr
            )
        except AttributeError:  # Defensive:
            # Either the PROTOCOL_* constant or the TLSVersion member is
            # missing; leave that version out of the mapping.
            continue

    # Imported last, so a failure here also lands in the fallback branch.
    from .ssltransport import SSLTransport  # type: ignore[assignment]
except ImportError:
    # Hard-coded integer replacements for the constants imported above.
    OP_NO_COMPRESSION = 0x20000  # type: ignore[assignment, misc]
    OP_NO_TICKET = 0x4000  # type: ignore[assignment, misc]
    OP_NO_SSLv2 = 0x1000000  # type: ignore[assignment, misc]
    OP_NO_SSLv3 = 0x2000000  # type: ignore[assignment, misc]
    PROTOCOL_SSLv23 = PROTOCOL_TLS = 2  # type: ignore[assignment, misc]
    PROTOCOL_TLS_CLIENT = 16  # type: ignore[assignment, misc]
    VERIFY_X509_PARTIAL_CHAIN = 0x80000  # type: ignore[assignment,misc]
    VERIFY_X509_STRICT = 0x20  # type: ignore[assignment, misc]


# Type alias: a peer-certificate dict, raw bytes, or None. The dict type is
# given as a string because it is only defined under ``typing.TYPE_CHECKING``.
_TYPE_PEER_CERT_RET = typing.Union["_TYPE_PEER_CERT_RET_DICT", bytes, None]

105 

106 

def assert_fingerprint(cert: bytes | None, fingerprint: str) -> None:
    """
    Verify that the digest of ``cert`` equals the expected ``fingerprint``.

    The hash algorithm is chosen from the fingerprint's length (after colons
    are removed) via ``HASHFUNC_MAP``.

    :param cert:
        Certificate as bytes object.
    :param fingerprint:
        Fingerprint as string of hexdigits, can be interspersed by colons.
    :raises SSLError:
        If ``cert`` is None, the fingerprint length matches no known digest,
        the matching hash function is unavailable, or the digests differ.
    """
    if cert is None:
        raise SSLError("No certificate for the peer.")

    # Normalise the user-supplied value: drop colon separators, lower-case.
    fingerprint = fingerprint.replace(":", "").lower()
    digest_length = len(fingerprint)
    if digest_length not in HASHFUNC_MAP:
        raise SSLError(f"Fingerprint of invalid length: {fingerprint}")

    # The key exists, but its value is None when hashlib lacks the algorithm.
    hashfunc = HASHFUNC_MAP[digest_length]
    if hashfunc is None:
        raise SSLError(
            f"Hash function implementation unavailable for fingerprint length: {digest_length}"
        )

    expected_digest = unhexlify(fingerprint.encode())
    actual_digest = hashfunc(cert).digest()

    # hmac.compare_digest is used instead of ``==`` for the comparison.
    if hmac.compare_digest(actual_digest, expected_digest):
        return

    raise SSLError(
        f'Fingerprints did not match. Expected "{fingerprint}", got "{actual_digest.hex()}"'
    )

139 

140 

def resolve_cert_reqs(candidate: None | int | str) -> VerifyMode:
    """
    Turn ``candidate`` into the numeric constant expected by the
    wrap_socket function/method of the :mod:`ssl` module.

    * ``None`` resolves to :data:`ssl.CERT_REQUIRED`.
    * A string is looked up as an attribute of the :mod:`ssl` module, first
      verbatim and then with a ``CERT_`` prefix (so ``REQUIRED`` may be given
      instead of ``CERT_REQUIRED``).
    * Anything else is assumed to already be the numeric constant and is
      returned unchanged.
    """
    if candidate is None:
        return CERT_REQUIRED

    if not isinstance(candidate, str):
        return candidate  # type: ignore[return-value]

    resolved = getattr(ssl, candidate, None)
    if resolved is not None:
        return resolved  # type: ignore[no-any-return]
    # Fall back to the abbreviated spelling; raises AttributeError if unknown.
    return getattr(ssl, "CERT_" + candidate)  # type: ignore[no-any-return]

162 

163 

def resolve_ssl_version(candidate: None | int | str) -> int:
    """
    Turn ``candidate`` into a numeric ``ssl.PROTOCOL_*`` constant.

    * ``None`` resolves to ``PROTOCOL_TLS``.
    * A string is looked up as an attribute of the :mod:`ssl` module, first
      verbatim and then with a ``PROTOCOL_`` prefix.
    * Anything else is returned unchanged.
    """
    if candidate is None:
        return PROTOCOL_TLS

    if not isinstance(candidate, str):
        return candidate

    resolved = getattr(ssl, candidate, None)
    if resolved is None:
        # Fall back to the abbreviated spelling; raises AttributeError if unknown.
        resolved = getattr(ssl, "PROTOCOL_" + candidate)
    return typing.cast(int, resolved)

178 

179 

def create_urllib3_context(
    ssl_version: int | None = None,
    cert_reqs: int | None = None,
    options: int | None = None,
    ciphers: str | None = None,
    ssl_minimum_version: int | None = None,
    ssl_maximum_version: int | None = None,
    verify_flags: int | None = None,
) -> ssl.SSLContext:
    """Build an :class:`ssl.SSLContext` configured the way urllib3 wants it.

    :param ssl_version:
        The desired protocol version to use. This will default to
        PROTOCOL_SSLv23 which will negotiate the highest protocol that both
        the server and your installation of OpenSSL support.

        Deprecated: use 'ssl_minimum_version' instead. Passing an exact
        protocol version emits a ``FutureWarning``.
    :param ssl_minimum_version:
        The minimum version of TLS to be used. Use the 'ssl.TLSVersion' enum
        for specifying the value. When omitted, TLSv1.2 is used.
    :param ssl_maximum_version:
        The maximum version of TLS to be used. Use the 'ssl.TLSVersion' enum
        for specifying the value. Not recommended to set to anything other
        than 'ssl.TLSVersion.MAXIMUM_SUPPORTED' which is the default value.
    :param cert_reqs:
        Whether to require the certificate verification. This defaults to
        ``ssl.CERT_REQUIRED``.
    :param options:
        Specific OpenSSL options. These default to ``ssl.OP_NO_SSLv2``,
        ``ssl.OP_NO_SSLv3``, ``ssl.OP_NO_COMPRESSION``, and ``ssl.OP_NO_TICKET``.
    :param ciphers:
        Which cipher suites to allow the server to select. Defaults to either
        system configured ciphers if OpenSSL 1.1.1+, otherwise uses a secure
        default set of ciphers.
    :param verify_flags:
        The flags for certificate verification operations. These default to
        ``ssl.VERIFY_X509_PARTIAL_CHAIN`` and ``ssl.VERIFY_X509_STRICT`` for
        Python 3.13+.
    :returns:
        Constructed SSLContext object with specified options
    :rtype: SSLContext
    :raises TypeError:
        If the ``ssl`` module could not be imported.
    :raises ValueError:
        If an exact ``ssl_version`` is combined with ``ssl_minimum_version``
        or ``ssl_maximum_version``.
    """
    if SSLContext is None:
        raise TypeError("Can't create an SSLContext object without an ssl module")

    # Anything other than the "negotiate" protocols means the caller pinned an
    # exact protocol version through the deprecated 'ssl_version' argument.
    exact_version_requested = ssl_version not in (
        None,
        PROTOCOL_TLS,
        PROTOCOL_TLS_CLIENT,
    )
    if exact_version_requested:
        # Mixing the old and new ways of selecting a version is ambiguous.
        if not (ssl_minimum_version is None and ssl_maximum_version is None):
            raise ValueError(
                "Can't specify both 'ssl_version' and either "
                "'ssl_minimum_version' or 'ssl_maximum_version'"
            )

        # Translate the pinned protocol into a min/max pair; protocols that are
        # not in the mapping fall back to the widest supported range.
        ssl_minimum_version = _SSL_VERSION_TO_TLS_VERSION.get(
            ssl_version, TLSVersion.MINIMUM_SUPPORTED
        )
        ssl_maximum_version = _SSL_VERSION_TO_TLS_VERSION.get(
            ssl_version, TLSVersion.MAXIMUM_SUPPORTED
        )

        # The message only advertises 'ssl_minimum_version': best practice is
        # to set the minimum and leave the maximum at its default,
        # 'TLSVersion.MAXIMUM_SUPPORTED'.
        warnings.warn(
            "'ssl_version' option is deprecated and will be "
            "removed in urllib3 v3.0. Instead use 'ssl_minimum_version'",
            category=FutureWarning,
            stacklevel=2,
        )

    context = SSLContext(PROTOCOL_TLS_CLIENT)

    # pyOpenSSL defaults to 'MINIMUM_SUPPORTED' so explicitly set TLSv1.2 when
    # the caller expressed no preference.
    context.minimum_version = (
        TLSVersion.TLSv1_2 if ssl_minimum_version is None else ssl_minimum_version
    )
    if ssl_maximum_version is not None:
        context.maximum_version = ssl_maximum_version

    # Unless we're given ciphers defer to either system ciphers in
    # the case of OpenSSL 1.1.1+ or use our own secure default ciphers.
    if ciphers:
        context.set_ciphers(ciphers)

    # The default is resolved here rather than in the signature, as we may
    # have no ssl module on import.
    if cert_reqs is None:
        cert_reqs = ssl.CERT_REQUIRED

    if options is None:
        options = (
            0
            # SSLv2 is easily broken and is considered harmful and dangerous
            | OP_NO_SSLv2
            # SSLv3 has several problems and is now dangerous
            | OP_NO_SSLv3
            # Disable compression to prevent CRIME attacks for OpenSSL 1.0+
            # (issue #309)
            | OP_NO_COMPRESSION
            # TLSv1.2 only. Unless set explicitly, do not request tickets.
            # This may save some bandwidth on wire, and although the ticket is
            # encrypted, there is a risk associated with it being on wire,
            # if the server is not rotating its ticketing keys properly.
            | OP_NO_TICKET
        )
    context.options |= options

    if verify_flags is None:
        verify_flags = 0
        # In Python 3.13+ ssl.create_default_context() sets
        # VERIFY_X509_PARTIAL_CHAIN and VERIFY_X509_STRICT so we do the same
        if sys.version_info >= (3, 13):
            verify_flags |= VERIFY_X509_PARTIAL_CHAIN | VERIFY_X509_STRICT
    context.verify_flags |= verify_flags

    # Enable post-handshake authentication for TLS 1.3, see GH #1634. PHA is
    # necessary for conditional client cert authentication with TLS 1.3.
    # The attribute is None for OpenSSL <= 1.1.0 or does not exist when using
    # an SSLContext created by pyOpenSSL.
    if getattr(context, "post_handshake_auth", None) is not None:
        context.post_handshake_auth = True

    # The order in which verify_mode and check_hostname are assigned matters:
    # SSLContext refuses check_hostname=True together with
    # verify_mode=NONE/OPTIONAL. For pyOpenSSL check_hostname is always False
    # and we rely on our own 'ssl.match_hostname()' implementation.
    if IS_PYOPENSSL or cert_reqs != ssl.CERT_REQUIRED:
        context.check_hostname = False
        context.verify_mode = cert_reqs
    else:
        context.verify_mode = cert_reqs
        context.check_hostname = True

    context.hostname_checks_common_name = False

    # Honour SSLKEYLOGFILE (with environment variables in the path expanded)
    # when it is set to a non-empty value.
    keylog_path = os.environ.get("SSLKEYLOGFILE")
    if keylog_path is not None:
        keylog_path = os.path.expandvars(keylog_path)
    if keylog_path:
        context.keylog_filename = keylog_path

    return context

325 

326 

@typing.overload
def ssl_wrap_socket(
    sock: socket.socket,
    keyfile: str | None = ...,
    certfile: str | None = ...,
    cert_reqs: int | None = ...,
    ca_certs: str | None = ...,
    server_hostname: str | None = ...,
    ssl_version: int | None = ...,
    ciphers: str | None = ...,
    ssl_context: ssl.SSLContext | None = ...,
    ca_cert_dir: str | None = ...,
    key_password: str | None = ...,
    ca_cert_data: None | str | bytes = ...,
    tls_in_tls: typing.Literal[False] = ...,
) -> ssl.SSLSocket: ...


@typing.overload
def ssl_wrap_socket(
    sock: socket.socket,
    keyfile: str | None = ...,
    certfile: str | None = ...,
    cert_reqs: int | None = ...,
    ca_certs: str | None = ...,
    server_hostname: str | None = ...,
    ssl_version: int | None = ...,
    ciphers: str | None = ...,
    ssl_context: ssl.SSLContext | None = ...,
    ca_cert_dir: str | None = ...,
    key_password: str | None = ...,
    ca_cert_data: None | str | bytes = ...,
    tls_in_tls: bool = ...,
) -> ssl.SSLSocket | SSLTransportType: ...


def ssl_wrap_socket(
    sock: socket.socket,
    keyfile: str | None = None,
    certfile: str | None = None,
    cert_reqs: int | None = None,
    ca_certs: str | None = None,
    server_hostname: str | None = None,
    ssl_version: int | None = None,
    ciphers: str | None = None,
    ssl_context: ssl.SSLContext | None = None,
    ca_cert_dir: str | None = None,
    key_password: str | None = None,
    ca_cert_data: None | str | bytes = None,
    tls_in_tls: bool = False,
) -> ssl.SSLSocket | SSLTransportType:
    """
    Wrap ``sock`` in TLS, building an SSL context first if none is supplied.

    All arguments except for server_hostname, ssl_context, tls_in_tls,
    ca_cert_data and ca_cert_dir have the same meaning as they do when using
    :func:`ssl.create_default_context`, :meth:`ssl.SSLContext.load_cert_chain`,
    :meth:`ssl.SSLContext.set_ciphers` and :meth:`ssl.SSLContext.wrap_socket`.

    :param server_hostname:
        When SNI is supported, the expected hostname of the certificate
    :param ssl_context:
        A pre-made :class:`SSLContext` object. If none is provided, one will
        be created using :func:`create_urllib3_context`.
    :param ciphers:
        A string of ciphers we wish the client to support.
    :param ca_cert_dir:
        A directory containing CA certificates in multiple separate files, as
        supported by OpenSSL's -CApath flag or the capath argument to
        SSLContext.load_verify_locations().
    :param key_password:
        Optional password if the keyfile is encrypted.
    :param ca_cert_data:
        Optional string containing CA certificates in PEM format suitable for
        passing as the cadata parameter to SSLContext.load_verify_locations()
    :param tls_in_tls:
        Use SSLTransport to wrap the existing socket.
    :raises SSLError:
        If loading the CA locations fails with ``OSError``, or if the key file
        looks encrypted and no ``key_password`` was given.
    """
    context_was_supplied = ssl_context is not None
    if context_was_supplied:
        context = ssl_context
    else:
        # Note: This branch of code and all the variables in it are only used
        # in tests. We should consider deprecating and removing this code.
        context = create_urllib3_context(ssl_version, cert_reqs, ciphers=ciphers)

    if ca_certs or ca_cert_dir or ca_cert_data:
        try:
            context.load_verify_locations(ca_certs, ca_cert_dir, ca_cert_data)
        except OSError as e:
            raise SSLError(e) from e
    elif not context_was_supplied and hasattr(context, "load_default_certs"):
        # try to load OS default certs; works well on Windows.
        context.load_default_certs()

    # Attempt to detect if we get the goofy behavior of the
    # keyfile being encrypted and OpenSSL asking for the
    # passphrase via the terminal and instead error out.
    if keyfile and key_password is None and _is_key_file_encrypted(keyfile):
        raise SSLError("Client private key is encrypted, password is required")

    if certfile:
        # Only forward the password argument when one was actually provided.
        if key_password is None:
            chain_args = (certfile, keyfile)
        else:
            chain_args = (certfile, keyfile, key_password)
        context.load_cert_chain(*chain_args)

    context.set_alpn_protocols(ALPN_PROTOCOLS)

    return _ssl_wrap_socket_impl(sock, context, tls_in_tls, server_hostname)

435 

436 

def is_ipaddress(hostname: str | bytes) -> bool:
    """Report whether ``hostname`` is an IPv4 or IPv6 address literal.

    IPv6 addresses carrying a Zone ID are recognised as well.

    :param str hostname: Hostname to examine.
    :return: True if the hostname is an IP address, False otherwise.
    """
    # IDN A-label bytes are ASCII compatible.
    text = hostname.decode("ascii") if isinstance(hostname, bytes) else hostname
    if _IPV4_RE.match(text):
        return True
    return _BRACELESS_IPV6_ADDRZ_RE.match(text) is not None

448 

449 

450def _is_key_file_encrypted(key_file: str) -> bool: 

451 """Detects if a key file is encrypted or not.""" 

452 with open(key_file) as f: 

453 for line in f: 

454 # Look for Proc-Type: 4,ENCRYPTED 

455 if "ENCRYPTED" in line: 

456 return True 

457 

458 return False 

459 

460 

461def _ssl_wrap_socket_impl( 

462 sock: socket.socket, 

463 ssl_context: ssl.SSLContext, 

464 tls_in_tls: bool, 

465 server_hostname: str | None = None, 

466) -> ssl.SSLSocket | SSLTransportType: 

467 if tls_in_tls: 

468 if not SSLTransport: 

469 # Import error, ssl is not available. 

470 raise ProxySchemeUnsupported( 

471 "TLS in TLS requires support for the 'ssl' module" 

472 ) 

473 

474 SSLTransport._validate_ssl_context_for_tls_in_tls(ssl_context) 

475 return SSLTransport(sock, ssl_context, server_hostname) 

476 

477 return ssl_context.wrap_socket(sock, server_hostname=server_hostname)