Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/OpenSSL/SSL.py: 35%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1215 statements  

1from __future__ import annotations 

2 

3import os 

4import socket 

5import typing 

6import warnings 

7from collections.abc import Sequence 

8from errno import errorcode 

9from functools import partial, wraps 

10from itertools import chain, count 

11from sys import platform 

12from typing import Any, Callable, Optional, TypeVar 

13from weakref import WeakValueDictionary 

14 

15from cryptography import x509 

16from cryptography.hazmat.primitives.asymmetric import ec 

17 

18from OpenSSL._util import ( 

19 StrOrBytesPath as _StrOrBytesPath, 

20) 

21from OpenSSL._util import ( 

22 exception_from_error_queue as _exception_from_error_queue, 

23) 

24from OpenSSL._util import ( 

25 ffi as _ffi, 

26) 

27from OpenSSL._util import ( 

28 lib as _lib, 

29) 

30from OpenSSL._util import ( 

31 make_assert as _make_assert, 

32) 

33from OpenSSL._util import ( 

34 no_zero_allocator as _no_zero_allocator, 

35) 

36from OpenSSL._util import ( 

37 path_bytes as _path_bytes, 

38) 

39from OpenSSL._util import ( 

40 text_to_bytes_and_warn as _text_to_bytes_and_warn, 

41) 

42from OpenSSL.crypto import ( 

43 FILETYPE_PEM, 

44 X509, 

45 PKey, 

46 X509Name, 

47 X509Store, 

48 _EllipticCurve, 

49 _PassphraseHelper, 

50 _PrivateKey, 

51) 

52 

53__all__ = [ 

54 "DTLS_CLIENT_METHOD", 

55 "DTLS_METHOD", 

56 "DTLS_SERVER_METHOD", 

57 "MODE_RELEASE_BUFFERS", 

58 "NO_OVERLAPPING_PROTOCOLS", 

59 "OPENSSL_BUILT_ON", 

60 "OPENSSL_CFLAGS", 

61 "OPENSSL_DIR", 

62 "OPENSSL_PLATFORM", 

63 "OPENSSL_VERSION", 

64 "OPENSSL_VERSION_NUMBER", 

65 "OP_ALL", 

66 "OP_CIPHER_SERVER_PREFERENCE", 

67 "OP_COOKIE_EXCHANGE", 

68 "OP_DONT_INSERT_EMPTY_FRAGMENTS", 

69 "OP_EPHEMERAL_RSA", 

70 "OP_MICROSOFT_BIG_SSLV3_BUFFER", 

71 "OP_MICROSOFT_SESS_ID_BUG", 

72 "OP_MSIE_SSLV2_RSA_PADDING", 

73 "OP_NETSCAPE_CA_DN_BUG", 

74 "OP_NETSCAPE_CHALLENGE_BUG", 

75 "OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG", 

76 "OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG", 

77 "OP_NO_COMPRESSION", 

78 "OP_NO_QUERY_MTU", 

79 "OP_NO_TICKET", 

80 "OP_PKCS1_CHECK_1", 

81 "OP_PKCS1_CHECK_2", 

82 "OP_SINGLE_DH_USE", 

83 "OP_SINGLE_ECDH_USE", 

84 "OP_SSLEAY_080_CLIENT_DH_BUG", 

85 "OP_SSLREF2_REUSE_CERT_TYPE_BUG", 

86 "OP_TLS_BLOCK_PADDING_BUG", 

87 "OP_TLS_D5_BUG", 

88 "OP_TLS_ROLLBACK_BUG", 

89 "RECEIVED_SHUTDOWN", 

90 "SENT_SHUTDOWN", 

91 "SESS_CACHE_BOTH", 

92 "SESS_CACHE_CLIENT", 

93 "SESS_CACHE_NO_AUTO_CLEAR", 

94 "SESS_CACHE_NO_INTERNAL", 

95 "SESS_CACHE_NO_INTERNAL_LOOKUP", 

96 "SESS_CACHE_NO_INTERNAL_STORE", 

97 "SESS_CACHE_OFF", 

98 "SESS_CACHE_SERVER", 

99 "SSL3_VERSION", 

100 "SSLEAY_BUILT_ON", 

101 "SSLEAY_CFLAGS", 

102 "SSLEAY_DIR", 

103 "SSLEAY_PLATFORM", 

104 "SSLEAY_VERSION", 

105 "SSL_CB_ACCEPT_EXIT", 

106 "SSL_CB_ACCEPT_LOOP", 

107 "SSL_CB_ALERT", 

108 "SSL_CB_CONNECT_EXIT", 

109 "SSL_CB_CONNECT_LOOP", 

110 "SSL_CB_EXIT", 

111 "SSL_CB_HANDSHAKE_DONE", 

112 "SSL_CB_HANDSHAKE_START", 

113 "SSL_CB_LOOP", 

114 "SSL_CB_READ", 

115 "SSL_CB_READ_ALERT", 

116 "SSL_CB_WRITE", 

117 "SSL_CB_WRITE_ALERT", 

118 "SSL_ST_ACCEPT", 

119 "SSL_ST_CONNECT", 

120 "SSL_ST_MASK", 

121 "TLS1_1_VERSION", 

122 "TLS1_2_VERSION", 

123 "TLS1_3_VERSION", 

124 "TLS1_VERSION", 

125 "TLS_CLIENT_METHOD", 

126 "TLS_METHOD", 

127 "TLS_SERVER_METHOD", 

128 "VERIFY_CLIENT_ONCE", 

129 "VERIFY_FAIL_IF_NO_PEER_CERT", 

130 "VERIFY_NONE", 

131 "VERIFY_PEER", 

132 "Connection", 

133 "Context", 

134 "Error", 

135 "OP_NO_SSLv2", 

136 "OP_NO_SSLv3", 

137 "OP_NO_TLSv1", 

138 "OP_NO_TLSv1_1", 

139 "OP_NO_TLSv1_2", 

140 "OP_NO_TLSv1_3", 

141 "SSLeay_version", 

142 "SSLv23_METHOD", 

143 "Session", 

144 "SysCallError", 

145 "TLSv1_1_METHOD", 

146 "TLSv1_2_METHOD", 

147 "TLSv1_METHOD", 

148 "WantReadError", 

149 "WantWriteError", 

150 "WantX509LookupError", 

151 "X509VerificationCodes", 

152 "ZeroReturnError", 

153] 

154 

155 

156OPENSSL_VERSION_NUMBER: int = _lib.OPENSSL_VERSION_NUMBER 

157OPENSSL_VERSION: int = _lib.OPENSSL_VERSION 

158OPENSSL_CFLAGS: int = _lib.OPENSSL_CFLAGS 

159OPENSSL_PLATFORM: int = _lib.OPENSSL_PLATFORM 

160OPENSSL_DIR: int = _lib.OPENSSL_DIR 

161OPENSSL_BUILT_ON: int = _lib.OPENSSL_BUILT_ON 

162 

163SSLEAY_VERSION = OPENSSL_VERSION 

164SSLEAY_CFLAGS = OPENSSL_CFLAGS 

165SSLEAY_PLATFORM = OPENSSL_PLATFORM 

166SSLEAY_DIR = OPENSSL_DIR 

167SSLEAY_BUILT_ON = OPENSSL_BUILT_ON 

168 

169SENT_SHUTDOWN = _lib.SSL_SENT_SHUTDOWN 

170RECEIVED_SHUTDOWN = _lib.SSL_RECEIVED_SHUTDOWN 

171 

172SSLv23_METHOD = 3 

173TLSv1_METHOD = 4 

174TLSv1_1_METHOD = 5 

175TLSv1_2_METHOD = 6 

176TLS_METHOD = 7 

177TLS_SERVER_METHOD = 8 

178TLS_CLIENT_METHOD = 9 

179DTLS_METHOD = 10 

180DTLS_SERVER_METHOD = 11 

181DTLS_CLIENT_METHOD = 12 

182 

183SSL3_VERSION: int = _lib.SSL3_VERSION 

184TLS1_VERSION: int = _lib.TLS1_VERSION 

185TLS1_1_VERSION: int = _lib.TLS1_1_VERSION 

186TLS1_2_VERSION: int = _lib.TLS1_2_VERSION 

187TLS1_3_VERSION: int = _lib.TLS1_3_VERSION 

188 

189OP_NO_SSLv2: int = _lib.SSL_OP_NO_SSLv2 

190OP_NO_SSLv3: int = _lib.SSL_OP_NO_SSLv3 

191OP_NO_TLSv1: int = _lib.SSL_OP_NO_TLSv1 

192OP_NO_TLSv1_1: int = _lib.SSL_OP_NO_TLSv1_1 

193OP_NO_TLSv1_2: int = _lib.SSL_OP_NO_TLSv1_2 

194OP_NO_TLSv1_3: int = _lib.SSL_OP_NO_TLSv1_3 

195 

196MODE_RELEASE_BUFFERS: int = _lib.SSL_MODE_RELEASE_BUFFERS 

197 

198OP_SINGLE_DH_USE: int = _lib.SSL_OP_SINGLE_DH_USE 

199OP_SINGLE_ECDH_USE: int = _lib.SSL_OP_SINGLE_ECDH_USE 

200OP_EPHEMERAL_RSA: int = _lib.SSL_OP_EPHEMERAL_RSA 

201OP_MICROSOFT_SESS_ID_BUG: int = _lib.SSL_OP_MICROSOFT_SESS_ID_BUG 

202OP_NETSCAPE_CHALLENGE_BUG: int = _lib.SSL_OP_NETSCAPE_CHALLENGE_BUG 

203OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG: int = ( 

204 _lib.SSL_OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG 

205) 

206OP_SSLREF2_REUSE_CERT_TYPE_BUG: int = _lib.SSL_OP_SSLREF2_REUSE_CERT_TYPE_BUG 

207OP_MICROSOFT_BIG_SSLV3_BUFFER: int = _lib.SSL_OP_MICROSOFT_BIG_SSLV3_BUFFER 

208OP_MSIE_SSLV2_RSA_PADDING: int = _lib.SSL_OP_MSIE_SSLV2_RSA_PADDING 

209OP_SSLEAY_080_CLIENT_DH_BUG: int = _lib.SSL_OP_SSLEAY_080_CLIENT_DH_BUG 

210OP_TLS_D5_BUG: int = _lib.SSL_OP_TLS_D5_BUG 

211OP_TLS_BLOCK_PADDING_BUG: int = _lib.SSL_OP_TLS_BLOCK_PADDING_BUG 

212OP_DONT_INSERT_EMPTY_FRAGMENTS: int = _lib.SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS 

213OP_CIPHER_SERVER_PREFERENCE: int = _lib.SSL_OP_CIPHER_SERVER_PREFERENCE 

214OP_TLS_ROLLBACK_BUG: int = _lib.SSL_OP_TLS_ROLLBACK_BUG 

215OP_PKCS1_CHECK_1 = _lib.SSL_OP_PKCS1_CHECK_1 

216OP_PKCS1_CHECK_2: int = _lib.SSL_OP_PKCS1_CHECK_2 

217OP_NETSCAPE_CA_DN_BUG: int = _lib.SSL_OP_NETSCAPE_CA_DN_BUG 

218OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG: int = ( 

219 _lib.SSL_OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG 

220) 

221OP_NO_COMPRESSION: int = _lib.SSL_OP_NO_COMPRESSION 

222 

223OP_NO_QUERY_MTU: int = _lib.SSL_OP_NO_QUERY_MTU 

224OP_COOKIE_EXCHANGE: int = _lib.SSL_OP_COOKIE_EXCHANGE 

225OP_NO_TICKET: int = _lib.SSL_OP_NO_TICKET 

226 

227try: 

228 OP_NO_RENEGOTIATION: int = _lib.SSL_OP_NO_RENEGOTIATION 

229 __all__.append("OP_NO_RENEGOTIATION") 

230except AttributeError: 

231 pass 

232 

233try: 

234 OP_IGNORE_UNEXPECTED_EOF: int = _lib.SSL_OP_IGNORE_UNEXPECTED_EOF 

235 __all__.append("OP_IGNORE_UNEXPECTED_EOF") 

236except AttributeError: 

237 pass 

238 

239try: 

240 OP_LEGACY_SERVER_CONNECT: int = _lib.SSL_OP_LEGACY_SERVER_CONNECT 

241 __all__.append("OP_LEGACY_SERVER_CONNECT") 

242except AttributeError: 

243 pass 

244 

245OP_ALL: int = _lib.SSL_OP_ALL 

246 

247VERIFY_PEER: int = _lib.SSL_VERIFY_PEER 

248VERIFY_FAIL_IF_NO_PEER_CERT: int = _lib.SSL_VERIFY_FAIL_IF_NO_PEER_CERT 

249VERIFY_CLIENT_ONCE: int = _lib.SSL_VERIFY_CLIENT_ONCE 

250VERIFY_NONE: int = _lib.SSL_VERIFY_NONE 

251 

252SESS_CACHE_OFF: int = _lib.SSL_SESS_CACHE_OFF 

253SESS_CACHE_CLIENT: int = _lib.SSL_SESS_CACHE_CLIENT 

254SESS_CACHE_SERVER: int = _lib.SSL_SESS_CACHE_SERVER 

255SESS_CACHE_BOTH: int = _lib.SSL_SESS_CACHE_BOTH 

256SESS_CACHE_NO_AUTO_CLEAR: int = _lib.SSL_SESS_CACHE_NO_AUTO_CLEAR 

257SESS_CACHE_NO_INTERNAL_LOOKUP: int = _lib.SSL_SESS_CACHE_NO_INTERNAL_LOOKUP 

258SESS_CACHE_NO_INTERNAL_STORE: int = _lib.SSL_SESS_CACHE_NO_INTERNAL_STORE 

259SESS_CACHE_NO_INTERNAL: int = _lib.SSL_SESS_CACHE_NO_INTERNAL 

260 

261SSL_ST_CONNECT: int = _lib.SSL_ST_CONNECT 

262SSL_ST_ACCEPT: int = _lib.SSL_ST_ACCEPT 

263SSL_ST_MASK: int = _lib.SSL_ST_MASK 

264 

265SSL_CB_LOOP: int = _lib.SSL_CB_LOOP 

266SSL_CB_EXIT: int = _lib.SSL_CB_EXIT 

267SSL_CB_READ: int = _lib.SSL_CB_READ 

268SSL_CB_WRITE: int = _lib.SSL_CB_WRITE 

269SSL_CB_ALERT: int = _lib.SSL_CB_ALERT 

270SSL_CB_READ_ALERT: int = _lib.SSL_CB_READ_ALERT 

271SSL_CB_WRITE_ALERT: int = _lib.SSL_CB_WRITE_ALERT 

272SSL_CB_ACCEPT_LOOP: int = _lib.SSL_CB_ACCEPT_LOOP 

273SSL_CB_ACCEPT_EXIT: int = _lib.SSL_CB_ACCEPT_EXIT 

274SSL_CB_CONNECT_LOOP: int = _lib.SSL_CB_CONNECT_LOOP 

275SSL_CB_CONNECT_EXIT: int = _lib.SSL_CB_CONNECT_EXIT 

276SSL_CB_HANDSHAKE_START: int = _lib.SSL_CB_HANDSHAKE_START 

277SSL_CB_HANDSHAKE_DONE: int = _lib.SSL_CB_HANDSHAKE_DONE 

278 

279_Buffer = typing.Union[bytes, bytearray, memoryview] 

280_T = TypeVar("_T") 

281 

282 

283class _NoOverlappingProtocols: 

284 pass 

285 

286 

287NO_OVERLAPPING_PROTOCOLS = _NoOverlappingProtocols() 

288 

289# Callback types. 

290_ALPNSelectCallback = Callable[ 

291 [ 

292 "Connection", 

293 typing.List[bytes], 

294 ], 

295 typing.Union[bytes, _NoOverlappingProtocols], 

296] 

297_CookieGenerateCallback = Callable[["Connection"], bytes] 

298_CookieVerifyCallback = Callable[["Connection", bytes], bool] 

299_OCSPClientCallback = Callable[["Connection", bytes, Optional[_T]], bool] 

300_OCSPServerCallback = Callable[["Connection", Optional[_T]], bytes] 

301_PassphraseCallback = Callable[[int, bool, Optional[_T]], bytes] 

302_VerifyCallback = Callable[["Connection", X509, int, int, int], bool] 

303 

304 

305class X509VerificationCodes: 

