Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/OpenSSL/SSL.py: 35%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1240 statements  

1from __future__ import annotations 

2 

3import os 

4import socket 

5import sys 

6import typing 

7import warnings 

8from collections.abc import Sequence 

9from errno import errorcode 

10from functools import partial, wraps 

11from itertools import chain, count 

12from sys import platform 

13from typing import Any, Callable, Optional, TypeVar 

14from weakref import WeakValueDictionary 

15 

16from cryptography import x509 

17from cryptography.hazmat.primitives.asymmetric import ec 

18 

19from OpenSSL._util import ( 

20 StrOrBytesPath as _StrOrBytesPath, 

21) 

22from OpenSSL._util import ( 

23 exception_from_error_queue as _exception_from_error_queue, 

24) 

25from OpenSSL._util import ( 

26 ffi as _ffi, 

27) 

28from OpenSSL._util import ( 

29 lib as _lib, 

30) 

31from OpenSSL._util import ( 

32 make_assert as _make_assert, 

33) 

34from OpenSSL._util import ( 

35 no_zero_allocator as _no_zero_allocator, 

36) 

37from OpenSSL._util import ( 

38 path_bytes as _path_bytes, 

39) 

40from OpenSSL._util import ( 

41 text_to_bytes_and_warn as _text_to_bytes_and_warn, 

42) 

43from OpenSSL.crypto import ( 

44 FILETYPE_PEM, 

45 X509, 

46 PKey, 

47 X509Name, 

48 X509Store, 

49 _EllipticCurve, 

50 _PassphraseHelper, 

51 _PrivateKey, 

52) 

53 

54__all__ = [ 

55 "DTLS_CLIENT_METHOD", 

56 "DTLS_METHOD", 

57 "DTLS_SERVER_METHOD", 

58 "MODE_RELEASE_BUFFERS", 

59 "NO_OVERLAPPING_PROTOCOLS", 

60 "OPENSSL_BUILT_ON", 

61 "OPENSSL_CFLAGS", 

62 "OPENSSL_DIR", 

63 "OPENSSL_PLATFORM", 

64 "OPENSSL_VERSION", 

65 "OPENSSL_VERSION_NUMBER", 

66 "OP_ALL", 

67 "OP_CIPHER_SERVER_PREFERENCE", 

68 "OP_DONT_INSERT_EMPTY_FRAGMENTS", 

69 "OP_EPHEMERAL_RSA", 

70 "OP_MICROSOFT_BIG_SSLV3_BUFFER", 

71 "OP_MICROSOFT_SESS_ID_BUG", 

72 "OP_MSIE_SSLV2_RSA_PADDING", 

73 "OP_NETSCAPE_CA_DN_BUG", 

74 "OP_NETSCAPE_CHALLENGE_BUG", 

75 "OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG", 

76 "OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG", 

77 "OP_NO_COMPRESSION", 

78 "OP_NO_QUERY_MTU", 

79 "OP_NO_TICKET", 

80 "OP_PKCS1_CHECK_1", 

81 "OP_PKCS1_CHECK_2", 

82 "OP_SINGLE_DH_USE", 

83 "OP_SINGLE_ECDH_USE", 

84 "OP_SSLEAY_080_CLIENT_DH_BUG", 

85 "OP_SSLREF2_REUSE_CERT_TYPE_BUG", 

86 "OP_TLS_BLOCK_PADDING_BUG", 

87 "OP_TLS_D5_BUG", 

88 "OP_TLS_ROLLBACK_BUG", 

89 "RECEIVED_SHUTDOWN", 

90 "SENT_SHUTDOWN", 

91 "SESS_CACHE_BOTH", 

92 "SESS_CACHE_CLIENT", 

93 "SESS_CACHE_NO_AUTO_CLEAR", 

94 "SESS_CACHE_NO_INTERNAL", 

95 "SESS_CACHE_NO_INTERNAL_LOOKUP", 

96 "SESS_CACHE_NO_INTERNAL_STORE", 

97 "SESS_CACHE_OFF", 

98 "SESS_CACHE_SERVER", 

99 "SSL3_VERSION", 

100 "SSLEAY_BUILT_ON", 

101 "SSLEAY_CFLAGS", 

102 "SSLEAY_DIR", 

103 "SSLEAY_PLATFORM", 

104 "SSLEAY_VERSION", 

105 "SSL_CB_ACCEPT_EXIT", 

106 "SSL_CB_ACCEPT_LOOP", 

107 "SSL_CB_ALERT", 

108 "SSL_CB_CONNECT_EXIT", 

109 "SSL_CB_CONNECT_LOOP", 

110 "SSL_CB_EXIT", 

111 "SSL_CB_HANDSHAKE_DONE", 

112 "SSL_CB_HANDSHAKE_START", 

113 "SSL_CB_LOOP", 

114 "SSL_CB_READ", 

115 "SSL_CB_READ_ALERT", 

116 "SSL_CB_WRITE", 

117 "SSL_CB_WRITE_ALERT", 

118 "SSL_ST_ACCEPT", 

119 "SSL_ST_CONNECT", 

120 "SSL_ST_MASK", 

121 "TLS1_1_VERSION", 

122 "TLS1_2_VERSION", 

123 "TLS1_3_VERSION", 

124 "TLS1_VERSION", 

125 "TLS_CLIENT_METHOD", 

126 "TLS_METHOD", 

127 "TLS_SERVER_METHOD", 

128 "VERIFY_CLIENT_ONCE", 

129 "VERIFY_FAIL_IF_NO_PEER_CERT", 

130 "VERIFY_NONE", 

131 "VERIFY_PEER", 

132 "Connection", 

133 "Context", 

134 "Error", 

135 "OP_NO_SSLv2", 

136 "OP_NO_SSLv3", 

137 "OP_NO_TLSv1", 

138 "OP_NO_TLSv1_1", 

139 "OP_NO_TLSv1_2", 

140 "OP_NO_TLSv1_3", 

141 "SSLeay_version", 

142 "SSLv23_METHOD", 

143 "Session", 

144 "SysCallError", 

145 "TLSv1_1_METHOD", 

146 "TLSv1_2_METHOD", 

147 "TLSv1_METHOD", 

148 "WantReadError", 

149 "WantWriteError", 

150 "WantX509LookupError", 

151 "X509VerificationCodes", 

152 "ZeroReturnError", 

153] 

154 

155 

# Selectors accepted by OpenSSL_version(), plus the numeric library version.
OPENSSL_VERSION_NUMBER: int = _lib.OPENSSL_VERSION_NUMBER
OPENSSL_VERSION: int = _lib.OPENSSL_VERSION
OPENSSL_CFLAGS: int = _lib.OPENSSL_CFLAGS
OPENSSL_PLATFORM: int = _lib.OPENSSL_PLATFORM
OPENSSL_DIR: int = _lib.OPENSSL_DIR
OPENSSL_BUILT_ON: int = _lib.OPENSSL_BUILT_ON

# SSLeay-era spellings kept as aliases of the OPENSSL_* names above.
SSLEAY_VERSION: int = OPENSSL_VERSION
SSLEAY_CFLAGS: int = OPENSSL_CFLAGS
SSLEAY_PLATFORM: int = OPENSSL_PLATFORM
SSLEAY_DIR: int = OPENSSL_DIR
SSLEAY_BUILT_ON: int = OPENSSL_BUILT_ON

SENT_SHUTDOWN: int = _lib.SSL_SENT_SHUTDOWN
RECEIVED_SHUTDOWN: int = _lib.SSL_RECEIVED_SHUTDOWN

# Method identifiers. These are pyOpenSSL-level keys (looked up in
# ``Context._methods``), not values taken from the OpenSSL headers.
SSLv23_METHOD: int = 3
TLSv1_METHOD: int = 4
TLSv1_1_METHOD: int = 5
TLSv1_2_METHOD: int = 6
TLS_METHOD: int = 7
TLS_SERVER_METHOD: int = 8
TLS_CLIENT_METHOD: int = 9
DTLS_METHOD: int = 10
DTLS_SERVER_METHOD: int = 11
DTLS_CLIENT_METHOD: int = 12

SSL3_VERSION: int = _lib.SSL3_VERSION
TLS1_VERSION: int = _lib.TLS1_VERSION
TLS1_1_VERSION: int = _lib.TLS1_1_VERSION
TLS1_2_VERSION: int = _lib.TLS1_2_VERSION
TLS1_3_VERSION: int = _lib.TLS1_3_VERSION

OP_NO_SSLv2: int = _lib.SSL_OP_NO_SSLv2
OP_NO_SSLv3: int = _lib.SSL_OP_NO_SSLv3
OP_NO_TLSv1: int = _lib.SSL_OP_NO_TLSv1
OP_NO_TLSv1_1: int = _lib.SSL_OP_NO_TLSv1_1
OP_NO_TLSv1_2: int = _lib.SSL_OP_NO_TLSv1_2
OP_NO_TLSv1_3: int = _lib.SSL_OP_NO_TLSv1_3

MODE_RELEASE_BUFFERS: int = _lib.SSL_MODE_RELEASE_BUFFERS

OP_SINGLE_DH_USE: int = _lib.SSL_OP_SINGLE_DH_USE
OP_SINGLE_ECDH_USE: int = _lib.SSL_OP_SINGLE_ECDH_USE
OP_EPHEMERAL_RSA: int = _lib.SSL_OP_EPHEMERAL_RSA
OP_MICROSOFT_SESS_ID_BUG: int = _lib.SSL_OP_MICROSOFT_SESS_ID_BUG
OP_NETSCAPE_CHALLENGE_BUG: int = _lib.SSL_OP_NETSCAPE_CHALLENGE_BUG
OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG: int = (
    _lib.SSL_OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG
)
OP_SSLREF2_REUSE_CERT_TYPE_BUG: int = _lib.SSL_OP_SSLREF2_REUSE_CERT_TYPE_BUG
OP_MICROSOFT_BIG_SSLV3_BUFFER: int = _lib.SSL_OP_MICROSOFT_BIG_SSLV3_BUFFER
OP_MSIE_SSLV2_RSA_PADDING: int = _lib.SSL_OP_MSIE_SSLV2_RSA_PADDING
OP_SSLEAY_080_CLIENT_DH_BUG: int = _lib.SSL_OP_SSLEAY_080_CLIENT_DH_BUG
OP_TLS_D5_BUG: int = _lib.SSL_OP_TLS_D5_BUG
OP_TLS_BLOCK_PADDING_BUG: int = _lib.SSL_OP_TLS_BLOCK_PADDING_BUG
OP_DONT_INSERT_EMPTY_FRAGMENTS: int = _lib.SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS
OP_CIPHER_SERVER_PREFERENCE: int = _lib.SSL_OP_CIPHER_SERVER_PREFERENCE
OP_TLS_ROLLBACK_BUG: int = _lib.SSL_OP_TLS_ROLLBACK_BUG
OP_PKCS1_CHECK_1: int = _lib.SSL_OP_PKCS1_CHECK_1
OP_PKCS1_CHECK_2: int = _lib.SSL_OP_PKCS1_CHECK_2
OP_NETSCAPE_CA_DN_BUG: int = _lib.SSL_OP_NETSCAPE_CA_DN_BUG
OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG: int = (
    _lib.SSL_OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG
)
OP_NO_COMPRESSION: int = _lib.SSL_OP_NO_COMPRESSION

OP_NO_QUERY_MTU: int = _lib.SSL_OP_NO_QUERY_MTU
# OP_COOKIE_EXCHANGE is always defined: it falls back to None (and stays out
# of __all__) when the binding does not expose the flag.
try:
    OP_COOKIE_EXCHANGE: int | None = _lib.SSL_OP_COOKIE_EXCHANGE
    __all__.append("OP_COOKIE_EXCHANGE")
except AttributeError:
    OP_COOKIE_EXCHANGE = None
OP_NO_TICKET: int = _lib.SSL_OP_NO_TICKET

# The next three flags are exported only when the binding provides them;
# otherwise the name is simply left undefined.
try:
    OP_NO_RENEGOTIATION: int = _lib.SSL_OP_NO_RENEGOTIATION
    __all__.append("OP_NO_RENEGOTIATION")
except AttributeError:
    pass

try:
    OP_IGNORE_UNEXPECTED_EOF: int = _lib.SSL_OP_IGNORE_UNEXPECTED_EOF
    __all__.append("OP_IGNORE_UNEXPECTED_EOF")
except AttributeError:
    pass

try:
    OP_LEGACY_SERVER_CONNECT: int = _lib.SSL_OP_LEGACY_SERVER_CONNECT
    __all__.append("OP_LEGACY_SERVER_CONNECT")
except AttributeError:
    pass

OP_ALL: int = _lib.SSL_OP_ALL

VERIFY_PEER: int = _lib.SSL_VERIFY_PEER
VERIFY_FAIL_IF_NO_PEER_CERT: int = _lib.SSL_VERIFY_FAIL_IF_NO_PEER_CERT
VERIFY_CLIENT_ONCE: int = _lib.SSL_VERIFY_CLIENT_ONCE
VERIFY_NONE: int = _lib.SSL_VERIFY_NONE

SESS_CACHE_OFF: int = _lib.SSL_SESS_CACHE_OFF
SESS_CACHE_CLIENT: int = _lib.SSL_SESS_CACHE_CLIENT
SESS_CACHE_SERVER: int = _lib.SSL_SESS_CACHE_SERVER
SESS_CACHE_BOTH: int = _lib.SSL_SESS_CACHE_BOTH
SESS_CACHE_NO_AUTO_CLEAR: int = _lib.SSL_SESS_CACHE_NO_AUTO_CLEAR
SESS_CACHE_NO_INTERNAL_LOOKUP: int = _lib.SSL_SESS_CACHE_NO_INTERNAL_LOOKUP
SESS_CACHE_NO_INTERNAL_STORE: int = _lib.SSL_SESS_CACHE_NO_INTERNAL_STORE
SESS_CACHE_NO_INTERNAL: int = _lib.SSL_SESS_CACHE_NO_INTERNAL

SSL_ST_CONNECT: int = _lib.SSL_ST_CONNECT
SSL_ST_ACCEPT: int = _lib.SSL_ST_ACCEPT
SSL_ST_MASK: int = _lib.SSL_ST_MASK

SSL_CB_LOOP: int = _lib.SSL_CB_LOOP
SSL_CB_EXIT: int = _lib.SSL_CB_EXIT
SSL_CB_READ: int = _lib.SSL_CB_READ
SSL_CB_WRITE: int = _lib.SSL_CB_WRITE
SSL_CB_ALERT: int = _lib.SSL_CB_ALERT
SSL_CB_READ_ALERT: int = _lib.SSL_CB_READ_ALERT
SSL_CB_WRITE_ALERT: int = _lib.SSL_CB_WRITE_ALERT
SSL_CB_ACCEPT_LOOP: int = _lib.SSL_CB_ACCEPT_LOOP
SSL_CB_ACCEPT_EXIT: int = _lib.SSL_CB_ACCEPT_EXIT
SSL_CB_CONNECT_LOOP: int = _lib.SSL_CB_CONNECT_LOOP
SSL_CB_CONNECT_EXIT: int = _lib.SSL_CB_CONNECT_EXIT
SSL_CB_HANDSHAKE_START: int = _lib.SSL_CB_HANDSHAKE_START
SSL_CB_HANDSHAKE_DONE: int = _lib.SSL_CB_HANDSHAKE_DONE

282 

# Anything this module treats as a writable/readable byte buffer.
_Buffer = typing.Union[bytes, bytearray, memoryview]
_T = TypeVar("_T")


class _NoOverlappingProtocols:
    # Marker type: its single instance below is compared by identity in the
    # ALPN selection wrapper.
    pass


NO_OVERLAPPING_PROTOCOLS = _NoOverlappingProtocols()

# Signatures of the user-supplied callbacks wrapped by the helper classes.
# "Connection" is a forward reference, hence the string form.
_ALPNSelectCallback = Callable[
    ["Connection", typing.List[bytes]],
    typing.Union[bytes, _NoOverlappingProtocols],
]
_CookieGenerateCallback = Callable[["Connection"], bytes]
_CookieVerifyCallback = Callable[["Connection", bytes], bool]
_OCSPClientCallback = Callable[["Connection", bytes, Optional[_T]], bool]
_OCSPServerCallback = Callable[["Connection", Optional[_T]], bytes]
_PassphraseCallback = Callable[[int, bool, Optional[_T]], bytes]
_VerifyCallback = Callable[["Connection", X509, int, int, int], bool]

307 

308 

