Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/OpenSSL/SSL.py: 34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1203 statements  

1from __future__ import annotations 

2 

3import os 

4import socket 

5import typing 

6import warnings 

7from collections.abc import Sequence 

8from errno import errorcode 

9from functools import partial, wraps 

10from itertools import chain, count 

11from sys import platform 

12from typing import Any, Callable, Optional, TypeVar 

13from weakref import WeakValueDictionary 

14 

15from cryptography import x509 

16from cryptography.hazmat.primitives.asymmetric import ec 

17 

18from OpenSSL._util import ( 

19 StrOrBytesPath as _StrOrBytesPath, 

20) 

21from OpenSSL._util import ( 

22 exception_from_error_queue as _exception_from_error_queue, 

23) 

24from OpenSSL._util import ( 

25 ffi as _ffi, 

26) 

27from OpenSSL._util import ( 

28 lib as _lib, 

29) 

30from OpenSSL._util import ( 

31 make_assert as _make_assert, 

32) 

33from OpenSSL._util import ( 

34 no_zero_allocator as _no_zero_allocator, 

35) 

36from OpenSSL._util import ( 

37 path_bytes as _path_bytes, 

38) 

39from OpenSSL._util import ( 

40 text_to_bytes_and_warn as _text_to_bytes_and_warn, 

41) 

42from OpenSSL.crypto import ( 

43 FILETYPE_PEM, 

44 X509, 

45 PKey, 

46 X509Name, 

47 X509Store, 

48 _EllipticCurve, 

49 _PassphraseHelper, 

50 _PrivateKey, 

51) 

52 

53__all__ = [ 

54 "DTLS_CLIENT_METHOD", 

55 "DTLS_METHOD", 

56 "DTLS_SERVER_METHOD", 

57 "MODE_RELEASE_BUFFERS", 

58 "NO_OVERLAPPING_PROTOCOLS", 

59 "OPENSSL_BUILT_ON", 

60 "OPENSSL_CFLAGS", 

61 "OPENSSL_DIR", 

62 "OPENSSL_PLATFORM", 

63 "OPENSSL_VERSION", 

64 "OPENSSL_VERSION_NUMBER", 

65 "OP_ALL", 

66 "OP_CIPHER_SERVER_PREFERENCE", 

67 "OP_COOKIE_EXCHANGE", 

68 "OP_DONT_INSERT_EMPTY_FRAGMENTS", 

69 "OP_EPHEMERAL_RSA", 

70 "OP_MICROSOFT_BIG_SSLV3_BUFFER", 

71 "OP_MICROSOFT_SESS_ID_BUG", 

72 "OP_MSIE_SSLV2_RSA_PADDING", 

73 "OP_NETSCAPE_CA_DN_BUG", 

74 "OP_NETSCAPE_CHALLENGE_BUG", 

75 "OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG", 

76 "OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG", 

77 "OP_NO_COMPRESSION", 

78 "OP_NO_QUERY_MTU", 

79 "OP_NO_TICKET", 

80 "OP_PKCS1_CHECK_1", 

81 "OP_PKCS1_CHECK_2", 

82 "OP_SINGLE_DH_USE", 

83 "OP_SINGLE_ECDH_USE", 

84 "OP_SSLEAY_080_CLIENT_DH_BUG", 

85 "OP_SSLREF2_REUSE_CERT_TYPE_BUG", 

86 "OP_TLS_BLOCK_PADDING_BUG", 

87 "OP_TLS_D5_BUG", 

88 "OP_TLS_ROLLBACK_BUG", 

89 "RECEIVED_SHUTDOWN", 

90 "SENT_SHUTDOWN", 

91 "SESS_CACHE_BOTH", 

92 "SESS_CACHE_CLIENT", 

93 "SESS_CACHE_NO_AUTO_CLEAR", 

94 "SESS_CACHE_NO_INTERNAL", 

95 "SESS_CACHE_NO_INTERNAL_LOOKUP", 

96 "SESS_CACHE_NO_INTERNAL_STORE", 

97 "SESS_CACHE_OFF", 

98 "SESS_CACHE_SERVER", 

99 "SSL3_VERSION", 

100 "SSLEAY_BUILT_ON", 

101 "SSLEAY_CFLAGS", 

102 "SSLEAY_DIR", 

103 "SSLEAY_PLATFORM", 

104 "SSLEAY_VERSION", 

105 "SSL_CB_ACCEPT_EXIT", 

106 "SSL_CB_ACCEPT_LOOP", 

107 "SSL_CB_ALERT", 

108 "SSL_CB_CONNECT_EXIT", 

109 "SSL_CB_CONNECT_LOOP", 

110 "SSL_CB_EXIT", 

111 "SSL_CB_HANDSHAKE_DONE", 

112 "SSL_CB_HANDSHAKE_START", 

113 "SSL_CB_LOOP", 

114 "SSL_CB_READ", 

115 "SSL_CB_READ_ALERT", 

116 "SSL_CB_WRITE", 

117 "SSL_CB_WRITE_ALERT", 

118 "SSL_ST_ACCEPT", 

119 "SSL_ST_CONNECT", 

120 "SSL_ST_MASK", 

121 "TLS1_1_VERSION", 

122 "TLS1_2_VERSION", 

123 "TLS1_3_VERSION", 

124 "TLS1_VERSION", 

125 "TLS_CLIENT_METHOD", 

126 "TLS_METHOD", 

127 "TLS_SERVER_METHOD", 

128 "VERIFY_CLIENT_ONCE", 

129 "VERIFY_FAIL_IF_NO_PEER_CERT", 

130 "VERIFY_NONE", 

131 "VERIFY_PEER", 

132 "Connection", 

133 "Context", 

134 "Error", 

135 "OP_NO_SSLv2", 

136 "OP_NO_SSLv3", 

137 "OP_NO_TLSv1", 

138 "OP_NO_TLSv1_1", 

139 "OP_NO_TLSv1_2", 

140 "OP_NO_TLSv1_3", 

141 "SSLeay_version", 

142 "SSLv23_METHOD", 

143 "Session", 

144 "SysCallError", 

145 "TLSv1_1_METHOD", 

146 "TLSv1_2_METHOD", 

147 "TLSv1_METHOD", 

148 "WantReadError", 

149 "WantWriteError", 

150 "WantX509LookupError", 

151 "X509VerificationCodes", 

152 "ZeroReturnError", 

153] 

154 

155 

156OPENSSL_VERSION_NUMBER: int = _lib.OPENSSL_VERSION_NUMBER 

157OPENSSL_VERSION: int = _lib.OPENSSL_VERSION 

158OPENSSL_CFLAGS: int = _lib.OPENSSL_CFLAGS 

159OPENSSL_PLATFORM: int = _lib.OPENSSL_PLATFORM 

160OPENSSL_DIR: int = _lib.OPENSSL_DIR 

161OPENSSL_BUILT_ON: int = _lib.OPENSSL_BUILT_ON 

162 

163SSLEAY_VERSION = OPENSSL_VERSION 

164SSLEAY_CFLAGS = OPENSSL_CFLAGS 

165SSLEAY_PLATFORM = OPENSSL_PLATFORM 

166SSLEAY_DIR = OPENSSL_DIR 

167SSLEAY_BUILT_ON = OPENSSL_BUILT_ON 

168 

169SENT_SHUTDOWN = _lib.SSL_SENT_SHUTDOWN 

170RECEIVED_SHUTDOWN = _lib.SSL_RECEIVED_SHUTDOWN 

171 

172SSLv23_METHOD = 3 

173TLSv1_METHOD = 4 

174TLSv1_1_METHOD = 5 

175TLSv1_2_METHOD = 6 

176TLS_METHOD = 7 

177TLS_SERVER_METHOD = 8 

178TLS_CLIENT_METHOD = 9 

179DTLS_METHOD = 10 

180DTLS_SERVER_METHOD = 11 

181DTLS_CLIENT_METHOD = 12 

182 

183SSL3_VERSION: int = _lib.SSL3_VERSION 

184TLS1_VERSION: int = _lib.TLS1_VERSION 

185TLS1_1_VERSION: int = _lib.TLS1_1_VERSION 

186TLS1_2_VERSION: int = _lib.TLS1_2_VERSION 

187TLS1_3_VERSION: int = _lib.TLS1_3_VERSION 

188 

189OP_NO_SSLv2: int = _lib.SSL_OP_NO_SSLv2 

190OP_NO_SSLv3: int = _lib.SSL_OP_NO_SSLv3 

191OP_NO_TLSv1: int = _lib.SSL_OP_NO_TLSv1 

192OP_NO_TLSv1_1: int = _lib.SSL_OP_NO_TLSv1_1 

193OP_NO_TLSv1_2: int = _lib.SSL_OP_NO_TLSv1_2 

194OP_NO_TLSv1_3: int = _lib.SSL_OP_NO_TLSv1_3 

195 

196MODE_RELEASE_BUFFERS: int = _lib.SSL_MODE_RELEASE_BUFFERS 

197 

198OP_SINGLE_DH_USE: int = _lib.SSL_OP_SINGLE_DH_USE 

199OP_SINGLE_ECDH_USE: int = _lib.SSL_OP_SINGLE_ECDH_USE 

200OP_EPHEMERAL_RSA: int = _lib.SSL_OP_EPHEMERAL_RSA 

201OP_MICROSOFT_SESS_ID_BUG: int = _lib.SSL_OP_MICROSOFT_SESS_ID_BUG 

202OP_NETSCAPE_CHALLENGE_BUG: int = _lib.SSL_OP_NETSCAPE_CHALLENGE_BUG 

203OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG: int = ( 

204 _lib.SSL_OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG 

205) 

206OP_SSLREF2_REUSE_CERT_TYPE_BUG: int = _lib.SSL_OP_SSLREF2_REUSE_CERT_TYPE_BUG 

207OP_MICROSOFT_BIG_SSLV3_BUFFER: int = _lib.SSL_OP_MICROSOFT_BIG_SSLV3_BUFFER 

208OP_MSIE_SSLV2_RSA_PADDING: int = _lib.SSL_OP_MSIE_SSLV2_RSA_PADDING 

209OP_SSLEAY_080_CLIENT_DH_BUG: int = _lib.SSL_OP_SSLEAY_080_CLIENT_DH_BUG 

210OP_TLS_D5_BUG: int = _lib.SSL_OP_TLS_D5_BUG 

211OP_TLS_BLOCK_PADDING_BUG: int = _lib.SSL_OP_TLS_BLOCK_PADDING_BUG 

212OP_DONT_INSERT_EMPTY_FRAGMENTS: int = _lib.SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS 

213OP_CIPHER_SERVER_PREFERENCE: int = _lib.SSL_OP_CIPHER_SERVER_PREFERENCE 

214OP_TLS_ROLLBACK_BUG: int = _lib.SSL_OP_TLS_ROLLBACK_BUG 

215OP_PKCS1_CHECK_1 = _lib.SSL_OP_PKCS1_CHECK_1 

216OP_PKCS1_CHECK_2: int = _lib.SSL_OP_PKCS1_CHECK_2 

217OP_NETSCAPE_CA_DN_BUG: int = _lib.SSL_OP_NETSCAPE_CA_DN_BUG 

218OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG: int = ( 

219 _lib.SSL_OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG 

220) 

221OP_NO_COMPRESSION: int = _lib.SSL_OP_NO_COMPRESSION 

222 

223OP_NO_QUERY_MTU: int = _lib.SSL_OP_NO_QUERY_MTU 

224OP_COOKIE_EXCHANGE: int = _lib.SSL_OP_COOKIE_EXCHANGE 

225OP_NO_TICKET: int = _lib.SSL_OP_NO_TICKET 

226 

227try: 

228 OP_NO_RENEGOTIATION: int = _lib.SSL_OP_NO_RENEGOTIATION 

229 __all__.append("OP_NO_RENEGOTIATION") 

230except AttributeError: 

231 pass 

232 

233try: 

234 OP_IGNORE_UNEXPECTED_EOF: int = _lib.SSL_OP_IGNORE_UNEXPECTED_EOF 

235 __all__.append("OP_IGNORE_UNEXPECTED_EOF") 

236except AttributeError: 

237 pass 

238 

239try: 

240 OP_LEGACY_SERVER_CONNECT: int = _lib.SSL_OP_LEGACY_SERVER_CONNECT 

241 __all__.append("OP_LEGACY_SERVER_CONNECT") 

242except AttributeError: 

243 pass 

244 

245OP_ALL: int = _lib.SSL_OP_ALL 

246 

247VERIFY_PEER: int = _lib.SSL_VERIFY_PEER 

248VERIFY_FAIL_IF_NO_PEER_CERT: int = _lib.SSL_VERIFY_FAIL_IF_NO_PEER_CERT 

249VERIFY_CLIENT_ONCE: int = _lib.SSL_VERIFY_CLIENT_ONCE 

250VERIFY_NONE: int = _lib.SSL_VERIFY_NONE 

251 

252SESS_CACHE_OFF: int = _lib.SSL_SESS_CACHE_OFF 

253SESS_CACHE_CLIENT: int = _lib.SSL_SESS_CACHE_CLIENT 

254SESS_CACHE_SERVER: int = _lib.SSL_SESS_CACHE_SERVER 

255SESS_CACHE_BOTH: int = _lib.SSL_SESS_CACHE_BOTH 

256SESS_CACHE_NO_AUTO_CLEAR: int = _lib.SSL_SESS_CACHE_NO_AUTO_CLEAR 

257SESS_CACHE_NO_INTERNAL_LOOKUP: int = _lib.SSL_SESS_CACHE_NO_INTERNAL_LOOKUP 

258SESS_CACHE_NO_INTERNAL_STORE: int = _lib.SSL_SESS_CACHE_NO_INTERNAL_STORE 

259SESS_CACHE_NO_INTERNAL: int = _lib.SSL_SESS_CACHE_NO_INTERNAL 

260 

261SSL_ST_CONNECT: int = _lib.SSL_ST_CONNECT 

262SSL_ST_ACCEPT: int = _lib.SSL_ST_ACCEPT 

263SSL_ST_MASK: int = _lib.SSL_ST_MASK 

264 

265SSL_CB_LOOP: int = _lib.SSL_CB_LOOP 

266SSL_CB_EXIT: int = _lib.SSL_CB_EXIT 

267SSL_CB_READ: int = _lib.SSL_CB_READ 

268SSL_CB_WRITE: int = _lib.SSL_CB_WRITE 

269SSL_CB_ALERT: int = _lib.SSL_CB_ALERT 

270SSL_CB_READ_ALERT: int = _lib.SSL_CB_READ_ALERT 

271SSL_CB_WRITE_ALERT: int = _lib.SSL_CB_WRITE_ALERT 

272SSL_CB_ACCEPT_LOOP: int = _lib.SSL_CB_ACCEPT_LOOP 

273SSL_CB_ACCEPT_EXIT: int = _lib.SSL_CB_ACCEPT_EXIT 

274SSL_CB_CONNECT_LOOP: int = _lib.SSL_CB_CONNECT_LOOP 

275SSL_CB_CONNECT_EXIT: int = _lib.SSL_CB_CONNECT_EXIT 

276SSL_CB_HANDSHAKE_START: int = _lib.SSL_CB_HANDSHAKE_START 

277SSL_CB_HANDSHAKE_DONE: int = _lib.SSL_CB_HANDSHAKE_DONE 

278 

279_T = TypeVar("_T") 

280 

281 

282class _NoOverlappingProtocols: 

283 pass 

284 

285 

286NO_OVERLAPPING_PROTOCOLS = _NoOverlappingProtocols() 

287 

288# Callback types. 

289_ALPNSelectCallback = Callable[ 

290 [ 

291 "Connection", 

292 typing.List[bytes], 

293 ], 

294 typing.Union[bytes, _NoOverlappingProtocols], 

295] 

296_CookieGenerateCallback = Callable[["Connection"], bytes] 

297_CookieVerifyCallback = Callable[["Connection", bytes], bool] 

298_OCSPClientCallback = Callable[["Connection", bytes, Optional[_T]], bool] 

299_OCSPServerCallback = Callable[["Connection", Optional[_T]], bytes] 

300_PassphraseCallback = Callable[[int, bool, Optional[_T]], bytes] 

301_VerifyCallback = Callable[["Connection", X509, int, int, int], bool] 

302 

303 

304class X509VerificationCodes: 

305 """ 

306 Success and error codes for X509 verification, as returned by the 

307 underlying ``X509_STORE_CTX_get_error()`` function and passed by pyOpenSSL 

308 to verification callback functions. 

309 

310 See `OpenSSL Verification Errors 

311 <https://www.openssl.org/docs/manmaster/man3/X509_verify_cert_error_string.html#ERROR-CODES>`_ 

312 for details. 

313 """ 

314 

315 OK = _lib.X509_V_OK 