306 """ 

307 Success and error codes for X509 verification, as returned by the 

308 underlying ``X509_STORE_CTX_get_error()`` function and passed by pyOpenSSL 

309 to verification callback functions. 

310 

311 See `OpenSSL Verification Errors 

312 <https://www.openssl.org/docs/manmaster/man3/X509_verify_cert_error_string.html#ERROR-CODES>`_ 

313 for details. 

314 """ 

315 

316 OK = _lib.X509_V_OK 

317 ERR_UNABLE_TO_GET_ISSUER_CERT = _lib.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT 

318 ERR_UNABLE_TO_GET_CRL = _lib.X509_V_ERR_UNABLE_TO_GET_CRL 

319 ERR_UNABLE_TO_DECRYPT_CERT_SIGNATURE = ( 

320 _lib.X509_V_ERR_UNABLE_TO_DECRYPT_CERT_SIGNATURE 

321 ) 

322 ERR_UNABLE_TO_DECRYPT_CRL_SIGNATURE = ( 

323 _lib.X509_V_ERR_UNABLE_TO_DECRYPT_CRL_SIGNATURE 

324 ) 

325 ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY = ( 

326 _lib.X509_V_ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY 

327 ) 

328 ERR_CERT_SIGNATURE_FAILURE = _lib.X509_V_ERR_CERT_SIGNATURE_FAILURE 

329 ERR_CRL_SIGNATURE_FAILURE = _lib.X509_V_ERR_CRL_SIGNATURE_FAILURE 

330 ERR_CERT_NOT_YET_VALID = _lib.X509_V_ERR_CERT_NOT_YET_VALID 

331 ERR_CERT_HAS_EXPIRED = _lib.X509_V_ERR_CERT_HAS_EXPIRED 

332 ERR_CRL_NOT_YET_VALID = _lib.X509_V_ERR_CRL_NOT_YET_VALID 

333 ERR_CRL_HAS_EXPIRED = _lib.X509_V_ERR_CRL_HAS_EXPIRED 

334 ERR_ERROR_IN_CERT_NOT_BEFORE_FIELD = ( 

335 _lib.X509_V_ERR_ERROR_IN_CERT_NOT_BEFORE_FIELD 

336 ) 

337 ERR_ERROR_IN_CERT_NOT_AFTER_FIELD = ( 

338 _lib.X509_V_ERR_ERROR_IN_CERT_NOT_AFTER_FIELD 

339 ) 

340 ERR_ERROR_IN_CRL_LAST_UPDATE_FIELD = ( 

341 _lib.X509_V_ERR_ERROR_IN_CRL_LAST_UPDATE_FIELD 

342 ) 

343 ERR_ERROR_IN_CRL_NEXT_UPDATE_FIELD = ( 

344 _lib.X509_V_ERR_ERROR_IN_CRL_NEXT_UPDATE_FIELD 

345 ) 

346 ERR_OUT_OF_MEM = _lib.X509_V_ERR_OUT_OF_MEM 

347 ERR_DEPTH_ZERO_SELF_SIGNED_CERT = ( 

348 _lib.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT 

349 ) 

350 ERR_SELF_SIGNED_CERT_IN_CHAIN = _lib.X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN 

351 ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY = ( 

352 _lib.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY 

353 ) 

354 ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE = ( 

355 _lib.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE 

356 ) 

357 ERR_CERT_CHAIN_TOO_LONG = _lib.X509_V_ERR_CERT_CHAIN_TOO_LONG 

358 ERR_CERT_REVOKED = _lib.X509_V_ERR_CERT_REVOKED 

359 ERR_INVALID_CA = _lib.X509_V_ERR_INVALID_CA 

360 ERR_PATH_LENGTH_EXCEEDED = _lib.X509_V_ERR_PATH_LENGTH_EXCEEDED 

361 ERR_INVALID_PURPOSE = _lib.X509_V_ERR_INVALID_PURPOSE 

362 ERR_CERT_UNTRUSTED = _lib.X509_V_ERR_CERT_UNTRUSTED 

363 ERR_CERT_REJECTED = _lib.X509_V_ERR_CERT_REJECTED 

364 ERR_SUBJECT_ISSUER_MISMATCH = _lib.X509_V_ERR_SUBJECT_ISSUER_MISMATCH 

365 ERR_AKID_SKID_MISMATCH = _lib.X509_V_ERR_AKID_SKID_MISMATCH 

366 ERR_AKID_ISSUER_SERIAL_MISMATCH = ( 

367 _lib.X509_V_ERR_AKID_ISSUER_SERIAL_MISMATCH 

368 ) 

369 ERR_KEYUSAGE_NO_CERTSIGN = _lib.X509_V_ERR_KEYUSAGE_NO_CERTSIGN 

370 ERR_UNABLE_TO_GET_CRL_ISSUER = _lib.X509_V_ERR_UNABLE_TO_GET_CRL_ISSUER 

371 ERR_UNHANDLED_CRITICAL_EXTENSION = ( 

372 _lib.X509_V_ERR_UNHANDLED_CRITICAL_EXTENSION 

373 ) 

374 ERR_KEYUSAGE_NO_CRL_SIGN = _lib.X509_V_ERR_KEYUSAGE_NO_CRL_SIGN 

375 ERR_UNHANDLED_CRITICAL_CRL_EXTENSION = ( 

376 _lib.X509_V_ERR_UNHANDLED_CRITICAL_CRL_EXTENSION 

377 ) 

378 ERR_INVALID_NON_CA = _lib.X509_V_ERR_INVALID_NON_CA 

379 ERR_PROXY_PATH_LENGTH_EXCEEDED = _lib.X509_V_ERR_PROXY_PATH_LENGTH_EXCEEDED 

380 ERR_KEYUSAGE_NO_DIGITAL_SIGNATURE = ( 

381 _lib.X509_V_ERR_KEYUSAGE_NO_DIGITAL_SIGNATURE 

382 ) 

383 ERR_PROXY_CERTIFICATES_NOT_ALLOWED = ( 

384 _lib.X509_V_ERR_PROXY_CERTIFICATES_NOT_ALLOWED 

385 ) 

386 ERR_INVALID_EXTENSION = _lib.X509_V_ERR_INVALID_EXTENSION 

387 ERR_INVALID_POLICY_EXTENSION = _lib.X509_V_ERR_INVALID_POLICY_EXTENSION 

388 ERR_NO_EXPLICIT_POLICY = _lib.X509_V_ERR_NO_EXPLICIT_POLICY 

389 ERR_DIFFERENT_CRL_SCOPE = _lib.X509_V_ERR_DIFFERENT_CRL_SCOPE 

390 ERR_UNSUPPORTED_EXTENSION_FEATURE = ( 

391 _lib.X509_V_ERR_UNSUPPORTED_EXTENSION_FEATURE 

392 ) 

393 ERR_UNNESTED_RESOURCE = _lib.X509_V_ERR_UNNESTED_RESOURCE 

394 ERR_PERMITTED_VIOLATION = _lib.X509_V_ERR_PERMITTED_VIOLATION 

395 ERR_EXCLUDED_VIOLATION = _lib.X509_V_ERR_EXCLUDED_VIOLATION 

396 ERR_SUBTREE_MINMAX = _lib.X509_V_ERR_SUBTREE_MINMAX 

397 ERR_UNSUPPORTED_CONSTRAINT_TYPE = ( 

398 _lib.X509_V_ERR_UNSUPPORTED_CONSTRAINT_TYPE 

399 ) 

400 ERR_UNSUPPORTED_CONSTRAINT_SYNTAX = ( 

401 _lib.X509_V_ERR_UNSUPPORTED_CONSTRAINT_SYNTAX 

402 ) 

403 ERR_UNSUPPORTED_NAME_SYNTAX = _lib.X509_V_ERR_UNSUPPORTED_NAME_SYNTAX 

404 ERR_CRL_PATH_VALIDATION_ERROR = _lib.X509_V_ERR_CRL_PATH_VALIDATION_ERROR 

405 ERR_HOSTNAME_MISMATCH = _lib.X509_V_ERR_HOSTNAME_MISMATCH 

406 ERR_EMAIL_MISMATCH = _lib.X509_V_ERR_EMAIL_MISMATCH 

407 ERR_IP_ADDRESS_MISMATCH = _lib.X509_V_ERR_IP_ADDRESS_MISMATCH 

408 ERR_APPLICATION_VERIFICATION = _lib.X509_V_ERR_APPLICATION_VERIFICATION 

409 

410 

411# Taken from https://golang.org/src/crypto/x509/root_linux.go 

412_CERTIFICATE_FILE_LOCATIONS = [ 

413 "/etc/ssl/certs/ca-certificates.crt", # Debian/Ubuntu/Gentoo etc. 

414 "/etc/pki/tls/certs/ca-bundle.crt", # Fedora/RHEL 6 

415 "/etc/ssl/ca-bundle.pem", # OpenSUSE 

416 "/etc/pki/tls/cacert.pem", # OpenELEC 

417 "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem", # CentOS/RHEL 7 

418] 

419 

420_CERTIFICATE_PATH_LOCATIONS = [ 

421 "/etc/ssl/certs", # SLES10/SLES11 

422] 

423 

424# These values are compared to output from cffi's ffi.string so they must be 

425# byte strings. 

426_CRYPTOGRAPHY_MANYLINUX_CA_DIR = b"/opt/pyca/cryptography/openssl/certs" 

427_CRYPTOGRAPHY_MANYLINUX_CA_FILE = b"/opt/pyca/cryptography/openssl/cert.pem" 

428 

429 

430class Error(Exception): 

431 """ 

432 An error occurred in an `OpenSSL.SSL` API. 

433 """ 

434 

435 

436_raise_current_error = partial(_exception_from_error_queue, Error) 

437_openssl_assert = _make_assert(Error) 

438 

439 

440class WantReadError(Error): 

441 pass 

442 

443 

444class WantWriteError(Error): 

445 pass 

446 

447 

448class WantX509LookupError(Error): 

449 pass 

450 

451 

452class ZeroReturnError(Error): 

453 pass 

454 

455 

456class SysCallError(Error): 

457 pass 

458 

459 

460class _CallbackExceptionHelper: 

461 """ 

462 A base class for wrapper classes that allow for intelligent exception 

463 handling in OpenSSL callbacks. 

464 

465 :ivar list _problems: Any exceptions that occurred while executing in a 

466 context where they could not be raised in the normal way. Typically 

467 this is because OpenSSL has called into some Python code and requires a 

468 return value. The exceptions are saved to be raised later when it is 

469 possible to do so. 

470 """ 

471 

472 def __init__(self) -> None: 

473 self._problems: list[Exception] = [] 

474 

475 def raise_if_problem(self) -> None: 

476 """ 

477 Re-raise the oldest exception captured while running a callback, if 

478 any. An ``Error`` raised by ``_raise_current_error()`` is discarded first. 

479 """ 

480 if self._problems: 

481 try: 

482 _raise_current_error() 

483 except Error: 

484 pass 

485 raise self._problems.pop(0) 

486 

487 

488class _VerifyHelper(_CallbackExceptionHelper): 

489 """ 

490 Wrap a callback such that it can be used as a certificate verification 

491 callback. 

492 """ 

493 

494 def __init__(self, callback: _VerifyCallback) -> None: 

495 _CallbackExceptionHelper.__init__(self) 

496 

497 @wraps(callback) 

498 def wrapper(ok, store_ctx): # type: ignore[no-untyped-def] 

499 x509 = _lib.X509_STORE_CTX_get_current_cert(store_ctx) 

500 _lib.X509_up_ref(x509) 

501 cert = X509._from_raw_x509_ptr(x509) 

502 error_number = _lib.X509_STORE_CTX_get_error(store_ctx) 

503 error_depth = _lib.X509_STORE_CTX_get_error_depth(store_ctx) 

504 

505 index = _lib.SSL_get_ex_data_X509_STORE_CTX_idx() 

506 ssl = _lib.X509_STORE_CTX_get_ex_data(store_ctx, index) 

507 connection = Connection._reverse_mapping[ssl] 

508 

509 try: 

510 result = callback( 

511 connection, cert, error_number, error_depth, ok 

512 ) 

513 except Exception as e: 

514 self._problems.append(e) 

515 return 0 

516 else: 

517 if result: 

518 _lib.X509_STORE_CTX_set_error(store_ctx, _lib.X509_V_OK) 

519 return 1 

520 else: 

521 return 0 

522 

523 self.callback = _ffi.callback( 

524 "int (*)(int, X509_STORE_CTX *)", wrapper 

525 ) 

526 

527 

528class _ALPNSelectHelper(_CallbackExceptionHelper): 

529 """ 

530 Wrap a callback such that it can be used as an ALPN selection callback. 

531 """ 

532 

533 def __init__(self, callback: _ALPNSelectCallback) -> None: 

534 _CallbackExceptionHelper.__init__(self) 

535 

536 @wraps(callback) 

537 def wrapper(ssl, out, outlen, in_, inlen, arg): # type: ignore[no-untyped-def] 

538 try: 

539 conn = Connection._reverse_mapping[ssl] 

540 

541 # The string passed to us is made up of multiple 

542 # length-prefixed bytestrings. We need to split that into a 

543 # list. 

544 instr = _ffi.buffer(in_, inlen)[:] 

545 protolist = [] 

546 while instr: 

547 encoded_len = instr[0] 

548 proto = instr[1 : encoded_len + 1] 

549 protolist.append(proto) 

550 instr = instr[encoded_len + 1 :] 

551 

552 # Call the callback 

553 outbytes = callback(conn, protolist) 

554 any_accepted = True 

555 if outbytes is NO_OVERLAPPING_PROTOCOLS: 

556 outbytes = b"" 

557 any_accepted = False 

558 elif not isinstance(outbytes, bytes): 

559 raise TypeError( 

560 "ALPN callback must return a bytestring or the " 

561 "special NO_OVERLAPPING_PROTOCOLS sentinel value." 

562 ) 

563 

564 # Save our callback arguments on the connection object to make 

565 # sure that they don't get freed before OpenSSL can use them. 

566 # Then, return them in the appropriate output parameters. 

567 conn._alpn_select_callback_args = [ 

568 _ffi.new("unsigned char *", len(outbytes)), 

569 _ffi.new("unsigned char[]", outbytes), 

570 ] 

571 outlen[0] = conn._alpn_select_callback_args[0][0] 

572 out[0] = conn._alpn_select_callback_args[1] 

573 if not any_accepted: 

574 return _lib.SSL_TLSEXT_ERR_NOACK 

575 return _lib.SSL_TLSEXT_ERR_OK 

576 except Exception as e: 

577 self._problems.append(e) 

578 return _lib.SSL_TLSEXT_ERR_ALERT_FATAL 

579 

580 self.callback = _ffi.callback( 

581 ( 

582 "int (*)(SSL *, unsigned char **, unsigned char *, " 

583 "const unsigned char *, unsigned int, void *)" 

584 ), 

585 wrapper, 

586 ) 

587 

588 

589class _OCSPServerCallbackHelper(_CallbackExceptionHelper): 

590 """ 

591 Wrap a callback such that it can be used as an OCSP callback for the server 

592 side. 

593 

594 Annoyingly, OpenSSL defines one OCSP callback but uses it in two different 

595 ways. For servers, that callback is expected to retrieve some OCSP data and 

596 hand it to OpenSSL, and may return only SSL_TLSEXT_ERR_OK, 

597 SSL_TLSEXT_ERR_FATAL, and SSL_TLSEXT_ERR_NOACK. For clients, that callback 

598 is expected to check the OCSP data, and returns a negative value on error, 

599 0 if the response is not acceptable, or positive if it is. These are 

600 mutually exclusive return code behaviours, and they mean that we need two 

601 helpers so that we always return an appropriate error code if the user's 

602 code throws an exception. 

603 

604 Given that we have to have two helpers anyway, these helpers are a bit more 

605 helpery than most: specifically, they hide a few more of the OpenSSL 

606 functions so that the user has an easier time writing these callbacks. 

607 

608 This helper implements the server side. 

609 """ 

610 

611 def __init__(self, callback: _OCSPServerCallback[Any]) -> None: 

612 _CallbackExceptionHelper.__init__(self) 

613 

614 @wraps(callback) 

615 def wrapper(ssl, cdata): # type: ignore[no-untyped-def] 

616 try: 

617 conn = Connection._reverse_mapping[ssl] 

618 

619 # Extract the data if any was provided. 

620 if cdata != _ffi.NULL: 

621 data = _ffi.from_handle(cdata) 

622 else: 

623 data = None 

624 

625 # Call the callback. 

626 ocsp_data = callback(conn, data) 

627 

628 if not isinstance(ocsp_data, bytes): 

629 raise TypeError("OCSP callback must return a bytestring.") 

630 

