Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/urllib3/contrib/pyopenssl.py: 28%

257 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1""" 

2Module for using pyOpenSSL as a TLS backend. This module was relevant before 

3the standard library ``ssl`` module supported SNI, but now that we've dropped 

4support for Python 2.7 all relevant Python versions support SNI so 

5**this module is no longer recommended**. 

6 

7This needs the following packages installed: 

8 

9* `pyOpenSSL`_ (tested with 16.0.0) 

10* `cryptography`_ (minimum 1.3.4, from pyopenssl) 

11* `idna`_ (minimum 2.0, from cryptography) 

12 

13However, pyOpenSSL depends on cryptography, which depends on idna, so while we 

14use all three directly here we end up having relatively few packages required. 

15 

16You can install them with the following command: 

17 

18.. code-block:: bash 

19 

20 $ python -m pip install pyopenssl cryptography idna 

21 

22To activate certificate checking, call 

23:func:`~urllib3.contrib.pyopenssl.inject_into_urllib3` from your Python code 

24before you begin making HTTP requests. This can be done in a ``sitecustomize`` 

25module, or at any other time before your application begins using ``urllib3``, 

26like this: 

27 

28.. code-block:: python 

29 

30 try: 

31 import urllib3.contrib.pyopenssl 

32 urllib3.contrib.pyopenssl.inject_into_urllib3() 

33 except ImportError: 

34 pass 

35 

36.. _pyopenssl: https://www.pyopenssl.org 

37.. _cryptography: https://cryptography.io 

38.. _idna: https://github.com/kjd/idna 

39""" 

40 

41from __future__ import annotations 

42 

43import OpenSSL.SSL # type: ignore[import] 

44from cryptography import x509 

45 

46try: 

47 from cryptography.x509 import UnsupportedExtension # type: ignore[attr-defined] 

48except ImportError: 

49 # UnsupportedExtension is gone in cryptography >= 2.1.0 

50 class UnsupportedExtension(Exception): # type: ignore[no-redef] 

51 pass 

52 

53 

54import logging 

55import ssl 

56import typing 

57import warnings 

58from io import BytesIO 

59from socket import socket as socket_cls 

60from socket import timeout 

61 

62from .. import util 

63 

64warnings.warn( 

65 "'urllib3.contrib.pyopenssl' module is deprecated and will be removed " 

66 "in urllib3 v2.1.0. Read more in this issue: " 

67 "https://github.com/urllib3/urllib3/issues/2680", 

68 category=DeprecationWarning, 

69 stacklevel=2, 

70) 

71 

72if typing.TYPE_CHECKING: 

73 from OpenSSL.crypto import X509 # type: ignore[import] 

74 

75 

76__all__ = ["inject_into_urllib3", "extract_from_urllib3"] 

77 

78# Map from urllib3 to PyOpenSSL compatible parameter-values. 

79_openssl_versions = { 

80 util.ssl_.PROTOCOL_TLS: OpenSSL.SSL.SSLv23_METHOD, # type: ignore[attr-defined] 

81 util.ssl_.PROTOCOL_TLS_CLIENT: OpenSSL.SSL.SSLv23_METHOD, # type: ignore[attr-defined] 

82 ssl.PROTOCOL_TLSv1: OpenSSL.SSL.TLSv1_METHOD, 

83} 

84 

85if hasattr(ssl, "PROTOCOL_TLSv1_1") and hasattr(OpenSSL.SSL, "TLSv1_1_METHOD"): 

86 _openssl_versions[ssl.PROTOCOL_TLSv1_1] = OpenSSL.SSL.TLSv1_1_METHOD 

87 

88if hasattr(ssl, "PROTOCOL_TLSv1_2") and hasattr(OpenSSL.SSL, "TLSv1_2_METHOD"): 

89 _openssl_versions[ssl.PROTOCOL_TLSv1_2] = OpenSSL.SSL.TLSv1_2_METHOD 

90 

91 

92_stdlib_to_openssl_verify = { 

93 ssl.CERT_NONE: OpenSSL.SSL.VERIFY_NONE, 

94 ssl.CERT_OPTIONAL: OpenSSL.SSL.VERIFY_PEER, 

95 ssl.CERT_REQUIRED: OpenSSL.SSL.VERIFY_PEER 

96 + OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT, 

97} 

98_openssl_to_stdlib_verify = {v: k for k, v in _stdlib_to_openssl_verify.items()} 

99 

100# The SSLvX values are the most likely to be missing in the future 

101# but we check them all just to be sure. 