316 ERR_UNABLE_TO_GET_ISSUER_CERT = _lib.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT 

317 ERR_UNABLE_TO_GET_CRL = _lib.X509_V_ERR_UNABLE_TO_GET_CRL 

318 ERR_UNABLE_TO_DECRYPT_CERT_SIGNATURE = ( 

319 _lib.X509_V_ERR_UNABLE_TO_DECRYPT_CERT_SIGNATURE 

320 ) 

321 ERR_UNABLE_TO_DECRYPT_CRL_SIGNATURE = ( 

322 _lib.X509_V_ERR_UNABLE_TO_DECRYPT_CRL_SIGNATURE 

323 ) 

324 ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY = ( 

325 _lib.X509_V_ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY 

326 ) 

327 ERR_CERT_SIGNATURE_FAILURE = _lib.X509_V_ERR_CERT_SIGNATURE_FAILURE 

328 ERR_CRL_SIGNATURE_FAILURE = _lib.X509_V_ERR_CRL_SIGNATURE_FAILURE 

329 ERR_CERT_NOT_YET_VALID = _lib.X509_V_ERR_CERT_NOT_YET_VALID 

330 ERR_CERT_HAS_EXPIRED = _lib.X509_V_ERR_CERT_HAS_EXPIRED 

331 ERR_CRL_NOT_YET_VALID = _lib.X509_V_ERR_CRL_NOT_YET_VALID 

332 ERR_CRL_HAS_EXPIRED = _lib.X509_V_ERR_CRL_HAS_EXPIRED 

333 ERR_ERROR_IN_CERT_NOT_BEFORE_FIELD = ( 

334 _lib.X509_V_ERR_ERROR_IN_CERT_NOT_BEFORE_FIELD 

335 ) 

336 ERR_ERROR_IN_CERT_NOT_AFTER_FIELD = ( 

337 _lib.X509_V_ERR_ERROR_IN_CERT_NOT_AFTER_FIELD 

338 ) 

339 ERR_ERROR_IN_CRL_LAST_UPDATE_FIELD = ( 

340 _lib.X509_V_ERR_ERROR_IN_CRL_LAST_UPDATE_FIELD 

341 ) 

342 ERR_ERROR_IN_CRL_NEXT_UPDATE_FIELD = ( 

343 _lib.X509_V_ERR_ERROR_IN_CRL_NEXT_UPDATE_FIELD 

344 ) 

345 ERR_OUT_OF_MEM = _lib.X509_V_ERR_OUT_OF_MEM 

346 ERR_DEPTH_ZERO_SELF_SIGNED_CERT = ( 

347 _lib.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT 

348 ) 

349 ERR_SELF_SIGNED_CERT_IN_CHAIN = _lib.X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN 

350 ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY = ( 

351 _lib.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY 

352 ) 

353 ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE = ( 

354 _lib.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE 

355 ) 

356 ERR_CERT_CHAIN_TOO_LONG = _lib.X509_V_ERR_CERT_CHAIN_TOO_LONG 

357 ERR_CERT_REVOKED = _lib.X509_V_ERR_CERT_REVOKED 

358 ERR_INVALID_CA = _lib.X509_V_ERR_INVALID_CA 

359 ERR_PATH_LENGTH_EXCEEDED = _lib.X509_V_ERR_PATH_LENGTH_EXCEEDED 

360 ERR_INVALID_PURPOSE = _lib.X509_V_ERR_INVALID_PURPOSE 

361 ERR_CERT_UNTRUSTED = _lib.X509_V_ERR_CERT_UNTRUSTED 

362 ERR_CERT_REJECTED = _lib.X509_V_ERR_CERT_REJECTED 

363 ERR_SUBJECT_ISSUER_MISMATCH = _lib.X509_V_ERR_SUBJECT_ISSUER_MISMATCH 

364 ERR_AKID_SKID_MISMATCH = _lib.X509_V_ERR_AKID_SKID_MISMATCH 

365 ERR_AKID_ISSUER_SERIAL_MISMATCH = ( 

366 _lib.X509_V_ERR_AKID_ISSUER_SERIAL_MISMATCH 

367 ) 

368 ERR_KEYUSAGE_NO_CERTSIGN = _lib.X509_V_ERR_KEYUSAGE_NO_CERTSIGN 

369 ERR_UNABLE_TO_GET_CRL_ISSUER = _lib.X509_V_ERR_UNABLE_TO_GET_CRL_ISSUER 

370 ERR_UNHANDLED_CRITICAL_EXTENSION = ( 

371 _lib.X509_V_ERR_UNHANDLED_CRITICAL_EXTENSION 

372 ) 

373 ERR_KEYUSAGE_NO_CRL_SIGN = _lib.X509_V_ERR_KEYUSAGE_NO_CRL_SIGN 

374 ERR_UNHANDLED_CRITICAL_CRL_EXTENSION = ( 

375 _lib.X509_V_ERR_UNHANDLED_CRITICAL_CRL_EXTENSION 

376 ) 

377 ERR_INVALID_NON_CA = _lib.X509_V_ERR_INVALID_NON_CA 

378 ERR_PROXY_PATH_LENGTH_EXCEEDED = _lib.X509_V_ERR_PROXY_PATH_LENGTH_EXCEEDED 

379 ERR_KEYUSAGE_NO_DIGITAL_SIGNATURE = ( 

380 _lib.X509_V_ERR_KEYUSAGE_NO_DIGITAL_SIGNATURE 

381 ) 

382 ERR_PROXY_CERTIFICATES_NOT_ALLOWED = ( 

383 _lib.X509_V_ERR_PROXY_CERTIFICATES_NOT_ALLOWED 

384 ) 

385 ERR_INVALID_EXTENSION = _lib.X509_V_ERR_INVALID_EXTENSION 

386 ERR_INVALID_POLICY_EXTENSION = _lib.X509_V_ERR_INVALID_POLICY_EXTENSION 

387 ERR_NO_EXPLICIT_POLICY = _lib.X509_V_ERR_NO_EXPLICIT_POLICY 

388 ERR_DIFFERENT_CRL_SCOPE = _lib.X509_V_ERR_DIFFERENT_CRL_SCOPE 

389 ERR_UNSUPPORTED_EXTENSION_FEATURE = ( 

390 _lib.X509_V_ERR_UNSUPPORTED_EXTENSION_FEATURE 

391 ) 

392 ERR_UNNESTED_RESOURCE = _lib.X509_V_ERR_UNNESTED_RESOURCE 

393 ERR_PERMITTED_VIOLATION = _lib.X509_V_ERR_PERMITTED_VIOLATION 

394 ERR_EXCLUDED_VIOLATION = _lib.X509_V_ERR_EXCLUDED_VIOLATION 

395 ERR_SUBTREE_MINMAX = _lib.X509_V_ERR_SUBTREE_MINMAX 

396 ERR_UNSUPPORTED_CONSTRAINT_TYPE = ( 

397 _lib.X509_V_ERR_UNSUPPORTED_CONSTRAINT_TYPE 

398 ) 

399 ERR_UNSUPPORTED_CONSTRAINT_SYNTAX = ( 

400 _lib.X509_V_ERR_UNSUPPORTED_CONSTRAINT_SYNTAX 

401 ) 

402 ERR_UNSUPPORTED_NAME_SYNTAX = _lib.X509_V_ERR_UNSUPPORTED_NAME_SYNTAX 

403 ERR_CRL_PATH_VALIDATION_ERROR = _lib.X509_V_ERR_CRL_PATH_VALIDATION_ERROR 

404 ERR_HOSTNAME_MISMATCH = _lib.X509_V_ERR_HOSTNAME_MISMATCH 

405 ERR_EMAIL_MISMATCH = _lib.X509_V_ERR_EMAIL_MISMATCH 

406 ERR_IP_ADDRESS_MISMATCH = _lib.X509_V_ERR_IP_ADDRESS_MISMATCH 

407 ERR_APPLICATION_VERIFICATION = _lib.X509_V_ERR_APPLICATION_VERIFICATION 

408 

409 

410# Taken from https://golang.org/src/crypto/x509/root_linux.go 

411_CERTIFICATE_FILE_LOCATIONS = [ 

412 "/etc/ssl/certs/ca-certificates.crt", # Debian/Ubuntu/Gentoo etc. 

413 "/etc/pki/tls/certs/ca-bundle.crt", # Fedora/RHEL 6 

414 "/etc/ssl/ca-bundle.pem", # OpenSUSE 

415 "/etc/pki/tls/cacert.pem", # OpenELEC 

416 "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem", # CentOS/RHEL 7 

417] 

418 

419_CERTIFICATE_PATH_LOCATIONS = [ 

420 "/etc/ssl/certs", # SLES10/SLES11 

421] 

422 

423# These values are compared to output from cffi's ffi.string so they must be 

424# byte strings. 

425_CRYPTOGRAPHY_MANYLINUX_CA_DIR = b"/opt/pyca/cryptography/openssl/certs" 

426_CRYPTOGRAPHY_MANYLINUX_CA_FILE = b"/opt/pyca/cryptography/openssl/cert.pem" 

427 

428 

429class Error(Exception): 

430 """ 

431 An error occurred in an `OpenSSL.SSL` API. 

432 """ 

433 

434 

435_raise_current_error = partial(_exception_from_error_queue, Error) 

436_openssl_assert = _make_assert(Error) 

437 

438 

439class WantReadError(Error): 

440 pass 

441 

442 

443class WantWriteError(Error): 

444 pass 

445 

446 

447class WantX509LookupError(Error): 

448 pass 

449 

450 

451class ZeroReturnError(Error): 

452 pass 

453 

454 

455class SysCallError(Error): 

456 pass 

457 

458 

459class _CallbackExceptionHelper: 

460 """ 

461 A base class for wrapper classes that allow for intelligent exception 

462 handling in OpenSSL callbacks. 

463 

464 :ivar list _problems: Any exceptions that occurred while executing in a 

465 context where they could not be raised in the normal way. Typically 

466 this is because OpenSSL has called into some Python code and requires a 

467 return value. The exceptions are saved to be raised later when it is 

468 possible to do so. 

469 """ 

470 

471 def __init__(self) -> None: 

472 self._problems: list[Exception] = [] 

473 

474 def raise_if_problem(self) -> None: 

475 """ 

476 Raise an exception from the OpenSSL error queue or that was previously 

477 captured whe running a callback. 

478 """ 

479 if self._problems: 

480 try: 

481 _raise_current_error() 

482 except Error: 

483 pass 

484 raise self._problems.pop(0) 

485 

486 

487class _VerifyHelper(_CallbackExceptionHelper): 

488 """ 

489 Wrap a callback such that it can be used as a certificate verification 

490 callback. 

491 """ 

492 

493 def __init__(self, callback: _VerifyCallback) -> None: 

494 _CallbackExceptionHelper.__init__(self) 

495 

496 @wraps(callback) 

497 def wrapper(ok, store_ctx): # type: ignore[no-untyped-def] 

498 x509 = _lib.X509_STORE_CTX_get_current_cert(store_ctx) 

499 _lib.X509_up_ref(x509) 

500 cert = X509._from_raw_x509_ptr(x509) 

501 error_number = _lib.X509_STORE_CTX_get_error(store_ctx) 

502 error_depth = _lib.X509_STORE_CTX_get_error_depth(store_ctx) 

503 

504 index = _lib.SSL_get_ex_data_X509_STORE_CTX_idx() 

505 ssl = _lib.X509_STORE_CTX_get_ex_data(store_ctx, index) 

506 connection = Connection._reverse_mapping[ssl] 

507 

508 try: 

509 result = callback( 

510 connection, cert, error_number, error_depth, ok 

511 ) 

512 except Exception as e: 

513 self._problems.append(e) 

514 return 0 

515 else: 

516 if result: 

517 _lib.X509_STORE_CTX_set_error(store_ctx, _lib.X509_V_OK) 

518 return 1 

519 else: 

520 return 0 

521 

522 self.callback = _ffi.callback( 

523 "int (*)(int, X509_STORE_CTX *)", wrapper 

524 ) 

525 

526 

527class _ALPNSelectHelper(_CallbackExceptionHelper): 

528 """ 

529 Wrap a callback such that it can be used as an ALPN selection callback. 

530 """ 

531 

532 def __init__(self, callback: _ALPNSelectCallback) -> None: 

533 _CallbackExceptionHelper.__init__(self) 

534 

535 @wraps(callback) 

536 def wrapper(ssl, out, outlen, in_, inlen, arg): # type: ignore[no-untyped-def] 

537 try: 

538 conn = Connection._reverse_mapping[ssl] 

539 

540 # The string passed to us is made up of multiple 

541 # length-prefixed bytestrings. We need to split that into a 

542 # list. 

543 instr = _ffi.buffer(in_, inlen)[:] 

544 protolist = [] 

545 while instr: 

546 encoded_len = instr[0] 

547 proto = instr[1 : encoded_len + 1] 

548 protolist.append(proto) 

549 instr = instr[encoded_len + 1 :] 

550 

551 # Call the callback 

552 outbytes = callback(conn, protolist) 

553 any_accepted = True 

554 if outbytes is NO_OVERLAPPING_PROTOCOLS: 

555 outbytes = b"" 

556 any_accepted = False 

557 elif not isinstance(outbytes, bytes): 

558 raise TypeError( 

559 "ALPN callback must return a bytestring or the " 

560 "special NO_OVERLAPPING_PROTOCOLS sentinel value." 

561 ) 

562 

563 # Save our callback arguments on the connection object to make 

564 # sure that they don't get freed before OpenSSL can use them. 

565 # Then, return them in the appropriate output parameters. 

566 conn._alpn_select_callback_args = [ 

567 _ffi.new("unsigned char *", len(outbytes)), 

568 _ffi.new("unsigned char[]", outbytes), 

569 ] 

570 outlen[0] = conn._alpn_select_callback_args[0][0] 

571 out[0] = conn._alpn_select_callback_args[1] 

572 if not any_accepted: 

573 return _lib.SSL_TLSEXT_ERR_NOACK 

574 return _lib.SSL_TLSEXT_ERR_OK 

575 except Exception as e: 

576 self._problems.append(e) 

577 return _lib.SSL_TLSEXT_ERR_ALERT_FATAL 

578 

579 self.callback = _ffi.callback( 

580 ( 

581 "int (*)(SSL *, unsigned char **, unsigned char *, " 

582 "const unsigned char *, unsigned int, void *)" 

583 ), 

584 wrapper, 

585 ) 

586 

587 

588class _OCSPServerCallbackHelper(_CallbackExceptionHelper): 

589 """ 

590 Wrap a callback such that it can be used as an OCSP callback for the server 

591 side. 

592 

593 Annoyingly, OpenSSL defines one OCSP callback but uses it in two different 

594 ways. For servers, that callback is expected to retrieve some OCSP data and 

595 hand it to OpenSSL, and may return only SSL_TLSEXT_ERR_OK, 

596 SSL_TLSEXT_ERR_FATAL, and SSL_TLSEXT_ERR_NOACK. For clients, that callback 

597 is expected to check the OCSP data, and returns a negative value on error, 

598 0 if the response is not acceptable, or positive if it is. These are 

599 mutually exclusive return code behaviours, and they mean that we need two 

600 helpers so that we always return an appropriate error code if the user's 

601 code throws an exception. 

602 

603 Given that we have to have two helpers anyway, these helpers are a bit more 

604 helpery than most: specifically, they hide a few more of the OpenSSL 

605 functions so that the user has an easier time writing these callbacks. 

606 

607 This helper implements the server side. 

608 """ 

609 

610 def __init__(self, callback: _OCSPServerCallback[Any]) -> None: 

611 _CallbackExceptionHelper.__init__(self) 

612 

613 @wraps(callback) 

614 def wrapper(ssl, cdata): # type: ignore[no-untyped-def] 

615 try: 

616 conn = Connection._reverse_mapping[ssl] 

617 

618 # Extract the data if any was provided. 

619 if cdata != _ffi.NULL: 

620 data = _ffi.from_handle(cdata) 

621 else: 

622 data = None 

623 

624 # Call the callback. 

625 ocsp_data = callback(conn, data) 

626 

627 if not isinstance(ocsp_data, bytes): 

628 raise TypeError("OCSP callback must return a bytestring.") 

629 

630 # If the OCSP data was provided, we will pass it to OpenSSL. 

631 # However, we have an early exit here: if no OCSP data was 

632 # provided we will just exit out and tell OpenSSL that there 

633 # is nothing to do. 

634 if not ocsp_data: 

635 return 3 # SSL_TLSEXT_ERR_NOACK 

636 

637 # OpenSSL takes ownership of this data and expects it to have 

638 # been allocated by OPENSSL_malloc. 

639 ocsp_data_length = len(ocsp_data) 

640 data_ptr = _lib.OPENSSL_malloc(ocsp_data_length) 

641 _ffi.buffer(data_ptr, ocsp_data_length)[:] = ocsp_data 

