Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/cryptography/hazmat/primitives/ciphers/base.py: 91%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

56 statements  

# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.

4 

5from __future__ import annotations 

6 

7import abc 

8import typing 

9 

10from cryptography.hazmat.bindings._rust import openssl as rust_openssl 

11from cryptography.hazmat.primitives._cipheralgorithm import CipherAlgorithm 

12from cryptography.hazmat.primitives.ciphers import modes 

13from cryptography.utils import Buffer 

14 

15 

class CipherContext(metaclass=abc.ABCMeta):
    """
    Abstract interface for a cipher context: data is fed in through
    ``update`` / ``update_into`` and the operation is completed with
    ``finalize``. Every method is abstract, so concrete contexts must
    implement (or be registered as implementing) all of them.
    """

    @abc.abstractmethod
    def update(self, data: Buffer) -> bytes:
        """
        Processes the provided bytes through the cipher and returns the results
        as bytes.
        """

    @abc.abstractmethod
    def update_into(self, data: Buffer, buf: Buffer) -> int:
        """
        Processes the provided bytes and writes the resulting data into the
        provided buffer. Returns the number of bytes written.
        """

    @abc.abstractmethod
    def finalize(self) -> bytes:
        """
        Returns the results of processing the final block as bytes.
        """

    @abc.abstractmethod
    def reset_nonce(self, nonce: bytes) -> None:
        """
        Resets the nonce for the cipher context to the provided value.
        Raises an exception if it does not support reset or if the
        provided nonce does not have a valid length.
        """

44 

45 

class AEADCipherContext(CipherContext, metaclass=abc.ABCMeta):
    """
    Extends CipherContext with one further abstract method for supplying
    additional data that is authenticated.
    """

    @abc.abstractmethod
    def authenticate_additional_data(self, data: Buffer) -> None:
        """
        Authenticates the provided bytes.
        """

52 

53 

class AEADDecryptionContext(AEADCipherContext, metaclass=abc.ABCMeta):
    """
    AEAD context interface for decryption; adds a finalization method that
    takes the authentication tag as an argument.
    """

    @abc.abstractmethod
    def finalize_with_tag(self, tag: bytes) -> bytes:
        """
        Returns the results of processing the final block as bytes and allows
        delayed passing of the authentication tag.
        """

61 

62 

class AEADEncryptionContext(AEADCipherContext, metaclass=abc.ABCMeta):
    """
    AEAD context interface for encryption; exposes the authentication tag
    as an abstract read-only property.
    """

    @property
    @abc.abstractmethod
    def tag(self) -> bytes:
        """
        Returns tag bytes. This is only available after encryption is
        finalized.
        """

71 

72 

# Type parameter for Cipher: either a modes.Mode instance or None
# (bound is Optional[modes.Mode]), declared covariant.
Mode = typing.TypeVar(
    "Mode", bound=typing.Optional[modes.Mode], covariant=True
)

76 

77 

class Cipher(typing.Generic[Mode]):
    """
    Pairs a cipher algorithm with an optional mode and creates encryption
    and decryption contexts for that pair.
    """

    def __init__(
        self,
        algorithm: CipherAlgorithm,
        mode: Mode,
        backend: typing.Any = None,
    ) -> None:
        """
        Validates and stores the algorithm / mode pair.

        ``backend`` is accepted but is not used by this constructor.

        Raises TypeError if ``algorithm`` is not a CipherAlgorithm. When
        ``mode`` is not None, ``mode.validate_for_algorithm(algorithm)`` is
        called and any exception it raises propagates to the caller.
        """
        if not isinstance(algorithm, CipherAlgorithm):
            raise TypeError("Expected interface of CipherAlgorithm.")

        if mode is not None:
            # mypy needs this assert to narrow the type from our generic
            # type. Maybe it won't some time in the future.
            assert isinstance(mode, modes.Mode)
            mode.validate_for_algorithm(algorithm)

        self.algorithm = algorithm
        self.mode = mode

    @typing.overload
    def encryptor(
        self: Cipher[modes.ModeWithAuthenticationTag],
    ) -> AEADEncryptionContext: ...

    @typing.overload
    def encryptor(
        self: _CIPHER_TYPE,
    ) -> CipherContext: ...

    def encryptor(self):
        """
        Returns the encryption context built by
        ``rust_openssl.ciphers.create_encryption_ctx`` for this cipher's
        algorithm and mode.

        Raises ValueError if the mode is a ModeWithAuthenticationTag whose
        ``tag`` is not None.
        """
        if isinstance(self.mode, modes.ModeWithAuthenticationTag):
            if self.mode.tag is not None:
                raise ValueError(
                    "Authentication tag must be None when encrypting."
                )

        return rust_openssl.ciphers.create_encryption_ctx(
            self.algorithm, self.mode
        )

    @typing.overload
    def decryptor(
        self: Cipher[modes.ModeWithAuthenticationTag],
    ) -> AEADDecryptionContext: ...

    @typing.overload
    def decryptor(
        self: _CIPHER_TYPE,
    ) -> CipherContext: ...

    def decryptor(self):
        """
        Returns the decryption context built by
        ``rust_openssl.ciphers.create_decryption_ctx`` for this cipher's
        algorithm and mode.
        """
        return rust_openssl.ciphers.create_decryption_ctx(
            self.algorithm, self.mode
        )

132 

133 

# Alias used as the ``self`` annotation of the second (CipherContext-returning)
# overloads of Cipher.encryptor / Cipher.decryptor.
_CIPHER_TYPE = Cipher[
    typing.Union[
        modes.ModeWithNonce,
        modes.ModeWithTweak,
        modes.ECB,
        modes.ModeWithInitializationVector,
        None,
    ]
]

143 

# Register the rust_openssl context classes as virtual subclasses of the
# abstract interfaces so isinstance() / issubclass() checks accept them.
CipherContext.register(rust_openssl.ciphers.CipherContext)
AEADEncryptionContext.register(rust_openssl.ciphers.AEADEncryptionContext)
AEADDecryptionContext.register(rust_openssl.ciphers.AEADDecryptionContext)