Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/OpenSSL/SSL.py: 35%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1244 statements  

1from __future__ import annotations 

2 

3import os 

4import socket 

5import sys 

6import typing 

7import warnings 

8from collections.abc import Sequence 

9from errno import errorcode 

10from functools import partial, wraps 

11from itertools import chain, count 

12from sys import platform 

13from typing import Any, Callable, Optional, TypeVar 

14from weakref import WeakValueDictionary 

15 

16from cryptography import x509 

17from cryptography.hazmat.primitives.asymmetric import ec 

18 

19from OpenSSL._util import ( 

20 StrOrBytesPath as _StrOrBytesPath, 

21) 

22from OpenSSL._util import ( 

23 exception_from_error_queue as _exception_from_error_queue, 

24) 

25from OpenSSL._util import ( 

26 ffi as _ffi, 

27) 

28from OpenSSL._util import ( 

29 lib as _lib, 

30) 

31from OpenSSL._util import ( 

32 make_assert as _make_assert, 

33) 

34from OpenSSL._util import ( 

35 no_zero_allocator as _no_zero_allocator, 

36) 

37from OpenSSL._util import ( 

38 path_bytes as _path_bytes, 

39) 

40from OpenSSL._util import ( 

41 text_to_bytes_and_warn as _text_to_bytes_and_warn, 

42) 

43from OpenSSL.crypto import ( 

44 FILETYPE_PEM, 

45 X509, 

46 PKey, 

47 X509Name, 

48 X509Store, 

49 _EllipticCurve, 

50 _PassphraseHelper, 

51 _PrivateKey, 

52) 

53 

54__all__ = [ 

55 "DTLS_CLIENT_METHOD", 

56 "DTLS_METHOD", 

57 "DTLS_SERVER_METHOD", 

58 "MODE_RELEASE_BUFFERS", 

59 "NO_OVERLAPPING_PROTOCOLS", 

60 "OPENSSL_BUILT_ON", 

61 "OPENSSL_CFLAGS", 

62 "OPENSSL_DIR", 

63 "OPENSSL_PLATFORM", 

64 "OPENSSL_VERSION", 

65 "OPENSSL_VERSION_NUMBER", 

66 "OP_ALL", 

67 "OP_CIPHER_SERVER_PREFERENCE", 

68 "OP_DONT_INSERT_EMPTY_FRAGMENTS", 

69 "OP_EPHEMERAL_RSA", 

70 "OP_MICROSOFT_BIG_SSLV3_BUFFER", 

71 "OP_MICROSOFT_SESS_ID_BUG", 

72 "OP_MSIE_SSLV2_RSA_PADDING", 

73 "OP_NETSCAPE_CA_DN_BUG", 

74 "OP_NETSCAPE_CHALLENGE_BUG", 

75 "OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG", 

76 "OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG", 

77 "OP_NO_COMPRESSION", 

78 "OP_NO_QUERY_MTU", 

79 "OP_NO_TICKET", 

80 "OP_PKCS1_CHECK_1", 

81 "OP_PKCS1_CHECK_2", 

82 "OP_SINGLE_DH_USE", 

83 "OP_SINGLE_ECDH_USE", 

84 "OP_SSLEAY_080_CLIENT_DH_BUG", 

85 "OP_SSLREF2_REUSE_CERT_TYPE_BUG", 

86 "OP_TLS_BLOCK_PADDING_BUG", 

87 "OP_TLS_D5_BUG", 

88 "OP_TLS_ROLLBACK_BUG", 

89 "RECEIVED_SHUTDOWN", 

90 "SENT_SHUTDOWN", 

91 "SESS_CACHE_BOTH", 

92 "SESS_CACHE_CLIENT", 

93 "SESS_CACHE_NO_AUTO_CLEAR", 

94 "SESS_CACHE_NO_INTERNAL", 

95 "SESS_CACHE_NO_INTERNAL_LOOKUP", 

96 "SESS_CACHE_NO_INTERNAL_STORE", 

97 "SESS_CACHE_OFF", 

98 "SESS_CACHE_SERVER", 

99 "SSL3_VERSION", 

100 "SSLEAY_BUILT_ON", 

101 "SSLEAY_CFLAGS", 

102 "SSLEAY_DIR", 

103 "SSLEAY_PLATFORM", 

104 "SSLEAY_VERSION", 

105 "SSL_CB_ACCEPT_EXIT", 

106 "SSL_CB_ACCEPT_LOOP", 

107 "SSL_CB_ALERT", 

108 "SSL_CB_CONNECT_EXIT", 

109 "SSL_CB_CONNECT_LOOP", 

110 "SSL_CB_EXIT", 

111 "SSL_CB_HANDSHAKE_DONE", 

112 "SSL_CB_HANDSHAKE_START", 

113 "SSL_CB_LOOP", 

114 "SSL_CB_READ", 

115 "SSL_CB_READ_ALERT", 

116 "SSL_CB_WRITE", 

117 "SSL_CB_WRITE_ALERT", 

118 "SSL_ST_ACCEPT", 

119 "SSL_ST_CONNECT", 

120 "SSL_ST_MASK", 

121 "TLS1_1_VERSION", 

122 "TLS1_2_VERSION", 

123 "TLS1_3_VERSION", 

124 "TLS1_VERSION", 

125 "TLS_CLIENT_METHOD", 

126 "TLS_METHOD", 

127 "TLS_SERVER_METHOD", 

128 "VERIFY_CLIENT_ONCE", 

129 "VERIFY_FAIL_IF_NO_PEER_CERT", 

130 "VERIFY_NONE", 

131 "VERIFY_PEER", 

132 "Connection", 

133 "Context", 

134 "Error", 

135 "OP_NO_SSLv2", 

136 "OP_NO_SSLv3", 

137 "OP_NO_TLSv1", 

138 "OP_NO_TLSv1_1", 

139 "OP_NO_TLSv1_2", 

140 "OP_NO_TLSv1_3", 

141 "SSLeay_version", 

142 "SSLv23_METHOD", 

143 "Session", 

144 "SysCallError", 

145 "TLSv1_1_METHOD", 

146 "TLSv1_2_METHOD", 

147 "TLSv1_METHOD", 

148 "WantReadError", 

149 "WantWriteError", 

150 "WantX509LookupError", 

151 "X509VerificationCodes", 

152 "ZeroReturnError", 

153] 

154 

155 

156OPENSSL_VERSION_NUMBER: int = _lib.OPENSSL_VERSION_NUMBER 

157OPENSSL_VERSION: int = _lib.OPENSSL_VERSION 

158OPENSSL_CFLAGS: int = _lib.OPENSSL_CFLAGS 

159OPENSSL_PLATFORM: int = _lib.OPENSSL_PLATFORM 

160OPENSSL_DIR: int = _lib.OPENSSL_DIR 

161OPENSSL_BUILT_ON: int = _lib.OPENSSL_BUILT_ON 

162 

163SSLEAY_VERSION = OPENSSL_VERSION 

164SSLEAY_CFLAGS = OPENSSL_CFLAGS 

165SSLEAY_PLATFORM = OPENSSL_PLATFORM 

166SSLEAY_DIR = OPENSSL_DIR 

167SSLEAY_BUILT_ON = OPENSSL_BUILT_ON 

168 

169SENT_SHUTDOWN = _lib.SSL_SENT_SHUTDOWN 

170RECEIVED_SHUTDOWN = _lib.SSL_RECEIVED_SHUTDOWN 

171 

172SSLv23_METHOD = 3 

173TLSv1_METHOD = 4 

174TLSv1_1_METHOD = 5 

175TLSv1_2_METHOD = 6 

176TLS_METHOD = 7 

177TLS_SERVER_METHOD = 8 

178TLS_CLIENT_METHOD = 9 

179DTLS_METHOD = 10 

180DTLS_SERVER_METHOD = 11 

181DTLS_CLIENT_METHOD = 12 

182 

183SSL3_VERSION: int = _lib.SSL3_VERSION 

184TLS1_VERSION: int = _lib.TLS1_VERSION 

185TLS1_1_VERSION: int = _lib.TLS1_1_VERSION 

186TLS1_2_VERSION: int = _lib.TLS1_2_VERSION 

187TLS1_3_VERSION: int = _lib.TLS1_3_VERSION 

188 

189OP_NO_SSLv2: int = _lib.SSL_OP_NO_SSLv2 

190OP_NO_SSLv3: int = _lib.SSL_OP_NO_SSLv3 

191OP_NO_TLSv1: int = _lib.SSL_OP_NO_TLSv1 

192OP_NO_TLSv1_1: int = _lib.SSL_OP_NO_TLSv1_1 

193OP_NO_TLSv1_2: int = _lib.SSL_OP_NO_TLSv1_2 

194OP_NO_TLSv1_3: int = _lib.SSL_OP_NO_TLSv1_3 

195 

196MODE_RELEASE_BUFFERS: int = _lib.SSL_MODE_RELEASE_BUFFERS 

197 

198OP_SINGLE_DH_USE: int = _lib.SSL_OP_SINGLE_DH_USE 

199OP_SINGLE_ECDH_USE: int = _lib.SSL_OP_SINGLE_ECDH_USE 

200OP_EPHEMERAL_RSA: int = _lib.SSL_OP_EPHEMERAL_RSA 

201OP_MICROSOFT_SESS_ID_BUG: int = _lib.SSL_OP_MICROSOFT_SESS_ID_BUG 

202OP_NETSCAPE_CHALLENGE_BUG: int = _lib.SSL_OP_NETSCAPE_CHALLENGE_BUG 

203OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG: int = ( 

204 _lib.SSL_OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG 

205) 

206OP_SSLREF2_REUSE_CERT_TYPE_BUG: int = _lib.SSL_OP_SSLREF2_REUSE_CERT_TYPE_BUG 

207OP_MICROSOFT_BIG_SSLV3_BUFFER: int = _lib.SSL_OP_MICROSOFT_BIG_SSLV3_BUFFER 

208OP_MSIE_SSLV2_RSA_PADDING: int = _lib.SSL_OP_MSIE_SSLV2_RSA_PADDING 

209OP_SSLEAY_080_CLIENT_DH_BUG: int = _lib.SSL_OP_SSLEAY_080_CLIENT_DH_BUG 

210OP_TLS_D5_BUG: int = _lib.SSL_OP_TLS_D5_BUG 

211OP_TLS_BLOCK_PADDING_BUG: int = _lib.SSL_OP_TLS_BLOCK_PADDING_BUG 

212OP_DONT_INSERT_EMPTY_FRAGMENTS: int = _lib.SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS 

213OP_CIPHER_SERVER_PREFERENCE: int = _lib.SSL_OP_CIPHER_SERVER_PREFERENCE 

214OP_TLS_ROLLBACK_BUG: int = _lib.SSL_OP_TLS_ROLLBACK_BUG 

215OP_PKCS1_CHECK_1 = _lib.SSL_OP_PKCS1_CHECK_1 

216OP_PKCS1_CHECK_2: int = _lib.SSL_OP_PKCS1_CHECK_2 

217OP_NETSCAPE_CA_DN_BUG: int = _lib.SSL_OP_NETSCAPE_CA_DN_BUG 

218OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG: int = ( 

219 _lib.SSL_OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG 

220) 

221OP_NO_COMPRESSION: int = _lib.SSL_OP_NO_COMPRESSION 

222 

223OP_NO_QUERY_MTU: int = _lib.SSL_OP_NO_QUERY_MTU 

224try: 

225 OP_COOKIE_EXCHANGE: int | None = _lib.SSL_OP_COOKIE_EXCHANGE 

226 __all__.append("OP_COOKIE_EXCHANGE") 

227except AttributeError: 

228 OP_COOKIE_EXCHANGE = None 

229OP_NO_TICKET: int = _lib.SSL_OP_NO_TICKET 

230 

231try: 

232 OP_NO_RENEGOTIATION: int = _lib.SSL_OP_NO_RENEGOTIATION 

233 __all__.append("OP_NO_RENEGOTIATION") 

234except AttributeError: 

235 pass 

236 

237try: 

238 OP_IGNORE_UNEXPECTED_EOF: int = _lib.SSL_OP_IGNORE_UNEXPECTED_EOF 

239 __all__.append("OP_IGNORE_UNEXPECTED_EOF") 

240except AttributeError: 

241 pass 

242 

243try: 

244 OP_LEGACY_SERVER_CONNECT: int = _lib.SSL_OP_LEGACY_SERVER_CONNECT 

245 __all__.append("OP_LEGACY_SERVER_CONNECT") 

246except AttributeError: 

247 pass 

248 

249OP_ALL: int = _lib.SSL_OP_ALL 

250 

251VERIFY_PEER: int = _lib.SSL_VERIFY_PEER 

252VERIFY_FAIL_IF_NO_PEER_CERT: int = _lib.SSL_VERIFY_FAIL_IF_NO_PEER_CERT 

253VERIFY_CLIENT_ONCE: int = _lib.SSL_VERIFY_CLIENT_ONCE 

254VERIFY_NONE: int = _lib.SSL_VERIFY_NONE 

255 

256SESS_CACHE_OFF: int = _lib.SSL_SESS_CACHE_OFF 

257SESS_CACHE_CLIENT: int = _lib.SSL_SESS_CACHE_CLIENT 

258SESS_CACHE_SERVER: int = _lib.SSL_SESS_CACHE_SERVER 

259SESS_CACHE_BOTH: int = _lib.SSL_SESS_CACHE_BOTH 

260SESS_CACHE_NO_AUTO_CLEAR: int = _lib.SSL_SESS_CACHE_NO_AUTO_CLEAR 

261SESS_CACHE_NO_INTERNAL_LOOKUP: int = _lib.SSL_SESS_CACHE_NO_INTERNAL_LOOKUP 

262SESS_CACHE_NO_INTERNAL_STORE: int = _lib.SSL_SESS_CACHE_NO_INTERNAL_STORE 

263SESS_CACHE_NO_INTERNAL: int = _lib.SSL_SESS_CACHE_NO_INTERNAL 

264 

265SSL_ST_CONNECT: int = _lib.SSL_ST_CONNECT 

266SSL_ST_ACCEPT: int = _lib.SSL_ST_ACCEPT 

267SSL_ST_MASK: int = _lib.SSL_ST_MASK 

268 

269SSL_CB_LOOP: int = _lib.SSL_CB_LOOP 

270SSL_CB_EXIT: int = _lib.SSL_CB_EXIT 

271SSL_CB_READ: int = _lib.SSL_CB_READ 

272SSL_CB_WRITE: int = _lib.SSL_CB_WRITE 

273SSL_CB_ALERT: int = _lib.SSL_CB_ALERT 

274SSL_CB_READ_ALERT: int = _lib.SSL_CB_READ_ALERT 

275SSL_CB_WRITE_ALERT: int = _lib.SSL_CB_WRITE_ALERT 

276SSL_CB_ACCEPT_LOOP: int = _lib.SSL_CB_ACCEPT_LOOP 

277SSL_CB_ACCEPT_EXIT: int = _lib.SSL_CB_ACCEPT_EXIT 

278SSL_CB_CONNECT_LOOP: int = _lib.SSL_CB_CONNECT_LOOP 

279SSL_CB_CONNECT_EXIT: int = _lib.SSL_CB_CONNECT_EXIT 

280SSL_CB_HANDSHAKE_START: int = _lib.SSL_CB_HANDSHAKE_START 

281SSL_CB_HANDSHAKE_DONE: int = _lib.SSL_CB_HANDSHAKE_DONE 

282 

283_Buffer = typing.Union[bytes, bytearray, memoryview] 

284_T = TypeVar("_T") 

285 

286 

287class _NoOverlappingProtocols: 

288 pass 

289 

290 

291NO_OVERLAPPING_PROTOCOLS = _NoOverlappingProtocols() 

292 

293# Callback types. 

294_ALPNSelectCallback = Callable[ 

295 [ 

296 "Connection", 

297 typing.List[bytes], 

298 ], 

299 typing.Union[bytes, _NoOverlappingProtocols], 

300] 

301_CookieGenerateCallback = Callable[["Connection"], bytes] 

302_CookieVerifyCallback = Callable[["Connection", bytes], bool] 

303_OCSPClientCallback = Callable[["Connection", bytes, Optional[_T]], bool] 

304_OCSPServerCallback = Callable[["Connection", Optional[_T]], bytes] 

305_PassphraseCallback = Callable[[int, bool, Optional[_T]], bytes] 

306_VerifyCallback = Callable[["Connection", X509, int, int, int], bool] 

307 

308 

309class X509VerificationCodes: 

310 """ 

311 Success and error codes for X509 verification, as returned by the 

312 underlying ``X509_STORE_CTX_get_error()`` function and passed by pyOpenSSL 

313 to verification callback functions. 

314 

315 See `OpenSSL Verification Errors 

316 <https://www.openssl.org/docs/manmaster/man3/X509_verify_cert_error_string.html#ERROR-CODES>`_ 

317 for details. 

318 """ 

319 

320 OK = _lib.X509_V_OK 

321 ERR_UNABLE_TO_GET_ISSUER_CERT = _lib.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT 

322 ERR_UNABLE_TO_GET_CRL = _lib.X509_V_ERR_UNABLE_TO_GET_CRL 

323 ERR_UNABLE_TO_DECRYPT_CERT_SIGNATURE = ( 

324 _lib.X509_V_ERR_UNABLE_TO_DECRYPT_CERT_SIGNATURE 

325 ) 

