Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/hazmat/backends/openssl/aead.py: 11%

143 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:36 +0000

1# This file is dual licensed under the terms of the Apache License, Version 

2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 

3# for complete details. 

4 

5import typing 

6 

7from cryptography.exceptions import InvalidTag 

8 

if typing.TYPE_CHECKING:
    # These names are needed only by static type checkers; the functions
    # below re-import the AEAD classes locally when they need them at runtime.
    from cryptography.hazmat.backends.openssl.backend import Backend
    from cryptography.hazmat.primitives.ciphers.aead import (
        AESCCM,
        AESGCM,
        AESOCB3,
        AESSIV,
        ChaCha20Poly1305,
    )

    # Union of every AEAD primitive the helpers in this module accept.
    _AEADTypes = typing.Union[
        AESCCM, AESGCM, AESOCB3, AESSIV, ChaCha20Poly1305
    ]

# Values of the ``operation`` argument used throughout this module; it is
# compared against _ENCRYPT to derive the ``enc`` flag for EVP_CipherInit_ex.
_ENCRYPT = 1
_DECRYPT = 0

25 

26 

def _aead_cipher_name(cipher: "_AEADTypes") -> bytes:
    """Return the OpenSSL cipher name (as ASCII bytes) for an AEAD object.

    ChaCha20Poly1305 maps to a fixed name. The AES variants produce
    ``aes-<bits>-<mode>`` where ``<bits>`` is derived from the length of
    ``cipher._key``; for AESSIV the bit count is halved. Any cipher that is
    not one of the explicitly handled types is asserted to be AESGCM.
    """
    from cryptography.hazmat.primitives.ciphers.aead import (
        AESCCM,
        AESGCM,
        AESOCB3,
        AESSIV,
        ChaCha20Poly1305,
    )

    if isinstance(cipher, ChaCha20Poly1305):
        return b"chacha20-poly1305"

    # Work out the mode suffix first; the key is only inspected afterwards.
    if isinstance(cipher, AESCCM):
        mode = "ccm"
    elif isinstance(cipher, AESOCB3):
        mode = "ocb"
    elif isinstance(cipher, AESSIV):
        mode = "siv"
    else:
        assert isinstance(cipher, AESGCM)
        mode = "gcm"

    key_bits = len(cipher._key) * 8
    if mode == "siv":
        # The SIV name carries half of the supplied key's bit length.
        key_bits //= 2
    return f"aes-{key_bits}-{mode}".encode("ascii")

47 

48 

def _evp_cipher(cipher_name: bytes, backend: "Backend"):
    """Resolve ``cipher_name`` to an OpenSSL ``EVP_CIPHER`` pointer.

    Names ending in ``-siv`` are obtained through ``EVP_CIPHER_fetch`` and
    the result is wrapped so that ``EVP_CIPHER_free`` runs when it is garbage
    collected. Every other name goes through ``EVP_get_cipherbyname``. In
    both cases a NULL result trips ``backend.openssl_assert``.
    """
    lib = backend._lib
    ffi = backend._ffi

    if not cipher_name.endswith(b"-siv"):
        looked_up = lib.EVP_get_cipherbyname(cipher_name)
        backend.openssl_assert(looked_up != ffi.NULL)
        return looked_up

    fetched = lib.EVP_CIPHER_fetch(ffi.NULL, cipher_name, ffi.NULL)
    backend.openssl_assert(fetched != ffi.NULL)
    # A fetched cipher is released with EVP_CIPHER_free once unreferenced.
    return ffi.gc(fetched, lib.EVP_CIPHER_free)

63 

64 

def _aead_create_ctx(
    backend: "Backend",
    cipher: "_AEADTypes",
    key: bytes,
):
    """Create a cipher context bound to ``cipher``'s algorithm and ``key``.

    The context is allocated with ``EVP_CIPHER_CTX_new``, registered for
    ``EVP_CIPHER_CTX_free`` on garbage collection, and initialised with the
    cipher and key only: no IV is supplied and the ``enc`` argument is 0.
    Returns the context.
    """
    lib = backend._lib
    ffi = backend._ffi

    raw_ctx = lib.EVP_CIPHER_CTX_new()
    backend.openssl_assert(raw_ctx != ffi.NULL)
    ctx = ffi.gc(raw_ctx, lib.EVP_CIPHER_CTX_free)

    evp_cipher = _evp_cipher(_aead_cipher_name(cipher), backend)
    key_ptr = ffi.from_buffer(key)
    init_ok = lib.EVP_CipherInit_ex(
        ctx, evp_cipher, ffi.NULL, key_ptr, ffi.NULL, 0
    )
    backend.openssl_assert(init_ok != 0)
    return ctx

