Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/hazmat/primitives/ciphers/base.py: 43%
138 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:05 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:05 +0000
1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
5from __future__ import annotations
7import abc
8import typing
10from cryptography.exceptions import (
11 AlreadyFinalized,
12 AlreadyUpdated,
13 NotYetFinalized,
14)
15from cryptography.hazmat.primitives._cipheralgorithm import CipherAlgorithm
16from cryptography.hazmat.primitives.ciphers import modes
18if typing.TYPE_CHECKING:
19 from cryptography.hazmat.backends.openssl.ciphers import (
20 _CipherContext as _BackendCipherContext,
21 )
24class CipherContext(metaclass=abc.ABCMeta):
25 @abc.abstractmethod
26 def update(self, data: bytes) -> bytes:
27 """
28 Processes the provided bytes through the cipher and returns the results
29 as bytes.
30 """
32 @abc.abstractmethod
33 def update_into(self, data: bytes, buf: bytes) -> int:
34 """
35 Processes the provided bytes and writes the resulting data into the
36 provided buffer. Returns the number of bytes written.
37 """
39 @abc.abstractmethod
40 def finalize(self) -> bytes:
41 """
42 Returns the results of processing the final block as bytes.
43 """
46class AEADCipherContext(CipherContext, metaclass=abc.ABCMeta):
47 @abc.abstractmethod
48 def authenticate_additional_data(self, data: bytes) -> None:
49 """
50 Authenticates the provided bytes.
51 """
54class AEADDecryptionContext(AEADCipherContext, metaclass=abc.ABCMeta):
55 @abc.abstractmethod
56 def finalize_with_tag(self, tag: bytes) -> bytes:
57 """
58 Returns the results of processing the final block as bytes and allows
59 delayed passing of the authentication tag.
60 """
63class AEADEncryptionContext(AEADCipherContext, metaclass=abc.ABCMeta):
64 @property
65 @abc.abstractmethod
66 def tag(self) -> bytes:
67 """
68 Returns tag bytes. This is only available after encryption is
69 finalized.
70 """
73Mode = typing.TypeVar(
74 "Mode", bound=typing.Optional[modes.Mode], covariant=True
75)
78class Cipher(typing.Generic[Mode]):
79 def __init__(
80 self,
81 algorithm: CipherAlgorithm,
82 mode: Mode,
83 backend: typing.Any = None,
84 ) -> None:
85 if not isinstance(algorithm, CipherAlgorithm):
86 raise TypeError("Expected interface of CipherAlgorithm.")
88 if mode is not None:
89 # mypy needs this assert to narrow the type from our generic
90 # type. Maybe it won't some time in the future.
91 assert isinstance(mode, modes.Mode)
92 mode.validate_for_algorithm(algorithm)
94 self.algorithm = algorithm
95 self.mode = mode
97 @typing.overload
98 def encryptor(
99 self: Cipher[modes.ModeWithAuthenticationTag],
100 ) -> AEADEncryptionContext:
101 ...
103 @typing.overload
104 def encryptor(
105 self: _CIPHER_TYPE,
106 ) -> CipherContext:
107 ...
109 def encryptor(self):
110 if isinstance(self.mode, modes.ModeWithAuthenticationTag):
111 if self.mode.tag is not None:
112 raise ValueError(
113 "Authentication tag must be None when encrypting."
114 )
115 from cryptography.hazmat.backends.openssl.backend import backend
117 ctx = backend.create_symmetric_encryption_ctx(
118 self.algorithm, self.mode
119 )
120 return self._wrap_ctx(ctx, encrypt=True)
122 @typing.overload
123 def decryptor(
124 self: Cipher[modes.ModeWithAuthenticationTag],
125 ) -> AEADDecryptionContext:
126 ...
128 @typing.overload
129 def decryptor(
130 self: _CIPHER_TYPE,
131 ) -> CipherContext:
132 ...
134 def decryptor(self):
135 from cryptography.hazmat.backends.openssl.backend import backend
137 ctx = backend.create_symmetric_decryption_ctx(
138 self.algorithm, self.mode
139 )
140 return self._wrap_ctx(ctx, encrypt=False)
142 def _wrap_ctx(
143 self, ctx: _BackendCipherContext, encrypt: bool
144 ) -> typing.Union[
145 AEADEncryptionContext, AEADDecryptionContext, CipherContext
146 ]:
147 if isinstance(self.mode, modes.ModeWithAuthenticationTag):
148 if encrypt:
149 return _AEADEncryptionContext(ctx)
150 else:
151 return _AEADDecryptionContext(ctx)
152 else:
153 return _CipherContext(ctx)
156_CIPHER_TYPE = Cipher[
157 typing.Union[
158 modes.ModeWithNonce,
159 modes.ModeWithTweak,
160 None,
161 modes.ECB,
162 modes.ModeWithInitializationVector,
163 ]
164]
167class _CipherContext(CipherContext):
168 _ctx: typing.Optional[_BackendCipherContext]
170 def __init__(self, ctx: _BackendCipherContext) -> None:
171 self._ctx = ctx
173 def update(self, data: bytes) -> bytes:
174 if self._ctx is None:
175 raise AlreadyFinalized("Context was already finalized.")
176 return self._ctx.update(data)
178 def update_into(self, data: bytes, buf: bytes) -> int:
179 if self._ctx is None:
180 raise AlreadyFinalized("Context was already finalized.")
181 return self._ctx.update_into(data, buf)
183 def finalize(self) -> bytes:
184 if self._ctx is None:
185 raise AlreadyFinalized("Context was already finalized.")
186 data = self._ctx.finalize()
187 self._ctx = None
188 return data
191class _AEADCipherContext(AEADCipherContext):
192 _ctx: typing.Optional[_BackendCipherContext]
193 _tag: typing.Optional[bytes]
195 def __init__(self, ctx: _BackendCipherContext) -> None:
196 self._ctx = ctx
197 self._bytes_processed = 0
198 self._aad_bytes_processed = 0
199 self._tag = None
200 self._updated = False
202 def _check_limit(self, data_size: int) -> None:
203 if self._ctx is None:
204 raise AlreadyFinalized("Context was already finalized.")
205 self._updated = True
206 self._bytes_processed += data_size
207 if self._bytes_processed > self._ctx._mode._MAX_ENCRYPTED_BYTES:
208 raise ValueError(
209 "{} has a maximum encrypted byte limit of {}".format(
210 self._ctx._mode.name, self._ctx._mode._MAX_ENCRYPTED_BYTES
211 )
212 )
214 def update(self, data: bytes) -> bytes:
215 self._check_limit(len(data))
216 # mypy needs this assert even though _check_limit already checked
217 assert self._ctx is not None
218 return self._ctx.update(data)
220 def update_into(self, data: bytes, buf: bytes) -> int:
221 self._check_limit(len(data))
222 # mypy needs this assert even though _check_limit already checked
223 assert self._ctx is not None
224 return self._ctx.update_into(data, buf)
226 def finalize(self) -> bytes:
227 if self._ctx is None:
228 raise AlreadyFinalized("Context was already finalized.")
229 data = self._ctx.finalize()
230 self._tag = self._ctx.tag
231 self._ctx = None
232 return data
234 def authenticate_additional_data(self, data: bytes) -> None:
235 if self._ctx is None:
236 raise AlreadyFinalized("Context was already finalized.")
237 if self._updated:
238 raise AlreadyUpdated("Update has been called on this context.")
240 self._aad_bytes_processed += len(data)
241 if self._aad_bytes_processed > self._ctx._mode._MAX_AAD_BYTES:
242 raise ValueError(
243 "{} has a maximum AAD byte limit of {}".format(
244 self._ctx._mode.name, self._ctx._mode._MAX_AAD_BYTES
245 )
246 )
248 self._ctx.authenticate_additional_data(data)
251class _AEADDecryptionContext(_AEADCipherContext, AEADDecryptionContext):
252 def finalize_with_tag(self, tag: bytes) -> bytes:
253 if self._ctx is None:
254 raise AlreadyFinalized("Context was already finalized.")
255 data = self._ctx.finalize_with_tag(tag)
256 self._tag = self._ctx.tag
257 self._ctx = None
258 return data
261class _AEADEncryptionContext(_AEADCipherContext, AEADEncryptionContext):
262 @property
263 def tag(self) -> bytes:
264 if self._ctx is not None:
265 raise NotYetFinalized(
266 "You must finalize encryption before " "getting the tag."
267 )
268 assert self._tag is not None
269 return self._tag