class X509VerificationCodes:
    """
    Named constants for the outcome of X509 verification.

    These are the values produced by the underlying
    ``X509_STORE_CTX_get_error()`` function, which pyOpenSSL forwards to
    verification callback functions.

    See `OpenSSL Verification Errors
    <https://www.openssl.org/docs/manmaster/man3/X509_verify_cert_error_string.html#ERROR-CODES>`_
    for details.
    """

    OK = _lib.X509_V_OK
    ERR_UNABLE_TO_GET_ISSUER_CERT = _lib.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT
    ERR_UNABLE_TO_GET_CRL = _lib.X509_V_ERR_UNABLE_TO_GET_CRL
    ERR_UNABLE_TO_DECRYPT_CERT_SIGNATURE = (
        _lib.X509_V_ERR_UNABLE_TO_DECRYPT_CERT_SIGNATURE
    )
    ERR_UNABLE_TO_DECRYPT_CRL_SIGNATURE = (
        _lib.X509_V_ERR_UNABLE_TO_DECRYPT_CRL_SIGNATURE
    )
    ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY = (
        _lib.X509_V_ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY
    )
    ERR_CERT_SIGNATURE_FAILURE = _lib.X509_V_ERR_CERT_SIGNATURE_FAILURE
    ERR_CRL_SIGNATURE_FAILURE = _lib.X509_V_ERR_CRL_SIGNATURE_FAILURE
    ERR_CERT_NOT_YET_VALID = _lib.X509_V_ERR_CERT_NOT_YET_VALID
    ERR_CERT_HAS_EXPIRED = _lib.X509_V_ERR_CERT_HAS_EXPIRED
    ERR_CRL_NOT_YET_VALID = _lib.X509_V_ERR_CRL_NOT_YET_VALID
    ERR_CRL_HAS_EXPIRED = _lib.X509_V_ERR_CRL_HAS_EXPIRED
    ERR_ERROR_IN_CERT_NOT_BEFORE_FIELD = (
        _lib.X509_V_ERR_ERROR_IN_CERT_NOT_BEFORE_FIELD
    )
    ERR_ERROR_IN_CERT_NOT_AFTER_FIELD = (
        _lib.X509_V_ERR_ERROR_IN_CERT_NOT_AFTER_FIELD
    )
    ERR_ERROR_IN_CRL_LAST_UPDATE_FIELD = (
        _lib.X509_V_ERR_ERROR_IN_CRL_LAST_UPDATE_FIELD
    )
    ERR_ERROR_IN_CRL_NEXT_UPDATE_FIELD = (
        _lib.X509_V_ERR_ERROR_IN_CRL_NEXT_UPDATE_FIELD
    )
    ERR_OUT_OF_MEM = _lib.X509_V_ERR_OUT_OF_MEM
    ERR_DEPTH_ZERO_SELF_SIGNED_CERT = (
        _lib.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT
    )
    ERR_SELF_SIGNED_CERT_IN_CHAIN = _lib.X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN
    ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY = (
        _lib.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY
    )
    ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE = (
        _lib.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE
    )
    ERR_CERT_CHAIN_TOO_LONG = _lib.X509_V_ERR_CERT_CHAIN_TOO_LONG
    ERR_CERT_REVOKED = _lib.X509_V_ERR_CERT_REVOKED
    ERR_INVALID_CA = _lib.X509_V_ERR_INVALID_CA
    ERR_PATH_LENGTH_EXCEEDED = _lib.X509_V_ERR_PATH_LENGTH_EXCEEDED
    ERR_INVALID_PURPOSE = _lib.X509_V_ERR_INVALID_PURPOSE
    ERR_CERT_UNTRUSTED = _lib.X509_V_ERR_CERT_UNTRUSTED
    ERR_CERT_REJECTED = _lib.X509_V_ERR_CERT_REJECTED
    ERR_SUBJECT_ISSUER_MISMATCH = _lib.X509_V_ERR_SUBJECT_ISSUER_MISMATCH
    ERR_AKID_SKID_MISMATCH = _lib.X509_V_ERR_AKID_SKID_MISMATCH
    ERR_AKID_ISSUER_SERIAL_MISMATCH = (
        _lib.X509_V_ERR_AKID_ISSUER_SERIAL_MISMATCH
    )
    ERR_KEYUSAGE_NO_CERTSIGN = _lib.X509_V_ERR_KEYUSAGE_NO_CERTSIGN
    ERR_UNABLE_TO_GET_CRL_ISSUER = _lib.X509_V_ERR_UNABLE_TO_GET_CRL_ISSUER
    ERR_UNHANDLED_CRITICAL_EXTENSION = (
        _lib.X509_V_ERR_UNHANDLED_CRITICAL_EXTENSION
    )
    ERR_KEYUSAGE_NO_CRL_SIGN = _lib.X509_V_ERR_KEYUSAGE_NO_CRL_SIGN
    ERR_UNHANDLED_CRITICAL_CRL_EXTENSION = (
        _lib.X509_V_ERR_UNHANDLED_CRITICAL_CRL_EXTENSION
    )
    ERR_INVALID_NON_CA = _lib.X509_V_ERR_INVALID_NON_CA
    ERR_PROXY_PATH_LENGTH_EXCEEDED = _lib.X509_V_ERR_PROXY_PATH_LENGTH_EXCEEDED
    ERR_KEYUSAGE_NO_DIGITAL_SIGNATURE = (
        _lib.X509_V_ERR_KEYUSAGE_NO_DIGITAL_SIGNATURE
    )
    ERR_PROXY_CERTIFICATES_NOT_ALLOWED = (
        _lib.X509_V_ERR_PROXY_CERTIFICATES_NOT_ALLOWED
    )
    ERR_INVALID_EXTENSION = _lib.X509_V_ERR_INVALID_EXTENSION
    ERR_INVALID_POLICY_EXTENSION = _lib.X509_V_ERR_INVALID_POLICY_EXTENSION
    ERR_NO_EXPLICIT_POLICY = _lib.X509_V_ERR_NO_EXPLICIT_POLICY
    ERR_DIFFERENT_CRL_SCOPE = _lib.X509_V_ERR_DIFFERENT_CRL_SCOPE
    ERR_UNSUPPORTED_EXTENSION_FEATURE = (
        _lib.X509_V_ERR_UNSUPPORTED_EXTENSION_FEATURE
    )
    ERR_UNNESTED_RESOURCE = _lib.X509_V_ERR_UNNESTED_RESOURCE
    ERR_PERMITTED_VIOLATION = _lib.X509_V_ERR_PERMITTED_VIOLATION
    ERR_EXCLUDED_VIOLATION = _lib.X509_V_ERR_EXCLUDED_VIOLATION
    ERR_SUBTREE_MINMAX = _lib.X509_V_ERR_SUBTREE_MINMAX
    ERR_UNSUPPORTED_CONSTRAINT_TYPE = (
        _lib.X509_V_ERR_UNSUPPORTED_CONSTRAINT_TYPE
    )
    ERR_UNSUPPORTED_CONSTRAINT_SYNTAX = (
        _lib.X509_V_ERR_UNSUPPORTED_CONSTRAINT_SYNTAX
    )
    ERR_UNSUPPORTED_NAME_SYNTAX = _lib.X509_V_ERR_UNSUPPORTED_NAME_SYNTAX
    ERR_CRL_PATH_VALIDATION_ERROR = _lib.X509_V_ERR_CRL_PATH_VALIDATION_ERROR
    ERR_HOSTNAME_MISMATCH = _lib.X509_V_ERR_HOSTNAME_MISMATCH
    ERR_EMAIL_MISMATCH = _lib.X509_V_ERR_EMAIL_MISMATCH
    ERR_IP_ADDRESS_MISMATCH = _lib.X509_V_ERR_IP_ADDRESS_MISMATCH
    ERR_APPLICATION_VERIFICATION = _lib.X509_V_ERR_APPLICATION_VERIFICATION

413 

414 

# Candidate CA bundle files, one per distribution family.
# Taken from https://golang.org/src/crypto/x509/root_linux.go
_CERTIFICATE_FILE_LOCATIONS = [
    # Debian/Ubuntu/Gentoo etc.
    "/etc/ssl/certs/ca-certificates.crt",
    # Fedora/RHEL 6
    "/etc/pki/tls/certs/ca-bundle.crt",
    # OpenSUSE
    "/etc/ssl/ca-bundle.pem",
    # OpenELEC
    "/etc/pki/tls/cacert.pem",
    # CentOS/RHEL 7
    "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem",
]

# Candidate CA directories.
_CERTIFICATE_PATH_LOCATIONS = [
    # SLES10/SLES11
    "/etc/ssl/certs",
]

# Byte strings on purpose: these are compared against what cffi's
# ffi.string returns.
_CRYPTOGRAPHY_MANYLINUX_CA_DIR = b"/opt/pyca/cryptography/openssl/certs"
_CRYPTOGRAPHY_MANYLINUX_CA_FILE = b"/opt/pyca/cryptography/openssl/cert.pem"

432 

433 

class Error(Exception):
    """
    An error occurred in an `OpenSSL.SSL` API.
    """


# Module-local helpers bound to this module's ``Error`` type, so failures
# reported through them surface as ``OpenSSL.SSL.Error``.
_raise_current_error = partial(_exception_from_error_queue, Error)
_openssl_assert = _make_assert(Error)


# The subclasses below add no behaviour of their own; they exist so callers
# can catch each condition separately while ``except Error`` still matches.
class WantReadError(Error):
    pass


class WantWriteError(Error):
    pass


class WantX509LookupError(Error):
    pass


class ZeroReturnError(Error):
    pass


class SysCallError(Error):
    pass

462 

463 

class _CallbackExceptionHelper:
    """
    A base class for wrapper classes that allow for intelligent exception
    handling in OpenSSL callbacks.

    :ivar list _problems: Any exceptions that occurred while executing in a
        context where they could not be raised in the normal way.  Typically
        this is because OpenSSL has called into some Python code and requires a
        return value.  The exceptions are saved to be raised later when it is
        possible to do so.
    """

    def __init__(self) -> None:
        self._problems: list[Exception] = []

    def raise_if_problem(self) -> None:
        """
        Re-raise the oldest exception that was captured when running a
        callback, if there is one.

        Does nothing when no exception has been recorded.  Otherwise the
        first recorded exception is removed from ``_problems`` and raised.
        Any :class:`Error` built from the OpenSSL error queue at that point
        is deliberately discarded in favour of the captured exception.

        :raises Exception: The first exception stored in ``_problems``.
        """
        if self._problems:
            # The Python exception is the root cause; whatever OpenSSL queued
            # as a consequence is secondary, so it is swallowed here.
            # NOTE(review): presumably this call also empties the OpenSSL
            # error queue -- confirm against _exception_from_error_queue.
            try:
                _raise_current_error()
            except Error:
                pass
            raise self._problems.pop(0)

490 

491 

class _VerifyHelper(_CallbackExceptionHelper):
    """
    Adapt a Python callable so that OpenSSL can invoke it as a certificate
    verification callback.
    """

    def __init__(self, callback: _VerifyCallback) -> None:
        super().__init__()

        @wraps(callback)
        def wrapper(ok, store_ctx):  # type: ignore[no-untyped-def]
            # Wrap the certificate currently being checked.  An extra
            # reference is taken first.
            # NOTE(review): presumably because the X509 wrapper drops one
            # reference when it is collected -- confirm in OpenSSL.crypto.
            raw_cert = _lib.X509_STORE_CTX_get_current_cert(store_ctx)
            _lib.X509_up_ref(raw_cert)
            cert = X509._from_raw_x509_ptr(raw_cert)

            errnum = _lib.X509_STORE_CTX_get_error(store_ctx)
            errdepth = _lib.X509_STORE_CTX_get_error_depth(store_ctx)

            # The SSL pointer is stored on the store context as ex-data; use
            # it to find the owning Connection object.
            ex_index = _lib.SSL_get_ex_data_X509_STORE_CTX_idx()
            ssl = _lib.X509_STORE_CTX_get_ex_data(store_ctx, ex_index)
            connection = Connection._reverse_mapping[ssl]

            try:
                accepted = callback(connection, cert, errnum, errdepth, ok)
            except Exception as exc:
                # Cannot raise through C; remember it and fail verification.
                self._problems.append(exc)
                return 0

            if not accepted:
                return 0

            # The callback approved the certificate: clear the stored error.
            _lib.X509_STORE_CTX_set_error(store_ctx, _lib.X509_V_OK)
            return 1

        self.callback = _ffi.callback(
            "int (*)(int, X509_STORE_CTX *)", wrapper
        )

530 

531 

class _ALPNSelectHelper(_CallbackExceptionHelper):
    """
    Adapt a Python callable so that OpenSSL can invoke it as an ALPN
    selection callback.
    """

    def __init__(self, callback: _ALPNSelectCallback) -> None:
        super().__init__()

        @wraps(callback)
        def wrapper(ssl, out, outlen, in_, inlen, arg):  # type: ignore[no-untyped-def]
            try:
                conn = Connection._reverse_mapping[ssl]

                # The peer's offer arrives as a run of length-prefixed
                # byte strings; walk it and collect the individual names.
                wire = _ffi.buffer(in_, inlen)[:]
                offered = []
                pos = 0
                while pos < len(wire):
                    size = wire[pos]
                    offered.append(wire[pos + 1 : pos + 1 + size])
                    pos += size + 1

                # Ask the user's callback to choose.
                selected = callback(conn, offered)
                no_match = selected is NO_OVERLAPPING_PROTOCOLS
                if no_match:
                    selected = b""
                elif not isinstance(selected, bytes):
                    raise TypeError(
                        "ALPN callback must return a bytestring or the "
                        "special NO_OVERLAPPING_PROTOCOLS sentinel value."
                    )

                # Keep the cffi allocations referenced from the connection
                # so they are not freed before OpenSSL has read them, then
                # publish them through the output parameters.
                length_cell = _ffi.new("unsigned char *", len(selected))
                payload = _ffi.new("unsigned char[]", selected)
                conn._alpn_select_callback_args = [length_cell, payload]
                outlen[0] = length_cell[0]
                out[0] = payload

                if no_match:
                    return _lib.SSL_TLSEXT_ERR_NOACK
                return _lib.SSL_TLSEXT_ERR_OK
            except Exception as exc:
                self._problems.append(exc)
                return _lib.SSL_TLSEXT_ERR_ALERT_FATAL

        self.callback = _ffi.callback(
            (
                "int (*)(SSL *, unsigned char **, unsigned char *, "
                "const unsigned char *, unsigned int, void *)"
            ),
            wrapper,
        )

591 

592 

class _OCSPServerCallbackHelper(_CallbackExceptionHelper):
    """
    Wrap a callback such that it can be used as an OCSP callback for the server
    side.

    Annoyingly, OpenSSL defines one OCSP callback but uses it in two different
    ways. For servers, that callback is expected to retrieve some OCSP data and
    hand it to OpenSSL, and may return only SSL_TLSEXT_ERR_OK,
    SSL_TLSEXT_ERR_ALERT_FATAL, and SSL_TLSEXT_ERR_NOACK. For clients, that
    callback is expected to check the OCSP data, and returns a negative value
    on error, 0 if the response is not acceptable, or positive if it is. These
    are mutually exclusive return code behaviours, and they mean that we need
    two helpers so that we always return an appropriate error code if the
    user's code throws an exception.

    Given that we have to have two helpers anyway, these helpers are a bit more
    helpery than most: specifically, they hide a few more of the OpenSSL
    functions so that the user has an easier time writing these callbacks.

    This helper implements the server side.
    """

    def __init__(self, callback: _OCSPServerCallback[Any]) -> None:
        _CallbackExceptionHelper.__init__(self)

        @wraps(callback)
        def wrapper(ssl, cdata):  # type: ignore[no-untyped-def]
            try:
                conn = Connection._reverse_mapping[ssl]

                # Extract the data if any was provided.
                if cdata != _ffi.NULL:
                    data = _ffi.from_handle(cdata)
                else:
                    data = None

                # Call the callback.
                ocsp_data = callback(conn, data)

                if not isinstance(ocsp_data, bytes):
                    raise TypeError("OCSP callback must return a bytestring.")

                # If the OCSP data was provided, we will pass it to OpenSSL.
                # However, we have an early exit here: if no OCSP data was
                # provided we will just exit out and tell OpenSSL that there
                # is nothing to do.
                if not ocsp_data:
                    return _lib.SSL_TLSEXT_ERR_NOACK

                # OpenSSL takes ownership of this data and expects it to have
                # been allocated by OPENSSL_malloc.
                ocsp_data_length = len(ocsp_data)
                data_ptr = _lib.OPENSSL_malloc(ocsp_data_length)
                # Never write through a failed allocation; the resulting
                # Error is recorded below and aborts the handshake instead.
                _openssl_assert(data_ptr != _ffi.NULL)
                _ffi.buffer(data_ptr, ocsp_data_length)[:] = ocsp_data

                _lib.SSL_set_tlsext_status_ocsp_resp(
                    ssl, data_ptr, ocsp_data_length
                )

                return _lib.SSL_TLSEXT_ERR_OK
            except Exception as e:
                self._problems.append(e)
                return _lib.SSL_TLSEXT_ERR_ALERT_FATAL

        self.callback = _ffi.callback("int (*)(SSL *, void *)", wrapper)

