Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/hazmat/backends/openssl/aead.py: 11%

120 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# This file is dual licensed under the terms of the Apache License, Version 

2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 

3# for complete details. 

4 

5import typing 

6 

7from cryptography.exceptions import InvalidTag 

8 

9 

10if typing.TYPE_CHECKING: 

11 from cryptography.hazmat.backends.openssl.backend import Backend 

12 from cryptography.hazmat.primitives.ciphers.aead import ( 

13 AESCCM, 

14 AESGCM, 

15 AESOCB3, 

16 AESSIV, 

17 ChaCha20Poly1305, 

18 ) 

19 

20 _AEAD_TYPES = typing.Union[ 

21 AESCCM, AESGCM, AESOCB3, AESSIV, ChaCha20Poly1305 

22 ] 

23 

# Operation selectors. _encrypt passes _ENCRYPT and _decrypt passes _DECRYPT as
# the `operation` argument of _aead_setup, which compares against these values.
24_ENCRYPT = 1 

25_DECRYPT = 0 

26 

27 

def _aead_cipher_name(cipher: "_AEAD_TYPES") -> bytes:
    """Return the cipher name, as ASCII bytes, for an AEAD object.

    ChaCha20Poly1305 maps to a fixed name. The AES variants produce
    ``aes-<bits>-<mode>``, where ``<bits>`` is derived from the length of
    ``cipher._key``. Anything that is not one of the other four types is
    asserted to be AESGCM.
    """
    # Imported at call time; at module level these names are only imported
    # under typing.TYPE_CHECKING.
    from cryptography.hazmat.primitives.ciphers.aead import (
        AESCCM,
        AESGCM,
        AESOCB3,
        AESSIV,
        ChaCha20Poly1305,
    )

    if isinstance(cipher, ChaCha20Poly1305):
        return b"chacha20-poly1305"

    if isinstance(cipher, AESCCM):
        mode = "ccm"
        bits = len(cipher._key) * 8
    elif isinstance(cipher, AESOCB3):
        mode = "ocb"
        bits = len(cipher._key) * 8
    elif isinstance(cipher, AESSIV):
        mode = "siv"
        # The SIV name uses half of the stored key's bit length.
        bits = len(cipher._key) * 8 // 2
    else:
        assert isinstance(cipher, AESGCM)
        mode = "gcm"
        bits = len(cipher._key) * 8

    return f"aes-{bits}-{mode}".encode("ascii")

48 

49 

def _evp_cipher(cipher_name: bytes, backend: "Backend"):
    """Resolve *cipher_name* to an EVP_CIPHER pointer via *backend*.

    Names ending in ``-siv`` are obtained with ``EVP_CIPHER_fetch`` and the
    result is wrapped so that ``EVP_CIPHER_free`` runs when it is garbage
    collected. Every other name goes through ``EVP_get_cipherbyname`` and is
    returned as-is. In both cases a NULL result trips ``openssl_assert``.
    """
    ffi = backend._ffi
    lib = backend._lib

    if not cipher_name.endswith(b"-siv"):
        looked_up = lib.EVP_get_cipherbyname(cipher_name)
        backend.openssl_assert(looked_up != ffi.NULL)
        return looked_up

    fetched = lib.EVP_CIPHER_fetch(ffi.NULL, cipher_name, ffi.NULL)
    backend.openssl_assert(fetched != ffi.NULL)
    # A fetched cipher is released through EVP_CIPHER_free on collection.
    return ffi.gc(fetched, lib.EVP_CIPHER_free)

64 

65 

