Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/OpenSSL/SSL.py: 35%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import os
4import socket
5import sys
6import typing
7import warnings
8from collections.abc import Sequence
9from errno import errorcode
10from functools import partial, wraps
11from itertools import chain, count
12from sys import platform
13from typing import Any, Callable, Optional, TypeVar
14from weakref import WeakValueDictionary
16from cryptography import x509
17from cryptography.hazmat.primitives.asymmetric import ec
19from OpenSSL._util import (
20 StrOrBytesPath as _StrOrBytesPath,
21)
22from OpenSSL._util import (
23 exception_from_error_queue as _exception_from_error_queue,
24)
25from OpenSSL._util import (
26 ffi as _ffi,
27)
28from OpenSSL._util import (
29 lib as _lib,
30)
31from OpenSSL._util import (
32 make_assert as _make_assert,
33)
34from OpenSSL._util import (
35 no_zero_allocator as _no_zero_allocator,
36)
37from OpenSSL._util import (
38 path_bytes as _path_bytes,
39)
40from OpenSSL._util import (
41 text_to_bytes_and_warn as _text_to_bytes_and_warn,
42)
43from OpenSSL.crypto import (
44 FILETYPE_PEM,
45 X509,
46 PKey,
47 X509Name,
48 X509Store,
49 _EllipticCurve,
50 _PassphraseHelper,
51 _PrivateKey,
52)
54__all__ = [
55 "DTLS_CLIENT_METHOD",
56 "DTLS_METHOD",
57 "DTLS_SERVER_METHOD",
58 "MODE_RELEASE_BUFFERS",
59 "NO_OVERLAPPING_PROTOCOLS",
60 "OPENSSL_BUILT_ON",
61 "OPENSSL_CFLAGS",
62 "OPENSSL_DIR",
63 "OPENSSL_PLATFORM",
64 "OPENSSL_VERSION",
65 "OPENSSL_VERSION_NUMBER",
66 "OP_ALL",
67 "OP_CIPHER_SERVER_PREFERENCE",
68 "OP_DONT_INSERT_EMPTY_FRAGMENTS",
69 "OP_EPHEMERAL_RSA",
70 "OP_MICROSOFT_BIG_SSLV3_BUFFER",
71 "OP_MICROSOFT_SESS_ID_BUG",
72 "OP_MSIE_SSLV2_RSA_PADDING",
73 "OP_NETSCAPE_CA_DN_BUG",
74 "OP_NETSCAPE_CHALLENGE_BUG",
75 "OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG",
76 "OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG",
77 "OP_NO_COMPRESSION",
78 "OP_NO_QUERY_MTU",
79 "OP_NO_TICKET",
80 "OP_PKCS1_CHECK_1",
81 "OP_PKCS1_CHECK_2",
82 "OP_SINGLE_DH_USE",
83 "OP_SINGLE_ECDH_USE",
84 "OP_SSLEAY_080_CLIENT_DH_BUG",
85 "OP_SSLREF2_REUSE_CERT_TYPE_BUG",
86 "OP_TLS_BLOCK_PADDING_BUG",
87 "OP_TLS_D5_BUG",
88 "OP_TLS_ROLLBACK_BUG",
89 "RECEIVED_SHUTDOWN",
90 "SENT_SHUTDOWN",
91 "SESS_CACHE_BOTH",
92 "SESS_CACHE_CLIENT",
93 "SESS_CACHE_NO_AUTO_CLEAR",
94 "SESS_CACHE_NO_INTERNAL",
95 "SESS_CACHE_NO_INTERNAL_LOOKUP",
96 "SESS_CACHE_NO_INTERNAL_STORE",
97 "SESS_CACHE_OFF",
98 "SESS_CACHE_SERVER",
99 "SSL3_VERSION",
100 "SSLEAY_BUILT_ON",
101 "SSLEAY_CFLAGS",
102 "SSLEAY_DIR",
103 "SSLEAY_PLATFORM",
104 "SSLEAY_VERSION",
105 "SSL_CB_ACCEPT_EXIT",
106 "SSL_CB_ACCEPT_LOOP",
107 "SSL_CB_ALERT",
108 "SSL_CB_CONNECT_EXIT",
109 "SSL_CB_CONNECT_LOOP",
110 "SSL_CB_EXIT",
111 "SSL_CB_HANDSHAKE_DONE",
112 "SSL_CB_HANDSHAKE_START",
113 "SSL_CB_LOOP",
114 "SSL_CB_READ",
115 "SSL_CB_READ_ALERT",
116 "SSL_CB_WRITE",
117 "SSL_CB_WRITE_ALERT",
118 "SSL_ST_ACCEPT",
119 "SSL_ST_CONNECT",
120 "SSL_ST_MASK",
121 "TLS1_1_VERSION",
122 "TLS1_2_VERSION",
123 "TLS1_3_VERSION",
124 "TLS1_VERSION",
125 "TLS_CLIENT_METHOD",
126 "TLS_METHOD",
127 "TLS_SERVER_METHOD",
128 "VERIFY_CLIENT_ONCE",
129 "VERIFY_FAIL_IF_NO_PEER_CERT",
130 "VERIFY_NONE",
131 "VERIFY_PEER",
132 "Connection",
133 "Context",
134 "Error",
135 "OP_NO_SSLv2",
136 "OP_NO_SSLv3",
137 "OP_NO_TLSv1",
138 "OP_NO_TLSv1_1",
139 "OP_NO_TLSv1_2",
140 "OP_NO_TLSv1_3",
141 "SSLeay_version",
142 "SSLv23_METHOD",
143 "Session",
144 "SysCallError",
145 "TLSv1_1_METHOD",
146 "TLSv1_2_METHOD",
147 "TLSv1_METHOD",
148 "WantReadError",
149 "WantWriteError",
150 "WantX509LookupError",
151 "X509VerificationCodes",
152 "ZeroReturnError",
153]
# Version / build-information constants re-exported from the binding.
# OpenSSL_version() documents its argument as "one of the OPENSSL_ constants
# defined in this module".
OPENSSL_VERSION_NUMBER: int = _lib.OPENSSL_VERSION_NUMBER
OPENSSL_VERSION: int = _lib.OPENSSL_VERSION
OPENSSL_CFLAGS: int = _lib.OPENSSL_CFLAGS
OPENSSL_PLATFORM: int = _lib.OPENSSL_PLATFORM
OPENSSL_DIR: int = _lib.OPENSSL_DIR
OPENSSL_BUILT_ON: int = _lib.OPENSSL_BUILT_ON

# SSLEAY_* names are plain aliases of the OPENSSL_* values above.
SSLEAY_VERSION = OPENSSL_VERSION
SSLEAY_CFLAGS = OPENSSL_CFLAGS
SSLEAY_PLATFORM = OPENSSL_PLATFORM
SSLEAY_DIR = OPENSSL_DIR
SSLEAY_BUILT_ON = OPENSSL_BUILT_ON

SENT_SHUTDOWN = _lib.SSL_SENT_SHUTDOWN
RECEIVED_SHUTDOWN = _lib.SSL_RECEIVED_SHUTDOWN

# Method identifiers.  These integers are defined by this module (they do not
# come from the binding) and are the keys of ``Context._methods``.
SSLv23_METHOD = 3
TLSv1_METHOD = 4
TLSv1_1_METHOD = 5
TLSv1_2_METHOD = 6
TLS_METHOD = 7
TLS_SERVER_METHOD = 8
TLS_CLIENT_METHOD = 9
DTLS_METHOD = 10
DTLS_SERVER_METHOD = 11
DTLS_CLIENT_METHOD = 12

# Protocol version numbers; ``Context.__init__`` passes these to
# ``set_min_proto_version`` / ``set_max_proto_version``.
SSL3_VERSION: int = _lib.SSL3_VERSION
TLS1_VERSION: int = _lib.TLS1_VERSION
TLS1_1_VERSION: int = _lib.TLS1_1_VERSION
TLS1_2_VERSION: int = _lib.TLS1_2_VERSION
TLS1_3_VERSION: int = _lib.TLS1_3_VERSION

# SSL_OP_NO_<protocol> values re-exported from the binding.
OP_NO_SSLv2: int = _lib.SSL_OP_NO_SSLv2
OP_NO_SSLv3: int = _lib.SSL_OP_NO_SSLv3
OP_NO_TLSv1: int = _lib.SSL_OP_NO_TLSv1
OP_NO_TLSv1_1: int = _lib.SSL_OP_NO_TLSv1_1
OP_NO_TLSv1_2: int = _lib.SSL_OP_NO_TLSv1_2
OP_NO_TLSv1_3: int = _lib.SSL_OP_NO_TLSv1_3

MODE_RELEASE_BUFFERS: int = _lib.SSL_MODE_RELEASE_BUFFERS

# Remaining SSL_OP_* values re-exported from the binding.
OP_SINGLE_DH_USE: int = _lib.SSL_OP_SINGLE_DH_USE
OP_SINGLE_ECDH_USE: int = _lib.SSL_OP_SINGLE_ECDH_USE
OP_EPHEMERAL_RSA: int = _lib.SSL_OP_EPHEMERAL_RSA
OP_MICROSOFT_SESS_ID_BUG: int = _lib.SSL_OP_MICROSOFT_SESS_ID_BUG
OP_NETSCAPE_CHALLENGE_BUG: int = _lib.SSL_OP_NETSCAPE_CHALLENGE_BUG
OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG: int = (
    _lib.SSL_OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG
)
OP_SSLREF2_REUSE_CERT_TYPE_BUG: int = _lib.SSL_OP_SSLREF2_REUSE_CERT_TYPE_BUG
OP_MICROSOFT_BIG_SSLV3_BUFFER: int = _lib.SSL_OP_MICROSOFT_BIG_SSLV3_BUFFER
OP_MSIE_SSLV2_RSA_PADDING: int = _lib.SSL_OP_MSIE_SSLV2_RSA_PADDING
OP_SSLEAY_080_CLIENT_DH_BUG: int = _lib.SSL_OP_SSLEAY_080_CLIENT_DH_BUG
OP_TLS_D5_BUG: int = _lib.SSL_OP_TLS_D5_BUG
OP_TLS_BLOCK_PADDING_BUG: int = _lib.SSL_OP_TLS_BLOCK_PADDING_BUG
OP_DONT_INSERT_EMPTY_FRAGMENTS: int = _lib.SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS
OP_CIPHER_SERVER_PREFERENCE: int = _lib.SSL_OP_CIPHER_SERVER_PREFERENCE
OP_TLS_ROLLBACK_BUG: int = _lib.SSL_OP_TLS_ROLLBACK_BUG
OP_PKCS1_CHECK_1: int = _lib.SSL_OP_PKCS1_CHECK_1
OP_PKCS1_CHECK_2: int = _lib.SSL_OP_PKCS1_CHECK_2
OP_NETSCAPE_CA_DN_BUG: int = _lib.SSL_OP_NETSCAPE_CA_DN_BUG
OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG: int = (
    _lib.SSL_OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG
)
OP_NO_COMPRESSION: int = _lib.SSL_OP_NO_COMPRESSION

OP_NO_QUERY_MTU: int = _lib.SSL_OP_NO_QUERY_MTU
# Optional constant: when the binding does not define it, the name still
# exists (as None) but is not added to __all__.
try:
    OP_COOKIE_EXCHANGE: int | None = _lib.SSL_OP_COOKIE_EXCHANGE
    __all__.append("OP_COOKIE_EXCHANGE")
except AttributeError:
    OP_COOKIE_EXCHANGE = None
OP_NO_TICKET: int = _lib.SSL_OP_NO_TICKET

# The next three are optional as well, but unlike OP_COOKIE_EXCHANGE the
# module-level name is simply left undefined when the binding lacks them.
try:
    OP_NO_RENEGOTIATION: int = _lib.SSL_OP_NO_RENEGOTIATION
    __all__.append("OP_NO_RENEGOTIATION")
except AttributeError:
    pass

try:
    OP_IGNORE_UNEXPECTED_EOF: int = _lib.SSL_OP_IGNORE_UNEXPECTED_EOF
    __all__.append("OP_IGNORE_UNEXPECTED_EOF")
except AttributeError:
    pass

try:
    OP_LEGACY_SERVER_CONNECT: int = _lib.SSL_OP_LEGACY_SERVER_CONNECT
    __all__.append("OP_LEGACY_SERVER_CONNECT")
except AttributeError:
    pass

OP_ALL: int = _lib.SSL_OP_ALL

# SSL_VERIFY_* values re-exported from the binding.
VERIFY_PEER: int = _lib.SSL_VERIFY_PEER
VERIFY_FAIL_IF_NO_PEER_CERT: int = _lib.SSL_VERIFY_FAIL_IF_NO_PEER_CERT
VERIFY_CLIENT_ONCE: int = _lib.SSL_VERIFY_CLIENT_ONCE
VERIFY_NONE: int = _lib.SSL_VERIFY_NONE

# SSL_SESS_CACHE_* values re-exported from the binding.
SESS_CACHE_OFF: int = _lib.SSL_SESS_CACHE_OFF
SESS_CACHE_CLIENT: int = _lib.SSL_SESS_CACHE_CLIENT
SESS_CACHE_SERVER: int = _lib.SSL_SESS_CACHE_SERVER
SESS_CACHE_BOTH: int = _lib.SSL_SESS_CACHE_BOTH
SESS_CACHE_NO_AUTO_CLEAR: int = _lib.SSL_SESS_CACHE_NO_AUTO_CLEAR
SESS_CACHE_NO_INTERNAL_LOOKUP: int = _lib.SSL_SESS_CACHE_NO_INTERNAL_LOOKUP
SESS_CACHE_NO_INTERNAL_STORE: int = _lib.SSL_SESS_CACHE_NO_INTERNAL_STORE
SESS_CACHE_NO_INTERNAL: int = _lib.SSL_SESS_CACHE_NO_INTERNAL

# SSL_ST_* values re-exported from the binding.
SSL_ST_CONNECT: int = _lib.SSL_ST_CONNECT
SSL_ST_ACCEPT: int = _lib.SSL_ST_ACCEPT
SSL_ST_MASK: int = _lib.SSL_ST_MASK

# SSL_CB_* values re-exported from the binding.
SSL_CB_LOOP: int = _lib.SSL_CB_LOOP
SSL_CB_EXIT: int = _lib.SSL_CB_EXIT
SSL_CB_READ: int = _lib.SSL_CB_READ
SSL_CB_WRITE: int = _lib.SSL_CB_WRITE
SSL_CB_ALERT: int = _lib.SSL_CB_ALERT
SSL_CB_READ_ALERT: int = _lib.SSL_CB_READ_ALERT
SSL_CB_WRITE_ALERT: int = _lib.SSL_CB_WRITE_ALERT
SSL_CB_ACCEPT_LOOP: int = _lib.SSL_CB_ACCEPT_LOOP
SSL_CB_ACCEPT_EXIT: int = _lib.SSL_CB_ACCEPT_EXIT
SSL_CB_CONNECT_LOOP: int = _lib.SSL_CB_CONNECT_LOOP
SSL_CB_CONNECT_EXIT: int = _lib.SSL_CB_CONNECT_EXIT
SSL_CB_HANDSHAKE_START: int = _lib.SSL_CB_HANDSHAKE_START
SSL_CB_HANDSHAKE_DONE: int = _lib.SSL_CB_HANDSHAKE_DONE
# Union of the three built-in bytes-like types.
_Buffer = typing.Union[bytes, bytearray, memoryview]
# Generic type variable used by the callback aliases below (for the
# user-supplied data object) and by ``_make_requires``.
_T = TypeVar("_T")


class _NoOverlappingProtocols:
    """Type of the :data:`NO_OVERLAPPING_PROTOCOLS` sentinel object."""

    pass


# Sentinel an ALPN select callback may return instead of a bytestring; the
# ALPN wrapper in this module recognises it by identity.
NO_OVERLAPPING_PROTOCOLS = _NoOverlappingProtocols()

