Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/urllib3/contrib/pyopenssl.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

271 statements  

1""" 

2Module for using pyOpenSSL as a TLS backend. This module was relevant before 

3the standard library ``ssl`` module supported SNI, but now that we've dropped 

4support for Python 2.7 all relevant Python versions support SNI so 

5**this module is no longer recommended**. 

6 

7This needs the following packages installed: 

8 

9* `pyOpenSSL`_ (tested with 19.0.0) 

10* `cryptography`_ (minimum 2.3, from pyopenssl) 

11* `idna`_ (minimum 2.1, from cryptography) 

12 

13However, pyOpenSSL depends on cryptography, so while we use all three directly here we 

14end up having relatively few packages required. 

15 

16You can install them with the following command: 

17 

18.. code-block:: bash 

19 

20 $ python -m pip install pyopenssl cryptography idna 

21 

22To activate certificate checking, call 

23:func:`~urllib3.contrib.pyopenssl.inject_into_urllib3` from your Python code 

24before you begin making HTTP requests. This can be done in a ``sitecustomize`` 

25module, or at any other time before your application begins using ``urllib3``, 

26like this: 

27 

28.. code-block:: python 

29 

30 try: 

31 import urllib3.contrib.pyopenssl 

32 urllib3.contrib.pyopenssl.inject_into_urllib3() 

33 except ImportError: 

34 pass 

35 

36.. _pyopenssl: https://www.pyopenssl.org 

37.. _cryptography: https://cryptography.io 

38.. _idna: https://github.com/kjd/idna 

39""" 

40 

41from __future__ import annotations 

42 

43import OpenSSL.SSL # type: ignore[import-not-found] 

44from cryptography import x509 

45 

46try: 

47 from cryptography.x509 import UnsupportedExtension # type: ignore[attr-defined] 

48except ImportError: 

49 # UnsupportedExtension is gone in cryptography >= 2.1.0 

50 class UnsupportedExtension(Exception): # type: ignore[no-redef] 

51 pass 

52 

53 

54import logging 

55import ssl 

56import typing 

57from io import BytesIO 

58from socket import socket as socket_cls 

59 

60from .. import util 

61 

62if typing.TYPE_CHECKING: 

63 from OpenSSL.crypto import X509 # type: ignore[import-not-found] 

64 

65 

__all__ = ["inject_into_urllib3", "extract_from_urllib3"]

# Lookup table translating urllib3/stdlib protocol constants into the
# method constants PyOpenSSL expects.
_openssl_versions: dict[int, int] = {
    util.ssl_.PROTOCOL_TLS: OpenSSL.SSL.SSLv23_METHOD,  # type: ignore[attr-defined]
    util.ssl_.PROTOCOL_TLS_CLIENT: OpenSSL.SSL.SSLv23_METHOD,  # type: ignore[attr-defined]
    ssl.PROTOCOL_TLSv1: OpenSSL.SSL.TLSv1_METHOD,
}

# TLS 1.1 and TLS 1.2 are registered only when both libraries expose them.
if hasattr(OpenSSL.SSL, "TLSv1_1_METHOD") and hasattr(ssl, "PROTOCOL_TLSv1_1"):
    _openssl_versions[ssl.PROTOCOL_TLSv1_1] = OpenSSL.SSL.TLSv1_1_METHOD

if hasattr(OpenSSL.SSL, "TLSv1_2_METHOD") and hasattr(ssl, "PROTOCOL_TLSv1_2"):
    _openssl_versions[ssl.PROTOCOL_TLSv1_2] = OpenSSL.SSL.TLSv1_2_METHOD