631 # If the OCSP data was provided, we will pass it to OpenSSL. 

632 # However, we have an early exit here: if no OCSP data was 

633 # provided we will just exit out and tell OpenSSL that there 

634 # is nothing to do. 

635 if not ocsp_data: 

636 return 3 # SSL_TLSEXT_ERR_NOACK 

637 

638 # OpenSSL takes ownership of this data and expects it to have 

639 # been allocated by OPENSSL_malloc. 

640 ocsp_data_length = len(ocsp_data) 

641 data_ptr = _lib.OPENSSL_malloc(ocsp_data_length) 

642 _ffi.buffer(data_ptr, ocsp_data_length)[:] = ocsp_data 

643 

644 _lib.SSL_set_tlsext_status_ocsp_resp( 

645 ssl, data_ptr, ocsp_data_length 

646 ) 

647 

648 return 0 

649 except Exception as e: 

650 self._problems.append(e) 

651 return 2 # SSL_TLSEXT_ERR_ALERT_FATAL 

652 

653 self.callback = _ffi.callback("int (*)(SSL *, void *)", wrapper) 

654 

655 

656class _OCSPClientCallbackHelper(_CallbackExceptionHelper): 

657 """ 

658 Wrap a callback such that it can be used as an OCSP callback for the client 

659 side. 

660 

661 Annoyingly, OpenSSL defines one OCSP callback but uses it in two different 

662 ways. For servers, that callback is expected to retrieve some OCSP data and 

663 hand it to OpenSSL, and may return only SSL_TLSEXT_ERR_OK, 

664 SSL_TLSEXT_ERR_FATAL, and SSL_TLSEXT_ERR_NOACK. For clients, that callback 

665 is expected to check the OCSP data, and returns a negative value on error, 

666 0 if the response is not acceptable, or positive if it is. These are 

667 mutually exclusive return code behaviours, and they mean that we need two 

668 helpers so that we always return an appropriate error code if the user's 

669 code throws an exception. 

670 

671 Given that we have to have two helpers anyway, these helpers are a bit more 

672 helpery than most: specifically, they hide a few more of the OpenSSL 

673 functions so that the user has an easier time writing these callbacks. 

674 

675 This helper implements the client side. 

676 """ 

677 

678 def __init__(self, callback: _OCSPClientCallback[Any]) -> None: 

679 _CallbackExceptionHelper.__init__(self) 

680 

681 @wraps(callback) 

682 def wrapper(ssl, cdata): # type: ignore[no-untyped-def] 

683 try: 

684 conn = Connection._reverse_mapping[ssl] 

685 

686 # Extract the data if any was provided. 

687 if cdata != _ffi.NULL: 

688 data = _ffi.from_handle(cdata) 

689 else: 

690 data = None 

691 

692 # Get the OCSP data. 

693 ocsp_ptr = _ffi.new("unsigned char **") 

694 ocsp_len = _lib.SSL_get_tlsext_status_ocsp_resp(ssl, ocsp_ptr) 

695 if ocsp_len < 0: 

696 # No OCSP data. 

697 ocsp_data = b"" 

698 else: 

699 # Copy the OCSP data, then pass it to the callback. 

700 ocsp_data = _ffi.buffer(ocsp_ptr[0], ocsp_len)[:] 

701 

702 valid = callback(conn, ocsp_data, data) 

703 

704 # Return 1 on success or 0 on error. 

705 return int(bool(valid)) 

706 

707 except Exception as e: 

708 self._problems.append(e) 

709 # Return negative value if an exception is hit. 

710 return -1 

711 

712 self.callback = _ffi.callback("int (*)(SSL *, void *)", wrapper) 

713 

714 

class _CookieGenerateCallbackHelper(_CallbackExceptionHelper):
    """
    Wrap a callback such that it can be used as a DTLS cookie generation
    callback.

    The user callback receives the :class:`Connection` and returns the cookie
    bytes. Any exception raised while obtaining or storing the cookie is
    recorded in ``_problems`` and reported to OpenSSL as a failure (return
    value 0).
    """

    # Largest cookie accepted from the user callback.
    # NOTE(review): assumed to equal OpenSSL's DTLS1_COOKIE_LENGTH (255), the
    # size of the buffer OpenSSL passes to the callback -- confirm against
    # the OpenSSL headers this build links to.
    _MAX_COOKIE_LENGTH = 255

    def __init__(self, callback: _CookieGenerateCallback) -> None:
        _CallbackExceptionHelper.__init__(self)
        max_length = self._MAX_COOKIE_LENGTH

        @wraps(callback)
        def wrapper(ssl, out, outlen):  # type: ignore[no-untyped-def]
            try:
                conn = Connection._reverse_mapping[ssl]
                cookie = callback(conn)
                cookie_length = len(cookie)
                # ``out`` is a bare ``unsigned char *``: the slice assignment
                # below carries no length information, so an oversized
                # cookie would be written past the end of OpenSSL's buffer.
                # Reject it before touching the buffer.
                if cookie_length > max_length:
                    raise ValueError(
                        f"Cookie too long (got {cookie_length} bytes, "
                        f"max {max_length})"
                    )
                out[0:cookie_length] = cookie
                outlen[0] = cookie_length
                return 1
            except Exception as e:
                self._problems.append(e)
                # "a zero return value can be used to abort the handshake"
                return 0

        self.callback = _ffi.callback(
            "int (*)(SSL *, unsigned char *, unsigned int *)",
            wrapper,
        )

736 

737 

738class _CookieVerifyCallbackHelper(_CallbackExceptionHelper): 

739 def __init__(self, callback: _CookieVerifyCallback) -> None: 

740 _CallbackExceptionHelper.__init__(self) 

741 

742 @wraps(callback) 

743 def wrapper(ssl, c_cookie, cookie_len): # type: ignore[no-untyped-def] 

744 try: 

745 conn = Connection._reverse_mapping[ssl] 

746 return callback(conn, bytes(c_cookie[0:cookie_len])) 

747 except Exception as e: 

748 self._problems.append(e) 

749 return 0 

750 

751 self.callback = _ffi.callback( 

752 "int (*)(SSL *, unsigned char *, unsigned int)", 

753 wrapper, 

754 ) 

755 

756 

757def _asFileDescriptor(obj: Any) -> int: 

758 fd = None 

759 if not isinstance(obj, int): 

760 meth = getattr(obj, "fileno", None) 

761 if meth is not None: 

762 obj = meth() 

763 

764 if isinstance(obj, int): 

765 fd = obj 

766 

767 if not isinstance(fd, int): 

768 raise TypeError("argument must be an int, or have a fileno() method.") 

769 elif fd < 0: 

770 raise ValueError( 

771 f"file descriptor cannot be a negative integer ({fd:i})" 

772 ) 

773 

774 return fd 

775 

776 

777def OpenSSL_version(type: int) -> bytes: 

778 """ 

779 Return a byte string describing the version of OpenSSL in use. 

780 

781 :param type: One of the :const:`OPENSSL_` constants defined in this module. 

782 """ 

783 return _ffi.string(_lib.OpenSSL_version(type)) 

784 

785 

786SSLeay_version = OpenSSL_version 

787 

788 

789def _make_requires(flag: int, error: str) -> Callable[[_T], _T]: 

790 """ 

791 Builds a decorator that ensures that functions that rely on OpenSSL 

792 functions that are not present in this build raise NotImplementedError, 

793 rather than AttributeError coming out of cryptography. 

794 

795 :param flag: A cryptography flag that guards the functions, e.g. 

796 ``Cryptography_HAS_NEXTPROTONEG``. 

797 :param error: The string to be used in the exception if the flag is false. 

798 """ 

799 

800 def _requires_decorator(func): # type: ignore[no-untyped-def] 

801 if not flag: 

802 

803 @wraps(func) 

804 def explode(*args, **kwargs): # type: ignore[no-untyped-def] 

805 raise NotImplementedError(error) 

806 

807 return explode 

808 else: 

809 return func 

810 

811 return _requires_decorator 

812 

813 

814_requires_keylog = _make_requires( 

815 getattr(_lib, "Cryptography_HAS_KEYLOG", 0), "Key logging not available" 

816) 

817 

818 

819class Session: 

820 """ 

821 A class representing an SSL session. A session defines certain connection 

822 parameters which may be re-used to speed up the setup of subsequent 

823 connections. 

824 

825 .. versionadded:: 0.14 

826 """ 

827 

828 _session: Any 

829 

830 

831F = TypeVar("F", bound=Callable[..., Any]) 

832 

833 

834def _require_not_used(f: F) -> F: 

835 @wraps(f) 

836 def inner(self: Context, *args: Any, **kwargs: Any) -> Any: 

837 if self._used: 

838 warnings.warn( 

839 ( 

840 "Attempting to mutate a Context after a Connection was " 

841 "created. In the future, this will raise an exception" 

842 ), 

843 DeprecationWarning, 

844 stacklevel=2, 

845 ) 

846 return f(self, *args, **kwargs) 

847 

848 return typing.cast(F, inner) 

849 

850 

851class Context: 

852 """ 

853 :class:`OpenSSL.SSL.Context` instances define the parameters for setting 

854 up new SSL connections. 

855 

856 :param method: One of TLS_METHOD, TLS_CLIENT_METHOD, TLS_SERVER_METHOD, 

857 DTLS_METHOD, DTLS_CLIENT_METHOD, or DTLS_SERVER_METHOD. 

858 SSLv23_METHOD, TLSv1_METHOD, etc. are deprecated and should 

859 not be used. 

860 """ 

861 

862 _methods: typing.ClassVar[ 

863 dict[int, tuple[Callable[[], Any], int | None]] 

864 ] = { 

865 SSLv23_METHOD: (_lib.TLS_method, None), 

866 TLSv1_METHOD: (_lib.TLS_method, TLS1_VERSION), 

867 TLSv1_1_METHOD: (_lib.TLS_method, TLS1_1_VERSION), 

868 TLSv1_2_METHOD: (_lib.TLS_method, TLS1_2_VERSION), 

869 TLS_METHOD: (_lib.TLS_method, None), 

870 TLS_SERVER_METHOD: (_lib.TLS_server_method, None), 

871 TLS_CLIENT_METHOD: (_lib.TLS_client_method, None), 

872 DTLS_METHOD: (_lib.DTLS_method, None), 

873 DTLS_SERVER_METHOD: (_lib.DTLS_server_method, None), 

874 DTLS_CLIENT_METHOD: (_lib.DTLS_client_method, None), 

875 } 

876 

def __init__(self, method: int) -> None:
    """
    Build a new ``SSL_CTX`` for the protocol selected by *method*.

    :param method: Integer key into ``self._methods``; the matching entry
        supplies the OpenSSL method constructor and an optional protocol
        version to pin.
    :raises TypeError: If *method* is not an integer.
    :raises ValueError: If *method* is not a key of ``self._methods``.
    """
    if not isinstance(method, int):
        raise TypeError("method must be an integer")

    try:
        method_factory, pinned_version = self._methods[method]
    except KeyError:
        raise ValueError("No such protocol")

    ssl_method = method_factory()
    _openssl_assert(ssl_method != _ffi.NULL)

    raw_context = _lib.SSL_CTX_new(ssl_method)
    _openssl_assert(raw_context != _ffi.NULL)

    # Attach SSL_CTX_free as the destructor of the context handle.
    self._context = _ffi.gc(raw_context, _lib.SSL_CTX_free)
    self._used = False

    # Helper / callback slots all start out empty.
    self._passphrase_helper: _PassphraseHelper | None = None
    self._passphrase_callback: _PassphraseCallback[Any] | None = None
    self._passphrase_userdata: Any | None = None
    self._verify_helper: _VerifyHelper | None = None
    self._verify_callback: _VerifyCallback | None = None
    self._info_callback = None
    self._keylog_callback = None
    self._tlsext_servername_callback = None
    self._app_data = None
    self._alpn_select_helper: _ALPNSelectHelper | None = None
    self._alpn_select_callback: _ALPNSelectCallback | None = None
    self._ocsp_helper: (
        _OCSPClientCallbackHelper | _OCSPServerCallbackHelper | None
    ) = None
    self._ocsp_callback: (
        _OCSPClientCallback[Any] | _OCSPServerCallback[Any] | None
    ) = None
    self._ocsp_data: Any | None = None
    self._cookie_generate_helper: _CookieGenerateCallbackHelper | None = (
        None
    )
    self._cookie_verify_helper: _CookieVerifyCallbackHelper | None = None

    default_modes = (
        _lib.SSL_MODE_ENABLE_PARTIAL_WRITE
        | _lib.SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER
    )
    self.set_mode(default_modes)

    if pinned_version is None:
        return

    # A version-specific method pins both ends of the allowed range.
    self.set_min_proto_version(pinned_version)
    self.set_max_proto_version(pinned_version)

925 

@_require_not_used
def set_min_proto_version(self, version: int) -> None:
    """
    Set the lowest protocol version this context will negotiate.

    Passing 0 enables protocol versions down to the lowest version
    supported by the library.

    If the underlying OpenSSL build is missing support for the selected
    version, this method will raise an exception.

    :param version: The minimum protocol version, or 0.
    :return: None
    """
    status = _lib.SSL_CTX_set_min_proto_version(self._context, version)
    _openssl_assert(status == 1)

939 

@_require_not_used
def set_max_proto_version(self, version: int) -> None:
    """
    Set the highest protocol version this context will negotiate.

    Passing 0 enables protocol versions up to the highest version
    supported by the library.

    If the underlying OpenSSL build is missing support for the selected
    version, this method will raise an exception.

    :param version: The maximum protocol version, or 0.
    :return: None
    """
    status = _lib.SSL_CTX_set_max_proto_version(self._context, version)
    _openssl_assert(status == 1)

953 

@_require_not_used
def load_verify_locations(
    self,
    cafile: _StrOrBytesPath | None,
    capath: _StrOrBytesPath | None = None,
) -> None:
    """
    Tell OpenSSL where the trusted certificates for chain verification
    live. The certificates have to be in PEM format.

    If *capath* is passed, it must be a directory prepared using the
    ``c_rehash`` tool included with OpenSSL. Either, but not both, of
    *cafile* or *capath* may be :data:`None`.

    :param cafile: In which file we can find the certificates (``bytes`` or
        ``str``).
    :param capath: In which directory we can find the certificates
        (``bytes`` or ``str``).

    :return: None
    """
    # A missing location is passed to OpenSSL as a NULL pointer.
    cafile_arg = _ffi.NULL if cafile is None else _path_bytes(cafile)
    capath_arg = _ffi.NULL if capath is None else _path_bytes(capath)

    loaded = _lib.SSL_CTX_load_verify_locations(
        self._context, cafile_arg, capath_arg
    )
    if not loaded:
        _raise_current_error()

990 

def _wrap_callback(
    self, callback: _PassphraseCallback[_T]
) -> _PassphraseHelper:
    """
    Wrap a user passphrase callback in a ``_PassphraseHelper``.

    The wrapper ignores the *userdata* argument it is called with and
    instead forwards whatever ``self._passphrase_userdata`` holds at call
    time.

    :param callback: The user supplied passphrase callback.
    :return: The helper built around the wrapped callback.
    """

    @wraps(callback)
    def wrapper(size: int, verify: bool, userdata: Any) -> bytes:
        current_userdata = self._passphrase_userdata
        return callback(size, verify, current_userdata)

    helper = _PassphraseHelper(
        FILETYPE_PEM, wrapper, more_args=True, truncate=True
    )
    return helper

1001 

@_require_not_used
def set_passwd_cb(
    self,
    callback: _PassphraseCallback[_T],
    userdata: _T | None = None,
) -> None:
    """
    Install the passphrase callback, which is invoked when a private key
    with a passphrase is loaded.

    :param callback: The Python callback to use. This must accept three
        positional arguments. First, an integer giving the maximum length
        of the passphrase it may return. If the returned passphrase is
        longer than this, it will be truncated. Second, a boolean value
        which will be true if the user should be prompted for the
        passphrase twice and the callback should verify that the two values
        supplied are equal. Third, the value given as the *userdata*
        parameter to :meth:`set_passwd_cb`. The *callback* must return
        a byte string. If an error occurs, *callback* should return a false
        value (e.g. an empty string).
    :param userdata: (optional) A Python object which will be given as
        argument to the callback
    :raises TypeError: If *callback* is not callable.
    :return: None
    """
    if not callable(callback):
        raise TypeError("callback must be callable")

    helper = self._wrap_callback(callback)
    # Both the helper and its callback object are stored on the context.
    self._passphrase_helper = helper
    self._passphrase_callback = helper.callback
    _lib.SSL_CTX_set_default_passwd_cb(
        self._context, self._passphrase_callback
    )
    self._passphrase_userdata = userdata