def _aead_setup(
    backend: "Backend",
    cipher_name: bytes,
    key: bytes,
    nonce: bytes,
    tag: typing.Optional[bytes],
    tag_len: int,
    operation: int,
):
    """Create and fully initialise an EVP_CIPHER_CTX for an AEAD operation.

    The context is initialised in two steps: first with the cipher only, so
    that key length, nonce length and tag parameters can be configured, and
    then with the actual key and nonce.

    :param backend: Backend providing ``_lib``, ``_ffi`` and
        ``openssl_assert``.
    :param cipher_name: Cipher name passed to ``_evp_cipher``.
    :param key: Key bytes; its length is set on the context.
    :param nonce: Nonce bytes; its length is set as the IV length.
    :param tag: Expected tag. Must not be None when ``operation`` is
        ``_DECRYPT``; unused otherwise.
    :param tag_len: Tag length, set up front only when encrypting with a
        ``-ccm`` cipher.
    :param operation: ``_ENCRYPT`` or ``_DECRYPT``.
    :returns: The initialised context, freed with ``EVP_CIPHER_CTX_free``
        when garbage collected.
    """
    ffi = backend._ffi
    lib = backend._lib
    encrypting = int(operation == _ENCRYPT)

    evp_cipher = _evp_cipher(cipher_name, backend)
    ctx = lib.EVP_CIPHER_CTX_new()
    # Fail cleanly on allocation failure instead of handing a NULL context
    # to the EVP calls below.
    backend.openssl_assert(ctx != ffi.NULL)
    ctx = ffi.gc(ctx, lib.EVP_CIPHER_CTX_free)

    # First init: select the cipher and direction only (no key/nonce yet).
    res = lib.EVP_CipherInit_ex(
        ctx,
        evp_cipher,
        ffi.NULL,
        ffi.NULL,
        ffi.NULL,
        encrypting,
    )
    backend.openssl_assert(res != 0)
    res = lib.EVP_CIPHER_CTX_set_key_length(ctx, len(key))
    backend.openssl_assert(res != 0)
    res = lib.EVP_CIPHER_CTX_ctrl(
        ctx,
        lib.EVP_CTRL_AEAD_SET_IVLEN,
        len(nonce),
        ffi.NULL,
    )
    backend.openssl_assert(res != 0)

    if operation == _DECRYPT:
        # Decryption supplies the expected tag before the key/nonce init.
        assert tag is not None
        res = lib.EVP_CIPHER_CTX_ctrl(
            ctx, lib.EVP_CTRL_AEAD_SET_TAG, len(tag), tag
        )
        backend.openssl_assert(res != 0)
    elif cipher_name.endswith(b"-ccm"):
        # CCM encryption sets the tag length (with a NULL tag) up front.
        res = lib.EVP_CIPHER_CTX_ctrl(
            ctx, lib.EVP_CTRL_AEAD_SET_TAG, tag_len, ffi.NULL
        )
        backend.openssl_assert(res != 0)

    # Second init: provide the key and nonce, leaving the cipher unchanged.
    nonce_ptr = ffi.from_buffer(nonce)
    key_ptr = ffi.from_buffer(key)
    res = lib.EVP_CipherInit_ex(
        ctx,
        ffi.NULL,
        ffi.NULL,
        key_ptr,
        nonce_ptr,
        encrypting,
    )
    backend.openssl_assert(res != 0)
    return ctx

120 

121 

def _set_length(backend: "Backend", ctx, data_len: int) -> None:
    """Declare the total data length on *ctx* before any data is processed.

    Issues an ``EVP_CipherUpdate`` call with NULL output and NULL input
    buffers and ``data_len`` as the input length; a zero return trips
    ``openssl_assert``.
    """
    ffi = backend._ffi
    null = ffi.NULL
    unused_outlen = ffi.new("int *")
    status = backend._lib.EVP_CipherUpdate(
        ctx, null, unused_outlen, null, data_len
    )
    backend.openssl_assert(status != 0)

128 

129 

def _process_aad(backend: "Backend", ctx, associated_data: bytes) -> None:
    """Feed *associated_data* into *ctx* as additional authenticated data.

    The data is passed to ``EVP_CipherUpdate`` with a NULL output buffer; a
    zero return trips ``openssl_assert``.
    """
    ffi = backend._ffi
    aad_len = len(associated_data)
    unused_outlen = ffi.new("int *")
    status = backend._lib.EVP_CipherUpdate(
        ctx, ffi.NULL, unused_outlen, associated_data, aad_len
    )
    backend.openssl_assert(status != 0)

136 

137 

def _process_data(backend: "Backend", ctx, data: bytes) -> bytes:
    """Run *data* through *ctx* and return the bytes that were produced.

    :raises InvalidTag: if ``EVP_CipherUpdate`` returns 0. The backend's
        pending errors are consumed first.
    """
    ffi = backend._ffi
    size = len(data)
    out_buf = ffi.new("unsigned char[]", size)
    written = ffi.new("int *")
    status = backend._lib.EVP_CipherUpdate(ctx, out_buf, written, data, size)
    if status != 0:
        return ffi.buffer(out_buf, written[0])[:]

    # AES SIV can error here if the data is invalid on decrypt
    backend._consume_errors()
    raise InvalidTag

147 

148 

