Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/urllib3/contrib/pyopenssl.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

272 statements  

1""" 

2Module for using pyOpenSSL as a TLS backend. This module was relevant before 

3the standard library ``ssl`` module supported SNI, but now that we've dropped 

4support for Python 2.7 all relevant Python versions support SNI so 

5**this module is no longer recommended**. 

6 

7This needs the following packages installed: 

8 

9* `pyOpenSSL`_ (tested with 16.0.0) 

10* `cryptography`_ (minimum 1.3.4, from pyopenssl) 

11* `idna`_ (minimum 2.0) 

12 

13However, pyOpenSSL depends on cryptography, so while we use all three directly here we 

14end up having relatively few packages required. 

15 

16You can install them with the following command: 

17 

18.. code-block:: bash 

19 

20 $ python -m pip install pyopenssl cryptography idna 

21 

22To activate certificate checking, call 

23:func:`~urllib3.contrib.pyopenssl.inject_into_urllib3` from your Python code 

24before you begin making HTTP requests. This can be done in a ``sitecustomize`` 

25module, or at any other time before your application begins using ``urllib3``, 

26like this: 

27 

28.. code-block:: python 

29 

30 try: 

31 import urllib3.contrib.pyopenssl 

32 urllib3.contrib.pyopenssl.inject_into_urllib3() 

33 except ImportError: 

34 pass 

35 

36.. _pyopenssl: https://www.pyopenssl.org 

37.. _cryptography: https://cryptography.io 

38.. _idna: https://github.com/kjd/idna 

39""" 

40 

41from __future__ import annotations 

42 

43import OpenSSL.SSL # type: ignore[import-untyped] 

44from cryptography import x509 

45 

46try: 

47 from cryptography.x509 import UnsupportedExtension # type: ignore[attr-defined] 

48except ImportError: 

49 # UnsupportedExtension is gone in cryptography >= 2.1.0 

50 class UnsupportedExtension(Exception): # type: ignore[no-redef] 

51 pass 

52 

53 

54import logging 

55import ssl 

56import typing 

57from io import BytesIO 

58from socket import socket as socket_cls 

59from socket import timeout 

60 

61from .. import util 

62 

63if typing.TYPE_CHECKING: 

64 from OpenSSL.crypto import X509 # type: ignore[import-untyped] 

65 

66 

67__all__ = ["inject_into_urllib3", "extract_from_urllib3"] 

68 

69# Map from urllib3 to PyOpenSSL compatible parameter-values. 

70_openssl_versions: dict[int, int] = { 

71 util.ssl_.PROTOCOL_TLS: OpenSSL.SSL.SSLv23_METHOD, # type: ignore[attr-defined] 

72 util.ssl_.PROTOCOL_TLS_CLIENT: OpenSSL.SSL.SSLv23_METHOD, # type: ignore[attr-defined] 

73 ssl.PROTOCOL_TLSv1: OpenSSL.SSL.TLSv1_METHOD, 

74} 

75 

76if hasattr(ssl, "PROTOCOL_TLSv1_1") and hasattr(OpenSSL.SSL, "TLSv1_1_METHOD"): 

77 _openssl_versions[ssl.PROTOCOL_TLSv1_1] = OpenSSL.SSL.TLSv1_1_METHOD 

78 

79if hasattr(ssl, "PROTOCOL_TLSv1_2") and hasattr(OpenSSL.SSL, "TLSv1_2_METHOD"): 

80 _openssl_versions[ssl.PROTOCOL_TLSv1_2] = OpenSSL.SSL.TLSv1_2_METHOD 

81 

82 

83_stdlib_to_openssl_verify = { 

84 ssl.CERT_NONE: OpenSSL.SSL.VERIFY_NONE, 

85 ssl.CERT_OPTIONAL: OpenSSL.SSL.VERIFY_PEER, 

86 ssl.CERT_REQUIRED: OpenSSL.SSL.VERIFY_PEER 

87 + OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT, 

88} 

