Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/hazmat/backends/openssl/hashes.py: 27%

45 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:36 +0000

1# This file is dual licensed under the terms of the Apache License, Version 

2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 

3# for complete details. 

4 

5import typing 

6 

7from cryptography.exceptions import UnsupportedAlgorithm, _Reasons 

8from cryptography.hazmat.primitives import hashes 

9 

10if typing.TYPE_CHECKING: 

11 from cryptography.hazmat.backends.openssl.backend import Backend 

12 

13 

class _HashContext(hashes.HashContext):
    """Hash context that drives an ``EVP_MD_CTX`` through the backend.

    All work is delegated to the backend's ``_lib`` (the EVP digest
    functions) and ``_ffi`` (buffer and pointer handling) objects. Every
    EVP call's return value is checked with ``backend.openssl_assert``.
    """

    def __init__(
        self, backend: "Backend", algorithm: hashes.HashAlgorithm, ctx=None
    ) -> None:
        """Store the backend and algorithm and set up the digest context.

        When ``ctx`` is ``None`` a new ``EVP_MD_CTX`` is allocated and
        initialised for ``algorithm``. When a ``ctx`` is supplied it is
        stored as-is and no initialisation call is made (this is the path
        ``copy`` uses).

        Raises:
            UnsupportedAlgorithm: if the backend returns ``NULL`` for the
                algorithm's ``EVP_MD``.
        """
        self._algorithm = algorithm
        self._backend = backend

        if ctx is None:
            lib = backend._lib
            ffi = backend._ffi

            # Wrap the raw context with ffi.gc so EVP_MD_CTX_free is
            # registered as its destructor.
            raw_ctx = lib.EVP_MD_CTX_new()
            ctx = ffi.gc(raw_ctx, lib.EVP_MD_CTX_free)

            md = backend._evp_md_from_algorithm(algorithm)
            if md == ffi.NULL:
                message = "{} is not a supported hash on this backend.".format(
                    algorithm.name
                )
                raise UnsupportedAlgorithm(message, _Reasons.UNSUPPORTED_HASH)

            status = lib.EVP_DigestInit_ex(ctx, md, ffi.NULL)
            backend.openssl_assert(status != 0)

        self._ctx = ctx

    @property
    def algorithm(self) -> hashes.HashAlgorithm:
        """The hash algorithm this context was created with."""
        return self._algorithm

    def copy(self) -> "_HashContext":
        """Return a new ``_HashContext`` wrapping a copy of this context.

        A fresh ``EVP_MD_CTX`` is allocated, filled by
        ``EVP_MD_CTX_copy_ex`` from the current one, and handed to a new
        ``_HashContext`` that shares this backend and algorithm.
        """
        backend = self._backend
        lib = backend._lib

        raw_ctx = lib.EVP_MD_CTX_new()
        duplicate = backend._ffi.gc(raw_ctx, lib.EVP_MD_CTX_free)

        status = lib.EVP_MD_CTX_copy_ex(duplicate, self._ctx)
        backend.openssl_assert(status != 0)
        return _HashContext(backend, self.algorithm, ctx=duplicate)

    def update(self, data: bytes) -> None:
        """Pass ``data`` to ``EVP_DigestUpdate`` for this context."""
        backend = self._backend
        view = backend._ffi.from_buffer(data)
        status = backend._lib.EVP_DigestUpdate(self._ctx, view, len(data))
        backend.openssl_assert(status != 0)

    def finalize(self) -> bytes:
        """Finish the digest and return it as bytes.

        Extendable output functions are routed to ``_finalize_xof``. For
        every other algorithm the digest is written into an
        ``EVP_MAX_MD_SIZE`` buffer and the reported length is asserted to
        equal ``algorithm.digest_size`` before the bytes are returned.
        """
        if isinstance(self.algorithm, hashes.ExtendableOutputFunction):
            # extendable output functions use a different finalize
            return self._finalize_xof()

        backend = self._backend
        ffi = backend._ffi

        digest_buf = ffi.new("unsigned char[]", backend._lib.EVP_MAX_MD_SIZE)
        written = ffi.new("unsigned int *")
        status = backend._lib.EVP_DigestFinal_ex(
            self._ctx, digest_buf, written
        )
        backend.openssl_assert(status != 0)
        backend.openssl_assert(written[0] == self.algorithm.digest_size)
        return ffi.buffer(digest_buf)[: written[0]]

    def _finalize_xof(self) -> bytes:
        """Finish an extendable-output digest of ``digest_size`` bytes.

        The output buffer is sized from ``algorithm.digest_size`` and the
        same size is passed to ``EVP_DigestFinalXOF``.
        """
        backend = self._backend
        ffi = backend._ffi
        size = self.algorithm.digest_size

        out = ffi.new("unsigned char[]", size)
        status = backend._lib.EVP_DigestFinalXOF(self._ctx, out, size)
        backend.openssl_assert(status != 0)
        return ffi.buffer(out)[:size]