Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/hazmat/backends/openssl/ciphers.py: 14%
124 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
5import typing
7from cryptography.exceptions import InvalidTag, UnsupportedAlgorithm, _Reasons
8from cryptography.hazmat.primitives import ciphers
9from cryptography.hazmat.primitives.ciphers import algorithms, modes
12if typing.TYPE_CHECKING:
13 from cryptography.hazmat.backends.openssl.backend import Backend
class _CipherContext:
    """Cipher context driven through the backend's ``EVP_CIPHER_CTX`` API.

    One instance wraps a single ``EVP_CIPHER_CTX`` that ``__init__``
    configures for one cipher, one mode and one ``operation`` value.
    Padding is disabled on the context; the class only moves bytes through
    ``EVP_CipherUpdate`` / ``EVP_CipherFinal_ex`` and tracks the AEAD tag.
    """

    # ``operation`` values; ``finalize`` compares ``self._operation``
    # against these and ``__init__`` forwards the value to
    # ``EVP_CipherInit_ex``.
    _ENCRYPT = 1
    _DECRYPT = 0
    # Largest number of input bytes handed to one ``EVP_CipherUpdate`` call.
    _MAX_CHUNK_SIZE = 2**30 - 1

    def __init__(
        self, backend: "Backend", cipher, mode, operation: int
    ) -> None:
        """Allocate an ``EVP_CIPHER_CTX`` and initialise it for ``cipher``.

        :param backend: Backend object supplying ``_lib``, ``_ffi``,
            ``_cipher_registry`` and the error helpers used below.
        :param cipher: Cipher algorithm instance; ``name`` and ``key`` are
            read from it (and ``nonce`` for ``algorithms.ChaCha20``).
        :param mode: Mode instance, or ``None`` for ciphers used without a
            mode.
        :param operation: Direction flag forwarded to ``EVP_CipherInit_ex``
            (see ``_ENCRYPT`` / ``_DECRYPT``).
        :raises UnsupportedAlgorithm: If no adapter is registered for
            ``(type(cipher), type(mode))`` or the adapter returns ``NULL``.
        :raises ValueError: If OpenSSL reports duplicated XTS keys.
        """
        self._backend = backend
        self._cipher = cipher
        self._mode = mode
        self._operation = operation
        self._tag: typing.Optional[bytes] = None

        # Non-block ciphers are treated as having a one byte block so the
        # buffer-size arithmetic in update()/update_into() stays uniform.
        if isinstance(self._cipher, ciphers.BlockCipherAlgorithm):
            self._block_size_bytes = self._cipher.block_size // 8
        else:
            self._block_size_bytes = 1

        ctx = self._backend._lib.EVP_CIPHER_CTX_new()
        # Tie EVP_CIPHER_CTX_free to the lifetime of the cffi handle.
        ctx = self._backend._ffi.gc(
            ctx, self._backend._lib.EVP_CIPHER_CTX_free
        )

        registry = self._backend._cipher_registry
        try:
            adapter = registry[type(cipher), type(mode)]
        except KeyError:
            raise UnsupportedAlgorithm(
                "cipher {} in {} mode is not supported "
                "by this backend.".format(
                    cipher.name, mode.name if mode else mode
                ),
                _Reasons.UNSUPPORTED_CIPHER,
            )

        evp_cipher = adapter(self._backend, cipher, mode)
        if evp_cipher == self._backend._ffi.NULL:
            msg = "cipher {0.name} ".format(cipher)
            if mode is not None:
                msg += "in {0.name} mode ".format(mode)
            msg += (
                "is not supported by this backend (Your version of OpenSSL "
                "may be too old. Current version: {}.)"
            ).format(self._backend.openssl_version_text())
            raise UnsupportedAlgorithm(msg, _Reasons.UNSUPPORTED_CIPHER)

        # Pick whichever per-message value this mode/cipher carries; it is
        # passed as the IV argument of the second EVP_CipherInit_ex call.
        if isinstance(mode, modes.ModeWithInitializationVector):
            iv_nonce = self._backend._ffi.from_buffer(
                mode.initialization_vector
            )
        elif isinstance(mode, modes.ModeWithTweak):
            iv_nonce = self._backend._ffi.from_buffer(mode.tweak)
        elif isinstance(mode, modes.ModeWithNonce):
            iv_nonce = self._backend._ffi.from_buffer(mode.nonce)
        elif isinstance(cipher, algorithms.ChaCha20):
            iv_nonce = self._backend._ffi.from_buffer(cipher.nonce)
        else:
            iv_nonce = self._backend._ffi.NULL

        # begin init with cipher and operation type
        res = self._backend._lib.EVP_CipherInit_ex(
            ctx,
            evp_cipher,
            self._backend._ffi.NULL,
            self._backend._ffi.NULL,
            self._backend._ffi.NULL,
            operation,
        )
        self._backend.openssl_assert(res != 0)

        # set the key length to handle variable key ciphers
        res = self._backend._lib.EVP_CIPHER_CTX_set_key_length(
            ctx, len(cipher.key)
        )
        self._backend.openssl_assert(res != 0)

        if isinstance(mode, modes.GCM):
            # The IV length (and, when already known, the expected tag) is
            # configured before the key/IV are loaded below.
            res = self._backend._lib.EVP_CIPHER_CTX_ctrl(
                ctx,
                self._backend._lib.EVP_CTRL_AEAD_SET_IVLEN,
                len(iv_nonce),
                self._backend._ffi.NULL,
            )
            self._backend.openssl_assert(res != 0)
            if mode.tag is not None:
                res = self._backend._lib.EVP_CIPHER_CTX_ctrl(
                    ctx,
                    self._backend._lib.EVP_CTRL_AEAD_SET_TAG,
                    len(mode.tag),
                    mode.tag,
                )
                self._backend.openssl_assert(res != 0)
                self._tag = mode.tag

        # pass key/iv
        res = self._backend._lib.EVP_CipherInit_ex(
            ctx,
            self._backend._ffi.NULL,
            self._backend._ffi.NULL,
            self._backend._ffi.from_buffer(cipher.key),
            iv_nonce,
            operation,
        )

        # Check for XTS mode duplicate keys error
        errors = self._backend._consume_errors()
        lib = self._backend._lib
        # ``errors`` is checked for emptiness first: a failure with nothing
        # on the error queue must reach openssl_assert below instead of
        # raising IndexError on ``errors[0]``.
        if (
            res == 0
            and errors
            and (
                (
                    lib.CRYPTOGRAPHY_OPENSSL_111D_OR_GREATER
                    and errors[0]._lib_reason_match(
                        lib.ERR_LIB_EVP, lib.EVP_R_XTS_DUPLICATED_KEYS
                    )
                )
                or (
                    lib.Cryptography_HAS_PROVIDERS
                    and errors[0]._lib_reason_match(
                        lib.ERR_LIB_PROV, lib.PROV_R_XTS_DUPLICATED_KEYS
                    )
                )
            )
        ):
            raise ValueError("In XTS mode duplicated keys are not allowed")

        self._backend.openssl_assert(res != 0, errors=errors)

        # We purposely disable padding here as it's handled higher up in the
        # API.
        self._backend._lib.EVP_CIPHER_CTX_set_padding(ctx, 0)
        self._ctx = ctx

    def update(self, data: bytes) -> bytes:
        """Process ``data`` and return the bytes produced for it.

        A scratch ``bytearray`` of ``len(data) + block_size - 1`` bytes (the
        minimum ``update_into`` accepts) is filled and then trimmed to the
        number of bytes actually written.
        """
        buf = bytearray(len(data) + self._block_size_bytes - 1)
        n = self.update_into(data, buf)
        return bytes(buf[:n])

    def update_into(self, data: bytes, buf: bytes) -> int:
        """Process ``data`` into ``buf`` and return the byte count written.

        :param data: Input bytes.
        :param buf: Output buffer of at least
            ``len(data) + block_size - 1`` bytes.
            NOTE(review): annotated ``bytes``, but output is written through
            it and ``update`` passes a ``bytearray`` -- callers presumably
            need a writable buffer; confirm before tightening the hint.
        :raises ValueError: If ``buf`` is too small, or if an XTS-mode
            ``EVP_CipherUpdate`` call fails.
        """
        total_data_len = len(data)
        if len(buf) < (total_data_len + self._block_size_bytes - 1):
            raise ValueError(
                "buffer must be at least {} bytes for this "
                "payload".format(len(data) + self._block_size_bytes - 1)
            )

        data_processed = 0
        total_out = 0
        outlen = self._backend._ffi.new("int *")
        baseoutbuf = self._backend._ffi.from_buffer(buf)
        baseinbuf = self._backend._ffi.from_buffer(data)

        # Feed the input in slices of at most _MAX_CHUNK_SIZE bytes,
        # advancing both pointers by what each call consumed/produced.
        while data_processed != total_data_len:
            outbuf = baseoutbuf + total_out
            inbuf = baseinbuf + data_processed
            inlen = min(self._MAX_CHUNK_SIZE, total_data_len - data_processed)

            res = self._backend._lib.EVP_CipherUpdate(
                self._ctx, outbuf, outlen, inbuf, inlen
            )
            if res == 0 and isinstance(self._mode, modes.XTS):
                # Drain the error queue before raising our own error.
                self._backend._consume_errors()
                raise ValueError(
                    "In XTS mode you must supply at least a full block in the "
                    "first update call. For AES this is 16 bytes."
                )
            else:
                self._backend.openssl_assert(res != 0)
            data_processed += inlen
            total_out += outlen[0]

        return total_out

    def finalize(self) -> bytes:
        """Finish the operation and return any remaining output bytes.

        On success in GCM encryption the tag is read back and stored (see
        ``tag``), and the underlying context is reset.

        :raises ValueError: If decrypting in an authenticated mode without a
            tag, or if the data length is not a multiple of the block
            length.
        :raises InvalidTag: If ``EVP_CipherFinal_ex`` fails in GCM mode with
            an empty error queue.
        """
        if (
            self._operation == self._DECRYPT
            and isinstance(self._mode, modes.ModeWithAuthenticationTag)
            and self.tag is None
        ):
            raise ValueError(
                "Authentication tag must be provided when decrypting."
            )

        buf = self._backend._ffi.new("unsigned char[]", self._block_size_bytes)
        outlen = self._backend._ffi.new("int *")
        res = self._backend._lib.EVP_CipherFinal_ex(self._ctx, buf, outlen)
        if res == 0:
            errors = self._backend._consume_errors()

            # A failed final with nothing queued in GCM mode is reported as
            # a tag mismatch.
            if not errors and isinstance(self._mode, modes.GCM):
                raise InvalidTag

            lib = self._backend._lib
            # The only other failure expected here is a block-length error;
            # anything else (including an empty queue, which would
            # otherwise raise IndexError on ``errors[0]``) trips the
            # assertion.
            self._backend.openssl_assert(
                bool(errors)
                and (
                    errors[0]._lib_reason_match(
                        lib.ERR_LIB_EVP,
                        lib.EVP_R_DATA_NOT_MULTIPLE_OF_BLOCK_LENGTH,
                    )
                    or (
                        lib.Cryptography_HAS_PROVIDERS
                        and errors[0]._lib_reason_match(
                            lib.ERR_LIB_PROV,
                            lib.PROV_R_WRONG_FINAL_BLOCK_LENGTH,
                        )
                    )
                    or (
                        lib.CRYPTOGRAPHY_IS_BORINGSSL
                        and errors[0].reason
                        == lib.CIPHER_R_DATA_NOT_MULTIPLE_OF_BLOCK_LENGTH
                    )
                ),
                errors=errors,
            )
            raise ValueError(
                "The length of the provided data is not a multiple of "
                "the block length."
            )

        if (
            isinstance(self._mode, modes.GCM)
            and self._operation == self._ENCRYPT
        ):
            # Read back a tag of block-size length and keep a bytes copy.
            tag_buf = self._backend._ffi.new(
                "unsigned char[]", self._block_size_bytes
            )
            res = self._backend._lib.EVP_CIPHER_CTX_ctrl(
                self._ctx,
                self._backend._lib.EVP_CTRL_AEAD_GET_TAG,
                self._block_size_bytes,
                tag_buf,
            )
            self._backend.openssl_assert(res != 0)
            self._tag = self._backend._ffi.buffer(tag_buf)[:]

        res = self._backend._lib.EVP_CIPHER_CTX_reset(self._ctx)
        self._backend.openssl_assert(res == 1)
        return self._backend._ffi.buffer(buf)[: outlen[0]]

    def finalize_with_tag(self, tag: bytes) -> bytes:
        """Set the expected authentication ``tag`` and then finalize.

        :param tag: Tag whose length must lie between the mode's
            ``_min_tag_length`` and the cipher block size in bytes.
        :raises ValueError: If ``tag`` is shorter or longer than allowed,
            plus anything ``finalize`` raises.
        """
        tag_len = len(tag)
        if tag_len < self._mode._min_tag_length:
            raise ValueError(
                "Authentication tag must be {} bytes or longer.".format(
                    self._mode._min_tag_length
                )
            )
        elif tag_len > self._block_size_bytes:
            raise ValueError(
                "Authentication tag cannot be more than {} bytes.".format(
                    self._block_size_bytes
                )
            )
        res = self._backend._lib.EVP_CIPHER_CTX_ctrl(
            self._ctx, self._backend._lib.EVP_CTRL_AEAD_SET_TAG, len(tag), tag
        )
        self._backend.openssl_assert(res != 0)
        self._tag = tag
        return self.finalize()

    def authenticate_additional_data(self, data: bytes) -> None:
        """Feed ``data`` to the context as additional authenticated data.

        ``EVP_CipherUpdate`` is called with a ``NULL`` output buffer, so no
        output bytes are produced for ``data``.
        """
        outlen = self._backend._ffi.new("int *")
        res = self._backend._lib.EVP_CipherUpdate(
            self._ctx,
            self._backend._ffi.NULL,
            outlen,
            self._backend._ffi.from_buffer(data),
            len(data),
        )
        self._backend.openssl_assert(res != 0)

    @property
    def tag(self) -> typing.Optional[bytes]:
        """The stored authentication tag, or ``None`` if none is set."""
        return self._tag