89_openssl_to_stdlib_verify = {v: k for k, v in _stdlib_to_openssl_verify.items()} 

90 

91# The SSLvX values are the most likely to be missing in the future 

92# but we check them all just to be sure. 

93_OP_NO_SSLv2_OR_SSLv3: int = getattr(OpenSSL.SSL, "OP_NO_SSLv2", 0) | getattr( 

94 OpenSSL.SSL, "OP_NO_SSLv3", 0 

95) 

96_OP_NO_TLSv1: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1", 0) 

97_OP_NO_TLSv1_1: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_1", 0) 

98_OP_NO_TLSv1_2: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_2", 0) 

99_OP_NO_TLSv1_3: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_3", 0) 

100 

101_openssl_to_ssl_minimum_version: dict[int, int] = { 

102 ssl.TLSVersion.MINIMUM_SUPPORTED: _OP_NO_SSLv2_OR_SSLv3, 

103 ssl.TLSVersion.TLSv1: _OP_NO_SSLv2_OR_SSLv3, 

104 ssl.TLSVersion.TLSv1_1: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1, 

105 ssl.TLSVersion.TLSv1_2: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1 | _OP_NO_TLSv1_1, 

106 ssl.TLSVersion.TLSv1_3: ( 

107 _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1 | _OP_NO_TLSv1_1 | _OP_NO_TLSv1_2 

108 ), 

109 ssl.TLSVersion.MAXIMUM_SUPPORTED: ( 

110 _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1 | _OP_NO_TLSv1_1 | _OP_NO_TLSv1_2 

111 ), 

112} 

113_openssl_to_ssl_maximum_version: dict[int, int] = { 

114 ssl.TLSVersion.MINIMUM_SUPPORTED: ( 

115 _OP_NO_SSLv2_OR_SSLv3 

116 | _OP_NO_TLSv1 

117 | _OP_NO_TLSv1_1 

118 | _OP_NO_TLSv1_2 

119 | _OP_NO_TLSv1_3 

120 ), 

121 ssl.TLSVersion.TLSv1: ( 

122 _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_1 | _OP_NO_TLSv1_2 | _OP_NO_TLSv1_3 

123 ), 

124 ssl.TLSVersion.TLSv1_1: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_2 | _OP_NO_TLSv1_3, 

125 ssl.TLSVersion.TLSv1_2: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_3, 

126 ssl.TLSVersion.TLSv1_3: _OP_NO_SSLv2_OR_SSLv3, 

127 ssl.TLSVersion.MAXIMUM_SUPPORTED: _OP_NO_SSLv2_OR_SSLv3, 

128} 

129 

130# OpenSSL will only write 16K at a time 

131SSL_WRITE_BLOCKSIZE = 16384 

132 

133orig_util_SSLContext = util.ssl_.SSLContext 

134 

135 

136log = logging.getLogger(__name__) 

137 

138 

139def inject_into_urllib3() -> None: 

140 "Monkey-patch urllib3 with PyOpenSSL-backed SSL-support." 

141 

142 _validate_dependencies_met() 

143 

144 util.SSLContext = PyOpenSSLContext # type: ignore[assignment] 

145 util.ssl_.SSLContext = PyOpenSSLContext # type: ignore[assignment] 

146 util.IS_PYOPENSSL = True 

147 util.ssl_.IS_PYOPENSSL = True 

148 

149 

150def extract_from_urllib3() -> None: 

151 "Undo monkey-patching by :func:`inject_into_urllib3`." 

152 

153 util.SSLContext = orig_util_SSLContext 

154 util.ssl_.SSLContext = orig_util_SSLContext 

155 util.IS_PYOPENSSL = False 

156 util.ssl_.IS_PYOPENSSL = False 

157 

158 

159def _validate_dependencies_met() -> None: 

160 """ 

161 Verifies that PyOpenSSL's package-level dependencies have been met. 

162 Throws `ImportError` if they are not met. 

163 """ 