# stdlib certificate-verification modes -> PyOpenSSL verify flags, plus the
# inverse mapping used when reading the mode back from a context.
_stdlib_to_openssl_verify = {
    ssl.CERT_NONE: OpenSSL.SSL.VERIFY_NONE,
    ssl.CERT_OPTIONAL: OpenSSL.SSL.VERIFY_PEER,
    ssl.CERT_REQUIRED: (
        OpenSSL.SSL.VERIFY_PEER + OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT
    ),
}
_openssl_to_stdlib_verify = {
    openssl_mode: stdlib_mode
    for stdlib_mode, openssl_mode in _stdlib_to_openssl_verify.items()
}

# Any OP_NO_* flag that this PyOpenSSL build does not define falls back to 0,
# which makes it a no-op when OR-ed into an options mask. The SSLvX values are
# the most likely to be missing in the future, but all of them are looked up
# defensively.
_OP_NO_SSLv2_OR_SSLv3: int = (
    getattr(OpenSSL.SSL, "OP_NO_SSLv2", 0) | getattr(OpenSSL.SSL, "OP_NO_SSLv3", 0)
)
_OP_NO_TLSv1: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1", 0)
_OP_NO_TLSv1_1: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_1", 0)
_OP_NO_TLSv1_2: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_2", 0)
_OP_NO_TLSv1_3: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_3", 0)

# Keyed by ``ssl.TLSVersion``: the OP_NO_* mask that switches off every
# protocol older than the requested minimum.
_openssl_to_ssl_minimum_version: dict[int, int] = {
    ssl.TLSVersion.MINIMUM_SUPPORTED: _OP_NO_SSLv2_OR_SSLv3,
    ssl.TLSVersion.TLSv1: _OP_NO_SSLv2_OR_SSLv3,
    ssl.TLSVersion.TLSv1_1: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1,
    ssl.TLSVersion.TLSv1_2: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1 | _OP_NO_TLSv1_1,
    ssl.TLSVersion.TLSv1_3: (
        _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1 | _OP_NO_TLSv1_1 | _OP_NO_TLSv1_2
    ),
    ssl.TLSVersion.MAXIMUM_SUPPORTED: (
        _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1 | _OP_NO_TLSv1_1 | _OP_NO_TLSv1_2
    ),
}

# Keyed by ``ssl.TLSVersion``: the OP_NO_* mask applied for the requested
# maximum (protocols newer than the maximum are switched off).
_openssl_to_ssl_maximum_version: dict[int, int] = {
    ssl.TLSVersion.MINIMUM_SUPPORTED: (
        _OP_NO_SSLv2_OR_SSLv3
        | _OP_NO_TLSv1
        | _OP_NO_TLSv1_1
        | _OP_NO_TLSv1_2
        | _OP_NO_TLSv1_3
    ),
    ssl.TLSVersion.TLSv1: (
        _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_1 | _OP_NO_TLSv1_2 | _OP_NO_TLSv1_3
    ),
    ssl.TLSVersion.TLSv1_1: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_2 | _OP_NO_TLSv1_3,
    ssl.TLSVersion.TLSv1_2: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_3,
    ssl.TLSVersion.TLSv1_3: _OP_NO_SSLv2_OR_SSLv3,
    ssl.TLSVersion.MAXIMUM_SUPPORTED: _OP_NO_SSLv2_OR_SSLv3,
}

# OpenSSL will only write 16K at a time
SSL_WRITE_BLOCKSIZE = 16384

# Remember the context class that was installed at import time so that
# extract_from_urllib3() can put it back.
orig_util_SSLContext = util.ssl_.SSLContext


log = logging.getLogger(__name__)

136 

137 

def inject_into_urllib3() -> None:
    """Monkey-patch urllib3 with PyOpenSSL-backed SSL-support.

    Dependencies are validated first, so an ``ImportError`` raised by that
    check leaves urllib3 untouched. Afterwards ``SSLContext`` is replaced by
    :class:`PyOpenSSLContext` and ``IS_PYOPENSSL`` is set to ``True`` on both
    ``util`` and ``util.ssl_``.
    """
    _validate_dependencies_met()

    # Patch the ``util`` package namespace ...
    util.SSLContext = PyOpenSSLContext  # type: ignore[assignment]
    util.IS_PYOPENSSL = True

    # ... and the ``util.ssl_`` module namespace.
    util.ssl_.SSLContext = PyOpenSSLContext  # type: ignore[assignment]
    util.ssl_.IS_PYOPENSSL = True

