Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/hazmat/backends/openssl/aead.py: 16%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

114 statements  

1# This file is dual licensed under the terms of the Apache License, Version 

2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 

3# for complete details. 

4 

5from __future__ import annotations 

6 

7import typing 

8 

9from cryptography.exceptions import InvalidTag 

10 

11if typing.TYPE_CHECKING: 

12 from cryptography.hazmat.backends.openssl.backend import Backend 

13 from cryptography.hazmat.primitives.ciphers.aead import ( 

14 AESCCM, 

15 AESGCM, 

16 ) 

17 

18 _AEADTypes = typing.Union[AESCCM, AESGCM] 

19 

20 

def _aead_cipher_supported(backend: Backend, cipher: _AEADTypes) -> bool:
    """Report whether the linked OpenSSL can resolve the cipher for *cipher*.

    The lookup name comes from ``_evp_cipher_cipher_name``; the cipher counts
    as supported when ``EVP_get_cipherbyname`` returns a non-NULL pointer.
    """
    evp_cipher = backend._lib.EVP_get_cipherbyname(
        _evp_cipher_cipher_name(cipher)
    )
    return evp_cipher != backend._ffi.NULL

25 

26 

def _encrypt(
    backend: Backend,
    cipher: _AEADTypes,
    nonce: bytes,
    data: bytes,
    associated_data: list[bytes],
    tag_length: int,
) -> bytes:
    """Encrypt *data* by delegating to ``_evp_cipher_encrypt``.

    Every argument is forwarded unchanged and the delegate's result is
    returned as is.
    """
    return _evp_cipher_encrypt(
        backend=backend,
        cipher=cipher,
        nonce=nonce,
        data=data,
        associated_data=associated_data,
        tag_length=tag_length,
    )

38 

39 

def _decrypt(
    backend: Backend,
    cipher: _AEADTypes,
    nonce: bytes,
    data: bytes,
    associated_data: list[bytes],
    tag_length: int,
) -> bytes:
    """Decrypt *data* by delegating to ``_evp_cipher_decrypt``.

    Every argument is forwarded unchanged and the delegate's result is
    returned as is; any exception it raises propagates to the caller.
    """
    return _evp_cipher_decrypt(
        backend=backend,
        cipher=cipher,
        nonce=nonce,
        data=data,
        associated_data=associated_data,
        tag_length=tag_length,
    )

51 

52 

53_ENCRYPT = 1 

54_DECRYPT = 0 

55 

56 

def _evp_cipher_cipher_name(cipher: _AEADTypes) -> bytes:
    """Build the ASCII-encoded OpenSSL cipher name for *cipher*.

    The name has the form ``aes-<bits>-<mode>`` where ``<bits>`` is
    ``len(cipher._key) * 8`` and ``<mode>`` is ``ccm`` for ``AESCCM``
    instances and ``gcm`` for ``AESGCM`` instances (for example
    ``b"aes-128-gcm"`` for an ``AESGCM`` holding a 16-byte key).
    """
    # Imported locally: at module level these names are only imported under
    # ``typing.TYPE_CHECKING``.
    from cryptography.hazmat.primitives.ciphers.aead import (
        AESCCM,
        AESGCM,
    )

    if isinstance(cipher, AESCCM):
        name = f"aes-{len(cipher._key) * 8}-ccm"
    else:
        # The only other member of the accepted union.
        assert isinstance(cipher, AESGCM)
        name = f"aes-{len(cipher._key) * 8}-gcm"
    return name.encode("ascii")

68 

69 

def _evp_cipher(cipher_name: bytes, backend: Backend):
    """Look up the OpenSSL cipher object registered under *cipher_name*.

    The pointer returned by ``EVP_get_cipherbyname`` is checked against NULL
    through ``backend.openssl_assert`` and then returned to the caller.
    """
    found = backend._lib.EVP_get_cipherbyname(cipher_name)
    is_known = found != backend._ffi.NULL
    backend.openssl_assert(is_known)
    return found

74 

75 

