Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/hazmat/primitives/ciphers/base.py: 53%
137 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:36 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:36 +0000
1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
6import abc
7import typing
9from cryptography.exceptions import (
10 AlreadyFinalized,
11 AlreadyUpdated,
12 NotYetFinalized,
13)
14from cryptography.hazmat.primitives._cipheralgorithm import CipherAlgorithm
15from cryptography.hazmat.primitives.ciphers import modes
17if typing.TYPE_CHECKING:
18 from cryptography.hazmat.backends.openssl.ciphers import (
19 _CipherContext as _BackendCipherContext,
20 )
class CipherContext(metaclass=abc.ABCMeta):
    """
    Abstract interface for an in-progress cipher operation.
    """

    @abc.abstractmethod
    def update(self, data: bytes) -> bytes:
        """
        Runs ``data`` through the cipher and returns the output as bytes.
        """

    @abc.abstractmethod
    def update_into(self, data: bytes, buf: bytes) -> int:
        """
        Runs ``data`` through the cipher, writing the output into ``buf``.
        Returns how many bytes were written.
        """

    @abc.abstractmethod
    def finalize(self) -> bytes:
        """
        Processes the final block and returns its output as bytes.
        """
class AEADCipherContext(CipherContext, metaclass=abc.ABCMeta):
    """
    Abstract cipher context that can also authenticate additional data.
    """

    @abc.abstractmethod
    def authenticate_additional_data(self, data: bytes) -> None:
        """
        Authenticates ``data`` without returning any output.
        """
class AEADDecryptionContext(AEADCipherContext, metaclass=abc.ABCMeta):
    """
    Abstract AEAD context whose tag may be supplied at finalization.
    """

    @abc.abstractmethod
    def finalize_with_tag(self, tag: bytes) -> bytes:
        """
        Processes the final block and returns its output as bytes. The
        authentication tag is passed here rather than up front.
        """
class AEADEncryptionContext(AEADCipherContext, metaclass=abc.ABCMeta):
    """
    Abstract AEAD context that exposes the authentication tag.
    """

    @property
    @abc.abstractmethod
    def tag(self) -> bytes:
        """
        The tag bytes. Only available once encryption has been finalized.
        """
# Type variable that ``Cipher`` is generic over. The bound is
# ``Optional[modes.Mode]``, so ``None`` is an accepted mode.
Mode = typing.TypeVar(
    "Mode",
    covariant=True,
    bound=typing.Optional[modes.Mode],
)
class Cipher(typing.Generic[Mode]):
    """
    Pairs a cipher algorithm with an optional mode and creates encryption
    and decryption contexts for that pair via the OpenSSL backend.
    """

    def __init__(
        self,
        algorithm: CipherAlgorithm,
        mode: Mode,
        backend: typing.Any = None,
    ) -> None:
        """
        Validates and stores ``algorithm`` and ``mode``.

        Raises ``TypeError`` if ``algorithm`` is not a ``CipherAlgorithm``.
        A non-``None`` ``mode`` is asked to validate itself against the
        algorithm. ``backend`` is accepted but not used by this method.
        """
        if not isinstance(algorithm, CipherAlgorithm):
            raise TypeError("Expected interface of CipherAlgorithm.")

        if mode is not None:
            # mypy needs this assert to narrow the type from our generic
            # type. Maybe it won't some time in the future.
            assert isinstance(mode, modes.Mode)
            mode.validate_for_algorithm(algorithm)

        self.algorithm = algorithm
        self.mode = mode

    @typing.overload
    def encryptor(
        self: "Cipher[modes.ModeWithAuthenticationTag]",
    ) -> AEADEncryptionContext:
        ...

    @typing.overload
    def encryptor(
        self: "_CIPHER_TYPE",
    ) -> CipherContext:
        ...

    def encryptor(self):
        """
        Creates an encryption context for the stored algorithm and mode.

        Raises ``ValueError`` if the mode is a ``ModeWithAuthenticationTag``
        whose ``tag`` is already set.
        """
        mode = self.mode
        if (
            isinstance(mode, modes.ModeWithAuthenticationTag)
            and mode.tag is not None
        ):
            raise ValueError(
                "Authentication tag must be None when encrypting."
            )

        from cryptography.hazmat.backends.openssl.backend import backend

        return self._wrap_ctx(
            backend.create_symmetric_encryption_ctx(self.algorithm, mode),
            encrypt=True,
        )

    @typing.overload
    def decryptor(
        self: "Cipher[modes.ModeWithAuthenticationTag]",
    ) -> AEADDecryptionContext:
        ...

    @typing.overload
    def decryptor(
        self: "_CIPHER_TYPE",
    ) -> CipherContext:
        ...

    def decryptor(self):
        """
        Creates a decryption context for the stored algorithm and mode.
        """
        from cryptography.hazmat.backends.openssl.backend import backend

        return self._wrap_ctx(
            backend.create_symmetric_decryption_ctx(
                self.algorithm, self.mode
            ),
            encrypt=False,
        )

    def _wrap_ctx(
        self, ctx: "_BackendCipherContext", encrypt: bool
    ) -> typing.Union[
        AEADEncryptionContext, AEADDecryptionContext, CipherContext
    ]:
        """
        Wraps the backend context ``ctx`` in the wrapper class that matches
        the stored mode and the direction given by ``encrypt``.
        """
        # Modes without an authentication tag get the plain wrapper.
        if not isinstance(self.mode, modes.ModeWithAuthenticationTag):
            return _CipherContext(ctx)
        if encrypt:
            return _AEADEncryptionContext(ctx)
        return _AEADDecryptionContext(ctx)
