Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/hazmat/primitives/ciphers/base.py: 53%

137 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:36 +0000

# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.

4 

5 

6import abc 

7import typing 

8 

9from cryptography.exceptions import ( 

10 AlreadyFinalized, 

11 AlreadyUpdated, 

12 NotYetFinalized, 

13) 

14from cryptography.hazmat.primitives._cipheralgorithm import CipherAlgorithm 

15from cryptography.hazmat.primitives.ciphers import modes 

16 

17if typing.TYPE_CHECKING: 

18 from cryptography.hazmat.backends.openssl.ciphers import ( 

19 _CipherContext as _BackendCipherContext, 

20 ) 

21 

22 

class CipherContext(metaclass=abc.ABCMeta):
    """
    Abstract interface for a cipher operation that is fed data through
    ``update``/``update_into`` and completed with ``finalize``.
    """

    @abc.abstractmethod
    def update(self, data: bytes) -> bytes:
        """
        Run ``data`` through the cipher and hand back the output as bytes.
        """

    @abc.abstractmethod
    def update_into(self, data: bytes, buf: bytes) -> int:
        """
        Run ``data`` through the cipher, placing the output in ``buf``.

        Returns how many bytes were written.
        """

    @abc.abstractmethod
    def finalize(self) -> bytes:
        """
        Process the final block and return its output as bytes.
        """

43 

44 

class AEADCipherContext(CipherContext, metaclass=abc.ABCMeta):
    """
    A ``CipherContext`` that can additionally authenticate data supplied
    through ``authenticate_additional_data``.
    """

    @abc.abstractmethod
    def authenticate_additional_data(self, data: bytes) -> None:
        """
        Authenticate ``data``; nothing is returned.
        """

51 

52 

class AEADDecryptionContext(AEADCipherContext, metaclass=abc.ABCMeta):
    """
    An ``AEADCipherContext`` for decryption, which may receive the
    authentication tag at finalization time.
    """

    @abc.abstractmethod
    def finalize_with_tag(self, tag: bytes) -> bytes:
        """
        Process the final block and return its output as bytes, taking the
        authentication tag now instead of earlier (delayed tag passing).
        """

60 

61 

class AEADEncryptionContext(AEADCipherContext, metaclass=abc.ABCMeta):
    """
    An ``AEADCipherContext`` for encryption, which exposes the resulting
    authentication tag.
    """

    @property
    @abc.abstractmethod
    def tag(self) -> bytes:
        """
        The tag bytes. Only available once encryption has been finalized.
        """

70 

71 

# Type parameter used by ``Cipher``: a covariant type variable bounded by
# ``Optional[modes.Mode]``, so it also admits ``None`` (no mode).
Mode = typing.TypeVar(
    "Mode",
    bound=typing.Optional[modes.Mode],
    covariant=True,
)

75 

76 