1035 

@_require_not_used
def set_default_verify_paths(self) -> None:
    """
    Use the platform provided CA certificates for verification. There are
    caveats related to the binary wheels that cryptography (pyOpenSSL's
    primary dependency) ships:

    *   macOS will only load certificates using this method if the user has
        the ``openssl@1.1`` `Homebrew <https://brew.sh>`_ formula installed
        in the default location.
    *   Windows will not work.
    *   manylinux cryptography wheels will work on most common Linux
        distributions in pyOpenSSL 17.1.0 and above. pyOpenSSL detects the
        manylinux wheel and attempts to load roots via a fallback path.

    :return: None
    """
    # SSL_CTX_set_default_verify_paths tries the cafile and capath fixed
    # at compile time, unless the environment variables are present, in
    # which case those paths are loaded instead.
    status = _lib.SSL_CTX_set_default_verify_paths(self._context)
    _openssl_assert(status == 1)

    # If the user pointed OpenSSL somewhere through the environment, we
    # respect that and do nothing more.
    if self._check_env_vars_set("SSL_CERT_DIR", "SSL_CERT_FILE"):
        return

    compiled_dir = _ffi.string(_lib.X509_get_default_cert_dir())
    compiled_file = _ffi.string(_lib.X509_get_default_cert_file())

    # The fallback applies only when the compiled-in defaults are exactly
    # the values used in the manylinux builds.
    is_manylinux_build = (
        compiled_dir == _CRYPTOGRAPHY_MANYLINUX_CA_DIR
        and compiled_file == _CRYPTOGRAPHY_MANYLINUX_CA_FILE
    )
    if is_manylinux_build:
        self._fallback_default_verify_paths(
            _CERTIFICATE_FILE_LOCATIONS, _CERTIFICATE_PATH_LOCATIONS
        )

1078 

def _check_env_vars_set(self, dir_env_var: str, file_env_var: str) -> bool:
    """
    Report whether either default cert dir/file environment variable is
    present in ``os.environ``.

    :param dir_env_var: Name of the certificate directory variable.
    :param file_env_var: Name of the certificate file variable.
    :return: bool
    """
    # The file variable is looked up first, then the directory variable.
    return any(
        os.environ.get(var_name) is not None
        for var_name in (file_env_var, dir_env_var)
    )

1089 

def _fallback_default_verify_paths(
    self, file_path: list[str], dir_path: list[str]
) -> None:
    """
    Load roots from the first existing candidate file and the first
    existing candidate directory.

    Default verify paths are based on the compiled version of OpenSSL.
    However, when pyca/cryptography is compiled as a manylinux wheel
    that compiled location can potentially be wrong. So, like Go, we
    will try a predefined set of paths and attempt to load roots
    from there.

    :param file_path: Candidate CA bundle files, in priority order.
    :param dir_path: Candidate CA directories, in priority order.
    :return: None
    """
    first_file = next(
        (candidate for candidate in file_path if os.path.isfile(candidate)),
        None,
    )
    if first_file is not None:
        self.load_verify_locations(first_file)

    first_dir = next(
        (candidate for candidate in dir_path if os.path.isdir(candidate)),
        None,
    )
    if first_dir is not None:
        self.load_verify_locations(None, first_dir)

1111 

@_require_not_used
def use_certificate_chain_file(self, certfile: _StrOrBytesPath) -> None:
    """
    Load a certificate chain from a file.

    :param certfile: The name of the certificate chain file (``bytes`` or
        ``str``). Must be PEM encoded.

    :return: None
    """
    chain_path = _path_bytes(certfile)

    loaded = _lib.SSL_CTX_use_certificate_chain_file(
        self._context, chain_path
    )
    if not loaded:
        _raise_current_error()

1129 

@_require_not_used
def use_certificate_file(
    self, certfile: _StrOrBytesPath, filetype: int = FILETYPE_PEM
) -> None:
    """
    Load a certificate from a file.

    :param certfile: The name of the certificate file (``bytes`` or
        ``str``).
    :param filetype: (optional) The encoding of the file, which is either
        :const:`FILETYPE_PEM` or :const:`FILETYPE_ASN1`.  The default is
        :const:`FILETYPE_PEM`.
    :raises TypeError: If *filetype* is not an integer.

    :return: None
    """
    # The path is converted before the filetype is validated.
    cert_path = _path_bytes(certfile)
    if not isinstance(filetype, int):
        raise TypeError("filetype must be an integer")

    loaded = _lib.SSL_CTX_use_certificate_file(
        self._context, cert_path, filetype
    )
    if not loaded:
        _raise_current_error()

1154 

@_require_not_used
def use_certificate(self, cert: X509 | x509.Certificate) -> None:
    """
    Load a certificate from a certificate object.

    :param cert: The certificate. Anything that is not a pyOpenSSL
        ``X509`` is converted with ``X509.from_cryptography``; passing an
        ``X509`` directly emits a :class:`DeprecationWarning`.
    :return: None
    """
    # Mirrored at Connection.use_certificate
    if isinstance(cert, X509):
        deprecation_message = (
            "Passing pyOpenSSL X509 objects is deprecated. You "
            "should use a cryptography.x509.Certificate instead."
        )
        warnings.warn(
            deprecation_message,
            DeprecationWarning,
            stacklevel=2,
        )
    else:
        cert = X509.from_cryptography(cert)

    accepted = _lib.SSL_CTX_use_certificate(self._context, cert._x509)
    if not accepted:
        _raise_current_error()

1179 

@_require_not_used
def add_extra_chain_cert(self, certobj: X509 | x509.Certificate) -> None:
    """
    Add a certificate to the chain.

    :param certobj: The certificate to add to the chain. Anything that is
        not a pyOpenSSL ``X509`` is converted with
        ``X509.from_cryptography``; passing an ``X509`` directly emits a
        :class:`DeprecationWarning`.
    :return: None
    """
    if not isinstance(certobj, X509):
        certobj = X509.from_cryptography(certobj)
    else:
        warnings.warn(
            (
                "Passing pyOpenSSL X509 objects is deprecated. You "
                "should use a cryptography.x509.Certificate instead."
            ),
            DeprecationWarning,
            stacklevel=2,
        )

    # The context is handed its own duplicate of the certificate.
    copy = _lib.X509_dup(certobj._x509)
    # Fail here if duplication did not produce a certificate, rather
    # than passing a NULL pointer on to SSL_CTX_add_extra_chain_cert.
    _openssl_assert(copy != _ffi.NULL)

    add_result = _lib.SSL_CTX_add_extra_chain_cert(self._context, copy)
    if not add_result:
        # TODO: This is untested.
        # The duplicate was not accepted, so release it ourselves.
        _lib.X509_free(copy)
        _raise_current_error()

1206 

def _raise_passphrase_exception(self) -> None:
    """
    Give the passphrase helper (if one is installed) the chance to raise
    its own problem as ``Error``, then raise the current OpenSSL error.
    """
    helper = self._passphrase_helper
    if helper is not None:
        helper.raise_if_problem(Error)

    _raise_current_error()

1212 

@_require_not_used
def use_privatekey_file(
    self, keyfile: _StrOrBytesPath, filetype: int = FILETYPE_PEM
) -> None:
    """
    Load a private key from a file.

    :param keyfile: The name of the key file (``bytes`` or ``str``)
    :param filetype: (optional) The encoding of the file, which is either
        :const:`FILETYPE_PEM` or :const:`FILETYPE_ASN1`.  The default is
        :const:`FILETYPE_PEM`.
    :raises TypeError: If *filetype* is not an integer.

    :return: None
    """
    # The path is converted before the filetype is validated.
    key_path = _path_bytes(keyfile)

    if not isinstance(filetype, int):
        raise TypeError("filetype must be an integer")

    loaded = _lib.SSL_CTX_use_PrivateKey_file(
        self._context, key_path, filetype
    )
    if not loaded:
        self._raise_passphrase_exception()

1237 

@_require_not_used
def use_privatekey(self, pkey: _PrivateKey | PKey) -> None:
    """
    Load a private key from a key object.

    :param pkey: The private key. Anything that is not a pyOpenSSL
        ``PKey`` is converted with ``PKey.from_cryptography_key``; passing
        a ``PKey`` directly emits a :class:`DeprecationWarning`.
    :return: None
    """
    # Mirrored at Connection.use_privatekey
    if isinstance(pkey, PKey):
        deprecation_message = (
            "Passing pyOpenSSL PKey objects is deprecated. You "
            "should use a cryptography private key instead."
        )
        warnings.warn(
            deprecation_message,
            DeprecationWarning,
            stacklevel=2,
        )
    else:
        pkey = PKey.from_cryptography_key(pkey)

    accepted = _lib.SSL_CTX_use_PrivateKey(self._context, pkey._pkey)
    if not accepted:
        self._raise_passphrase_exception()

1262 

def check_privatekey(self) -> None:
    """
    Check if the private key (loaded with :meth:`use_privatekey`) matches
    the certificate (loaded with :meth:`use_certificate`).

    :return: :data:`None` (raises :exc:`Error` if something's wrong)
    """
    key_matches = _lib.SSL_CTX_check_private_key(self._context)
    if not key_matches:
        _raise_current_error()

1272 

@_require_not_used
def load_client_ca(self, cafile: bytes) -> None:
    """
    Load the trusted certificates that will be sent to the client.  Does
    not actually imply any of the certificates are trusted; that must be
    configured separately.

    :param bytes cafile: The path to a certificates file in PEM format.
    :return: None
    """
    cafile_bytes = _text_to_bytes_and_warn("cafile", cafile)
    ca_names = _lib.SSL_load_client_CA_file(cafile_bytes)
    _openssl_assert(ca_names != _ffi.NULL)
    _lib.SSL_CTX_set_client_CA_list(self._context, ca_names)

1288 

@_require_not_used
def set_session_id(self, buf: bytes) -> None:
    """
    Set the session id to *buf* within which a session can be reused for
    this Context object.  This is needed when doing session resumption,
    because there is no way for a stored session to know which Context
    object it is associated with.

    :param bytes buf: The session id.

    :returns: None
    """
    session_id = _text_to_bytes_and_warn("buf", buf)
    status = _lib.SSL_CTX_set_session_id_context(
        self._context, session_id, len(session_id)
    )
    _openssl_assert(status == 1)

1306 

@_require_not_used
def set_session_cache_mode(self, mode: int) -> int:
    """
    Set the behavior of the session cache used by all connections using
    this Context.  The previously set mode is returned.  See
    :const:`SESS_CACHE_*` for details about particular modes.

    :param mode: One or more of the SESS_CACHE_* flags (combine using
        bitwise or)
    :raises TypeError: If *mode* is not an integer.
    :returns: The previously set caching mode.

    .. versionadded:: 0.14
    """
    if isinstance(mode, int):
        previous_mode = _lib.SSL_CTX_set_session_cache_mode(
            self._context, mode
        )
        return previous_mode

    raise TypeError("mode must be an integer")

1324 

def get_session_cache_mode(self) -> int:
    """
    Get the current session cache mode.

    :returns: The currently used cache mode.

    .. versionadded:: 0.14
    """
    current_mode = _lib.SSL_CTX_get_session_cache_mode(self._context)
    return current_mode

1334 

@_require_not_used
def set_verify(
    self, mode: int, callback: _VerifyCallback | None = None
) -> None:
    """
    Set the verification flags for this Context object to *mode* and
    specify that *callback* should be used for verification callbacks.

    :param mode: The verify mode, this should be one of
        :const:`VERIFY_NONE` and :const:`VERIFY_PEER`. If
        :const:`VERIFY_PEER` is used, *mode* can be OR:ed with
        :const:`VERIFY_FAIL_IF_NO_PEER_CERT` and
        :const:`VERIFY_CLIENT_ONCE` to further control the behaviour.
    :param callback: The optional Python verification callback to use.
        This should take five arguments: A Connection object, an X509
        object, and three integer variables, which are in turn potential
        error number, error depth and return code. *callback* should
        return True if verification passes and False otherwise.
        If omitted, OpenSSL's default verification is used.
    :raises TypeError: If *mode* is not an integer, or *callback* is
        given but not callable.
    :return: None

    See SSL_CTX_set_verify(3SSL) for further details.
    """
    if not isinstance(mode, int):
        raise TypeError("mode must be an integer")

    if callback is not None and not callable(callback):
        raise TypeError("callback must be callable")

    if callback is None:
        # No Python callback: clear our slots and pass NULL to OpenSSL.
        helper = None
        python_side = None
        openssl_side = _ffi.NULL
    else:
        helper = _VerifyHelper(callback)
        python_side = helper.callback
        openssl_side = python_side

    self._verify_helper = helper
    self._verify_callback = python_side
    _lib.SSL_CTX_set_verify(self._context, mode, openssl_side)

1372 

@_require_not_used
def set_verify_depth(self, depth: int) -> None:
    """
    Set the maximum depth for the certificate chain verification that shall
    be allowed for this Context object.

    :param depth: An integer specifying the verify depth
    :raises TypeError: If *depth* is not an integer.
    :return: None
    """
    depth_is_int = isinstance(depth, int)
    if not depth_is_int:
        raise TypeError("depth must be an integer")

    _lib.SSL_CTX_set_verify_depth(self._context, depth)

1386 

def get_verify_mode(self) -> int:
    """
    Retrieve the Context object's verify mode, as set by
    :meth:`set_verify`.

    :return: The verify mode
    """
    verify_mode = _lib.SSL_CTX_get_verify_mode(self._context)
    return verify_mode

1395 

def get_verify_depth(self) -> int:
    """
    Retrieve the Context object's verify depth, as set by
    :meth:`set_verify_depth`.

    :return: The verify depth
    """
    verify_depth = _lib.SSL_CTX_get_verify_depth(self._context)
    return verify_depth

1404 

@_require_not_used
def load_tmp_dh(self, dhfile: _StrOrBytesPath) -> None:
    """
    Load parameters for Ephemeral Diffie-Hellman.

    :param dhfile: The file to load EDH parameters from (``bytes`` or
        ``str``).
    :raises Error: If the file cannot be opened, if no DH parameters can
        be read from it, or if OpenSSL rejects the parameters.

    :return: None
    """
    dhfile = _path_bytes(dhfile)

    bio = _lib.BIO_new_file(dhfile, b"r")
    if bio == _ffi.NULL:
        _raise_current_error()
    bio = _ffi.gc(bio, _lib.BIO_free)

    dh = _lib.PEM_read_bio_DHparams(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL)
    # Report a failed read right away instead of handing a NULL pointer
    # on to SSL_CTX_set_tmp_dh and failing there.
    if dh == _ffi.NULL:
        _raise_current_error()
    dh = _ffi.gc(dh, _lib.DH_free)

    res = _lib.SSL_CTX_set_tmp_dh(self._context, dh)
    _openssl_assert(res == 1)

1426 

@_require_not_used
def set_tmp_ecdh(self, curve: _EllipticCurve | ec.EllipticCurve) -> None:
    """
    Select a curve to use for ECDHE key exchange.

    :param curve: A curve instance from cryptography
        (:class:`~cryptography.hazmat.primitives.asymmetric.ec.EllipticCurve`).
        Alternatively (deprecated) a curve object from either
        :meth:`OpenSSL.crypto.get_elliptic_curve` or
        :meth:`OpenSSL.crypto.get_elliptic_curves`.

    :return: None
    """
    if isinstance(curve, _EllipticCurve):
        deprecation_message = (
            "Passing pyOpenSSL elliptic curves to set_tmp_ecdh is "
            "deprecated. You should use cryptography's elliptic curve "
            "types instead."
        )
        warnings.warn(
            deprecation_message,
            DeprecationWarning,
            stacklevel=2,
        )
        _lib.SSL_CTX_set_tmp_ecdh(self._context, curve._to_EC_KEY())
        return

    # Two curve names are translated before the NID lookup; every other
    # name is passed through unchanged.
    name_overrides = {
        "secp192r1": "prime192v1",
        "secp256r1": "prime256v1",
    }
    curve_name = curve.name
    lookup_name = name_overrides.get(curve_name, curve_name)

    nid = _lib.OBJ_txt2nid(lookup_name.encode())
    if nid == _lib.NID_undef:
        _raise_current_error()

    # Named ``ec_key`` so the module-level ``ec`` import is not shadowed.
    ec_key = _lib.EC_KEY_new_by_curve_name(nid)
    _openssl_assert(ec_key != _ffi.NULL)
    ec_key = _ffi.gc(ec_key, _lib.EC_KEY_free)
    _lib.SSL_CTX_set_tmp_ecdh(self._context, ec_key)