658 

659 

class _OCSPClientCallbackHelper(_CallbackExceptionHelper):
    """
    Adapt a Python callable so that OpenSSL can invoke it as the client-side
    OCSP status callback.

    OpenSSL has a single OCSP callback slot with two incompatible contracts.
    On a server the callback supplies OCSP data and reports one of the
    ``SSL_TLSEXT_ERR_*`` codes.  On a client it inspects the stapled
    response and returns a negative value on error, 0 when the response is
    unacceptable and a positive value when it is acceptable.  Because the
    return codes differ, each side gets its own helper so that an exception
    in user code always maps to the right failure value.

    This helper implements the client side: it fetches the stapled response
    itself and hands the user callback plain bytes.
    """

    def __init__(self, callback: _OCSPClientCallback[Any]) -> None:
        super().__init__()

        @wraps(callback)
        def wrapper(ssl, cdata):  # type: ignore[no-untyped-def]
            try:
                conn = Connection._reverse_mapping[ssl]

                # Recover the user data object, when one was registered.
                data = _ffi.from_handle(cdata) if cdata != _ffi.NULL else None

                # Fetch the stapled response; a negative length means the
                # peer sent none, which the callback sees as empty bytes.
                response_ptr = _ffi.new("unsigned char **")
                response_len = _lib.SSL_get_tlsext_status_ocsp_resp(
                    ssl, response_ptr
                )
                if response_len < 0:
                    response = b""
                else:
                    # Slice to copy the bytes out of OpenSSL's buffer.
                    response = _ffi.buffer(response_ptr[0], response_len)[:]

                # 1 when the callback accepts the response, otherwise 0.
                return 1 if callback(conn, response, data) else 0

            except Exception as exc:
                self._problems.append(exc)
                # A negative value reports an error to OpenSSL.
                return -1

        self.callback = _ffi.callback("int (*)(SSL *, void *)", wrapper)

717 

718 

class _CookieGenerateCallbackHelper(_CallbackExceptionHelper):
    """
    Adapt a Python callable so that OpenSSL can invoke it as a cookie
    generation callback.
    """

    def __init__(self, callback: _CookieGenerateCallback) -> None:
        super().__init__()

        # Upper bound on the cookie size; 255 is used when the binding does
        # not expose DTLS1_COOKIE_LENGTH.
        max_cookie_len = getattr(_lib, "DTLS1_COOKIE_LENGTH", 255)

        @wraps(callback)
        def wrapper(ssl, out, outlen):  # type: ignore[no-untyped-def]
            try:
                conn = Connection._reverse_mapping[ssl]
                cookie = callback(conn)
                if len(cookie) > max_cookie_len:
                    raise ValueError(
                        f"Cookie too long (got {len(cookie)} bytes, "
                        f"max {max_cookie_len})"
                    )
                # Copy the cookie into OpenSSL's buffer and report its size.
                out[0 : len(cookie)] = cookie
                outlen[0] = len(cookie)
            except Exception as exc:
                self._problems.append(exc)
                # "a zero return value can be used to abort the handshake"
                return 0
            return 1

        self.callback = _ffi.callback(
            "int (*)(SSL *, unsigned char *, unsigned int *)",
            wrapper,
        )

747 

748 

class _CookieVerifyCallbackHelper(_CallbackExceptionHelper):
    """
    Adapt a Python callable so that OpenSSL can invoke it as a cookie
    verification callback.
    """

    def __init__(self, callback: _CookieVerifyCallback) -> None:
        super().__init__()

        @wraps(callback)
        def wrapper(ssl, c_cookie, cookie_len):  # type: ignore[no-untyped-def]
            try:
                conn = Connection._reverse_mapping[ssl]
                # Hand the user callback an independent bytes copy.
                cookie = bytes(c_cookie[0:cookie_len])
                return callback(conn, cookie)
            except Exception as exc:
                # Record the failure and report the cookie as rejected.
                self._problems.append(exc)
                return 0

        self.callback = _ffi.callback(
            "int (*)(SSL *, unsigned char *, unsigned int)",
            wrapper,
        )

766 

767 

def _asFileDescriptor(obj: Any) -> int:
    """
    Coerce *obj* to an integer file descriptor.

    :param obj: Either an ``int`` or an object with a ``fileno()`` method
        whose result is used instead.
    :return: The non-negative file descriptor.
    :raises TypeError: If no integer can be obtained from *obj*.
    :raises ValueError: If the descriptor is negative.
    """
    if not isinstance(obj, int):
        fileno = getattr(obj, "fileno", None)
        if fileno is not None:
            obj = fileno()

    # Covers both "no fileno()" and "fileno() returned a non-int".
    if not isinstance(obj, int):
        raise TypeError("argument must be an int, or have a fileno() method.")
    if obj < 0:
        # The previous ``{fd:i}`` spec is not a valid format code for int
        # and replaced this message with "Unknown format code 'i' ...".
        raise ValueError(
            f"file descriptor cannot be a negative integer ({obj:d})"
        )

    return obj

786 

787 

def OpenSSL_version(type: int) -> bytes:
    """
    Look up one of the descriptive strings OpenSSL reports about itself.

    :param type: One of the :const:`OPENSSL_` constants defined in this module.
    :return: The requested description, as a byte string.
    """
    c_string = _lib.OpenSSL_version(type)
    return _ffi.string(c_string)


# Older spelling of the same function.
SSLeay_version = OpenSSL_version

798 

799 

def _make_requires(flag: int, error: str) -> Callable[[_T], _T]:
    """
    Build a decorator that guards functions needing an optional OpenSSL
    feature.

    When the feature is missing from this build, the decorated function is
    replaced by a stub raising NotImplementedError, rather than letting an
    AttributeError come out of cryptography.

    :param flag: A cryptography flag that guards the functions, e.g.
        ``Cryptography_HAS_NEXTPROTONEG``.
    :param error: The string to be used in the exception if the flag is false.
    """

    def _requires_decorator(func):  # type: ignore[no-untyped-def]
        # Feature present: hand the function back untouched.
        if flag:
            return func

        @wraps(func)
        def explode(*args, **kwargs):  # type: ignore[no-untyped-def]
            raise NotImplementedError(error)

        return explode

    return _requires_decorator

823 

824 

# Feature guards.  A flag the binding does not define at all is treated the
# same as a flag set to 0, i.e. "feature unavailable".
_requires_keylog = _make_requires(
    getattr(_lib, "Cryptography_HAS_KEYLOG", 0),
    "Key logging not available",
)

_requires_ssl_get0_group_name = _make_requires(
    getattr(_lib, "Cryptography_HAS_SSL_GET0_GROUP_NAME", 0),
    "Getting group name is not supported by the linked OpenSSL version",
)

_requires_ssl_cookie = _make_requires(
    getattr(_lib, "Cryptography_HAS_SSL_COOKIE", 0),
    "DTLS cookie support is not available",
)

838 

839 

class Session:
    """
    An SSL session: a set of connection parameters that can be re-used to
    speed up the setup of subsequent connections.

    .. versionadded:: 0.14
    """

    # Declared here only; nothing in this class assigns it.
    _session: Any

850 

851 

F = TypeVar("F", bound=Callable[..., Any])


def _require_not_used(f: F) -> F:
    """
    Decorate a method so that calling it on an object whose ``_used``
    attribute is true emits a DeprecationWarning before delegating to the
    wrapped method.
    """

    @wraps(f)
    def inner(self: Context, *args: Any, **kwargs: Any) -> Any:
        if not self._used:
            return f(self, *args, **kwargs)

        message = (
            "Attempting to mutate a Context after a Connection was "
            "created. In the future, this will raise an exception"
        )
        # stacklevel=2 attributes the warning to the caller of the method.
        warnings.warn(message, DeprecationWarning, stacklevel=2)
        return f(self, *args, **kwargs)

    return typing.cast(F, inner)

870 

871 

872class Context: 

873 """ 

874 :class:`OpenSSL.SSL.Context` instances define the parameters for setting 

875 up new SSL connections. 

876 

877 :param method: One of TLS_METHOD, TLS_CLIENT_METHOD, TLS_SERVER_METHOD, 

878 DTLS_METHOD, DTLS_CLIENT_METHOD, or DTLS_SERVER_METHOD. 

879 SSLv23_METHOD, TLSv1_METHOD, etc. are deprecated and should 

880 not be used. 

881 """ 

882 

883 _methods: typing.ClassVar[ 

884 dict[int, tuple[Callable[[], Any], int | None]] 

885 ] = { 

886 SSLv23_METHOD: (_lib.TLS_method, None), 

887 TLSv1_METHOD: (_lib.TLS_method, TLS1_VERSION), 

888 TLSv1_1_METHOD: (_lib.TLS_method, TLS1_1_VERSION), 

889 TLSv1_2_METHOD: (_lib.TLS_method, TLS1_2_VERSION), 

890 TLS_METHOD: (_lib.TLS_method, None), 

891 TLS_SERVER_METHOD: (_lib.TLS_server_method, None), 

892 TLS_CLIENT_METHOD: (_lib.TLS_client_method, None), 

893 DTLS_METHOD: (_lib.DTLS_method, None), 

894 DTLS_SERVER_METHOD: (_lib.DTLS_server_method, None), 

895 DTLS_CLIENT_METHOD: (_lib.DTLS_client_method, None), 

896 } 

897 

898 def __init__(self, method: int) -> None: 

899 if not isinstance(method, int): 

900 raise TypeError("method must be an integer") 

901 

902 try: 

903 method_func, version = self._methods[method] 

904 except KeyError: 

905 raise ValueError("No such protocol") 

906 

907 method_obj = method_func() 

908 _openssl_assert(method_obj != _ffi.NULL) 

909 

910 context = _lib.SSL_CTX_new(method_obj) 

911 _openssl_assert(context != _ffi.NULL) 

912 context = _ffi.gc(context, _lib.SSL_CTX_free) 

913 

914 self._context = context 

915 self._used = False 

916 self._passphrase_helper: _PassphraseHelper | None = None 

917 self._passphrase_callback: _PassphraseCallback[Any] | None = None 

918 self._passphrase_userdata: Any | None = None 

919 self._verify_helper: _VerifyHelper | None = None 

920 self._verify_callback: _VerifyCallback | None = None 

921 self._info_callback = None 

922 self._keylog_callback = None 

923 self._tlsext_servername_callback = None 

924 self._app_data = None 

925 self._alpn_select_helper: _ALPNSelectHelper | None = None 

926 self._alpn_select_callback: _ALPNSelectCallback | None = None 

927 self._ocsp_helper: ( 

928 _OCSPClientCallbackHelper | _OCSPServerCallbackHelper | None 

929 ) = None 

930 self._ocsp_callback: ( 

931 _OCSPClientCallback[Any] | _OCSPServerCallback[Any] | None 

932 ) = None 

933 self._ocsp_data: Any | None = None 

934 self._cookie_generate_helper: _CookieGenerateCallbackHelper | None = ( 

935 None 

936 ) 

937 self._cookie_verify_helper: _CookieVerifyCallbackHelper | None = None 

938 

939 self.set_mode( 

940 _lib.SSL_MODE_ENABLE_PARTIAL_WRITE 

941 | _lib.SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER 

942 ) 

943 if version is not None: 

944 self.set_min_proto_version(version) 

945 self.set_max_proto_version(version) 

946 

947 @_require_not_used 

948 def set_min_proto_version(self, version: int) -> None: 

949 """ 

950 Set the minimum supported protocol version. Setting the minimum 

951 version to 0 will enable protocol versions down to the lowest version 

952 supported by the library. 

953 

954 If the underlying OpenSSL build is missing support for the selected 

955 version, this method will raise an exception. 

956 """ 

957 _openssl_assert( 

958 _lib.SSL_CTX_set_min_proto_version(self._context, version) == 1 

959 ) 

960 

961 @_require_not_used 

962 def set_max_proto_version(self, version: int) -> None: 

963 """ 

964 Set the maximum supported protocol version. Setting the maximum 

965 version to 0 will enable protocol versions up to the highest version 

966 supported by the library. 

967 

968 If the underlying OpenSSL build is missing support for the selected 

969 version, this method will raise an exception. 

970 """ 

971 _openssl_assert( 

972 _lib.SSL_CTX_set_max_proto_version(self._context, version) == 1 

973 ) 

974 

975 @_require_not_used 

976 def load_verify_locations( 

977 self, 

978 cafile: _StrOrBytesPath | None, 

979 capath: _StrOrBytesPath | None = None, 

980 ) -> None: 

981 """ 

982 Let SSL know where we can find trusted certificates for the certificate 

983 chain. Note that the certificates have to be in PEM format. 

984 

985 If capath is passed, it must be a directory prepared using the 

986 ``c_rehash`` tool included with OpenSSL. Either, but not both, of 

987 *pemfile* or *capath* may be :data:`None`. 

988 

989 :param cafile: In which file we can find the certificates (``bytes`` or 

990 ``str``). 

991 :param capath: In which directory we can find the certificates 

992 (``bytes`` or ``str``). 

993 

994 :return: None 

995 """ 

996 if cafile is None: 

997 cafile = _ffi.NULL 

998 else: 

999 cafile = _path_bytes(cafile) 

1000 

1001 if capath is None: 

1002 capath = _ffi.NULL 

1003 else: 

1004 capath = _path_bytes(capath) 

1005 

1006 load_result = _lib.SSL_CTX_load_verify_locations( 

1007 self._context, cafile, capath 

1008 ) 

1009 if not load_result: 

1010 _raise_current_error() 

1011 

    def _wrap_callback(
        self, callback: _PassphraseCallback[_T]
    ) -> _PassphraseHelper:
        """
        Adapt a user passphrase *callback* into a :class:`_PassphraseHelper`.

        :param callback: The user callback; it is invoked as
            ``callback(size, verify, userdata)``.
        :return: A helper built for PEM with ``more_args=True`` and
            ``truncate=True``.
        """

        @wraps(callback)
        def wrapper(size: int, verify: bool, userdata: Any) -> bytes:
            # The ``userdata`` argument received here is ignored on purpose:
            # the value handed to the user callback is whatever is stored on
            # the context at call time.
            return callback(size, verify, self._passphrase_userdata)

        return _PassphraseHelper(
            FILETYPE_PEM, wrapper, more_args=True, truncate=True
        )

1022 

1023 @_require_not_used 

1024 def set_passwd_cb( 

1025 self, 

1026 callback: _PassphraseCallback[_T], 

1027 userdata: _T | None = None, 

1028 ) -> None: 

1029 """ 

1030 Set the passphrase callback. This function will be called 

1031 when a private key with a passphrase is loaded. 

1032 

1033 :param callback: The Python callback to use. This must accept three 

1034 positional arguments. First, an integer giving the maximum length 

1035 of the passphrase it may return. If the returned passphrase is 

1036 longer than this, it will be truncated. Second, a boolean value 

1037 which will be true if the user should be prompted for the 

1038 passphrase twice and the callback should verify that the two values 

1039 supplied are equal. Third, the value given as the *userdata* 

1040 parameter to :meth:`set_passwd_cb`. The *callback* must return 

1041 a byte string. If an error occurs, *callback* should return a false 

1042 value (e.g. an empty string). 

1043 :param userdata: (optional) A Python object which will be given as 

1044 argument to the callback 

1045 :return: None 

1046 """ 

1047 if not callable(callback): 

1048 raise TypeError("callback must be callable") 

1049 

1050 self._passphrase_helper = self._wrap_callback(callback) 

1051 self._passphrase_callback = self._passphrase_helper.callback 

1052 _lib.SSL_CTX_set_default_passwd_cb( 

1053 self._context, self._passphrase_callback 

1054 ) 

1055 self._passphrase_userdata = userdata 

1056 

1057 @_require_not_used 

1058 def set_default_verify_paths(self) -> None: 