326 ERR_UNABLE_TO_DECRYPT_CRL_SIGNATURE = ( 

327 _lib.X509_V_ERR_UNABLE_TO_DECRYPT_CRL_SIGNATURE 

328 ) 

329 ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY = ( 

330 _lib.X509_V_ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY 

331 ) 

332 ERR_CERT_SIGNATURE_FAILURE = _lib.X509_V_ERR_CERT_SIGNATURE_FAILURE 

333 ERR_CRL_SIGNATURE_FAILURE = _lib.X509_V_ERR_CRL_SIGNATURE_FAILURE 

334 ERR_CERT_NOT_YET_VALID = _lib.X509_V_ERR_CERT_NOT_YET_VALID 

335 ERR_CERT_HAS_EXPIRED = _lib.X509_V_ERR_CERT_HAS_EXPIRED 

336 ERR_CRL_NOT_YET_VALID = _lib.X509_V_ERR_CRL_NOT_YET_VALID 

337 ERR_CRL_HAS_EXPIRED = _lib.X509_V_ERR_CRL_HAS_EXPIRED 

338 ERR_ERROR_IN_CERT_NOT_BEFORE_FIELD = ( 

339 _lib.X509_V_ERR_ERROR_IN_CERT_NOT_BEFORE_FIELD 

340 ) 

341 ERR_ERROR_IN_CERT_NOT_AFTER_FIELD = ( 

342 _lib.X509_V_ERR_ERROR_IN_CERT_NOT_AFTER_FIELD 

343 ) 

344 ERR_ERROR_IN_CRL_LAST_UPDATE_FIELD = ( 

345 _lib.X509_V_ERR_ERROR_IN_CRL_LAST_UPDATE_FIELD 

346 ) 

347 ERR_ERROR_IN_CRL_NEXT_UPDATE_FIELD = ( 

348 _lib.X509_V_ERR_ERROR_IN_CRL_NEXT_UPDATE_FIELD 

349 ) 

350 ERR_OUT_OF_MEM = _lib.X509_V_ERR_OUT_OF_MEM 

351 ERR_DEPTH_ZERO_SELF_SIGNED_CERT = ( 

352 _lib.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT 

353 ) 

354 ERR_SELF_SIGNED_CERT_IN_CHAIN = _lib.X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN 

355 ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY = ( 

356 _lib.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY 

357 ) 

358 ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE = ( 

359 _lib.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE 

360 ) 

361 ERR_CERT_CHAIN_TOO_LONG = _lib.X509_V_ERR_CERT_CHAIN_TOO_LONG 

362 ERR_CERT_REVOKED = _lib.X509_V_ERR_CERT_REVOKED 

363 ERR_INVALID_CA = _lib.X509_V_ERR_INVALID_CA 

364 ERR_PATH_LENGTH_EXCEEDED = _lib.X509_V_ERR_PATH_LENGTH_EXCEEDED 

365 ERR_INVALID_PURPOSE = _lib.X509_V_ERR_INVALID_PURPOSE 

366 ERR_CERT_UNTRUSTED = _lib.X509_V_ERR_CERT_UNTRUSTED 

367 ERR_CERT_REJECTED = _lib.X509_V_ERR_CERT_REJECTED 

368 ERR_SUBJECT_ISSUER_MISMATCH = _lib.X509_V_ERR_SUBJECT_ISSUER_MISMATCH 

369 ERR_AKID_SKID_MISMATCH = _lib.X509_V_ERR_AKID_SKID_MISMATCH 

370 ERR_AKID_ISSUER_SERIAL_MISMATCH = ( 

371 _lib.X509_V_ERR_AKID_ISSUER_SERIAL_MISMATCH 

372 ) 

373 ERR_KEYUSAGE_NO_CERTSIGN = _lib.X509_V_ERR_KEYUSAGE_NO_CERTSIGN 

374 ERR_UNABLE_TO_GET_CRL_ISSUER = _lib.X509_V_ERR_UNABLE_TO_GET_CRL_ISSUER 

375 ERR_UNHANDLED_CRITICAL_EXTENSION = ( 

376 _lib.X509_V_ERR_UNHANDLED_CRITICAL_EXTENSION 

377 ) 

378 ERR_KEYUSAGE_NO_CRL_SIGN = _lib.X509_V_ERR_KEYUSAGE_NO_CRL_SIGN 

379 ERR_UNHANDLED_CRITICAL_CRL_EXTENSION = ( 

380 _lib.X509_V_ERR_UNHANDLED_CRITICAL_CRL_EXTENSION 

381 ) 

382 ERR_INVALID_NON_CA = _lib.X509_V_ERR_INVALID_NON_CA 

383 ERR_PROXY_PATH_LENGTH_EXCEEDED = _lib.X509_V_ERR_PROXY_PATH_LENGTH_EXCEEDED 

384 ERR_KEYUSAGE_NO_DIGITAL_SIGNATURE = ( 

385 _lib.X509_V_ERR_KEYUSAGE_NO_DIGITAL_SIGNATURE 

386 ) 

387 ERR_PROXY_CERTIFICATES_NOT_ALLOWED = ( 

388 _lib.X509_V_ERR_PROXY_CERTIFICATES_NOT_ALLOWED 

389 ) 

390 ERR_INVALID_EXTENSION = _lib.X509_V_ERR_INVALID_EXTENSION 

391 ERR_INVALID_POLICY_EXTENSION = _lib.X509_V_ERR_INVALID_POLICY_EXTENSION 

392 ERR_NO_EXPLICIT_POLICY = _lib.X509_V_ERR_NO_EXPLICIT_POLICY 

393 ERR_DIFFERENT_CRL_SCOPE = _lib.X509_V_ERR_DIFFERENT_CRL_SCOPE 

394 ERR_UNSUPPORTED_EXTENSION_FEATURE = ( 

395 _lib.X509_V_ERR_UNSUPPORTED_EXTENSION_FEATURE 

396 ) 

397 ERR_UNNESTED_RESOURCE = _lib.X509_V_ERR_UNNESTED_RESOURCE 

398 ERR_PERMITTED_VIOLATION = _lib.X509_V_ERR_PERMITTED_VIOLATION 

399 ERR_EXCLUDED_VIOLATION = _lib.X509_V_ERR_EXCLUDED_VIOLATION 

400 ERR_SUBTREE_MINMAX = _lib.X509_V_ERR_SUBTREE_MINMAX 

401 ERR_UNSUPPORTED_CONSTRAINT_TYPE = ( 

402 _lib.X509_V_ERR_UNSUPPORTED_CONSTRAINT_TYPE 

403 ) 

404 ERR_UNSUPPORTED_CONSTRAINT_SYNTAX = ( 

405 _lib.X509_V_ERR_UNSUPPORTED_CONSTRAINT_SYNTAX 

406 ) 

407 ERR_UNSUPPORTED_NAME_SYNTAX = _lib.X509_V_ERR_UNSUPPORTED_NAME_SYNTAX 

408 ERR_CRL_PATH_VALIDATION_ERROR = _lib.X509_V_ERR_CRL_PATH_VALIDATION_ERROR 

409 ERR_HOSTNAME_MISMATCH = _lib.X509_V_ERR_HOSTNAME_MISMATCH 

410 ERR_EMAIL_MISMATCH = _lib.X509_V_ERR_EMAIL_MISMATCH 

411 ERR_IP_ADDRESS_MISMATCH = _lib.X509_V_ERR_IP_ADDRESS_MISMATCH 

412 ERR_APPLICATION_VERIFICATION = _lib.X509_V_ERR_APPLICATION_VERIFICATION 

413 

414 

415# Taken from https://golang.org/src/crypto/x509/root_linux.go 

416_CERTIFICATE_FILE_LOCATIONS = [ 

417 "/etc/ssl/certs/ca-certificates.crt", # Debian/Ubuntu/Gentoo etc. 

418 "/etc/pki/tls/certs/ca-bundle.crt", # Fedora/RHEL 6 

419 "/etc/ssl/ca-bundle.pem", # OpenSUSE 

420 "/etc/pki/tls/cacert.pem", # OpenELEC 

421 "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem", # CentOS/RHEL 7 

422] 

423 

424_CERTIFICATE_PATH_LOCATIONS = [ 

425 "/etc/ssl/certs", # SLES10/SLES11 

426] 

427 

428# These values are compared to output from cffi's ffi.string so they must be 

429# byte strings. 

430_CRYPTOGRAPHY_MANYLINUX_CA_DIR = b"/opt/pyca/cryptography/openssl/certs" 

431_CRYPTOGRAPHY_MANYLINUX_CA_FILE = b"/opt/pyca/cryptography/openssl/cert.pem" 

432 

433 

434class Error(Exception): 

435 """ 

436 An error occurred in an `OpenSSL.SSL` API. 

437 """ 

438 

439 

440_raise_current_error = partial(_exception_from_error_queue, Error) 

441_openssl_assert = _make_assert(Error) 

442 

443 

444class WantReadError(Error): 

445 pass 

446 

447 

448class WantWriteError(Error): 

449 pass 

450 

451 

452class WantX509LookupError(Error): 

453 pass 

454 

455 

456class ZeroReturnError(Error): 

457 pass 

458 

459 

460class SysCallError(Error): 

461 pass 

462 

463 

464class _CallbackExceptionHelper: 

465 """ 

466 A base class for wrapper classes that allow for intelligent exception 

467 handling in OpenSSL callbacks. 

468 

469 :ivar list _problems: Any exceptions that occurred while executing in a 

470 context where they could not be raised in the normal way. Typically 

471 this is because OpenSSL has called into some Python code and requires a 

472 return value. The exceptions are saved to be raised later when it is 

473 possible to do so. 

474 """ 

475 

476 def __init__(self) -> None: 

477 self._problems: list[Exception] = [] 

478 

479 def raise_if_problem(self) -> None: 

480 """ 

481 Raise an exception from the OpenSSL error queue or that was previously 

482 captured whe running a callback. 

483 """ 

484 if self._problems: 

485 try: 

486 _raise_current_error() 

487 except Error: 

488 pass 

489 raise self._problems.pop(0) 

490 

491 

492class _VerifyHelper(_CallbackExceptionHelper): 

493 """ 

494 Wrap a callback such that it can be used as a certificate verification 

495 callback. 

496 """ 

497 

498 def __init__(self, callback: _VerifyCallback) -> None: 

499 _CallbackExceptionHelper.__init__(self) 

500 

501 @wraps(callback) 

502 def wrapper(ok, store_ctx): # type: ignore[no-untyped-def] 

503 x509 = _lib.X509_STORE_CTX_get_current_cert(store_ctx) 

504 _lib.X509_up_ref(x509) 

505 cert = X509._from_raw_x509_ptr(x509) 

506 error_number = _lib.X509_STORE_CTX_get_error(store_ctx) 

507 error_depth = _lib.X509_STORE_CTX_get_error_depth(store_ctx) 

508 

509 index = _lib.SSL_get_ex_data_X509_STORE_CTX_idx() 

510 ssl = _lib.X509_STORE_CTX_get_ex_data(store_ctx, index) 

511 connection = Connection._reverse_mapping[ssl] 

512 

513 try: 

514 result = callback( 

515 connection, cert, error_number, error_depth, ok 

516 ) 

517 except Exception as e: 

518 self._problems.append(e) 

519 return 0 

520 else: 

521 if result: 

522 _lib.X509_STORE_CTX_set_error(store_ctx, _lib.X509_V_OK) 

523 return 1 

524 else: 

525 return 0 

526 

527 self.callback = _ffi.callback( 

528 "int (*)(int, X509_STORE_CTX *)", wrapper 

529 ) 

530 

531 

532class _ALPNSelectHelper(_CallbackExceptionHelper): 

533 """ 

534 Wrap a callback such that it can be used as an ALPN selection callback. 

535 """ 

536 

537 def __init__(self, callback: _ALPNSelectCallback) -> None: 

538 _CallbackExceptionHelper.__init__(self) 

539 

540 @wraps(callback) 

541 def wrapper(ssl, out, outlen, in_, inlen, arg): # type: ignore[no-untyped-def] 

542 try: 

543 conn = Connection._reverse_mapping[ssl] 

544 

545 # The string passed to us is made up of multiple 

546 # length-prefixed bytestrings. We need to split that into a 

547 # list. 

548 instr = _ffi.buffer(in_, inlen)[:] 

549 protolist = [] 

550 while instr: 

551 encoded_len = instr[0] 

552 proto = instr[1 : encoded_len + 1] 

553 protolist.append(proto) 

554 instr = instr[encoded_len + 1 :] 

555 

556 # Call the callback 

557 outbytes = callback(conn, protolist) 

558 any_accepted = True 

559 if outbytes is NO_OVERLAPPING_PROTOCOLS: 

560 outbytes = b"" 

561 any_accepted = False 

562 elif not isinstance(outbytes, bytes): 

563 raise TypeError( 

564 "ALPN callback must return a bytestring or the " 

565 "special NO_OVERLAPPING_PROTOCOLS sentinel value." 

566 ) 

567 

568 # Save our callback arguments on the connection object to make 

569 # sure that they don't get freed before OpenSSL can use them. 

570 # Then, return them in the appropriate output parameters. 

571 conn._alpn_select_callback_args = [ 

572 _ffi.new("unsigned char *", len(outbytes)), 

573 _ffi.new("unsigned char[]", outbytes), 

574 ] 

575 outlen[0] = conn._alpn_select_callback_args[0][0] 

576 out[0] = conn._alpn_select_callback_args[1] 

577 if not any_accepted: 

578 return _lib.SSL_TLSEXT_ERR_NOACK 

579 return _lib.SSL_TLSEXT_ERR_OK 

580 except Exception as e: 

581 self._problems.append(e) 

582 return _lib.SSL_TLSEXT_ERR_ALERT_FATAL 

583 

584 self.callback = _ffi.callback( 

585 ( 

586 "int (*)(SSL *, unsigned char **, unsigned char *, " 

587 "const unsigned char *, unsigned int, void *)" 

588 ), 

589 wrapper, 

590 ) 

591 

592 

593class _OCSPServerCallbackHelper(_CallbackExceptionHelper): 

594 """ 

595 Wrap a callback such that it can be used as an OCSP callback for the server 

596 side. 

597 

598 Annoyingly, OpenSSL defines one OCSP callback but uses it in two different 

599 ways. For servers, that callback is expected to retrieve some OCSP data and 

600 hand it to OpenSSL, and may return only SSL_TLSEXT_ERR_OK, 

601 SSL_TLSEXT_ERR_FATAL, and SSL_TLSEXT_ERR_NOACK. For clients, that callback 

602 is expected to check the OCSP data, and returns a negative value on error, 

603 0 if the response is not acceptable, or positive if it is. These are 

604 mutually exclusive return code behaviours, and they mean that we need two 

605 helpers so that we always return an appropriate error code if the user's 

606 code throws an exception. 

607 

608 Given that we have to have two helpers anyway, these helpers are a bit more 

609 helpery than most: specifically, they hide a few more of the OpenSSL 

610 functions so that the user has an easier time writing these callbacks. 

611 

612 This helper implements the server side. 

613 """ 

614 

615 def __init__(self, callback: _OCSPServerCallback[Any]) -> None: 

616 _CallbackExceptionHelper.__init__(self) 

617 

618 @wraps(callback) 

619 def wrapper(ssl, cdata): # type: ignore[no-untyped-def] 

620 try: 

621 conn = Connection._reverse_mapping[ssl] 

622 

623 # Extract the data if any was provided. 

624 if cdata != _ffi.NULL: 

625 data = _ffi.from_handle(cdata) 

626 else: 

627 data = None 

628 

629 # Call the callback. 

630 ocsp_data = callback(conn, data) 

631 

632 if not isinstance(ocsp_data, bytes): 

633 raise TypeError("OCSP callback must return a bytestring.") 

634 

635 # If the OCSP data was provided, we will pass it to OpenSSL. 

636 # However, we have an early exit here: if no OCSP data was 

637 # provided we will just exit out and tell OpenSSL that there 

638 # is nothing to do. 

639 if not ocsp_data: 

640 return 3 # SSL_TLSEXT_ERR_NOACK 

641 

642 # OpenSSL takes ownership of this data and expects it to have 

643 # been allocated by OPENSSL_malloc. 

644 ocsp_data_length = len(ocsp_data) 

645 data_ptr = _lib.OPENSSL_malloc(ocsp_data_length) 

646 _ffi.buffer(data_ptr, ocsp_data_length)[:] = ocsp_data 

647 

648 _lib.SSL_set_tlsext_status_ocsp_resp( 

649 ssl, data_ptr, ocsp_data_length 

650 ) 

651 

652 return 0 

653 except Exception as e: 

654 self._problems.append(e) 

655 return 2 # SSL_TLSEXT_ERR_ALERT_FATAL 

656 

657 self.callback = _ffi.callback("int (*)(SSL *, void *)", wrapper) 

658 

659 

660class _OCSPClientCallbackHelper(_CallbackExceptionHelper): 