# Callback types.
# ALPN select: (connection, offered protocols) -> chosen protocol or the
# NO_OVERLAPPING_PROTOCOLS sentinel.
_ALPNSelectCallback = Callable[
    [
        "Connection",
        typing.List[bytes],
    ],
    typing.Union[bytes, _NoOverlappingProtocols],
]
# Cookie generate: (connection) -> cookie bytes.
_CookieGenerateCallback = Callable[["Connection"], bytes]
# Cookie verify: (connection, cookie bytes) -> bool.
_CookieVerifyCallback = Callable[["Connection", bytes], bool]
# OCSP client: (connection, OCSP response bytes, user data) -> bool.
_OCSPClientCallback = Callable[["Connection", bytes, Optional[_T]], bool]
# OCSP server: (connection, user data) -> OCSP response bytes.
_OCSPServerCallback = Callable[["Connection", Optional[_T]], bytes]
# Passphrase: (max size, verify flag, user data) -> passphrase bytes.
_PassphraseCallback = Callable[[int, bool, Optional[_T]], bytes]
# Verify: (connection, certificate, error number, error depth, ok) -> bool.
_VerifyCallback = Callable[["Connection", X509, int, int, int], bool]
class X509VerificationCodes:
    """
    Success and error codes for X509 verification, as returned by the
    underlying ``X509_STORE_CTX_get_error()`` function and passed by pyOpenSSL
    to verification callback functions.

    See `OpenSSL Verification Errors
    <https://www.openssl.org/docs/manmaster/man3/X509_verify_cert_error_string.html#ERROR-CODES>`_
    for details.
    """

    # Each attribute mirrors the binding's ``X509_V_*`` constant of the same
    # name, with the ``X509_V_`` prefix dropped.
    OK = _lib.X509_V_OK
    ERR_UNABLE_TO_GET_ISSUER_CERT = _lib.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT
    ERR_UNABLE_TO_GET_CRL = _lib.X509_V_ERR_UNABLE_TO_GET_CRL
    ERR_UNABLE_TO_DECRYPT_CERT_SIGNATURE = (
        _lib.X509_V_ERR_UNABLE_TO_DECRYPT_CERT_SIGNATURE
    )
    ERR_UNABLE_TO_DECRYPT_CRL_SIGNATURE = (
        _lib.X509_V_ERR_UNABLE_TO_DECRYPT_CRL_SIGNATURE
    )
    ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY = (
        _lib.X509_V_ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY
    )
    ERR_CERT_SIGNATURE_FAILURE = _lib.X509_V_ERR_CERT_SIGNATURE_FAILURE
    ERR_CRL_SIGNATURE_FAILURE = _lib.X509_V_ERR_CRL_SIGNATURE_FAILURE
    ERR_CERT_NOT_YET_VALID = _lib.X509_V_ERR_CERT_NOT_YET_VALID
    ERR_CERT_HAS_EXPIRED = _lib.X509_V_ERR_CERT_HAS_EXPIRED
    ERR_CRL_NOT_YET_VALID = _lib.X509_V_ERR_CRL_NOT_YET_VALID
    ERR_CRL_HAS_EXPIRED = _lib.X509_V_ERR_CRL_HAS_EXPIRED
    ERR_ERROR_IN_CERT_NOT_BEFORE_FIELD = (
        _lib.X509_V_ERR_ERROR_IN_CERT_NOT_BEFORE_FIELD
    )
    ERR_ERROR_IN_CERT_NOT_AFTER_FIELD = (
        _lib.X509_V_ERR_ERROR_IN_CERT_NOT_AFTER_FIELD
    )
    ERR_ERROR_IN_CRL_LAST_UPDATE_FIELD = (
        _lib.X509_V_ERR_ERROR_IN_CRL_LAST_UPDATE_FIELD
    )
    ERR_ERROR_IN_CRL_NEXT_UPDATE_FIELD = (
        _lib.X509_V_ERR_ERROR_IN_CRL_NEXT_UPDATE_FIELD
    )
    ERR_OUT_OF_MEM = _lib.X509_V_ERR_OUT_OF_MEM
    ERR_DEPTH_ZERO_SELF_SIGNED_CERT = (
        _lib.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT
    )
    ERR_SELF_SIGNED_CERT_IN_CHAIN = _lib.X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN
    ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY = (
        _lib.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY
    )
    ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE = (
        _lib.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE
    )
    ERR_CERT_CHAIN_TOO_LONG = _lib.X509_V_ERR_CERT_CHAIN_TOO_LONG
    ERR_CERT_REVOKED = _lib.X509_V_ERR_CERT_REVOKED
    ERR_INVALID_CA = _lib.X509_V_ERR_INVALID_CA
    ERR_PATH_LENGTH_EXCEEDED = _lib.X509_V_ERR_PATH_LENGTH_EXCEEDED
    ERR_INVALID_PURPOSE = _lib.X509_V_ERR_INVALID_PURPOSE
    ERR_CERT_UNTRUSTED = _lib.X509_V_ERR_CERT_UNTRUSTED
    ERR_CERT_REJECTED = _lib.X509_V_ERR_CERT_REJECTED
    ERR_SUBJECT_ISSUER_MISMATCH = _lib.X509_V_ERR_SUBJECT_ISSUER_MISMATCH
    ERR_AKID_SKID_MISMATCH = _lib.X509_V_ERR_AKID_SKID_MISMATCH
    ERR_AKID_ISSUER_SERIAL_MISMATCH = (
        _lib.X509_V_ERR_AKID_ISSUER_SERIAL_MISMATCH
    )
    ERR_KEYUSAGE_NO_CERTSIGN = _lib.X509_V_ERR_KEYUSAGE_NO_CERTSIGN
    ERR_UNABLE_TO_GET_CRL_ISSUER = _lib.X509_V_ERR_UNABLE_TO_GET_CRL_ISSUER
    ERR_UNHANDLED_CRITICAL_EXTENSION = (
        _lib.X509_V_ERR_UNHANDLED_CRITICAL_EXTENSION
    )
    ERR_KEYUSAGE_NO_CRL_SIGN = _lib.X509_V_ERR_KEYUSAGE_NO_CRL_SIGN
    ERR_UNHANDLED_CRITICAL_CRL_EXTENSION = (
        _lib.X509_V_ERR_UNHANDLED_CRITICAL_CRL_EXTENSION
    )
    ERR_INVALID_NON_CA = _lib.X509_V_ERR_INVALID_NON_CA
    ERR_PROXY_PATH_LENGTH_EXCEEDED = _lib.X509_V_ERR_PROXY_PATH_LENGTH_EXCEEDED
    ERR_KEYUSAGE_NO_DIGITAL_SIGNATURE = (
        _lib.X509_V_ERR_KEYUSAGE_NO_DIGITAL_SIGNATURE
    )
    ERR_PROXY_CERTIFICATES_NOT_ALLOWED = (
        _lib.X509_V_ERR_PROXY_CERTIFICATES_NOT_ALLOWED
    )
    ERR_INVALID_EXTENSION = _lib.X509_V_ERR_INVALID_EXTENSION
    ERR_INVALID_POLICY_EXTENSION = _lib.X509_V_ERR_INVALID_POLICY_EXTENSION
    ERR_NO_EXPLICIT_POLICY = _lib.X509_V_ERR_NO_EXPLICIT_POLICY
    ERR_DIFFERENT_CRL_SCOPE = _lib.X509_V_ERR_DIFFERENT_CRL_SCOPE
    ERR_UNSUPPORTED_EXTENSION_FEATURE = (
        _lib.X509_V_ERR_UNSUPPORTED_EXTENSION_FEATURE
    )
    ERR_UNNESTED_RESOURCE = _lib.X509_V_ERR_UNNESTED_RESOURCE
    ERR_PERMITTED_VIOLATION = _lib.X509_V_ERR_PERMITTED_VIOLATION
    ERR_EXCLUDED_VIOLATION = _lib.X509_V_ERR_EXCLUDED_VIOLATION
    ERR_SUBTREE_MINMAX = _lib.X509_V_ERR_SUBTREE_MINMAX
    ERR_UNSUPPORTED_CONSTRAINT_TYPE = (
        _lib.X509_V_ERR_UNSUPPORTED_CONSTRAINT_TYPE
    )
    ERR_UNSUPPORTED_CONSTRAINT_SYNTAX = (
        _lib.X509_V_ERR_UNSUPPORTED_CONSTRAINT_SYNTAX
    )
    ERR_UNSUPPORTED_NAME_SYNTAX = _lib.X509_V_ERR_UNSUPPORTED_NAME_SYNTAX
    ERR_CRL_PATH_VALIDATION_ERROR = _lib.X509_V_ERR_CRL_PATH_VALIDATION_ERROR
    ERR_HOSTNAME_MISMATCH = _lib.X509_V_ERR_HOSTNAME_MISMATCH
    ERR_EMAIL_MISMATCH = _lib.X509_V_ERR_EMAIL_MISMATCH
    ERR_IP_ADDRESS_MISMATCH = _lib.X509_V_ERR_IP_ADDRESS_MISMATCH
    ERR_APPLICATION_VERIFICATION = _lib.X509_V_ERR_APPLICATION_VERIFICATION
# Taken from https://golang.org/src/crypto/x509/root_linux.go
# Well-known CA bundle file paths, one per distribution family.
# NOTE(review): presumably probed as a fallback when loading default verify
# paths; the consuming code is outside this section -- confirm.
_CERTIFICATE_FILE_LOCATIONS = [
    "/etc/ssl/certs/ca-certificates.crt",  # Debian/Ubuntu/Gentoo etc.
    "/etc/pki/tls/certs/ca-bundle.crt",  # Fedora/RHEL 6
    "/etc/ssl/ca-bundle.pem",  # OpenSUSE
    "/etc/pki/tls/cacert.pem",  # OpenELEC
    "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem",  # CentOS/RHEL 7
]

# Well-known CA certificate directories.
_CERTIFICATE_PATH_LOCATIONS = [
    "/etc/ssl/certs",  # SLES10/SLES11
]

# These values are compared to output from cffi's ffi.string so they must be
# byte strings.
_CRYPTOGRAPHY_MANYLINUX_CA_DIR = b"/opt/pyca/cryptography/openssl/certs"
_CRYPTOGRAPHY_MANYLINUX_CA_FILE = b"/opt/pyca/cryptography/openssl/cert.pem"
class Error(Exception):
    """
    An error occurred in an `OpenSSL.SSL` API.
    """


# Helpers bound to this module's Error class: the first calls
# ``_exception_from_error_queue(Error)``; the second is the checker built by
# ``_make_assert(Error)`` and is called with a boolean condition.
_raise_current_error = partial(_exception_from_error_queue, Error)
_openssl_assert = _make_assert(Error)


# Specialised Error subclasses; each adds no behaviour of its own.
# NOTE(review): the code that raises them is outside this section; presumably
# they correspond to the SSL_ERROR_* result of the same name -- confirm.
class WantReadError(Error):
    pass


class WantWriteError(Error):
    pass


class WantX509LookupError(Error):
    pass


class ZeroReturnError(Error):
    pass


class SysCallError(Error):
    pass
464class _CallbackExceptionHelper:
465 """
466 A base class for wrapper classes that allow for intelligent exception
467 handling in OpenSSL callbacks.
469 :ivar list _problems: Any exceptions that occurred while executing in a
470 context where they could not be raised in the normal way. Typically
471 this is because OpenSSL has called into some Python code and requires a
472 return value. The exceptions are saved to be raised later when it is
473 possible to do so.
474 """
476 def __init__(self) -> None:
477 self._problems: list[Exception] = []
479 def raise_if_problem(self) -> None:
480 """
481 Raise an exception from the OpenSSL error queue or that was previously
482 captured whe running a callback.
483 """
484 if self._problems:
485 try:
486 _raise_current_error()
487 except Error:
488 pass
489 raise self._problems.pop(0)
class _VerifyHelper(_CallbackExceptionHelper):
    """
    Adapt a Python callback so it can be installed as a certificate
    verification callback.
    """

    def __init__(self, callback: _VerifyCallback) -> None:
        _CallbackExceptionHelper.__init__(self)

        @wraps(callback)
        def wrapper(ok, store_ctx):  # type: ignore[no-untyped-def]
            # Take an extra reference on the current certificate before
            # handing the pointer to the X509 wrapper object.
            raw_cert = _lib.X509_STORE_CTX_get_current_cert(store_ctx)
            _lib.X509_up_ref(raw_cert)
            cert = X509._from_raw_x509_ptr(raw_cert)
            errnum = _lib.X509_STORE_CTX_get_error(store_ctx)
            errdepth = _lib.X509_STORE_CTX_get_error_depth(store_ctx)

            # Find the Connection that owns the SSL object stored on the
            # verification context.
            ex_index = _lib.SSL_get_ex_data_X509_STORE_CTX_idx()
            ssl = _lib.X509_STORE_CTX_get_ex_data(store_ctx, ex_index)
            connection = Connection._reverse_mapping[ssl]

            try:
                accepted = callback(connection, cert, errnum, errdepth, ok)
            except Exception as exc:
                # Cannot raise from inside the callback: remember the
                # exception and report failure.
                self._problems.append(exc)
                return 0

            if not accepted:
                return 0

            # The callback accepted the certificate: reset the recorded
            # verification error to OK.
            _lib.X509_STORE_CTX_set_error(store_ctx, _lib.X509_V_OK)
            return 1

        self.callback = _ffi.callback(
            "int (*)(int, X509_STORE_CTX *)", wrapper
        )
class _ALPNSelectHelper(_CallbackExceptionHelper):
    """
    Adapt a Python callback so it can be installed as an ALPN selection
    callback.
    """

    def __init__(self, callback: _ALPNSelectCallback) -> None:
        _CallbackExceptionHelper.__init__(self)

        @wraps(callback)
        def wrapper(ssl, out, outlen, in_, inlen, arg):  # type: ignore[no-untyped-def]
            try:
                conn = Connection._reverse_mapping[ssl]

                # The offered protocols arrive as a run of length-prefixed
                # bytestrings; walk the buffer and collect each one.
                wire = _ffi.buffer(in_, inlen)[:]
                offered = []
                pos = 0
                while pos < len(wire):
                    size = wire[pos]
                    offered.append(wire[pos + 1 : pos + 1 + size])
                    pos += size + 1

                # Ask the user callback to choose.
                chosen = callback(conn, offered)
                no_overlap = chosen is NO_OVERLAPPING_PROTOCOLS
                if no_overlap:
                    chosen = b""
                elif not isinstance(chosen, bytes):
                    raise TypeError(
                        "ALPN callback must return a bytestring or the "
                        "special NO_OVERLAPPING_PROTOCOLS sentinel value."
                    )

                # Keep the output buffers referenced from the connection so
                # they are not freed before OpenSSL has used them, then
                # publish them through the output parameters.
                conn._alpn_select_callback_args = [
                    _ffi.new("unsigned char *", len(chosen)),
                    _ffi.new("unsigned char[]", chosen),
                ]
                length_cell, data_cell = conn._alpn_select_callback_args
                outlen[0] = length_cell[0]
                out[0] = data_cell
                if no_overlap:
                    return _lib.SSL_TLSEXT_ERR_NOACK
                return _lib.SSL_TLSEXT_ERR_OK
            except Exception as exc:
                self._problems.append(exc)
                return _lib.SSL_TLSEXT_ERR_ALERT_FATAL

        self.callback = _ffi.callback(
            (
                "int (*)(SSL *, unsigned char **, unsigned char *, "
                "const unsigned char *, unsigned int, void *)"
            ),
            wrapper,
        )