164 # Method added in `cryptography==1.1`; not available in older versions 

165 from cryptography.x509.extensions import Extensions 

166 

167 if getattr(Extensions, "get_extension_for_class", None) is None: 

168 raise ImportError( 

169 "'cryptography' module missing required functionality. " 

170 "Try upgrading to v1.3.4 or newer." 

171 ) 

172 

173 # pyOpenSSL 0.14 and above use cryptography for OpenSSL bindings. The _x509 

174 # attribute is only present on those versions. 

175 from OpenSSL.crypto import X509 

176 

177 x509 = X509() 

178 if getattr(x509, "_x509", None) is None: 

179 raise ImportError( 

180 "'pyOpenSSL' module missing required functionality. " 

181 "Try upgrading to v0.14 or newer." 

182 ) 

183 

184 

185def _dnsname_to_stdlib(name: str) -> str | None: 

186 """ 

187 Converts a dNSName SubjectAlternativeName field to the form used by the 

188 standard library on the given Python version. 

189 

190 Cryptography produces a dNSName as a unicode string that was idna-decoded 

191 from ASCII bytes. We need to idna-encode that string to get it back, and 

192 then on Python 3 we also need to convert to unicode via UTF-8 (the stdlib 

193 uses PyUnicode_FromStringAndSize on it, which decodes via UTF-8). 

194 

195 If the name cannot be idna-encoded then we return None signalling that 

196 the name given should be skipped. 

197 """ 

198 

199 def idna_encode(name: str) -> bytes | None: 

200 """ 

201 Borrowed wholesale from the Python Cryptography Project. It turns out 

202 that we can't just safely call `idna.encode`: it can explode for 

203 wildcard names. This avoids that problem. 

204 """ 

205 import idna 

206 

207 try: 

208 for prefix in ["*.", "."]: 

209 if name.startswith(prefix): 

210 name = name[len(prefix) :] 

211 return prefix.encode("ascii") + idna.encode(name) 

212 return idna.encode(name) 

213 except idna.core.IDNAError: 

214 return None 

215 

216 # Don't send IPv6 addresses through the IDNA encoder. 

217 if ":" in name: 

218 return name 

219 

220 encoded_name = idna_encode(name) 

221 if encoded_name is None: 

222 return None 

223 return encoded_name.decode("utf-8") 

224 

225 

226def get_subj_alt_name(peer_cert: X509) -> list[tuple[str, str]]: 

227 """ 

228 Given an PyOpenSSL certificate, provides all the subject alternative names. 

229 """ 

230 cert = peer_cert.to_cryptography() 

231 

232 # We want to find the SAN extension. Ask Cryptography to locate it (it's 

233 # faster than looping in Python) 

234 try: 

235 ext = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName).value 

236 except x509.ExtensionNotFound: 

237 # No such extension, return the empty list. 

238 return [] 

239 except ( 

240 x509.DuplicateExtension, 

241 UnsupportedExtension, 

242 x509.UnsupportedGeneralNameType, 

243 UnicodeError, 

244 ) as e: 

245 # A problem has been found with the quality of the certificate. Assume 

246 # no SAN field is present. 

247 log.warning( 

248 "A problem was encountered with the certificate that prevented " 

249 "urllib3 from finding the SubjectAlternativeName field. This can " 

250 "affect certificate validation. The error was %s", 

251 e, 

252 ) 

253 return [] 

254 

255 # We want to return dNSName and iPAddress fields. We need to cast the IPs 

256 # back to strings because the match_hostname function wants them as 

257 # strings. 

258 # Sadly the DNS names need to be idna encoded and then, on Python 3, UTF-8 

259 # decoded. This is pretty frustrating, but that's what the standard library 

260 # does with certificates, and so we need to attempt to do the same. 

261 # We also want to skip over names which cannot be idna encoded. 