102_OP_NO_SSLv2_OR_SSLv3: int = getattr(OpenSSL.SSL, "OP_NO_SSLv2", 0) | getattr( 

103 OpenSSL.SSL, "OP_NO_SSLv3", 0 

104) 

105_OP_NO_TLSv1: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1", 0) 

106_OP_NO_TLSv1_1: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_1", 0) 

107_OP_NO_TLSv1_2: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_2", 0) 

108_OP_NO_TLSv1_3: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_3", 0) 

109 

110_openssl_to_ssl_minimum_version: dict[int, int] = { 

111 ssl.TLSVersion.MINIMUM_SUPPORTED: _OP_NO_SSLv2_OR_SSLv3, 

112 ssl.TLSVersion.TLSv1: _OP_NO_SSLv2_OR_SSLv3, 

113 ssl.TLSVersion.TLSv1_1: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1, 

114 ssl.TLSVersion.TLSv1_2: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1 | _OP_NO_TLSv1_1, 

115 ssl.TLSVersion.TLSv1_3: ( 

116 _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1 | _OP_NO_TLSv1_1 | _OP_NO_TLSv1_2 

117 ), 

118 ssl.TLSVersion.MAXIMUM_SUPPORTED: ( 

119 _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1 | _OP_NO_TLSv1_1 | _OP_NO_TLSv1_2 

120 ), 

121} 

122_openssl_to_ssl_maximum_version: dict[int, int] = { 

123 ssl.TLSVersion.MINIMUM_SUPPORTED: ( 

124 _OP_NO_SSLv2_OR_SSLv3 

125 | _OP_NO_TLSv1 

126 | _OP_NO_TLSv1_1 

127 | _OP_NO_TLSv1_2 

128 | _OP_NO_TLSv1_3 

129 ), 

130 ssl.TLSVersion.TLSv1: ( 

131 _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_1 | _OP_NO_TLSv1_2 | _OP_NO_TLSv1_3 

132 ), 

133 ssl.TLSVersion.TLSv1_1: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_2 | _OP_NO_TLSv1_3, 

134 ssl.TLSVersion.TLSv1_2: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_3, 

135 ssl.TLSVersion.TLSv1_3: _OP_NO_SSLv2_OR_SSLv3, 

136 ssl.TLSVersion.MAXIMUM_SUPPORTED: _OP_NO_SSLv2_OR_SSLv3, 

137} 

138 

139# OpenSSL will only write 16K at a time 

140SSL_WRITE_BLOCKSIZE = 16384 

141 

142orig_util_SSLContext = util.ssl_.SSLContext 

143 

144 

145log = logging.getLogger(__name__) 

146 

147 

148def inject_into_urllib3() -> None: 

149 "Monkey-patch urllib3 with PyOpenSSL-backed SSL-support." 

150 

151 _validate_dependencies_met() 

152 

153 util.SSLContext = PyOpenSSLContext # type: ignore[assignment] 

154 util.ssl_.SSLContext = PyOpenSSLContext # type: ignore[assignment] 

155 util.IS_PYOPENSSL = True 

156 util.ssl_.IS_PYOPENSSL = True 

157 

158 

159def extract_from_urllib3() -> None: 

160 "Undo monkey-patching by :func:`inject_into_urllib3`." 

161 

162 util.SSLContext = orig_util_SSLContext 

163 util.ssl_.SSLContext = orig_util_SSLContext 

164 util.IS_PYOPENSSL = False 

165 util.ssl_.IS_PYOPENSSL = False 

166 

167 

168def _validate_dependencies_met() -> None: 

169 """ 

170 Verifies that PyOpenSSL's package-level dependencies have been met. 

171 Throws `ImportError` if they are not met. 

172 """ 

173 # Method added in `cryptography==1.1`; not available in older versions 

174 from cryptography.x509.extensions import Extensions 

175 

176 if getattr(Extensions, "get_extension_for_class", None) is None: 

177 raise ImportError( 

178 "'cryptography' module missing required functionality. " 

179 "Try upgrading to v1.3.4 or newer." 

180 ) 

181 

182 # pyOpenSSL 0.14 and above use cryptography for OpenSSL bindings. The _x509 

183 # attribute is only present on those versions. 

184 from OpenSSL.crypto import X509 

185 

186 x509 = X509() 

187 if getattr(x509, "_x509", None) is None: 

188 raise ImportError( 

189 "'pyOpenSSL' module missing required functionality. " 

190 "Try upgrading to v0.14 or newer." 

191 ) 

192 

193 

194def _dnsname_to_stdlib(name: str) -> str | None: 