642 

643 _lib.SSL_set_tlsext_status_ocsp_resp( 

644 ssl, data_ptr, ocsp_data_length 

645 ) 

646 

647 return 0 

648 except Exception as e: 

649 self._problems.append(e) 

650 return 2 # SSL_TLSEXT_ERR_ALERT_FATAL 

651 

652 self.callback = _ffi.callback("int (*)(SSL *, void *)", wrapper) 

653 

654 

655class _OCSPClientCallbackHelper(_CallbackExceptionHelper): 

656 """ 

657 Wrap a callback such that it can be used as an OCSP callback for the client 

658 side. 

659 

660 Annoyingly, OpenSSL defines one OCSP callback but uses it in two different 

661 ways. For servers, that callback is expected to retrieve some OCSP data and 

662 hand it to OpenSSL, and may return only SSL_TLSEXT_ERR_OK, 

663 SSL_TLSEXT_ERR_FATAL, and SSL_TLSEXT_ERR_NOACK. For clients, that callback 

664 is expected to check the OCSP data, and returns a negative value on error, 

665 0 if the response is not acceptable, or positive if it is. These are 

666 mutually exclusive return code behaviours, and they mean that we need two 

667 helpers so that we always return an appropriate error code if the user's 

668 code throws an exception. 

669 

670 Given that we have to have two helpers anyway, these helpers are a bit more 

671 helpery than most: specifically, they hide a few more of the OpenSSL 

672 functions so that the user has an easier time writing these callbacks. 

673 

674 This helper implements the client side. 

675 """ 

676 

677 def __init__(self, callback: _OCSPClientCallback[Any]) -> None: 

678 _CallbackExceptionHelper.__init__(self) 

679 

680 @wraps(callback) 

681 def wrapper(ssl, cdata): # type: ignore[no-untyped-def] 

682 try: 

683 conn = Connection._reverse_mapping[ssl] 

684 

685 # Extract the data if any was provided. 

686 if cdata != _ffi.NULL: 

687 data = _ffi.from_handle(cdata) 

688 else: 

689 data = None 

690 

691 # Get the OCSP data. 

692 ocsp_ptr = _ffi.new("unsigned char **") 

693 ocsp_len = _lib.SSL_get_tlsext_status_ocsp_resp(ssl, ocsp_ptr) 

694 if ocsp_len < 0: 

695 # No OCSP data. 

696 ocsp_data = b"" 

697 else: 

698 # Copy the OCSP data, then pass it to the callback. 

699 ocsp_data = _ffi.buffer(ocsp_ptr[0], ocsp_len)[:] 

700 

701 valid = callback(conn, ocsp_data, data) 

702 

703 # Return 1 on success or 0 on error. 

704 return int(bool(valid)) 

705 

706 except Exception as e: 

707 self._problems.append(e) 

708 # Return negative value if an exception is hit. 

709 return -1 

710 

711 self.callback = _ffi.callback("int (*)(SSL *, void *)", wrapper) 

712 

713 

714class _CookieGenerateCallbackHelper(_CallbackExceptionHelper): 

715 def __init__(self, callback: _CookieGenerateCallback) -> None: 

716 _CallbackExceptionHelper.__init__(self) 

717 

718 @wraps(callback) 

719 def wrapper(ssl, out, outlen): # type: ignore[no-untyped-def] 

720 try: 

721 conn = Connection._reverse_mapping[ssl] 

722 cookie = callback(conn) 

723 out[0 : len(cookie)] = cookie 

724 outlen[0] = len(cookie) 

725 return 1 

726 except Exception as e: 

727 self._problems.append(e) 

728 # "a zero return value can be used to abort the handshake" 

729 return 0 

730 

731 self.callback = _ffi.callback( 

732 "int (*)(SSL *, unsigned char *, unsigned int *)", 

733 wrapper, 

734 ) 

735 

736 

737class _CookieVerifyCallbackHelper(_CallbackExceptionHelper): 

738 def __init__(self, callback: _CookieVerifyCallback) -> None: 

739 _CallbackExceptionHelper.__init__(self) 

740 

741 @wraps(callback) 

742 def wrapper(ssl, c_cookie, cookie_len): # type: ignore[no-untyped-def] 

743 try: 

744 conn = Connection._reverse_mapping[ssl] 

745 return callback(conn, bytes(c_cookie[0:cookie_len])) 

746 except Exception as e: 

747 self._problems.append(e) 

748 return 0 

749 

750 self.callback = _ffi.callback( 

751 "int (*)(SSL *, unsigned char *, unsigned int)", 

752 wrapper, 

753 ) 

754 

755 

756def _asFileDescriptor(obj: Any) -> int: 

757 fd = None 

758 if not isinstance(obj, int): 

759 meth = getattr(obj, "fileno", None) 

760 if meth is not None: 

761 obj = meth() 

762 

763 if isinstance(obj, int): 

764 fd = obj 

765 

766 if not isinstance(fd, int): 

767 raise TypeError("argument must be an int, or have a fileno() method.") 

768 elif fd < 0: 

769 raise ValueError( 

770 f"file descriptor cannot be a negative integer ({fd:i})" 

771 ) 

772 

773 return fd 

774 

775 

776def OpenSSL_version(type: int) -> bytes: 

777 """ 

778 Return a string describing the version of OpenSSL in use. 

779 

780 :param type: One of the :const:`OPENSSL_` constants defined in this module. 

781 """ 

782 return _ffi.string(_lib.OpenSSL_version(type)) 

783 

784 

785SSLeay_version = OpenSSL_version 

786 

787 

788def _make_requires(flag: int, error: str) -> Callable[[_T], _T]: 

789 """ 

790 Builds a decorator that ensures that functions that rely on OpenSSL 

791 functions that are not present in this build raise NotImplementedError, 

792 rather than AttributeError coming out of cryptography. 

793 

794 :param flag: A cryptography flag that guards the functions, e.g. 

795 ``Cryptography_HAS_NEXTPROTONEG``. 

796 :param error: The string to be used in the exception if the flag is false. 

797 """ 

798 

799 def _requires_decorator(func): # type: ignore[no-untyped-def] 

800 if not flag: 

801 

802 @wraps(func) 

803 def explode(*args, **kwargs): # type: ignore[no-untyped-def] 

804 raise NotImplementedError(error) 

805 

806 return explode 

807 else: 

808 return func 

809 

810 return _requires_decorator 

811 

812 

813_requires_keylog = _make_requires( 

814 getattr(_lib, "Cryptography_HAS_KEYLOG", 0), "Key logging not available" 

815) 

816 

817 

818class Session: 

819 """ 

820 A class representing an SSL session. A session defines certain connection 

821 parameters which may be re-used to speed up the setup of subsequent 

822 connections. 

823 

824 .. versionadded:: 0.14 

825 """ 

826 

827 _session: Any 

828 

829 

830F = TypeVar("F", bound=Callable[..., Any]) 

831 

832 

833def _require_not_used(f: F) -> F: 

834 @wraps(f) 

835 def inner(self: Context, *args: Any, **kwargs: Any) -> Any: 

836 if self._used: 

837 warnings.warn( 

838 ( 

839 "Attempting to mutate a Context after a Connection was " 

840 "created. In the future, this will raise an exception" 

841 ), 

842 DeprecationWarning, 

843 stacklevel=2, 

844 ) 

845 return f(self, *args, **kwargs) 

846 

847 return typing.cast(F, inner) 

848 

849 

850class Context: 

851 """ 

852 :class:`OpenSSL.SSL.Context` instances define the parameters for setting 

853 up new SSL connections. 

854 

855 :param method: One of TLS_METHOD, TLS_CLIENT_METHOD, TLS_SERVER_METHOD, 

856 DTLS_METHOD, DTLS_CLIENT_METHOD, or DTLS_SERVER_METHOD. 

857 SSLv23_METHOD, TLSv1_METHOD, etc. are deprecated and should 

858 not be used. 

859 """ 

860 

861 _methods: typing.ClassVar[ 

862 dict[int, tuple[Callable[[], Any], int | None]] 

863 ] = { 

864 SSLv23_METHOD: (_lib.TLS_method, None), 

865 TLSv1_METHOD: (_lib.TLS_method, TLS1_VERSION), 

866 TLSv1_1_METHOD: (_lib.TLS_method, TLS1_1_VERSION), 

867 TLSv1_2_METHOD: (_lib.TLS_method, TLS1_2_VERSION), 

868 TLS_METHOD: (_lib.TLS_method, None), 

869 TLS_SERVER_METHOD: (_lib.TLS_server_method, None), 

870 TLS_CLIENT_METHOD: (_lib.TLS_client_method, None), 

871 DTLS_METHOD: (_lib.DTLS_method, None), 

872 DTLS_SERVER_METHOD: (_lib.DTLS_server_method, None), 

873 DTLS_CLIENT_METHOD: (_lib.DTLS_client_method, None), 

874 } 

875 

876 def __init__(self, method: int) -> None: 

877 if not isinstance(method, int): 

878 raise TypeError("method must be an integer") 

879 

880 try: 

881 method_func, version = self._methods[method] 

882 except KeyError: 

883 raise ValueError("No such protocol") 

884 

885 method_obj = method_func() 

886 _openssl_assert(method_obj != _ffi.NULL) 

887 

888 context = _lib.SSL_CTX_new(method_obj) 

889 _openssl_assert(context != _ffi.NULL) 

890 context = _ffi.gc(context, _lib.SSL_CTX_free) 

891 

892 self._context = context 

893 self._used = False 

894 self._passphrase_helper: _PassphraseHelper | None = None 

895 self._passphrase_callback: _PassphraseCallback[Any] | None = None 

896 self._passphrase_userdata: Any | None = None 

897 self._verify_helper: _VerifyHelper | None = None 

898 self._verify_callback: _VerifyCallback | None = None 

899 self._info_callback = None 

900 self._keylog_callback = None 

901 self._tlsext_servername_callback = None 

902 self._app_data = None 

903 self._alpn_select_helper: _ALPNSelectHelper | None = None 

904 self._alpn_select_callback: _ALPNSelectCallback | None = None 

905 self._ocsp_helper: ( 

906 _OCSPClientCallbackHelper | _OCSPServerCallbackHelper | None 

907 ) = None 

908 self._ocsp_callback: ( 

909 _OCSPClientCallback[Any] | _OCSPServerCallback[Any] | None 

910 ) = None 

911 self._ocsp_data: Any | None = None 

912 self._cookie_generate_helper: _CookieGenerateCallbackHelper | None = ( 

913 None 

914 ) 

915 self._cookie_verify_helper: _CookieVerifyCallbackHelper | None = None 

916 

917 self.set_mode(_lib.SSL_MODE_ENABLE_PARTIAL_WRITE) 

918 if version is not None: 

919 self.set_min_proto_version(version) 

920 self.set_max_proto_version(version) 

921 

922 @_require_not_used 

923 def set_min_proto_version(self, version: int) -> None: 

924 """ 

925 Set the minimum supported protocol version. Setting the minimum 

926 version to 0 will enable protocol versions down to the lowest version 

927 supported by the library. 

928 

929 If the underlying OpenSSL build is missing support for the selected 

930 version, this method will raise an exception. 

931 """ 

932 _openssl_assert( 

933 _lib.SSL_CTX_set_min_proto_version(self._context, version) == 1 

934 ) 

935 

936 @_require_not_used 

937 def set_max_proto_version(self, version: int) -> None: 

938 """ 

939 Set the maximum supported protocol version. Setting the maximum 

940 version to 0 will enable protocol versions up to the highest version 

941 supported by the library. 

942 

943 If the underlying OpenSSL build is missing support for the selected 

944 version, this method will raise an exception. 

945 """ 

946 _openssl_assert( 

947 _lib.SSL_CTX_set_max_proto_version(self._context, version) == 1 

948 ) 

949 

950 @_require_not_used 

951 def load_verify_locations( 

952 self, 

953 cafile: _StrOrBytesPath | None, 

954 capath: _StrOrBytesPath | None = None, 

955 ) -> None: 

956 """ 

957 Let SSL know where we can find trusted certificates for the certificate 

958 chain. Note that the certificates have to be in PEM format. 

959 

960 If capath is passed, it must be a directory prepared using the 

961 ``c_rehash`` tool included with OpenSSL. Either, but not both, of 

962 *pemfile* or *capath* may be :data:`None`. 

963 

964 :param cafile: In which file we can find the certificates (``bytes`` or 

965 ``str``). 

966 :param capath: In which directory we can find the certificates 

967 (``bytes`` or ``str``). 

968 

969 :return: None 

970 """ 

971 if cafile is None: 

972 cafile = _ffi.NULL 

973 else: 

974 cafile = _path_bytes(cafile) 

975 

976 if capath is None: 

977 capath = _ffi.NULL 

978 else: 

979 capath = _path_bytes(capath) 

980 

981 load_result = _lib.SSL_CTX_load_verify_locations( 

982 self._context, cafile, capath 

983 ) 

984 if not load_result: 

985 _raise_current_error() 

986 

987 def _wrap_callback( 

988 self, callback: _PassphraseCallback[_T] 

989 ) -> _PassphraseHelper: 

990 @wraps(callback) 

991 def wrapper(size: int, verify: bool, userdata: Any) -> bytes: 

992 return callback(size, verify, self._passphrase_userdata) 

993 

994 return _PassphraseHelper( 

995 FILETYPE_PEM, wrapper, more_args=True, truncate=True 

996 ) 

997 

998 @_require_not_used 

999 def set_passwd_cb( 

1000 self, 

1001 callback: _PassphraseCallback[_T], 

1002 userdata: _T | None = None, 

1003 ) -> None: 

1004 """ 

1005 Set the passphrase callback. This function will be called 

1006 when a private key with a passphrase is loaded. 

1007 

1008 :param callback: The Python callback to use. This must accept three 

1009 positional arguments. First, an integer giving the maximum length 

1010 of the passphrase it may return. If the returned passphrase is 

1011 longer than this, it will be truncated. Second, a boolean value 

1012 which will be true if the user should be prompted for the 

1013 passphrase twice and the callback should verify that the two values 

1014 supplied are equal. Third, the value given as the *userdata* 

1015 parameter to :meth:`set_passwd_cb`. The *callback* must return 

1016 a byte string. If an error occurs, *callback* should return a false 

1017 value (e.g. an empty string). 

1018 :param userdata: (optional) A Python object which will be given as 

1019 argument to the callback 

1020 :return: None 

1021 """ 

1022 if not callable(callback): 

1023 raise TypeError("callback must be callable") 

1024 

1025 self._passphrase_helper = self._wrap_callback(callback) 

1026 self._passphrase_callback = self._passphrase_helper.callback 

1027 _lib.SSL_CTX_set_default_passwd_cb( 

1028 self._context, self._passphrase_callback 

1029 ) 

1030 self._passphrase_userdata = userdata 

1031 

1032 @_require_not_used 

1033 def set_default_verify_paths(self) -> None: 

1034 """ 

1035 Specify that the platform provided CA certificates are to be used for 

1036 verification purposes. This method has some caveats related to the 

1037 binary wheels that cryptography (pyOpenSSL's primary dependency) ships: 

1038 

1039 * macOS will only load certificates using this method if the user has 

1040 the ``openssl@1.1`` `Homebrew <https://brew.sh>`_ formula installed 

1041 in the default location. 

1042 * Windows will not work. 

1043 * manylinux cryptography wheels will work on most common Linux 

1044 distributions in pyOpenSSL 17.1.0 and above. pyOpenSSL detects the 

1045 manylinux wheel and attempts to load roots via a fallback path. 

1046 

1047 :return: None 

1048 """ 

1049 # SSL_CTX_set_default_verify_paths will attempt to load certs from 

1050 # both a cafile and capath that are set at compile time. However, 

1051 # it will first check environment variables and, if present, load 

1052 # those paths instead 

1053 set_result = _lib.SSL_CTX_set_default_verify_paths(self._context) 

1054 _openssl_assert(set_result == 1) 

1055 # After attempting to set default_verify_paths we need to know whether 

1056 # to go down the fallback path. 

1057 # First we'll check to see if any env vars have been set. If so, 

1058 # we won't try to do anything else because the user has set the path 

1059 # themselves. 

1060 if not self._check_env_vars_set("SSL_CERT_DIR", "SSL_CERT_FILE"): 

1061 default_dir = _ffi.string(_lib.X509_get_default_cert_dir()) 

1062 default_file = _ffi.string(_lib.X509_get_default_cert_file()) 

1063 # Now we check to see if the default_dir and default_file are set 

1064 # to the exact values we use in our manylinux builds. If they are 

1065 # then we know to load the fallbacks 

1066 if ( 

1067 default_dir == _CRYPTOGRAPHY_MANYLINUX_CA_DIR 

1068 and default_file == _CRYPTOGRAPHY_MANYLINUX_CA_FILE 

1069 ): 