1059 """ 

1060 Specify that the platform provided CA certificates are to be used for 

1061 verification purposes. This method has some caveats related to the 

1062 binary wheels that cryptography (pyOpenSSL's primary dependency) ships: 

1063 

1064 * macOS will only load certificates using this method if the user has 

1065 the ``openssl@3`` `Homebrew <https://brew.sh>`_ formula installed 

1066 in the default location. 

1067 * Windows will not work. 

1068 * manylinux cryptography wheels will work on most common Linux 

1069 distributions in pyOpenSSL 17.1.0 and above. pyOpenSSL detects the 

1070 manylinux wheel and attempts to load roots via a fallback path. 

1071 

1072 :return: None 

1073 """ 

1074 # SSL_CTX_set_default_verify_paths will attempt to load certs from 

1075 # both a cafile and capath that are set at compile time. However, 

1076 # it will first check environment variables and, if present, load 

1077 # those paths instead 

1078 set_result = _lib.SSL_CTX_set_default_verify_paths(self._context) 

1079 _openssl_assert(set_result == 1) 

1080 # After attempting to set default_verify_paths we need to know whether 

1081 # to go down the fallback path. 

1082 # First we'll check to see if any env vars have been set. If so, 

1083 # we won't try to do anything else because the user has set the path 

1084 # themselves. 

1085 if not self._check_env_vars_set("SSL_CERT_DIR", "SSL_CERT_FILE"): 

1086 default_dir = _ffi.string(_lib.X509_get_default_cert_dir()) 

1087 default_file = _ffi.string(_lib.X509_get_default_cert_file()) 

1088 # Now we check to see if the default_dir and default_file are set 

1089 # to the exact values we use in our manylinux builds. If they are 

1090 # then we know to load the fallbacks 

1091 if ( 

1092 default_dir == _CRYPTOGRAPHY_MANYLINUX_CA_DIR 

1093 and default_file == _CRYPTOGRAPHY_MANYLINUX_CA_FILE 

1094 ): 

1095 # This is manylinux, let's load our fallback paths 

1096 self._fallback_default_verify_paths( 

1097 _CERTIFICATE_FILE_LOCATIONS, _CERTIFICATE_PATH_LOCATIONS 

1098 ) 

1099 

1100 def _check_env_vars_set(self, dir_env_var: str, file_env_var: str) -> bool: 

1101 """ 

1102 Check to see if the default cert dir/file environment vars are present. 

1103 

1104 :return: bool 

1105 """ 

1106 return ( 

1107 os.environ.get(file_env_var) is not None 

1108 or os.environ.get(dir_env_var) is not None 

1109 ) 

1110 

1111 def _fallback_default_verify_paths( 

1112 self, file_path: list[str], dir_path: list[str] 

1113 ) -> None: 

1114 """ 

1115 Default verify paths are based on the compiled version of OpenSSL. 

1116 However, when pyca/cryptography is compiled as a manylinux wheel 

1117 that compiled location can potentially be wrong. So, like Go, we 

1118 will try a predefined set of paths and attempt to load roots 

1119 from there. 

1120 

1121 :return: None 

1122 """ 

1123 for cafile in file_path: 

1124 if os.path.isfile(cafile): 

1125 self.load_verify_locations(cafile) 

1126 break 

1127 

1128 for capath in dir_path: 

1129 if os.path.isdir(capath): 

1130 self.load_verify_locations(None, capath) 

1131 break 

1132 

1133 @_require_not_used 

1134 def use_certificate_chain_file(self, certfile: _StrOrBytesPath) -> None: 

1135 """ 

1136 Load a certificate chain from a file. 

1137 

1138 :param certfile: The name of the certificate chain file (``bytes`` or 

1139 ``str``). Must be PEM encoded. 

1140 

1141 :return: None 

1142 """ 

1143 certfile = _path_bytes(certfile) 

1144 

1145 result = _lib.SSL_CTX_use_certificate_chain_file( 

1146 self._context, certfile 

1147 ) 

1148 if not result: 

1149 _raise_current_error() 

1150 

1151 @_require_not_used 

1152 def use_certificate_file( 

1153 self, certfile: _StrOrBytesPath, filetype: int = FILETYPE_PEM 

1154 ) -> None: 

1155 """ 

1156 Load a certificate from a file 

1157 

1158 :param certfile: The name of the certificate file (``bytes`` or 

1159 ``str``). 

1160 :param filetype: (optional) The encoding of the file, which is either 

1161 :const:`FILETYPE_PEM` or :const:`FILETYPE_ASN1`. The default is 

1162 :const:`FILETYPE_PEM`. 

1163 

1164 :return: None 

1165 """ 

1166 certfile = _path_bytes(certfile) 

1167 if not isinstance(filetype, int): 

1168 raise TypeError("filetype must be an integer") 

1169 

1170 use_result = _lib.SSL_CTX_use_certificate_file( 

1171 self._context, certfile, filetype 

1172 ) 

1173 if not use_result: 

1174 _raise_current_error() 

1175 

1176 @_require_not_used 

1177 def use_certificate(self, cert: X509 | x509.Certificate) -> None: 

1178 """ 

1179 Load a certificate from a X509 object 

1180 

1181 :param cert: The X509 object 

1182 :return: None 

1183 """ 

1184 # Mirrored at Connection.use_certificate 

1185 if not isinstance(cert, X509): 

1186 cert = X509.from_cryptography(cert) 

1187 else: 

1188 warnings.warn( 

1189 ( 

1190 "Passing pyOpenSSL X509 objects is deprecated. You " 

1191 "should use a cryptography.x509.Certificate instead." 

1192 ), 

1193 DeprecationWarning, 

1194 stacklevel=2, 

1195 ) 

1196 

1197 use_result = _lib.SSL_CTX_use_certificate(self._context, cert._x509) 

1198 if not use_result: 

1199 _raise_current_error() 

1200 

1201 @_require_not_used 

1202 def add_extra_chain_cert(self, certobj: X509 | x509.Certificate) -> None: 

1203 """ 

1204 Add certificate to chain 

1205 

1206 :param certobj: The X509 certificate object to add to the chain 

1207 :return: None 

1208 """ 

1209 if not isinstance(certobj, X509): 

1210 certobj = X509.from_cryptography(certobj) 

1211 else: 

1212 warnings.warn( 

1213 ( 

1214 "Passing pyOpenSSL X509 objects is deprecated. You " 

1215 "should use a cryptography.x509.Certificate instead." 

1216 ), 

1217 DeprecationWarning, 

1218 stacklevel=2, 

1219 ) 

1220 

1221 copy = _lib.X509_dup(certobj._x509) 

1222 add_result = _lib.SSL_CTX_add_extra_chain_cert(self._context, copy) 

1223 if not add_result: 

1224 # TODO: This is untested. 

1225 _lib.X509_free(copy) 

1226 _raise_current_error() 

1227 

1228 def _raise_passphrase_exception(self) -> None: 

1229 if self._passphrase_helper is not None: 

1230 self._passphrase_helper.raise_if_problem(Error) 

1231 

1232 _raise_current_error() 

1233 

1234 @_require_not_used 

1235 def use_privatekey_file( 

1236 self, keyfile: _StrOrBytesPath, filetype: int = FILETYPE_PEM 

1237 ) -> None: 

1238 """ 

1239 Load a private key from a file 

1240 

1241 :param keyfile: The name of the key file (``bytes`` or ``str``) 

1242 :param filetype: (optional) The encoding of the file, which is either 

1243 :const:`FILETYPE_PEM` or :const:`FILETYPE_ASN1`. The default is 

1244 :const:`FILETYPE_PEM`. 

1245 

1246 :return: None 

1247 """ 

1248 keyfile = _path_bytes(keyfile) 

1249 

1250 if not isinstance(filetype, int): 

1251 raise TypeError("filetype must be an integer") 

1252 

1253 use_result = _lib.SSL_CTX_use_PrivateKey_file( 

1254 self._context, keyfile, filetype 

1255 ) 

1256 if not use_result: 

1257 self._raise_passphrase_exception() 

1258 

1259 @_require_not_used 

1260 def use_privatekey(self, pkey: _PrivateKey | PKey) -> None: 

1261 """ 

1262 Load a private key from a PKey object 

1263 

1264 :param pkey: The PKey object 

1265 :return: None 

1266 """ 

1267 # Mirrored at Connection.use_privatekey 

1268 if not isinstance(pkey, PKey): 

1269 pkey = PKey.from_cryptography_key(pkey) 

1270 else: 

1271 warnings.warn( 

1272 ( 

1273 "Passing pyOpenSSL PKey objects is deprecated. You " 

1274 "should use a cryptography private key instead." 

1275 ), 

1276 DeprecationWarning, 

1277 stacklevel=2, 

1278 ) 

1279 

1280 use_result = _lib.SSL_CTX_use_PrivateKey(self._context, pkey._pkey) 

1281 if not use_result: 

1282 self._raise_passphrase_exception() 

1283 

1284 def check_privatekey(self) -> None: 

1285 """ 

1286 Check if the private key (loaded with :meth:`use_privatekey`) matches 

1287 the certificate (loaded with :meth:`use_certificate`) 

1288 

1289 :return: :data:`None` (raises :exc:`Error` if something's wrong) 

1290 """ 

1291 if not _lib.SSL_CTX_check_private_key(self._context): 

1292 _raise_current_error() 

1293 

1294 @_require_not_used 

1295 def load_client_ca(self, cafile: bytes) -> None: 

1296 """ 

1297 Load the trusted certificates that will be sent to the client. Does 

1298 not actually imply any of the certificates are trusted; that must be 

1299 configured separately. 

1300 

1301 :param bytes cafile: The path to a certificates file in PEM format. 

1302 :return: None 

1303 """ 

1304 ca_list = _lib.SSL_load_client_CA_file( 

1305 _text_to_bytes_and_warn("cafile", cafile) 

1306 ) 

1307 _openssl_assert(ca_list != _ffi.NULL) 

1308 _lib.SSL_CTX_set_client_CA_list(self._context, ca_list) 

1309 

1310 @_require_not_used 

1311 def set_session_id(self, buf: bytes) -> None: 

1312 """ 

1313 Set the session id to *buf* within which a session can be reused for 

1314 this Context object. This is needed when doing session resumption, 

1315 because there is no way for a stored session to know which Context 

1316 object it is associated with. 

1317 

1318 :param bytes buf: The session id. 

1319 

1320 :returns: None 

1321 """ 

1322 buf = _text_to_bytes_and_warn("buf", buf) 

1323 _openssl_assert( 

1324 _lib.SSL_CTX_set_session_id_context(self._context, buf, len(buf)) 

1325 == 1 

1326 ) 

1327 

1328 @_require_not_used 

1329 def set_session_cache_mode(self, mode: int) -> int: 

1330 """ 

1331 Set the behavior of the session cache used by all connections using 

1332 this Context. The previously set mode is returned. See 

1333 :const:`SESS_CACHE_*` for details about particular modes. 

1334 

1335 :param mode: One or more of the SESS_CACHE_* flags (combine using 

1336 bitwise or) 

1337 :returns: The previously set caching mode. 

1338 

1339 .. versionadded:: 0.14 

1340 """ 

1341 if not isinstance(mode, int): 

1342 raise TypeError("mode must be an integer") 

1343 

1344 return _lib.SSL_CTX_set_session_cache_mode(self._context, mode) 

1345 

1346 def get_session_cache_mode(self) -> int: 

1347 """ 

1348 Get the current session cache mode. 

1349 

1350 :returns: The currently used cache mode. 

1351 

1352 .. versionadded:: 0.14 

1353 """ 

1354 return _lib.SSL_CTX_get_session_cache_mode(self._context) 

1355 

1356 @_require_not_used 

1357 def set_verify( 

1358 self, mode: int, callback: _VerifyCallback | None = None 

1359 ) -> None: 

1360 """ 

1361 Set the verification flags for this Context object to *mode* and 

1362 specify that *callback* should be used for verification callbacks. 

1363 

1364 :param mode: The verify mode, this should be one of 

1365 :const:`VERIFY_NONE` and :const:`VERIFY_PEER`. If 

1366 :const:`VERIFY_PEER` is used, *mode* can be OR:ed with 

1367 :const:`VERIFY_FAIL_IF_NO_PEER_CERT` and 

1368 :const:`VERIFY_CLIENT_ONCE` to further control the behaviour. 

1369 :param callback: The optional Python verification callback to use. 

1370 This should take five arguments: A Connection object, an X509 

1371 object, and three integer variables, which are in turn potential 

1372 error number, error depth and return code. *callback* should 

1373 return True if verification passes and False otherwise. 

1374 If omitted, OpenSSL's default verification is used. 

1375 :return: None 

1376 

1377 See SSL_CTX_set_verify(3SSL) for further details. 

1378 """ 

1379 if not isinstance(mode, int): 

1380 raise TypeError("mode must be an integer") 

1381 

1382 if callback is None: 

1383 self._verify_helper = None 

1384 self._verify_callback = None 

1385 _lib.SSL_CTX_set_verify(self._context, mode, _ffi.NULL) 

1386 else: 

1387 if not callable(callback): 

1388 raise TypeError("callback must be callable") 

1389 

1390 self._verify_helper = _VerifyHelper(callback) 

1391 self._verify_callback = self._verify_helper.callback 

1392 _lib.SSL_CTX_set_verify(self._context, mode, self._verify_callback) 

1393 

1394 @_require_not_used 

1395 def set_verify_depth(self, depth: int) -> None: 

1396 """ 

1397 Set the maximum depth for the certificate chain verification that shall 

1398 be allowed for this Context object. 

1399 

1400 :param depth: An integer specifying the verify depth 

1401 :return: None 

1402 """ 

1403 if not isinstance(depth, int): 

1404 raise TypeError("depth must be an integer") 

1405 

1406 _lib.SSL_CTX_set_verify_depth(self._context, depth) 

1407 

1408 def get_verify_mode(self) -> int: 

1409 """ 

1410 Retrieve the Context object's verify mode, as set by 

1411 :meth:`set_verify`. 

1412 

1413 :return: The verify mode 

1414 """ 

1415 return _lib.SSL_CTX_get_verify_mode(self._context) 

1416 

1417 def get_verify_depth(self) -> int: 

1418 """ 

1419 Retrieve the Context object's verify depth, as set by 

1420 :meth:`set_verify_depth`. 

1421 

1422 :return: The verify depth 

1423 """ 

1424 return _lib.SSL_CTX_get_verify_depth(self._context) 

1425 

1426 @_require_not_used 

1427 def load_tmp_dh(self, dhfile: _StrOrBytesPath) -> None: 

1428 """ 

1429 Load parameters for Ephemeral Diffie-Hellman 

1430 

1431 :param dhfile: The file to load EDH parameters from (``bytes`` or 

1432 ``str``). 

1433 

1434 :return: None 

1435 """ 

1436 dhfile = _path_bytes(dhfile) 

1437 

1438 bio = _lib.BIO_new_file(dhfile, b"r") 

1439 if bio == _ffi.NULL: 

1440 _raise_current_error() 

1441 bio = _ffi.gc(bio, _lib.BIO_free) 

1442 

1443 dh = _lib.PEM_read_bio_DHparams(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL) 

1444 dh = _ffi.gc(dh, _lib.DH_free) 

1445 res = _lib.SSL_CTX_set_tmp_dh(self._context, dh) 

1446 _openssl_assert(res == 1) 

1447 

1448 @_require_not_used 

1449 def set_tmp_ecdh(self, curve: _EllipticCurve | ec.EllipticCurve) -> None: 

1450 """ 

1451 Select a curve to use for ECDHE key exchange. 

1452 

1453 :param curve: A curve instance from cryptography 

1454 (:class:`~cryptogragraphy.hazmat.primitives.asymmetric.ec.EllipticCurve`). 

1455 Alternatively (deprecated) a curve object from either 

1456 :meth:`OpenSSL.crypto.get_elliptic_curve` or 

1457 :meth:`OpenSSL.crypto.get_elliptic_curves`. 

1458 

1459 :return: None 

1460 """ 

1461 

1462 if isinstance(curve, _EllipticCurve): 

1463 warnings.warn( 

1464 ( 

1465 "Passing pyOpenSSL elliptic curves to set_tmp_ecdh is " 

1466 "deprecated. You should use cryptography's elliptic curve " 

1467 "types instead." 

1468 ), 

1469 DeprecationWarning, 

1470 stacklevel=2, 

1471 ) 

1472 _lib.SSL_CTX_set_tmp_ecdh(self._context, curve._to_EC_KEY()) 

1473 else: 

1474 name = curve.name 

1475 if name == "secp192r1": 

1476 name = "prime192v1" 

1477 elif name == "secp256r1": 

1478 name = "prime256v1" 

1479 nid = _lib.OBJ_txt2nid(name.encode()) 

1480 if nid == _lib.NID_undef: 

1481 _raise_current_error() 

1482 

1483 ec = _lib.EC_KEY_new_by_curve_name(nid) 