86 

87 

def _aead_setup(
    backend: "Backend",
    cipher_name: bytes,
    key: bytes,
    nonce: bytes,
    tag: typing.Optional[bytes],
    tag_len: int,
    operation: int,
):
    """Build a fully initialised AEAD cipher context for one operation.

    Steps, in order:

    1. Resolve ``cipher_name`` and allocate a garbage-collected context.
    2. Initialise the context with the cipher and direction only.
    3. Set the IV length to ``len(nonce)``.
    4. On decrypt, install the expected ``tag`` (which must not be None);
       on encrypt with a ``-ccm`` cipher, set the tag length to ``tag_len``.
    5. Initialise again with the key and nonce.

    Every OpenSSL call is checked with ``backend.openssl_assert``.
    Returns the context.
    """
    evp_cipher = _evp_cipher(cipher_name, backend)
    ctx = backend._lib.EVP_CIPHER_CTX_new()
    # Catch an allocation failure here rather than passing a NULL context
    # on to ffi.gc and the OpenSSL calls below.
    backend.openssl_assert(ctx != backend._ffi.NULL)
    ctx = backend._ffi.gc(ctx, backend._lib.EVP_CIPHER_CTX_free)
    res = backend._lib.EVP_CipherInit_ex(
        ctx,
        evp_cipher,
        backend._ffi.NULL,
        backend._ffi.NULL,
        backend._ffi.NULL,
        int(operation == _ENCRYPT),
    )
    backend.openssl_assert(res != 0)
    # CCM requires the IVLEN to be set before calling SET_TAG on decrypt
    res = backend._lib.EVP_CIPHER_CTX_ctrl(
        ctx,
        backend._lib.EVP_CTRL_AEAD_SET_IVLEN,
        len(nonce),
        backend._ffi.NULL,
    )
    backend.openssl_assert(res != 0)
    if operation == _DECRYPT:
        assert tag is not None
        _set_tag(backend, ctx, tag)
    elif cipher_name.endswith(b"-ccm"):
        # Encrypting with CCM: only the tag length is passed (NULL buffer).
        res = backend._lib.EVP_CIPHER_CTX_ctrl(
            ctx, backend._lib.EVP_CTRL_AEAD_SET_TAG, tag_len, backend._ffi.NULL
        )
        backend.openssl_assert(res != 0)

    nonce_ptr = backend._ffi.from_buffer(nonce)
    key_ptr = backend._ffi.from_buffer(key)
    # Second init pass: cipher already chosen, now supply key and nonce.
    res = backend._lib.EVP_CipherInit_ex(
        ctx,
        backend._ffi.NULL,
        backend._ffi.NULL,
        key_ptr,
        nonce_ptr,
        int(operation == _ENCRYPT),
    )
    backend.openssl_assert(res != 0)
    return ctx

138 

139 

def _set_tag(backend, ctx, tag: bytes) -> None:
    """Install ``tag`` on ``ctx`` via the ``EVP_CTRL_AEAD_SET_TAG`` control.

    A zero return from OpenSSL trips ``backend.openssl_assert``.
    """
    lib = backend._lib
    tag_buf = backend._ffi.from_buffer(tag)
    ctrl_ok = lib.EVP_CIPHER_CTX_ctrl(
        ctx, lib.EVP_CTRL_AEAD_SET_TAG, len(tag), tag_buf
    )
    backend.openssl_assert(ctrl_ok != 0)

146 

147 

