Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/hazmat/backends/openssl/aead.py: 72%
195 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 07:26 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 07:26 +0000
1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
5from __future__ import annotations
7import typing
9from cryptography.exceptions import InvalidTag
11if typing.TYPE_CHECKING:
12 from cryptography.hazmat.backends.openssl.backend import Backend
13 from cryptography.hazmat.primitives.ciphers.aead import (
14 AESCCM,
15 AESGCM,
16 ChaCha20Poly1305,
17 )
19 _AEADTypes = typing.Union[AESCCM, AESGCM, ChaCha20Poly1305]
def _is_evp_aead_supported_cipher(
    backend: Backend, cipher: _AEADTypes
) -> bool:
    """
    Checks whether the given cipher is supported through
    EVP_AEAD rather than the normal OpenSSL EVP_CIPHER API.

    Only ChaCha20Poly1305 instances qualify, and only when the bindings
    report a truthy ``Cryptography_HAS_EVP_AEAD``. The result is always a
    real ``bool``.
    """
    from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305

    # The feature flag comes from the bindings and may be a plain integer;
    # coerce it so a falsy flag yields False instead of leaking the raw
    # value through the ``-> bool`` signature.
    return bool(backend._lib.Cryptography_HAS_EVP_AEAD) and isinstance(
        cipher, ChaCha20Poly1305
    )
def _aead_cipher_supported(backend: Backend, cipher: _AEADTypes) -> bool:
    """
    Reports whether ``cipher`` can be used with this backend.

    EVP_AEAD-capable ciphers are always accepted. Otherwise the cipher is
    rejected when FIPS mode is on and its name is missing from the
    backend's FIPS AEAD list, and accepted only if the library can resolve
    the cipher by name.
    """
    if _is_evp_aead_supported_cipher(backend, cipher):
        return True

    name = _evp_cipher_cipher_name(cipher)
    if backend._fips_enabled and name not in backend._fips_aead:
        return False

    # A NULL lookup result means the linked library does not know the name.
    looked_up = backend._lib.EVP_get_cipherbyname(name)
    return looked_up != backend._ffi.NULL
def _aead_create_ctx(
    backend: Backend,
    cipher: _AEADTypes,
    key: bytes,
):
    """
    Creates a reusable context for ``cipher`` keyed with ``key``.

    Dispatches to the EVP_AEAD factory when the cipher is handled through
    that API and to the EVP_CIPHER factory otherwise.
    """
    factory = (
        _evp_aead_create_ctx
        if _is_evp_aead_supported_cipher(backend, cipher)
        else _evp_cipher_create_ctx
    )
    return factory(backend, cipher, key)
def _encrypt(
    backend: Backend,
    cipher: _AEADTypes,
    nonce: bytes,
    data: bytes,
    associated_data: list[bytes],
    tag_length: int,
    ctx: typing.Any = None,
) -> bytes:
    """
    Encrypts ``data`` with ``cipher`` and returns the resulting bytes.

    All arguments are forwarded unchanged to the EVP_AEAD implementation
    when the cipher is handled through that API, or to the EVP_CIPHER
    implementation otherwise.
    """
    implementation = (
        _evp_aead_encrypt
        if _is_evp_aead_supported_cipher(backend, cipher)
        else _evp_cipher_encrypt
    )
    return implementation(
        backend, cipher, nonce, data, associated_data, tag_length, ctx
    )
def _decrypt(
    backend: Backend,
    cipher: _AEADTypes,
    nonce: bytes,
    data: bytes,
    associated_data: list[bytes],
    tag_length: int,
    ctx: typing.Any = None,
) -> bytes:
    """
    Decrypts ``data`` with ``cipher`` and returns the plaintext bytes.

    All arguments are forwarded unchanged to the EVP_AEAD implementation
    when the cipher is handled through that API, or to the EVP_CIPHER
    implementation otherwise.
    """
    implementation = (
        _evp_aead_decrypt
        if _is_evp_aead_supported_cipher(backend, cipher)
        else _evp_cipher_decrypt
    )
    return implementation(
        backend, cipher, nonce, data, associated_data, tag_length, ctx
    )