1484 _openssl_assert(ec != _ffi.NULL) 

1485 ec = _ffi.gc(ec, _lib.EC_KEY_free) 

1486 _lib.SSL_CTX_set_tmp_ecdh(self._context, ec) 

1487 

1488 @_require_not_used 

1489 def set_cipher_list(self, cipher_list: bytes) -> None: 

1490 """ 

1491 Set the list of ciphers to be used in this context. 

1492 

1493 See the OpenSSL manual for more information (e.g. 

1494 :manpage:`ciphers(1)`). 

1495 

1496 Note this API does not change the cipher suites used in TLS 1.3 

1497 Use `set_tls13_ciphersuites` for that. 

1498 

1499 :param bytes cipher_list: An OpenSSL cipher string. 

1500 :return: None 

1501 """ 

1502 cipher_list = _text_to_bytes_and_warn("cipher_list", cipher_list) 

1503 

1504 if not isinstance(cipher_list, bytes): 

1505 raise TypeError("cipher_list must be a byte string.") 

1506 

1507 _openssl_assert( 

1508 _lib.SSL_CTX_set_cipher_list(self._context, cipher_list) == 1 

1509 ) 

1510 

1511 @_require_not_used 

1512 def set_tls13_ciphersuites(self, ciphersuites: bytes) -> None: 

1513 """ 

1514 Set the list of TLS 1.3 ciphers to be used in this context. 

1515 OpenSSL maintains a separate list of TLS 1.3+ ciphers to 

1516 ciphers for TLS 1.2 and lowers. 

1517 

1518 See the OpenSSL manual for more information (e.g. 

1519 :manpage:`ciphers(1)`). 

1520 

1521 :param bytes ciphersuites: An OpenSSL cipher string containing 

1522 TLS 1.3+ ciphersuites. 

1523 :return: None 

1524 

1525 .. versionadded:: 25.2.0 

1526 """ 

1527 if not isinstance(ciphersuites, bytes): 

1528 raise TypeError("ciphersuites must be a byte string.") 

1529 

1530 _openssl_assert( 

1531 _lib.SSL_CTX_set_ciphersuites(self._context, ciphersuites) == 1 

1532 ) 

1533 

    @_require_not_used
    def set_client_ca_list(
        self, certificate_authorities: Sequence[X509Name]
    ) -> None:
        """
        Set the list of preferred client certificate signers for this server
        context.

        This list of certificate authorities will be sent to the client when
        the server requests a client certificate.

        :param certificate_authorities: a sequence of X509Names.
        :return: None
        :raises TypeError: If any item of *certificate_authorities* is not an
            :class:`X509Name`.

        .. versionadded:: 0.10
        """
        # Build a fresh stack holding a duplicate of every name, so the
        # caller's X509Name objects are not handed over to the context.
        name_stack = _lib.sk_X509_NAME_new_null()
        _openssl_assert(name_stack != _ffi.NULL)

        try:
            for ca_name in certificate_authorities:
                if not isinstance(ca_name, X509Name):
                    raise TypeError(
                        f"client CAs must be X509Name objects, not "
                        f"{type(ca_name).__name__} objects"
                    )
                copy = _lib.X509_NAME_dup(ca_name._name)
                _openssl_assert(copy != _ffi.NULL)
                push_result = _lib.sk_X509_NAME_push(name_stack, copy)
                if not push_result:
                    # The duplicate never made it onto the stack, so it has
                    # to be released here.
                    _lib.X509_NAME_free(copy)
                    _raise_current_error()
        except Exception:
            # Any failure while filling the stack: release the stack and
            # re-raise the original error.
            # NOTE(review): presumably sk_X509_NAME_free releases only the
            # stack container and not the names already pushed onto it --
            # confirm whether those duplicates leak on this path.
            _lib.sk_X509_NAME_free(name_stack)
            raise

        _lib.SSL_CTX_set_client_CA_list(self._context, name_stack)

1571 

1572 @_require_not_used 

1573 def add_client_ca( 

1574 self, certificate_authority: X509 | x509.Certificate 

1575 ) -> None: 

1576 """ 

1577 Add the CA certificate to the list of preferred signers for this 

1578 context. 

1579 

1580 The list of certificate authorities will be sent to the client when the 

1581 server requests a client certificate. 

1582 

1583 :param certificate_authority: certificate authority's X509 certificate. 

1584 :return: None 

1585 

1586 .. versionadded:: 0.10 

1587 """ 

1588 if not isinstance(certificate_authority, X509): 

1589 certificate_authority = X509.from_cryptography( 

1590 certificate_authority 

1591 ) 

1592 else: 

1593 warnings.warn( 

1594 ( 

1595 "Passing pyOpenSSL X509 objects is deprecated. You " 

1596 "should use a cryptography.x509.Certificate instead." 

1597 ), 

1598 DeprecationWarning, 

1599 stacklevel=2, 

1600 ) 

1601 

1602 add_result = _lib.SSL_CTX_add_client_CA( 

1603 self._context, certificate_authority._x509 

1604 ) 

1605 _openssl_assert(add_result == 1) 

1606 

1607 @_require_not_used 

1608 def set_timeout(self, timeout: int) -> None: 

1609 """ 

1610 Set the timeout for newly created sessions for this Context object to 

1611 *timeout*. The default value is 300 seconds. See the OpenSSL manual 

1612 for more information (e.g. :manpage:`SSL_CTX_set_timeout(3)`). 

1613 

1614 :param timeout: The timeout in (whole) seconds 

1615 :return: The previous session timeout 

1616 """ 

1617 if not isinstance(timeout, int): 

1618 raise TypeError("timeout must be an integer") 

1619 

1620 return _lib.SSL_CTX_set_timeout(self._context, timeout) 

1621 

1622 def get_timeout(self) -> int: 

1623 """ 

1624 Retrieve session timeout, as set by :meth:`set_timeout`. The default 

1625 is 300 seconds. 

1626 

1627 :return: The session timeout 

1628 """ 

1629 return _lib.SSL_CTX_get_timeout(self._context) 

1630 

1631 @_require_not_used 

1632 def set_info_callback( 

1633 self, callback: Callable[[Connection, int, int], None] 

1634 ) -> None: 

1635 """ 

1636 Set the information callback to *callback*. This function will be 

1637 called from time to time during SSL handshakes. 

1638 

1639 :param callback: The Python callback to use. This should take three 

1640 arguments: a Connection object and two integers. The first integer 

1641 specifies where in the SSL handshake the function was called, and 

1642 the other the return code from a (possibly failed) internal 

1643 function call. 

1644 :return: None 

1645 """ 

1646 

1647 @wraps(callback) 

1648 def wrapper(ssl, where, return_code): # type: ignore[no-untyped-def] 

1649 callback(Connection._reverse_mapping[ssl], where, return_code) 

1650 

1651 self._info_callback = _ffi.callback( 

1652 "void (*)(const SSL *, int, int)", wrapper 

1653 ) 

1654 _lib.SSL_CTX_set_info_callback(self._context, self._info_callback) 

1655 

1656 @_requires_keylog 

1657 @_require_not_used 

1658 def set_keylog_callback( 

1659 self, callback: Callable[[Connection, bytes], None] 

1660 ) -> None: 

1661 """ 

1662 Set the TLS key logging callback to *callback*. This function will be 

1663 called whenever TLS key material is generated or received, in order 

1664 to allow applications to store this keying material for debugging 

1665 purposes. 

1666 

1667 :param callback: The Python callback to use. This should take two 

1668 arguments: a Connection object and a bytestring that contains 

1669 the key material in the format used by NSS for its SSLKEYLOGFILE 

1670 debugging output. 

1671 :return: None 

1672 """ 

1673 

1674 @wraps(callback) 

1675 def wrapper(ssl, line): # type: ignore[no-untyped-def] 

1676 line = _ffi.string(line) 

1677 callback(Connection._reverse_mapping[ssl], line) 

1678 

1679 self._keylog_callback = _ffi.callback( 

1680 "void (*)(const SSL *, const char *)", wrapper 

1681 ) 

1682 _lib.SSL_CTX_set_keylog_callback(self._context, self._keylog_callback) 

1683 

1684 def get_app_data(self) -> Any: 

1685 """ 

1686 Get the application data (supplied via :meth:`set_app_data()`) 

1687 

1688 :return: The application data 

1689 """ 

1690 return self._app_data 

1691 

1692 @_require_not_used 

1693 def set_app_data(self, data: Any) -> None: 

1694 """ 

1695 Set the application data (will be returned from get_app_data()) 

1696 

1697 :param data: Any Python object 

1698 :return: None 

1699 """ 

1700 self._app_data = data 

1701 

1702 def get_cert_store(self) -> X509Store | None: 

1703 """ 

1704 Get the certificate store for the context. This can be used to add 

1705 "trusted" certificates without using the 

1706 :meth:`load_verify_locations` method. 

1707 

1708 :return: A X509Store object or None if it does not have one. 

1709 """ 

1710 store = _lib.SSL_CTX_get_cert_store(self._context) 

1711 if store == _ffi.NULL: 

1712 # TODO: This is untested. 

1713 return None 

1714 

1715 pystore = X509Store.__new__(X509Store) 

1716 pystore._store = store 

1717 return pystore 

1718 

1719 @_require_not_used 

1720 def set_options(self, options: int) -> int: 

1721 """ 

1722 Add options. Options set before are not cleared! 

1723 This method should be used with the :const:`OP_*` constants. 

1724 

1725 :param options: The options to add. 

1726 :return: The new option bitmask. 

1727 """ 

1728 if not isinstance(options, int): 

1729 raise TypeError("options must be an integer") 

1730 

1731 return _lib.SSL_CTX_set_options(self._context, options) 

1732 

1733 @_require_not_used 

1734 def set_mode(self, mode: int) -> int: 

1735 """ 

1736 Add modes via bitmask. Modes set before are not cleared! This method 

1737 should be used with the :const:`MODE_*` constants. 

1738 

1739 :param mode: The mode to add. 

1740 :return: The new mode bitmask. 

1741 """ 

1742 if not isinstance(mode, int): 

1743 raise TypeError("mode must be an integer") 

1744 

1745 return _lib.SSL_CTX_set_mode(self._context, mode) 

1746 

1747 @_require_not_used 

1748 def clear_mode(self, mode_to_clear: int) -> int: 

1749 """ 

1750 Modes previously set cannot be overwritten without being 

1751 cleared first. This method should be used to clear existing modes. 

1752 """ 

1753 return _lib.SSL_CTX_clear_mode(self._context, mode_to_clear) 

1754 

1755 @_require_not_used 

1756 def set_tlsext_servername_callback( 

1757 self, callback: Callable[[Connection], None] 

1758 ) -> None: 

1759 """ 

1760 Specify a callback function to be called when clients specify a server 

1761 name. 

1762 

1763 :param callback: The callback function. It will be invoked with one 

1764 argument, the Connection instance. 

1765 

1766 .. versionadded:: 0.13 

1767 """ 

1768 

1769 @wraps(callback) 

1770 def wrapper(ssl, alert, arg): # type: ignore[no-untyped-def] 

1771 try: 

1772 callback(Connection._reverse_mapping[ssl]) 

1773 except Exception: 

1774 sys.excepthook(*sys.exc_info()) 

1775 return _lib.SSL_TLSEXT_ERR_ALERT_FATAL 

1776 return 0 

1777 

1778 self._tlsext_servername_callback = _ffi.callback( 

1779 "int (*)(SSL *, int *, void *)", wrapper 

1780 ) 

1781 _lib.SSL_CTX_set_tlsext_servername_callback( 

1782 self._context, self._tlsext_servername_callback 

1783 ) 

1784 

1785 @_require_not_used 

1786 def set_tlsext_use_srtp(self, profiles: bytes) -> None: 

1787 """ 

1788 Enable support for negotiating SRTP keying material. 

1789 

1790 :param bytes profiles: A colon delimited list of protection profile 

1791 names, like ``b'SRTP_AES128_CM_SHA1_80:SRTP_AES128_CM_SHA1_32'``. 

1792 :return: None 

1793 """ 

1794 if not isinstance(profiles, bytes): 

1795 raise TypeError("profiles must be a byte string.") 

1796 

1797 _openssl_assert( 

1798 _lib.SSL_CTX_set_tlsext_use_srtp(self._context, profiles) == 0 

1799 ) 

1800 

@_require_not_used
def set_alpn_protos(self, protos: list[bytes]) -> None:
    """
    Set the protocols offered through Application Layer Protocol
    Negotiation once the TLS connection has been negotiated.

    :param protos: A Python list of bytestrings naming the protocols to
        offer to the server, e.g. ``[b'http/1.1', b'spdy/2']``.
    :raises ValueError: If *protos* is empty.
    """
    # OpenSSL versions disagree on how to treat an empty list (see #1043),
    # so empty input is refused here before it ever reaches the library.
    if not protos:
        raise ValueError("at least one protocol must be specified")

    # Wire format: every protocol name is preceded by a single length byte.
    pieces = [
        piece
        for proto in protos
        for piece in (bytes((len(proto),)), proto)
    ]
    encoded = b"".join(pieces)

    # No reference to the C buffer is kept: OpenSSL copies the data
    # immediately.
    c_protos = _ffi.new("unsigned char[]", encoded)

    # https://www.openssl.org/docs/man1.1.0/man3/SSL_CTX_set_alpn_protos.html:
    # SSL_CTX_set_alpn_protos() and SSL_set_alpn_protos() return 0 on
    # success and non-0 on failure, i.e. the reverse of the usual
    # convention.
    status = _lib.SSL_CTX_set_alpn_protos(
        self._context, c_protos, len(encoded)
    )
    _openssl_assert(status == 0)

1838 

@_require_not_used
def set_alpn_select_callback(self, callback: _ALPNSelectCallback) -> None:
    """
    Register the server-side callback used when a client offers protocols
    via ALPN.

    :param callback: The callback function. It is invoked with two
        arguments: the Connection, and a list of offered protocols as
        bytestrings, e.g ``[b'http/1.1', b'spdy/2']``. It may return one
        of those bytestrings to select that protocol, the empty
        bytestring to terminate the TLS connection, or
        :py:obj:`NO_OVERLAPPING_PROTOCOLS` to signal that no offered
        protocol was selected but the connection should not be aborted.
    """
    helper = _ALPNSelectHelper(callback)
    # Both the helper and its callback are stored on the context object.
    self._alpn_select_helper = helper
    self._alpn_select_callback = helper.callback
    _lib.SSL_CTX_set_alpn_select_cb(
        self._context, self._alpn_select_callback, _ffi.NULL
    )

1859 

def _set_ocsp_callback(
    self,
    helper: _OCSPClientCallbackHelper | _OCSPServerCallbackHelper,
    data: Any | None,
) -> None:
    """
    Shared implementation behind ``set_ocsp_server_callback`` and
    ``set_ocsp_client_callback``: stores the helper, its callback and the
    user data on the context and registers them with OpenSSL.
    """
    self._ocsp_helper = helper
    self._ocsp_callback = helper.callback
    # ``None`` is passed to OpenSSL as NULL; anything else is wrapped in a
    # CFFI handle.
    self._ocsp_data = _ffi.NULL if data is None else _ffi.new_handle(data)

    cb_status = _lib.SSL_CTX_set_tlsext_status_cb(
        self._context, self._ocsp_callback
    )
    _openssl_assert(cb_status == 1)

    arg_status = _lib.SSL_CTX_set_tlsext_status_arg(
        self._context, self._ocsp_data
    )
    _openssl_assert(arg_status == 1)

1883 

@_require_not_used
def set_ocsp_server_callback(
    self,
    callback: _OCSPServerCallback[_T],
    data: _T | None = None,
) -> None:
    """
    Register a server-side callback that supplies the OCSP data to staple
    to the TLS handshake.

    :param callback: The callback function. It is invoked with two
        arguments: the Connection, and the optional arbitrary data you
        provided. It must return a bytestring containing the OCSP data to
        staple to the handshake, or the empty bytestring when no OCSP
        data is available for this connection.
    :param data: Optional opaque data handed to the callback on each
        call. It can be used to avoid complex data lookups or to keep
        track of which context is in use.
    """
    self._set_ocsp_callback(_OCSPServerCallbackHelper(callback), data)

1906 

