1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
4
5from __future__ import annotations
6
7import abc
8import typing
9
10from cryptography.exceptions import (
11 AlreadyFinalized,
12 AlreadyUpdated,
13 NotYetFinalized,
14)
15from cryptography.hazmat.primitives._cipheralgorithm import CipherAlgorithm
16from cryptography.hazmat.primitives.ciphers import modes
17
18if typing.TYPE_CHECKING:
19 from cryptography.hazmat.backends.openssl.ciphers import (
20 _CipherContext as _BackendCipherContext,
21 )
22
23
class CipherContext(metaclass=abc.ABCMeta):
    """
    Abstract interface of a cipher context: data is pushed through with
    ``update`` / ``update_into`` and the operation is ended with
    ``finalize``.
    """

    @abc.abstractmethod
    def update(self, data: bytes) -> bytes:
        """
        Runs the given bytes through the cipher and hands back the output
        as bytes.
        """

    @abc.abstractmethod
    def update_into(self, data: bytes, buf: bytes) -> int:
        """
        Runs the given bytes through the cipher, placing the output in the
        caller-supplied ``buf``. The return value is the count of bytes
        that were written.
        """

    @abc.abstractmethod
    def finalize(self) -> bytes:
        """
        Processes the final block and returns its output as bytes.
        """
44
45
class AEADCipherContext(CipherContext, metaclass=abc.ABCMeta):
    """
    ``CipherContext`` interface extended with a hook for supplying
    additional data that is authenticated.
    """

    @abc.abstractmethod
    def authenticate_additional_data(self, data: bytes) -> None:
        """
        Includes the given bytes in what is authenticated. Nothing is
        returned.
        """
52
53
class AEADDecryptionContext(AEADCipherContext, metaclass=abc.ABCMeta):
    """
    AEAD context interface for decryption, where the authentication tag may
    be handed over at finalization time.
    """

    @abc.abstractmethod
    def finalize_with_tag(self, tag: bytes) -> bytes:
        """
        Processes the final block and returns its output as bytes, taking
        the authentication tag at this late point rather than up front.
        """
61
62
class AEADEncryptionContext(AEADCipherContext, metaclass=abc.ABCMeta):
    """
    AEAD context interface for encryption, which exposes the resulting
    authentication tag.
    """

    @property
    @abc.abstractmethod
    def tag(self) -> bytes:
        """
        The tag as bytes. It can only be read once encryption has been
        finalized.
        """