1070 # This is manylinux, let's load our fallback paths 

1071 self._fallback_default_verify_paths( 

1072 _CERTIFICATE_FILE_LOCATIONS, _CERTIFICATE_PATH_LOCATIONS 

1073 ) 

1074 

1075 def _check_env_vars_set(self, dir_env_var: str, file_env_var: str) -> bool: 

1076 """ 

1077 Check to see if the default cert dir/file environment vars are present. 

1078 

1079 :return: bool 

1080 """ 

1081 return ( 

1082 os.environ.get(file_env_var) is not None 

1083 or os.environ.get(dir_env_var) is not None 

1084 ) 

1085 

1086 def _fallback_default_verify_paths( 

1087 self, file_path: list[str], dir_path: list[str] 

1088 ) -> None: 

1089 """ 

1090 Default verify paths are based on the compiled version of OpenSSL. 

1091 However, when pyca/cryptography is compiled as a manylinux wheel 

1092 that compiled location can potentially be wrong. So, like Go, we 

1093 will try a predefined set of paths and attempt to load roots 

1094 from there. 

1095 

1096 :return: None 

1097 """ 

1098 for cafile in file_path: 

1099 if os.path.isfile(cafile): 

1100 self.load_verify_locations(cafile) 

1101 break 

1102 

1103 for capath in dir_path: 

1104 if os.path.isdir(capath): 

1105 self.load_verify_locations(None, capath) 

1106 break 

1107 

1108 @_require_not_used 

1109 def use_certificate_chain_file(self, certfile: _StrOrBytesPath) -> None: 

1110 """ 

1111 Load a certificate chain from a file. 

1112 

1113 :param certfile: The name of the certificate chain file (``bytes`` or 

1114 ``str``). Must be PEM encoded. 

1115 

1116 :return: None 

1117 """ 

1118 certfile = _path_bytes(certfile) 

1119 

1120 result = _lib.SSL_CTX_use_certificate_chain_file( 

1121 self._context, certfile 

1122 ) 

1123 if not result: 

1124 _raise_current_error() 

1125 

1126 @_require_not_used 

1127 def use_certificate_file( 

1128 self, certfile: _StrOrBytesPath, filetype: int = FILETYPE_PEM 

1129 ) -> None: 

1130 """ 

1131 Load a certificate from a file 

1132 

1133 :param certfile: The name of the certificate file (``bytes`` or 

1134 ``str``). 

1135 :param filetype: (optional) The encoding of the file, which is either 

1136 :const:`FILETYPE_PEM` or :const:`FILETYPE_ASN1`. The default is 

1137 :const:`FILETYPE_PEM`. 

1138 

1139 :return: None 

1140 """ 

1141 certfile = _path_bytes(certfile) 

1142 if not isinstance(filetype, int): 

1143 raise TypeError("filetype must be an integer") 

1144 

1145 use_result = _lib.SSL_CTX_use_certificate_file( 

1146 self._context, certfile, filetype 

1147 ) 

1148 if not use_result: 

1149 _raise_current_error() 

1150 

1151 @_require_not_used 

1152 def use_certificate(self, cert: X509 | x509.Certificate) -> None: 

1153 """ 

1154 Load a certificate from a X509 object 

1155 

1156 :param cert: The X509 object 

1157 :return: None 

1158 """ 

1159 # Mirrored at Connection.use_certificate 

1160 if not isinstance(cert, X509): 

1161 cert = X509.from_cryptography(cert) 

1162 else: 

1163 warnings.warn( 

1164 ( 

1165 "Passing pyOpenSSL X509 objects is deprecated. You " 

1166 "should use a cryptography.x509.Certificate instead." 

1167 ), 

1168 DeprecationWarning, 

1169 stacklevel=2, 

1170 ) 

1171 

1172 use_result = _lib.SSL_CTX_use_certificate(self._context, cert._x509) 

1173 if not use_result: 

1174 _raise_current_error() 

1175 

1176 @_require_not_used 

1177 def add_extra_chain_cert(self, certobj: X509 | x509.Certificate) -> None: 

1178 """ 

1179 Add certificate to chain 

1180 

1181 :param certobj: The X509 certificate object to add to the chain 

1182 :return: None 

1183 """ 

1184 if not isinstance(certobj, X509): 

1185 certobj = X509.from_cryptography(certobj) 

1186 else: 

1187 warnings.warn( 

1188 ( 

1189 "Passing pyOpenSSL X509 objects is deprecated. You " 

1190 "should use a cryptography.x509.Certificate instead." 

1191 ), 

1192 DeprecationWarning, 

1193 stacklevel=2, 

1194 ) 

1195 

1196 copy = _lib.X509_dup(certobj._x509) 

1197 add_result = _lib.SSL_CTX_add_extra_chain_cert(self._context, copy) 

1198 if not add_result: 

1199 # TODO: This is untested. 

1200 _lib.X509_free(copy) 

1201 _raise_current_error() 

1202 

1203 def _raise_passphrase_exception(self) -> None: 

1204 if self._passphrase_helper is not None: 

1205 self._passphrase_helper.raise_if_problem(Error) 

1206 

1207 _raise_current_error() 

1208 

1209 @_require_not_used 

1210 def use_privatekey_file( 

1211 self, keyfile: _StrOrBytesPath, filetype: int = FILETYPE_PEM 

1212 ) -> None: 

1213 """ 

1214 Load a private key from a file 

1215 

1216 :param keyfile: The name of the key file (``bytes`` or ``str``) 

1217 :param filetype: (optional) The encoding of the file, which is either 

1218 :const:`FILETYPE_PEM` or :const:`FILETYPE_ASN1`. The default is 

1219 :const:`FILETYPE_PEM`. 

1220 

1221 :return: None 

1222 """ 

1223 keyfile = _path_bytes(keyfile) 

1224 

1225 if not isinstance(filetype, int): 

1226 raise TypeError("filetype must be an integer") 

1227 

1228 use_result = _lib.SSL_CTX_use_PrivateKey_file( 

1229 self._context, keyfile, filetype 

1230 ) 

1231 if not use_result: 

1232 self._raise_passphrase_exception() 

1233 

1234 @_require_not_used 

1235 def use_privatekey(self, pkey: _PrivateKey | PKey) -> None: 

1236 """ 

1237 Load a private key from a PKey object 

1238 

1239 :param pkey: The PKey object 

1240 :return: None 

1241 """ 

1242 # Mirrored at Connection.use_privatekey 

1243 if not isinstance(pkey, PKey): 

1244 pkey = PKey.from_cryptography_key(pkey) 

1245 else: 

1246 warnings.warn( 

1247 ( 

1248 "Passing pyOpenSSL PKey objects is deprecated. You " 

1249 "should use a cryptography private key instead." 

1250 ), 

1251 DeprecationWarning, 

1252 stacklevel=2, 

1253 ) 

1254 

1255 use_result = _lib.SSL_CTX_use_PrivateKey(self._context, pkey._pkey) 

1256 if not use_result: 

1257 self._raise_passphrase_exception() 

1258 

1259 def check_privatekey(self) -> None: 

1260 """ 

1261 Check if the private key (loaded with :meth:`use_privatekey`) matches 

1262 the certificate (loaded with :meth:`use_certificate`) 

1263 

1264 :return: :data:`None` (raises :exc:`Error` if something's wrong) 

1265 """ 

1266 if not _lib.SSL_CTX_check_private_key(self._context): 

1267 _raise_current_error() 

1268 

1269 @_require_not_used 

1270 def load_client_ca(self, cafile: bytes) -> None: 

1271 """ 

1272 Load the trusted certificates that will be sent to the client. Does 

1273 not actually imply any of the certificates are trusted; that must be 

1274 configured separately. 

1275 

1276 :param bytes cafile: The path to a certificates file in PEM format. 

1277 :return: None 

1278 """ 

1279 ca_list = _lib.SSL_load_client_CA_file( 

1280 _text_to_bytes_and_warn("cafile", cafile) 

1281 ) 

1282 _openssl_assert(ca_list != _ffi.NULL) 

1283 _lib.SSL_CTX_set_client_CA_list(self._context, ca_list) 

1284 

1285 @_require_not_used 

1286 def set_session_id(self, buf: bytes) -> None: 

1287 """ 

1288 Set the session id to *buf* within which a session can be reused for 

1289 this Context object. This is needed when doing session resumption, 

1290 because there is no way for a stored session to know which Context 

1291 object it is associated with. 

1292 

1293 :param bytes buf: The session id. 

1294 

1295 :returns: None 

1296 """ 

1297 buf = _text_to_bytes_and_warn("buf", buf) 

1298 _openssl_assert( 

1299 _lib.SSL_CTX_set_session_id_context(self._context, buf, len(buf)) 

1300 == 1 

1301 ) 

1302 

1303 @_require_not_used 

1304 def set_session_cache_mode(self, mode: int) -> int: 

1305 """ 

1306 Set the behavior of the session cache used by all connections using 

1307 this Context. The previously set mode is returned. See 

1308 :const:`SESS_CACHE_*` for details about particular modes. 

1309 

1310 :param mode: One or more of the SESS_CACHE_* flags (combine using 

1311 bitwise or) 

1312 :returns: The previously set caching mode. 

1313 

1314 .. versionadded:: 0.14 

1315 """ 

1316 if not isinstance(mode, int): 

1317 raise TypeError("mode must be an integer") 

1318 

1319 return _lib.SSL_CTX_set_session_cache_mode(self._context, mode) 

1320 

1321 def get_session_cache_mode(self) -> int: 

1322 """ 

1323 Get the current session cache mode. 

1324 

1325 :returns: The currently used cache mode. 

1326 

1327 .. versionadded:: 0.14 

1328 """ 

1329 return _lib.SSL_CTX_get_session_cache_mode(self._context) 

1330 

1331 @_require_not_used 

1332 def set_verify( 

1333 self, mode: int, callback: _VerifyCallback | None = None 

1334 ) -> None: 

1335 """ 

1336 Set the verification flags for this Context object to *mode* and 

1337 specify that *callback* should be used for verification callbacks. 

1338 

1339 :param mode: The verify mode, this should be one of 

1340 :const:`VERIFY_NONE` and :const:`VERIFY_PEER`. If 

1341 :const:`VERIFY_PEER` is used, *mode* can be OR:ed with 

1342 :const:`VERIFY_FAIL_IF_NO_PEER_CERT` and 

1343 :const:`VERIFY_CLIENT_ONCE` to further control the behaviour. 

1344 :param callback: The optional Python verification callback to use. 

1345 This should take five arguments: A Connection object, an X509 

1346 object, and three integer variables, which are in turn potential 

1347 error number, error depth and return code. *callback* should 

1348 return True if verification passes and False otherwise. 

1349 If omitted, OpenSSL's default verification is used. 

1350 :return: None 

1351 

1352 See SSL_CTX_set_verify(3SSL) for further details. 

1353 """ 

1354 if not isinstance(mode, int): 

1355 raise TypeError("mode must be an integer") 

1356 

1357 if callback is None: 

1358 self._verify_helper = None 

1359 self._verify_callback = None 

1360 _lib.SSL_CTX_set_verify(self._context, mode, _ffi.NULL) 

1361 else: 

1362 if not callable(callback): 

1363 raise TypeError("callback must be callable") 

1364 

1365 self._verify_helper = _VerifyHelper(callback) 

1366 self._verify_callback = self._verify_helper.callback 

1367 _lib.SSL_CTX_set_verify(self._context, mode, self._verify_callback) 

1368 

1369 @_require_not_used 

1370 def set_verify_depth(self, depth: int) -> None: 

1371 """ 

1372 Set the maximum depth for the certificate chain verification that shall 

1373 be allowed for this Context object. 

1374 

1375 :param depth: An integer specifying the verify depth 

1376 :return: None 

1377 """ 

1378 if not isinstance(depth, int): 

1379 raise TypeError("depth must be an integer") 

1380 

1381 _lib.SSL_CTX_set_verify_depth(self._context, depth) 

1382 

1383 def get_verify_mode(self) -> int: 

1384 """ 

1385 Retrieve the Context object's verify mode, as set by 

1386 :meth:`set_verify`. 

1387 

1388 :return: The verify mode 

1389 """ 

1390 return _lib.SSL_CTX_get_verify_mode(self._context) 

1391 

1392 def get_verify_depth(self) -> int: 

1393 """ 

1394 Retrieve the Context object's verify depth, as set by 

1395 :meth:`set_verify_depth`. 

1396 

1397 :return: The verify depth 

1398 """ 

1399 return _lib.SSL_CTX_get_verify_depth(self._context) 

1400 

1401 @_require_not_used 

1402 def load_tmp_dh(self, dhfile: _StrOrBytesPath) -> None: 

1403 """ 

1404 Load parameters for Ephemeral Diffie-Hellman 

1405 

1406 :param dhfile: The file to load EDH parameters from (``bytes`` or 

1407 ``str``). 

1408 

1409 :return: None 

1410 """ 

1411 dhfile = _path_bytes(dhfile) 

1412 

1413 bio = _lib.BIO_new_file(dhfile, b"r") 

1414 if bio == _ffi.NULL: 

1415 _raise_current_error() 

1416 bio = _ffi.gc(bio, _lib.BIO_free) 

1417 

1418 dh = _lib.PEM_read_bio_DHparams(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL) 

1419 dh = _ffi.gc(dh, _lib.DH_free) 

1420 res = _lib.SSL_CTX_set_tmp_dh(self._context, dh) 

1421 _openssl_assert(res == 1) 

1422 

1423 @_require_not_used 

1424 def set_tmp_ecdh(self, curve: _EllipticCurve | ec.EllipticCurve) -> None: 

1425 """ 

1426 Select a curve to use for ECDHE key exchange. 

1427 

1428 :param curve: A curve instance from cryptography 

1429 (:class:`~cryptogragraphy.hazmat.primitives.asymmetric.ec.EllipticCurve`). 

1430 Alternatively (deprecated) a curve object from either 

1431 :meth:`OpenSSL.crypto.get_elliptic_curve` or 

1432 :meth:`OpenSSL.crypto.get_elliptic_curves`. 

1433 

1434 :return: None 

1435 """ 

1436 

1437 if isinstance(curve, _EllipticCurve): 

1438 warnings.warn( 

1439 ( 

1440 "Passing pyOpenSSL elliptic curves to set_tmp_ecdh is " 

1441 "deprecated. You should use cryptography's elliptic curve " 

1442 "types instead." 

1443 ), 

1444 DeprecationWarning, 

1445 stacklevel=2, 

1446 ) 

1447 _lib.SSL_CTX_set_tmp_ecdh(self._context, curve._to_EC_KEY()) 

1448 else: 

1449 name = curve.name 

1450 if name == "secp192r1": 

1451 name = "prime192v1" 

1452 elif name == "secp256r1": 

1453 name = "prime256v1" 

1454 nid = _lib.OBJ_txt2nid(name.encode()) 

1455 if nid == _lib.NID_undef: 

1456 _raise_current_error() 

1457 

1458 ec = _lib.EC_KEY_new_by_curve_name(nid) 

1459 _openssl_assert(ec != _ffi.NULL) 

1460 ec = _ffi.gc(ec, _lib.EC_KEY_free) 

1461 _lib.SSL_CTX_set_tmp_ecdh(self._context, ec) 

1462 

1463 @_require_not_used 

1464 def set_cipher_list(self, cipher_list: bytes) -> None: 

1465 """ 

1466 Set the list of ciphers to be used in this context. 

1467 

1468 See the OpenSSL manual for more information (e.g. 

1469 :manpage:`ciphers(1)`). 

1470 

1471 :param bytes cipher_list: An OpenSSL cipher string. 

1472 :return: None 

1473 """ 

1474 cipher_list = _text_to_bytes_and_warn("cipher_list", cipher_list) 

1475 

1476 if not isinstance(cipher_list, bytes): 

1477 raise TypeError("cipher_list must be a byte string.") 

1478 

1479 _openssl_assert( 

1480 _lib.SSL_CTX_set_cipher_list(self._context, cipher_list) == 1 

1481 ) 

1482 # In OpenSSL 1.1.1 setting the cipher list will always return TLS 1.3 

1483 # ciphers even if you pass an invalid cipher. Applications (like 

1484 # Twisted) have tests that depend on an error being raised if an 

1485 # invalid cipher string is passed, but without the following check 

1486 # for the TLS 1.3 specific cipher suites it would never error. 

1487 tmpconn = Connection(self, None) 

1488 if tmpconn.get_cipher_list() == [ 

1489 "TLS_AES_256_GCM_SHA384", 

1490 "TLS_CHACHA20_POLY1305_SHA256", 

1491 "TLS_AES_128_GCM_SHA256", 

1492 ]: 

