Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/hazmat/backends/openssl/ciphers.py: 14%
125 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:13 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:13 +0000
1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
5from __future__ import annotations
7import typing
9from cryptography.exceptions import InvalidTag, UnsupportedAlgorithm, _Reasons
10from cryptography.hazmat.primitives import ciphers
11from cryptography.hazmat.primitives.ciphers import algorithms, modes
13if typing.TYPE_CHECKING:
14 from cryptography.hazmat.backends.openssl.backend import Backend
17class _CipherContext:
18 _ENCRYPT = 1
19 _DECRYPT = 0
20 _MAX_CHUNK_SIZE = 2**30 - 1
22 def __init__(self, backend: Backend, cipher, mode, operation: int) -> None:
23 self._backend = backend
24 self._cipher = cipher
25 self._mode = mode
26 self._operation = operation
27 self._tag: typing.Optional[bytes] = None
29 if isinstance(self._cipher, ciphers.BlockCipherAlgorithm):
30 self._block_size_bytes = self._cipher.block_size // 8
31 else:
32 self._block_size_bytes = 1
34 ctx = self._backend._lib.EVP_CIPHER_CTX_new()
35 ctx = self._backend._ffi.gc(
36 ctx, self._backend._lib.EVP_CIPHER_CTX_free
37 )
39 registry = self._backend._cipher_registry
40 try:
41 adapter = registry[type(cipher), type(mode)]
42 except KeyError:
43 raise UnsupportedAlgorithm(
44 "cipher {} in {} mode is not supported "
45 "by this backend.".format(
46 cipher.name, mode.name if mode else mode
47 ),
48 _Reasons.UNSUPPORTED_CIPHER,
49 )
51 evp_cipher = adapter(self._backend, cipher, mode)
52 if evp_cipher == self._backend._ffi.NULL:
53 msg = f"cipher {cipher.name} "
54 if mode is not None:
55 msg += f"in {mode.name} mode "
56 msg += (
57 "is not supported by this backend (Your version of OpenSSL "
58 "may be too old. Current version: {}.)"
59 ).format(self._backend.openssl_version_text())
60 raise UnsupportedAlgorithm(msg, _Reasons.UNSUPPORTED_CIPHER)
62 if isinstance(mode, modes.ModeWithInitializationVector):
63 iv_nonce = self._backend._ffi.from_buffer(
64 mode.initialization_vector
65 )
66 elif isinstance(mode, modes.ModeWithTweak):
67 iv_nonce = self._backend._ffi.from_buffer(mode.tweak)
68 elif isinstance(mode, modes.ModeWithNonce):
69 iv_nonce = self._backend._ffi.from_buffer(mode.nonce)
70 elif isinstance(cipher, algorithms.ChaCha20):
71 iv_nonce = self._backend._ffi.from_buffer(cipher.nonce)
72 else:
73 iv_nonce = self._backend._ffi.NULL
74 # begin init with cipher and operation type
75 res = self._backend._lib.EVP_CipherInit_ex(
76 ctx,
77 evp_cipher,
78 self._backend._ffi.NULL,
79 self._backend._ffi.NULL,
80 self._backend._ffi.NULL,
81 operation,
82 )
83 self._backend.openssl_assert(res != 0)
84 # set the key length to handle variable key ciphers
85 res = self._backend._lib.EVP_CIPHER_CTX_set_key_length(
86 ctx, len(cipher.key)
87 )
88 self._backend.openssl_assert(res != 0)
89 if isinstance(mode, modes.GCM):
90 res = self._backend._lib.EVP_CIPHER_CTX_ctrl(
91 ctx,
92 self._backend._lib.EVP_CTRL_AEAD_SET_IVLEN,
93 len(iv_nonce),
94 self._backend._ffi.NULL,
95 )
96 self._backend.openssl_assert(res != 0)
97 if mode.tag is not None:
98 res = self._backend._lib.EVP_CIPHER_CTX_ctrl(
99 ctx,
100 self._backend._lib.EVP_CTRL_AEAD_SET_TAG,
101 len(mode.tag),
102 mode.tag,
103 )
104 self._backend.openssl_assert(res != 0)
105 self._tag = mode.tag
107 # pass key/iv
108 res = self._backend._lib.EVP_CipherInit_ex(
109 ctx,
110 self._backend._ffi.NULL,
111 self._backend._ffi.NULL,
112 self._backend._ffi.from_buffer(cipher.key),
113 iv_nonce,
114 operation,
115 )
117 # Check for XTS mode duplicate keys error
118 errors = self._backend._consume_errors()
119 lib = self._backend._lib
120 if res == 0 and (
121 (
122 not lib.CRYPTOGRAPHY_IS_LIBRESSL
123 and errors[0]._lib_reason_match(
124 lib.ERR_LIB_EVP, lib.EVP_R_XTS_DUPLICATED_KEYS
125 )
126 )
127 or (
128 lib.Cryptography_HAS_PROVIDERS
129 and errors[0]._lib_reason_match(
130 lib.ERR_LIB_PROV, lib.PROV_R_XTS_DUPLICATED_KEYS
131 )
132 )
133 ):
134 raise ValueError("In XTS mode duplicated keys are not allowed")
136 self._backend.openssl_assert(res != 0, errors=errors)
138 # We purposely disable padding here as it's handled higher up in the
139 # API.
140 self._backend._lib.EVP_CIPHER_CTX_set_padding(ctx, 0)
141 self._ctx = ctx
143 def update(self, data: bytes) -> bytes:
144 buf = bytearray(len(data) + self._block_size_bytes - 1)
145 n = self.update_into(data, buf)
146 return bytes(buf[:n])
148 def update_into(self, data: bytes, buf: bytes) -> int:
149 total_data_len = len(data)
150 if len(buf) < (total_data_len + self._block_size_bytes - 1):
151 raise ValueError(
152 "buffer must be at least {} bytes for this "
153 "payload".format(len(data) + self._block_size_bytes - 1)
154 )
156 data_processed = 0
157 total_out = 0
158 outlen = self._backend._ffi.new("int *")
159 baseoutbuf = self._backend._ffi.from_buffer(buf, require_writable=True)
160 baseinbuf = self._backend._ffi.from_buffer(data)
162 while data_processed != total_data_len:
163 outbuf = baseoutbuf + total_out
164 inbuf = baseinbuf + data_processed
165 inlen = min(self._MAX_CHUNK_SIZE, total_data_len - data_processed)
167 res = self._backend._lib.EVP_CipherUpdate(
168 self._ctx, outbuf, outlen, inbuf, inlen
169 )
170 if res == 0 and isinstance(self._mode, modes.XTS):
171 self._backend._consume_errors()
172 raise ValueError(
173 "In XTS mode you must supply at least a full block in the "
174 "first update call. For AES this is 16 bytes."
175 )
176 else:
177 self._backend.openssl_assert(res != 0)
178 data_processed += inlen
179 total_out += outlen[0]
181 return total_out
183 def finalize(self) -> bytes:
184 if (
185 self._operation == self._DECRYPT
186 and isinstance(self._mode, modes.ModeWithAuthenticationTag)
187 and self.tag is None
188 ):
189 raise ValueError(
190 "Authentication tag must be provided when decrypting."
191 )
193 buf = self._backend._ffi.new("unsigned char[]", self._block_size_bytes)
194 outlen = self._backend._ffi.new("int *")
195 res = self._backend._lib.EVP_CipherFinal_ex(self._ctx, buf, outlen)
196 if res == 0:
197 errors = self._backend._consume_errors()
199 if not errors and isinstance(self._mode, modes.GCM):
200 raise InvalidTag
202 lib = self._backend._lib
203 self._backend.openssl_assert(
204 errors[0]._lib_reason_match(
205 lib.ERR_LIB_EVP,
206 lib.EVP_R_DATA_NOT_MULTIPLE_OF_BLOCK_LENGTH,
207 )
208 or (
209 lib.Cryptography_HAS_PROVIDERS
210 and errors[0]._lib_reason_match(
211 lib.ERR_LIB_PROV,
212 lib.PROV_R_WRONG_FINAL_BLOCK_LENGTH,
213 )
214 )
215 or (
216 lib.CRYPTOGRAPHY_IS_BORINGSSL
217 and errors[0].reason
218 == lib.CIPHER_R_DATA_NOT_MULTIPLE_OF_BLOCK_LENGTH
219 ),
220 errors=errors,
221 )
222 raise ValueError(
223 "The length of the provided data is not a multiple of "
224 "the block length."
225 )
227 if (
228 isinstance(self._mode, modes.GCM)
229 and self._operation == self._ENCRYPT
230 ):
231 tag_buf = self._backend._ffi.new(
232 "unsigned char[]", self._block_size_bytes
233 )
234 res = self._backend._lib.EVP_CIPHER_CTX_ctrl(
235 self._ctx,
236 self._backend._lib.EVP_CTRL_AEAD_GET_TAG,
237 self._block_size_bytes,
238 tag_buf,
239 )
240 self._backend.openssl_assert(res != 0)
241 self._tag = self._backend._ffi.buffer(tag_buf)[:]
243 res = self._backend._lib.EVP_CIPHER_CTX_reset(self._ctx)
244 self._backend.openssl_assert(res == 1)
245 return self._backend._ffi.buffer(buf)[: outlen[0]]
247 def finalize_with_tag(self, tag: bytes) -> bytes:
248 tag_len = len(tag)
249 if tag_len < self._mode._min_tag_length:
250 raise ValueError(
251 "Authentication tag must be {} bytes or longer.".format(
252 self._mode._min_tag_length
253 )
254 )
255 elif tag_len > self._block_size_bytes:
256 raise ValueError(
257 "Authentication tag cannot be more than {} bytes.".format(
258 self._block_size_bytes
259 )
260 )
261 res = self._backend._lib.EVP_CIPHER_CTX_ctrl(
262 self._ctx, self._backend._lib.EVP_CTRL_AEAD_SET_TAG, len(tag), tag
263 )
264 self._backend.openssl_assert(res != 0)
265 self._tag = tag
266 return self.finalize()
268 def authenticate_additional_data(self, data: bytes) -> None:
269 outlen = self._backend._ffi.new("int *")
270 res = self._backend._lib.EVP_CipherUpdate(
271 self._ctx,
272 self._backend._ffi.NULL,
273 outlen,
274 self._backend._ffi.from_buffer(data),
275 len(data),
276 )
277 self._backend.openssl_assert(res != 0)
279 @property
280 def tag(self) -> typing.Optional[bytes]:
281 return self._tag