661 """ 

662 Wrap a callback such that it can be used as an OCSP callback for the client 

663 side. 

664 

665 Annoyingly, OpenSSL defines one OCSP callback but uses it in two different 

666 ways. For servers, that callback is expected to retrieve some OCSP data and 

667 hand it to OpenSSL, and may return only SSL_TLSEXT_ERR_OK, 

668 SSL_TLSEXT_ERR_FATAL, and SSL_TLSEXT_ERR_NOACK. For clients, that callback 

669 is expected to check the OCSP data, and returns a negative value on error, 

670 0 if the response is not acceptable, or positive if it is. These are 

671 mutually exclusive return code behaviours, and they mean that we need two 

672 helpers so that we always return an appropriate error code if the user's 

673 code throws an exception. 

674 

675 Given that we have to have two helpers anyway, these helpers are a bit more 

676 helpery than most: specifically, they hide a few more of the OpenSSL 

677 functions so that the user has an easier time writing these callbacks. 

678 

679 This helper implements the client side. 

680 """ 

681 

682 def __init__(self, callback: _OCSPClientCallback[Any]) -> None: 

683 _CallbackExceptionHelper.__init__(self) 

684 

685 @wraps(callback) 

686 def wrapper(ssl, cdata): # type: ignore[no-untyped-def] 

687 try: 

688 conn = Connection._reverse_mapping[ssl] 

689 

690 # Extract the data if any was provided. 

691 if cdata != _ffi.NULL: 

692 data = _ffi.from_handle(cdata) 

693 else: 

694 data = None 

695 

696 # Get the OCSP data. 

697 ocsp_ptr = _ffi.new("unsigned char **") 

698 ocsp_len = _lib.SSL_get_tlsext_status_ocsp_resp(ssl, ocsp_ptr) 

699 if ocsp_len < 0: 

700 # No OCSP data. 

701 ocsp_data = b"" 

702 else: 

703 # Copy the OCSP data, then pass it to the callback. 

704 ocsp_data = _ffi.buffer(ocsp_ptr[0], ocsp_len)[:] 

705 

706 valid = callback(conn, ocsp_data, data) 

707 

708 # Return 1 on success or 0 on error. 

709 return int(bool(valid)) 

710 

711 except Exception as e: 

712 self._problems.append(e) 

713 # Return negative value if an exception is hit. 

714 return -1 

715 

716 self.callback = _ffi.callback("int (*)(SSL *, void *)", wrapper) 

717 

718 

719class _CookieGenerateCallbackHelper(_CallbackExceptionHelper): 

720 def __init__(self, callback: _CookieGenerateCallback) -> None: 

721 _CallbackExceptionHelper.__init__(self) 

722 

723 max_cookie_len = getattr(_lib, "DTLS1_COOKIE_LENGTH", 255) 

724 

725 @wraps(callback) 

726 def wrapper(ssl, out, outlen): # type: ignore[no-untyped-def] 

727 try: 

728 conn = Connection._reverse_mapping[ssl] 

729 cookie = callback(conn) 

730 if len(cookie) > max_cookie_len: 

731 raise ValueError( 

732 f"Cookie too long (got {len(cookie)} bytes, " 

733 f"max {max_cookie_len})" 

734 ) 

735 out[0 : len(cookie)] = cookie 

736 outlen[0] = len(cookie) 

737 return 1 

738 except Exception as e: 

739 self._problems.append(e) 

740 # "a zero return value can be used to abort the handshake" 

741 return 0 

742 

743 self.callback = _ffi.callback( 

744 "int (*)(SSL *, unsigned char *, unsigned int *)", 

745 wrapper, 

746 ) 

747 

748 

749class _CookieVerifyCallbackHelper(_CallbackExceptionHelper): 

750 def __init__(self, callback: _CookieVerifyCallback) -> None: 

751 _CallbackExceptionHelper.__init__(self) 

752 

753 @wraps(callback) 

754 def wrapper(ssl, c_cookie, cookie_len): # type: ignore[no-untyped-def] 

755 try: 

756 conn = Connection._reverse_mapping[ssl] 

757 return callback(conn, bytes(c_cookie[0:cookie_len])) 

758 except Exception as e: 

759 self._problems.append(e) 

760 return 0 

761 

762 self.callback = _ffi.callback( 

763 "int (*)(SSL *, unsigned char *, unsigned int)", 

764 wrapper, 

765 ) 

766 

767 

768def _asFileDescriptor(obj: Any) -> int: 

769 fd = None 

770 if not isinstance(obj, int): 

771 meth = getattr(obj, "fileno", None) 

772 if meth is not None: 

773 obj = meth() 

774 

775 if isinstance(obj, int): 

776 fd = obj 

777 

778 if not isinstance(fd, int): 

779 raise TypeError("argument must be an int, or have a fileno() method.") 

780 elif fd < 0: 

781 raise ValueError( 

782 f"file descriptor cannot be a negative integer ({fd:i})" 

783 ) 

784 

785 return fd 

786 

787 

788def OpenSSL_version(type: int) -> bytes: 

789 """ 

790 Return a string describing the version of OpenSSL in use. 

791 

792 :param type: One of the :const:`OPENSSL_` constants defined in this module. 

793 """ 

794 return _ffi.string(_lib.OpenSSL_version(type)) 

795 

796 

797SSLeay_version = OpenSSL_version 

798 

799 

800def _make_requires(flag: int, error: str) -> Callable[[_T], _T]: 

801 """ 

802 Builds a decorator that ensures that functions that rely on OpenSSL 

803 functions that are not present in this build raise NotImplementedError, 

804 rather than AttributeError coming out of cryptography. 

805 

806 :param flag: A cryptography flag that guards the functions, e.g. 

807 ``Cryptography_HAS_NEXTPROTONEG``. 

808 :param error: The string to be used in the exception if the flag is false. 

809 """ 

810 

811 def _requires_decorator(func): # type: ignore[no-untyped-def] 

812 if not flag: 

813 

814 @wraps(func) 

815 def explode(*args, **kwargs): # type: ignore[no-untyped-def] 

816 raise NotImplementedError(error) 

817 

818 return explode 

819 else: 

820 return func 

821 

822 return _requires_decorator 

823 

824 

825_requires_keylog = _make_requires( 

826 getattr(_lib, "Cryptography_HAS_KEYLOG", 0), "Key logging not available" 

827) 

828 

829_requires_ssl_get0_group_name = _make_requires( 

830 getattr(_lib, "Cryptography_HAS_SSL_GET0_GROUP_NAME", 0), 

831 "Getting group name is not supported by the linked OpenSSL version", 

832) 

833 

834_requires_ssl_cookie = _make_requires( 

835 getattr(_lib, "Cryptography_HAS_SSL_COOKIE", 0), 

836 "DTLS cookie support is not available", 

837) 

838 

839 

840class Session: 

841 """ 

842 A class representing an SSL session. A session defines certain connection 

843 parameters which may be re-used to speed up the setup of subsequent 

844 connections. 

845 

846 .. versionadded:: 0.14 

847 """ 

848 

849 _session: Any 

850 

851 

852F = TypeVar("F", bound=Callable[..., Any]) 

853 

854 

855def _require_not_used(f: F) -> F: 

856 @wraps(f) 

857 def inner(self: Context, *args: Any, **kwargs: Any) -> Any: 

858 if self._used: 

859 raise ValueError( 

860 "Context has already been used to create a Connection, it " 

861 "cannot be mutated again" 

862 ) 

863 return f(self, *args, **kwargs) 

864 

865 return typing.cast(F, inner) 

866 

867 

868class Context: 

869 """ 

870 :class:`OpenSSL.SSL.Context` instances define the parameters for setting 

871 up new SSL connections. 

872 

873 :param method: One of TLS_METHOD, TLS_CLIENT_METHOD, TLS_SERVER_METHOD, 

874 DTLS_METHOD, DTLS_CLIENT_METHOD, or DTLS_SERVER_METHOD. 

875 SSLv23_METHOD, TLSv1_METHOD, etc. are deprecated and should 

876 not be used. 

877 """ 

878 

879 _methods: typing.ClassVar[ 

880 dict[int, tuple[Callable[[], Any], int | None]] 

881 ] = { 

882 SSLv23_METHOD: (_lib.TLS_method, None), 

883 TLSv1_METHOD: (_lib.TLS_method, TLS1_VERSION), 

884 TLSv1_1_METHOD: (_lib.TLS_method, TLS1_1_VERSION), 

885 TLSv1_2_METHOD: (_lib.TLS_method, TLS1_2_VERSION), 

886 TLS_METHOD: (_lib.TLS_method, None), 

887 TLS_SERVER_METHOD: (_lib.TLS_server_method, None), 

888 TLS_CLIENT_METHOD: (_lib.TLS_client_method, None), 

889 DTLS_METHOD: (_lib.DTLS_method, None), 

890 DTLS_SERVER_METHOD: (_lib.DTLS_server_method, None), 

891 DTLS_CLIENT_METHOD: (_lib.DTLS_client_method, None), 

892 } 

893 

894 def __init__(self, method: int) -> None: 

895 if not isinstance(method, int): 

896 raise TypeError("method must be an integer") 

897 

898 try: 

899 method_func, version = self._methods[method] 

900 except KeyError: 

901 raise ValueError("No such protocol") 

902 

903 method_obj = method_func() 

904 _openssl_assert(method_obj != _ffi.NULL) 

905 

906 context = _lib.SSL_CTX_new(method_obj) 

907 _openssl_assert(context != _ffi.NULL) 

908 context = _ffi.gc(context, _lib.SSL_CTX_free) 

909 

910 self._context = context 

911 self._used = False 

912 self._passphrase_helper: _PassphraseHelper | None = None 

913 self._passphrase_callback: _PassphraseCallback[Any] | None = None 

914 self._passphrase_userdata: Any | None = None 

915 self._verify_helper: _VerifyHelper | None = None 

916 self._verify_callback: _VerifyCallback | None = None 

917 self._info_callback = None 

918 self._keylog_callback = None 

919 self._tlsext_servername_callback = None 

920 self._app_data = None 

921 self._alpn_select_helper: _ALPNSelectHelper | None = None 

922 self._alpn_select_callback: _ALPNSelectCallback | None = None 

923 self._ocsp_helper: ( 

924 _OCSPClientCallbackHelper | _OCSPServerCallbackHelper | None 

925 ) = None 

926 self._ocsp_callback: ( 

927 _OCSPClientCallback[Any] | _OCSPServerCallback[Any] | None 

928 ) = None 

929 self._ocsp_data: Any | None = None 

930 self._cookie_generate_helper: _CookieGenerateCallbackHelper | None = ( 

931 None 

932 ) 

933 self._cookie_verify_helper: _CookieVerifyCallbackHelper | None = None 

934 

935 self.set_mode( 

936 _lib.SSL_MODE_ENABLE_PARTIAL_WRITE 

937 | _lib.SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER 

938 ) 

939 if version is not None: 

940 self.set_min_proto_version(version) 

941 self.set_max_proto_version(version) 

942 

943 @_require_not_used 

944 def set_min_proto_version(self, version: int) -> None: 

945 """ 

946 Set the minimum supported protocol version. Setting the minimum 

947 version to 0 will enable protocol versions down to the lowest version 

948 supported by the library. 

949 

950 If the underlying OpenSSL build is missing support for the selected 

951 version, this method will raise an exception. 

952 """ 

953 _openssl_assert( 

954 _lib.SSL_CTX_set_min_proto_version(self._context, version) == 1 

955 ) 

956 

957 @_require_not_used 

958 def set_max_proto_version(self, version: int) -> None: 

959 """ 

960 Set the maximum supported protocol version. Setting the maximum 

961 version to 0 will enable protocol versions up to the highest version 

962 supported by the library. 

963 

964 If the underlying OpenSSL build is missing support for the selected 

965 version, this method will raise an exception. 

966 """ 

967 _openssl_assert( 

968 _lib.SSL_CTX_set_max_proto_version(self._context, version) == 1 

969 ) 

970 

971 @_require_not_used 

972 def load_verify_locations( 

973 self, 

974 cafile: _StrOrBytesPath | None, 

975 capath: _StrOrBytesPath | None = None, 

976 ) -> None: 

977 """ 

978 Let SSL know where we can find trusted certificates for the certificate 

979 chain. Note that the certificates have to be in PEM format. 

980 

981 If capath is passed, it must be a directory prepared using the 

982 ``c_rehash`` tool included with OpenSSL. Either, but not both, of 

983 *pemfile* or *capath* may be :data:`None`. 

984 

985 :param cafile: In which file we can find the certificates (``bytes`` or 

986 ``str``). 

987 :param capath: In which directory we can find the certificates 

988 (``bytes`` or ``str``). 

989 

990 :return: None 

991 """ 

992 if cafile is None: 

993 cafile = _ffi.NULL 

994 else: 

995 cafile = _path_bytes(cafile) 

996 

997 if capath is None: 

998 capath = _ffi.NULL 

999 else: 

1000 capath = _path_bytes(capath) 

1001 

1002 load_result = _lib.SSL_CTX_load_verify_locations( 

1003 self._context, cafile, capath 

1004 ) 

1005 if not load_result: 

1006 _raise_current_error() 

1007 

1008 def _wrap_callback( 

1009 self, callback: _PassphraseCallback[_T] 

1010 ) -> _PassphraseHelper: 

1011 @wraps(callback) 

1012 def wrapper(size: int, verify: bool, userdata: Any) -> bytes: 

1013 return callback(size, verify, self._passphrase_userdata) 

1014 

1015 return _PassphraseHelper( 

1016 FILETYPE_PEM, wrapper, more_args=True, truncate=True 

1017 ) 

1018 

1019 @_require_not_used 

1020 def set_passwd_cb( 

1021 self, 

1022 callback: _PassphraseCallback[_T], 

1023 userdata: _T | None = None, 

1024 ) -> None: 

1025 """ 

1026 Set the passphrase callback. This function will be called 

1027 when a private key with a passphrase is loaded. 

1028 

1029 :param callback: The Python callback to use. This must accept three 

1030 positional arguments. First, an integer giving the maximum length 

1031 of the passphrase it may return. If the returned passphrase is 

1032 longer than this, it will be truncated. Second, a boolean value 

1033 which will be true if the user should be prompted for the 

1034 passphrase twice and the callback should verify that the two values 

1035 supplied are equal. Third, the value given as the *userdata* 

1036 parameter to :meth:`set_passwd_cb`. The *callback* must return 

1037 a byte string. If an error occurs, *callback* should return a false 

1038 value (e.g. an empty string). 

1039 :param userdata: (optional) A Python object which will be given as 

1040 argument to the callback 

1041 :return: None 

1042 """ 

1043 if not callable(callback): 

1044 raise TypeError("callback must be callable") 

1045 

1046 self._passphrase_helper = self._wrap_callback(callback) 

1047 self._passphrase_callback = self._passphrase_helper.callback 

1048 _lib.SSL_CTX_set_default_passwd_cb( 

1049 self._context, self._passphrase_callback 

1050 ) 

1051 self._passphrase_userdata = userdata 

1052 

1053 @_require_not_used 

1054 def set_default_verify_paths(self) -> None: 

1055 """ 

1056 Specify that the platform provided CA certificates are to be used for 

1057 verification purposes. This method has some caveats related to the 

1058 binary wheels that cryptography (pyOpenSSL's primary dependency) ships: 

1059 

1060 * macOS will only load certificates using this method if the user has 

1061 the ``openssl@3`` `Homebrew <https://brew.sh>`_ formula installed 

1062 in the default location. 

1063 * Windows will not work. 

1064 * manylinux cryptography wheels will work on most common Linux 

1065 distributions in pyOpenSSL 17.1.0 and above. pyOpenSSL detects the 

1066 manylinux wheel and attempts to load roots via a fallback path. 

1067 

1068 :return: None 

1069 """ 

1070 # SSL_CTX_set_default_verify_paths will attempt to load certs from 

1071 # both a cafile and capath that are set at compile time. However, 

1072 # it will first check environment variables and, if present, load 

1073 # those paths instead 

1074 set_result = _lib.SSL_CTX_set_default_verify_paths(self._context) 

1075 _openssl_assert(set_result == 1) 

1076 # After attempting to set default_verify_paths we need to know whether 

1077 # to go down the fallback path. 

1078 # First we'll check to see if any env vars have been set. If so, 

1079 # we won't try to do anything else because the user has set the path 

1080 # themselves. 

1081 if not self._check_env_vars_set("SSL_CERT_DIR", "SSL_CERT_FILE"): 

1082 default_dir = _ffi.string(_lib.X509_get_default_cert_dir()) 

1083 default_file = _ffi.string(_lib.X509_get_default_cert_file()) 

1084 # Now we check to see if the default_dir and default_file are set 

1085 # to the exact values we use in our manylinux builds. If they are 

1086 # then we know to load the fallbacks 

1087 if ( 

1088 default_dir == _CRYPTOGRAPHY_MANYLINUX_CA_DIR 

1089 and default_file == _CRYPTOGRAPHY_MANYLINUX_CA_FILE 

1090 ): 

1091 # This is manylinux, let's load our fallback paths 

1092 self._fallback_default_verify_paths( 

1093 _CERTIFICATE_FILE_LOCATIONS, _CERTIFICATE_PATH_LOCATIONS 

1094 ) 

1095 

1096 def _check_env_vars_set(self, dir_env_var: str, file_env_var: str) -> bool: 

1097 """ 

1098 Check to see if the default cert dir/file environment vars are present. 

1099 

1100 :return: bool 

1101 """ 