class _OCSPServerCallbackHelper(_CallbackExceptionHelper):
    """
    Wrap a callback such that it can be used as an OCSP callback for the server
    side.

    Annoyingly, OpenSSL defines one OCSP callback but uses it in two different
    ways. For servers, that callback is expected to retrieve some OCSP data and
    hand it to OpenSSL, and may return only SSL_TLSEXT_ERR_OK,
    SSL_TLSEXT_ERR_ALERT_FATAL, and SSL_TLSEXT_ERR_NOACK. For clients, that
    callback is expected to check the OCSP data, and returns a negative value
    on error, 0 if the response is not acceptable, or positive if it is. These
    are mutually exclusive return code behaviours, and they mean that we need
    two helpers so that we always return an appropriate error code if the
    user's code throws an exception.

    Given that we have to have two helpers anyway, these helpers are a bit more
    helpery than most: specifically, they hide a few more of the OpenSSL
    functions so that the user has an easier time writing these callbacks.

    This helper implements the server side.

    :param callback: Called as ``callback(connection, data)`` and must return
        the OCSP response as ``bytes``; an empty bytestring means "no
        response to send".
    """

    def __init__(self, callback: _OCSPServerCallback[Any]) -> None:
        _CallbackExceptionHelper.__init__(self)

        @wraps(callback)
        def wrapper(ssl, cdata):  # type: ignore[no-untyped-def]
            try:
                conn = Connection._reverse_mapping[ssl]

                # Extract the data if any was provided.
                if cdata != _ffi.NULL:
                    data = _ffi.from_handle(cdata)
                else:
                    data = None

                # Call the callback.
                ocsp_data = callback(conn, data)

                if not isinstance(ocsp_data, bytes):
                    raise TypeError("OCSP callback must return a bytestring.")

                # If the OCSP data was provided, we will pass it to OpenSSL.
                # However, we have an early exit here: if no OCSP data was
                # provided we will just exit out and tell OpenSSL that there
                # is nothing to do.
                if not ocsp_data:
                    return _lib.SSL_TLSEXT_ERR_NOACK

                # OpenSSL takes ownership of this data and expects it to have
                # been allocated by OPENSSL_malloc.
                ocsp_data_length = len(ocsp_data)
                data_ptr = _lib.OPENSSL_malloc(ocsp_data_length)
                # Allocation can fail; writing through a NULL pointer would
                # crash the process, so turn it into an Error instead (it is
                # captured below and reported as a fatal alert).
                _openssl_assert(data_ptr != _ffi.NULL)
                _ffi.buffer(data_ptr, ocsp_data_length)[:] = ocsp_data

                _lib.SSL_set_tlsext_status_ocsp_resp(
                    ssl, data_ptr, ocsp_data_length
                )

                return _lib.SSL_TLSEXT_ERR_OK
            except Exception as e:
                # Cannot raise from inside the callback: remember the
                # exception and abort the handshake.
                self._problems.append(e)
                return _lib.SSL_TLSEXT_ERR_ALERT_FATAL

        self.callback = _ffi.callback("int (*)(SSL *, void *)", wrapper)
class _OCSPClientCallbackHelper(_CallbackExceptionHelper):
    """
    Adapt a Python callback so it can be installed as the client-side OCSP
    callback.

    OpenSSL has a single OCSP callback slot whose return-value contract
    differs by role.  On the client the callback inspects the OCSP response
    and must return a negative value on error, 0 if the response is not
    acceptable, and a positive value if it is.  The server-side contract uses
    a different, incompatible set of return codes, which is why the two roles
    have separate helpers: each one can then report a user exception with a
    code that is valid for its role.

    This helper also fetches the OCSP response from the connection itself, so
    the user callback simply receives it as ``bytes``.
    """

    def __init__(self, callback: _OCSPClientCallback[Any]) -> None:
        _CallbackExceptionHelper.__init__(self)

        @wraps(callback)
        def wrapper(ssl, cdata):  # type: ignore[no-untyped-def]
            try:
                conn = Connection._reverse_mapping[ssl]

                # Recover the user data object, if one was registered.
                data = None if cdata == _ffi.NULL else _ffi.from_handle(cdata)

                # Fetch the OCSP response; a negative length means there is
                # none, otherwise copy it out of OpenSSL's buffer.
                ocsp_ptr = _ffi.new("unsigned char **")
                ocsp_len = _lib.SSL_get_tlsext_status_ocsp_resp(ssl, ocsp_ptr)
                if ocsp_len >= 0:
                    ocsp_data = _ffi.buffer(ocsp_ptr[0], ocsp_len)[:]
                else:
                    ocsp_data = b""

                # 1 when the callback accepts the response, 0 when it does
                # not.
                return 1 if callback(conn, ocsp_data, data) else 0

            except Exception as exc:
                self._problems.append(exc)
                # A negative value signals an error to OpenSSL.
                return -1

        self.callback = _ffi.callback("int (*)(SSL *, void *)", wrapper)
class _CookieGenerateCallbackHelper(_CallbackExceptionHelper):
    """
    Adapt a Python callback so it can be installed as a cookie generation
    callback.
    """

    def __init__(self, callback: _CookieGenerateCallback) -> None:
        _CallbackExceptionHelper.__init__(self)

        # Upper bound on the cookie size; falls back to 255 when the binding
        # does not expose DTLS1_COOKIE_LENGTH.
        max_cookie_len = getattr(_lib, "DTLS1_COOKIE_LENGTH", 255)

        @wraps(callback)
        def wrapper(ssl, out, outlen):  # type: ignore[no-untyped-def]
            try:
                conn = Connection._reverse_mapping[ssl]
                cookie = callback(conn)
                cookie_len = len(cookie)
                if cookie_len > max_cookie_len:
                    raise ValueError(
                        f"Cookie too long (got {cookie_len} bytes, "
                        f"max {max_cookie_len})"
                    )
                # Copy the cookie into the output buffer and report its size.
                out[0:cookie_len] = cookie
                outlen[0] = cookie_len
            except Exception as exc:
                self._problems.append(exc)
                # "a zero return value can be used to abort the handshake"
                return 0
            return 1

        self.callback = _ffi.callback(
            "int (*)(SSL *, unsigned char *, unsigned int *)",
            wrapper,
        )
class _CookieVerifyCallbackHelper(_CallbackExceptionHelper):
    """
    Adapt a Python callback so it can be installed as a cookie verification
    callback.
    """

    def __init__(self, callback: _CookieVerifyCallback) -> None:
        _CallbackExceptionHelper.__init__(self)

        @wraps(callback)
        def wrapper(ssl, c_cookie, cookie_len):  # type: ignore[no-untyped-def]
            try:
                conn = Connection._reverse_mapping[ssl]
                # Copy the cookie out of the C buffer before handing it to
                # user code.
                cookie = bytes(c_cookie[0:cookie_len])
                return callback(conn, cookie)
            except Exception as exc:
                # Remember the exception and report the cookie as rejected.
                self._problems.append(exc)
                return 0

        self.callback = _ffi.callback(
            "int (*)(SSL *, unsigned char *, unsigned int)",
            wrapper,
        )
768def _asFileDescriptor(obj: Any) -> int:
769 fd = None
770 if not isinstance(obj, int):
771 meth = getattr(obj, "fileno", None)
772 if meth is not None:
773 obj = meth()
775 if isinstance(obj, int):
776 fd = obj
778 if not isinstance(fd, int):
779 raise TypeError("argument must be an int, or have a fileno() method.")
780 elif fd < 0:
781 raise ValueError(
782 f"file descriptor cannot be a negative integer ({fd:i})"
783 )
785 return fd
def OpenSSL_version(type: int) -> bytes:
    """
    Return a byte string describing the version of OpenSSL in use.

    :param type: One of the :const:`OPENSSL_` constants defined in this module.
    """
    c_text = _lib.OpenSSL_version(type)
    return _ffi.string(c_text)


# Older name kept as an alias of the same function.
SSLeay_version = OpenSSL_version
800def _make_requires(flag: int, error: str) -> Callable[[_T], _T]:
801 """
802 Builds a decorator that ensures that functions that rely on OpenSSL
803 functions that are not present in this build raise NotImplementedError,
804 rather than AttributeError coming out of cryptography.
806 :param flag: A cryptography flag that guards the functions, e.g.
807 ``Cryptography_HAS_NEXTPROTONEG``.
808 :param error: The string to be used in the exception if the flag is false.
809 """
811 def _requires_decorator(func): # type: ignore[no-untyped-def]
812 if not flag:
814 @wraps(func)
815 def explode(*args, **kwargs): # type: ignore[no-untyped-def]
816 raise NotImplementedError(error)
818 return explode
819 else:
820 return func
822 return _requires_decorator
# Feature guards built with _make_requires.  Each flag is looked up on the
# binding with a default of 0, so a binding that lacks the flag entirely is
# treated as not supporting the feature.
_requires_keylog = _make_requires(
    getattr(_lib, "Cryptography_HAS_KEYLOG", 0), "Key logging not available"
)

_requires_ssl_get0_group_name = _make_requires(
    getattr(_lib, "Cryptography_HAS_SSL_GET0_GROUP_NAME", 0),
    "Getting group name is not supported by the linked OpenSSL version",
)

_requires_ssl_cookie = _make_requires(
    getattr(_lib, "Cryptography_HAS_SSL_COOKIE", 0),
    "DTLS cookie support is not available",
)
class Session:
    """
    A class representing an SSL session.  A session defines certain connection
    parameters which may be re-used to speed up the setup of subsequent
    connections.

    .. versionadded:: 0.14
    """

    # Declared here but assigned elsewhere.
    # NOTE(review): presumably the underlying SSL_SESSION cdata pointer --
    # the code that sets it is outside this section; confirm.
    _session: Any
852F = TypeVar("F", bound=Callable[..., Any])
855def _require_not_used(f: F) -> F:
856 @wraps(f)
857 def inner(self: Context, *args: Any, **kwargs: Any) -> Any:
858 if self._used:
859 raise ValueError(
860 "Context has already been used to create a Connection, it "
861 "cannot be mutated again"
862 )
863 return f(self, *args, **kwargs)
865 return typing.cast(F, inner)
868class Context:
869 """
870 :class:`OpenSSL.SSL.Context` instances define the parameters for setting
871 up new SSL connections.
873 :param method: One of TLS_METHOD, TLS_CLIENT_METHOD, TLS_SERVER_METHOD,
874 DTLS_METHOD, DTLS_CLIENT_METHOD, or DTLS_SERVER_METHOD.
875 SSLv23_METHOD, TLSv1_METHOD, etc. are deprecated and should
876 not be used.
877 """
879 _methods: typing.ClassVar[
880 dict[int, tuple[Callable[[], Any], int | None]]
881 ] = {
882 SSLv23_METHOD: (_lib.TLS_method, None),
883 TLSv1_METHOD: (_lib.TLS_method, TLS1_VERSION),
884 TLSv1_1_METHOD: (_lib.TLS_method, TLS1_1_VERSION),
885 TLSv1_2_METHOD: (_lib.TLS_method, TLS1_2_VERSION),
886 TLS_METHOD: (_lib.TLS_method, None),
887 TLS_SERVER_METHOD: (_lib.TLS_server_method, None),
888 TLS_CLIENT_METHOD: (_lib.TLS_client_method, None),
889 DTLS_METHOD: (_lib.DTLS_method, None),
890 DTLS_SERVER_METHOD: (_lib.DTLS_server_method, None),
891 DTLS_CLIENT_METHOD: (_lib.DTLS_client_method, None),
892 }
    def __init__(self, method: int) -> None:
        """
        Create the underlying ``SSL_CTX`` for *method* and reset all callback
        state.

        :param method: One of the method constants that is a key of
            ``_methods``.
        :raises TypeError: If *method* is not an integer.
        :raises ValueError: If *method* is not a known method constant.
        """
        if not isinstance(method, int):
            raise TypeError("method must be an integer")

        # Each entry gives the method constructor and, for the fixed-version
        # legacy methods, the single protocol version to pin (else None).
        try:
            method_func, version = self._methods[method]
        except KeyError:
            raise ValueError("No such protocol")

        method_obj = method_func()
        _openssl_assert(method_obj != _ffi.NULL)

        context = _lib.SSL_CTX_new(method_obj)
        _openssl_assert(context != _ffi.NULL)
        # Register SSL_CTX_free as the destructor for the context pointer.
        context = _ffi.gc(context, _lib.SSL_CTX_free)

        self._context = context
        # Must be set before any @_require_not_used method is called below.
        self._used = False
        self._passphrase_helper: _PassphraseHelper | None = None
        self._passphrase_callback: _PassphraseCallback[Any] | None = None
        self._passphrase_userdata: Any | None = None
        self._verify_helper: _VerifyHelper | None = None
        self._verify_callback: _VerifyCallback | None = None
        self._info_callback = None
        self._keylog_callback = None
        self._tlsext_servername_callback = None
        self._app_data = None
        self._alpn_select_helper: _ALPNSelectHelper | None = None
        self._alpn_select_callback: _ALPNSelectCallback | None = None
        self._ocsp_helper: (
            _OCSPClientCallbackHelper | _OCSPServerCallbackHelper | None
        ) = None
        self._ocsp_callback: (
            _OCSPClientCallback[Any] | _OCSPServerCallback[Any] | None
        ) = None
        self._ocsp_data: Any | None = None
        self._cookie_generate_helper: _CookieGenerateCallbackHelper | None = (
            None
        )
        self._cookie_verify_helper: _CookieVerifyCallbackHelper | None = None

        self.set_mode(
            _lib.SSL_MODE_ENABLE_PARTIAL_WRITE
            | _lib.SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER
        )
        # A fixed-version method pins both ends of the allowed range to that
        # one version.
        if version is not None:
            self.set_min_proto_version(version)
            self.set_max_proto_version(version)
943 @_require_not_used
944 def set_min_proto_version(self, version: int) -> None:
945 """
946 Set the minimum supported protocol version. Setting the minimum
947 version to 0 will enable protocol versions down to the lowest version
948 supported by the library.
950 If the underlying OpenSSL build is missing support for the selected
951 version, this method will raise an exception.
952 """
953 _openssl_assert(
954 _lib.SSL_CTX_set_min_proto_version(self._context, version) == 1
955 )
957 @_require_not_used
958 def set_max_proto_version(self, version: int) -> None:
959 """
960 Set the maximum supported protocol version. Setting the maximum
961 version to 0 will enable protocol versions up to the highest version
962 supported by the library.
964 If the underlying OpenSSL build is missing support for the selected
965 version, this method will raise an exception.
966 """
967 _openssl_assert(
968 _lib.SSL_CTX_set_max_proto_version(self._context, version) == 1
969 )
@_require_not_used
def load_verify_locations(
    self,
    cafile: _StrOrBytesPath | None,
    capath: _StrOrBytesPath | None = None,
) -> None:
    """
    Tell SSL where the trusted certificates for chain verification live.
    The certificates have to be in PEM format.

    When *capath* is given it must be a directory prepared with the
    ``c_rehash`` tool shipped with OpenSSL. Either, but not both, of
    *cafile* or *capath* may be :data:`None`.

    :param cafile: File containing the certificates (``bytes`` or
        ``str``).
    :param capath: Directory containing the certificates (``bytes`` or
        ``str``).

    :return: None
    """
    # A missing location is passed to OpenSSL as a NULL pointer; anything
    # else is converted to a bytes path first.
    file_arg = _ffi.NULL if cafile is None else _path_bytes(cafile)
    dir_arg = _ffi.NULL if capath is None else _path_bytes(capath)

    loaded = _lib.SSL_CTX_load_verify_locations(
        self._context, file_arg, dir_arg
    )
    if not loaded:
        _raise_current_error()
def _wrap_callback(
    self, callback: _PassphraseCallback[_T]
) -> _PassphraseHelper:
    """
    Build a :class:`_PassphraseHelper` around a user passphrase callback.

    :param callback: The user supplied passphrase callback.
    :return: A PEM helper created with ``more_args=True`` and
        ``truncate=True``.
    """

    @wraps(callback)
    def _forward(size: int, verify: bool, userdata: Any) -> bytes:
        # The incoming ``userdata`` is ignored; the value stored on the
        # context is looked up at call time and handed to the callback.
        return callback(size, verify, self._passphrase_userdata)

    helper = _PassphraseHelper(
        FILETYPE_PEM, _forward, more_args=True, truncate=True
    )
    return helper
