Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/urllib3/contrib/pyopenssl.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

260 statements  

1""" 

Module for using pyOpenSSL as a TLS backend. This module was relevant before
the standard library ``ssl`` module supported SNI, but now that we've dropped
support for Python 2.7 all relevant Python versions support SNI so
**this module is no longer recommended**.

This needs the following packages installed:

* `pyOpenSSL`_ (tested with 16.0.0)
* `cryptography`_ (minimum 1.3.4, from pyopenssl)
* `idna`_ (minimum 2.0)

However, pyOpenSSL depends on cryptography, so while we use all three directly here we
end up having relatively few packages required.

You can install them with the following command:

.. code-block:: bash

    $ python -m pip install pyopenssl cryptography idna

To activate certificate checking, call
:func:`~urllib3.contrib.pyopenssl.inject_into_urllib3` from your Python code
before you begin making HTTP requests. This can be done in a ``sitecustomize``
module, or at any other time before your application begins using ``urllib3``,
like this:

.. code-block:: python

    try:
        import urllib3.contrib.pyopenssl
        urllib3.contrib.pyopenssl.inject_into_urllib3()
    except ImportError:
        pass

.. _pyopenssl: https://www.pyopenssl.org
.. _cryptography: https://cryptography.io
.. _idna: https://github.com/kjd/idna

39""" 

40 

41from __future__ import annotations 

42 

43import OpenSSL.SSL # type: ignore[import-untyped] 

44from cryptography import x509 

45 

try:
    from cryptography.x509 import UnsupportedExtension  # type: ignore[attr-defined]
except ImportError:
    # UnsupportedExtension is gone in cryptography >= 2.1.0
    class UnsupportedExtension(Exception):  # type: ignore[no-redef]
        """Placeholder so ``except UnsupportedExtension`` clauses stay valid."""

52 

53 

54import logging 

55import ssl 

56import typing 

57from io import BytesIO 

58from socket import socket as socket_cls 

59from socket import timeout 

60 

61from .. import util 

62 

63if typing.TYPE_CHECKING: 

64 from OpenSSL.crypto import X509 # type: ignore[import-untyped] 

65 

66 

67__all__ = ["inject_into_urllib3", "extract_from_urllib3"] 

68 

# Map from urllib3 to PyOpenSSL compatible parameter-values.
_openssl_versions: dict[int, int] = {
    util.ssl_.PROTOCOL_TLS: OpenSSL.SSL.SSLv23_METHOD,  # type: ignore[attr-defined]
    util.ssl_.PROTOCOL_TLS_CLIENT: OpenSSL.SSL.SSLv23_METHOD,  # type: ignore[attr-defined]
    ssl.PROTOCOL_TLSv1: OpenSSL.SSL.TLSv1_METHOD,
}

# The TLS 1.1 / 1.2 entries are only registered when both the stdlib ``ssl``
# module and the installed pyOpenSSL expose the corresponding constant.
if hasattr(ssl, "PROTOCOL_TLSv1_1") and hasattr(OpenSSL.SSL, "TLSv1_1_METHOD"):
    _openssl_versions[ssl.PROTOCOL_TLSv1_1] = OpenSSL.SSL.TLSv1_1_METHOD

if hasattr(ssl, "PROTOCOL_TLSv1_2") and hasattr(OpenSSL.SSL, "TLSv1_2_METHOD"):
    _openssl_versions[ssl.PROTOCOL_TLSv1_2] = OpenSSL.SSL.TLSv1_2_METHOD


# stdlib ``ssl.CERT_*`` verify modes -> pyOpenSSL ``VERIFY_*`` flag masks.
_stdlib_to_openssl_verify = {
    ssl.CERT_NONE: OpenSSL.SSL.VERIFY_NONE,
    ssl.CERT_OPTIONAL: OpenSSL.SSL.VERIFY_PEER,
    # These are bit flags, so combine them with ``|`` rather than ``+``.
    ssl.CERT_REQUIRED: OpenSSL.SSL.VERIFY_PEER
    | OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
}
# Reverse lookup used by ``PyOpenSSLContext.verify_mode``.
_openssl_to_stdlib_verify = {v: k for k, v in _stdlib_to_openssl_verify.items()}

90 