1102 return ( 

1103 os.environ.get(file_env_var) is not None 

1104 or os.environ.get(dir_env_var) is not None 

1105 ) 

1106 

1107 def _fallback_default_verify_paths( 

1108 self, file_path: list[str], dir_path: list[str] 

1109 ) -> None: 

1110 """ 

1111 Default verify paths are based on the compiled version of OpenSSL. 

1112 However, when pyca/cryptography is compiled as a manylinux wheel 

1113 that compiled location can potentially be wrong. So, like Go, we 

1114 will try a predefined set of paths and attempt to load roots 

1115 from there. 

1116 

1117 :return: None 

1118 """ 

1119 for cafile in file_path: 

1120 if os.path.isfile(cafile): 

1121 self.load_verify_locations(cafile) 

1122 break 

1123 

1124 for capath in dir_path: 

1125 if os.path.isdir(capath): 

1126 self.load_verify_locations(None, capath) 

1127 break 

1128 

1129 @_require_not_used 

1130 def use_certificate_chain_file(self, certfile: _StrOrBytesPath) -> None: 

1131 """ 

1132 Load a certificate chain from a file. 

1133 

1134 :param certfile: The name of the certificate chain file (``bytes`` or 

1135 ``str``). Must be PEM encoded. 

1136 

1137 :return: None 

1138 """ 

1139 certfile = _path_bytes(certfile) 

1140 

1141 result = _lib.SSL_CTX_use_certificate_chain_file( 

1142 self._context, certfile 

1143 ) 

1144 if not result: 

1145 _raise_current_error() 

1146 

1147 @_require_not_used 

1148 def use_certificate_file( 

1149 self, certfile: _StrOrBytesPath, filetype: int = FILETYPE_PEM 

1150 ) -> None: 

1151 """ 

1152 Load a certificate from a file 

1153 

1154 :param certfile: The name of the certificate file (``bytes`` or 

1155 ``str``). 

1156 :param filetype: (optional) The encoding of the file, which is either 

1157 :const:`FILETYPE_PEM` or :const:`FILETYPE_ASN1`. The default is 

1158 :const:`FILETYPE_PEM`. 

1159 

1160 :return: None 

1161 """ 

1162 certfile = _path_bytes(certfile) 

1163 if not isinstance(filetype, int): 

1164 raise TypeError("filetype must be an integer") 

1165 

1166 use_result = _lib.SSL_CTX_use_certificate_file( 

1167 self._context, certfile, filetype 

1168 ) 

1169 if not use_result: 

1170 _raise_current_error() 

1171 

1172 @_require_not_used 

1173 def use_certificate(self, cert: X509 | x509.Certificate) -> None: 

1174 """ 

1175 Load a certificate from a X509 object 

1176 

1177 :param cert: The X509 object 

1178 :return: None 

1179 """ 

1180 # Mirrored at Connection.use_certificate 

1181 if not isinstance(cert, X509): 

1182 cert = X509.from_cryptography(cert) 

1183 else: 

1184 warnings.warn( 

1185 ( 

1186 "Passing pyOpenSSL X509 objects is deprecated. You " 

1187 "should use a cryptography.x509.Certificate instead." 

1188 ), 

1189 DeprecationWarning, 

1190 stacklevel=2, 

1191 ) 

1192 

1193 use_result = _lib.SSL_CTX_use_certificate(self._context, cert._x509) 

1194 if not use_result: 

1195 _raise_current_error() 

1196 

1197 @_require_not_used 

1198 def add_extra_chain_cert(self, certobj: X509 | x509.Certificate) -> None: 

1199 """ 

1200 Add certificate to chain 

1201 

1202 :param certobj: The X509 certificate object to add to the chain 

1203 :return: None 

1204 """ 

1205 if not isinstance(certobj, X509): 

1206 certobj = X509.from_cryptography(certobj) 

1207 else: 

1208 warnings.warn( 

1209 ( 

1210 "Passing pyOpenSSL X509 objects is deprecated. You " 

1211 "should use a cryptography.x509.Certificate instead." 

1212 ), 

1213 DeprecationWarning, 

1214 stacklevel=2, 

1215 ) 

1216 

1217 copy = _lib.X509_dup(certobj._x509) 

1218 add_result = _lib.SSL_CTX_add_extra_chain_cert(self._context, copy) 

1219 if not add_result: 

1220 # TODO: This is untested. 

1221 _lib.X509_free(copy) 

1222 _raise_current_error() 

1223 

1224 def _raise_passphrase_exception(self) -> None: 

1225 if self._passphrase_helper is not None: 

1226 self._passphrase_helper.raise_if_problem(Error) 

1227 

1228 _raise_current_error() 

1229 

1230 @_require_not_used 

1231 def use_privatekey_file( 

1232 self, keyfile: _StrOrBytesPath, filetype: int = FILETYPE_PEM 

1233 ) -> None: 

1234 """ 

1235 Load a private key from a file 

1236 

1237 :param keyfile: The name of the key file (``bytes`` or ``str``) 

1238 :param filetype: (optional) The encoding of the file, which is either 

1239 :const:`FILETYPE_PEM` or :const:`FILETYPE_ASN1`. The default is 

1240 :const:`FILETYPE_PEM`. 

1241 

1242 :return: None 

1243 """ 

1244 keyfile = _path_bytes(keyfile) 

1245 

1246 if not isinstance(filetype, int): 

1247 raise TypeError("filetype must be an integer") 

1248 

1249 use_result = _lib.SSL_CTX_use_PrivateKey_file( 

1250 self._context, keyfile, filetype 

1251 ) 

1252 if not use_result: 

1253 self._raise_passphrase_exception() 

1254 

1255 @_require_not_used 

1256 def use_privatekey(self, pkey: _PrivateKey | PKey) -> None: 

1257 """ 

1258 Load a private key from a PKey object 

1259 

1260 :param pkey: The PKey object 

1261 :return: None 

1262 """ 

1263 # Mirrored at Connection.use_privatekey 

1264 if not isinstance(pkey, PKey): 

1265 pkey = PKey.from_cryptography_key(pkey) 

1266 else: 

1267 warnings.warn( 

1268 ( 

1269 "Passing pyOpenSSL PKey objects is deprecated. You " 

1270 "should use a cryptography private key instead." 

1271 ), 

1272 DeprecationWarning, 

1273 stacklevel=2, 

1274 ) 

1275 

1276 use_result = _lib.SSL_CTX_use_PrivateKey(self._context, pkey._pkey) 

1277 if not use_result: 

1278 self._raise_passphrase_exception() 

1279 

1280 def check_privatekey(self) -> None: 

1281 """ 

1282 Check if the private key (loaded with :meth:`use_privatekey`) matches 

1283 the certificate (loaded with :meth:`use_certificate`) 

1284 

1285 :return: :data:`None` (raises :exc:`Error` if something's wrong) 

1286 """ 

1287 if not _lib.SSL_CTX_check_private_key(self._context): 

1288 _raise_current_error() 

1289 

1290 @_require_not_used 

1291 def load_client_ca(self, cafile: bytes) -> None: 

1292 """ 

1293 Load the trusted certificates that will be sent to the client. Does 

1294 not actually imply any of the certificates are trusted; that must be 

1295 configured separately. 

1296 

1297 :param bytes cafile: The path to a certificates file in PEM format. 

1298 :return: None 

1299 """ 

1300 ca_list = _lib.SSL_load_client_CA_file( 

1301 _text_to_bytes_and_warn("cafile", cafile) 

1302 ) 

1303 _openssl_assert(ca_list != _ffi.NULL) 

1304 _lib.SSL_CTX_set_client_CA_list(self._context, ca_list) 

1305 

1306 @_require_not_used 

1307 def set_session_id(self, buf: bytes) -> None: 

1308 """ 

1309 Set the session id to *buf* within which a session can be reused for 

1310 this Context object. This is needed when doing session resumption, 

1311 because there is no way for a stored session to know which Context 

1312 object it is associated with. 

1313 

1314 :param bytes buf: The session id. 

1315 

1316 :returns: None 

1317 """ 

1318 buf = _text_to_bytes_and_warn("buf", buf) 

1319 _openssl_assert( 

1320 _lib.SSL_CTX_set_session_id_context(self._context, buf, len(buf)) 

1321 == 1 

1322 ) 

1323 

1324 @_require_not_used 

1325 def set_session_cache_mode(self, mode: int) -> int: 

1326 """ 

1327 Set the behavior of the session cache used by all connections using 

1328 this Context. The previously set mode is returned. See 

1329 :const:`SESS_CACHE_*` for details about particular modes. 

1330 

1331 :param mode: One or more of the SESS_CACHE_* flags (combine using 

1332 bitwise or) 

1333 :returns: The previously set caching mode. 

1334 

1335 .. versionadded:: 0.14 

1336 """ 

1337 if not isinstance(mode, int): 

1338 raise TypeError("mode must be an integer") 

1339 

1340 return _lib.SSL_CTX_set_session_cache_mode(self._context, mode) 

1341 

1342 def get_session_cache_mode(self) -> int: 

1343 """ 

1344 Get the current session cache mode. 

1345 

1346 :returns: The currently used cache mode. 

1347 

1348 .. versionadded:: 0.14 

1349 """ 

1350 return _lib.SSL_CTX_get_session_cache_mode(self._context) 

1351 

1352 @_require_not_used 

1353 def set_verify( 

1354 self, mode: int, callback: _VerifyCallback | None = None 

1355 ) -> None: 

1356 """ 

1357 Set the verification flags for this Context object to *mode* and 

1358 specify that *callback* should be used for verification callbacks. 

1359 

1360 :param mode: The verify mode, this should be one of 

1361 :const:`VERIFY_NONE` and :const:`VERIFY_PEER`. If 

1362 :const:`VERIFY_PEER` is used, *mode* can be OR:ed with 

1363 :const:`VERIFY_FAIL_IF_NO_PEER_CERT` and 

1364 :const:`VERIFY_CLIENT_ONCE` to further control the behaviour. 

1365 :param callback: The optional Python verification callback to use. 

1366 This should take five arguments: A Connection object, an X509 

1367 object, and three integer variables, which are in turn potential 

1368 error number, error depth and return code. *callback* should 

1369 return True if verification passes and False otherwise. 

1370 If omitted, OpenSSL's default verification is used. 

1371 :return: None 

1372 

1373 See SSL_CTX_set_verify(3SSL) for further details. 

1374 """ 

1375 if not isinstance(mode, int): 

1376 raise TypeError("mode must be an integer") 

1377 

1378 if callback is None: 

1379 self._verify_helper = None 

1380 self._verify_callback = None 

1381 _lib.SSL_CTX_set_verify(self._context, mode, _ffi.NULL) 

1382 else: 

1383 if not callable(callback): 

1384 raise TypeError("callback must be callable") 

1385 

1386 self._verify_helper = _VerifyHelper(callback) 

1387 self._verify_callback = self._verify_helper.callback 

1388 _lib.SSL_CTX_set_verify(self._context, mode, self._verify_callback) 

1389 

1390 @_require_not_used 

1391 def set_verify_depth(self, depth: int) -> None: 

1392 """ 

1393 Set the maximum depth for the certificate chain verification that shall 

1394 be allowed for this Context object. 

1395 

1396 :param depth: An integer specifying the verify depth 

1397 :return: None 

1398 """ 

1399 if not isinstance(depth, int): 

1400 raise TypeError("depth must be an integer") 

1401 

1402 _lib.SSL_CTX_set_verify_depth(self._context, depth) 

1403 

1404 def get_verify_mode(self) -> int: 

1405 """ 

1406 Retrieve the Context object's verify mode, as set by 

1407 :meth:`set_verify`. 

1408 

1409 :return: The verify mode 

1410 """ 

1411 return _lib.SSL_CTX_get_verify_mode(self._context) 

1412 

1413 def get_verify_depth(self) -> int: 

1414 """ 

1415 Retrieve the Context object's verify depth, as set by 

1416 :meth:`set_verify_depth`. 

1417 

1418 :return: The verify depth 

1419 """ 

1420 return _lib.SSL_CTX_get_verify_depth(self._context) 

1421 

1422 @_require_not_used 

1423 def load_tmp_dh(self, dhfile: _StrOrBytesPath) -> None: 

1424 """ 

1425 Load parameters for Ephemeral Diffie-Hellman 

1426 

1427 :param dhfile: The file to load EDH parameters from (``bytes`` or 

1428 ``str``). 

1429 

1430 :return: None 

1431 """ 

1432 dhfile = _path_bytes(dhfile) 

1433 

1434 bio = _lib.BIO_new_file(dhfile, b"r") 

1435 if bio == _ffi.NULL: 

1436 _raise_current_error() 

1437 bio = _ffi.gc(bio, _lib.BIO_free) 

1438 

1439 dh = _lib.PEM_read_bio_DHparams(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL) 

1440 dh = _ffi.gc(dh, _lib.DH_free) 

1441 res = _lib.SSL_CTX_set_tmp_dh(self._context, dh) 

1442 _openssl_assert(res == 1) 

1443 

1444 @_require_not_used 

1445 def set_tmp_ecdh(self, curve: _EllipticCurve | ec.EllipticCurve) -> None: 

1446 """ 

1447 Select a curve to use for ECDHE key exchange. 

1448 

1449 :param curve: A curve instance from cryptography 

1450 (:class:`~cryptogragraphy.hazmat.primitives.asymmetric.ec.EllipticCurve`). 

1451 Alternatively (deprecated) a curve object from either 

1452 :meth:`OpenSSL.crypto.get_elliptic_curve` or 

1453 :meth:`OpenSSL.crypto.get_elliptic_curves`. 

1454 

1455 :return: None 

1456 """ 

1457 

1458 if isinstance(curve, _EllipticCurve): 

1459 warnings.warn( 

1460 ( 

1461 "Passing pyOpenSSL elliptic curves to set_tmp_ecdh is " 

1462 "deprecated. You should use cryptography's elliptic curve " 

1463 "types instead." 

1464 ), 

1465 DeprecationWarning, 

1466 stacklevel=2, 

1467 ) 

1468 _lib.SSL_CTX_set_tmp_ecdh(self._context, curve._to_EC_KEY()) 

1469 else: 

1470 name = curve.name 

1471 if name == "secp192r1": 

1472 name = "prime192v1" 

1473 elif name == "secp256r1": 

1474 name = "prime256v1" 

1475 nid = _lib.OBJ_txt2nid(name.encode()) 

1476 if nid == _lib.NID_undef: 

1477 _raise_current_error() 

1478 

1479 ec = _lib.EC_KEY_new_by_curve_name(nid) 

1480 _openssl_assert(ec != _ffi.NULL) 

1481 ec = _ffi.gc(ec, _lib.EC_KEY_free) 

1482 _lib.SSL_CTX_set_tmp_ecdh(self._context, ec) 

1483 

1484 @_require_not_used 

1485 def set_cipher_list(self, cipher_list: bytes) -> None: 

1486 """ 

1487 Set the list of ciphers to be used in this context. 

1488 

1489 See the OpenSSL manual for more information (e.g. 

1490 :manpage:`ciphers(1)`). 

1491 

1492 Note this API does not change the cipher suites used in TLS 1.3 

1493 Use `set_tls13_ciphersuites` for that. 

1494 

1495 :param bytes cipher_list: An OpenSSL cipher string. 

1496 :return: None 

1497 """ 

1498 cipher_list = _text_to_bytes_and_warn("cipher_list", cipher_list) 

1499 

1500 if not isinstance(cipher_list, bytes): 

1501 raise TypeError("cipher_list must be a byte string.") 

1502 

1503 _openssl_assert( 

1504 _lib.SSL_CTX_set_cipher_list(self._context, cipher_list) == 1 

1505 ) 

1506 

1507 @_require_not_used 

1508 def set_tls13_ciphersuites(self, ciphersuites: bytes) -> None: 

1509 """ 

1510 Set the list of TLS 1.3 ciphers to be used in this context. 

1511 OpenSSL maintains a separate list of TLS 1.3+ ciphers to 

1512 ciphers for TLS 1.2 and lowers. 

1513 

1514 See the OpenSSL manual for more information (e.g. 

1515 :manpage:`ciphers(1)`). 

1516 

1517 :param bytes ciphersuites: An OpenSSL cipher string containing 

1518 TLS 1.3+ ciphersuites. 

1519 :return: None 

1520 

1521 .. versionadded:: 25.2.0 

1522 """ 

1523 if not isinstance(ciphersuites, bytes): 

1524 raise TypeError("ciphersuites must be a byte string.") 

1525 

1526 _openssl_assert( 

1527 _lib.SSL_CTX_set_ciphersuites(self._context, ciphersuites) == 1 

1528 ) 

1529 

1530 @_require_not_used 

1531 def set_client_ca_list( 

1532 self, certificate_authorities: Sequence[X509Name] 

1533 ) -> None: 

1534 """ 

1535 Set the list of preferred client certificate signers for this server 

1536 context. 

1537 

1538 This list of certificate authorities will be sent to the client when 

1539 the server requests a client certificate. 

1540 

1541 :param certificate_authorities: a sequence of X509Names. 

1542 :return: None 

1543 

1544 .. versionadded:: 0.10 

1545 """ 

1546 name_stack = _lib.sk_X509_NAME_new_null() 

1547 _openssl_assert(name_stack != _ffi.NULL) 

1548 

1549 try: 

