Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/fernet.py: 26%

137 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# This file is dual licensed under the terms of the Apache License, Version 

2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 

3# for complete details. 

4 

5from __future__ import annotations 

6 

7import base64 

8import binascii 

9import os 

10import time 

11import typing 

12 

13from cryptography import utils 

14from cryptography.exceptions import InvalidSignature 

15from cryptography.hazmat.primitives import hashes, padding 

16from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes 

17from cryptography.hazmat.primitives.hmac import HMAC 

18 

19 

class InvalidToken(Exception):
    """Raised when a token is malformed, expired, or fails verification."""


# Largest amount by which a token's timestamp may exceed the verifier's
# current time before the token is rejected (see Fernet._decrypt_data).
# Same unit as the timestamps: seconds when they come from time.time().
_MAX_CLOCK_SKEW = 60


class Fernet:
    """Symmetric authenticated encryption producing URL-safe tokens.

    A token is the url-safe base64 encoding of::

        0x80 || timestamp (8 bytes, big endian) || IV (16 bytes)
             || AES-CBC ciphertext (PKCS7 padded) || HMAC-SHA256 (32 bytes)

    The HMAC covers every byte that precedes it.  The first 16 bytes of the
    decoded key are used as the signing key, the last 16 as the AES key.
    """

    def __init__(
        self,
        key: typing.Union[bytes, str],
        backend: typing.Any = None,
    ) -> None:
        """Create a Fernet instance from a url-safe base64-encoded key.

        :param key: url-safe base64 encoding of exactly 32 bytes.
        :param backend: accepted but not used by this implementation.
        :raises ValueError: if ``key`` is not valid base64 or does not
            decode to 32 bytes.
        """
        try:
            key = base64.urlsafe_b64decode(key)
        except binascii.Error as exc:
            raise ValueError(
                "Fernet key must be 32 url-safe base64-encoded bytes."
            ) from exc
        if len(key) != 32:
            raise ValueError(
                "Fernet key must be 32 url-safe base64-encoded bytes."
            )

        self._signing_key = key[:16]
        self._encryption_key = key[16:]

    @classmethod
    def generate_key(cls) -> bytes:
        """Return a fresh key: 32 bytes from os.urandom, base64-encoded."""
        return base64.urlsafe_b64encode(os.urandom(32))

    def encrypt(self, data: bytes) -> bytes:
        """Encrypt ``data`` into a token stamped with the current time."""
        return self.encrypt_at_time(data, int(time.time()))

    def encrypt_at_time(self, data: bytes, current_time: int) -> bytes:
        """Encrypt ``data`` into a token stamped with ``current_time``.

        A new random 16-byte IV is drawn for every call.
        """
        iv = os.urandom(16)
        return self._encrypt_from_parts(data, current_time, iv)

    def _encrypt_from_parts(
        self, data: bytes, current_time: int, iv: bytes
    ) -> bytes:
        """Build a token from explicit plaintext, timestamp and IV.

        :param data: plaintext; validated by ``utils._check_bytes``.
        :param current_time: timestamp stored in the token as an unsigned
            8-byte big-endian integer.
        :param iv: CBC initialization vector.
        :returns: the url-safe base64-encoded token.
        """
        utils._check_bytes("data", data)

        padder = padding.PKCS7(algorithms.AES.block_size).padder()
        padded_data = padder.update(data) + padder.finalize()
        encryptor = Cipher(
            algorithms.AES(self._encryption_key),
            modes.CBC(iv),
        ).encryptor()
        ciphertext = encryptor.update(padded_data) + encryptor.finalize()

        # Everything that gets authenticated: version, timestamp, IV and
        # ciphertext, in that order.
        basic_parts = (
            b"\x80"
            + current_time.to_bytes(length=8, byteorder="big")
            + iv
            + ciphertext
        )

        h = HMAC(self._signing_key, hashes.SHA256())
        h.update(basic_parts)
        hmac = h.finalize()
        return base64.urlsafe_b64encode(basic_parts + hmac)

    def decrypt(
        self, token: typing.Union[bytes, str], ttl: typing.Optional[int] = None
    ) -> bytes:
        """Verify ``token`` and return its plaintext.

        :param token: token as produced by :meth:`encrypt`.
        :param ttl: if not ``None``, the token's age is checked against the
            current time; if ``None``, no time checks are made.
        :raises InvalidToken: if the token is malformed, fails the time
            checks, or fails signature verification or decryption.
        :raises TypeError: if ``token`` is neither ``bytes`` nor ``str``.
        """
        timestamp, data = Fernet._get_unverified_token_data(token)
        if ttl is None:
            time_info = None
        else:
            time_info = (ttl, int(time.time()))
        return self._decrypt_data(data, timestamp, time_info)

    def decrypt_at_time(
        self, token: typing.Union[bytes, str], ttl: int, current_time: int
    ) -> bytes:
        """Like :meth:`decrypt`, but with an explicit ``current_time``.

        :raises ValueError: if ``ttl`` is ``None``.
        :raises InvalidToken: as for :meth:`decrypt`.
        :raises TypeError: if ``token`` is neither ``bytes`` nor ``str``.
        """
        if ttl is None:
            raise ValueError(
                "decrypt_at_time() can only be used with a non-None ttl"
            )
        timestamp, data = Fernet._get_unverified_token_data(token)
        return self._decrypt_data(data, timestamp, (ttl, current_time))

    def extract_timestamp(self, token: typing.Union[bytes, str]) -> int:
        """Return the timestamp stored in ``token``.

        The signature is checked first, so the value is only returned for a
        token signed with this instance's key.  The payload is not
        decrypted.

        :raises InvalidToken: if the token is malformed or its signature
            does not verify.
        """
        timestamp, data = Fernet._get_unverified_token_data(token)
        # Verify the token was not tampered with.
        self._verify_signature(data)
        return timestamp

    @staticmethod
    def _get_unverified_token_data(
        token: typing.Union[bytes, str]
    ) -> typing.Tuple[int, bytes]:
        """Decode ``token`` and read its timestamp without authenticating.

        :returns: ``(timestamp, decoded_token_bytes)``.
        :raises TypeError: if ``token`` is neither ``bytes`` nor ``str``.
        :raises InvalidToken: if the token cannot be base64-decoded, does
            not start with the version byte ``0x80``, or is shorter than
            the 9 bytes needed for version plus timestamp.
        """
        if not isinstance(token, (str, bytes)):
            raise TypeError("token must be bytes or str")

        try:
            data = base64.urlsafe_b64decode(token)
        except (TypeError, ValueError):
            # ValueError covers binascii.Error (its subclass, raised for
            # bad base64) and the plain ValueError that base64 raises for a
            # str containing non-ASCII characters.  Both are malformed
            # tokens and must be reported as InvalidToken.
            raise InvalidToken

        if not data or data[0] != 0x80:
            raise InvalidToken

        if len(data) < 9:
            raise InvalidToken

        timestamp = int.from_bytes(data[1:9], byteorder="big")
        return timestamp, data

    def _verify_signature(self, data: bytes) -> None:
        """Check the trailing 32-byte HMAC-SHA256 of decoded token bytes.

        :raises InvalidToken: if the HMAC does not match.
        """
        h = HMAC(self._signing_key, hashes.SHA256())
        h.update(data[:-32])
        try:
            h.verify(data[-32:])
        except InvalidSignature:
            raise InvalidToken

    def _decrypt_data(
        self,
        data: bytes,
        timestamp: int,
        time_info: typing.Optional[typing.Tuple[int, int]],
    ) -> bytes:
        """Apply time checks, authenticate, then decrypt decoded token bytes.

        :param data: decoded token bytes.
        :param timestamp: timestamp already parsed from ``data``.
        :param time_info: ``(ttl, current_time)`` to enable the time
            checks, or ``None`` to skip them.
        :returns: the unpadded plaintext.
        :raises InvalidToken: if the token is older than ``ttl``, stamped
            more than ``_MAX_CLOCK_SKEW`` ahead of ``current_time``, fails
            signature verification, or cannot be decrypted/unpadded.
        """
        if time_info is not None:
            ttl, current_time = time_info
            if timestamp + ttl < current_time:
                raise InvalidToken

            if current_time + _MAX_CLOCK_SKEW < timestamp:
                raise InvalidToken

        self._verify_signature(data)

        # Layout: [0] version, [1:9] timestamp, [9:25] IV,
        # [25:-32] ciphertext, [-32:] HMAC.
        iv = data[9:25]
        ciphertext = data[25:-32]
        decryptor = Cipher(
            algorithms.AES(self._encryption_key), modes.CBC(iv)
        ).decryptor()
        plaintext_padded = decryptor.update(ciphertext)
        try:
            plaintext_padded += decryptor.finalize()
        except ValueError:
            raise InvalidToken
        unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder()

        unpadded = unpadder.update(plaintext_padded)
        try:
            unpadded += unpadder.finalize()
        except ValueError:
            raise InvalidToken
        return unpadded