195 """ 

196 Converts a dNSName SubjectAlternativeName field to the form used by the 

197 standard library on the given Python version. 

198 

199 Cryptography produces a dNSName as a unicode string that was idna-decoded 

200 from ASCII bytes. We need to idna-encode that string to get it back, and 

201 then on Python 3 we also need to convert to unicode via UTF-8 (the stdlib 

202 uses PyUnicode_FromStringAndSize on it, which decodes via UTF-8). 

203 

204 If the name cannot be idna-encoded then we return None signalling that 

205 the name given should be skipped. 

206 """ 

207 

208 def idna_encode(name: str) -> bytes | None: 

209 """ 

210 Borrowed wholesale from the Python Cryptography Project. It turns out 

211 that we can't just safely call `idna.encode`: it can explode for 

212 wildcard names. This avoids that problem. 

213 """ 

214 import idna 

215 

216 try: 

217 for prefix in ["*.", "."]: 

218 if name.startswith(prefix): 

219 name = name[len(prefix) :] 

220 return prefix.encode("ascii") + idna.encode(name) 

221 return idna.encode(name) 

222 except idna.core.IDNAError: 

223 return None 

224 

225 # Don't send IPv6 addresses through the IDNA encoder. 

226 if ":" in name: 

227 return name 

228 

229 encoded_name = idna_encode(name) 

230 if encoded_name is None: 

231 return None 

232 return encoded_name.decode("utf-8") 

233 

234 

235def get_subj_alt_name(peer_cert: X509) -> list[tuple[str, str]]: 

236 """ 

237 Given an PyOpenSSL certificate, provides all the subject alternative names. 

238 """ 

239 cert = peer_cert.to_cryptography() 

240 

241 # We want to find the SAN extension. Ask Cryptography to locate it (it's 

242 # faster than looping in Python) 

243 try: 

244 ext = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName).value 

245 except x509.ExtensionNotFound: 

246 # No such extension, return the empty list. 

247 return [] 

248 except ( 

249 x509.DuplicateExtension, 

250 UnsupportedExtension, 

251 x509.UnsupportedGeneralNameType, 

252 UnicodeError, 

253 ) as e: 

254 # A problem has been found with the quality of the certificate. Assume 

255 # no SAN field is present. 

256 log.warning( 

257 "A problem was encountered with the certificate that prevented " 

258 "urllib3 from finding the SubjectAlternativeName field. This can " 

259 "affect certificate validation. The error was %s", 

260 e, 

261 ) 

262 return [] 

263 

264 # We want to return dNSName and iPAddress fields. We need to cast the IPs 

265 # back to strings because the match_hostname function wants them as 

266 # strings. 

267 # Sadly the DNS names need to be idna encoded and then, on Python 3, UTF-8 

268 # decoded. This is pretty frustrating, but that's what the standard library 

269 # does with certificates, and so we need to attempt to do the same. 

270 # We also want to skip over names which cannot be idna encoded. 

271 names = [ 

272 ("DNS", name) 

273 for name in map(_dnsname_to_stdlib, ext.get_values_for_type(x509.DNSName)) 

274 if name is not None 

275 ] 

276 names.extend( 

277 ("IP Address", str(name)) for name in ext.get_values_for_type(x509.IPAddress) 

278 ) 

279 

280 return names 

281 

282 

283class WrappedSocket: 

284 """API-compatibility wrapper for Python OpenSSL's Connection-class.""" 

285 

286 def __init__( 

287 self, 

288 connection: OpenSSL.SSL.Connection, 

289 socket: socket_cls, 

290 suppress_ragged_eofs: bool = True, 

291 ) -> None: 

292 self.connection = connection 

293 self.socket = socket 

294 self.suppress_ragged_eofs = suppress_ragged_eofs 

295 self._io_refs = 0 

296 self._closed = False 

297 

298 def fileno(self) -> int: 

299 return self.socket.fileno() 

300 

301 # Copy-pasted from Python 3.5 source code 

302 def _decref_socketios(self) -> None: 

303 if self._io_refs > 0: 

304 self._io_refs -= 1 

305 if self._closed: 

306 self.close() 

307 

308 def recv(self, *args: typing.Any, **kwargs: typing.Any) -> bytes: 

309 try: 

310 data = self.connection.recv(*args, **kwargs) 

311 except OpenSSL.SSL.SysCallError as e: 

312 if self.suppress_ragged_eofs and e.args == (-1, "Unexpected EOF"): 

313 return b"" 

314 else: 

315 raise OSError(e.args[0], str(e)) from e 

316 except OpenSSL.SSL.ZeroReturnError: 

317 if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN: 

318 return b"" 

319 else: 

320 raise 

321 except OpenSSL.SSL.WantReadError as e: 