def _evp_cipher_aead_setup(
    backend: Backend,
    cipher_name: bytes,
    key: bytes,
    nonce: bytes,
    tag: bytes | None,
    tag_len: int,
    operation: int,
):
    """Allocate an ``EVP_CIPHER_CTX`` and configure it for one AEAD operation.

    The context is initialised in two passes: first with only the cipher, so
    that the nonce length (and, where needed, the tag or tag length) can be
    set through ``EVP_CIPHER_CTX_ctrl``, and then with the key and nonce.

    ``cipher_name`` is resolved through ``_evp_cipher``. ``tag`` must not be
    ``None`` when ``operation`` is ``_DECRYPT``. ``tag_len`` is applied only
    when encrypting with a cipher name ending in ``b"-ccm"``. ``operation``
    is ``_ENCRYPT`` or ``_DECRYPT``.

    Returns the configured context, registered with ``ffi.gc`` so that
    ``EVP_CIPHER_CTX_free`` runs when it is garbage collected. Any failing
    OpenSSL call is reported through ``backend.openssl_assert``.
    """
    evp_cipher = _evp_cipher(cipher_name, backend)
    ctx = backend._lib.EVP_CIPHER_CTX_new()
    # EVP_CIPHER_CTX_new returns NULL when allocation fails; stop here
    # rather than handing a NULL context to the calls below.
    backend.openssl_assert(ctx != backend._ffi.NULL)
    ctx = backend._ffi.gc(ctx, backend._lib.EVP_CIPHER_CTX_free)
    res = backend._lib.EVP_CipherInit_ex(
        ctx,
        evp_cipher,
        backend._ffi.NULL,
        backend._ffi.NULL,
        backend._ffi.NULL,
        int(operation == _ENCRYPT),
    )
    backend.openssl_assert(res != 0)
    # CCM requires the IVLEN to be set before calling SET_TAG on decrypt
    res = backend._lib.EVP_CIPHER_CTX_ctrl(
        ctx,
        backend._lib.EVP_CTRL_AEAD_SET_IVLEN,
        len(nonce),
        backend._ffi.NULL,
    )
    backend.openssl_assert(res != 0)
    if operation == _DECRYPT:
        # Decryption needs the expected tag installed up front.
        assert tag is not None
        _evp_cipher_set_tag(backend, ctx, tag)
    elif cipher_name.endswith(b"-ccm"):
        # Encrypting with CCM: only the tag length is set (NULL tag buffer).
        res = backend._lib.EVP_CIPHER_CTX_ctrl(
            ctx,
            backend._lib.EVP_CTRL_AEAD_SET_TAG,
            tag_len,
            backend._ffi.NULL,
        )
        backend.openssl_assert(res != 0)

    # Second pass: supply key and nonce now that the lengths are configured.
    nonce_ptr = backend._ffi.from_buffer(nonce)
    key_ptr = backend._ffi.from_buffer(key)
    res = backend._lib.EVP_CipherInit_ex(
        ctx,
        backend._ffi.NULL,
        backend._ffi.NULL,
        key_ptr,
        nonce_ptr,
        int(operation == _ENCRYPT),
    )
    backend.openssl_assert(res != 0)
    return ctx

129 

130 

def _evp_cipher_set_tag(backend: Backend, ctx, tag: bytes) -> None:
    """Install *tag* on *ctx* as the authentication tag to verify.

    The tag bytes and their length are passed to ``EVP_CIPHER_CTX_ctrl``
    with ``EVP_CTRL_AEAD_SET_TAG``; a zero result is reported through
    ``backend.openssl_assert``.
    """
    tag_ptr = backend._ffi.from_buffer(tag)
    res = backend._lib.EVP_CIPHER_CTX_ctrl(
        ctx, backend._lib.EVP_CTRL_AEAD_SET_TAG, len(tag), tag_ptr
    )
    backend.openssl_assert(res != 0)

137 

138 

def _evp_cipher_set_length(backend: Backend, ctx, data_len: int) -> None:
    """Declare the total payload length to *ctx* before any data is fed in.

    This is an ``EVP_CipherUpdate`` call whose output and input pointers are
    both NULL and whose length argument is *data_len*; a zero result is
    reported through ``backend.openssl_assert``.
    """
    ffi = backend._ffi
    written = ffi.new("int *")
    status = backend._lib.EVP_CipherUpdate(
        ctx, ffi.NULL, written, ffi.NULL, data_len
    )
    backend.openssl_assert(status != 0)

145 

146 

def _evp_cipher_process_aad(
    backend: Backend, ctx, associated_data: bytes
) -> None:
    """Feed one chunk of associated data into *ctx*.

    The chunk is passed to ``EVP_CipherUpdate`` with a NULL output buffer,
    so nothing is returned; a zero result is reported through
    ``backend.openssl_assert``.
    """
    ffi = backend._ffi
    written = ffi.new("int *")
    aad_ptr = ffi.from_buffer(associated_data)
    status = backend._lib.EVP_CipherUpdate(
        ctx, ffi.NULL, written, aad_ptr, len(associated_data)
    )
    backend.openssl_assert(status != 0)

156 

157 

def _evp_cipher_process_data(backend: Backend, ctx, data: bytes) -> bytes:
    """Run *data* through *ctx* and return the bytes OpenSSL produced.

    The output buffer is allocated with the same length as *data*; the
    returned value is the prefix of that buffer whose length
    ``EVP_CipherUpdate`` reported. A zero result is reported through
    ``backend.openssl_assert``.
    """
    ffi = backend._ffi
    size = len(data)
    written = ffi.new("int *")
    out_buf = ffi.new("unsigned char[]", size)
    in_ptr = ffi.from_buffer(data)
    status = backend._lib.EVP_CipherUpdate(
        ctx, out_buf, written, in_ptr, size
    )
    backend.openssl_assert(status != 0)
    produced = ffi.buffer(out_buf, written[0])
    return produced[:]