1493 raise Error( 

1494 [ 

1495 ( 

1496 "SSL routines", 

1497 "SSL_CTX_set_cipher_list", 

1498 "no cipher match", 

1499 ), 

1500 ], 

1501 ) 

1502 

1503 @_require_not_used 

1504 def set_client_ca_list( 

1505 self, certificate_authorities: Sequence[X509Name] 

1506 ) -> None: 

1507 """ 

1508 Set the list of preferred client certificate signers for this server 

1509 context. 

1510 

1511 This list of certificate authorities will be sent to the client when 

1512 the server requests a client certificate. 

1513 

1514 :param certificate_authorities: a sequence of X509Names. 

1515 :return: None 

1516 

1517 .. versionadded:: 0.10 

1518 """ 

1519 name_stack = _lib.sk_X509_NAME_new_null() 

1520 _openssl_assert(name_stack != _ffi.NULL) 

1521 

1522 try: 

1523 for ca_name in certificate_authorities: 

1524 if not isinstance(ca_name, X509Name): 

1525 raise TypeError( 

1526 f"client CAs must be X509Name objects, not " 

1527 f"{type(ca_name).__name__} objects" 

1528 ) 

1529 copy = _lib.X509_NAME_dup(ca_name._name) 

1530 _openssl_assert(copy != _ffi.NULL) 

1531 push_result = _lib.sk_X509_NAME_push(name_stack, copy) 

1532 if not push_result: 

1533 _lib.X509_NAME_free(copy) 

1534 _raise_current_error() 

1535 except Exception: 

1536 _lib.sk_X509_NAME_free(name_stack) 

1537 raise 

1538 

1539 _lib.SSL_CTX_set_client_CA_list(self._context, name_stack) 

1540 

1541 @_require_not_used 

1542 def add_client_ca( 

1543 self, certificate_authority: X509 | x509.Certificate 

1544 ) -> None: 

1545 """ 

1546 Add the CA certificate to the list of preferred signers for this 

1547 context. 

1548 

1549 The list of certificate authorities will be sent to the client when the 

1550 server requests a client certificate. 

1551 

1552 :param certificate_authority: certificate authority's X509 certificate. 

1553 :return: None 

1554 

1555 .. versionadded:: 0.10 

1556 """ 

1557 if not isinstance(certificate_authority, X509): 

1558 certificate_authority = X509.from_cryptography( 

1559 certificate_authority 

1560 ) 

1561 else: 

1562 warnings.warn( 

1563 ( 

1564 "Passing pyOpenSSL X509 objects is deprecated. You " 

1565 "should use a cryptography.x509.Certificate instead." 

1566 ), 

1567 DeprecationWarning, 

1568 stacklevel=2, 

1569 ) 

1570 

1571 add_result = _lib.SSL_CTX_add_client_CA( 

1572 self._context, certificate_authority._x509 

1573 ) 

1574 _openssl_assert(add_result == 1) 

1575 

1576 @_require_not_used 

1577 def set_timeout(self, timeout: int) -> None: 

1578 """ 

1579 Set the timeout for newly created sessions for this Context object to 

1580 *timeout*. The default value is 300 seconds. See the OpenSSL manual 

1581 for more information (e.g. :manpage:`SSL_CTX_set_timeout(3)`). 

1582 

1583 :param timeout: The timeout in (whole) seconds 

1584 :return: The previous session timeout 

1585 """ 

1586 if not isinstance(timeout, int): 

1587 raise TypeError("timeout must be an integer") 

1588 

1589 return _lib.SSL_CTX_set_timeout(self._context, timeout) 

1590 

1591 def get_timeout(self) -> int: 

1592 """ 

1593 Retrieve session timeout, as set by :meth:`set_timeout`. The default 

1594 is 300 seconds. 

1595 

1596 :return: The session timeout 

1597 """ 

1598 return _lib.SSL_CTX_get_timeout(self._context) 

1599 

1600 @_require_not_used 

1601 def set_info_callback( 

1602 self, callback: Callable[[Connection, int, int], None] 

1603 ) -> None: 

1604 """ 

1605 Set the information callback to *callback*. This function will be 

1606 called from time to time during SSL handshakes. 

1607 

1608 :param callback: The Python callback to use. This should take three 

1609 arguments: a Connection object and two integers. The first integer 

1610 specifies where in the SSL handshake the function was called, and 

1611 the other the return code from a (possibly failed) internal 

1612 function call. 

1613 :return: None 

1614 """ 

1615 

1616 @wraps(callback) 

1617 def wrapper(ssl, where, return_code): # type: ignore[no-untyped-def] 

1618 callback(Connection._reverse_mapping[ssl], where, return_code) 

1619 

1620 self._info_callback = _ffi.callback( 

1621 "void (*)(const SSL *, int, int)", wrapper 

1622 ) 

1623 _lib.SSL_CTX_set_info_callback(self._context, self._info_callback) 

1624 

1625 @_requires_keylog 

1626 @_require_not_used 

1627 def set_keylog_callback( 

1628 self, callback: Callable[[Connection, bytes], None] 

1629 ) -> None: 

1630 """ 

1631 Set the TLS key logging callback to *callback*. This function will be 

1632 called whenever TLS key material is generated or received, in order 

1633 to allow applications to store this keying material for debugging 

1634 purposes. 

1635 

1636 :param callback: The Python callback to use. This should take two 

1637 arguments: a Connection object and a bytestring that contains 

1638 the key material in the format used by NSS for its SSLKEYLOGFILE 

1639 debugging output. 

1640 :return: None 

1641 """ 

1642 

1643 @wraps(callback) 

1644 def wrapper(ssl, line): # type: ignore[no-untyped-def] 

1645 line = _ffi.string(line) 

1646 callback(Connection._reverse_mapping[ssl], line) 

1647 

1648 self._keylog_callback = _ffi.callback( 

1649 "void (*)(const SSL *, const char *)", wrapper 

1650 ) 

1651 _lib.SSL_CTX_set_keylog_callback(self._context, self._keylog_callback) 

1652 

1653 def get_app_data(self) -> Any: 

1654 """ 

1655 Get the application data (supplied via :meth:`set_app_data()`) 

1656 

1657 :return: The application data 

1658 """ 

1659 return self._app_data 

1660 

1661 @_require_not_used 

1662 def set_app_data(self, data: Any) -> None: 

1663 """ 

1664 Set the application data (will be returned from get_app_data()) 

1665 

1666 :param data: Any Python object 

1667 :return: None 

1668 """ 

1669 self._app_data = data 

1670 

1671 def get_cert_store(self) -> X509Store | None: 

1672 """ 

1673 Get the certificate store for the context. This can be used to add 

1674 "trusted" certificates without using the 

1675 :meth:`load_verify_locations` method. 

1676 

1677 :return: A X509Store object or None if it does not have one. 

1678 """ 

1679 store = _lib.SSL_CTX_get_cert_store(self._context) 

1680 if store == _ffi.NULL: 

1681 # TODO: This is untested. 

1682 return None 

1683 

1684 pystore = X509Store.__new__(X509Store) 

1685 pystore._store = store 

1686 return pystore 

1687 

1688 @_require_not_used 

1689 def set_options(self, options: int) -> int: 

1690 """ 

1691 Add options. Options set before are not cleared! 

1692 This method should be used with the :const:`OP_*` constants. 

1693 

1694 :param options: The options to add. 

1695 :return: The new option bitmask. 

1696 """ 

1697 if not isinstance(options, int): 

1698 raise TypeError("options must be an integer") 

1699 

1700 return _lib.SSL_CTX_set_options(self._context, options) 

1701 

1702 @_require_not_used 

1703 def set_mode(self, mode: int) -> int: 

1704 """ 

1705 Add modes via bitmask. Modes set before are not cleared! This method 

1706 should be used with the :const:`MODE_*` constants. 

1707 

1708 :param mode: The mode to add. 

1709 :return: The new mode bitmask. 

1710 """ 

1711 if not isinstance(mode, int): 

1712 raise TypeError("mode must be an integer") 

1713 

1714 return _lib.SSL_CTX_set_mode(self._context, mode) 

1715 

1716 @_require_not_used 

1717 def set_tlsext_servername_callback( 

1718 self, callback: Callable[[Connection], None] 

1719 ) -> None: 

1720 """ 

1721 Specify a callback function to be called when clients specify a server 

1722 name. 

1723 

1724 :param callback: The callback function. It will be invoked with one 

1725 argument, the Connection instance. 

1726 

1727 .. versionadded:: 0.13 

1728 """ 

1729 

1730 @wraps(callback) 

1731 def wrapper(ssl, alert, arg): # type: ignore[no-untyped-def] 

1732 callback(Connection._reverse_mapping[ssl]) 

1733 return 0 

1734 

1735 self._tlsext_servername_callback = _ffi.callback( 

1736 "int (*)(SSL *, int *, void *)", wrapper 

1737 ) 

1738 _lib.SSL_CTX_set_tlsext_servername_callback( 

1739 self._context, self._tlsext_servername_callback 

1740 ) 

1741 

1742 @_require_not_used 

1743 def set_tlsext_use_srtp(self, profiles: bytes) -> None: 

1744 """ 

1745 Enable support for negotiating SRTP keying material. 

1746 

1747 :param bytes profiles: A colon delimited list of protection profile 

1748 names, like ``b'SRTP_AES128_CM_SHA1_80:SRTP_AES128_CM_SHA1_32'``. 

1749 :return: None 

1750 """ 

1751 if not isinstance(profiles, bytes): 

1752 raise TypeError("profiles must be a byte string.") 

1753 

1754 _openssl_assert( 

1755 _lib.SSL_CTX_set_tlsext_use_srtp(self._context, profiles) == 0 

1756 ) 

1757 

1758 @_require_not_used 

1759 def set_alpn_protos(self, protos: list[bytes]) -> None: 

1760 """ 

1761 Specify the protocols that the client is prepared to speak after the 

1762 TLS connection has been negotiated using Application Layer Protocol 

1763 Negotiation. 

1764 

1765 :param protos: A list of the protocols to be offered to the server. 

1766 This list should be a Python list of bytestrings representing the 

1767 protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``. 

1768 """ 

1769 # Different versions of OpenSSL are inconsistent about how they handle 

1770 # empty proto lists (see #1043), so we avoid the problem entirely by 

1771 # rejecting them ourselves. 

1772 if not protos: 

1773 raise ValueError("at least one protocol must be specified") 

1774 

1775 # Take the list of protocols and join them together, prefixing them 

1776 # with their lengths. 

1777 protostr = b"".join( 

1778 chain.from_iterable((bytes((len(p),)), p) for p in protos) 

1779 ) 

1780 

1781 # Build a C string from the list. We don't need to save this off 

1782 # because OpenSSL immediately copies the data out. 

1783 input_str = _ffi.new("unsigned char[]", protostr) 

1784 

1785 # https://www.openssl.org/docs/man1.1.0/man3/SSL_CTX_set_alpn_protos.html: 

1786 # SSL_CTX_set_alpn_protos() and SSL_set_alpn_protos() 

1787 # return 0 on success, and non-0 on failure. 

1788 # WARNING: these functions reverse the return value convention. 

1789 _openssl_assert( 

1790 _lib.SSL_CTX_set_alpn_protos( 

1791 self._context, input_str, len(protostr) 

1792 ) 

1793 == 0 

1794 ) 

1795 

1796 @_require_not_used 

1797 def set_alpn_select_callback(self, callback: _ALPNSelectCallback) -> None: 

1798 """ 

1799 Specify a callback function that will be called on the server when a 

1800 client offers protocols using ALPN. 

1801 

1802 :param callback: The callback function. It will be invoked with two 

1803 arguments: the Connection, and a list of offered protocols as 

1804 bytestrings, e.g ``[b'http/1.1', b'spdy/2']``. It can return 

1805 one of those bytestrings to indicate the chosen protocol, the 

1806 empty bytestring to terminate the TLS connection, or the 

1807 :py:obj:`NO_OVERLAPPING_PROTOCOLS` to indicate that no offered 

1808 protocol was selected, but that the connection should not be 

1809 aborted. 

1810 """ 

1811 self._alpn_select_helper = _ALPNSelectHelper(callback) 

1812 self._alpn_select_callback = self._alpn_select_helper.callback 

1813 _lib.SSL_CTX_set_alpn_select_cb( 

1814 self._context, self._alpn_select_callback, _ffi.NULL 

1815 ) 

1816 

1817 def _set_ocsp_callback( 

1818 self, 

1819 helper: _OCSPClientCallbackHelper | _OCSPServerCallbackHelper, 

1820 data: Any | None, 

1821 ) -> None: 

1822 """ 

1823 This internal helper does the common work for 

1824 ``set_ocsp_server_callback`` and ``set_ocsp_client_callback``, which is 

1825 almost all of it. 

1826 """ 

1827 self._ocsp_helper = helper 

1828 self._ocsp_callback = helper.callback 

1829 if data is None: 

1830 self._ocsp_data = _ffi.NULL 

1831 else: 

1832 self._ocsp_data = _ffi.new_handle(data) 

1833 

1834 rc = _lib.SSL_CTX_set_tlsext_status_cb( 

1835 self._context, self._ocsp_callback 

1836 ) 

1837 _openssl_assert(rc == 1) 

1838 rc = _lib.SSL_CTX_set_tlsext_status_arg(self._context, self._ocsp_data) 

1839 _openssl_assert(rc == 1) 

1840 

1841 @_require_not_used 

1842 def set_ocsp_server_callback( 

1843 self, 

1844 callback: _OCSPServerCallback[_T], 

1845 data: _T | None = None, 

1846 ) -> None: 

1847 """ 

1848 Set a callback to provide OCSP data to be stapled to the TLS handshake 

1849 on the server side. 

1850 

1851 :param callback: The callback function. It will be invoked with two 

1852 arguments: the Connection, and the optional arbitrary data you have 

1853 provided. The callback must return a bytestring that contains the 

1854 OCSP data to staple to the handshake. If no OCSP data is available 

1855 for this connection, return the empty bytestring. 

1856 :param data: Some opaque data that will be passed into the callback 

1857 function when called. This can be used to avoid needing to do 

1858 complex data lookups or to keep track of what context is being 

1859 used. This parameter is optional. 

1860 """ 

1861 helper = _OCSPServerCallbackHelper(callback) 

1862 self._set_ocsp_callback(helper, data) 

1863 

1864 @_require_not_used 

1865 def set_ocsp_client_callback( 

1866 self, 

1867 callback: _OCSPClientCallback[_T], 

1868 data: _T | None = None, 

1869 ) -> None: 

1870 """ 

1871 Set a callback to validate OCSP data stapled to the TLS handshake on 

1872 the client side. 

1873 

1874 :param callback: The callback function. It will be invoked with three 

1875 arguments: the Connection, a bytestring containing the stapled OCSP 

1876 assertion, and the optional arbitrary data you have provided. The 

1877 callback must return a boolean that indicates the result of 

1878 validating the OCSP data: ``True`` if the OCSP data is valid and 

1879 the certificate can be trusted, or ``False`` if either the OCSP 

1880 data is invalid or the certificate has been revoked. 

1881 :param data: Some opaque data that will be passed into the callback 

1882 function when called. This can be used to avoid needing to do 

1883 complex data lookups or to keep track of what context is being 

1884 used. This parameter is optional. 

1885 """ 

1886 helper = _OCSPClientCallbackHelper(callback) 

1887 self._set_ocsp_callback(helper, data) 

1888 

1889 @_require_not_used 

1890 def set_cookie_generate_callback( 

1891 self, callback: _CookieGenerateCallback 

1892 ) -> None: 

1893 self._cookie_generate_helper = _CookieGenerateCallbackHelper(callback) 

1894 _lib.SSL_CTX_set_cookie_generate_cb( 

1895 self._context, 

1896 self._cookie_generate_helper.callback, 

1897 ) 

1898 

1899 @_require_not_used 

1900 def set_cookie_verify_callback( 

1901 self, callback: _CookieVerifyCallback 

1902 ) -> None: 

1903 self._cookie_verify_helper = _CookieVerifyCallbackHelper(callback) 

1904 _lib.SSL_CTX_set_cookie_verify_cb( 

1905 self._context, 

1906 self._cookie_verify_helper.callback, 

1907 ) 

1908 

1909 

1910class Connection: 

1911 _reverse_mapping: typing.MutableMapping[Any, Connection] = ( 

1912 WeakValueDictionary() 

1913 ) 

1914 

1915 def __init__( 

1916 self, context: Context, socket: socket.socket | None = None 

1917 ) -> None: 

1918 """ 

1919 Create a new Connection object, using the given OpenSSL.SSL.Context 

1920 instance and socket. 

1921 

1922 :param context: An SSL Context to use for this connection 

1923 :param socket: The socket to use for transport layer 

1924 """ 

1925 if not isinstance(context, Context): 

1926 raise TypeError("context must be a Context instance") 

1927 

1928 context._used = True 

1929 