@_require_not_used
def set_ocsp_client_callback(
    self,
    callback: _OCSPClientCallback[_T],
    data: _T | None = None,
) -> None:
    """
    Register a client-side callback that validates the OCSP data stapled
    to the TLS handshake.

    :param callback: The callback function. It is invoked with three
        arguments: the Connection, a bytestring containing the stapled
        OCSP assertion, and the optional arbitrary data you provided. It
        must return a boolean: ``True`` if the OCSP data is valid and the
        certificate can be trusted, ``False`` if the OCSP data is invalid
        or the certificate has been revoked.
    :param data: Optional opaque data handed to the callback on each
        call. It can be used to avoid complex data lookups or to keep
        track of which context is in use.
    """
    self._set_ocsp_callback(_OCSPClientCallbackHelper(callback), data)

1931 

@_require_not_used
@_requires_ssl_cookie
def set_cookie_generate_callback(
    self, callback: _CookieGenerateCallback
) -> None:
    """
    Register *callback* as this context's cookie generation callback.

    The wrapping helper is stored on the context and its ``callback``
    attribute is what gets passed to OpenSSL.
    """
    helper = _CookieGenerateCallbackHelper(callback)
    self._cookie_generate_helper = helper
    _lib.SSL_CTX_set_cookie_generate_cb(self._context, helper.callback)

1942 

@_require_not_used
@_requires_ssl_cookie
def set_cookie_verify_callback(
    self, callback: _CookieVerifyCallback
) -> None:
    """
    Register *callback* as this context's cookie verification callback.

    The wrapping helper is stored on the context and its ``callback``
    attribute is what gets passed to OpenSSL.
    """
    helper = _CookieVerifyCallbackHelper(callback)
    self._cookie_verify_helper = helper
    _lib.SSL_CTX_set_cookie_verify_cb(self._context, helper.callback)

1953 

1954 

1955class Connection: 

1956 _reverse_mapping: typing.MutableMapping[Any, Connection] = ( 

1957 WeakValueDictionary() 

1958 ) 

1959 

def __init__(
    self, context: Context, socket: socket.socket | None = None
) -> None:
    """
    Create a new Connection object, using the given OpenSSL.SSL.Context
    instance and socket.

    When *socket* is ``None`` the connection is backed by a pair of
    memory BIOs (see :meth:`bio_read` / :meth:`bio_write`); otherwise the
    socket's file descriptor is handed to OpenSSL.

    :param context: An SSL Context to use for this connection
    :param socket: The socket to use for transport layer
    :raises TypeError: If *context* is not a :class:`Context` instance.
    """
    if not isinstance(context, Context):
        raise TypeError("context must be a Context instance")

    context._used = True

    ssl = _lib.SSL_new(context._context)
    # SSL_new() reports failure by returning NULL. Fail here with the
    # OpenSSL error instead of wrapping and later using a NULL pointer.
    _openssl_assert(ssl != _ffi.NULL)
    self._ssl = _ffi.gc(ssl, _lib.SSL_free)
    # We set SSL_MODE_AUTO_RETRY to handle situations where OpenSSL returns
    # an SSL_ERROR_WANT_READ when processing a non-application data packet
    # even though there is still data on the underlying transport.
    # See https://github.com/openssl/openssl/issues/6234 for more details.
    _lib.SSL_set_mode(self._ssl, _lib.SSL_MODE_AUTO_RETRY)
    self._context = context
    self._app_data = None

    # References to strings used for Application Layer Protocol
    # Negotiation. These strings get copied at some point but it's well
    # after the callback returns, so we have to hang them somewhere to
    # avoid them getting freed.
    self._alpn_select_callback_args: Any = None

    # Reference the verify_callback of the Context. This ensures that if
    # set_verify is called again after the SSL object has been created we
    # do not point to a dangling reference
    self._verify_helper = context._verify_helper
    self._verify_callback = context._verify_callback

    # And likewise for the cookie callbacks
    self._cookie_generate_helper = context._cookie_generate_helper
    self._cookie_verify_helper = context._cookie_verify_helper

    # Register this wrapper in the class-level reverse mapping, keyed by
    # the SSL pointer.
    self._reverse_mapping[self._ssl] = self

    if socket is None:
        self._socket = None
        # Don't set up any gc for these, SSL_free will take care of them.
        self._into_ssl = _lib.BIO_new(_lib.BIO_s_mem())
        _openssl_assert(self._into_ssl != _ffi.NULL)

        self._from_ssl = _lib.BIO_new(_lib.BIO_s_mem())
        _openssl_assert(self._from_ssl != _ffi.NULL)

        _lib.SSL_set_bio(self._ssl, self._into_ssl, self._from_ssl)
    else:
        self._into_ssl = None
        self._from_ssl = None
        self._socket = socket
        set_result = _lib.SSL_set_fd(
            self._ssl, _asFileDescriptor(self._socket)
        )
        _openssl_assert(set_result == 1)

2021 

def __getattr__(self, name: str) -> Any:
    """
    Fall back to the wrapped socket for attributes that are not found on
    the Connection object itself.

    :raises AttributeError: If there is no wrapped socket.
    """
    wrapped = self._socket
    if wrapped is not None:
        return getattr(wrapped, name)
    raise AttributeError(
        f"'{self.__class__.__name__}' object has no attribute '{name}'"
    )

2033 

def _raise_ssl_error(self, ssl: Any, result: int) -> None:
    """
    Turn the outcome of an OpenSSL call into the matching exception.

    Exceptions captured by the context's verify, ALPN-select and OCSP
    helpers are re-raised first. Otherwise ``SSL_get_error`` is consulted
    and the corresponding exception is raised; ``SSL_ERROR_NONE`` raises
    nothing.

    :param ssl: The SSL pointer the call was made on.
    :param result: The return value of the OpenSSL call.
    """
    ctx = self._context
    if ctx._verify_helper is not None:
        ctx._verify_helper.raise_if_problem()
    if ctx._alpn_select_helper is not None:
        ctx._alpn_select_helper.raise_if_problem()
    if ctx._ocsp_helper is not None:
        ctx._ocsp_helper.raise_if_problem()

    code = _lib.SSL_get_error(ssl, result)
    if code == _lib.SSL_ERROR_WANT_READ:
        raise WantReadError()
    elif code == _lib.SSL_ERROR_WANT_WRITE:
        raise WantWriteError()
    elif code == _lib.SSL_ERROR_ZERO_RETURN:
        raise ZeroReturnError()
    elif code == _lib.SSL_ERROR_WANT_X509_LOOKUP:
        # TODO: This is untested.
        raise WantX509LookupError()
    elif code == _lib.SSL_ERROR_SYSCALL:
        # The OS-level error number comes from a different place on
        # Windows.
        errno = (
            _ffi.getwinerror()[0] if platform == "win32" else _ffi.errno
        )
        if _lib.ERR_peek_error() != 0 and errno == 0:
            # TODO: This is untested, but I think twisted hits it?
            _raise_current_error()
        else:
            if result < 0 and errno != 0:
                raise SysCallError(errno, errorcode.get(errno))
            raise SysCallError(-1, "Unexpected EOF")
    elif code == _lib.SSL_ERROR_SSL and _lib.ERR_peek_error() != 0:
        # In 3.0.x an unexpected EOF no longer triggers syscall error
        # but we want to maintain compatibility so we check here and
        # raise syscall if it is an EOF. Since we're not actually sure
        # what else could raise SSL_ERROR_SSL we check for the presence
        # of the OpenSSL 3 constant SSL_R_UNEXPECTED_EOF_WHILE_READING
        # and if it's not present we just raise an error, which matches
        # the behavior before we added this elif section
        queued = _lib.ERR_peek_error()
        reason = _lib.ERR_GET_REASON(queued)
        if not _lib.Cryptography_HAS_UNEXPECTED_EOF_WHILE_READING:
            _raise_current_error()
        else:
            _openssl_assert(
                reason == _lib.SSL_R_UNEXPECTED_EOF_WHILE_READING
            )
            _lib.ERR_clear_error()
            raise SysCallError(-1, "Unexpected EOF")
    elif code == _lib.SSL_ERROR_NONE:
        pass
    else:
        _raise_current_error()

2086 

def get_context(self) -> Context:
    """
    Return the :class:`Context` object currently associated with this
    :class:`Connection`.
    """
    current = self._context
    return current

2093 

def set_context(self, context: Context) -> None:
    """
    Switch this connection over to a new session context.

    :param context: A :class:`Context` instance giving the new session
        context to use.
    :raises TypeError: If *context* is not a :class:`Context` instance.
    """
    if not isinstance(context, Context):
        raise TypeError("context must be a Context instance")

    _lib.SSL_set_SSL_CTX(self._ssl, context._context)
    # Remember the new context and flag it as used.
    self._context = context
    context._used = True

2107 

def get_servername(self) -> bytes | None:
    """
    Return the servername extension value from the client hello message,
    or None if there wasn't one.

    :return: A byte string giving the server name or :data:`None`.

    .. versionadded:: 0.13
    """
    raw_name = _lib.SSL_get_servername(
        self._ssl, _lib.TLSEXT_NAMETYPE_host_name
    )
    return None if raw_name == _ffi.NULL else _ffi.string(raw_name)

2124 

def set_verify(
    self, mode: int, callback: _VerifyCallback | None = None
) -> None:
    """
    Override the Context object's verification flags for this specific
    connection. See :py:meth:`Context.set_verify` for details.

    :raises TypeError: If *mode* is not an integer, or *callback* is
        given but is not callable.
    """
    if not isinstance(mode, int):
        raise TypeError("mode must be an integer")

    if callback is not None:
        if not callable(callback):
            raise TypeError("callback must be callable")

        helper = _VerifyHelper(callback)
        self._verify_helper = helper
        self._verify_callback = helper.callback
        _lib.SSL_set_verify(self._ssl, mode, self._verify_callback)
        return

    # No callback: clear the stored helper and pass NULL to OpenSSL.
    self._verify_helper = None
    self._verify_callback = None
    _lib.SSL_set_verify(self._ssl, mode, _ffi.NULL)

2146 

def get_verify_mode(self) -> int:
    """
    Return the verify mode of this Connection, as set by
    :meth:`set_verify`.

    :return: The verify mode
    """
    mode = _lib.SSL_get_verify_mode(self._ssl)
    return mode

2155 

def use_certificate(self, cert: X509 | x509.Certificate) -> None:
    """
    Load a certificate for this connection.

    :param cert: The certificate. Passing a pyOpenSSL ``X509`` object
        emits a :class:`DeprecationWarning`; any other object is
        converted with ``X509.from_cryptography``.
    :return: None
    """
    # Mirrored from Context.use_certificate
    if isinstance(cert, X509):
        warnings.warn(
            (
                "Passing pyOpenSSL X509 objects is deprecated. You "
                "should use a cryptography.x509.Certificate instead."
            ),
            DeprecationWarning,
            stacklevel=2,
        )
    else:
        cert = X509.from_cryptography(cert)

    if not _lib.SSL_use_certificate(self._ssl, cert._x509):
        _raise_current_error()

2179 

def use_privatekey(self, pkey: _PrivateKey | PKey) -> None:
    """
    Load a private key for this connection.

    :param pkey: The private key. Passing a pyOpenSSL ``PKey`` object
        emits a :class:`DeprecationWarning`; any other object is
        converted with ``PKey.from_cryptography_key``.
    :return: None
    """
    # Mirrored from Context.use_privatekey
    if isinstance(pkey, PKey):
        warnings.warn(
            (
                "Passing pyOpenSSL PKey objects is deprecated. You "
                "should use a cryptography private key instead."
            ),
            DeprecationWarning,
            stacklevel=2,
        )
    else:
        pkey = PKey.from_cryptography_key(pkey)

    if not _lib.SSL_use_PrivateKey(self._ssl, pkey._pkey):
        # Failure handling is delegated to the context.
        self._context._raise_passphrase_exception()

2203 

def set_ciphertext_mtu(self, mtu: int) -> None:
    """
    For DTLS, set the maximum UDP payload size (*not* including IP/UDP
    overhead).

    You may also need to set :data:`OP_NO_QUERY_MTU` so that OpenSSL does
    not spontaneously clear this value.

    :param mtu: An integer giving the maximum transmission unit.

    .. versionadded:: 21.1
    """
    ssl = self._ssl
    _lib.SSL_set_mtu(ssl, mtu)

2217 

def get_cleartext_mtu(self) -> int:
    """
    For DTLS, return the largest amount of unencrypted data that can be
    passed to :meth:`write` without exceeding the MTU (as passed to
    :meth:`set_ciphertext_mtu`).

    :return: The effective MTU as an integer.
    :raises NotImplementedError: If the binding does not expose
        ``DTLS_get_data_mtu``.

    .. versionadded:: 21.1
    """
    if hasattr(_lib, "DTLS_get_data_mtu"):
        return _lib.DTLS_get_data_mtu(self._ssl)
    raise NotImplementedError("requires OpenSSL 1.1.1 or better")

2232 

def set_tlsext_host_name(self, name: bytes) -> None:
    """
    Set the servername extension value to send in the client hello.

    :param name: A byte string giving the name.
    :raises TypeError: If *name* is not a byte string or contains a NUL
        byte.

    .. versionadded:: 0.13
    """
    if not isinstance(name, bytes):
        raise TypeError("name must be a byte string")
    if b"\0" in name:
        raise TypeError("name must not contain NUL byte")

    # XXX I guess this can fail sometimes?
    # NOTE(review): the return value of the call is not inspected.
    _lib.SSL_set_tlsext_host_name(self._ssl, name)

2248 

def pending(self) -> int:
    """
    Return how many bytes can be safely read from the SSL buffer
    (**not** the underlying transport buffer).

    :return: The number of bytes available in the receive buffer.
    """
    available = _lib.SSL_pending(self._ssl)
    return available

2257 

def send(self, buf: _Buffer, flags: int = 0) -> int:
    """
    Send data on the connection.

    NOTE: If one of the WantRead, WantWrite or WantX509Lookup exceptions
    is raised, the method has to be called again with the SAME buffer.

    :param buf: The string, buffer or memoryview to send
    :param flags: (optional) Included for compatibility with the socket
        API, the value is ignored
    :raises ValueError: If *buf* is longer than 2**31-1 bytes.
    :return: The number of bytes written
    """
    # Backward compatibility
    buf = _text_to_bytes_and_warn("buf", buf)

    max_chunk = 2147483647
    with _ffi.from_buffer(buf) as data:
        # check len(buf) instead of len(data) for testability
        if len(buf) > max_chunk:
            raise ValueError("Cannot send more than 2**31-1 bytes at once.")

        written = _lib.SSL_write(self._ssl, data, len(data))
        self._raise_ssl_error(self._ssl, written)
        return written


write = send

2285 

def sendall(self, buf: _Buffer, flags: int = 0) -> int:
    """
    Send "all" data on the connection by calling ``SSL_write`` repeatedly
    until nothing is left. If an error occurs, it's impossible to tell
    how much data has been sent.

    :param buf: The string, buffer or memoryview to send
    :param flags: (optional) Included for compatibility with the socket
        API, the value is ignored
    :return: The number of bytes written
    """
    buf = _text_to_bytes_and_warn("buf", buf)

    with _ffi.from_buffer(buf) as data:
        remaining = len(buf)
        offset = 0

        while remaining:
            # SSL_write's num arg is an int, so a single call cannot
            # take more than 2**31-1 bytes.
            chunk_size = min(remaining, 2147483647)
            written = _lib.SSL_write(self._ssl, data + offset, chunk_size)
            self._raise_ssl_error(self._ssl, written)
            offset += written
            remaining -= written

        return offset

2314 

def recv(self, bufsiz: int, flags: int | None = None) -> bytes:
    """
    Receive data on the connection.

    :param bufsiz: The maximum number of bytes to read
    :param flags: (optional) The only supported flag is ``MSG_PEEK``,
        all other flags are ignored.
    :return: The string read from the Connection
    """
    buf = _no_zero_allocator("char[]", bufsiz)
    # MSG_PEEK selects SSL_peek; everything else uses SSL_read.
    peeking = flags is not None and flags & socket.MSG_PEEK
    reader = _lib.SSL_peek if peeking else _lib.SSL_read
    result = reader(self._ssl, buf, bufsiz)
    self._raise_ssl_error(self._ssl, result)
    return _ffi.buffer(buf, result)[:]


read = recv

2333 