# The SSLvX values are the most likely to be missing in the future
# but we check them all just to be sure.
# A constant that the installed pyOpenSSL does not expose contributes 0,
# i.e. it simply drops out of every mask below.
_OP_NO_SSLv2_OR_SSLv3: int = getattr(OpenSSL.SSL, "OP_NO_SSLv2", 0) | getattr(
    OpenSSL.SSL, "OP_NO_SSLv3", 0
)
_OP_NO_TLSv1: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1", 0)
_OP_NO_TLSv1_1: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_1", 0)
_OP_NO_TLSv1_2: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_2", 0)
_OP_NO_TLSv1_3: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_3", 0)

# NOTE: despite the ``_openssl_to_ssl_`` prefix, both tables below are keyed
# by ``ssl.TLSVersion`` members and hold OpenSSL ``OP_NO_*`` option masks.
#
# Minimum version: disable every protocol *older* than the requested one.
_openssl_to_ssl_minimum_version: dict[int, int] = {
    ssl.TLSVersion.MINIMUM_SUPPORTED: _OP_NO_SSLv2_OR_SSLv3,
    ssl.TLSVersion.TLSv1: _OP_NO_SSLv2_OR_SSLv3,
    ssl.TLSVersion.TLSv1_1: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1,
    ssl.TLSVersion.TLSv1_2: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1 | _OP_NO_TLSv1_1,
    ssl.TLSVersion.TLSv1_3: (
        _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1 | _OP_NO_TLSv1_1 | _OP_NO_TLSv1_2
    ),
    ssl.TLSVersion.MAXIMUM_SUPPORTED: (
        _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1 | _OP_NO_TLSv1_1 | _OP_NO_TLSv1_2
    ),
}
# Maximum version: disable every protocol *newer* than the requested one.
_openssl_to_ssl_maximum_version: dict[int, int] = {
    ssl.TLSVersion.MINIMUM_SUPPORTED: (
        _OP_NO_SSLv2_OR_SSLv3
        | _OP_NO_TLSv1
        | _OP_NO_TLSv1_1
        | _OP_NO_TLSv1_2
        | _OP_NO_TLSv1_3
    ),
    ssl.TLSVersion.TLSv1: (
        _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_1 | _OP_NO_TLSv1_2 | _OP_NO_TLSv1_3
    ),
    ssl.TLSVersion.TLSv1_1: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_2 | _OP_NO_TLSv1_3,
    ssl.TLSVersion.TLSv1_2: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_3,
    ssl.TLSVersion.TLSv1_3: _OP_NO_SSLv2_OR_SSLv3,
    ssl.TLSVersion.MAXIMUM_SUPPORTED: _OP_NO_SSLv2_OR_SSLv3,
}

129 

# OpenSSL will only write 16K at a time
SSL_WRITE_BLOCKSIZE = 16384

# Captured at import time so that extract_from_urllib3() can put it back.
orig_util_SSLContext = util.ssl_.SSLContext


log = logging.getLogger(__name__)

137 

138 

def inject_into_urllib3() -> None:
    """
    Monkey-patch urllib3 with PyOpenSSL-backed SSL-support.

    Replaces ``SSLContext`` on both ``util`` and ``util.ssl_`` with
    :class:`PyOpenSSLContext` and sets their ``IS_PYOPENSSL`` flags.

    :raises ImportError: propagated from ``_validate_dependencies_met`` when
        the installed ``cryptography`` / ``pyOpenSSL`` are too old.
    """
    # Validate first so a failed check leaves urllib3 unpatched.
    _validate_dependencies_met()

    util.SSLContext = PyOpenSSLContext  # type: ignore[assignment]
    util.ssl_.SSLContext = PyOpenSSLContext  # type: ignore[assignment]
    util.IS_PYOPENSSL = True
    util.ssl_.IS_PYOPENSSL = True

148 

149 

def extract_from_urllib3() -> None:
    """
    Undo monkey-patching by :func:`inject_into_urllib3`.

    Restores the ``SSLContext`` that was captured in ``orig_util_SSLContext``
    when this module was imported and clears the ``IS_PYOPENSSL`` flags.
    """
    util.SSLContext = orig_util_SSLContext
    util.ssl_.SSLContext = orig_util_SSLContext
    util.IS_PYOPENSSL = False
    util.ssl_.IS_PYOPENSSL = False

157 

158 

