Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/OpenSSL/SSL.py: 35%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import os
4import socket
5import typing
6import warnings
7from collections.abc import Sequence
8from errno import errorcode
9from functools import partial, wraps
10from itertools import chain, count
11from sys import platform
12from typing import Any, Callable, Optional, TypeVar
13from weakref import WeakValueDictionary
15from cryptography import x509
16from cryptography.hazmat.primitives.asymmetric import ec
18from OpenSSL._util import (
19 StrOrBytesPath as _StrOrBytesPath,
20)
21from OpenSSL._util import (
22 exception_from_error_queue as _exception_from_error_queue,
23)
24from OpenSSL._util import (
25 ffi as _ffi,
26)
27from OpenSSL._util import (
28 lib as _lib,
29)
30from OpenSSL._util import (
31 make_assert as _make_assert,
32)
33from OpenSSL._util import (
34 no_zero_allocator as _no_zero_allocator,
35)
36from OpenSSL._util import (
37 path_bytes as _path_bytes,
38)
39from OpenSSL._util import (
40 text_to_bytes_and_warn as _text_to_bytes_and_warn,
41)
42from OpenSSL.crypto import (
43 FILETYPE_PEM,
44 X509,
45 PKey,
46 X509Name,
47 X509Store,
48 _EllipticCurve,
49 _PassphraseHelper,
50 _PrivateKey,
51)
53__all__ = [
54 "DTLS_CLIENT_METHOD",
55 "DTLS_METHOD",
56 "DTLS_SERVER_METHOD",
57 "MODE_RELEASE_BUFFERS",
58 "NO_OVERLAPPING_PROTOCOLS",
59 "OPENSSL_BUILT_ON",
60 "OPENSSL_CFLAGS",
61 "OPENSSL_DIR",
62 "OPENSSL_PLATFORM",
63 "OPENSSL_VERSION",
64 "OPENSSL_VERSION_NUMBER",
65 "OP_ALL",
66 "OP_CIPHER_SERVER_PREFERENCE",
67 "OP_COOKIE_EXCHANGE",
68 "OP_DONT_INSERT_EMPTY_FRAGMENTS",
69 "OP_EPHEMERAL_RSA",
70 "OP_MICROSOFT_BIG_SSLV3_BUFFER",
71 "OP_MICROSOFT_SESS_ID_BUG",
72 "OP_MSIE_SSLV2_RSA_PADDING",
73 "OP_NETSCAPE_CA_DN_BUG",
74 "OP_NETSCAPE_CHALLENGE_BUG",
75 "OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG",
76 "OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG",
77 "OP_NO_COMPRESSION",
78 "OP_NO_QUERY_MTU",
79 "OP_NO_TICKET",
80 "OP_PKCS1_CHECK_1",
81 "OP_PKCS1_CHECK_2",
82 "OP_SINGLE_DH_USE",
83 "OP_SINGLE_ECDH_USE",
84 "OP_SSLEAY_080_CLIENT_DH_BUG",
85 "OP_SSLREF2_REUSE_CERT_TYPE_BUG",
86 "OP_TLS_BLOCK_PADDING_BUG",
87 "OP_TLS_D5_BUG",
88 "OP_TLS_ROLLBACK_BUG",
89 "RECEIVED_SHUTDOWN",
90 "SENT_SHUTDOWN",
91 "SESS_CACHE_BOTH",
92 "SESS_CACHE_CLIENT",
93 "SESS_CACHE_NO_AUTO_CLEAR",
94 "SESS_CACHE_NO_INTERNAL",
95 "SESS_CACHE_NO_INTERNAL_LOOKUP",
96 "SESS_CACHE_NO_INTERNAL_STORE",
97 "SESS_CACHE_OFF",
98 "SESS_CACHE_SERVER",
99 "SSL3_VERSION",
100 "SSLEAY_BUILT_ON",
101 "SSLEAY_CFLAGS",
102 "SSLEAY_DIR",
103 "SSLEAY_PLATFORM",
104 "SSLEAY_VERSION",
105 "SSL_CB_ACCEPT_EXIT",
106 "SSL_CB_ACCEPT_LOOP",
107 "SSL_CB_ALERT",
108 "SSL_CB_CONNECT_EXIT",
109 "SSL_CB_CONNECT_LOOP",
110 "SSL_CB_EXIT",
111 "SSL_CB_HANDSHAKE_DONE",
112 "SSL_CB_HANDSHAKE_START",
113 "SSL_CB_LOOP",
114 "SSL_CB_READ",
115 "SSL_CB_READ_ALERT",
116 "SSL_CB_WRITE",
117 "SSL_CB_WRITE_ALERT",
118 "SSL_ST_ACCEPT",
119 "SSL_ST_CONNECT",
120 "SSL_ST_MASK",
121 "TLS1_1_VERSION",
122 "TLS1_2_VERSION",
123 "TLS1_3_VERSION",
124 "TLS1_VERSION",
125 "TLS_CLIENT_METHOD",
126 "TLS_METHOD",
127 "TLS_SERVER_METHOD",
128 "VERIFY_CLIENT_ONCE",
129 "VERIFY_FAIL_IF_NO_PEER_CERT",
130 "VERIFY_NONE",
131 "VERIFY_PEER",
132 "Connection",
133 "Context",
134 "Error",
135 "OP_NO_SSLv2",
136 "OP_NO_SSLv3",
137 "OP_NO_TLSv1",
138 "OP_NO_TLSv1_1",
139 "OP_NO_TLSv1_2",
140 "OP_NO_TLSv1_3",
141 "SSLeay_version",
142 "SSLv23_METHOD",
143 "Session",
144 "SysCallError",
145 "TLSv1_1_METHOD",
146 "TLSv1_2_METHOD",
147 "TLSv1_METHOD",
148 "WantReadError",
149 "WantWriteError",
150 "WantX509LookupError",
151 "X509VerificationCodes",
152 "ZeroReturnError",
153]
156OPENSSL_VERSION_NUMBER: int = _lib.OPENSSL_VERSION_NUMBER
157OPENSSL_VERSION: int = _lib.OPENSSL_VERSION
158OPENSSL_CFLAGS: int = _lib.OPENSSL_CFLAGS
159OPENSSL_PLATFORM: int = _lib.OPENSSL_PLATFORM
160OPENSSL_DIR: int = _lib.OPENSSL_DIR
161OPENSSL_BUILT_ON: int = _lib.OPENSSL_BUILT_ON
163SSLEAY_VERSION = OPENSSL_VERSION
164SSLEAY_CFLAGS = OPENSSL_CFLAGS
165SSLEAY_PLATFORM = OPENSSL_PLATFORM
166SSLEAY_DIR = OPENSSL_DIR
167SSLEAY_BUILT_ON = OPENSSL_BUILT_ON
169SENT_SHUTDOWN = _lib.SSL_SENT_SHUTDOWN
170RECEIVED_SHUTDOWN = _lib.SSL_RECEIVED_SHUTDOWN
172SSLv23_METHOD = 3
173TLSv1_METHOD = 4
174TLSv1_1_METHOD = 5
175TLSv1_2_METHOD = 6
176TLS_METHOD = 7
177TLS_SERVER_METHOD = 8
178TLS_CLIENT_METHOD = 9
179DTLS_METHOD = 10
180DTLS_SERVER_METHOD = 11
181DTLS_CLIENT_METHOD = 12
183SSL3_VERSION: int = _lib.SSL3_VERSION
184TLS1_VERSION: int = _lib.TLS1_VERSION
185TLS1_1_VERSION: int = _lib.TLS1_1_VERSION
186TLS1_2_VERSION: int = _lib.TLS1_2_VERSION
187TLS1_3_VERSION: int = _lib.TLS1_3_VERSION
189OP_NO_SSLv2: int = _lib.SSL_OP_NO_SSLv2
190OP_NO_SSLv3: int = _lib.SSL_OP_NO_SSLv3
191OP_NO_TLSv1: int = _lib.SSL_OP_NO_TLSv1
192OP_NO_TLSv1_1: int = _lib.SSL_OP_NO_TLSv1_1
193OP_NO_TLSv1_2: int = _lib.SSL_OP_NO_TLSv1_2
194OP_NO_TLSv1_3: int = _lib.SSL_OP_NO_TLSv1_3
196MODE_RELEASE_BUFFERS: int = _lib.SSL_MODE_RELEASE_BUFFERS
198OP_SINGLE_DH_USE: int = _lib.SSL_OP_SINGLE_DH_USE
199OP_SINGLE_ECDH_USE: int = _lib.SSL_OP_SINGLE_ECDH_USE
200OP_EPHEMERAL_RSA: int = _lib.SSL_OP_EPHEMERAL_RSA
201OP_MICROSOFT_SESS_ID_BUG: int = _lib.SSL_OP_MICROSOFT_SESS_ID_BUG
202OP_NETSCAPE_CHALLENGE_BUG: int = _lib.SSL_OP_NETSCAPE_CHALLENGE_BUG
203OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG: int = (
204 _lib.SSL_OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG
205)
206OP_SSLREF2_REUSE_CERT_TYPE_BUG: int = _lib.SSL_OP_SSLREF2_REUSE_CERT_TYPE_BUG
207OP_MICROSOFT_BIG_SSLV3_BUFFER: int = _lib.SSL_OP_MICROSOFT_BIG_SSLV3_BUFFER
208OP_MSIE_SSLV2_RSA_PADDING: int = _lib.SSL_OP_MSIE_SSLV2_RSA_PADDING
209OP_SSLEAY_080_CLIENT_DH_BUG: int = _lib.SSL_OP_SSLEAY_080_CLIENT_DH_BUG
210OP_TLS_D5_BUG: int = _lib.SSL_OP_TLS_D5_BUG
211OP_TLS_BLOCK_PADDING_BUG: int = _lib.SSL_OP_TLS_BLOCK_PADDING_BUG
212OP_DONT_INSERT_EMPTY_FRAGMENTS: int = _lib.SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS
213OP_CIPHER_SERVER_PREFERENCE: int = _lib.SSL_OP_CIPHER_SERVER_PREFERENCE
214OP_TLS_ROLLBACK_BUG: int = _lib.SSL_OP_TLS_ROLLBACK_BUG
215OP_PKCS1_CHECK_1 = _lib.SSL_OP_PKCS1_CHECK_1
216OP_PKCS1_CHECK_2: int = _lib.SSL_OP_PKCS1_CHECK_2
217OP_NETSCAPE_CA_DN_BUG: int = _lib.SSL_OP_NETSCAPE_CA_DN_BUG
218OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG: int = (
219 _lib.SSL_OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG
220)
221OP_NO_COMPRESSION: int = _lib.SSL_OP_NO_COMPRESSION
223OP_NO_QUERY_MTU: int = _lib.SSL_OP_NO_QUERY_MTU
224OP_COOKIE_EXCHANGE: int = _lib.SSL_OP_COOKIE_EXCHANGE
225OP_NO_TICKET: int = _lib.SSL_OP_NO_TICKET
227try:
228 OP_NO_RENEGOTIATION: int = _lib.SSL_OP_NO_RENEGOTIATION
229 __all__.append("OP_NO_RENEGOTIATION")
230except AttributeError:
231 pass
233try:
234 OP_IGNORE_UNEXPECTED_EOF: int = _lib.SSL_OP_IGNORE_UNEXPECTED_EOF
235 __all__.append("OP_IGNORE_UNEXPECTED_EOF")
236except AttributeError:
237 pass
239try:
240 OP_LEGACY_SERVER_CONNECT: int = _lib.SSL_OP_LEGACY_SERVER_CONNECT
241 __all__.append("OP_LEGACY_SERVER_CONNECT")
242except AttributeError:
243 pass
245OP_ALL: int = _lib.SSL_OP_ALL
247VERIFY_PEER: int = _lib.SSL_VERIFY_PEER
248VERIFY_FAIL_IF_NO_PEER_CERT: int = _lib.SSL_VERIFY_FAIL_IF_NO_PEER_CERT
249VERIFY_CLIENT_ONCE: int = _lib.SSL_VERIFY_CLIENT_ONCE
250VERIFY_NONE: int = _lib.SSL_VERIFY_NONE
252SESS_CACHE_OFF: int = _lib.SSL_SESS_CACHE_OFF
253SESS_CACHE_CLIENT: int = _lib.SSL_SESS_CACHE_CLIENT
254SESS_CACHE_SERVER: int = _lib.SSL_SESS_CACHE_SERVER
255SESS_CACHE_BOTH: int = _lib.SSL_SESS_CACHE_BOTH
256SESS_CACHE_NO_AUTO_CLEAR: int = _lib.SSL_SESS_CACHE_NO_AUTO_CLEAR
257SESS_CACHE_NO_INTERNAL_LOOKUP: int = _lib.SSL_SESS_CACHE_NO_INTERNAL_LOOKUP
258SESS_CACHE_NO_INTERNAL_STORE: int = _lib.SSL_SESS_CACHE_NO_INTERNAL_STORE
259SESS_CACHE_NO_INTERNAL: int = _lib.SSL_SESS_CACHE_NO_INTERNAL
261SSL_ST_CONNECT: int = _lib.SSL_ST_CONNECT
262SSL_ST_ACCEPT: int = _lib.SSL_ST_ACCEPT
263SSL_ST_MASK: int = _lib.SSL_ST_MASK
265SSL_CB_LOOP: int = _lib.SSL_CB_LOOP
266SSL_CB_EXIT: int = _lib.SSL_CB_EXIT
267SSL_CB_READ: int = _lib.SSL_CB_READ
268SSL_CB_WRITE: int = _lib.SSL_CB_WRITE
269SSL_CB_ALERT: int = _lib.SSL_CB_ALERT
270SSL_CB_READ_ALERT: int = _lib.SSL_CB_READ_ALERT
271SSL_CB_WRITE_ALERT: int = _lib.SSL_CB_WRITE_ALERT
272SSL_CB_ACCEPT_LOOP: int = _lib.SSL_CB_ACCEPT_LOOP
273SSL_CB_ACCEPT_EXIT: int = _lib.SSL_CB_ACCEPT_EXIT
274SSL_CB_CONNECT_LOOP: int = _lib.SSL_CB_CONNECT_LOOP
275SSL_CB_CONNECT_EXIT: int = _lib.SSL_CB_CONNECT_EXIT
276SSL_CB_HANDSHAKE_START: int = _lib.SSL_CB_HANDSHAKE_START
277SSL_CB_HANDSHAKE_DONE: int = _lib.SSL_CB_HANDSHAKE_DONE
279_Buffer = typing.Union[bytes, bytearray, memoryview]
280_T = TypeVar("_T")
283class _NoOverlappingProtocols:
284 pass
287NO_OVERLAPPING_PROTOCOLS = _NoOverlappingProtocols()
289# Callback types.
290_ALPNSelectCallback = Callable[
291 [
292 "Connection",
293 typing.List[bytes],
294 ],
295 typing.Union[bytes, _NoOverlappingProtocols],
296]
297_CookieGenerateCallback = Callable[["Connection"], bytes]
298_CookieVerifyCallback = Callable[["Connection", bytes], bool]
299_OCSPClientCallback = Callable[["Connection", bytes, Optional[_T]], bool]
300_OCSPServerCallback = Callable[["Connection", Optional[_T]], bytes]
301_PassphraseCallback = Callable[[int, bool, Optional[_T]], bytes]
302_VerifyCallback = Callable[["Connection", X509, int, int, int], bool]
305class X509VerificationCodes:
306 """
307 Success and error codes for X509 verification, as returned by the
308 underlying ``X509_STORE_CTX_get_error()`` function and passed by pyOpenSSL
309 to verification callback functions.
311 See `OpenSSL Verification Errors
312 <https://www.openssl.org/docs/manmaster/man3/X509_verify_cert_error_string.html#ERROR-CODES>`_
313 for details.
314 """
316 OK = _lib.X509_V_OK
317 ERR_UNABLE_TO_GET_ISSUER_CERT = _lib.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT
318 ERR_UNABLE_TO_GET_CRL = _lib.X509_V_ERR_UNABLE_TO_GET_CRL
319 ERR_UNABLE_TO_DECRYPT_CERT_SIGNATURE = (
320 _lib.X509_V_ERR_UNABLE_TO_DECRYPT_CERT_SIGNATURE
321 )
322 ERR_UNABLE_TO_DECRYPT_CRL_SIGNATURE = (
323 _lib.X509_V_ERR_UNABLE_TO_DECRYPT_CRL_SIGNATURE
324 )
325 ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY = (
326 _lib.X509_V_ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY
327 )
328 ERR_CERT_SIGNATURE_FAILURE = _lib.X509_V_ERR_CERT_SIGNATURE_FAILURE
329 ERR_CRL_SIGNATURE_FAILURE = _lib.X509_V_ERR_CRL_SIGNATURE_FAILURE
330 ERR_CERT_NOT_YET_VALID = _lib.X509_V_ERR_CERT_NOT_YET_VALID
331 ERR_CERT_HAS_EXPIRED = _lib.X509_V_ERR_CERT_HAS_EXPIRED
332 ERR_CRL_NOT_YET_VALID = _lib.X509_V_ERR_CRL_NOT_YET_VALID
333 ERR_CRL_HAS_EXPIRED = _lib.X509_V_ERR_CRL_HAS_EXPIRED
334 ERR_ERROR_IN_CERT_NOT_BEFORE_FIELD = (
335 _lib.X509_V_ERR_ERROR_IN_CERT_NOT_BEFORE_FIELD
336 )
337 ERR_ERROR_IN_CERT_NOT_AFTER_FIELD = (
338 _lib.X509_V_ERR_ERROR_IN_CERT_NOT_AFTER_FIELD
339 )
340 ERR_ERROR_IN_CRL_LAST_UPDATE_FIELD = (
341 _lib.X509_V_ERR_ERROR_IN_CRL_LAST_UPDATE_FIELD
342 )
343 ERR_ERROR_IN_CRL_NEXT_UPDATE_FIELD = (
344 _lib.X509_V_ERR_ERROR_IN_CRL_NEXT_UPDATE_FIELD
345 )
346 ERR_OUT_OF_MEM = _lib.X509_V_ERR_OUT_OF_MEM
347 ERR_DEPTH_ZERO_SELF_SIGNED_CERT = (
348 _lib.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT
349 )
350 ERR_SELF_SIGNED_CERT_IN_CHAIN = _lib.X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN
351 ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY = (
352 _lib.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY
353 )
354 ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE = (
355 _lib.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE
356 )
357 ERR_CERT_CHAIN_TOO_LONG = _lib.X509_V_ERR_CERT_CHAIN_TOO_LONG
358 ERR_CERT_REVOKED = _lib.X509_V_ERR_CERT_REVOKED
359 ERR_INVALID_CA = _lib.X509_V_ERR_INVALID_CA
360 ERR_PATH_LENGTH_EXCEEDED = _lib.X509_V_ERR_PATH_LENGTH_EXCEEDED
361 ERR_INVALID_PURPOSE = _lib.X509_V_ERR_INVALID_PURPOSE
362 ERR_CERT_UNTRUSTED = _lib.X509_V_ERR_CERT_UNTRUSTED
363 ERR_CERT_REJECTED = _lib.X509_V_ERR_CERT_REJECTED
364 ERR_SUBJECT_ISSUER_MISMATCH = _lib.X509_V_ERR_SUBJECT_ISSUER_MISMATCH
365 ERR_AKID_SKID_MISMATCH = _lib.X509_V_ERR_AKID_SKID_MISMATCH
366 ERR_AKID_ISSUER_SERIAL_MISMATCH = (
367 _lib.X509_V_ERR_AKID_ISSUER_SERIAL_MISMATCH
368 )
369 ERR_KEYUSAGE_NO_CERTSIGN = _lib.X509_V_ERR_KEYUSAGE_NO_CERTSIGN
370 ERR_UNABLE_TO_GET_CRL_ISSUER = _lib.X509_V_ERR_UNABLE_TO_GET_CRL_ISSUER
371 ERR_UNHANDLED_CRITICAL_EXTENSION = (
372 _lib.X509_V_ERR_UNHANDLED_CRITICAL_EXTENSION
373 )
374 ERR_KEYUSAGE_NO_CRL_SIGN = _lib.X509_V_ERR_KEYUSAGE_NO_CRL_SIGN
375 ERR_UNHANDLED_CRITICAL_CRL_EXTENSION = (
376 _lib.X509_V_ERR_UNHANDLED_CRITICAL_CRL_EXTENSION
377 )
378 ERR_INVALID_NON_CA = _lib.X509_V_ERR_INVALID_NON_CA
379 ERR_PROXY_PATH_LENGTH_EXCEEDED = _lib.X509_V_ERR_PROXY_PATH_LENGTH_EXCEEDED
380 ERR_KEYUSAGE_NO_DIGITAL_SIGNATURE = (
381 _lib.X509_V_ERR_KEYUSAGE_NO_DIGITAL_SIGNATURE
382 )
383 ERR_PROXY_CERTIFICATES_NOT_ALLOWED = (
384 _lib.X509_V_ERR_PROXY_CERTIFICATES_NOT_ALLOWED
385 )
386 ERR_INVALID_EXTENSION = _lib.X509_V_ERR_INVALID_EXTENSION
387 ERR_INVALID_POLICY_EXTENSION = _lib.X509_V_ERR_INVALID_POLICY_EXTENSION
388 ERR_NO_EXPLICIT_POLICY = _lib.X509_V_ERR_NO_EXPLICIT_POLICY
389 ERR_DIFFERENT_CRL_SCOPE = _lib.X509_V_ERR_DIFFERENT_CRL_SCOPE
390 ERR_UNSUPPORTED_EXTENSION_FEATURE = (
391 _lib.X509_V_ERR_UNSUPPORTED_EXTENSION_FEATURE
392 )
393 ERR_UNNESTED_RESOURCE = _lib.X509_V_ERR_UNNESTED_RESOURCE
394 ERR_PERMITTED_VIOLATION = _lib.X509_V_ERR_PERMITTED_VIOLATION
395 ERR_EXCLUDED_VIOLATION = _lib.X509_V_ERR_EXCLUDED_VIOLATION
396 ERR_SUBTREE_MINMAX = _lib.X509_V_ERR_SUBTREE_MINMAX
397 ERR_UNSUPPORTED_CONSTRAINT_TYPE = (
398 _lib.X509_V_ERR_UNSUPPORTED_CONSTRAINT_TYPE
399 )
400 ERR_UNSUPPORTED_CONSTRAINT_SYNTAX = (
401 _lib.X509_V_ERR_UNSUPPORTED_CONSTRAINT_SYNTAX
402 )
403 ERR_UNSUPPORTED_NAME_SYNTAX = _lib.X509_V_ERR_UNSUPPORTED_NAME_SYNTAX
404 ERR_CRL_PATH_VALIDATION_ERROR = _lib.X509_V_ERR_CRL_PATH_VALIDATION_ERROR
405 ERR_HOSTNAME_MISMATCH = _lib.X509_V_ERR_HOSTNAME_MISMATCH
406 ERR_EMAIL_MISMATCH = _lib.X509_V_ERR_EMAIL_MISMATCH
407 ERR_IP_ADDRESS_MISMATCH = _lib.X509_V_ERR_IP_ADDRESS_MISMATCH
408 ERR_APPLICATION_VERIFICATION = _lib.X509_V_ERR_APPLICATION_VERIFICATION
411# Taken from https://golang.org/src/crypto/x509/root_linux.go
412_CERTIFICATE_FILE_LOCATIONS = [
413 "/etc/ssl/certs/ca-certificates.crt", # Debian/Ubuntu/Gentoo etc.
414 "/etc/pki/tls/certs/ca-bundle.crt", # Fedora/RHEL 6
415 "/etc/ssl/ca-bundle.pem", # OpenSUSE
416 "/etc/pki/tls/cacert.pem", # OpenELEC
417 "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem", # CentOS/RHEL 7
418]
420_CERTIFICATE_PATH_LOCATIONS = [
421 "/etc/ssl/certs", # SLES10/SLES11
422]
424# These values are compared to output from cffi's ffi.string so they must be
425# byte strings.
426_CRYPTOGRAPHY_MANYLINUX_CA_DIR = b"/opt/pyca/cryptography/openssl/certs"
427_CRYPTOGRAPHY_MANYLINUX_CA_FILE = b"/opt/pyca/cryptography/openssl/cert.pem"
430class Error(Exception):
431 """
432 An error occurred in an `OpenSSL.SSL` API.
433 """
436_raise_current_error = partial(_exception_from_error_queue, Error)
437_openssl_assert = _make_assert(Error)
440class WantReadError(Error):
441 pass
444class WantWriteError(Error):
445 pass
448class WantX509LookupError(Error):
449 pass
452class ZeroReturnError(Error):
453 pass
456class SysCallError(Error):
457 pass
460class _CallbackExceptionHelper:
461 """
462 A base class for wrapper classes that allow for intelligent exception
463 handling in OpenSSL callbacks.
465 :ivar list _problems: Any exceptions that occurred while executing in a
466 context where they could not be raised in the normal way. Typically
467 this is because OpenSSL has called into some Python code and requires a
468 return value. The exceptions are saved to be raised later when it is
469 possible to do so.
470 """
472 def __init__(self) -> None:
473 self._problems: list[Exception] = []
475 def raise_if_problem(self) -> None:
476 """
477 Raise an exception from the OpenSSL error queue or that was previously
478 captured whe running a callback.
479 """
480 if self._problems:
481 try:
482 _raise_current_error()
483 except Error:
484 pass
485 raise self._problems.pop(0)
488class _VerifyHelper(_CallbackExceptionHelper):
489 """
490 Wrap a callback such that it can be used as a certificate verification
491 callback.
492 """
494 def __init__(self, callback: _VerifyCallback) -> None:
495 _CallbackExceptionHelper.__init__(self)
497 @wraps(callback)
498 def wrapper(ok, store_ctx): # type: ignore[no-untyped-def]
499 x509 = _lib.X509_STORE_CTX_get_current_cert(store_ctx)
500 _lib.X509_up_ref(x509)
501 cert = X509._from_raw_x509_ptr(x509)
502 error_number = _lib.X509_STORE_CTX_get_error(store_ctx)
503 error_depth = _lib.X509_STORE_CTX_get_error_depth(store_ctx)
505 index = _lib.SSL_get_ex_data_X509_STORE_CTX_idx()
506 ssl = _lib.X509_STORE_CTX_get_ex_data(store_ctx, index)
507 connection = Connection._reverse_mapping[ssl]
509 try:
510 result = callback(
511 connection, cert, error_number, error_depth, ok
512 )
513 except Exception as e:
514 self._problems.append(e)
515 return 0
516 else:
517 if result:
518 _lib.X509_STORE_CTX_set_error(store_ctx, _lib.X509_V_OK)
519 return 1
520 else:
521 return 0
523 self.callback = _ffi.callback(
524 "int (*)(int, X509_STORE_CTX *)", wrapper
525 )
528class _ALPNSelectHelper(_CallbackExceptionHelper):
529 """
530 Wrap a callback such that it can be used as an ALPN selection callback.
531 """
533 def __init__(self, callback: _ALPNSelectCallback) -> None:
534 _CallbackExceptionHelper.__init__(self)
536 @wraps(callback)
537 def wrapper(ssl, out, outlen, in_, inlen, arg): # type: ignore[no-untyped-def]
538 try:
539 conn = Connection._reverse_mapping[ssl]
541 # The string passed to us is made up of multiple
542 # length-prefixed bytestrings. We need to split that into a
543 # list.
544 instr = _ffi.buffer(in_, inlen)[:]
545 protolist = []
546 while instr:
547 encoded_len = instr[0]
548 proto = instr[1 : encoded_len + 1]
549 protolist.append(proto)
550 instr = instr[encoded_len + 1 :]
552 # Call the callback
553 outbytes = callback(conn, protolist)
554 any_accepted = True
555 if outbytes is NO_OVERLAPPING_PROTOCOLS:
556 outbytes = b""
557 any_accepted = False
558 elif not isinstance(outbytes, bytes):
559 raise TypeError(
560 "ALPN callback must return a bytestring or the "
561 "special NO_OVERLAPPING_PROTOCOLS sentinel value."
562 )
564 # Save our callback arguments on the connection object to make
565 # sure that they don't get freed before OpenSSL can use them.
566 # Then, return them in the appropriate output parameters.
567 conn._alpn_select_callback_args = [
568 _ffi.new("unsigned char *", len(outbytes)),
569 _ffi.new("unsigned char[]", outbytes),
570 ]
571 outlen[0] = conn._alpn_select_callback_args[0][0]
572 out[0] = conn._alpn_select_callback_args[1]
573 if not any_accepted:
574 return _lib.SSL_TLSEXT_ERR_NOACK
575 return _lib.SSL_TLSEXT_ERR_OK
576 except Exception as e:
577 self._problems.append(e)
578 return _lib.SSL_TLSEXT_ERR_ALERT_FATAL
580 self.callback = _ffi.callback(
581 (
582 "int (*)(SSL *, unsigned char **, unsigned char *, "
583 "const unsigned char *, unsigned int, void *)"
584 ),
585 wrapper,
586 )
589class _OCSPServerCallbackHelper(_CallbackExceptionHelper):
590 """
591 Wrap a callback such that it can be used as an OCSP callback for the server
592 side.
594 Annoyingly, OpenSSL defines one OCSP callback but uses it in two different
595 ways. For servers, that callback is expected to retrieve some OCSP data and
596 hand it to OpenSSL, and may return only SSL_TLSEXT_ERR_OK,
597 SSL_TLSEXT_ERR_FATAL, and SSL_TLSEXT_ERR_NOACK. For clients, that callback
598 is expected to check the OCSP data, and returns a negative value on error,
599 0 if the response is not acceptable, or positive if it is. These are
600 mutually exclusive return code behaviours, and they mean that we need two
601 helpers so that we always return an appropriate error code if the user's
602 code throws an exception.
604 Given that we have to have two helpers anyway, these helpers are a bit more
605 helpery than most: specifically, they hide a few more of the OpenSSL
606 functions so that the user has an easier time writing these callbacks.
608 This helper implements the server side.
609 """
611 def __init__(self, callback: _OCSPServerCallback[Any]) -> None:
612 _CallbackExceptionHelper.__init__(self)
614 @wraps(callback)
615 def wrapper(ssl, cdata): # type: ignore[no-untyped-def]
616 try:
617 conn = Connection._reverse_mapping[ssl]
619 # Extract the data if any was provided.
620 if cdata != _ffi.NULL:
621 data = _ffi.from_handle(cdata)
622 else:
623 data = None
625 # Call the callback.
626 ocsp_data = callback(conn, data)
628 if not isinstance(ocsp_data, bytes):
629 raise TypeError("OCSP callback must return a bytestring.")
631 # If the OCSP data was provided, we will pass it to OpenSSL.
632 # However, we have an early exit here: if no OCSP data was
633 # provided we will just exit out and tell OpenSSL that there
634 # is nothing to do.
635 if not ocsp_data:
636 return 3 # SSL_TLSEXT_ERR_NOACK
638 # OpenSSL takes ownership of this data and expects it to have
639 # been allocated by OPENSSL_malloc.
640 ocsp_data_length = len(ocsp_data)
641 data_ptr = _lib.OPENSSL_malloc(ocsp_data_length)
642 _ffi.buffer(data_ptr, ocsp_data_length)[:] = ocsp_data
644 _lib.SSL_set_tlsext_status_ocsp_resp(
645 ssl, data_ptr, ocsp_data_length
646 )
648 return 0
649 except Exception as e:
650 self._problems.append(e)
651 return 2 # SSL_TLSEXT_ERR_ALERT_FATAL
653 self.callback = _ffi.callback("int (*)(SSL *, void *)", wrapper)
656class _OCSPClientCallbackHelper(_CallbackExceptionHelper):
657 """
658 Wrap a callback such that it can be used as an OCSP callback for the client
659 side.
661 Annoyingly, OpenSSL defines one OCSP callback but uses it in two different
662 ways. For servers, that callback is expected to retrieve some OCSP data and
663 hand it to OpenSSL, and may return only SSL_TLSEXT_ERR_OK,
664 SSL_TLSEXT_ERR_FATAL, and SSL_TLSEXT_ERR_NOACK. For clients, that callback
665 is expected to check the OCSP data, and returns a negative value on error,
666 0 if the response is not acceptable, or positive if it is. These are
667 mutually exclusive return code behaviours, and they mean that we need two
668 helpers so that we always return an appropriate error code if the user's
669 code throws an exception.
671 Given that we have to have two helpers anyway, these helpers are a bit more
672 helpery than most: specifically, they hide a few more of the OpenSSL
673 functions so that the user has an easier time writing these callbacks.
675 This helper implements the client side.
676 """
678 def __init__(self, callback: _OCSPClientCallback[Any]) -> None:
679 _CallbackExceptionHelper.__init__(self)
681 @wraps(callback)
682 def wrapper(ssl, cdata): # type: ignore[no-untyped-def]
683 try:
684 conn = Connection._reverse_mapping[ssl]
686 # Extract the data if any was provided.
687 if cdata != _ffi.NULL:
688 data = _ffi.from_handle(cdata)
689 else:
690 data = None
692 # Get the OCSP data.
693 ocsp_ptr = _ffi.new("unsigned char **")
694 ocsp_len = _lib.SSL_get_tlsext_status_ocsp_resp(ssl, ocsp_ptr)
695 if ocsp_len < 0:
696 # No OCSP data.
697 ocsp_data = b""
698 else:
699 # Copy the OCSP data, then pass it to the callback.
700 ocsp_data = _ffi.buffer(ocsp_ptr[0], ocsp_len)[:]
702 valid = callback(conn, ocsp_data, data)
704 # Return 1 on success or 0 on error.
705 return int(bool(valid))
707 except Exception as e:
708 self._problems.append(e)
709 # Return negative value if an exception is hit.
710 return -1
712 self.callback = _ffi.callback("int (*)(SSL *, void *)", wrapper)
715class _CookieGenerateCallbackHelper(_CallbackExceptionHelper):
716 def __init__(self, callback: _CookieGenerateCallback) -> None:
717 _CallbackExceptionHelper.__init__(self)
719 @wraps(callback)
720 def wrapper(ssl, out, outlen): # type: ignore[no-untyped-def]
721 try:
722 conn = Connection._reverse_mapping[ssl]
723 cookie = callback(conn)
724 out[0 : len(cookie)] = cookie
725 outlen[0] = len(cookie)
726 return 1
727 except Exception as e:
728 self._problems.append(e)
729 # "a zero return value can be used to abort the handshake"
730 return 0
732 self.callback = _ffi.callback(
733 "int (*)(SSL *, unsigned char *, unsigned int *)",
734 wrapper,
735 )
738class _CookieVerifyCallbackHelper(_CallbackExceptionHelper):
739 def __init__(self, callback: _CookieVerifyCallback) -> None:
740 _CallbackExceptionHelper.__init__(self)
742 @wraps(callback)
743 def wrapper(ssl, c_cookie, cookie_len): # type: ignore[no-untyped-def]
744 try:
745 conn = Connection._reverse_mapping[ssl]
746 return callback(conn, bytes(c_cookie[0:cookie_len]))
747 except Exception as e:
748 self._problems.append(e)
749 return 0
751 self.callback = _ffi.callback(
752 "int (*)(SSL *, unsigned char *, unsigned int)",
753 wrapper,
754 )
757def _asFileDescriptor(obj: Any) -> int:
758 fd = None
759 if not isinstance(obj, int):
760 meth = getattr(obj, "fileno", None)
761 if meth is not None:
762 obj = meth()
764 if isinstance(obj, int):
765 fd = obj
767 if not isinstance(fd, int):
768 raise TypeError("argument must be an int, or have a fileno() method.")
769 elif fd < 0:
770 raise ValueError(
771 f"file descriptor cannot be a negative integer ({fd:i})"
772 )
774 return fd
777def OpenSSL_version(type: int) -> bytes:
778 """
779 Return a string describing the version of OpenSSL in use.
781 :param type: One of the :const:`OPENSSL_` constants defined in this module.
782 """
783 return _ffi.string(_lib.OpenSSL_version(type))
786SSLeay_version = OpenSSL_version
789def _make_requires(flag: int, error: str) -> Callable[[_T], _T]:
790 """
791 Builds a decorator that ensures that functions that rely on OpenSSL
792 functions that are not present in this build raise NotImplementedError,
793 rather than AttributeError coming out of cryptography.
795 :param flag: A cryptography flag that guards the functions, e.g.
796 ``Cryptography_HAS_NEXTPROTONEG``.
797 :param error: The string to be used in the exception if the flag is false.
798 """
800 def _requires_decorator(func): # type: ignore[no-untyped-def]
801 if not flag:
803 @wraps(func)
804 def explode(*args, **kwargs): # type: ignore[no-untyped-def]
805 raise NotImplementedError(error)
807 return explode
808 else:
809 return func
811 return _requires_decorator
814_requires_keylog = _make_requires(
815 getattr(_lib, "Cryptography_HAS_KEYLOG", 0), "Key logging not available"
816)
819class Session:
820 """
821 A class representing an SSL session. A session defines certain connection
822 parameters which may be re-used to speed up the setup of subsequent
823 connections.
825 .. versionadded:: 0.14
826 """
828 _session: Any
831F = TypeVar("F", bound=Callable[..., Any])
834def _require_not_used(f: F) -> F:
835 @wraps(f)
836 def inner(self: Context, *args: Any, **kwargs: Any) -> Any:
837 if self._used:
838 warnings.warn(
839 (
840 "Attempting to mutate a Context after a Connection was "
841 "created. In the future, this will raise an exception"
842 ),
843 DeprecationWarning,
844 stacklevel=2,
845 )
846 return f(self, *args, **kwargs)
848 return typing.cast(F, inner)
851class Context:
852 """
853 :class:`OpenSSL.SSL.Context` instances define the parameters for setting
854 up new SSL connections.
856 :param method: One of TLS_METHOD, TLS_CLIENT_METHOD, TLS_SERVER_METHOD,
857 DTLS_METHOD, DTLS_CLIENT_METHOD, or DTLS_SERVER_METHOD.
858 SSLv23_METHOD, TLSv1_METHOD, etc. are deprecated and should
859 not be used.
860 """
862 _methods: typing.ClassVar[
863 dict[int, tuple[Callable[[], Any], int | None]]
864 ] = {
865 SSLv23_METHOD: (_lib.TLS_method, None),
866 TLSv1_METHOD: (_lib.TLS_method, TLS1_VERSION),
867 TLSv1_1_METHOD: (_lib.TLS_method, TLS1_1_VERSION),
868 TLSv1_2_METHOD: (_lib.TLS_method, TLS1_2_VERSION),
869 TLS_METHOD: (_lib.TLS_method, None),
870 TLS_SERVER_METHOD: (_lib.TLS_server_method, None),
871 TLS_CLIENT_METHOD: (_lib.TLS_client_method, None),
872 DTLS_METHOD: (_lib.DTLS_method, None),
873 DTLS_SERVER_METHOD: (_lib.DTLS_server_method, None),
874 DTLS_CLIENT_METHOD: (_lib.DTLS_client_method, None),
875 }
877 def __init__(self, method: int) -> None:
878 if not isinstance(method, int):
879 raise TypeError("method must be an integer")
881 try:
882 method_func, version = self._methods[method]
883 except KeyError:
884 raise ValueError("No such protocol")
886 method_obj = method_func()
887 _openssl_assert(method_obj != _ffi.NULL)
889 context = _lib.SSL_CTX_new(method_obj)
890 _openssl_assert(context != _ffi.NULL)
891 context = _ffi.gc(context, _lib.SSL_CTX_free)
893 self._context = context
894 self._used = False
895 self._passphrase_helper: _PassphraseHelper | None = None
896 self._passphrase_callback: _PassphraseCallback[Any] | None = None
897 self._passphrase_userdata: Any | None = None
898 self._verify_helper: _VerifyHelper | None = None
899 self._verify_callback: _VerifyCallback | None = None
900 self._info_callback = None
901 self._keylog_callback = None
902 self._tlsext_servername_callback = None
903 self._app_data = None
904 self._alpn_select_helper: _ALPNSelectHelper | None = None
905 self._alpn_select_callback: _ALPNSelectCallback | None = None
906 self._ocsp_helper: (
907 _OCSPClientCallbackHelper | _OCSPServerCallbackHelper | None
908 ) = None
909 self._ocsp_callback: (
910 _OCSPClientCallback[Any] | _OCSPServerCallback[Any] | None
911 ) = None
912 self._ocsp_data: Any | None = None
913 self._cookie_generate_helper: _CookieGenerateCallbackHelper | None = (
914 None
915 )
916 self._cookie_verify_helper: _CookieVerifyCallbackHelper | None = None
918 self.set_mode(
919 _lib.SSL_MODE_ENABLE_PARTIAL_WRITE
920 | _lib.SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER
921 )
922 if version is not None:
923 self.set_min_proto_version(version)
924 self.set_max_proto_version(version)
926 @_require_not_used
927 def set_min_proto_version(self, version: int) -> None:
928 """
929 Set the minimum supported protocol version. Setting the minimum
930 version to 0 will enable protocol versions down to the lowest version
931 supported by the library.
933 If the underlying OpenSSL build is missing support for the selected
934 version, this method will raise an exception.
935 """
936 _openssl_assert(
937 _lib.SSL_CTX_set_min_proto_version(self._context, version) == 1
938 )
940 @_require_not_used
941 def set_max_proto_version(self, version: int) -> None:
942 """
943 Set the maximum supported protocol version. Setting the maximum
944 version to 0 will enable protocol versions up to the highest version
945 supported by the library.
947 If the underlying OpenSSL build is missing support for the selected
948 version, this method will raise an exception.
949 """
950 _openssl_assert(
951 _lib.SSL_CTX_set_max_proto_version(self._context, version) == 1
952 )
954 @_require_not_used
955 def load_verify_locations(
956 self,
957 cafile: _StrOrBytesPath | None,
958 capath: _StrOrBytesPath | None = None,
959 ) -> None:
960 """
961 Let SSL know where we can find trusted certificates for the certificate
962 chain. Note that the certificates have to be in PEM format.
964 If capath is passed, it must be a directory prepared using the
965 ``c_rehash`` tool included with OpenSSL. Either, but not both, of
966 *pemfile* or *capath* may be :data:`None`.
968 :param cafile: In which file we can find the certificates (``bytes`` or
969 ``str``).
970 :param capath: In which directory we can find the certificates
971 (``bytes`` or ``str``).
973 :return: None
974 """
975 if cafile is None:
976 cafile = _ffi.NULL
977 else:
978 cafile = _path_bytes(cafile)
980 if capath is None:
981 capath = _ffi.NULL
982 else:
983 capath = _path_bytes(capath)
985 load_result = _lib.SSL_CTX_load_verify_locations(
986 self._context, cafile, capath
987 )
988 if not load_result:
989 _raise_current_error()
991 def _wrap_callback(
992 self, callback: _PassphraseCallback[_T]
993 ) -> _PassphraseHelper:
994 @wraps(callback)
995 def wrapper(size: int, verify: bool, userdata: Any) -> bytes:
996 return callback(size, verify, self._passphrase_userdata)
998 return _PassphraseHelper(
999 FILETYPE_PEM, wrapper, more_args=True, truncate=True
1000 )
1002 @_require_not_used
1003 def set_passwd_cb(
1004 self,
1005 callback: _PassphraseCallback[_T],
1006 userdata: _T | None = None,
1007 ) -> None:
1008 """
1009 Set the passphrase callback. This function will be called
1010 when a private key with a passphrase is loaded.
1012 :param callback: The Python callback to use. This must accept three
1013 positional arguments. First, an integer giving the maximum length
1014 of the passphrase it may return. If the returned passphrase is
1015 longer than this, it will be truncated. Second, a boolean value
1016 which will be true if the user should be prompted for the
1017 passphrase twice and the callback should verify that the two values
1018 supplied are equal. Third, the value given as the *userdata*
1019 parameter to :meth:`set_passwd_cb`. The *callback* must return
1020 a byte string. If an error occurs, *callback* should return a false
1021 value (e.g. an empty string).
1022 :param userdata: (optional) A Python object which will be given as
1023 argument to the callback
1024 :return: None
1025 """
1026 if not callable(callback):
1027 raise TypeError("callback must be callable")
1029 self._passphrase_helper = self._wrap_callback(callback)
1030 self._passphrase_callback = self._passphrase_helper.callback
1031 _lib.SSL_CTX_set_default_passwd_cb(
1032 self._context, self._passphrase_callback
1033 )
1034 self._passphrase_userdata = userdata
1036 @_require_not_used
1037 def set_default_verify_paths(self) -> None:
1038 """
1039 Specify that the platform provided CA certificates are to be used for
1040 verification purposes. This method has some caveats related to the
1041 binary wheels that cryptography (pyOpenSSL's primary dependency) ships:
1043 * macOS will only load certificates using this method if the user has
1044 the ``openssl@1.1`` `Homebrew <https://brew.sh>`_ formula installed
1045 in the default location.
1046 * Windows will not work.
1047 * manylinux cryptography wheels will work on most common Linux
1048 distributions in pyOpenSSL 17.1.0 and above. pyOpenSSL detects the
1049 manylinux wheel and attempts to load roots via a fallback path.
1051 :return: None
1052 """
1053 # SSL_CTX_set_default_verify_paths will attempt to load certs from
1054 # both a cafile and capath that are set at compile time. However,
1055 # it will first check environment variables and, if present, load
1056 # those paths instead
1057 set_result = _lib.SSL_CTX_set_default_verify_paths(self._context)
1058 _openssl_assert(set_result == 1)
1059 # After attempting to set default_verify_paths we need to know whether
1060 # to go down the fallback path.
1061 # First we'll check to see if any env vars have been set. If so,
1062 # we won't try to do anything else because the user has set the path
1063 # themselves.
1064 if not self._check_env_vars_set("SSL_CERT_DIR", "SSL_CERT_FILE"):
1065 default_dir = _ffi.string(_lib.X509_get_default_cert_dir())
1066 default_file = _ffi.string(_lib.X509_get_default_cert_file())
1067 # Now we check to see if the default_dir and default_file are set
1068 # to the exact values we use in our manylinux builds. If they are
1069 # then we know to load the fallbacks
1070 if (
1071 default_dir == _CRYPTOGRAPHY_MANYLINUX_CA_DIR
1072 and default_file == _CRYPTOGRAPHY_MANYLINUX_CA_FILE
1073 ):
1074 # This is manylinux, let's load our fallback paths
1075 self._fallback_default_verify_paths(
1076 _CERTIFICATE_FILE_LOCATIONS, _CERTIFICATE_PATH_LOCATIONS
1077 )
1079 def _check_env_vars_set(self, dir_env_var: str, file_env_var: str) -> bool:
1080 """
1081 Check to see if the default cert dir/file environment vars are present.
1083 :return: bool
1084 """
1085 return (
1086 os.environ.get(file_env_var) is not None
1087 or os.environ.get(dir_env_var) is not None
1088 )
1090 def _fallback_default_verify_paths(
1091 self, file_path: list[str], dir_path: list[str]
1092 ) -> None:
1093 """
1094 Default verify paths are based on the compiled version of OpenSSL.
1095 However, when pyca/cryptography is compiled as a manylinux wheel
1096 that compiled location can potentially be wrong. So, like Go, we
1097 will try a predefined set of paths and attempt to load roots
1098 from there.
1100 :return: None
1101 """
1102 for cafile in file_path:
1103 if os.path.isfile(cafile):
1104 self.load_verify_locations(cafile)
1105 break
1107 for capath in dir_path:
1108 if os.path.isdir(capath):
1109 self.load_verify_locations(None, capath)
1110 break
1112 @_require_not_used
1113 def use_certificate_chain_file(self, certfile: _StrOrBytesPath) -> None:
1114 """
1115 Load a certificate chain from a file.
1117 :param certfile: The name of the certificate chain file (``bytes`` or
1118 ``str``). Must be PEM encoded.
1120 :return: None
1121 """
1122 certfile = _path_bytes(certfile)
1124 result = _lib.SSL_CTX_use_certificate_chain_file(
1125 self._context, certfile
1126 )
1127 if not result:
1128 _raise_current_error()
1130 @_require_not_used
1131 def use_certificate_file(
1132 self, certfile: _StrOrBytesPath, filetype: int = FILETYPE_PEM
1133 ) -> None:
1134 """
1135 Load a certificate from a file
1137 :param certfile: The name of the certificate file (``bytes`` or
1138 ``str``).
1139 :param filetype: (optional) The encoding of the file, which is either
1140 :const:`FILETYPE_PEM` or :const:`FILETYPE_ASN1`. The default is
1141 :const:`FILETYPE_PEM`.
1143 :return: None
1144 """
1145 certfile = _path_bytes(certfile)
1146 if not isinstance(filetype, int):
1147 raise TypeError("filetype must be an integer")
1149 use_result = _lib.SSL_CTX_use_certificate_file(
1150 self._context, certfile, filetype
1151 )
1152 if not use_result:
1153 _raise_current_error()
1155 @_require_not_used
1156 def use_certificate(self, cert: X509 | x509.Certificate) -> None:
1157 """
1158 Load a certificate from a X509 object
1160 :param cert: The X509 object
1161 :return: None
1162 """
1163 # Mirrored at Connection.use_certificate
1164 if not isinstance(cert, X509):
1165 cert = X509.from_cryptography(cert)
1166 else:
1167 warnings.warn(
1168 (
1169 "Passing pyOpenSSL X509 objects is deprecated. You "
1170 "should use a cryptography.x509.Certificate instead."
1171 ),
1172 DeprecationWarning,
1173 stacklevel=2,
1174 )
1176 use_result = _lib.SSL_CTX_use_certificate(self._context, cert._x509)
1177 if not use_result:
1178 _raise_current_error()
1180 @_require_not_used
1181 def add_extra_chain_cert(self, certobj: X509 | x509.Certificate) -> None:
1182 """
1183 Add certificate to chain
1185 :param certobj: The X509 certificate object to add to the chain
1186 :return: None
1187 """
1188 if not isinstance(certobj, X509):
1189 certobj = X509.from_cryptography(certobj)
1190 else:
1191 warnings.warn(
1192 (
1193 "Passing pyOpenSSL X509 objects is deprecated. You "
1194 "should use a cryptography.x509.Certificate instead."
1195 ),
1196 DeprecationWarning,
1197 stacklevel=2,
1198 )
1200 copy = _lib.X509_dup(certobj._x509)
1201 add_result = _lib.SSL_CTX_add_extra_chain_cert(self._context, copy)
1202 if not add_result:
1203 # TODO: This is untested.
1204 _lib.X509_free(copy)
1205 _raise_current_error()
1207 def _raise_passphrase_exception(self) -> None:
1208 if self._passphrase_helper is not None:
1209 self._passphrase_helper.raise_if_problem(Error)
1211 _raise_current_error()
1213 @_require_not_used
1214 def use_privatekey_file(
1215 self, keyfile: _StrOrBytesPath, filetype: int = FILETYPE_PEM
1216 ) -> None:
1217 """
1218 Load a private key from a file
1220 :param keyfile: The name of the key file (``bytes`` or ``str``)
1221 :param filetype: (optional) The encoding of the file, which is either
1222 :const:`FILETYPE_PEM` or :const:`FILETYPE_ASN1`. The default is
1223 :const:`FILETYPE_PEM`.
1225 :return: None
1226 """
1227 keyfile = _path_bytes(keyfile)
1229 if not isinstance(filetype, int):
1230 raise TypeError("filetype must be an integer")
1232 use_result = _lib.SSL_CTX_use_PrivateKey_file(
1233 self._context, keyfile, filetype
1234 )
1235 if not use_result:
1236 self._raise_passphrase_exception()
1238 @_require_not_used
1239 def use_privatekey(self, pkey: _PrivateKey | PKey) -> None:
1240 """
1241 Load a private key from a PKey object
1243 :param pkey: The PKey object
1244 :return: None
1245 """
1246 # Mirrored at Connection.use_privatekey
1247 if not isinstance(pkey, PKey):
1248 pkey = PKey.from_cryptography_key(pkey)
1249 else:
1250 warnings.warn(
1251 (
1252 "Passing pyOpenSSL PKey objects is deprecated. You "
1253 "should use a cryptography private key instead."
1254 ),
1255 DeprecationWarning,
1256 stacklevel=2,
1257 )
1259 use_result = _lib.SSL_CTX_use_PrivateKey(self._context, pkey._pkey)
1260 if not use_result:
1261 self._raise_passphrase_exception()
1263 def check_privatekey(self) -> None:
1264 """
1265 Check if the private key (loaded with :meth:`use_privatekey`) matches
1266 the certificate (loaded with :meth:`use_certificate`)
1268 :return: :data:`None` (raises :exc:`Error` if something's wrong)
1269 """
1270 if not _lib.SSL_CTX_check_private_key(self._context):
1271 _raise_current_error()
1273 @_require_not_used
1274 def load_client_ca(self, cafile: bytes) -> None:
1275 """
1276 Load the trusted certificates that will be sent to the client. Does
1277 not actually imply any of the certificates are trusted; that must be
1278 configured separately.
1280 :param bytes cafile: The path to a certificates file in PEM format.
1281 :return: None
1282 """
1283 ca_list = _lib.SSL_load_client_CA_file(
1284 _text_to_bytes_and_warn("cafile", cafile)
1285 )
1286 _openssl_assert(ca_list != _ffi.NULL)
1287 _lib.SSL_CTX_set_client_CA_list(self._context, ca_list)
1289 @_require_not_used
1290 def set_session_id(self, buf: bytes) -> None:
1291 """
1292 Set the session id to *buf* within which a session can be reused for
1293 this Context object. This is needed when doing session resumption,
1294 because there is no way for a stored session to know which Context
1295 object it is associated with.
1297 :param bytes buf: The session id.
1299 :returns: None
1300 """
1301 buf = _text_to_bytes_and_warn("buf", buf)
1302 _openssl_assert(
1303 _lib.SSL_CTX_set_session_id_context(self._context, buf, len(buf))
1304 == 1
1305 )
1307 @_require_not_used
1308 def set_session_cache_mode(self, mode: int) -> int:
1309 """
1310 Set the behavior of the session cache used by all connections using
1311 this Context. The previously set mode is returned. See
1312 :const:`SESS_CACHE_*` for details about particular modes.
1314 :param mode: One or more of the SESS_CACHE_* flags (combine using
1315 bitwise or)
1316 :returns: The previously set caching mode.
1318 .. versionadded:: 0.14
1319 """
1320 if not isinstance(mode, int):
1321 raise TypeError("mode must be an integer")
1323 return _lib.SSL_CTX_set_session_cache_mode(self._context, mode)
1325 def get_session_cache_mode(self) -> int:
1326 """
1327 Get the current session cache mode.
1329 :returns: The currently used cache mode.
1331 .. versionadded:: 0.14
1332 """
1333 return _lib.SSL_CTX_get_session_cache_mode(self._context)
1335 @_require_not_used
1336 def set_verify(
1337 self, mode: int, callback: _VerifyCallback | None = None
1338 ) -> None:
1339 """
1340 Set the verification flags for this Context object to *mode* and
1341 specify that *callback* should be used for verification callbacks.
1343 :param mode: The verify mode, this should be one of
1344 :const:`VERIFY_NONE` and :const:`VERIFY_PEER`. If
1345 :const:`VERIFY_PEER` is used, *mode* can be OR:ed with
1346 :const:`VERIFY_FAIL_IF_NO_PEER_CERT` and
1347 :const:`VERIFY_CLIENT_ONCE` to further control the behaviour.
1348 :param callback: The optional Python verification callback to use.
1349 This should take five arguments: A Connection object, an X509
1350 object, and three integer variables, which are in turn potential
1351 error number, error depth and return code. *callback* should
1352 return True if verification passes and False otherwise.
1353 If omitted, OpenSSL's default verification is used.
1354 :return: None
1356 See SSL_CTX_set_verify(3SSL) for further details.
1357 """
1358 if not isinstance(mode, int):
1359 raise TypeError("mode must be an integer")
1361 if callback is None:
1362 self._verify_helper = None
1363 self._verify_callback = None
1364 _lib.SSL_CTX_set_verify(self._context, mode, _ffi.NULL)
1365 else:
1366 if not callable(callback):
1367 raise TypeError("callback must be callable")
1369 self._verify_helper = _VerifyHelper(callback)
1370 self._verify_callback = self._verify_helper.callback
1371 _lib.SSL_CTX_set_verify(self._context, mode, self._verify_callback)
1373 @_require_not_used
1374 def set_verify_depth(self, depth: int) -> None:
1375 """
1376 Set the maximum depth for the certificate chain verification that shall
1377 be allowed for this Context object.
1379 :param depth: An integer specifying the verify depth
1380 :return: None
1381 """
1382 if not isinstance(depth, int):
1383 raise TypeError("depth must be an integer")
1385 _lib.SSL_CTX_set_verify_depth(self._context, depth)
1387 def get_verify_mode(self) -> int:
1388 """
1389 Retrieve the Context object's verify mode, as set by
1390 :meth:`set_verify`.
1392 :return: The verify mode
1393 """
1394 return _lib.SSL_CTX_get_verify_mode(self._context)
1396 def get_verify_depth(self) -> int:
1397 """
1398 Retrieve the Context object's verify depth, as set by
1399 :meth:`set_verify_depth`.
1401 :return: The verify depth
1402 """
1403 return _lib.SSL_CTX_get_verify_depth(self._context)
1405 @_require_not_used
1406 def load_tmp_dh(self, dhfile: _StrOrBytesPath) -> None:
1407 """
1408 Load parameters for Ephemeral Diffie-Hellman
1410 :param dhfile: The file to load EDH parameters from (``bytes`` or
1411 ``str``).
1413 :return: None
1414 """
1415 dhfile = _path_bytes(dhfile)
1417 bio = _lib.BIO_new_file(dhfile, b"r")
1418 if bio == _ffi.NULL:
1419 _raise_current_error()
1420 bio = _ffi.gc(bio, _lib.BIO_free)
1422 dh = _lib.PEM_read_bio_DHparams(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL)
1423 dh = _ffi.gc(dh, _lib.DH_free)
1424 res = _lib.SSL_CTX_set_tmp_dh(self._context, dh)
1425 _openssl_assert(res == 1)
1427 @_require_not_used
1428 def set_tmp_ecdh(self, curve: _EllipticCurve | ec.EllipticCurve) -> None:
1429 """
1430 Select a curve to use for ECDHE key exchange.
1432 :param curve: A curve instance from cryptography
1433 (:class:`~cryptogragraphy.hazmat.primitives.asymmetric.ec.EllipticCurve`).
1434 Alternatively (deprecated) a curve object from either
1435 :meth:`OpenSSL.crypto.get_elliptic_curve` or
1436 :meth:`OpenSSL.crypto.get_elliptic_curves`.
1438 :return: None
1439 """
1441 if isinstance(curve, _EllipticCurve):
1442 warnings.warn(
1443 (
1444 "Passing pyOpenSSL elliptic curves to set_tmp_ecdh is "
1445 "deprecated. You should use cryptography's elliptic curve "
1446 "types instead."
1447 ),
1448 DeprecationWarning,
1449 stacklevel=2,
1450 )
1451 _lib.SSL_CTX_set_tmp_ecdh(self._context, curve._to_EC_KEY())
1452 else:
1453 name = curve.name
1454 if name == "secp192r1":
1455 name = "prime192v1"
1456 elif name == "secp256r1":
1457 name = "prime256v1"
1458 nid = _lib.OBJ_txt2nid(name.encode())
1459 if nid == _lib.NID_undef:
1460 _raise_current_error()
1462 ec = _lib.EC_KEY_new_by_curve_name(nid)
1463 _openssl_assert(ec != _ffi.NULL)
1464 ec = _ffi.gc(ec, _lib.EC_KEY_free)
1465 _lib.SSL_CTX_set_tmp_ecdh(self._context, ec)
1467 @_require_not_used
1468 def set_cipher_list(self, cipher_list: bytes) -> None:
1469 """
1470 Set the list of ciphers to be used in this context.
1472 See the OpenSSL manual for more information (e.g.
1473 :manpage:`ciphers(1)`).
1475 Note this API does not change the cipher suites used in TLS 1.3
1476 Use `set_tls13_ciphersuites` for that.
1478 :param bytes cipher_list: An OpenSSL cipher string.
1479 :return: None
1480 """
1481 cipher_list = _text_to_bytes_and_warn("cipher_list", cipher_list)
1483 if not isinstance(cipher_list, bytes):
1484 raise TypeError("cipher_list must be a byte string.")
1486 _openssl_assert(
1487 _lib.SSL_CTX_set_cipher_list(self._context, cipher_list) == 1
1488 )
1490 @_require_not_used
1491 def set_tls13_ciphersuites(self, ciphersuites: bytes) -> None:
1492 """
1493 Set the list of TLS 1.3 ciphers to be used in this context.
1494 OpenSSL maintains a separate list of TLS 1.3+ ciphers to
1495 ciphers for TLS 1.2 and lowers.
1497 See the OpenSSL manual for more information (e.g.
1498 :manpage:`ciphers(1)`).
1500 :param bytes ciphersuites: An OpenSSL cipher string containing
1501 TLS 1.3+ ciphersuites.
1502 :return: None
1504 .. versionadded:: 25.2.0
1505 """
1506 if not isinstance(ciphersuites, bytes):
1507 raise TypeError("ciphersuites must be a byte string.")
1509 _openssl_assert(
1510 _lib.SSL_CTX_set_ciphersuites(self._context, ciphersuites) == 1
1511 )
1513 @_require_not_used
1514 def set_client_ca_list(
1515 self, certificate_authorities: Sequence[X509Name]
1516 ) -> None:
1517 """
1518 Set the list of preferred client certificate signers for this server
1519 context.
1521 This list of certificate authorities will be sent to the client when
1522 the server requests a client certificate.
1524 :param certificate_authorities: a sequence of X509Names.
1525 :return: None
1527 .. versionadded:: 0.10
1528 """
1529 name_stack = _lib.sk_X509_NAME_new_null()
1530 _openssl_assert(name_stack != _ffi.NULL)
1532 try:
1533 for ca_name in certificate_authorities:
1534 if not isinstance(ca_name, X509Name):
1535 raise TypeError(
1536 f"client CAs must be X509Name objects, not "
1537 f"{type(ca_name).__name__} objects"
1538 )
1539 copy = _lib.X509_NAME_dup(ca_name._name)
1540 _openssl_assert(copy != _ffi.NULL)
1541 push_result = _lib.sk_X509_NAME_push(name_stack, copy)
1542 if not push_result:
1543 _lib.X509_NAME_free(copy)
1544 _raise_current_error()
1545 except Exception:
1546 _lib.sk_X509_NAME_free(name_stack)
1547 raise
1549 _lib.SSL_CTX_set_client_CA_list(self._context, name_stack)
1551 @_require_not_used
1552 def add_client_ca(
1553 self, certificate_authority: X509 | x509.Certificate
1554 ) -> None:
1555 """
1556 Add the CA certificate to the list of preferred signers for this
1557 context.
1559 The list of certificate authorities will be sent to the client when the
1560 server requests a client certificate.
1562 :param certificate_authority: certificate authority's X509 certificate.
1563 :return: None
1565 .. versionadded:: 0.10
1566 """
1567 if not isinstance(certificate_authority, X509):
1568 certificate_authority = X509.from_cryptography(
1569 certificate_authority
1570 )
1571 else:
1572 warnings.warn(
1573 (
1574 "Passing pyOpenSSL X509 objects is deprecated. You "
1575 "should use a cryptography.x509.Certificate instead."
1576 ),
1577 DeprecationWarning,
1578 stacklevel=2,
1579 )
1581 add_result = _lib.SSL_CTX_add_client_CA(
1582 self._context, certificate_authority._x509
1583 )
1584 _openssl_assert(add_result == 1)
1586 @_require_not_used
1587 def set_timeout(self, timeout: int) -> None:
1588 """
1589 Set the timeout for newly created sessions for this Context object to
1590 *timeout*. The default value is 300 seconds. See the OpenSSL manual
1591 for more information (e.g. :manpage:`SSL_CTX_set_timeout(3)`).
1593 :param timeout: The timeout in (whole) seconds
1594 :return: The previous session timeout
1595 """
1596 if not isinstance(timeout, int):
1597 raise TypeError("timeout must be an integer")
1599 return _lib.SSL_CTX_set_timeout(self._context, timeout)
1601 def get_timeout(self) -> int:
1602 """
1603 Retrieve session timeout, as set by :meth:`set_timeout`. The default
1604 is 300 seconds.
1606 :return: The session timeout
1607 """
1608 return _lib.SSL_CTX_get_timeout(self._context)
1610 @_require_not_used
1611 def set_info_callback(
1612 self, callback: Callable[[Connection, int, int], None]
1613 ) -> None:
1614 """
1615 Set the information callback to *callback*. This function will be
1616 called from time to time during SSL handshakes.
1618 :param callback: The Python callback to use. This should take three
1619 arguments: a Connection object and two integers. The first integer
1620 specifies where in the SSL handshake the function was called, and
1621 the other the return code from a (possibly failed) internal
1622 function call.
1623 :return: None
1624 """
1626 @wraps(callback)
1627 def wrapper(ssl, where, return_code): # type: ignore[no-untyped-def]
1628 callback(Connection._reverse_mapping[ssl], where, return_code)
1630 self._info_callback = _ffi.callback(
1631 "void (*)(const SSL *, int, int)", wrapper
1632 )
1633 _lib.SSL_CTX_set_info_callback(self._context, self._info_callback)
1635 @_requires_keylog
1636 @_require_not_used
1637 def set_keylog_callback(
1638 self, callback: Callable[[Connection, bytes], None]
1639 ) -> None:
1640 """
1641 Set the TLS key logging callback to *callback*. This function will be
1642 called whenever TLS key material is generated or received, in order
1643 to allow applications to store this keying material for debugging
1644 purposes.
1646 :param callback: The Python callback to use. This should take two
1647 arguments: a Connection object and a bytestring that contains
1648 the key material in the format used by NSS for its SSLKEYLOGFILE
1649 debugging output.
1650 :return: None
1651 """
1653 @wraps(callback)
1654 def wrapper(ssl, line): # type: ignore[no-untyped-def]
1655 line = _ffi.string(line)
1656 callback(Connection._reverse_mapping[ssl], line)
1658 self._keylog_callback = _ffi.callback(
1659 "void (*)(const SSL *, const char *)", wrapper
1660 )
1661 _lib.SSL_CTX_set_keylog_callback(self._context, self._keylog_callback)
1663 def get_app_data(self) -> Any:
1664 """
1665 Get the application data (supplied via :meth:`set_app_data()`)
1667 :return: The application data
1668 """
1669 return self._app_data
1671 @_require_not_used
1672 def set_app_data(self, data: Any) -> None:
1673 """
1674 Set the application data (will be returned from get_app_data())
1676 :param data: Any Python object
1677 :return: None
1678 """
1679 self._app_data = data
1681 def get_cert_store(self) -> X509Store | None:
1682 """
1683 Get the certificate store for the context. This can be used to add
1684 "trusted" certificates without using the
1685 :meth:`load_verify_locations` method.
1687 :return: A X509Store object or None if it does not have one.
1688 """
1689 store = _lib.SSL_CTX_get_cert_store(self._context)
1690 if store == _ffi.NULL:
1691 # TODO: This is untested.
1692 return None
1694 pystore = X509Store.__new__(X509Store)
1695 pystore._store = store
1696 return pystore
1698 @_require_not_used
1699 def set_options(self, options: int) -> int:
1700 """
1701 Add options. Options set before are not cleared!
1702 This method should be used with the :const:`OP_*` constants.
1704 :param options: The options to add.
1705 :return: The new option bitmask.
1706 """
1707 if not isinstance(options, int):
1708 raise TypeError("options must be an integer")
1710 return _lib.SSL_CTX_set_options(self._context, options)
1712 @_require_not_used
1713 def set_mode(self, mode: int) -> int:
1714 """
1715 Add modes via bitmask. Modes set before are not cleared! This method
1716 should be used with the :const:`MODE_*` constants.
1718 :param mode: The mode to add.
1719 :return: The new mode bitmask.
1720 """
1721 if not isinstance(mode, int):
1722 raise TypeError("mode must be an integer")
1724 return _lib.SSL_CTX_set_mode(self._context, mode)
1726 @_require_not_used
1727 def clear_mode(self, mode_to_clear: int) -> int:
1728 """
1729 Modes previously set cannot be overwritten without being
1730 cleared first. This method should be used to clear existing modes.
1731 """
1732 return _lib.SSL_CTX_clear_mode(self._context, mode_to_clear)
1734 @_require_not_used
1735 def set_tlsext_servername_callback(
1736 self, callback: Callable[[Connection], None]
1737 ) -> None:
1738 """
1739 Specify a callback function to be called when clients specify a server
1740 name.
1742 :param callback: The callback function. It will be invoked with one
1743 argument, the Connection instance.
1745 .. versionadded:: 0.13
1746 """
1748 @wraps(callback)
1749 def wrapper(ssl, alert, arg): # type: ignore[no-untyped-def]
1750 callback(Connection._reverse_mapping[ssl])
1751 return 0
1753 self._tlsext_servername_callback = _ffi.callback(
1754 "int (*)(SSL *, int *, void *)", wrapper
1755 )
1756 _lib.SSL_CTX_set_tlsext_servername_callback(
1757 self._context, self._tlsext_servername_callback
1758 )
1760 @_require_not_used
1761 def set_tlsext_use_srtp(self, profiles: bytes) -> None:
1762 """
1763 Enable support for negotiating SRTP keying material.
1765 :param bytes profiles: A colon delimited list of protection profile
1766 names, like ``b'SRTP_AES128_CM_SHA1_80:SRTP_AES128_CM_SHA1_32'``.
1767 :return: None
1768 """
1769 if not isinstance(profiles, bytes):
1770 raise TypeError("profiles must be a byte string.")
1772 _openssl_assert(
1773 _lib.SSL_CTX_set_tlsext_use_srtp(self._context, profiles) == 0
1774 )
1776 @_require_not_used
1777 def set_alpn_protos(self, protos: list[bytes]) -> None:
1778 """
1779 Specify the protocols that the client is prepared to speak after the
1780 TLS connection has been negotiated using Application Layer Protocol
1781 Negotiation.
1783 :param protos: A list of the protocols to be offered to the server.
1784 This list should be a Python list of bytestrings representing the
1785 protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``.
1786 """
1787 # Different versions of OpenSSL are inconsistent about how they handle
1788 # empty proto lists (see #1043), so we avoid the problem entirely by
1789 # rejecting them ourselves.
1790 if not protos:
1791 raise ValueError("at least one protocol must be specified")
1793 # Take the list of protocols and join them together, prefixing them
1794 # with their lengths.
1795 protostr = b"".join(
1796 chain.from_iterable((bytes((len(p),)), p) for p in protos)
1797 )
1799 # Build a C string from the list. We don't need to save this off
1800 # because OpenSSL immediately copies the data out.
1801 input_str = _ffi.new("unsigned char[]", protostr)
1803 # https://www.openssl.org/docs/man1.1.0/man3/SSL_CTX_set_alpn_protos.html:
1804 # SSL_CTX_set_alpn_protos() and SSL_set_alpn_protos()
1805 # return 0 on success, and non-0 on failure.
1806 # WARNING: these functions reverse the return value convention.
1807 _openssl_assert(
1808 _lib.SSL_CTX_set_alpn_protos(
1809 self._context, input_str, len(protostr)
1810 )
1811 == 0
1812 )
1814 @_require_not_used
1815 def set_alpn_select_callback(self, callback: _ALPNSelectCallback) -> None:
1816 """
1817 Specify a callback function that will be called on the server when a
1818 client offers protocols using ALPN.
1820 :param callback: The callback function. It will be invoked with two
1821 arguments: the Connection, and a list of offered protocols as
1822 bytestrings, e.g ``[b'http/1.1', b'spdy/2']``. It can return
1823 one of those bytestrings to indicate the chosen protocol, the
1824 empty bytestring to terminate the TLS connection, or the
1825 :py:obj:`NO_OVERLAPPING_PROTOCOLS` to indicate that no offered
1826 protocol was selected, but that the connection should not be
1827 aborted.
1828 """
1829 self._alpn_select_helper = _ALPNSelectHelper(callback)
1830 self._alpn_select_callback = self._alpn_select_helper.callback
1831 _lib.SSL_CTX_set_alpn_select_cb(
1832 self._context, self._alpn_select_callback, _ffi.NULL
1833 )
1835 def _set_ocsp_callback(
1836 self,
1837 helper: _OCSPClientCallbackHelper | _OCSPServerCallbackHelper,
1838 data: Any | None,
1839 ) -> None:
1840 """
1841 This internal helper does the common work for
1842 ``set_ocsp_server_callback`` and ``set_ocsp_client_callback``, which is
1843 almost all of it.
1844 """
1845 self._ocsp_helper = helper
1846 self._ocsp_callback = helper.callback
1847 if data is None:
1848 self._ocsp_data = _ffi.NULL
1849 else:
1850 self._ocsp_data = _ffi.new_handle(data)
1852 rc = _lib.SSL_CTX_set_tlsext_status_cb(
1853 self._context, self._ocsp_callback
1854 )
1855 _openssl_assert(rc == 1)
1856 rc = _lib.SSL_CTX_set_tlsext_status_arg(self._context, self._ocsp_data)
1857 _openssl_assert(rc == 1)
1859 @_require_not_used
1860 def set_ocsp_server_callback(
1861 self,
1862 callback: _OCSPServerCallback[_T],
1863 data: _T | None = None,
1864 ) -> None:
1865 """
1866 Set a callback to provide OCSP data to be stapled to the TLS handshake
1867 on the server side.
1869 :param callback: The callback function. It will be invoked with two
1870 arguments: the Connection, and the optional arbitrary data you have
1871 provided. The callback must return a bytestring that contains the
1872 OCSP data to staple to the handshake. If no OCSP data is available
1873 for this connection, return the empty bytestring.
1874 :param data: Some opaque data that will be passed into the callback
1875 function when called. This can be used to avoid needing to do
1876 complex data lookups or to keep track of what context is being
1877 used. This parameter is optional.
1878 """
1879 helper = _OCSPServerCallbackHelper(callback)
1880 self._set_ocsp_callback(helper, data)
1882 @_require_not_used
1883 def set_ocsp_client_callback(
1884 self,
1885 callback: _OCSPClientCallback[_T],
1886 data: _T | None = None,
1887 ) -> None:
1888 """
1889 Set a callback to validate OCSP data stapled to the TLS handshake on
1890 the client side.
1892 :param callback: The callback function. It will be invoked with three
1893 arguments: the Connection, a bytestring containing the stapled OCSP
1894 assertion, and the optional arbitrary data you have provided. The
1895 callback must return a boolean that indicates the result of
1896 validating the OCSP data: ``True`` if the OCSP data is valid and
1897 the certificate can be trusted, or ``False`` if either the OCSP
1898 data is invalid or the certificate has been revoked.
1899 :param data: Some opaque data that will be passed into the callback
1900 function when called. This can be used to avoid needing to do
1901 complex data lookups or to keep track of what context is being
1902 used. This parameter is optional.
1903 """
1904 helper = _OCSPClientCallbackHelper(callback)
1905 self._set_ocsp_callback(helper, data)
1907 @_require_not_used
1908 def set_cookie_generate_callback(
1909 self, callback: _CookieGenerateCallback
1910 ) -> None:
1911 self._cookie_generate_helper = _CookieGenerateCallbackHelper(callback)
1912 _lib.SSL_CTX_set_cookie_generate_cb(
1913 self._context,
1914 self._cookie_generate_helper.callback,
1915 )
1917 @_require_not_used
1918 def set_cookie_verify_callback(
1919 self, callback: _CookieVerifyCallback
1920 ) -> None:
1921 self._cookie_verify_helper = _CookieVerifyCallbackHelper(callback)
1922 _lib.SSL_CTX_set_cookie_verify_cb(
1923 self._context,
1924 self._cookie_verify_helper.callback,
1925 )
1928class Connection:
1929 _reverse_mapping: typing.MutableMapping[Any, Connection] = (
1930 WeakValueDictionary()
1931 )
1933 def __init__(
1934 self, context: Context, socket: socket.socket | None = None
1935 ) -> None:
1936 """
1937 Create a new Connection object, using the given OpenSSL.SSL.Context
1938 instance and socket.
1940 :param context: An SSL Context to use for this connection
1941 :param socket: The socket to use for transport layer
1942 """
1943 if not isinstance(context, Context):
1944 raise TypeError("context must be a Context instance")
1946 context._used = True
1948 ssl = _lib.SSL_new(context._context)
1949 self._ssl = _ffi.gc(ssl, _lib.SSL_free)
1950 # We set SSL_MODE_AUTO_RETRY to handle situations where OpenSSL returns
1951 # an SSL_ERROR_WANT_READ when processing a non-application data packet
1952 # even though there is still data on the underlying transport.
1953 # See https://github.com/openssl/openssl/issues/6234 for more details.
1954 _lib.SSL_set_mode(self._ssl, _lib.SSL_MODE_AUTO_RETRY)
1955 self._context = context
1956 self._app_data = None
1958 # References to strings used for Application Layer Protocol
1959 # Negotiation. These strings get copied at some point but it's well
1960 # after the callback returns, so we have to hang them somewhere to
1961 # avoid them getting freed.
1962 self._alpn_select_callback_args: Any = None
1964 # Reference the verify_callback of the Context. This ensures that if
1965 # set_verify is called again after the SSL object has been created we
1966 # do not point to a dangling reference
1967 self._verify_helper = context._verify_helper
1968 self._verify_callback = context._verify_callback
1970 # And likewise for the cookie callbacks
1971 self._cookie_generate_helper = context._cookie_generate_helper
1972 self._cookie_verify_helper = context._cookie_verify_helper
1974 self._reverse_mapping[self._ssl] = self
1976 if socket is None:
1977 self._socket = None
1978 # Don't set up any gc for these, SSL_free will take care of them.
1979 self._into_ssl = _lib.BIO_new(_lib.BIO_s_mem())
1980 _openssl_assert(self._into_ssl != _ffi.NULL)
1982 self._from_ssl = _lib.BIO_new(_lib.BIO_s_mem())
1983 _openssl_assert(self._from_ssl != _ffi.NULL)
1985 _lib.SSL_set_bio(self._ssl, self._into_ssl, self._from_ssl)
1986 else:
1987 self._into_ssl = None
1988 self._from_ssl = None
1989 self._socket = socket
1990 set_result = _lib.SSL_set_fd(
1991 self._ssl, _asFileDescriptor(self._socket)
1992 )
1993 _openssl_assert(set_result == 1)
1995 def __getattr__(self, name: str) -> Any:
1996 """
1997 Look up attributes on the wrapped socket object if they are not found
1998 on the Connection object.
1999 """
2000 if self._socket is None:
2001 raise AttributeError(
2002 f"'{self.__class__.__name__}' object has no attribute '{name}'"
2003 )
2004 else:
2005 return getattr(self._socket, name)
2007 def _raise_ssl_error(self, ssl: Any, result: int) -> None:
2008 if self._context._verify_helper is not None:
2009 self._context._verify_helper.raise_if_problem()
2010 if self._context._alpn_select_helper is not None:
2011 self._context._alpn_select_helper.raise_if_problem()
2012 if self._context._ocsp_helper is not None:
2013 self._context._ocsp_helper.raise_if_problem()
2015 error = _lib.SSL_get_error(ssl, result)
2016 if error == _lib.SSL_ERROR_WANT_READ:
2017 raise WantReadError()
2018 elif error == _lib.SSL_ERROR_WANT_WRITE:
2019 raise WantWriteError()
2020 elif error == _lib.SSL_ERROR_ZERO_RETURN:
2021 raise ZeroReturnError()
2022 elif error == _lib.SSL_ERROR_WANT_X509_LOOKUP:
2023 # TODO: This is untested.
2024 raise WantX509LookupError()
2025 elif error == _lib.SSL_ERROR_SYSCALL:
2026 if platform == "win32":
2027 errno = _ffi.getwinerror()[0]
2028 else:
2029 errno = _ffi.errno
2030 if _lib.ERR_peek_error() == 0 or errno != 0:
2031 if result < 0 and errno != 0:
2032 raise SysCallError(errno, errorcode.get(errno))
2033 raise SysCallError(-1, "Unexpected EOF")
2034 else:
2035 # TODO: This is untested, but I think twisted hits it?
2036 _raise_current_error()
2037 elif error == _lib.SSL_ERROR_SSL and _lib.ERR_peek_error() != 0:
2038 # In 3.0.x an unexpected EOF no longer triggers syscall error
2039 # but we want to maintain compatibility so we check here and
2040 # raise syscall if it is an EOF. Since we're not actually sure
2041 # what else could raise SSL_ERROR_SSL we check for the presence
2042 # of the OpenSSL 3 constant SSL_R_UNEXPECTED_EOF_WHILE_READING
2043 # and if it's not present we just raise an error, which matches
2044 # the behavior before we added this elif section
2045 peeked_error = _lib.ERR_peek_error()
2046 reason = _lib.ERR_GET_REASON(peeked_error)
2047 if _lib.Cryptography_HAS_UNEXPECTED_EOF_WHILE_READING:
2048 _openssl_assert(
2049 reason == _lib.SSL_R_UNEXPECTED_EOF_WHILE_READING
2050 )
2051 _lib.ERR_clear_error()
2052 raise SysCallError(-1, "Unexpected EOF")
2053 else:
2054 _raise_current_error()
2055 elif error == _lib.SSL_ERROR_NONE:
2056 pass
2057 else:
2058 _raise_current_error()
2060 def get_context(self) -> Context:
2061 """
2062 Retrieve the :class:`Context` object associated with this
2063 :class:`Connection`.
2064 """
2065 return self._context
2067 def set_context(self, context: Context) -> None:
2068 """
2069 Switch this connection to a new session context.
2071 :param context: A :class:`Context` instance giving the new session
2072 context to use.
2073 """
2074 if not isinstance(context, Context):
2075 raise TypeError("context must be a Context instance")
2077 _lib.SSL_set_SSL_CTX(self._ssl, context._context)
2078 self._context = context
2079 self._context._used = True
2081 def get_servername(self) -> bytes | None:
2082 """
2083 Retrieve the servername extension value if provided in the client hello
2084 message, or None if there wasn't one.
2086 :return: A byte string giving the server name or :data:`None`.
2088 .. versionadded:: 0.13
2089 """
2090 name = _lib.SSL_get_servername(
2091 self._ssl, _lib.TLSEXT_NAMETYPE_host_name
2092 )
2093 if name == _ffi.NULL:
2094 return None
2096 return _ffi.string(name)
2098 def set_verify(
2099 self, mode: int, callback: _VerifyCallback | None = None
2100 ) -> None:
2101 """
2102 Override the Context object's verification flags for this specific
2103 connection. See :py:meth:`Context.set_verify` for details.
2104 """
2105 if not isinstance(mode, int):
2106 raise TypeError("mode must be an integer")
2108 if callback is None:
2109 self._verify_helper = None
2110 self._verify_callback = None
2111 _lib.SSL_set_verify(self._ssl, mode, _ffi.NULL)
2112 else:
2113 if not callable(callback):
2114 raise TypeError("callback must be callable")
2116 self._verify_helper = _VerifyHelper(callback)
2117 self._verify_callback = self._verify_helper.callback
2118 _lib.SSL_set_verify(self._ssl, mode, self._verify_callback)
2120 def get_verify_mode(self) -> int:
2121 """
2122 Retrieve the Connection object's verify mode, as set by
2123 :meth:`set_verify`.
2125 :return: The verify mode
2126 """
2127 return _lib.SSL_get_verify_mode(self._ssl)
2129 def use_certificate(self, cert: X509 | x509.Certificate) -> None:
2130 """
2131 Load a certificate from a X509 object
2133 :param cert: The X509 object
2134 :return: None
2135 """
2136 # Mirrored from Context.use_certificate
2137 if not isinstance(cert, X509):
2138 cert = X509.from_cryptography(cert)
2139 else:
2140 warnings.warn(
2141 (
2142 "Passing pyOpenSSL X509 objects is deprecated. You "
2143 "should use a cryptography.x509.Certificate instead."
2144 ),
2145 DeprecationWarning,
2146 stacklevel=2,
2147 )
2149 use_result = _lib.SSL_use_certificate(self._ssl, cert._x509)
2150 if not use_result:
2151 _raise_current_error()
2153 def use_privatekey(self, pkey: _PrivateKey | PKey) -> None:
2154 """
2155 Load a private key from a PKey object
2157 :param pkey: The PKey object
2158 :return: None
2159 """
2160 # Mirrored from Context.use_privatekey
2161 if not isinstance(pkey, PKey):
2162 pkey = PKey.from_cryptography_key(pkey)
2163 else:
2164 warnings.warn(
2165 (
2166 "Passing pyOpenSSL PKey objects is deprecated. You "
2167 "should use a cryptography private key instead."
2168 ),
2169 DeprecationWarning,
2170 stacklevel=2,
2171 )
2173 use_result = _lib.SSL_use_PrivateKey(self._ssl, pkey._pkey)
2174 if not use_result:
2175 self._context._raise_passphrase_exception()
2177 def set_ciphertext_mtu(self, mtu: int) -> None:
2178 """
2179 For DTLS, set the maximum UDP payload size (*not* including IP/UDP
2180 overhead).
2182 Note that you might have to set :data:`OP_NO_QUERY_MTU` to prevent
2183 OpenSSL from spontaneously clearing this.
2185 :param mtu: An integer giving the maximum transmission unit.
2187 .. versionadded:: 21.1
2188 """
2189 _lib.SSL_set_mtu(self._ssl, mtu)
2191 def get_cleartext_mtu(self) -> int:
2192 """
2193 For DTLS, get the maximum size of unencrypted data you can pass to
2194 :meth:`write` without exceeding the MTU (as passed to
2195 :meth:`set_ciphertext_mtu`).
2197 :return: The effective MTU as an integer.
2199 .. versionadded:: 21.1
2200 """
2202 if not hasattr(_lib, "DTLS_get_data_mtu"):
2203 raise NotImplementedError("requires OpenSSL 1.1.1 or better")
2204 return _lib.DTLS_get_data_mtu(self._ssl)
2206 def set_tlsext_host_name(self, name: bytes) -> None:
2207 """
2208 Set the value of the servername extension to send in the client hello.
2210 :param name: A byte string giving the name.
2212 .. versionadded:: 0.13
2213 """
2214 if not isinstance(name, bytes):
2215 raise TypeError("name must be a byte string")
2216 elif b"\0" in name:
2217 raise TypeError("name must not contain NUL byte")
2219 # XXX I guess this can fail sometimes?
2220 _lib.SSL_set_tlsext_host_name(self._ssl, name)
2222 def pending(self) -> int:
2223 """
2224 Get the number of bytes that can be safely read from the SSL buffer
2225 (**not** the underlying transport buffer).
2227 :return: The number of bytes available in the receive buffer.
2228 """
2229 return _lib.SSL_pending(self._ssl)
2231 def send(self, buf: _Buffer, flags: int = 0) -> int:
2232 """
2233 Send data on the connection. NOTE: If you get one of the WantRead,
2234 WantWrite or WantX509Lookup exceptions on this, you have to call the
2235 method again with the SAME buffer.
2237 :param buf: The string, buffer or memoryview to send
2238 :param flags: (optional) Included for compatibility with the socket
2239 API, the value is ignored
2240 :return: The number of bytes written
2241 """
2242 # Backward compatibility
2243 buf = _text_to_bytes_and_warn("buf", buf)
2245 with _ffi.from_buffer(buf) as data:
2246 # check len(buf) instead of len(data) for testability
2247 if len(buf) > 2147483647:
2248 raise ValueError(
2249 "Cannot send more than 2**31-1 bytes at once."
2250 )
2252 result = _lib.SSL_write(self._ssl, data, len(data))
2253 self._raise_ssl_error(self._ssl, result)
2255 return result
2257 write = send
2259 def sendall(self, buf: _Buffer, flags: int = 0) -> int:
2260 """
2261 Send "all" data on the connection. This calls send() repeatedly until
2262 all data is sent. If an error occurs, it's impossible to tell how much
2263 data has been sent.
2265 :param buf: The string, buffer or memoryview to send
2266 :param flags: (optional) Included for compatibility with the socket
2267 API, the value is ignored
2268 :return: The number of bytes written
2269 """
2270 buf = _text_to_bytes_and_warn("buf", buf)
2272 with _ffi.from_buffer(buf) as data:
2273 left_to_send = len(buf)
2274 total_sent = 0
2276 while left_to_send:
2277 # SSL_write's num arg is an int,
2278 # so we cannot send more than 2**31-1 bytes at once.
2279 result = _lib.SSL_write(
2280 self._ssl, data + total_sent, min(left_to_send, 2147483647)
2281 )
2282 self._raise_ssl_error(self._ssl, result)
2283 total_sent += result
2284 left_to_send -= result
2286 return total_sent
2288 def recv(self, bufsiz: int, flags: int | None = None) -> bytes:
2289 """
2290 Receive data on the connection.
2292 :param bufsiz: The maximum number of bytes to read
2293 :param flags: (optional) The only supported flag is ``MSG_PEEK``,
2294 all other flags are ignored.
2295 :return: The string read from the Connection
2296 """
2297 buf = _no_zero_allocator("char[]", bufsiz)
2298 if flags is not None and flags & socket.MSG_PEEK:
2299 result = _lib.SSL_peek(self._ssl, buf, bufsiz)
2300 else:
2301 result = _lib.SSL_read(self._ssl, buf, bufsiz)
2302 self._raise_ssl_error(self._ssl, result)
2303 return _ffi.buffer(buf, result)[:]
2305 read = recv
2307 def recv_into(
2308 self,
2309 buffer: Any, # collections.abc.Buffer once we use Python 3.12+
2310 nbytes: int | None = None,
2311 flags: int | None = None,
2312 ) -> int:
2313 """
2314 Receive data on the connection and copy it directly into the provided
2315 buffer, rather than creating a new string.
2317 :param buffer: The buffer to copy into.
2318 :param nbytes: (optional) The maximum number of bytes to read into the
2319 buffer. If not present, defaults to the size of the buffer. If
2320 larger than the size of the buffer, is reduced to the size of the
2321 buffer.
2322 :param flags: (optional) The only supported flag is ``MSG_PEEK``,
2323 all other flags are ignored.
2324 :return: The number of bytes read into the buffer.
2325 """
2326 if nbytes is None:
2327 nbytes = len(buffer)
2328 else:
2329 nbytes = min(nbytes, len(buffer))
2331 # We need to create a temporary buffer. This is annoying, it would be
2332 # better if we could pass memoryviews straight into the SSL_read call,
2333 # but right now we can't. Revisit this if CFFI gets that ability.
2334 buf = _no_zero_allocator("char[]", nbytes)
2335 if flags is not None and flags & socket.MSG_PEEK:
2336 result = _lib.SSL_peek(self._ssl, buf, nbytes)
2337 else:
2338 result = _lib.SSL_read(self._ssl, buf, nbytes)
2339 self._raise_ssl_error(self._ssl, result)
2341 # This strange line is all to avoid a memory copy. The buffer protocol
2342 # should allow us to assign a CFFI buffer to the LHS of this line, but
2343 # on CPython 3.3+ that segfaults. As a workaround, we can temporarily
2344 # wrap it in a memoryview.
2345 buffer[:result] = memoryview(_ffi.buffer(buf, result))
2347 return result
2349 def _handle_bio_errors(self, bio: Any, result: int) -> typing.NoReturn:
2350 if _lib.BIO_should_retry(bio):
2351 if _lib.BIO_should_read(bio):
2352 raise WantReadError()
2353 elif _lib.BIO_should_write(bio):
2354 # TODO: This is untested.
2355 raise WantWriteError()
2356 elif _lib.BIO_should_io_special(bio):
2357 # TODO: This is untested. I think io_special means the socket
2358 # BIO has a not-yet connected socket.
2359 raise ValueError("BIO_should_io_special")
2360 else:
2361 # TODO: This is untested.
2362 raise ValueError("unknown bio failure")
2363 else:
2364 # TODO: This is untested.
2365 _raise_current_error()
2367 def bio_read(self, bufsiz: int) -> bytes:
2368 """
2369 If the Connection was created with a memory BIO, this method can be
2370 used to read bytes from the write end of that memory BIO. Many
2371 Connection methods will add bytes which must be read in this manner or
2372 the buffer will eventually fill up and the Connection will be able to
2373 take no further actions.
2375 :param bufsiz: The maximum number of bytes to read
2376 :return: The string read.
2377 """
2378 if self._from_ssl is None:
2379 raise TypeError("Connection sock was not None")
2381 if not isinstance(bufsiz, int):
2382 raise TypeError("bufsiz must be an integer")
2384 buf = _no_zero_allocator("char[]", bufsiz)
2385 result = _lib.BIO_read(self._from_ssl, buf, bufsiz)
2386 if result <= 0:
2387 self._handle_bio_errors(self._from_ssl, result)
2389 return _ffi.buffer(buf, result)[:]
2391 def bio_write(self, buf: _Buffer) -> int:
2392 """
2393 If the Connection was created with a memory BIO, this method can be
2394 used to add bytes to the read end of that memory BIO. The Connection
2395 can then read the bytes (for example, in response to a call to
2396 :meth:`recv`).
2398 :param buf: The string to put into the memory BIO.
2399 :return: The number of bytes written
2400 """
2401 buf = _text_to_bytes_and_warn("buf", buf)
2403 if self._into_ssl is None:
2404 raise TypeError("Connection sock was not None")
2406 with _ffi.from_buffer(buf) as data:
2407 result = _lib.BIO_write(self._into_ssl, data, len(data))
2408 if result <= 0:
2409 self._handle_bio_errors(self._into_ssl, result)
2410 return result
2412 def renegotiate(self) -> bool:
2413 """
2414 Renegotiate the session.
2416 :return: True if the renegotiation can be started, False otherwise
2417 """
2418 if not self.renegotiate_pending():
2419 _openssl_assert(_lib.SSL_renegotiate(self._ssl) == 1)
2420 return True
2421 return False
2423 def do_handshake(self) -> None:
2424 """
2425 Perform an SSL handshake (usually called after :meth:`renegotiate` or
2426 one of :meth:`set_accept_state` or :meth:`set_connect_state`). This can
2427 raise the same exceptions as :meth:`send` and :meth:`recv`.
2429 :return: None.
2430 """
2431 result = _lib.SSL_do_handshake(self._ssl)
2432 self._raise_ssl_error(self._ssl, result)
2434 def renegotiate_pending(self) -> bool:
2435 """
2436 Check if there's a renegotiation in progress, it will return False once
2437 a renegotiation is finished.
2439 :return: Whether there's a renegotiation in progress
2440 """
2441 return _lib.SSL_renegotiate_pending(self._ssl) == 1
2443 def total_renegotiations(self) -> int:
2444 """
2445 Find out the total number of renegotiations.
2447 :return: The number of renegotiations.
2448 """
2449 return _lib.SSL_total_renegotiations(self._ssl)
2451 def connect(self, addr: Any) -> None:
2452 """
2453 Call the :meth:`connect` method of the underlying socket and set up SSL
2454 on the socket, using the :class:`Context` object supplied to this
2455 :class:`Connection` object at creation.
2457 :param addr: A remote address
2458 :return: What the socket's connect method returns
2459 """
2460 _lib.SSL_set_connect_state(self._ssl)
2461 return self._socket.connect(addr) # type: ignore[return-value, union-attr]
2463 def connect_ex(self, addr: Any) -> int:
2464 """
2465 Call the :meth:`connect_ex` method of the underlying socket and set up
2466 SSL on the socket, using the Context object supplied to this Connection
2467 object at creation. Note that if the :meth:`connect_ex` method of the
2468 socket doesn't return 0, SSL won't be initialized.
2470 :param addr: A remove address
2471 :return: What the socket's connect_ex method returns
2472 """
2473 connect_ex = self._socket.connect_ex # type: ignore[union-attr]
2474 self.set_connect_state()
2475 return connect_ex(addr)
2477 def accept(self) -> tuple[Connection, Any]:
2478 """
2479 Call the :meth:`accept` method of the underlying socket and set up SSL
2480 on the returned socket, using the Context object supplied to this
2481 :class:`Connection` object at creation.
2483 :return: A *(conn, addr)* pair where *conn* is the new
2484 :class:`Connection` object created, and *address* is as returned by
2485 the socket's :meth:`accept`.
2486 """
2487 client, addr = self._socket.accept() # type: ignore[union-attr]
2488 conn = Connection(self._context, client)
2489 conn.set_accept_state()
2490 return (conn, addr)
2492 def DTLSv1_listen(self) -> None:
2493 """
2494 Call the OpenSSL function DTLSv1_listen on this connection. See the
2495 OpenSSL manual for more details.
2497 :return: None
2498 """
2499 # Possible future extension: return the BIO_ADDR in some form.
2500 bio_addr = _lib.BIO_ADDR_new()
2501 try:
2502 result = _lib.DTLSv1_listen(self._ssl, bio_addr)
2503 finally:
2504 _lib.BIO_ADDR_free(bio_addr)
2505 # DTLSv1_listen is weird. A zero return value means 'didn't find a
2506 # ClientHello with valid cookie, but keep trying'. So basically
2507 # WantReadError. But it doesn't work correctly with _raise_ssl_error.
2508 # So we raise it manually instead.
2509 if self._cookie_generate_helper is not None:
2510 self._cookie_generate_helper.raise_if_problem()
2511 if self._cookie_verify_helper is not None:
2512 self._cookie_verify_helper.raise_if_problem()
2513 if result == 0:
2514 raise WantReadError()
2515 if result < 0:
2516 self._raise_ssl_error(self._ssl, result)
2518 def DTLSv1_get_timeout(self) -> int | None:
2519 """
2520 Determine when the DTLS SSL object next needs to perform internal
2521 processing due to the passage of time.
2523 When the returned number of seconds have passed, the
2524 :meth:`DTLSv1_handle_timeout` method needs to be called.
2526 :return: The time left in seconds before the next timeout or `None`
2527 if no timeout is currently active.
2528 """
2529 ptv_sec = _ffi.new("time_t *")
2530 ptv_usec = _ffi.new("long *")
2531 if _lib.Cryptography_DTLSv1_get_timeout(self._ssl, ptv_sec, ptv_usec):
2532 return ptv_sec[0] + (ptv_usec[0] / 1000000)
2533 else:
2534 return None
2536 def DTLSv1_handle_timeout(self) -> bool:
2537 """
2538 Handles any timeout events which have become pending on a DTLS SSL
2539 object.
2541 :return: `True` if there was a pending timeout, `False` otherwise.
2542 """
2543 result = _lib.DTLSv1_handle_timeout(self._ssl)
2544 if result < 0:
2545 self._raise_ssl_error(self._ssl, result)
2546 assert False, "unreachable"
2547 else:
2548 return bool(result)
2550 def bio_shutdown(self) -> None:
2551 """
2552 If the Connection was created with a memory BIO, this method can be
2553 used to indicate that *end of file* has been reached on the read end of
2554 that memory BIO.
2556 :return: None
2557 """
2558 if self._from_ssl is None:
2559 raise TypeError("Connection sock was not None")
2561 _lib.BIO_set_mem_eof_return(self._into_ssl, 0)
2563 def shutdown(self) -> bool:
2564 """
2565 Send the shutdown message to the Connection.
2567 :return: True if the shutdown completed successfully (i.e. both sides
2568 have sent closure alerts), False otherwise (in which case you
2569 call :meth:`recv` or :meth:`send` when the connection becomes
2570 readable/writeable).
2571 """
2572 result = _lib.SSL_shutdown(self._ssl)
2573 if result < 0:
2574 self._raise_ssl_error(self._ssl, result)
2575 assert False, "unreachable"
2576 elif result > 0:
2577 return True
2578 else:
2579 return False
2581 def get_cipher_list(self) -> list[str]:
2582 """
2583 Retrieve the list of ciphers used by the Connection object.
2585 :return: A list of native cipher strings.
2586 """
2587 ciphers = []
2588 for i in count():
2589 result = _lib.SSL_get_cipher_list(self._ssl, i)
2590 if result == _ffi.NULL:
2591 break
2592 ciphers.append(_ffi.string(result).decode("utf-8"))
2593 return ciphers
2595 def get_client_ca_list(self) -> list[X509Name]:
2596 """
2597 Get CAs whose certificates are suggested for client authentication.
2599 :return: If this is a server connection, the list of certificate
2600 authorities that will be sent or has been sent to the client, as
2601 controlled by this :class:`Connection`'s :class:`Context`.
2603 If this is a client connection, the list will be empty until the
2604 connection with the server is established.
2606 .. versionadded:: 0.10
2607 """
2608 ca_names = _lib.SSL_get_client_CA_list(self._ssl)
2609 if ca_names == _ffi.NULL:
2610 # TODO: This is untested.
2611 return []
2613 result = []
2614 for i in range(_lib.sk_X509_NAME_num(ca_names)):
2615 name = _lib.sk_X509_NAME_value(ca_names, i)
2616 copy = _lib.X509_NAME_dup(name)
2617 _openssl_assert(copy != _ffi.NULL)
2619 pyname = X509Name.__new__(X509Name)
2620 pyname._name = _ffi.gc(copy, _lib.X509_NAME_free)
2621 result.append(pyname)
2622 return result
2624 def makefile(self, *args: Any, **kwargs: Any) -> typing.NoReturn:
2625 """
2626 The makefile() method is not implemented, since there is no dup
2627 semantics for SSL connections
2629 :raise: NotImplementedError
2630 """
2631 raise NotImplementedError(
2632 "Cannot make file object of OpenSSL.SSL.Connection"
2633 )
2635 def get_app_data(self) -> Any:
2636 """
2637 Retrieve application data as set by :meth:`set_app_data`.
2639 :return: The application data
2640 """
2641 return self._app_data
2643 def set_app_data(self, data: Any) -> None:
2644 """
2645 Set application data
2647 :param data: The application data
2648 :return: None
2649 """
2650 self._app_data = data
2652 def get_shutdown(self) -> int:
2653 """
2654 Get the shutdown state of the Connection.
2656 :return: The shutdown state, a bitvector of SENT_SHUTDOWN,
2657 RECEIVED_SHUTDOWN.
2658 """
2659 return _lib.SSL_get_shutdown(self._ssl)
2661 def set_shutdown(self, state: int) -> None:
2662 """
2663 Set the shutdown state of the Connection.
2665 :param state: bitvector of SENT_SHUTDOWN, RECEIVED_SHUTDOWN.
2666 :return: None
2667 """
2668 if not isinstance(state, int):
2669 raise TypeError("state must be an integer")
2671 _lib.SSL_set_shutdown(self._ssl, state)
2673 def get_state_string(self) -> bytes:
2674 """
2675 Retrieve a verbose string detailing the state of the Connection.
2677 :return: A string representing the state
2678 """
2679 return _ffi.string(_lib.SSL_state_string_long(self._ssl))
2681 def server_random(self) -> bytes | None:
2682 """
2683 Retrieve the random value used with the server hello message.
2685 :return: A string representing the state
2686 """
2687 session = _lib.SSL_get_session(self._ssl)
2688 if session == _ffi.NULL:
2689 return None
2690 length = _lib.SSL_get_server_random(self._ssl, _ffi.NULL, 0)
2691 _openssl_assert(length > 0)
2692 outp = _no_zero_allocator("unsigned char[]", length)
2693 _lib.SSL_get_server_random(self._ssl, outp, length)
2694 return _ffi.buffer(outp, length)[:]
2696 def client_random(self) -> bytes | None:
2697 """
2698 Retrieve the random value used with the client hello message.
2700 :return: A string representing the state
2701 """
2702 session = _lib.SSL_get_session(self._ssl)
2703 if session == _ffi.NULL:
2704 return None
2706 length = _lib.SSL_get_client_random(self._ssl, _ffi.NULL, 0)
2707 _openssl_assert(length > 0)
2708 outp = _no_zero_allocator("unsigned char[]", length)
2709 _lib.SSL_get_client_random(self._ssl, outp, length)
2710 return _ffi.buffer(outp, length)[:]
2712 def master_key(self) -> bytes | None:
2713 """
2714 Retrieve the value of the master key for this session.
2716 :return: A string representing the state
2717 """
2718 session = _lib.SSL_get_session(self._ssl)
2719 if session == _ffi.NULL:
2720 return None
2722 length = _lib.SSL_SESSION_get_master_key(session, _ffi.NULL, 0)
2723 _openssl_assert(length > 0)
2724 outp = _no_zero_allocator("unsigned char[]", length)
2725 _lib.SSL_SESSION_get_master_key(session, outp, length)
2726 return _ffi.buffer(outp, length)[:]
2728 def export_keying_material(
2729 self, label: bytes, olen: int, context: bytes | None = None
2730 ) -> bytes:
2731 """
2732 Obtain keying material for application use.
2734 :param: label - a disambiguating label string as described in RFC 5705
2735 :param: olen - the length of the exported key material in bytes
2736 :param: context - a per-association context value
2737 :return: the exported key material bytes or None
2738 """
2739 outp = _no_zero_allocator("unsigned char[]", olen)
2740 context_buf = _ffi.NULL
2741 context_len = 0
2742 use_context = 0
2743 if context is not None:
2744 context_buf = context
2745 context_len = len(context)
2746 use_context = 1
2747 success = _lib.SSL_export_keying_material(
2748 self._ssl,
2749 outp,
2750 olen,
2751 label,
2752 len(label),
2753 context_buf,
2754 context_len,
2755 use_context,
2756 )
2757 _openssl_assert(success == 1)
2758 return _ffi.buffer(outp, olen)[:]
2760 def sock_shutdown(self, *args: Any, **kwargs: Any) -> None:
2761 """
2762 Call the :meth:`shutdown` method of the underlying socket.
2763 See :manpage:`shutdown(2)`.
2765 :return: What the socket's shutdown() method returns
2766 """
2767 return self._socket.shutdown(*args, **kwargs) # type: ignore[return-value, union-attr]
2769 @typing.overload
2770 def get_certificate(
2771 self, *, as_cryptography: typing.Literal[True]
2772 ) -> x509.Certificate | None:
2773 pass
2775 @typing.overload
2776 def get_certificate(
2777 self, *, as_cryptography: typing.Literal[False] = False
2778 ) -> X509 | None:
2779 pass
2781 def get_certificate(
2782 self,
2783 *,
2784 as_cryptography: typing.Literal[True] | typing.Literal[False] = False,
2785 ) -> X509 | x509.Certificate | None:
2786 """
2787 Retrieve the local certificate (if any)
2789 :param bool as_cryptography: Controls whether a
2790 ``cryptography.x509.Certificate`` or an ``OpenSSL.crypto.X509``
2791 object should be returned.
2793 :return: The local certificate
2794 """
2795 cert = _lib.SSL_get_certificate(self._ssl)
2796 if cert != _ffi.NULL:
2797 _lib.X509_up_ref(cert)
2798 pycert = X509._from_raw_x509_ptr(cert)
2799 if as_cryptography:
2800 return pycert.to_cryptography()
2801 return pycert
2802 return None
2804 @typing.overload
2805 def get_peer_certificate(
2806 self, *, as_cryptography: typing.Literal[True]
2807 ) -> x509.Certificate | None:
2808 pass
2810 @typing.overload
2811 def get_peer_certificate(
2812 self, *, as_cryptography: typing.Literal[False] = False
2813 ) -> X509 | None:
2814 pass
2816 def get_peer_certificate(
2817 self,
2818 *,
2819 as_cryptography: typing.Literal[True] | typing.Literal[False] = False,
2820 ) -> X509 | x509.Certificate | None:
2821 """
2822 Retrieve the other side's certificate (if any)
2824 :param bool as_cryptography: Controls whether a
2825 ``cryptography.x509.Certificate`` or an ``OpenSSL.crypto.X509``
2826 object should be returned.
2828 :return: The peer's certificate
2829 """
2830 cert = _lib.SSL_get_peer_certificate(self._ssl)
2831 if cert != _ffi.NULL:
2832 pycert = X509._from_raw_x509_ptr(cert)
2833 if as_cryptography:
2834 return pycert.to_cryptography()
2835 return pycert
2836 return None
2838 @staticmethod
2839 def _cert_stack_to_list(cert_stack: Any) -> list[X509]:
2840 """
2841 Internal helper to convert a STACK_OF(X509) to a list of X509
2842 instances.
2843 """
2844 result = []
2845 for i in range(_lib.sk_X509_num(cert_stack)):
2846 cert = _lib.sk_X509_value(cert_stack, i)
2847 _openssl_assert(cert != _ffi.NULL)
2848 res = _lib.X509_up_ref(cert)
2849 _openssl_assert(res >= 1)
2850 pycert = X509._from_raw_x509_ptr(cert)
2851 result.append(pycert)
2852 return result
2854 @staticmethod
2855 def _cert_stack_to_cryptography_list(
2856 cert_stack: Any,
2857 ) -> list[x509.Certificate]:
2858 """
2859 Internal helper to convert a STACK_OF(X509) to a list of X509
2860 instances.
2861 """
2862 result = []
2863 for i in range(_lib.sk_X509_num(cert_stack)):
2864 cert = _lib.sk_X509_value(cert_stack, i)
2865 _openssl_assert(cert != _ffi.NULL)
2866 res = _lib.X509_up_ref(cert)
2867 _openssl_assert(res >= 1)
2868 pycert = X509._from_raw_x509_ptr(cert)
2869 result.append(pycert.to_cryptography())
2870 return result
2872 @typing.overload
2873 def get_peer_cert_chain(
2874 self, *, as_cryptography: typing.Literal[True]
2875 ) -> list[x509.Certificate] | None:
2876 pass
2878 @typing.overload
2879 def get_peer_cert_chain(
2880 self, *, as_cryptography: typing.Literal[False] = False
2881 ) -> list[X509] | None:
2882 pass
2884 def get_peer_cert_chain(
2885 self,
2886 *,
2887 as_cryptography: typing.Literal[True] | typing.Literal[False] = False,
2888 ) -> list[X509] | list[x509.Certificate] | None:
2889 """
2890 Retrieve the other side's certificate (if any)
2892 :param bool as_cryptography: Controls whether a list of
2893 ``cryptography.x509.Certificate`` or ``OpenSSL.crypto.X509``
2894 object should be returned.
2896 :return: A list of X509 instances giving the peer's certificate chain,
2897 or None if it does not have one.
2898 """
2899 cert_stack = _lib.SSL_get_peer_cert_chain(self._ssl)
2900 if cert_stack == _ffi.NULL:
2901 return None
2903 if as_cryptography:
2904 return self._cert_stack_to_cryptography_list(cert_stack)
2905 return self._cert_stack_to_list(cert_stack)
2907 @typing.overload
2908 def get_verified_chain(
2909 self, *, as_cryptography: typing.Literal[True]
2910 ) -> list[x509.Certificate] | None:
2911 pass
2913 @typing.overload
2914 def get_verified_chain(
2915 self, *, as_cryptography: typing.Literal[False] = False
2916 ) -> list[X509] | None:
2917 pass
2919 def get_verified_chain(
2920 self,
2921 *,
2922 as_cryptography: typing.Literal[True] | typing.Literal[False] = False,
2923 ) -> list[X509] | list[x509.Certificate] | None:
2924 """
2925 Retrieve the verified certificate chain of the peer including the
2926 peer's end entity certificate. It must be called after a session has
2927 been successfully established. If peer verification was not successful
2928 the chain may be incomplete, invalid, or None.
2930 :param bool as_cryptography: Controls whether a list of
2931 ``cryptography.x509.Certificate`` or ``OpenSSL.crypto.X509``
2932 object should be returned.
2934 :return: A list of X509 instances giving the peer's verified
2935 certificate chain, or None if it does not have one.
2937 .. versionadded:: 20.0
2938 """
2939 # OpenSSL 1.1+
2940 cert_stack = _lib.SSL_get0_verified_chain(self._ssl)
2941 if cert_stack == _ffi.NULL:
2942 return None
2944 if as_cryptography:
2945 return self._cert_stack_to_cryptography_list(cert_stack)
2946 return self._cert_stack_to_list(cert_stack)
2948 def want_read(self) -> bool:
2949 """
2950 Checks if more data has to be read from the transport layer to complete
2951 an operation.
2953 :return: True iff more data has to be read
2954 """
2955 return _lib.SSL_want_read(self._ssl)
2957 def want_write(self) -> bool:
2958 """
2959 Checks if there is data to write to the transport layer to complete an
2960 operation.
2962 :return: True iff there is data to write
2963 """
2964 return _lib.SSL_want_write(self._ssl)
2966 def set_accept_state(self) -> None:
2967 """
2968 Set the connection to work in server mode. The handshake will be
2969 handled automatically by read/write.
2971 :return: None
2972 """
2973 _lib.SSL_set_accept_state(self._ssl)
2975 def set_connect_state(self) -> None:
2976 """
2977 Set the connection to work in client mode. The handshake will be
2978 handled automatically by read/write.
2980 :return: None
2981 """
2982 _lib.SSL_set_connect_state(self._ssl)
2984 def get_session(self) -> Session | None:
2985 """
2986 Returns the Session currently used.
2988 :return: An instance of :class:`OpenSSL.SSL.Session` or
2989 :obj:`None` if no session exists.
2991 .. versionadded:: 0.14
2992 """
2993 session = _lib.SSL_get1_session(self._ssl)
2994 if session == _ffi.NULL:
2995 return None
2997 pysession = Session.__new__(Session)
2998 pysession._session = _ffi.gc(session, _lib.SSL_SESSION_free)
2999 return pysession
3001 def set_session(self, session: Session) -> None:
3002 """
3003 Set the session to be used when the TLS/SSL connection is established.
3005 :param session: A Session instance representing the session to use.
3006 :returns: None
3008 .. versionadded:: 0.14
3009 """
3010 if not isinstance(session, Session):
3011 raise TypeError("session must be a Session instance")
3013 result = _lib.SSL_set_session(self._ssl, session._session)
3014 _openssl_assert(result == 1)
3016 def _get_finished_message(
3017 self, function: Callable[[Any, Any, int], int]
3018 ) -> bytes | None:
3019 """
3020 Helper to implement :meth:`get_finished` and
3021 :meth:`get_peer_finished`.
3023 :param function: Either :data:`SSL_get_finished`: or
3024 :data:`SSL_get_peer_finished`.
3026 :return: :data:`None` if the desired message has not yet been
3027 received, otherwise the contents of the message.
3028 """
3029 # The OpenSSL documentation says nothing about what might happen if the
3030 # count argument given is zero. Specifically, it doesn't say whether
3031 # the output buffer may be NULL in that case or not. Inspection of the
3032 # implementation reveals that it calls memcpy() unconditionally.
3033 # Section 7.1.4, paragraph 1 of the C standard suggests that
3034 # memcpy(NULL, source, 0) is not guaranteed to produce defined (let
3035 # alone desirable) behavior (though it probably does on just about
3036 # every implementation...)
3037 #
3038 # Allocate a tiny buffer to pass in (instead of just passing NULL as
3039 # one might expect) for the initial call so as to be safe against this
3040 # potentially undefined behavior.
3041 empty = _ffi.new("char[]", 0)
3042 size = function(self._ssl, empty, 0)
3043 if size == 0:
3044 # No Finished message so far.
3045 return None
3047 buf = _no_zero_allocator("char[]", size)
3048 function(self._ssl, buf, size)
3049 return _ffi.buffer(buf, size)[:]
3051 def get_finished(self) -> bytes | None:
3052 """
3053 Obtain the latest TLS Finished message that we sent.
3055 :return: The contents of the message or :obj:`None` if the TLS
3056 handshake has not yet completed.
3058 .. versionadded:: 0.15
3059 """
3060 return self._get_finished_message(_lib.SSL_get_finished)
3062 def get_peer_finished(self) -> bytes | None:
3063 """
3064 Obtain the latest TLS Finished message that we received from the peer.
3066 :return: The contents of the message or :obj:`None` if the TLS
3067 handshake has not yet completed.
3069 .. versionadded:: 0.15
3070 """
3071 return self._get_finished_message(_lib.SSL_get_peer_finished)
3073 def get_cipher_name(self) -> str | None:
3074 """
3075 Obtain the name of the currently used cipher.
3077 :returns: The name of the currently used cipher or :obj:`None`
3078 if no connection has been established.
3080 .. versionadded:: 0.15
3081 """
3082 cipher = _lib.SSL_get_current_cipher(self._ssl)
3083 if cipher == _ffi.NULL:
3084 return None
3085 else:
3086 name = _ffi.string(_lib.SSL_CIPHER_get_name(cipher))
3087 return name.decode("utf-8")
3089 def get_cipher_bits(self) -> int | None:
3090 """
3091 Obtain the number of secret bits of the currently used cipher.
3093 :returns: The number of secret bits of the currently used cipher
3094 or :obj:`None` if no connection has been established.
3096 .. versionadded:: 0.15
3097 """
3098 cipher = _lib.SSL_get_current_cipher(self._ssl)
3099 if cipher == _ffi.NULL:
3100 return None
3101 else:
3102 return _lib.SSL_CIPHER_get_bits(cipher, _ffi.NULL)
3104 def get_cipher_version(self) -> str | None:
3105 """
3106 Obtain the protocol version of the currently used cipher.
3108 :returns: The protocol name of the currently used cipher
3109 or :obj:`None` if no connection has been established.
3111 .. versionadded:: 0.15
3112 """
3113 cipher = _lib.SSL_get_current_cipher(self._ssl)
3114 if cipher == _ffi.NULL:
3115 return None
3116 else:
3117 version = _ffi.string(_lib.SSL_CIPHER_get_version(cipher))
3118 return version.decode("utf-8")
3120 def get_protocol_version_name(self) -> str:
3121 """
3122 Retrieve the protocol version of the current connection.
3124 :returns: The TLS version of the current connection, for example
3125 the value for TLS 1.2 would be ``TLSv1.2``or ``Unknown``
3126 for connections that were not successfully established.
3127 """
3128 version = _ffi.string(_lib.SSL_get_version(self._ssl))
3129 return version.decode("utf-8")
3131 def get_protocol_version(self) -> int:
3132 """
3133 Retrieve the SSL or TLS protocol version of the current connection.
3135 :returns: The TLS version of the current connection. For example,
3136 it will return ``0x769`` for connections made over TLS version 1.
3137 """
3138 version = _lib.SSL_version(self._ssl)
3139 return version
3141 def set_alpn_protos(self, protos: list[bytes]) -> None:
3142 """
3143 Specify the client's ALPN protocol list.
3145 These protocols are offered to the server during protocol negotiation.
3147 :param protos: A list of the protocols to be offered to the server.
3148 This list should be a Python list of bytestrings representing the
3149 protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``.
3150 """
3151 # Different versions of OpenSSL are inconsistent about how they handle
3152 # empty proto lists (see #1043), so we avoid the problem entirely by
3153 # rejecting them ourselves.
3154 if not protos:
3155 raise ValueError("at least one protocol must be specified")
3157 # Take the list of protocols and join them together, prefixing them
3158 # with their lengths.
3159 protostr = b"".join(
3160 chain.from_iterable((bytes((len(p),)), p) for p in protos)
3161 )
3163 # Build a C string from the list. We don't need to save this off
3164 # because OpenSSL immediately copies the data out.
3165 input_str = _ffi.new("unsigned char[]", protostr)
3167 # https://www.openssl.org/docs/man1.1.0/man3/SSL_CTX_set_alpn_protos.html:
3168 # SSL_CTX_set_alpn_protos() and SSL_set_alpn_protos()
3169 # return 0 on success, and non-0 on failure.
3170 # WARNING: these functions reverse the return value convention.
3171 _openssl_assert(
3172 _lib.SSL_set_alpn_protos(self._ssl, input_str, len(protostr)) == 0
3173 )
3175 def get_alpn_proto_negotiated(self) -> bytes:
3176 """
3177 Get the protocol that was negotiated by ALPN.
3179 :returns: A bytestring of the protocol name. If no protocol has been
3180 negotiated yet, returns an empty bytestring.
3181 """
3182 data = _ffi.new("unsigned char **")
3183 data_len = _ffi.new("unsigned int *")
3185 _lib.SSL_get0_alpn_selected(self._ssl, data, data_len)
3187 if not data_len:
3188 return b""
3190 return _ffi.buffer(data[0], data_len[0])[:]
3192 def get_selected_srtp_profile(self) -> bytes:
3193 """
3194 Get the SRTP protocol which was negotiated.
3196 :returns: A bytestring of the SRTP profile name. If no profile has been
3197 negotiated yet, returns an empty bytestring.
3198 """
3199 profile = _lib.SSL_get_selected_srtp_profile(self._ssl)
3200 if not profile:
3201 return b""
3203 return _ffi.string(profile.name)
3205 def request_ocsp(self) -> None:
3206 """
3207 Called to request that the server sends stapled OCSP data, if
3208 available. If this is not called on the client side then the server
3209 will not send OCSP data. Should be used in conjunction with
3210 :meth:`Context.set_ocsp_client_callback`.
3211 """
3212 rc = _lib.SSL_set_tlsext_status_type(
3213 self._ssl, _lib.TLSEXT_STATUSTYPE_ocsp
3214 )
3215 _openssl_assert(rc == 1)
3217 def set_info_callback(
3218 self, callback: Callable[[Connection, int, int], None]
3219 ) -> None:
3220 """
3221 Set the information callback to *callback*. This function will be
3222 called from time to time during SSL handshakes.
3224 :param callback: The Python callback to use. This should take three
3225 arguments: a Connection object and two integers. The first integer
3226 specifies where in the SSL handshake the function was called, and
3227 the other the return code from a (possibly failed) internal
3228 function call.
3229 :return: None
3230 """
3232 @wraps(callback)
3233 def wrapper(ssl, where, return_code): # type: ignore[no-untyped-def]
3234 callback(Connection._reverse_mapping[ssl], where, return_code)
3236 self._info_callback = _ffi.callback(
3237 "void (*)(const SSL *, int, int)", wrapper
3238 )
3239 _lib.SSL_set_info_callback(self._ssl, self._info_callback)