def _evp_aead_create_ctx(
    backend: Backend,
    cipher: _AEADTypes,
    key: bytes,
    tag_len: int | None = None,
):
    """
    Allocates an EVP_AEAD context for ``cipher`` keyed with ``key``.

    When ``tag_len`` is None the library's ``EVP_AEAD_DEFAULT_TAG_LENGTH``
    is used. The returned pointer is wrapped so that ``EVP_AEAD_CTX_free``
    runs when it is garbage collected.
    """
    aead_cipher = _evp_aead_get_cipher(backend, cipher)
    assert aead_cipher is not None
    key_ptr = backend._ffi.from_buffer(key)

    if tag_len is None:
        tag_len = backend._lib.EVP_AEAD_DEFAULT_TAG_LENGTH

    raw_ctx = backend._lib.Cryptography_EVP_AEAD_CTX_new(
        aead_cipher, key_ptr, len(key), tag_len
    )
    backend.openssl_assert(raw_ctx != backend._ffi.NULL)
    return backend._ffi.gc(raw_ctx, backend._lib.EVP_AEAD_CTX_free)
def _evp_aead_get_cipher(backend: Backend, cipher: _AEADTypes):
    """
    Returns the EVP_AEAD algorithm object matching ``cipher``.

    ChaCha20-Poly1305 is the only cipher routed through this API, so any
    other type trips the assertion.
    """
    from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305

    assert isinstance(cipher, ChaCha20Poly1305)
    return backend._lib.EVP_aead_chacha20_poly1305()
def _evp_aead_encrypt(
    backend: Backend,
    cipher: _AEADTypes,
    nonce: bytes,
    data: bytes,
    associated_data: list[bytes],
    tag_length: int,
    ctx: typing.Any,
) -> bytes:
    """
    Seals ``data`` with ``EVP_AEAD_CTX_seal`` and returns the output bytes.

    ``ctx`` must be an already created EVP_AEAD context. The entries of
    ``associated_data`` are concatenated into a single buffer before being
    passed to the library. ``tag_length`` is part of the shared signature
    but is not read in this function.
    """
    assert ctx is not None

    aead_cipher = _evp_aead_get_cipher(backend, cipher)
    assert aead_cipher is not None

    ffi = backend._ffi
    lib = backend._lib

    written = ffi.new("size_t *")
    # The output buffer has room for the input plus whatever
    # EVP_AEAD_max_overhead reports for this algorithm.
    capacity = len(data) + lib.EVP_AEAD_max_overhead(aead_cipher)
    sealed = ffi.new("uint8_t[]", capacity)
    plaintext_ptr = ffi.from_buffer(data)
    nonce_ptr = ffi.from_buffer(nonce)
    joined_aad = b"".join(associated_data)
    aad_ptr = ffi.from_buffer(joined_aad)

    status = lib.EVP_AEAD_CTX_seal(
        ctx,
        sealed,
        written,
        capacity,
        nonce_ptr,
        len(nonce),
        plaintext_ptr,
        len(data),
        aad_ptr,
        len(joined_aad),
    )
    backend.openssl_assert(status == 1)
    # Copy out only the bytes the library reported as written.
    return ffi.buffer(sealed, written[0])[:]
def _evp_aead_decrypt(
    backend: Backend,
    cipher: _AEADTypes,
    nonce: bytes,
    data: bytes,
    associated_data: list[bytes],
    tag_length: int,
    ctx: typing.Any,
) -> bytes:
    """
    Opens ``data`` with ``EVP_AEAD_CTX_open`` and returns the plaintext.

    ``ctx`` must be an already created EVP_AEAD context. The entries of
    ``associated_data`` are concatenated into a single buffer before being
    passed to the library. ``cipher`` is part of the shared signature but
    is not read in this function.

    Raises ``InvalidTag`` when ``data`` is shorter than ``tag_length`` or
    when the library call reports failure.
    """
    if len(data) < tag_length:
        raise InvalidTag

    assert ctx is not None

    ffi = backend._ffi

    written = ffi.new("size_t *")
    # The output buffer is sized to the full input length.
    capacity = len(data)
    opened = ffi.new("uint8_t[]", capacity)
    ciphertext_ptr = ffi.from_buffer(data)
    nonce_ptr = ffi.from_buffer(nonce)
    joined_aad = b"".join(associated_data)
    aad_ptr = ffi.from_buffer(joined_aad)

    status = backend._lib.EVP_AEAD_CTX_open(
        ctx,
        opened,
        written,
        capacity,
        nonce_ptr,
        len(nonce),
        ciphertext_ptr,
        len(data),
        aad_ptr,
        len(joined_aad),
    )

    if status == 0:
        # Clear the library's error queue before reporting the failure.
        backend._consume_errors()
        raise InvalidTag

    return ffi.buffer(opened, written[0])[:]