1930 ssl = _lib.SSL_new(context._context) 

1931 self._ssl = _ffi.gc(ssl, _lib.SSL_free) 

1932 # We set SSL_MODE_AUTO_RETRY to handle situations where OpenSSL returns 

1933 # an SSL_ERROR_WANT_READ when processing a non-application data packet 

1934 # even though there is still data on the underlying transport. 

1935 # See https://github.com/openssl/openssl/issues/6234 for more details. 

1936 _lib.SSL_set_mode(self._ssl, _lib.SSL_MODE_AUTO_RETRY) 

1937 self._context = context 

1938 self._app_data = None 

1939 

1940 # References to strings used for Application Layer Protocol 

1941 # Negotiation. These strings get copied at some point but it's well 

1942 # after the callback returns, so we have to hang them somewhere to 

1943 # avoid them getting freed. 

1944 self._alpn_select_callback_args: Any = None 

1945 

1946 # Reference the verify_callback of the Context. This ensures that if 

1947 # set_verify is called again after the SSL object has been created we 

1948 # do not point to a dangling reference 

1949 self._verify_helper = context._verify_helper 

1950 self._verify_callback = context._verify_callback 

1951 

1952 # And likewise for the cookie callbacks 

1953 self._cookie_generate_helper = context._cookie_generate_helper 

1954 self._cookie_verify_helper = context._cookie_verify_helper 

1955 

1956 self._reverse_mapping[self._ssl] = self 

1957 

1958 if socket is None: 

1959 self._socket = None 

1960 # Don't set up any gc for these, SSL_free will take care of them. 

1961 self._into_ssl = _lib.BIO_new(_lib.BIO_s_mem()) 

1962 _openssl_assert(self._into_ssl != _ffi.NULL) 

1963 

1964 self._from_ssl = _lib.BIO_new(_lib.BIO_s_mem()) 

1965 _openssl_assert(self._from_ssl != _ffi.NULL) 

1966 

1967 _lib.SSL_set_bio(self._ssl, self._into_ssl, self._from_ssl) 

1968 else: 

1969 self._into_ssl = None 

1970 self._from_ssl = None 

1971 self._socket = socket 

1972 set_result = _lib.SSL_set_fd( 

1973 self._ssl, _asFileDescriptor(self._socket) 

1974 ) 

1975 _openssl_assert(set_result == 1) 

1976 

1977 def __getattr__(self, name: str) -> Any: 

1978 """ 

1979 Look up attributes on the wrapped socket object if they are not found 

1980 on the Connection object. 

1981 """ 

1982 if self._socket is None: 

1983 raise AttributeError( 

1984 f"'{self.__class__.__name__}' object has no attribute '{name}'" 

1985 ) 

1986 else: 

1987 return getattr(self._socket, name) 

1988 

1989 def _raise_ssl_error(self, ssl: Any, result: int) -> None: 

1990 if self._context._verify_helper is not None: 

1991 self._context._verify_helper.raise_if_problem() 

1992 if self._context._alpn_select_helper is not None: 

1993 self._context._alpn_select_helper.raise_if_problem() 

1994 if self._context._ocsp_helper is not None: 

1995 self._context._ocsp_helper.raise_if_problem() 

1996 

1997 error = _lib.SSL_get_error(ssl, result) 

1998 if error == _lib.SSL_ERROR_WANT_READ: 

1999 raise WantReadError() 

2000 elif error == _lib.SSL_ERROR_WANT_WRITE: 

2001 raise WantWriteError() 

2002 elif error == _lib.SSL_ERROR_ZERO_RETURN: 

2003 raise ZeroReturnError() 

2004 elif error == _lib.SSL_ERROR_WANT_X509_LOOKUP: 

2005 # TODO: This is untested. 

2006 raise WantX509LookupError() 

2007 elif error == _lib.SSL_ERROR_SYSCALL: 

2008 if platform == "win32": 

2009 errno = _ffi.getwinerror()[0] 

2010 else: 

2011 errno = _ffi.errno 

2012 if _lib.ERR_peek_error() == 0 or errno != 0: 

2013 if result < 0 and errno != 0: 

2014 raise SysCallError(errno, errorcode.get(errno)) 

2015 raise SysCallError(-1, "Unexpected EOF") 

2016 else: 

2017 # TODO: This is untested, but I think twisted hits it? 

2018 _raise_current_error() 

2019 elif error == _lib.SSL_ERROR_SSL and _lib.ERR_peek_error() != 0: 

2020 # In 3.0.x an unexpected EOF no longer triggers syscall error 

2021 # but we want to maintain compatibility so we check here and 

2022 # raise syscall if it is an EOF. Since we're not actually sure 

2023 # what else could raise SSL_ERROR_SSL we check for the presence 

2024 # of the OpenSSL 3 constant SSL_R_UNEXPECTED_EOF_WHILE_READING 

2025 # and if it's not present we just raise an error, which matches 

2026 # the behavior before we added this elif section 

2027 peeked_error = _lib.ERR_peek_error() 

2028 reason = _lib.ERR_GET_REASON(peeked_error) 

2029 if _lib.Cryptography_HAS_UNEXPECTED_EOF_WHILE_READING: 

2030 _openssl_assert( 

2031 reason == _lib.SSL_R_UNEXPECTED_EOF_WHILE_READING 

2032 ) 

2033 _lib.ERR_clear_error() 

2034 raise SysCallError(-1, "Unexpected EOF") 

2035 else: 

2036 _raise_current_error() 

2037 elif error == _lib.SSL_ERROR_NONE: 

2038 pass 

2039 else: 

2040 _raise_current_error() 

2041 

2042 def get_context(self) -> Context: 

2043 """ 

2044 Retrieve the :class:`Context` object associated with this 

2045 :class:`Connection`. 

2046 """ 

2047 return self._context 

2048 

2049 def set_context(self, context: Context) -> None: 

2050 """ 

2051 Switch this connection to a new session context. 

2052 

2053 :param context: A :class:`Context` instance giving the new session 

2054 context to use. 

2055 """ 

2056 if not isinstance(context, Context): 

2057 raise TypeError("context must be a Context instance") 

2058 

2059 _lib.SSL_set_SSL_CTX(self._ssl, context._context) 

2060 self._context = context 

2061 self._context._used = True 

2062 

2063 def get_servername(self) -> bytes | None: 

2064 """ 

2065 Retrieve the servername extension value if provided in the client hello 

2066 message, or None if there wasn't one. 

2067 

2068 :return: A byte string giving the server name or :data:`None`. 

2069 

2070 .. versionadded:: 0.13 

2071 """ 

2072 name = _lib.SSL_get_servername( 

2073 self._ssl, _lib.TLSEXT_NAMETYPE_host_name 

2074 ) 

2075 if name == _ffi.NULL: 

2076 return None 

2077 

2078 return _ffi.string(name) 

2079 

2080 def set_verify( 

2081 self, mode: int, callback: _VerifyCallback | None = None 

2082 ) -> None: 

2083 """ 

2084 Override the Context object's verification flags for this specific 

2085 connection. See :py:meth:`Context.set_verify` for details. 

2086 """ 

2087 if not isinstance(mode, int): 

2088 raise TypeError("mode must be an integer") 

2089 

2090 if callback is None: 

2091 self._verify_helper = None 

2092 self._verify_callback = None 

2093 _lib.SSL_set_verify(self._ssl, mode, _ffi.NULL) 

2094 else: 

2095 if not callable(callback): 

2096 raise TypeError("callback must be callable") 

2097 

2098 self._verify_helper = _VerifyHelper(callback) 

2099 self._verify_callback = self._verify_helper.callback 

2100 _lib.SSL_set_verify(self._ssl, mode, self._verify_callback) 

2101 

2102 def get_verify_mode(self) -> int: 

2103 """ 

2104 Retrieve the Connection object's verify mode, as set by 

2105 :meth:`set_verify`. 

2106 

2107 :return: The verify mode 

2108 """ 

2109 return _lib.SSL_get_verify_mode(self._ssl) 

2110 

2111 def use_certificate(self, cert: X509 | x509.Certificate) -> None: 

2112 """ 

2113 Load a certificate from a X509 object 

2114 

2115 :param cert: The X509 object 

2116 :return: None 

2117 """ 

2118 # Mirrored from Context.use_certificate 

2119 if not isinstance(cert, X509): 

2120 cert = X509.from_cryptography(cert) 

2121 else: 

2122 warnings.warn( 

2123 ( 

2124 "Passing pyOpenSSL X509 objects is deprecated. You " 

2125 "should use a cryptography.x509.Certificate instead." 

2126 ), 

2127 DeprecationWarning, 

2128 stacklevel=2, 

2129 ) 

2130 

2131 use_result = _lib.SSL_use_certificate(self._ssl, cert._x509) 

2132 if not use_result: 

2133 _raise_current_error() 

2134 

2135 def use_privatekey(self, pkey: _PrivateKey | PKey) -> None: 

2136 """ 

2137 Load a private key from a PKey object 

2138 

2139 :param pkey: The PKey object 

2140 :return: None 

2141 """ 

2142 # Mirrored from Context.use_privatekey 

2143 if not isinstance(pkey, PKey): 

2144 pkey = PKey.from_cryptography_key(pkey) 

2145 else: 

2146 warnings.warn( 

2147 ( 

2148 "Passing pyOpenSSL PKey objects is deprecated. You " 

2149 "should use a cryptography private key instead." 

2150 ), 

2151 DeprecationWarning, 

2152 stacklevel=2, 

2153 ) 

2154 

2155 use_result = _lib.SSL_use_PrivateKey(self._ssl, pkey._pkey) 

2156 if not use_result: 

2157 self._context._raise_passphrase_exception() 

2158 

2159 def set_ciphertext_mtu(self, mtu: int) -> None: 

2160 """ 

2161 For DTLS, set the maximum UDP payload size (*not* including IP/UDP 

2162 overhead). 

2163 

2164 Note that you might have to set :data:`OP_NO_QUERY_MTU` to prevent 

2165 OpenSSL from spontaneously clearing this. 

2166 

2167 :param mtu: An integer giving the maximum transmission unit. 

2168 

2169 .. versionadded:: 21.1 

2170 """ 

2171 _lib.SSL_set_mtu(self._ssl, mtu) 

2172 

2173 def get_cleartext_mtu(self) -> int: 

2174 """ 

2175 For DTLS, get the maximum size of unencrypted data you can pass to 

2176 :meth:`write` without exceeding the MTU (as passed to 

2177 :meth:`set_ciphertext_mtu`). 

2178 

2179 :return: The effective MTU as an integer. 

2180 

2181 .. versionadded:: 21.1 

2182 """ 

2183 

2184 if not hasattr(_lib, "DTLS_get_data_mtu"): 

2185 raise NotImplementedError("requires OpenSSL 1.1.1 or better") 

2186 return _lib.DTLS_get_data_mtu(self._ssl) 

2187 

2188 def set_tlsext_host_name(self, name: bytes) -> None: 

2189 """ 

2190 Set the value of the servername extension to send in the client hello. 

2191 

2192 :param name: A byte string giving the name. 

2193 

2194 .. versionadded:: 0.13 

2195 """ 

2196 if not isinstance(name, bytes): 

2197 raise TypeError("name must be a byte string") 

2198 elif b"\0" in name: 

2199 raise TypeError("name must not contain NUL byte") 

2200 

2201 # XXX I guess this can fail sometimes? 

2202 _lib.SSL_set_tlsext_host_name(self._ssl, name) 

2203 

2204 def pending(self) -> int: 

2205 """ 

2206 Get the number of bytes that can be safely read from the SSL buffer 

2207 (**not** the underlying transport buffer). 

2208 

2209 :return: The number of bytes available in the receive buffer. 

2210 """ 

2211 return _lib.SSL_pending(self._ssl) 

2212 

2213 def send(self, buf: bytes, flags: int = 0) -> int: 

2214 """ 

2215 Send data on the connection. NOTE: If you get one of the WantRead, 

2216 WantWrite or WantX509Lookup exceptions on this, you have to call the 

2217 method again with the SAME buffer. 

2218 

2219 :param buf: The string, buffer or memoryview to send 

2220 :param flags: (optional) Included for compatibility with the socket 

2221 API, the value is ignored 

2222 :return: The number of bytes written 

2223 """ 

2224 # Backward compatibility 

2225 buf = _text_to_bytes_and_warn("buf", buf) 

2226 

2227 with _ffi.from_buffer(buf) as data: 

2228 # check len(buf) instead of len(data) for testability 

2229 if len(buf) > 2147483647: 

2230 raise ValueError( 

2231 "Cannot send more than 2**31-1 bytes at once." 

2232 ) 

2233 

2234 result = _lib.SSL_write(self._ssl, data, len(data)) 

2235 self._raise_ssl_error(self._ssl, result) 

2236 

2237 return result 

2238 

2239 write = send 

2240 

2241 def sendall(self, buf: bytes, flags: int = 0) -> int: 

2242 """ 

2243 Send "all" data on the connection. This calls send() repeatedly until 

2244 all data is sent. If an error occurs, it's impossible to tell how much 

2245 data has been sent. 

2246 

2247 :param buf: The string, buffer or memoryview to send 

2248 :param flags: (optional) Included for compatibility with the socket 

2249 API, the value is ignored 

2250 :return: The number of bytes written 

2251 """ 

2252 buf = _text_to_bytes_and_warn("buf", buf) 

2253 

2254 with _ffi.from_buffer(buf) as data: 

2255 left_to_send = len(buf) 

2256 total_sent = 0 

2257 

2258 while left_to_send: 

2259 # SSL_write's num arg is an int, 

2260 # so we cannot send more than 2**31-1 bytes at once. 

2261 result = _lib.SSL_write( 

2262 self._ssl, data + total_sent, min(left_to_send, 2147483647) 

2263 ) 

2264 self._raise_ssl_error(self._ssl, result) 

2265 total_sent += result 

2266 left_to_send -= result 

2267 

2268 return total_sent 

2269 

2270 def recv(self, bufsiz: int, flags: int | None = None) -> bytes: 

2271 """ 

2272 Receive data on the connection. 

2273 

2274 :param bufsiz: The maximum number of bytes to read 

2275 :param flags: (optional) The only supported flag is ``MSG_PEEK``, 

2276 all other flags are ignored. 

2277 :return: The string read from the Connection 

2278 """ 

2279 buf = _no_zero_allocator("char[]", bufsiz) 

2280 if flags is not None and flags & socket.MSG_PEEK: 

2281 result = _lib.SSL_peek(self._ssl, buf, bufsiz) 

2282 else: 

2283 result = _lib.SSL_read(self._ssl, buf, bufsiz) 

2284 self._raise_ssl_error(self._ssl, result) 

2285 return _ffi.buffer(buf, result)[:] 

2286 

2287 read = recv 

2288 

2289 def recv_into( 

2290 self, 

2291 buffer: Any, # collections.abc.Buffer once we use Python 3.12+ 

2292 nbytes: int | None = None, 

2293 flags: int | None = None, 

2294 ) -> int: 

2295 """ 

2296 Receive data on the connection and copy it directly into the provided 

2297 buffer, rather than creating a new string. 

2298 

2299 :param buffer: The buffer to copy into. 

2300 :param nbytes: (optional) The maximum number of bytes to read into the 

2301 buffer. If not present, defaults to the size of the buffer. If 

2302 larger than the size of the buffer, is reduced to the size of the 

2303 buffer. 

2304 :param flags: (optional) The only supported flag is ``MSG_PEEK``, 

2305 all other flags are ignored. 

2306 :return: The number of bytes read into the buffer. 

2307 """ 

2308 if nbytes is None: 

2309 nbytes = len(buffer) 

2310 else: 

2311 nbytes = min(nbytes, len(buffer)) 

2312 

2313 # We need to create a temporary buffer. This is annoying, it would be 

2314 # better if we could pass memoryviews straight into the SSL_read call, 

2315 # but right now we can't. Revisit this if CFFI gets that ability. 

2316 buf = _no_zero_allocator("char[]", nbytes) 

2317 if flags is not None and flags & socket.MSG_PEEK: 

2318 result = _lib.SSL_peek(self._ssl, buf, nbytes) 

2319 else: 

2320 result = _lib.SSL_read(self._ssl, buf, nbytes) 

2321 self._raise_ssl_error(self._ssl, result) 

2322 

2323 # This strange line is all to avoid a memory copy. The buffer protocol 

2324 # should allow us to assign a CFFI buffer to the LHS of this line, but 

2325 # on CPython 3.3+ that segfaults. As a workaround, we can temporarily 

2326 # wrap it in a memoryview. 

2327 buffer[:result] = memoryview(_ffi.buffer(buf, result)) 

2328 

2329 return result 

2330 

2331 def _handle_bio_errors(self, bio: Any, result: int) -> typing.NoReturn: 

2332 if _lib.BIO_should_retry(bio): 

