Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/OpenSSL/SSL.py: 34%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import os
4import socket
5import typing
6import warnings
7from collections.abc import Sequence
8from errno import errorcode
9from functools import partial, wraps
10from itertools import chain, count
11from sys import platform
12from typing import Any, Callable, Optional, TypeVar
13from weakref import WeakValueDictionary
15from cryptography import x509
16from cryptography.hazmat.primitives.asymmetric import ec
18from OpenSSL._util import (
19 StrOrBytesPath as _StrOrBytesPath,
20)
21from OpenSSL._util import (
22 exception_from_error_queue as _exception_from_error_queue,
23)
24from OpenSSL._util import (
25 ffi as _ffi,
26)
27from OpenSSL._util import (
28 lib as _lib,
29)
30from OpenSSL._util import (
31 make_assert as _make_assert,
32)
33from OpenSSL._util import (
34 no_zero_allocator as _no_zero_allocator,
35)
36from OpenSSL._util import (
37 path_bytes as _path_bytes,
38)
39from OpenSSL._util import (
40 text_to_bytes_and_warn as _text_to_bytes_and_warn,
41)
42from OpenSSL.crypto import (
43 FILETYPE_PEM,
44 X509,
45 PKey,
46 X509Name,
47 X509Store,
48 _EllipticCurve,
49 _PassphraseHelper,
50 _PrivateKey,
51)
53__all__ = [
54 "DTLS_CLIENT_METHOD",
55 "DTLS_METHOD",
56 "DTLS_SERVER_METHOD",
57 "MODE_RELEASE_BUFFERS",
58 "NO_OVERLAPPING_PROTOCOLS",
59 "OPENSSL_BUILT_ON",
60 "OPENSSL_CFLAGS",
61 "OPENSSL_DIR",
62 "OPENSSL_PLATFORM",
63 "OPENSSL_VERSION",
64 "OPENSSL_VERSION_NUMBER",
65 "OP_ALL",
66 "OP_CIPHER_SERVER_PREFERENCE",
67 "OP_COOKIE_EXCHANGE",
68 "OP_DONT_INSERT_EMPTY_FRAGMENTS",
69 "OP_EPHEMERAL_RSA",
70 "OP_MICROSOFT_BIG_SSLV3_BUFFER",
71 "OP_MICROSOFT_SESS_ID_BUG",
72 "OP_MSIE_SSLV2_RSA_PADDING",
73 "OP_NETSCAPE_CA_DN_BUG",
74 "OP_NETSCAPE_CHALLENGE_BUG",
75 "OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG",
76 "OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG",
77 "OP_NO_COMPRESSION",
78 "OP_NO_QUERY_MTU",
79 "OP_NO_TICKET",
80 "OP_PKCS1_CHECK_1",
81 "OP_PKCS1_CHECK_2",
82 "OP_SINGLE_DH_USE",
83 "OP_SINGLE_ECDH_USE",
84 "OP_SSLEAY_080_CLIENT_DH_BUG",
85 "OP_SSLREF2_REUSE_CERT_TYPE_BUG",
86 "OP_TLS_BLOCK_PADDING_BUG",
87 "OP_TLS_D5_BUG",
88 "OP_TLS_ROLLBACK_BUG",
89 "RECEIVED_SHUTDOWN",
90 "SENT_SHUTDOWN",
91 "SESS_CACHE_BOTH",
92 "SESS_CACHE_CLIENT",
93 "SESS_CACHE_NO_AUTO_CLEAR",
94 "SESS_CACHE_NO_INTERNAL",
95 "SESS_CACHE_NO_INTERNAL_LOOKUP",
96 "SESS_CACHE_NO_INTERNAL_STORE",
97 "SESS_CACHE_OFF",
98 "SESS_CACHE_SERVER",
99 "SSL3_VERSION",
100 "SSLEAY_BUILT_ON",
101 "SSLEAY_CFLAGS",
102 "SSLEAY_DIR",
103 "SSLEAY_PLATFORM",
104 "SSLEAY_VERSION",
105 "SSL_CB_ACCEPT_EXIT",
106 "SSL_CB_ACCEPT_LOOP",
107 "SSL_CB_ALERT",
108 "SSL_CB_CONNECT_EXIT",
109 "SSL_CB_CONNECT_LOOP",
110 "SSL_CB_EXIT",
111 "SSL_CB_HANDSHAKE_DONE",
112 "SSL_CB_HANDSHAKE_START",
113 "SSL_CB_LOOP",
114 "SSL_CB_READ",
115 "SSL_CB_READ_ALERT",
116 "SSL_CB_WRITE",
117 "SSL_CB_WRITE_ALERT",
118 "SSL_ST_ACCEPT",
119 "SSL_ST_CONNECT",
120 "SSL_ST_MASK",
121 "TLS1_1_VERSION",
122 "TLS1_2_VERSION",
123 "TLS1_3_VERSION",
124 "TLS1_VERSION",
125 "TLS_CLIENT_METHOD",
126 "TLS_METHOD",
127 "TLS_SERVER_METHOD",
128 "VERIFY_CLIENT_ONCE",
129 "VERIFY_FAIL_IF_NO_PEER_CERT",
130 "VERIFY_NONE",
131 "VERIFY_PEER",
132 "Connection",
133 "Context",
134 "Error",
135 "OP_NO_SSLv2",
136 "OP_NO_SSLv3",
137 "OP_NO_TLSv1",
138 "OP_NO_TLSv1_1",
139 "OP_NO_TLSv1_2",
140 "OP_NO_TLSv1_3",
141 "SSLeay_version",
142 "SSLv23_METHOD",
143 "Session",
144 "SysCallError",
145 "TLSv1_1_METHOD",
146 "TLSv1_2_METHOD",
147 "TLSv1_METHOD",
148 "WantReadError",
149 "WantWriteError",
150 "WantX509LookupError",
151 "X509VerificationCodes",
152 "ZeroReturnError",
153]
156OPENSSL_VERSION_NUMBER: int = _lib.OPENSSL_VERSION_NUMBER
157OPENSSL_VERSION: int = _lib.OPENSSL_VERSION
158OPENSSL_CFLAGS: int = _lib.OPENSSL_CFLAGS
159OPENSSL_PLATFORM: int = _lib.OPENSSL_PLATFORM
160OPENSSL_DIR: int = _lib.OPENSSL_DIR
161OPENSSL_BUILT_ON: int = _lib.OPENSSL_BUILT_ON
163SSLEAY_VERSION = OPENSSL_VERSION
164SSLEAY_CFLAGS = OPENSSL_CFLAGS
165SSLEAY_PLATFORM = OPENSSL_PLATFORM
166SSLEAY_DIR = OPENSSL_DIR
167SSLEAY_BUILT_ON = OPENSSL_BUILT_ON
169SENT_SHUTDOWN = _lib.SSL_SENT_SHUTDOWN
170RECEIVED_SHUTDOWN = _lib.SSL_RECEIVED_SHUTDOWN
172SSLv23_METHOD = 3
173TLSv1_METHOD = 4
174TLSv1_1_METHOD = 5
175TLSv1_2_METHOD = 6
176TLS_METHOD = 7
177TLS_SERVER_METHOD = 8
178TLS_CLIENT_METHOD = 9
179DTLS_METHOD = 10
180DTLS_SERVER_METHOD = 11
181DTLS_CLIENT_METHOD = 12
183SSL3_VERSION: int = _lib.SSL3_VERSION
184TLS1_VERSION: int = _lib.TLS1_VERSION
185TLS1_1_VERSION: int = _lib.TLS1_1_VERSION
186TLS1_2_VERSION: int = _lib.TLS1_2_VERSION
187TLS1_3_VERSION: int = _lib.TLS1_3_VERSION
189OP_NO_SSLv2: int = _lib.SSL_OP_NO_SSLv2
190OP_NO_SSLv3: int = _lib.SSL_OP_NO_SSLv3
191OP_NO_TLSv1: int = _lib.SSL_OP_NO_TLSv1
192OP_NO_TLSv1_1: int = _lib.SSL_OP_NO_TLSv1_1
193OP_NO_TLSv1_2: int = _lib.SSL_OP_NO_TLSv1_2
194OP_NO_TLSv1_3: int = _lib.SSL_OP_NO_TLSv1_3
196MODE_RELEASE_BUFFERS: int = _lib.SSL_MODE_RELEASE_BUFFERS
198OP_SINGLE_DH_USE: int = _lib.SSL_OP_SINGLE_DH_USE
199OP_SINGLE_ECDH_USE: int = _lib.SSL_OP_SINGLE_ECDH_USE
200OP_EPHEMERAL_RSA: int = _lib.SSL_OP_EPHEMERAL_RSA
201OP_MICROSOFT_SESS_ID_BUG: int = _lib.SSL_OP_MICROSOFT_SESS_ID_BUG
202OP_NETSCAPE_CHALLENGE_BUG: int = _lib.SSL_OP_NETSCAPE_CHALLENGE_BUG
203OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG: int = (
204 _lib.SSL_OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG
205)
206OP_SSLREF2_REUSE_CERT_TYPE_BUG: int = _lib.SSL_OP_SSLREF2_REUSE_CERT_TYPE_BUG
207OP_MICROSOFT_BIG_SSLV3_BUFFER: int = _lib.SSL_OP_MICROSOFT_BIG_SSLV3_BUFFER
208OP_MSIE_SSLV2_RSA_PADDING: int = _lib.SSL_OP_MSIE_SSLV2_RSA_PADDING
209OP_SSLEAY_080_CLIENT_DH_BUG: int = _lib.SSL_OP_SSLEAY_080_CLIENT_DH_BUG
210OP_TLS_D5_BUG: int = _lib.SSL_OP_TLS_D5_BUG
211OP_TLS_BLOCK_PADDING_BUG: int = _lib.SSL_OP_TLS_BLOCK_PADDING_BUG
212OP_DONT_INSERT_EMPTY_FRAGMENTS: int = _lib.SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS
213OP_CIPHER_SERVER_PREFERENCE: int = _lib.SSL_OP_CIPHER_SERVER_PREFERENCE
214OP_TLS_ROLLBACK_BUG: int = _lib.SSL_OP_TLS_ROLLBACK_BUG
215OP_PKCS1_CHECK_1 = _lib.SSL_OP_PKCS1_CHECK_1
216OP_PKCS1_CHECK_2: int = _lib.SSL_OP_PKCS1_CHECK_2
217OP_NETSCAPE_CA_DN_BUG: int = _lib.SSL_OP_NETSCAPE_CA_DN_BUG
218OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG: int = (
219 _lib.SSL_OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG
220)
221OP_NO_COMPRESSION: int = _lib.SSL_OP_NO_COMPRESSION
223OP_NO_QUERY_MTU: int = _lib.SSL_OP_NO_QUERY_MTU
224OP_COOKIE_EXCHANGE: int = _lib.SSL_OP_COOKIE_EXCHANGE
225OP_NO_TICKET: int = _lib.SSL_OP_NO_TICKET
227try:
228 OP_NO_RENEGOTIATION: int = _lib.SSL_OP_NO_RENEGOTIATION
229 __all__.append("OP_NO_RENEGOTIATION")
230except AttributeError:
231 pass
233try:
234 OP_IGNORE_UNEXPECTED_EOF: int = _lib.SSL_OP_IGNORE_UNEXPECTED_EOF
235 __all__.append("OP_IGNORE_UNEXPECTED_EOF")
236except AttributeError:
237 pass
239try:
240 OP_LEGACY_SERVER_CONNECT: int = _lib.SSL_OP_LEGACY_SERVER_CONNECT
241 __all__.append("OP_LEGACY_SERVER_CONNECT")
242except AttributeError:
243 pass
245OP_ALL: int = _lib.SSL_OP_ALL
247VERIFY_PEER: int = _lib.SSL_VERIFY_PEER
248VERIFY_FAIL_IF_NO_PEER_CERT: int = _lib.SSL_VERIFY_FAIL_IF_NO_PEER_CERT
249VERIFY_CLIENT_ONCE: int = _lib.SSL_VERIFY_CLIENT_ONCE
250VERIFY_NONE: int = _lib.SSL_VERIFY_NONE
252SESS_CACHE_OFF: int = _lib.SSL_SESS_CACHE_OFF
253SESS_CACHE_CLIENT: int = _lib.SSL_SESS_CACHE_CLIENT
254SESS_CACHE_SERVER: int = _lib.SSL_SESS_CACHE_SERVER
255SESS_CACHE_BOTH: int = _lib.SSL_SESS_CACHE_BOTH
256SESS_CACHE_NO_AUTO_CLEAR: int = _lib.SSL_SESS_CACHE_NO_AUTO_CLEAR
257SESS_CACHE_NO_INTERNAL_LOOKUP: int = _lib.SSL_SESS_CACHE_NO_INTERNAL_LOOKUP
258SESS_CACHE_NO_INTERNAL_STORE: int = _lib.SSL_SESS_CACHE_NO_INTERNAL_STORE
259SESS_CACHE_NO_INTERNAL: int = _lib.SSL_SESS_CACHE_NO_INTERNAL
261SSL_ST_CONNECT: int = _lib.SSL_ST_CONNECT
262SSL_ST_ACCEPT: int = _lib.SSL_ST_ACCEPT
263SSL_ST_MASK: int = _lib.SSL_ST_MASK
265SSL_CB_LOOP: int = _lib.SSL_CB_LOOP
266SSL_CB_EXIT: int = _lib.SSL_CB_EXIT
267SSL_CB_READ: int = _lib.SSL_CB_READ
268SSL_CB_WRITE: int = _lib.SSL_CB_WRITE
269SSL_CB_ALERT: int = _lib.SSL_CB_ALERT
270SSL_CB_READ_ALERT: int = _lib.SSL_CB_READ_ALERT
271SSL_CB_WRITE_ALERT: int = _lib.SSL_CB_WRITE_ALERT
272SSL_CB_ACCEPT_LOOP: int = _lib.SSL_CB_ACCEPT_LOOP
273SSL_CB_ACCEPT_EXIT: int = _lib.SSL_CB_ACCEPT_EXIT
274SSL_CB_CONNECT_LOOP: int = _lib.SSL_CB_CONNECT_LOOP
275SSL_CB_CONNECT_EXIT: int = _lib.SSL_CB_CONNECT_EXIT
276SSL_CB_HANDSHAKE_START: int = _lib.SSL_CB_HANDSHAKE_START
277SSL_CB_HANDSHAKE_DONE: int = _lib.SSL_CB_HANDSHAKE_DONE
279_T = TypeVar("_T")
282class _NoOverlappingProtocols:
283 pass
286NO_OVERLAPPING_PROTOCOLS = _NoOverlappingProtocols()
288# Callback types.
289_ALPNSelectCallback = Callable[
290 [
291 "Connection",
292 typing.List[bytes],
293 ],
294 typing.Union[bytes, _NoOverlappingProtocols],
295]
296_CookieGenerateCallback = Callable[["Connection"], bytes]
297_CookieVerifyCallback = Callable[["Connection", bytes], bool]
298_OCSPClientCallback = Callable[["Connection", bytes, Optional[_T]], bool]
299_OCSPServerCallback = Callable[["Connection", Optional[_T]], bytes]
300_PassphraseCallback = Callable[[int, bool, Optional[_T]], bytes]
301_VerifyCallback = Callable[["Connection", X509, int, int, int], bool]
304class X509VerificationCodes:
305 """
306 Success and error codes for X509 verification, as returned by the
307 underlying ``X509_STORE_CTX_get_error()`` function and passed by pyOpenSSL
308 to verification callback functions.
310 See `OpenSSL Verification Errors
311 <https://www.openssl.org/docs/manmaster/man3/X509_verify_cert_error_string.html#ERROR-CODES>`_
312 for details.
313 """
315 OK = _lib.X509_V_OK
316 ERR_UNABLE_TO_GET_ISSUER_CERT = _lib.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT
317 ERR_UNABLE_TO_GET_CRL = _lib.X509_V_ERR_UNABLE_TO_GET_CRL
318 ERR_UNABLE_TO_DECRYPT_CERT_SIGNATURE = (
319 _lib.X509_V_ERR_UNABLE_TO_DECRYPT_CERT_SIGNATURE
320 )
321 ERR_UNABLE_TO_DECRYPT_CRL_SIGNATURE = (
322 _lib.X509_V_ERR_UNABLE_TO_DECRYPT_CRL_SIGNATURE
323 )
324 ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY = (
325 _lib.X509_V_ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY
326 )
327 ERR_CERT_SIGNATURE_FAILURE = _lib.X509_V_ERR_CERT_SIGNATURE_FAILURE
328 ERR_CRL_SIGNATURE_FAILURE = _lib.X509_V_ERR_CRL_SIGNATURE_FAILURE
329 ERR_CERT_NOT_YET_VALID = _lib.X509_V_ERR_CERT_NOT_YET_VALID
330 ERR_CERT_HAS_EXPIRED = _lib.X509_V_ERR_CERT_HAS_EXPIRED
331 ERR_CRL_NOT_YET_VALID = _lib.X509_V_ERR_CRL_NOT_YET_VALID
332 ERR_CRL_HAS_EXPIRED = _lib.X509_V_ERR_CRL_HAS_EXPIRED
333 ERR_ERROR_IN_CERT_NOT_BEFORE_FIELD = (
334 _lib.X509_V_ERR_ERROR_IN_CERT_NOT_BEFORE_FIELD
335 )
336 ERR_ERROR_IN_CERT_NOT_AFTER_FIELD = (
337 _lib.X509_V_ERR_ERROR_IN_CERT_NOT_AFTER_FIELD
338 )
339 ERR_ERROR_IN_CRL_LAST_UPDATE_FIELD = (
340 _lib.X509_V_ERR_ERROR_IN_CRL_LAST_UPDATE_FIELD
341 )
342 ERR_ERROR_IN_CRL_NEXT_UPDATE_FIELD = (
343 _lib.X509_V_ERR_ERROR_IN_CRL_NEXT_UPDATE_FIELD
344 )
345 ERR_OUT_OF_MEM = _lib.X509_V_ERR_OUT_OF_MEM
346 ERR_DEPTH_ZERO_SELF_SIGNED_CERT = (
347 _lib.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT
348 )
349 ERR_SELF_SIGNED_CERT_IN_CHAIN = _lib.X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN
350 ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY = (
351 _lib.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY
352 )
353 ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE = (
354 _lib.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE
355 )
356 ERR_CERT_CHAIN_TOO_LONG = _lib.X509_V_ERR_CERT_CHAIN_TOO_LONG
357 ERR_CERT_REVOKED = _lib.X509_V_ERR_CERT_REVOKED
358 ERR_INVALID_CA = _lib.X509_V_ERR_INVALID_CA
359 ERR_PATH_LENGTH_EXCEEDED = _lib.X509_V_ERR_PATH_LENGTH_EXCEEDED
360 ERR_INVALID_PURPOSE = _lib.X509_V_ERR_INVALID_PURPOSE
361 ERR_CERT_UNTRUSTED = _lib.X509_V_ERR_CERT_UNTRUSTED
362 ERR_CERT_REJECTED = _lib.X509_V_ERR_CERT_REJECTED
363 ERR_SUBJECT_ISSUER_MISMATCH = _lib.X509_V_ERR_SUBJECT_ISSUER_MISMATCH
364 ERR_AKID_SKID_MISMATCH = _lib.X509_V_ERR_AKID_SKID_MISMATCH
365 ERR_AKID_ISSUER_SERIAL_MISMATCH = (
366 _lib.X509_V_ERR_AKID_ISSUER_SERIAL_MISMATCH
367 )
368 ERR_KEYUSAGE_NO_CERTSIGN = _lib.X509_V_ERR_KEYUSAGE_NO_CERTSIGN
369 ERR_UNABLE_TO_GET_CRL_ISSUER = _lib.X509_V_ERR_UNABLE_TO_GET_CRL_ISSUER
370 ERR_UNHANDLED_CRITICAL_EXTENSION = (
371 _lib.X509_V_ERR_UNHANDLED_CRITICAL_EXTENSION
372 )
373 ERR_KEYUSAGE_NO_CRL_SIGN = _lib.X509_V_ERR_KEYUSAGE_NO_CRL_SIGN
374 ERR_UNHANDLED_CRITICAL_CRL_EXTENSION = (
375 _lib.X509_V_ERR_UNHANDLED_CRITICAL_CRL_EXTENSION
376 )
377 ERR_INVALID_NON_CA = _lib.X509_V_ERR_INVALID_NON_CA
378 ERR_PROXY_PATH_LENGTH_EXCEEDED = _lib.X509_V_ERR_PROXY_PATH_LENGTH_EXCEEDED
379 ERR_KEYUSAGE_NO_DIGITAL_SIGNATURE = (
380 _lib.X509_V_ERR_KEYUSAGE_NO_DIGITAL_SIGNATURE
381 )
382 ERR_PROXY_CERTIFICATES_NOT_ALLOWED = (
383 _lib.X509_V_ERR_PROXY_CERTIFICATES_NOT_ALLOWED
384 )
385 ERR_INVALID_EXTENSION = _lib.X509_V_ERR_INVALID_EXTENSION
386 ERR_INVALID_POLICY_EXTENSION = _lib.X509_V_ERR_INVALID_POLICY_EXTENSION
387 ERR_NO_EXPLICIT_POLICY = _lib.X509_V_ERR_NO_EXPLICIT_POLICY
388 ERR_DIFFERENT_CRL_SCOPE = _lib.X509_V_ERR_DIFFERENT_CRL_SCOPE
389 ERR_UNSUPPORTED_EXTENSION_FEATURE = (
390 _lib.X509_V_ERR_UNSUPPORTED_EXTENSION_FEATURE
391 )
392 ERR_UNNESTED_RESOURCE = _lib.X509_V_ERR_UNNESTED_RESOURCE
393 ERR_PERMITTED_VIOLATION = _lib.X509_V_ERR_PERMITTED_VIOLATION
394 ERR_EXCLUDED_VIOLATION = _lib.X509_V_ERR_EXCLUDED_VIOLATION
395 ERR_SUBTREE_MINMAX = _lib.X509_V_ERR_SUBTREE_MINMAX
396 ERR_UNSUPPORTED_CONSTRAINT_TYPE = (
397 _lib.X509_V_ERR_UNSUPPORTED_CONSTRAINT_TYPE
398 )
399 ERR_UNSUPPORTED_CONSTRAINT_SYNTAX = (
400 _lib.X509_V_ERR_UNSUPPORTED_CONSTRAINT_SYNTAX
401 )
402 ERR_UNSUPPORTED_NAME_SYNTAX = _lib.X509_V_ERR_UNSUPPORTED_NAME_SYNTAX
403 ERR_CRL_PATH_VALIDATION_ERROR = _lib.X509_V_ERR_CRL_PATH_VALIDATION_ERROR
404 ERR_HOSTNAME_MISMATCH = _lib.X509_V_ERR_HOSTNAME_MISMATCH
405 ERR_EMAIL_MISMATCH = _lib.X509_V_ERR_EMAIL_MISMATCH
406 ERR_IP_ADDRESS_MISMATCH = _lib.X509_V_ERR_IP_ADDRESS_MISMATCH
407 ERR_APPLICATION_VERIFICATION = _lib.X509_V_ERR_APPLICATION_VERIFICATION
410# Taken from https://golang.org/src/crypto/x509/root_linux.go
411_CERTIFICATE_FILE_LOCATIONS = [
412 "/etc/ssl/certs/ca-certificates.crt", # Debian/Ubuntu/Gentoo etc.
413 "/etc/pki/tls/certs/ca-bundle.crt", # Fedora/RHEL 6
414 "/etc/ssl/ca-bundle.pem", # OpenSUSE
415 "/etc/pki/tls/cacert.pem", # OpenELEC
416 "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem", # CentOS/RHEL 7
417]
419_CERTIFICATE_PATH_LOCATIONS = [
420 "/etc/ssl/certs", # SLES10/SLES11
421]
423# These values are compared to output from cffi's ffi.string so they must be
424# byte strings.
425_CRYPTOGRAPHY_MANYLINUX_CA_DIR = b"/opt/pyca/cryptography/openssl/certs"
426_CRYPTOGRAPHY_MANYLINUX_CA_FILE = b"/opt/pyca/cryptography/openssl/cert.pem"
429class Error(Exception):
430 """
431 An error occurred in an `OpenSSL.SSL` API.
432 """
435_raise_current_error = partial(_exception_from_error_queue, Error)
436_openssl_assert = _make_assert(Error)
439class WantReadError(Error):
440 pass
443class WantWriteError(Error):
444 pass
447class WantX509LookupError(Error):
448 pass
451class ZeroReturnError(Error):
452 pass
455class SysCallError(Error):
456 pass
459class _CallbackExceptionHelper:
460 """
461 A base class for wrapper classes that allow for intelligent exception
462 handling in OpenSSL callbacks.
464 :ivar list _problems: Any exceptions that occurred while executing in a
465 context where they could not be raised in the normal way. Typically
466 this is because OpenSSL has called into some Python code and requires a
467 return value. The exceptions are saved to be raised later when it is
468 possible to do so.
469 """
471 def __init__(self) -> None:
472 self._problems: list[Exception] = []
474 def raise_if_problem(self) -> None:
475 """
476 Raise an exception from the OpenSSL error queue or that was previously
477 captured whe running a callback.
478 """
479 if self._problems:
480 try:
481 _raise_current_error()
482 except Error:
483 pass
484 raise self._problems.pop(0)
487class _VerifyHelper(_CallbackExceptionHelper):
488 """
489 Wrap a callback such that it can be used as a certificate verification
490 callback.
491 """
493 def __init__(self, callback: _VerifyCallback) -> None:
494 _CallbackExceptionHelper.__init__(self)
496 @wraps(callback)
497 def wrapper(ok, store_ctx): # type: ignore[no-untyped-def]
498 x509 = _lib.X509_STORE_CTX_get_current_cert(store_ctx)
499 _lib.X509_up_ref(x509)
500 cert = X509._from_raw_x509_ptr(x509)
501 error_number = _lib.X509_STORE_CTX_get_error(store_ctx)
502 error_depth = _lib.X509_STORE_CTX_get_error_depth(store_ctx)
504 index = _lib.SSL_get_ex_data_X509_STORE_CTX_idx()
505 ssl = _lib.X509_STORE_CTX_get_ex_data(store_ctx, index)
506 connection = Connection._reverse_mapping[ssl]
508 try:
509 result = callback(
510 connection, cert, error_number, error_depth, ok
511 )
512 except Exception as e:
513 self._problems.append(e)
514 return 0
515 else:
516 if result:
517 _lib.X509_STORE_CTX_set_error(store_ctx, _lib.X509_V_OK)
518 return 1
519 else:
520 return 0
522 self.callback = _ffi.callback(
523 "int (*)(int, X509_STORE_CTX *)", wrapper
524 )
527class _ALPNSelectHelper(_CallbackExceptionHelper):
528 """
529 Wrap a callback such that it can be used as an ALPN selection callback.
530 """
532 def __init__(self, callback: _ALPNSelectCallback) -> None:
533 _CallbackExceptionHelper.__init__(self)
535 @wraps(callback)
536 def wrapper(ssl, out, outlen, in_, inlen, arg): # type: ignore[no-untyped-def]
537 try:
538 conn = Connection._reverse_mapping[ssl]
540 # The string passed to us is made up of multiple
541 # length-prefixed bytestrings. We need to split that into a
542 # list.
543 instr = _ffi.buffer(in_, inlen)[:]
544 protolist = []
545 while instr:
546 encoded_len = instr[0]
547 proto = instr[1 : encoded_len + 1]
548 protolist.append(proto)
549 instr = instr[encoded_len + 1 :]
551 # Call the callback
552 outbytes = callback(conn, protolist)
553 any_accepted = True
554 if outbytes is NO_OVERLAPPING_PROTOCOLS:
555 outbytes = b""
556 any_accepted = False
557 elif not isinstance(outbytes, bytes):
558 raise TypeError(
559 "ALPN callback must return a bytestring or the "
560 "special NO_OVERLAPPING_PROTOCOLS sentinel value."
561 )
563 # Save our callback arguments on the connection object to make
564 # sure that they don't get freed before OpenSSL can use them.
565 # Then, return them in the appropriate output parameters.
566 conn._alpn_select_callback_args = [
567 _ffi.new("unsigned char *", len(outbytes)),
568 _ffi.new("unsigned char[]", outbytes),
569 ]
570 outlen[0] = conn._alpn_select_callback_args[0][0]
571 out[0] = conn._alpn_select_callback_args[1]
572 if not any_accepted:
573 return _lib.SSL_TLSEXT_ERR_NOACK
574 return _lib.SSL_TLSEXT_ERR_OK
575 except Exception as e:
576 self._problems.append(e)
577 return _lib.SSL_TLSEXT_ERR_ALERT_FATAL
579 self.callback = _ffi.callback(
580 (
581 "int (*)(SSL *, unsigned char **, unsigned char *, "
582 "const unsigned char *, unsigned int, void *)"
583 ),
584 wrapper,
585 )
588class _OCSPServerCallbackHelper(_CallbackExceptionHelper):
589 """
590 Wrap a callback such that it can be used as an OCSP callback for the server
591 side.
593 Annoyingly, OpenSSL defines one OCSP callback but uses it in two different
594 ways. For servers, that callback is expected to retrieve some OCSP data and
595 hand it to OpenSSL, and may return only SSL_TLSEXT_ERR_OK,
596 SSL_TLSEXT_ERR_FATAL, and SSL_TLSEXT_ERR_NOACK. For clients, that callback
597 is expected to check the OCSP data, and returns a negative value on error,
598 0 if the response is not acceptable, or positive if it is. These are
599 mutually exclusive return code behaviours, and they mean that we need two
600 helpers so that we always return an appropriate error code if the user's
601 code throws an exception.
603 Given that we have to have two helpers anyway, these helpers are a bit more
604 helpery than most: specifically, they hide a few more of the OpenSSL
605 functions so that the user has an easier time writing these callbacks.
607 This helper implements the server side.
608 """
610 def __init__(self, callback: _OCSPServerCallback[Any]) -> None:
611 _CallbackExceptionHelper.__init__(self)
613 @wraps(callback)
614 def wrapper(ssl, cdata): # type: ignore[no-untyped-def]
615 try:
616 conn = Connection._reverse_mapping[ssl]
618 # Extract the data if any was provided.
619 if cdata != _ffi.NULL:
620 data = _ffi.from_handle(cdata)
621 else:
622 data = None
624 # Call the callback.
625 ocsp_data = callback(conn, data)
627 if not isinstance(ocsp_data, bytes):
628 raise TypeError("OCSP callback must return a bytestring.")
630 # If the OCSP data was provided, we will pass it to OpenSSL.
631 # However, we have an early exit here: if no OCSP data was
632 # provided we will just exit out and tell OpenSSL that there
633 # is nothing to do.
634 if not ocsp_data:
635 return 3 # SSL_TLSEXT_ERR_NOACK
637 # OpenSSL takes ownership of this data and expects it to have
638 # been allocated by OPENSSL_malloc.
639 ocsp_data_length = len(ocsp_data)
640 data_ptr = _lib.OPENSSL_malloc(ocsp_data_length)
641 _ffi.buffer(data_ptr, ocsp_data_length)[:] = ocsp_data
643 _lib.SSL_set_tlsext_status_ocsp_resp(
644 ssl, data_ptr, ocsp_data_length
645 )
647 return 0
648 except Exception as e:
649 self._problems.append(e)
650 return 2 # SSL_TLSEXT_ERR_ALERT_FATAL
652 self.callback = _ffi.callback("int (*)(SSL *, void *)", wrapper)
655class _OCSPClientCallbackHelper(_CallbackExceptionHelper):
656 """
657 Wrap a callback such that it can be used as an OCSP callback for the client
658 side.
660 Annoyingly, OpenSSL defines one OCSP callback but uses it in two different
661 ways. For servers, that callback is expected to retrieve some OCSP data and
662 hand it to OpenSSL, and may return only SSL_TLSEXT_ERR_OK,
663 SSL_TLSEXT_ERR_FATAL, and SSL_TLSEXT_ERR_NOACK. For clients, that callback
664 is expected to check the OCSP data, and returns a negative value on error,
665 0 if the response is not acceptable, or positive if it is. These are
666 mutually exclusive return code behaviours, and they mean that we need two
667 helpers so that we always return an appropriate error code if the user's
668 code throws an exception.
670 Given that we have to have two helpers anyway, these helpers are a bit more
671 helpery than most: specifically, they hide a few more of the OpenSSL
672 functions so that the user has an easier time writing these callbacks.
674 This helper implements the client side.
675 """
677 def __init__(self, callback: _OCSPClientCallback[Any]) -> None:
678 _CallbackExceptionHelper.__init__(self)
680 @wraps(callback)
681 def wrapper(ssl, cdata): # type: ignore[no-untyped-def]
682 try:
683 conn = Connection._reverse_mapping[ssl]
685 # Extract the data if any was provided.
686 if cdata != _ffi.NULL:
687 data = _ffi.from_handle(cdata)
688 else:
689 data = None
691 # Get the OCSP data.
692 ocsp_ptr = _ffi.new("unsigned char **")
693 ocsp_len = _lib.SSL_get_tlsext_status_ocsp_resp(ssl, ocsp_ptr)
694 if ocsp_len < 0:
695 # No OCSP data.
696 ocsp_data = b""
697 else:
698 # Copy the OCSP data, then pass it to the callback.
699 ocsp_data = _ffi.buffer(ocsp_ptr[0], ocsp_len)[:]
701 valid = callback(conn, ocsp_data, data)
703 # Return 1 on success or 0 on error.
704 return int(bool(valid))
706 except Exception as e:
707 self._problems.append(e)
708 # Return negative value if an exception is hit.
709 return -1
711 self.callback = _ffi.callback("int (*)(SSL *, void *)", wrapper)
714class _CookieGenerateCallbackHelper(_CallbackExceptionHelper):
715 def __init__(self, callback: _CookieGenerateCallback) -> None:
716 _CallbackExceptionHelper.__init__(self)
718 @wraps(callback)
719 def wrapper(ssl, out, outlen): # type: ignore[no-untyped-def]
720 try:
721 conn = Connection._reverse_mapping[ssl]
722 cookie = callback(conn)
723 out[0 : len(cookie)] = cookie
724 outlen[0] = len(cookie)
725 return 1
726 except Exception as e:
727 self._problems.append(e)
728 # "a zero return value can be used to abort the handshake"
729 return 0
731 self.callback = _ffi.callback(
732 "int (*)(SSL *, unsigned char *, unsigned int *)",
733 wrapper,
734 )
737class _CookieVerifyCallbackHelper(_CallbackExceptionHelper):
738 def __init__(self, callback: _CookieVerifyCallback) -> None:
739 _CallbackExceptionHelper.__init__(self)
741 @wraps(callback)
742 def wrapper(ssl, c_cookie, cookie_len): # type: ignore[no-untyped-def]
743 try:
744 conn = Connection._reverse_mapping[ssl]
745 return callback(conn, bytes(c_cookie[0:cookie_len]))
746 except Exception as e:
747 self._problems.append(e)
748 return 0
750 self.callback = _ffi.callback(
751 "int (*)(SSL *, unsigned char *, unsigned int)",
752 wrapper,
753 )
756def _asFileDescriptor(obj: Any) -> int:
757 fd = None
758 if not isinstance(obj, int):
759 meth = getattr(obj, "fileno", None)
760 if meth is not None:
761 obj = meth()
763 if isinstance(obj, int):
764 fd = obj
766 if not isinstance(fd, int):
767 raise TypeError("argument must be an int, or have a fileno() method.")
768 elif fd < 0:
769 raise ValueError(
770 f"file descriptor cannot be a negative integer ({fd:i})"
771 )
773 return fd
776def OpenSSL_version(type: int) -> bytes:
777 """
778 Return a string describing the version of OpenSSL in use.
780 :param type: One of the :const:`OPENSSL_` constants defined in this module.
781 """
782 return _ffi.string(_lib.OpenSSL_version(type))
785SSLeay_version = OpenSSL_version
788def _make_requires(flag: int, error: str) -> Callable[[_T], _T]:
789 """
790 Builds a decorator that ensures that functions that rely on OpenSSL
791 functions that are not present in this build raise NotImplementedError,
792 rather than AttributeError coming out of cryptography.
794 :param flag: A cryptography flag that guards the functions, e.g.
795 ``Cryptography_HAS_NEXTPROTONEG``.
796 :param error: The string to be used in the exception if the flag is false.
797 """
799 def _requires_decorator(func): # type: ignore[no-untyped-def]
800 if not flag:
802 @wraps(func)
803 def explode(*args, **kwargs): # type: ignore[no-untyped-def]
804 raise NotImplementedError(error)
806 return explode
807 else:
808 return func
810 return _requires_decorator
813_requires_keylog = _make_requires(
814 getattr(_lib, "Cryptography_HAS_KEYLOG", 0), "Key logging not available"
815)
818class Session:
819 """
820 A class representing an SSL session. A session defines certain connection
821 parameters which may be re-used to speed up the setup of subsequent
822 connections.
824 .. versionadded:: 0.14
825 """
827 _session: Any
830F = TypeVar("F", bound=Callable[..., Any])
833def _require_not_used(f: F) -> F:
834 @wraps(f)
835 def inner(self: Context, *args: Any, **kwargs: Any) -> Any:
836 if self._used:
837 warnings.warn(
838 (
839 "Attempting to mutate a Context after a Connection was "
840 "created. In the future, this will raise an exception"
841 ),
842 DeprecationWarning,
843 stacklevel=2,
844 )
845 return f(self, *args, **kwargs)
847 return typing.cast(F, inner)
850class Context:
851 """
852 :class:`OpenSSL.SSL.Context` instances define the parameters for setting
853 up new SSL connections.
855 :param method: One of TLS_METHOD, TLS_CLIENT_METHOD, TLS_SERVER_METHOD,
856 DTLS_METHOD, DTLS_CLIENT_METHOD, or DTLS_SERVER_METHOD.
857 SSLv23_METHOD, TLSv1_METHOD, etc. are deprecated and should
858 not be used.
859 """
861 _methods: typing.ClassVar[
862 dict[int, tuple[Callable[[], Any], int | None]]
863 ] = {
864 SSLv23_METHOD: (_lib.TLS_method, None),
865 TLSv1_METHOD: (_lib.TLS_method, TLS1_VERSION),
866 TLSv1_1_METHOD: (_lib.TLS_method, TLS1_1_VERSION),
867 TLSv1_2_METHOD: (_lib.TLS_method, TLS1_2_VERSION),
868 TLS_METHOD: (_lib.TLS_method, None),
869 TLS_SERVER_METHOD: (_lib.TLS_server_method, None),
870 TLS_CLIENT_METHOD: (_lib.TLS_client_method, None),
871 DTLS_METHOD: (_lib.DTLS_method, None),
872 DTLS_SERVER_METHOD: (_lib.DTLS_server_method, None),
873 DTLS_CLIENT_METHOD: (_lib.DTLS_client_method, None),
874 }
876 def __init__(self, method: int) -> None:
877 if not isinstance(method, int):
878 raise TypeError("method must be an integer")
880 try:
881 method_func, version = self._methods[method]
882 except KeyError:
883 raise ValueError("No such protocol")
885 method_obj = method_func()
886 _openssl_assert(method_obj != _ffi.NULL)
888 context = _lib.SSL_CTX_new(method_obj)
889 _openssl_assert(context != _ffi.NULL)
890 context = _ffi.gc(context, _lib.SSL_CTX_free)
892 self._context = context
893 self._used = False
894 self._passphrase_helper: _PassphraseHelper | None = None
895 self._passphrase_callback: _PassphraseCallback[Any] | None = None
896 self._passphrase_userdata: Any | None = None
897 self._verify_helper: _VerifyHelper | None = None
898 self._verify_callback: _VerifyCallback | None = None
899 self._info_callback = None
900 self._keylog_callback = None
901 self._tlsext_servername_callback = None
902 self._app_data = None
903 self._alpn_select_helper: _ALPNSelectHelper | None = None
904 self._alpn_select_callback: _ALPNSelectCallback | None = None
905 self._ocsp_helper: (
906 _OCSPClientCallbackHelper | _OCSPServerCallbackHelper | None
907 ) = None
908 self._ocsp_callback: (
909 _OCSPClientCallback[Any] | _OCSPServerCallback[Any] | None
910 ) = None
911 self._ocsp_data: Any | None = None
912 self._cookie_generate_helper: _CookieGenerateCallbackHelper | None = (
913 None
914 )
915 self._cookie_verify_helper: _CookieVerifyCallbackHelper | None = None
917 self.set_mode(_lib.SSL_MODE_ENABLE_PARTIAL_WRITE)
918 if version is not None:
919 self.set_min_proto_version(version)
920 self.set_max_proto_version(version)
922 @_require_not_used
923 def set_min_proto_version(self, version: int) -> None:
924 """
925 Set the minimum supported protocol version. Setting the minimum
926 version to 0 will enable protocol versions down to the lowest version
927 supported by the library.
929 If the underlying OpenSSL build is missing support for the selected
930 version, this method will raise an exception.
931 """
932 _openssl_assert(
933 _lib.SSL_CTX_set_min_proto_version(self._context, version) == 1
934 )
936 @_require_not_used
937 def set_max_proto_version(self, version: int) -> None:
938 """
939 Set the maximum supported protocol version. Setting the maximum
940 version to 0 will enable protocol versions up to the highest version
941 supported by the library.
943 If the underlying OpenSSL build is missing support for the selected
944 version, this method will raise an exception.
945 """
946 _openssl_assert(
947 _lib.SSL_CTX_set_max_proto_version(self._context, version) == 1
948 )
950 @_require_not_used
951 def load_verify_locations(
952 self,
953 cafile: _StrOrBytesPath | None,
954 capath: _StrOrBytesPath | None = None,
955 ) -> None:
956 """
957 Let SSL know where we can find trusted certificates for the certificate
958 chain. Note that the certificates have to be in PEM format.
960 If capath is passed, it must be a directory prepared using the
961 ``c_rehash`` tool included with OpenSSL. Either, but not both, of
962 *pemfile* or *capath* may be :data:`None`.
964 :param cafile: In which file we can find the certificates (``bytes`` or
965 ``str``).
966 :param capath: In which directory we can find the certificates
967 (``bytes`` or ``str``).
969 :return: None
970 """
971 if cafile is None:
972 cafile = _ffi.NULL
973 else:
974 cafile = _path_bytes(cafile)
976 if capath is None:
977 capath = _ffi.NULL
978 else:
979 capath = _path_bytes(capath)
981 load_result = _lib.SSL_CTX_load_verify_locations(
982 self._context, cafile, capath
983 )
984 if not load_result:
985 _raise_current_error()
987 def _wrap_callback(
988 self, callback: _PassphraseCallback[_T]
989 ) -> _PassphraseHelper:
990 @wraps(callback)
991 def wrapper(size: int, verify: bool, userdata: Any) -> bytes:
992 return callback(size, verify, self._passphrase_userdata)
994 return _PassphraseHelper(
995 FILETYPE_PEM, wrapper, more_args=True, truncate=True
996 )
998 @_require_not_used
999 def set_passwd_cb(
1000 self,
1001 callback: _PassphraseCallback[_T],
1002 userdata: _T | None = None,
1003 ) -> None:
1004 """
1005 Set the passphrase callback. This function will be called
1006 when a private key with a passphrase is loaded.
1008 :param callback: The Python callback to use. This must accept three
1009 positional arguments. First, an integer giving the maximum length
1010 of the passphrase it may return. If the returned passphrase is
1011 longer than this, it will be truncated. Second, a boolean value
1012 which will be true if the user should be prompted for the
1013 passphrase twice and the callback should verify that the two values
1014 supplied are equal. Third, the value given as the *userdata*
1015 parameter to :meth:`set_passwd_cb`. The *callback* must return
1016 a byte string. If an error occurs, *callback* should return a false
1017 value (e.g. an empty string).
1018 :param userdata: (optional) A Python object which will be given as
1019 argument to the callback
1020 :return: None
1021 """
1022 if not callable(callback):
1023 raise TypeError("callback must be callable")
1025 self._passphrase_helper = self._wrap_callback(callback)
1026 self._passphrase_callback = self._passphrase_helper.callback
1027 _lib.SSL_CTX_set_default_passwd_cb(
1028 self._context, self._passphrase_callback
1029 )
1030 self._passphrase_userdata = userdata
1032 @_require_not_used
1033 def set_default_verify_paths(self) -> None:
1034 """
1035 Specify that the platform provided CA certificates are to be used for
1036 verification purposes. This method has some caveats related to the
1037 binary wheels that cryptography (pyOpenSSL's primary dependency) ships:
1039 * macOS will only load certificates using this method if the user has
1040 the ``openssl@1.1`` `Homebrew <https://brew.sh>`_ formula installed
1041 in the default location.
1042 * Windows will not work.
1043 * manylinux cryptography wheels will work on most common Linux
1044 distributions in pyOpenSSL 17.1.0 and above. pyOpenSSL detects the
1045 manylinux wheel and attempts to load roots via a fallback path.
1047 :return: None
1048 """
1049 # SSL_CTX_set_default_verify_paths will attempt to load certs from
1050 # both a cafile and capath that are set at compile time. However,
1051 # it will first check environment variables and, if present, load
1052 # those paths instead
1053 set_result = _lib.SSL_CTX_set_default_verify_paths(self._context)
1054 _openssl_assert(set_result == 1)
1055 # After attempting to set default_verify_paths we need to know whether
1056 # to go down the fallback path.
1057 # First we'll check to see if any env vars have been set. If so,
1058 # we won't try to do anything else because the user has set the path
1059 # themselves.
1060 if not self._check_env_vars_set("SSL_CERT_DIR", "SSL_CERT_FILE"):
1061 default_dir = _ffi.string(_lib.X509_get_default_cert_dir())
1062 default_file = _ffi.string(_lib.X509_get_default_cert_file())
1063 # Now we check to see if the default_dir and default_file are set
1064 # to the exact values we use in our manylinux builds. If they are
1065 # then we know to load the fallbacks
1066 if (
1067 default_dir == _CRYPTOGRAPHY_MANYLINUX_CA_DIR
1068 and default_file == _CRYPTOGRAPHY_MANYLINUX_CA_FILE
1069 ):
1070 # This is manylinux, let's load our fallback paths
1071 self._fallback_default_verify_paths(
1072 _CERTIFICATE_FILE_LOCATIONS, _CERTIFICATE_PATH_LOCATIONS
1073 )
1075 def _check_env_vars_set(self, dir_env_var: str, file_env_var: str) -> bool:
1076 """
1077 Check to see if the default cert dir/file environment vars are present.
1079 :return: bool
1080 """
1081 return (
1082 os.environ.get(file_env_var) is not None
1083 or os.environ.get(dir_env_var) is not None
1084 )
1086 def _fallback_default_verify_paths(
1087 self, file_path: list[str], dir_path: list[str]
1088 ) -> None:
1089 """
1090 Default verify paths are based on the compiled version of OpenSSL.
1091 However, when pyca/cryptography is compiled as a manylinux wheel
1092 that compiled location can potentially be wrong. So, like Go, we
1093 will try a predefined set of paths and attempt to load roots
1094 from there.
1096 :return: None
1097 """
1098 for cafile in file_path:
1099 if os.path.isfile(cafile):
1100 self.load_verify_locations(cafile)
1101 break
1103 for capath in dir_path:
1104 if os.path.isdir(capath):
1105 self.load_verify_locations(None, capath)
1106 break
1108 @_require_not_used
1109 def use_certificate_chain_file(self, certfile: _StrOrBytesPath) -> None:
1110 """
1111 Load a certificate chain from a file.
1113 :param certfile: The name of the certificate chain file (``bytes`` or
1114 ``str``). Must be PEM encoded.
1116 :return: None
1117 """
1118 certfile = _path_bytes(certfile)
1120 result = _lib.SSL_CTX_use_certificate_chain_file(
1121 self._context, certfile
1122 )
1123 if not result:
1124 _raise_current_error()
1126 @_require_not_used
1127 def use_certificate_file(
1128 self, certfile: _StrOrBytesPath, filetype: int = FILETYPE_PEM
1129 ) -> None:
1130 """
1131 Load a certificate from a file
1133 :param certfile: The name of the certificate file (``bytes`` or
1134 ``str``).
1135 :param filetype: (optional) The encoding of the file, which is either
1136 :const:`FILETYPE_PEM` or :const:`FILETYPE_ASN1`. The default is
1137 :const:`FILETYPE_PEM`.
1139 :return: None
1140 """
1141 certfile = _path_bytes(certfile)
1142 if not isinstance(filetype, int):
1143 raise TypeError("filetype must be an integer")
1145 use_result = _lib.SSL_CTX_use_certificate_file(
1146 self._context, certfile, filetype
1147 )
1148 if not use_result:
1149 _raise_current_error()
1151 @_require_not_used
1152 def use_certificate(self, cert: X509 | x509.Certificate) -> None:
1153 """
1154 Load a certificate from a X509 object
1156 :param cert: The X509 object
1157 :return: None
1158 """
1159 # Mirrored at Connection.use_certificate
1160 if not isinstance(cert, X509):
1161 cert = X509.from_cryptography(cert)
1162 else:
1163 warnings.warn(
1164 (
1165 "Passing pyOpenSSL X509 objects is deprecated. You "
1166 "should use a cryptography.x509.Certificate instead."
1167 ),
1168 DeprecationWarning,
1169 stacklevel=2,
1170 )
1172 use_result = _lib.SSL_CTX_use_certificate(self._context, cert._x509)
1173 if not use_result:
1174 _raise_current_error()
1176 @_require_not_used
1177 def add_extra_chain_cert(self, certobj: X509 | x509.Certificate) -> None:
1178 """
1179 Add certificate to chain
1181 :param certobj: The X509 certificate object to add to the chain
1182 :return: None
1183 """
1184 if not isinstance(certobj, X509):
1185 certobj = X509.from_cryptography(certobj)
1186 else:
1187 warnings.warn(
1188 (
1189 "Passing pyOpenSSL X509 objects is deprecated. You "
1190 "should use a cryptography.x509.Certificate instead."
1191 ),
1192 DeprecationWarning,
1193 stacklevel=2,
1194 )
1196 copy = _lib.X509_dup(certobj._x509)
1197 add_result = _lib.SSL_CTX_add_extra_chain_cert(self._context, copy)
1198 if not add_result:
1199 # TODO: This is untested.
1200 _lib.X509_free(copy)
1201 _raise_current_error()
1203 def _raise_passphrase_exception(self) -> None:
1204 if self._passphrase_helper is not None:
1205 self._passphrase_helper.raise_if_problem(Error)
1207 _raise_current_error()
1209 @_require_not_used
1210 def use_privatekey_file(
1211 self, keyfile: _StrOrBytesPath, filetype: int = FILETYPE_PEM
1212 ) -> None:
1213 """
1214 Load a private key from a file
1216 :param keyfile: The name of the key file (``bytes`` or ``str``)
1217 :param filetype: (optional) The encoding of the file, which is either
1218 :const:`FILETYPE_PEM` or :const:`FILETYPE_ASN1`. The default is
1219 :const:`FILETYPE_PEM`.
1221 :return: None
1222 """
1223 keyfile = _path_bytes(keyfile)
1225 if not isinstance(filetype, int):
1226 raise TypeError("filetype must be an integer")
1228 use_result = _lib.SSL_CTX_use_PrivateKey_file(
1229 self._context, keyfile, filetype
1230 )
1231 if not use_result:
1232 self._raise_passphrase_exception()
1234 @_require_not_used
1235 def use_privatekey(self, pkey: _PrivateKey | PKey) -> None:
1236 """
1237 Load a private key from a PKey object
1239 :param pkey: The PKey object
1240 :return: None
1241 """
1242 # Mirrored at Connection.use_privatekey
1243 if not isinstance(pkey, PKey):
1244 pkey = PKey.from_cryptography_key(pkey)
1245 else:
1246 warnings.warn(
1247 (
1248 "Passing pyOpenSSL PKey objects is deprecated. You "
1249 "should use a cryptography private key instead."
1250 ),
1251 DeprecationWarning,
1252 stacklevel=2,
1253 )
1255 use_result = _lib.SSL_CTX_use_PrivateKey(self._context, pkey._pkey)
1256 if not use_result:
1257 self._raise_passphrase_exception()
1259 def check_privatekey(self) -> None:
1260 """
1261 Check if the private key (loaded with :meth:`use_privatekey`) matches
1262 the certificate (loaded with :meth:`use_certificate`)
1264 :return: :data:`None` (raises :exc:`Error` if something's wrong)
1265 """
1266 if not _lib.SSL_CTX_check_private_key(self._context):
1267 _raise_current_error()
1269 @_require_not_used
1270 def load_client_ca(self, cafile: bytes) -> None:
1271 """
1272 Load the trusted certificates that will be sent to the client. Does
1273 not actually imply any of the certificates are trusted; that must be
1274 configured separately.
1276 :param bytes cafile: The path to a certificates file in PEM format.
1277 :return: None
1278 """
1279 ca_list = _lib.SSL_load_client_CA_file(
1280 _text_to_bytes_and_warn("cafile", cafile)
1281 )
1282 _openssl_assert(ca_list != _ffi.NULL)
1283 _lib.SSL_CTX_set_client_CA_list(self._context, ca_list)
1285 @_require_not_used
1286 def set_session_id(self, buf: bytes) -> None:
1287 """
1288 Set the session id to *buf* within which a session can be reused for
1289 this Context object. This is needed when doing session resumption,
1290 because there is no way for a stored session to know which Context
1291 object it is associated with.
1293 :param bytes buf: The session id.
1295 :returns: None
1296 """
1297 buf = _text_to_bytes_and_warn("buf", buf)
1298 _openssl_assert(
1299 _lib.SSL_CTX_set_session_id_context(self._context, buf, len(buf))
1300 == 1
1301 )
1303 @_require_not_used
1304 def set_session_cache_mode(self, mode: int) -> int:
1305 """
1306 Set the behavior of the session cache used by all connections using
1307 this Context. The previously set mode is returned. See
1308 :const:`SESS_CACHE_*` for details about particular modes.
1310 :param mode: One or more of the SESS_CACHE_* flags (combine using
1311 bitwise or)
1312 :returns: The previously set caching mode.
1314 .. versionadded:: 0.14
1315 """
1316 if not isinstance(mode, int):
1317 raise TypeError("mode must be an integer")
1319 return _lib.SSL_CTX_set_session_cache_mode(self._context, mode)
1321 def get_session_cache_mode(self) -> int:
1322 """
1323 Get the current session cache mode.
1325 :returns: The currently used cache mode.
1327 .. versionadded:: 0.14
1328 """
1329 return _lib.SSL_CTX_get_session_cache_mode(self._context)
1331 @_require_not_used
1332 def set_verify(
1333 self, mode: int, callback: _VerifyCallback | None = None
1334 ) -> None:
1335 """
1336 Set the verification flags for this Context object to *mode* and
1337 specify that *callback* should be used for verification callbacks.
1339 :param mode: The verify mode, this should be one of
1340 :const:`VERIFY_NONE` and :const:`VERIFY_PEER`. If
1341 :const:`VERIFY_PEER` is used, *mode* can be OR:ed with
1342 :const:`VERIFY_FAIL_IF_NO_PEER_CERT` and
1343 :const:`VERIFY_CLIENT_ONCE` to further control the behaviour.
1344 :param callback: The optional Python verification callback to use.
1345 This should take five arguments: A Connection object, an X509
1346 object, and three integer variables, which are in turn potential
1347 error number, error depth and return code. *callback* should
1348 return True if verification passes and False otherwise.
1349 If omitted, OpenSSL's default verification is used.
1350 :return: None
1352 See SSL_CTX_set_verify(3SSL) for further details.
1353 """
1354 if not isinstance(mode, int):
1355 raise TypeError("mode must be an integer")
1357 if callback is None:
1358 self._verify_helper = None
1359 self._verify_callback = None
1360 _lib.SSL_CTX_set_verify(self._context, mode, _ffi.NULL)
1361 else:
1362 if not callable(callback):
1363 raise TypeError("callback must be callable")
1365 self._verify_helper = _VerifyHelper(callback)
1366 self._verify_callback = self._verify_helper.callback
1367 _lib.SSL_CTX_set_verify(self._context, mode, self._verify_callback)
1369 @_require_not_used
1370 def set_verify_depth(self, depth: int) -> None:
1371 """
1372 Set the maximum depth for the certificate chain verification that shall
1373 be allowed for this Context object.
1375 :param depth: An integer specifying the verify depth
1376 :return: None
1377 """
1378 if not isinstance(depth, int):
1379 raise TypeError("depth must be an integer")
1381 _lib.SSL_CTX_set_verify_depth(self._context, depth)
1383 def get_verify_mode(self) -> int:
1384 """
1385 Retrieve the Context object's verify mode, as set by
1386 :meth:`set_verify`.
1388 :return: The verify mode
1389 """
1390 return _lib.SSL_CTX_get_verify_mode(self._context)
1392 def get_verify_depth(self) -> int:
1393 """
1394 Retrieve the Context object's verify depth, as set by
1395 :meth:`set_verify_depth`.
1397 :return: The verify depth
1398 """
1399 return _lib.SSL_CTX_get_verify_depth(self._context)
1401 @_require_not_used
1402 def load_tmp_dh(self, dhfile: _StrOrBytesPath) -> None:
1403 """
1404 Load parameters for Ephemeral Diffie-Hellman
1406 :param dhfile: The file to load EDH parameters from (``bytes`` or
1407 ``str``).
1409 :return: None
1410 """
1411 dhfile = _path_bytes(dhfile)
1413 bio = _lib.BIO_new_file(dhfile, b"r")
1414 if bio == _ffi.NULL:
1415 _raise_current_error()
1416 bio = _ffi.gc(bio, _lib.BIO_free)
1418 dh = _lib.PEM_read_bio_DHparams(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL)
1419 dh = _ffi.gc(dh, _lib.DH_free)
1420 res = _lib.SSL_CTX_set_tmp_dh(self._context, dh)
1421 _openssl_assert(res == 1)
1423 @_require_not_used
1424 def set_tmp_ecdh(self, curve: _EllipticCurve | ec.EllipticCurve) -> None:
1425 """
1426 Select a curve to use for ECDHE key exchange.
1428 :param curve: A curve instance from cryptography
1429 (:class:`~cryptogragraphy.hazmat.primitives.asymmetric.ec.EllipticCurve`).
1430 Alternatively (deprecated) a curve object from either
1431 :meth:`OpenSSL.crypto.get_elliptic_curve` or
1432 :meth:`OpenSSL.crypto.get_elliptic_curves`.
1434 :return: None
1435 """
1437 if isinstance(curve, _EllipticCurve):
1438 warnings.warn(
1439 (
1440 "Passing pyOpenSSL elliptic curves to set_tmp_ecdh is "
1441 "deprecated. You should use cryptography's elliptic curve "
1442 "types instead."
1443 ),
1444 DeprecationWarning,
1445 stacklevel=2,
1446 )
1447 _lib.SSL_CTX_set_tmp_ecdh(self._context, curve._to_EC_KEY())
1448 else:
1449 name = curve.name
1450 if name == "secp192r1":
1451 name = "prime192v1"
1452 elif name == "secp256r1":
1453 name = "prime256v1"
1454 nid = _lib.OBJ_txt2nid(name.encode())
1455 if nid == _lib.NID_undef:
1456 _raise_current_error()
1458 ec = _lib.EC_KEY_new_by_curve_name(nid)
1459 _openssl_assert(ec != _ffi.NULL)
1460 ec = _ffi.gc(ec, _lib.EC_KEY_free)
1461 _lib.SSL_CTX_set_tmp_ecdh(self._context, ec)
1463 @_require_not_used
1464 def set_cipher_list(self, cipher_list: bytes) -> None:
1465 """
1466 Set the list of ciphers to be used in this context.
1468 See the OpenSSL manual for more information (e.g.
1469 :manpage:`ciphers(1)`).
1471 :param bytes cipher_list: An OpenSSL cipher string.
1472 :return: None
1473 """
1474 cipher_list = _text_to_bytes_and_warn("cipher_list", cipher_list)
1476 if not isinstance(cipher_list, bytes):
1477 raise TypeError("cipher_list must be a byte string.")
1479 _openssl_assert(
1480 _lib.SSL_CTX_set_cipher_list(self._context, cipher_list) == 1
1481 )
1482 # In OpenSSL 1.1.1 setting the cipher list will always return TLS 1.3
1483 # ciphers even if you pass an invalid cipher. Applications (like
1484 # Twisted) have tests that depend on an error being raised if an
1485 # invalid cipher string is passed, but without the following check
1486 # for the TLS 1.3 specific cipher suites it would never error.
1487 tmpconn = Connection(self, None)
1488 if tmpconn.get_cipher_list() == [
1489 "TLS_AES_256_GCM_SHA384",
1490 "TLS_CHACHA20_POLY1305_SHA256",
1491 "TLS_AES_128_GCM_SHA256",
1492 ]:
1493 raise Error(
1494 [
1495 (
1496 "SSL routines",
1497 "SSL_CTX_set_cipher_list",
1498 "no cipher match",
1499 ),
1500 ],
1501 )
1503 @_require_not_used
1504 def set_client_ca_list(
1505 self, certificate_authorities: Sequence[X509Name]
1506 ) -> None:
1507 """
1508 Set the list of preferred client certificate signers for this server
1509 context.
1511 This list of certificate authorities will be sent to the client when
1512 the server requests a client certificate.
1514 :param certificate_authorities: a sequence of X509Names.
1515 :return: None
1517 .. versionadded:: 0.10
1518 """
1519 name_stack = _lib.sk_X509_NAME_new_null()
1520 _openssl_assert(name_stack != _ffi.NULL)
1522 try:
1523 for ca_name in certificate_authorities:
1524 if not isinstance(ca_name, X509Name):
1525 raise TypeError(
1526 f"client CAs must be X509Name objects, not "
1527 f"{type(ca_name).__name__} objects"
1528 )
1529 copy = _lib.X509_NAME_dup(ca_name._name)
1530 _openssl_assert(copy != _ffi.NULL)
1531 push_result = _lib.sk_X509_NAME_push(name_stack, copy)
1532 if not push_result:
1533 _lib.X509_NAME_free(copy)
1534 _raise_current_error()
1535 except Exception:
1536 _lib.sk_X509_NAME_free(name_stack)
1537 raise
1539 _lib.SSL_CTX_set_client_CA_list(self._context, name_stack)
1541 @_require_not_used
1542 def add_client_ca(
1543 self, certificate_authority: X509 | x509.Certificate
1544 ) -> None:
1545 """
1546 Add the CA certificate to the list of preferred signers for this
1547 context.
1549 The list of certificate authorities will be sent to the client when the
1550 server requests a client certificate.
1552 :param certificate_authority: certificate authority's X509 certificate.
1553 :return: None
1555 .. versionadded:: 0.10
1556 """
1557 if not isinstance(certificate_authority, X509):
1558 certificate_authority = X509.from_cryptography(
1559 certificate_authority
1560 )
1561 else:
1562 warnings.warn(
1563 (
1564 "Passing pyOpenSSL X509 objects is deprecated. You "
1565 "should use a cryptography.x509.Certificate instead."
1566 ),
1567 DeprecationWarning,
1568 stacklevel=2,
1569 )
1571 add_result = _lib.SSL_CTX_add_client_CA(
1572 self._context, certificate_authority._x509
1573 )
1574 _openssl_assert(add_result == 1)
1576 @_require_not_used
1577 def set_timeout(self, timeout: int) -> None:
1578 """
1579 Set the timeout for newly created sessions for this Context object to
1580 *timeout*. The default value is 300 seconds. See the OpenSSL manual
1581 for more information (e.g. :manpage:`SSL_CTX_set_timeout(3)`).
1583 :param timeout: The timeout in (whole) seconds
1584 :return: The previous session timeout
1585 """
1586 if not isinstance(timeout, int):
1587 raise TypeError("timeout must be an integer")
1589 return _lib.SSL_CTX_set_timeout(self._context, timeout)
1591 def get_timeout(self) -> int:
1592 """
1593 Retrieve session timeout, as set by :meth:`set_timeout`. The default
1594 is 300 seconds.
1596 :return: The session timeout
1597 """
1598 return _lib.SSL_CTX_get_timeout(self._context)
1600 @_require_not_used
1601 def set_info_callback(
1602 self, callback: Callable[[Connection, int, int], None]
1603 ) -> None:
1604 """
1605 Set the information callback to *callback*. This function will be
1606 called from time to time during SSL handshakes.
1608 :param callback: The Python callback to use. This should take three
1609 arguments: a Connection object and two integers. The first integer
1610 specifies where in the SSL handshake the function was called, and
1611 the other the return code from a (possibly failed) internal
1612 function call.
1613 :return: None
1614 """
1616 @wraps(callback)
1617 def wrapper(ssl, where, return_code): # type: ignore[no-untyped-def]
1618 callback(Connection._reverse_mapping[ssl], where, return_code)
1620 self._info_callback = _ffi.callback(
1621 "void (*)(const SSL *, int, int)", wrapper
1622 )
1623 _lib.SSL_CTX_set_info_callback(self._context, self._info_callback)
1625 @_requires_keylog
1626 @_require_not_used
1627 def set_keylog_callback(
1628 self, callback: Callable[[Connection, bytes], None]
1629 ) -> None:
1630 """
1631 Set the TLS key logging callback to *callback*. This function will be
1632 called whenever TLS key material is generated or received, in order
1633 to allow applications to store this keying material for debugging
1634 purposes.
1636 :param callback: The Python callback to use. This should take two
1637 arguments: a Connection object and a bytestring that contains
1638 the key material in the format used by NSS for its SSLKEYLOGFILE
1639 debugging output.
1640 :return: None
1641 """
1643 @wraps(callback)
1644 def wrapper(ssl, line): # type: ignore[no-untyped-def]
1645 line = _ffi.string(line)
1646 callback(Connection._reverse_mapping[ssl], line)
1648 self._keylog_callback = _ffi.callback(
1649 "void (*)(const SSL *, const char *)", wrapper
1650 )
1651 _lib.SSL_CTX_set_keylog_callback(self._context, self._keylog_callback)
1653 def get_app_data(self) -> Any:
1654 """
1655 Get the application data (supplied via :meth:`set_app_data()`)
1657 :return: The application data
1658 """
1659 return self._app_data
1661 @_require_not_used
1662 def set_app_data(self, data: Any) -> None:
1663 """
1664 Set the application data (will be returned from get_app_data())
1666 :param data: Any Python object
1667 :return: None
1668 """
1669 self._app_data = data
1671 def get_cert_store(self) -> X509Store | None:
1672 """
1673 Get the certificate store for the context. This can be used to add
1674 "trusted" certificates without using the
1675 :meth:`load_verify_locations` method.
1677 :return: A X509Store object or None if it does not have one.
1678 """
1679 store = _lib.SSL_CTX_get_cert_store(self._context)
1680 if store == _ffi.NULL:
1681 # TODO: This is untested.
1682 return None
1684 pystore = X509Store.__new__(X509Store)
1685 pystore._store = store
1686 return pystore
1688 @_require_not_used
1689 def set_options(self, options: int) -> int:
1690 """
1691 Add options. Options set before are not cleared!
1692 This method should be used with the :const:`OP_*` constants.
1694 :param options: The options to add.
1695 :return: The new option bitmask.
1696 """
1697 if not isinstance(options, int):
1698 raise TypeError("options must be an integer")
1700 return _lib.SSL_CTX_set_options(self._context, options)
1702 @_require_not_used
1703 def set_mode(self, mode: int) -> int:
1704 """
1705 Add modes via bitmask. Modes set before are not cleared! This method
1706 should be used with the :const:`MODE_*` constants.
1708 :param mode: The mode to add.
1709 :return: The new mode bitmask.
1710 """
1711 if not isinstance(mode, int):
1712 raise TypeError("mode must be an integer")
1714 return _lib.SSL_CTX_set_mode(self._context, mode)
1716 @_require_not_used
1717 def set_tlsext_servername_callback(
1718 self, callback: Callable[[Connection], None]
1719 ) -> None:
1720 """
1721 Specify a callback function to be called when clients specify a server
1722 name.
1724 :param callback: The callback function. It will be invoked with one
1725 argument, the Connection instance.
1727 .. versionadded:: 0.13
1728 """
1730 @wraps(callback)
1731 def wrapper(ssl, alert, arg): # type: ignore[no-untyped-def]
1732 callback(Connection._reverse_mapping[ssl])
1733 return 0
1735 self._tlsext_servername_callback = _ffi.callback(
1736 "int (*)(SSL *, int *, void *)", wrapper
1737 )
1738 _lib.SSL_CTX_set_tlsext_servername_callback(
1739 self._context, self._tlsext_servername_callback
1740 )
1742 @_require_not_used
1743 def set_tlsext_use_srtp(self, profiles: bytes) -> None:
1744 """
1745 Enable support for negotiating SRTP keying material.
1747 :param bytes profiles: A colon delimited list of protection profile
1748 names, like ``b'SRTP_AES128_CM_SHA1_80:SRTP_AES128_CM_SHA1_32'``.
1749 :return: None
1750 """
1751 if not isinstance(profiles, bytes):
1752 raise TypeError("profiles must be a byte string.")
1754 _openssl_assert(
1755 _lib.SSL_CTX_set_tlsext_use_srtp(self._context, profiles) == 0
1756 )
1758 @_require_not_used
1759 def set_alpn_protos(self, protos: list[bytes]) -> None:
1760 """
1761 Specify the protocols that the client is prepared to speak after the
1762 TLS connection has been negotiated using Application Layer Protocol
1763 Negotiation.
1765 :param protos: A list of the protocols to be offered to the server.
1766 This list should be a Python list of bytestrings representing the
1767 protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``.
1768 """
1769 # Different versions of OpenSSL are inconsistent about how they handle
1770 # empty proto lists (see #1043), so we avoid the problem entirely by
1771 # rejecting them ourselves.
1772 if not protos:
1773 raise ValueError("at least one protocol must be specified")
1775 # Take the list of protocols and join them together, prefixing them
1776 # with their lengths.
1777 protostr = b"".join(
1778 chain.from_iterable((bytes((len(p),)), p) for p in protos)
1779 )
1781 # Build a C string from the list. We don't need to save this off
1782 # because OpenSSL immediately copies the data out.
1783 input_str = _ffi.new("unsigned char[]", protostr)
1785 # https://www.openssl.org/docs/man1.1.0/man3/SSL_CTX_set_alpn_protos.html:
1786 # SSL_CTX_set_alpn_protos() and SSL_set_alpn_protos()
1787 # return 0 on success, and non-0 on failure.
1788 # WARNING: these functions reverse the return value convention.
1789 _openssl_assert(
1790 _lib.SSL_CTX_set_alpn_protos(
1791 self._context, input_str, len(protostr)
1792 )
1793 == 0
1794 )
1796 @_require_not_used
1797 def set_alpn_select_callback(self, callback: _ALPNSelectCallback) -> None:
1798 """
1799 Specify a callback function that will be called on the server when a
1800 client offers protocols using ALPN.
1802 :param callback: The callback function. It will be invoked with two
1803 arguments: the Connection, and a list of offered protocols as
1804 bytestrings, e.g ``[b'http/1.1', b'spdy/2']``. It can return
1805 one of those bytestrings to indicate the chosen protocol, the
1806 empty bytestring to terminate the TLS connection, or the
1807 :py:obj:`NO_OVERLAPPING_PROTOCOLS` to indicate that no offered
1808 protocol was selected, but that the connection should not be
1809 aborted.
1810 """
1811 self._alpn_select_helper = _ALPNSelectHelper(callback)
1812 self._alpn_select_callback = self._alpn_select_helper.callback
1813 _lib.SSL_CTX_set_alpn_select_cb(
1814 self._context, self._alpn_select_callback, _ffi.NULL
1815 )
1817 def _set_ocsp_callback(
1818 self,
1819 helper: _OCSPClientCallbackHelper | _OCSPServerCallbackHelper,
1820 data: Any | None,
1821 ) -> None:
1822 """
1823 This internal helper does the common work for
1824 ``set_ocsp_server_callback`` and ``set_ocsp_client_callback``, which is
1825 almost all of it.
1826 """
1827 self._ocsp_helper = helper
1828 self._ocsp_callback = helper.callback
1829 if data is None:
1830 self._ocsp_data = _ffi.NULL
1831 else:
1832 self._ocsp_data = _ffi.new_handle(data)
1834 rc = _lib.SSL_CTX_set_tlsext_status_cb(
1835 self._context, self._ocsp_callback
1836 )
1837 _openssl_assert(rc == 1)
1838 rc = _lib.SSL_CTX_set_tlsext_status_arg(self._context, self._ocsp_data)
1839 _openssl_assert(rc == 1)
1841 @_require_not_used
1842 def set_ocsp_server_callback(
1843 self,
1844 callback: _OCSPServerCallback[_T],
1845 data: _T | None = None,
1846 ) -> None:
1847 """
1848 Set a callback to provide OCSP data to be stapled to the TLS handshake
1849 on the server side.
1851 :param callback: The callback function. It will be invoked with two
1852 arguments: the Connection, and the optional arbitrary data you have
1853 provided. The callback must return a bytestring that contains the
1854 OCSP data to staple to the handshake. If no OCSP data is available
1855 for this connection, return the empty bytestring.
1856 :param data: Some opaque data that will be passed into the callback
1857 function when called. This can be used to avoid needing to do
1858 complex data lookups or to keep track of what context is being
1859 used. This parameter is optional.
1860 """
1861 helper = _OCSPServerCallbackHelper(callback)
1862 self._set_ocsp_callback(helper, data)
1864 @_require_not_used
1865 def set_ocsp_client_callback(
1866 self,
1867 callback: _OCSPClientCallback[_T],
1868 data: _T | None = None,
1869 ) -> None:
1870 """
1871 Set a callback to validate OCSP data stapled to the TLS handshake on
1872 the client side.
1874 :param callback: The callback function. It will be invoked with three
1875 arguments: the Connection, a bytestring containing the stapled OCSP
1876 assertion, and the optional arbitrary data you have provided. The
1877 callback must return a boolean that indicates the result of
1878 validating the OCSP data: ``True`` if the OCSP data is valid and
1879 the certificate can be trusted, or ``False`` if either the OCSP
1880 data is invalid or the certificate has been revoked.
1881 :param data: Some opaque data that will be passed into the callback
1882 function when called. This can be used to avoid needing to do
1883 complex data lookups or to keep track of what context is being
1884 used. This parameter is optional.
1885 """
1886 helper = _OCSPClientCallbackHelper(callback)
1887 self._set_ocsp_callback(helper, data)
1889 @_require_not_used
1890 def set_cookie_generate_callback(
1891 self, callback: _CookieGenerateCallback
1892 ) -> None:
1893 self._cookie_generate_helper = _CookieGenerateCallbackHelper(callback)
1894 _lib.SSL_CTX_set_cookie_generate_cb(
1895 self._context,
1896 self._cookie_generate_helper.callback,
1897 )
1899 @_require_not_used
1900 def set_cookie_verify_callback(
1901 self, callback: _CookieVerifyCallback
1902 ) -> None:
1903 self._cookie_verify_helper = _CookieVerifyCallbackHelper(callback)
1904 _lib.SSL_CTX_set_cookie_verify_cb(
1905 self._context,
1906 self._cookie_verify_helper.callback,
1907 )
1910class Connection:
1911 _reverse_mapping: typing.MutableMapping[Any, Connection] = (
1912 WeakValueDictionary()
1913 )
1915 def __init__(
1916 self, context: Context, socket: socket.socket | None = None
1917 ) -> None:
1918 """
1919 Create a new Connection object, using the given OpenSSL.SSL.Context
1920 instance and socket.
1922 :param context: An SSL Context to use for this connection
1923 :param socket: The socket to use for transport layer
1924 """
1925 if not isinstance(context, Context):
1926 raise TypeError("context must be a Context instance")
1928 context._used = True
1930 ssl = _lib.SSL_new(context._context)
1931 self._ssl = _ffi.gc(ssl, _lib.SSL_free)
1932 # We set SSL_MODE_AUTO_RETRY to handle situations where OpenSSL returns
1933 # an SSL_ERROR_WANT_READ when processing a non-application data packet
1934 # even though there is still data on the underlying transport.
1935 # See https://github.com/openssl/openssl/issues/6234 for more details.
1936 _lib.SSL_set_mode(self._ssl, _lib.SSL_MODE_AUTO_RETRY)
1937 self._context = context
1938 self._app_data = None
1940 # References to strings used for Application Layer Protocol
1941 # Negotiation. These strings get copied at some point but it's well
1942 # after the callback returns, so we have to hang them somewhere to
1943 # avoid them getting freed.
1944 self._alpn_select_callback_args: Any = None
1946 # Reference the verify_callback of the Context. This ensures that if
1947 # set_verify is called again after the SSL object has been created we
1948 # do not point to a dangling reference
1949 self._verify_helper = context._verify_helper
1950 self._verify_callback = context._verify_callback
1952 # And likewise for the cookie callbacks
1953 self._cookie_generate_helper = context._cookie_generate_helper
1954 self._cookie_verify_helper = context._cookie_verify_helper
1956 self._reverse_mapping[self._ssl] = self
1958 if socket is None:
1959 self._socket = None
1960 # Don't set up any gc for these, SSL_free will take care of them.
1961 self._into_ssl = _lib.BIO_new(_lib.BIO_s_mem())
1962 _openssl_assert(self._into_ssl != _ffi.NULL)
1964 self._from_ssl = _lib.BIO_new(_lib.BIO_s_mem())
1965 _openssl_assert(self._from_ssl != _ffi.NULL)
1967 _lib.SSL_set_bio(self._ssl, self._into_ssl, self._from_ssl)
1968 else:
1969 self._into_ssl = None
1970 self._from_ssl = None
1971 self._socket = socket
1972 set_result = _lib.SSL_set_fd(
1973 self._ssl, _asFileDescriptor(self._socket)
1974 )
1975 _openssl_assert(set_result == 1)
1977 def __getattr__(self, name: str) -> Any:
1978 """
1979 Look up attributes on the wrapped socket object if they are not found
1980 on the Connection object.
1981 """
1982 if self._socket is None:
1983 raise AttributeError(
1984 f"'{self.__class__.__name__}' object has no attribute '{name}'"
1985 )
1986 else:
1987 return getattr(self._socket, name)
1989 def _raise_ssl_error(self, ssl: Any, result: int) -> None:
1990 if self._context._verify_helper is not None:
1991 self._context._verify_helper.raise_if_problem()
1992 if self._context._alpn_select_helper is not None:
1993 self._context._alpn_select_helper.raise_if_problem()
1994 if self._context._ocsp_helper is not None:
1995 self._context._ocsp_helper.raise_if_problem()
1997 error = _lib.SSL_get_error(ssl, result)
1998 if error == _lib.SSL_ERROR_WANT_READ:
1999 raise WantReadError()
2000 elif error == _lib.SSL_ERROR_WANT_WRITE:
2001 raise WantWriteError()
2002 elif error == _lib.SSL_ERROR_ZERO_RETURN:
2003 raise ZeroReturnError()
2004 elif error == _lib.SSL_ERROR_WANT_X509_LOOKUP:
2005 # TODO: This is untested.
2006 raise WantX509LookupError()
2007 elif error == _lib.SSL_ERROR_SYSCALL:
2008 if platform == "win32":
2009 errno = _ffi.getwinerror()[0]
2010 else:
2011 errno = _ffi.errno
2012 if _lib.ERR_peek_error() == 0 or errno != 0:
2013 if result < 0 and errno != 0:
2014 raise SysCallError(errno, errorcode.get(errno))
2015 raise SysCallError(-1, "Unexpected EOF")
2016 else:
2017 # TODO: This is untested, but I think twisted hits it?
2018 _raise_current_error()
2019 elif error == _lib.SSL_ERROR_SSL and _lib.ERR_peek_error() != 0:
2020 # In 3.0.x an unexpected EOF no longer triggers syscall error
2021 # but we want to maintain compatibility so we check here and
2022 # raise syscall if it is an EOF. Since we're not actually sure
2023 # what else could raise SSL_ERROR_SSL we check for the presence
2024 # of the OpenSSL 3 constant SSL_R_UNEXPECTED_EOF_WHILE_READING
2025 # and if it's not present we just raise an error, which matches
2026 # the behavior before we added this elif section
2027 peeked_error = _lib.ERR_peek_error()
2028 reason = _lib.ERR_GET_REASON(peeked_error)
2029 if _lib.Cryptography_HAS_UNEXPECTED_EOF_WHILE_READING:
2030 _openssl_assert(
2031 reason == _lib.SSL_R_UNEXPECTED_EOF_WHILE_READING
2032 )
2033 _lib.ERR_clear_error()
2034 raise SysCallError(-1, "Unexpected EOF")
2035 else:
2036 _raise_current_error()
2037 elif error == _lib.SSL_ERROR_NONE:
2038 pass
2039 else:
2040 _raise_current_error()
2042 def get_context(self) -> Context:
2043 """
2044 Retrieve the :class:`Context` object associated with this
2045 :class:`Connection`.
2046 """
2047 return self._context
2049 def set_context(self, context: Context) -> None:
2050 """
2051 Switch this connection to a new session context.
2053 :param context: A :class:`Context` instance giving the new session
2054 context to use.
2055 """
2056 if not isinstance(context, Context):
2057 raise TypeError("context must be a Context instance")
2059 _lib.SSL_set_SSL_CTX(self._ssl, context._context)
2060 self._context = context
2061 self._context._used = True
2063 def get_servername(self) -> bytes | None:
2064 """
2065 Retrieve the servername extension value if provided in the client hello
2066 message, or None if there wasn't one.
2068 :return: A byte string giving the server name or :data:`None`.
2070 .. versionadded:: 0.13
2071 """
2072 name = _lib.SSL_get_servername(
2073 self._ssl, _lib.TLSEXT_NAMETYPE_host_name
2074 )
2075 if name == _ffi.NULL:
2076 return None
2078 return _ffi.string(name)
2080 def set_verify(
2081 self, mode: int, callback: _VerifyCallback | None = None
2082 ) -> None:
2083 """
2084 Override the Context object's verification flags for this specific
2085 connection. See :py:meth:`Context.set_verify` for details.
2086 """
2087 if not isinstance(mode, int):
2088 raise TypeError("mode must be an integer")
2090 if callback is None:
2091 self._verify_helper = None
2092 self._verify_callback = None
2093 _lib.SSL_set_verify(self._ssl, mode, _ffi.NULL)
2094 else:
2095 if not callable(callback):
2096 raise TypeError("callback must be callable")
2098 self._verify_helper = _VerifyHelper(callback)
2099 self._verify_callback = self._verify_helper.callback
2100 _lib.SSL_set_verify(self._ssl, mode, self._verify_callback)
2102 def get_verify_mode(self) -> int:
2103 """
2104 Retrieve the Connection object's verify mode, as set by
2105 :meth:`set_verify`.
2107 :return: The verify mode
2108 """
2109 return _lib.SSL_get_verify_mode(self._ssl)
2111 def use_certificate(self, cert: X509 | x509.Certificate) -> None:
2112 """
2113 Load a certificate from a X509 object
2115 :param cert: The X509 object
2116 :return: None
2117 """
2118 # Mirrored from Context.use_certificate
2119 if not isinstance(cert, X509):
2120 cert = X509.from_cryptography(cert)
2121 else:
2122 warnings.warn(
2123 (
2124 "Passing pyOpenSSL X509 objects is deprecated. You "
2125 "should use a cryptography.x509.Certificate instead."
2126 ),
2127 DeprecationWarning,
2128 stacklevel=2,
2129 )
2131 use_result = _lib.SSL_use_certificate(self._ssl, cert._x509)
2132 if not use_result:
2133 _raise_current_error()
2135 def use_privatekey(self, pkey: _PrivateKey | PKey) -> None:
2136 """
2137 Load a private key from a PKey object
2139 :param pkey: The PKey object
2140 :return: None
2141 """
2142 # Mirrored from Context.use_privatekey
2143 if not isinstance(pkey, PKey):
2144 pkey = PKey.from_cryptography_key(pkey)
2145 else:
2146 warnings.warn(
2147 (
2148 "Passing pyOpenSSL PKey objects is deprecated. You "
2149 "should use a cryptography private key instead."
2150 ),
2151 DeprecationWarning,
2152 stacklevel=2,
2153 )
2155 use_result = _lib.SSL_use_PrivateKey(self._ssl, pkey._pkey)
2156 if not use_result:
2157 self._context._raise_passphrase_exception()
2159 def set_ciphertext_mtu(self, mtu: int) -> None:
2160 """
2161 For DTLS, set the maximum UDP payload size (*not* including IP/UDP
2162 overhead).
2164 Note that you might have to set :data:`OP_NO_QUERY_MTU` to prevent
2165 OpenSSL from spontaneously clearing this.
2167 :param mtu: An integer giving the maximum transmission unit.
2169 .. versionadded:: 21.1
2170 """
2171 _lib.SSL_set_mtu(self._ssl, mtu)
2173 def get_cleartext_mtu(self) -> int:
2174 """
2175 For DTLS, get the maximum size of unencrypted data you can pass to
2176 :meth:`write` without exceeding the MTU (as passed to
2177 :meth:`set_ciphertext_mtu`).
2179 :return: The effective MTU as an integer.
2181 .. versionadded:: 21.1
2182 """
2184 if not hasattr(_lib, "DTLS_get_data_mtu"):
2185 raise NotImplementedError("requires OpenSSL 1.1.1 or better")
2186 return _lib.DTLS_get_data_mtu(self._ssl)
2188 def set_tlsext_host_name(self, name: bytes) -> None:
2189 """
2190 Set the value of the servername extension to send in the client hello.
2192 :param name: A byte string giving the name.
2194 .. versionadded:: 0.13
2195 """
2196 if not isinstance(name, bytes):
2197 raise TypeError("name must be a byte string")
2198 elif b"\0" in name:
2199 raise TypeError("name must not contain NUL byte")
2201 # XXX I guess this can fail sometimes?
2202 _lib.SSL_set_tlsext_host_name(self._ssl, name)
2204 def pending(self) -> int:
2205 """
2206 Get the number of bytes that can be safely read from the SSL buffer
2207 (**not** the underlying transport buffer).
2209 :return: The number of bytes available in the receive buffer.
2210 """
2211 return _lib.SSL_pending(self._ssl)
2213 def send(self, buf: bytes, flags: int = 0) -> int:
2214 """
2215 Send data on the connection. NOTE: If you get one of the WantRead,
2216 WantWrite or WantX509Lookup exceptions on this, you have to call the
2217 method again with the SAME buffer.
2219 :param buf: The string, buffer or memoryview to send
2220 :param flags: (optional) Included for compatibility with the socket
2221 API, the value is ignored
2222 :return: The number of bytes written
2223 """
2224 # Backward compatibility
2225 buf = _text_to_bytes_and_warn("buf", buf)
2227 with _ffi.from_buffer(buf) as data:
2228 # check len(buf) instead of len(data) for testability
2229 if len(buf) > 2147483647:
2230 raise ValueError(
2231 "Cannot send more than 2**31-1 bytes at once."
2232 )
2234 result = _lib.SSL_write(self._ssl, data, len(data))
2235 self._raise_ssl_error(self._ssl, result)
2237 return result
2239 write = send
2241 def sendall(self, buf: bytes, flags: int = 0) -> int:
2242 """
2243 Send "all" data on the connection. This calls send() repeatedly until
2244 all data is sent. If an error occurs, it's impossible to tell how much
2245 data has been sent.
2247 :param buf: The string, buffer or memoryview to send
2248 :param flags: (optional) Included for compatibility with the socket
2249 API, the value is ignored
2250 :return: The number of bytes written
2251 """
2252 buf = _text_to_bytes_and_warn("buf", buf)
2254 with _ffi.from_buffer(buf) as data:
2255 left_to_send = len(buf)
2256 total_sent = 0
2258 while left_to_send:
2259 # SSL_write's num arg is an int,
2260 # so we cannot send more than 2**31-1 bytes at once.
2261 result = _lib.SSL_write(
2262 self._ssl, data + total_sent, min(left_to_send, 2147483647)
2263 )
2264 self._raise_ssl_error(self._ssl, result)
2265 total_sent += result
2266 left_to_send -= result
2268 return total_sent
2270 def recv(self, bufsiz: int, flags: int | None = None) -> bytes:
2271 """
2272 Receive data on the connection.
2274 :param bufsiz: The maximum number of bytes to read
2275 :param flags: (optional) The only supported flag is ``MSG_PEEK``,
2276 all other flags are ignored.
2277 :return: The string read from the Connection
2278 """
2279 buf = _no_zero_allocator("char[]", bufsiz)
2280 if flags is not None and flags & socket.MSG_PEEK:
2281 result = _lib.SSL_peek(self._ssl, buf, bufsiz)
2282 else:
2283 result = _lib.SSL_read(self._ssl, buf, bufsiz)
2284 self._raise_ssl_error(self._ssl, result)
2285 return _ffi.buffer(buf, result)[:]
2287 read = recv
2289 def recv_into(
2290 self,
2291 buffer: Any, # collections.abc.Buffer once we use Python 3.12+
2292 nbytes: int | None = None,
2293 flags: int | None = None,
2294 ) -> int:
2295 """
2296 Receive data on the connection and copy it directly into the provided
2297 buffer, rather than creating a new string.
2299 :param buffer: The buffer to copy into.
2300 :param nbytes: (optional) The maximum number of bytes to read into the
2301 buffer. If not present, defaults to the size of the buffer. If
2302 larger than the size of the buffer, is reduced to the size of the
2303 buffer.
2304 :param flags: (optional) The only supported flag is ``MSG_PEEK``,
2305 all other flags are ignored.
2306 :return: The number of bytes read into the buffer.
2307 """
2308 if nbytes is None:
2309 nbytes = len(buffer)
2310 else:
2311 nbytes = min(nbytes, len(buffer))
2313 # We need to create a temporary buffer. This is annoying, it would be
2314 # better if we could pass memoryviews straight into the SSL_read call,
2315 # but right now we can't. Revisit this if CFFI gets that ability.
2316 buf = _no_zero_allocator("char[]", nbytes)
2317 if flags is not None and flags & socket.MSG_PEEK:
2318 result = _lib.SSL_peek(self._ssl, buf, nbytes)
2319 else:
2320 result = _lib.SSL_read(self._ssl, buf, nbytes)
2321 self._raise_ssl_error(self._ssl, result)
2323 # This strange line is all to avoid a memory copy. The buffer protocol
2324 # should allow us to assign a CFFI buffer to the LHS of this line, but
2325 # on CPython 3.3+ that segfaults. As a workaround, we can temporarily
2326 # wrap it in a memoryview.
2327 buffer[:result] = memoryview(_ffi.buffer(buf, result))
2329 return result
2331 def _handle_bio_errors(self, bio: Any, result: int) -> typing.NoReturn:
2332 if _lib.BIO_should_retry(bio):
2333 if _lib.BIO_should_read(bio):
2334 raise WantReadError()
2335 elif _lib.BIO_should_write(bio):
2336 # TODO: This is untested.
2337 raise WantWriteError()
2338 elif _lib.BIO_should_io_special(bio):
2339 # TODO: This is untested. I think io_special means the socket
2340 # BIO has a not-yet connected socket.
2341 raise ValueError("BIO_should_io_special")
2342 else:
2343 # TODO: This is untested.
2344 raise ValueError("unknown bio failure")
2345 else:
2346 # TODO: This is untested.
2347 _raise_current_error()
2349 def bio_read(self, bufsiz: int) -> bytes:
2350 """
2351 If the Connection was created with a memory BIO, this method can be
2352 used to read bytes from the write end of that memory BIO. Many
2353 Connection methods will add bytes which must be read in this manner or
2354 the buffer will eventually fill up and the Connection will be able to
2355 take no further actions.
2357 :param bufsiz: The maximum number of bytes to read
2358 :return: The string read.
2359 """
2360 if self._from_ssl is None:
2361 raise TypeError("Connection sock was not None")
2363 if not isinstance(bufsiz, int):
2364 raise TypeError("bufsiz must be an integer")
2366 buf = _no_zero_allocator("char[]", bufsiz)
2367 result = _lib.BIO_read(self._from_ssl, buf, bufsiz)
2368 if result <= 0:
2369 self._handle_bio_errors(self._from_ssl, result)
2371 return _ffi.buffer(buf, result)[:]
2373 def bio_write(self, buf: bytes) -> int:
2374 """
2375 If the Connection was created with a memory BIO, this method can be
2376 used to add bytes to the read end of that memory BIO. The Connection
2377 can then read the bytes (for example, in response to a call to
2378 :meth:`recv`).
2380 :param buf: The string to put into the memory BIO.
2381 :return: The number of bytes written
2382 """
2383 buf = _text_to_bytes_and_warn("buf", buf)
2385 if self._into_ssl is None:
2386 raise TypeError("Connection sock was not None")
2388 with _ffi.from_buffer(buf) as data:
2389 result = _lib.BIO_write(self._into_ssl, data, len(data))
2390 if result <= 0:
2391 self._handle_bio_errors(self._into_ssl, result)
2392 return result
2394 def renegotiate(self) -> bool:
2395 """
2396 Renegotiate the session.
2398 :return: True if the renegotiation can be started, False otherwise
2399 """
2400 if not self.renegotiate_pending():
2401 _openssl_assert(_lib.SSL_renegotiate(self._ssl) == 1)
2402 return True
2403 return False
2405 def do_handshake(self) -> None:
2406 """
2407 Perform an SSL handshake (usually called after :meth:`renegotiate` or
2408 one of :meth:`set_accept_state` or :meth:`set_connect_state`). This can
2409 raise the same exceptions as :meth:`send` and :meth:`recv`.
2411 :return: None.
2412 """
2413 result = _lib.SSL_do_handshake(self._ssl)
2414 self._raise_ssl_error(self._ssl, result)
2416 def renegotiate_pending(self) -> bool:
2417 """
2418 Check if there's a renegotiation in progress, it will return False once
2419 a renegotiation is finished.
2421 :return: Whether there's a renegotiation in progress
2422 """
2423 return _lib.SSL_renegotiate_pending(self._ssl) == 1
2425 def total_renegotiations(self) -> int:
2426 """
2427 Find out the total number of renegotiations.
2429 :return: The number of renegotiations.
2430 """
2431 return _lib.SSL_total_renegotiations(self._ssl)
2433 def connect(self, addr: Any) -> None:
2434 """
2435 Call the :meth:`connect` method of the underlying socket and set up SSL
2436 on the socket, using the :class:`Context` object supplied to this
2437 :class:`Connection` object at creation.
2439 :param addr: A remote address
2440 :return: What the socket's connect method returns
2441 """
2442 _lib.SSL_set_connect_state(self._ssl)
2443 return self._socket.connect(addr) # type: ignore[return-value, union-attr]
2445 def connect_ex(self, addr: Any) -> int:
2446 """
2447 Call the :meth:`connect_ex` method of the underlying socket and set up
2448 SSL on the socket, using the Context object supplied to this Connection
2449 object at creation. Note that if the :meth:`connect_ex` method of the
2450 socket doesn't return 0, SSL won't be initialized.
2452 :param addr: A remove address
2453 :return: What the socket's connect_ex method returns
2454 """
2455 connect_ex = self._socket.connect_ex # type: ignore[union-attr]
2456 self.set_connect_state()
2457 return connect_ex(addr)
2459 def accept(self) -> tuple[Connection, Any]:
2460 """
2461 Call the :meth:`accept` method of the underlying socket and set up SSL
2462 on the returned socket, using the Context object supplied to this
2463 :class:`Connection` object at creation.
2465 :return: A *(conn, addr)* pair where *conn* is the new
2466 :class:`Connection` object created, and *address* is as returned by
2467 the socket's :meth:`accept`.
2468 """
2469 client, addr = self._socket.accept() # type: ignore[union-attr]
2470 conn = Connection(self._context, client)
2471 conn.set_accept_state()
2472 return (conn, addr)
2474 def DTLSv1_listen(self) -> None:
2475 """
2476 Call the OpenSSL function DTLSv1_listen on this connection. See the
2477 OpenSSL manual for more details.
2479 :return: None
2480 """
2481 # Possible future extension: return the BIO_ADDR in some form.
2482 bio_addr = _lib.BIO_ADDR_new()
2483 try:
2484 result = _lib.DTLSv1_listen(self._ssl, bio_addr)
2485 finally:
2486 _lib.BIO_ADDR_free(bio_addr)
2487 # DTLSv1_listen is weird. A zero return value means 'didn't find a
2488 # ClientHello with valid cookie, but keep trying'. So basically
2489 # WantReadError. But it doesn't work correctly with _raise_ssl_error.
2490 # So we raise it manually instead.
2491 if self._cookie_generate_helper is not None:
2492 self._cookie_generate_helper.raise_if_problem()
2493 if self._cookie_verify_helper is not None:
2494 self._cookie_verify_helper.raise_if_problem()
2495 if result == 0:
2496 raise WantReadError()
2497 if result < 0:
2498 self._raise_ssl_error(self._ssl, result)
2500 def DTLSv1_get_timeout(self) -> int | None:
2501 """
2502 Determine when the DTLS SSL object next needs to perform internal
2503 processing due to the passage of time.
2505 When the returned number of seconds have passed, the
2506 :meth:`DTLSv1_handle_timeout` method needs to be called.
2508 :return: The time left in seconds before the next timeout or `None`
2509 if no timeout is currently active.
2510 """
2511 ptv_sec = _ffi.new("time_t *")
2512 ptv_usec = _ffi.new("long *")
2513 if _lib.Cryptography_DTLSv1_get_timeout(self._ssl, ptv_sec, ptv_usec):
2514 return ptv_sec[0] + (ptv_usec[0] / 1000000)
2515 else:
2516 return None
2518 def DTLSv1_handle_timeout(self) -> bool:
2519 """
2520 Handles any timeout events which have become pending on a DTLS SSL
2521 object.
2523 :return: `True` if there was a pending timeout, `False` otherwise.
2524 """
2525 result = _lib.DTLSv1_handle_timeout(self._ssl)
2526 if result < 0:
2527 self._raise_ssl_error(self._ssl, result)
2528 assert False, "unreachable"
2529 else:
2530 return bool(result)
2532 def bio_shutdown(self) -> None:
2533 """
2534 If the Connection was created with a memory BIO, this method can be
2535 used to indicate that *end of file* has been reached on the read end of
2536 that memory BIO.
2538 :return: None
2539 """
2540 if self._from_ssl is None:
2541 raise TypeError("Connection sock was not None")
2543 _lib.BIO_set_mem_eof_return(self._into_ssl, 0)
2545 def shutdown(self) -> bool:
2546 """
2547 Send the shutdown message to the Connection.
2549 :return: True if the shutdown completed successfully (i.e. both sides
2550 have sent closure alerts), False otherwise (in which case you
2551 call :meth:`recv` or :meth:`send` when the connection becomes
2552 readable/writeable).
2553 """
2554 result = _lib.SSL_shutdown(self._ssl)
2555 if result < 0:
2556 self._raise_ssl_error(self._ssl, result)
2557 assert False, "unreachable"
2558 elif result > 0:
2559 return True
2560 else:
2561 return False
2563 def get_cipher_list(self) -> list[str]:
2564 """
2565 Retrieve the list of ciphers used by the Connection object.
2567 :return: A list of native cipher strings.
2568 """
2569 ciphers = []
2570 for i in count():
2571 result = _lib.SSL_get_cipher_list(self._ssl, i)
2572 if result == _ffi.NULL:
2573 break
2574 ciphers.append(_ffi.string(result).decode("utf-8"))
2575 return ciphers
2577 def get_client_ca_list(self) -> list[X509Name]:
2578 """
2579 Get CAs whose certificates are suggested for client authentication.
2581 :return: If this is a server connection, the list of certificate
2582 authorities that will be sent or has been sent to the client, as
2583 controlled by this :class:`Connection`'s :class:`Context`.
2585 If this is a client connection, the list will be empty until the
2586 connection with the server is established.
2588 .. versionadded:: 0.10
2589 """
2590 ca_names = _lib.SSL_get_client_CA_list(self._ssl)
2591 if ca_names == _ffi.NULL:
2592 # TODO: This is untested.
2593 return []
2595 result = []
2596 for i in range(_lib.sk_X509_NAME_num(ca_names)):
2597 name = _lib.sk_X509_NAME_value(ca_names, i)
2598 copy = _lib.X509_NAME_dup(name)
2599 _openssl_assert(copy != _ffi.NULL)
2601 pyname = X509Name.__new__(X509Name)
2602 pyname._name = _ffi.gc(copy, _lib.X509_NAME_free)
2603 result.append(pyname)
2604 return result
2606 def makefile(self, *args: Any, **kwargs: Any) -> typing.NoReturn:
2607 """
2608 The makefile() method is not implemented, since there is no dup
2609 semantics for SSL connections
2611 :raise: NotImplementedError
2612 """
2613 raise NotImplementedError(
2614 "Cannot make file object of OpenSSL.SSL.Connection"
2615 )
2617 def get_app_data(self) -> Any:
2618 """
2619 Retrieve application data as set by :meth:`set_app_data`.
2621 :return: The application data
2622 """
2623 return self._app_data
2625 def set_app_data(self, data: Any) -> None:
2626 """
2627 Set application data
2629 :param data: The application data
2630 :return: None
2631 """
2632 self._app_data = data
2634 def get_shutdown(self) -> int:
2635 """
2636 Get the shutdown state of the Connection.
2638 :return: The shutdown state, a bitvector of SENT_SHUTDOWN,
2639 RECEIVED_SHUTDOWN.
2640 """
2641 return _lib.SSL_get_shutdown(self._ssl)
2643 def set_shutdown(self, state: int) -> None:
2644 """
2645 Set the shutdown state of the Connection.
2647 :param state: bitvector of SENT_SHUTDOWN, RECEIVED_SHUTDOWN.
2648 :return: None
2649 """
2650 if not isinstance(state, int):
2651 raise TypeError("state must be an integer")
2653 _lib.SSL_set_shutdown(self._ssl, state)
2655 def get_state_string(self) -> bytes:
2656 """
2657 Retrieve a verbose string detailing the state of the Connection.
2659 :return: A string representing the state
2660 """
2661 return _ffi.string(_lib.SSL_state_string_long(self._ssl))
2663 def server_random(self) -> bytes | None:
2664 """
2665 Retrieve the random value used with the server hello message.
2667 :return: A string representing the state
2668 """
2669 session = _lib.SSL_get_session(self._ssl)
2670 if session == _ffi.NULL:
2671 return None
2672 length = _lib.SSL_get_server_random(self._ssl, _ffi.NULL, 0)
2673 _openssl_assert(length > 0)
2674 outp = _no_zero_allocator("unsigned char[]", length)
2675 _lib.SSL_get_server_random(self._ssl, outp, length)
2676 return _ffi.buffer(outp, length)[:]
2678 def client_random(self) -> bytes | None:
2679 """
2680 Retrieve the random value used with the client hello message.
2682 :return: A string representing the state
2683 """
2684 session = _lib.SSL_get_session(self._ssl)
2685 if session == _ffi.NULL:
2686 return None
2688 length = _lib.SSL_get_client_random(self._ssl, _ffi.NULL, 0)
2689 _openssl_assert(length > 0)
2690 outp = _no_zero_allocator("unsigned char[]", length)
2691 _lib.SSL_get_client_random(self._ssl, outp, length)
2692 return _ffi.buffer(outp, length)[:]
2694 def master_key(self) -> bytes | None:
2695 """
2696 Retrieve the value of the master key for this session.
2698 :return: A string representing the state
2699 """
2700 session = _lib.SSL_get_session(self._ssl)
2701 if session == _ffi.NULL:
2702 return None
2704 length = _lib.SSL_SESSION_get_master_key(session, _ffi.NULL, 0)
2705 _openssl_assert(length > 0)
2706 outp = _no_zero_allocator("unsigned char[]", length)
2707 _lib.SSL_SESSION_get_master_key(session, outp, length)
2708 return _ffi.buffer(outp, length)[:]
2710 def export_keying_material(
2711 self, label: bytes, olen: int, context: bytes | None = None
2712 ) -> bytes:
2713 """
2714 Obtain keying material for application use.
2716 :param: label - a disambiguating label string as described in RFC 5705
2717 :param: olen - the length of the exported key material in bytes
2718 :param: context - a per-association context value
2719 :return: the exported key material bytes or None
2720 """
2721 outp = _no_zero_allocator("unsigned char[]", olen)
2722 context_buf = _ffi.NULL
2723 context_len = 0
2724 use_context = 0
2725 if context is not None:
2726 context_buf = context
2727 context_len = len(context)
2728 use_context = 1
2729 success = _lib.SSL_export_keying_material(
2730 self._ssl,
2731 outp,
2732 olen,
2733 label,
2734 len(label),
2735 context_buf,
2736 context_len,
2737 use_context,
2738 )
2739 _openssl_assert(success == 1)
2740 return _ffi.buffer(outp, olen)[:]
2742 def sock_shutdown(self, *args: Any, **kwargs: Any) -> None:
2743 """
2744 Call the :meth:`shutdown` method of the underlying socket.
2745 See :manpage:`shutdown(2)`.
2747 :return: What the socket's shutdown() method returns
2748 """
2749 return self._socket.shutdown(*args, **kwargs) # type: ignore[return-value, union-attr]
2751 @typing.overload
2752 def get_certificate(
2753 self, *, as_cryptography: typing.Literal[True]
2754 ) -> x509.Certificate | None:
2755 pass
2757 @typing.overload
2758 def get_certificate(
2759 self, *, as_cryptography: typing.Literal[False] = False
2760 ) -> X509 | None:
2761 pass
2763 def get_certificate(
2764 self,
2765 *,
2766 as_cryptography: typing.Literal[True] | typing.Literal[False] = False,
2767 ) -> X509 | x509.Certificate | None:
2768 """
2769 Retrieve the local certificate (if any)
2771 :param bool as_cryptography: Controls whether a
2772 ``cryptography.x509.Certificate`` or an ``OpenSSL.crypto.X509``
2773 object should be returned.
2775 :return: The local certificate
2776 """
2777 cert = _lib.SSL_get_certificate(self._ssl)
2778 if cert != _ffi.NULL:
2779 _lib.X509_up_ref(cert)
2780 pycert = X509._from_raw_x509_ptr(cert)
2781 if as_cryptography:
2782 return pycert.to_cryptography()
2783 return pycert
2784 return None
2786 @typing.overload
2787 def get_peer_certificate(
2788 self, *, as_cryptography: typing.Literal[True]
2789 ) -> x509.Certificate | None:
2790 pass
2792 @typing.overload
2793 def get_peer_certificate(
2794 self, *, as_cryptography: typing.Literal[False] = False
2795 ) -> X509 | None:
2796 pass
2798 def get_peer_certificate(
2799 self,
2800 *,
2801 as_cryptography: typing.Literal[True] | typing.Literal[False] = False,
2802 ) -> X509 | x509.Certificate | None:
2803 """
2804 Retrieve the other side's certificate (if any)
2806 :param bool as_cryptography: Controls whether a
2807 ``cryptography.x509.Certificate`` or an ``OpenSSL.crypto.X509``
2808 object should be returned.
2810 :return: The peer's certificate
2811 """
2812 cert = _lib.SSL_get_peer_certificate(self._ssl)
2813 if cert != _ffi.NULL:
2814 pycert = X509._from_raw_x509_ptr(cert)
2815 if as_cryptography:
2816 return pycert.to_cryptography()
2817 return pycert
2818 return None
2820 @staticmethod
2821 def _cert_stack_to_list(cert_stack: Any) -> list[X509]:
2822 """
2823 Internal helper to convert a STACK_OF(X509) to a list of X509
2824 instances.
2825 """
2826 result = []
2827 for i in range(_lib.sk_X509_num(cert_stack)):
2828 cert = _lib.sk_X509_value(cert_stack, i)
2829 _openssl_assert(cert != _ffi.NULL)
2830 res = _lib.X509_up_ref(cert)
2831 _openssl_assert(res >= 1)
2832 pycert = X509._from_raw_x509_ptr(cert)
2833 result.append(pycert)
2834 return result
2836 @staticmethod
2837 def _cert_stack_to_cryptography_list(
2838 cert_stack: Any,
2839 ) -> list[x509.Certificate]:
2840 """
2841 Internal helper to convert a STACK_OF(X509) to a list of X509
2842 instances.
2843 """
2844 result = []
2845 for i in range(_lib.sk_X509_num(cert_stack)):
2846 cert = _lib.sk_X509_value(cert_stack, i)
2847 _openssl_assert(cert != _ffi.NULL)
2848 res = _lib.X509_up_ref(cert)
2849 _openssl_assert(res >= 1)
2850 pycert = X509._from_raw_x509_ptr(cert)
2851 result.append(pycert.to_cryptography())
2852 return result
2854 @typing.overload
2855 def get_peer_cert_chain(
2856 self, *, as_cryptography: typing.Literal[True]
2857 ) -> list[x509.Certificate] | None:
2858 pass
2860 @typing.overload
2861 def get_peer_cert_chain(
2862 self, *, as_cryptography: typing.Literal[False] = False
2863 ) -> list[X509] | None:
2864 pass
2866 def get_peer_cert_chain(
2867 self,
2868 *,
2869 as_cryptography: typing.Literal[True] | typing.Literal[False] = False,
2870 ) -> list[X509] | list[x509.Certificate] | None:
2871 """
2872 Retrieve the other side's certificate (if any)
2874 :param bool as_cryptography: Controls whether a list of
2875 ``cryptography.x509.Certificate`` or ``OpenSSL.crypto.X509``
2876 object should be returned.
2878 :return: A list of X509 instances giving the peer's certificate chain,
2879 or None if it does not have one.
2880 """
2881 cert_stack = _lib.SSL_get_peer_cert_chain(self._ssl)
2882 if cert_stack == _ffi.NULL:
2883 return None
2885 if as_cryptography:
2886 return self._cert_stack_to_cryptography_list(cert_stack)
2887 return self._cert_stack_to_list(cert_stack)
2889 @typing.overload
2890 def get_verified_chain(
2891 self, *, as_cryptography: typing.Literal[True]
2892 ) -> list[x509.Certificate] | None:
2893 pass
2895 @typing.overload
2896 def get_verified_chain(
2897 self, *, as_cryptography: typing.Literal[False] = False
2898 ) -> list[X509] | None:
2899 pass
2901 def get_verified_chain(
2902 self,
2903 *,
2904 as_cryptography: typing.Literal[True] | typing.Literal[False] = False,
2905 ) -> list[X509] | list[x509.Certificate] | None:
2906 """
2907 Retrieve the verified certificate chain of the peer including the
2908 peer's end entity certificate. It must be called after a session has
2909 been successfully established. If peer verification was not successful
2910 the chain may be incomplete, invalid, or None.
2912 :param bool as_cryptography: Controls whether a list of
2913 ``cryptography.x509.Certificate`` or ``OpenSSL.crypto.X509``
2914 object should be returned.
2916 :return: A list of X509 instances giving the peer's verified
2917 certificate chain, or None if it does not have one.
2919 .. versionadded:: 20.0
2920 """
2921 # OpenSSL 1.1+
2922 cert_stack = _lib.SSL_get0_verified_chain(self._ssl)
2923 if cert_stack == _ffi.NULL:
2924 return None
2926 if as_cryptography:
2927 return self._cert_stack_to_cryptography_list(cert_stack)
2928 return self._cert_stack_to_list(cert_stack)
2930 def want_read(self) -> bool:
2931 """
2932 Checks if more data has to be read from the transport layer to complete
2933 an operation.
2935 :return: True iff more data has to be read
2936 """
2937 return _lib.SSL_want_read(self._ssl)
2939 def want_write(self) -> bool:
2940 """
2941 Checks if there is data to write to the transport layer to complete an
2942 operation.
2944 :return: True iff there is data to write
2945 """
2946 return _lib.SSL_want_write(self._ssl)
2948 def set_accept_state(self) -> None:
2949 """
2950 Set the connection to work in server mode. The handshake will be
2951 handled automatically by read/write.
2953 :return: None
2954 """
2955 _lib.SSL_set_accept_state(self._ssl)
2957 def set_connect_state(self) -> None:
2958 """
2959 Set the connection to work in client mode. The handshake will be
2960 handled automatically by read/write.
2962 :return: None
2963 """
2964 _lib.SSL_set_connect_state(self._ssl)
2966 def get_session(self) -> Session | None:
2967 """
2968 Returns the Session currently used.
2970 :return: An instance of :class:`OpenSSL.SSL.Session` or
2971 :obj:`None` if no session exists.
2973 .. versionadded:: 0.14
2974 """
2975 session = _lib.SSL_get1_session(self._ssl)
2976 if session == _ffi.NULL:
2977 return None
2979 pysession = Session.__new__(Session)
2980 pysession._session = _ffi.gc(session, _lib.SSL_SESSION_free)
2981 return pysession
2983 def set_session(self, session: Session) -> None:
2984 """
2985 Set the session to be used when the TLS/SSL connection is established.
2987 :param session: A Session instance representing the session to use.
2988 :returns: None
2990 .. versionadded:: 0.14
2991 """
2992 if not isinstance(session, Session):
2993 raise TypeError("session must be a Session instance")
2995 result = _lib.SSL_set_session(self._ssl, session._session)
2996 _openssl_assert(result == 1)
2998 def _get_finished_message(
2999 self, function: Callable[[Any, Any, int], int]
3000 ) -> bytes | None:
3001 """
3002 Helper to implement :meth:`get_finished` and
3003 :meth:`get_peer_finished`.
3005 :param function: Either :data:`SSL_get_finished`: or
3006 :data:`SSL_get_peer_finished`.
3008 :return: :data:`None` if the desired message has not yet been
3009 received, otherwise the contents of the message.
3010 """
3011 # The OpenSSL documentation says nothing about what might happen if the
3012 # count argument given is zero. Specifically, it doesn't say whether
3013 # the output buffer may be NULL in that case or not. Inspection of the
3014 # implementation reveals that it calls memcpy() unconditionally.
3015 # Section 7.1.4, paragraph 1 of the C standard suggests that
3016 # memcpy(NULL, source, 0) is not guaranteed to produce defined (let
3017 # alone desirable) behavior (though it probably does on just about
3018 # every implementation...)
3019 #
3020 # Allocate a tiny buffer to pass in (instead of just passing NULL as
3021 # one might expect) for the initial call so as to be safe against this
3022 # potentially undefined behavior.
3023 empty = _ffi.new("char[]", 0)
3024 size = function(self._ssl, empty, 0)
3025 if size == 0:
3026 # No Finished message so far.
3027 return None
3029 buf = _no_zero_allocator("char[]", size)
3030 function(self._ssl, buf, size)
3031 return _ffi.buffer(buf, size)[:]
3033 def get_finished(self) -> bytes | None:
3034 """
3035 Obtain the latest TLS Finished message that we sent.
3037 :return: The contents of the message or :obj:`None` if the TLS
3038 handshake has not yet completed.
3040 .. versionadded:: 0.15
3041 """
3042 return self._get_finished_message(_lib.SSL_get_finished)
3044 def get_peer_finished(self) -> bytes | None:
3045 """
3046 Obtain the latest TLS Finished message that we received from the peer.
3048 :return: The contents of the message or :obj:`None` if the TLS
3049 handshake has not yet completed.
3051 .. versionadded:: 0.15
3052 """
3053 return self._get_finished_message(_lib.SSL_get_peer_finished)
3055 def get_cipher_name(self) -> str | None:
3056 """
3057 Obtain the name of the currently used cipher.
3059 :returns: The name of the currently used cipher or :obj:`None`
3060 if no connection has been established.
3062 .. versionadded:: 0.15
3063 """
3064 cipher = _lib.SSL_get_current_cipher(self._ssl)
3065 if cipher == _ffi.NULL:
3066 return None
3067 else:
3068 name = _ffi.string(_lib.SSL_CIPHER_get_name(cipher))
3069 return name.decode("utf-8")
3071 def get_cipher_bits(self) -> int | None:
3072 """
3073 Obtain the number of secret bits of the currently used cipher.
3075 :returns: The number of secret bits of the currently used cipher
3076 or :obj:`None` if no connection has been established.
3078 .. versionadded:: 0.15
3079 """
3080 cipher = _lib.SSL_get_current_cipher(self._ssl)
3081 if cipher == _ffi.NULL:
3082 return None
3083 else:
3084 return _lib.SSL_CIPHER_get_bits(cipher, _ffi.NULL)
3086 def get_cipher_version(self) -> str | None:
3087 """
3088 Obtain the protocol version of the currently used cipher.
3090 :returns: The protocol name of the currently used cipher
3091 or :obj:`None` if no connection has been established.
3093 .. versionadded:: 0.15
3094 """
3095 cipher = _lib.SSL_get_current_cipher(self._ssl)
3096 if cipher == _ffi.NULL:
3097 return None
3098 else:
3099 version = _ffi.string(_lib.SSL_CIPHER_get_version(cipher))
3100 return version.decode("utf-8")
3102 def get_protocol_version_name(self) -> str:
3103 """
3104 Retrieve the protocol version of the current connection.
3106 :returns: The TLS version of the current connection, for example
3107 the value for TLS 1.2 would be ``TLSv1.2``or ``Unknown``
3108 for connections that were not successfully established.
3109 """
3110 version = _ffi.string(_lib.SSL_get_version(self._ssl))
3111 return version.decode("utf-8")
3113 def get_protocol_version(self) -> int:
3114 """
3115 Retrieve the SSL or TLS protocol version of the current connection.
3117 :returns: The TLS version of the current connection. For example,
3118 it will return ``0x769`` for connections made over TLS version 1.
3119 """
3120 version = _lib.SSL_version(self._ssl)
3121 return version
3123 def set_alpn_protos(self, protos: list[bytes]) -> None:
3124 """
3125 Specify the client's ALPN protocol list.
3127 These protocols are offered to the server during protocol negotiation.
3129 :param protos: A list of the protocols to be offered to the server.
3130 This list should be a Python list of bytestrings representing the
3131 protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``.
3132 """
3133 # Different versions of OpenSSL are inconsistent about how they handle
3134 # empty proto lists (see #1043), so we avoid the problem entirely by
3135 # rejecting them ourselves.
3136 if not protos:
3137 raise ValueError("at least one protocol must be specified")
3139 # Take the list of protocols and join them together, prefixing them
3140 # with their lengths.
3141 protostr = b"".join(
3142 chain.from_iterable((bytes((len(p),)), p) for p in protos)
3143 )
3145 # Build a C string from the list. We don't need to save this off
3146 # because OpenSSL immediately copies the data out.
3147 input_str = _ffi.new("unsigned char[]", protostr)
3149 # https://www.openssl.org/docs/man1.1.0/man3/SSL_CTX_set_alpn_protos.html:
3150 # SSL_CTX_set_alpn_protos() and SSL_set_alpn_protos()
3151 # return 0 on success, and non-0 on failure.
3152 # WARNING: these functions reverse the return value convention.
3153 _openssl_assert(
3154 _lib.SSL_set_alpn_protos(self._ssl, input_str, len(protostr)) == 0
3155 )
3157 def get_alpn_proto_negotiated(self) -> bytes:
3158 """
3159 Get the protocol that was negotiated by ALPN.
3161 :returns: A bytestring of the protocol name. If no protocol has been
3162 negotiated yet, returns an empty bytestring.
3163 """
3164 data = _ffi.new("unsigned char **")
3165 data_len = _ffi.new("unsigned int *")
3167 _lib.SSL_get0_alpn_selected(self._ssl, data, data_len)
3169 if not data_len:
3170 return b""
3172 return _ffi.buffer(data[0], data_len[0])[:]
3174 def get_selected_srtp_profile(self) -> bytes:
3175 """
3176 Get the SRTP protocol which was negotiated.
3178 :returns: A bytestring of the SRTP profile name. If no profile has been
3179 negotiated yet, returns an empty bytestring.
3180 """
3181 profile = _lib.SSL_get_selected_srtp_profile(self._ssl)
3182 if not profile:
3183 return b""
3185 return _ffi.string(profile.name)
3187 def request_ocsp(self) -> None:
3188 """
3189 Called to request that the server sends stapled OCSP data, if
3190 available. If this is not called on the client side then the server
3191 will not send OCSP data. Should be used in conjunction with
3192 :meth:`Context.set_ocsp_client_callback`.
3193 """
3194 rc = _lib.SSL_set_tlsext_status_type(
3195 self._ssl, _lib.TLSEXT_STATUSTYPE_ocsp
3196 )
3197 _openssl_assert(rc == 1)