# Operation selectors for the EVP_CIPHER helpers in this module; they are
# turned into the EVP_CipherInit_ex direction argument with
# ``int(operation == _ENCRYPT)``.
_ENCRYPT = 1
_DECRYPT = 0
def _evp_cipher_cipher_name(cipher: _AEADTypes) -> bytes:
    """
    Returns the ASCII cipher name used to look ``cipher`` up by name.

    ChaCha20Poly1305 maps to a fixed name. The AES modes embed the key size
    in bits, derived from the length of ``cipher._key``.
    """
    from cryptography.hazmat.primitives.ciphers.aead import (
        AESCCM,
        AESGCM,
        ChaCha20Poly1305,
    )

    if isinstance(cipher, ChaCha20Poly1305):
        return b"chacha20-poly1305"

    if isinstance(cipher, AESCCM):
        return f"aes-{len(cipher._key) * 8}-ccm".encode("ascii")

    # Anything left must be AES-GCM.
    assert isinstance(cipher, AESGCM)
    return f"aes-{len(cipher._key) * 8}-gcm".encode("ascii")
def _evp_cipher(cipher_name: bytes, backend: Backend):
    """
    Resolves ``cipher_name`` via ``EVP_get_cipherbyname``.

    A NULL lookup result fails ``backend.openssl_assert``.
    """
    resolved = backend._lib.EVP_get_cipherbyname(cipher_name)
    backend.openssl_assert(resolved != backend._ffi.NULL)
    return resolved
def _evp_cipher_create_ctx(
    backend: Backend,
    cipher: _AEADTypes,
    key: bytes,
):
    """
    Allocates an EVP_CIPHER context for ``cipher`` and loads ``key``.

    The context is initialised with the cipher and key only: the IV
    argument is NULL and the direction argument is 0. The returned pointer
    is wrapped so that ``EVP_CIPHER_CTX_free`` runs when it is garbage
    collected.
    """
    null = backend._ffi.NULL

    raw_ctx = backend._lib.EVP_CIPHER_CTX_new()
    backend.openssl_assert(raw_ctx != null)
    managed_ctx = backend._ffi.gc(raw_ctx, backend._lib.EVP_CIPHER_CTX_free)

    evp_cipher = _evp_cipher(_evp_cipher_cipher_name(cipher), backend)
    key_ptr = backend._ffi.from_buffer(key)
    status = backend._lib.EVP_CipherInit_ex(
        managed_ctx,
        evp_cipher,
        null,
        key_ptr,
        null,
        0,
    )
    backend.openssl_assert(status != 0)
    return managed_ctx
def _evp_cipher_aead_setup(
    backend: Backend,
    cipher_name: bytes,
    key: bytes,
    nonce: bytes,
    tag: bytes | None,
    tag_len: int,
    operation: int,
):
    """
    Builds a fresh EVP_CIPHER context configured for one AEAD operation.

    The steps, in order: select the cipher and direction, set the IV length
    from ``len(nonce)``, configure the tag (the expected ``tag`` when
    decrypting, or just ``tag_len`` when encrypting with a ``-ccm``
    cipher), and finally load ``key`` and ``nonce``.

    ``operation`` is ``_ENCRYPT`` or ``_DECRYPT``; ``tag`` must not be None
    when decrypting. Any failing library call, including allocation of the
    context itself, fails ``backend.openssl_assert``. The returned pointer
    is wrapped so that ``EVP_CIPHER_CTX_free`` runs when it is garbage
    collected.
    """
    evp_cipher = _evp_cipher(cipher_name, backend)
    ctx = backend._lib.EVP_CIPHER_CTX_new()
    # Check the allocation before registering the destructor, so a NULL
    # context is reported here rather than passed on to later calls.
    backend.openssl_assert(ctx != backend._ffi.NULL)
    ctx = backend._ffi.gc(ctx, backend._lib.EVP_CIPHER_CTX_free)
    res = backend._lib.EVP_CipherInit_ex(
        ctx,
        evp_cipher,
        backend._ffi.NULL,
        backend._ffi.NULL,
        backend._ffi.NULL,
        int(operation == _ENCRYPT),
    )
    backend.openssl_assert(res != 0)
    # CCM requires the IVLEN to be set before calling SET_TAG on decrypt
    res = backend._lib.EVP_CIPHER_CTX_ctrl(
        ctx,
        backend._lib.EVP_CTRL_AEAD_SET_IVLEN,
        len(nonce),
        backend._ffi.NULL,
    )
    backend.openssl_assert(res != 0)
    if operation == _DECRYPT:
        assert tag is not None
        _evp_cipher_set_tag(backend, ctx, tag)
    elif cipher_name.endswith(b"-ccm"):
        # On encrypt only the tag length is passed (NULL tag pointer), and
        # only for CCM cipher names.
        res = backend._lib.EVP_CIPHER_CTX_ctrl(
            ctx,
            backend._lib.EVP_CTRL_AEAD_SET_TAG,
            tag_len,
            backend._ffi.NULL,
        )
        backend.openssl_assert(res != 0)

    nonce_ptr = backend._ffi.from_buffer(nonce)
    key_ptr = backend._ffi.from_buffer(key)
    # Second init call: the cipher is already selected (NULL here), so this
    # only supplies the key and nonce.
    res = backend._lib.EVP_CipherInit_ex(
        ctx,
        backend._ffi.NULL,
        backend._ffi.NULL,
        key_ptr,
        nonce_ptr,
        int(operation == _ENCRYPT),
    )
    backend.openssl_assert(res != 0)
    return ctx