2333 if _lib.BIO_should_read(bio): 

2334 raise WantReadError() 

2335 elif _lib.BIO_should_write(bio): 

2336 # TODO: This is untested. 

2337 raise WantWriteError() 

2338 elif _lib.BIO_should_io_special(bio): 

2339 # TODO: This is untested. I think io_special means the socket 

2340 # BIO has a not-yet connected socket. 

2341 raise ValueError("BIO_should_io_special") 

2342 else: 

2343 # TODO: This is untested. 

2344 raise ValueError("unknown bio failure") 

2345 else: 

2346 # TODO: This is untested. 

2347 _raise_current_error() 

2348 

2349 def bio_read(self, bufsiz: int) -> bytes: 

2350 """ 

2351 If the Connection was created with a memory BIO, this method can be 

2352 used to read bytes from the write end of that memory BIO. Many 

2353 Connection methods will add bytes which must be read in this manner or 

2354 the buffer will eventually fill up and the Connection will be able to 

2355 take no further actions. 

2356 

2357 :param bufsiz: The maximum number of bytes to read 

2358 :return: The string read. 

2359 """ 

2360 if self._from_ssl is None: 

2361 raise TypeError("Connection sock was not None") 

2362 

2363 if not isinstance(bufsiz, int): 

2364 raise TypeError("bufsiz must be an integer") 

2365 

2366 buf = _no_zero_allocator("char[]", bufsiz) 

2367 result = _lib.BIO_read(self._from_ssl, buf, bufsiz) 

2368 if result <= 0: 

2369 self._handle_bio_errors(self._from_ssl, result) 

2370 

2371 return _ffi.buffer(buf, result)[:] 

2372 

2373 def bio_write(self, buf: bytes) -> int: 

2374 """ 

2375 If the Connection was created with a memory BIO, this method can be 

2376 used to add bytes to the read end of that memory BIO. The Connection 

2377 can then read the bytes (for example, in response to a call to 

2378 :meth:`recv`). 

2379 

2380 :param buf: The string to put into the memory BIO. 

2381 :return: The number of bytes written 

2382 """ 

2383 buf = _text_to_bytes_and_warn("buf", buf) 

2384 

2385 if self._into_ssl is None: 

2386 raise TypeError("Connection sock was not None") 

2387 

2388 with _ffi.from_buffer(buf) as data: 

2389 result = _lib.BIO_write(self._into_ssl, data, len(data)) 

2390 if result <= 0: 

2391 self._handle_bio_errors(self._into_ssl, result) 

2392 return result 

2393 

2394 def renegotiate(self) -> bool: 

2395 """ 

2396 Renegotiate the session. 

2397 

2398 :return: True if the renegotiation can be started, False otherwise 

2399 """ 

2400 if not self.renegotiate_pending(): 

2401 _openssl_assert(_lib.SSL_renegotiate(self._ssl) == 1) 

2402 return True 

2403 return False 

2404 

2405 def do_handshake(self) -> None: 

2406 """ 

2407 Perform an SSL handshake (usually called after :meth:`renegotiate` or 

2408 one of :meth:`set_accept_state` or :meth:`set_connect_state`). This can 

2409 raise the same exceptions as :meth:`send` and :meth:`recv`. 

2410 

2411 :return: None. 

2412 """ 

2413 result = _lib.SSL_do_handshake(self._ssl) 

2414 self._raise_ssl_error(self._ssl, result) 

2415 

2416 def renegotiate_pending(self) -> bool: 

2417 """ 

2418 Check if there's a renegotiation in progress, it will return False once 

2419 a renegotiation is finished. 

2420 

2421 :return: Whether there's a renegotiation in progress 

2422 """ 

2423 return _lib.SSL_renegotiate_pending(self._ssl) == 1 

2424 

2425 def total_renegotiations(self) -> int: 

2426 """ 

2427 Find out the total number of renegotiations. 

2428 

2429 :return: The number of renegotiations. 

2430 """ 

2431 return _lib.SSL_total_renegotiations(self._ssl) 

2432 

2433 def connect(self, addr: Any) -> None: 

2434 """ 

2435 Call the :meth:`connect` method of the underlying socket and set up SSL 

2436 on the socket, using the :class:`Context` object supplied to this 

2437 :class:`Connection` object at creation. 

2438 

2439 :param addr: A remote address 

2440 :return: What the socket's connect method returns 

2441 """ 

2442 _lib.SSL_set_connect_state(self._ssl) 

2443 return self._socket.connect(addr) # type: ignore[return-value, union-attr] 

2444 

2445 def connect_ex(self, addr: Any) -> int: 

2446 """ 

2447 Call the :meth:`connect_ex` method of the underlying socket and set up 

2448 SSL on the socket, using the Context object supplied to this Connection 

2449 object at creation. Note that if the :meth:`connect_ex` method of the 

2450 socket doesn't return 0, SSL won't be initialized. 

2451 

2452 :param addr: A remove address 

2453 :return: What the socket's connect_ex method returns 

2454 """ 

2455 connect_ex = self._socket.connect_ex # type: ignore[union-attr] 

2456 self.set_connect_state() 

2457 return connect_ex(addr) 

2458 

2459 def accept(self) -> tuple[Connection, Any]: 

2460 """ 

2461 Call the :meth:`accept` method of the underlying socket and set up SSL 

2462 on the returned socket, using the Context object supplied to this 

2463 :class:`Connection` object at creation. 

2464 

2465 :return: A *(conn, addr)* pair where *conn* is the new 

2466 :class:`Connection` object created, and *address* is as returned by 

2467 the socket's :meth:`accept`. 

2468 """ 

2469 client, addr = self._socket.accept() # type: ignore[union-attr] 

2470 conn = Connection(self._context, client) 

2471 conn.set_accept_state() 

2472 return (conn, addr) 

2473 

2474 def DTLSv1_listen(self) -> None: 

2475 """ 

2476 Call the OpenSSL function DTLSv1_listen on this connection. See the 

2477 OpenSSL manual for more details. 

2478 

2479 :return: None 

2480 """ 

2481 # Possible future extension: return the BIO_ADDR in some form. 

2482 bio_addr = _lib.BIO_ADDR_new() 

2483 try: 

2484 result = _lib.DTLSv1_listen(self._ssl, bio_addr) 

2485 finally: 

2486 _lib.BIO_ADDR_free(bio_addr) 

2487 # DTLSv1_listen is weird. A zero return value means 'didn't find a 

2488 # ClientHello with valid cookie, but keep trying'. So basically 

2489 # WantReadError. But it doesn't work correctly with _raise_ssl_error. 

2490 # So we raise it manually instead. 

2491 if self._cookie_generate_helper is not None: 

2492 self._cookie_generate_helper.raise_if_problem() 

2493 if self._cookie_verify_helper is not None: 

2494 self._cookie_verify_helper.raise_if_problem() 

2495 if result == 0: 

2496 raise WantReadError() 

2497 if result < 0: 

2498 self._raise_ssl_error(self._ssl, result) 

2499 

2500 def DTLSv1_get_timeout(self) -> int | None: 

2501 """ 

2502 Determine when the DTLS SSL object next needs to perform internal 

2503 processing due to the passage of time. 

2504 

2505 When the returned number of seconds have passed, the 

2506 :meth:`DTLSv1_handle_timeout` method needs to be called. 

2507 

2508 :return: The time left in seconds before the next timeout or `None` 

2509 if no timeout is currently active. 

2510 """ 

2511 ptv_sec = _ffi.new("time_t *") 

2512 ptv_usec = _ffi.new("long *") 

2513 if _lib.Cryptography_DTLSv1_get_timeout(self._ssl, ptv_sec, ptv_usec): 

2514 return ptv_sec[0] + (ptv_usec[0] / 1000000) 

2515 else: 

2516 return None 

2517 

2518 def DTLSv1_handle_timeout(self) -> bool: 

2519 """ 

2520 Handles any timeout events which have become pending on a DTLS SSL 

2521 object. 

2522 

2523 :return: `True` if there was a pending timeout, `False` otherwise. 

2524 """ 

2525 result = _lib.DTLSv1_handle_timeout(self._ssl) 

2526 if result < 0: 

2527 self._raise_ssl_error(self._ssl, result) 

2528 assert False, "unreachable" 

2529 else: 

2530 return bool(result) 

2531 

2532 def bio_shutdown(self) -> None: 

2533 """ 

2534 If the Connection was created with a memory BIO, this method can be 

2535 used to indicate that *end of file* has been reached on the read end of 

2536 that memory BIO. 

2537 

2538 :return: None 

2539 """ 

2540 if self._from_ssl is None: 

2541 raise TypeError("Connection sock was not None") 

2542 

2543 _lib.BIO_set_mem_eof_return(self._into_ssl, 0) 

2544 

2545 def shutdown(self) -> bool: 

2546 """ 

2547 Send the shutdown message to the Connection. 

2548 

2549 :return: True if the shutdown completed successfully (i.e. both sides 

2550 have sent closure alerts), False otherwise (in which case you 

2551 call :meth:`recv` or :meth:`send` when the connection becomes 

2552 readable/writeable). 

2553 """ 

2554 result = _lib.SSL_shutdown(self._ssl) 

2555 if result < 0: 

2556 self._raise_ssl_error(self._ssl, result) 

2557 assert False, "unreachable" 

2558 elif result > 0: 

2559 return True 

2560 else: 

2561 return False 

2562 

2563 def get_cipher_list(self) -> list[str]: 

2564 """ 

2565 Retrieve the list of ciphers used by the Connection object. 

2566 

2567 :return: A list of native cipher strings. 

2568 """ 

2569 ciphers = [] 

2570 for i in count(): 

2571 result = _lib.SSL_get_cipher_list(self._ssl, i) 

2572 if result == _ffi.NULL: 

2573 break 

2574 ciphers.append(_ffi.string(result).decode("utf-8")) 

2575 return ciphers 

2576 

2577 def get_client_ca_list(self) -> list[X509Name]: 

2578 """ 

2579 Get CAs whose certificates are suggested for client authentication. 

2580 

2581 :return: If this is a server connection, the list of certificate 

2582 authorities that will be sent or has been sent to the client, as 

2583 controlled by this :class:`Connection`'s :class:`Context`. 

2584 

2585 If this is a client connection, the list will be empty until the 

2586 connection with the server is established. 

2587 

2588 .. versionadded:: 0.10 

2589 """ 

2590 ca_names = _lib.SSL_get_client_CA_list(self._ssl) 

2591 if ca_names == _ffi.NULL: 

2592 # TODO: This is untested. 

2593 return [] 

2594 

2595 result = [] 

2596 for i in range(_lib.sk_X509_NAME_num(ca_names)): 

2597 name = _lib.sk_X509_NAME_value(ca_names, i) 

2598 copy = _lib.X509_NAME_dup(name) 

2599 _openssl_assert(copy != _ffi.NULL) 

2600 

2601 pyname = X509Name.__new__(X509Name) 

2602 pyname._name = _ffi.gc(copy, _lib.X509_NAME_free) 

2603 result.append(pyname) 

2604 return result 

2605 

2606 def makefile(self, *args: Any, **kwargs: Any) -> typing.NoReturn: 

2607 """ 

2608 The makefile() method is not implemented, since there is no dup 

2609 semantics for SSL connections 

2610 

2611 :raise: NotImplementedError 

2612 """ 

2613 raise NotImplementedError( 

2614 "Cannot make file object of OpenSSL.SSL.Connection" 

2615 ) 

2616 

2617 def get_app_data(self) -> Any: 

2618 """ 

2619 Retrieve application data as set by :meth:`set_app_data`. 

2620 

2621 :return: The application data 

2622 """ 

2623 return self._app_data 

2624 

2625 def set_app_data(self, data: Any) -> None: 

2626 """ 

2627 Set application data 

2628 

2629 :param data: The application data 

2630 :return: None 

2631 """ 

2632 self._app_data = data 

2633 

2634 def get_shutdown(self) -> int: 

2635 """ 

2636 Get the shutdown state of the Connection. 

2637 

2638 :return: The shutdown state, a bitvector of SENT_SHUTDOWN, 

2639 RECEIVED_SHUTDOWN. 

2640 """ 

2641 return _lib.SSL_get_shutdown(self._ssl) 

2642 

2643 def set_shutdown(self, state: int) -> None: 

2644 """ 

2645 Set the shutdown state of the Connection. 

2646 

2647 :param state: bitvector of SENT_SHUTDOWN, RECEIVED_SHUTDOWN. 

2648 :return: None 

2649 """ 

2650 if not isinstance(state, int): 

2651 raise TypeError("state must be an integer") 

2652 

2653 _lib.SSL_set_shutdown(self._ssl, state) 

2654 

2655 def get_state_string(self) -> bytes: 

2656 """ 

2657 Retrieve a verbose string detailing the state of the Connection. 

2658 

2659 :return: A string representing the state 

2660 """ 

2661 return _ffi.string(_lib.SSL_state_string_long(self._ssl)) 

2662 

2663 def server_random(self) -> bytes | None: 

2664 """ 

2665 Retrieve the random value used with the server hello message. 

2666 

2667 :return: A string representing the state 

2668 """ 

2669 session = _lib.SSL_get_session(self._ssl) 

2670 if session == _ffi.NULL: 

2671 return None 

2672 length = _lib.SSL_get_server_random(self._ssl, _ffi.NULL, 0) 

2673 _openssl_assert(length > 0) 

2674 outp = _no_zero_allocator("unsigned char[]", length) 

2675 _lib.SSL_get_server_random(self._ssl, outp, length) 

2676 return _ffi.buffer(outp, length)[:] 

2677 

2678 def client_random(self) -> bytes | None: 

2679 """ 

2680 Retrieve the random value used with the client hello message. 

2681 

2682 :return: A string representing the state 

2683 """ 

2684 session = _lib.SSL_get_session(self._ssl) 

2685 if session == _ffi.NULL: 

2686 return None 

2687 

2688 length = _lib.SSL_get_client_random(self._ssl, _ffi.NULL, 0) 

2689 _openssl_assert(length > 0) 

2690 outp = _no_zero_allocator("unsigned char[]", length) 

2691 _lib.SSL_get_client_random(self._ssl, outp, length) 

2692 return _ffi.buffer(outp, length)[:] 

2693 

2694 def master_key(self) -> bytes | None: 

2695 """ 

2696 Retrieve the value of the master key for this session. 

2697 

2698 :return: A string representing the state 

2699 """ 

2700 session = _lib.SSL_get_session(self._ssl) 

2701 if session == _ffi.NULL: 

2702 return None 

2703 

2704 length = _lib.SSL_SESSION_get_master_key(session, _ffi.NULL, 0) 

2705 _openssl_assert(length > 0) 

2706 outp = _no_zero_allocator("unsigned char[]", length) 

2707 _lib.SSL_SESSION_get_master_key(session, outp, length) 

2708 return _ffi.buffer(outp, length)[:] 

2709 

2710 def export_keying_material( 

2711 self, label: bytes, olen: int, context: bytes | None = None 

2712 ) -> bytes: 

2713 """ 

2714 Obtain keying material for application use. 

2715 

2716 :param: label - a disambiguating label string as described in RFC 5705 

2717 :param: olen - the length of the exported key material in bytes 

2718 :param: context - a per-association context value 

2719 :return: the exported key material bytes or None 

2720 """ 

2721 outp = _no_zero_allocator("unsigned char[]", olen) 

2722 context_buf = _ffi.NULL 

2723 context_len = 0 

2724 use_context = 0 

2725 if context is not None: 

2726 context_buf = context 

2727 context_len = len(context) 

2728 use_context = 1 

2729 success = _lib.SSL_export_keying_material( 

2730 self._ssl, 

2731 outp, 

2732 olen, 

2733 label, 

2734 len(label), 

2735 context_buf, 

2736 context_len, 

2737 use_context, 

2738 ) 

2739 _openssl_assert(success == 1) 

2740 return _ffi.buffer(outp, olen)[:] 

2741 

2742 def sock_shutdown(self, *args: Any, **kwargs: Any) -> None: 

2743 """ 

2744 Call the :meth:`shutdown` method of the underlying socket. 

2745 See :manpage:`shutdown(2)`. 

2746 

2747 :return: What the socket's shutdown() method returns 

2748 """ 

2749 return self._socket.shutdown(*args, **kwargs) # type: ignore[return-value, union-attr] 

2750 

2751 @typing.overload 

2752 def get_certificate( 

2753 self, *, as_cryptography: typing.Literal[True] 

2754 ) -> x509.Certificate | None: 

2755 pass 

2756 

2757 @typing.overload 

2758 def get_certificate( 

2759 self, *, as_cryptography: typing.Literal[False] = False 

2760 ) -> X509 | None: 

2761 pass 

2762 