1550 for ca_name in certificate_authorities: 

1551 if not isinstance(ca_name, X509Name): 

1552 raise TypeError( 

1553 f"client CAs must be X509Name objects, not " 

1554 f"{type(ca_name).__name__} objects" 

1555 ) 

1556 copy = _lib.X509_NAME_dup(ca_name._name) 

1557 _openssl_assert(copy != _ffi.NULL) 

1558 push_result = _lib.sk_X509_NAME_push(name_stack, copy) 

1559 if not push_result: 

1560 _lib.X509_NAME_free(copy) 

1561 _raise_current_error() 

1562 except Exception: 

1563 _lib.sk_X509_NAME_free(name_stack) 

1564 raise 

1565 

1566 _lib.SSL_CTX_set_client_CA_list(self._context, name_stack) 

1567 

1568 @_require_not_used 

1569 def add_client_ca( 

1570 self, certificate_authority: X509 | x509.Certificate 

1571 ) -> None: 

1572 """ 

1573 Add the CA certificate to the list of preferred signers for this 

1574 context. 

1575 

1576 The list of certificate authorities will be sent to the client when the 

1577 server requests a client certificate. 

1578 

1579 :param certificate_authority: certificate authority's X509 certificate. 

1580 :return: None 

1581 

1582 .. versionadded:: 0.10 

1583 """ 

1584 if not isinstance(certificate_authority, X509): 

1585 certificate_authority = X509.from_cryptography( 

1586 certificate_authority 

1587 ) 

1588 else: 

1589 warnings.warn( 

1590 ( 

1591 "Passing pyOpenSSL X509 objects is deprecated. You " 

1592 "should use a cryptography.x509.Certificate instead." 

1593 ), 

1594 DeprecationWarning, 

1595 stacklevel=2, 

1596 ) 

1597 

1598 add_result = _lib.SSL_CTX_add_client_CA( 

1599 self._context, certificate_authority._x509 

1600 ) 

1601 _openssl_assert(add_result == 1) 

1602 

1603 @_require_not_used 

1604 def set_timeout(self, timeout: int) -> None: 

1605 """ 

1606 Set the timeout for newly created sessions for this Context object to 

1607 *timeout*. The default value is 300 seconds. See the OpenSSL manual 

1608 for more information (e.g. :manpage:`SSL_CTX_set_timeout(3)`). 

1609 

1610 :param timeout: The timeout in (whole) seconds 

1611 :return: The previous session timeout 

1612 """ 

1613 if not isinstance(timeout, int): 

1614 raise TypeError("timeout must be an integer") 

1615 

1616 return _lib.SSL_CTX_set_timeout(self._context, timeout) 

1617 

1618 def get_timeout(self) -> int: 

1619 """ 

1620 Retrieve session timeout, as set by :meth:`set_timeout`. The default 

1621 is 300 seconds. 

1622 

1623 :return: The session timeout 

1624 """ 

1625 return _lib.SSL_CTX_get_timeout(self._context) 

1626 

1627 @_require_not_used 

1628 def set_info_callback( 

1629 self, callback: Callable[[Connection, int, int], None] 

1630 ) -> None: 

1631 """ 

1632 Set the information callback to *callback*. This function will be 

1633 called from time to time during SSL handshakes. 

1634 

1635 :param callback: The Python callback to use. This should take three 

1636 arguments: a Connection object and two integers. The first integer 

1637 specifies where in the SSL handshake the function was called, and 

1638 the other the return code from a (possibly failed) internal 

1639 function call. 

1640 :return: None 

1641 """ 

1642 

1643 @wraps(callback) 

1644 def wrapper(ssl, where, return_code): # type: ignore[no-untyped-def] 

1645 callback(Connection._reverse_mapping[ssl], where, return_code) 

1646 

1647 self._info_callback = _ffi.callback( 

1648 "void (*)(const SSL *, int, int)", wrapper 

1649 ) 

1650 _lib.SSL_CTX_set_info_callback(self._context, self._info_callback) 

1651 

1652 @_requires_keylog 

1653 @_require_not_used 

1654 def set_keylog_callback( 

1655 self, callback: Callable[[Connection, bytes], None] 

1656 ) -> None: 

1657 """ 

1658 Set the TLS key logging callback to *callback*. This function will be 

1659 called whenever TLS key material is generated or received, in order 

1660 to allow applications to store this keying material for debugging 

1661 purposes. 

1662 

1663 :param callback: The Python callback to use. This should take two 

1664 arguments: a Connection object and a bytestring that contains 

1665 the key material in the format used by NSS for its SSLKEYLOGFILE 

1666 debugging output. 

1667 :return: None 

1668 """ 

1669 

1670 @wraps(callback) 

1671 def wrapper(ssl, line): # type: ignore[no-untyped-def] 

1672 line = _ffi.string(line) 

1673 callback(Connection._reverse_mapping[ssl], line) 

1674 

1675 self._keylog_callback = _ffi.callback( 

1676 "void (*)(const SSL *, const char *)", wrapper 

1677 ) 

1678 _lib.SSL_CTX_set_keylog_callback(self._context, self._keylog_callback) 

1679 

1680 def get_app_data(self) -> Any: 

1681 """ 

1682 Get the application data (supplied via :meth:`set_app_data()`) 

1683 

1684 :return: The application data 

1685 """ 

1686 return self._app_data 

1687 

1688 @_require_not_used 

1689 def set_app_data(self, data: Any) -> None: 

1690 """ 

1691 Set the application data (will be returned from get_app_data()) 

1692 

1693 :param data: Any Python object 

1694 :return: None 

1695 """ 

1696 self._app_data = data 

1697 

1698 def get_cert_store(self) -> X509Store | None: 

1699 """ 

1700 Get the certificate store for the context. This can be used to add 

1701 "trusted" certificates without using the 

1702 :meth:`load_verify_locations` method. 

1703 

1704 :return: A X509Store object or None if it does not have one. 

1705 """ 

1706 store = _lib.SSL_CTX_get_cert_store(self._context) 

1707 if store == _ffi.NULL: 

1708 # TODO: This is untested. 

1709 return None 

1710 

1711 pystore = X509Store.__new__(X509Store) 

1712 pystore._store = store 

1713 return pystore 

1714 

1715 @_require_not_used 

1716 def set_options(self, options: int) -> int: 

1717 """ 

1718 Add options. Options set before are not cleared! 

1719 This method should be used with the :const:`OP_*` constants. 

1720 

1721 :param options: The options to add. 

1722 :return: The new option bitmask. 

1723 """ 

1724 if not isinstance(options, int): 

1725 raise TypeError("options must be an integer") 

1726 

1727 return _lib.SSL_CTX_set_options(self._context, options) 

1728 

1729 @_require_not_used 

1730 def set_mode(self, mode: int) -> int: 

1731 """ 

1732 Add modes via bitmask. Modes set before are not cleared! This method 

1733 should be used with the :const:`MODE_*` constants. 

1734 

1735 :param mode: The mode to add. 

1736 :return: The new mode bitmask. 

1737 """ 

1738 if not isinstance(mode, int): 

1739 raise TypeError("mode must be an integer") 

1740 

1741 return _lib.SSL_CTX_set_mode(self._context, mode) 

1742 

1743 @_require_not_used 

1744 def clear_mode(self, mode_to_clear: int) -> int: 

1745 """ 

1746 Modes previously set cannot be overwritten without being 

1747 cleared first. This method should be used to clear existing modes. 

1748 """ 

1749 return _lib.SSL_CTX_clear_mode(self._context, mode_to_clear) 

1750 

1751 @_require_not_used 

1752 def set_tlsext_servername_callback( 

1753 self, callback: Callable[[Connection], None] 

1754 ) -> None: 

1755 """ 

1756 Specify a callback function to be called when clients specify a server 

1757 name. 

1758 

1759 :param callback: The callback function. It will be invoked with one 

1760 argument, the Connection instance. 

1761 

1762 .. versionadded:: 0.13 

1763 """ 

1764 

1765 @wraps(callback) 

1766 def wrapper(ssl, alert, arg): # type: ignore[no-untyped-def] 

1767 try: 

1768 callback(Connection._reverse_mapping[ssl]) 

1769 except Exception: 

1770 sys.excepthook(*sys.exc_info()) 

1771 return _lib.SSL_TLSEXT_ERR_ALERT_FATAL 

1772 return 0 

1773 

1774 self._tlsext_servername_callback = _ffi.callback( 

1775 "int (*)(SSL *, int *, void *)", wrapper 

1776 ) 

1777 _lib.SSL_CTX_set_tlsext_servername_callback( 

1778 self._context, self._tlsext_servername_callback 

1779 ) 

1780 

1781 @_require_not_used 

1782 def set_tlsext_use_srtp(self, profiles: bytes) -> None: 

1783 """ 

1784 Enable support for negotiating SRTP keying material. 

1785 

1786 :param bytes profiles: A colon delimited list of protection profile 

1787 names, like ``b'SRTP_AES128_CM_SHA1_80:SRTP_AES128_CM_SHA1_32'``. 

1788 :return: None 

1789 """ 

1790 if not isinstance(profiles, bytes): 

1791 raise TypeError("profiles must be a byte string.") 

1792 

1793 _openssl_assert( 

1794 _lib.SSL_CTX_set_tlsext_use_srtp(self._context, profiles) == 0 

1795 ) 

1796 

1797 @_require_not_used 

1798 def set_alpn_protos(self, protos: list[bytes]) -> None: 

1799 """ 

1800 Specify the protocols that the client is prepared to speak after the 

1801 TLS connection has been negotiated using Application Layer Protocol 

1802 Negotiation. 

1803 

1804 :param protos: A list of the protocols to be offered to the server. 

1805 This list should be a Python list of bytestrings representing the 

1806 protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``. 

1807 """ 

1808 # Different versions of OpenSSL are inconsistent about how they handle 

1809 # empty proto lists (see #1043), so we avoid the problem entirely by 

1810 # rejecting them ourselves. 

1811 if not protos: 

1812 raise ValueError("at least one protocol must be specified") 

1813 

1814 # Take the list of protocols and join them together, prefixing them 

1815 # with their lengths. 

1816 protostr = b"".join( 

1817 chain.from_iterable((bytes((len(p),)), p) for p in protos) 

1818 ) 

1819 

1820 # Build a C string from the list. We don't need to save this off 

1821 # because OpenSSL immediately copies the data out. 

1822 input_str = _ffi.new("unsigned char[]", protostr) 

1823 

1824 # https://www.openssl.org/docs/man1.1.0/man3/SSL_CTX_set_alpn_protos.html: 

1825 # SSL_CTX_set_alpn_protos() and SSL_set_alpn_protos() 

1826 # return 0 on success, and non-0 on failure. 

1827 # WARNING: these functions reverse the return value convention. 

1828 _openssl_assert( 

1829 _lib.SSL_CTX_set_alpn_protos( 

1830 self._context, input_str, len(protostr) 

1831 ) 

1832 == 0 

1833 ) 

1834 

1835 @_require_not_used 

1836 def set_alpn_select_callback(self, callback: _ALPNSelectCallback) -> None: 

1837 """ 

1838 Specify a callback function that will be called on the server when a 

1839 client offers protocols using ALPN. 

1840 

1841 :param callback: The callback function. It will be invoked with two 

1842 arguments: the Connection, and a list of offered protocols as 

1843 bytestrings, e.g ``[b'http/1.1', b'spdy/2']``. It can return 

1844 one of those bytestrings to indicate the chosen protocol, the 

1845 empty bytestring to terminate the TLS connection, or the 

1846 :py:obj:`NO_OVERLAPPING_PROTOCOLS` to indicate that no offered 

1847 protocol was selected, but that the connection should not be 

1848 aborted. 

1849 """ 

1850 self._alpn_select_helper = _ALPNSelectHelper(callback) 

1851 self._alpn_select_callback = self._alpn_select_helper.callback 

1852 _lib.SSL_CTX_set_alpn_select_cb( 

1853 self._context, self._alpn_select_callback, _ffi.NULL 

1854 ) 

1855 

1856 def _set_ocsp_callback( 

1857 self, 

1858 helper: _OCSPClientCallbackHelper | _OCSPServerCallbackHelper, 

1859 data: Any | None, 

1860 ) -> None: 

1861 """ 

1862 This internal helper does the common work for 

1863 ``set_ocsp_server_callback`` and ``set_ocsp_client_callback``, which is 

1864 almost all of it. 

1865 """ 

1866 self._ocsp_helper = helper 

1867 self._ocsp_callback = helper.callback 

1868 if data is None: 

1869 self._ocsp_data = _ffi.NULL 

1870 else: 

1871 self._ocsp_data = _ffi.new_handle(data) 

1872 

1873 rc = _lib.SSL_CTX_set_tlsext_status_cb( 

1874 self._context, self._ocsp_callback 

1875 ) 

1876 _openssl_assert(rc == 1) 

1877 rc = _lib.SSL_CTX_set_tlsext_status_arg(self._context, self._ocsp_data) 

1878 _openssl_assert(rc == 1) 

1879 

1880 @_require_not_used 

1881 def set_ocsp_server_callback( 

1882 self, 

1883 callback: _OCSPServerCallback[_T], 

1884 data: _T | None = None, 

1885 ) -> None: 

1886 """ 

1887 Set a callback to provide OCSP data to be stapled to the TLS handshake 

1888 on the server side. 

1889 

1890 :param callback: The callback function. It will be invoked with two 

1891 arguments: the Connection, and the optional arbitrary data you have 

1892 provided. The callback must return a bytestring that contains the 

1893 OCSP data to staple to the handshake. If no OCSP data is available 

1894 for this connection, return the empty bytestring. 

1895 :param data: Some opaque data that will be passed into the callback 

1896 function when called. This can be used to avoid needing to do 

1897 complex data lookups or to keep track of what context is being 

1898 used. This parameter is optional. 

1899 """ 

1900 helper = _OCSPServerCallbackHelper(callback) 

1901 self._set_ocsp_callback(helper, data) 

1902 

1903 @_require_not_used 

1904 def set_ocsp_client_callback( 

1905 self, 

1906 callback: _OCSPClientCallback[_T], 

1907 data: _T | None = None, 

1908 ) -> None: 

1909 """ 

1910 Set a callback to validate OCSP data stapled to the TLS handshake on 

1911 the client side. 

1912 

1913 :param callback: The callback function. It will be invoked with three 

1914 arguments: the Connection, a bytestring containing the stapled OCSP 

1915 assertion, and the optional arbitrary data you have provided. The 

1916 callback must return a boolean that indicates the result of 

1917 validating the OCSP data: ``True`` if the OCSP data is valid and 

1918 the certificate can be trusted, or ``False`` if either the OCSP 

1919 data is invalid or the certificate has been revoked. 

1920 :param data: Some opaque data that will be passed into the callback 

1921 function when called. This can be used to avoid needing to do 

1922 complex data lookups or to keep track of what context is being 

1923 used. This parameter is optional. 

1924 """ 

1925 helper = _OCSPClientCallbackHelper(callback) 

1926 self._set_ocsp_callback(helper, data) 

1927 

1928 @_require_not_used 

1929 @_requires_ssl_cookie 

1930 def set_cookie_generate_callback( 

1931 self, callback: _CookieGenerateCallback 

1932 ) -> None: 

1933 self._cookie_generate_helper = _CookieGenerateCallbackHelper(callback) 

1934 _lib.SSL_CTX_set_cookie_generate_cb( 

1935 self._context, 

1936 self._cookie_generate_helper.callback, 

1937 ) 

1938 

1939 @_require_not_used 

1940 @_requires_ssl_cookie 

1941 def set_cookie_verify_callback( 

1942 self, callback: _CookieVerifyCallback 

1943 ) -> None: 

1944 self._cookie_verify_helper = _CookieVerifyCallbackHelper(callback) 

1945 _lib.SSL_CTX_set_cookie_verify_cb( 

1946 self._context, 

1947 self._cookie_verify_helper.callback, 

1948 ) 

1949 

1950 

1951class Connection: 

1952 _reverse_mapping: typing.MutableMapping[Any, Connection] = ( 

1953 WeakValueDictionary() 

1954 ) 

1955 

1956 def __init__( 

1957 self, context: Context, socket: socket.socket | None = None 

1958 ) -> None: 

1959 """ 

1960 Create a new Connection object, using the given OpenSSL.SSL.Context 

1961 instance and socket. 

1962 

1963 :param context: An SSL Context to use for this connection 

1964 :param socket: The socket to use for transport layer 

1965 """ 

1966 if not isinstance(context, Context): 

1967 raise TypeError("context must be a Context instance") 

1968 

1969 context._used = True 

1970 

1971 ssl = _lib.SSL_new(context._context) 

1972 self._ssl = _ffi.gc(ssl, _lib.SSL_free) 

1973 # We set SSL_MODE_AUTO_RETRY to handle situations where OpenSSL returns 

1974 # an SSL_ERROR_WANT_READ when processing a non-application data packet 

1975 # even though there is still data on the underlying transport. 

1976 # See https://github.com/openssl/openssl/issues/6234 for more details. 

1977 _lib.SSL_set_mode(self._ssl, _lib.SSL_MODE_AUTO_RETRY) 

1978 self._context = context 

1979 self._app_data = None 

1980 

1981 # References to strings used for Application Layer Protocol 