def _validate_dependencies_met() -> None:
    """
    Verifies that PyOpenSSL's package-level dependencies have been met.
    Throws `ImportError` if they are not met.
    """
    # Method added in `cryptography==1.1`; not available in older versions
    from cryptography.x509.extensions import Extensions

    if getattr(Extensions, "get_extension_for_class", None) is None:
        raise ImportError(
            "'cryptography' module missing required functionality.  "
            "Try upgrading to v1.3.4 or newer."
        )

    # pyOpenSSL 0.14 and above use cryptography for OpenSSL bindings. The _x509
    # attribute is only present on those versions.
    from OpenSSL.crypto import X509

    # Deliberately not called ``x509``: that name is the module-level
    # ``cryptography.x509`` import and should not be shadowed here.
    probe_cert = X509()
    if getattr(probe_cert, "_x509", None) is None:
        raise ImportError(
            "'pyOpenSSL' module missing required functionality. "
            "Try upgrading to v0.14 or newer."
        )

183 

184 

185def _dnsname_to_stdlib(name: str) -> str | None: 

186 """ 

187 Converts a dNSName SubjectAlternativeName field to the form used by the 

188 standard library on the given Python version. 

189 

190 Cryptography produces a dNSName as a unicode string that was idna-decoded 

191 from ASCII bytes. We need to idna-encode that string to get it back, and 

192 then on Python 3 we also need to convert to unicode via UTF-8 (the stdlib 

193 uses PyUnicode_FromStringAndSize on it, which decodes via UTF-8). 

194 

195 If the name cannot be idna-encoded then we return None signalling that 

196 the name given should be skipped. 

197 """ 

198 

199 def idna_encode(name: str) -> bytes | None: 

200 """ 

201 Borrowed wholesale from the Python Cryptography Project. It turns out 

202 that we can't just safely call `idna.encode`: it can explode for 

203 wildcard names. This avoids that problem. 

204 """ 

205 import idna 

206 

207 try: 

208 for prefix in ["*.", "."]: 

209 if name.startswith(prefix): 

210 name = name[len(prefix) :] 

211 return prefix.encode("ascii") + idna.encode(name) 

212 return idna.encode(name) 

213 except idna.core.IDNAError: 

214 return None 

215 

216 # Don't send IPv6 addresses through the IDNA encoder. 

217 if ":" in name: 

218 return name 

219 

220 encoded_name = idna_encode(name) 

221 if encoded_name is None: 

222 return None 

223 return encoded_name.decode("utf-8") 

224 

225 

def get_subj_alt_name(peer_cert: X509) -> list[tuple[str, str]]:
    """
    Given a PyOpenSSL certificate, provides all the subject alternative names.

    Returns a list of ``("DNS", name)`` and ``("IP Address", address)`` tuples,
    DNS entries first. The list is empty when the certificate has no SAN
    extension or when the extension could not be read (a warning is logged in
    the latter case).
    """
    cert = peer_cert.to_cryptography()

    # We want to find the SAN extension. Ask Cryptography to locate it (it's
    # faster than looping in Python)
    try:
        ext = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName).value
    except x509.ExtensionNotFound:
        # No such extension, return the empty list.
        return []
    except (
        x509.DuplicateExtension,
        UnsupportedExtension,
        x509.UnsupportedGeneralNameType,
        UnicodeError,
    ) as e:
        # A problem has been found with the quality of the certificate. Assume
        # no SAN field is present.
        log.warning(
            "A problem was encountered with the certificate that prevented "
            "urllib3 from finding the SubjectAlternativeName field. This can "
            "affect certificate validation. The error was %s",
            e,
        )
        return []

    # We want to return dNSName and iPAddress fields. We need to cast the IPs
    # back to strings because the match_hostname function wants them as
    # strings.
    # Sadly the DNS names need to be idna encoded and then UTF-8 decoded.
    # This is pretty frustrating, but that's what the standard library
    # does with certificates, and so we need to attempt to do the same.
    # We also want to skip over names which cannot be idna encoded.
    names = [
        ("DNS", name)
        for name in map(_dnsname_to_stdlib, ext.get_values_for_type(x509.DNSName))
        if name is not None
    ]
    names.extend(
        ("IP Address", str(name)) for name in ext.get_values_for_type(x509.IPAddress)
    )

    return names

272 

273 