def _evp_cipher_set_tag(backend, ctx, tag: bytes) -> None:
    """
    Hands the expected authentication ``tag`` to ``ctx``.

    Issues ``EVP_CTRL_AEAD_SET_TAG`` with the tag bytes and their length; a
    zero return fails ``backend.openssl_assert``.
    """
    lib = backend._lib
    tag_buffer = backend._ffi.from_buffer(tag)
    status = lib.EVP_CIPHER_CTX_ctrl(
        ctx, lib.EVP_CTRL_AEAD_SET_TAG, len(tag), tag_buffer
    )
    backend.openssl_assert(status != 0)
def _evp_cipher_set_nonce_operation(
    backend, ctx, nonce: bytes, operation: int
) -> None:
    """
    Re-initialises an existing ``ctx`` with a new nonce and direction.

    The cipher, engine and key arguments are passed as NULL; only the
    ``nonce`` and the encrypt/decrypt flag derived from ``operation`` are
    supplied. A zero return fails ``backend.openssl_assert``.
    """
    null = backend._ffi.NULL
    iv_ptr = backend._ffi.from_buffer(nonce)
    status = backend._lib.EVP_CipherInit_ex(
        ctx,
        null,
        null,
        null,
        iv_ptr,
        int(operation == _ENCRYPT),
    )
    backend.openssl_assert(status != 0)
def _evp_cipher_set_length(backend: Backend, ctx, data_len: int) -> None:
    """
    Tells ``ctx`` the total data length before any data is processed.

    Calls ``EVP_CipherUpdate`` with NULL input and output pointers and
    ``data_len`` as the length; a zero return fails
    ``backend.openssl_assert``.
    """
    null = backend._ffi.NULL
    # The call needs an out-length pointer; the value written is not used.
    ignored_len = backend._ffi.new("int *")
    status = backend._lib.EVP_CipherUpdate(
        ctx, null, ignored_len, null, data_len
    )
    backend.openssl_assert(status != 0)
def _evp_cipher_process_aad(
    backend: Backend, ctx, associated_data: bytes
) -> None:
    """
    Feeds one chunk of associated data into ``ctx``.

    Calls ``EVP_CipherUpdate`` with a NULL output buffer; a zero return
    fails ``backend.openssl_assert``.
    """
    ffi = backend._ffi
    ignored_len = ffi.new("int *")
    aad_ptr = ffi.from_buffer(associated_data)
    status = backend._lib.EVP_CipherUpdate(
        ctx, ffi.NULL, ignored_len, aad_ptr, len(associated_data)
    )
    backend.openssl_assert(status != 0)
def _evp_cipher_process_data(backend: Backend, ctx, data: bytes) -> bytes:
    """
    Runs ``data`` through ``ctx`` and returns the bytes produced.

    The output buffer has the same size as the input. A zero return from
    ``EVP_CipherUpdate`` fails ``backend.openssl_assert``.
    """
    ffi = backend._ffi
    written = ffi.new("int *")
    out_buf = ffi.new("unsigned char[]", len(data))
    in_ptr = ffi.from_buffer(data)
    status = backend._lib.EVP_CipherUpdate(
        ctx, out_buf, written, in_ptr, len(data)
    )
    backend.openssl_assert(status != 0)
    # Copy out only the bytes the library reported as written.
    return ffi.buffer(out_buf, written[0])[:]