def recv_into(
    self,
    buffer: Any,  # collections.abc.Buffer once we use Python 3.12+
    nbytes: int | None = None,
    flags: int | None = None,
) -> int:
    """
    Receive data on the connection and copy it directly into the provided
    buffer, rather than creating a new string.

    :param buffer: The buffer to copy into.
    :param nbytes: (optional) The maximum number of bytes to read into the
        buffer. If not present, defaults to the size of the buffer. If
        larger than the size of the buffer, is reduced to the size of the
        buffer.
    :param flags: (optional) The only supported flag is ``MSG_PEEK``,
        all other flags are ignored.
    :return: The number of bytes read into the buffer.
    """
    capacity = len(buffer)
    nbytes = capacity if nbytes is None else min(nbytes, capacity)

    # We need to create a temporary buffer. This is annoying, it would be
    # better if we could pass memoryviews straight into the SSL_read call,
    # but right now we can't. Revisit this if CFFI gets that ability.
    scratch = _no_zero_allocator("char[]", nbytes)
    peeking = flags is not None and flags & socket.MSG_PEEK
    reader = _lib.SSL_peek if peeking else _lib.SSL_read
    result = reader(self._ssl, scratch, nbytes)
    self._raise_ssl_error(self._ssl, result)

    # This strange line is all to avoid a memory copy. The buffer protocol
    # should allow us to assign a CFFI buffer to the LHS of this line, but
    # on CPython 3.3+ that segfaults. As a workaround, we can temporarily
    # wrap it in a memoryview.
    buffer[:result] = memoryview(_ffi.buffer(scratch, result))

    return result

2375 

def _handle_bio_errors(self, bio: Any, result: int) -> typing.NoReturn:
    """
    Raise the exception matching the state of *bio* after a failed BIO
    read or write.

    :param bio: The BIO the failed operation was performed on.
    :param result: The return value of the failed BIO call (not consulted
        here).
    """
    if not _lib.BIO_should_retry(bio):
        # TODO: This is untested.
        _raise_current_error()
    elif _lib.BIO_should_read(bio):
        raise WantReadError()
    elif _lib.BIO_should_write(bio):
        # TODO: This is untested.
        raise WantWriteError()
    elif _lib.BIO_should_io_special(bio):
        # TODO: This is untested. I think io_special means the socket
        # BIO has a not-yet connected socket.
        raise ValueError("BIO_should_io_special")
    else:
        # TODO: This is untested.
        raise ValueError("unknown bio failure")

2393 

def bio_read(self, bufsiz: int) -> bytes:
    """
    If the Connection was created with a memory BIO, read bytes from the
    write end of that memory BIO. Many Connection methods will add bytes
    which must be read in this manner or the buffer will eventually fill
    up and the Connection will be able to take no further actions.

    :param bufsiz: The maximum number of bytes to read
    :raises TypeError: If the Connection has no memory BIO, or *bufsiz*
        is not an integer.
    :return: The string read.
    """
    outgoing = self._from_ssl
    if outgoing is None:
        raise TypeError("Connection sock was not None")

    if not isinstance(bufsiz, int):
        raise TypeError("bufsiz must be an integer")

    scratch = _no_zero_allocator("char[]", bufsiz)
    count_read = _lib.BIO_read(outgoing, scratch, bufsiz)
    if count_read <= 0:
        self._handle_bio_errors(outgoing, count_read)

    return _ffi.buffer(scratch, count_read)[:]

2417 

def bio_write(self, buf: _Buffer) -> int:
    """
    If the Connection was created with a memory BIO, add bytes to the
    read end of that memory BIO. The Connection can then read the bytes
    (for example, in response to a call to :meth:`recv`).

    :param buf: The string to put into the memory BIO.
    :raises TypeError: If the Connection has no memory BIO.
    :return: The number of bytes written
    """
    buf = _text_to_bytes_and_warn("buf", buf)

    incoming = self._into_ssl
    if incoming is None:
        raise TypeError("Connection sock was not None")

    with _ffi.from_buffer(buf) as data:
        written = _lib.BIO_write(incoming, data, len(data))
        if written <= 0:
            self._handle_bio_errors(incoming, written)
        return written

2438 

def renegotiate(self) -> bool:
    """
    Renegotiate the session.

    :return: True if the renegotiation can be started, False otherwise
    """
    # Nothing is started while a renegotiation is already pending.
    if self.renegotiate_pending():
        return False

    _openssl_assert(_lib.SSL_renegotiate(self._ssl) == 1)
    return True

2449 

def do_handshake(self) -> None:
    """
    Perform an SSL handshake (usually called after :meth:`renegotiate` or
    one of :meth:`set_accept_state` or :meth:`set_connect_state`). This
    can raise the same exceptions as :meth:`send` and :meth:`recv`.

    :return: None.
    """
    ssl = self._ssl
    outcome = _lib.SSL_do_handshake(ssl)
    self._raise_ssl_error(ssl, outcome)

2460 

def renegotiate_pending(self) -> bool:
    """
    Report whether a renegotiation is in progress; this returns False
    once a renegotiation is finished.

    :return: Whether there's a renegotiation in progress
    """
    status = _lib.SSL_renegotiate_pending(self._ssl)
    return status == 1

2469 

def total_renegotiations(self) -> int:
    """
    Return the total number of renegotiations.

    :return: The number of renegotiations.
    """
    total = _lib.SSL_total_renegotiations(self._ssl)
    return total

2477 

def connect(self, addr: Any) -> None:
    """
    Put the SSL object into connect state, then call the :meth:`connect`
    method of the underlying socket.

    :param addr: A remote address
    :return: What the socket's connect method returns
    """
    _lib.SSL_set_connect_state(self._ssl)
    outcome = self._socket.connect(addr)  # type: ignore[union-attr]
    return outcome  # type: ignore[return-value]

2489 

def connect_ex(self, addr: Any) -> int:
    """
    Call the :meth:`connect_ex` method of the underlying socket and set up
    SSL on the socket, using the Context object supplied to this
    Connection object at creation. Note that if the :meth:`connect_ex`
    method of the socket doesn't return 0, SSL won't be initialized.

    :param addr: A remote address
    :return: What the socket's connect_ex method returns
    """
    # Look the socket method up first, so a missing socket fails before
    # the connection state is changed.
    socket_connect_ex = self._socket.connect_ex  # type: ignore[union-attr]
    self.set_connect_state()
    return socket_connect_ex(addr)

2503 

def accept(self) -> tuple[Connection, Any]:
    """
    Call the :meth:`accept` method of the underlying socket and wrap the
    returned socket in a new :class:`Connection` that shares this
    connection's Context and is put into accept state.

    :return: A *(conn, addr)* pair where *conn* is the new
        :class:`Connection` object created, and *address* is as returned
        by the socket's :meth:`accept`.
    """
    accepted = self._socket.accept()  # type: ignore[union-attr]
    client_sock, peer_addr = accepted
    server_side = Connection(self._context, client_sock)
    server_side.set_accept_state()
    return (server_side, peer_addr)

2518 

def DTLSv1_listen(self) -> None:
    """
    Call the OpenSSL function DTLSv1_listen on this connection. See the
    OpenSSL manual for more details.

    :raises WantReadError: If the call returns zero.
    :return: None
    """
    # Possible future extension: return the BIO_ADDR in some form.
    peer = _lib.BIO_ADDR_new()
    try:
        outcome = _lib.DTLSv1_listen(self._ssl, peer)
    finally:
        _lib.BIO_ADDR_free(peer)

    # Give the cookie helpers the chance to re-raise before the return
    # value is interpreted.
    if self._cookie_generate_helper is not None:
        self._cookie_generate_helper.raise_if_problem()
    if self._cookie_verify_helper is not None:
        self._cookie_verify_helper.raise_if_problem()

    # DTLSv1_listen is weird. A zero return value means 'didn't find a
    # ClientHello with valid cookie, but keep trying'. So basically
    # WantReadError. But it doesn't work correctly with _raise_ssl_error.
    # So we raise it manually instead.
    if outcome == 0:
        raise WantReadError()
    elif outcome < 0:
        self._raise_ssl_error(self._ssl, outcome)

2544 

def DTLSv1_get_timeout(self) -> float | None:
    """
    Determine when the DTLS SSL object next needs to perform internal
    processing due to the passage of time.

    When the returned number of seconds have passed, the
    :meth:`DTLSv1_handle_timeout` method needs to be called.

    :return: The time left in seconds before the next timeout, as a
        float combining the seconds and microseconds parts, or `None` if
        no timeout is currently active.
    """
    ptv_sec = _ffi.new("time_t *")
    ptv_usec = _ffi.new("long *")
    if _lib.Cryptography_DTLSv1_get_timeout(self._ssl, ptv_sec, ptv_usec):
        # True division makes the result a float, hence the ``float``
        # return annotation.
        return ptv_sec[0] + (ptv_usec[0] / 1000000)
    return None

2562 

def DTLSv1_handle_timeout(self) -> bool:
    """
    Handle any timeout events which have become pending on a DTLS SSL
    object.

    :return: `True` if there was a pending timeout, `False` otherwise.
    """
    outcome = _lib.DTLSv1_handle_timeout(self._ssl)
    if outcome >= 0:
        return bool(outcome)

    # Negative results are handed to the common error path.
    self._raise_ssl_error(self._ssl, outcome)
    assert False, "unreachable"

2576 

def bio_shutdown(self) -> None:
    """
    If the Connection was created with a memory BIO, indicate that *end
    of file* has been reached on the read end of that memory BIO.

    :raises TypeError: If the Connection has no memory BIO.
    :return: None
    """
    if self._from_ssl is None:
        raise TypeError("Connection sock was not None")

    incoming = self._into_ssl
    _lib.BIO_set_mem_eof_return(incoming, 0)

2589 

def shutdown(self) -> bool:
    """
    Send the shutdown message to the Connection.

    :return: True if the shutdown completed successfully (i.e. both sides
        have sent closure alerts), False otherwise (in which case you
        call :meth:`recv` or :meth:`send` when the connection becomes
        readable/writeable).
    """
    outcome = _lib.SSL_shutdown(self._ssl)
    if outcome < 0:
        self._raise_ssl_error(self._ssl, outcome)
        assert False, "unreachable"
    # A positive result is reported as True, zero as False.
    return outcome > 0

2607 

def get_cipher_list(self) -> list[str]:
    """
    Retrieve the list of ciphers used by the Connection object.

    :return: A list of native cipher strings.
    """
    names = []
    index = 0
    # Walk the indices from zero until the library returns NULL.
    while True:
        entry = _lib.SSL_get_cipher_list(self._ssl, index)
        if entry == _ffi.NULL:
            break
        names.append(_ffi.string(entry).decode("utf-8"))
        index += 1
    return names

2621 

def get_client_ca_list(self) -> list[X509Name]:
    """
    Get CAs whose certificates are suggested for client authentication.

    :return: If this is a server connection, the list of certificate
        authorities that will be sent or has been sent to the client, as
        controlled by this :class:`Connection`'s :class:`Context`.

        If this is a client connection, the list will be empty until the
        connection with the server is established.

    .. versionadded:: 0.10
    """
    stack = _lib.SSL_get_client_CA_list(self._ssl)
    if stack == _ffi.NULL:
        # TODO: This is untested.
        return []

    names = []
    total = _lib.sk_X509_NAME_num(stack)
    for index in range(total):
        # Each entry is duplicated so the Python wrapper owns its own
        # copy, which is freed with X509_NAME_free on garbage collection.
        duplicate = _lib.X509_NAME_dup(_lib.sk_X509_NAME_value(stack, index))
        _openssl_assert(duplicate != _ffi.NULL)

        wrapper = X509Name.__new__(X509Name)
        wrapper._name = _ffi.gc(duplicate, _lib.X509_NAME_free)
        names.append(wrapper)
    return names

2650 

def makefile(self, *args: Any, **kwargs: Any) -> typing.NoReturn:
    """
    Always fails: makefile() is not implemented, since there is no dup
    semantics for SSL connections.

    :raise: NotImplementedError
    """
    message = "Cannot make file object of OpenSSL.SSL.Connection"
    raise NotImplementedError(message)

2661 

def get_app_data(self) -> Any:
    """
    Return the application data previously stored with
    :meth:`set_app_data`.

    :return: The application data
    """
    stored = self._app_data
    return stored

2669 

def set_app_data(self, data: Any) -> None:
    """
    Store arbitrary application data on this Connection.

    :param data: The application data
    :return: None
    """
    setattr(self, "_app_data", data)

2678 

def get_shutdown(self) -> int:
    """
    Return the shutdown state of the Connection.

    :return: The shutdown state, a bitvector of SENT_SHUTDOWN,
        RECEIVED_SHUTDOWN.
    """
    state = _lib.SSL_get_shutdown(self._ssl)
    return state

2687 

def set_shutdown(self, state: int) -> None:
    """
    Set the shutdown state of the Connection.

    :param state: bitvector of SENT_SHUTDOWN, RECEIVED_SHUTDOWN.
    :raises TypeError: If *state* is not an integer.
    :return: None
    """
    if isinstance(state, int):
        _lib.SSL_set_shutdown(self._ssl, state)
        return
    raise TypeError("state must be an integer")

2699 

def get_state_string(self) -> bytes:
    """
    Return a verbose description of the state of the Connection.

    :return: A byte string representing the state
    """
    description = _lib.SSL_state_string_long(self._ssl)
    return _ffi.string(description)

2707 

def server_random(self) -> bytes | None:
    """
    Retrieve the random value used with the server hello message.

    :return: The random value as a byte string, or ``None`` if the
        connection has no session.
    """
    if _lib.SSL_get_session(self._ssl) == _ffi.NULL:
        return None

    # First call with a NULL buffer to learn the length, then fetch.
    size = _lib.SSL_get_server_random(self._ssl, _ffi.NULL, 0)
    _openssl_assert(size > 0)
    out = _no_zero_allocator("unsigned char[]", size)
    _lib.SSL_get_server_random(self._ssl, out, size)
    return _ffi.buffer(out, size)[:]

2722 

def client_random(self) -> bytes | None:
    """
    Retrieve the random value used with the client hello message.

    :return: The client random as bytes, or :obj:`None` if the
        connection has no session.
    """
    ssl = self._ssl
    if _lib.SSL_get_session(ssl) == _ffi.NULL:
        return None

    # First call passes no buffer; only its return value (the length)
    # is used, to size the real output buffer.
    size = _lib.SSL_get_client_random(ssl, _ffi.NULL, 0)
    _openssl_assert(size > 0)
    buf = _no_zero_allocator("unsigned char[]", size)
    _lib.SSL_get_client_random(ssl, buf, size)
    return _ffi.buffer(buf, size)[:]

2738 

def master_key(self) -> bytes | None:
    """
    Retrieve the value of the master key for this session.

    :return: The master key as bytes, or :obj:`None` if the connection
        has no session.
    """
    current_session = _lib.SSL_get_session(self._ssl)
    if current_session == _ffi.NULL:
        return None

    # First call passes no buffer; only its return value (the length)
    # is used, to size the real output buffer.
    size = _lib.SSL_SESSION_get_master_key(current_session, _ffi.NULL, 0)
    _openssl_assert(size > 0)
    buf = _no_zero_allocator("unsigned char[]", size)
    _lib.SSL_SESSION_get_master_key(current_session, buf, size)
    return _ffi.buffer(buf, size)[:]

2754 

def export_keying_material(
    self, label: bytes, olen: int, context: bytes | None = None
) -> bytes:
    """
    Obtain keying material for application use.

    :param: label - a disambiguating label string as described in RFC 5705
    :param: olen - the length of the exported key material in bytes
    :param: context - a per-association context value; when omitted the
        export is performed without a context
    :return: the exported key material, ``olen`` bytes long. A failed
        export is reported through ``_openssl_assert`` rather than by
        a return value.
    """
    outp = _no_zero_allocator("unsigned char[]", olen)

    # Without a context, pass NULL/0 and tell OpenSSL not to use one.
    if context is None:
        context_buf, context_len, use_context = _ffi.NULL, 0, 0
    else:
        context_buf, context_len, use_context = context, len(context), 1

    success = _lib.SSL_export_keying_material(
        self._ssl,
        outp,
        olen,
        label,
        len(label),
        context_buf,
        context_len,
        use_context,
    )
    _openssl_assert(success == 1)
    return _ffi.buffer(outp, olen)[:]

2786 

def sock_shutdown(self, *args: Any, **kwargs: Any) -> None:
    """
    Forward to the :meth:`shutdown` method of the underlying socket.
    See :manpage:`shutdown(2)`.

    All positional and keyword arguments are passed through unchanged.

    :return: What the socket's shutdown() method returns
    """
    underlying = self._socket
    return underlying.shutdown(*args, **kwargs)  # type: ignore[return-value, union-attr]

2795 

@typing.overload
def get_certificate(
    self, *, as_cryptography: typing.Literal[True]
) -> x509.Certificate | None:
    pass

@typing.overload
def get_certificate(
    self, *, as_cryptography: typing.Literal[False] = False
) -> X509 | None:
    pass