def _set_nonce_operation(backend, ctx, nonce: bytes, operation: int) -> None:
    """Re-initialise an existing ``ctx`` with a new nonce and direction.

    The cipher, engine and key arguments are passed as NULL, so only the IV
    and the ``enc`` flag (1 when ``operation`` equals ``_ENCRYPT``, else 0)
    are supplied. A zero return trips ``backend.openssl_assert``.
    """
    null = backend._ffi.NULL
    iv_ptr = backend._ffi.from_buffer(nonce)
    enc_flag = int(operation == _ENCRYPT)
    init_ok = backend._lib.EVP_CipherInit_ex(
        ctx, null, null, null, iv_ptr, enc_flag
    )
    backend.openssl_assert(init_ok != 0)

159 

160 

def _set_length(backend: "Backend", ctx, data_len: int) -> None:
    """Tell ``ctx`` the total data length before any data is processed.

    This is an ``EVP_CipherUpdate`` call with NULL output and input buffers
    and ``data_len`` as the input length. A zero return trips
    ``backend.openssl_assert``.
    """
    ffi = backend._ffi
    written = ffi.new("int *")
    update_ok = backend._lib.EVP_CipherUpdate(
        ctx, ffi.NULL, written, ffi.NULL, data_len
    )
    backend.openssl_assert(update_ok != 0)

167 

168 

def _process_aad(backend: "Backend", ctx, associated_data: bytes) -> None:
    """Feed one chunk of associated data into ``ctx``.

    The data is passed to ``EVP_CipherUpdate`` with a NULL output buffer.
    A zero return trips ``backend.openssl_assert``.
    """
    ffi = backend._ffi
    written = ffi.new("int *")
    aad_ptr = ffi.from_buffer(associated_data)
    update_ok = backend._lib.EVP_CipherUpdate(
        ctx, ffi.NULL, written, aad_ptr, len(associated_data)
    )
    backend.openssl_assert(update_ok != 0)

176 

177 

def _process_data(backend: "Backend", ctx, data: bytes) -> bytes:
    """Run ``data`` through ``ctx`` and return the bytes OpenSSL produced.

    The output buffer is sized to ``len(data)``; the returned value contains
    only as many bytes as OpenSSL reported writing.

    Raises:
        InvalidTag: if ``EVP_CipherUpdate`` returns 0. The OpenSSL error
            queue is consumed first.
    """
    ffi = backend._ffi
    written = ffi.new("int *")
    out_buf = ffi.new("unsigned char[]", len(data))
    in_ptr = ffi.from_buffer(data)
    update_ok = backend._lib.EVP_CipherUpdate(
        ctx, out_buf, written, in_ptr, len(data)
    )
    if update_ok != 0:
        return ffi.buffer(out_buf, written[0])[:]

    # AES SIV can error here if the data is invalid on decrypt
    backend._consume_errors()
    raise InvalidTag

188 

189 

def _encrypt(
    backend: "Backend",
    cipher: "_AEADTypes",
    nonce: bytes,
    data: bytes,
    associated_data: typing.List[bytes],
    tag_length: int,
    ctx: typing.Any = None,
) -> bytes:
    """Encrypt ``data`` with ``cipher`` and return ciphertext plus tag.

    When ``ctx`` is None a new context is built with ``_aead_setup``;
    otherwise the supplied context is reused after resetting its nonce and
    direction. Each item of ``associated_data`` is fed in before the data.

    Returns ``tag + ciphertext`` for AESSIV and ``ciphertext + tag`` for
    every other cipher.
    """
    from cryptography.hazmat.primitives.ciphers.aead import AESCCM, AESSIV

    if ctx is not None:
        _set_nonce_operation(backend, ctx, nonce, _ENCRYPT)
    else:
        ctx = _aead_setup(
            backend,
            _aead_cipher_name(cipher),
            cipher._key,
            nonce,
            None,
            tag_length,
            _ENCRYPT,
        )

    # CCM requires us to pass the length of the data before processing anything
    # However calling this with any other AEAD results in an error
    if isinstance(cipher, AESCCM):
        _set_length(backend, ctx, len(data))

    for aad_chunk in associated_data:
        _process_aad(backend, ctx, aad_chunk)
    ciphertext = _process_data(backend, ctx, data)

    ffi = backend._ffi
    lib = backend._lib

    # All AEADs we support besides OCB are streaming so they return nothing
    # in finalization. OCB can return up to (16 byte block - 1) bytes so
    # we need a buffer here too.
    final_len = ffi.new("int *")
    final_buf = ffi.new("unsigned char[]", 16)
    final_ok = lib.EVP_CipherFinal_ex(ctx, final_buf, final_len)
    backend.openssl_assert(final_ok != 0)
    ciphertext += ffi.buffer(final_buf, final_len[0])[:]

    tag_buf = ffi.new("unsigned char[]", tag_length)
    tag_ok = lib.EVP_CIPHER_CTX_ctrl(
        ctx, lib.EVP_CTRL_AEAD_GET_TAG, tag_length, tag_buf
    )
    backend.openssl_assert(tag_ok != 0)
    tag = ffi.buffer(tag_buf)[:]

    if not isinstance(cipher, AESSIV):
        return ciphertext + tag

    # RFC 5297 defines the output as IV || C, where the tag we generate is
    # the "IV" and C is the ciphertext. This is the opposite of our
    # other AEADs, which are Ciphertext || Tag
    backend.openssl_assert(len(tag) == 16)
    return tag + ciphertext