# ``Cipher`` parameterizations used as the ``self`` type of the
# ``encryptor``/``decryptor`` overloads that return a plain
# ``CipherContext``.
_CIPHER_TYPE = Cipher[
    typing.Union[
        modes.ModeWithNonce,
        modes.ModeWithTweak,
        None,
        modes.ECB,
        modes.ModeWithInitializationVector,
    ]
]
class _CipherContext(CipherContext):
    """
    ``CipherContext`` that forwards to a backend context and refuses any
    further use once it has been finalized.
    """

    # Set to ``None`` by ``finalize`` to mark the context as used up.
    _ctx: typing.Optional["_BackendCipherContext"]

    def __init__(self, ctx: "_BackendCipherContext") -> None:
        self._ctx = ctx

    def update(self, data: bytes) -> bytes:
        """
        Forwards ``data`` to the backend context's ``update``.

        Raises ``AlreadyFinalized`` if ``finalize`` has already run.
        """
        backend_ctx = self._ctx
        if backend_ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        return backend_ctx.update(data)

    def update_into(self, data: bytes, buf: bytes) -> int:
        """
        Forwards ``data`` and ``buf`` to the backend context's
        ``update_into``.

        Raises ``AlreadyFinalized`` if ``finalize`` has already run.
        """
        backend_ctx = self._ctx
        if backend_ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        return backend_ctx.update_into(data, buf)

    def finalize(self) -> bytes:
        """
        Finalizes the backend context, drops the reference to it and
        returns the bytes it produced.

        Raises ``AlreadyFinalized`` if called a second time.
        """
        backend_ctx = self._ctx
        if backend_ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        result = backend_ctx.finalize()
        self._ctx = None
        return result