172 

173 

class MultiFernet:
    """Try several Fernet instances in order; encrypt with the first one.

    Encryption always uses the first instance supplied.  Decryption tries
    each instance in the order given and returns the first success.
    """

    def __init__(self, fernets: typing.Iterable[Fernet]):
        """Store the given instances.

        :param fernets: iterable of Fernet instances; it is materialised
            into a list, so a one-shot iterator is acceptable.
        :raises ValueError: if the iterable yields nothing.
        """
        instances = list(fernets)
        if not instances:
            raise ValueError(
                "MultiFernet requires at least one Fernet instance"
            )
        self._fernets = instances

    def encrypt(self, msg: bytes) -> bytes:
        """Encrypt ``msg`` with the first instance at the current time."""
        now = int(time.time())
        return self.encrypt_at_time(msg, now)

    def encrypt_at_time(self, msg: bytes, current_time: int) -> bytes:
        """Encrypt ``msg`` with the first instance at ``current_time``."""
        primary = self._fernets[0]
        return primary.encrypt_at_time(msg, current_time)

    def rotate(self, msg: typing.Union[bytes, str]) -> bytes:
        """Re-encrypt the token ``msg`` under the first instance.

        The token is decrypted, without any ttl check, by the first
        instance that accepts it.  The plaintext is then encrypted again
        with a new random IV, keeping the token's original timestamp.

        :raises InvalidToken: if no instance can decrypt the token.
        """
        timestamp, data = Fernet._get_unverified_token_data(msg)

        plaintext = None
        recovered = False
        for candidate in self._fernets:
            try:
                plaintext = candidate._decrypt_data(data, timestamp, None)
            except InvalidToken:
                continue
            recovered = True
            break
        if not recovered:
            raise InvalidToken

        fresh_iv = os.urandom(16)
        return self._fernets[0]._encrypt_from_parts(
            plaintext, timestamp, fresh_iv
        )

    def decrypt(
        self, msg: typing.Union[bytes, str], ttl: typing.Optional[int] = None
    ) -> bytes:
        """Return the plaintext from the first instance that accepts ``msg``.

        :raises InvalidToken: if every instance rejects the token.
        """
        for candidate in self._fernets:
            try:
                return candidate.decrypt(msg, ttl)
            except InvalidToken:
                continue
        raise InvalidToken

    def decrypt_at_time(
        self, msg: typing.Union[bytes, str], ttl: int, current_time: int
    ) -> bytes:
        """Like :meth:`decrypt`, but with an explicit ``current_time``.

        :raises InvalidToken: if every instance rejects the token.
        """
        for candidate in self._fernets:
            try:
                return candidate.decrypt_at_time(msg, ttl, current_time)
            except InvalidToken:
                continue
        raise InvalidToken