1982 # Negotiation. These strings get copied at some point but it's well 

1983 # after the callback returns, so we have to hang them somewhere to 

1984 # avoid them getting freed. 

1985 self._alpn_select_callback_args: Any = None 

1986 

1987 # Reference the verify_callback of the Context. This ensures that if 

1988 # set_verify is called again after the SSL object has been created we 

1989 # do not point to a dangling reference 

1990 self._verify_helper = context._verify_helper 

1991 self._verify_callback = context._verify_callback 

1992 

1993 # And likewise for the cookie callbacks 

1994 self._cookie_generate_helper = context._cookie_generate_helper 

1995 self._cookie_verify_helper = context._cookie_verify_helper 

1996 

1997 self._reverse_mapping[self._ssl] = self 

1998 

1999 if socket is None: 

2000 self._socket = None 

2001 # Don't set up any gc for these, SSL_free will take care of them. 

2002 self._into_ssl = _lib.BIO_new(_lib.BIO_s_mem()) 

2003 _openssl_assert(self._into_ssl != _ffi.NULL) 

2004 

2005 self._from_ssl = _lib.BIO_new(_lib.BIO_s_mem()) 

2006 _openssl_assert(self._from_ssl != _ffi.NULL) 

2007 

2008 _lib.SSL_set_bio(self._ssl, self._into_ssl, self._from_ssl) 

2009 else: 

2010 self._into_ssl = None 

2011 self._from_ssl = None 

2012 self._socket = socket 

2013 set_result = _lib.SSL_set_fd( 

2014 self._ssl, _asFileDescriptor(self._socket) 

2015 ) 

2016 _openssl_assert(set_result == 1) 

2017 

2018 def __getattr__(self, name: str) -> Any: 

2019 """ 

2020 Look up attributes on the wrapped socket object if they are not found 

2021 on the Connection object. 

2022 """ 

2023 if self._socket is None: 

2024 raise AttributeError( 

2025 f"'{self.__class__.__name__}' object has no attribute '{name}'" 

2026 ) 

2027 else: 

2028 return getattr(self._socket, name) 

2029 

2030 def _raise_ssl_error(self, ssl: Any, result: int) -> None: 

2031 if self._context._verify_helper is not None: 

2032 self._context._verify_helper.raise_if_problem() 

2033 if self._context._alpn_select_helper is not None: 

2034 self._context._alpn_select_helper.raise_if_problem() 

2035 if self._context._ocsp_helper is not None: 

2036 self._context._ocsp_helper.raise_if_problem() 

2037 

2038 error = _lib.SSL_get_error(ssl, result) 

2039 if error == _lib.SSL_ERROR_WANT_READ: 

2040 raise WantReadError() 

2041 elif error == _lib.SSL_ERROR_WANT_WRITE: 

2042 raise WantWriteError() 

2043 elif error == _lib.SSL_ERROR_ZERO_RETURN: 

2044 raise ZeroReturnError() 

2045 elif error == _lib.SSL_ERROR_WANT_X509_LOOKUP: 

2046 # TODO: This is untested. 

2047 raise WantX509LookupError() 

2048 elif error == _lib.SSL_ERROR_SYSCALL: 

2049 if platform == "win32": 

2050 errno = _ffi.getwinerror()[0] 

2051 else: 

2052 errno = _ffi.errno 

2053 if _lib.ERR_peek_error() == 0 or errno != 0: 

2054 if result < 0 and errno != 0: 

2055 raise SysCallError(errno, errorcode.get(errno)) 

2056 raise SysCallError(-1, "Unexpected EOF") 

2057 else: 

2058 # TODO: This is untested, but I think twisted hits it? 

2059 _raise_current_error() 

2060 elif error == _lib.SSL_ERROR_SSL and _lib.ERR_peek_error() != 0: 

2061 # In 3.0.x an unexpected EOF no longer triggers syscall error 

2062 # but we want to maintain compatibility so we check here and 

2063 # raise syscall if it is an EOF. Since we're not actually sure 

2064 # what else could raise SSL_ERROR_SSL we check for the presence 

2065 # of the OpenSSL 3 constant SSL_R_UNEXPECTED_EOF_WHILE_READING 

2066 # and if it's not present we just raise an error, which matches 

2067 # the behavior before we added this elif section 

2068 peeked_error = _lib.ERR_peek_error() 

2069 reason = _lib.ERR_GET_REASON(peeked_error) 

2070 if _lib.Cryptography_HAS_UNEXPECTED_EOF_WHILE_READING: 

2071 _openssl_assert( 

2072 reason == _lib.SSL_R_UNEXPECTED_EOF_WHILE_READING 

2073 ) 

2074 _lib.ERR_clear_error() 

2075 raise SysCallError(-1, "Unexpected EOF") 

2076 else: 

2077 _raise_current_error() 

2078 elif error == _lib.SSL_ERROR_NONE: 

2079 pass 

2080 else: 

2081 _raise_current_error() 

2082 

2083 def get_context(self) -> Context: 

2084 """ 

2085 Retrieve the :class:`Context` object associated with this 

2086 :class:`Connection`. 

2087 """ 

2088 return self._context 

2089 

2090 def set_context(self, context: Context) -> None: 

2091 """ 

2092 Switch this connection to a new session context. 

2093 

2094 :param context: A :class:`Context` instance giving the new session 

2095 context to use. 

2096 """ 

2097 if not isinstance(context, Context): 

2098 raise TypeError("context must be a Context instance") 

2099 

2100 _lib.SSL_set_SSL_CTX(self._ssl, context._context) 

2101 self._context = context 

2102 self._context._used = True 

2103 

2104 def set_options(self, options: int) -> int: 

2105 """ 

2106 Add options. Options set before are not cleared! 

2107 This method should be used with the :const:`OP_*` constants. 

2108 

2109 :param options: The options to add. 

2110 :return: The new option bitmask. 

2111 """ 

2112 if not isinstance(options, int): 

2113 raise TypeError("options must be an integer") 

2114 

2115 return _lib.SSL_set_options(self._ssl, options) 

2116 

2117 def get_servername(self) -> bytes | None: 

2118 """ 

2119 Retrieve the servername extension value if provided in the client hello 

2120 message, or None if there wasn't one. 

2121 

2122 :return: A byte string giving the server name or :data:`None`. 

2123 

2124 .. versionadded:: 0.13 

2125 """ 

2126 name = _lib.SSL_get_servername( 

2127 self._ssl, _lib.TLSEXT_NAMETYPE_host_name 

2128 ) 

2129 if name == _ffi.NULL: 

2130 return None 

2131 

2132 return _ffi.string(name) 

2133 

2134 def set_verify( 

2135 self, mode: int, callback: _VerifyCallback | None = None 

2136 ) -> None: 

2137 """ 

2138 Override the Context object's verification flags for this specific 

2139 connection. See :py:meth:`Context.set_verify` for details. 

2140 """ 

2141 if not isinstance(mode, int): 

2142 raise TypeError("mode must be an integer") 

2143 

2144 if callback is None: 

2145 self._verify_helper = None 

2146 self._verify_callback = None 

2147 _lib.SSL_set_verify(self._ssl, mode, _ffi.NULL) 

2148 else: 

2149 if not callable(callback): 

2150 raise TypeError("callback must be callable") 

2151 

2152 self._verify_helper = _VerifyHelper(callback) 

2153 self._verify_callback = self._verify_helper.callback 

2154 _lib.SSL_set_verify(self._ssl, mode, self._verify_callback) 

2155 

2156 def get_verify_mode(self) -> int: 

2157 """ 

2158 Retrieve the Connection object's verify mode, as set by 

2159 :meth:`set_verify`. 

2160 

2161 :return: The verify mode 

2162 """ 

2163 return _lib.SSL_get_verify_mode(self._ssl) 

2164 

2165 def use_certificate(self, cert: X509 | x509.Certificate) -> None: 

2166 """ 

2167 Load a certificate from a X509 object 

2168 

2169 :param cert: The X509 object 

2170 :return: None 

2171 """ 

2172 # Mirrored from Context.use_certificate 

2173 if not isinstance(cert, X509): 

2174 cert = X509.from_cryptography(cert) 

2175 else: 

2176 warnings.warn( 

2177 ( 

2178 "Passing pyOpenSSL X509 objects is deprecated. You " 

2179 "should use a cryptography.x509.Certificate instead." 

2180 ), 

2181 DeprecationWarning, 

2182 stacklevel=2, 

2183 ) 

2184 

2185 use_result = _lib.SSL_use_certificate(self._ssl, cert._x509) 

2186 if not use_result: 

2187 _raise_current_error() 

2188 

2189 def use_privatekey(self, pkey: _PrivateKey | PKey) -> None: 

2190 """ 

2191 Load a private key from a PKey object 

2192 

2193 :param pkey: The PKey object 

2194 :return: None 

2195 """ 

2196 # Mirrored from Context.use_privatekey 

2197 if not isinstance(pkey, PKey): 

2198 pkey = PKey.from_cryptography_key(pkey) 

2199 else: 

2200 warnings.warn( 

2201 ( 

2202 "Passing pyOpenSSL PKey objects is deprecated. You " 

2203 "should use a cryptography private key instead." 

2204 ), 

2205 DeprecationWarning, 

2206 stacklevel=2, 

2207 ) 

2208 

2209 use_result = _lib.SSL_use_PrivateKey(self._ssl, pkey._pkey) 

2210 if not use_result: 

2211 self._context._raise_passphrase_exception() 

2212 

2213 def set_ciphertext_mtu(self, mtu: int) -> None: 

2214 """ 

2215 For DTLS, set the maximum UDP payload size (*not* including IP/UDP 

2216 overhead). 

2217 

2218 Note that you might have to set :data:`OP_NO_QUERY_MTU` to prevent 

2219 OpenSSL from spontaneously clearing this. 

2220 

2221 :param mtu: An integer giving the maximum transmission unit. 

2222 

2223 .. versionadded:: 21.1 

2224 """ 

2225 _lib.SSL_set_mtu(self._ssl, mtu) 

2226 

2227 def get_cleartext_mtu(self) -> int: 

2228 """ 

2229 For DTLS, get the maximum size of unencrypted data you can pass to 

2230 :meth:`write` without exceeding the MTU (as passed to 

2231 :meth:`set_ciphertext_mtu`). 

2232 

2233 :return: The effective MTU as an integer. 

2234 

2235 .. versionadded:: 21.1 

2236 """ 

2237 

2238 if not hasattr(_lib, "DTLS_get_data_mtu"): 

2239 raise NotImplementedError("requires OpenSSL 1.1.1 or better") 

2240 return _lib.DTLS_get_data_mtu(self._ssl) 

2241 

2242 def set_tlsext_host_name(self, name: bytes) -> None: 

2243 """ 

2244 Set the value of the servername extension to send in the client hello. 

2245 

2246 :param name: A byte string giving the name. 

2247 

2248 .. versionadded:: 0.13 

2249 """ 

2250 if not isinstance(name, bytes): 

2251 raise TypeError("name must be a byte string") 

2252 elif b"\0" in name: 

2253 raise TypeError("name must not contain NUL byte") 

2254 

2255 # XXX I guess this can fail sometimes? 

2256 _lib.SSL_set_tlsext_host_name(self._ssl, name) 

2257 

2258 def pending(self) -> int: 

2259 """ 

2260 Get the number of bytes that can be safely read from the SSL buffer 

2261 (**not** the underlying transport buffer). 

2262 

2263 :return: The number of bytes available in the receive buffer. 

2264 """ 

2265 return _lib.SSL_pending(self._ssl) 

2266 

2267 def send(self, buf: _Buffer, flags: int = 0) -> int: 

2268 """ 

2269 Send data on the connection. NOTE: If you get one of the WantRead, 

2270 WantWrite or WantX509Lookup exceptions on this, you have to call the 

2271 method again with the SAME buffer. 

2272 

2273 :param buf: The string, buffer or memoryview to send 

2274 :param flags: (optional) Included for compatibility with the socket 

2275 API, the value is ignored 

2276 :return: The number of bytes written 

2277 """ 

2278 # Backward compatibility 

2279 buf = _text_to_bytes_and_warn("buf", buf) 

2280 

2281 with _ffi.from_buffer(buf) as data: 

2282 # check len(buf) instead of len(data) for testability 

2283 if len(buf) > 2147483647: 

2284 raise ValueError( 

2285 "Cannot send more than 2**31-1 bytes at once." 

2286 ) 

2287 

2288 result = _lib.SSL_write(self._ssl, data, len(data)) 

2289 self._raise_ssl_error(self._ssl, result) 

2290 

2291 return result 

2292 

2293 write = send 

2294 

2295 def sendall(self, buf: _Buffer, flags: int = 0) -> int: 

2296 """ 

2297 Send "all" data on the connection. This calls send() repeatedly until 

2298 all data is sent. If an error occurs, it's impossible to tell how much 

2299 data has been sent. 

2300 

2301 :param buf: The string, buffer or memoryview to send 

2302 :param flags: (optional) Included for compatibility with the socket 

2303 API, the value is ignored 

2304 :return: The number of bytes written 

2305 """ 

2306 buf = _text_to_bytes_and_warn("buf", buf) 

2307 

2308 with _ffi.from_buffer(buf) as data: 

2309 left_to_send = len(buf) 

2310 total_sent = 0 

2311 

2312 while left_to_send: 

2313 # SSL_write's num arg is an int, 

2314 # so we cannot send more than 2**31-1 bytes at once. 

2315 result = _lib.SSL_write( 

2316 self._ssl, data + total_sent, min(left_to_send, 2147483647) 

2317 ) 

2318 self._raise_ssl_error(self._ssl, result) 

2319 total_sent += result 

2320 left_to_send -= result 

2321 

2322 return total_sent 

2323 

2324 def recv(self, bufsiz: int, flags: int | None = None) -> bytes: 

2325 """ 

2326 Receive data on the connection. 

2327 

2328 :param bufsiz: The maximum number of bytes to read 

2329 :param flags: (optional) The only supported flag is ``MSG_PEEK``, 

2330 all other flags are ignored. 

2331 :return: The string read from the Connection 

2332 """ 

2333 buf = _no_zero_allocator("char[]", bufsiz) 

2334 if flags is not None and flags & socket.MSG_PEEK: 

2335 result = _lib.SSL_peek(self._ssl, buf, bufsiz) 

2336 else: 

2337 result = _lib.SSL_read(self._ssl, buf, bufsiz) 

2338 self._raise_ssl_error(self._ssl, result) 

2339 return _ffi.buffer(buf, result)[:] 

2340 

2341 read = recv 

2342 

2343 def recv_into( 

2344 self, 

2345 buffer: Any, # collections.abc.Buffer once we use Python 3.12+ 

2346 nbytes: int | None = None, 

2347 flags: int | None = None, 

2348 ) -> int: 

2349 """ 

2350 Receive data on the connection and copy it directly into the provided 

2351 buffer, rather than creating a new string. 

2352 

2353 :param buffer: The buffer to copy into. 

2354 :param nbytes: (optional) The maximum number of bytes to read into the 

2355 buffer. If not present, defaults to the size of the buffer. If 

2356 larger than the size of the buffer, is reduced to the size of the 

2357 buffer. 

2358 :param flags: (optional) The only supported flag is ``MSG_PEEK``, 

2359 all other flags are ignored. 

2360 :return: The number of bytes read into the buffer. 

2361 """ 

2362 if nbytes is None: 

2363 nbytes = len(buffer) 

2364 else: 

2365 nbytes = min(nbytes, len(buffer)) 

2366 

2367 # We need to create a temporary buffer. This is annoying, it would be 

2368 # better if we could pass memoryviews straight into the SSL_read call, 

2369 # but right now we can't. Revisit this if CFFI gets that ability. 

2370 buf = _no_zero_allocator("char[]", nbytes) 

2371 if flags is not None and flags & socket.MSG_PEEK: 

2372 result = _lib.SSL_peek(self._ssl, buf, nbytes) 

2373 else: 

2374 result = _lib.SSL_read(self._ssl, buf, nbytes) 

2375 self._raise_ssl_error(self._ssl, result) 

2376 

2377 # This strange line is all to avoid a memory copy. The buffer protocol 

2378 # should allow us to assign a CFFI buffer to the LHS of this line, but 

2379 # on CPython 3.3+ that segfaults. As a workaround, we can temporarily 

2380 # wrap it in a memoryview. 

2381 buffer[:result] = memoryview(_ffi.buffer(buf, result)) 

2382 

2383 return result 

2384 

2385 def _handle_bio_errors(self, bio: Any, result: int) -> typing.NoReturn: 

2386 if _lib.BIO_should_retry(bio): 

2387 if _lib.BIO_should_read(bio): 

2388 raise WantReadError() 

2389 elif _lib.BIO_should_write(bio): 

2390 # TODO: This is untested. 

2391 raise WantWriteError() 

2392 elif _lib.BIO_should_io_special(bio): 

2393 # TODO: This is untested. I think io_special means the socket 

2394 # BIO has a not-yet connected socket. 

2395 raise ValueError("BIO_should_io_special") 

2396 else: 

2397 # TODO: This is untested. 

2398 raise ValueError("unknown bio failure") 

2399 else: 

2400 # TODO: This is untested. 

2401 _raise_current_error() 

2402 

2403 def bio_read(self, bufsiz: int) -> bytes: 