147 

148 

def extract_from_urllib3() -> None:
    """Undo monkey-patching by :func:`inject_into_urllib3`.

    Restores the ``SSLContext`` class captured in ``orig_util_SSLContext`` and
    clears the ``IS_PYOPENSSL`` flag on both ``util`` and ``util.ssl_``.
    """
    # Restore the ``util`` package namespace ...
    util.SSLContext = orig_util_SSLContext
    util.IS_PYOPENSSL = False

    # ... and the ``util.ssl_`` module namespace.
    util.ssl_.SSLContext = orig_util_SSLContext
    util.ssl_.IS_PYOPENSSL = False

156 

157 

def _validate_dependencies_met() -> None:
    """
    Check that the installed ``cryptography`` and ``pyOpenSSL`` packages offer
    the functionality this module relies on.

    Raises ``ImportError`` if either probe below fails.
    """
    # Method added in `cryptography==1.1`; not available in older versions
    from cryptography.x509.extensions import Extensions

    extension_lookup = getattr(Extensions, "get_extension_for_class", None)
    if extension_lookup is None:
        raise ImportError(
            "'cryptography' module missing required functionality. "
            "Try upgrading to v1.3.4 or newer."
        )

    # pyOpenSSL 0.14 and above use cryptography for OpenSSL bindings. The _x509
    # attribute is only present on those versions.
    from OpenSSL.crypto import X509

    probe_cert = X509()
    backing_object = getattr(probe_cert, "_x509", None)
    if backing_object is None:
        raise ImportError(
            "'pyOpenSSL' module missing required functionality. "
            "Try upgrading to v0.14 or newer."
        )

182 

183 

184def _dnsname_to_stdlib(name: str) -> str | None: 

185 """ 

186 Converts a dNSName SubjectAlternativeName field to the form used by the 

187 standard library on the given Python version. 

188 

189 Cryptography produces a dNSName as a unicode string that was idna-decoded 

190 from ASCII bytes. We need to idna-encode that string to get it back, and 

191 then on Python 3 we also need to convert to unicode via UTF-8 (the stdlib 

192 uses PyUnicode_FromStringAndSize on it, which decodes via UTF-8). 

193 

194 If the name cannot be idna-encoded then we return None signalling that 

195 the name given should be skipped. 

196 """ 

197 

198 def idna_encode(name: str) -> bytes | None: 

199 """ 

200 Borrowed wholesale from the Python Cryptography Project. It turns out 

201 that we can't just safely call `idna.encode`: it can explode for 

202 wildcard names. This avoids that problem. 

203 """ 

204 import idna 

205 

206 try: 

207 for prefix in ["*.", "."]: 

208 if name.startswith(prefix): 

209 name = name[len(prefix) :] 

210 return prefix.encode("ascii") + idna.encode(name) 

211 return idna.encode(name) 

212 except idna.core.IDNAError: 

213 return None 

214 

215 # Don't send IPv6 addresses through the IDNA encoder. 

216 if ":" in name: 

217 return name 

218 

219 encoded_name = idna_encode(name) 

220 if encoded_name is None: 

221 return None 

222 return encoded_name.decode("utf-8") 

223 

224 