1466 

@_require_not_used
def set_cipher_list(self, cipher_list: bytes) -> None:
    """
    Set the list of ciphers to be used in this context.

    See the OpenSSL manual for more information (e.g.
    :manpage:`ciphers(1)`).

    Note this API does not change the cipher suites used in TLS 1.3.
    Use `set_tls13_ciphersuites` for that.

    :param bytes cipher_list: An OpenSSL cipher string.
    :raises TypeError: If *cipher_list* is still not a byte string after
        the text-to-bytes conversion.
    :return: None
    """
    cipher_list = _text_to_bytes_and_warn("cipher_list", cipher_list)

    if not isinstance(cipher_list, bytes):
        raise TypeError("cipher_list must be a byte string.")

    status = _lib.SSL_CTX_set_cipher_list(self._context, cipher_list)
    _openssl_assert(status == 1)

1489 

@_require_not_used
def set_tls13_ciphersuites(self, ciphersuites: bytes) -> None:
    """
    Set the list of TLS 1.3 ciphers to be used in this context.
    OpenSSL maintains the list of TLS 1.3+ ciphers separately from the
    list of ciphers for TLS 1.2 and lower.

    See the OpenSSL manual for more information (e.g.
    :manpage:`ciphers(1)`).

    :param bytes ciphersuites: An OpenSSL cipher string containing
        TLS 1.3+ ciphersuites.
    :raises TypeError: If *ciphersuites* is not a byte string.
    :return: None

    .. versionadded:: 25.2.0
    """
    if not isinstance(ciphersuites, bytes):
        raise TypeError("ciphersuites must be a byte string.")

    status = _lib.SSL_CTX_set_ciphersuites(self._context, ciphersuites)
    _openssl_assert(status == 1)

1512 

@_require_not_used
def set_client_ca_list(
    self, certificate_authorities: Sequence[X509Name]
) -> None:
    """
    Set the list of preferred client certificate signers for this server
    context.

    This list of certificate authorities will be sent to the client when
    the server requests a client certificate.

    :param certificate_authorities: a sequence of X509Names.
    :raises TypeError: If any element is not an ``X509Name``.
    :return: None

    .. versionadded:: 0.10
    """
    ca_stack = _lib.sk_X509_NAME_new_null()
    _openssl_assert(ca_stack != _ffi.NULL)

    try:
        for authority in certificate_authorities:
            if not isinstance(authority, X509Name):
                raise TypeError(
                    f"client CAs must be X509Name objects, not "
                    f"{type(authority).__name__} objects"
                )
            # Push a duplicate of the name, not the caller's object.
            duplicate = _lib.X509_NAME_dup(authority._name)
            _openssl_assert(duplicate != _ffi.NULL)
            pushed = _lib.sk_X509_NAME_push(ca_stack, duplicate)
            if pushed:
                continue
            # The push failed, so the duplicate is released here.
            _lib.X509_NAME_free(duplicate)
            _raise_current_error()
    except Exception:
        # Free the stack before letting the failure propagate.
        _lib.sk_X509_NAME_free(ca_stack)
        raise

    _lib.SSL_CTX_set_client_CA_list(self._context, ca_stack)

1550 

@_require_not_used
def add_client_ca(
    self, certificate_authority: X509 | x509.Certificate
) -> None:
    """
    Add the CA certificate to the list of preferred signers for this
    context.

    The list of certificate authorities will be sent to the client when the
    server requests a client certificate.

    :param certificate_authority: certificate authority's X509 certificate.
        Anything that is not a pyOpenSSL ``X509`` is converted with
        ``X509.from_cryptography``; passing an ``X509`` directly emits a
        :class:`DeprecationWarning`.
    :return: None

    .. versionadded:: 0.10
    """
    if isinstance(certificate_authority, X509):
        deprecation_message = (
            "Passing pyOpenSSL X509 objects is deprecated. You "
            "should use a cryptography.x509.Certificate instead."
        )
        warnings.warn(
            deprecation_message,
            DeprecationWarning,
            stacklevel=2,
        )
    else:
        certificate_authority = X509.from_cryptography(
            certificate_authority
        )

    status = _lib.SSL_CTX_add_client_CA(
        self._context, certificate_authority._x509
    )
    _openssl_assert(status == 1)

1585 

@_require_not_used
def set_timeout(self, timeout: int) -> int:
    """
    Set the timeout for newly created sessions for this Context object to
    *timeout*.  The default value is 300 seconds.  See the OpenSSL manual
    for more information (e.g. :manpage:`SSL_CTX_set_timeout(3)`).

    :param timeout: The timeout in (whole) seconds
    :raises TypeError: If *timeout* is not an integer.
    :return: The previous session timeout
    """
    if not isinstance(timeout, int):
        raise TypeError("timeout must be an integer")

    # The result of SSL_CTX_set_timeout is returned to the caller, so the
    # return annotation is ``int`` rather than ``None``.
    return _lib.SSL_CTX_set_timeout(self._context, timeout)

1600 

def get_timeout(self) -> int:
    """
    Retrieve session timeout, as set by :meth:`set_timeout`. The default
    is 300 seconds.

    :return: The session timeout
    """
    session_timeout = _lib.SSL_CTX_get_timeout(self._context)
    return session_timeout

1609 

@_require_not_used
def set_info_callback(
    self, callback: Callable[[Connection, int, int], None]
) -> None:
    """
    Set the information callback to *callback*. This function will be
    called from time to time during SSL handshakes.

    :param callback: The Python callback to use.  This should take three
        arguments: a Connection object and two integers.  The first integer
        specifies where in the SSL handshake the function was called, and
        the other the return code from a (possibly failed) internal
        function call.
    :return: None
    """

    @wraps(callback)
    def wrapper(ssl, where, return_code):  # type: ignore[no-untyped-def]
        # Map the raw SSL pointer to its Connection before calling out.
        connection = Connection._reverse_mapping[ssl]
        callback(connection, where, return_code)

    c_callback = _ffi.callback("void (*)(const SSL *, int, int)", wrapper)
    # The callback object is also stored on the context.
    self._info_callback = c_callback
    _lib.SSL_CTX_set_info_callback(self._context, self._info_callback)

1634 

@_requires_keylog
@_require_not_used
def set_keylog_callback(
    self, callback: Callable[[Connection, bytes], None]
) -> None:
    """
    Set the TLS key logging callback to *callback*. This function will be
    called whenever TLS key material is generated or received, in order
    to allow applications to store this keying material for debugging
    purposes.

    :param callback: The Python callback to use.  This should take two
        arguments: a Connection object and a bytestring that contains
        the key material in the format used by NSS for its SSLKEYLOGFILE
        debugging output.
    :return: None
    """

    @wraps(callback)
    def wrapper(ssl, line):  # type: ignore[no-untyped-def]
        # Convert the C string first, then look up the Connection.
        key_material = _ffi.string(line)
        connection = Connection._reverse_mapping[ssl]
        callback(connection, key_material)

    c_callback = _ffi.callback(
        "void (*)(const SSL *, const char *)", wrapper
    )
    # The callback object is also stored on the context.
    self._keylog_callback = c_callback
    _lib.SSL_CTX_set_keylog_callback(self._context, self._keylog_callback)

1662 

def get_app_data(self) -> Any:
    """
    Get the application data (supplied via :meth:`set_app_data()`).

    :return: The application data
    """
    app_data = self._app_data
    return app_data

1670 

@_require_not_used
def set_app_data(self, data: Any) -> None:
    """
    Store application data on the context; :meth:`get_app_data` returns
    it.

    :param data: Any Python object
    :return: None
    """
    self._app_data = data

1680 

def get_cert_store(self) -> X509Store | None:
    """
    Get the certificate store for the context.  This can be used to add
    "trusted" certificates without using the
    :meth:`load_verify_locations` method.

    :return: A X509Store object or None if it does not have one.
    """
    raw_store = _lib.SSL_CTX_get_cert_store(self._context)
    if raw_store != _ffi.NULL:
        # Build the wrapper without running X509Store.__init__ and point
        # it at the store handle obtained from the context.
        wrapped_store = X509Store.__new__(X509Store)
        wrapped_store._store = raw_store
        return wrapped_store

    # TODO: This is untested.
    return None

1697 

@_require_not_used
def set_options(self, options: int) -> int:
    """
    Add options. Options set before are not cleared!
    This method should be used with the :const:`OP_*` constants.

    :param options: The options to add.
    :raises TypeError: If *options* is not an integer.
    :return: The new option bitmask.
    """
    if isinstance(options, int):
        new_bitmask = _lib.SSL_CTX_set_options(self._context, options)
        return new_bitmask

    raise TypeError("options must be an integer")

1711 

@_require_not_used
def set_mode(self, mode: int) -> int:
    """
    Add modes via bitmask. Modes set before are not cleared! This method
    should be used with the :const:`MODE_*` constants.

    :param mode: The mode to add.
    :raises TypeError: If *mode* is not an integer.
    :return: The new mode bitmask.
    """
    if isinstance(mode, int):
        new_bitmask = _lib.SSL_CTX_set_mode(self._context, mode)
        return new_bitmask

    raise TypeError("mode must be an integer")

1725 

@_require_not_used
def clear_mode(self, mode_to_clear: int) -> int:
    """
    Clear modes that were set earlier. Modes previously set cannot be
    overwritten without being cleared first.

    :param mode_to_clear: The mode bits to clear.
    :return: The value returned by ``SSL_CTX_clear_mode``.
    """
    result = _lib.SSL_CTX_clear_mode(self._context, mode_to_clear)
    return result

1733 

@_require_not_used
def set_tlsext_servername_callback(
    self, callback: Callable[[Connection], None]
) -> None:
    """
    Specify a callback function to be called when clients specify a server
    name.

    :param callback: The callback function.  It will be invoked with one
        argument, the Connection instance.

    .. versionadded:: 0.13
    """

    @wraps(callback)
    def wrapper(ssl, alert, arg):  # type: ignore[no-untyped-def]
        connection = Connection._reverse_mapping[ssl]
        callback(connection)
        # The wrapper always reports 0, whatever the callback returned.
        return 0

    c_callback = _ffi.callback("int (*)(SSL *, int *, void *)", wrapper)
    # The callback object is also stored on the context.
    self._tlsext_servername_callback = c_callback
    _lib.SSL_CTX_set_tlsext_servername_callback(
        self._context, self._tlsext_servername_callback
    )

1759 

@_require_not_used
def set_tlsext_use_srtp(self, profiles: bytes) -> None:
    """
    Enable support for negotiating SRTP keying material.

    :param bytes profiles: A colon delimited list of protection profile
        names, like ``b'SRTP_AES128_CM_SHA1_80:SRTP_AES128_CM_SHA1_32'``.
    :raises TypeError: If *profiles* is not a byte string.
    :return: None
    """
    if not isinstance(profiles, bytes):
        raise TypeError("profiles must be a byte string.")

    # Note the assertion: a result of 0 is what counts as success here.
    status = _lib.SSL_CTX_set_tlsext_use_srtp(self._context, profiles)
    _openssl_assert(status == 0)

1775 

@_require_not_used
def set_alpn_protos(self, protos: list[bytes]) -> None:
    """
    Specify the protocols that the client is prepared to speak after the
    TLS connection has been negotiated using Application Layer Protocol
    Negotiation.

    :param protos: A list of the protocols to be offered to the server.
        This list should be a Python list of bytestrings representing the
        protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``.
    :raises ValueError: If *protos* is empty.
    """
    # Different versions of OpenSSL are inconsistent about how they handle
    # empty proto lists (see #1043), so we avoid the problem entirely by
    # rejecting them ourselves.
    if not protos:
        raise ValueError("at least one protocol must be specified")

    # Each protocol goes on the wire as a one-byte length followed by the
    # protocol name itself.
    pieces = []
    for proto in protos:
        pieces.append(bytes((len(proto),)))
        pieces.append(proto)
    protostr = b"".join(pieces)

    # Build a C string from the list. We don't need to save this off
    # because OpenSSL immediately copies the data out.
    input_str = _ffi.new("unsigned char[]", protostr)

    # https://www.openssl.org/docs/man1.1.0/man3/SSL_CTX_set_alpn_protos.html:
    # SSL_CTX_set_alpn_protos() and SSL_set_alpn_protos()
    # return 0 on success, and non-0 on failure.
    # WARNING: these functions reverse the return value convention.
    status = _lib.SSL_CTX_set_alpn_protos(
        self._context, input_str, len(protostr)
    )
    _openssl_assert(status == 0)

1813 

@_require_not_used
def set_alpn_select_callback(self, callback: _ALPNSelectCallback) -> None:
    """
    Specify a callback function that will be called on the server when a
    client offers protocols using ALPN.

    :param callback: The callback function.  It will be invoked with two
        arguments: the Connection, and a list of offered protocols as
        bytestrings, e.g ``[b'http/1.1', b'spdy/2']``.  It can return
        one of those bytestrings to indicate the chosen protocol, the
        empty bytestring to terminate the TLS connection, or the
        :py:obj:`NO_OVERLAPPING_PROTOCOLS` to indicate that no offered
        protocol was selected, but that the connection should not be
        aborted.
    """
    helper = _ALPNSelectHelper(callback)
    # Both the helper and its callback object are stored on the context.
    self._alpn_select_helper = helper
    self._alpn_select_callback = helper.callback
    _lib.SSL_CTX_set_alpn_select_cb(
        self._context, self._alpn_select_callback, _ffi.NULL
    )

1834 

def _set_ocsp_callback(
    self,
    helper: _OCSPClientCallbackHelper | _OCSPServerCallbackHelper,
    data: Any | None,
) -> None:
    """
    Shared implementation behind ``set_ocsp_server_callback`` and
    ``set_ocsp_client_callback``.

    :param helper: The callback helper whose ``callback`` attribute is
        handed to OpenSSL as the TLS status callback.
    :param data: Optional opaque object passed to OpenSSL as the status
        callback argument; ``None`` is passed as a NULL pointer.
    """
    self._ocsp_helper = helper
    self._ocsp_callback = helper.callback
    # A CFFI handle wraps the user data so it can travel through C as a
    # pointer; the handle is stored on self alongside the callback.
    self._ocsp_data = _ffi.NULL if data is None else _ffi.new_handle(data)

    status = _lib.SSL_CTX_set_tlsext_status_cb(
        self._context, self._ocsp_callback
    )
    _openssl_assert(status == 1)
    status = _lib.SSL_CTX_set_tlsext_status_arg(
        self._context, self._ocsp_data
    )
    _openssl_assert(status == 1)

1858 

@_require_not_used
def set_ocsp_server_callback(
    self,
    callback: _OCSPServerCallback[_T],
    data: _T | None = None,
) -> None:
    """
    Install the server-side callback that supplies OCSP data to staple to
    the TLS handshake.

    :param callback: Callable invoked with the Connection and the optional
        ``data`` object.  It must return the OCSP response to staple as a
        bytestring, or the empty bytestring when no OCSP data is available
        for the connection.
    :param data: Optional opaque object forwarded to ``callback`` on every
        invocation, e.g. to avoid complex lookups or to identify the
        context in use.
    """
    self._set_ocsp_callback(_OCSPServerCallbackHelper(callback), data)

1881 

@_require_not_used
def set_ocsp_client_callback(
    self,
    callback: _OCSPClientCallback[_T],
    data: _T | None = None,
) -> None:
    """
    Install the client-side callback that validates OCSP data stapled to
    the TLS handshake.

    :param callback: Callable invoked with the Connection, a bytestring
        holding the stapled OCSP assertion, and the optional ``data``
        object.  It must return ``True`` when the OCSP data is valid and
        the certificate can be trusted, and ``False`` when the data is
        invalid or the certificate has been revoked.
    :param data: Optional opaque object forwarded to ``callback`` on every
        invocation, e.g. to avoid complex lookups or to identify the
        context in use.
    """
    self._set_ocsp_callback(_OCSPClientCallbackHelper(callback), data)

1906 

@_require_not_used
def set_cookie_generate_callback(
    self, callback: _CookieGenerateCallback
) -> None:
    """
    Register ``callback`` as this Context's cookie generation callback.

    :param callback: The Python callable to wrap; the wrapping helper is
        stored on the Context and its native callback is passed to
        ``SSL_CTX_set_cookie_generate_cb``.
    """
    helper = _CookieGenerateCallbackHelper(callback)
    self._cookie_generate_helper = helper
    _lib.SSL_CTX_set_cookie_generate_cb(self._context, helper.callback)

1916 