def _evp_cipher_encrypt(
    backend: Backend,
    cipher: _AEADTypes,
    nonce: bytes,
    data: bytes,
    associated_data: list[bytes],
    tag_length: int,
    ctx: typing.Any = None,
) -> bytes:
    """
    Encrypts ``data`` through the EVP_CIPHER API.

    With no ``ctx`` a fresh context is built from ``cipher._key``;
    otherwise the supplied context is re-initialised with ``nonce`` for
    encryption. Returns the ciphertext followed by the ``tag_length``-byte
    authentication tag.
    """
    from cryptography.hazmat.primitives.ciphers.aead import AESCCM

    if ctx is not None:
        _evp_cipher_set_nonce_operation(backend, ctx, nonce, _ENCRYPT)
    else:
        ctx = _evp_cipher_aead_setup(
            backend,
            _evp_cipher_cipher_name(cipher),
            cipher._key,
            nonce,
            None,
            tag_length,
            _ENCRYPT,
        )

    # CCM must be told the data length before anything else is processed;
    # doing the same for any other AEAD results in an error.
    if isinstance(cipher, AESCCM):
        _evp_cipher_set_length(backend, ctx, len(data))

    for aad_chunk in associated_data:
        _evp_cipher_process_aad(backend, ctx, aad_chunk)
    ciphertext = _evp_cipher_process_data(backend, ctx, data)

    # All AEADs we support besides OCB are streaming so they return nothing
    # in finalization. OCB can return up to (16 byte block - 1) bytes so
    # a buffer is still needed here.
    final_len = backend._ffi.new("int *")
    final_buf = backend._ffi.new("unsigned char[]", 16)
    status = backend._lib.EVP_CipherFinal_ex(ctx, final_buf, final_len)
    backend.openssl_assert(status != 0)
    ciphertext += backend._ffi.buffer(final_buf, final_len[0])[:]

    tag_buf = backend._ffi.new("unsigned char[]", tag_length)
    status = backend._lib.EVP_CIPHER_CTX_ctrl(
        ctx, backend._lib.EVP_CTRL_AEAD_GET_TAG, tag_length, tag_buf
    )
    backend.openssl_assert(status != 0)

    return ciphertext + backend._ffi.buffer(tag_buf)[:]
def _evp_cipher_decrypt(
    backend: Backend,
    cipher: _AEADTypes,
    nonce: bytes,
    data: bytes,
    associated_data: list[bytes],
    tag_length: int,
    ctx: typing.Any = None,
) -> bytes:
    """
    Decrypts and authenticates ``data`` through the EVP_CIPHER API.

    ``data`` is the ciphertext followed by a ``tag_length``-byte tag. With
    no ``ctx`` a fresh context is built from ``cipher._key``; otherwise the
    supplied context is re-initialised with ``nonce`` for decryption and
    given the expected tag.

    Returns the plaintext. Raises ``InvalidTag`` when ``data`` is shorter
    than ``tag_length`` or when the library rejects the ciphertext.
    """
    from cryptography.hazmat.primitives.ciphers.aead import AESCCM

    if len(data) < tag_length:
        raise InvalidTag

    # The trailing tag_length bytes are the tag; the rest is ciphertext.
    tag = data[-tag_length:]
    data = data[:-tag_length]
    if ctx is None:
        cipher_name = _evp_cipher_cipher_name(cipher)
        ctx = _evp_cipher_aead_setup(
            backend,
            cipher_name,
            cipher._key,
            nonce,
            tag,
            tag_length,
            _DECRYPT,
        )
    else:
        _evp_cipher_set_nonce_operation(backend, ctx, nonce, _DECRYPT)
        _evp_cipher_set_tag(backend, ctx, tag)

    # CCM requires us to pass the length of the data before processing
    # anything.
    # However calling this with any other AEAD results in an error
    if isinstance(cipher, AESCCM):
        _evp_cipher_set_length(backend, ctx, len(data))

    for ad in associated_data:
        _evp_cipher_process_aad(backend, ctx, ad)
    # CCM has a different error path if the tag doesn't match. Errors are
    # raised in Update and Final is irrelevant.
    if isinstance(cipher, AESCCM):
        outlen = backend._ffi.new("int *")
        buf = backend._ffi.new("unsigned char[]", len(data))
        d_ptr = backend._ffi.from_buffer(data)
        res = backend._lib.EVP_CipherUpdate(ctx, buf, outlen, d_ptr, len(data))
        if res != 1:
            backend._consume_errors()
            raise InvalidTag

        processed_data = backend._ffi.buffer(buf, outlen[0])[:]
    else:
        processed_data = _evp_cipher_process_data(backend, ctx, data)
        outlen = backend._ffi.new("int *")
        # OCB can return up to 15 bytes (16 byte block - 1) in finalization
        buf = backend._ffi.new("unsigned char[]", 16)
        res = backend._lib.EVP_CipherFinal_ex(ctx, buf, outlen)
        # Check the status before touching the output, so nothing is read
        # from the buffer of a finalization call that failed.
        if res == 0:
            backend._consume_errors()
            raise InvalidTag
        processed_data += backend._ffi.buffer(buf, outlen[0])[:]

    return processed_data