def _encrypt(
    backend: "Backend",
    cipher: "_AEAD_TYPES",
    nonce: bytes,
    data: bytes,
    associated_data: typing.List[bytes],
    tag_length: int,
) -> bytes:
    """Encrypt *data* with the AEAD *cipher* and return ciphertext plus tag.

    For AESSIV the result is ``tag + ciphertext``; for every other cipher
    it is ``ciphertext + tag``. Each item of *associated_data* is fed to the
    context in order before the data is processed.
    """
    from cryptography.hazmat.primitives.ciphers.aead import AESCCM, AESSIV

    ffi = backend._ffi
    lib = backend._lib

    ctx = _aead_setup(
        backend,
        _aead_cipher_name(cipher),
        cipher._key,
        nonce,
        None,
        tag_length,
        _ENCRYPT,
    )

    # CCM must be told the data length before anything is processed; making
    # the same call for any other AEAD results in an error.
    if isinstance(cipher, AESCCM):
        _set_length(backend, ctx, len(data))

    for aad_item in associated_data:
        _process_aad(backend, ctx, aad_item)
    ciphertext = _process_data(backend, ctx, data)

    # All supported AEADs except OCB are streaming and return nothing on
    # finalization. OCB can return up to (16 byte block - 1) bytes, so a
    # buffer is needed here as well.
    final_len = ffi.new("int *")
    final_buf = ffi.new("unsigned char[]", 16)
    status = lib.EVP_CipherFinal_ex(ctx, final_buf, final_len)
    backend.openssl_assert(status != 0)
    ciphertext += ffi.buffer(final_buf, final_len[0])[:]

    tag_buf = ffi.new("unsigned char[]", tag_length)
    status = lib.EVP_CIPHER_CTX_ctrl(
        ctx, lib.EVP_CTRL_AEAD_GET_TAG, tag_length, tag_buf
    )
    backend.openssl_assert(status != 0)
    tag = ffi.buffer(tag_buf)[:]

    if not isinstance(cipher, AESSIV):
        return ciphertext + tag

    # RFC 5297 defines the output as IV || C, where the generated tag is the
    # "IV" and C is the ciphertext. This is the opposite of the other AEADs,
    # which are Ciphertext || Tag.
    backend.openssl_assert(len(tag) == 16)
    return tag + ciphertext

194 

195 

def _decrypt(
    backend: "Backend",
    cipher: "_AEAD_TYPES",
    nonce: bytes,
    data: bytes,
    associated_data: typing.List[bytes],
    tag_length: int,
) -> bytes:
    """Authenticate and decrypt *data* with the AEAD *cipher*.

    *data* carries both ciphertext and tag: for AESSIV the tag is the first
    ``tag_length`` bytes, for every other cipher it is the last
    ``tag_length`` bytes. Each item of *associated_data* is fed to the
    context in order before the ciphertext is processed.

    :returns: The decrypted bytes.
    :raises InvalidTag: if *data* is shorter than ``tag_length``, or if the
        update/finalization step reports failure.
    """
    from cryptography.hazmat.primitives.ciphers.aead import AESCCM, AESSIV

    if len(data) < tag_length:
        raise InvalidTag

    if isinstance(cipher, AESSIV):
        # RFC 5297 defines the output as IV || C, where the tag we generate is
        # the "IV" and C is the ciphertext. This is the opposite of our
        # other AEADs, which are Ciphertext || Tag
        tag = data[:tag_length]
        data = data[tag_length:]
    else:
        tag = data[-tag_length:]
        data = data[:-tag_length]

    cipher_name = _aead_cipher_name(cipher)
    ctx = _aead_setup(
        backend, cipher_name, cipher._key, nonce, tag, tag_length, _DECRYPT
    )
    is_ccm = isinstance(cipher, AESCCM)

    # CCM requires us to pass the length of the data before processing anything
    # However calling this with any other AEAD results in an error
    if is_ccm:
        _set_length(backend, ctx, len(data))

    for ad in associated_data:
        _process_aad(backend, ctx, ad)

    # CCM has a different error path if the tag doesn't match. Errors are
    # raised in Update and Final is irrelevant.
    if is_ccm:
        outlen = backend._ffi.new("int *")
        buf = backend._ffi.new("unsigned char[]", len(data))
        res = backend._lib.EVP_CipherUpdate(ctx, buf, outlen, data, len(data))
        if res != 1:
            backend._consume_errors()
            raise InvalidTag

        return backend._ffi.buffer(buf, outlen[0])[:]

    processed_data = _process_data(backend, ctx, data)
    outlen = backend._ffi.new("int *")
    # OCB can return up to 15 bytes (16 byte block - 1) in finalization
    buf = backend._ffi.new("unsigned char[]", 16)
    res = backend._lib.EVP_CipherFinal_ex(ctx, buf, outlen)
    # Check the result before touching the output: nothing produced by a
    # failed finalization is read or appended to the plaintext.
    if res == 0:
        backend._consume_errors()
        raise InvalidTag

    return processed_data + backend._ffi.buffer(buf, outlen[0])[:]