@_require_not_used
def set_passwd_cb(
    self,
    callback: _PassphraseCallback[_T],
    userdata: _T | None = None,
) -> None:
    """
    Install the passphrase callback, which is invoked when a private key
    protected by a passphrase is loaded.

    :param callback: The Python callback to use. It must accept three
        positional arguments. First, an integer giving the maximum length
        of the passphrase it may return; a longer passphrase will be
        truncated. Second, a boolean which is true if the user should be
        prompted for the passphrase twice and the callback should verify
        that both values are equal. Third, the value given as the
        *userdata* parameter to :meth:`set_passwd_cb`. The *callback*
        must return a byte string. If an error occurs, *callback* should
        return a false value (e.g. an empty string).
    :param userdata: (optional) A Python object which will be given as
        argument to the callback
    :raises TypeError: If *callback* is not callable.
    :return: None
    """
    if not callable(callback):
        raise TypeError("callback must be callable")

    helper = self._wrap_callback(callback)
    # Both the helper and its C callback are stored on the context.
    self._passphrase_helper = helper
    self._passphrase_callback = helper.callback
    _lib.SSL_CTX_set_default_passwd_cb(
        self._context, self._passphrase_callback
    )
    self._passphrase_userdata = userdata
@_require_not_used
def set_default_verify_paths(self) -> None:
    """
    Use the platform provided CA certificates for verification. There are
    caveats related to the binary wheels that cryptography (pyOpenSSL's
    primary dependency) ships:

    *   macOS will only load certificates using this method if the user
        has the ``openssl@3`` `Homebrew <https://brew.sh>`_ formula
        installed in the default location.
    *   Windows will not work.
    *   manylinux cryptography wheels will work on most common Linux
        distributions in pyOpenSSL 17.1.0 and above. pyOpenSSL detects
        the manylinux wheel and attempts to load roots via a fallback
        path.

    :return: None
    """
    # SSL_CTX_set_default_verify_paths tries the cafile and capath fixed
    # at compile time, but environment variables, when present, take
    # precedence over those paths.
    _openssl_assert(
        _lib.SSL_CTX_set_default_verify_paths(self._context) == 1
    )

    # If the user set the environment variables themselves, they picked
    # the paths explicitly and no fallback is attempted.
    if self._check_env_vars_set("SSL_CERT_DIR", "SSL_CERT_FILE"):
        return

    compiled_dir = _ffi.string(_lib.X509_get_default_cert_dir())
    compiled_file = _ffi.string(_lib.X509_get_default_cert_file())

    # The fallback only applies when both compiled-in defaults match the
    # exact values used in the manylinux builds.
    is_manylinux = (
        compiled_dir == _CRYPTOGRAPHY_MANYLINUX_CA_DIR
        and compiled_file == _CRYPTOGRAPHY_MANYLINUX_CA_FILE
    )
    if is_manylinux:
        self._fallback_default_verify_paths(
            _CERTIFICATE_FILE_LOCATIONS, _CERTIFICATE_PATH_LOCATIONS
        )
1096 def _check_env_vars_set(self, dir_env_var: str, file_env_var: str) -> bool:
1097 """
1098 Check to see if the default cert dir/file environment vars are present.
1100 :return: bool
1101 """
1102 return (
1103 os.environ.get(file_env_var) is not None
1104 or os.environ.get(dir_env_var) is not None
1105 )
1107 def _fallback_default_verify_paths(
1108 self, file_path: list[str], dir_path: list[str]
1109 ) -> None:
1110 """
1111 Default verify paths are based on the compiled version of OpenSSL.
1112 However, when pyca/cryptography is compiled as a manylinux wheel
1113 that compiled location can potentially be wrong. So, like Go, we
1114 will try a predefined set of paths and attempt to load roots
1115 from there.
1117 :return: None
1118 """
1119 for cafile in file_path:
1120 if os.path.isfile(cafile):
1121 self.load_verify_locations(cafile)
1122 break
1124 for capath in dir_path:
1125 if os.path.isdir(capath):
1126 self.load_verify_locations(None, capath)
1127 break
@_require_not_used
def use_certificate_chain_file(self, certfile: _StrOrBytesPath) -> None:
    """
    Load a certificate chain from a file.

    :param certfile: Name of the certificate chain file (``bytes`` or
        ``str``). Must be PEM encoded.

    :return: None
    """
    chain_path = _path_bytes(certfile)

    if not _lib.SSL_CTX_use_certificate_chain_file(
        self._context, chain_path
    ):
        _raise_current_error()
@_require_not_used
def use_certificate_file(
    self, certfile: _StrOrBytesPath, filetype: int = FILETYPE_PEM
) -> None:
    """
    Load a certificate from a file.

    :param certfile: Name of the certificate file (``bytes`` or
        ``str``).
    :param filetype: (optional) Encoding of the file, which is either
        :const:`FILETYPE_PEM` or :const:`FILETYPE_ASN1`. The default is
        :const:`FILETYPE_PEM`.
    :raises TypeError: If *filetype* is not an integer.

    :return: None
    """
    # The path is converted before the filetype is validated.
    cert_path = _path_bytes(certfile)
    if not isinstance(filetype, int):
        raise TypeError("filetype must be an integer")

    if not _lib.SSL_CTX_use_certificate_file(
        self._context, cert_path, filetype
    ):
        _raise_current_error()
@_require_not_used
def use_certificate(self, cert: X509 | x509.Certificate) -> None:
    """
    Load a certificate from an X509 object.

    :param cert: A ``cryptography.x509.Certificate``, or (deprecated) a
        pyOpenSSL X509 object.
    :return: None
    """
    # Mirrored at Connection.use_certificate
    if isinstance(cert, X509):
        # pyOpenSSL objects are still accepted, but warn about them.
        warnings.warn(
            (
                "Passing pyOpenSSL X509 objects is deprecated. You "
                "should use a cryptography.x509.Certificate instead."
            ),
            DeprecationWarning,
            stacklevel=2,
        )
    else:
        cert = X509.from_cryptography(cert)

    if not _lib.SSL_CTX_use_certificate(self._context, cert._x509):
        _raise_current_error()
@_require_not_used
def add_extra_chain_cert(self, certobj: X509 | x509.Certificate) -> None:
    """
    Add a certificate to the chain.

    :param certobj: The certificate to add to the chain: a
        ``cryptography.x509.Certificate``, or (deprecated) a pyOpenSSL
        X509 object.
    :return: None
    """
    if not isinstance(certobj, X509):
        certobj = X509.from_cryptography(certobj)
    else:
        warnings.warn(
            (
                "Passing pyOpenSSL X509 objects is deprecated. You "
                "should use a cryptography.x509.Certificate instead."
            ),
            DeprecationWarning,
            stacklevel=2,
        )

    # A duplicate is handed to OpenSSL rather than the caller's object.
    copy = _lib.X509_dup(certobj._x509)
    # Refuse to pass a failed duplication (NULL) on to OpenSSL; this
    # matches the check performed after X509_NAME_dup in
    # set_client_ca_list.
    _openssl_assert(copy != _ffi.NULL)

    add_result = _lib.SSL_CTX_add_extra_chain_cert(self._context, copy)
    if not add_result:
        # TODO: This is untested.
        # The duplicate was not accepted, so it is released here.
        _lib.X509_free(copy)
        _raise_current_error()
def _raise_passphrase_exception(self) -> None:
    """
    Raise the most specific error available after a key load failure.

    If a passphrase helper is installed it first gets the chance to raise
    its own problem (as :exc:`Error`); otherwise, or if it raises nothing,
    the current OpenSSL error is raised.
    """
    helper = self._passphrase_helper
    if helper is not None:
        helper.raise_if_problem(Error)

    _raise_current_error()
@_require_not_used
def use_privatekey_file(
    self, keyfile: _StrOrBytesPath, filetype: int = FILETYPE_PEM
) -> None:
    """
    Load a private key from a file.

    :param keyfile: Name of the key file (``bytes`` or ``str``)
    :param filetype: (optional) Encoding of the file, which is either
        :const:`FILETYPE_PEM` or :const:`FILETYPE_ASN1`. The default is
        :const:`FILETYPE_PEM`.
    :raises TypeError: If *filetype* is not an integer.

    :return: None
    """
    # The path is converted before the filetype is validated.
    key_path = _path_bytes(keyfile)
    if not isinstance(filetype, int):
        raise TypeError("filetype must be an integer")

    loaded = _lib.SSL_CTX_use_PrivateKey_file(
        self._context, key_path, filetype
    )
    if not loaded:
        # Lets a passphrase helper problem take priority over the
        # generic OpenSSL error.
        self._raise_passphrase_exception()
@_require_not_used
def use_privatekey(self, pkey: _PrivateKey | PKey) -> None:
    """
    Load a private key from a key object.

    :param pkey: A cryptography private key, or (deprecated) a pyOpenSSL
        PKey object.
    :return: None
    """
    # Mirrored at Connection.use_privatekey
    if isinstance(pkey, PKey):
        # pyOpenSSL objects are still accepted, but warn about them.
        warnings.warn(
            (
                "Passing pyOpenSSL PKey objects is deprecated. You "
                "should use a cryptography private key instead."
            ),
            DeprecationWarning,
            stacklevel=2,
        )
    else:
        pkey = PKey.from_cryptography_key(pkey)

    if not _lib.SSL_CTX_use_PrivateKey(self._context, pkey._pkey):
        self._raise_passphrase_exception()
def check_privatekey(self) -> None:
    """
    Verify that the private key (loaded with :meth:`use_privatekey`)
    matches the certificate (loaded with :meth:`use_certificate`).

    :return: :data:`None` (raises :exc:`Error` if something's wrong)
    """
    key_matches = _lib.SSL_CTX_check_private_key(self._context)
    if not key_matches:
        _raise_current_error()
@_require_not_used
def load_client_ca(self, cafile: bytes) -> None:
    """
    Load the trusted certificates that will be sent to the client. This
    does not imply that any of the certificates are trusted; that must
    be configured separately.

    :param bytes cafile: The path to a certificates file in PEM format.
    :return: None
    """
    cafile_bytes = _text_to_bytes_and_warn("cafile", cafile)
    ca_names = _lib.SSL_load_client_CA_file(cafile_bytes)
    # A NULL list means the file could not be loaded.
    _openssl_assert(ca_names != _ffi.NULL)
    _lib.SSL_CTX_set_client_CA_list(self._context, ca_names)
@_require_not_used
def set_session_id(self, buf: bytes) -> None:
    """
    Set the session id to *buf* within which a session can be reused for
    this Context object. Session resumption needs this because a stored
    session has no way of knowing which Context object it is associated
    with.

    :param bytes buf: The session id.

    :returns: None
    """
    buf = _text_to_bytes_and_warn("buf", buf)
    result = _lib.SSL_CTX_set_session_id_context(
        self._context, buf, len(buf)
    )
    _openssl_assert(result == 1)
@_require_not_used
def set_session_cache_mode(self, mode: int) -> int:
    """
    Set the behavior of the session cache used by all connections using
    this Context and return the previously set mode. See
    :const:`SESS_CACHE_*` for details about particular modes.

    :param mode: One or more of the SESS_CACHE_* flags (combine using
        bitwise or)
    :raises TypeError: If *mode* is not an integer.
    :returns: The previously set caching mode.

    .. versionadded:: 0.14
    """
    if not isinstance(mode, int):
        raise TypeError("mode must be an integer")

    previous_mode = _lib.SSL_CTX_set_session_cache_mode(
        self._context, mode
    )
    return previous_mode
def get_session_cache_mode(self) -> int:
    """
    Return the session cache mode currently in effect.

    :returns: The currently used cache mode.

    .. versionadded:: 0.14
    """
    current_mode = _lib.SSL_CTX_get_session_cache_mode(self._context)
    return current_mode
@_require_not_used
def set_verify(
    self, mode: int, callback: _VerifyCallback | None = None
) -> None:
    """
    Set the verification flags for this Context object to *mode* and
    specify that *callback* should be used for verification callbacks.

    :param mode: The verify mode, this should be one of
        :const:`VERIFY_NONE` and :const:`VERIFY_PEER`. If
        :const:`VERIFY_PEER` is used, *mode* can be OR:ed with
        :const:`VERIFY_FAIL_IF_NO_PEER_CERT` and
        :const:`VERIFY_CLIENT_ONCE` to further control the behaviour.
    :param callback: The optional Python verification callback to use.
        This should take five arguments: A Connection object, an X509
        object, and three integer variables, which are in turn potential
        error number, error depth and return code. *callback* should
        return True if verification passes and False otherwise.
        If omitted, OpenSSL's default verification is used.
    :raises TypeError: If *mode* is not an integer, or *callback* is
        given but not callable.
    :return: None

    See SSL_CTX_set_verify(3SSL) for further details.
    """
    if not isinstance(mode, int):
        raise TypeError("mode must be an integer")

    if callback is None:
        # No Python callback: clear any stored helper and pass NULL.
        helper = None
        stored_callback = None
        c_callback = _ffi.NULL
    elif not callable(callback):
        raise TypeError("callback must be callable")
    else:
        helper = _VerifyHelper(callback)
        stored_callback = c_callback = helper.callback

    # Context state is only touched once every check has passed.
    self._verify_helper = helper
    self._verify_callback = stored_callback
    _lib.SSL_CTX_set_verify(self._context, mode, c_callback)
@_require_not_used
def set_verify_depth(self, depth: int) -> None:
    """
    Set the maximum certificate chain verification depth allowed for
    this Context object.

    :param depth: An integer specifying the verify depth
    :raises TypeError: If *depth* is not an integer.
    :return: None
    """
    if isinstance(depth, int):
        _lib.SSL_CTX_set_verify_depth(self._context, depth)
        return

    raise TypeError("depth must be an integer")
def get_verify_mode(self) -> int:
    """
    Return the Context object's verify mode, as set by
    :meth:`set_verify`.

    :return: The verify mode
    """
    verify_mode = _lib.SSL_CTX_get_verify_mode(self._context)
    return verify_mode
def get_verify_depth(self) -> int:
    """
    Return the Context object's verify depth, as set by
    :meth:`set_verify_depth`.

    :return: The verify depth
    """
    verify_depth = _lib.SSL_CTX_get_verify_depth(self._context)
    return verify_depth
@_require_not_used
def load_tmp_dh(self, dhfile: _StrOrBytesPath) -> None:
    """
    Load parameters for Ephemeral Diffie-Hellman.

    :param dhfile: The file to load EDH parameters from (``bytes`` or
        ``str``).
    :raises Error: If the file cannot be opened, does not contain
        readable DH parameters, or OpenSSL rejects the parameters.

    :return: None
    """
    dhfile = _path_bytes(dhfile)

    bio = _lib.BIO_new_file(dhfile, b"r")
    if bio == _ffi.NULL:
        _raise_current_error()
    bio = _ffi.gc(bio, _lib.BIO_free)

    dh = _lib.PEM_read_bio_DHparams(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL)
    # Fail right away when no parameters could be read, instead of
    # attaching a finalizer to NULL and handing NULL to
    # SSL_CTX_set_tmp_dh.
    if dh == _ffi.NULL:
        _raise_current_error()
    dh = _ffi.gc(dh, _lib.DH_free)

    res = _lib.SSL_CTX_set_tmp_dh(self._context, dh)
    _openssl_assert(res == 1)
@_require_not_used
def set_tmp_ecdh(self, curve: _EllipticCurve | ec.EllipticCurve) -> None:
    """
    Select a curve to use for ECDHE key exchange.

    :param curve: A curve instance from cryptography
        (:class:`~cryptography.hazmat.primitives.asymmetric.ec.EllipticCurve`).
        Alternatively (deprecated) a curve object from either
        :meth:`OpenSSL.crypto.get_elliptic_curve` or
        :meth:`OpenSSL.crypto.get_elliptic_curves`.

    :return: None
    """
    if isinstance(curve, _EllipticCurve):
        # Deprecated path: pyOpenSSL curve objects build their own key.
        warnings.warn(
            (
                "Passing pyOpenSSL elliptic curves to set_tmp_ecdh is "
                "deprecated. You should use cryptography's elliptic curve "
                "types instead."
            ),
            DeprecationWarning,
            stacklevel=2,
        )
        _lib.SSL_CTX_set_tmp_ecdh(self._context, curve._to_EC_KEY())
        return

    # These two curve names are looked up under a different name; every
    # other name is used as-is.
    aliases = {"secp192r1": "prime192v1", "secp256r1": "prime256v1"}
    curve_name = aliases.get(curve.name, curve.name)

    nid = _lib.OBJ_txt2nid(curve_name.encode())
    if nid == _lib.NID_undef:
        _raise_current_error()

    ec_key = _lib.EC_KEY_new_by_curve_name(nid)
    _openssl_assert(ec_key != _ffi.NULL)
    ec_key = _ffi.gc(ec_key, _lib.EC_KEY_free)
    _lib.SSL_CTX_set_tmp_ecdh(self._context, ec_key)