class Cipher(typing.Generic[Mode]):
    """
    Pairs a cipher algorithm with an optional mode and builds encryption
    and decryption contexts for that pair.
    """

    def __init__(
        self,
        algorithm: CipherAlgorithm,
        mode: Mode,
        backend: typing.Any = None,
    ) -> None:
        """
        Validate and store ``algorithm`` and ``mode``.

        ``backend`` is accepted but not used by this constructor.

        Raises ``TypeError`` when ``algorithm`` is not a ``CipherAlgorithm``.
        When ``mode`` is not ``None`` it is asked to validate itself against
        ``algorithm`` via ``validate_for_algorithm``.
        """
        if not isinstance(algorithm, CipherAlgorithm):
            raise TypeError("Expected interface of CipherAlgorithm.")

        if mode is not None:
            # This assert lets mypy narrow the generic type parameter to a
            # concrete ``modes.Mode``; it may become unnecessary one day.
            assert isinstance(mode, modes.Mode)
            mode.validate_for_algorithm(algorithm)

        self.algorithm = algorithm
        self.mode = mode

    @typing.overload
    def encryptor(
        self: "Cipher[modes.ModeWithAuthenticationTag]",
    ) -> AEADEncryptionContext:
        ...

    @typing.overload
    def encryptor(
        self: "_CIPHER_TYPE",
    ) -> CipherContext:
        ...

    def encryptor(self):
        """
        Create an encryption context for this algorithm/mode pair.

        Raises ``ValueError`` when the mode is a
        ``ModeWithAuthenticationTag`` whose ``tag`` is already set.
        """
        mode = self.mode
        tag_preset = (
            isinstance(mode, modes.ModeWithAuthenticationTag)
            and mode.tag is not None
        )
        if tag_preset:
            raise ValueError(
                "Authentication tag must be None when encrypting."
            )

        # Imported at call time rather than at module import.
        from cryptography.hazmat.backends.openssl.backend import backend

        backend_ctx = backend.create_symmetric_encryption_ctx(
            self.algorithm, mode
        )
        return self._wrap_ctx(backend_ctx, encrypt=True)

    @typing.overload
    def decryptor(
        self: "Cipher[modes.ModeWithAuthenticationTag]",
    ) -> AEADDecryptionContext:
        ...

    @typing.overload
    def decryptor(
        self: "_CIPHER_TYPE",
    ) -> CipherContext:
        ...

    def decryptor(self):
        """
        Create a decryption context for this algorithm/mode pair.

        Unlike ``encryptor``, no check is made on the mode's tag here.
        """
        # Imported at call time rather than at module import.
        from cryptography.hazmat.backends.openssl.backend import backend

        backend_ctx = backend.create_symmetric_decryption_ctx(
            self.algorithm, self.mode
        )
        return self._wrap_ctx(backend_ctx, encrypt=False)

    def _wrap_ctx(
        self, ctx: "_BackendCipherContext", encrypt: bool
    ) -> typing.Union[
        AEADEncryptionContext, AEADDecryptionContext, CipherContext
    ]:
        """
        Wrap the backend context ``ctx`` in the matching context class.

        Modes that are not ``ModeWithAuthenticationTag`` get a plain
        ``_CipherContext``; otherwise ``encrypt`` selects between the AEAD
        encryption and AEAD decryption wrappers.
        """
        if not isinstance(self.mode, modes.ModeWithAuthenticationTag):
            return _CipherContext(ctx)
        if encrypt:
            return _AEADEncryptionContext(ctx)
        return _AEADDecryptionContext(ctx)

153 

154 

# ``Cipher`` parameterised over the mode kinds listed below, including
# ``None`` for "no mode". Used as the ``self`` type of the
# ``encryptor``/``decryptor`` overloads that return a plain
# ``CipherContext``.
_CIPHER_TYPE = Cipher[
    typing.Union[
        modes.ModeWithNonce,
        modes.ModeWithTweak,
        None,
        modes.ECB,
        modes.ModeWithInitializationVector,
    ]
]

164 

165 

class _CipherContext(CipherContext):
    """
    ``CipherContext`` implementation that forwards every call to a backend
    context and drops that context once finalized.
    """

    # Backend context; set to None after ``finalize`` succeeds.
    _ctx: typing.Optional["_BackendCipherContext"]

    def __init__(self, ctx: "_BackendCipherContext") -> None:
        """Store the backend context that calls are forwarded to."""
        self._ctx = ctx

    def update(self, data: bytes) -> bytes:
        """
        Forward ``data`` to the backend context's ``update``.

        Raises ``AlreadyFinalized`` once the context has been finalized.
        """
        backend_ctx = self._ctx
        if backend_ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        return backend_ctx.update(data)

    def update_into(self, data: bytes, buf: bytes) -> int:
        """
        Forward ``data`` and ``buf`` to the backend context's
        ``update_into``.

        Raises ``AlreadyFinalized`` once the context has been finalized.
        """
        backend_ctx = self._ctx
        if backend_ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        return backend_ctx.update_into(data, buf)

    def finalize(self) -> bytes:
        """
        Finalize the backend context and return what it produced.

        Raises ``AlreadyFinalized`` when called a second time.
        """
        backend_ctx = self._ctx
        if backend_ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        remainder = backend_ctx.finalize()
        # Only reached when the backend finalize did not raise.
        self._ctx = None
        return remainder

188 

189 

