Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/fernet.py: 60%

137 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 07:26 +0000

1# This file is dual licensed under the terms of the Apache License, Version 

2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 

3# for complete details. 

4 

5from __future__ import annotations 

6 

7import base64 

8import binascii 

9import os 

10import time 

11import typing 

12 

13from cryptography import utils 

14from cryptography.exceptions import InvalidSignature 

15from cryptography.hazmat.primitives import hashes, padding 

16from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes 

17from cryptography.hazmat.primitives.hmac import HMAC 

18 

19 

class InvalidToken(Exception):
    """Signal that a token is malformed, unverifiable, or out of its TTL."""

22 

23 

# Seconds a token's timestamp may lie ahead of the verifier's clock before
# the token is rejected (only checked when a TTL is being enforced).
_MAX_CLOCK_SKEW = 60

25 

26 

class Fernet:
    """Symmetric authenticated encryption producing url-safe base64 tokens.

    A decoded token is laid out as::

        0x80 | 8-byte big-endian timestamp | 16-byte IV | ciphertext | HMAC

    The payload is PKCS7-padded and encrypted with AES in CBC mode. The
    trailing 32 bytes are an HMAC-SHA256 over everything that precedes them.
    """

    def __init__(
        self,
        key: bytes | str,
        backend: typing.Any = None,
    ) -> None:
        """Decode ``key`` and split it into signing and encryption halves.

        Args:
            key: Url-safe base64 encoding of exactly 32 bytes.
            backend: Not read by this method.

        Raises:
            ValueError: If ``key`` is not valid base64 or does not decode to
                exactly 32 bytes.
        """
        try:
            key = base64.urlsafe_b64decode(key)
        except ValueError as exc:
            # binascii.Error subclasses ValueError; catching the base class
            # also covers the plain ValueError that base64 raises for a str
            # with non-ASCII characters, so every bad key gets one message.
            raise ValueError(
                "Fernet key must be 32 url-safe base64-encoded bytes."
            ) from exc
        if len(key) != 32:
            raise ValueError(
                "Fernet key must be 32 url-safe base64-encoded bytes."
            )

        # First 16 bytes authenticate, last 16 bytes encrypt.
        self._signing_key = key[:16]
        self._encryption_key = key[16:]

    @classmethod
    def generate_key(cls) -> bytes:
        """Return the url-safe base64 encoding of 32 random bytes."""
        return base64.urlsafe_b64encode(os.urandom(32))

    def encrypt(self, data: bytes) -> bytes:
        """Encrypt ``data`` into a token stamped with the current time."""
        return self.encrypt_at_time(data, int(time.time()))

    def encrypt_at_time(self, data: bytes, current_time: int) -> bytes:
        """Encrypt ``data`` with a random IV, stamped ``current_time``."""
        iv = os.urandom(16)
        return self._encrypt_from_parts(data, current_time, iv)

    def _encrypt_from_parts(
        self, data: bytes, current_time: int, iv: bytes
    ) -> bytes:
        """Build, sign and base64-encode a token from explicit parts.

        Args:
            data: Plaintext to protect.
            current_time: Timestamp stored in the token as 8 big-endian bytes.
            iv: Initialization vector passed to CBC mode.

        Returns:
            The url-safe base64-encoded token.
        """
        utils._check_bytes("data", data)

        padder = padding.PKCS7(algorithms.AES.block_size).padder()
        padded_data = padder.update(data) + padder.finalize()
        encryptor = Cipher(
            algorithms.AES(self._encryption_key),
            modes.CBC(iv),
        ).encryptor()
        ciphertext = encryptor.update(padded_data) + encryptor.finalize()

        # Version byte, timestamp, IV, then ciphertext.
        basic_parts = (
            b"\x80"
            + current_time.to_bytes(length=8, byteorder="big")
            + iv
            + ciphertext
        )

        # The HMAC covers every byte above and is appended last.
        h = HMAC(self._signing_key, hashes.SHA256())
        h.update(basic_parts)
        hmac = h.finalize()
        return base64.urlsafe_b64encode(basic_parts + hmac)

    def decrypt(self, token: bytes | str, ttl: int | None = None) -> bytes:
        """Verify and decrypt ``token``.

        Args:
            token: Token as produced by ``encrypt``.
            ttl: If given, the token is checked against the current time and
                rejected when its timestamp plus ``ttl`` is in the past.

        Raises:
            InvalidToken: If the token is malformed, fails verification, or
                fails the time checks.
        """
        timestamp, data = Fernet._get_unverified_token_data(token)
        if ttl is None:
            time_info = None
        else:
            time_info = (ttl, int(time.time()))
        return self._decrypt_data(data, timestamp, time_info)

    def decrypt_at_time(
        self, token: bytes | str, ttl: int, current_time: int
    ) -> bytes:
        """Like ``decrypt`` but with the clock supplied as ``current_time``.

        Raises:
            ValueError: If ``ttl`` is ``None``.
            InvalidToken: If the token is malformed, fails verification, or
                fails the time checks.
        """
        if ttl is None:
            raise ValueError(
                "decrypt_at_time() can only be used with a non-None ttl"
            )
        timestamp, data = Fernet._get_unverified_token_data(token)
        return self._decrypt_data(data, timestamp, (ttl, current_time))

    def extract_timestamp(self, token: bytes | str) -> int:
        """Return the timestamp stored in ``token`` after checking its HMAC.

        Raises:
            InvalidToken: If the token is malformed or fails verification.
        """
        timestamp, data = Fernet._get_unverified_token_data(token)
        # Verify the token was not tampered with.
        self._verify_signature(data)
        return timestamp

    @staticmethod
    def _get_unverified_token_data(token: bytes | str) -> tuple[int, bytes]:
        """Decode ``token`` and read its timestamp without authenticating it.

        Returns:
            ``(timestamp, data)`` where ``data`` is the full decoded token.

        Raises:
            TypeError: If ``token`` is neither ``bytes`` nor ``str``.
            InvalidToken: If ``token`` cannot be base64-decoded, does not
                start with the ``0x80`` version byte, or is shorter than
                9 bytes.
        """
        if not isinstance(token, (str, bytes)):
            raise TypeError("token must be bytes or str")

        try:
            data = base64.urlsafe_b64decode(token)
        except (TypeError, ValueError):
            # ValueError covers binascii.Error (its subclass) and the plain
            # ValueError that base64 raises for a str with non-ASCII
            # characters, so a malformed token never escapes as ValueError.
            raise InvalidToken

        if not data or data[0] != 0x80:
            raise InvalidToken

        # Need the version byte plus the full 8-byte timestamp.
        if len(data) < 9:
            raise InvalidToken

        timestamp = int.from_bytes(data[1:9], byteorder="big")
        return timestamp, data

    def _verify_signature(self, data: bytes) -> None:
        """Check that the last 32 bytes of ``data`` are the HMAC of the rest.

        Raises:
            InvalidToken: If the HMAC does not match.
        """
        h = HMAC(self._signing_key, hashes.SHA256())
        h.update(data[:-32])
        try:
            h.verify(data[-32:])
        except InvalidSignature:
            raise InvalidToken

    def _decrypt_data(
        self,
        data: bytes,
        timestamp: int,
        time_info: tuple[int, int] | None,
    ) -> bytes:
        """Apply time checks, verify the HMAC, then decrypt and unpad.

        Args:
            data: Full decoded token bytes.
            timestamp: Timestamp already parsed from ``data``.
            time_info: ``(ttl, current_time)`` to enforce, or ``None`` to
                skip the time checks.

        Raises:
            InvalidToken: If a time check, the HMAC check, decryption, or
                unpadding fails.
        """
        if time_info is not None:
            ttl, current_time = time_info
            # Expired: issued more than ttl before the current time.
            if timestamp + ttl < current_time:
                raise InvalidToken

            # Issued further in the future than the allowed clock skew.
            if current_time + _MAX_CLOCK_SKEW < timestamp:
                raise InvalidToken

        self._verify_signature(data)

        # Bytes 9..24 are the IV; the ciphertext runs up to the HMAC.
        iv = data[9:25]
        ciphertext = data[25:-32]
        decryptor = Cipher(
            algorithms.AES(self._encryption_key), modes.CBC(iv)
        ).decryptor()
        plaintext_padded = decryptor.update(ciphertext)
        try:
            plaintext_padded += decryptor.finalize()
        except ValueError:
            raise InvalidToken
        unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder()

        unpadded = unpadder.update(plaintext_padded)
        try:
            unpadded += unpadder.finalize()
        except ValueError:
            raise InvalidToken
        return unpadded