@_require_not_used
def set_cipher_list(self, cipher_list: bytes) -> None:
    """
    Set the list of ciphers to be used in this context.

    See the OpenSSL manual for more information (e.g.
    :manpage:`ciphers(1)`).

    This API does not change the cipher suites used in TLS 1.3; use
    `set_tls13_ciphersuites` for that.

    :param bytes cipher_list: An OpenSSL cipher string.
    :raises TypeError: If *cipher_list* is not a byte string after
        conversion.
    :return: None
    """
    cipher_list = _text_to_bytes_and_warn("cipher_list", cipher_list)
    if not isinstance(cipher_list, bytes):
        raise TypeError("cipher_list must be a byte string.")

    result = _lib.SSL_CTX_set_cipher_list(self._context, cipher_list)
    _openssl_assert(result == 1)
@_require_not_used
def set_tls13_ciphersuites(self, ciphersuites: bytes) -> None:
    """
    Set the list of TLS 1.3 ciphers to be used in this context.
    OpenSSL keeps the TLS 1.3+ cipher list separate from the cipher list
    for TLS 1.2 and lower.

    See the OpenSSL manual for more information (e.g.
    :manpage:`ciphers(1)`).

    :param bytes ciphersuites: An OpenSSL cipher string containing
        TLS 1.3+ ciphersuites.
    :raises TypeError: If *ciphersuites* is not a byte string.
    :return: None

    .. versionadded:: 25.2.0
    """
    if not isinstance(ciphersuites, bytes):
        raise TypeError("ciphersuites must be a byte string.")

    result = _lib.SSL_CTX_set_ciphersuites(self._context, ciphersuites)
    _openssl_assert(result == 1)
@_require_not_used
def set_client_ca_list(
    self, certificate_authorities: Sequence[X509Name]
) -> None:
    """
    Set the list of preferred client certificate signers for this server
    context.

    This list of certificate authorities will be sent to the client when
    the server requests a client certificate.

    :param certificate_authorities: a sequence of X509Names.
    :raises TypeError: If any element is not an :class:`X509Name`.
    :return: None

    .. versionadded:: 0.10
    """
    name_stack = _lib.sk_X509_NAME_new_null()
    _openssl_assert(name_stack != _ffi.NULL)

    try:
        for ca_name in certificate_authorities:
            if not isinstance(ca_name, X509Name):
                raise TypeError(
                    f"client CAs must be X509Name objects, not "
                    f"{type(ca_name).__name__} objects"
                )
            # The stack receives its own copy of each name.
            copy = _lib.X509_NAME_dup(ca_name._name)
            _openssl_assert(copy != _ffi.NULL)
            push_result = _lib.sk_X509_NAME_push(name_stack, copy)
            if not push_result:
                # The copy never made it onto the stack; free it here.
                _lib.X509_NAME_free(copy)
                _raise_current_error()
    except Exception:
        # Freeing the stack alone only releases the container, so the
        # copies already pushed are released first to avoid leaking them.
        for index in range(_lib.sk_X509_NAME_num(name_stack)):
            _lib.X509_NAME_free(_lib.sk_X509_NAME_value(name_stack, index))
        _lib.sk_X509_NAME_free(name_stack)
        raise

    _lib.SSL_CTX_set_client_CA_list(self._context, name_stack)
@_require_not_used
def add_client_ca(
    self, certificate_authority: X509 | x509.Certificate
) -> None:
    """
    Add the CA certificate to the list of preferred signers for this
    context.

    The list of certificate authorities will be sent to the client when
    the server requests a client certificate.

    :param certificate_authority: certificate authority's X509 certificate.
    :return: None

    .. versionadded:: 0.10
    """
    if isinstance(certificate_authority, X509):
        # pyOpenSSL objects are still accepted, but warn about them.
        warnings.warn(
            (
                "Passing pyOpenSSL X509 objects is deprecated. You "
                "should use a cryptography.x509.Certificate instead."
            ),
            DeprecationWarning,
            stacklevel=2,
        )
    else:
        certificate_authority = X509.from_cryptography(
            certificate_authority
        )

    result = _lib.SSL_CTX_add_client_CA(
        self._context, certificate_authority._x509
    )
    _openssl_assert(result == 1)
@_require_not_used
def set_timeout(self, timeout: int) -> int:
    """
    Set the timeout for newly created sessions for this Context object to
    *timeout*. The default value is 300 seconds. See the OpenSSL manual
    for more information (e.g. :manpage:`SSL_CTX_set_timeout(3)`).

    :param timeout: The timeout in (whole) seconds
    :raises TypeError: If *timeout* is not an integer.
    :return: The previous session timeout
    """
    if not isinstance(timeout, int):
        raise TypeError("timeout must be an integer")

    # The value returned by OpenSSL is passed straight back to the
    # caller, so the return annotation is ``int`` rather than ``None``.
    return _lib.SSL_CTX_set_timeout(self._context, timeout)
def get_timeout(self) -> int:
    """
    Return the session timeout, as set by :meth:`set_timeout`. The
    default is 300 seconds.

    :return: The session timeout
    """
    session_timeout = _lib.SSL_CTX_get_timeout(self._context)
    return session_timeout
@_require_not_used
def set_info_callback(
    self, callback: Callable[[Connection, int, int], None]
) -> None:
    """
    Set the information callback to *callback*. It will be called from
    time to time during SSL handshakes.

    :param callback: The Python callback to use. This should take three
        arguments: a Connection object and two integers. The first
        integer specifies where in the SSL handshake the function was
        called, and the other the return code from a (possibly failed)
        internal function call.
    :return: None
    """

    @wraps(callback)
    def _dispatch(ssl, where, return_code):  # type: ignore[no-untyped-def]
        # Translate the raw SSL pointer back into its Connection object.
        connection = Connection._reverse_mapping[ssl]
        callback(connection, where, return_code)

    c_callback = _ffi.callback(
        "void (*)(const SSL *, int, int)", _dispatch
    )
    # The cffi callback object is stored on the context as well as
    # being registered with OpenSSL.
    self._info_callback = c_callback
    _lib.SSL_CTX_set_info_callback(self._context, c_callback)
@_requires_keylog
@_require_not_used
def set_keylog_callback(
    self, callback: Callable[[Connection, bytes], None]
) -> None:
    """
    Set the TLS key logging callback to *callback*. It will be called
    whenever TLS key material is generated or received, so that
    applications can store this keying material for debugging purposes.

    :param callback: The Python callback to use. This should take two
        arguments: a Connection object and a bytestring that contains
        the key material in the format used by NSS for its SSLKEYLOGFILE
        debugging output.
    :return: None
    """

    @wraps(callback)
    def _dispatch(ssl, line):  # type: ignore[no-untyped-def]
        # Convert the C string first, then resolve the Connection.
        key_line = _ffi.string(line)
        callback(Connection._reverse_mapping[ssl], key_line)

    c_callback = _ffi.callback(
        "void (*)(const SSL *, const char *)", _dispatch
    )
    # The cffi callback object is stored on the context as well as
    # being registered with OpenSSL.
    self._keylog_callback = c_callback
    _lib.SSL_CTX_set_keylog_callback(self._context, c_callback)
def get_app_data(self) -> Any:
    """
    Return the application data (supplied via :meth:`set_app_data()`).

    :return: The application data
    """
    app_data = self._app_data
    return app_data
@_require_not_used
def set_app_data(self, data: Any) -> None:
    """
    Store application data on the context (it will be returned from
    get_app_data()).

    :param data: Any Python object
    :return: None
    """
    setattr(self, "_app_data", data)
def get_cert_store(self) -> X509Store | None:
    """
    Get the certificate store for the context. This can be used to add
    "trusted" certificates without using the
    :meth:`load_verify_locations` method.

    :return: A X509Store object or None if it does not have one.
    """
    raw_store = _lib.SSL_CTX_get_cert_store(self._context)
    if raw_store != _ffi.NULL:
        # Create the wrapper without running X509Store.__init__ and
        # attach the context's existing store pointer to it.
        wrapped = X509Store.__new__(X509Store)
        wrapped._store = raw_store
        return wrapped

    # TODO: This is untested.
    return None
@_require_not_used
def set_options(self, options: int) -> int:
    """
    Add options. Options set before are not cleared!
    This method should be used with the :const:`OP_*` constants.

    :param options: The options to add.
    :raises TypeError: If *options* is not an integer.
    :return: The new option bitmask.
    """
    if not isinstance(options, int):
        raise TypeError("options must be an integer")

    new_bitmask = _lib.SSL_CTX_set_options(self._context, options)
    return new_bitmask
@_require_not_used
def set_mode(self, mode: int) -> int:
    """
    Add modes via bitmask. Modes set before are not cleared! This method
    should be used with the :const:`MODE_*` constants.

    :param mode: The mode to add.
    :raises TypeError: If *mode* is not an integer.
    :return: The new mode bitmask.
    """
    if not isinstance(mode, int):
        raise TypeError("mode must be an integer")

    new_bitmask = _lib.SSL_CTX_set_mode(self._context, mode)
    return new_bitmask
@_require_not_used
def clear_mode(self, mode_to_clear: int) -> int:
    """
    Clear existing modes. Modes previously set cannot be overwritten
    without being cleared first.

    :param mode_to_clear: The mode bits to clear.
    :return: The value returned by ``SSL_CTX_clear_mode``.
    """
    result = _lib.SSL_CTX_clear_mode(self._context, mode_to_clear)
    return result
@_require_not_used
def set_tlsext_servername_callback(
    self, callback: Callable[[Connection], None]
) -> None:
    """
    Specify a callback function to be called when clients specify a
    server name.

    :param callback: The callback function. It will be invoked with one
        argument, the Connection instance.

    .. versionadded:: 0.13
    """

    @wraps(callback)
    def _dispatch(ssl, alert, arg):  # type: ignore[no-untyped-def]
        try:
            callback(Connection._reverse_mapping[ssl])
        except Exception:
            # Any failure is reported through the excepthook and turned
            # into a fatal alert instead of propagating.
            sys.excepthook(*sys.exc_info())
            return _lib.SSL_TLSEXT_ERR_ALERT_FATAL
        else:
            return 0

    c_callback = _ffi.callback("int (*)(SSL *, int *, void *)", _dispatch)
    # The cffi callback object is stored on the context as well as
    # being registered with OpenSSL.
    self._tlsext_servername_callback = c_callback
    _lib.SSL_CTX_set_tlsext_servername_callback(self._context, c_callback)
@_require_not_used
def set_tlsext_use_srtp(self, profiles: bytes) -> None:
    """
    Enable support for negotiating SRTP keying material.

    :param bytes profiles: A colon delimited list of protection profile
        names, like ``b'SRTP_AES128_CM_SHA1_80:SRTP_AES128_CM_SHA1_32'``.
    :raises TypeError: If *profiles* is not a byte string.
    :return: None
    """
    if not isinstance(profiles, bytes):
        raise TypeError("profiles must be a byte string.")

    result = _lib.SSL_CTX_set_tlsext_use_srtp(self._context, profiles)
    # A return value of 0 is what counts as success for this call.
    _openssl_assert(result == 0)
@_require_not_used
def set_alpn_protos(self, protos: list[bytes]) -> None:
    """
    Specify the protocols that the client is prepared to speak after the
    TLS connection has been negotiated using Application Layer Protocol
    Negotiation.

    :param protos: A list of the protocols to be offered to the server.
        This list should be a Python list of bytestrings representing the
        protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``.
    :raises ValueError: If *protos* is empty.
    """
    # OpenSSL versions disagree on how to treat an empty protocol list
    # (see #1043), so empty lists are rejected up front.
    if not protos:
        raise ValueError("at least one protocol must be specified")

    # Each protocol is emitted as a one-byte length followed by its
    # bytes, all concatenated together.
    pieces = []
    for proto in protos:
        pieces.extend((bytes((len(proto),)), proto))
    protostr = b"".join(pieces)

    # The C buffer does not need to outlive this call because OpenSSL
    # immediately copies the data out.
    input_str = _ffi.new("unsigned char[]", protostr)

    # https://www.openssl.org/docs/man1.1.0/man3/SSL_CTX_set_alpn_protos.html:
    # SSL_CTX_set_alpn_protos() and SSL_set_alpn_protos()
    # return 0 on success, and non-0 on failure.
    # WARNING: these functions reverse the return value convention.
    result = _lib.SSL_CTX_set_alpn_protos(
        self._context, input_str, len(protostr)
    )
    _openssl_assert(result == 0)
@_require_not_used
def set_alpn_select_callback(self, callback: _ALPNSelectCallback) -> None:
    """
    Specify a callback function that will be called on the server when a
    client offers protocols using ALPN.

    :param callback: The callback function. It will be invoked with two
        arguments: the Connection, and a list of offered protocols as
        bytestrings, e.g ``[b'http/1.1', b'spdy/2']``. It can return
        one of those bytestrings to indicate the chosen protocol, the
        empty bytestring to terminate the TLS connection, or the
        :py:obj:`NO_OVERLAPPING_PROTOCOLS` to indicate that no offered
        protocol was selected, but that the connection should not be
        aborted.
    """
    helper = _ALPNSelectHelper(callback)
    # Both the helper and its C callback are stored on the context.
    self._alpn_select_helper = helper
    self._alpn_select_callback = helper.callback
    _lib.SSL_CTX_set_alpn_select_cb(
        self._context, self._alpn_select_callback, _ffi.NULL
    )
def _set_ocsp_callback(
    self,
    helper: _OCSPClientCallbackHelper | _OCSPServerCallbackHelper,
    data: Any | None,
) -> None:
    """
    Shared implementation behind ``set_ocsp_server_callback`` and
    ``set_ocsp_client_callback``; it does nearly all of their work.

    :param helper: The callback helper whose ``callback`` is registered
        as the TLS status callback.
    :param data: Optional opaque data; when not None it is wrapped in a
        cffi handle and registered as the status callback argument.
    """
    self._ocsp_helper = helper
    self._ocsp_callback = helper.callback
    # No user data is represented as NULL; anything else is wrapped in a
    # cffi handle that is stored on the context.
    self._ocsp_data = (
        _ffi.NULL if data is None else _ffi.new_handle(data)
    )

    cb_result = _lib.SSL_CTX_set_tlsext_status_cb(
        self._context, self._ocsp_callback
    )
    _openssl_assert(cb_result == 1)

    arg_result = _lib.SSL_CTX_set_tlsext_status_arg(
        self._context, self._ocsp_data
    )
    _openssl_assert(arg_result == 1)
@_require_not_used
def set_ocsp_server_callback(
    self,
    callback: _OCSPServerCallback[_T],
    data: _T | None = None,
) -> None:
    """
    Set a callback to provide OCSP data to be stapled to the TLS
    handshake on the server side.

    :param callback: The callback function. It will be invoked with two
        arguments: the Connection, and the optional arbitrary data you
        have provided. The callback must return a bytestring that
        contains the OCSP data to staple to the handshake. If no OCSP
        data is available for this connection, return the empty
        bytestring.
    :param data: Some opaque data that will be passed into the callback
        function when called. This can be used to avoid needing to do
        complex data lookups or to keep track of what context is being
        used. This parameter is optional.
    """
    self._set_ocsp_callback(_OCSPServerCallbackHelper(callback), data)
@_require_not_used
def set_ocsp_client_callback(
    self,
    callback: _OCSPClientCallback[_T],
    data: _T | None = None,
) -> None:
    """
    Set a callback to validate OCSP data stapled to the TLS handshake on
    the client side.

    :param callback: The callback function. It will be invoked with three
        arguments: the Connection, a bytestring containing the stapled
        OCSP assertion, and the optional arbitrary data you have
        provided. The callback must return a boolean that indicates the
        result of validating the OCSP data: ``True`` if the OCSP data is
        valid and the certificate can be trusted, or ``False`` if either
        the OCSP data is invalid or the certificate has been revoked.
    :param data: Some opaque data that will be passed into the callback
        function when called. This can be used to avoid needing to do
        complex data lookups or to keep track of what context is being
        used. This parameter is optional.
    """
    self._set_ocsp_callback(_OCSPClientCallbackHelper(callback), data)