def get_certificate(
    self,
    *,
    as_cryptography: typing.Literal[True] | typing.Literal[False] = False,
) -> X509 | x509.Certificate | None:
    """
    Retrieve the local certificate (if any)

    :param bool as_cryptography: Controls whether a
        ``cryptography.x509.Certificate`` or an ``OpenSSL.crypto.X509``
        object should be returned.

    :return: The local certificate, or :obj:`None` if
        ``SSL_get_certificate`` returns NULL.
    """
    cert = _lib.SSL_get_certificate(self._ssl)
    if cert == _ffi.NULL:
        return None

    # Bump the reference count before wrapping the pointer, and verify
    # the bump succeeded (same check as _cert_stack_to_list) so a failed
    # up-ref is reported instead of producing a wrapper around a
    # reference we do not hold.
    res = _lib.X509_up_ref(cert)
    _openssl_assert(res >= 1)

    pycert = X509._from_raw_x509_ptr(cert)
    if as_cryptography:
        return pycert.to_cryptography()
    return pycert

2830 

@typing.overload
def get_peer_certificate(
    self, *, as_cryptography: typing.Literal[True]
) -> x509.Certificate | None:
    pass

@typing.overload
def get_peer_certificate(
    self, *, as_cryptography: typing.Literal[False] = False
) -> X509 | None:
    pass

def get_peer_certificate(
    self,
    *,
    as_cryptography: typing.Literal[True] | typing.Literal[False] = False,
) -> X509 | x509.Certificate | None:
    """
    Retrieve the certificate presented by the other side (if any)

    :param bool as_cryptography: Controls whether a
        ``cryptography.x509.Certificate`` or an ``OpenSSL.crypto.X509``
        object should be returned.

    :return: The peer's certificate, or :obj:`None` if
        ``SSL_get_peer_certificate`` returns NULL.
    """
    peer_cert = _lib.SSL_get_peer_certificate(self._ssl)
    if peer_cert == _ffi.NULL:
        return None

    # The pointer is wrapped directly, with no extra up-ref here.
    wrapped = X509._from_raw_x509_ptr(peer_cert)
    return wrapped.to_cryptography() if as_cryptography else wrapped

2864 

@staticmethod
def _cert_stack_to_list(cert_stack: Any) -> list[X509]:
    """
    Internal helper to convert a STACK_OF(X509) to a list of
    ``OpenSSL.crypto.X509`` instances, in stack order.
    """

    def _wrap(index: int) -> X509:
        raw = _lib.sk_X509_value(cert_stack, index)
        _openssl_assert(raw != _ffi.NULL)
        # Take a reference on the stack entry before wrapping it.
        refcount = _lib.X509_up_ref(raw)
        _openssl_assert(refcount >= 1)
        return X509._from_raw_x509_ptr(raw)

    return [_wrap(i) for i in range(_lib.sk_X509_num(cert_stack))]

2880 

@staticmethod
def _cert_stack_to_cryptography_list(
    cert_stack: Any,
) -> list[x509.Certificate]:
    """
    Internal helper to convert a STACK_OF(X509) to a list of
    ``cryptography.x509.Certificate`` instances, in stack order.
    """

    def _convert(index: int) -> x509.Certificate:
        raw = _lib.sk_X509_value(cert_stack, index)
        _openssl_assert(raw != _ffi.NULL)
        # Take a reference on the stack entry before wrapping it.
        refcount = _lib.X509_up_ref(raw)
        _openssl_assert(refcount >= 1)
        return X509._from_raw_x509_ptr(raw).to_cryptography()

    return [_convert(i) for i in range(_lib.sk_X509_num(cert_stack))]

2898 

@typing.overload
def get_peer_cert_chain(
    self, *, as_cryptography: typing.Literal[True]
) -> list[x509.Certificate] | None:
    pass

@typing.overload
def get_peer_cert_chain(
    self, *, as_cryptography: typing.Literal[False] = False
) -> list[X509] | None:
    pass

def get_peer_cert_chain(
    self,
    *,
    as_cryptography: typing.Literal[True] | typing.Literal[False] = False,
) -> list[X509] | list[x509.Certificate] | None:
    """
    Retrieve the other side's certificate chain (if any)

    :param bool as_cryptography: Controls whether a list of
        ``cryptography.x509.Certificate`` or ``OpenSSL.crypto.X509``
        object should be returned.

    :return: A list of certificates giving the peer's certificate chain,
        or None if it does not have one.
    """
    chain_stack = _lib.SSL_get_peer_cert_chain(self._ssl)
    if chain_stack == _ffi.NULL:
        return None

    convert = (
        self._cert_stack_to_cryptography_list
        if as_cryptography
        else self._cert_stack_to_list
    )
    return convert(chain_stack)

2933 

@typing.overload
def get_verified_chain(
    self, *, as_cryptography: typing.Literal[True]
) -> list[x509.Certificate] | None:
    pass

@typing.overload
def get_verified_chain(
    self, *, as_cryptography: typing.Literal[False] = False
) -> list[X509] | None:
    pass

def get_verified_chain(
    self,
    *,
    as_cryptography: typing.Literal[True] | typing.Literal[False] = False,
) -> list[X509] | list[x509.Certificate] | None:
    """
    Retrieve the verified certificate chain of the peer, including the
    peer's end entity certificate.

    It must be called after a session has been successfully established.
    If peer verification was not successful the chain may be incomplete,
    invalid, or None.

    :param bool as_cryptography: Controls whether a list of
        ``cryptography.x509.Certificate`` or ``OpenSSL.crypto.X509``
        object should be returned.

    :return: A list of certificates giving the peer's verified
        certificate chain, or None if it does not have one.

    .. versionadded:: 20.0
    """
    # OpenSSL 1.1+
    verified_stack = _lib.SSL_get0_verified_chain(self._ssl)
    if verified_stack == _ffi.NULL:
        return None

    convert = (
        self._cert_stack_to_cryptography_list
        if as_cryptography
        else self._cert_stack_to_list
    )
    return convert(verified_stack)

2974 

def want_read(self) -> bool:
    """
    Checks if more data has to be read from the transport layer to complete
    an operation.

    :return: True iff more data has to be read
    """
    # SSL_want_read comes back from the C layer as an integer; convert it
    # so the value matches the annotated ``bool`` return type.
    return bool(_lib.SSL_want_read(self._ssl))

2983 

def want_write(self) -> bool:
    """
    Checks if there is data to write to the transport layer to complete an
    operation.

    :return: True iff there is data to write
    """
    # SSL_want_write comes back from the C layer as an integer; convert it
    # so the value matches the annotated ``bool`` return type.
    return bool(_lib.SSL_want_write(self._ssl))

2992 

def set_accept_state(self) -> None:
    """
    Put the connection into server mode.

    The handshake will be handled automatically by read/write.

    :return: None
    """
    ssl = self._ssl
    _lib.SSL_set_accept_state(ssl)

3001 

def set_connect_state(self) -> None:
    """
    Put the connection into client mode.

    The handshake will be handled automatically by read/write.

    :return: None
    """
    ssl = self._ssl
    _lib.SSL_set_connect_state(ssl)

3010 

def get_session(self) -> Session | None:
    """
    Return the Session currently in use.

    :return: An instance of :class:`OpenSSL.SSL.Session` or
        :obj:`None` if no session exists.

    .. versionadded:: 0.14
    """
    raw_session = _lib.SSL_get1_session(self._ssl)
    if raw_session != _ffi.NULL:
        # Build the wrapper without running Session.__init__ and register
        # SSL_SESSION_free as the finalizer for the raw pointer.
        wrapped = Session.__new__(Session)
        wrapped._session = _ffi.gc(raw_session, _lib.SSL_SESSION_free)
        return wrapped
    return None

3027 

def set_session(self, session: Session) -> None:
    """
    Choose the session to use when the TLS/SSL connection is established.

    :param session: A Session instance representing the session to use.
    :returns: None
    :raise TypeError: if *session* is not a :class:`Session`.

    .. versionadded:: 0.14
    """
    if isinstance(session, Session):
        status = _lib.SSL_set_session(self._ssl, session._session)
        _openssl_assert(status == 1)
        return

    raise TypeError("session must be a Session instance")

3042 

def _get_finished_message(
    self, function: Callable[[Any, Any, int], int]
) -> bytes | None:
    """
    Shared implementation of :meth:`get_finished` and
    :meth:`get_peer_finished`.

    :param function: Either :data:`SSL_get_finished`: or
        :data:`SSL_get_peer_finished`.

    :return: :data:`None` if the desired message has not yet been
        received, otherwise the contents of the message.
    """
    ssl = self._ssl

    # Probe for the message size first. The OpenSSL documentation does
    # not say whether the output buffer may be NULL when the count is
    # zero, and the implementation calls memcpy() unconditionally.
    # Section 7.1.4, paragraph 1 of the C standard suggests that
    # memcpy(NULL, source, 0) is not guaranteed to be defined behavior
    # (though it probably is on just about every implementation...), so
    # hand in a tiny real buffer rather than NULL to stay safe.
    probe = _ffi.new("char[]", 0)
    needed = function(ssl, probe, 0)
    if needed == 0:
        # No Finished message so far.
        return None

    message = _no_zero_allocator("char[]", needed)
    function(ssl, message, needed)
    return _ffi.buffer(message, needed)[:]

3077 

def get_finished(self) -> bytes | None:
    """
    Obtain the latest TLS Finished message that we sent.

    Delegates to :meth:`_get_finished_message` with ``SSL_get_finished``.

    :return: The contents of the message or :obj:`None` if the TLS
        handshake has not yet completed.

    .. versionadded:: 0.15
    """
    fetch = _lib.SSL_get_finished
    return self._get_finished_message(fetch)

3088 

def get_peer_finished(self) -> bytes | None:
    """
    Obtain the latest TLS Finished message that we received from the peer.

    Delegates to :meth:`_get_finished_message` with
    ``SSL_get_peer_finished``.

    :return: The contents of the message or :obj:`None` if the TLS
        handshake has not yet completed.

    .. versionadded:: 0.15
    """
    fetch = _lib.SSL_get_peer_finished
    return self._get_finished_message(fetch)

3099 

def get_cipher_name(self) -> str | None:
    """
    Obtain the name of the currently used cipher.

    :returns: The name of the currently used cipher or :obj:`None`
        if no connection has been established.

    .. versionadded:: 0.15
    """
    current = _lib.SSL_get_current_cipher(self._ssl)
    if current == _ffi.NULL:
        return None

    raw_name = _ffi.string(_lib.SSL_CIPHER_get_name(current))
    return raw_name.decode("utf-8")

3115 

def get_cipher_bits(self) -> int | None:
    """
    Obtain the number of secret bits of the currently used cipher.

    :returns: The number of secret bits of the currently used cipher
        or :obj:`None` if no connection has been established.

    .. versionadded:: 0.15
    """
    current = _lib.SSL_get_current_cipher(self._ssl)
    if current == _ffi.NULL:
        return None

    # The second (output) argument is not needed here, so NULL is passed.
    return _lib.SSL_CIPHER_get_bits(current, _ffi.NULL)

3130 

def get_cipher_version(self) -> str | None:
    """
    Obtain the protocol version of the currently used cipher.

    :returns: The protocol name of the currently used cipher
        or :obj:`None` if no connection has been established.

    .. versionadded:: 0.15
    """
    current = _lib.SSL_get_current_cipher(self._ssl)
    if current == _ffi.NULL:
        return None

    raw_version = _ffi.string(_lib.SSL_CIPHER_get_version(current))
    return raw_version.decode("utf-8")

3146 

def get_protocol_version_name(self) -> str:
    """
    Retrieve the protocol version of the current connection as text.

    :returns: The TLS version of the current connection, for example
        the value for TLS 1.2 would be ``TLSv1.2`` or ``Unknown``
        for connections that were not successfully established.
    """
    raw_version = _lib.SSL_get_version(self._ssl)
    return _ffi.string(raw_version).decode("utf-8")

3157 

def get_protocol_version(self) -> int:
    """
    Retrieve the SSL or TLS protocol version of the current connection.

    :returns: The protocol version of the current connection, as the
        integer returned by ``SSL_version``.

    NOTE(review): this docstring previously gave ``0x769`` as the value
    for TLS version 1; decimal ``769`` (``0x0301``) looks like what was
    meant -- confirm against the OpenSSL headers.
    """
    return _lib.SSL_version(self._ssl)

3167 

def set_alpn_protos(self, protos: list[bytes]) -> None:
    """
    Specify the client's ALPN protocol list.

    These protocols are offered to the server during protocol negotiation.

    :param protos: A list of the protocols to be offered to the server.
        This list should be a Python list of bytestrings representing the
        protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``.
    :raise ValueError: if *protos* is empty.
    """
    # Different versions of OpenSSL are inconsistent about how they handle
    # empty proto lists (see #1043), so we avoid the problem entirely by
    # rejecting them ourselves.
    if not protos:
        raise ValueError("at least one protocol must be specified")

    # Serialise as a sequence of (one length byte, protocol bytes) pairs.
    pieces = []
    for proto in protos:
        pieces.append(bytes((len(proto),)))
        pieces.append(proto)
    protostr = b"".join(pieces)

    # Build a C string from the list. We don't need to save this off
    # because OpenSSL immediately copies the data out.
    input_str = _ffi.new("unsigned char[]", protostr)

    # https://www.openssl.org/docs/man1.1.0/man3/SSL_CTX_set_alpn_protos.html:
    #     SSL_CTX_set_alpn_protos() and SSL_set_alpn_protos()
    #     return 0 on success, and non-0 on failure.
    #     WARNING: these functions reverse the return value convention.
    status = _lib.SSL_set_alpn_protos(self._ssl, input_str, len(protostr))
    _openssl_assert(status == 0)

3201 

def get_alpn_proto_negotiated(self) -> bytes:
    """
    Get the protocol that was negotiated by ALPN.

    :returns: A bytestring of the protocol name.  If no protocol has been
        negotiated yet, returns an empty bytestring.
    """
    data = _ffi.new("unsigned char **")
    data_len = _ffi.new("unsigned int *")

    _lib.SSL_get0_alpn_selected(self._ssl, data, data_len)

    # ``data_len`` itself is a freshly allocated, non-NULL pointer, so
    # testing the pointer never detects "nothing negotiated"; the
    # length it points to is what has to be checked.
    if data_len[0] == 0:
        return b""

    return _ffi.buffer(data[0], data_len[0])[:]

3218 

def get_selected_srtp_profile(self) -> bytes:
    """
    Get the SRTP protocol which was negotiated.

    :returns: A bytestring of the SRTP profile name. If no profile has been
        negotiated yet, returns an empty bytestring.
    """
    selected = _lib.SSL_get_selected_srtp_profile(self._ssl)
    if selected:
        return _ffi.string(selected.name)
    return b""

3231 

@_requires_ssl_get0_group_name
def get_group_name(self) -> str | None:
    """
    Get the name of the negotiated group for the key exchange.

    :return: A string giving the group name, or :data:`None` when there
        is no session or no group name is reported.
    """
    ssl = self._ssl

    # Do not remove this guard.
    # SSL_get0_group_name crashes with a segfault if called without
    # an established connection (should return NULL but doesn't).
    if _lib.SSL_get_session(ssl) == _ffi.NULL:
        return None

    raw_name = _lib.SSL_get0_group_name(ssl)
    if raw_name == _ffi.NULL:
        return None

    return _ffi.string(raw_name).decode("utf-8")

3251 

def request_ocsp(self) -> None:
    """
    Request that the server sends stapled OCSP data, if available.

    If this is not called on the client side then the server will not
    send OCSP data. Should be used in conjunction with
    :meth:`Context.set_ocsp_client_callback`.
    """
    status_type = _lib.TLSEXT_STATUSTYPE_ocsp
    rc = _lib.SSL_set_tlsext_status_type(self._ssl, status_type)
    _openssl_assert(rc == 1)

3263 

3264 def set_info_callback( 

3265 self, callback: Callable[[Connection, int, int], None] 

3266 ) -> None: 

3267 """ 

3268 Set the information callback to *callback*. This function will be 

3269 called from time to time during SSL handshakes. 

3270 

3271 :param callback: The Python callback to use. This should take three 

3272 arguments: a Connection object and two integers. The first integer 

3273 specifies where in the SSL handshake the function was called, and 

3274 the other the return code from a (possibly failed) internal 

3275 function call. 

3276 :return: None 

3277 """ 

3278 

3279 @wraps(callback) 

3280 def wrapper(ssl, where, return_code): # type: ignore[no-untyped-def] 

3281 callback(Connection._reverse_mapping[ssl], where, return_code) 

3282 

3283 self._info_callback = _ffi.callback( 

3284 "void (*)(const SSL *, int, int)", wrapper 

3285 ) 

3286 _lib.SSL_set_info_callback(self._ssl, self._info_callback)