262 names = [ 

263 ("DNS", name) 

264 for name in map(_dnsname_to_stdlib, ext.get_values_for_type(x509.DNSName)) 

265 if name is not None 

266 ] 

267 names.extend( 

268 ("IP Address", str(name)) for name in ext.get_values_for_type(x509.IPAddress) 

269 ) 

270 

271 return names 

272 

273 

274class WrappedSocket: 

275 """API-compatibility wrapper for Python OpenSSL's Connection-class.""" 

276 

277 def __init__( 

278 self, 

279 connection: OpenSSL.SSL.Connection, 

280 socket: socket_cls, 

281 suppress_ragged_eofs: bool = True, 

282 ) -> None: 

283 self.connection = connection 

284 self.socket = socket 

285 self.suppress_ragged_eofs = suppress_ragged_eofs 

286 self._io_refs = 0 

287 self._closed = False 

288 

289 def fileno(self) -> int: 

290 return self.socket.fileno() 

291 

292 # Copy-pasted from Python 3.5 source code 

293 def _decref_socketios(self) -> None: 

294 if self._io_refs > 0: 

295 self._io_refs -= 1 

296 if self._closed: 

297 self.close() 

298 

299 def recv(self, *args: typing.Any, **kwargs: typing.Any) -> bytes: 

300 try: 

301 data = self.connection.recv(*args, **kwargs) 

302 except OpenSSL.SSL.SysCallError as e: 

303 if self.suppress_ragged_eofs and e.args == (-1, "Unexpected EOF"): 

304 return b"" 

305 else: 

306 raise OSError(e.args[0], str(e)) from e 

307 except OpenSSL.SSL.ZeroReturnError: 

308 if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN: 

309 return b"" 

310 else: 

311 raise 

312 except OpenSSL.SSL.WantReadError as e: 

313 if not util.wait_for_read(self.socket, self.socket.gettimeout()): 

314 raise timeout("The read operation timed out") from e 

315 else: 

316 return self.recv(*args, **kwargs) 

317 

318 # TLS 1.3 post-handshake authentication 

319 except OpenSSL.SSL.Error as e: 

320 raise ssl.SSLError(f"read error: {e!r}") from e 

321 else: 

322 return data # type: ignore[no-any-return] 

323 

324 def recv_into(self, *args: typing.Any, **kwargs: typing.Any) -> int: 

325 try: 

326 return self.connection.recv_into(*args, **kwargs) # type: ignore[no-any-return] 

327 except OpenSSL.SSL.SysCallError as e: 

328 if self.suppress_ragged_eofs and e.args == (-1, "Unexpected EOF"): 

329 return 0 

330 else: 

331 raise OSError(e.args[0], str(e)) from e 

332 except OpenSSL.SSL.ZeroReturnError: 

333 if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN: 

334 return 0 

335 else: 

336 raise 

337 except OpenSSL.SSL.WantReadError as e: 

338 if not util.wait_for_read(self.socket, self.socket.gettimeout()): 

339 raise timeout("The read operation timed out") from e 

340 else: 

341 return self.recv_into(*args, **kwargs) 

342 

343 # TLS 1.3 post-handshake authentication 

344 except OpenSSL.SSL.Error as e: 

345 raise ssl.SSLError(f"read error: {e!r}") from e 

346 

347 def settimeout(self, timeout: float) -> None: 

348 return self.socket.settimeout(timeout) 

349 

350 def _send_until_done(self, data: bytes) -> int: 

351 while True: 

352 try: 

353 return self.connection.send(data) # type: ignore[no-any-return] 

354 except OpenSSL.SSL.WantWriteError as e: 

355 if not util.wait_for_write(self.socket, self.socket.gettimeout()): 

356 raise timeout() from e 

357 continue 

358 except OpenSSL.SSL.SysCallError as e: 

359 raise OSError(e.args[0], str(e)) from e 

360 

361 def sendall(self, data: bytes) -> None: 