246 

247 

def _decrypt(
    backend: "Backend",
    cipher: "_AEADTypes",
    nonce: bytes,
    data: bytes,
    associated_data: typing.List[bytes],
    tag_length: int,
    ctx: typing.Any = None,
) -> bytes:
    """Authenticate and decrypt ``data`` with ``cipher``.

    The tag is split off the front of ``data`` for AESSIV and off the end
    for every other cipher. When ``ctx`` is None a new context is built with
    ``_aead_setup``; otherwise the supplied context is reused after resetting
    its nonce, direction and tag.

    Returns the decrypted bytes.

    Raises:
        InvalidTag: if ``data`` is shorter than ``tag_length``, or if
            OpenSSL reports failure while processing or finalizing.
    """
    from cryptography.hazmat.primitives.ciphers.aead import AESCCM, AESSIV

    if len(data) < tag_length:
        raise InvalidTag

    if isinstance(cipher, AESSIV):
        # RFC 5297 defines the output as IV || C, where the tag we generate is
        # the "IV" and C is the ciphertext. This is the opposite of our
        # other AEADs, which are Ciphertext || Tag
        tag, data = data[:tag_length], data[tag_length:]
    else:
        tag, data = data[-tag_length:], data[:-tag_length]

    if ctx is not None:
        _set_nonce_operation(backend, ctx, nonce, _DECRYPT)
        _set_tag(backend, ctx, tag)
    else:
        ctx = _aead_setup(
            backend,
            _aead_cipher_name(cipher),
            cipher._key,
            nonce,
            tag,
            tag_length,
            _DECRYPT,
        )

    is_ccm = isinstance(cipher, AESCCM)

    # CCM requires us to pass the length of the data before processing anything
    # However calling this with any other AEAD results in an error
    if is_ccm:
        _set_length(backend, ctx, len(data))

    for aad_chunk in associated_data:
        _process_aad(backend, ctx, aad_chunk)

    ffi = backend._ffi
    lib = backend._lib

    if not is_ccm:
        plaintext = _process_data(backend, ctx, data)
        # OCB can return up to 15 bytes (16 byte block - 1) in finalization
        final_len = ffi.new("int *")
        final_buf = ffi.new("unsigned char[]", 16)
        final_ok = lib.EVP_CipherFinal_ex(ctx, final_buf, final_len)
        plaintext += ffi.buffer(final_buf, final_len[0])[:]
        if final_ok == 0:
            backend._consume_errors()
            raise InvalidTag
        return plaintext

    # CCM has a different error path if the tag doesn't match. Errors are
    # raised in Update and Final is irrelevant.
    written = ffi.new("int *")
    out_buf = ffi.new("unsigned char[]", len(data))
    in_ptr = ffi.from_buffer(data)
    update_res = lib.EVP_CipherUpdate(
        ctx, out_buf, written, in_ptr, len(data)
    )
    if update_res != 1:
        backend._consume_errors()
        raise InvalidTag

    return ffi.buffer(out_buf, written[0])[:]