1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
4
5from __future__ import annotations
6
7import typing
8
9from cryptography.exceptions import InvalidTag
10
11if typing.TYPE_CHECKING:
12 from cryptography.hazmat.backends.openssl.backend import Backend
13 from cryptography.hazmat.primitives.ciphers.aead import (
14 AESCCM,
15 AESGCM,
16 )
17
18 _AEADTypes = typing.Union[AESCCM, AESGCM]
19
20
21def _aead_cipher_supported(backend: Backend, cipher: _AEADTypes) -> bool:
22 cipher_name = _evp_cipher_cipher_name(cipher)
23
24 return backend._lib.EVP_get_cipherbyname(cipher_name) != backend._ffi.NULL
25
26
27def _encrypt(
28 backend: Backend,
29 cipher: _AEADTypes,
30 nonce: bytes,
31 data: bytes,
32 associated_data: list[bytes],
33 tag_length: int,
34) -> bytes:
35 return _evp_cipher_encrypt(
36 backend, cipher, nonce, data, associated_data, tag_length
37 )
38
39
40def _decrypt(
41 backend: Backend,
42 cipher: _AEADTypes,
43 nonce: bytes,
44 data: bytes,
45 associated_data: list[bytes],
46 tag_length: int,
47) -> bytes:
48 return _evp_cipher_decrypt(
49 backend, cipher, nonce, data, associated_data, tag_length
50 )
51
52
53_ENCRYPT = 1
54_DECRYPT = 0
55
56
57def _evp_cipher_cipher_name(cipher: _AEADTypes) -> bytes:
58 from cryptography.hazmat.primitives.ciphers.aead import (
59 AESCCM,
60 AESGCM,
61 )
62
63 if isinstance(cipher, AESCCM):
64 return f"aes-{len(cipher._key) * 8}-ccm".encode("ascii")
65 else:
66 assert isinstance(cipher, AESGCM)
67 return f"aes-{len(cipher._key) * 8}-gcm".encode("ascii")
68
69
70def _evp_cipher(cipher_name: bytes, backend: Backend):
71 evp_cipher = backend._lib.EVP_get_cipherbyname(cipher_name)
72 backend.openssl_assert(evp_cipher != backend._ffi.NULL)
73 return evp_cipher
74
75
76def _evp_cipher_aead_setup(
77 backend: Backend,
78 cipher_name: bytes,
79 key: bytes,
80 nonce: bytes,
81 tag: bytes | None,
82 tag_len: int,
83 operation: int,
84):
85 evp_cipher = _evp_cipher(cipher_name, backend)
86 ctx = backend._lib.EVP_CIPHER_CTX_new()
87 ctx = backend._ffi.gc(ctx, backend._lib.EVP_CIPHER_CTX_free)
88 res = backend._lib.EVP_CipherInit_ex(
89 ctx,
90 evp_cipher,
91 backend._ffi.NULL,
92 backend._ffi.NULL,
93 backend._ffi.NULL,
94 int(operation == _ENCRYPT),
95 )
96 backend.openssl_assert(res != 0)
97 # CCM requires the IVLEN to be set before calling SET_TAG on decrypt
98 res = backend._lib.EVP_CIPHER_CTX_ctrl(
99 ctx,
100 backend._lib.EVP_CTRL_AEAD_SET_IVLEN,
101 len(nonce),
102 backend._ffi.NULL,
103 )
104 backend.openssl_assert(res != 0)
105 if operation == _DECRYPT:
106 assert tag is not None
107 _evp_cipher_set_tag(backend, ctx, tag)
108 elif cipher_name.endswith(b"-ccm"):
109 res = backend._lib.EVP_CIPHER_CTX_ctrl(
110 ctx,
111 backend._lib.EVP_CTRL_AEAD_SET_TAG,
112 tag_len,
113 backend._ffi.NULL,
114 )
115 backend.openssl_assert(res != 0)
116
117 nonce_ptr = backend._ffi.from_buffer(nonce)
118 key_ptr = backend._ffi.from_buffer(key)
119 res = backend._lib.EVP_CipherInit_ex(
120 ctx,
121 backend._ffi.NULL,
122 backend._ffi.NULL,
123 key_ptr,
124 nonce_ptr,
125 int(operation == _ENCRYPT),
126 )
127 backend.openssl_assert(res != 0)
128 return ctx
129
130
131def _evp_cipher_set_tag(backend, ctx, tag: bytes) -> None:
132 tag_ptr = backend._ffi.from_buffer(tag)
133 res = backend._lib.EVP_CIPHER_CTX_ctrl(
134 ctx, backend._lib.EVP_CTRL_AEAD_SET_TAG, len(tag), tag_ptr
135 )
136 backend.openssl_assert(res != 0)
137
138
139def _evp_cipher_set_length(backend: Backend, ctx, data_len: int) -> None:
140 intptr = backend._ffi.new("int *")
141 res = backend._lib.EVP_CipherUpdate(
142 ctx, backend._ffi.NULL, intptr, backend._ffi.NULL, data_len
143 )
144 backend.openssl_assert(res != 0)
145
146
147def _evp_cipher_process_aad(
148 backend: Backend, ctx, associated_data: bytes
149) -> None:
150 outlen = backend._ffi.new("int *")
151 a_data_ptr = backend._ffi.from_buffer(associated_data)
152 res = backend._lib.EVP_CipherUpdate(
153 ctx, backend._ffi.NULL, outlen, a_data_ptr, len(associated_data)
154 )
155 backend.openssl_assert(res != 0)
156
157
158def _evp_cipher_process_data(backend: Backend, ctx, data: bytes) -> bytes:
159 outlen = backend._ffi.new("int *")
160 buf = backend._ffi.new("unsigned char[]", len(data))
161 data_ptr = backend._ffi.from_buffer(data)
162 res = backend._lib.EVP_CipherUpdate(ctx, buf, outlen, data_ptr, len(data))
163 backend.openssl_assert(res != 0)
164 return backend._ffi.buffer(buf, outlen[0])[:]
165
166
167def _evp_cipher_encrypt(
168 backend: Backend,
169 cipher: _AEADTypes,
170 nonce: bytes,
171 data: bytes,
172 associated_data: list[bytes],
173 tag_length: int,
174) -> bytes:
175 from cryptography.hazmat.primitives.ciphers.aead import AESCCM
176
177 cipher_name = _evp_cipher_cipher_name(cipher)
178 ctx = _evp_cipher_aead_setup(
179 backend,
180 cipher_name,
181 cipher._key,
182 nonce,
183 None,
184 tag_length,
185 _ENCRYPT,
186 )
187
188 # CCM requires us to pass the length of the data before processing
189 # anything.
190 # However calling this with any other AEAD results in an error
191 if isinstance(cipher, AESCCM):
192 _evp_cipher_set_length(backend, ctx, len(data))
193
194 for ad in associated_data:
195 _evp_cipher_process_aad(backend, ctx, ad)
196 processed_data = _evp_cipher_process_data(backend, ctx, data)
197 outlen = backend._ffi.new("int *")
198 # All AEADs we support besides OCB are streaming so they return nothing
199 # in finalization. OCB can return up to (16 byte block - 1) bytes so
200 # we need a buffer here too.
201 buf = backend._ffi.new("unsigned char[]", 16)
202 res = backend._lib.EVP_CipherFinal_ex(ctx, buf, outlen)
203 backend.openssl_assert(res != 0)
204 processed_data += backend._ffi.buffer(buf, outlen[0])[:]
205 tag_buf = backend._ffi.new("unsigned char[]", tag_length)
206 res = backend._lib.EVP_CIPHER_CTX_ctrl(
207 ctx, backend._lib.EVP_CTRL_AEAD_GET_TAG, tag_length, tag_buf
208 )
209 backend.openssl_assert(res != 0)
210 tag = backend._ffi.buffer(tag_buf)[:]
211
212 return processed_data + tag
213
214
215def _evp_cipher_decrypt(
216 backend: Backend,
217 cipher: _AEADTypes,
218 nonce: bytes,
219 data: bytes,
220 associated_data: list[bytes],
221 tag_length: int,
222) -> bytes:
223 from cryptography.hazmat.primitives.ciphers.aead import AESCCM
224
225 if len(data) < tag_length:
226 raise InvalidTag
227
228 tag = data[-tag_length:]
229 data = data[:-tag_length]
230 cipher_name = _evp_cipher_cipher_name(cipher)
231 ctx = _evp_cipher_aead_setup(
232 backend,
233 cipher_name,
234 cipher._key,
235 nonce,
236 tag,
237 tag_length,
238 _DECRYPT,
239 )
240
241 # CCM requires us to pass the length of the data before processing
242 # anything.
243 # However calling this with any other AEAD results in an error
244 if isinstance(cipher, AESCCM):
245 _evp_cipher_set_length(backend, ctx, len(data))
246
247 for ad in associated_data:
248 _evp_cipher_process_aad(backend, ctx, ad)
249 # CCM has a different error path if the tag doesn't match. Errors are
250 # raised in Update and Final is irrelevant.
251 if isinstance(cipher, AESCCM):
252 outlen = backend._ffi.new("int *")
253 buf = backend._ffi.new("unsigned char[]", len(data))
254 d_ptr = backend._ffi.from_buffer(data)
255 res = backend._lib.EVP_CipherUpdate(ctx, buf, outlen, d_ptr, len(data))
256 if res != 1:
257 backend._consume_errors()
258 raise InvalidTag
259
260 processed_data = backend._ffi.buffer(buf, outlen[0])[:]
261 else:
262 processed_data = _evp_cipher_process_data(backend, ctx, data)
263 outlen = backend._ffi.new("int *")
264 # OCB can return up to 15 bytes (16 byte block - 1) in finalization
265 buf = backend._ffi.new("unsigned char[]", 16)
266 res = backend._lib.EVP_CipherFinal_ex(ctx, buf, outlen)
267 processed_data += backend._ffi.buffer(buf, outlen[0])[:]
268 if res == 0:
269 backend._consume_errors()
270 raise InvalidTag
271
272 return processed_data