class _AEADCipherContext(AEADCipherContext):
    """
    ``AEADCipherContext`` that forwards to a backend context while tracking
    processed byte counts against the limits declared by the mode.
    """

    # Set to ``None`` once finalized.
    _ctx: typing.Optional["_BackendCipherContext"]
    # Copied from the backend context during finalization.
    _tag: typing.Optional[bytes]

    def __init__(self, ctx: "_BackendCipherContext") -> None:
        self._ctx = ctx
        self._tag = None
        self._updated = False
        self._bytes_processed = 0
        self._aad_bytes_processed = 0

    def _check_limit(self, data_size: int) -> None:
        """
        Records ``data_size`` more processed bytes and marks the context as
        updated.

        Raises ``AlreadyFinalized`` if the context was finalized, or
        ``ValueError`` when the running total exceeds the mode's
        ``_MAX_ENCRYPTED_BYTES``.
        """
        backend_ctx = self._ctx
        if backend_ctx is None:
            raise AlreadyFinalized("Context was already finalized.")

        self._updated = True
        self._bytes_processed += data_size

        mode = backend_ctx._mode
        if self._bytes_processed > mode._MAX_ENCRYPTED_BYTES:
            raise ValueError(
                "{} has a maximum encrypted byte limit of {}".format(
                    mode.name, mode._MAX_ENCRYPTED_BYTES
                )
            )

    def update(self, data: bytes) -> bytes:
        """
        Checks the byte limit, then forwards ``data`` to the backend
        context's ``update``.
        """
        self._check_limit(len(data))
        backend_ctx = self._ctx
        # mypy needs this assert even though _check_limit already checked
        assert backend_ctx is not None
        return backend_ctx.update(data)

    def update_into(self, data: bytes, buf: bytes) -> int:
        """
        Checks the byte limit, then forwards ``data`` and ``buf`` to the
        backend context's ``update_into``.
        """
        self._check_limit(len(data))
        backend_ctx = self._ctx
        # mypy needs this assert even though _check_limit already checked
        assert backend_ctx is not None
        return backend_ctx.update_into(data, buf)

    def finalize(self) -> bytes:
        """
        Finalizes the backend context, keeps its tag, drops the reference
        to it and returns the bytes it produced.

        Raises ``AlreadyFinalized`` if the context was already finalized.
        """
        backend_ctx = self._ctx
        if backend_ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        result = backend_ctx.finalize()
        self._tag = backend_ctx.tag
        self._ctx = None
        return result

    def authenticate_additional_data(self, data: bytes) -> None:
        """
        Forwards ``data`` to the backend context as additional
        authenticated data.

        Raises ``AlreadyFinalized`` if the context was finalized,
        ``AlreadyUpdated`` if an update call has already happened, or
        ``ValueError`` when the running AAD total exceeds the mode's
        ``_MAX_AAD_BYTES``.
        """
        backend_ctx = self._ctx
        if backend_ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        if self._updated:
            raise AlreadyUpdated("Update has been called on this context.")

        self._aad_bytes_processed += len(data)
        mode = backend_ctx._mode
        if self._aad_bytes_processed > mode._MAX_AAD_BYTES:
            raise ValueError(
                "{} has a maximum AAD byte limit of {}".format(
                    mode.name, mode._MAX_AAD_BYTES
                )
            )

        backend_ctx.authenticate_additional_data(data)
class _AEADDecryptionContext(_AEADCipherContext, AEADDecryptionContext):
    """
    AEAD decryption wrapper that adds finalization with a supplied tag.
    """

    def finalize_with_tag(self, tag: bytes) -> bytes:
        """
        Finalizes the backend context with ``tag``, keeps the backend's
        tag, drops the reference to the context and returns the bytes it
        produced.

        Raises ``AlreadyFinalized`` if the context was already finalized.
        """
        backend_ctx = self._ctx
        if backend_ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        result = backend_ctx.finalize_with_tag(tag)
        self._tag = backend_ctx.tag
        self._ctx = None
        return result
class _AEADEncryptionContext(_AEADCipherContext, AEADEncryptionContext):
    """
    AEAD encryption wrapper that exposes the tag after finalization.
    """

    @property
    def tag(self) -> bytes:
        """
        The tag stored during finalization.

        Raises ``NotYetFinalized`` while the backend context is still held,
        i.e. before the context has been finalized.
        """
        if self._ctx is not None:
            raise NotYetFinalized(
                "You must finalize encryption before getting the tag."
            )
        stored_tag = self._tag
        assert stored_tag is not None
        return stored_tag