71
72
# Type variable used as the generic parameter of ``Cipher``. Its bound is
# ``Optional[modes.Mode]``, so ``None`` (no mode) is an accepted value, and
# it is declared covariant.
Mode = typing.TypeVar(
    "Mode", bound=typing.Optional[modes.Mode], covariant=True
)
76
77
class Cipher(typing.Generic[Mode]):
    """
    Pairs a cipher algorithm with an optional mode and creates encryption
    and decryption contexts for that pairing.
    """

    def __init__(
        self,
        algorithm: CipherAlgorithm,
        mode: Mode,
        backend: typing.Any = None,
    ) -> None:
        """
        Validates and stores the algorithm and mode.

        ``algorithm`` must be a ``CipherAlgorithm`` instance, otherwise
        ``TypeError`` is raised. When ``mode`` is not ``None`` it is asked
        to validate itself against ``algorithm``. ``backend`` is accepted
        but not used by this constructor.
        """
        if not isinstance(algorithm, CipherAlgorithm):
            raise TypeError("Expected interface of CipherAlgorithm.")

        if mode is not None:
            # This assert exists so mypy can narrow the generic type to
            # modes.Mode; a future mypy may make it unnecessary.
            assert isinstance(mode, modes.Mode)
            mode.validate_for_algorithm(algorithm)

        self.algorithm = algorithm
        self.mode = mode

    @typing.overload
    def encryptor(
        self: Cipher[modes.ModeWithAuthenticationTag],
    ) -> AEADEncryptionContext:
        ...

    @typing.overload
    def encryptor(
        self: _CIPHER_TYPE,
    ) -> CipherContext:
        ...

    def encryptor(self):
        """
        Builds an encryption context for the stored algorithm and mode.

        Raises ``ValueError`` if the mode is a
        ``ModeWithAuthenticationTag`` that already carries a tag.
        """
        mode = self.mode
        if (
            isinstance(mode, modes.ModeWithAuthenticationTag)
            and mode.tag is not None
        ):
            raise ValueError(
                "Authentication tag must be None when encrypting."
            )

        # The backend is imported at call time, not at module import time.
        from cryptography.hazmat.backends.openssl.backend import backend

        backend_ctx = backend.create_symmetric_encryption_ctx(
            self.algorithm, mode
        )
        return self._wrap_ctx(backend_ctx, encrypt=True)

    @typing.overload
    def decryptor(
        self: Cipher[modes.ModeWithAuthenticationTag],
    ) -> AEADDecryptionContext:
        ...

    @typing.overload
    def decryptor(
        self: _CIPHER_TYPE,
    ) -> CipherContext:
        ...

    def decryptor(self):
        """
        Builds a decryption context for the stored algorithm and mode.

        Unlike ``encryptor``, no check of the mode's tag is made here.
        """
        # The backend is imported at call time, not at module import time.
        from cryptography.hazmat.backends.openssl.backend import backend

        backend_ctx = backend.create_symmetric_decryption_ctx(
            self.algorithm, self.mode
        )
        return self._wrap_ctx(backend_ctx, encrypt=False)

    def _wrap_ctx(
        self, ctx: _BackendCipherContext, encrypt: bool
    ) -> AEADEncryptionContext | AEADDecryptionContext | CipherContext:
        """
        Wraps a backend context in the wrapper class matching the mode:
        an AEAD encryption or decryption wrapper when the mode is a
        ``ModeWithAuthenticationTag`` (chosen by ``encrypt``), and the
        plain wrapper for every other mode.
        """
        if not isinstance(self.mode, modes.ModeWithAuthenticationTag):
            return _CipherContext(ctx)
        if encrypt:
            return _AEADEncryptionContext(ctx)
        return _AEADDecryptionContext(ctx)
152
153
# Alias used as the ``self`` annotation of the second ``encryptor`` /
# ``decryptor`` overloads on ``Cipher``, i.e. the ones that return a plain
# ``CipherContext``. It lists the mode kinds (and ``None``) covered by those
# overloads.
_CIPHER_TYPE = Cipher[
    typing.Union[
        modes.ModeWithNonce,
        modes.ModeWithTweak,
        None,
        modes.ECB,
        modes.ModeWithInitializationVector,
    ]
]
163
164
class _CipherContext(CipherContext):
    """
    Concrete ``CipherContext`` that forwards each call to a backend context
    and raises ``AlreadyFinalized`` for any call made after a successful
    ``finalize``.
    """

    # Backend context; set to None once finalize() has completed.
    _ctx: _BackendCipherContext | None

    def __init__(self, ctx: _BackendCipherContext) -> None:
        self._ctx = ctx

    def update(self, data: bytes) -> bytes:
        """Forwards ``data`` to the backend context's ``update``."""
        backend_ctx = self._ctx
        if backend_ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        return backend_ctx.update(data)

    def update_into(self, data: bytes, buf: bytes) -> int:
        """Forwards ``data`` and ``buf`` to the backend's ``update_into``."""
        backend_ctx = self._ctx
        if backend_ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        return backend_ctx.update_into(data, buf)

    def finalize(self) -> bytes:
        """
        Finalizes the backend context, drops the reference to it and
        returns the bytes the backend produced.
        """
        backend_ctx = self._ctx
        if backend_ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        final_data = backend_ctx.finalize()
        # Only cleared after the backend call returned without raising.
        self._ctx = None
        return final_data