@_require_not_used
@_requires_ssl_cookie
def set_cookie_generate_callback(
    self, callback: _CookieGenerateCallback
) -> None:
    """
    Register *callback* as this context's cookie generation callback.

    The callback is wrapped in a :class:`_CookieGenerateCallbackHelper`,
    which is stored on the context, and the helper's C callback is
    installed with ``SSL_CTX_set_cookie_generate_cb``.

    :param callback: The Python cookie generation callback.
    :return: None
    """
    helper = _CookieGenerateCallbackHelper(callback)
    self._cookie_generate_helper = helper
    _lib.SSL_CTX_set_cookie_generate_cb(self._context, helper.callback)
@_require_not_used
@_requires_ssl_cookie
def set_cookie_verify_callback(
    self, callback: _CookieVerifyCallback
) -> None:
    """
    Register *callback* as this context's cookie verification callback.

    The callback is wrapped in a :class:`_CookieVerifyCallbackHelper`,
    which is stored on the context, and the helper's C callback is
    installed with ``SSL_CTX_set_cookie_verify_cb``.

    :param callback: The Python cookie verification callback.
    :return: None
    """
    helper = _CookieVerifyCallbackHelper(callback)
    self._cookie_verify_helper = helper
    _lib.SSL_CTX_set_cookie_verify_cb(self._context, helper.callback)
1951class Connection:
1952 _reverse_mapping: typing.MutableMapping[Any, Connection] = (
1953 WeakValueDictionary()
1954 )
def __init__(
    self, context: Context, socket: socket.socket | None = None
) -> None:
    """
    Build a Connection around a new OpenSSL ``SSL`` object.

    :param context: The :class:`Context` the new SSL object is created
        from. It is marked as used.
    :param socket: The transport socket. When ``None``, a pair of memory
        BIOs is attached instead of a file descriptor.
    :raise TypeError: If *context* is not a :class:`Context`.
    """
    if not isinstance(context, Context):
        raise TypeError("context must be a Context instance")

    context._used = True

    raw_ssl = _lib.SSL_new(context._context)
    self._ssl = _ffi.gc(raw_ssl, _lib.SSL_free)
    # SSL_MODE_AUTO_RETRY covers the case where OpenSSL reports
    # SSL_ERROR_WANT_READ while handling a non-application data packet
    # although the transport still has data available.
    # See https://github.com/openssl/openssl/issues/6234 for more details.
    _lib.SSL_set_mode(self._ssl, _lib.SSL_MODE_AUTO_RETRY)
    self._context = context
    self._app_data = None

    # Holder for the strings used during Application Layer Protocol
    # Negotiation: they are copied well after the callback returns, so
    # they must stay referenced somewhere until then.
    self._alpn_select_callback_args: Any = None

    # Keep the Context's verify helper/callback referenced from here so
    # that a later Context.set_verify call cannot leave this SSL object
    # pointing at a dangling reference.
    self._verify_helper = context._verify_helper
    self._verify_callback = context._verify_callback

    # Same reasoning for the cookie callback helpers.
    self._cookie_generate_helper = context._cookie_generate_helper
    self._cookie_verify_helper = context._cookie_verify_helper

    self._reverse_mapping[self._ssl] = self

    if socket is not None:
        # Socket transport: no memory BIOs, bind the descriptor directly.
        self._into_ssl = None
        self._from_ssl = None
        self._socket = socket
        fd_result = _lib.SSL_set_fd(
            self._ssl, _asFileDescriptor(self._socket)
        )
        _openssl_assert(fd_result == 1)
        return

    self._socket = None
    # No gc is attached to the BIOs; SSL_free will take care of them.
    self._into_ssl = _lib.BIO_new(_lib.BIO_s_mem())
    _openssl_assert(self._into_ssl != _ffi.NULL)

    self._from_ssl = _lib.BIO_new(_lib.BIO_s_mem())
    _openssl_assert(self._from_ssl != _ffi.NULL)

    _lib.SSL_set_bio(self._ssl, self._into_ssl, self._from_ssl)
2018 def __getattr__(self, name: str) -> Any:
2019 """
2020 Look up attributes on the wrapped socket object if they are not found
2021 on the Connection object.
2022 """
2023 if self._socket is None:
2024 raise AttributeError(
2025 f"'{self.__class__.__name__}' object has no attribute '{name}'"
2026 )
2027 else:
2028 return getattr(self._socket, name)
def _raise_ssl_error(self, ssl: Any, result: int) -> None:
    """
    Translate the outcome of an OpenSSL call into a Python exception.

    Problems recorded by the context's verify, ALPN-select and OCSP
    helpers are re-raised first. Otherwise ``SSL_get_error`` is consulted
    and mapped to the matching exception; ``SSL_ERROR_NONE`` returns
    normally.

    :param ssl: The SSL object the call was made on.
    :param result: The return value of that call.
    """
    context = self._context
    for helper_attr in (
        "_verify_helper",
        "_alpn_select_helper",
        "_ocsp_helper",
    ):
        helper = getattr(context, helper_attr)
        if helper is not None:
            helper.raise_if_problem()

    code = _lib.SSL_get_error(ssl, result)

    if code == _lib.SSL_ERROR_WANT_READ:
        raise WantReadError()
    if code == _lib.SSL_ERROR_WANT_WRITE:
        raise WantWriteError()
    if code == _lib.SSL_ERROR_ZERO_RETURN:
        raise ZeroReturnError()
    if code == _lib.SSL_ERROR_WANT_X509_LOOKUP:
        # TODO: This is untested.
        raise WantX509LookupError()

    if code == _lib.SSL_ERROR_SYSCALL:
        errno = _ffi.getwinerror()[0] if platform == "win32" else _ffi.errno
        if _lib.ERR_peek_error() != 0 and errno == 0:
            # TODO: This is untested, but I think twisted hits it?
            _raise_current_error()
            return
        if result < 0 and errno != 0:
            raise SysCallError(errno, errorcode.get(errno))
        raise SysCallError(-1, "Unexpected EOF")

    if code == _lib.SSL_ERROR_SSL and _lib.ERR_peek_error() != 0:
        # In 3.0.x an unexpected EOF is no longer reported as a syscall
        # error. For compatibility it is still surfaced as SysCallError
        # when the OpenSSL 3 constant SSL_R_UNEXPECTED_EOF_WHILE_READING
        # is available; without that constant the current error is
        # raised, which matches the behaviour before this branch existed.
        reason = _lib.ERR_GET_REASON(_lib.ERR_peek_error())
        if not _lib.Cryptography_HAS_UNEXPECTED_EOF_WHILE_READING:
            _raise_current_error()
            return
        _openssl_assert(reason == _lib.SSL_R_UNEXPECTED_EOF_WHILE_READING)
        _lib.ERR_clear_error()
        raise SysCallError(-1, "Unexpected EOF")

    if code != _lib.SSL_ERROR_NONE:
        _raise_current_error()
def get_context(self) -> Context:
    """
    Return the :class:`Context` currently stored on this
    :class:`Connection`.
    """
    current_context = self._context
    return current_context
def set_context(self, context: Context) -> None:
    """
    Point this connection at a different session context.

    :param context: The :class:`Context` instance to switch to. It is
        marked as used.
    :raise TypeError: If *context* is not a :class:`Context`.
    """
    if isinstance(context, Context):
        _lib.SSL_set_SSL_CTX(self._ssl, context._context)
        self._context = context
        context._used = True
        return
    raise TypeError("context must be a Context instance")
def set_options(self, options: int) -> int:
    """
    Add options on this connection; previously set options stay set.
    Intended for use with the :const:`OP_*` constants.

    :param options: The options to add.
    :return: The new option bitmask.
    :raise TypeError: If *options* is not an integer.
    """
    if isinstance(options, int):
        return _lib.SSL_set_options(self._ssl, options)
    raise TypeError("options must be an integer")
def get_servername(self) -> bytes | None:
    """
    Return the servername extension value from the client hello, if one
    was provided.

    :return: A byte string giving the server name, or :data:`None` when
        OpenSSL reports no name.

    .. versionadded:: 0.13
    """
    raw_name = _lib.SSL_get_servername(
        self._ssl, _lib.TLSEXT_NAMETYPE_host_name
    )
    return None if raw_name == _ffi.NULL else _ffi.string(raw_name)
def set_verify(
    self, mode: int, callback: _VerifyCallback | None = None
) -> None:
    """
    Set verification flags (and optionally a callback) on this specific
    connection, overriding the Context's. See
    :py:meth:`Context.set_verify` for details.

    :param mode: The verify flags.
    :param callback: Optional verify callback; ``None`` installs no
        callback.
    :raise TypeError: If *mode* is not an integer or *callback* is given
        but not callable.
    """
    if not isinstance(mode, int):
        raise TypeError("mode must be an integer")

    if callback is not None:
        if not callable(callback):
            raise TypeError("callback must be callable")

        helper = _VerifyHelper(callback)
        self._verify_helper = helper
        self._verify_callback = helper.callback
        _lib.SSL_set_verify(self._ssl, mode, self._verify_callback)
        return

    self._verify_helper = None
    self._verify_callback = None
    _lib.SSL_set_verify(self._ssl, mode, _ffi.NULL)
def get_verify_mode(self) -> int:
    """
    Return this Connection's verify mode, as set by :meth:`set_verify`.

    :return: The verify mode
    """
    verify_mode = _lib.SSL_get_verify_mode(self._ssl)
    return verify_mode
def use_certificate(self, cert: X509 | x509.Certificate) -> None:
    """
    Load a certificate into this connection.

    :param cert: The certificate. A pyOpenSSL ``X509`` is accepted with
        a :class:`DeprecationWarning`; anything else is converted with
        ``X509.from_cryptography``.
    :return: None
    """
    # Mirrored from Context.use_certificate
    if isinstance(cert, X509):
        warnings.warn(
            (
                "Passing pyOpenSSL X509 objects is deprecated. You "
                "should use a cryptography.x509.Certificate instead."
            ),
            DeprecationWarning,
            stacklevel=2,
        )
    else:
        cert = X509.from_cryptography(cert)

    if not _lib.SSL_use_certificate(self._ssl, cert._x509):
        _raise_current_error()
def use_privatekey(self, pkey: _PrivateKey | PKey) -> None:
    """
    Load a private key into this connection.

    :param pkey: The private key. A pyOpenSSL ``PKey`` is accepted with
        a :class:`DeprecationWarning`; anything else is converted with
        ``PKey.from_cryptography_key``.
    :return: None
    """
    # Mirrored from Context.use_privatekey
    if isinstance(pkey, PKey):
        warnings.warn(
            (
                "Passing pyOpenSSL PKey objects is deprecated. You "
                "should use a cryptography private key instead."
            ),
            DeprecationWarning,
            stacklevel=2,
        )
    else:
        pkey = PKey.from_cryptography_key(pkey)

    if not _lib.SSL_use_PrivateKey(self._ssl, pkey._pkey):
        self._context._raise_passphrase_exception()
def set_ciphertext_mtu(self, mtu: int) -> None:
    """
    For DTLS, set the maximum UDP payload size (*not* including IP/UDP
    overhead).

    You might also have to set :data:`OP_NO_QUERY_MTU`, otherwise OpenSSL
    may spontaneously clear this value.

    :param mtu: An integer giving the maximum transmission unit.

    .. versionadded:: 21.1
    """
    ssl = self._ssl
    _lib.SSL_set_mtu(ssl, mtu)
def get_cleartext_mtu(self) -> int:
    """
    For DTLS, return the largest amount of unencrypted data that can be
    passed to :meth:`write` without exceeding the MTU (as passed to
    :meth:`set_ciphertext_mtu`).

    :return: The effective MTU as an integer.
    :raise NotImplementedError: If the binding does not expose
        ``DTLS_get_data_mtu``.

    .. versionadded:: 21.1
    """
    if hasattr(_lib, "DTLS_get_data_mtu"):
        return _lib.DTLS_get_data_mtu(self._ssl)
    raise NotImplementedError("requires OpenSSL 1.1.1 or better")
def set_tlsext_host_name(self, name: bytes) -> None:
    """
    Set the servername extension value to send in the client hello.

    :param name: A byte string giving the name.
    :raise TypeError: If *name* is not ``bytes`` or contains a NUL byte.

    .. versionadded:: 0.13
    """
    if not isinstance(name, bytes):
        raise TypeError("name must be a byte string")
    if b"\0" in name:
        raise TypeError("name must not contain NUL byte")

    # XXX The return value of the OpenSSL call is not checked here.
    _lib.SSL_set_tlsext_host_name(self._ssl, name)
def pending(self) -> int:
    """
    Return how many bytes can be safely read from the SSL buffer (**not**
    the underlying transport buffer).

    :return: The number of bytes available in the receive buffer.
    """
    available = _lib.SSL_pending(self._ssl)
    return available
def send(self, buf: _Buffer, flags: int = 0) -> int:
    """
    Send data on the connection. NOTE: after a WantRead, WantWrite or
    WantX509Lookup exception the method has to be called again with the
    SAME buffer.

    :param buf: The string, buffer or memoryview to send
    :param flags: (optional) Included for compatibility with the socket
        API, the value is ignored
    :return: The number of bytes written
    :raise ValueError: If *buf* is longer than 2**31-1 bytes.
    """
    # Backward compatibility: text input is converted (with a warning).
    buf = _text_to_bytes_and_warn("buf", buf)
    int_max = 2147483647

    with _ffi.from_buffer(buf) as data:
        # len(buf) rather than len(data) is checked for testability
        if len(buf) > int_max:
            raise ValueError(
                "Cannot send more than 2**31-1 bytes at once."
            )

        written = _lib.SSL_write(self._ssl, data, len(data))
        self._raise_ssl_error(self._ssl, written)

        return written


write = send
def sendall(self, buf: _Buffer, flags: int = 0) -> int:
    """
    Send "all" data on the connection by calling ``SSL_write`` repeatedly
    until nothing is left. If an error occurs, it's impossible to tell
    how much data has been sent.

    :param buf: The string, buffer or memoryview to send
    :param flags: (optional) Included for compatibility with the socket
        API, the value is ignored
    :return: The number of bytes written
    """
    buf = _text_to_bytes_and_warn("buf", buf)

    with _ffi.from_buffer(buf) as data:
        remaining = len(buf)
        sent_so_far = 0

        while remaining:
            # SSL_write's num arg is an int, so a single call is capped
            # at 2**31-1 bytes.
            chunk_size = min(remaining, 2147483647)
            written = _lib.SSL_write(
                self._ssl, data + sent_so_far, chunk_size
            )
            self._raise_ssl_error(self._ssl, written)
            sent_so_far += written
            remaining -= written

        return sent_so_far
def recv(self, bufsiz: int, flags: int | None = None) -> bytes:
    """
    Receive data on the connection.

    :param bufsiz: The maximum number of bytes to read
    :param flags: (optional) The only supported flag is ``MSG_PEEK``,
        all other flags are ignored.
    :return: The bytes read from the Connection
    """
    scratch = _no_zero_allocator("char[]", bufsiz)
    peeking = flags is not None and flags & socket.MSG_PEEK
    if peeking:
        count = _lib.SSL_peek(self._ssl, scratch, bufsiz)
    else:
        count = _lib.SSL_read(self._ssl, scratch, bufsiz)
    self._raise_ssl_error(self._ssl, count)
    # Slice the cffi buffer to copy exactly ``count`` bytes out.
    return _ffi.buffer(scratch, count)[:]