@_require_not_used
def set_cookie_verify_callback(
    self, callback: _CookieVerifyCallback
) -> None:
    """
    Register ``callback`` as this Context's cookie verification callback.

    :param callback: The Python callable to wrap; the wrapping helper is
        stored on the Context and its native callback is passed to
        ``SSL_CTX_set_cookie_verify_cb``.
    """
    helper = _CookieVerifyCallbackHelper(callback)
    self._cookie_verify_helper = helper
    _lib.SSL_CTX_set_cookie_verify_cb(self._context, helper.callback)

1926 

1927 

1928class Connection: 

1929 _reverse_mapping: typing.MutableMapping[Any, Connection] = ( 

1930 WeakValueDictionary() 

1931 ) 

1932 

def __init__(
    self, context: Context, socket: socket.socket | None = None
) -> None:
    """
    Build a Connection on top of an OpenSSL.SSL.Context and, optionally,
    a socket.

    :param context: The SSL Context this connection is created from.
    :param socket: The socket used as transport.  When ``None``, a pair
        of memory BIOs is attached to the SSL object instead.
    :raise TypeError: If ``context`` is not a :class:`Context`.
    """
    if not isinstance(context, Context):
        raise TypeError("context must be a Context instance")

    context._used = True

    raw_ssl = _lib.SSL_new(context._context)
    self._ssl = _ffi.gc(raw_ssl, _lib.SSL_free)
    # SSL_MODE_AUTO_RETRY covers the case where OpenSSL reports
    # SSL_ERROR_WANT_READ while processing a non-application data packet
    # although the transport still has data available.
    # See https://github.com/openssl/openssl/issues/6234 for more details.
    _lib.SSL_set_mode(self._ssl, _lib.SSL_MODE_AUTO_RETRY)
    self._context = context
    self._app_data = None

    # Holds the strings used for Application Layer Protocol Negotiation.
    # They are copied only well after the callback has returned, so a
    # reference must be kept here to stop them from being freed.
    self._alpn_select_callback_args: Any = None

    # Hold on to the Context's verify callback so that a later set_verify
    # call on the Context cannot leave this SSL object with a dangling
    # reference.
    self._verify_helper = context._verify_helper
    self._verify_callback = context._verify_callback

    # The cookie callbacks are retained for the same reason.
    self._cookie_generate_helper = context._cookie_generate_helper
    self._cookie_verify_helper = context._cookie_verify_helper

    self._reverse_mapping[self._ssl] = self

    if socket is not None:
        # Socket transport: no memory BIOs, hand the descriptor to OpenSSL.
        self._into_ssl = None
        self._from_ssl = None
        self._socket = socket
        fd_status = _lib.SSL_set_fd(
            self._ssl, _asFileDescriptor(self._socket)
        )
        _openssl_assert(fd_status == 1)
        return

    self._socket = None
    # No gc is registered for the BIOs; SSL_free takes care of them.
    self._into_ssl = _lib.BIO_new(_lib.BIO_s_mem())
    _openssl_assert(self._into_ssl != _ffi.NULL)

    self._from_ssl = _lib.BIO_new(_lib.BIO_s_mem())
    _openssl_assert(self._from_ssl != _ffi.NULL)

    _lib.SSL_set_bio(self._ssl, self._into_ssl, self._from_ssl)

1994 

def __getattr__(self, name: str) -> Any:
    """
    Fall back to the wrapped socket for attributes the Connection itself
    does not provide.

    :param name: The attribute being looked up.
    :raise AttributeError: If the Connection has no underlying socket.
    """
    sock = self._socket
    if sock is not None:
        return getattr(sock, name)
    raise AttributeError(
        f"'{self.__class__.__name__}' object has no attribute '{name}'"
    )

2006 

def _raise_ssl_error(self, ssl: Any, result: int) -> None:
    """
    Turn the outcome of an OpenSSL call into the matching Python exception.

    Problems recorded by the Python callback helpers (verify, ALPN select,
    OCSP) are raised first.  Otherwise the error reported by
    ``SSL_get_error`` selects the exception.  The method returns normally
    only when OpenSSL reports ``SSL_ERROR_NONE``.

    :param ssl: The ``SSL`` object the call was made on.
    :param result: The return value of that call.
    :raise WantReadError: On ``SSL_ERROR_WANT_READ``.
    :raise WantWriteError: On ``SSL_ERROR_WANT_WRITE``.
    :raise ZeroReturnError: On ``SSL_ERROR_ZERO_RETURN``.
    :raise WantX509LookupError: On ``SSL_ERROR_WANT_X509_LOOKUP``.
    :raise SysCallError: On ``SSL_ERROR_SYSCALL`` without a queued OpenSSL
        error, or on an unexpected EOF.
    """
    context = self._context

    # Connection.set_verify installs a per-connection helper in
    # self._verify_helper, so that helper has to be consulted as well;
    # looking only at the Context's helper would never surface problems
    # recorded by a per-connection verify callback.  The Context's helper
    # is still checked (once) to keep the previous behaviour.
    helpers = [self._verify_helper]
    if context._verify_helper is not self._verify_helper:
        helpers.append(context._verify_helper)
    helpers.append(context._alpn_select_helper)
    helpers.append(context._ocsp_helper)
    for helper in helpers:
        if helper is not None:
            helper.raise_if_problem()

    error = _lib.SSL_get_error(ssl, result)
    if error == _lib.SSL_ERROR_WANT_READ:
        raise WantReadError()
    elif error == _lib.SSL_ERROR_WANT_WRITE:
        raise WantWriteError()
    elif error == _lib.SSL_ERROR_ZERO_RETURN:
        raise ZeroReturnError()
    elif error == _lib.SSL_ERROR_WANT_X509_LOOKUP:
        # TODO: This is untested.
        raise WantX509LookupError()
    elif error == _lib.SSL_ERROR_SYSCALL:
        # The system error code is read differently on Windows.
        if platform == "win32":
            errno = _ffi.getwinerror()[0]
        else:
            errno = _ffi.errno
        if _lib.ERR_peek_error() == 0 or errno != 0:
            if result < 0 and errno != 0:
                raise SysCallError(errno, errorcode.get(errno))
            raise SysCallError(-1, "Unexpected EOF")
        else:
            # TODO: This is untested, but I think twisted hits it?
            _raise_current_error()
    elif error == _lib.SSL_ERROR_SSL and _lib.ERR_peek_error() != 0:
        # In 3.0.x an unexpected EOF no longer triggers syscall error
        # but we want to maintain compatibility so we check here and
        # raise syscall if it is an EOF. Since we're not actually sure
        # what else could raise SSL_ERROR_SSL we check for the presence
        # of the OpenSSL 3 constant SSL_R_UNEXPECTED_EOF_WHILE_READING
        # and if it's not present we just raise an error, which matches
        # the behavior before we added this elif section
        peeked_error = _lib.ERR_peek_error()
        reason = _lib.ERR_GET_REASON(peeked_error)
        if _lib.Cryptography_HAS_UNEXPECTED_EOF_WHILE_READING:
            _openssl_assert(
                reason == _lib.SSL_R_UNEXPECTED_EOF_WHILE_READING
            )
            _lib.ERR_clear_error()
            raise SysCallError(-1, "Unexpected EOF")
        else:
            _raise_current_error()
    elif error == _lib.SSL_ERROR_NONE:
        pass
    else:
        _raise_current_error()

2059 

def get_context(self) -> Context:
    """
    Return the :class:`Context` currently associated with this
    :class:`Connection`.

    :return: The :class:`Context` object.
    """
    current = self._context
    return current

2066 

def set_context(self, context: Context) -> None:
    """
    Move this connection over to a different session context.

    :param context: The :class:`Context` instance to switch to.
    :raise TypeError: If ``context`` is not a :class:`Context`.
    """
    if not isinstance(context, Context):
        raise TypeError("context must be a Context instance")

    _lib.SSL_set_SSL_CTX(self._ssl, context._context)
    self._context = context
    # Flag the newly attached Context as used.
    self._context._used = True

2080 

def get_servername(self) -> bytes | None:
    """
    Return the servername extension value from the client hello message,
    if the client sent one.

    :return: The server name as a byte string, or :data:`None` when no
        name is available.

    .. versionadded:: 0.13
    """
    raw_name = _lib.SSL_get_servername(
        self._ssl, _lib.TLSEXT_NAMETYPE_host_name
    )
    return None if raw_name == _ffi.NULL else _ffi.string(raw_name)

2097 

def set_verify(
    self, mode: int, callback: _VerifyCallback | None = None
) -> None:
    """
    Replace the Context's verification flags for this connection only.
    See :py:meth:`Context.set_verify` for details.

    :param mode: The verify flags, as an integer.
    :param callback: Optional verification callback; when omitted no
        callback is installed on the connection.
    :raise TypeError: If ``mode`` is not an integer or ``callback`` is
        given but not callable.
    """
    if not isinstance(mode, int):
        raise TypeError("mode must be an integer")

    if callback is not None:
        if not callable(callback):
            raise TypeError("callback must be callable")

        helper = _VerifyHelper(callback)
        # Helper and native callback are both stored on the connection.
        self._verify_helper = helper
        self._verify_callback = helper.callback
        _lib.SSL_set_verify(self._ssl, mode, self._verify_callback)
        return

    self._verify_helper = None
    self._verify_callback = None
    _lib.SSL_set_verify(self._ssl, mode, _ffi.NULL)

2119 

def get_verify_mode(self) -> int:
    """
    Return the verify mode of this Connection, as configured through
    :meth:`set_verify`.

    :return: The verify mode
    """
    mode = _lib.SSL_get_verify_mode(self._ssl)
    return mode

2128 

def use_certificate(self, cert: X509 | x509.Certificate) -> None:
    """
    Use the given certificate for this connection.

    :param cert: The certificate.  Anything that is not a pyOpenSSL
        ``X509`` is converted with ``X509.from_cryptography``; passing an
        ``X509`` emits a ``DeprecationWarning``.
    :return: None
    """
    # Mirrored from Context.use_certificate
    if isinstance(cert, X509):
        warnings.warn(
            (
                "Passing pyOpenSSL X509 objects is deprecated. You "
                "should use a cryptography.x509.Certificate instead."
            ),
            DeprecationWarning,
            stacklevel=2,
        )
    else:
        cert = X509.from_cryptography(cert)

    if not _lib.SSL_use_certificate(self._ssl, cert._x509):
        _raise_current_error()

2152 

def use_privatekey(self, pkey: _PrivateKey | PKey) -> None:
    """
    Use the given private key for this connection.

    :param pkey: The private key.  Anything that is not a pyOpenSSL
        ``PKey`` is converted with ``PKey.from_cryptography_key``; passing
        a ``PKey`` emits a ``DeprecationWarning``.
    :return: None
    """
    # Mirrored from Context.use_privatekey
    if isinstance(pkey, PKey):
        warnings.warn(
            (
                "Passing pyOpenSSL PKey objects is deprecated. You "
                "should use a cryptography private key instead."
            ),
            DeprecationWarning,
            stacklevel=2,
        )
    else:
        pkey = PKey.from_cryptography_key(pkey)

    if not _lib.SSL_use_PrivateKey(self._ssl, pkey._pkey):
        # Failure reporting is delegated to the Context.
        self._context._raise_passphrase_exception()

2176 

def set_ciphertext_mtu(self, mtu: int) -> None:
    """
    For DTLS, configure the maximum UDP payload size.  IP/UDP overhead is
    *not* part of this value.

    Setting :data:`OP_NO_QUERY_MTU` may be necessary to stop OpenSSL from
    spontaneously clearing the configured value.

    :param mtu: The maximum transmission unit, as an integer.

    .. versionadded:: 21.1
    """
    ssl = self._ssl
    _lib.SSL_set_mtu(ssl, mtu)

2190 

def get_cleartext_mtu(self) -> int:
    """
    For DTLS, report how much unencrypted data can be handed to
    :meth:`write` without exceeding the MTU configured through
    :meth:`set_ciphertext_mtu`.

    :return: The effective MTU as an integer.
    :raise NotImplementedError: If the bound OpenSSL library does not
        provide ``DTLS_get_data_mtu``.

    .. versionadded:: 21.1
    """
    if hasattr(_lib, "DTLS_get_data_mtu"):
        return _lib.DTLS_get_data_mtu(self._ssl)
    raise NotImplementedError("requires OpenSSL 1.1.1 or better")

2205 

def set_tlsext_host_name(self, name: bytes) -> None:
    """
    Choose the servername extension value sent in the client hello.

    :param name: The name, as a byte string without NUL bytes.
    :raise TypeError: If ``name`` is not a byte string or contains a NUL
        byte.

    .. versionadded:: 0.13
    """
    if not isinstance(name, bytes):
        raise TypeError("name must be a byte string")
    if b"\0" in name:
        raise TypeError("name must not contain NUL byte")

    # XXX I guess this can fail sometimes?  The return value of the call
    # is not inspected.
    _lib.SSL_set_tlsext_host_name(self._ssl, name)

2221 

def pending(self) -> int:
    """
    Report how many bytes can be safely read from the SSL buffer.  This is
    **not** the buffer of the underlying transport.

    :return: The number of bytes available in the receive buffer.
    """
    available = _lib.SSL_pending(self._ssl)
    return available

2230 

def send(self, buf: _Buffer, flags: int = 0) -> int:
    """
    Write data to the connection.  NOTE: when one of the WantRead,
    WantWrite or WantX509Lookup exceptions is raised, the call has to be
    repeated with the SAME buffer.

    :param buf: The string, buffer or memoryview to send
    :param flags: (optional) Accepted for compatibility with the socket
        API; the value is ignored
    :return: The number of bytes written
    :raise ValueError: If ``buf`` holds more than 2**31-1 bytes.
    """
    # Backward compatibility
    buf = _text_to_bytes_and_warn("buf", buf)

    with _ffi.from_buffer(buf) as data:
        # check len(buf) instead of len(data) for testability
        if len(buf) > 2147483647:
            raise ValueError("Cannot send more than 2**31-1 bytes at once.")

        written = _lib.SSL_write(self._ssl, data, len(data))
        self._raise_ssl_error(self._ssl, written)

        return written

2256 

2257 write = send 

2258 

def sendall(self, buf: _Buffer, flags: int = 0) -> int:
    """
    Write "all" data to the connection by calling ``SSL_write`` repeatedly
    until nothing is left.  When an error occurs there is no way to tell
    how much data has already been sent.

    :param buf: The string, buffer or memoryview to send
    :param flags: (optional) Accepted for compatibility with the socket
        API; the value is ignored
    :return: The number of bytes written
    """
    buf = _text_to_bytes_and_warn("buf", buf)

    with _ffi.from_buffer(buf) as data:
        remaining = len(buf)
        offset = 0

        while remaining:
            # SSL_write's num arg is an int, so a single call cannot be
            # asked to send more than 2**31-1 bytes.
            chunk_size = min(remaining, 2147483647)
            written = _lib.SSL_write(self._ssl, data + offset, chunk_size)
            self._raise_ssl_error(self._ssl, written)
            offset += written
            remaining -= written

        return offset

2287 

def recv(self, bufsiz: int, flags: int | None = None) -> bytes:
    """
    Read data from the connection.

    :param bufsiz: The maximum number of bytes to read
    :param flags: (optional) ``MSG_PEEK`` is the only supported flag; all
        other flags are ignored.
    :return: The bytes read from the Connection
    """
    buf = _no_zero_allocator("char[]", bufsiz)
    # MSG_PEEK selects SSL_peek instead of SSL_read.
    peek_requested = flags is not None and flags & socket.MSG_PEEK
    reader = _lib.SSL_peek if peek_requested else _lib.SSL_read
    result = reader(self._ssl, buf, bufsiz)
    self._raise_ssl_error(self._ssl, result)
    return _ffi.buffer(buf, result)[:]

2304 

2305 read = recv 

2306 

def recv_into(
    self,
    buffer: Any,  # collections.abc.Buffer once we use Python 3.12+
    nbytes: int | None = None,
    flags: int | None = None,
) -> int:
    """
    Read data from the connection straight into a caller-supplied buffer
    instead of returning a new bytes object.

    :param buffer: The buffer to copy into.
    :param nbytes: (optional) The maximum number of bytes to read into the
        buffer.  Defaults to the size of the buffer and is capped at the
        size of the buffer when larger.
    :param flags: (optional) ``MSG_PEEK`` is the only supported flag; all
        other flags are ignored.
    :return: The number of bytes read into the buffer.
    """
    capacity = len(buffer)
    nbytes = capacity if nbytes is None else min(nbytes, capacity)

    # A temporary buffer is required.  It would be better to pass
    # memoryviews straight into the SSL_read call, but right now that is
    # not possible.  Revisit this if CFFI gets that ability.
    scratch = _no_zero_allocator("char[]", nbytes)
    peek_requested = flags is not None and flags & socket.MSG_PEEK
    reader = _lib.SSL_peek if peek_requested else _lib.SSL_read
    result = reader(self._ssl, scratch, nbytes)
    self._raise_ssl_error(self._ssl, result)

    # This strange line is all to avoid a memory copy. The buffer protocol
    # should allow us to assign a CFFI buffer to the LHS of this line, but
    # on CPython 3.3+ that segfaults. As a workaround, we can temporarily
    # wrap it in a memoryview.
    buffer[:result] = memoryview(_ffi.buffer(scratch, result))

    return result

