Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/OpenSSL/SSL.py: 35%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import os
4import socket
5import sys
6import typing
7import warnings
8from collections.abc import Sequence
9from errno import errorcode
10from functools import partial, wraps
11from itertools import chain, count
12from sys import platform
13from typing import Any, Callable, Optional, TypeVar
14from weakref import WeakValueDictionary
16from cryptography import x509
17from cryptography.hazmat.primitives.asymmetric import ec
19from OpenSSL._util import (
20 StrOrBytesPath as _StrOrBytesPath,
21)
22from OpenSSL._util import (
23 exception_from_error_queue as _exception_from_error_queue,
24)
25from OpenSSL._util import (
26 ffi as _ffi,
27)
28from OpenSSL._util import (
29 lib as _lib,
30)
31from OpenSSL._util import (
32 make_assert as _make_assert,
33)
34from OpenSSL._util import (
35 no_zero_allocator as _no_zero_allocator,
36)
37from OpenSSL._util import (
38 path_bytes as _path_bytes,
39)
40from OpenSSL._util import (
41 text_to_bytes_and_warn as _text_to_bytes_and_warn,
42)
43from OpenSSL.crypto import (
44 FILETYPE_PEM,
45 X509,
46 PKey,
47 X509Name,
48 X509Store,
49 _EllipticCurve,
50 _PassphraseHelper,
51 _PrivateKey,
52)
54__all__ = [
55 "DTLS_CLIENT_METHOD",
56 "DTLS_METHOD",
57 "DTLS_SERVER_METHOD",
58 "MODE_RELEASE_BUFFERS",
59 "NO_OVERLAPPING_PROTOCOLS",
60 "OPENSSL_BUILT_ON",
61 "OPENSSL_CFLAGS",
62 "OPENSSL_DIR",
63 "OPENSSL_PLATFORM",
64 "OPENSSL_VERSION",
65 "OPENSSL_VERSION_NUMBER",
66 "OP_ALL",
67 "OP_CIPHER_SERVER_PREFERENCE",
68 "OP_DONT_INSERT_EMPTY_FRAGMENTS",
69 "OP_EPHEMERAL_RSA",
70 "OP_MICROSOFT_BIG_SSLV3_BUFFER",
71 "OP_MICROSOFT_SESS_ID_BUG",
72 "OP_MSIE_SSLV2_RSA_PADDING",
73 "OP_NETSCAPE_CA_DN_BUG",
74 "OP_NETSCAPE_CHALLENGE_BUG",
75 "OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG",
76 "OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG",
77 "OP_NO_COMPRESSION",
78 "OP_NO_QUERY_MTU",
79 "OP_NO_TICKET",
80 "OP_PKCS1_CHECK_1",
81 "OP_PKCS1_CHECK_2",
82 "OP_SINGLE_DH_USE",
83 "OP_SINGLE_ECDH_USE",
84 "OP_SSLEAY_080_CLIENT_DH_BUG",
85 "OP_SSLREF2_REUSE_CERT_TYPE_BUG",
86 "OP_TLS_BLOCK_PADDING_BUG",
87 "OP_TLS_D5_BUG",
88 "OP_TLS_ROLLBACK_BUG",
89 "RECEIVED_SHUTDOWN",
90 "SENT_SHUTDOWN",
91 "SESS_CACHE_BOTH",
92 "SESS_CACHE_CLIENT",
93 "SESS_CACHE_NO_AUTO_CLEAR",
94 "SESS_CACHE_NO_INTERNAL",
95 "SESS_CACHE_NO_INTERNAL_LOOKUP",
96 "SESS_CACHE_NO_INTERNAL_STORE",
97 "SESS_CACHE_OFF",
98 "SESS_CACHE_SERVER",
99 "SSL3_VERSION",
100 "SSLEAY_BUILT_ON",
101 "SSLEAY_CFLAGS",
102 "SSLEAY_DIR",
103 "SSLEAY_PLATFORM",
104 "SSLEAY_VERSION",
105 "SSL_CB_ACCEPT_EXIT",
106 "SSL_CB_ACCEPT_LOOP",
107 "SSL_CB_ALERT",
108 "SSL_CB_CONNECT_EXIT",
109 "SSL_CB_CONNECT_LOOP",
110 "SSL_CB_EXIT",
111 "SSL_CB_HANDSHAKE_DONE",
112 "SSL_CB_HANDSHAKE_START",
113 "SSL_CB_LOOP",
114 "SSL_CB_READ",
115 "SSL_CB_READ_ALERT",
116 "SSL_CB_WRITE",
117 "SSL_CB_WRITE_ALERT",
118 "SSL_ST_ACCEPT",
119 "SSL_ST_CONNECT",
120 "SSL_ST_MASK",
121 "TLS1_1_VERSION",
122 "TLS1_2_VERSION",
123 "TLS1_3_VERSION",
124 "TLS1_VERSION",
125 "TLS_CLIENT_METHOD",
126 "TLS_METHOD",
127 "TLS_SERVER_METHOD",
128 "VERIFY_CLIENT_ONCE",
129 "VERIFY_FAIL_IF_NO_PEER_CERT",
130 "VERIFY_NONE",
131 "VERIFY_PEER",
132 "Connection",
133 "Context",
134 "Error",
135 "OP_NO_SSLv2",
136 "OP_NO_SSLv3",
137 "OP_NO_TLSv1",
138 "OP_NO_TLSv1_1",
139 "OP_NO_TLSv1_2",
140 "OP_NO_TLSv1_3",
141 "SSLeay_version",
142 "SSLv23_METHOD",
143 "Session",
144 "SysCallError",
145 "TLSv1_1_METHOD",
146 "TLSv1_2_METHOD",
147 "TLSv1_METHOD",
148 "WantReadError",
149 "WantWriteError",
150 "WantX509LookupError",
151 "X509VerificationCodes",
152 "ZeroReturnError",
153]
156OPENSSL_VERSION_NUMBER: int = _lib.OPENSSL_VERSION_NUMBER
157OPENSSL_VERSION: int = _lib.OPENSSL_VERSION
158OPENSSL_CFLAGS: int = _lib.OPENSSL_CFLAGS
159OPENSSL_PLATFORM: int = _lib.OPENSSL_PLATFORM
160OPENSSL_DIR: int = _lib.OPENSSL_DIR
161OPENSSL_BUILT_ON: int = _lib.OPENSSL_BUILT_ON
163SSLEAY_VERSION = OPENSSL_VERSION
164SSLEAY_CFLAGS = OPENSSL_CFLAGS
165SSLEAY_PLATFORM = OPENSSL_PLATFORM
166SSLEAY_DIR = OPENSSL_DIR
167SSLEAY_BUILT_ON = OPENSSL_BUILT_ON
169SENT_SHUTDOWN = _lib.SSL_SENT_SHUTDOWN
170RECEIVED_SHUTDOWN = _lib.SSL_RECEIVED_SHUTDOWN
172SSLv23_METHOD = 3
173TLSv1_METHOD = 4
174TLSv1_1_METHOD = 5
175TLSv1_2_METHOD = 6
176TLS_METHOD = 7
177TLS_SERVER_METHOD = 8
178TLS_CLIENT_METHOD = 9
179DTLS_METHOD = 10
180DTLS_SERVER_METHOD = 11
181DTLS_CLIENT_METHOD = 12
183SSL3_VERSION: int = _lib.SSL3_VERSION
184TLS1_VERSION: int = _lib.TLS1_VERSION
185TLS1_1_VERSION: int = _lib.TLS1_1_VERSION
186TLS1_2_VERSION: int = _lib.TLS1_2_VERSION
187TLS1_3_VERSION: int = _lib.TLS1_3_VERSION
189OP_NO_SSLv2: int = _lib.SSL_OP_NO_SSLv2
190OP_NO_SSLv3: int = _lib.SSL_OP_NO_SSLv3
191OP_NO_TLSv1: int = _lib.SSL_OP_NO_TLSv1
192OP_NO_TLSv1_1: int = _lib.SSL_OP_NO_TLSv1_1
193OP_NO_TLSv1_2: int = _lib.SSL_OP_NO_TLSv1_2
194OP_NO_TLSv1_3: int = _lib.SSL_OP_NO_TLSv1_3
196MODE_RELEASE_BUFFERS: int = _lib.SSL_MODE_RELEASE_BUFFERS
198OP_SINGLE_DH_USE: int = _lib.SSL_OP_SINGLE_DH_USE
199OP_SINGLE_ECDH_USE: int = _lib.SSL_OP_SINGLE_ECDH_USE
200OP_EPHEMERAL_RSA: int = _lib.SSL_OP_EPHEMERAL_RSA
201OP_MICROSOFT_SESS_ID_BUG: int = _lib.SSL_OP_MICROSOFT_SESS_ID_BUG
202OP_NETSCAPE_CHALLENGE_BUG: int = _lib.SSL_OP_NETSCAPE_CHALLENGE_BUG
203OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG: int = (
204 _lib.SSL_OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG
205)
206OP_SSLREF2_REUSE_CERT_TYPE_BUG: int = _lib.SSL_OP_SSLREF2_REUSE_CERT_TYPE_BUG
207OP_MICROSOFT_BIG_SSLV3_BUFFER: int = _lib.SSL_OP_MICROSOFT_BIG_SSLV3_BUFFER
208OP_MSIE_SSLV2_RSA_PADDING: int = _lib.SSL_OP_MSIE_SSLV2_RSA_PADDING
209OP_SSLEAY_080_CLIENT_DH_BUG: int = _lib.SSL_OP_SSLEAY_080_CLIENT_DH_BUG
210OP_TLS_D5_BUG: int = _lib.SSL_OP_TLS_D5_BUG
211OP_TLS_BLOCK_PADDING_BUG: int = _lib.SSL_OP_TLS_BLOCK_PADDING_BUG
212OP_DONT_INSERT_EMPTY_FRAGMENTS: int = _lib.SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS
213OP_CIPHER_SERVER_PREFERENCE: int = _lib.SSL_OP_CIPHER_SERVER_PREFERENCE
214OP_TLS_ROLLBACK_BUG: int = _lib.SSL_OP_TLS_ROLLBACK_BUG
215OP_PKCS1_CHECK_1 = _lib.SSL_OP_PKCS1_CHECK_1
216OP_PKCS1_CHECK_2: int = _lib.SSL_OP_PKCS1_CHECK_2
217OP_NETSCAPE_CA_DN_BUG: int = _lib.SSL_OP_NETSCAPE_CA_DN_BUG
218OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG: int = (
219 _lib.SSL_OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG
220)
221OP_NO_COMPRESSION: int = _lib.SSL_OP_NO_COMPRESSION
223OP_NO_QUERY_MTU: int = _lib.SSL_OP_NO_QUERY_MTU
224try:
225 OP_COOKIE_EXCHANGE: int | None = _lib.SSL_OP_COOKIE_EXCHANGE
226 __all__.append("OP_COOKIE_EXCHANGE")
227except AttributeError:
228 OP_COOKIE_EXCHANGE = None
229OP_NO_TICKET: int = _lib.SSL_OP_NO_TICKET
231try:
232 OP_NO_RENEGOTIATION: int = _lib.SSL_OP_NO_RENEGOTIATION
233 __all__.append("OP_NO_RENEGOTIATION")
234except AttributeError:
235 pass
237try:
238 OP_IGNORE_UNEXPECTED_EOF: int = _lib.SSL_OP_IGNORE_UNEXPECTED_EOF
239 __all__.append("OP_IGNORE_UNEXPECTED_EOF")
240except AttributeError:
241 pass
243try:
244 OP_LEGACY_SERVER_CONNECT: int = _lib.SSL_OP_LEGACY_SERVER_CONNECT
245 __all__.append("OP_LEGACY_SERVER_CONNECT")
246except AttributeError:
247 pass
249OP_ALL: int = _lib.SSL_OP_ALL
251VERIFY_PEER: int = _lib.SSL_VERIFY_PEER
252VERIFY_FAIL_IF_NO_PEER_CERT: int = _lib.SSL_VERIFY_FAIL_IF_NO_PEER_CERT
253VERIFY_CLIENT_ONCE: int = _lib.SSL_VERIFY_CLIENT_ONCE
254VERIFY_NONE: int = _lib.SSL_VERIFY_NONE
256SESS_CACHE_OFF: int = _lib.SSL_SESS_CACHE_OFF
257SESS_CACHE_CLIENT: int = _lib.SSL_SESS_CACHE_CLIENT
258SESS_CACHE_SERVER: int = _lib.SSL_SESS_CACHE_SERVER
259SESS_CACHE_BOTH: int = _lib.SSL_SESS_CACHE_BOTH
260SESS_CACHE_NO_AUTO_CLEAR: int = _lib.SSL_SESS_CACHE_NO_AUTO_CLEAR
261SESS_CACHE_NO_INTERNAL_LOOKUP: int = _lib.SSL_SESS_CACHE_NO_INTERNAL_LOOKUP
262SESS_CACHE_NO_INTERNAL_STORE: int = _lib.SSL_SESS_CACHE_NO_INTERNAL_STORE
263SESS_CACHE_NO_INTERNAL: int = _lib.SSL_SESS_CACHE_NO_INTERNAL
265SSL_ST_CONNECT: int = _lib.SSL_ST_CONNECT
266SSL_ST_ACCEPT: int = _lib.SSL_ST_ACCEPT
267SSL_ST_MASK: int = _lib.SSL_ST_MASK
269SSL_CB_LOOP: int = _lib.SSL_CB_LOOP
270SSL_CB_EXIT: int = _lib.SSL_CB_EXIT
271SSL_CB_READ: int = _lib.SSL_CB_READ
272SSL_CB_WRITE: int = _lib.SSL_CB_WRITE
273SSL_CB_ALERT: int = _lib.SSL_CB_ALERT
274SSL_CB_READ_ALERT: int = _lib.SSL_CB_READ_ALERT
275SSL_CB_WRITE_ALERT: int = _lib.SSL_CB_WRITE_ALERT
276SSL_CB_ACCEPT_LOOP: int = _lib.SSL_CB_ACCEPT_LOOP
277SSL_CB_ACCEPT_EXIT: int = _lib.SSL_CB_ACCEPT_EXIT
278SSL_CB_CONNECT_LOOP: int = _lib.SSL_CB_CONNECT_LOOP
279SSL_CB_CONNECT_EXIT: int = _lib.SSL_CB_CONNECT_EXIT
280SSL_CB_HANDSHAKE_START: int = _lib.SSL_CB_HANDSHAKE_START
281SSL_CB_HANDSHAKE_DONE: int = _lib.SSL_CB_HANDSHAKE_DONE
283_Buffer = typing.Union[bytes, bytearray, memoryview]
284_T = TypeVar("_T")
287class _NoOverlappingProtocols:
288 pass
291NO_OVERLAPPING_PROTOCOLS = _NoOverlappingProtocols()
293# Callback types.
294_ALPNSelectCallback = Callable[
295 [
296 "Connection",
297 typing.List[bytes],
298 ],
299 typing.Union[bytes, _NoOverlappingProtocols],
300]
301_CookieGenerateCallback = Callable[["Connection"], bytes]
302_CookieVerifyCallback = Callable[["Connection", bytes], bool]
303_OCSPClientCallback = Callable[["Connection", bytes, Optional[_T]], bool]
304_OCSPServerCallback = Callable[["Connection", Optional[_T]], bytes]
305_PassphraseCallback = Callable[[int, bool, Optional[_T]], bytes]
306_VerifyCallback = Callable[["Connection", X509, int, int, int], bool]
309class X509VerificationCodes:
310 """
311 Success and error codes for X509 verification, as returned by the
312 underlying ``X509_STORE_CTX_get_error()`` function and passed by pyOpenSSL
313 to verification callback functions.
315 See `OpenSSL Verification Errors
316 <https://www.openssl.org/docs/manmaster/man3/X509_verify_cert_error_string.html#ERROR-CODES>`_
317 for details.
318 """
320 OK = _lib.X509_V_OK
321 ERR_UNABLE_TO_GET_ISSUER_CERT = _lib.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT
322 ERR_UNABLE_TO_GET_CRL = _lib.X509_V_ERR_UNABLE_TO_GET_CRL
323 ERR_UNABLE_TO_DECRYPT_CERT_SIGNATURE = (
324 _lib.X509_V_ERR_UNABLE_TO_DECRYPT_CERT_SIGNATURE
325 )
326 ERR_UNABLE_TO_DECRYPT_CRL_SIGNATURE = (
327 _lib.X509_V_ERR_UNABLE_TO_DECRYPT_CRL_SIGNATURE
328 )
329 ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY = (
330 _lib.X509_V_ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY
331 )
332 ERR_CERT_SIGNATURE_FAILURE = _lib.X509_V_ERR_CERT_SIGNATURE_FAILURE
333 ERR_CRL_SIGNATURE_FAILURE = _lib.X509_V_ERR_CRL_SIGNATURE_FAILURE
334 ERR_CERT_NOT_YET_VALID = _lib.X509_V_ERR_CERT_NOT_YET_VALID
335 ERR_CERT_HAS_EXPIRED = _lib.X509_V_ERR_CERT_HAS_EXPIRED
336 ERR_CRL_NOT_YET_VALID = _lib.X509_V_ERR_CRL_NOT_YET_VALID
337 ERR_CRL_HAS_EXPIRED = _lib.X509_V_ERR_CRL_HAS_EXPIRED
338 ERR_ERROR_IN_CERT_NOT_BEFORE_FIELD = (
339 _lib.X509_V_ERR_ERROR_IN_CERT_NOT_BEFORE_FIELD
340 )
341 ERR_ERROR_IN_CERT_NOT_AFTER_FIELD = (
342 _lib.X509_V_ERR_ERROR_IN_CERT_NOT_AFTER_FIELD
343 )
344 ERR_ERROR_IN_CRL_LAST_UPDATE_FIELD = (
345 _lib.X509_V_ERR_ERROR_IN_CRL_LAST_UPDATE_FIELD
346 )
347 ERR_ERROR_IN_CRL_NEXT_UPDATE_FIELD = (
348 _lib.X509_V_ERR_ERROR_IN_CRL_NEXT_UPDATE_FIELD
349 )
350 ERR_OUT_OF_MEM = _lib.X509_V_ERR_OUT_OF_MEM
351 ERR_DEPTH_ZERO_SELF_SIGNED_CERT = (
352 _lib.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT
353 )
354 ERR_SELF_SIGNED_CERT_IN_CHAIN = _lib.X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN
355 ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY = (
356 _lib.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY
357 )
358 ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE = (
359 _lib.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE
360 )
361 ERR_CERT_CHAIN_TOO_LONG = _lib.X509_V_ERR_CERT_CHAIN_TOO_LONG
362 ERR_CERT_REVOKED = _lib.X509_V_ERR_CERT_REVOKED
363 ERR_INVALID_CA = _lib.X509_V_ERR_INVALID_CA
364 ERR_PATH_LENGTH_EXCEEDED = _lib.X509_V_ERR_PATH_LENGTH_EXCEEDED
365 ERR_INVALID_PURPOSE = _lib.X509_V_ERR_INVALID_PURPOSE
366 ERR_CERT_UNTRUSTED = _lib.X509_V_ERR_CERT_UNTRUSTED
367 ERR_CERT_REJECTED = _lib.X509_V_ERR_CERT_REJECTED
368 ERR_SUBJECT_ISSUER_MISMATCH = _lib.X509_V_ERR_SUBJECT_ISSUER_MISMATCH
369 ERR_AKID_SKID_MISMATCH = _lib.X509_V_ERR_AKID_SKID_MISMATCH
370 ERR_AKID_ISSUER_SERIAL_MISMATCH = (
371 _lib.X509_V_ERR_AKID_ISSUER_SERIAL_MISMATCH
372 )
373 ERR_KEYUSAGE_NO_CERTSIGN = _lib.X509_V_ERR_KEYUSAGE_NO_CERTSIGN
374 ERR_UNABLE_TO_GET_CRL_ISSUER = _lib.X509_V_ERR_UNABLE_TO_GET_CRL_ISSUER
375 ERR_UNHANDLED_CRITICAL_EXTENSION = (
376 _lib.X509_V_ERR_UNHANDLED_CRITICAL_EXTENSION
377 )
378 ERR_KEYUSAGE_NO_CRL_SIGN = _lib.X509_V_ERR_KEYUSAGE_NO_CRL_SIGN
379 ERR_UNHANDLED_CRITICAL_CRL_EXTENSION = (
380 _lib.X509_V_ERR_UNHANDLED_CRITICAL_CRL_EXTENSION
381 )
382 ERR_INVALID_NON_CA = _lib.X509_V_ERR_INVALID_NON_CA
383 ERR_PROXY_PATH_LENGTH_EXCEEDED = _lib.X509_V_ERR_PROXY_PATH_LENGTH_EXCEEDED
384 ERR_KEYUSAGE_NO_DIGITAL_SIGNATURE = (
385 _lib.X509_V_ERR_KEYUSAGE_NO_DIGITAL_SIGNATURE
386 )
387 ERR_PROXY_CERTIFICATES_NOT_ALLOWED = (
388 _lib.X509_V_ERR_PROXY_CERTIFICATES_NOT_ALLOWED
389 )
390 ERR_INVALID_EXTENSION = _lib.X509_V_ERR_INVALID_EXTENSION
391 ERR_INVALID_POLICY_EXTENSION = _lib.X509_V_ERR_INVALID_POLICY_EXTENSION
392 ERR_NO_EXPLICIT_POLICY = _lib.X509_V_ERR_NO_EXPLICIT_POLICY
393 ERR_DIFFERENT_CRL_SCOPE = _lib.X509_V_ERR_DIFFERENT_CRL_SCOPE
394 ERR_UNSUPPORTED_EXTENSION_FEATURE = (
395 _lib.X509_V_ERR_UNSUPPORTED_EXTENSION_FEATURE
396 )
397 ERR_UNNESTED_RESOURCE = _lib.X509_V_ERR_UNNESTED_RESOURCE
398 ERR_PERMITTED_VIOLATION = _lib.X509_V_ERR_PERMITTED_VIOLATION
399 ERR_EXCLUDED_VIOLATION = _lib.X509_V_ERR_EXCLUDED_VIOLATION
400 ERR_SUBTREE_MINMAX = _lib.X509_V_ERR_SUBTREE_MINMAX
401 ERR_UNSUPPORTED_CONSTRAINT_TYPE = (
402 _lib.X509_V_ERR_UNSUPPORTED_CONSTRAINT_TYPE
403 )
404 ERR_UNSUPPORTED_CONSTRAINT_SYNTAX = (
405 _lib.X509_V_ERR_UNSUPPORTED_CONSTRAINT_SYNTAX
406 )
407 ERR_UNSUPPORTED_NAME_SYNTAX = _lib.X509_V_ERR_UNSUPPORTED_NAME_SYNTAX
408 ERR_CRL_PATH_VALIDATION_ERROR = _lib.X509_V_ERR_CRL_PATH_VALIDATION_ERROR
409 ERR_HOSTNAME_MISMATCH = _lib.X509_V_ERR_HOSTNAME_MISMATCH
410 ERR_EMAIL_MISMATCH = _lib.X509_V_ERR_EMAIL_MISMATCH
411 ERR_IP_ADDRESS_MISMATCH = _lib.X509_V_ERR_IP_ADDRESS_MISMATCH
412 ERR_APPLICATION_VERIFICATION = _lib.X509_V_ERR_APPLICATION_VERIFICATION
415# Taken from https://golang.org/src/crypto/x509/root_linux.go
416_CERTIFICATE_FILE_LOCATIONS = [
417 "/etc/ssl/certs/ca-certificates.crt", # Debian/Ubuntu/Gentoo etc.
418 "/etc/pki/tls/certs/ca-bundle.crt", # Fedora/RHEL 6
419 "/etc/ssl/ca-bundle.pem", # OpenSUSE
420 "/etc/pki/tls/cacert.pem", # OpenELEC
421 "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem", # CentOS/RHEL 7
422]
424_CERTIFICATE_PATH_LOCATIONS = [
425 "/etc/ssl/certs", # SLES10/SLES11
426]
428# These values are compared to output from cffi's ffi.string so they must be
429# byte strings.
430_CRYPTOGRAPHY_MANYLINUX_CA_DIR = b"/opt/pyca/cryptography/openssl/certs"
431_CRYPTOGRAPHY_MANYLINUX_CA_FILE = b"/opt/pyca/cryptography/openssl/cert.pem"
434class Error(Exception):
435 """
436 An error occurred in an `OpenSSL.SSL` API.
437 """
440_raise_current_error = partial(_exception_from_error_queue, Error)
441_openssl_assert = _make_assert(Error)
444class WantReadError(Error):
445 pass
448class WantWriteError(Error):
449 pass
452class WantX509LookupError(Error):
453 pass
456class ZeroReturnError(Error):
457 pass
460class SysCallError(Error):
461 pass
464class _CallbackExceptionHelper:
465 """
466 A base class for wrapper classes that allow for intelligent exception
467 handling in OpenSSL callbacks.
469 :ivar list _problems: Any exceptions that occurred while executing in a
470 context where they could not be raised in the normal way. Typically
471 this is because OpenSSL has called into some Python code and requires a
472 return value. The exceptions are saved to be raised later when it is
473 possible to do so.
474 """
476 def __init__(self) -> None:
477 self._problems: list[Exception] = []
479 def raise_if_problem(self) -> None:
480 """
481 Raise an exception from the OpenSSL error queue or that was previously
482 captured whe running a callback.
483 """
484 if self._problems:
485 try:
486 _raise_current_error()
487 except Error:
488 pass
489 raise self._problems.pop(0)
492class _VerifyHelper(_CallbackExceptionHelper):
493 """
494 Wrap a callback such that it can be used as a certificate verification
495 callback.
496 """
498 def __init__(self, callback: _VerifyCallback) -> None:
499 _CallbackExceptionHelper.__init__(self)
501 @wraps(callback)
502 def wrapper(ok, store_ctx): # type: ignore[no-untyped-def]
503 x509 = _lib.X509_STORE_CTX_get_current_cert(store_ctx)
504 _lib.X509_up_ref(x509)
505 cert = X509._from_raw_x509_ptr(x509)
506 error_number = _lib.X509_STORE_CTX_get_error(store_ctx)
507 error_depth = _lib.X509_STORE_CTX_get_error_depth(store_ctx)
509 index = _lib.SSL_get_ex_data_X509_STORE_CTX_idx()
510 ssl = _lib.X509_STORE_CTX_get_ex_data(store_ctx, index)
511 connection = Connection._reverse_mapping[ssl]
513 try:
514 result = callback(
515 connection, cert, error_number, error_depth, ok
516 )
517 except Exception as e:
518 self._problems.append(e)
519 return 0
520 else:
521 if result:
522 _lib.X509_STORE_CTX_set_error(store_ctx, _lib.X509_V_OK)
523 return 1
524 else:
525 return 0
527 self.callback = _ffi.callback(
528 "int (*)(int, X509_STORE_CTX *)", wrapper
529 )
532class _ALPNSelectHelper(_CallbackExceptionHelper):
533 """
534 Wrap a callback such that it can be used as an ALPN selection callback.
535 """
537 def __init__(self, callback: _ALPNSelectCallback) -> None:
538 _CallbackExceptionHelper.__init__(self)
540 @wraps(callback)
541 def wrapper(ssl, out, outlen, in_, inlen, arg): # type: ignore[no-untyped-def]
542 try:
543 conn = Connection._reverse_mapping[ssl]
545 # The string passed to us is made up of multiple
546 # length-prefixed bytestrings. We need to split that into a
547 # list.
548 instr = _ffi.buffer(in_, inlen)[:]
549 protolist = []
550 while instr:
551 encoded_len = instr[0]
552 proto = instr[1 : encoded_len + 1]
553 protolist.append(proto)
554 instr = instr[encoded_len + 1 :]
556 # Call the callback
557 outbytes = callback(conn, protolist)
558 any_accepted = True
559 if outbytes is NO_OVERLAPPING_PROTOCOLS:
560 outbytes = b""
561 any_accepted = False
562 elif not isinstance(outbytes, bytes):
563 raise TypeError(
564 "ALPN callback must return a bytestring or the "
565 "special NO_OVERLAPPING_PROTOCOLS sentinel value."
566 )
568 # Save our callback arguments on the connection object to make
569 # sure that they don't get freed before OpenSSL can use them.
570 # Then, return them in the appropriate output parameters.
571 conn._alpn_select_callback_args = [
572 _ffi.new("unsigned char *", len(outbytes)),
573 _ffi.new("unsigned char[]", outbytes),
574 ]
575 outlen[0] = conn._alpn_select_callback_args[0][0]
576 out[0] = conn._alpn_select_callback_args[1]
577 if not any_accepted:
578 return _lib.SSL_TLSEXT_ERR_NOACK
579 return _lib.SSL_TLSEXT_ERR_OK
580 except Exception as e:
581 self._problems.append(e)
582 return _lib.SSL_TLSEXT_ERR_ALERT_FATAL
584 self.callback = _ffi.callback(
585 (
586 "int (*)(SSL *, unsigned char **, unsigned char *, "
587 "const unsigned char *, unsigned int, void *)"
588 ),
589 wrapper,
590 )
593class _OCSPServerCallbackHelper(_CallbackExceptionHelper):
594 """
595 Wrap a callback such that it can be used as an OCSP callback for the server
596 side.
598 Annoyingly, OpenSSL defines one OCSP callback but uses it in two different
599 ways. For servers, that callback is expected to retrieve some OCSP data and
600 hand it to OpenSSL, and may return only SSL_TLSEXT_ERR_OK,
601 SSL_TLSEXT_ERR_FATAL, and SSL_TLSEXT_ERR_NOACK. For clients, that callback
602 is expected to check the OCSP data, and returns a negative value on error,
603 0 if the response is not acceptable, or positive if it is. These are
604 mutually exclusive return code behaviours, and they mean that we need two
605 helpers so that we always return an appropriate error code if the user's
606 code throws an exception.
608 Given that we have to have two helpers anyway, these helpers are a bit more
609 helpery than most: specifically, they hide a few more of the OpenSSL
610 functions so that the user has an easier time writing these callbacks.
612 This helper implements the server side.
613 """
615 def __init__(self, callback: _OCSPServerCallback[Any]) -> None:
616 _CallbackExceptionHelper.__init__(self)
618 @wraps(callback)
619 def wrapper(ssl, cdata): # type: ignore[no-untyped-def]
620 try:
621 conn = Connection._reverse_mapping[ssl]
623 # Extract the data if any was provided.
624 if cdata != _ffi.NULL:
625 data = _ffi.from_handle(cdata)
626 else:
627 data = None
629 # Call the callback.
630 ocsp_data = callback(conn, data)
632 if not isinstance(ocsp_data, bytes):
633 raise TypeError("OCSP callback must return a bytestring.")
635 # If the OCSP data was provided, we will pass it to OpenSSL.
636 # However, we have an early exit here: if no OCSP data was
637 # provided we will just exit out and tell OpenSSL that there
638 # is nothing to do.
639 if not ocsp_data:
640 return 3 # SSL_TLSEXT_ERR_NOACK
642 # OpenSSL takes ownership of this data and expects it to have
643 # been allocated by OPENSSL_malloc.
644 ocsp_data_length = len(ocsp_data)
645 data_ptr = _lib.OPENSSL_malloc(ocsp_data_length)
646 _ffi.buffer(data_ptr, ocsp_data_length)[:] = ocsp_data
648 _lib.SSL_set_tlsext_status_ocsp_resp(
649 ssl, data_ptr, ocsp_data_length
650 )
652 return 0
653 except Exception as e:
654 self._problems.append(e)
655 return 2 # SSL_TLSEXT_ERR_ALERT_FATAL
657 self.callback = _ffi.callback("int (*)(SSL *, void *)", wrapper)
660class _OCSPClientCallbackHelper(_CallbackExceptionHelper):
661 """
662 Wrap a callback such that it can be used as an OCSP callback for the client
663 side.
665 Annoyingly, OpenSSL defines one OCSP callback but uses it in two different
666 ways. For servers, that callback is expected to retrieve some OCSP data and
667 hand it to OpenSSL, and may return only SSL_TLSEXT_ERR_OK,
668 SSL_TLSEXT_ERR_FATAL, and SSL_TLSEXT_ERR_NOACK. For clients, that callback
669 is expected to check the OCSP data, and returns a negative value on error,
670 0 if the response is not acceptable, or positive if it is. These are
671 mutually exclusive return code behaviours, and they mean that we need two
672 helpers so that we always return an appropriate error code if the user's
673 code throws an exception.
675 Given that we have to have two helpers anyway, these helpers are a bit more
676 helpery than most: specifically, they hide a few more of the OpenSSL
677 functions so that the user has an easier time writing these callbacks.
679 This helper implements the client side.
680 """
682 def __init__(self, callback: _OCSPClientCallback[Any]) -> None:
683 _CallbackExceptionHelper.__init__(self)
685 @wraps(callback)
686 def wrapper(ssl, cdata): # type: ignore[no-untyped-def]
687 try:
688 conn = Connection._reverse_mapping[ssl]
690 # Extract the data if any was provided.
691 if cdata != _ffi.NULL:
692 data = _ffi.from_handle(cdata)
693 else:
694 data = None
696 # Get the OCSP data.
697 ocsp_ptr = _ffi.new("unsigned char **")
698 ocsp_len = _lib.SSL_get_tlsext_status_ocsp_resp(ssl, ocsp_ptr)
699 if ocsp_len < 0:
700 # No OCSP data.
701 ocsp_data = b""
702 else:
703 # Copy the OCSP data, then pass it to the callback.
704 ocsp_data = _ffi.buffer(ocsp_ptr[0], ocsp_len)[:]
706 valid = callback(conn, ocsp_data, data)
708 # Return 1 on success or 0 on error.
709 return int(bool(valid))
711 except Exception as e:
712 self._problems.append(e)
713 # Return negative value if an exception is hit.
714 return -1
716 self.callback = _ffi.callback("int (*)(SSL *, void *)", wrapper)
719class _CookieGenerateCallbackHelper(_CallbackExceptionHelper):
720 def __init__(self, callback: _CookieGenerateCallback) -> None:
721 _CallbackExceptionHelper.__init__(self)
723 max_cookie_len = getattr(_lib, "DTLS1_COOKIE_LENGTH", 255)
725 @wraps(callback)
726 def wrapper(ssl, out, outlen): # type: ignore[no-untyped-def]
727 try:
728 conn = Connection._reverse_mapping[ssl]
729 cookie = callback(conn)
730 if len(cookie) > max_cookie_len:
731 raise ValueError(
732 f"Cookie too long (got {len(cookie)} bytes, "
733 f"max {max_cookie_len})"
734 )
735 out[0 : len(cookie)] = cookie
736 outlen[0] = len(cookie)
737 return 1
738 except Exception as e:
739 self._problems.append(e)
740 # "a zero return value can be used to abort the handshake"
741 return 0
743 self.callback = _ffi.callback(
744 "int (*)(SSL *, unsigned char *, unsigned int *)",
745 wrapper,
746 )
749class _CookieVerifyCallbackHelper(_CallbackExceptionHelper):
750 def __init__(self, callback: _CookieVerifyCallback) -> None:
751 _CallbackExceptionHelper.__init__(self)
753 @wraps(callback)
754 def wrapper(ssl, c_cookie, cookie_len): # type: ignore[no-untyped-def]
755 try:
756 conn = Connection._reverse_mapping[ssl]
757 return callback(conn, bytes(c_cookie[0:cookie_len]))
758 except Exception as e:
759 self._problems.append(e)
760 return 0
762 self.callback = _ffi.callback(
763 "int (*)(SSL *, unsigned char *, unsigned int)",
764 wrapper,
765 )
768def _asFileDescriptor(obj: Any) -> int:
769 fd = None
770 if not isinstance(obj, int):
771 meth = getattr(obj, "fileno", None)
772 if meth is not None:
773 obj = meth()
775 if isinstance(obj, int):
776 fd = obj
778 if not isinstance(fd, int):
779 raise TypeError("argument must be an int, or have a fileno() method.")
780 elif fd < 0:
781 raise ValueError(
782 f"file descriptor cannot be a negative integer ({fd:i})"
783 )
785 return fd
788def OpenSSL_version(type: int) -> bytes:
789 """
790 Return a string describing the version of OpenSSL in use.
792 :param type: One of the :const:`OPENSSL_` constants defined in this module.
793 """
794 return _ffi.string(_lib.OpenSSL_version(type))
797SSLeay_version = OpenSSL_version
800def _make_requires(flag: int, error: str) -> Callable[[_T], _T]:
801 """
802 Builds a decorator that ensures that functions that rely on OpenSSL
803 functions that are not present in this build raise NotImplementedError,
804 rather than AttributeError coming out of cryptography.
806 :param flag: A cryptography flag that guards the functions, e.g.
807 ``Cryptography_HAS_NEXTPROTONEG``.
808 :param error: The string to be used in the exception if the flag is false.
809 """
811 def _requires_decorator(func): # type: ignore[no-untyped-def]
812 if not flag:
814 @wraps(func)
815 def explode(*args, **kwargs): # type: ignore[no-untyped-def]
816 raise NotImplementedError(error)
818 return explode
819 else:
820 return func
822 return _requires_decorator
825_requires_keylog = _make_requires(
826 getattr(_lib, "Cryptography_HAS_KEYLOG", 0), "Key logging not available"
827)
829_requires_ssl_get0_group_name = _make_requires(
830 getattr(_lib, "Cryptography_HAS_SSL_GET0_GROUP_NAME", 0),
831 "Getting group name is not supported by the linked OpenSSL version",
832)
834_requires_ssl_cookie = _make_requires(
835 getattr(_lib, "Cryptography_HAS_SSL_COOKIE", 0),
836 "DTLS cookie support is not available",
837)
840class Session:
841 """
842 A class representing an SSL session. A session defines certain connection
843 parameters which may be re-used to speed up the setup of subsequent
844 connections.
846 .. versionadded:: 0.14
847 """
849 _session: Any
852F = TypeVar("F", bound=Callable[..., Any])
855def _require_not_used(f: F) -> F:
856 @wraps(f)
857 def inner(self: Context, *args: Any, **kwargs: Any) -> Any:
858 if self._used:
859 warnings.warn(
860 (
861 "Attempting to mutate a Context after a Connection was "
862 "created. In the future, this will raise an exception"
863 ),
864 DeprecationWarning,
865 stacklevel=2,
866 )
867 return f(self, *args, **kwargs)
869 return typing.cast(F, inner)
872class Context:
873 """
874 :class:`OpenSSL.SSL.Context` instances define the parameters for setting
875 up new SSL connections.
877 :param method: One of TLS_METHOD, TLS_CLIENT_METHOD, TLS_SERVER_METHOD,
878 DTLS_METHOD, DTLS_CLIENT_METHOD, or DTLS_SERVER_METHOD.
879 SSLv23_METHOD, TLSv1_METHOD, etc. are deprecated and should
880 not be used.
881 """
883 _methods: typing.ClassVar[
884 dict[int, tuple[Callable[[], Any], int | None]]
885 ] = {
886 SSLv23_METHOD: (_lib.TLS_method, None),
887 TLSv1_METHOD: (_lib.TLS_method, TLS1_VERSION),
888 TLSv1_1_METHOD: (_lib.TLS_method, TLS1_1_VERSION),
889 TLSv1_2_METHOD: (_lib.TLS_method, TLS1_2_VERSION),
890 TLS_METHOD: (_lib.TLS_method, None),
891 TLS_SERVER_METHOD: (_lib.TLS_server_method, None),
892 TLS_CLIENT_METHOD: (_lib.TLS_client_method, None),
893 DTLS_METHOD: (_lib.DTLS_method, None),
894 DTLS_SERVER_METHOD: (_lib.DTLS_server_method, None),
895 DTLS_CLIENT_METHOD: (_lib.DTLS_client_method, None),
896 }
898 def __init__(self, method: int) -> None:
899 if not isinstance(method, int):
900 raise TypeError("method must be an integer")
902 try:
903 method_func, version = self._methods[method]
904 except KeyError:
905 raise ValueError("No such protocol")
907 method_obj = method_func()
908 _openssl_assert(method_obj != _ffi.NULL)
910 context = _lib.SSL_CTX_new(method_obj)
911 _openssl_assert(context != _ffi.NULL)
912 context = _ffi.gc(context, _lib.SSL_CTX_free)
914 self._context = context
915 self._used = False
916 self._passphrase_helper: _PassphraseHelper | None = None
917 self._passphrase_callback: _PassphraseCallback[Any] | None = None
918 self._passphrase_userdata: Any | None = None
919 self._verify_helper: _VerifyHelper | None = None
920 self._verify_callback: _VerifyCallback | None = None
921 self._info_callback = None
922 self._keylog_callback = None
923 self._tlsext_servername_callback = None
924 self._app_data = None
925 self._alpn_select_helper: _ALPNSelectHelper | None = None
926 self._alpn_select_callback: _ALPNSelectCallback | None = None
927 self._ocsp_helper: (
928 _OCSPClientCallbackHelper | _OCSPServerCallbackHelper | None
929 ) = None
930 self._ocsp_callback: (
931 _OCSPClientCallback[Any] | _OCSPServerCallback[Any] | None
932 ) = None
933 self._ocsp_data: Any | None = None
934 self._cookie_generate_helper: _CookieGenerateCallbackHelper | None = (
935 None
936 )
937 self._cookie_verify_helper: _CookieVerifyCallbackHelper | None = None
939 self.set_mode(
940 _lib.SSL_MODE_ENABLE_PARTIAL_WRITE
941 | _lib.SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER
942 )
943 if version is not None:
944 self.set_min_proto_version(version)
945 self.set_max_proto_version(version)
947 @_require_not_used
948 def set_min_proto_version(self, version: int) -> None:
949 """
950 Set the minimum supported protocol version. Setting the minimum
951 version to 0 will enable protocol versions down to the lowest version
952 supported by the library.
954 If the underlying OpenSSL build is missing support for the selected
955 version, this method will raise an exception.
956 """
957 _openssl_assert(
958 _lib.SSL_CTX_set_min_proto_version(self._context, version) == 1
959 )
961 @_require_not_used
962 def set_max_proto_version(self, version: int) -> None:
963 """
964 Set the maximum supported protocol version. Setting the maximum
965 version to 0 will enable protocol versions up to the highest version
966 supported by the library.
968 If the underlying OpenSSL build is missing support for the selected
969 version, this method will raise an exception.
970 """
971 _openssl_assert(
972 _lib.SSL_CTX_set_max_proto_version(self._context, version) == 1
973 )
975 @_require_not_used
976 def load_verify_locations(
977 self,
978 cafile: _StrOrBytesPath | None,
979 capath: _StrOrBytesPath | None = None,
980 ) -> None:
981 """
982 Let SSL know where we can find trusted certificates for the certificate
983 chain. Note that the certificates have to be in PEM format.
985 If capath is passed, it must be a directory prepared using the
986 ``c_rehash`` tool included with OpenSSL. Either, but not both, of
987 *pemfile* or *capath* may be :data:`None`.
989 :param cafile: In which file we can find the certificates (``bytes`` or
990 ``str``).
991 :param capath: In which directory we can find the certificates
992 (``bytes`` or ``str``).
994 :return: None
995 """
996 if cafile is None:
997 cafile = _ffi.NULL
998 else:
999 cafile = _path_bytes(cafile)
1001 if capath is None:
1002 capath = _ffi.NULL
1003 else:
1004 capath = _path_bytes(capath)
1006 load_result = _lib.SSL_CTX_load_verify_locations(
1007 self._context, cafile, capath
1008 )
1009 if not load_result:
1010 _raise_current_error()
1012 def _wrap_callback(
1013 self, callback: _PassphraseCallback[_T]
1014 ) -> _PassphraseHelper:
1015 @wraps(callback)
1016 def wrapper(size: int, verify: bool, userdata: Any) -> bytes:
1017 return callback(size, verify, self._passphrase_userdata)
1019 return _PassphraseHelper(
1020 FILETYPE_PEM, wrapper, more_args=True, truncate=True
1021 )
1023 @_require_not_used
1024 def set_passwd_cb(
1025 self,
1026 callback: _PassphraseCallback[_T],
1027 userdata: _T | None = None,
1028 ) -> None:
1029 """
1030 Set the passphrase callback. This function will be called
1031 when a private key with a passphrase is loaded.
1033 :param callback: The Python callback to use. This must accept three
1034 positional arguments. First, an integer giving the maximum length
1035 of the passphrase it may return. If the returned passphrase is
1036 longer than this, it will be truncated. Second, a boolean value
1037 which will be true if the user should be prompted for the
1038 passphrase twice and the callback should verify that the two values
1039 supplied are equal. Third, the value given as the *userdata*
1040 parameter to :meth:`set_passwd_cb`. The *callback* must return
1041 a byte string. If an error occurs, *callback* should return a false
1042 value (e.g. an empty string).
1043 :param userdata: (optional) A Python object which will be given as
1044 argument to the callback
1045 :return: None
1046 """
1047 if not callable(callback):
1048 raise TypeError("callback must be callable")
1050 self._passphrase_helper = self._wrap_callback(callback)
1051 self._passphrase_callback = self._passphrase_helper.callback
1052 _lib.SSL_CTX_set_default_passwd_cb(
1053 self._context, self._passphrase_callback
1054 )
1055 self._passphrase_userdata = userdata
1057 @_require_not_used
1058 def set_default_verify_paths(self) -> None:
1059 """
1060 Specify that the platform provided CA certificates are to be used for
1061 verification purposes. This method has some caveats related to the
1062 binary wheels that cryptography (pyOpenSSL's primary dependency) ships:
1064 * macOS will only load certificates using this method if the user has
1065 the ``openssl@3`` `Homebrew <https://brew.sh>`_ formula installed
1066 in the default location.
1067 * Windows will not work.
1068 * manylinux cryptography wheels will work on most common Linux
1069 distributions in pyOpenSSL 17.1.0 and above. pyOpenSSL detects the
1070 manylinux wheel and attempts to load roots via a fallback path.
1072 :return: None
1073 """
1074 # SSL_CTX_set_default_verify_paths will attempt to load certs from
1075 # both a cafile and capath that are set at compile time. However,
1076 # it will first check environment variables and, if present, load
1077 # those paths instead
1078 set_result = _lib.SSL_CTX_set_default_verify_paths(self._context)
1079 _openssl_assert(set_result == 1)
1080 # After attempting to set default_verify_paths we need to know whether
1081 # to go down the fallback path.
1082 # First we'll check to see if any env vars have been set. If so,
1083 # we won't try to do anything else because the user has set the path
1084 # themselves.
1085 if not self._check_env_vars_set("SSL_CERT_DIR", "SSL_CERT_FILE"):
1086 default_dir = _ffi.string(_lib.X509_get_default_cert_dir())
1087 default_file = _ffi.string(_lib.X509_get_default_cert_file())
1088 # Now we check to see if the default_dir and default_file are set
1089 # to the exact values we use in our manylinux builds. If they are
1090 # then we know to load the fallbacks
1091 if (
1092 default_dir == _CRYPTOGRAPHY_MANYLINUX_CA_DIR
1093 and default_file == _CRYPTOGRAPHY_MANYLINUX_CA_FILE
1094 ):
1095 # This is manylinux, let's load our fallback paths
1096 self._fallback_default_verify_paths(
1097 _CERTIFICATE_FILE_LOCATIONS, _CERTIFICATE_PATH_LOCATIONS
1098 )
1100 def _check_env_vars_set(self, dir_env_var: str, file_env_var: str) -> bool:
1101 """
1102 Check to see if the default cert dir/file environment vars are present.
1104 :return: bool
1105 """
1106 return (
1107 os.environ.get(file_env_var) is not None
1108 or os.environ.get(dir_env_var) is not None
1109 )
1111 def _fallback_default_verify_paths(
1112 self, file_path: list[str], dir_path: list[str]
1113 ) -> None:
1114 """
1115 Default verify paths are based on the compiled version of OpenSSL.
1116 However, when pyca/cryptography is compiled as a manylinux wheel
1117 that compiled location can potentially be wrong. So, like Go, we
1118 will try a predefined set of paths and attempt to load roots
1119 from there.
1121 :return: None
1122 """
1123 for cafile in file_path:
1124 if os.path.isfile(cafile):
1125 self.load_verify_locations(cafile)
1126 break
1128 for capath in dir_path:
1129 if os.path.isdir(capath):
1130 self.load_verify_locations(None, capath)
1131 break
1133 @_require_not_used
1134 def use_certificate_chain_file(self, certfile: _StrOrBytesPath) -> None:
1135 """
1136 Load a certificate chain from a file.
1138 :param certfile: The name of the certificate chain file (``bytes`` or
1139 ``str``). Must be PEM encoded.
1141 :return: None
1142 """
1143 certfile = _path_bytes(certfile)
1145 result = _lib.SSL_CTX_use_certificate_chain_file(
1146 self._context, certfile
1147 )
1148 if not result:
1149 _raise_current_error()
1151 @_require_not_used
1152 def use_certificate_file(
1153 self, certfile: _StrOrBytesPath, filetype: int = FILETYPE_PEM
1154 ) -> None:
1155 """
1156 Load a certificate from a file
1158 :param certfile: The name of the certificate file (``bytes`` or
1159 ``str``).
1160 :param filetype: (optional) The encoding of the file, which is either
1161 :const:`FILETYPE_PEM` or :const:`FILETYPE_ASN1`. The default is
1162 :const:`FILETYPE_PEM`.
1164 :return: None
1165 """
1166 certfile = _path_bytes(certfile)
1167 if not isinstance(filetype, int):
1168 raise TypeError("filetype must be an integer")
1170 use_result = _lib.SSL_CTX_use_certificate_file(
1171 self._context, certfile, filetype
1172 )
1173 if not use_result:
1174 _raise_current_error()
1176 @_require_not_used
1177 def use_certificate(self, cert: X509 | x509.Certificate) -> None:
1178 """
1179 Load a certificate from a X509 object
1181 :param cert: The X509 object
1182 :return: None
1183 """
1184 # Mirrored at Connection.use_certificate
1185 if not isinstance(cert, X509):
1186 cert = X509.from_cryptography(cert)
1187 else:
1188 warnings.warn(
1189 (
1190 "Passing pyOpenSSL X509 objects is deprecated. You "
1191 "should use a cryptography.x509.Certificate instead."
1192 ),
1193 DeprecationWarning,
1194 stacklevel=2,
1195 )
1197 use_result = _lib.SSL_CTX_use_certificate(self._context, cert._x509)
1198 if not use_result:
1199 _raise_current_error()
1201 @_require_not_used
1202 def add_extra_chain_cert(self, certobj: X509 | x509.Certificate) -> None:
1203 """
1204 Add certificate to chain
1206 :param certobj: The X509 certificate object to add to the chain
1207 :return: None
1208 """
1209 if not isinstance(certobj, X509):
1210 certobj = X509.from_cryptography(certobj)
1211 else:
1212 warnings.warn(
1213 (
1214 "Passing pyOpenSSL X509 objects is deprecated. You "
1215 "should use a cryptography.x509.Certificate instead."
1216 ),
1217 DeprecationWarning,
1218 stacklevel=2,
1219 )
1221 copy = _lib.X509_dup(certobj._x509)
1222 add_result = _lib.SSL_CTX_add_extra_chain_cert(self._context, copy)
1223 if not add_result:
1224 # TODO: This is untested.
1225 _lib.X509_free(copy)
1226 _raise_current_error()
1228 def _raise_passphrase_exception(self) -> None:
1229 if self._passphrase_helper is not None:
1230 self._passphrase_helper.raise_if_problem(Error)
1232 _raise_current_error()
1234 @_require_not_used
1235 def use_privatekey_file(
1236 self, keyfile: _StrOrBytesPath, filetype: int = FILETYPE_PEM
1237 ) -> None:
1238 """
1239 Load a private key from a file
1241 :param keyfile: The name of the key file (``bytes`` or ``str``)
1242 :param filetype: (optional) The encoding of the file, which is either
1243 :const:`FILETYPE_PEM` or :const:`FILETYPE_ASN1`. The default is
1244 :const:`FILETYPE_PEM`.
1246 :return: None
1247 """
1248 keyfile = _path_bytes(keyfile)
1250 if not isinstance(filetype, int):
1251 raise TypeError("filetype must be an integer")
1253 use_result = _lib.SSL_CTX_use_PrivateKey_file(
1254 self._context, keyfile, filetype
1255 )
1256 if not use_result:
1257 self._raise_passphrase_exception()
1259 @_require_not_used
1260 def use_privatekey(self, pkey: _PrivateKey | PKey) -> None:
1261 """
1262 Load a private key from a PKey object
1264 :param pkey: The PKey object
1265 :return: None
1266 """
1267 # Mirrored at Connection.use_privatekey
1268 if not isinstance(pkey, PKey):
1269 pkey = PKey.from_cryptography_key(pkey)
1270 else:
1271 warnings.warn(
1272 (
1273 "Passing pyOpenSSL PKey objects is deprecated. You "
1274 "should use a cryptography private key instead."
1275 ),
1276 DeprecationWarning,
1277 stacklevel=2,
1278 )
1280 use_result = _lib.SSL_CTX_use_PrivateKey(self._context, pkey._pkey)
1281 if not use_result:
1282 self._raise_passphrase_exception()
1284 def check_privatekey(self) -> None:
1285 """
1286 Check if the private key (loaded with :meth:`use_privatekey`) matches
1287 the certificate (loaded with :meth:`use_certificate`)
1289 :return: :data:`None` (raises :exc:`Error` if something's wrong)
1290 """
1291 if not _lib.SSL_CTX_check_private_key(self._context):
1292 _raise_current_error()
1294 @_require_not_used
1295 def load_client_ca(self, cafile: bytes) -> None:
1296 """
1297 Load the trusted certificates that will be sent to the client. Does
1298 not actually imply any of the certificates are trusted; that must be
1299 configured separately.
1301 :param bytes cafile: The path to a certificates file in PEM format.
1302 :return: None
1303 """
1304 ca_list = _lib.SSL_load_client_CA_file(
1305 _text_to_bytes_and_warn("cafile", cafile)
1306 )
1307 _openssl_assert(ca_list != _ffi.NULL)
1308 _lib.SSL_CTX_set_client_CA_list(self._context, ca_list)
1310 @_require_not_used
1311 def set_session_id(self, buf: bytes) -> None:
1312 """
1313 Set the session id to *buf* within which a session can be reused for
1314 this Context object. This is needed when doing session resumption,
1315 because there is no way for a stored session to know which Context
1316 object it is associated with.
1318 :param bytes buf: The session id.
1320 :returns: None
1321 """
1322 buf = _text_to_bytes_and_warn("buf", buf)
1323 _openssl_assert(
1324 _lib.SSL_CTX_set_session_id_context(self._context, buf, len(buf))
1325 == 1
1326 )
1328 @_require_not_used
1329 def set_session_cache_mode(self, mode: int) -> int:
1330 """
1331 Set the behavior of the session cache used by all connections using
1332 this Context. The previously set mode is returned. See
1333 :const:`SESS_CACHE_*` for details about particular modes.
1335 :param mode: One or more of the SESS_CACHE_* flags (combine using
1336 bitwise or)
1337 :returns: The previously set caching mode.
1339 .. versionadded:: 0.14
1340 """
1341 if not isinstance(mode, int):
1342 raise TypeError("mode must be an integer")
1344 return _lib.SSL_CTX_set_session_cache_mode(self._context, mode)
1346 def get_session_cache_mode(self) -> int:
1347 """
1348 Get the current session cache mode.
1350 :returns: The currently used cache mode.
1352 .. versionadded:: 0.14
1353 """
1354 return _lib.SSL_CTX_get_session_cache_mode(self._context)
1356 @_require_not_used
1357 def set_verify(
1358 self, mode: int, callback: _VerifyCallback | None = None
1359 ) -> None:
1360 """
1361 Set the verification flags for this Context object to *mode* and
1362 specify that *callback* should be used for verification callbacks.
1364 :param mode: The verify mode, this should be one of
1365 :const:`VERIFY_NONE` and :const:`VERIFY_PEER`. If
1366 :const:`VERIFY_PEER` is used, *mode* can be OR:ed with
1367 :const:`VERIFY_FAIL_IF_NO_PEER_CERT` and
1368 :const:`VERIFY_CLIENT_ONCE` to further control the behaviour.
1369 :param callback: The optional Python verification callback to use.
1370 This should take five arguments: A Connection object, an X509
1371 object, and three integer variables, which are in turn potential
1372 error number, error depth and return code. *callback* should
1373 return True if verification passes and False otherwise.
1374 If omitted, OpenSSL's default verification is used.
1375 :return: None
1377 See SSL_CTX_set_verify(3SSL) for further details.
1378 """
1379 if not isinstance(mode, int):
1380 raise TypeError("mode must be an integer")
1382 if callback is None:
1383 self._verify_helper = None
1384 self._verify_callback = None
1385 _lib.SSL_CTX_set_verify(self._context, mode, _ffi.NULL)
1386 else:
1387 if not callable(callback):
1388 raise TypeError("callback must be callable")
1390 self._verify_helper = _VerifyHelper(callback)
1391 self._verify_callback = self._verify_helper.callback
1392 _lib.SSL_CTX_set_verify(self._context, mode, self._verify_callback)
1394 @_require_not_used
1395 def set_verify_depth(self, depth: int) -> None:
1396 """
1397 Set the maximum depth for the certificate chain verification that shall
1398 be allowed for this Context object.
1400 :param depth: An integer specifying the verify depth
1401 :return: None
1402 """
1403 if not isinstance(depth, int):
1404 raise TypeError("depth must be an integer")
1406 _lib.SSL_CTX_set_verify_depth(self._context, depth)
1408 def get_verify_mode(self) -> int:
1409 """
1410 Retrieve the Context object's verify mode, as set by
1411 :meth:`set_verify`.
1413 :return: The verify mode
1414 """
1415 return _lib.SSL_CTX_get_verify_mode(self._context)
1417 def get_verify_depth(self) -> int:
1418 """
1419 Retrieve the Context object's verify depth, as set by
1420 :meth:`set_verify_depth`.
1422 :return: The verify depth
1423 """
1424 return _lib.SSL_CTX_get_verify_depth(self._context)
1426 @_require_not_used
1427 def load_tmp_dh(self, dhfile: _StrOrBytesPath) -> None:
1428 """
1429 Load parameters for Ephemeral Diffie-Hellman
1431 :param dhfile: The file to load EDH parameters from (``bytes`` or
1432 ``str``).
1434 :return: None
1435 """
1436 dhfile = _path_bytes(dhfile)
1438 bio = _lib.BIO_new_file(dhfile, b"r")
1439 if bio == _ffi.NULL:
1440 _raise_current_error()
1441 bio = _ffi.gc(bio, _lib.BIO_free)
1443 dh = _lib.PEM_read_bio_DHparams(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL)
1444 dh = _ffi.gc(dh, _lib.DH_free)
1445 res = _lib.SSL_CTX_set_tmp_dh(self._context, dh)
1446 _openssl_assert(res == 1)
1448 @_require_not_used
1449 def set_tmp_ecdh(self, curve: _EllipticCurve | ec.EllipticCurve) -> None:
1450 """
1451 Select a curve to use for ECDHE key exchange.
1453 :param curve: A curve instance from cryptography
1454 (:class:`~cryptogragraphy.hazmat.primitives.asymmetric.ec.EllipticCurve`).
1455 Alternatively (deprecated) a curve object from either
1456 :meth:`OpenSSL.crypto.get_elliptic_curve` or
1457 :meth:`OpenSSL.crypto.get_elliptic_curves`.
1459 :return: None
1460 """
1462 if isinstance(curve, _EllipticCurve):
1463 warnings.warn(
1464 (
1465 "Passing pyOpenSSL elliptic curves to set_tmp_ecdh is "
1466 "deprecated. You should use cryptography's elliptic curve "
1467 "types instead."
1468 ),
1469 DeprecationWarning,
1470 stacklevel=2,
1471 )
1472 _lib.SSL_CTX_set_tmp_ecdh(self._context, curve._to_EC_KEY())
1473 else:
1474 name = curve.name
1475 if name == "secp192r1":
1476 name = "prime192v1"
1477 elif name == "secp256r1":
1478 name = "prime256v1"
1479 nid = _lib.OBJ_txt2nid(name.encode())
1480 if nid == _lib.NID_undef:
1481 _raise_current_error()
1483 ec = _lib.EC_KEY_new_by_curve_name(nid)
1484 _openssl_assert(ec != _ffi.NULL)
1485 ec = _ffi.gc(ec, _lib.EC_KEY_free)
1486 _lib.SSL_CTX_set_tmp_ecdh(self._context, ec)
1488 @_require_not_used
1489 def set_cipher_list(self, cipher_list: bytes) -> None:
1490 """
1491 Set the list of ciphers to be used in this context.
1493 See the OpenSSL manual for more information (e.g.
1494 :manpage:`ciphers(1)`).
1496 Note this API does not change the cipher suites used in TLS 1.3
1497 Use `set_tls13_ciphersuites` for that.
1499 :param bytes cipher_list: An OpenSSL cipher string.
1500 :return: None
1501 """
1502 cipher_list = _text_to_bytes_and_warn("cipher_list", cipher_list)
1504 if not isinstance(cipher_list, bytes):
1505 raise TypeError("cipher_list must be a byte string.")
1507 _openssl_assert(
1508 _lib.SSL_CTX_set_cipher_list(self._context, cipher_list) == 1
1509 )
1511 @_require_not_used
1512 def set_tls13_ciphersuites(self, ciphersuites: bytes) -> None:
1513 """
1514 Set the list of TLS 1.3 ciphers to be used in this context.
1515 OpenSSL maintains a separate list of TLS 1.3+ ciphers to
1516 ciphers for TLS 1.2 and lowers.
1518 See the OpenSSL manual for more information (e.g.
1519 :manpage:`ciphers(1)`).
1521 :param bytes ciphersuites: An OpenSSL cipher string containing
1522 TLS 1.3+ ciphersuites.
1523 :return: None
1525 .. versionadded:: 25.2.0
1526 """
1527 if not isinstance(ciphersuites, bytes):
1528 raise TypeError("ciphersuites must be a byte string.")
1530 _openssl_assert(
1531 _lib.SSL_CTX_set_ciphersuites(self._context, ciphersuites) == 1
1532 )
1534 @_require_not_used
1535 def set_client_ca_list(
1536 self, certificate_authorities: Sequence[X509Name]
1537 ) -> None:
1538 """
1539 Set the list of preferred client certificate signers for this server
1540 context.
1542 This list of certificate authorities will be sent to the client when
1543 the server requests a client certificate.
1545 :param certificate_authorities: a sequence of X509Names.
1546 :return: None
1548 .. versionadded:: 0.10
1549 """
1550 name_stack = _lib.sk_X509_NAME_new_null()
1551 _openssl_assert(name_stack != _ffi.NULL)
1553 try:
1554 for ca_name in certificate_authorities:
1555 if not isinstance(ca_name, X509Name):
1556 raise TypeError(
1557 f"client CAs must be X509Name objects, not "
1558 f"{type(ca_name).__name__} objects"
1559 )
1560 copy = _lib.X509_NAME_dup(ca_name._name)
1561 _openssl_assert(copy != _ffi.NULL)
1562 push_result = _lib.sk_X509_NAME_push(name_stack, copy)
1563 if not push_result:
1564 _lib.X509_NAME_free(copy)
1565 _raise_current_error()
1566 except Exception:
1567 _lib.sk_X509_NAME_free(name_stack)
1568 raise
1570 _lib.SSL_CTX_set_client_CA_list(self._context, name_stack)
1572 @_require_not_used
1573 def add_client_ca(
1574 self, certificate_authority: X509 | x509.Certificate
1575 ) -> None:
1576 """
1577 Add the CA certificate to the list of preferred signers for this
1578 context.
1580 The list of certificate authorities will be sent to the client when the
1581 server requests a client certificate.
1583 :param certificate_authority: certificate authority's X509 certificate.
1584 :return: None
1586 .. versionadded:: 0.10
1587 """
1588 if not isinstance(certificate_authority, X509):
1589 certificate_authority = X509.from_cryptography(
1590 certificate_authority
1591 )
1592 else:
1593 warnings.warn(
1594 (
1595 "Passing pyOpenSSL X509 objects is deprecated. You "
1596 "should use a cryptography.x509.Certificate instead."
1597 ),
1598 DeprecationWarning,
1599 stacklevel=2,
1600 )
1602 add_result = _lib.SSL_CTX_add_client_CA(
1603 self._context, certificate_authority._x509
1604 )
1605 _openssl_assert(add_result == 1)
1607 @_require_not_used
1608 def set_timeout(self, timeout: int) -> None:
1609 """
1610 Set the timeout for newly created sessions for this Context object to
1611 *timeout*. The default value is 300 seconds. See the OpenSSL manual
1612 for more information (e.g. :manpage:`SSL_CTX_set_timeout(3)`).
1614 :param timeout: The timeout in (whole) seconds
1615 :return: The previous session timeout
1616 """
1617 if not isinstance(timeout, int):
1618 raise TypeError("timeout must be an integer")
1620 return _lib.SSL_CTX_set_timeout(self._context, timeout)
1622 def get_timeout(self) -> int:
1623 """
1624 Retrieve session timeout, as set by :meth:`set_timeout`. The default
1625 is 300 seconds.
1627 :return: The session timeout
1628 """
1629 return _lib.SSL_CTX_get_timeout(self._context)
1631 @_require_not_used
1632 def set_info_callback(
1633 self, callback: Callable[[Connection, int, int], None]
1634 ) -> None:
1635 """
1636 Set the information callback to *callback*. This function will be
1637 called from time to time during SSL handshakes.
1639 :param callback: The Python callback to use. This should take three
1640 arguments: a Connection object and two integers. The first integer
1641 specifies where in the SSL handshake the function was called, and
1642 the other the return code from a (possibly failed) internal
1643 function call.
1644 :return: None
1645 """
1647 @wraps(callback)
1648 def wrapper(ssl, where, return_code): # type: ignore[no-untyped-def]
1649 callback(Connection._reverse_mapping[ssl], where, return_code)
1651 self._info_callback = _ffi.callback(
1652 "void (*)(const SSL *, int, int)", wrapper
1653 )
1654 _lib.SSL_CTX_set_info_callback(self._context, self._info_callback)
1656 @_requires_keylog
1657 @_require_not_used
1658 def set_keylog_callback(
1659 self, callback: Callable[[Connection, bytes], None]
1660 ) -> None:
1661 """
1662 Set the TLS key logging callback to *callback*. This function will be
1663 called whenever TLS key material is generated or received, in order
1664 to allow applications to store this keying material for debugging
1665 purposes.
1667 :param callback: The Python callback to use. This should take two
1668 arguments: a Connection object and a bytestring that contains
1669 the key material in the format used by NSS for its SSLKEYLOGFILE
1670 debugging output.
1671 :return: None
1672 """
1674 @wraps(callback)
1675 def wrapper(ssl, line): # type: ignore[no-untyped-def]
1676 line = _ffi.string(line)
1677 callback(Connection._reverse_mapping[ssl], line)
1679 self._keylog_callback = _ffi.callback(
1680 "void (*)(const SSL *, const char *)", wrapper
1681 )
1682 _lib.SSL_CTX_set_keylog_callback(self._context, self._keylog_callback)
1684 def get_app_data(self) -> Any:
1685 """
1686 Get the application data (supplied via :meth:`set_app_data()`)
1688 :return: The application data
1689 """
1690 return self._app_data
1692 @_require_not_used
1693 def set_app_data(self, data: Any) -> None:
1694 """
1695 Set the application data (will be returned from get_app_data())
1697 :param data: Any Python object
1698 :return: None
1699 """
1700 self._app_data = data
1702 def get_cert_store(self) -> X509Store | None:
1703 """
1704 Get the certificate store for the context. This can be used to add
1705 "trusted" certificates without using the
1706 :meth:`load_verify_locations` method.
1708 :return: A X509Store object or None if it does not have one.
1709 """
1710 store = _lib.SSL_CTX_get_cert_store(self._context)
1711 if store == _ffi.NULL:
1712 # TODO: This is untested.
1713 return None
1715 pystore = X509Store.__new__(X509Store)
1716 pystore._store = store
1717 return pystore
1719 @_require_not_used
1720 def set_options(self, options: int) -> int:
1721 """
1722 Add options. Options set before are not cleared!
1723 This method should be used with the :const:`OP_*` constants.
1725 :param options: The options to add.
1726 :return: The new option bitmask.
1727 """
1728 if not isinstance(options, int):
1729 raise TypeError("options must be an integer")
1731 return _lib.SSL_CTX_set_options(self._context, options)
1733 @_require_not_used
1734 def set_mode(self, mode: int) -> int:
1735 """
1736 Add modes via bitmask. Modes set before are not cleared! This method
1737 should be used with the :const:`MODE_*` constants.
1739 :param mode: The mode to add.
1740 :return: The new mode bitmask.
1741 """
1742 if not isinstance(mode, int):
1743 raise TypeError("mode must be an integer")
1745 return _lib.SSL_CTX_set_mode(self._context, mode)
1747 @_require_not_used
1748 def clear_mode(self, mode_to_clear: int) -> int:
1749 """
1750 Modes previously set cannot be overwritten without being
1751 cleared first. This method should be used to clear existing modes.
1752 """
1753 return _lib.SSL_CTX_clear_mode(self._context, mode_to_clear)
1755 @_require_not_used
1756 def set_tlsext_servername_callback(
1757 self, callback: Callable[[Connection], None]
1758 ) -> None:
1759 """
1760 Specify a callback function to be called when clients specify a server
1761 name.
1763 :param callback: The callback function. It will be invoked with one
1764 argument, the Connection instance.
1766 .. versionadded:: 0.13
1767 """
1769 @wraps(callback)
1770 def wrapper(ssl, alert, arg): # type: ignore[no-untyped-def]
1771 try:
1772 callback(Connection._reverse_mapping[ssl])
1773 except Exception:
1774 sys.excepthook(*sys.exc_info())
1775 return _lib.SSL_TLSEXT_ERR_ALERT_FATAL
1776 return 0
1778 self._tlsext_servername_callback = _ffi.callback(
1779 "int (*)(SSL *, int *, void *)", wrapper
1780 )
1781 _lib.SSL_CTX_set_tlsext_servername_callback(
1782 self._context, self._tlsext_servername_callback
1783 )
1785 @_require_not_used
1786 def set_tlsext_use_srtp(self, profiles: bytes) -> None:
1787 """
1788 Enable support for negotiating SRTP keying material.
1790 :param bytes profiles: A colon delimited list of protection profile
1791 names, like ``b'SRTP_AES128_CM_SHA1_80:SRTP_AES128_CM_SHA1_32'``.
1792 :return: None
1793 """
1794 if not isinstance(profiles, bytes):
1795 raise TypeError("profiles must be a byte string.")
1797 _openssl_assert(
1798 _lib.SSL_CTX_set_tlsext_use_srtp(self._context, profiles) == 0
1799 )
1801 @_require_not_used
1802 def set_alpn_protos(self, protos: list[bytes]) -> None:
1803 """
1804 Specify the protocols that the client is prepared to speak after the
1805 TLS connection has been negotiated using Application Layer Protocol
1806 Negotiation.
1808 :param protos: A list of the protocols to be offered to the server.
1809 This list should be a Python list of bytestrings representing the
1810 protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``.
1811 """
1812 # Different versions of OpenSSL are inconsistent about how they handle
1813 # empty proto lists (see #1043), so we avoid the problem entirely by
1814 # rejecting them ourselves.
1815 if not protos:
1816 raise ValueError("at least one protocol must be specified")
1818 # Take the list of protocols and join them together, prefixing them
1819 # with their lengths.
1820 protostr = b"".join(
1821 chain.from_iterable((bytes((len(p),)), p) for p in protos)
1822 )
1824 # Build a C string from the list. We don't need to save this off
1825 # because OpenSSL immediately copies the data out.
1826 input_str = _ffi.new("unsigned char[]", protostr)
1828 # https://www.openssl.org/docs/man1.1.0/man3/SSL_CTX_set_alpn_protos.html:
1829 # SSL_CTX_set_alpn_protos() and SSL_set_alpn_protos()
1830 # return 0 on success, and non-0 on failure.
1831 # WARNING: these functions reverse the return value convention.
1832 _openssl_assert(
1833 _lib.SSL_CTX_set_alpn_protos(
1834 self._context, input_str, len(protostr)
1835 )
1836 == 0
1837 )
1839 @_require_not_used
1840 def set_alpn_select_callback(self, callback: _ALPNSelectCallback) -> None:
1841 """
1842 Specify a callback function that will be called on the server when a
1843 client offers protocols using ALPN.
1845 :param callback: The callback function. It will be invoked with two
1846 arguments: the Connection, and a list of offered protocols as
1847 bytestrings, e.g ``[b'http/1.1', b'spdy/2']``. It can return
1848 one of those bytestrings to indicate the chosen protocol, the
1849 empty bytestring to terminate the TLS connection, or the
1850 :py:obj:`NO_OVERLAPPING_PROTOCOLS` to indicate that no offered
1851 protocol was selected, but that the connection should not be
1852 aborted.
1853 """
1854 self._alpn_select_helper = _ALPNSelectHelper(callback)
1855 self._alpn_select_callback = self._alpn_select_helper.callback
1856 _lib.SSL_CTX_set_alpn_select_cb(
1857 self._context, self._alpn_select_callback, _ffi.NULL
1858 )
1860 def _set_ocsp_callback(
1861 self,
1862 helper: _OCSPClientCallbackHelper | _OCSPServerCallbackHelper,
1863 data: Any | None,
1864 ) -> None:
1865 """
1866 This internal helper does the common work for
1867 ``set_ocsp_server_callback`` and ``set_ocsp_client_callback``, which is
1868 almost all of it.
1869 """
1870 self._ocsp_helper = helper
1871 self._ocsp_callback = helper.callback
1872 if data is None:
1873 self._ocsp_data = _ffi.NULL
1874 else:
1875 self._ocsp_data = _ffi.new_handle(data)
1877 rc = _lib.SSL_CTX_set_tlsext_status_cb(
1878 self._context, self._ocsp_callback
1879 )
1880 _openssl_assert(rc == 1)
1881 rc = _lib.SSL_CTX_set_tlsext_status_arg(self._context, self._ocsp_data)
1882 _openssl_assert(rc == 1)
1884 @_require_not_used
1885 def set_ocsp_server_callback(
1886 self,
1887 callback: _OCSPServerCallback[_T],
1888 data: _T | None = None,
1889 ) -> None:
1890 """
1891 Set a callback to provide OCSP data to be stapled to the TLS handshake
1892 on the server side.
1894 :param callback: The callback function. It will be invoked with two
1895 arguments: the Connection, and the optional arbitrary data you have
1896 provided. The callback must return a bytestring that contains the
1897 OCSP data to staple to the handshake. If no OCSP data is available
1898 for this connection, return the empty bytestring.
1899 :param data: Some opaque data that will be passed into the callback
1900 function when called. This can be used to avoid needing to do
1901 complex data lookups or to keep track of what context is being
1902 used. This parameter is optional.
1903 """
1904 helper = _OCSPServerCallbackHelper(callback)
1905 self._set_ocsp_callback(helper, data)
1907 @_require_not_used
1908 def set_ocsp_client_callback(
1909 self,
1910 callback: _OCSPClientCallback[_T],
1911 data: _T | None = None,
1912 ) -> None:
1913 """
1914 Set a callback to validate OCSP data stapled to the TLS handshake on
1915 the client side.
1917 :param callback: The callback function. It will be invoked with three
1918 arguments: the Connection, a bytestring containing the stapled OCSP
1919 assertion, and the optional arbitrary data you have provided. The
1920 callback must return a boolean that indicates the result of
1921 validating the OCSP data: ``True`` if the OCSP data is valid and
1922 the certificate can be trusted, or ``False`` if either the OCSP
1923 data is invalid or the certificate has been revoked.
1924 :param data: Some opaque data that will be passed into the callback
1925 function when called. This can be used to avoid needing to do
1926 complex data lookups or to keep track of what context is being
1927 used. This parameter is optional.
1928 """
1929 helper = _OCSPClientCallbackHelper(callback)
1930 self._set_ocsp_callback(helper, data)
1932 @_require_not_used
1933 @_requires_ssl_cookie
1934 def set_cookie_generate_callback(
1935 self, callback: _CookieGenerateCallback
1936 ) -> None:
1937 self._cookie_generate_helper = _CookieGenerateCallbackHelper(callback)
1938 _lib.SSL_CTX_set_cookie_generate_cb(
1939 self._context,
1940 self._cookie_generate_helper.callback,
1941 )
1943 @_require_not_used
1944 @_requires_ssl_cookie
1945 def set_cookie_verify_callback(
1946 self, callback: _CookieVerifyCallback
1947 ) -> None:
1948 self._cookie_verify_helper = _CookieVerifyCallbackHelper(callback)
1949 _lib.SSL_CTX_set_cookie_verify_cb(
1950 self._context,
1951 self._cookie_verify_helper.callback,
1952 )
1955class Connection:
1956 _reverse_mapping: typing.MutableMapping[Any, Connection] = (
1957 WeakValueDictionary()
1958 )
1960 def __init__(
1961 self, context: Context, socket: socket.socket | None = None
1962 ) -> None:
1963 """
1964 Create a new Connection object, using the given OpenSSL.SSL.Context
1965 instance and socket.
1967 :param context: An SSL Context to use for this connection
1968 :param socket: The socket to use for transport layer
1969 """
1970 if not isinstance(context, Context):
1971 raise TypeError("context must be a Context instance")
1973 context._used = True
1975 ssl = _lib.SSL_new(context._context)
1976 self._ssl = _ffi.gc(ssl, _lib.SSL_free)
1977 # We set SSL_MODE_AUTO_RETRY to handle situations where OpenSSL returns
1978 # an SSL_ERROR_WANT_READ when processing a non-application data packet
1979 # even though there is still data on the underlying transport.
1980 # See https://github.com/openssl/openssl/issues/6234 for more details.
1981 _lib.SSL_set_mode(self._ssl, _lib.SSL_MODE_AUTO_RETRY)
1982 self._context = context
1983 self._app_data = None
1985 # References to strings used for Application Layer Protocol
1986 # Negotiation. These strings get copied at some point but it's well
1987 # after the callback returns, so we have to hang them somewhere to
1988 # avoid them getting freed.
1989 self._alpn_select_callback_args: Any = None
1991 # Reference the verify_callback of the Context. This ensures that if
1992 # set_verify is called again after the SSL object has been created we
1993 # do not point to a dangling reference
1994 self._verify_helper = context._verify_helper
1995 self._verify_callback = context._verify_callback
1997 # And likewise for the cookie callbacks
1998 self._cookie_generate_helper = context._cookie_generate_helper
1999 self._cookie_verify_helper = context._cookie_verify_helper
2001 self._reverse_mapping[self._ssl] = self
2003 if socket is None:
2004 self._socket = None
2005 # Don't set up any gc for these, SSL_free will take care of them.
2006 self._into_ssl = _lib.BIO_new(_lib.BIO_s_mem())
2007 _openssl_assert(self._into_ssl != _ffi.NULL)
2009 self._from_ssl = _lib.BIO_new(_lib.BIO_s_mem())
2010 _openssl_assert(self._from_ssl != _ffi.NULL)
2012 _lib.SSL_set_bio(self._ssl, self._into_ssl, self._from_ssl)
2013 else:
2014 self._into_ssl = None
2015 self._from_ssl = None
2016 self._socket = socket
2017 set_result = _lib.SSL_set_fd(
2018 self._ssl, _asFileDescriptor(self._socket)
2019 )
2020 _openssl_assert(set_result == 1)
2022 def __getattr__(self, name: str) -> Any:
2023 """
2024 Look up attributes on the wrapped socket object if they are not found
2025 on the Connection object.
2026 """
2027 if self._socket is None:
2028 raise AttributeError(
2029 f"'{self.__class__.__name__}' object has no attribute '{name}'"
2030 )
2031 else:
2032 return getattr(self._socket, name)
2034 def _raise_ssl_error(self, ssl: Any, result: int) -> None:
2035 if self._context._verify_helper is not None:
2036 self._context._verify_helper.raise_if_problem()
2037 if self._context._alpn_select_helper is not None:
2038 self._context._alpn_select_helper.raise_if_problem()
2039 if self._context._ocsp_helper is not None:
2040 self._context._ocsp_helper.raise_if_problem()
2042 error = _lib.SSL_get_error(ssl, result)
2043 if error == _lib.SSL_ERROR_WANT_READ:
2044 raise WantReadError()
2045 elif error == _lib.SSL_ERROR_WANT_WRITE:
2046 raise WantWriteError()
2047 elif error == _lib.SSL_ERROR_ZERO_RETURN:
2048 raise ZeroReturnError()
2049 elif error == _lib.SSL_ERROR_WANT_X509_LOOKUP:
2050 # TODO: This is untested.
2051 raise WantX509LookupError()
2052 elif error == _lib.SSL_ERROR_SYSCALL:
2053 if platform == "win32":
2054 errno = _ffi.getwinerror()[0]
2055 else:
2056 errno = _ffi.errno
2057 if _lib.ERR_peek_error() == 0 or errno != 0:
2058 if result < 0 and errno != 0:
2059 raise SysCallError(errno, errorcode.get(errno))
2060 raise SysCallError(-1, "Unexpected EOF")
2061 else:
2062 # TODO: This is untested, but I think twisted hits it?
2063 _raise_current_error()
2064 elif error == _lib.SSL_ERROR_SSL and _lib.ERR_peek_error() != 0:
2065 # In 3.0.x an unexpected EOF no longer triggers syscall error
2066 # but we want to maintain compatibility so we check here and
2067 # raise syscall if it is an EOF. Since we're not actually sure
2068 # what else could raise SSL_ERROR_SSL we check for the presence
2069 # of the OpenSSL 3 constant SSL_R_UNEXPECTED_EOF_WHILE_READING
2070 # and if it's not present we just raise an error, which matches
2071 # the behavior before we added this elif section
2072 peeked_error = _lib.ERR_peek_error()
2073 reason = _lib.ERR_GET_REASON(peeked_error)
2074 if _lib.Cryptography_HAS_UNEXPECTED_EOF_WHILE_READING:
2075 _openssl_assert(
2076 reason == _lib.SSL_R_UNEXPECTED_EOF_WHILE_READING
2077 )
2078 _lib.ERR_clear_error()
2079 raise SysCallError(-1, "Unexpected EOF")
2080 else:
2081 _raise_current_error()
2082 elif error == _lib.SSL_ERROR_NONE:
2083 pass
2084 else:
2085 _raise_current_error()
2087 def get_context(self) -> Context:
2088 """
2089 Retrieve the :class:`Context` object associated with this
2090 :class:`Connection`.
2091 """
2092 return self._context
2094 def set_context(self, context: Context) -> None:
2095 """
2096 Switch this connection to a new session context.
2098 :param context: A :class:`Context` instance giving the new session
2099 context to use.
2100 """
2101 if not isinstance(context, Context):
2102 raise TypeError("context must be a Context instance")
2104 _lib.SSL_set_SSL_CTX(self._ssl, context._context)
2105 self._context = context
2106 self._context._used = True
2108 def get_servername(self) -> bytes | None:
2109 """
2110 Retrieve the servername extension value if provided in the client hello
2111 message, or None if there wasn't one.
2113 :return: A byte string giving the server name or :data:`None`.
2115 .. versionadded:: 0.13
2116 """
2117 name = _lib.SSL_get_servername(
2118 self._ssl, _lib.TLSEXT_NAMETYPE_host_name
2119 )
2120 if name == _ffi.NULL:
2121 return None
2123 return _ffi.string(name)
2125 def set_verify(
2126 self, mode: int, callback: _VerifyCallback | None = None
2127 ) -> None:
2128 """
2129 Override the Context object's verification flags for this specific
2130 connection. See :py:meth:`Context.set_verify` for details.
2131 """
2132 if not isinstance(mode, int):
2133 raise TypeError("mode must be an integer")
2135 if callback is None:
2136 self._verify_helper = None
2137 self._verify_callback = None
2138 _lib.SSL_set_verify(self._ssl, mode, _ffi.NULL)
2139 else:
2140 if not callable(callback):
2141 raise TypeError("callback must be callable")
2143 self._verify_helper = _VerifyHelper(callback)
2144 self._verify_callback = self._verify_helper.callback
2145 _lib.SSL_set_verify(self._ssl, mode, self._verify_callback)
2147 def get_verify_mode(self) -> int:
2148 """
2149 Retrieve the Connection object's verify mode, as set by
2150 :meth:`set_verify`.
2152 :return: The verify mode
2153 """
2154 return _lib.SSL_get_verify_mode(self._ssl)
2156 def use_certificate(self, cert: X509 | x509.Certificate) -> None:
2157 """
2158 Load a certificate from a X509 object
2160 :param cert: The X509 object
2161 :return: None
2162 """
2163 # Mirrored from Context.use_certificate
2164 if not isinstance(cert, X509):
2165 cert = X509.from_cryptography(cert)
2166 else:
2167 warnings.warn(
2168 (
2169 "Passing pyOpenSSL X509 objects is deprecated. You "
2170 "should use a cryptography.x509.Certificate instead."
2171 ),
2172 DeprecationWarning,
2173 stacklevel=2,
2174 )
2176 use_result = _lib.SSL_use_certificate(self._ssl, cert._x509)
2177 if not use_result:
2178 _raise_current_error()
2180 def use_privatekey(self, pkey: _PrivateKey | PKey) -> None:
2181 """
2182 Load a private key from a PKey object
2184 :param pkey: The PKey object
2185 :return: None
2186 """
2187 # Mirrored from Context.use_privatekey
2188 if not isinstance(pkey, PKey):
2189 pkey = PKey.from_cryptography_key(pkey)
2190 else:
2191 warnings.warn(
2192 (
2193 "Passing pyOpenSSL PKey objects is deprecated. You "
2194 "should use a cryptography private key instead."
2195 ),
2196 DeprecationWarning,
2197 stacklevel=2,
2198 )
2200 use_result = _lib.SSL_use_PrivateKey(self._ssl, pkey._pkey)
2201 if not use_result:
2202 self._context._raise_passphrase_exception()
2204 def set_ciphertext_mtu(self, mtu: int) -> None:
2205 """
2206 For DTLS, set the maximum UDP payload size (*not* including IP/UDP
2207 overhead).
2209 Note that you might have to set :data:`OP_NO_QUERY_MTU` to prevent
2210 OpenSSL from spontaneously clearing this.
2212 :param mtu: An integer giving the maximum transmission unit.
2214 .. versionadded:: 21.1
2215 """
2216 _lib.SSL_set_mtu(self._ssl, mtu)
2218 def get_cleartext_mtu(self) -> int:
2219 """
2220 For DTLS, get the maximum size of unencrypted data you can pass to
2221 :meth:`write` without exceeding the MTU (as passed to
2222 :meth:`set_ciphertext_mtu`).
2224 :return: The effective MTU as an integer.
2226 .. versionadded:: 21.1
2227 """
2229 if not hasattr(_lib, "DTLS_get_data_mtu"):
2230 raise NotImplementedError("requires OpenSSL 1.1.1 or better")
2231 return _lib.DTLS_get_data_mtu(self._ssl)
2233 def set_tlsext_host_name(self, name: bytes) -> None:
2234 """
2235 Set the value of the servername extension to send in the client hello.
2237 :param name: A byte string giving the name.
2239 .. versionadded:: 0.13
2240 """
2241 if not isinstance(name, bytes):
2242 raise TypeError("name must be a byte string")
2243 elif b"\0" in name:
2244 raise TypeError("name must not contain NUL byte")
2246 # XXX I guess this can fail sometimes?
2247 _lib.SSL_set_tlsext_host_name(self._ssl, name)
2249 def pending(self) -> int:
2250 """
2251 Get the number of bytes that can be safely read from the SSL buffer
2252 (**not** the underlying transport buffer).
2254 :return: The number of bytes available in the receive buffer.
2255 """
2256 return _lib.SSL_pending(self._ssl)
2258 def send(self, buf: _Buffer, flags: int = 0) -> int:
2259 """
2260 Send data on the connection. NOTE: If you get one of the WantRead,
2261 WantWrite or WantX509Lookup exceptions on this, you have to call the
2262 method again with the SAME buffer.
2264 :param buf: The string, buffer or memoryview to send
2265 :param flags: (optional) Included for compatibility with the socket
2266 API, the value is ignored
2267 :return: The number of bytes written
2268 """
2269 # Backward compatibility
2270 buf = _text_to_bytes_and_warn("buf", buf)
2272 with _ffi.from_buffer(buf) as data:
2273 # check len(buf) instead of len(data) for testability
2274 if len(buf) > 2147483647:
2275 raise ValueError(
2276 "Cannot send more than 2**31-1 bytes at once."
2277 )
2279 result = _lib.SSL_write(self._ssl, data, len(data))
2280 self._raise_ssl_error(self._ssl, result)
2282 return result
2284 write = send
2286 def sendall(self, buf: _Buffer, flags: int = 0) -> int:
2287 """
2288 Send "all" data on the connection. This calls send() repeatedly until
2289 all data is sent. If an error occurs, it's impossible to tell how much
2290 data has been sent.
2292 :param buf: The string, buffer or memoryview to send
2293 :param flags: (optional) Included for compatibility with the socket
2294 API, the value is ignored
2295 :return: The number of bytes written
2296 """
2297 buf = _text_to_bytes_and_warn("buf", buf)
2299 with _ffi.from_buffer(buf) as data:
2300 left_to_send = len(buf)
2301 total_sent = 0
2303 while left_to_send:
2304 # SSL_write's num arg is an int,
2305 # so we cannot send more than 2**31-1 bytes at once.
2306 result = _lib.SSL_write(
2307 self._ssl, data + total_sent, min(left_to_send, 2147483647)
2308 )
2309 self._raise_ssl_error(self._ssl, result)
2310 total_sent += result
2311 left_to_send -= result
2313 return total_sent
2315 def recv(self, bufsiz: int, flags: int | None = None) -> bytes:
2316 """
2317 Receive data on the connection.
2319 :param bufsiz: The maximum number of bytes to read
2320 :param flags: (optional) The only supported flag is ``MSG_PEEK``,
2321 all other flags are ignored.
2322 :return: The string read from the Connection
2323 """
2324 buf = _no_zero_allocator("char[]", bufsiz)
2325 if flags is not None and flags & socket.MSG_PEEK:
2326 result = _lib.SSL_peek(self._ssl, buf, bufsiz)
2327 else:
2328 result = _lib.SSL_read(self._ssl, buf, bufsiz)
2329 self._raise_ssl_error(self._ssl, result)
2330 return _ffi.buffer(buf, result)[:]
2332 read = recv
2334 def recv_into(
2335 self,
2336 buffer: Any, # collections.abc.Buffer once we use Python 3.12+
2337 nbytes: int | None = None,
2338 flags: int | None = None,
2339 ) -> int:
2340 """
2341 Receive data on the connection and copy it directly into the provided
2342 buffer, rather than creating a new string.
2344 :param buffer: The buffer to copy into.
2345 :param nbytes: (optional) The maximum number of bytes to read into the
2346 buffer. If not present, defaults to the size of the buffer. If
2347 larger than the size of the buffer, is reduced to the size of the
2348 buffer.
2349 :param flags: (optional) The only supported flag is ``MSG_PEEK``,
2350 all other flags are ignored.
2351 :return: The number of bytes read into the buffer.
2352 """
2353 if nbytes is None:
2354 nbytes = len(buffer)
2355 else:
2356 nbytes = min(nbytes, len(buffer))
2358 # We need to create a temporary buffer. This is annoying, it would be
2359 # better if we could pass memoryviews straight into the SSL_read call,
2360 # but right now we can't. Revisit this if CFFI gets that ability.
2361 buf = _no_zero_allocator("char[]", nbytes)
2362 if flags is not None and flags & socket.MSG_PEEK:
2363 result = _lib.SSL_peek(self._ssl, buf, nbytes)
2364 else:
2365 result = _lib.SSL_read(self._ssl, buf, nbytes)
2366 self._raise_ssl_error(self._ssl, result)
2368 # This strange line is all to avoid a memory copy. The buffer protocol
2369 # should allow us to assign a CFFI buffer to the LHS of this line, but
2370 # on CPython 3.3+ that segfaults. As a workaround, we can temporarily
2371 # wrap it in a memoryview.
2372 buffer[:result] = memoryview(_ffi.buffer(buf, result))
2374 return result
2376 def _handle_bio_errors(self, bio: Any, result: int) -> typing.NoReturn:
2377 if _lib.BIO_should_retry(bio):
2378 if _lib.BIO_should_read(bio):
2379 raise WantReadError()
2380 elif _lib.BIO_should_write(bio):
2381 # TODO: This is untested.
2382 raise WantWriteError()
2383 elif _lib.BIO_should_io_special(bio):
2384 # TODO: This is untested. I think io_special means the socket
2385 # BIO has a not-yet connected socket.
2386 raise ValueError("BIO_should_io_special")
2387 else:
2388 # TODO: This is untested.
2389 raise ValueError("unknown bio failure")
2390 else:
2391 # TODO: This is untested.
2392 _raise_current_error()
2394 def bio_read(self, bufsiz: int) -> bytes:
2395 """
2396 If the Connection was created with a memory BIO, this method can be
2397 used to read bytes from the write end of that memory BIO. Many
2398 Connection methods will add bytes which must be read in this manner or
2399 the buffer will eventually fill up and the Connection will be able to
2400 take no further actions.
2402 :param bufsiz: The maximum number of bytes to read
2403 :return: The string read.
2404 """
2405 if self._from_ssl is None:
2406 raise TypeError("Connection sock was not None")
2408 if not isinstance(bufsiz, int):
2409 raise TypeError("bufsiz must be an integer")
2411 buf = _no_zero_allocator("char[]", bufsiz)
2412 result = _lib.BIO_read(self._from_ssl, buf, bufsiz)
2413 if result <= 0:
2414 self._handle_bio_errors(self._from_ssl, result)
2416 return _ffi.buffer(buf, result)[:]
2418 def bio_write(self, buf: _Buffer) -> int:
2419 """
2420 If the Connection was created with a memory BIO, this method can be
2421 used to add bytes to the read end of that memory BIO. The Connection
2422 can then read the bytes (for example, in response to a call to
2423 :meth:`recv`).
2425 :param buf: The string to put into the memory BIO.
2426 :return: The number of bytes written
2427 """
2428 buf = _text_to_bytes_and_warn("buf", buf)
2430 if self._into_ssl is None:
2431 raise TypeError("Connection sock was not None")
2433 with _ffi.from_buffer(buf) as data:
2434 result = _lib.BIO_write(self._into_ssl, data, len(data))
2435 if result <= 0:
2436 self._handle_bio_errors(self._into_ssl, result)
2437 return result
2439 def renegotiate(self) -> bool:
2440 """
2441 Renegotiate the session.
2443 :return: True if the renegotiation can be started, False otherwise
2444 """
2445 if not self.renegotiate_pending():
2446 _openssl_assert(_lib.SSL_renegotiate(self._ssl) == 1)
2447 return True
2448 return False
2450 def do_handshake(self) -> None:
2451 """
2452 Perform an SSL handshake (usually called after :meth:`renegotiate` or
2453 one of :meth:`set_accept_state` or :meth:`set_connect_state`). This can
2454 raise the same exceptions as :meth:`send` and :meth:`recv`.
2456 :return: None.
2457 """
2458 result = _lib.SSL_do_handshake(self._ssl)
2459 self._raise_ssl_error(self._ssl, result)
2461 def renegotiate_pending(self) -> bool:
2462 """
2463 Check if there's a renegotiation in progress, it will return False once
2464 a renegotiation is finished.
2466 :return: Whether there's a renegotiation in progress
2467 """
2468 return _lib.SSL_renegotiate_pending(self._ssl) == 1
2470 def total_renegotiations(self) -> int:
2471 """
2472 Find out the total number of renegotiations.
2474 :return: The number of renegotiations.
2475 """
2476 return _lib.SSL_total_renegotiations(self._ssl)
2478 def connect(self, addr: Any) -> None:
2479 """
2480 Call the :meth:`connect` method of the underlying socket and set up SSL
2481 on the socket, using the :class:`Context` object supplied to this
2482 :class:`Connection` object at creation.
2484 :param addr: A remote address
2485 :return: What the socket's connect method returns
2486 """
2487 _lib.SSL_set_connect_state(self._ssl)
2488 return self._socket.connect(addr) # type: ignore[return-value, union-attr]
2490 def connect_ex(self, addr: Any) -> int:
2491 """
2492 Call the :meth:`connect_ex` method of the underlying socket and set up
2493 SSL on the socket, using the Context object supplied to this Connection
2494 object at creation. Note that if the :meth:`connect_ex` method of the
2495 socket doesn't return 0, SSL won't be initialized.
2497 :param addr: A remove address
2498 :return: What the socket's connect_ex method returns
2499 """
2500 connect_ex = self._socket.connect_ex # type: ignore[union-attr]
2501 self.set_connect_state()
2502 return connect_ex(addr)
2504 def accept(self) -> tuple[Connection, Any]:
2505 """
2506 Call the :meth:`accept` method of the underlying socket and set up SSL
2507 on the returned socket, using the Context object supplied to this
2508 :class:`Connection` object at creation.
2510 :return: A *(conn, addr)* pair where *conn* is the new
2511 :class:`Connection` object created, and *address* is as returned by
2512 the socket's :meth:`accept`.
2513 """
2514 client, addr = self._socket.accept() # type: ignore[union-attr]
2515 conn = Connection(self._context, client)
2516 conn.set_accept_state()
2517 return (conn, addr)
2519 def DTLSv1_listen(self) -> None:
2520 """
2521 Call the OpenSSL function DTLSv1_listen on this connection. See the
2522 OpenSSL manual for more details.
2524 :return: None
2525 """
2526 # Possible future extension: return the BIO_ADDR in some form.
2527 bio_addr = _lib.BIO_ADDR_new()
2528 try:
2529 result = _lib.DTLSv1_listen(self._ssl, bio_addr)
2530 finally:
2531 _lib.BIO_ADDR_free(bio_addr)
2532 # DTLSv1_listen is weird. A zero return value means 'didn't find a
2533 # ClientHello with valid cookie, but keep trying'. So basically
2534 # WantReadError. But it doesn't work correctly with _raise_ssl_error.
2535 # So we raise it manually instead.
2536 if self._cookie_generate_helper is not None:
2537 self._cookie_generate_helper.raise_if_problem()
2538 if self._cookie_verify_helper is not None:
2539 self._cookie_verify_helper.raise_if_problem()
2540 if result == 0:
2541 raise WantReadError()
2542 if result < 0:
2543 self._raise_ssl_error(self._ssl, result)
2545 def DTLSv1_get_timeout(self) -> int | None:
2546 """
2547 Determine when the DTLS SSL object next needs to perform internal
2548 processing due to the passage of time.
2550 When the returned number of seconds have passed, the
2551 :meth:`DTLSv1_handle_timeout` method needs to be called.
2553 :return: The time left in seconds before the next timeout or `None`
2554 if no timeout is currently active.
2555 """
2556 ptv_sec = _ffi.new("time_t *")
2557 ptv_usec = _ffi.new("long *")
2558 if _lib.Cryptography_DTLSv1_get_timeout(self._ssl, ptv_sec, ptv_usec):
2559 return ptv_sec[0] + (ptv_usec[0] / 1000000)
2560 else:
2561 return None
2563 def DTLSv1_handle_timeout(self) -> bool:
2564 """
2565 Handles any timeout events which have become pending on a DTLS SSL
2566 object.
2568 :return: `True` if there was a pending timeout, `False` otherwise.
2569 """
2570 result = _lib.DTLSv1_handle_timeout(self._ssl)
2571 if result < 0:
2572 self._raise_ssl_error(self._ssl, result)
2573 assert False, "unreachable"
2574 else:
2575 return bool(result)
2577 def bio_shutdown(self) -> None:
2578 """
2579 If the Connection was created with a memory BIO, this method can be
2580 used to indicate that *end of file* has been reached on the read end of
2581 that memory BIO.
2583 :return: None
2584 """
2585 if self._from_ssl is None:
2586 raise TypeError("Connection sock was not None")
2588 _lib.BIO_set_mem_eof_return(self._into_ssl, 0)
2590 def shutdown(self) -> bool:
2591 """
2592 Send the shutdown message to the Connection.
2594 :return: True if the shutdown completed successfully (i.e. both sides
2595 have sent closure alerts), False otherwise (in which case you
2596 call :meth:`recv` or :meth:`send` when the connection becomes
2597 readable/writeable).
2598 """
2599 result = _lib.SSL_shutdown(self._ssl)
2600 if result < 0:
2601 self._raise_ssl_error(self._ssl, result)
2602 assert False, "unreachable"
2603 elif result > 0:
2604 return True
2605 else:
2606 return False
2608 def get_cipher_list(self) -> list[str]:
2609 """
2610 Retrieve the list of ciphers used by the Connection object.
2612 :return: A list of native cipher strings.
2613 """
2614 ciphers = []
2615 for i in count():
2616 result = _lib.SSL_get_cipher_list(self._ssl, i)
2617 if result == _ffi.NULL:
2618 break
2619 ciphers.append(_ffi.string(result).decode("utf-8"))
2620 return ciphers
2622 def get_client_ca_list(self) -> list[X509Name]:
2623 """
2624 Get CAs whose certificates are suggested for client authentication.
2626 :return: If this is a server connection, the list of certificate
2627 authorities that will be sent or has been sent to the client, as
2628 controlled by this :class:`Connection`'s :class:`Context`.
2630 If this is a client connection, the list will be empty until the
2631 connection with the server is established.
2633 .. versionadded:: 0.10
2634 """
2635 ca_names = _lib.SSL_get_client_CA_list(self._ssl)
2636 if ca_names == _ffi.NULL:
2637 # TODO: This is untested.
2638 return []
2640 result = []
2641 for i in range(_lib.sk_X509_NAME_num(ca_names)):
2642 name = _lib.sk_X509_NAME_value(ca_names, i)
2643 copy = _lib.X509_NAME_dup(name)
2644 _openssl_assert(copy != _ffi.NULL)
2646 pyname = X509Name.__new__(X509Name)
2647 pyname._name = _ffi.gc(copy, _lib.X509_NAME_free)
2648 result.append(pyname)
2649 return result
2651 def makefile(self, *args: Any, **kwargs: Any) -> typing.NoReturn:
2652 """
2653 The makefile() method is not implemented, since there is no dup
2654 semantics for SSL connections
2656 :raise: NotImplementedError
2657 """
2658 raise NotImplementedError(
2659 "Cannot make file object of OpenSSL.SSL.Connection"
2660 )
2662 def get_app_data(self) -> Any:
2663 """
2664 Retrieve application data as set by :meth:`set_app_data`.
2666 :return: The application data
2667 """
2668 return self._app_data
2670 def set_app_data(self, data: Any) -> None:
2671 """
2672 Set application data
2674 :param data: The application data
2675 :return: None
2676 """
2677 self._app_data = data
2679 def get_shutdown(self) -> int:
2680 """
2681 Get the shutdown state of the Connection.
2683 :return: The shutdown state, a bitvector of SENT_SHUTDOWN,
2684 RECEIVED_SHUTDOWN.
2685 """
2686 return _lib.SSL_get_shutdown(self._ssl)
2688 def set_shutdown(self, state: int) -> None:
2689 """
2690 Set the shutdown state of the Connection.
2692 :param state: bitvector of SENT_SHUTDOWN, RECEIVED_SHUTDOWN.
2693 :return: None
2694 """
2695 if not isinstance(state, int):
2696 raise TypeError("state must be an integer")
2698 _lib.SSL_set_shutdown(self._ssl, state)
2700 def get_state_string(self) -> bytes:
2701 """
2702 Retrieve a verbose string detailing the state of the Connection.
2704 :return: A string representing the state
2705 """
2706 return _ffi.string(_lib.SSL_state_string_long(self._ssl))
2708 def server_random(self) -> bytes | None:
2709 """
2710 Retrieve the random value used with the server hello message.
2712 :return: A string representing the state
2713 """
2714 session = _lib.SSL_get_session(self._ssl)
2715 if session == _ffi.NULL:
2716 return None
2717 length = _lib.SSL_get_server_random(self._ssl, _ffi.NULL, 0)
2718 _openssl_assert(length > 0)
2719 outp = _no_zero_allocator("unsigned char[]", length)
2720 _lib.SSL_get_server_random(self._ssl, outp, length)
2721 return _ffi.buffer(outp, length)[:]
2723 def client_random(self) -> bytes | None:
2724 """
2725 Retrieve the random value used with the client hello message.
2727 :return: A string representing the state
2728 """
2729 session = _lib.SSL_get_session(self._ssl)
2730 if session == _ffi.NULL:
2731 return None
2733 length = _lib.SSL_get_client_random(self._ssl, _ffi.NULL, 0)
2734 _openssl_assert(length > 0)
2735 outp = _no_zero_allocator("unsigned char[]", length)
2736 _lib.SSL_get_client_random(self._ssl, outp, length)
2737 return _ffi.buffer(outp, length)[:]
2739 def master_key(self) -> bytes | None:
2740 """
2741 Retrieve the value of the master key for this session.
2743 :return: A string representing the state
2744 """
2745 session = _lib.SSL_get_session(self._ssl)
2746 if session == _ffi.NULL:
2747 return None
2749 length = _lib.SSL_SESSION_get_master_key(session, _ffi.NULL, 0)
2750 _openssl_assert(length > 0)
2751 outp = _no_zero_allocator("unsigned char[]", length)
2752 _lib.SSL_SESSION_get_master_key(session, outp, length)
2753 return _ffi.buffer(outp, length)[:]
2755 def export_keying_material(
2756 self, label: bytes, olen: int, context: bytes | None = None
2757 ) -> bytes:
2758 """
2759 Obtain keying material for application use.
2761 :param: label - a disambiguating label string as described in RFC 5705
2762 :param: olen - the length of the exported key material in bytes
2763 :param: context - a per-association context value
2764 :return: the exported key material bytes or None
2765 """
2766 outp = _no_zero_allocator("unsigned char[]", olen)
2767 context_buf = _ffi.NULL
2768 context_len = 0
2769 use_context = 0
2770 if context is not None:
2771 context_buf = context
2772 context_len = len(context)
2773 use_context = 1
2774 success = _lib.SSL_export_keying_material(
2775 self._ssl,
2776 outp,
2777 olen,
2778 label,
2779 len(label),
2780 context_buf,
2781 context_len,
2782 use_context,
2783 )
2784 _openssl_assert(success == 1)
2785 return _ffi.buffer(outp, olen)[:]
2787 def sock_shutdown(self, *args: Any, **kwargs: Any) -> None:
2788 """
2789 Call the :meth:`shutdown` method of the underlying socket.
2790 See :manpage:`shutdown(2)`.
2792 :return: What the socket's shutdown() method returns
2793 """
2794 return self._socket.shutdown(*args, **kwargs) # type: ignore[return-value, union-attr]
2796 @typing.overload
2797 def get_certificate(
2798 self, *, as_cryptography: typing.Literal[True]
2799 ) -> x509.Certificate | None:
2800 pass
2802 @typing.overload
2803 def get_certificate(
2804 self, *, as_cryptography: typing.Literal[False] = False
2805 ) -> X509 | None:
2806 pass
2808 def get_certificate(
2809 self,
2810 *,
2811 as_cryptography: typing.Literal[True] | typing.Literal[False] = False,
2812 ) -> X509 | x509.Certificate | None:
2813 """
2814 Retrieve the local certificate (if any)
2816 :param bool as_cryptography: Controls whether a
2817 ``cryptography.x509.Certificate`` or an ``OpenSSL.crypto.X509``
2818 object should be returned.
2820 :return: The local certificate
2821 """
2822 cert = _lib.SSL_get_certificate(self._ssl)
2823 if cert != _ffi.NULL:
2824 _lib.X509_up_ref(cert)
2825 pycert = X509._from_raw_x509_ptr(cert)
2826 if as_cryptography:
2827 return pycert.to_cryptography()
2828 return pycert
2829 return None
2831 @typing.overload
2832 def get_peer_certificate(
2833 self, *, as_cryptography: typing.Literal[True]
2834 ) -> x509.Certificate | None:
2835 pass
2837 @typing.overload
2838 def get_peer_certificate(
2839 self, *, as_cryptography: typing.Literal[False] = False
2840 ) -> X509 | None:
2841 pass
2843 def get_peer_certificate(
2844 self,
2845 *,
2846 as_cryptography: typing.Literal[True] | typing.Literal[False] = False,
2847 ) -> X509 | x509.Certificate | None:
2848 """
2849 Retrieve the other side's certificate (if any)
2851 :param bool as_cryptography: Controls whether a
2852 ``cryptography.x509.Certificate`` or an ``OpenSSL.crypto.X509``
2853 object should be returned.
2855 :return: The peer's certificate
2856 """
2857 cert = _lib.SSL_get_peer_certificate(self._ssl)
2858 if cert != _ffi.NULL:
2859 pycert = X509._from_raw_x509_ptr(cert)
2860 if as_cryptography:
2861 return pycert.to_cryptography()
2862 return pycert
2863 return None
2865 @staticmethod
2866 def _cert_stack_to_list(cert_stack: Any) -> list[X509]:
2867 """
2868 Internal helper to convert a STACK_OF(X509) to a list of X509
2869 instances.
2870 """
2871 result = []
2872 for i in range(_lib.sk_X509_num(cert_stack)):
2873 cert = _lib.sk_X509_value(cert_stack, i)
2874 _openssl_assert(cert != _ffi.NULL)
2875 res = _lib.X509_up_ref(cert)
2876 _openssl_assert(res >= 1)
2877 pycert = X509._from_raw_x509_ptr(cert)
2878 result.append(pycert)
2879 return result
2881 @staticmethod
2882 def _cert_stack_to_cryptography_list(
2883 cert_stack: Any,
2884 ) -> list[x509.Certificate]:
2885 """
2886 Internal helper to convert a STACK_OF(X509) to a list of X509
2887 instances.
2888 """
2889 result = []
2890 for i in range(_lib.sk_X509_num(cert_stack)):
2891 cert = _lib.sk_X509_value(cert_stack, i)
2892 _openssl_assert(cert != _ffi.NULL)
2893 res = _lib.X509_up_ref(cert)
2894 _openssl_assert(res >= 1)
2895 pycert = X509._from_raw_x509_ptr(cert)
2896 result.append(pycert.to_cryptography())
2897 return result
2899 @typing.overload
2900 def get_peer_cert_chain(
2901 self, *, as_cryptography: typing.Literal[True]
2902 ) -> list[x509.Certificate] | None:
2903 pass
2905 @typing.overload
2906 def get_peer_cert_chain(
2907 self, *, as_cryptography: typing.Literal[False] = False
2908 ) -> list[X509] | None:
2909 pass
2911 def get_peer_cert_chain(
2912 self,
2913 *,
2914 as_cryptography: typing.Literal[True] | typing.Literal[False] = False,
2915 ) -> list[X509] | list[x509.Certificate] | None:
2916 """
2917 Retrieve the other side's certificate (if any)
2919 :param bool as_cryptography: Controls whether a list of
2920 ``cryptography.x509.Certificate`` or ``OpenSSL.crypto.X509``
2921 object should be returned.
2923 :return: A list of X509 instances giving the peer's certificate chain,
2924 or None if it does not have one.
2925 """
2926 cert_stack = _lib.SSL_get_peer_cert_chain(self._ssl)
2927 if cert_stack == _ffi.NULL:
2928 return None
2930 if as_cryptography:
2931 return self._cert_stack_to_cryptography_list(cert_stack)
2932 return self._cert_stack_to_list(cert_stack)
2934 @typing.overload
2935 def get_verified_chain(
2936 self, *, as_cryptography: typing.Literal[True]
2937 ) -> list[x509.Certificate] | None:
2938 pass
2940 @typing.overload
2941 def get_verified_chain(
2942 self, *, as_cryptography: typing.Literal[False] = False
2943 ) -> list[X509] | None:
2944 pass
2946 def get_verified_chain(
2947 self,
2948 *,
2949 as_cryptography: typing.Literal[True] | typing.Literal[False] = False,
2950 ) -> list[X509] | list[x509.Certificate] | None:
2951 """
2952 Retrieve the verified certificate chain of the peer including the
2953 peer's end entity certificate. It must be called after a session has
2954 been successfully established. If peer verification was not successful
2955 the chain may be incomplete, invalid, or None.
2957 :param bool as_cryptography: Controls whether a list of
2958 ``cryptography.x509.Certificate`` or ``OpenSSL.crypto.X509``
2959 object should be returned.
2961 :return: A list of X509 instances giving the peer's verified
2962 certificate chain, or None if it does not have one.
2964 .. versionadded:: 20.0
2965 """
2966 # OpenSSL 1.1+
2967 cert_stack = _lib.SSL_get0_verified_chain(self._ssl)
2968 if cert_stack == _ffi.NULL:
2969 return None
2971 if as_cryptography:
2972 return self._cert_stack_to_cryptography_list(cert_stack)
2973 return self._cert_stack_to_list(cert_stack)
2975 def want_read(self) -> bool:
2976 """
2977 Checks if more data has to be read from the transport layer to complete
2978 an operation.
2980 :return: True iff more data has to be read
2981 """
2982 return _lib.SSL_want_read(self._ssl)
2984 def want_write(self) -> bool:
2985 """
2986 Checks if there is data to write to the transport layer to complete an
2987 operation.
2989 :return: True iff there is data to write
2990 """
2991 return _lib.SSL_want_write(self._ssl)
2993 def set_accept_state(self) -> None:
2994 """
2995 Set the connection to work in server mode. The handshake will be
2996 handled automatically by read/write.
2998 :return: None
2999 """
3000 _lib.SSL_set_accept_state(self._ssl)
3002 def set_connect_state(self) -> None:
3003 """
3004 Set the connection to work in client mode. The handshake will be
3005 handled automatically by read/write.
3007 :return: None
3008 """
3009 _lib.SSL_set_connect_state(self._ssl)
3011 def get_session(self) -> Session | None:
3012 """
3013 Returns the Session currently used.
3015 :return: An instance of :class:`OpenSSL.SSL.Session` or
3016 :obj:`None` if no session exists.
3018 .. versionadded:: 0.14
3019 """
3020 session = _lib.SSL_get1_session(self._ssl)
3021 if session == _ffi.NULL:
3022 return None
3024 pysession = Session.__new__(Session)
3025 pysession._session = _ffi.gc(session, _lib.SSL_SESSION_free)
3026 return pysession
3028 def set_session(self, session: Session) -> None:
3029 """
3030 Set the session to be used when the TLS/SSL connection is established.
3032 :param session: A Session instance representing the session to use.
3033 :returns: None
3035 .. versionadded:: 0.14
3036 """
3037 if not isinstance(session, Session):
3038 raise TypeError("session must be a Session instance")
3040 result = _lib.SSL_set_session(self._ssl, session._session)
3041 _openssl_assert(result == 1)
3043 def _get_finished_message(
3044 self, function: Callable[[Any, Any, int], int]
3045 ) -> bytes | None:
3046 """
3047 Helper to implement :meth:`get_finished` and
3048 :meth:`get_peer_finished`.
3050 :param function: Either :data:`SSL_get_finished`: or
3051 :data:`SSL_get_peer_finished`.
3053 :return: :data:`None` if the desired message has not yet been
3054 received, otherwise the contents of the message.
3055 """
3056 # The OpenSSL documentation says nothing about what might happen if the
3057 # count argument given is zero. Specifically, it doesn't say whether
3058 # the output buffer may be NULL in that case or not. Inspection of the
3059 # implementation reveals that it calls memcpy() unconditionally.
3060 # Section 7.1.4, paragraph 1 of the C standard suggests that
3061 # memcpy(NULL, source, 0) is not guaranteed to produce defined (let
3062 # alone desirable) behavior (though it probably does on just about
3063 # every implementation...)
3064 #
3065 # Allocate a tiny buffer to pass in (instead of just passing NULL as
3066 # one might expect) for the initial call so as to be safe against this
3067 # potentially undefined behavior.
3068 empty = _ffi.new("char[]", 0)
3069 size = function(self._ssl, empty, 0)
3070 if size == 0:
3071 # No Finished message so far.
3072 return None
3074 buf = _no_zero_allocator("char[]", size)
3075 function(self._ssl, buf, size)
3076 return _ffi.buffer(buf, size)[:]
3078 def get_finished(self) -> bytes | None:
3079 """
3080 Obtain the latest TLS Finished message that we sent.
3082 :return: The contents of the message or :obj:`None` if the TLS
3083 handshake has not yet completed.
3085 .. versionadded:: 0.15
3086 """
3087 return self._get_finished_message(_lib.SSL_get_finished)
3089 def get_peer_finished(self) -> bytes | None:
3090 """
3091 Obtain the latest TLS Finished message that we received from the peer.
3093 :return: The contents of the message or :obj:`None` if the TLS
3094 handshake has not yet completed.
3096 .. versionadded:: 0.15
3097 """
3098 return self._get_finished_message(_lib.SSL_get_peer_finished)
3100 def get_cipher_name(self) -> str | None:
3101 """
3102 Obtain the name of the currently used cipher.
3104 :returns: The name of the currently used cipher or :obj:`None`
3105 if no connection has been established.
3107 .. versionadded:: 0.15
3108 """
3109 cipher = _lib.SSL_get_current_cipher(self._ssl)
3110 if cipher == _ffi.NULL:
3111 return None
3112 else:
3113 name = _ffi.string(_lib.SSL_CIPHER_get_name(cipher))
3114 return name.decode("utf-8")
3116 def get_cipher_bits(self) -> int | None:
3117 """
3118 Obtain the number of secret bits of the currently used cipher.
3120 :returns: The number of secret bits of the currently used cipher
3121 or :obj:`None` if no connection has been established.
3123 .. versionadded:: 0.15
3124 """
3125 cipher = _lib.SSL_get_current_cipher(self._ssl)
3126 if cipher == _ffi.NULL:
3127 return None
3128 else:
3129 return _lib.SSL_CIPHER_get_bits(cipher, _ffi.NULL)
3131 def get_cipher_version(self) -> str | None:
3132 """
3133 Obtain the protocol version of the currently used cipher.
3135 :returns: The protocol name of the currently used cipher
3136 or :obj:`None` if no connection has been established.
3138 .. versionadded:: 0.15
3139 """
3140 cipher = _lib.SSL_get_current_cipher(self._ssl)
3141 if cipher == _ffi.NULL:
3142 return None
3143 else:
3144 version = _ffi.string(_lib.SSL_CIPHER_get_version(cipher))
3145 return version.decode("utf-8")
3147 def get_protocol_version_name(self) -> str:
3148 """
3149 Retrieve the protocol version of the current connection.
3151 :returns: The TLS version of the current connection, for example
3152 the value for TLS 1.2 would be ``TLSv1.2``or ``Unknown``
3153 for connections that were not successfully established.
3154 """
3155 version = _ffi.string(_lib.SSL_get_version(self._ssl))
3156 return version.decode("utf-8")
3158 def get_protocol_version(self) -> int:
3159 """
3160 Retrieve the SSL or TLS protocol version of the current connection.
3162 :returns: The TLS version of the current connection. For example,
3163 it will return ``0x769`` for connections made over TLS version 1.
3164 """
3165 version = _lib.SSL_version(self._ssl)
3166 return version
3168 def set_alpn_protos(self, protos: list[bytes]) -> None:
3169 """
3170 Specify the client's ALPN protocol list.
3172 These protocols are offered to the server during protocol negotiation.
3174 :param protos: A list of the protocols to be offered to the server.
3175 This list should be a Python list of bytestrings representing the
3176 protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``.
3177 """
3178 # Different versions of OpenSSL are inconsistent about how they handle
3179 # empty proto lists (see #1043), so we avoid the problem entirely by
3180 # rejecting them ourselves.
3181 if not protos:
3182 raise ValueError("at least one protocol must be specified")
3184 # Take the list of protocols and join them together, prefixing them
3185 # with their lengths.
3186 protostr = b"".join(
3187 chain.from_iterable((bytes((len(p),)), p) for p in protos)
3188 )
3190 # Build a C string from the list. We don't need to save this off
3191 # because OpenSSL immediately copies the data out.
3192 input_str = _ffi.new("unsigned char[]", protostr)
3194 # https://www.openssl.org/docs/man1.1.0/man3/SSL_CTX_set_alpn_protos.html:
3195 # SSL_CTX_set_alpn_protos() and SSL_set_alpn_protos()
3196 # return 0 on success, and non-0 on failure.
3197 # WARNING: these functions reverse the return value convention.
3198 _openssl_assert(
3199 _lib.SSL_set_alpn_protos(self._ssl, input_str, len(protostr)) == 0
3200 )
3202 def get_alpn_proto_negotiated(self) -> bytes:
3203 """
3204 Get the protocol that was negotiated by ALPN.
3206 :returns: A bytestring of the protocol name. If no protocol has been
3207 negotiated yet, returns an empty bytestring.
3208 """
3209 data = _ffi.new("unsigned char **")
3210 data_len = _ffi.new("unsigned int *")
3212 _lib.SSL_get0_alpn_selected(self._ssl, data, data_len)
3214 if not data_len:
3215 return b""
3217 return _ffi.buffer(data[0], data_len[0])[:]
3219 def get_selected_srtp_profile(self) -> bytes:
3220 """
3221 Get the SRTP protocol which was negotiated.
3223 :returns: A bytestring of the SRTP profile name. If no profile has been
3224 negotiated yet, returns an empty bytestring.
3225 """
3226 profile = _lib.SSL_get_selected_srtp_profile(self._ssl)
3227 if not profile:
3228 return b""
3230 return _ffi.string(profile.name)
3232 @_requires_ssl_get0_group_name
3233 def get_group_name(self) -> str | None:
3234 """
3235 Get the name of the negotiated group for the key exchange.
3237 :return: A string giving the group name or :data:`None`.
3238 """
3239 # Do not remove this guard.
3240 # SSL_get0_group_name crashes with a segfault if called without
3241 # an established connection (should return NULL but doesn't).
3242 session = _lib.SSL_get_session(self._ssl)
3243 if session == _ffi.NULL:
3244 return None
3246 group_name = _lib.SSL_get0_group_name(self._ssl)
3247 if group_name == _ffi.NULL:
3248 return None
3250 return _ffi.string(group_name).decode("utf-8")
3252 def request_ocsp(self) -> None:
3253 """
3254 Called to request that the server sends stapled OCSP data, if
3255 available. If this is not called on the client side then the server
3256 will not send OCSP data. Should be used in conjunction with
3257 :meth:`Context.set_ocsp_client_callback`.
3258 """
3259 rc = _lib.SSL_set_tlsext_status_type(
3260 self._ssl, _lib.TLSEXT_STATUSTYPE_ocsp
3261 )
3262 _openssl_assert(rc == 1)
3264 def set_info_callback(
3265 self, callback: Callable[[Connection, int, int], None]
3266 ) -> None:
3267 """
3268 Set the information callback to *callback*. This function will be
3269 called from time to time during SSL handshakes.
3271 :param callback: The Python callback to use. This should take three
3272 arguments: a Connection object and two integers. The first integer
3273 specifies where in the SSL handshake the function was called, and
3274 the other the return code from a (possibly failed) internal
3275 function call.
3276 :return: None
3277 """
3279 @wraps(callback)
3280 def wrapper(ssl, where, return_code): # type: ignore[no-untyped-def]
3281 callback(Connection._reverse_mapping[ssl], where, return_code)
3283 self._info_callback = _ffi.callback(
3284 "void (*)(const SSL *, int, int)", wrapper
3285 )
3286 _lib.SSL_set_info_callback(self._ssl, self._info_callback)