def get_subj_alt_name(peer_cert: X509) -> list[tuple[str, str]]:
    """
    Given a PyOpenSSL certificate, provide all the subject alternative names.

    The result is a list of ``("DNS", name)`` and ``("IP Address", address)``
    tuples, DNS entries first. An empty list is returned when the certificate
    has no SAN extension or when the extension cannot be read.
    """
    cert = peer_cert.to_cryptography()

    # Let Cryptography locate the SAN extension (it's faster than looping in
    # Python).
    try:
        san = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName).value
    except x509.ExtensionNotFound:
        # No such extension, return the empty list.
        return []
    except (
        x509.DuplicateExtension,
        UnsupportedExtension,
        x509.UnsupportedGeneralNameType,
        UnicodeError,
    ) as e:
        # A problem has been found with the quality of the certificate. Assume
        # no SAN field is present.
        log.warning(
            "A problem was encountered with the certificate that prevented "
            "urllib3 from finding the SubjectAlternativeName field. This can "
            "affect certificate validation. The error was %s",
            e,
        )
        return []

    # Only dNSName and iPAddress entries are reported. DNS names have to be
    # idna-encoded and then UTF-8 decoded to mirror what the standard library
    # does with certificates; names that cannot be idna-encoded are skipped.
    names: list[tuple[str, str]] = []
    for dns_name in san.get_values_for_type(x509.DNSName):
        stdlib_name = _dnsname_to_stdlib(dns_name)
        if stdlib_name is not None:
            names.append(("DNS", stdlib_name))

    # IPs are cast back to strings because match_hostname wants them as
    # strings.
    for address in san.get_values_for_type(x509.IPAddress):
        names.append(("IP Address", str(address)))

    return names

271 

272 