read = recv
def recv_into(
    self,
    buffer: Any,  # collections.abc.Buffer once we use Python 3.12+
    nbytes: int | None = None,
    flags: int | None = None,
) -> int:
    """
    Receive data on the connection and store it in the caller's buffer
    instead of returning a new bytes object.

    :param buffer: The buffer to copy into.
    :param nbytes: (optional) The maximum number of bytes to read into
        the buffer. Defaults to the size of the buffer, and is capped at
        the size of the buffer.
    :param flags: (optional) The only supported flag is ``MSG_PEEK``,
        all other flags are ignored.
    :return: The number of bytes read into the buffer.
    """
    capacity = len(buffer)
    nbytes = capacity if nbytes is None else min(nbytes, capacity)

    # A temporary buffer is needed because memoryviews cannot currently
    # be passed straight into the SSL_read call. Revisit this if CFFI
    # gets that ability.
    scratch = _no_zero_allocator("char[]", nbytes)
    peeking = flags is not None and flags & socket.MSG_PEEK
    if peeking:
        count = _lib.SSL_peek(self._ssl, scratch, nbytes)
    else:
        count = _lib.SSL_read(self._ssl, scratch, nbytes)
    self._raise_ssl_error(self._ssl, count)

    # Assigning the CFFI buffer directly to the slice segfaults on
    # CPython 3.3+, so it is wrapped in a memoryview first; this still
    # avoids an extra memory copy.
    buffer[:count] = memoryview(_ffi.buffer(scratch, count))

    return count
def _handle_bio_errors(self, bio: Any, result: int) -> typing.NoReturn:
    """
    Raise the exception matching a failed BIO operation.

    :param bio: The BIO the failed operation was performed on.
    :param result: The value the BIO call returned (not inspected here).
    """
    if not _lib.BIO_should_retry(bio):
        # TODO: This is untested.
        _raise_current_error()
    elif _lib.BIO_should_read(bio):
        raise WantReadError()
    elif _lib.BIO_should_write(bio):
        # TODO: This is untested.
        raise WantWriteError()
    elif _lib.BIO_should_io_special(bio):
        # TODO: This is untested. I think io_special means the socket
        # BIO has a not-yet connected socket.
        raise ValueError("BIO_should_io_special")
    else:
        # TODO: This is untested.
        raise ValueError("unknown bio failure")
def bio_read(self, bufsiz: int) -> bytes:
    """
    Read bytes from the write end of the memory BIO of a Connection that
    was created without a socket. Many Connection methods add bytes that
    must be drained this way, or the buffer will eventually fill up and
    the Connection will be unable to take further actions.

    :param bufsiz: The maximum number of bytes to read
    :return: The bytes read.
    :raise TypeError: If the Connection has no memory BIO or *bufsiz* is
        not an integer.
    """
    outgoing = self._from_ssl
    if outgoing is None:
        raise TypeError("Connection sock was not None")

    if not isinstance(bufsiz, int):
        raise TypeError("bufsiz must be an integer")

    scratch = _no_zero_allocator("char[]", bufsiz)
    count = _lib.BIO_read(outgoing, scratch, bufsiz)
    if count <= 0:
        self._handle_bio_errors(outgoing, count)

    return _ffi.buffer(scratch, count)[:]
def bio_write(self, buf: _Buffer) -> int:
    """
    Add bytes to the read end of the memory BIO of a Connection that was
    created without a socket. The Connection can then read the bytes (for
    example, in response to a call to :meth:`recv`).

    :param buf: The data to put into the memory BIO.
    :return: The number of bytes written
    :raise TypeError: If the Connection has no memory BIO.
    """
    # The text conversion (and its warning) happens before the BIO check.
    buf = _text_to_bytes_and_warn("buf", buf)

    incoming = self._into_ssl
    if incoming is None:
        raise TypeError("Connection sock was not None")

    with _ffi.from_buffer(buf) as data:
        written = _lib.BIO_write(incoming, data, len(data))
        if written <= 0:
            self._handle_bio_errors(incoming, written)
        return written
def renegotiate(self) -> bool:
    """
    Renegotiate the session.

    :return: True if the renegotiation can be started, False when one is
        already pending.
    """
    if self.renegotiate_pending():
        return False
    _openssl_assert(_lib.SSL_renegotiate(self._ssl) == 1)
    return True
def do_handshake(self) -> None:
    """
    Perform an SSL handshake (usually called after :meth:`renegotiate`
    or one of :meth:`set_accept_state` or :meth:`set_connect_state`).
    Errors are reported through the same exceptions as :meth:`send` and
    :meth:`recv`.

    :return: None.
    """
    outcome = _lib.SSL_do_handshake(self._ssl)
    self._raise_ssl_error(self._ssl, outcome)
def renegotiate_pending(self) -> bool:
    """
    Report whether a renegotiation is in progress; this becomes False
    once a renegotiation is finished.

    :return: Whether there's a renegotiation in progress
    """
    status = _lib.SSL_renegotiate_pending(self._ssl)
    return status == 1
def total_renegotiations(self) -> int:
    """
    Return the total number of renegotiations.

    :return: The number of renegotiations.
    """
    total = _lib.SSL_total_renegotiations(self._ssl)
    return total
def connect(self, addr: Any) -> None:
    """
    Put the SSL object into connect (client) state, then call the
    :meth:`connect` method of the underlying socket.

    :param addr: A remote address
    :return: What the socket's connect method returns
    """
    _lib.SSL_set_connect_state(self._ssl)
    transport = self._socket
    return transport.connect(addr)  # type: ignore[return-value, union-attr]
def connect_ex(self, addr: Any) -> int:
    """
    Put the connection into connect (client) state and call the
    :meth:`connect_ex` method of the underlying socket.

    :param addr: A remote address
    :return: What the socket's connect_ex method returns
    """
    # The bound method is looked up before the state change, so a missing
    # socket fails before anything is modified.
    socket_connect_ex = self._socket.connect_ex  # type: ignore[union-attr]
    self.set_connect_state()
    return socket_connect_ex(addr)
def accept(self) -> tuple[Connection, Any]:
    """
    Call the :meth:`accept` method of the underlying socket and wrap the
    returned socket in a new :class:`Connection` that uses this
    Connection's Context and is put into accept (server) state.

    :return: A *(conn, addr)* pair where *conn* is the new
        :class:`Connection` object created, and *addr* is as returned by
        the socket's :meth:`accept`.
    """
    peer_socket, peer_addr = self._socket.accept()  # type: ignore[union-attr]
    accepted = Connection(self._context, peer_socket)
    accepted.set_accept_state()
    return (accepted, peer_addr)
def DTLSv1_listen(self) -> None:
    """
    Call the OpenSSL function DTLSv1_listen on this connection. See the
    OpenSSL manual for more details.

    :return: None
    :raise WantReadError: If DTLSv1_listen returned zero.
    """
    # Possible future extension: return the BIO_ADDR in some form.
    peer_addr = _lib.BIO_ADDR_new()
    try:
        outcome = _lib.DTLSv1_listen(self._ssl, peer_addr)
    finally:
        _lib.BIO_ADDR_free(peer_addr)

    # Problems recorded by the cookie callbacks take precedence.
    generate_helper = self._cookie_generate_helper
    if generate_helper is not None:
        generate_helper.raise_if_problem()
    verify_helper = self._cookie_verify_helper
    if verify_helper is not None:
        verify_helper.raise_if_problem()

    if outcome > 0:
        return
    if outcome == 0:
        # DTLSv1_listen is weird. Zero means 'didn't find a ClientHello
        # with valid cookie, but keep trying' -- effectively a
        # WantReadError -- but it doesn't work correctly with
        # _raise_ssl_error, so it is raised manually instead.
        raise WantReadError()
    self._raise_ssl_error(self._ssl, outcome)
def DTLSv1_get_timeout(self) -> float | None:
    """
    Determine when the DTLS SSL object next needs to perform internal
    processing due to the passage of time.

    When the returned number of seconds have passed, the
    :meth:`DTLSv1_handle_timeout` method needs to be called.

    :return: The time left in seconds before the next timeout, as a
        float combining whole seconds and microseconds, or `None` if no
        timeout is currently active.
    """
    ptv_sec = _ffi.new("time_t *")
    ptv_usec = _ffi.new("long *")
    if not _lib.Cryptography_DTLSv1_get_timeout(
        self._ssl, ptv_sec, ptv_usec
    ):
        return None
    # True division makes the result a float, hence the return annotation.
    return ptv_sec[0] + (ptv_usec[0] / 1000000)
def DTLSv1_handle_timeout(self) -> bool:
    """
    Handles any timeout events which have become pending on a DTLS SSL
    object.

    :return: `True` if there was a pending timeout, `False` otherwise.
    """
    result = _lib.DTLSv1_handle_timeout(self._ssl)
    if result < 0:
        self._raise_ssl_error(self._ssl, result)
        # An explicit raise instead of ``assert False``: asserts are
        # stripped under ``python -O``, which would let this path fall
        # through and return None from a method declared to return bool.
        raise AssertionError("unreachable")
    return bool(result)
def bio_shutdown(self) -> None:
    """
    If the Connection was created with a memory BIO, this method can be
    used to indicate that *end of file* has been reached on the read end
    of that memory BIO.

    :return: None
    :raise TypeError: If the Connection has no memory BIO.
    """
    # Guard on the BIO that is actually used below (the read end).
    into_ssl = self._into_ssl
    if into_ssl is None:
        raise TypeError("Connection sock was not None")

    _lib.BIO_set_mem_eof_return(into_ssl, 0)
def shutdown(self) -> bool:
    """
    Send the shutdown message to the Connection.

    :return: True if the shutdown completed successfully (i.e. both sides
        have sent closure alerts), False otherwise (in which case you
        should call :meth:`recv` or :meth:`send` when the connection
        becomes readable/writeable).
    """
    result = _lib.SSL_shutdown(self._ssl)
    if result < 0:
        self._raise_ssl_error(self._ssl, result)
        # An explicit raise instead of ``assert False``: asserts are
        # stripped under ``python -O``, which would let this path fall
        # through and return None from a method declared to return bool.
        raise AssertionError("unreachable")
    return result > 0
def get_cipher_list(self) -> list[str]:
    """
    Retrieve the list of ciphers used by the Connection object.

    ``SSL_get_cipher_list`` is queried with increasing indexes until it
    returns NULL.

    :return: A list of native cipher strings.
    """
    names = []
    index = 0
    while True:
        raw_name = _lib.SSL_get_cipher_list(self._ssl, index)
        if raw_name == _ffi.NULL:
            return names
        names.append(_ffi.string(raw_name).decode("utf-8"))
        index += 1
def get_client_ca_list(self) -> list[X509Name]:
    """
    Get CAs whose certificates are suggested for client authentication.

    :return: If this is a server connection, the list of certificate
        authorities that will be sent or has been sent to the client, as
        controlled by this :class:`Connection`'s :class:`Context`.

        If this is a client connection, the list will be empty until the
        connection with the server is established.

    .. versionadded:: 0.10
    """
    name_stack = _lib.SSL_get_client_CA_list(self._ssl)
    if name_stack == _ffi.NULL:
        # TODO: This is untested.
        return []

    names = []
    for index in range(_lib.sk_X509_NAME_num(name_stack)):
        # Each entry is duplicated and the copy is freed by the cffi gc.
        duplicate = _lib.X509_NAME_dup(
            _lib.sk_X509_NAME_value(name_stack, index)
        )
        _openssl_assert(duplicate != _ffi.NULL)

        wrapper = X509Name.__new__(X509Name)
        wrapper._name = _ffi.gc(duplicate, _lib.X509_NAME_free)
        names.append(wrapper)
    return names
def makefile(self, *args: Any, **kwargs: Any) -> typing.NoReturn:
    """
    Not implemented: there is no dup semantics for SSL connections, so a
    file object cannot be created.

    :raise: NotImplementedError
    """
    message = "Cannot make file object of OpenSSL.SSL.Connection"
    raise NotImplementedError(message)
def get_app_data(self) -> Any:
    """
    Return the application data stored by :meth:`set_app_data`.

    :return: The application data
    """
    stored = self._app_data
    return stored
def set_app_data(self, data: Any) -> None:
    """
    Store arbitrary application data on this Connection.

    :param data: The application data
    :return: None
    """
    setattr(self, "_app_data", data)
def get_shutdown(self) -> int:
    """
    Return the shutdown state of the Connection.

    :return: The shutdown state, a bitvector of SENT_SHUTDOWN,
        RECEIVED_SHUTDOWN.
    """
    state = _lib.SSL_get_shutdown(self._ssl)
    return state
def set_shutdown(self, state: int) -> None:
    """
    Set the shutdown state of the Connection.

    :param state: bitvector of SENT_SHUTDOWN, RECEIVED_SHUTDOWN.
    :return: None
    :raise TypeError: If *state* is not an integer.
    """
    if isinstance(state, int):
        _lib.SSL_set_shutdown(self._ssl, state)
        return
    raise TypeError("state must be an integer")
def get_state_string(self) -> bytes:
    """
    Return a verbose description of the Connection's state, as reported
    by ``SSL_state_string_long``.

    :return: A byte string describing the state
    """
    description = _lib.SSL_state_string_long(self._ssl)
    return _ffi.string(description)
def server_random(self) -> bytes | None:
    """
    Retrieve the random value used with the server hello message.

    :return: The server random as bytes, or ``None`` when the connection
        has no session.
    """
    if _lib.SSL_get_session(self._ssl) == _ffi.NULL:
        return None
    # First call with a NULL buffer only asks for the length.
    size = _lib.SSL_get_server_random(self._ssl, _ffi.NULL, 0)
    _openssl_assert(size > 0)
    out = _no_zero_allocator("unsigned char[]", size)
    _lib.SSL_get_server_random(self._ssl, out, size)
    return _ffi.buffer(out, size)[:]
def client_random(self) -> bytes | None:
    """
    Retrieve the random value used with the client hello message.

    :return: The client random as bytes, or ``None`` when the connection
        has no session.
    """
    if _lib.SSL_get_session(self._ssl) == _ffi.NULL:
        return None

    # First call with a NULL buffer only asks for the length.
    size = _lib.SSL_get_client_random(self._ssl, _ffi.NULL, 0)
    _openssl_assert(size > 0)
    out = _no_zero_allocator("unsigned char[]", size)
    _lib.SSL_get_client_random(self._ssl, out, size)
    return _ffi.buffer(out, size)[:]
def master_key(self) -> bytes | None:
    """
    Retrieve the value of the master key for this session.

    :return: The master key as bytes, or ``None`` when the connection
        has no session.
    """
    current_session = _lib.SSL_get_session(self._ssl)
    if current_session == _ffi.NULL:
        return None

    # First call with a NULL buffer only asks for the length.
    size = _lib.SSL_SESSION_get_master_key(current_session, _ffi.NULL, 0)
    _openssl_assert(size > 0)
    out = _no_zero_allocator("unsigned char[]", size)
    _lib.SSL_SESSION_get_master_key(current_session, out, size)
    return _ffi.buffer(out, size)[:]
def export_keying_material(
    self, label: bytes, olen: int, context: bytes | None = None
) -> bytes:
    """
    Obtain keying material for application use.

    :param label: a disambiguating label string as described in RFC 5705
    :param olen: the length of the exported key material in bytes
    :param context: a per-association context value; when ``None`` the
        export is requested without a context
    :return: the exported key material bytes
    """
    outp = _no_zero_allocator("unsigned char[]", olen)
    # (buffer, length, use_context flag) passed through to OpenSSL.
    if context is None:
        context_args = (_ffi.NULL, 0, 0)
    else:
        context_args = (context, len(context), 1)
    success = _lib.SSL_export_keying_material(
        self._ssl,
        outp,
        olen,
        label,
        len(label),
        *context_args,
    )
    _openssl_assert(success == 1)
    return _ffi.buffer(outp, olen)[:]
def sock_shutdown(self, *args: Any, **kwargs: Any) -> None:
    """
    Forward to the :meth:`shutdown` method of the underlying socket.
    See :manpage:`shutdown(2)`.

    :return: What the socket's shutdown() method returns
    """
    transport = self._socket
    return transport.shutdown(*args, **kwargs)  # type: ignore[return-value, union-attr]
@typing.overload
def get_certificate(
    self, *, as_cryptography: typing.Literal[True]
) -> x509.Certificate | None:
    ...


@typing.overload
def get_certificate(
    self, *, as_cryptography: typing.Literal[False] = False
) -> X509 | None:
    ...