168 

169 

class MultiFernet:
    """Wrap several Fernet instances behind one encrypt/decrypt interface.

    Encryption always uses the first instance. Decryption tries each
    instance in the order given and returns the first successful result.
    """

    def __init__(self, fernets: typing.Iterable[Fernet]):
        """Store ``fernets`` as a list; at least one instance is required.

        Raises:
            ValueError: If ``fernets`` yields no items.
        """
        members = list(fernets)
        if len(members) == 0:
            raise ValueError(
                "MultiFernet requires at least one Fernet instance"
            )
        self._fernets = members

    def encrypt(self, msg: bytes) -> bytes:
        """Encrypt ``msg`` with the first instance at the current time."""
        now = int(time.time())
        return self.encrypt_at_time(msg, now)

    def encrypt_at_time(self, msg: bytes, current_time: int) -> bytes:
        """Encrypt ``msg`` with the first instance at ``current_time``."""
        primary = self._fernets[0]
        return primary.encrypt_at_time(msg, current_time)

    def rotate(self, msg: bytes | str) -> bytes:
        """Re-encrypt token ``msg`` under the first instance.

        The payload is recovered with whichever instance accepts the token
        (no TTL is enforced) and re-encrypted with a fresh random IV while
        keeping the token's original timestamp.

        Raises:
            InvalidToken: If no instance can decrypt ``msg``.
        """
        issued_at, raw = Fernet._get_unverified_token_data(msg)

        plaintext = None
        recovered = False
        for candidate in self._fernets:
            try:
                plaintext = candidate._decrypt_data(raw, issued_at, None)
            except InvalidToken:
                continue
            recovered = True
            break
        if not recovered:
            raise InvalidToken

        fresh_iv = os.urandom(16)
        primary = self._fernets[0]
        return primary._encrypt_from_parts(plaintext, issued_at, fresh_iv)

    def decrypt(self, msg: bytes | str, ttl: int | None = None) -> bytes:
        """Return the plaintext from the first instance that accepts ``msg``.

        Raises:
            InvalidToken: If every instance rejects ``msg``.
        """
        for candidate in self._fernets:
            try:
                plaintext = candidate.decrypt(msg, ttl)
            except InvalidToken:
                continue
            else:
                return plaintext
        raise InvalidToken

    def decrypt_at_time(
        self, msg: bytes | str, ttl: int, current_time: int
    ) -> bytes:
        """Like ``decrypt`` but with the clock supplied as ``current_time``.

        Raises:
            InvalidToken: If every instance rejects ``msg``.
        """
        for candidate in self._fernets:
            try:
                plaintext = candidate.decrypt_at_time(msg, ttl, current_time)
            except InvalidToken:
                continue
            else:
                return plaintext
        raise InvalidToken

206 

207 def decrypt_at_time( 

208 self, msg: bytes | str, ttl: int, current_time: int 

209 ) -> bytes: 

210 for f in self._fernets: 

211 try: 

212 return f.decrypt_at_time(msg, ttl, current_time) 

213 except InvalidToken: 

214 pass 

215 raise InvalidToken