362 total_sent = 0 

363 while total_sent < len(data): 

364 sent = self._send_until_done( 

365 data[total_sent : total_sent + SSL_WRITE_BLOCKSIZE] 

366 ) 

367 total_sent += sent 

368 

369 def shutdown(self, how: int) -> None: 

370 try: 

371 self.connection.shutdown() 

372 except OpenSSL.SSL.Error as e: 

373 raise ssl.SSLError(f"shutdown error: {e!r}") from e 

374 

375 def close(self) -> None: 

376 self._closed = True 

377 if self._io_refs <= 0: 

378 self._real_close() 

379 

380 def _real_close(self) -> None: 

381 try: 

382 return self.connection.close() # type: ignore[no-any-return] 

383 except OpenSSL.SSL.Error: 

384 return 

385 

386 def getpeercert( 

387 self, binary_form: bool = False 

388 ) -> dict[str, list[typing.Any]] | None: 

389 x509 = self.connection.get_peer_certificate() 

390 

391 if not x509: 

392 return x509 # type: ignore[no-any-return] 

393 

394 if binary_form: 

395 return OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, x509) # type: ignore[no-any-return] 

396 

397 return { 

398 "subject": ((("commonName", x509.get_subject().CN),),), # type: ignore[dict-item] 

399 "subjectAltName": get_subj_alt_name(x509), 

400 } 

401 

402 def version(self) -> str: 

403 return self.connection.get_protocol_version_name() # type: ignore[no-any-return] 

404 

405 def selected_alpn_protocol(self) -> str | None: 

406 alpn_proto = self.connection.get_alpn_proto_negotiated() 

407 return alpn_proto.decode() if alpn_proto else None 

408 

409 

410WrappedSocket.makefile = socket_cls.makefile # type: ignore[attr-defined] 

411 

412 

413class PyOpenSSLContext: 

414 """ 

415 I am a wrapper class for the PyOpenSSL ``Context`` object. I am responsible 

416 for translating the interface of the standard library ``SSLContext`` object 

417 to calls into PyOpenSSL. 

418 """ 

419 

420 def __init__(self, protocol: int) -> None: 

421 self.protocol = _openssl_versions[protocol] 

422 self._ctx = OpenSSL.SSL.Context(self.protocol) 

423 self._options = 0 

424 self.check_hostname = False 

425 self._minimum_version: int = ssl.TLSVersion.MINIMUM_SUPPORTED 

426 self._maximum_version: int = ssl.TLSVersion.MAXIMUM_SUPPORTED 

427 self._verify_flags: int = ssl.VERIFY_X509_TRUSTED_FIRST 

428 

429 @property 

430 def options(self) -> int: 

431 return self._options 

432 

433 @options.setter 

434 def options(self, value: int) -> None: 

435 self._options = value 

436 self._set_ctx_options() 

437 

438 @property 

439 def verify_flags(self) -> int: 

440 return self._verify_flags 

441 

442 @verify_flags.setter 

443 def verify_flags(self, value: int) -> None: 

444 self._verify_flags = value 

445 self._ctx.get_cert_store().set_flags(self._verify_flags) 

446 

447 @property 

448 def verify_mode(self) -> int: 

449 return _openssl_to_stdlib_verify[self._ctx.get_verify_mode()] 

450 

451 @verify_mode.setter 

452 def verify_mode(self, value: ssl.VerifyMode) -> None: 

453 self._ctx.set_verify(_stdlib_to_openssl_verify[value], _verify_callback) 

454 

455 def set_default_verify_paths(self) -> None: 

456 self._ctx.set_default_verify_paths() 

457 

458 def set_ciphers(self, ciphers: bytes | str) -> None: 

459 if isinstance(ciphers, str): 

460 ciphers = ciphers.encode("utf-8") 

461 self._ctx.set_cipher_list(ciphers) 

462 