class WrappedSocket:
    """API-compatibility wrapper for Python OpenSSL's Connection-class.

    Data is read and written through the PyOpenSSL ``connection``; the plain
    ``socket`` is used for ``fileno()``, timeouts and readiness waits.
    """

    def __init__(
        self,
        connection: OpenSSL.SSL.Connection,
        socket: socket_cls,
        suppress_ragged_eofs: bool = True,
    ) -> None:
        self.connection = connection
        self.socket = socket
        # When true, an "Unexpected EOF" syscall error on read is reported as
        # an ordinary end-of-stream instead of raising.
        self.suppress_ragged_eofs = suppress_ragged_eofs
        # close() only closes the connection once this counter is <= 0.
        self._io_refs = 0
        self._closed = False

    def fileno(self) -> int:
        """Return the file descriptor of the underlying socket."""
        return self.socket.fileno()

    # Copy-pasted from Python 3.5 source code
    def _decref_socketios(self) -> None:
        """Drop one I/O reference and finish a pending ``close()`` if any."""
        if self._io_refs > 0:
            self._io_refs -= 1
        if self._closed:
            self.close()

    def recv(self, *args: typing.Any, **kwargs: typing.Any) -> bytes:
        """Receive bytes from the TLS connection.

        Returns ``b""`` on a clean TLS shutdown, or on a ragged EOF when
        ``suppress_ragged_eofs`` is set. Raises ``TimeoutError`` if the socket
        does not become readable within its timeout, ``OSError`` for other
        syscall errors and ``ssl.SSLError`` for other OpenSSL errors.
        """
        # Retry in a loop (not by recursion) so that any number of consecutive
        # WantReadError wake-ups cannot exhaust the interpreter stack.
        while True:
            try:
                data = self.connection.recv(*args, **kwargs)
            except OpenSSL.SSL.SysCallError as e:
                if self.suppress_ragged_eofs and e.args == (-1, "Unexpected EOF"):
                    return b""
                raise OSError(e.args[0], str(e)) from e
            except OpenSSL.SSL.ZeroReturnError:
                if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN:
                    return b""
                raise
            except OpenSSL.SSL.WantReadError as e:
                if not util.wait_for_read(self.socket, self.socket.gettimeout()):
                    raise TimeoutError("The read operation timed out") from e
                # Socket is readable again: go around and retry the read.

            # TLS 1.3 post-handshake authentication
            except OpenSSL.SSL.Error as e:
                raise ssl.SSLError(f"read error: {e!r}") from e
            else:
                return data  # type: ignore[no-any-return]

    def recv_into(self, *args: typing.Any, **kwargs: typing.Any) -> int:
        """Receive bytes from the TLS connection into a caller-supplied buffer.

        Returns ``0`` on a clean TLS shutdown, or on a ragged EOF when
        ``suppress_ragged_eofs`` is set. Error translation matches
        :meth:`recv`.
        """
        # Same iterative retry strategy as recv().
        while True:
            try:
                return self.connection.recv_into(*args, **kwargs)  # type: ignore[no-any-return]
            except OpenSSL.SSL.SysCallError as e:
                if self.suppress_ragged_eofs and e.args == (-1, "Unexpected EOF"):
                    return 0
                raise OSError(e.args[0], str(e)) from e
            except OpenSSL.SSL.ZeroReturnError:
                if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN:
                    return 0
                raise
            except OpenSSL.SSL.WantReadError as e:
                if not util.wait_for_read(self.socket, self.socket.gettimeout()):
                    raise TimeoutError("The read operation timed out") from e
                # Socket is readable again: go around and retry the read.

            # TLS 1.3 post-handshake authentication
            except OpenSSL.SSL.Error as e:
                raise ssl.SSLError(f"read error: {e!r}") from e

    def settimeout(self, timeout: float) -> None:
        """Set the timeout on the underlying socket."""
        return self.socket.settimeout(timeout)

    def _send_until_done(self, data: bytes) -> int:
        """Send ``data`` once, waiting for writability as needed.

        Returns the number of bytes the connection reports as sent. Raises
        ``TimeoutError`` if the socket does not become writable in time and
        ``OSError`` for syscall errors.
        """
        while True:
            try:
                return self.connection.send(data)  # type: ignore[no-any-return]
            except OpenSSL.SSL.WantWriteError as e:
                if not util.wait_for_write(self.socket, self.socket.gettimeout()):
                    raise TimeoutError() from e
                continue
            except OpenSSL.SSL.SysCallError as e:
                raise OSError(e.args[0], str(e)) from e

    def sendall(self, data: bytes) -> None:
        """Send all of ``data``, at most ``SSL_WRITE_BLOCKSIZE`` bytes per write."""
        total_sent = 0
        while total_sent < len(data):
            sent = self._send_until_done(
                data[total_sent : total_sent + SSL_WRITE_BLOCKSIZE]
            )
            total_sent += sent

    def shutdown(self, how: int) -> None:
        """Shut down the TLS connection.

        ``how`` is accepted for socket-API compatibility but is not used.
        Raises ``ssl.SSLError`` if OpenSSL reports an error.
        """
        try:
            self.connection.shutdown()
        except OpenSSL.SSL.Error as e:
            raise ssl.SSLError(f"shutdown error: {e!r}") from e

    def close(self) -> None:
        """Mark the socket closed; really close once no I/O references remain."""
        self._closed = True
        if self._io_refs <= 0:
            self._real_close()

    def _real_close(self) -> None:
        """Close the connection, ignoring OpenSSL errors raised while closing."""
        try:
            return self.connection.close()  # type: ignore[no-any-return]
        except OpenSSL.SSL.Error:
            return

    def getpeercert(
        self, binary_form: bool = False
    ) -> dict[str, list[typing.Any]] | None:
        """Return information about the peer certificate.

        If the connection has no peer certificate, the falsy value obtained
        from it is returned unchanged. With ``binary_form`` the certificate is
        returned dumped as ``FILETYPE_ASN1``; otherwise a dict with the
        ``subject`` common name and the ``subjectAltName`` entries is built.
        """
        peer_cert = self.connection.get_peer_certificate()

        if not peer_cert:
            return peer_cert  # type: ignore[no-any-return]

        if binary_form:
            return OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, peer_cert)  # type: ignore[no-any-return]

        return {
            "subject": ((("commonName", peer_cert.get_subject().CN),),),  # type: ignore[dict-item]
            "subjectAltName": get_subj_alt_name(peer_cert),
        }

    def version(self) -> str:
        """Return the negotiated protocol version name."""
        return self.connection.get_protocol_version_name()  # type: ignore[no-any-return]

    def selected_alpn_protocol(self) -> str | None:
        """Return the negotiated ALPN protocol as text, or ``None`` if none."""
        alpn_proto = self.connection.get_alpn_proto_negotiated()
        return alpn_proto.decode() if alpn_proto else None