187
188
class _AEADCipherContext(AEADCipherContext):
    """
    Shared implementation for the AEAD wrappers. It forwards to a backend
    context while counting the payload and AAD bytes submitted, so that the
    limits declared on the backend context's mode can be enforced.
    """

    # Backend context; set to None once finalization has completed.
    _ctx: _BackendCipherContext | None
    # Tag copied from the backend context during finalization.
    _tag: bytes | None

    def __init__(self, ctx: _BackendCipherContext) -> None:
        self._ctx = ctx
        self._tag = None
        self._updated = False
        self._bytes_processed = 0
        self._aad_bytes_processed = 0

    def _check_limit(self, data_size: int) -> None:
        """
        Records ``data_size`` more payload bytes and raises ``ValueError``
        if the running total exceeds the mode's ``_MAX_ENCRYPTED_BYTES``.

        Raises ``AlreadyFinalized`` if the context was finalized. The
        "updated" flag and the byte counter are modified before the limit
        is compared.
        """
        backend_ctx = self._ctx
        if backend_ctx is None:
            raise AlreadyFinalized("Context was already finalized.")

        self._updated = True
        self._bytes_processed += data_size

        mode = backend_ctx._mode
        limit = mode._MAX_ENCRYPTED_BYTES
        if self._bytes_processed > limit:
            raise ValueError(
                "{} has a maximum encrypted byte limit of {}".format(
                    mode.name, limit
                )
            )

    def update(self, data: bytes) -> bytes:
        """Checks the byte limit, then forwards ``data`` to the backend."""
        size = len(data)
        self._check_limit(size)
        # _check_limit already rejected a finalized context; this assert
        # is only here so mypy narrows the Optional.
        assert self._ctx is not None
        return self._ctx.update(data)

    def update_into(self, data: bytes, buf: bytes) -> int:
        """
        Checks the byte limit, then forwards ``data`` and ``buf`` to the
        backend's ``update_into``.
        """
        size = len(data)
        self._check_limit(size)
        # _check_limit already rejected a finalized context; this assert
        # is only here so mypy narrows the Optional.
        assert self._ctx is not None
        return self._ctx.update_into(data, buf)

    def finalize(self) -> bytes:
        """
        Finalizes the backend context, keeps a copy of its tag, drops the
        reference to it and returns the bytes the backend produced.
        """
        backend_ctx = self._ctx
        if backend_ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        final_data = backend_ctx.finalize()
        self._tag = backend_ctx.tag
        self._ctx = None
        return final_data

    def authenticate_additional_data(self, data: bytes) -> None:
        """
        Forwards ``data`` to the backend as additional authenticated data.

        Raises ``AlreadyFinalized`` after finalization, ``AlreadyUpdated``
        once ``update`` / ``update_into`` has been called, and
        ``ValueError`` when the accumulated AAD size exceeds the mode's
        ``_MAX_AAD_BYTES``.
        """
        backend_ctx = self._ctx
        if backend_ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        if self._updated:
            raise AlreadyUpdated("Update has been called on this context.")

        self._aad_bytes_processed += len(data)

        mode = backend_ctx._mode
        aad_limit = mode._MAX_AAD_BYTES
        if self._aad_bytes_processed > aad_limit:
            raise ValueError(
                "{} has a maximum AAD byte limit of {}".format(
                    mode.name, aad_limit
                )
            )

        backend_ctx.authenticate_additional_data(data)
247
248
class _AEADDecryptionContext(_AEADCipherContext, AEADDecryptionContext):
    """
    AEAD wrapper used for decryption; adds finalization with a tag that is
    supplied at the end.
    """

    def finalize_with_tag(self, tag: bytes) -> bytes:
        """
        Finalizes the backend context with ``tag`` and returns the bytes
        the backend produced.

        Raises ``AlreadyFinalized`` if the context was finalized already,
        and ``ValueError`` if the backend context already holds a tag.
        """
        backend_ctx = self._ctx
        if backend_ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        if backend_ctx._tag is not None:
            raise ValueError(
                "tag provided both in mode and in call with finalize_with_tag:"
                " tag should only be provided once"
            )
        final_data = backend_ctx.finalize_with_tag(tag)
        self._tag = backend_ctx.tag
        self._ctx = None
        return final_data
262
263
class _AEADEncryptionContext(_AEADCipherContext, AEADEncryptionContext):
    """
    AEAD wrapper used for encryption; exposes the tag stored during
    finalization.
    """

    @property
    def tag(self) -> bytes:
        """
        The tag saved when the context was finalized.

        Raises ``NotYetFinalized`` while the backend context is still held,
        i.e. before finalization has completed.
        """
        if self._ctx is not None:
            raise NotYetFinalized(
                "You must finalize encryption before getting the tag."
            )
        stored_tag = self._tag
        # Narrowing for the type checker: _tag is declared Optional.
        assert stored_tag is not None
        return stored_tag