463 def load_verify_locations( 

464 self, 

465 cafile: str | None = None, 

466 capath: str | None = None, 

467 cadata: bytes | None = None, 

468 ) -> None: 

469 if cafile is not None: 

470 cafile = cafile.encode("utf-8") # type: ignore[assignment] 

471 if capath is not None: 

472 capath = capath.encode("utf-8") # type: ignore[assignment] 

473 try: 

474 self._ctx.load_verify_locations(cafile, capath) 

475 if cadata is not None: 

476 self._ctx.load_verify_locations(BytesIO(cadata)) 

477 except OpenSSL.SSL.Error as e: 

478 raise ssl.SSLError(f"unable to load trusted certificates: {e!r}") from e 

479 

480 def load_cert_chain( 

481 self, 

482 certfile: str, 

483 keyfile: str | None = None, 

484 password: str | None = None, 

485 ) -> None: 

486 try: 

487 self._ctx.use_certificate_chain_file(certfile) 

488 if password is not None: 

489 if not isinstance(password, bytes): 

490 password = password.encode("utf-8") # type: ignore[assignment] 

491 self._ctx.set_passwd_cb(lambda *_: password) 

492 self._ctx.use_privatekey_file(keyfile or certfile) 

493 except OpenSSL.SSL.Error as e: 

494 raise ssl.SSLError(f"Unable to load certificate chain: {e!r}") from e 

495 

496 def set_alpn_protocols(self, protocols: list[bytes | str]) -> None: 

497 protocols = [util.util.to_bytes(p, "ascii") for p in protocols] 

498 return self._ctx.set_alpn_protos(protocols) # type: ignore[no-any-return] 

499 

500 def wrap_socket( 

501 self, 

502 sock: socket_cls, 

503 server_side: bool = False, 

504 do_handshake_on_connect: bool = True, 

505 suppress_ragged_eofs: bool = True, 

506 server_hostname: bytes | str | None = None, 

507 ) -> WrappedSocket: 

508 cnx = OpenSSL.SSL.Connection(self._ctx, sock) 

509 

510 # If server_hostname is an IP, don't use it for SNI, per RFC6066 Section 3 

511 if server_hostname and not util.ssl_.is_ipaddress(server_hostname): 

512 if isinstance(server_hostname, str): 

513 server_hostname = server_hostname.encode("utf-8") 

514 cnx.set_tlsext_host_name(server_hostname) 

515 

516 cnx.set_connect_state() 

517 

518 while True: 

519 try: 

520 cnx.do_handshake() 

521 except OpenSSL.SSL.WantReadError as e: 

522 if not util.wait_for_read(sock, sock.gettimeout()): 

523 raise timeout("select timed out") from e 

524 continue 

525 except OpenSSL.SSL.Error as e: 

526 raise ssl.SSLError(f"bad handshake: {e!r}") from e 

527 break 

528 

529 return WrappedSocket(cnx, sock) 

530 

531 def _set_ctx_options(self) -> None: 

532 self._ctx.set_options( 

533 self._options 

534 | _openssl_to_ssl_minimum_version[self._minimum_version] 

535 | _openssl_to_ssl_maximum_version[self._maximum_version] 

536 ) 

537 

538 @property 

539 def minimum_version(self) -> int: 

540 return self._minimum_version 

541 

542 @minimum_version.setter 

543 def minimum_version(self, minimum_version: int) -> None: 

544 self._minimum_version = minimum_version 

545 self._set_ctx_options() 

546 

547 @property 

548 def maximum_version(self) -> int: 

549 return self._maximum_version 

550 

551 @maximum_version.setter 

552 def maximum_version(self, maximum_version: int) -> None: 

553 self._maximum_version = maximum_version 

554 self._set_ctx_options() 

555 

556 

557def _verify_callback( 

558 cnx: OpenSSL.SSL.Connection, 

559 x509: X509, 

560 err_no: int, 

561 err_depth: int, 

562 return_code: int, 

563) -> bool: 

564 return err_no == 0