165 

166 

def _evp_cipher_encrypt(
    backend: Backend,
    cipher: _AEADTypes,
    nonce: bytes,
    data: bytes,
    associated_data: list[bytes],
    tag_length: int,
) -> bytes:
    """Encrypt *data* and return the ciphertext with the tag appended.

    A context is built with ``_evp_cipher_aead_setup`` in ``_ENCRYPT`` mode,
    every item of *associated_data* is fed in, the payload is processed and
    finalised, and a tag of *tag_length* bytes is read back with
    ``EVP_CTRL_AEAD_GET_TAG``. Failing OpenSSL calls are reported through
    ``backend.openssl_assert``.
    """
    from cryptography.hazmat.primitives.ciphers.aead import AESCCM

    ffi = backend._ffi
    lib = backend._lib

    ctx = _evp_cipher_aead_setup(
        backend,
        _evp_cipher_cipher_name(cipher),
        cipher._key,
        nonce,
        None,
        tag_length,
        _ENCRYPT,
    )

    # CCM needs the payload length before anything else is processed;
    # making this call for any other AEAD results in an error.
    if isinstance(cipher, AESCCM):
        _evp_cipher_set_length(backend, ctx, len(data))

    for aad_chunk in associated_data:
        _evp_cipher_process_aad(backend, ctx, aad_chunk)
    body = _evp_cipher_process_data(backend, ctx, data)

    # Finalisation gets a 16-byte scratch buffer: streaming AEADs emit
    # nothing here, but OCB can return up to 15 bytes (one block minus one).
    final_len = ffi.new("int *")
    final_buf = ffi.new("unsigned char[]", 16)
    status = lib.EVP_CipherFinal_ex(ctx, final_buf, final_len)
    backend.openssl_assert(status != 0)
    trailer = ffi.buffer(final_buf, final_len[0])[:]

    tag_buf = ffi.new("unsigned char[]", tag_length)
    status = lib.EVP_CIPHER_CTX_ctrl(
        ctx, lib.EVP_CTRL_AEAD_GET_TAG, tag_length, tag_buf
    )
    backend.openssl_assert(status != 0)
    tag = ffi.buffer(tag_buf)[:]

    return body + trailer + tag

213 

214 

def _evp_cipher_decrypt(
    backend: Backend,
    cipher: _AEADTypes,
    nonce: bytes,
    data: bytes,
    associated_data: list[bytes],
    tag_length: int,
) -> bytes:
    """Verify and decrypt *data*, whose last *tag_length* bytes are the tag.

    Returns the decrypted payload. Raises ``InvalidTag`` when *data* is
    shorter than *tag_length*, when the CCM update call does not return 1,
    or when finalisation returns 0 for the non-CCM path.
    """
    from cryptography.hazmat.primitives.ciphers.aead import AESCCM

    if len(data) < tag_length:
        raise InvalidTag

    # Split the trailing tag off the ciphertext.
    tag, data = data[-tag_length:], data[:-tag_length]

    ffi = backend._ffi
    lib = backend._lib

    ctx = _evp_cipher_aead_setup(
        backend,
        _evp_cipher_cipher_name(cipher),
        cipher._key,
        nonce,
        tag,
        tag_length,
        _DECRYPT,
    )

    is_ccm = isinstance(cipher, AESCCM)

    # CCM needs the payload length before anything else is processed;
    # making this call for any other AEAD results in an error.
    if is_ccm:
        _evp_cipher_set_length(backend, ctx, len(data))

    for aad_chunk in associated_data:
        _evp_cipher_process_aad(backend, ctx, aad_chunk)

    if not is_ccm:
        plaintext = _evp_cipher_process_data(backend, ctx, data)
        final_len = ffi.new("int *")
        # OCB can return up to 15 bytes (16 byte block - 1) in finalization
        final_buf = ffi.new("unsigned char[]", 16)
        status = lib.EVP_CipherFinal_ex(ctx, final_buf, final_len)
        plaintext += ffi.buffer(final_buf, final_len[0])[:]
        if status == 0:
            backend._consume_errors()
            raise InvalidTag
        return plaintext

    # CCM has a different error path if the tag doesn't match: the failure
    # surfaces in Update, and Final is irrelevant.
    out_len = ffi.new("int *")
    out_buf = ffi.new("unsigned char[]", len(data))
    in_ptr = ffi.from_buffer(data)
    status = lib.EVP_CipherUpdate(ctx, out_buf, out_len, in_ptr, len(data))
    if status != 1:
        backend._consume_errors()
        raise InvalidTag

    return ffi.buffer(out_buf, out_len[0])[:]