2404 """ 

2405 If the Connection was created with a memory BIO, this method can be 

2406 used to read bytes from the write end of that memory BIO. Many 

2407 Connection methods will add bytes which must be read in this manner or 

2408 the buffer will eventually fill up and the Connection will be able to 

2409 take no further actions. 

2410 

2411 :param bufsiz: The maximum number of bytes to read 

2412 :return: The string read. 

2413 """ 

2414 if self._from_ssl is None: 

2415 raise TypeError("Connection sock was not None") 

2416 

2417 if not isinstance(bufsiz, int): 

2418 raise TypeError("bufsiz must be an integer") 

2419 

2420 buf = _no_zero_allocator("char[]", bufsiz) 

2421 result = _lib.BIO_read(self._from_ssl, buf, bufsiz) 

2422 if result <= 0: 

2423 self._handle_bio_errors(self._from_ssl, result) 

2424 

2425 return _ffi.buffer(buf, result)[:] 

2426 

2427 def bio_write(self, buf: _Buffer) -> int: 

2428 """ 

2429 If the Connection was created with a memory BIO, this method can be 

2430 used to add bytes to the read end of that memory BIO. The Connection 

2431 can then read the bytes (for example, in response to a call to 

2432 :meth:`recv`). 

2433 

2434 :param buf: The string to put into the memory BIO. 

2435 :return: The number of bytes written 

2436 """ 

2437 buf = _text_to_bytes_and_warn("buf", buf) 

2438 

2439 if self._into_ssl is None: 

2440 raise TypeError("Connection sock was not None") 

2441 

2442 with _ffi.from_buffer(buf) as data: 

2443 result = _lib.BIO_write(self._into_ssl, data, len(data)) 

2444 if result <= 0: 

2445 self._handle_bio_errors(self._into_ssl, result) 

2446 return result 

2447 

2448 def renegotiate(self) -> bool: 

2449 """ 

2450 Renegotiate the session. 

2451 

2452 :return: True if the renegotiation can be started, False otherwise 

2453 """ 

2454 if not self.renegotiate_pending(): 

2455 _openssl_assert(_lib.SSL_renegotiate(self._ssl) == 1) 

2456 return True 

2457 return False 

2458 

2459 def do_handshake(self) -> None: 

2460 """ 

2461 Perform an SSL handshake (usually called after :meth:`renegotiate` or 

2462 one of :meth:`set_accept_state` or :meth:`set_connect_state`). This can 

2463 raise the same exceptions as :meth:`send` and :meth:`recv`. 

2464 

2465 :return: None. 

2466 """ 

2467 result = _lib.SSL_do_handshake(self._ssl) 

2468 self._raise_ssl_error(self._ssl, result) 

2469 

2470 def renegotiate_pending(self) -> bool: 

2471 """ 

2472 Check if there's a renegotiation in progress, it will return False once 

2473 a renegotiation is finished. 

2474 

2475 :return: Whether there's a renegotiation in progress 

2476 """ 

2477 return _lib.SSL_renegotiate_pending(self._ssl) == 1 

2478 

2479 def total_renegotiations(self) -> int: 

2480 """ 

2481 Find out the total number of renegotiations. 

2482 

2483 :return: The number of renegotiations. 

2484 """ 

2485 return _lib.SSL_total_renegotiations(self._ssl) 

2486 

2487 def connect(self, addr: Any) -> None: 

2488 """ 

2489 Call the :meth:`connect` method of the underlying socket and set up SSL 

2490 on the socket, using the :class:`Context` object supplied to this 

2491 :class:`Connection` object at creation. 

2492 

2493 :param addr: A remote address 

2494 :return: What the socket's connect method returns 

2495 """ 

2496 _lib.SSL_set_connect_state(self._ssl) 

2497 return self._socket.connect(addr) # type: ignore[return-value, union-attr] 

2498 

2499 def connect_ex(self, addr: Any) -> int: 

2500 """ 

2501 Call the :meth:`connect_ex` method of the underlying socket and set up 

2502 SSL on the socket, using the Context object supplied to this Connection 

2503 object at creation. Note that if the :meth:`connect_ex` method of the 

2504 socket doesn't return 0, SSL won't be initialized. 

2505 

2506 :param addr: A remove address 

2507 :return: What the socket's connect_ex method returns 

2508 """ 

2509 connect_ex = self._socket.connect_ex # type: ignore[union-attr] 

2510 self.set_connect_state() 

2511 return connect_ex(addr) 

2512 

2513 def accept(self) -> tuple[Connection, Any]: 

2514 """ 

2515 Call the :meth:`accept` method of the underlying socket and set up SSL 

2516 on the returned socket, using the Context object supplied to this 

2517 :class:`Connection` object at creation. 

2518 

2519 :return: A *(conn, addr)* pair where *conn* is the new 

2520 :class:`Connection` object created, and *address* is as returned by 

2521 the socket's :meth:`accept`. 

2522 """ 

2523 client, addr = self._socket.accept() # type: ignore[union-attr] 

2524 conn = Connection(self._context, client) 

2525 conn.set_accept_state() 

2526 return (conn, addr) 

2527 

2528 def DTLSv1_listen(self) -> None: 

2529 """ 

2530 Call the OpenSSL function DTLSv1_listen on this connection. See the 

2531 OpenSSL manual for more details. 

2532 

2533 :return: None 

2534 """ 

2535 # Possible future extension: return the BIO_ADDR in some form. 

2536 bio_addr = _lib.BIO_ADDR_new() 

2537 try: 

2538 result = _lib.DTLSv1_listen(self._ssl, bio_addr) 

2539 finally: 

2540 _lib.BIO_ADDR_free(bio_addr) 

2541 # DTLSv1_listen is weird. A zero return value means 'didn't find a 

2542 # ClientHello with valid cookie, but keep trying'. So basically 

2543 # WantReadError. But it doesn't work correctly with _raise_ssl_error. 

2544 # So we raise it manually instead. 

2545 if self._cookie_generate_helper is not None: 

2546 self._cookie_generate_helper.raise_if_problem() 

2547 if self._cookie_verify_helper is not None: 

2548 self._cookie_verify_helper.raise_if_problem() 

2549 if result == 0: 

2550 raise WantReadError() 

2551 if result < 0: 

2552 self._raise_ssl_error(self._ssl, result) 

2553 

2554 def DTLSv1_get_timeout(self) -> int | None: 

2555 """ 

2556 Determine when the DTLS SSL object next needs to perform internal 

2557 processing due to the passage of time. 

2558 

2559 When the returned number of seconds have passed, the 

2560 :meth:`DTLSv1_handle_timeout` method needs to be called. 

2561 

2562 :return: The time left in seconds before the next timeout or `None` 

2563 if no timeout is currently active. 

2564 """ 

2565 ptv_sec = _ffi.new("time_t *") 

2566 ptv_usec = _ffi.new("long *") 

2567 if _lib.Cryptography_DTLSv1_get_timeout(self._ssl, ptv_sec, ptv_usec): 

2568 return ptv_sec[0] + (ptv_usec[0] / 1000000) 

2569 else: 

2570 return None 

2571 

2572 def DTLSv1_handle_timeout(self) -> bool: 

2573 """ 

2574 Handles any timeout events which have become pending on a DTLS SSL 

2575 object. 

2576 

2577 :return: `True` if there was a pending timeout, `False` otherwise. 

2578 """ 

2579 result = _lib.DTLSv1_handle_timeout(self._ssl) 

2580 if result < 0: 

2581 self._raise_ssl_error(self._ssl, result) 

2582 assert False, "unreachable" 

2583 else: 

2584 return bool(result) 

2585 

2586 def bio_shutdown(self) -> None: 

2587 """ 

2588 If the Connection was created with a memory BIO, this method can be 

2589 used to indicate that *end of file* has been reached on the read end of 

2590 that memory BIO. 

2591 

2592 :return: None 

2593 """ 

2594 if self._from_ssl is None: 

2595 raise TypeError("Connection sock was not None") 

2596 

2597 _lib.BIO_set_mem_eof_return(self._into_ssl, 0) 

2598 

2599 def shutdown(self) -> bool: 

2600 """ 

2601 Send the shutdown message to the Connection. 

2602 

2603 :return: True if the shutdown completed successfully (i.e. both sides 

2604 have sent closure alerts), False otherwise (in which case you 

2605 call :meth:`recv` or :meth:`send` when the connection becomes 

2606 readable/writeable). 

2607 """ 

2608 result = _lib.SSL_shutdown(self._ssl) 

2609 if result < 0: 

2610 self._raise_ssl_error(self._ssl, result) 

2611 assert False, "unreachable" 

2612 elif result > 0: 

2613 return True 

2614 else: 

2615 return False 

2616 

2617 def get_cipher_list(self) -> list[str]: 

2618 """ 

2619 Retrieve the list of ciphers used by the Connection object. 

2620 

2621 :return: A list of native cipher strings. 

2622 """ 

2623 ciphers = [] 

2624 for i in count(): 

2625 result = _lib.SSL_get_cipher_list(self._ssl, i) 

2626 if result == _ffi.NULL: 

2627 break 

2628 ciphers.append(_ffi.string(result).decode("utf-8")) 

2629 return ciphers 

2630 

2631 def get_client_ca_list(self) -> list[X509Name]: 

2632 """ 

2633 Get CAs whose certificates are suggested for client authentication. 

2634 

2635 :return: If this is a server connection, the list of certificate 

2636 authorities that will be sent or has been sent to the client, as 

2637 controlled by this :class:`Connection`'s :class:`Context`. 

2638 

2639 If this is a client connection, the list will be empty until the 

2640 connection with the server is established. 

2641 

2642 .. versionadded:: 0.10 

2643 """ 

2644 ca_names = _lib.SSL_get_client_CA_list(self._ssl) 

2645 if ca_names == _ffi.NULL: 

2646 # TODO: This is untested. 

2647 return [] 

2648 

2649 result = [] 

2650 for i in range(_lib.sk_X509_NAME_num(ca_names)): 

2651 name = _lib.sk_X509_NAME_value(ca_names, i) 

2652 copy = _lib.X509_NAME_dup(name) 

2653 _openssl_assert(copy != _ffi.NULL) 

2654 

2655 pyname = X509Name.__new__(X509Name) 

2656 pyname._name = _ffi.gc(copy, _lib.X509_NAME_free) 

2657 result.append(pyname) 

2658 return result 

2659 

2660 def makefile(self, *args: Any, **kwargs: Any) -> typing.NoReturn: 

2661 """ 

2662 The makefile() method is not implemented, since there is no dup 

2663 semantics for SSL connections 

2664 

2665 :raise: NotImplementedError 

2666 """ 

2667 raise NotImplementedError( 

2668 "Cannot make file object of OpenSSL.SSL.Connection" 

2669 ) 

2670 

2671 def get_app_data(self) -> Any: 

2672 """ 

2673 Retrieve application data as set by :meth:`set_app_data`. 

2674 

2675 :return: The application data 

2676 """ 

2677 return self._app_data 

2678 

2679 def set_app_data(self, data: Any) -> None: 

2680 """ 

2681 Set application data 

2682 

2683 :param data: The application data 

2684 :return: None 

2685 """ 

2686 self._app_data = data 

2687 

2688 def get_shutdown(self) -> int: 

2689 """ 

2690 Get the shutdown state of the Connection. 

2691 

2692 :return: The shutdown state, a bitvector of SENT_SHUTDOWN, 

2693 RECEIVED_SHUTDOWN. 

2694 """ 

2695 return _lib.SSL_get_shutdown(self._ssl) 

2696 

2697 def set_shutdown(self, state: int) -> None: 

2698 """ 

2699 Set the shutdown state of the Connection. 

2700 

2701 :param state: bitvector of SENT_SHUTDOWN, RECEIVED_SHUTDOWN. 

2702 :return: None 

2703 """ 

2704 if not isinstance(state, int): 

2705 raise TypeError("state must be an integer") 

2706 

2707 _lib.SSL_set_shutdown(self._ssl, state) 

2708 

2709 def get_state_string(self) -> bytes: 

2710 """ 

2711 Retrieve a verbose string detailing the state of the Connection. 

2712 

2713 :return: A string representing the state 

2714 """ 

2715 return _ffi.string(_lib.SSL_state_string_long(self._ssl)) 

2716 

2717 def server_random(self) -> bytes | None: 

2718 """ 

2719 Retrieve the random value used with the server hello message. 

2720 

2721 :return: A string representing the state 

2722 """ 

2723 session = _lib.SSL_get_session(self._ssl) 

2724 if session == _ffi.NULL: 

2725 return None 

2726 length = _lib.SSL_get_server_random(self._ssl, _ffi.NULL, 0) 

2727 _openssl_assert(length > 0) 

2728 outp = _no_zero_allocator("unsigned char[]", length) 

2729 _lib.SSL_get_server_random(self._ssl, outp, length) 

2730 return _ffi.buffer(outp, length)[:] 

2731 

2732 def client_random(self) -> bytes | None: 

2733 """ 

2734 Retrieve the random value used with the client hello message. 

2735 

2736 :return: A string representing the state 

2737 """ 

2738 session = _lib.SSL_get_session(self._ssl) 

2739 if session == _ffi.NULL: 

2740 return None 

2741 

2742 length = _lib.SSL_get_client_random(self._ssl, _ffi.NULL, 0) 

2743 _openssl_assert(length > 0) 

2744 outp = _no_zero_allocator("unsigned char[]", length) 

2745 _lib.SSL_get_client_random(self._ssl, outp, length) 

2746 return _ffi.buffer(outp, length)[:] 

2747 

2748 def master_key(self) -> bytes | None: 

2749 """ 

2750 Retrieve the value of the master key for this session. 

2751 

2752 :return: A string representing the state 

2753 """ 

2754 session = _lib.SSL_get_session(self._ssl) 

2755 if session == _ffi.NULL: 

2756 return None 

2757 

2758 length = _lib.SSL_SESSION_get_master_key(session, _ffi.NULL, 0) 

2759 _openssl_assert(length > 0) 

2760 outp = _no_zero_allocator("unsigned char[]", length) 

2761 _lib.SSL_SESSION_get_master_key(session, outp, length) 

2762 return _ffi.buffer(outp, length)[:] 

2763 

2764 def export_keying_material( 

2765 self, label: bytes, olen: int, context: bytes | None = None 

2766 ) -> bytes: 

2767 """ 

2768 Obtain keying material for application use. 

2769 

2770 :param: label - a disambiguating label string as described in RFC 5705 

2771 :param: olen - the length of the exported key material in bytes 

2772 :param: context - a per-association context value 

2773 :return: the exported key material bytes or None 

2774 """ 

2775 outp = _no_zero_allocator("unsigned char[]", olen) 

2776 context_buf = _ffi.NULL 

2777 context_len = 0 

2778 use_context = 0 

2779 if context is not None: 

2780 context_buf = context 

2781 context_len = len(context) 

2782 use_context = 1 

2783 success = _lib.SSL_export_keying_material( 

2784 self._ssl, 

2785 outp, 

2786 olen, 

2787 label, 

2788 len(label), 

2789 context_buf, 

2790 context_len, 

2791 use_context, 

2792 ) 

2793 _openssl_assert(success == 1) 

2794 return _ffi.buffer(outp, olen)[:] 

2795 

2796 def sock_shutdown(self, *args: Any, **kwargs: Any) -> None: 

2797 """ 

2798 Call the :meth:`shutdown` method of the underlying socket. 

2799 See :manpage:`shutdown(2)`. 

2800 

2801 :return: What the socket's shutdown() method returns 

2802 """ 

2803 return self._socket.shutdown(*args, **kwargs) # type: ignore[return-value, union-attr] 

2804 

2805 @typing.overload 

2806 def get_certificate( 

2807 self, *, as_cryptography: typing.Literal[True] 

2808 ) -> x509.Certificate | None: 

2809 pass 

2810 

2811 @typing.overload 

2812 def get_certificate( 

2813 self, *, as_cryptography: typing.Literal[False] = False 

2814 ) -> X509 | None: 

2815 pass 

2816 

2817 def get_certificate( 

2818 self, 

2819 *, 

2820 as_cryptography: typing.Literal[True] | typing.Literal[False] = False, 

2821 ) -> X509 | x509.Certificate | None: 

2822 """ 

2823 Retrieve the local certificate (if any) 

2824 

2825 :param bool as_cryptography: Controls whether a 

2826 ``cryptography.x509.Certificate`` or an ``OpenSSL.crypto.X509`` 

2827 object should be returned. 

2828 

2829 :return: The local certificate 

2830 """ 

2831 cert = _lib.SSL_get_certificate(self._ssl) 

2832 if cert != _ffi.NULL: 

2833 _lib.X509_up_ref(cert) 

2834 pycert = X509._from_raw_x509_ptr(cert) 

2835 if as_cryptography: 

2836 return pycert.to_cryptography() 

2837 return pycert 

2838 return None 

2839 

2840 @typing.overload 

2841 def get_peer_certificate( 

2842 self, *, as_cryptography: typing.Literal[True] 

2843 ) -> x509.Certificate | None: 

2844 pass 

2845 

2846 @typing.overload 

2847 def get_peer_certificate( 

2848 self, *, as_cryptography: typing.Literal[False] = False 

2849 ) -> X509 | None: 

2850 pass 

2851 