# Borrow the stdlib socket's makefile() implementation for the wrapper.
WrappedSocket.makefile = socket_cls.makefile  # type: ignore[attr-defined]

410 

411 

class PyOpenSSLContext:
    """
    I am a wrapper class for the PyOpenSSL ``Context`` object. I am responsible
    for translating the interface of the standard library ``SSLContext`` object
    to calls into PyOpenSSL.
    """

    def __init__(self, protocol: int) -> None:
        # ``protocol`` is a urllib3/stdlib constant; a KeyError is raised here
        # if it has no PyOpenSSL equivalent in ``_openssl_versions``.
        self.protocol = _openssl_versions[protocol]
        self._ctx = OpenSSL.SSL.Context(self.protocol)
        self._options = 0
        self.check_hostname = False
        self._minimum_version: int = ssl.TLSVersion.MINIMUM_SUPPORTED
        self._maximum_version: int = ssl.TLSVersion.MAXIMUM_SUPPORTED
        self._verify_flags: int = ssl.VERIFY_X509_TRUSTED_FIRST

    @property
    def options(self) -> int:
        """Option bits set by the caller (without the version masks)."""
        return self._options

    @options.setter
    def options(self, value: int) -> None:
        self._options = value
        self._set_ctx_options()

    @property
    def verify_flags(self) -> int:
        """Flags applied to the context's certificate store."""
        return self._verify_flags

    @verify_flags.setter
    def verify_flags(self, value: int) -> None:
        self._verify_flags = value
        self._ctx.get_cert_store().set_flags(self._verify_flags)

    @property
    def verify_mode(self) -> int:
        """The stdlib ``ssl.CERT_*`` mode matching the context's verify mode."""
        return _openssl_to_stdlib_verify[self._ctx.get_verify_mode()]

    @verify_mode.setter
    def verify_mode(self, value: ssl.VerifyMode) -> None:
        self._ctx.set_verify(_stdlib_to_openssl_verify[value], _verify_callback)

    def set_default_verify_paths(self) -> None:
        """Delegate to the PyOpenSSL context's ``set_default_verify_paths``."""
        self._ctx.set_default_verify_paths()

    def set_ciphers(self, ciphers: bytes | str) -> None:
        """Set the cipher list; text is UTF-8 encoded before being passed on."""
        if isinstance(ciphers, str):
            ciphers = ciphers.encode("utf-8")
        self._ctx.set_cipher_list(ciphers)

    def load_verify_locations(
        self,
        cafile: str | None = None,
        capath: str | None = None,
        cadata: bytes | None = None,
    ) -> None:
        """Load trusted CA certificates into the context.

        ``cafile`` and ``capath`` are UTF-8 encoded when given. Any
        ``OpenSSL.SSL.Error`` is re-raised as ``ssl.SSLError``.
        """
        if cafile is not None:
            cafile = cafile.encode("utf-8")  # type: ignore[assignment]
        if capath is not None:
            capath = capath.encode("utf-8")  # type: ignore[assignment]
        try:
            self._ctx.load_verify_locations(cafile, capath)
            if cadata is not None:
                self._ctx.load_verify_locations(BytesIO(cadata))
        except OpenSSL.SSL.Error as e:
            raise ssl.SSLError(f"unable to load trusted certificates: {e!r}") from e

    def load_cert_chain(
        self,
        certfile: str,
        keyfile: str | None = None,
        password: str | None = None,
    ) -> None:
        """Load a certificate chain and its private key.

        The private key is read from ``keyfile``, or from ``certfile`` when no
        ``keyfile`` is given. A non-bytes ``password`` is UTF-8 encoded and
        supplied through a passphrase callback. Any ``OpenSSL.SSL.Error`` is
        re-raised as ``ssl.SSLError``.
        """
        try:
            self._ctx.use_certificate_chain_file(certfile)
            if password is not None:
                if not isinstance(password, bytes):
                    password = password.encode("utf-8")  # type: ignore[assignment]
                self._ctx.set_passwd_cb(lambda *_: password)
            self._ctx.use_privatekey_file(keyfile or certfile)
        except OpenSSL.SSL.Error as e:
            raise ssl.SSLError(f"Unable to load certificate chain: {e!r}") from e

    def set_alpn_protocols(self, protocols: list[bytes | str]) -> None:
        """Set the ALPN protocols, converting each entry to ASCII bytes."""
        protocols = [util.util.to_bytes(p, "ascii") for p in protocols]
        return self._ctx.set_alpn_protos(protocols)  # type: ignore[no-any-return]

    def wrap_socket(
        self,
        sock: socket_cls,
        server_side: bool = False,
        do_handshake_on_connect: bool = True,
        suppress_ragged_eofs: bool = True,
        server_hostname: bytes | str | None = None,
    ) -> WrappedSocket:
        """Wrap ``sock`` in a TLS client connection and perform the handshake.

        ``server_side`` and ``do_handshake_on_connect`` are accepted for
        signature compatibility but are not consulted: the connection is
        always put in connect state and the handshake always runs here.
        ``suppress_ragged_eofs`` is passed on to the returned
        :class:`WrappedSocket`.

        Raises ``TimeoutError`` if the socket does not become readable during
        the handshake and ``ssl.SSLError`` for other OpenSSL errors.
        """
        cnx = OpenSSL.SSL.Connection(self._ctx, sock)

        # If server_hostname is an IP, don't use it for SNI, per RFC6066 Section 3
        if server_hostname and not util.ssl_.is_ipaddress(server_hostname):
            if isinstance(server_hostname, str):
                server_hostname = server_hostname.encode("utf-8")
            cnx.set_tlsext_host_name(server_hostname)

        cnx.set_connect_state()

        while True:
            try:
                cnx.do_handshake()
            except OpenSSL.SSL.WantReadError as e:
                if not util.wait_for_read(sock, sock.gettimeout()):
                    raise TimeoutError("select timed out") from e
                continue
            except OpenSSL.SSL.Error as e:
                raise ssl.SSLError(f"bad handshake: {e!r}") from e
            break

        # Forward the caller's choice; previously the argument was dropped and
        # the wrapper always used its own default of True.
        return WrappedSocket(cnx, sock, suppress_ragged_eofs)

    def _set_ctx_options(self) -> None:
        """Push the caller's options plus both version masks to the context."""
        self._ctx.set_options(
            self._options
            | _openssl_to_ssl_minimum_version[self._minimum_version]
            | _openssl_to_ssl_maximum_version[self._maximum_version]
        )

    @property
    def minimum_version(self) -> int:
        """Lowest TLS version allowed, as an ``ssl.TLSVersion`` value."""
        return self._minimum_version

    @minimum_version.setter
    def minimum_version(self, minimum_version: int) -> None:
        self._minimum_version = minimum_version
        self._set_ctx_options()

    @property
    def maximum_version(self) -> int:
        """Highest TLS version allowed, as an ``ssl.TLSVersion`` value."""
        return self._maximum_version

    @maximum_version.setter
    def maximum_version(self, maximum_version: int) -> None:
        self._maximum_version = maximum_version
        self._set_ctx_options()

554 

555 

556def _verify_callback( 

557 cnx: OpenSSL.SSL.Connection, 

558 x509: X509, 

559 err_no: int, 

560 err_depth: int, 

561 return_code: int, 

562) -> bool: 

563 return err_no == 0