class _AEADCipherContext(AEADCipherContext):
    """
    ``AEADCipherContext`` implementation that forwards to a backend context
    while tracking processed byte counts against the mode's limits.
    """

    # Backend context; set to None after ``finalize`` succeeds.
    _ctx: typing.Optional["_BackendCipherContext"]
    # Tag copied from the backend context during finalization.
    _tag: typing.Optional[bytes]

    def __init__(self, ctx: "_BackendCipherContext") -> None:
        """Store the backend context and reset all counters and flags."""
        self._ctx = ctx
        self._bytes_processed = 0
        self._aad_bytes_processed = 0
        self._tag = None
        self._updated = False

    def _check_limit(self, data_size: int) -> None:
        """
        Record ``data_size`` more processed bytes and enforce the mode's
        ``_MAX_ENCRYPTED_BYTES`` limit.

        Also marks the context as updated. Raises ``AlreadyFinalized`` when
        the context is finalized and ``ValueError`` when the running total
        exceeds the limit.
        """
        if self._ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        self._updated = True
        # The counter is bumped before the comparison, so it keeps the new
        # total even when the limit check below raises.
        self._bytes_processed += data_size
        mode = self._ctx._mode
        limit = mode._MAX_ENCRYPTED_BYTES
        if self._bytes_processed > limit:
            raise ValueError(
                "{} has a maximum encrypted byte limit of {}".format(
                    mode.name, limit
                )
            )

    def update(self, data: bytes) -> bytes:
        """
        Check the byte limit, then forward ``data`` to the backend
        context's ``update``.
        """
        self._check_limit(len(data))
        # _check_limit has already rejected a None context; the assert is
        # only here so mypy can narrow the Optional.
        assert self._ctx is not None
        return self._ctx.update(data)

    def update_into(self, data: bytes, buf: bytes) -> int:
        """
        Check the byte limit, then forward ``data`` and ``buf`` to the
        backend context's ``update_into``.
        """
        self._check_limit(len(data))
        # _check_limit has already rejected a None context; the assert is
        # only here so mypy can narrow the Optional.
        assert self._ctx is not None
        return self._ctx.update_into(data, buf)

    def finalize(self) -> bytes:
        """
        Finalize the backend context, keep its tag, and return what it
        produced.

        Raises ``AlreadyFinalized`` when called after finalization.
        """
        backend_ctx = self._ctx
        if backend_ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        remainder = backend_ctx.finalize()
        self._tag = backend_ctx.tag
        self._ctx = None
        return remainder

    def authenticate_additional_data(self, data: bytes) -> None:
        """
        Forward ``data`` to the backend context as additional authenticated
        data, enforcing the mode's ``_MAX_AAD_BYTES`` limit.

        Raises ``AlreadyFinalized`` after finalization, ``AlreadyUpdated``
        once ``update``/``update_into`` has been called, and ``ValueError``
        when the running AAD total exceeds the limit.
        """
        if self._ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        if self._updated:
            raise AlreadyUpdated("Update has been called on this context.")

        self._aad_bytes_processed += len(data)
        mode = self._ctx._mode
        aad_limit = mode._MAX_AAD_BYTES
        if self._aad_bytes_processed > aad_limit:
            raise ValueError(
                "{} has a maximum AAD byte limit of {}".format(
                    mode.name, aad_limit
                )
            )

        self._ctx.authenticate_additional_data(data)

248 

249 

class _AEADDecryptionContext(_AEADCipherContext, AEADDecryptionContext):
    """AEAD decryption context that supports passing the tag at the end."""

    def finalize_with_tag(self, tag: bytes) -> bytes:
        """
        Finalize the backend context with ``tag``, keep the backend's tag,
        and return what it produced.

        Raises ``AlreadyFinalized`` when called after finalization.
        """
        backend_ctx = self._ctx
        if backend_ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        remainder = backend_ctx.finalize_with_tag(tag)
        self._tag = backend_ctx.tag
        self._ctx = None
        return remainder

258 

259 

class _AEADEncryptionContext(_AEADCipherContext, AEADEncryptionContext):
    """AEAD encryption context that exposes the tag after finalization."""

    @property
    def tag(self) -> bytes:
        """
        The tag stored during finalization.

        Raises ``NotYetFinalized`` while the backend context is still held,
        i.e. before finalization.
        """
        still_open = self._ctx is not None
        if still_open:
            message = "You must finalize encryption before getting the tag."
            raise NotYetFinalized(message)
        assert self._tag is not None
        return self._tag

268 return self._tag