1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
4
5from __future__ import annotations
6
7import abc
8import typing
9
10from cryptography.hazmat.bindings._rust import openssl as rust_openssl
11from cryptography.hazmat.primitives._cipheralgorithm import CipherAlgorithm
12from cryptography.hazmat.primitives.ciphers import modes
13
14
15class CipherContext(metaclass=abc.ABCMeta):
16 @abc.abstractmethod
17 def update(self, data: bytes) -> bytes:
18 """
19 Processes the provided bytes through the cipher and returns the results
20 as bytes.
21 """
22
23 @abc.abstractmethod
24 def update_into(self, data: bytes, buf: bytes) -> int:
25 """
26 Processes the provided bytes and writes the resulting data into the
27 provided buffer. Returns the number of bytes written.
28 """
29
30 @abc.abstractmethod
31 def finalize(self) -> bytes:
32 """
33 Returns the results of processing the final block as bytes.
34 """
35
36 @abc.abstractmethod
37 def reset_nonce(self, nonce: bytes) -> None:
38 """
39 Resets the nonce for the cipher context to the provided value.
40 Raises an exception if it does not support reset or if the
41 provided nonce does not have a valid length.
42 """
43
44
45class AEADCipherContext(CipherContext, metaclass=abc.ABCMeta):
46 @abc.abstractmethod
47 def authenticate_additional_data(self, data: bytes) -> None:
48 """
49 Authenticates the provided bytes.
50 """
51
52
53class AEADDecryptionContext(AEADCipherContext, metaclass=abc.ABCMeta):
54 @abc.abstractmethod
55 def finalize_with_tag(self, tag: bytes) -> bytes:
56 """
57 Returns the results of processing the final block as bytes and allows
58 delayed passing of the authentication tag.
59 """
60
61
62class AEADEncryptionContext(AEADCipherContext, metaclass=abc.ABCMeta):
63 @property
64 @abc.abstractmethod
65 def tag(self) -> bytes:
66 """
67 Returns tag bytes. This is only available after encryption is
68 finalized.
69 """
70
71
72Mode = typing.TypeVar(
73 "Mode", bound=typing.Optional[modes.Mode], covariant=True
74)
75
76
77class Cipher(typing.Generic[Mode]):
78 def __init__(
79 self,
80 algorithm: CipherAlgorithm,
81 mode: Mode,
82 backend: typing.Any = None,
83 ) -> None:
84 if not isinstance(algorithm, CipherAlgorithm):
85 raise TypeError("Expected interface of CipherAlgorithm.")
86
87 if mode is not None:
88 # mypy needs this assert to narrow the type from our generic
89 # type. Maybe it won't some time in the future.
90 assert isinstance(mode, modes.Mode)
91 mode.validate_for_algorithm(algorithm)
92
93 self.algorithm = algorithm
94 self.mode = mode
95
96 @typing.overload
97 def encryptor(
98 self: Cipher[modes.ModeWithAuthenticationTag],
99 ) -> AEADEncryptionContext: ...
100
101 @typing.overload
102 def encryptor(
103 self: _CIPHER_TYPE,
104 ) -> CipherContext: ...
105
106 def encryptor(self):
107 if isinstance(self.mode, modes.ModeWithAuthenticationTag):
108 if self.mode.tag is not None:
109 raise ValueError(
110 "Authentication tag must be None when encrypting."
111 )
112
113 return rust_openssl.ciphers.create_encryption_ctx(
114 self.algorithm, self.mode
115 )
116
117 @typing.overload
118 def decryptor(
119 self: Cipher[modes.ModeWithAuthenticationTag],
120 ) -> AEADDecryptionContext: ...
121
122 @typing.overload
123 def decryptor(
124 self: _CIPHER_TYPE,
125 ) -> CipherContext: ...
126
127 def decryptor(self):
128 return rust_openssl.ciphers.create_decryption_ctx(
129 self.algorithm, self.mode
130 )
131
132
133_CIPHER_TYPE = Cipher[
134 typing.Union[
135 modes.ModeWithNonce,
136 modes.ModeWithTweak,
137 None,
138 modes.ECB,
139 modes.ModeWithInitializationVector,
140 ]
141]
142
143CipherContext.register(rust_openssl.ciphers.CipherContext)
144AEADEncryptionContext.register(rust_openssl.ciphers.AEADEncryptionContext)
145AEADDecryptionContext.register(rust_openssl.ciphers.AEADDecryptionContext)