2348 

def _handle_bio_errors(self, bio: Any, result: int) -> typing.NoReturn:
    """
    Raise the exception matching the state of ``bio`` after a failed BIO
    operation.

    :param bio: The BIO the failed operation was performed on.
    :param result: The return value of the failed operation (not
        inspected here).
    :raise WantReadError: If the BIO should be retried for reading.
    :raise WantWriteError: If the BIO should be retried for writing.
    :raise ValueError: For the remaining retry conditions.
    """
    if not _lib.BIO_should_retry(bio):
        # TODO: This is untested.
        _raise_current_error()
    elif _lib.BIO_should_read(bio):
        raise WantReadError()
    elif _lib.BIO_should_write(bio):
        # TODO: This is untested.
        raise WantWriteError()
    elif _lib.BIO_should_io_special(bio):
        # TODO: This is untested. I think io_special means the socket
        # BIO has a not-yet connected socket.
        raise ValueError("BIO_should_io_special")
    else:
        # TODO: This is untested.
        raise ValueError("unknown bio failure")

2366 

def bio_read(self, bufsiz: int) -> bytes:
    """
    For a Connection created with a memory BIO, read bytes from the write
    end of that memory BIO.  Many Connection methods add bytes which must
    be read this way; otherwise the buffer eventually fills up and the
    Connection cannot take further actions.

    :param bufsiz: The maximum number of bytes to read
    :return: The bytes read.
    :raise TypeError: If the Connection uses a socket rather than a
        memory BIO, or ``bufsiz`` is not an integer.
    """
    outgoing = self._from_ssl
    if outgoing is None:
        raise TypeError("Connection sock was not None")

    if not isinstance(bufsiz, int):
        raise TypeError("bufsiz must be an integer")

    buf = _no_zero_allocator("char[]", bufsiz)
    result = _lib.BIO_read(outgoing, buf, bufsiz)
    if result <= 0:
        self._handle_bio_errors(self._from_ssl, result)

    return _ffi.buffer(buf, result)[:]

2390 

def bio_write(self, buf: _Buffer) -> int:
    """
    For a Connection created with a memory BIO, append bytes to the read
    end of that memory BIO.  The Connection can then consume them, for
    example in response to a call to :meth:`recv`.

    :param buf: The data to put into the memory BIO.
    :return: The number of bytes written
    :raise TypeError: If the Connection uses a socket rather than a
        memory BIO.
    """
    buf = _text_to_bytes_and_warn("buf", buf)

    incoming = self._into_ssl
    if incoming is None:
        raise TypeError("Connection sock was not None")

    with _ffi.from_buffer(buf) as data:
        written = _lib.BIO_write(incoming, data, len(data))
        if written <= 0:
            self._handle_bio_errors(self._into_ssl, written)
        return written

2411 

def renegotiate(self) -> bool:
    """
    Start a renegotiation of the session unless one is already pending.

    :return: True if the renegotiation can be started, False otherwise
    """
    if self.renegotiate_pending():
        return False
    _openssl_assert(_lib.SSL_renegotiate(self._ssl) == 1)
    return True

2422 

def do_handshake(self) -> None:
    """
    Run an SSL handshake (usually after :meth:`renegotiate` or one of
    :meth:`set_accept_state` or :meth:`set_connect_state`).  The same
    exceptions as for :meth:`send` and :meth:`recv` can be raised.

    :return: None.
    """
    outcome = _lib.SSL_do_handshake(self._ssl)
    self._raise_ssl_error(self._ssl, outcome)

2433 

def renegotiate_pending(self) -> bool:
    """
    Tell whether a renegotiation is in progress; False is returned once a
    renegotiation has finished.

    :return: Whether there's a renegotiation in progress
    """
    pending_flag = _lib.SSL_renegotiate_pending(self._ssl)
    return pending_flag == 1

2442 

def total_renegotiations(self) -> int:
    """
    Report the total number of renegotiations.

    :return: The number of renegotiations.
    """
    total = _lib.SSL_total_renegotiations(self._ssl)
    return total

2450 

def connect(self, addr: Any) -> None:
    """
    Put the SSL object into connect state, then call the :meth:`connect`
    method of the underlying socket.  The :class:`Context` supplied when
    this :class:`Connection` was created is the one in use.

    :param addr: A remote address
    :return: What the socket's connect method returns
    """
    ssl = self._ssl
    _lib.SSL_set_connect_state(ssl)
    return self._socket.connect(addr)  # type: ignore[return-value, union-attr]

2462 

def connect_ex(self, addr: Any) -> int:
    """
    Call the :meth:`connect_ex` method of the underlying socket and set up
    SSL on the socket, using the Context object supplied to this
    Connection object at creation.  Note that if the socket's
    :meth:`connect_ex` does not return 0, SSL won't be initialized.

    :param addr: A remote address
    :return: What the socket's connect_ex method returns
    """
    # The socket method is looked up before the connect state is set, so
    # a Connection without a socket fails before any state change.
    socket_connect_ex = self._socket.connect_ex  # type: ignore[union-attr]
    self.set_connect_state()
    return socket_connect_ex(addr)

2476 

def accept(self) -> tuple[Connection, Any]:
    """
    Call the :meth:`accept` method of the underlying socket and wrap the
    returned socket in a new :class:`Connection` that shares this
    Connection's Context and is put into accept state.

    :return: A *(conn, addr)* pair where *conn* is the new
        :class:`Connection` object created, and *addr* is as returned by
        the socket's :meth:`accept`.
    """
    client_sock, peer_addr = self._socket.accept()  # type: ignore[union-attr]
    accepted = Connection(self._context, client_sock)
    accepted.set_accept_state()
    return (accepted, peer_addr)

2491 

def DTLSv1_listen(self) -> None:
    """
    Invoke the OpenSSL function DTLSv1_listen on this connection.  See the
    OpenSSL manual for more details.

    :return: None
    :raise WantReadError: If DTLSv1_listen returned zero.
    """
    # Possible future extension: return the BIO_ADDR in some form.
    peer_addr = _lib.BIO_ADDR_new()
    try:
        status = _lib.DTLSv1_listen(self._ssl, peer_addr)
    finally:
        _lib.BIO_ADDR_free(peer_addr)

    # Problems recorded by the cookie callbacks take precedence.
    for helper in (self._cookie_generate_helper, self._cookie_verify_helper):
        if helper is not None:
            helper.raise_if_problem()

    # DTLSv1_listen is weird. A zero return value means 'didn't find a
    # ClientHello with valid cookie, but keep trying'. So basically
    # WantReadError. But it doesn't work correctly with _raise_ssl_error.
    # So we raise it manually instead.
    if status == 0:
        raise WantReadError()
    if status < 0:
        self._raise_ssl_error(self._ssl, status)

2517 

def DTLSv1_get_timeout(self) -> int | None:
    """
    Determine when the DTLS SSL object next needs to perform internal
    processing due to the passage of time.

    Once the returned number of seconds has elapsed,
    :meth:`DTLSv1_handle_timeout` needs to be called.

    :return: The time left in seconds (seconds plus microseconds divided
        by one million) before the next timeout, or `None` if no timeout
        is currently active.
    """
    seconds = _ffi.new("time_t *")
    microseconds = _ffi.new("long *")
    active = _lib.Cryptography_DTLSv1_get_timeout(
        self._ssl, seconds, microseconds
    )
    if not active:
        return None
    return seconds[0] + (microseconds[0] / 1000000)

2535 

def DTLSv1_handle_timeout(self) -> bool:
    """
    Process timeout events that have become pending on a DTLS SSL object.

    :return: `True` if there was a pending timeout, `False` otherwise.
    """
    outcome = _lib.DTLSv1_handle_timeout(self._ssl)
    if outcome >= 0:
        return bool(outcome)
    # A negative result is handed to _raise_ssl_error, which is expected
    # to raise.
    self._raise_ssl_error(self._ssl, outcome)
    assert False, "unreachable"

2549 

def bio_shutdown(self) -> None:
    """
    If the Connection was created with a memory BIO, this method can be
    used to indicate that *end of file* has been reached on the read end of
    that memory BIO.

    :return: None
    :raise TypeError: If the Connection uses a socket rather than a
        memory BIO.
    """
    # Guard on the BIO that is actually passed to OpenSSL below (the same
    # attribute bio_write checks), rather than on its sibling _from_ssl.
    if self._into_ssl is None:
        raise TypeError("Connection sock was not None")

    _lib.BIO_set_mem_eof_return(self._into_ssl, 0)

2562 

def shutdown(self) -> bool:
    """
    Send the shutdown message to the Connection.

    :return: True if the shutdown completed successfully (i.e. both sides
        have sent closure alerts), False otherwise (in which case you
        call :meth:`recv` or :meth:`send` when the connection becomes
        readable/writeable).
    """
    outcome = _lib.SSL_shutdown(self._ssl)
    if outcome < 0:
        # A negative result is handed to _raise_ssl_error, which is
        # expected to raise.
        self._raise_ssl_error(self._ssl, outcome)
        assert False, "unreachable"
    return outcome > 0

2580 

def get_cipher_list(self) -> list[str]:
    """
    Return the list of ciphers used by the Connection object.

    :return: A list of native cipher strings.
    """
    names = []
    index = 0
    # Query successive indexes until OpenSSL answers with NULL.
    while True:
        raw_name = _lib.SSL_get_cipher_list(self._ssl, index)
        if raw_name == _ffi.NULL:
            return names
        names.append(_ffi.string(raw_name).decode("utf-8"))
        index += 1

2594 

def get_client_ca_list(self) -> list[X509Name]:
    """
    Get CAs whose certificates are suggested for client authentication.

    :return: If this is a server connection, the list of certificate
        authorities that will be sent or has been sent to the client, as
        controlled by this :class:`Connection`'s :class:`Context`.

        If this is a client connection, the list will be empty until the
        connection with the server is established.

    .. versionadded:: 0.10
    """
    name_stack = _lib.SSL_get_client_CA_list(self._ssl)
    if name_stack == _ffi.NULL:
        # TODO: This is untested.
        return []

    names = []
    for index in range(_lib.sk_X509_NAME_num(name_stack)):
        # Each entry is duplicated; the duplicate is released through
        # X509_NAME_free when the wrapper is collected.
        duplicate = _lib.X509_NAME_dup(
            _lib.sk_X509_NAME_value(name_stack, index)
        )
        _openssl_assert(duplicate != _ffi.NULL)

        wrapped = X509Name.__new__(X509Name)
        wrapped._name = _ffi.gc(duplicate, _lib.X509_NAME_free)
        names.append(wrapped)
    return names

2623 

def makefile(self, *args: Any, **kwargs: Any) -> typing.NoReturn:
    """
    Always fails: makefile() is not implemented because SSL connections
    have no dup semantics.

    :raise: NotImplementedError
    """
    message = "Cannot make file object of OpenSSL.SSL.Connection"
    raise NotImplementedError(message)

2634 

def get_app_data(self) -> Any:
    """
    Return the application data previously stored with
    :meth:`set_app_data`.

    :return: The application data
    """
    stored = self._app_data
    return stored

2642 

def set_app_data(self, data: Any) -> None:
    """
    Store application data on the Connection.

    :param data: The application data
    :return: None
    """
    setattr(self, "_app_data", data)

2651 

def get_shutdown(self) -> int:
    """
    Return the shutdown state of the Connection.

    :return: The shutdown state, a bitvector of SENT_SHUTDOWN,
        RECEIVED_SHUTDOWN.
    """
    state = _lib.SSL_get_shutdown(self._ssl)
    return state

2660 

def set_shutdown(self, state: int) -> None:
    """
    Change the shutdown state of the Connection.

    :param state: bitvector of SENT_SHUTDOWN, RECEIVED_SHUTDOWN.
    :return: None
    :raise TypeError: If ``state`` is not an integer.
    """
    if isinstance(state, int):
        _lib.SSL_set_shutdown(self._ssl, state)
        return

    raise TypeError("state must be an integer")

2672 

def get_state_string(self) -> bytes:
    """
    Return a verbose description of the Connection's state.

    :return: The long state string reported by OpenSSL, as bytes
    """
    raw_state = _lib.SSL_state_string_long(self._ssl)
    return _ffi.string(raw_state)

2680 

def server_random(self) -> bytes | None:
    """
    Retrieve the random value used with the server hello message.

    :return: The server random as bytes, or ``None`` when the connection
        has no session.
    """
    if _lib.SSL_get_session(self._ssl) == _ffi.NULL:
        return None
    # A first call with a NULL buffer yields the required length.
    size = _lib.SSL_get_server_random(self._ssl, _ffi.NULL, 0)
    _openssl_assert(size > 0)
    out = _no_zero_allocator("unsigned char[]", size)
    _lib.SSL_get_server_random(self._ssl, out, size)
    return _ffi.buffer(out, size)[:]

2695 

def client_random(self) -> bytes | None:
    """
    Retrieve the random value used with the client hello message.

    :return: The client random as bytes, or ``None`` when the connection
        has no session.
    """
    if _lib.SSL_get_session(self._ssl) == _ffi.NULL:
        return None

    # A first call with a NULL buffer yields the required length.
    size = _lib.SSL_get_client_random(self._ssl, _ffi.NULL, 0)
    _openssl_assert(size > 0)
    out = _no_zero_allocator("unsigned char[]", size)
    _lib.SSL_get_client_random(self._ssl, out, size)
    return _ffi.buffer(out, size)[:]

2711 

def master_key(self) -> bytes | None:
    """
    Retrieve the value of the master key for this session.

    :return: The master key as bytes, or :obj:`None` if the connection
        has no session.
    """
    current = _lib.SSL_get_session(self._ssl)
    if current == _ffi.NULL:
        return None

    # First call with a NULL buffer only asks for the required size.
    size = _lib.SSL_SESSION_get_master_key(current, _ffi.NULL, 0)
    _openssl_assert(size > 0)

    # Second call fills a buffer of exactly that size.
    buf = _no_zero_allocator("unsigned char[]", size)
    _lib.SSL_SESSION_get_master_key(current, buf, size)
    return _ffi.buffer(buf, size)[:]

2727 

def export_keying_material(
    self, label: bytes, olen: int, context: bytes | None = None
) -> bytes:
    """
    Obtain keying material for application use.

    :param label: A disambiguating label string as described in RFC 5705.
    :param olen: The length of the exported key material in bytes.
    :param context: A per-association context value, or :obj:`None` to
        export without a context.
    :return: The exported key material, ``olen`` bytes long. Failure of
        the underlying call is reported through ``_openssl_assert``
        rather than by returning :obj:`None`.
    """
    out_buf = _no_zero_allocator("unsigned char[]", olen)

    # The C call takes an explicit "use context" flag in addition to the
    # pointer/length pair; an absent context is NULL / 0 / 0.
    if context is None:
        ctx_ptr, ctx_len, ctx_flag = _ffi.NULL, 0, 0
    else:
        ctx_ptr, ctx_len, ctx_flag = context, len(context), 1

    rc = _lib.SSL_export_keying_material(
        self._ssl,
        out_buf,
        olen,
        label,
        len(label),
        ctx_ptr,
        ctx_len,
        ctx_flag,
    )
    _openssl_assert(rc == 1)
    return _ffi.buffer(out_buf, olen)[:]

2759 

def sock_shutdown(self, *args: Any, **kwargs: Any) -> None:
    """
    Forward to the :meth:`shutdown` method of the underlying socket.
    See :manpage:`shutdown(2)`.

    All positional and keyword arguments are passed through unchanged.

    :return: What the socket's shutdown() method returns
    """
    underlying = self._socket
    return underlying.shutdown(*args, **kwargs)  # type: ignore[return-value, union-attr]

2768 

@typing.overload
def get_certificate(
    self, *, as_cryptography: typing.Literal[True]
) -> x509.Certificate | None:
    pass