2763 def get_certificate( 

2764 self, 

2765 *, 

2766 as_cryptography: typing.Literal[True] | typing.Literal[False] = False, 

2767 ) -> X509 | x509.Certificate | None: 

2768 """ 

2769 Retrieve the local certificate (if any) 

2770 

2771 :param bool as_cryptography: Controls whether a 

2772 ``cryptography.x509.Certificate`` or an ``OpenSSL.crypto.X509`` 

2773 object should be returned. 

2774 

2775 :return: The local certificate 

2776 """ 

2777 cert = _lib.SSL_get_certificate(self._ssl) 

2778 if cert != _ffi.NULL: 

2779 _lib.X509_up_ref(cert) 

2780 pycert = X509._from_raw_x509_ptr(cert) 

2781 if as_cryptography: 

2782 return pycert.to_cryptography() 

2783 return pycert 

2784 return None 

2785 

2786 @typing.overload 

2787 def get_peer_certificate( 

2788 self, *, as_cryptography: typing.Literal[True] 

2789 ) -> x509.Certificate | None: 

2790 pass 

2791 

2792 @typing.overload 

2793 def get_peer_certificate( 

2794 self, *, as_cryptography: typing.Literal[False] = False 

2795 ) -> X509 | None: 

2796 pass 

2797 

2798 def get_peer_certificate( 

2799 self, 

2800 *, 

2801 as_cryptography: typing.Literal[True] | typing.Literal[False] = False, 

2802 ) -> X509 | x509.Certificate | None: 

2803 """ 

2804 Retrieve the other side's certificate (if any) 

2805 

2806 :param bool as_cryptography: Controls whether a 

2807 ``cryptography.x509.Certificate`` or an ``OpenSSL.crypto.X509`` 

2808 object should be returned. 

2809 

2810 :return: The peer's certificate 

2811 """ 

2812 cert = _lib.SSL_get_peer_certificate(self._ssl) 

2813 if cert != _ffi.NULL: 

2814 pycert = X509._from_raw_x509_ptr(cert) 

2815 if as_cryptography: 

2816 return pycert.to_cryptography() 

2817 return pycert 

2818 return None 

2819 

2820 @staticmethod 

2821 def _cert_stack_to_list(cert_stack: Any) -> list[X509]: 

2822 """ 

2823 Internal helper to convert a STACK_OF(X509) to a list of X509 

2824 instances. 

2825 """ 

2826 result = [] 

2827 for i in range(_lib.sk_X509_num(cert_stack)): 

2828 cert = _lib.sk_X509_value(cert_stack, i) 

2829 _openssl_assert(cert != _ffi.NULL) 

2830 res = _lib.X509_up_ref(cert) 

2831 _openssl_assert(res >= 1) 

2832 pycert = X509._from_raw_x509_ptr(cert) 

2833 result.append(pycert) 

2834 return result 

2835 

2836 @staticmethod 

2837 def _cert_stack_to_cryptography_list( 

2838 cert_stack: Any, 

2839 ) -> list[x509.Certificate]: 

2840 """ 

2841 Internal helper to convert a STACK_OF(X509) to a list of X509 

2842 instances. 

2843 """ 

2844 result = [] 

2845 for i in range(_lib.sk_X509_num(cert_stack)): 

2846 cert = _lib.sk_X509_value(cert_stack, i) 

2847 _openssl_assert(cert != _ffi.NULL) 

2848 res = _lib.X509_up_ref(cert) 

2849 _openssl_assert(res >= 1) 

2850 pycert = X509._from_raw_x509_ptr(cert) 

2851 result.append(pycert.to_cryptography()) 

2852 return result 

2853 

2854 @typing.overload 

2855 def get_peer_cert_chain( 

2856 self, *, as_cryptography: typing.Literal[True] 

2857 ) -> list[x509.Certificate] | None: 

2858 pass 

2859 

2860 @typing.overload 

2861 def get_peer_cert_chain( 

2862 self, *, as_cryptography: typing.Literal[False] = False 

2863 ) -> list[X509] | None: 

2864 pass 

2865 

2866 def get_peer_cert_chain( 

2867 self, 

2868 *, 

2869 as_cryptography: typing.Literal[True] | typing.Literal[False] = False, 

2870 ) -> list[X509] | list[x509.Certificate] | None: 

2871 """ 

2872 Retrieve the other side's certificate (if any) 

2873 

2874 :param bool as_cryptography: Controls whether a list of 

2875 ``cryptography.x509.Certificate`` or ``OpenSSL.crypto.X509`` 

2876 object should be returned. 

2877 

2878 :return: A list of X509 instances giving the peer's certificate chain, 

2879 or None if it does not have one. 

2880 """ 

2881 cert_stack = _lib.SSL_get_peer_cert_chain(self._ssl) 

2882 if cert_stack == _ffi.NULL: 

2883 return None 

2884 

2885 if as_cryptography: 

2886 return self._cert_stack_to_cryptography_list(cert_stack) 

2887 return self._cert_stack_to_list(cert_stack) 

2888 

2889 @typing.overload 

2890 def get_verified_chain( 

2891 self, *, as_cryptography: typing.Literal[True] 

2892 ) -> list[x509.Certificate] | None: 

2893 pass 

2894 

2895 @typing.overload 

2896 def get_verified_chain( 

2897 self, *, as_cryptography: typing.Literal[False] = False 

2898 ) -> list[X509] | None: 

2899 pass 

2900 

2901 def get_verified_chain( 

2902 self, 

2903 *, 

2904 as_cryptography: typing.Literal[True] | typing.Literal[False] = False, 

2905 ) -> list[X509] | list[x509.Certificate] | None: 

2906 """ 

2907 Retrieve the verified certificate chain of the peer including the 

2908 peer's end entity certificate. It must be called after a session has 

2909 been successfully established. If peer verification was not successful 

2910 the chain may be incomplete, invalid, or None. 

2911 

2912 :param bool as_cryptography: Controls whether a list of 

2913 ``cryptography.x509.Certificate`` or ``OpenSSL.crypto.X509`` 

2914 object should be returned. 

2915 

2916 :return: A list of X509 instances giving the peer's verified 

2917 certificate chain, or None if it does not have one. 

2918 

2919 .. versionadded:: 20.0 

2920 """ 

2921 # OpenSSL 1.1+ 

2922 cert_stack = _lib.SSL_get0_verified_chain(self._ssl) 

2923 if cert_stack == _ffi.NULL: 

2924 return None 

2925 

2926 if as_cryptography: 

2927 return self._cert_stack_to_cryptography_list(cert_stack) 

2928 return self._cert_stack_to_list(cert_stack) 

2929 

2930 def want_read(self) -> bool: 

2931 """ 

2932 Checks if more data has to be read from the transport layer to complete 

2933 an operation. 

2934 

2935 :return: True iff more data has to be read 

2936 """ 

2937 return _lib.SSL_want_read(self._ssl) 

2938 

2939 def want_write(self) -> bool: 

2940 """ 

2941 Checks if there is data to write to the transport layer to complete an 

2942 operation. 

2943 

2944 :return: True iff there is data to write 

2945 """ 

2946 return _lib.SSL_want_write(self._ssl) 

2947 

2948 def set_accept_state(self) -> None: 

2949 """ 

2950 Set the connection to work in server mode. The handshake will be 

2951 handled automatically by read/write. 

2952 

2953 :return: None 

2954 """ 

2955 _lib.SSL_set_accept_state(self._ssl) 

2956 

2957 def set_connect_state(self) -> None: 

2958 """ 

2959 Set the connection to work in client mode. The handshake will be 

2960 handled automatically by read/write. 

2961 

2962 :return: None 

2963 """ 

2964 _lib.SSL_set_connect_state(self._ssl) 

2965 

2966 def get_session(self) -> Session | None: 

2967 """ 

2968 Returns the Session currently used. 

2969 

2970 :return: An instance of :class:`OpenSSL.SSL.Session` or 

2971 :obj:`None` if no session exists. 

2972 

2973 .. versionadded:: 0.14 

2974 """ 

2975 session = _lib.SSL_get1_session(self._ssl) 

2976 if session == _ffi.NULL: 

2977 return None 

2978 

2979 pysession = Session.__new__(Session) 

2980 pysession._session = _ffi.gc(session, _lib.SSL_SESSION_free) 

2981 return pysession 

2982 

2983 def set_session(self, session: Session) -> None: 

2984 """ 

2985 Set the session to be used when the TLS/SSL connection is established. 

2986 

2987 :param session: A Session instance representing the session to use. 

2988 :returns: None 

2989 

2990 .. versionadded:: 0.14 

2991 """ 

2992 if not isinstance(session, Session): 

2993 raise TypeError("session must be a Session instance") 

2994 

2995 result = _lib.SSL_set_session(self._ssl, session._session) 

2996 _openssl_assert(result == 1) 

2997 

2998 def _get_finished_message( 

2999 self, function: Callable[[Any, Any, int], int] 

3000 ) -> bytes | None: 

3001 """ 

3002 Helper to implement :meth:`get_finished` and 

3003 :meth:`get_peer_finished`. 

3004 

3005 :param function: Either :data:`SSL_get_finished`: or 

3006 :data:`SSL_get_peer_finished`. 

3007 

3008 :return: :data:`None` if the desired message has not yet been 

3009 received, otherwise the contents of the message. 

3010 """ 

3011 # The OpenSSL documentation says nothing about what might happen if the 

3012 # count argument given is zero. Specifically, it doesn't say whether 

3013 # the output buffer may be NULL in that case or not. Inspection of the 

3014 # implementation reveals that it calls memcpy() unconditionally. 

3015 # Section 7.1.4, paragraph 1 of the C standard suggests that 

3016 # memcpy(NULL, source, 0) is not guaranteed to produce defined (let 

3017 # alone desirable) behavior (though it probably does on just about 

3018 # every implementation...) 

3019 # 

3020 # Allocate a tiny buffer to pass in (instead of just passing NULL as 

3021 # one might expect) for the initial call so as to be safe against this 

3022 # potentially undefined behavior. 

3023 empty = _ffi.new("char[]", 0) 

3024 size = function(self._ssl, empty, 0) 

3025 if size == 0: 

3026 # No Finished message so far. 

3027 return None 

3028 

3029 buf = _no_zero_allocator("char[]", size) 

3030 function(self._ssl, buf, size) 

3031 return _ffi.buffer(buf, size)[:] 

3032 

3033 def get_finished(self) -> bytes | None: 

3034 """ 

3035 Obtain the latest TLS Finished message that we sent. 

3036 

3037 :return: The contents of the message or :obj:`None` if the TLS 

3038 handshake has not yet completed. 

3039 

3040 .. versionadded:: 0.15 

3041 """ 

3042 return self._get_finished_message(_lib.SSL_get_finished) 

3043 

3044 def get_peer_finished(self) -> bytes | None: 

3045 """ 

3046 Obtain the latest TLS Finished message that we received from the peer. 

3047 

3048 :return: The contents of the message or :obj:`None` if the TLS 

3049 handshake has not yet completed. 

3050 

3051 .. versionadded:: 0.15 

3052 """ 

3053 return self._get_finished_message(_lib.SSL_get_peer_finished) 

3054 

3055 def get_cipher_name(self) -> str | None: 

3056 """ 

3057 Obtain the name of the currently used cipher. 

3058 

3059 :returns: The name of the currently used cipher or :obj:`None` 

3060 if no connection has been established. 

3061 

3062 .. versionadded:: 0.15 

3063 """ 

3064 cipher = _lib.SSL_get_current_cipher(self._ssl) 

3065 if cipher == _ffi.NULL: 

3066 return None 

3067 else: 

3068 name = _ffi.string(_lib.SSL_CIPHER_get_name(cipher)) 

3069 return name.decode("utf-8") 

3070 

3071 def get_cipher_bits(self) -> int | None: 

3072 """ 

3073 Obtain the number of secret bits of the currently used cipher. 

3074 

3075 :returns: The number of secret bits of the currently used cipher 

3076 or :obj:`None` if no connection has been established. 

3077 

3078 .. versionadded:: 0.15 

3079 """ 

3080 cipher = _lib.SSL_get_current_cipher(self._ssl) 

3081 if cipher == _ffi.NULL: 

3082 return None 

3083 else: 

3084 return _lib.SSL_CIPHER_get_bits(cipher, _ffi.NULL) 

3085 

3086 def get_cipher_version(self) -> str | None: 

3087 """ 

3088 Obtain the protocol version of the currently used cipher. 

3089 

3090 :returns: The protocol name of the currently used cipher 

3091 or :obj:`None` if no connection has been established. 

3092 

3093 .. versionadded:: 0.15 

3094 """ 

3095 cipher = _lib.SSL_get_current_cipher(self._ssl) 

3096 if cipher == _ffi.NULL: 

3097 return None 

3098 else: 

3099 version = _ffi.string(_lib.SSL_CIPHER_get_version(cipher)) 

3100 return version.decode("utf-8") 

3101 

3102 def get_protocol_version_name(self) -> str: 

3103 """ 

3104 Retrieve the protocol version of the current connection. 

3105 

3106 :returns: The TLS version of the current connection, for example 

3107 the value for TLS 1.2 would be ``TLSv1.2``or ``Unknown`` 

3108 for connections that were not successfully established. 

3109 """ 

3110 version = _ffi.string(_lib.SSL_get_version(self._ssl)) 

3111 return version.decode("utf-8") 

3112 

3113 def get_protocol_version(self) -> int: 

3114 """ 

3115 Retrieve the SSL or TLS protocol version of the current connection. 

3116 

3117 :returns: The TLS version of the current connection. For example, 

3118 it will return ``0x769`` for connections made over TLS version 1. 

3119 """ 

3120 version = _lib.SSL_version(self._ssl) 

3121 return version 

3122 

3123 def set_alpn_protos(self, protos: list[bytes]) -> None: 

3124 """ 

3125 Specify the client's ALPN protocol list. 

3126 

3127 These protocols are offered to the server during protocol negotiation. 

3128 

3129 :param protos: A list of the protocols to be offered to the server. 

3130 This list should be a Python list of bytestrings representing the 

3131 protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``. 

3132 """ 

3133 # Different versions of OpenSSL are inconsistent about how they handle 

3134 # empty proto lists (see #1043), so we avoid the problem entirely by 

3135 # rejecting them ourselves. 

3136 if not protos: 

3137 raise ValueError("at least one protocol must be specified") 

3138 

3139 # Take the list of protocols and join them together, prefixing them 

3140 # with their lengths. 

3141 protostr = b"".join( 

3142 chain.from_iterable((bytes((len(p),)), p) for p in protos) 

3143 ) 

3144 

3145 # Build a C string from the list. We don't need to save this off 

3146 # because OpenSSL immediately copies the data out. 

3147 input_str = _ffi.new("unsigned char[]", protostr) 

3148 

3149 # https://www.openssl.org/docs/man1.1.0/man3/SSL_CTX_set_alpn_protos.html: 

3150 # SSL_CTX_set_alpn_protos() and SSL_set_alpn_protos() 

3151 # return 0 on success, and non-0 on failure. 

3152 # WARNING: these functions reverse the return value convention. 

3153 _openssl_assert( 

3154 _lib.SSL_set_alpn_protos(self._ssl, input_str, len(protostr)) == 0 

3155 ) 

3156 

3157 def get_alpn_proto_negotiated(self) -> bytes: 

3158 """ 

3159 Get the protocol that was negotiated by ALPN. 

3160 

3161 :returns: A bytestring of the protocol name. If no protocol has been 

3162 negotiated yet, returns an empty bytestring. 

3163 """ 

3164 data = _ffi.new("unsigned char **") 

3165 data_len = _ffi.new("unsigned int *") 

3166 

3167 _lib.SSL_get0_alpn_selected(self._ssl, data, data_len) 

3168 

3169 if not data_len: 

3170 return b"" 

3171 

3172 return _ffi.buffer(data[0], data_len[0])[:] 

3173 

3174 def get_selected_srtp_profile(self) -> bytes: 

3175 """ 

3176 Get the SRTP protocol which was negotiated. 

3177 

3178 :returns: A bytestring of the SRTP profile name. If no profile has been 

3179 negotiated yet, returns an empty bytestring. 

3180 """ 

3181 profile = _lib.SSL_get_selected_srtp_profile(self._ssl) 

3182 if not profile: 

3183 return b"" 

3184 

3185 return _ffi.string(profile.name) 

3186 

3187 def request_ocsp(self) -> None: 

3188 """ 

3189 Called to request that the server sends stapled OCSP data, if 

3190 available. If this is not called on the client side then the server 

3191 will not send OCSP data. Should be used in conjunction with 

3192 :meth:`Context.set_ocsp_client_callback`. 

3193 """ 

3194 rc = _lib.SSL_set_tlsext_status_type( 

3195 self._ssl, _lib.TLSEXT_STATUSTYPE_ocsp 

3196 ) 

3197 _openssl_assert(rc == 1)