2852 def get_peer_certificate( 

2853 self, 

2854 *, 

2855 as_cryptography: typing.Literal[True] | typing.Literal[False] = False, 

2856 ) -> X509 | x509.Certificate | None: 

2857 """ 

2858 Retrieve the other side's certificate (if any) 

2859 

2860 :param bool as_cryptography: Controls whether a 

2861 ``cryptography.x509.Certificate`` or an ``OpenSSL.crypto.X509`` 

2862 object should be returned. 

2863 

2864 :return: The peer's certificate 

2865 """ 

2866 cert = _lib.SSL_get_peer_certificate(self._ssl) 

2867 if cert != _ffi.NULL: 

2868 pycert = X509._from_raw_x509_ptr(cert) 

2869 if as_cryptography: 

2870 return pycert.to_cryptography() 

2871 return pycert 

2872 return None 

2873 

2874 @staticmethod 

2875 def _cert_stack_to_list(cert_stack: Any) -> list[X509]: 

2876 """ 

2877 Internal helper to convert a STACK_OF(X509) to a list of X509 

2878 instances. 

2879 """ 

2880 result = [] 

2881 for i in range(_lib.sk_X509_num(cert_stack)): 

2882 cert = _lib.sk_X509_value(cert_stack, i) 

2883 _openssl_assert(cert != _ffi.NULL) 

2884 res = _lib.X509_up_ref(cert) 

2885 _openssl_assert(res >= 1) 

2886 pycert = X509._from_raw_x509_ptr(cert) 

2887 result.append(pycert) 

2888 return result 

2889 

2890 @staticmethod 

2891 def _cert_stack_to_cryptography_list( 

2892 cert_stack: Any, 

2893 ) -> list[x509.Certificate]: 

2894 """ 

2895 Internal helper to convert a STACK_OF(X509) to a list of X509 

2896 instances. 

2897 """ 

2898 result = [] 

2899 for i in range(_lib.sk_X509_num(cert_stack)): 

2900 cert = _lib.sk_X509_value(cert_stack, i) 

2901 _openssl_assert(cert != _ffi.NULL) 

2902 res = _lib.X509_up_ref(cert) 

2903 _openssl_assert(res >= 1) 

2904 pycert = X509._from_raw_x509_ptr(cert) 

2905 result.append(pycert.to_cryptography()) 

2906 return result 

2907 

2908 @typing.overload 

2909 def get_peer_cert_chain( 

2910 self, *, as_cryptography: typing.Literal[True] 

2911 ) -> list[x509.Certificate] | None: 

2912 pass 

2913 

2914 @typing.overload 

2915 def get_peer_cert_chain( 

2916 self, *, as_cryptography: typing.Literal[False] = False 

2917 ) -> list[X509] | None: 

2918 pass 

2919 

2920 def get_peer_cert_chain( 

2921 self, 

2922 *, 

2923 as_cryptography: typing.Literal[True] | typing.Literal[False] = False, 

2924 ) -> list[X509] | list[x509.Certificate] | None: 

2925 """ 

2926 Retrieve the other side's certificate (if any) 

2927 

2928 :param bool as_cryptography: Controls whether a list of 

2929 ``cryptography.x509.Certificate`` or ``OpenSSL.crypto.X509`` 

2930 object should be returned. 

2931 

2932 :return: A list of X509 instances giving the peer's certificate chain, 

2933 or None if it does not have one. 

2934 """ 

2935 cert_stack = _lib.SSL_get_peer_cert_chain(self._ssl) 

2936 if cert_stack == _ffi.NULL: 

2937 return None 

2938 

2939 if as_cryptography: 

2940 return self._cert_stack_to_cryptography_list(cert_stack) 

2941 return self._cert_stack_to_list(cert_stack) 

2942 

2943 @typing.overload 

2944 def get_verified_chain( 

2945 self, *, as_cryptography: typing.Literal[True] 

2946 ) -> list[x509.Certificate] | None: 

2947 pass 

2948 

2949 @typing.overload 

2950 def get_verified_chain( 

2951 self, *, as_cryptography: typing.Literal[False] = False 

2952 ) -> list[X509] | None: 

2953 pass 

2954 

2955 def get_verified_chain( 

2956 self, 

2957 *, 

2958 as_cryptography: typing.Literal[True] | typing.Literal[False] = False, 

2959 ) -> list[X509] | list[x509.Certificate] | None: 

2960 """ 

2961 Retrieve the verified certificate chain of the peer including the 

2962 peer's end entity certificate. It must be called after a session has 

2963 been successfully established. If peer verification was not successful 

2964 the chain may be incomplete, invalid, or None. 

2965 

2966 :param bool as_cryptography: Controls whether a list of 

2967 ``cryptography.x509.Certificate`` or ``OpenSSL.crypto.X509`` 

2968 object should be returned. 

2969 

2970 :return: A list of X509 instances giving the peer's verified 

2971 certificate chain, or None if it does not have one. 

2972 

2973 .. versionadded:: 20.0 

2974 """ 

2975 # OpenSSL 1.1+ 

2976 cert_stack = _lib.SSL_get0_verified_chain(self._ssl) 

2977 if cert_stack == _ffi.NULL: 

2978 return None 

2979 

2980 if as_cryptography: 

2981 return self._cert_stack_to_cryptography_list(cert_stack) 

2982 return self._cert_stack_to_list(cert_stack) 

2983 

2984 def want_read(self) -> bool: 

2985 """ 

2986 Checks if more data has to be read from the transport layer to complete 

2987 an operation. 

2988 

2989 :return: True iff more data has to be read 

2990 """ 

2991 return _lib.SSL_want_read(self._ssl) 

2992 

2993 def want_write(self) -> bool: 

2994 """ 

2995 Checks if there is data to write to the transport layer to complete an 

2996 operation. 

2997 

2998 :return: True iff there is data to write 

2999 """ 

3000 return _lib.SSL_want_write(self._ssl) 

3001 

3002 def set_accept_state(self) -> None: 

3003 """ 

3004 Set the connection to work in server mode. The handshake will be 

3005 handled automatically by read/write. 

3006 

3007 :return: None 

3008 """ 

3009 _lib.SSL_set_accept_state(self._ssl) 

3010 

3011 def set_connect_state(self) -> None: 

3012 """ 

3013 Set the connection to work in client mode. The handshake will be 

3014 handled automatically by read/write. 

3015 

3016 :return: None 

3017 """ 

3018 _lib.SSL_set_connect_state(self._ssl) 

3019 

3020 def get_session(self) -> Session | None: 

3021 """ 

3022 Returns the Session currently used. 

3023 

3024 :return: An instance of :class:`OpenSSL.SSL.Session` or 

3025 :obj:`None` if no session exists. 

3026 

3027 .. versionadded:: 0.14 

3028 """ 

3029 session = _lib.SSL_get1_session(self._ssl) 

3030 if session == _ffi.NULL: 

3031 return None 

3032 

3033 pysession = Session.__new__(Session) 

3034 pysession._session = _ffi.gc(session, _lib.SSL_SESSION_free) 

3035 return pysession 

3036 

3037 def set_session(self, session: Session) -> None: 

3038 """ 

3039 Set the session to be used when the TLS/SSL connection is established. 

3040 

3041 :param session: A Session instance representing the session to use. 

3042 :returns: None 

3043 

3044 .. versionadded:: 0.14 

3045 """ 

3046 if not isinstance(session, Session): 

3047 raise TypeError("session must be a Session instance") 

3048 

3049 result = _lib.SSL_set_session(self._ssl, session._session) 

3050 _openssl_assert(result == 1) 

3051 

3052 def _get_finished_message( 

3053 self, function: Callable[[Any, Any, int], int] 

3054 ) -> bytes | None: 

3055 """ 

3056 Helper to implement :meth:`get_finished` and 

3057 :meth:`get_peer_finished`. 

3058 

3059 :param function: Either :data:`SSL_get_finished`: or 

3060 :data:`SSL_get_peer_finished`. 

3061 

3062 :return: :data:`None` if the desired message has not yet been 

3063 received, otherwise the contents of the message. 

3064 """ 

3065 # The OpenSSL documentation says nothing about what might happen if the 

3066 # count argument given is zero. Specifically, it doesn't say whether 

3067 # the output buffer may be NULL in that case or not. Inspection of the 

3068 # implementation reveals that it calls memcpy() unconditionally. 

3069 # Section 7.1.4, paragraph 1 of the C standard suggests that 

3070 # memcpy(NULL, source, 0) is not guaranteed to produce defined (let 

3071 # alone desirable) behavior (though it probably does on just about 

3072 # every implementation...) 

3073 # 

3074 # Allocate a tiny buffer to pass in (instead of just passing NULL as 

3075 # one might expect) for the initial call so as to be safe against this 

3076 # potentially undefined behavior. 

3077 empty = _ffi.new("char[]", 0) 

3078 size = function(self._ssl, empty, 0) 

3079 if size == 0: 

3080 # No Finished message so far. 

3081 return None 

3082 

3083 buf = _no_zero_allocator("char[]", size) 

3084 function(self._ssl, buf, size) 

3085 return _ffi.buffer(buf, size)[:] 

3086 

3087 def get_finished(self) -> bytes | None: 

3088 """ 

3089 Obtain the latest TLS Finished message that we sent. 

3090 

3091 :return: The contents of the message or :obj:`None` if the TLS 

3092 handshake has not yet completed. 

3093 

3094 .. versionadded:: 0.15 

3095 """ 

3096 return self._get_finished_message(_lib.SSL_get_finished) 

3097 

3098 def get_peer_finished(self) -> bytes | None: 

3099 """ 

3100 Obtain the latest TLS Finished message that we received from the peer. 

3101 

3102 :return: The contents of the message or :obj:`None` if the TLS 

3103 handshake has not yet completed. 

3104 

3105 .. versionadded:: 0.15 

3106 """ 

3107 return self._get_finished_message(_lib.SSL_get_peer_finished) 

3108 

3109 def get_cipher_name(self) -> str | None: 

3110 """ 

3111 Obtain the name of the currently used cipher. 

3112 

3113 :returns: The name of the currently used cipher or :obj:`None` 

3114 if no connection has been established. 

3115 

3116 .. versionadded:: 0.15 

3117 """ 

3118 cipher = _lib.SSL_get_current_cipher(self._ssl) 

3119 if cipher == _ffi.NULL: 

3120 return None 

3121 else: 

3122 name = _ffi.string(_lib.SSL_CIPHER_get_name(cipher)) 

3123 return name.decode("utf-8") 

3124 

3125 def get_cipher_bits(self) -> int | None: 

3126 """ 

3127 Obtain the number of secret bits of the currently used cipher. 

3128 

3129 :returns: The number of secret bits of the currently used cipher 

3130 or :obj:`None` if no connection has been established. 

3131 

3132 .. versionadded:: 0.15 

3133 """ 

3134 cipher = _lib.SSL_get_current_cipher(self._ssl) 

3135 if cipher == _ffi.NULL: 

3136 return None 

3137 else: 

3138 return _lib.SSL_CIPHER_get_bits(cipher, _ffi.NULL) 

3139 

3140 def get_cipher_version(self) -> str | None: 

3141 """ 

3142 Obtain the protocol version of the currently used cipher. 

3143 

3144 :returns: The protocol name of the currently used cipher 

3145 or :obj:`None` if no connection has been established. 

3146 

3147 .. versionadded:: 0.15 

3148 """ 

3149 cipher = _lib.SSL_get_current_cipher(self._ssl) 

3150 if cipher == _ffi.NULL: 

3151 return None 

3152 else: 

3153 version = _ffi.string(_lib.SSL_CIPHER_get_version(cipher)) 

3154 return version.decode("utf-8") 

3155 

3156 def get_protocol_version_name(self) -> str: 

3157 """ 

3158 Retrieve the protocol version of the current connection. 

3159 

3160 :returns: The TLS version of the current connection, for example 

3161 the value for TLS 1.2 would be ``TLSv1.2``or ``Unknown`` 

3162 for connections that were not successfully established. 

3163 """ 

3164 version = _ffi.string(_lib.SSL_get_version(self._ssl)) 

3165 return version.decode("utf-8") 

3166 

3167 def get_protocol_version(self) -> int: 

3168 """ 

3169 Retrieve the SSL or TLS protocol version of the current connection. 

3170 

3171 :returns: The TLS version of the current connection. For example, 

3172 it will return ``0x769`` for connections made over TLS version 1. 

3173 """ 

3174 version = _lib.SSL_version(self._ssl) 

3175 return version 

3176 

3177 def set_alpn_protos(self, protos: list[bytes]) -> None: 

3178 """ 

3179 Specify the client's ALPN protocol list. 

3180 

3181 These protocols are offered to the server during protocol negotiation. 

3182 

3183 :param protos: A list of the protocols to be offered to the server. 

3184 This list should be a Python list of bytestrings representing the 

3185 protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``. 

3186 """ 

3187 # Different versions of OpenSSL are inconsistent about how they handle 

3188 # empty proto lists (see #1043), so we avoid the problem entirely by 

3189 # rejecting them ourselves. 

3190 if not protos: 

3191 raise ValueError("at least one protocol must be specified") 

3192 

3193 # Take the list of protocols and join them together, prefixing them 

3194 # with their lengths. 

3195 protostr = b"".join( 

3196 chain.from_iterable((bytes((len(p),)), p) for p in protos) 

3197 ) 

3198 

3199 # Build a C string from the list. We don't need to save this off 

3200 # because OpenSSL immediately copies the data out. 

3201 input_str = _ffi.new("unsigned char[]", protostr) 

3202 

3203 # https://www.openssl.org/docs/man1.1.0/man3/SSL_CTX_set_alpn_protos.html: 

3204 # SSL_CTX_set_alpn_protos() and SSL_set_alpn_protos() 

3205 # return 0 on success, and non-0 on failure. 

3206 # WARNING: these functions reverse the return value convention. 

3207 _openssl_assert( 

3208 _lib.SSL_set_alpn_protos(self._ssl, input_str, len(protostr)) == 0 

3209 ) 

3210 

3211 def get_alpn_proto_negotiated(self) -> bytes: 

3212 """ 

3213 Get the protocol that was negotiated by ALPN. 

3214 

3215 :returns: A bytestring of the protocol name. If no protocol has been 

3216 negotiated yet, returns an empty bytestring. 

3217 """ 

3218 data = _ffi.new("unsigned char **") 

3219 data_len = _ffi.new("unsigned int *") 

3220 

3221 _lib.SSL_get0_alpn_selected(self._ssl, data, data_len) 

3222 

3223 if not data_len: 

3224 return b"" 

3225 

3226 return _ffi.buffer(data[0], data_len[0])[:] 

3227 

3228 def get_selected_srtp_profile(self) -> bytes: 

3229 """ 

3230 Get the SRTP protocol which was negotiated. 

3231 

3232 :returns: A bytestring of the SRTP profile name. If no profile has been 

3233 negotiated yet, returns an empty bytestring. 

3234 """ 

3235 profile = _lib.SSL_get_selected_srtp_profile(self._ssl) 

3236 if not profile: 

3237 return b"" 

3238 

3239 return _ffi.string(profile.name) 

3240 

3241 @_requires_ssl_get0_group_name 

3242 def get_group_name(self) -> str | None: 

3243 """ 

3244 Get the name of the negotiated group for the key exchange. 

3245 

3246 :return: A string giving the group name or :data:`None`. 

3247 """ 

3248 # Do not remove this guard. 

3249 # SSL_get0_group_name crashes with a segfault if called without 

3250 # an established connection (should return NULL but doesn't). 

3251 session = _lib.SSL_get_session(self._ssl) 

3252 if session == _ffi.NULL: 

3253 return None 

3254 

3255 group_name = _lib.SSL_get0_group_name(self._ssl) 

3256 if group_name == _ffi.NULL: 

3257 return None 

3258 

3259 return _ffi.string(group_name).decode("utf-8") 

3260 

3261 def request_ocsp(self) -> None: 

3262 """ 

3263 Called to request that the server sends stapled OCSP data, if 

3264 available. If this is not called on the client side then the server 

3265 will not send OCSP data. Should be used in conjunction with 

3266 :meth:`Context.set_ocsp_client_callback`. 

3267 """ 

3268 rc = _lib.SSL_set_tlsext_status_type( 

3269 self._ssl, _lib.TLSEXT_STATUSTYPE_ocsp 

3270 ) 

3271 _openssl_assert(rc == 1) 

3272 

3273 def set_info_callback( 

3274 self, callback: Callable[[Connection, int, int], None] 

3275 ) -> None: 

3276 """ 

3277 Set the information callback to *callback*. This function will be 

3278 called from time to time during SSL handshakes. 

3279 

3280 :param callback: The Python callback to use. This should take three 

3281 arguments: a Connection object and two integers. The first integer 

3282 specifies where in the SSL handshake the function was called, and 

3283 the other the return code from a (possibly failed) internal 

3284 function call. 

3285 :return: None 

3286 """ 

3287 

3288 @wraps(callback) 

3289 def wrapper(ssl, where, return_code): # type: ignore[no-untyped-def] 

3290 callback(Connection._reverse_mapping[ssl], where, return_code) 

3291 

3292 self._info_callback = _ffi.callback( 

3293 "void (*)(const SSL *, int, int)", wrapper 

3294 ) 

3295 _lib.SSL_set_info_callback(self._ssl, self._info_callback)