def get_certificate(
    self,
    *,
    as_cryptography: typing.Literal[True] | typing.Literal[False] = False,
) -> X509 | x509.Certificate | None:
    """
    Retrieve the local certificate (if any)

    :param bool as_cryptography: Controls whether a
        ``cryptography.x509.Certificate`` or an ``OpenSSL.crypto.X509``
        object should be returned.

    :return: The local certificate, or ``None`` when OpenSSL reports
        none.
    """
    raw_cert = _lib.SSL_get_certificate(self._ssl)
    if raw_cert == _ffi.NULL:
        return None
    # Take an extra reference before wrapping the pointer.
    _lib.X509_up_ref(raw_cert)
    wrapped = X509._from_raw_x509_ptr(raw_cert)
    return wrapped.to_cryptography() if as_cryptography else wrapped
@typing.overload
def get_peer_certificate(
    self, *, as_cryptography: typing.Literal[True]
) -> x509.Certificate | None:
    ...


@typing.overload
def get_peer_certificate(
    self, *, as_cryptography: typing.Literal[False] = False
) -> X509 | None:
    ...


def get_peer_certificate(
    self,
    *,
    as_cryptography: typing.Literal[True] | typing.Literal[False] = False,
) -> X509 | x509.Certificate | None:
    """
    Retrieve the other side's certificate (if any)

    :param bool as_cryptography: Controls whether a
        ``cryptography.x509.Certificate`` or an ``OpenSSL.crypto.X509``
        object should be returned.

    :return: The peer's certificate, or ``None`` when OpenSSL reports
        none.
    """
    raw_cert = _lib.SSL_get_peer_certificate(self._ssl)
    if raw_cert == _ffi.NULL:
        return None
    wrapped = X509._from_raw_x509_ptr(raw_cert)
    return wrapped.to_cryptography() if as_cryptography else wrapped
@staticmethod
def _cert_stack_to_list(cert_stack: Any) -> list[X509]:
    """
    Internal helper to convert a STACK_OF(X509) to a list of X509
    instances.

    :param cert_stack: The certificate stack to walk.
    :return: One ``X509`` wrapper per stack entry, in stack order.
    """
    wrapped_certs = []
    for index in range(_lib.sk_X509_num(cert_stack)):
        raw_cert = _lib.sk_X509_value(cert_stack, index)
        _openssl_assert(raw_cert != _ffi.NULL)
        # Take an extra reference before wrapping the pointer.
        ref_result = _lib.X509_up_ref(raw_cert)
        _openssl_assert(ref_result >= 1)
        wrapped_certs.append(X509._from_raw_x509_ptr(raw_cert))
    return wrapped_certs
@staticmethod
def _cert_stack_to_cryptography_list(
    cert_stack: Any,
) -> list[x509.Certificate]:
    """
    Internal helper to convert a STACK_OF(X509) to a list of
    ``cryptography.x509.Certificate`` instances.

    :param cert_stack: The certificate stack to walk.
    :return: One converted certificate per stack entry, in stack order.
    """
    converted = []
    for index in range(_lib.sk_X509_num(cert_stack)):
        raw_cert = _lib.sk_X509_value(cert_stack, index)
        _openssl_assert(raw_cert != _ffi.NULL)
        # Take an extra reference before wrapping the pointer.
        ref_result = _lib.X509_up_ref(raw_cert)
        _openssl_assert(ref_result >= 1)
        wrapped = X509._from_raw_x509_ptr(raw_cert)
        converted.append(wrapped.to_cryptography())
    return converted
@typing.overload
def get_peer_cert_chain(
    self, *, as_cryptography: typing.Literal[True]
) -> list[x509.Certificate] | None:
    ...


@typing.overload
def get_peer_cert_chain(
    self, *, as_cryptography: typing.Literal[False] = False
) -> list[X509] | None:
    ...


def get_peer_cert_chain(
    self,
    *,
    as_cryptography: typing.Literal[True] | typing.Literal[False] = False,
) -> list[X509] | list[x509.Certificate] | None:
    """
    Retrieve the other side's certificate chain (if any)

    :param bool as_cryptography: Controls whether a list of
        ``cryptography.x509.Certificate`` or ``OpenSSL.crypto.X509``
        object should be returned.

    :return: A list of certificates giving the peer's certificate chain,
        or None if it does not have one.
    """
    chain = _lib.SSL_get_peer_cert_chain(self._ssl)
    if chain == _ffi.NULL:
        return None

    if not as_cryptography:
        return self._cert_stack_to_list(chain)
    return self._cert_stack_to_cryptography_list(chain)
@typing.overload
def get_verified_chain(
    self, *, as_cryptography: typing.Literal[True]
) -> list[x509.Certificate] | None:
    ...


@typing.overload
def get_verified_chain(
    self, *, as_cryptography: typing.Literal[False] = False
) -> list[X509] | None:
    ...


def get_verified_chain(
    self,
    *,
    as_cryptography: typing.Literal[True] | typing.Literal[False] = False,
) -> list[X509] | list[x509.Certificate] | None:
    """
    Retrieve the verified certificate chain of the peer including the
    peer's end entity certificate. It must be called after a session has
    been successfully established. If peer verification was not
    successful the chain may be incomplete, invalid, or None.

    :param bool as_cryptography: Controls whether a list of
        ``cryptography.x509.Certificate`` or ``OpenSSL.crypto.X509``
        object should be returned.

    :return: A list of certificates giving the peer's verified
        certificate chain, or None if it does not have one.

    .. versionadded:: 20.0
    """
    # OpenSSL 1.1+
    chain = _lib.SSL_get0_verified_chain(self._ssl)
    if chain == _ffi.NULL:
        return None

    if not as_cryptography:
        return self._cert_stack_to_list(chain)
    return self._cert_stack_to_cryptography_list(chain)
def want_read(self) -> bool:
    """
    Checks if more data has to be read from the transport layer to complete
    an operation.

    :return: True iff more data has to be read
    """
    # The binding hands back a C int (0 or non-zero); coerce it so the
    # method really returns the ``bool`` its signature promises.
    return bool(_lib.SSL_want_read(self._ssl))
def want_write(self) -> bool:
    """
    Checks if there is data to write to the transport layer to complete an
    operation.

    :return: True iff there is data to write
    """
    # The binding hands back a C int (0 or non-zero); coerce it so the
    # method really returns the ``bool`` its signature promises.
    return bool(_lib.SSL_want_write(self._ssl))
def set_accept_state(self) -> None:
    """
    Put the connection into server mode. The handshake is then carried
    out automatically by subsequent read/write calls.

    :return: None
    """
    ssl_handle = self._ssl
    _lib.SSL_set_accept_state(ssl_handle)
def set_connect_state(self) -> None:
    """
    Put the connection into client mode. The handshake is then carried
    out automatically by subsequent read/write calls.

    :return: None
    """
    ssl_handle = self._ssl
    _lib.SSL_set_connect_state(ssl_handle)
def get_session(self) -> Session | None:
    """
    Return the session currently in use on this connection.

    :return: An instance of :class:`OpenSSL.SSL.Session` or
        :obj:`None` if no session exists.

    .. versionadded:: 0.14
    """
    raw_session = _lib.SSL_get1_session(self._ssl)
    if raw_session == _ffi.NULL:
        return None

    # Build the wrapper without running ``Session.__init__`` and attach the
    # pointer with ``SSL_SESSION_free`` registered as its destructor.
    wrapper = Session.__new__(Session)
    wrapper._session = _ffi.gc(raw_session, _lib.SSL_SESSION_free)
    return wrapper
def set_session(self, session: Session) -> None:
    """
    Choose the session to use when the TLS/SSL connection is established.

    :param session: A Session instance representing the session to use.
    :raises TypeError: If *session* is not a :class:`Session`.
    :returns: None

    .. versionadded:: 0.14
    """
    if not isinstance(session, Session):
        raise TypeError("session must be a Session instance")

    status = _lib.SSL_set_session(self._ssl, session._session)
    _openssl_assert(status == 1)
def _get_finished_message(
    self, function: Callable[[Any, Any, int], int]
) -> bytes | None:
    """
    Shared implementation of :meth:`get_finished` and
    :meth:`get_peer_finished`.

    :param function: Either :data:`SSL_get_finished`: or
        :data:`SSL_get_peer_finished`.

    :return: :data:`None` if the desired message has not yet been
        received, otherwise the contents of the message.
    """
    # First call: ask only for the message length. OpenSSL's documentation
    # does not say whether the output buffer may be NULL when the count is
    # zero, and the implementation calls memcpy() unconditionally.
    # memcpy(NULL, source, 0) is not guaranteed to be defined behaviour by
    # the C standard (section 7.1.4, paragraph 1), so hand over a real,
    # zero-length buffer instead of NULL to stay on the safe side.
    probe = _ffi.new("char[]", 0)
    length = function(self._ssl, probe, 0)
    if length == 0:
        # No Finished message so far.
        return None

    # Second call: fetch the message into a buffer of the reported size.
    message = _no_zero_allocator("char[]", length)
    function(self._ssl, message, length)
    return _ffi.buffer(message, length)[:]
def get_finished(self) -> bytes | None:
    """
    Return the most recent TLS Finished message sent by this side.

    :return: The contents of the message or :obj:`None` if the TLS
        handshake has not yet completed.

    .. versionadded:: 0.15
    """
    fetch = _lib.SSL_get_finished
    return self._get_finished_message(fetch)
def get_peer_finished(self) -> bytes | None:
    """
    Return the most recent TLS Finished message received from the peer.

    :return: The contents of the message or :obj:`None` if the TLS
        handshake has not yet completed.

    .. versionadded:: 0.15
    """
    fetch = _lib.SSL_get_peer_finished
    return self._get_finished_message(fetch)
def get_cipher_name(self) -> str | None:
    """
    Return the name of the cipher currently in use.

    :returns: The name of the currently used cipher or :obj:`None`
        if no connection has been established.

    .. versionadded:: 0.15
    """
    current = _lib.SSL_get_current_cipher(self._ssl)
    if current == _ffi.NULL:
        return None

    raw_name = _lib.SSL_CIPHER_get_name(current)
    return _ffi.string(raw_name).decode("utf-8")
def get_cipher_bits(self) -> int | None:
    """
    Return the number of secret bits of the cipher currently in use.

    :returns: The number of secret bits of the currently used cipher
        or :obj:`None` if no connection has been established.

    .. versionadded:: 0.15
    """
    current = _lib.SSL_get_current_cipher(self._ssl)
    if current == _ffi.NULL:
        return None

    # NULL is passed for the optional second (output) argument.
    return _lib.SSL_CIPHER_get_bits(current, _ffi.NULL)
def get_cipher_version(self) -> str | None:
    """
    Return the protocol version of the cipher currently in use.

    :returns: The protocol name of the currently used cipher
        or :obj:`None` if no connection has been established.

    .. versionadded:: 0.15
    """
    current = _lib.SSL_get_current_cipher(self._ssl)
    if current == _ffi.NULL:
        return None

    raw_version = _lib.SSL_CIPHER_get_version(current)
    return _ffi.string(raw_version).decode("utf-8")
def get_protocol_version_name(self) -> str:
    """
    Retrieve the protocol version of the current connection as text.

    :returns: The TLS version of the current connection, for example
        the value for TLS 1.2 would be ``TLSv1.2`` or ``Unknown``
        for connections that were not successfully established.
    """
    raw_version = _lib.SSL_get_version(self._ssl)
    return _ffi.string(raw_version).decode("utf-8")
def get_protocol_version(self) -> int:
    """
    Retrieve the SSL or TLS protocol version of the current connection.

    :returns: The numeric protocol version of the current connection as
        reported by ``SSL_version``. For example, it will return ``769``
        (``0x0301``) for connections made over TLS version 1.0.
    """
    return _lib.SSL_version(self._ssl)
def set_alpn_protos(self, protos: list[bytes]) -> None:
    """
    Specify the client's ALPN protocol list.

    These protocols are offered to the server during protocol negotiation.

    :param protos: A list of the protocols to be offered to the server.
        This list should be a Python list of bytestrings representing the
        protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``.
    :raises ValueError: If *protos* is empty.
    """
    # Different versions of OpenSSL are inconsistent about how they handle
    # empty proto lists (see #1043), so we avoid the problem entirely by
    # rejecting them ourselves.
    if not protos:
        raise ValueError("at least one protocol must be specified")

    # Wire format: every protocol name is preceded by a single byte
    # holding its length, and the pieces are concatenated.
    pieces = [
        piece
        for proto in protos
        for piece in (bytes((len(proto),)), proto)
    ]
    wire_format = b"".join(pieces)

    # Build a C string from the list. We don't need to save this off
    # because OpenSSL immediately copies the data out.
    c_buffer = _ffi.new("unsigned char[]", wire_format)

    # https://www.openssl.org/docs/man1.1.0/man3/SSL_CTX_set_alpn_protos.html:
    #     SSL_CTX_set_alpn_protos() and SSL_set_alpn_protos()
    #     return 0 on success, and non-0 on failure.
    #     WARNING: these functions reverse the return value convention.
    status = _lib.SSL_set_alpn_protos(self._ssl, c_buffer, len(wire_format))
    _openssl_assert(status == 0)
def get_alpn_proto_negotiated(self) -> bytes:
    """
    Get the protocol that was negotiated by ALPN.

    :returns: A bytestring of the protocol name. If no protocol has been
        negotiated yet, returns an empty bytestring.
    """
    # Output parameters filled in by OpenSSL: a pointer to the selected
    # protocol bytes and the number of bytes it points at.
    data = _ffi.new("unsigned char **")
    data_len = _ffi.new("unsigned int *")

    _lib.SSL_get0_alpn_selected(self._ssl, data, data_len)

    # Check the length that was written, not the pointer object itself:
    # a pointer obtained from ``_ffi.new`` is never NULL and is therefore
    # always truthy, which would make this guard unreachable.
    if not data_len[0]:
        return b""

    return _ffi.buffer(data[0], data_len[0])[:]
def get_selected_srtp_profile(self) -> bytes:
    """
    Get the SRTP protocol which was negotiated.

    :returns: A bytestring of the SRTP profile name. If no profile has been
        negotiated yet, returns an empty bytestring.
    """
    selected = _lib.SSL_get_selected_srtp_profile(self._ssl)
    if selected:
        return _ffi.string(selected.name)

    # Falsy result from the binding: no profile is available.
    return b""
@_requires_ssl_get0_group_name
def get_group_name(self) -> str | None:
    """
    Get the name of the negotiated group for the key exchange.

    :return: A string giving the group name or :data:`None`.
    """
    # Do not remove this guard.
    # SSL_get0_group_name crashes with a segfault if called without
    # an established connection (should return NULL but doesn't).
    if _lib.SSL_get_session(self._ssl) == _ffi.NULL:
        return None

    raw_name = _lib.SSL_get0_group_name(self._ssl)
    if raw_name == _ffi.NULL:
        return None

    decoded = _ffi.string(raw_name).decode("utf-8")
    return decoded
def request_ocsp(self) -> None:
    """
    Ask the server to send stapled OCSP data, if available. If this is
    not called on the client side then the server will not send OCSP
    data. Should be used in conjunction with
    :meth:`Context.set_ocsp_client_callback`.
    """
    status_type = _lib.TLSEXT_STATUSTYPE_ocsp
    outcome = _lib.SSL_set_tlsext_status_type(self._ssl, status_type)
    _openssl_assert(outcome == 1)
def set_info_callback(
    self, callback: Callable[[Connection, int, int], None]
) -> None:
    """
    Install *callback* as the information callback. It will be invoked
    from time to time during SSL handshakes.

    :param callback: The Python callback to use. This should take three
        arguments: a Connection object and two integers. The first integer
        specifies where in the SSL handshake the function was called, and
        the other the return code from a (possibly failed) internal
        function call.
    :return: None
    """

    @wraps(callback)
    def _dispatch(ssl_ptr, where, return_code):  # type: ignore[no-untyped-def]
        # Translate the raw SSL pointer back into its Connection object
        # before handing control to the user's callback.
        connection = Connection._reverse_mapping[ssl_ptr]
        callback(connection, where, return_code)

    # The cffi callback is stored on the instance; presumably so that it
    # stays referenced for as long as OpenSSL may invoke it.
    self._info_callback = _ffi.callback(
        "void (*)(const SSL *, int, int)", _dispatch
    )
    _lib.SSL_set_info_callback(self._ssl, self._info_callback)