322 if not util.wait_for_read(self.socket, self.socket.gettimeout()): 

323 raise timeout("The read operation timed out") from e 

324 else: 

325 return self.recv(*args, **kwargs) 

326 

327 # TLS 1.3 post-handshake authentication 

328 except OpenSSL.SSL.Error as e: 

329 raise ssl.SSLError(f"read error: {e!r}") from e 

330 else: 

331 return data # type: ignore[no-any-return] 

332 

333 def recv_into(self, *args: typing.Any, **kwargs: typing.Any) -> int: 

334 try: 

335 return self.connection.recv_into(*args, **kwargs) # type: ignore[no-any-return] 

336 except OpenSSL.SSL.SysCallError as e: 

337 if self.suppress_ragged_eofs and e.args == (-1, "Unexpected EOF"): 

338 return 0 

339 else: 

340 raise OSError(e.args[0], str(e)) from e 

341 except OpenSSL.SSL.ZeroReturnError: 

342 if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN: 

343 return 0 

344 else: 

345 raise 

346 except OpenSSL.SSL.WantReadError as e: 

347 if not util.wait_for_read(self.socket, self.socket.gettimeout()): 

348 raise timeout("The read operation timed out") from e 

349 else: 

350 return self.recv_into(*args, **kwargs) 

351 

352 # TLS 1.3 post-handshake authentication 

353 except OpenSSL.SSL.Error as e: 

354 raise ssl.SSLError(f"read error: {e!r}") from e 

355 

356 def settimeout(self, timeout: float) -> None: 

357 return self.socket.settimeout(timeout) 

358 

359 def _send_until_done(self, data: bytes) -> int: 

360 while True: 

361 try: 

362 return self.connection.send(data) # type: ignore[no-any-return] 

363 except OpenSSL.SSL.WantWriteError as e: 

364 if not util.wait_for_write(self.socket, self.socket.gettimeout()): 

365 raise timeout() from e 

366 continue 

367 except OpenSSL.SSL.SysCallError as e: 

368 raise OSError(e.args[0], str(e)) from e 

369 

370 def sendall(self, data: bytes) -> None: 

371 total_sent = 0 

372 while total_sent < len(data): 

373 sent = self._send_until_done( 

374 data[total_sent : total_sent + SSL_WRITE_BLOCKSIZE] 

375 ) 

376 total_sent += sent 

377 

378 def shutdown(self) -> None: 

379 # FIXME rethrow compatible exceptions should we ever use this 

380 self.connection.shutdown() 

381 

382 def close(self) -> None: 

383 self._closed = True 

384 if self._io_refs <= 0: 

385 self._real_close() 

386 

387 def _real_close(self) -> None: 

388 try: 

389 return self.connection.close() # type: ignore[no-any-return] 

390 except OpenSSL.SSL.Error: 

391 return 

392 

393 def getpeercert( 

394 self, binary_form: bool = False 

395 ) -> dict[str, list[typing.Any]] | None: 

396 x509 = self.connection.get_peer_certificate() 

397 

398 if not x509: 

399 return x509 # type: ignore[no-any-return] 

400 

401 if binary_form: 

402 return OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, x509) # type: ignore[no-any-return] 

403 

404 return { 

405 "subject": ((("commonName", x509.get_subject().CN),),), # type: ignore[dict-item] 

406 "subjectAltName": get_subj_alt_name(x509), 

407 } 

408 

409 def version(self) -> str: 

410 return self.connection.get_protocol_version_name() # type: ignore[no-any-return] 

411 

412 

413WrappedSocket.makefile = socket_cls.makefile # type: ignore[attr-defined] 

414 

415 

416class PyOpenSSLContext: 

417 """ 

418 I am a wrapper class for the PyOpenSSL ``Context`` object. I am responsible 

419 for translating the interface of the standard library ``SSLContext`` object 

420 to calls into PyOpenSSL. 

421 """ 

422 

423 def __init__(self, protocol: int) -> None: 

424 self.protocol = _openssl_versions[protocol] 

425 self._ctx = OpenSSL.SSL.Context(self.protocol) 

426 self._options = 0 

427 self.check_hostname = False 

428 self._minimum_version: int = ssl.TLSVersion.MINIMUM_SUPPORTED 

429 self._maximum_version: int = ssl.TLSVersion.MAXIMUM_SUPPORTED 

430 

431 @property 

432 def options(self) -> int: 

433 return self._options 

434 

435 @options.setter 

436 def options(self, value: int) -> None: 

437 self._options = value 

438 self._set_ctx_options() 

439 

440 @property 

441 def verify_mode(self) -> int: 