class WrappedSocket:
    """API-compatibility wrapper for Python OpenSSL's Connection-class."""

    def __init__(
        self,
        connection: OpenSSL.SSL.Connection,
        socket: socket_cls,
        suppress_ragged_eofs: bool = True,
    ) -> None:
        self.connection = connection
        self.socket = socket
        # When true, an "Unexpected EOF" from the peer is reported as a clean
        # end-of-stream by recv()/recv_into() instead of raising OSError.
        self.suppress_ragged_eofs = suppress_ragged_eofs
        # Bookkeeping consulted by close() / _decref_socketios().
        self._io_refs = 0
        self._closed = False

    def fileno(self) -> int:
        """Return the file descriptor of the underlying plain socket."""
        return self.socket.fileno()

    # Copy-pasted from Python 3.5 source code
    def _decref_socketios(self) -> None:
        """Drop one I/O reference; finish a pending close() if one was requested."""
        if self._io_refs > 0:
            self._io_refs -= 1
        if self._closed:
            self.close()

    def recv(self, *args: typing.Any, **kwargs: typing.Any) -> bytes:
        """
        Receive decrypted bytes from the connection.

        Returns ``b""`` on a clean TLS shutdown, or on an unexpected EOF when
        ``suppress_ragged_eofs`` is set. Raises ``OSError`` for other syscall
        errors, ``socket.timeout`` when waiting for readability times out and
        ``ssl.SSLError`` for any other OpenSSL error.
        """
        # Loop (instead of recursing) while OpenSSL keeps asking for more
        # input, so repeated WantReadError cannot grow the call stack.
        while True:
            try:
                data = self.connection.recv(*args, **kwargs)
            except OpenSSL.SSL.SysCallError as e:
                if self.suppress_ragged_eofs and e.args == (-1, "Unexpected EOF"):
                    return b""
                raise OSError(e.args[0], str(e)) from e
            except OpenSSL.SSL.ZeroReturnError:
                if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN:
                    return b""
                raise
            except OpenSSL.SSL.WantReadError as e:
                if not util.wait_for_read(self.socket, self.socket.gettimeout()):
                    raise timeout("The read operation timed out") from e
                continue

            # TLS 1.3 post-handshake authentication
            except OpenSSL.SSL.Error as e:
                raise ssl.SSLError(f"read error: {e!r}") from e
            else:
                return data  # type: ignore[no-any-return]

    def recv_into(self, *args: typing.Any, **kwargs: typing.Any) -> int:
        """
        Like :meth:`recv`, but delegates to ``connection.recv_into`` and
        returns its result; ``0`` stands in for end-of-stream.
        """
        # Same retry loop as recv(); see the comment there.
        while True:
            try:
                return self.connection.recv_into(*args, **kwargs)  # type: ignore[no-any-return]
            except OpenSSL.SSL.SysCallError as e:
                if self.suppress_ragged_eofs and e.args == (-1, "Unexpected EOF"):
                    return 0
                raise OSError(e.args[0], str(e)) from e
            except OpenSSL.SSL.ZeroReturnError:
                if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN:
                    return 0
                raise
            except OpenSSL.SSL.WantReadError as e:
                if not util.wait_for_read(self.socket, self.socket.gettimeout()):
                    raise timeout("The read operation timed out") from e
                continue

            # TLS 1.3 post-handshake authentication
            except OpenSSL.SSL.Error as e:
                raise ssl.SSLError(f"read error: {e!r}") from e

    def settimeout(self, timeout: float) -> None:
        """Set the timeout on the underlying plain socket."""
        return self.socket.settimeout(timeout)

    def _send_until_done(self, data: bytes) -> int:
        """
        Call ``connection.send`` until it stops raising ``WantWriteError`` and
        return what it returns.

        Raises ``socket.timeout`` if waiting for writability times out and
        ``OSError`` on an OpenSSL syscall error.
        """
        while True:
            try:
                return self.connection.send(data)  # type: ignore[no-any-return]
            except OpenSSL.SSL.WantWriteError as e:
                if not util.wait_for_write(self.socket, self.socket.gettimeout()):
                    raise timeout() from e
                continue
            except OpenSSL.SSL.SysCallError as e:
                raise OSError(e.args[0], str(e)) from e

    def sendall(self, data: bytes) -> None:
        """Send all of ``data``, at most ``SSL_WRITE_BLOCKSIZE`` bytes per call."""
        total_sent = 0
        while total_sent < len(data):
            sent = self._send_until_done(
                data[total_sent : total_sent + SSL_WRITE_BLOCKSIZE]
            )
            total_sent += sent

    def shutdown(self) -> None:
        # FIXME rethrow compatible exceptions should we ever use this
        self.connection.shutdown()

    def close(self) -> None:
        """Mark the socket closed; really close once no I/O references remain."""
        self._closed = True
        if self._io_refs <= 0:
            self._real_close()

    def _real_close(self) -> None:
        """Close the OpenSSL connection, ignoring OpenSSL errors raised by it."""
        try:
            return self.connection.close()  # type: ignore[no-any-return]
        except OpenSSL.SSL.Error:
            return

    def getpeercert(
        self, binary_form: bool = False
    ) -> dict[str, list[typing.Any]] | bytes | None:
        """
        Return the peer certificate.

        With ``binary_form`` the ASN.1 (DER) dump is returned as bytes;
        otherwise a dict with only the ``subject`` common name and the
        ``subjectAltName`` entries. If the connection reports no peer
        certificate, that falsy value is returned unchanged.
        """
        # Named ``peer_cert`` so the module-level ``x509`` import is not shadowed.
        peer_cert = self.connection.get_peer_certificate()

        if not peer_cert:
            return peer_cert  # type: ignore[no-any-return]

        if binary_form:
            return OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, peer_cert)  # type: ignore[no-any-return]

        return {
            "subject": ((("commonName", peer_cert.get_subject().CN),),),  # type: ignore[dict-item]
            "subjectAltName": get_subj_alt_name(peer_cert),
        }

    def version(self) -> str:
        """Return the negotiated protocol version name reported by OpenSSL."""
        return self.connection.get_protocol_version_name()  # type: ignore[no-any-return]

    def selected_alpn_protocol(self) -> str | None:
        """Return the negotiated ALPN protocol as ``str``, or None if there is none."""
        alpn_proto = self.connection.get_alpn_proto_negotiated()
        return alpn_proto.decode() if alpn_proto else None