@typing.overload
def get_certificate(
    self, *, as_cryptography: typing.Literal[False] = False
) -> X509 | None:
    pass


def get_certificate(
    self,
    *,
    as_cryptography: typing.Literal[True] | typing.Literal[False] = False,
) -> X509 | x509.Certificate | None:
    """
    Retrieve the local certificate (if any)

    :param bool as_cryptography: Controls whether a
        ``cryptography.x509.Certificate`` or an ``OpenSSL.crypto.X509``
        object should be returned.

    :return: The local certificate, or :obj:`None` if the connection
        has none.
    """
    cert = _lib.SSL_get_certificate(self._ssl)
    if cert == _ffi.NULL:
        return None

    # Take our own reference before wrapping the pointer, and verify
    # that it was actually taken -- the same check _cert_stack_to_list
    # performs. Without it a failed up-ref would leave the wrapper
    # holding a reference it does not own.
    res = _lib.X509_up_ref(cert)
    _openssl_assert(res >= 1)

    pycert = X509._from_raw_x509_ptr(cert)
    if as_cryptography:
        return pycert.to_cryptography()
    return pycert

2803 

@typing.overload
def get_peer_certificate(
    self, *, as_cryptography: typing.Literal[True]
) -> x509.Certificate | None: ...


@typing.overload
def get_peer_certificate(
    self, *, as_cryptography: typing.Literal[False] = False
) -> X509 | None: ...


def get_peer_certificate(
    self,
    *,
    as_cryptography: typing.Literal[True] | typing.Literal[False] = False,
) -> X509 | x509.Certificate | None:
    """
    Retrieve the other side's certificate (if any)

    :param bool as_cryptography: Controls whether a
        ``cryptography.x509.Certificate`` or an ``OpenSSL.crypto.X509``
        object should be returned.

    :return: The peer's certificate, or :obj:`None` if there is none.
    """
    raw_cert = _lib.SSL_get_peer_certificate(self._ssl)
    if raw_cert == _ffi.NULL:
        return None

    # The pointer is wrapped directly; no extra reference is taken here.
    wrapped = X509._from_raw_x509_ptr(raw_cert)
    return wrapped.to_cryptography() if as_cryptography else wrapped

2837 

@staticmethod
def _cert_stack_to_list(cert_stack: Any) -> list[X509]:
    """
    Internal helper: turn a STACK_OF(X509) into a list of
    ``OpenSSL.crypto.X509`` instances, preserving stack order.

    :param cert_stack: The ``STACK_OF(X509)`` pointer to read from.
    :return: One X509 wrapper per stack entry.
    """
    certs: list[X509] = []
    total = _lib.sk_X509_num(cert_stack)
    for index in range(total):
        raw_cert = _lib.sk_X509_value(cert_stack, index)
        _openssl_assert(raw_cert != _ffi.NULL)
        # Take a reference for the wrapper before handing the pointer
        # over, and make sure that succeeded.
        _openssl_assert(_lib.X509_up_ref(raw_cert) >= 1)
        certs.append(X509._from_raw_x509_ptr(raw_cert))
    return certs

2853 

@staticmethod
def _cert_stack_to_cryptography_list(
    cert_stack: Any,
) -> list[x509.Certificate]:
    """
    Internal helper: turn a STACK_OF(X509) into a list of
    ``cryptography.x509.Certificate`` instances, preserving stack order.

    :param cert_stack: The ``STACK_OF(X509)`` pointer to read from.
    :return: One ``cryptography`` certificate per stack entry.
    """
    certs: list[x509.Certificate] = []
    total = _lib.sk_X509_num(cert_stack)
    for index in range(total):
        raw_cert = _lib.sk_X509_value(cert_stack, index)
        _openssl_assert(raw_cert != _ffi.NULL)
        # Take a reference for the temporary X509 wrapper before handing
        # the pointer over, and make sure that succeeded.
        _openssl_assert(_lib.X509_up_ref(raw_cert) >= 1)
        wrapped = X509._from_raw_x509_ptr(raw_cert)
        certs.append(wrapped.to_cryptography())
    return certs

2871 

@typing.overload
def get_peer_cert_chain(
    self, *, as_cryptography: typing.Literal[True]
) -> list[x509.Certificate] | None: ...


@typing.overload
def get_peer_cert_chain(
    self, *, as_cryptography: typing.Literal[False] = False
) -> list[X509] | None: ...


def get_peer_cert_chain(
    self,
    *,
    as_cryptography: typing.Literal[True] | typing.Literal[False] = False,
) -> list[X509] | list[x509.Certificate] | None:
    """
    Retrieve the other side's certificate chain (if any)

    :param bool as_cryptography: Controls whether a list of
        ``cryptography.x509.Certificate`` or ``OpenSSL.crypto.X509``
        object should be returned.

    :return: A list of certificates giving the peer's certificate chain,
        or None if it does not have one.
    """
    stack = _lib.SSL_get_peer_cert_chain(self._ssl)
    if stack == _ffi.NULL:
        return None

    # Pick the converter matching the requested certificate flavour.
    if as_cryptography:
        convert = self._cert_stack_to_cryptography_list
    else:
        convert = self._cert_stack_to_list
    return convert(stack)

2906 

@typing.overload
def get_verified_chain(
    self, *, as_cryptography: typing.Literal[True]
) -> list[x509.Certificate] | None: ...


@typing.overload
def get_verified_chain(
    self, *, as_cryptography: typing.Literal[False] = False
) -> list[X509] | None: ...


def get_verified_chain(
    self,
    *,
    as_cryptography: typing.Literal[True] | typing.Literal[False] = False,
) -> list[X509] | list[x509.Certificate] | None:
    """
    Retrieve the verified certificate chain of the peer including the
    peer's end entity certificate. It must be called after a session has
    been successfully established. If peer verification was not successful
    the chain may be incomplete, invalid, or None.

    :param bool as_cryptography: Controls whether a list of
        ``cryptography.x509.Certificate`` or ``OpenSSL.crypto.X509``
        object should be returned.

    :return: A list of certificates giving the peer's verified
        certificate chain, or None if it does not have one.

    .. versionadded:: 20.0
    """
    # OpenSSL 1.1+
    stack = _lib.SSL_get0_verified_chain(self._ssl)
    if stack == _ffi.NULL:
        return None

    # Pick the converter matching the requested certificate flavour.
    if as_cryptography:
        convert = self._cert_stack_to_cryptography_list
    else:
        convert = self._cert_stack_to_list
    return convert(stack)

2947 

def want_read(self) -> bool:
    """
    Checks if more data has to be read from the transport layer to complete
    an operation.

    :return: True iff more data has to be read
    """
    # The C call yields an integer; convert it so the value matches the
    # declared ``bool`` return type (``bool`` is an ``int`` subclass, so
    # existing truthiness/equality checks keep working).
    return bool(_lib.SSL_want_read(self._ssl))

2956 

def want_write(self) -> bool:
    """
    Checks if there is data to write to the transport layer to complete an
    operation.

    :return: True iff there is data to write
    """
    # The C call yields an integer; convert it so the value matches the
    # declared ``bool`` return type (``bool`` is an ``int`` subclass, so
    # existing truthiness/equality checks keep working).
    return bool(_lib.SSL_want_write(self._ssl))

2965 

def set_accept_state(self) -> None:
    """
    Put the connection into server mode. The handshake will be handled
    automatically by read/write.

    :return: None
    """
    ssl_handle = self._ssl
    _lib.SSL_set_accept_state(ssl_handle)

2974 

def set_connect_state(self) -> None:
    """
    Put the connection into client mode. The handshake will be handled
    automatically by read/write.

    :return: None
    """
    ssl_handle = self._ssl
    _lib.SSL_set_connect_state(ssl_handle)

2983 

def get_session(self) -> Session | None:
    """
    Returns the Session currently used.

    :return: An instance of :class:`OpenSSL.SSL.Session` or
        :obj:`None` if no session exists.

    .. versionadded:: 0.14
    """
    raw_session = _lib.SSL_get1_session(self._ssl)
    if raw_session == _ffi.NULL:
        return None

    # Build the wrapper without running Session.__init__, then attach the
    # pointer so that SSL_SESSION_free runs when it is garbage collected.
    wrapper = Session.__new__(Session)
    wrapper._session = _ffi.gc(raw_session, _lib.SSL_SESSION_free)
    return wrapper

3000 

def set_session(self, session: Session) -> None:
    """
    Set the session to be used when the TLS/SSL connection is established.

    :param session: A Session instance representing the session to use.
    :returns: None
    :raises TypeError: If *session* is not a :class:`Session`.

    .. versionadded:: 0.14
    """
    # Validate before touching the C layer.
    if not isinstance(session, Session):
        raise TypeError("session must be a Session instance")

    status = _lib.SSL_set_session(self._ssl, session._session)
    _openssl_assert(status == 1)

3015 

def _get_finished_message(
    self, function: Callable[[Any, Any, int], int]
) -> bytes | None:
    """
    Shared implementation of :meth:`get_finished` and
    :meth:`get_peer_finished`.

    :param function: Either :data:`SSL_get_finished`: or
        :data:`SSL_get_peer_finished`.

    :return: :data:`None` if the desired message has not yet been
        received, otherwise the contents of the message.
    """
    # The first call only asks how long the message is. The OpenSSL
    # documentation does not say whether the output buffer may be NULL
    # when the count is zero, and the implementation calls memcpy()
    # unconditionally. Section 7.1.4, paragraph 1 of the C standard
    # suggests memcpy(NULL, source, 0) is not guaranteed to be defined
    # behavior, so a real (zero-length) buffer is passed instead of NULL.
    probe = _ffi.new("char[]", 0)
    needed = function(self._ssl, probe, 0)
    if needed == 0:
        # No Finished message so far.
        return None

    # The second call copies the message into a buffer of the right size.
    message = _no_zero_allocator("char[]", needed)
    function(self._ssl, message, needed)
    return _ffi.buffer(message, needed)[:]

3050 

def get_finished(self) -> bytes | None:
    """
    Obtain the latest TLS Finished message that we sent.

    :return: The contents of the message or :obj:`None` if the TLS
        handshake has not yet completed.

    .. versionadded:: 0.15
    """
    reader = _lib.SSL_get_finished
    return self._get_finished_message(reader)

3061 

def get_peer_finished(self) -> bytes | None:
    """
    Obtain the latest TLS Finished message that we received from the peer.

    :return: The contents of the message or :obj:`None` if the TLS
        handshake has not yet completed.

    .. versionadded:: 0.15
    """
    reader = _lib.SSL_get_peer_finished
    return self._get_finished_message(reader)

3072 

def get_cipher_name(self) -> str | None:
    """
    Obtain the name of the currently used cipher.

    :returns: The name of the currently used cipher or :obj:`None`
        if no connection has been established.

    .. versionadded:: 0.15
    """
    current = _lib.SSL_get_current_cipher(self._ssl)
    if current == _ffi.NULL:
        return None

    raw_name = _ffi.string(_lib.SSL_CIPHER_get_name(current))
    return raw_name.decode("utf-8")

3088 

def get_cipher_bits(self) -> int | None:
    """
    Obtain the number of secret bits of the currently used cipher.

    :returns: The number of secret bits of the currently used cipher
        or :obj:`None` if no connection has been established.

    .. versionadded:: 0.15
    """
    current = _lib.SSL_get_current_cipher(self._ssl)
    if current == _ffi.NULL:
        return None

    # NULL is passed for the second (output) argument; only the return
    # value is used.
    return _lib.SSL_CIPHER_get_bits(current, _ffi.NULL)

3103 

def get_cipher_version(self) -> str | None:
    """
    Obtain the protocol version of the currently used cipher.

    :returns: The protocol name of the currently used cipher
        or :obj:`None` if no connection has been established.

    .. versionadded:: 0.15
    """
    current = _lib.SSL_get_current_cipher(self._ssl)
    if current == _ffi.NULL:
        return None

    raw_version = _ffi.string(_lib.SSL_CIPHER_get_version(current))
    return raw_version.decode("utf-8")

3119 

def get_protocol_version_name(self) -> str:
    """
    Retrieve the protocol version of the current connection.

    :returns: The TLS version of the current connection, for example
        the value for TLS 1.2 would be ``TLSv1.2`` or ``Unknown``
        for connections that were not successfully established.
    """
    raw_version = _lib.SSL_get_version(self._ssl)
    return _ffi.string(raw_version).decode("utf-8")

3130 

def get_protocol_version(self) -> int:
    """
    Retrieve the SSL or TLS protocol version of the current connection.

    :returns: The TLS version of the current connection. For example,
        it will return ``0x0301`` (769 in decimal) for connections made
        over TLS version 1.
    """
    return _lib.SSL_version(self._ssl)

3140 

def set_alpn_protos(self, protos: list[bytes]) -> None:
    """
    Specify the client's ALPN protocol list.

    These protocols are offered to the server during protocol negotiation.

    :param protos: A list of the protocols to be offered to the server.
        This list should be a Python list of bytestrings representing the
        protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``.
    :raises ValueError: If *protos* is empty.
    """
    # Different versions of OpenSSL are inconsistent about how they handle
    # empty proto lists (see #1043), so we avoid the problem entirely by
    # rejecting them ourselves.
    if not protos:
        raise ValueError("at least one protocol must be specified")

    # Wire format: each protocol name preceded by a single length byte.
    pieces = []
    for proto in protos:
        pieces.append(bytes((len(proto),)))
        pieces.append(proto)
    wire = b"".join(pieces)

    # Build a C string from the list. We don't need to save this off
    # because OpenSSL immediately copies the data out.
    c_wire = _ffi.new("unsigned char[]", wire)

    # https://www.openssl.org/docs/man1.1.0/man3/SSL_CTX_set_alpn_protos.html:
    #     SSL_CTX_set_alpn_protos() and SSL_set_alpn_protos()
    #     return 0 on success, and non-0 on failure.
    #     WARNING: these functions reverse the return value convention.
    status = _lib.SSL_set_alpn_protos(self._ssl, c_wire, len(wire))
    _openssl_assert(status == 0)

3174 

def get_alpn_proto_negotiated(self) -> bytes:
    """
    Get the protocol that was negotiated by ALPN.

    :returns: A bytestring of the protocol name. If no protocol has been
        negotiated yet, returns an empty bytestring.
    """
    data = _ffi.new("unsigned char **")
    data_len = _ffi.new("unsigned int *")

    _lib.SSL_get0_alpn_selected(self._ssl, data, data_len)

    # ``data_len`` itself is a freshly allocated, non-NULL pointer and is
    # therefore always truthy; the "nothing negotiated" case has to be
    # detected from the values OpenSSL wrote through the pointers.
    if not data_len[0] or data[0] == _ffi.NULL:
        return b""

    return _ffi.buffer(data[0], data_len[0])[:]

3191 

def get_selected_srtp_profile(self) -> bytes:
    """
    Get the SRTP protocol which was negotiated.

    :returns: A bytestring of the SRTP profile name. If no profile has been
        negotiated yet, returns an empty bytestring.
    """
    selected = _lib.SSL_get_selected_srtp_profile(self._ssl)
    if selected:
        return _ffi.string(selected.name)

    # A falsy result means no profile was selected.
    return b""

3204 

def request_ocsp(self) -> None:
    """
    Called to request that the server sends stapled OCSP data, if
    available. If this is not called on the client side then the server
    will not send OCSP data. Should be used in conjunction with
    :meth:`Context.set_ocsp_client_callback`.
    """
    status_type = _lib.TLSEXT_STATUSTYPE_ocsp
    status = _lib.SSL_set_tlsext_status_type(self._ssl, status_type)
    _openssl_assert(status == 1)

3216 

def set_info_callback(
    self, callback: Callable[[Connection, int, int], None]
) -> None:
    """
    Set the information callback to *callback*. This function will be
    called from time to time during SSL handshakes.

    :param callback: The Python callback to use.  This should take three
        arguments: a Connection object and two integers.  The first integer
        specifies where in the SSL handshake the function was called, and
        the other the return code from a (possibly failed) internal
        function call.
    :return: None
    """

    @wraps(callback)
    def _dispatch(ssl, where, return_code):  # type: ignore[no-untyped-def]
        # Translate the raw SSL pointer back into its Connection object
        # before invoking the user's callback.
        connection = Connection._reverse_mapping[ssl]
        callback(connection, where, return_code)

    # The cffi callback object is stored on the instance and that stored
    # object is what gets registered with OpenSSL.
    self._info_callback = _ffi.callback(
        "void (*)(const SSL *, int, int)", _dispatch
    )
    registered = self._info_callback
    _lib.SSL_set_info_callback(self._ssl, registered)