442 return _openssl_to_stdlib_verify[self._ctx.get_verify_mode()] 

443 

444 @verify_mode.setter 

445 def verify_mode(self, value: ssl.VerifyMode) -> None: 

446 self._ctx.set_verify(_stdlib_to_openssl_verify[value], _verify_callback) 

447 

448 def set_default_verify_paths(self) -> None: 

449 self._ctx.set_default_verify_paths() 

450 

451 def set_ciphers(self, ciphers: bytes | str) -> None: 

452 if isinstance(ciphers, str): 

453 ciphers = ciphers.encode("utf-8") 

454 self._ctx.set_cipher_list(ciphers) 

455 

456 def load_verify_locations( 

457 self, 

458 cafile: str | None = None, 

459 capath: str | None = None, 

460 cadata: bytes | None = None, 

461 ) -> None: 

462 if cafile is not None: 

463 cafile = cafile.encode("utf-8") # type: ignore[assignment] 

464 if capath is not None: 

465 capath = capath.encode("utf-8") # type: ignore[assignment] 

466 try: 

467 self._ctx.load_verify_locations(cafile, capath) 

468 if cadata is not None: 

469 self._ctx.load_verify_locations(BytesIO(cadata)) 

470 except OpenSSL.SSL.Error as e: 

471 raise ssl.SSLError(f"unable to load trusted certificates: {e!r}") from e 

472 

473 def load_cert_chain( 

474 self, 

475 certfile: str, 

476 keyfile: str | None = None, 

477 password: str | None = None, 

478 ) -> None: 

479 try: 

480 self._ctx.use_certificate_chain_file(certfile) 

481 if password is not None: 

482 if not isinstance(password, bytes): 

483 password = password.encode("utf-8") # type: ignore[assignment] 

484 self._ctx.set_passwd_cb(lambda *_: password) 

485 self._ctx.use_privatekey_file(keyfile or certfile) 

486 except OpenSSL.SSL.Error as e: 

487 raise ssl.SSLError(f"Unable to load certificate chain: {e!r}") from e 

488 

489 def set_alpn_protocols(self, protocols: list[bytes | str]) -> None: 

490 protocols = [util.util.to_bytes(p, "ascii") for p in protocols] 

491 return self._ctx.set_alpn_protos(protocols) # type: ignore[no-any-return] 

492 

493 def wrap_socket( 

494 self, 

495 sock: socket_cls, 

496 server_side: bool = False, 

497 do_handshake_on_connect: bool = True, 

498 suppress_ragged_eofs: bool = True, 

499 server_hostname: bytes | str | None = None, 

500 ) -> WrappedSocket: 

501 cnx = OpenSSL.SSL.Connection(self._ctx, sock) 

502 

503 # If server_hostname is an IP, don't use it for SNI, per RFC6066 Section 3 

504 if server_hostname and not util.ssl_.is_ipaddress(server_hostname): 

505 if isinstance(server_hostname, str): 

506 server_hostname = server_hostname.encode("utf-8") 

507 cnx.set_tlsext_host_name(server_hostname) 

508 

509 cnx.set_connect_state() 

510 

511 while True: 

512 try: 

513 cnx.do_handshake() 

514 except OpenSSL.SSL.WantReadError as e: 

515 if not util.wait_for_read(sock, sock.gettimeout()): 

516 raise timeout("select timed out") from e 

517 continue 

518 except OpenSSL.SSL.Error as e: 

519 raise ssl.SSLError(f"bad handshake: {e!r}") from e 

520 break 

521 

522 return WrappedSocket(cnx, sock) 

523 

524 def _set_ctx_options(self) -> None: 

525 self._ctx.set_options( 

526 self._options 

527 | _openssl_to_ssl_minimum_version[self._minimum_version] 

528 | _openssl_to_ssl_maximum_version[self._maximum_version] 

529 ) 

530 

531 @property 

532 def minimum_version(self) -> int: 

533 return self._minimum_version 

534 

535 @minimum_version.setter 

536 def minimum_version(self, minimum_version: int) -> None: 

537 self._minimum_version = minimum_version 

538 self._set_ctx_options() 

539 

540 @property 

541 def maximum_version(self) -> int: 

542 return self._maximum_version 

543 

544 @maximum_version.setter 

545 def maximum_version(self, maximum_version: int) -> None: 

546 self._maximum_version = maximum_version 

547 self._set_ctx_options() 

548 

549 

550def _verify_callback( 

551 cnx: OpenSSL.SSL.Connection, 

552 x509: X509, 

553 err_no: int, 

554 err_depth: int, 

555 return_code: int, 

556) -> bool: 

557 return err_no == 0