# Reuse the stdlib implementation of makefile() on the wrapper.
WrappedSocket.makefile = socket_cls.makefile  # type: ignore[attr-defined]

409 

410 

class PyOpenSSLContext:
    """
    I am a wrapper class for the PyOpenSSL ``Context`` object. I am responsible
    for translating the interface of the standard library ``SSLContext`` object
    to calls into PyOpenSSL.
    """

    def __init__(self, protocol: int) -> None:
        # ``protocol`` is a urllib3/stdlib protocol constant; it must be a key
        # of ``_openssl_versions`` (KeyError otherwise).
        self.protocol = _openssl_versions[protocol]
        self._ctx = OpenSSL.SSL.Context(self.protocol)
        self._options = 0
        self.check_hostname = False
        self._minimum_version: int = ssl.TLSVersion.MINIMUM_SUPPORTED
        self._maximum_version: int = ssl.TLSVersion.MAXIMUM_SUPPORTED

    @property
    def options(self) -> int:
        return self._options

    @options.setter
    def options(self, value: int) -> None:
        self._options = value
        self._set_ctx_options()

    @property
    def verify_mode(self) -> int:
        # Translate the OpenSSL flag mask back into the stdlib ``ssl.CERT_*`` value.
        return _openssl_to_stdlib_verify[self._ctx.get_verify_mode()]

    @verify_mode.setter
    def verify_mode(self, value: ssl.VerifyMode) -> None:
        self._ctx.set_verify(_stdlib_to_openssl_verify[value], _verify_callback)

    def set_default_verify_paths(self) -> None:
        self._ctx.set_default_verify_paths()

    def set_ciphers(self, ciphers: bytes | str) -> None:
        """Set the cipher list; a ``str`` is UTF-8 encoded before being passed on."""
        if isinstance(ciphers, str):
            ciphers = ciphers.encode("utf-8")
        self._ctx.set_cipher_list(ciphers)

    def load_verify_locations(
        self,
        cafile: str | None = None,
        capath: str | None = None,
        cadata: bytes | None = None,
    ) -> None:
        """
        Load trusted CA certificates into the wrapped context.

        :raises ssl.SSLError: if PyOpenSSL reports an error while loading.
        """
        if cafile is not None:
            cafile = cafile.encode("utf-8")  # type: ignore[assignment]
        if capath is not None:
            capath = capath.encode("utf-8")  # type: ignore[assignment]
        try:
            self._ctx.load_verify_locations(cafile, capath)
            if cadata is not None:
                self._ctx.load_verify_locations(BytesIO(cadata))
        except OpenSSL.SSL.Error as e:
            raise ssl.SSLError(f"unable to load trusted certificates: {e!r}") from e

    def load_cert_chain(
        self,
        certfile: str,
        keyfile: str | None = None,
        password: str | None = None,
    ) -> None:
        """
        Load a client certificate chain and its private key.

        The private key is read from ``keyfile``, falling back to ``certfile``
        when no key file is given.

        :raises ssl.SSLError: if PyOpenSSL reports an error while loading.
        """
        try:
            self._ctx.use_certificate_chain_file(certfile)
            if password is not None:
                if not isinstance(password, bytes):
                    password = password.encode("utf-8")  # type: ignore[assignment]
                # The callback ignores its arguments and always hands back
                # the (bytes) password.
                self._ctx.set_passwd_cb(lambda *_: password)
            self._ctx.use_privatekey_file(keyfile or certfile)
        except OpenSSL.SSL.Error as e:
            raise ssl.SSLError(f"Unable to load certificate chain: {e!r}") from e

    def set_alpn_protocols(self, protocols: list[bytes | str]) -> None:
        protocols = [util.util.to_bytes(p, "ascii") for p in protocols]
        return self._ctx.set_alpn_protos(protocols)  # type: ignore[no-any-return]

    def wrap_socket(
        self,
        sock: socket_cls,
        server_side: bool = False,
        do_handshake_on_connect: bool = True,
        suppress_ragged_eofs: bool = True,
        server_hostname: bytes | str | None = None,
    ) -> WrappedSocket:
        """
        Wrap ``sock`` in a client-side TLS connection and run the handshake.

        ``server_side`` and ``do_handshake_on_connect`` are accepted but not
        consulted: the connection is always put into connect state and the
        handshake is always performed before returning.
        ``suppress_ragged_eofs`` is forwarded to the returned
        :class:`WrappedSocket`.

        :raises socket.timeout: if waiting for the peer during the handshake
            times out.
        :raises ssl.SSLError: if the handshake fails.
        """
        cnx = OpenSSL.SSL.Connection(self._ctx, sock)

        # If server_hostname is an IP, don't use it for SNI, per RFC6066 Section 3
        if server_hostname and not util.ssl_.is_ipaddress(server_hostname):
            if isinstance(server_hostname, str):
                server_hostname = server_hostname.encode("utf-8")
            cnx.set_tlsext_host_name(server_hostname)

        cnx.set_connect_state()

        while True:
            try:
                cnx.do_handshake()
            except OpenSSL.SSL.WantReadError as e:
                if not util.wait_for_read(sock, sock.gettimeout()):
                    raise timeout("select timed out") from e
                continue
            except OpenSSL.SSL.Error as e:
                raise ssl.SSLError(f"bad handshake: {e!r}") from e
            break

        # Honour the caller's choice instead of silently falling back to the
        # WrappedSocket default.
        return WrappedSocket(cnx, sock, suppress_ragged_eofs)

    def _set_ctx_options(self) -> None:
        """Push user options plus the min/max-version masks into the context."""
        self._ctx.set_options(
            self._options
            | _openssl_to_ssl_minimum_version[self._minimum_version]
            | _openssl_to_ssl_maximum_version[self._maximum_version]
        )

    @property
    def minimum_version(self) -> int:
        return self._minimum_version

    @minimum_version.setter
    def minimum_version(self, minimum_version: int) -> None:
        self._minimum_version = minimum_version
        self._set_ctx_options()

    @property
    def maximum_version(self) -> int:
        return self._maximum_version

    @maximum_version.setter
    def maximum_version(self, maximum_version: int) -> None:
        self._maximum_version = maximum_version
        self._set_ctx_options()

543 

544 

545def _verify_callback( 

546 cnx: OpenSSL.SSL.Connection, 

547 x509: X509, 

548 err_no: int, 

549 err_depth: int, 

550 return_code: int, 

551) -> bool: 

552 return err_no == 0